深入解析 Apache Pulsar 系列: Broker 消息确认的管理
导语
我们在之前的《深入解析 Apache Pulsar 系列:客户端消息确认》中介绍过 Apache Pulsar 客户端的多种消息确认模式。在这篇文章中,我们将介绍 Broker 侧对于消息确认的管理。
作者简介
林琳
腾讯云中间件专家工程师
Apache Pulsar PMC,《深入解析 Apache Pulsar》作者。目前专注于中间件领域,在消息队列和微服务方向具有丰富的经验。负责 TDMQ 的设计与开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务。
客户端通过消息确认机制通知 Broker 某些消息已经被消费,后续不要再重复推送。Broker 侧则使用游标来存储当前订阅的消费位置信息,包含了消费位置中的所有元数据,避免 Broker 重启后,消费者要从头消费的问题。Pulsar 中的订阅分为持久订阅和非持久订阅,他们之间的区别是:持久订阅的游标(Cursor)是持久化的,元数据会保存在 ZooKeeper,而非持久化游标只保存在 Broker 的内存中。
游标的简介
Pulsar 中每个订阅都会包含一个游标,如果多个消费者拥有相同的订阅名(消费组),那这些消费者们会共享一个游标。游标的共享又和消费者的消费模式有关,如果是 Exclusive 或者 FailOver 模式的订阅,那同一时间只有一个消费者使用这个游标。如果是 Shared 或者 Key_Shared 模式的订阅,那多个消费者会同时使用这个游标。
每当消费者 Ack 一条消息,游标中指针的位置都有可能会变化,为什么说是有可能呢?这涉及到我们在客户端章节介绍的 Acknowledge 的方式:单条消息确认(Acknowledge)、批消息中的单个消息确认(Acknowledge)、累积消息确认(AcknowledgeCumulative)。否定应答(NegativeAcknowledge)不涉及游标的变化,因此不在讨论范围之内。
我们先看单条消息的确认,如果是独占式的消费,每确认一条消息,游标位置都会往后移动一个 Entry,如下图所示:
累积消息确认,只需要确认一条消息,游标可以往后移动多个 Entry,如:Consumer-1 累积确认了 Entry-4,则从 0 开始的 Entry 都会被确认,如下图所示:
对于共享式的消费,因为有多个消费者同时消费消息,因此消息的确认可能会出现空洞,空洞如下图所示:
这里也解释了为什么 MarkeDeletePosition 指针的位置可能发生变化,我们可以从共享式的消费中看到,消息确认是可能出现空洞的,只有当前面所有的 Entry 都被消费并确认,MarkeDeletePosition 指针才会移动。如果存在空洞,MarkeDeletePosition 指针是不会往后移动的。那这个 MarkeDeletePosition 指针和游标是什么关系呢?游标是一个对象,里面包含了多个属性,MarkeDeletePosition 指针只是游标的其中一个属性。正如上面所说的 Ack 空洞,在游标中有另外专门的方式进行存储。如果我们不单独存储空洞,那 Broker 重启后,消费者只能从 MarkDeletePosition 开始消费,会存在重复消费的问题。如果以上图为例,Broker 重启后 Entry-4 就会被重复消费。当然,Pulsar 中对空洞信息是有单独存储的。
然后,我们看看游标里到底记录了什么元数据,此处只列出一些关键的属性:
看到 CursorLedger,说明数据被保存到了 Bookkeeper 中。有的读者可能会有疑问,既然数据都保存到 Bookkeeper 中了,那 ZooKeeper 中保存的 Cursor 信息有什么用呢?我们可以认为 ZooKeeper 中保存的游标信息只是索引,包含了以下几个属性:
当前的 cursorLedger 名以及 ID,用于打开 Bookkeeper 中的 Ledger;
LastMarkDeleteEntry,最后被标记为删除的 Entry 信息,里面包含了 LedgerId 和 EntryId;
游标最后的活动时间戳。
游标保存到 ZooKeeper 的时机有几个:
当 cursor 被关闭时;
当发生 Ledger 切换导致 cursorLedger 变化时;
当持久化空洞数据到 Bookkeeper 失败并尝试持久化空洞数据到 ZooKeeper 时。
我们可以把 ZooKeeper 中的游标信息看作 Check Point,当恢复数据时,会先从 ZooKeeper 中恢复元数据,获取到 Bookkeeper Ledger 信息,然后再通过 Ledger 恢复最新的 LastMarkDeleteEntry 位置和空洞信息。
既然游标不会实时往 ZooKeeper 中写入数据,那是如何保证消费位置不丢失的呢?
Bookkeeper 中的一个 Ledger 能写很多的 Entry,因此高频的保存操作都由 Bookkeeper 来承担了,ZooKeeper 只负责存储低频的索引更新。
消息空洞的管理
在游标对象中,使用了一个 IndividualDeletedMessages 容器来存储所有的空洞信息。得益于 Java 中丰富的轮子生态,Broker 中直接使用了 Guava Range 这个库来实现空洞的存储。举个例子,假设在 Ledger-1 中我们的空洞如下:
则我们存储的空洞信息如下,即会用区间来表示已经连续 Ack 的范围:
[ (1:-1, 1:2] , (1:3, 1:6] ]
使用区间的好处是,可以用很少的区间数来表示整个 Ledger 的空洞情况,而不需要每个 Entry 都记录。当某个范围都已经被消费且确认了,会出现两个区间合并为一个区间,这都是 Guava Range 自动支持的能力。如果从当前 MarkDeletePosition 指针的位置到后面某个 Entry 为止,都连成了一个区间,则 MarkDeletePosition 指针就可以往后移动了。
记录了这些消息空洞之后,是如何用来避免消息重复消费的呢?
当 Broker 从 Ledger 中读取到消息后,会进入一个清洗阶段,如:过滤掉延迟消息等等。在这个阶段,Broker 会遍历所有消息,看消息是否存在于 Range 里,如果存在,则说明已经被确认过了,这条消息会被过滤掉,不再推送给客户端。Guava Range 提供了 Contains 接口,可以快速查看某个位置是否落在区间里。这种 Entry 需要被过滤的场景,基本上只会出现在 Broker 重启后,此时游标信息刚恢复。当 ReadPosition 超过了这段空洞的位置时,就不会出现读到重复消息要被过滤的情况了。
然后,我们来看看 IndividualDeletedMessages 这个容器的实现。
IndividualDeletedMessages 的类型是 LongPairRangeSet,默认实现是 DefaultRangeSet,是一个基于 Google Guava Range 包装的实现类。另外一个 Pulsar 自己实现的优化版:ConcurrentOpenLongPairRangeSet。优化版的 RangeSet 和 Guava Range 的存储方式有些不一样,Guava Range 使用区间来记录数据,优化版 RangeSet 对外提供的接口也是 Range,但是内部使用了 BitSet 来记录每个 Entry 是否被确认。
优化版 RangeSet 在空洞较多的情况下对内存更加友好。我们可以假设一个场景,有 100W 的消息被拉取,但是只有 50W 的消息已经被 Ack,并且每隔一条消息 Ack 一条,这样就会出现 50W 个空洞。此时的 Range 就无法发挥区间的优势了,会出现 50W 个 Range 对象,如下图所示。而优化版的 RangeSet 使用了 BitSet,每个 ack 只占一位。
我们可以在 broker.conf 中,通过配置项 managedLedgerUnackedRangesOpenCacheSetEnabled=true 来开启使用优化版的 RangeSet。
也正因如此,如果整个集群的订阅数比较多,游标对象的数据量其实并不小。所以在 Pulsar 中,MetaDataStore 中只保存了游标的索引信息,即保存了游标存储在哪个 Ledger 中。真正的游标数据会通过上面介绍的 cursorLedger 写入到 Bookkeeper 中持久化。message PositionInfo {required int64 ledgerId = 1;required int64 entryId = 2;repeated MessageRange individualDeletedMessages = 3;repeated LongProperty properties = 4;repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;}
看到这里,其实 Batch 消息中单条消息确认的实现也清晰了,BatchDeletedIndexes 是一个 ConcurrentSkipListMap,Key 为一个 Position 对象,对象里面包含了 LedgerId 和 EntryId。Value 是一个 BitSet,记录了这个 Batch 里面哪些消息已经被确认。BatchDeletedIndexes 会和单条消息的空洞一起放在同一个对象(PositionInfo)中,最后持久化到 Bookkeeper。
空洞数据如果写入 Bookkeeper 失败了,现在 Pulsar 还会尝试往 ZooKeeper 中保存,和索引信息一起保存。但是 ZooKeeper 不会保存所有的数据,只会保存一小部分,尽可能的让客户端不出现重复消费。我们可以通过 broker.conf 中的配置项来决定最多持久化多少数据到 ZooKeeper,配置项名为:managedLedgerMaxUnackedRangesToPersistInZooKeeper,默认值是 1000。
消息空洞管理的优化
空洞存储的方案看起来已经很完美,但是在海量未确认消息的场景下还是会出现一些问题。首先是大量的订阅会让游标数量暴增,导致 Broker 内存的占用过大。其次,有很多空洞其实是根本没有变化的,现在每次都要保存全量的空洞数据。最后,虽然优化版 RangeSet 在内存中使用了 BitSet 来存储,但是实际存储在 Bookkeeper 中的数据 MessageRange,还是一个个由 LedgerId 和 EntryId 组成的对象,每个 MessageRange 占用 16 字节。当空洞数量比较多时,总体体积会超过 5MB,而现在 Bookkeeper 能写入的单个 Entry 大小上限是 5MB,如果超过这个阈值就会出现空洞信息持久化失败的情况。
对于这种情况,已经有专门的 PIP 在解决这个问题,笔者在写这篇文章的时候,这个 PIP 代码已经提交,正在 Review 阶段,因此下面的内容可能会和最终代码有一定差距。
新的方案中主要使用 LRU+分段存储的方式来解决上述问题。由于游标中空洞信息数据量可能会很大,因此内存中只保存少量热点区间,通过 LRU 算法来切换冷热数据,从而进一步压缩内存的使用率。分段存储主要是把空洞信息存储到不同的 Entry 中去,这样能避免超过一个 Entry 最大消息 5MB 的限制。
如果我们把空洞信息拆分为多个 Entry 来存储,首先面临的问题是索引。因为使用单个 Entry 记录时,只需要读取 Ledger 中最后一个 Entry 即可,而拆分为多个 Entry 后,我们不知道要读取多少个 Entry。因此,新方案中引入了 Marker,如下图所示:
当所有的 Entry 保存完成后,插入一个 Marker,Marker 是一个特殊的 Entry,记录了当前所有拆分存储的 Entry。当数据恢复时,从后往前读,先读出索引,然后再根据索引读取所有的 Entry。
由于存储涉及到多个 Entry,因此需要保证原子性,只要最后一个 Entry 读出来不是 Marker,则说明上次的保存没有完成就中断了,会继续往前读,直到找到一个完整的 Marker。
空洞信息的存储,也不需要每次全量了。以 Ledger 为单位,记录每个 Ledger 下的数据是否有修改过,如果空洞数据被修改过会被标识为脏数据,存储时只会保存有脏数据的部分,然后修改 Marker 中的索引。
假设 Entry-2 中存储的空洞信息有修改,则 Entry-2 会被标记为脏数据,下次存储时,只需要存储一个 Entry-2,再存储一个 Marker 即可。只有当整个 Ledger 写满的情况下,才会触发 Marker 中所有 Entry 复制到新 Ledger 的情况。如下图所示:
ManagedLedger 在内存中通过 LinkedHashMap 实现了一个 LRU 链表,会有线程定时检查空洞信息的内存占用是否已经达到阈值,如果达到了阈值则需要进行 LRU 换出,切换以 Ledger 为单位,把最少使用的数据从 Map 中移除。LRU 数据的换入是同步的,当添加或者调用 Contains 时,发现 Marker 中存在这个 Ledger 的索引,但是内存中没有对应的数据,则会触发同步数据的加载。异步换出和同步换入,主要是为了让数据尽量在内存中多待一会,避免出现频繁的换入换出。
后记
Pulsar 中的设计细节非常多,由于篇幅有限,作者会整理一系列的文章进行技术分享,敬请期待。如果各位希望系统性地学习 Pulsar,可以购买作者出版的新书《深入解析 Apache Pulsar》。
▊《深入解析 Apache Pulsar》
林琳 著
详解 ApachePulsar 源码,深入分析背后原理与实现,实战 Pulsar 线上问题处理
本书由浅入深地讲解了 Apache Pulsar 中各个组件的使用方式及内部实现原理,通过阅读本书,读者可以快速、轻松地了解 Apache Pulsar 内部的运行机制。
第 1 章介绍 Apache Pulsar 的背景,以及如何快速部署一个 Apache Pulsar 服务。第 2 章介绍 Apache Pulsar 客户端的实现机制与原理,包括生产者、消费者、管理流客户端等。第 3 章介绍 Apache Pulsar 中最重要的逻辑组件—Broker,读者通过这部分内容可以了解 Broker 所有的特性。除了最基础的收发消息,Apache Pulsar 还能进行轻量级的函数计算、数据流转。第 4 章详细介绍 Apache Pulsar 的 Function 和 Pulsar IO (Connector)。第 5 章介绍 Apache Pulsar 的存储层—BookKeeper,通过对本章的学习,读者可以了解 Apache Pulsar 的数据存储模型及流程实现。第 6 章介绍线上实战的一些经验,包括高可用、扩/缩容、资源隔离等。
评论