写点什么

智联招聘 × Pulsar|Pulsar 客户端在高吞吐场景下的内存控制实践

作者:AscentStream
  • 2025-08-27
    上海
  • 本文字数:5300 字

    阅读完需:约 17 分钟

作者:智联招聘 汪苏诚(萧易客)

1. 背景介绍

在现代分布式系统架构中,消息队列作为核心组件承担着系统解耦、流量削峰和异步处理的重要职责。Apache Pulsar 作为云原生时代的统一消息平台,其在高并发、大数据量场景下的表现直接影响着整个系统的稳定性和性能。这其中容易忽视的是在高吞吐场景下的客户端的表现。


在高吞吐的业务场景下,会面临以下技术痛点


  • 内存溢出风险:高频大消息发送导致客户端内存缓冲区快速增长

  • 线程阻塞问题:不当的配置可能导致异步发送退化为同步阻塞

  • 资源竞争冲突:Producer 和 Consumer 共享内存限制导致相互影响

  • 监控和调优困难:缺乏有效的内存使用监控和调优策略


因此,本篇文章结合了我们对 Pulsar 客户端内存控制的一些理解和实践,希望帮助对大家构建高性能、高可用的分布式系统有所帮助。

2. 消息发送模式分析

在实际业务场景里,有阻塞非阻塞两种消息发送方式。我们可以按照具体需求,灵活做出选择 。

2.1 同步发送的局限性

同步发送(Blocking Send)是 Pulsar 客户端最基础的消息发送方式。在这种模式下,发送线程会一直阻塞,直到收到 Broker 的确认响应。


代码示例:


// 初始化 pulsarClient 和 producerPulsarClient pulsarClient =    PulsarClient.builder().serviceUrl("pulsar://xxx:6650").build();Producer<byte[]> producer =    pulsarClient.newProducer().topic("topic_name").create();
// 同步方式发送消息try { MessageId messageId = producer.send(/* 消息内容 */); log.debug("...");} catch (Exception e) { log.error("...", e);}
复制代码


在我们的业务中,需要在用户发起搜索请求时,记录本次搜索请求的事件,以便后续进行热点分析和搜索优化。这里的搜索请求事件包含了搜索请求的上下文信息,例如搜索词等,而这一过程本身其实不会反馈到用户的搜索结果中。然后使用同步方式发送,必然会导致主线程阻塞,增加整体搜索延迟,这样会影响用户体验,同时也会给搜索执行过程带来更多的不确定性。


在这种情况下,我们需要明确区分主线流程和支线流程。主线流程是用户的核心交互路径,而记录搜索请求等操作属于支线流程。将支线流程嵌套在主线流程中不仅会增加系统的复杂性,还可能影响用户体验。因此,异步发送可能是一个更好的选择。

2.2 异步发送的优势

于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:


producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {      if (ex != null) {          log.error("Failed to record search activity", ex);      } else {          log.debug("Search activity messageId={}", msgId);      }});
复制代码


异步发送最大的优势在于其非阻塞特性。异步发送允许客户端并行处理多个发送请求,充分利用网络带宽和处理器资源。异步发送也天然支持消息批处理,同时异步模式也提供了更加灵活的背压处理机制。

2.3 高吞吐场景下数据发送的问题

在现实中,若用户搜索的 TPS 较高,例如在单实例上可以超过 1000 QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到 100KB 甚至 1MB,那么上面代码在运行时你可能会遇到MemoryBufferIsFullError异常:


org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full  at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:972)  at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)  at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)  at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)  at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
复制代码


另外若服务本身与 Pulsar Broker 之间出现了网络波动,或者 Pulsar 服务内部组件之间出现网络波动,导致整体 Producer 写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError异常:


org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full  at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:965)  at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)  at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)  at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)  at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)
复制代码

3. Producer 端内存控制机制

3.1 基于消息数量的控制

  • maxPendingMessages 配置

  • maxPendingMessagesAcrossPartitions 配置


下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建 Producer 时,ProducerBuilder 中与内存使用有关的配置项:


/* * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. */ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);
/* * Set the number of max pending messages across all partitions. */ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
复制代码


maxPendingMessages用来控制 producer 内部队列中正在发送还没有接收到 broker 确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError异常,你可以通过修改另外一个配置blockIfQueueFull=true调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说。


maxPendingMessages这个配置实际上是直接传递给底层各个分区的内部 producer 的,对于多分区的 topic,实际处于 pending 状态的最大消息数量是maxPendingMessages乘以 topic 分区数量。由于maxPendingMessages结合可变的 topic 分区数量使得最终的 pending 消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions用来控制整个 topic 所有分区的总的一个 pending 消息数量,最终到各个分区内部 producer 取 maxPendingMessagesmaxPendingMessagesAcrossPartitions / partitions的较小值。

3.2 基于内存大小的控制

  • memoryLimit 配置

  • PIP-74 改进


然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务演进而发生变化,因此在PIP-74 中,在构建 PulsarClient 时,ClientBuilder提供了一个面向整个 client 实例统一的内存限制配置:


/* * Configure a limit on the amount of memory that will be allocated by this client instance. * * Setting this to 0 will disable the limit. */ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
复制代码


当客户端所有 producer 中所有 pending 的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError异常,若同时配置了blockIfQueueFull=true,则当前线程会阻塞等待前面 pending 的消息发送完成。

3.3 阻塞与非阻塞模式

  • blockIfQueueFull 配置

  • 使用建议


前面提到关于blockIfQueueFull配置的使用有一个细节需要注意,这个配置是为了限制客户端 producer 内存使用的同时,让开发者简化处理队列或者内存 buffer 满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是 blockIfQueueFull 一旦配置为 true,不论是应用发送消息调用的是阻塞的 Producer.send 方法还是非阻塞的 Producer.sendAsync 方法都会出现阻塞等待,“卡”住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了。


// 注意:若producer配置了blockIfQueueFull=true,// 当发送队列满或者内存buffer满,当前线程将卡在sendAsync方法调用producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {      if (ex != null) {          log.error("Failed to record search activity", ex);      } else {          log.debug("Search activity messageId={}", msgId);      }});
复制代码


PIP-1202.10.0以及之后版本的客户端中,默认启用了 memoryLimit 配置,其默认值为64MB,同时默认禁用了maxPendingMessagesmaxPendingMessagesAcrossPartitions配置(默认值修改为 0),另外将maxPendingMessagesAcrossPartitions配置标记为了Deprecated,因为使用这个配置最终目的就是控制客户端 producer 的内存使用,现在已经有 memoryLimit 这个更加直接的配置可以替代了。

4. Consumer 端内存控制

4.1 Consumer 端的内存使用特点

上面说的全部都是围绕着 Producer 侧的内存使用来讲的,其实在 PIP-74 中也提到了 Pulsar 客户端 consumer 侧的内存使用,只不过在实现中是分阶段进行的。


我们先来看一下 Pulsar 客户端的 API 早期在构造一个 Consumer 时,ConsumerBuilder 提供的与内存使用有关的选项:


/* * Sets the size of the consumer receive queue. * * (default: 1000) */ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);
/* * Sets the max total receiver queue size across partitions. * * (default: 50000) */ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);
复制代码


Pulsar 客户端通过预接收队列临时存放 broker 推送过来的消息,以便应用程序调用Consumer#receive或者Consumer#receiveAsync方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个“空间”设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize控制每个分区 consumer 的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions来控制所有分区 consumer 和 parent consumer 的接收队列总大小。


前面提到receiverQueueSizemaxTotalReceiverQueueSizeAcrossPartitions参数是以数量的形式间接的控制 Consumer 预接收队列的内存使用,在 PIP-74 中提出了整个 client 级别的 memoryLimit,同时提出了一个新的控制 Consumer 内存使用的方案,就是autoScaledReceiverQueueSizeEnabled:


/* * If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,   * and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client   * memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}. */ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
复制代码


当启用了这个特性之后,receiverQueueSize会从 1 开始呈 2 的指数倍增长,直至达到receiverQueueSize的限制或达到 client 的 memoryLimit 限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

4.2 Consumer 端的内存注意事项

除了上面说的 Producer 和 Consumer 在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建 Consumer 时ackTimeoutackTimeoutTickTime的配置如果不匹配,会消耗较多堆内内存。


/* * Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second. */ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
/** * Define the granularity of the ack-timeout redelivery. * * <p>By default, the tick time is set to 1 second. Using a higher tick time * reduces the memory overhead to track messages when the ack-timeout is set to * bigger values (e.g., 1 hour). */ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
复制代码


若 Consumer 配置了 ackTimeout 并且配置了较大的时间窗口(例如 1 小时或者更长)时,应适当的调大ackTimeoutTickTime,这是因为 Consumer 内部使用了一个简单时间轮的算法,若 ackTimeout 时间窗口很大,ackTimeoutTickTime仍然使用其默认值1s,时间轮本身将会占用大量堆内存空间。

5. 最佳实践小结

  1. 使用sendAsync非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull之后,它会在特定情况下演变成阻塞方法。

  2. 对于同时使用到了 Producer 和 Consumer 的应用,推荐创建两个 client,分别用来创建 Producer 和 Consumer,避免由于共用 memoryLimit 导致相互影响。

参考

用户头像

AscentStream

关注

还未添加个人签名 2017-10-19 加入

还未添加个人简介

评论

发布
暂无评论
智联招聘 × Pulsar|Pulsar 客户端在高吞吐场景下的内存控制实践_消息队列_AscentStream_InfoQ写作社区