在 Pulsar 中,为了提升消息的吞吐率,增加带宽利用率和存储利用率,我们可以将多条消息整合成一条消息合并发送到 Broker 中。Pulsar 提供了 Batch Message 的功能,可以将多条消息整合成一条消息进行处理,本文将介绍 Batch Message 的基本原理。
Batch Message Container
Batch Message 的管理、处理和整合主要是在 BatchMessageContainer 中进行。当用户调用 Producer 的 send 方法后,并且在开启 BatchEnabled(默认开启)并进行了各项检查后,检查当前消息是否可以加入到当前的 batch,一系列检查通过后,会调用 BatchMessageContainer 的 add 方法,将当前的消息加入到当前的 batch 中。
因为整个 Batch 的 Messages 共享的是同一个 MessageMetadata,所以在添加消息的时候,会设置 MessageMetadata。
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (++numMessagesInBatch == 1) {
try {
// some properties are common amongst the different messages in the batch, hence we just pick it up from
// the first message
messageMetadata.setSequenceId(msg.getSequenceId());
lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());
this.firstCallback = callback;
batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT
.buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));
if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {
currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();
}
if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {
currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();
}
} catch (Throwable e) {
log.error("construct first message failed, exception is ", e);
if (batchedMessageMetadataAndPayload != null) {
// if payload has been allocated release it
batchedMessageMetadataAndPayload.release();
}
discard(new PulsarClientException(e));
return false;
}
}
if (previousCallback != null) {
previousCallback.addCallback(msg, callback);
}
previousCallback = callback;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();
messages.add(msg);
if (lowestSequenceId == -1L) {
lowestSequenceId = msg.getSequenceId();
messageMetadata.setSequenceId(lowestSequenceId);
}
highestSequenceId = msg.getSequenceId();
ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
return isBatchFull();
}
复制代码
如上面的代码,在添加第一条消息的时候,会设置一些 MessageMetadata 的共有字段。
当当前的 Batch 已满时,会告知 producer,producer 在添加消息后发现 batch 已满会触发一次 batch 发送
Batch 发送
Producer 在触发 Batch 发送时,会调用 batchContainer 的 createOpSendMsg 方法,在 createOpSendMsg 中,batch container 会生成一个 OpSendMsg 对象,并为其设置 MessageMetadata 和子消息,也就是消息整合的过程。
@Override
public OpSendMsg createOpSendMsg() throws IOException {
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {
discard(new PulsarClientException.InvalidMessageException(
"Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));
return null;
}
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
messageMetadata.setHighestSequenceId(highestSequenceId);
if (currentTxnidMostBits != -1) {
messageMetadata.setTxnidMostBits(currentTxnidMostBits);
}
if (currentTxnidLeastBits != -1) {
messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);
}
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);
op.setNumMessagesInBatch(numMessagesInBatch);
op.setBatchSizeByte(currentBatchSizeBytes);
lowestSequenceId = -1L;
return op;
}
复制代码
这样在 Proudcer 看来,可以将一整个 batch 的消息整合成一个 OpSendMsg,然后像发送单个普通消息一样发送 batch 消息。
Batch Timer Task
在 Pulsar 的 Batch 中,满足触发 batch 发送的条件除了达到一定大小后发送,还有达到一定时间后就触发发送,这是通过 Producer 中的 batchTimerTask 来实现的。
在 producer 初始化 connection 的时候,会对 batchTimerTask 进行设置。
batchTimerTask = cnx.ctx().executor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
synchronized (ProducerImpl.this) {
if (getState() == State.Closing || getState() == State.Closed) {
return;
}
batchMessageAndSend();
}
}), 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
复制代码
达到所设置的间隔时间后,会调用 batchMessageAndSend 方法,这时就会调用 batchContainer 生成 OpSendMsg,然后发送消息。
本文介绍了 BatchMessageContainer 和 BatchTimerTask,并介绍了两种触发 batch 发送的条件:达到一定大小或达到一定时间。但本文所涉及的仍然是 Batch Message 的基本原理,像 BatchMessageContainer 还有针对 OrderingKey 的另一个实现:BatchMessageKeyBasedContainer,将在后续的文章中进行介绍。
评论