写点什么

技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上)

作者:AscentStream
  • 2025-08-13
    上海
  • 本文字数:5397 字

    阅读完需:约 18 分钟

技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上)

在 Pulsar broker 中, 消息的 Retention, Expiry 和 Backlog quota 是比较重要的功能,它们表现的是 Pulsar 对于流经它的数据的管理。


但是受限于复杂度和文档语言等因素,使用者可能无法在第一时间很直观的了解它们。


因此本文将对这三个功能进行详细的介绍,包括概念、行为、应用、实现和注意事项等方面,希望能够对大家有所帮助。


另外,这三个特性属于 Pulsar 的高级特性,阅读本文之前,建议先对 Pulsar 的基本概念有所了解。


文章较长,并且偏向工具类,建议大家先收藏,如果暂时没耐心看完,也可以在后续时间慢慢阅读。


话不多说,直接开冲!

一、Retention

1. 概念

Retention 是 Pulsar 对消息的保留策略,它针对于所有消息,包含已经消费的消息和未消费的消息。


Pulsar 的消息保留略有一点反直觉,我们一般会认为消息保留针对已经消费完毕的消息,在消费完毕后保留一段时间之后再进行清理,腾出磁盘空间。很多资料以及之前版本的 Pulsar 官网都是这样理解和表达的,但是实际并非如此。


Retention 首先保证的是消息回溯,比如将保留策略设置为 3 天,用户一定能够回溯 3 天之内的消息。在此之后再考虑删除更早的消息,以腾出磁盘空间。


由于不同 Subscription 的消费进度可能不同,有的较快,有的较慢,因此 Pulsar 对数据的删除是以该 Topic 中消费最慢的 Subscription 为基准,被它消费过后的数据才是可删除的。但是,删除与否还需要看 Retention 如何设置。同时,无论未被消费的消息是否超出了 Retention 的限制,它们都会保留,不会删除。


总结一下:如果未消费的消息超出了 Retention 限制,则保留所有未消费的消息,删除所有已消费的消息;如果未被消费的消息没有超出 Retention 限制,则保留所有未消费的消息以及一部分已经消费的消息,删除其余已消费的消息。



2. 行为

Pulsar 的数据清理由定时任务驱动,每隔一段时间 Broker 会检查当前实例上的所有 Topic,如果 Topic 设置了 Retention 策略,它会根据相应的策略来检查该 Topic 中的数据,并清理数据。Retention 策略包含了两项限制,分别是:retentionTimeInMinutes(单位:分钟)和retentionSizeInMB(单位:MB)。如果有任意一项条件满足,Pulsar 将执行清理流程。


数据清理的维度在 Topic 级别,粒度在 Ledger 级别。即:


  1. 如果旧数据清理完成,该 Topic 所有的 Subscription 的所有 Consumer 将无法消费已经清理的旧数据,即使调用seekAPI 重置游标;

  2. Broker 无法按照retentionTimeInMinutesretentionSizeInMB来完全精确地清理磁盘,Pulsar 对于数据的管理是在 Ledger 维度,因此,如果在一个 Ledger 中有些数据应该被清理而另一些数据应该被保留,这个 Ledger 也会被保留,不做清理。在最差的情况下,会为每个 Topic/Partition 额外保存 2GB 数据(精度取决于broker.confmanagedLedgerMaxSizePerLedgerMbytes(default=2GB))。


对于消费者们来说,数据清理是在清理流程之后立即生效的,也就意味着在清理流程完成之后,Consumer 便无法消费更早的数据;但是在磁盘的角度来看,Pulsar 的数据清理流程和实际的磁盘空间清理之间会有一个短暂延迟,这是因为磁盘清理流程是由 Bookkeeper 异步调度的。这个延迟通常不会太久,在 Pulsar 的清理流程完成之后,很快就能看到数据磁盘空间的释放。

3. 应用

a.监控

Pulsar 并未专门为 Retention 暴露指标,但是可以通过一些指标来间接的监控 Retention 的情况:


  1. pulsar_storage_size:该指标表示 Pulsar 的存储空间使用情况,如果该指标持续增长,可能是 Retention 策略没有生效,需要检查一下 Retention 策略是否设置正确。正确的 Retention 策略应该是周期性的清理旧数据,pulsar_storage_size表现为周期性的上下波动;

  2. Topic internal stats: Pulsar 提供了一些 Topic 的内部指标,它暴露了当前 Topic 的 Ledger 大小,Ledger 关闭时间等指标,可以通过这些指标间接监控 Topic 的数据清理情况(通过sizetimestamp计算该 Ledger 是否应该删除):


pulsar-admin topics stats-internal persistent://my-tenant/my-ns/my-topic{    ...  "ledgers" : [   {    "ledgerId" : 20,    "entries" : 50000,    "size" : 500000,    "timestamp": 1627584000000  },   {    "ledgerId" : 21,    "entries" : 50000,    "size" : 500000,    "timestamp": 1627585000000   }   ],    ...}
复制代码

b. 设置

Retention 的设置分为namespacetopic两个级别(实际上有 3 个,还有一种是 Broker 级别,本文不讨论该级别)。在 Namespace 级别设置了 Retention 之后,该 Namespace 的所有 Topic 都会继承它;但是我们可以在 Topic 级别对其进行覆盖,使该 Topic 使用自定义的 Retention 策略。


  1. Namespace 级别

  • 查看 Namespace 的 Retention 策略


pulsar-admin namespaces get-retention my-tenant/my-ns{  "retentionTimeInMinutes": 10,  "retentionSizeInMB": 500}
复制代码


  • 设置 Namespace 的 Retention 策略


pulsar-admin namespaces set-retention my-tenant/my-ns --time 10 --size 500
复制代码


  • 删除 Namespace 的 Retention 策略


pulsar-admin namespaces remove-retention my-tenant/my-ns
复制代码


  1. Topic 级别

  • 查看 Topic 的 Retention 策略


pulsar-admin topicPolicies get-retention persistent://my-tenant/my-ns/my-topic{  "retentionTimeInMinutes": 10,  "retentionSizeInMB": 500}
复制代码


  • 设置 Topic 的 Retention 策略


pulsar-admin topicPolicies set-retention persistent://my-tenant/my-ns/my-topic --time 10 --size 500
复制代码


  • 删除 Topic 的 Retention 策略


pulsar-admin topicPolicies remove-retention persistent://my-tenant/my-ns/my-topic
复制代码

4. 实现

Retention 执行的入口是BrokerService#startConsumedLedgersMonitor(), 这里不对实现做详细分析,只是简单的介绍一下 Retention 策略的执行流程,如果各位感兴趣,可以自行查看源码。

a. Retention Checker 初始化

Pulsar broker 启动时,向线程池注册一个定时任务,执行周期是broker.confretentionCheckIntervalInSeconds(default=120s)。Broker 每 120s 会检查一次所有的 Topic,如果 Topic 设置了 Retention 策略,则执行数据清理。

b. 清理流程

Retention 的执行流程如下:


  1. 遍历所有的 Topic,如果 Topic 设置了 Retention 策略,执行后续流程。否则,跳过该 Topic;

  2. 找到该 Topic 中消费最慢的 Subscription,得到它的消费位置;

  3. 根据上一步得到的消费位置,找出在它之前的所有 Ledger,得到所有消费完毕的 Ledger 列表;

  4. 由创建时间从远到近遍历上一步得到 Ledger 列表,并根据当前 Ledger 的元数据做相应的计算。注意:当前正在写入的 Ledger 不会进入清理流程:

  5. 累加当前 Ledger 的 size,如果 Topic 数据的总大小减去累加的 size 大于等于retentionSizeInMB,将该 Ledger 加入待清理列表;

  6. 根据当前时间和 Ledger 元数据中timestamp(即Ledegr的关闭时间)计算当前 Ledger 的存活时间(当前时间减去 Ledger 关闭时间),如果存活时间大于retentionTimeInMinutes,将该 Ledger 加入待清理列表;

  7. 待清理列表中的 Ledger 分为两部分,一部分是存在于 Bookkeeper 中,另一部分是存在于 Tiered storage 的中(如果设置了 Ledger offload)。 针对这两部分 Ledger,Pulsar 会分别调用不同的清理接口(Bookkeeper 或者 Tiered storage 接口)清理数据。

5. 注意事项

  • 如果 Namespace 和 Topic 都设置了 Retention 策略,Topic 的 Retention 策略会覆盖 Namespace 的 Retention 策略;

  • 如果 Namespace 设置了 Retention 策略,但是 Topic 没有设置,Topic 会继承 Namespace 的 Retention 策略;

  • 如果 Topic 没有一个持久订阅,不对该 Topic 的数据做任何保留,所有数据将被删除;

  • retentionTimeInMinutesretentionSizeInMB的单位分别是分钟和 MB,它们之间的关系如下所示:



  • 如果 Topic 中有长时间未活跃的过期 Subscription,会导致数据无法被清理。如果发现这种情况,可以通过以下命令来手动删除过期的 Subscription:


pulsar-admin topics unsubscribe persistent://my-tenant/my-ns/my-topic --subscription my-sub
复制代码

二、Expiry

1. 概念

Expiry 定义了一种数据的过期策略,在 Pulsar 中,它通常的表现形式是 TTL(Time to live)。


在某些场景中,数据具有时效性,如果数据生产了一段时间之后还没有被消费者消费,后面再继续去消费它就不再具有业务意义。


为了满足此类应用场景,Pulsar 提供了 Expiry 机制。用户可以为一个 Topic 设置消息的存活时间(TTL),如果消息超过了这个时间还未被消费,在 Expiry 机制的作用下,它们会被认定过期并且被 Pulsar 自动确认,消费者不会再收到这些消息。


2. 行为

和工作在 Topic 级别的 Retention 不同,Expiry 工作在 Subscription 级别。它同样由定时任务驱动,每隔一段时间 Broker 会检查当前实例上的所有 Topic,如果设置了该 Topic 的messageTTL,Pulsar 会根据messageTTL检查该 Topic 下的所有 Subscription 中的数据,如果发现有消息超时未被消费,Pulsar 将自动移动 Subscription 的游标(自动确认超时消息),使得过期消息对消费者不可见。

3. 应用

a. 监控

  1. 在 Prometheus 级别,Pulsar 为 Expiry 暴露了pulsar_subscription_msg_rate_expiredpulsar_subscription_total_msg_expired 等指标, 一般来说只需要关注pulsar_subscription_msg_rate_expired 即可, 它表示每秒过期的消息数量。如果它始终为 0,可能是 Expiry 机制没有生效或者所有消息都被及时消费;

  2. 同样可以通过Topic stats 来监控 Expiry 的情况:


pulsar-admin topics stats persistent://my-tenant/my-ns/my-topic{  ...  "msgBacklog" : 0,  "msgRateExpired" : 0.0,  ..."subscriptions" : { "test_sub" : {     ...   "msgRateExpired" : 0.0, // 每秒过期的消息数量   "totalMsgExpired" : 0,  // 总共过期的消息数量   "lastExpireTimestamp" : 0,     ... }},}
复制代码

b. 设置

和 Retention 一样,Expiry 机制的设置也分为namespacetopic两个级别。在 Namespace 级别设置了之后,该 Namespace 的所有 Topic 都会继承该策略;在 Topic 级别设置了之后,会覆盖 Namespace 的设置。


  1. Namespace 级别

  • 查看 Namespace 的 Expiry 策略


pulsar-admin namespaces get-message-ttl my-tenant/my-ns{  "messageTTLInSeconds": 10}
复制代码


  • 设置 Namespace 的 Expiry 策略


pulsar-admin namespaces set-message-ttl my-tenant/my-ns --messageTTLInSeconds 10
复制代码


  • 删除 Namespace 的 Expiry 策略


pulsar-admin namespaces remove-message-ttl my-tenant/my-ns
复制代码


  1. Topic 级别

  • 查看当前 Topic 的 Expiry 策略


pulsar-admin topicPolicies get-message-ttl persistent://my-tenant/my-ns/my-topic{  "messageTTLInSeconds": 10}
复制代码


  • 设置 Topic 的 Expiry 策略


pulsar-admin topicPolicies set-message-ttl persistent://my-tenant/my-ns/my-topic --messageTTLInSeconds 10
复制代码


  • 删除 Topic 的 Expiry 策略


pulsar-admin topicPolicies remove-message-ttl persistent://my-tenant/my-ns/my-topic
复制代码

4. 实现

Message Expiry 的实现的入口是BrokerService#startMessageExpiryMonitor(),这里不对实现做详细分析,只是简单的介绍一下 Expiry 策略的执行流程,如果各位感兴趣,可自行查看源码。

a. Expiry Checker 初始化

在 Pulsar broker 启动时,向线程池注册一个定时任务,定时任务的执行周期是broker.confmessageExpiryCheckIntervalInMinutes(default=5m)


Broker 每 5 分钟检查一次所有的 Topic,如果 Topic 设置了messageTTL,则执行 Expiry 检查。

b. Expiry 执行流程

  1. 遍历所有 Topic,如果该 Topic 设置了messageTTL,执行后续流程。否则,跳过该 Topic;

  2. 遍历所有 Subscription,如果该 Subscription 满足条件(有消息积压、有活跃的 Consumer、最早未消费的消息已经过期),执行后续流程。否则,跳过该 Subscription;

  3. 获取当前 Broker 的时间戳,再根据messageTTL计算出消息的过期时间戳;

  4. 根据上一步中计算出的过期消息时间戳,使用二分查找算法找到相应的消息位置;

  5. 移动 Subscription 的游标到过期消息的位置,使得过期消息对消费者不可见。

5. 注意事项

在使用 Expiry 机制的时候,需要注意以下几点:


  • 客户端需要保证本地时钟和 Broker 时钟同步,否则可能会导致消息过早或者过晚过期,因为对于过期消息的判定是以 Broker 时间为基准的;

  • Expiry 机制只会移动游标,不会删除消息,因此过期消息仍然会占用磁盘空间,如果需要删除过期消息,需要使用 Retention 机制;

  • Expiry 机制只会移动游标,不会删除消息,即使过期消息被移动到游标之后,消费者仍然可以通过seekAPI 来消费过期消息;

  • 由于 Expiry 机制对于过期消息位置的查找是使用二分查找算法,因此在消息量较大的情况下,可能会导致性能问题;

  • 同样因为 Expiry 的二分查找机制,如果消息的时间戳是乱序的(在使用 Shared 模式的 Producer 的情况下,这很可能发生), 可能会导致 Expiry 机制无法正确的移动游标,从而导致某些过期消息会被继续消费。如果遇到这种情况,可以通过设置broker.confbrokerEntryMetadataInterceptorsorg.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor解决;

  • Expiry 由定时任务驱动,因此它的精度会存在一定限制,无法保证 100%准确。

四、总结

  • Retention 是 Pulsar 对于过期数据的保留和清理策略,它工作在 Topic 级别,通过定时任务清理过期数据,将全部 Subscription 都消费过后的数据从存储介质上删除来清理存储空间;

  • Expiry 即为 Message TTL,它工作在 Subscription 级别,通过定时任务来检查 Subscription 中超时未消费的消息,并自动的将这些消息确认,使其对消费者不可见;


本系列下篇将为大家带来关于 Backlog quota 的解析,欢迎关注我们,第一时间获取相关动态;也欢迎加入社群讨论或在评论区留言,与我们交流更多关于 Pulsar 的问题。

用户头像

AscentStream

关注

还未添加个人签名 2017-10-19 加入

还未添加个人简介

评论

发布
暂无评论
技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上)_消息队列_AscentStream_InfoQ写作社区