智联招聘 × Pulsar|Pulsar 客户端在高吞吐场景下的内存控制实践
作者:智联招聘 汪苏诚(萧易客)
1. 背景介绍
在现代分布式系统架构中,消息队列作为核心组件承担着系统解耦、流量削峰和异步处理的重要职责。Apache Pulsar 作为云原生时代的统一消息平台,其在高并发、大数据量场景下的表现直接影响着整个系统的稳定性和性能。这其中容易忽视的是在高吞吐场景下的客户端的表现。
在高吞吐的业务场景下,会面临以下技术痛点 :
内存溢出风险:高频大消息发送导致客户端内存缓冲区快速增长
线程阻塞问题:不当的配置可能导致异步发送退化为同步阻塞
资源竞争冲突:Producer 和 Consumer 共享内存限制导致相互影响
监控和调优困难:缺乏有效的内存使用监控和调优策略
因此,本篇文章结合了我们对 Pulsar 客户端内存控制的一些理解和实践,希望帮助对大家构建高性能、高可用的分布式系统有所帮助。
2. 消息发送模式分析
在实际业务场景里,有阻塞和非阻塞两种消息发送方式。我们可以按照具体需求,灵活做出选择 。
2.1 同步发送的局限性
同步发送(Blocking Send)是 Pulsar 客户端最基础的消息发送方式。在这种模式下,发送线程会一直阻塞,直到收到 Broker 的确认响应。
代码示例:
在我们的业务中,需要在用户发起搜索请求时,记录本次搜索请求的事件,以便后续进行热点分析和搜索优化。这里的搜索请求事件包含了搜索请求的上下文信息,例如搜索词等,而这一过程本身其实不会反馈到用户的搜索结果中。然后使用同步方式发送,必然会导致主线程阻塞,增加整体搜索延迟,这样会影响用户体验,同时也会给搜索执行过程带来更多的不确定性。
在这种情况下,我们需要明确区分主线流程和支线流程。主线流程是用户的核心交互路径,而记录搜索请求等操作属于支线流程。将支线流程嵌套在主线流程中不仅会增加系统的复杂性,还可能影响用户体验。因此,异步发送可能是一个更好的选择。
2.2 异步发送的优势
于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:
异步发送最大的优势在于其非阻塞特性。异步发送允许客户端并行处理多个发送请求,充分利用网络带宽和处理器资源。异步发送也天然支持消息批处理,同时异步模式也提供了更加灵活的背压处理机制。
2.3 高吞吐场景下数据发送的问题
在现实中,若用户搜索的 TPS 较高,例如在单实例上可以超过 1000 QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到 100KB 甚至 1MB,那么上面代码在运行时你可能会遇到MemoryBufferIsFullError
异常:
另外若服务本身与 Pulsar Broker 之间出现了网络波动,或者 Pulsar 服务内部组件之间出现网络波动,导致整体 Producer 写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError
异常:
3. Producer 端内存控制机制
3.1 基于消息数量的控制
maxPendingMessages 配置
maxPendingMessagesAcrossPartitions 配置
下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建 Producer 时,ProducerBuilder 中与内存使用有关的配置项:
maxPendingMessages
用来控制 producer 内部队列中正在发送还没有接收到 broker 确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError
异常,你可以通过修改另外一个配置blockIfQueueFull=true
调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说。
maxPendingMessages
这个配置实际上是直接传递给底层各个分区的内部 producer 的,对于多分区的 topic,实际处于 pending 状态的最大消息数量是maxPendingMessages
乘以 topic 分区数量。由于maxPendingMessages
结合可变的 topic 分区数量使得最终的 pending 消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions
用来控制整个 topic 所有分区的总的一个 pending 消息数量,最终到各个分区内部 producer 取 maxPendingMessages
和 maxPendingMessagesAcrossPartitions / partitions
的较小值。
3.2 基于内存大小的控制
memoryLimit 配置
PIP-74 改进
然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务演进而发生变化,因此在PIP-74 中,在构建 PulsarClient 时,ClientBuilder提供了一个面向整个 client 实例统一的内存限制配置:
当客户端所有 producer 中所有 pending 的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError
异常,若同时配置了blockIfQueueFull=true
,则当前线程会阻塞等待前面 pending 的消息发送完成。
3.3 阻塞与非阻塞模式
blockIfQueueFull 配置
使用建议
前面提到关于blockIfQueueFull
配置的使用有一个细节需要注意,这个配置是为了限制客户端 producer 内存使用的同时,让开发者简化处理队列或者内存 buffer 满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是 blockIfQueueFull 一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send
方法还是非阻塞的 Producer.sendAsync
方法都会出现阻塞等待,“卡”住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了。
PIP-120对2.10.0
以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为64MB
,同时默认禁用了maxPendingMessages
和maxPendingMessagesAcrossPartitions
配置(默认值修改为 0),另外将maxPendingMessagesAcrossPartitions
配置标记为了Deprecated
,因为使用这个配置最终目的就是控制客户端 producer 的内存使用,现在已经有 memoryLimit 这个更加直接的配置可以替代了。
4. Consumer 端内存控制
4.1 Consumer 端的内存使用特点
上面说的全部都是围绕着 Producer 侧的内存使用来讲的,其实在 PIP-74 中也提到了 Pulsar 客户端 consumer 侧的内存使用,只不过在实现中是分阶段进行的。
我们先来看一下 Pulsar 客户端的 API 早期在构造一个 Consumer 时,ConsumerBuilder 提供的与内存使用有关的选项:
Pulsar 客户端通过预接收队列临时存放 broker 推送过来的消息,以便应用程序调用Consumer#receive
或者Consumer#receiveAsync
方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个“空间”设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize
控制每个分区 consumer 的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions
来控制所有分区 consumer 和 parent consumer 的接收队列总大小。
前面提到receiverQueueSize
和maxTotalReceiverQueueSizeAcrossPartitions
参数是以数量的形式间接的控制 Consumer 预接收队列的内存使用,在 PIP-74 中提出了整个 client 级别的 memoryLimit,同时提出了一个新的控制 Consumer 内存使用的方案,就是autoScaledReceiverQueueSizeEnabled
:
当启用了这个特性之后,receiverQueueSize
会从 1 开始呈 2 的指数倍增长,直至达到receiverQueueSize
的限制或达到 client 的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。
4.2 Consumer 端的内存注意事项
除了上面说的 Producer 和 Consumer 在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建 Consumer 时ackTimeout
和ackTimeoutTickTime
的配置如果不匹配,会消耗较多堆内内存。
若 Consumer 配置了 ackTimeout 并且配置了较大的时间窗口(例如 1 小时或者更长)时,应适当的调大ackTimeoutTickTime
,这是因为 Consumer 内部使用了一个简单时间轮的算法,若 ackTimeout 时间窗口很大,ackTimeoutTickTime
仍然使用其默认值1s
,时间轮本身将会占用大量堆内存空间。
5. 最佳实践小结
使用
sendAsync
非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull
之后,它会在特定情况下演变成阻塞方法。对于同时使用到了 Producer 和 Consumer 的应用,推荐创建两个 client,分别用来创建 Producer 和 Consumer,避免由于共用 memoryLimit 导致相互影响。
评论