写点什么

[Pulsar] Delayed message 原理

作者:Zike Yang
  • 2021 年 11 月 18 日
  • 本文字数:1776 字

    阅读完需:约 6 分钟

在使用 Pulsar 的过程中,我们可能希望 message 不会立即被分发到 consumer 中,而是等待一段时间后再进行分发,这时可以使用 Pulsar 提供的 Delayed message 的功能。本文主要介绍 Delayed message 的实现原理。


延时消息的添加

Pulsar 主要是通过 DelayedDeliveryTracker 实现 Delayed message。当 Producer 发送携带有 DeliverAt 信息的 message 时,Broker 会把这条消息加入到 DelayedDeliveryTracker 中,通过这个 Tracker 完成等待和消息分发的操作。

DelayedDeliveryTracker 主要是通过时间轮的方式来实现消息的延时发送,这样能够更好地处理大量消息延迟的情况。当有 Delayed message 来到时,则先检查时间是否已到,如果已经到达可以分发的时间则立刻分发出去,避免一次不必要的延时。

接着对需要延时的消息,将其加入到 priorityQueue 中,这个队列后续将用于延时。关于添加消息的主要代码如下:

public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {    long now = clock.millis();    if (deliveryAt < (now + tickTimeMillis)) {        // 将消息立刻分发出去        return false;    }
priorityQueue.add(deliveryAt, ledgerId, entryId); updateTimer(); return true;}
复制代码

Timer 与 priorityQueue

priorityQueue 存储了各个延时消息的位置,位置包括 ledgerId 和 entryId,其底层的实现是一个三元组优先队列,三元指的是预计的分发时间 timestamp、ledgerId 和 entryId。每次往 priorityQueue 中添加延时消息,会将 timestamp 最小的消息排到第一位,这样方便在后续的 updateTimer 中对第一个消息进行延时。

在添加延时消息后,会调用一次 updateTimer(),每次 updateTimer 则会重新计时,计时会首先使用 priorityQueue 的第一个消息的 timestamp,设置 Timer 延时到该 timestamp,以触发第一个消息的分发。

当在第一个消息延时的过程中,添加了其他消息,并且其 timestamp 比正在延时的消息的 timestamp(在代码中则用 currentTimeoutTarget 体现)还小,那么它会后来居上被排到 priorityQueue 的第一位,然后在调用 updateTimer 的时候重新对新来的第一位进行延时;如果新添加的消息的 timestamp 比正在延时的消息的 timestamp 大,则不会影响 priorityQueue 的第一位,也不会重新设置 timer。

主要的延时代码如下:


private void updateTimer() {    long timestamp = priorityQueue.peekN1();    if (timestamp == currentTimeoutTarget) {        // 不需要重新设置timer        return;    }
if (timeout != null) { timeout.cancel(); }
long delayMillis = timestamp - clock.millis(); currentTimeoutTarget = timestamp; timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);}
复制代码

延时消息的分发

当延时消息到达时间后,则会调用 dispatcher.readMoreEntries:


public void run(Timeout timeout) throws Exception {    if (timeout.isCancelled()) {        return;    }
synchronized (dispatcher) { currentTimeoutTarget = -1; timeout = null; dispatcher.readMoreEntries(); }}
复制代码

dispatcher 会重新从 DelayedDeliveryTracker 获取目前 priorityQueue 前面的消息,并将到达时间的消息分发给 consumer,最后重新 updateTimer 来对剩余的消息进行计时。其主要代码如下:

public Set<PositionImpl> getScheduledMessages(int maxMessages) {    int n = maxMessages;    Set<PositionImpl> positions = new TreeSet<>();    long now = clock.millis();    long cutoffTime = now + tickTimeMillis;
while (n > 0 && !priorityQueue.isEmpty()) { long timestamp = priorityQueue.peekN1(); if (timestamp > cutoffTime) { break; }
long ledgerId = priorityQueue.peekN2(); long entryId = priorityQueue.peekN3(); positions.add(new PositionImpl(ledgerId, entryId));
priorityQueue.pop(); --n; }
updateTimer(); return positions;}
复制代码


本文介绍了 Delayed message 的实现原理,需要注意的是 Delayed message 只能用于 Shared 类型的订阅中,这些延时消息的位置信息都会被持久化,所以 Broker 宕机重启后还会对已存在的 delayed message 进行延时计时。

发布于: 刚刚阅读数: 2
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Delayed message原理