写点什么

[Pulsar] Producer 流控

作者:Zike Yang
  • 2021 年 12 月 17 日
  • 本文字数:1302 字

    阅读完需:约 4 分钟

我们在使用 Pulsar Producer 的时候,可能需要对发送给 Broker 的流量进行控制,目前 Pulsar Client 主要提供两种流控方式:按照消息数量进行流控和按照消息大小进行流控,这两种方式可以共同使用。本文讲述这两种流控方式的实现原理。


按照消息数量流控

这是 Pulsar 最早引入的 Producer 流控方案,目的是控制正在发往 Broker 的消息数量。主要通过 ProducerImpl 的 semaphore 进行实现。

初始化 semaphore 的代码如下:

if (conf.getMaxPendingMessages() > 0) {  this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true));} else {  this.semaphore = Optional.empty();}
复制代码

semaphore 是信号量会被设置为 maxPedingMessages,表示同时发送消息的最大数量。

if (conf.isBlockIfQueueFull()) {  if (semaphore.isPresent()) {    semaphore.get().acquire();  }  client.getMemoryLimitController().reserveMemory(payloadSize);} else {  if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));    return false;  }
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) { semaphore.ifPresent(Semaphore::release); callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId)); return false; }}
复制代码

如上所示,当 Prodcuer 要发送消息前,就会走上面的逻辑。

Producer 带有 isBlockIfQueueFull 设置,这代表是否在当前队列已满时(即触发流控时),阻塞当前线程。

首先会获取 semaphore,如果 isBlockIfQueueFull 开启,获取时如果当前队列已满,则等待知道当前队列有空位;否则,当队列已满,则返回给用户错误信息。


按照消息大小的流控

按照消息数量流控实现较为容易,但是不够准确,存在各个消息大小不平均的情况,造成流量控制并不稳定。为此,Pulsar 后来引入了按照消息大小的流控。

引入后的流控代码:

if (conf.isBlockIfQueueFull()) {  if (semaphore.isPresent()) {    semaphore.get().acquire();  }  client.getMemoryLimitController().reserveMemory(payloadSize);} else {  if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));    return false;  }
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) { semaphore.ifPresent(Semaphore::release); callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId)); return false; }}
复制代码

当获取玩 semaphore 后,则会调用 MemoryLimitController 进行流控,其内部统计了当前所有正在发送消息的总大小。通过这种方式,能精确控制发送给 Broker 的最大字节数,达到更好的流控效果。


目前 MemoryLimitController 默认是关闭的,如果想要达到更好的流控效果,可以开启。

发布于: 1 小时前阅读数: 6
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Producer 流控