[Pulsar] 订阅跳过一定数量消息的原理
在使用 Pulsar 的过程中,我们有时候在消费消息的时候,希望当前的订阅能够跳过一定数量的消息再继续进行消费。但是 Pulsar 并不像 Kafka,有单调递增的 Index 可以实现这样的功能,在 Pulsar 中,消息是用四元组 MessageId 来标记位置的,本文将介绍 Pulsar 中订阅跳过一定数量消息的原理。
MessageId 的结构
MessageId 内部结构是一个四元组,其中包括 ledgerId、entryId、batchId 和 partitionedIndex。而在 Broker 和 Bookie 中,主要使用 ledgerId 和 entryId 来定位消息所在 entry 的位置,对于 batchMessage,具体的消息在 entry 中的位置则由 client 计算而成,本文不对此进行讨论。
一个 Topic 由多个 ledger 组成,每个 ledger 有全局唯一的 ledgerId,而每个 ledger 由多个 entry 组成,在一个 ledger 内部的 entry 的 entryId 是单调递增的,而 ledger 的 ledgerId 在一个 topic 中并不一定是单调递增的(ledgerId 是全局进行分配的)。
所以在 Broker 看来,message 的位置主要由 ledgerId 和 entryId 共同决定,这对我们实现各类 messageID 操作非常重要。
获取 N 个位置后的 messageId
首先,我们需要获取当前订阅所指向的消息的后第 N 个消息的 messageId。
这里使用了 ManagedLedger.getPositionAfterN 方法来实现。其核心代码如下:
entriesToSkip 就是这里的 N。首先计算出当前 ledger 的 entry 个数 totalEntriesInCurrentLedger 以及未读取的 entry 个数(当前订阅所指的 entry 到当前 ledger 最后一个 entry 的数量)unreadEntriesInCurrentLedger,用于后面的计算。
通过判断 unreadEntriesInCurrentLedger 和 entriesToSkip 的大小可以判断出目标的 entry 是否在当前的 ledger 中,如果是则可以直接返回 entryId,从而拿到了目标的 messageId。不是的话就需要切换到下一个 ledger 中,再重复以上的步骤进行查找。
通过这样的操作后,我们就可以拿到第 N 个后的 messageId 了。
最后的操作就是非常简单了,直接通过 ManagedCurosr 的 asyncSkipEntries 方法,就可以将当前的订阅指向到目标的 messageId 了,整个流程即完成。
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/ae4c561d92133e63eb96ec414】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论