写点什么

[Pulsar] 订阅跳过一定数量消息的原理

作者:Zike Yang
  • 2021 年 12 月 07 日
  • 本文字数:1451 字

    阅读完需:约 5 分钟

在使用 Pulsar 的过程中,我们有时候在消费消息的时候,希望当前的订阅能够跳过一定数量的消息再继续进行消费。但是 Pulsar 并不像 Kafka,有单调递增的 Index 可以实现这样的功能,在 Pulsar 中,消息是用四元组 MessageId 来标记位置的,本文将介绍 Pulsar 中订阅跳过一定数量消息的原理。


MessageId 的结构

MessageId 内部结构是一个四元组,其中包括 ledgerId、entryId、batchId 和 partitionedIndex。而在 Broker 和 Bookie 中,主要使用 ledgerId 和 entryId 来定位消息所在 entry 的位置,对于 batchMessage,具体的消息在 entry 中的位置则由 client 计算而成,本文不对此进行讨论。

一个 Topic 由多个 ledger 组成,每个 ledger 有全局唯一的 ledgerId,而每个 ledger 由多个 entry 组成,在一个 ledger 内部的 entry 的 entryId 是单调递增的,而 ledger 的 ledgerId 在一个 topic 中并不一定是单调递增的(ledgerId 是全局进行分配的)。

所以在 Broker 看来,message 的位置主要由 ledgerId 和 entryId 共同决定,这对我们实现各类 messageID 操作非常重要。


获取 N 个位置后的 messageId

首先,我们需要获取当前订阅所指向的消息的后第 N 个消息的 messageId。

这里使用了 ManagedLedger.getPositionAfterN 方法来实现。其核心代码如下:

while (entriesToSkip >= 0) {  // for the current ledger, the number of entries written is deduced from the lastConfirmedEntry  // for previous ledgers, LedgerInfo in ZK has the number of entries  if (currentLedger != null && currentLedgerId == currentLedger.getId()) {    lastLedger = true;    if (currentLedgerEntries > 0) {      totalEntriesInCurrentLedger = lastConfirmedEntry.getEntryId() + 1;    } else {      totalEntriesInCurrentLedger = 0;    }  } else {    totalEntriesInCurrentLedger = ledgers.get(currentLedgerId).getEntries();  }
long unreadEntriesInCurrentLedger = totalEntriesInCurrentLedger - currentEntryId;
if (unreadEntriesInCurrentLedger >= entriesToSkip) { // if the current ledger has more entries than what we need to skip // then the return position is in the same ledger currentEntryId += entriesToSkip; break; } else { // skip remaining entry from the next ledger entriesToSkip -= unreadEntriesInCurrentLedger; if (lastLedger) { // there are no more ledgers, return the last position currentEntryId = totalEntriesInCurrentLedger; break; } else { Long lid = ledgers.ceilingKey(currentLedgerId + 1); currentLedgerId = lid != null ? lid : (ledgers.lastKey() + 1); currentEntryId = 0; } }}
复制代码

entriesToSkip 就是这里的 N。首先计算出当前 ledger 的 entry 个数 totalEntriesInCurrentLedger 以及未读取的 entry 个数(当前订阅所指的 entry 到当前 ledger 最后一个 entry 的数量)unreadEntriesInCurrentLedger,用于后面的计算。

通过判断 unreadEntriesInCurrentLedger 和 entriesToSkip 的大小可以判断出目标的 entry 是否在当前的 ledger 中,如果是则可以直接返回 entryId,从而拿到了目标的 messageId。不是的话就需要切换到下一个 ledger 中,再重复以上的步骤进行查找。

通过这样的操作后,我们就可以拿到第 N 个后的 messageId 了。


最后的操作就是非常简单了,直接通过 ManagedCurosr 的 asyncSkipEntries 方法,就可以将当前的订阅指向到目标的 messageId 了,整个流程即完成。

发布于: 4 小时前阅读数: 7
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 订阅跳过一定数量消息的原理