[Pulsar] Delayed message 原理
在使用 Pulsar 的过程中,我们可能希望 message 不会立即被分发到 consumer 中,而是等待一段时间后再进行分发,这时可以使用 Pulsar 提供的 Delayed message 的功能。本文主要介绍 Delayed message 的实现原理。
延时消息的添加
Pulsar 主要是通过 DelayedDeliveryTracker 实现 Delayed message。当 Producer 发送携带有 DeliverAt 信息的 message 时,Broker 会把这条消息加入到 DelayedDeliveryTracker 中,通过这个 Tracker 完成等待和消息分发的操作。
DelayedDeliveryTracker 主要是通过时间轮的方式来实现消息的延时发送,这样能够更好地处理大量消息延迟的情况。当有 Delayed message 来到时,则先检查时间是否已到,如果已经到达可以分发的时间则立刻分发出去,避免一次不必要的延时。
接着对需要延时的消息,将其加入到 priorityQueue 中,这个队列后续将用于延时。关于添加消息的主要代码如下:
Timer 与 priorityQueue
priorityQueue 存储了各个延时消息的位置,位置包括 ledgerId 和 entryId,其底层的实现是一个三元组优先队列,三元指的是预计的分发时间 timestamp、ledgerId 和 entryId。每次往 priorityQueue 中添加延时消息,会将 timestamp 最小的消息排到第一位,这样方便在后续的 updateTimer 中对第一个消息进行延时。
在添加延时消息后,会调用一次 updateTimer(),每次 updateTimer 则会重新计时,计时会首先使用 priorityQueue 的第一个消息的 timestamp,设置 Timer 延时到该 timestamp,以触发第一个消息的分发。
当在第一个消息延时的过程中,添加了其他消息,并且其 timestamp 比正在延时的消息的 timestamp(在代码中则用 currentTimeoutTarget 体现)还小,那么它会后来居上被排到 priorityQueue 的第一位,然后在调用 updateTimer 的时候重新对新来的第一位进行延时;如果新添加的消息的 timestamp 比正在延时的消息的 timestamp 大,则不会影响 priorityQueue 的第一位,也不会重新设置 timer。
主要的延时代码如下:
延时消息的分发
当延时消息到达时间后,则会调用 dispatcher.readMoreEntries:
dispatcher 会重新从 DelayedDeliveryTracker 获取目前 priorityQueue 前面的消息,并将到达时间的消息分发给 consumer,最后重新 updateTimer 来对剩余的消息进行计时。其主要代码如下:
本文介绍了 Delayed message 的实现原理,需要注意的是 Delayed message 只能用于 Shared 类型的订阅中,这些延时消息的位置信息都会被持久化,所以 Broker 宕机重启后还会对已存在的 delayed message 进行延时计时。
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/4b5c4707194180b1f51955602】。文章转载请联系作者。
评论