我们在使用 Pulsar Producer 的时候,可能需要对发送给 Broker 的流量进行控制,目前 Pulsar Client 主要提供两种流控方式:按照消息数量进行流控和按照消息大小进行流控,这两种方式可以共同使用。本文讲述这两种流控方式的实现原理。
按照消息数量流控
这是 Pulsar 最早引入的 Producer 流控方案,目的是控制正在发往 Broker 的消息数量。主要通过 ProducerImpl 的 semaphore 进行实现。
初始化 semaphore 的代码如下:
if (conf.getMaxPendingMessages() > 0) {
this.semaphore = Optional.of(new Semaphore(conf.getMaxPendingMessages(), true));
} else {
this.semaphore = Optional.empty();
}
复制代码
semaphore 是信号量会被设置为 maxPedingMessages,表示同时发送消息的最大数量。
if (conf.isBlockIfQueueFull()) {
if (semaphore.isPresent()) {
semaphore.get().acquire();
}
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
return false;
}
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
return false;
}
}
复制代码
如上所示,当 Prodcuer 要发送消息前,就会走上面的逻辑。
Producer 带有 isBlockIfQueueFull 设置,这代表是否在当前队列已满时(即触发流控时),阻塞当前线程。
首先会获取 semaphore,如果 isBlockIfQueueFull 开启,获取时如果当前队列已满,则等待知道当前队列有空位;否则,当队列已满,则返回给用户错误信息。
按照消息大小的流控
按照消息数量流控实现较为容易,但是不够准确,存在各个消息大小不平均的情况,造成流量控制并不稳定。为此,Pulsar 后来引入了按照消息大小的流控。
引入后的流控代码:
if (conf.isBlockIfQueueFull()) {
if (semaphore.isPresent()) {
semaphore.get().acquire();
}
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
return false;
}
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
return false;
}
}
复制代码
当获取玩 semaphore 后,则会调用 MemoryLimitController 进行流控,其内部统计了当前所有正在发送消息的总大小。通过这种方式,能精确控制发送给 Broker 的最大字节数,达到更好的流控效果。
目前 MemoryLimitController 默认是关闭的,如果想要达到更好的流控效果,可以开启。
评论