写点什么

[Pulsar] Batch Messge 的基本原理

作者:Zike Yang
  • 2021 年 11 月 28 日
  • 本文字数:2646 字

    阅读完需:约 9 分钟

在 Pulsar 中,为了提升消息的吞吐率,增加带宽利用率和存储利用率,我们可以将多条消息整合成一条消息合并发送到 Broker 中。Pulsar 提供了 Batch Message 的功能,可以将多条消息整合成一条消息进行处理,本文将介绍 Batch Message 的基本原理。


Batch Message Container

Batch Message 的管理、处理和整合主要是在 BatchMessageContainer 中进行。当用户调用 Producer 的 send 方法后,并且在开启 BatchEnabled(默认开启)并进行了各项检查后,检查当前消息是否可以加入到当前的 batch,一系列检查通过后,会调用 BatchMessageContainer 的 add 方法,将当前的消息加入到当前的 batch 中。

因为整个 Batch 的 Messages 共享的是同一个 MessageMetadata,所以在添加消息的时候,会设置 MessageMetadata。

@Overridepublic boolean add(MessageImpl<?> msg, SendCallback callback) {    if (++numMessagesInBatch == 1) {        try {            // some properties are common amongst the different messages in the batch, hence we just pick it up from            // the first message            messageMetadata.setSequenceId(msg.getSequenceId());            lowestSequenceId = Commands.initBatchMessageMetadata(messageMetadata, msg.getMessageBuilder());            this.firstCallback = callback;            batchedMessageMetadataAndPayload = PulsarByteBufAllocator.DEFAULT                    .buffer(Math.min(maxBatchSize, ClientCnx.getMaxMessageSize()));            if (msg.getMessageBuilder().hasTxnidMostBits() && currentTxnidMostBits == -1) {                currentTxnidMostBits = msg.getMessageBuilder().getTxnidMostBits();            }            if (msg.getMessageBuilder().hasTxnidLeastBits() && currentTxnidLeastBits == -1) {                currentTxnidLeastBits = msg.getMessageBuilder().getTxnidLeastBits();            }        } catch (Throwable e) {            log.error("construct first message failed, exception is ", e);            if (batchedMessageMetadataAndPayload != null) {                // if payload has been allocated release it                batchedMessageMetadataAndPayload.release();            }            discard(new PulsarClientException(e));            return false;        }    }
if (previousCallback != null) { previousCallback.addCallback(msg, callback); } previousCallback = callback; currentBatchSizeBytes += msg.getDataBuffer().readableBytes(); messages.add(msg);
if (lowestSequenceId == -1L) { lowestSequenceId = msg.getSequenceId(); messageMetadata.setSequenceId(lowestSequenceId); } highestSequenceId = msg.getSequenceId(); ProducerImpl.LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(producer, prev -> Math.max(prev, msg.getSequenceId()));
return isBatchFull();}
复制代码

如上面的代码,在添加第一条消息的时候,会设置一些 MessageMetadata 的共有字段。

当当前的 Batch 已满时,会告知 producer,producer 在添加消息后发现 batch 已满会触发一次 batch 发送


Batch 发送

Producer 在触发 Batch 发送时,会调用 batchContainer 的 createOpSendMsg 方法,在 createOpSendMsg 中,batch container 会生成一个 OpSendMsg 对象,并为其设置 MessageMetadata 和子消息,也就是消息整合的过程。

@Overridepublic OpSendMsg createOpSendMsg() throws IOException {    ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());    if (encryptedPayload.readableBytes() > ClientCnx.getMaxMessageSize()) {        discard(new PulsarClientException.InvalidMessageException(                "Message size is bigger than " + ClientCnx.getMaxMessageSize() + " bytes"));        return null;    }    messageMetadata.setNumMessagesInBatch(numMessagesInBatch);    messageMetadata.setHighestSequenceId(highestSequenceId);    if (currentTxnidMostBits != -1) {        messageMetadata.setTxnidMostBits(currentTxnidMostBits);    }    if (currentTxnidLeastBits != -1) {        messageMetadata.setTxnidLeastBits(currentTxnidLeastBits);    }    ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),            messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(), messageMetadata.getHighestSequenceId(), firstCallback);
op.setNumMessagesInBatch(numMessagesInBatch); op.setBatchSizeByte(currentBatchSizeBytes); lowestSequenceId = -1L; return op;}
复制代码

这样在 Proudcer 看来,可以将一整个 batch 的消息整合成一个 OpSendMsg,然后像发送单个普通消息一样发送 batch 消息。

Batch Timer Task

在 Pulsar 的 Batch 中,满足触发 batch 发送的条件除了达到一定大小后发送,还有达到一定时间后就触发发送,这是通过 Producer 中的 batchTimerTask 来实现的。

在 producer 初始化 connection 的时候,会对 batchTimerTask 进行设置。

batchTimerTask = cnx.ctx().executor()  .scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {    synchronized (ProducerImpl.this) {      if (getState() == State.Closing || getState() == State.Closed) {        return;      }
batchMessageAndSend(); } }), 0, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
复制代码

达到所设置的间隔时间后,会调用 batchMessageAndSend 方法,这时就会调用 batchContainer 生成 OpSendMsg,然后发送消息。


本文介绍了 BatchMessageContainer 和 BatchTimerTask,并介绍了两种触发 batch 发送的条件:达到一定大小或达到一定时间。但本文所涉及的仍然是 Batch Message 的基本原理,像 BatchMessageContainer 还有针对 OrderingKey 的另一个实现:BatchMessageKeyBasedContainer,将在后续的文章中进行介绍。

发布于: 4 小时前阅读数: 8
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Batch Messge的基本原理