本文将介绍 Producer 如何处理 Batch 发送消息的情况已经当开启 Batch 后如何处理重复消息的情况。
Batch Message
Producer 默认开启了 Batch,当消息积攒了一定大小或者累积了一小段时间后,才会触发消息发送,并且会将一个 batch 的消息打包发送给 Broker。因为 Producer 不能同时开启 Batch 和 Chunk,所以这两块逻辑都是分开处理的,本小节只讨论 Producer 开启 Batch 的情况。
当 Producer 接收到 Message 时,并不会马上进行发送,而是将它放入 batchMessageContainer 中。
如果当前的 batch Message container 已满了,就会直接发送当前的 batch 然后再把当前的消息加入到下一个 batch 中,这样可以实现 batch 达到一定消息大小后会触发消息发送的功能。
以下是 batch container 判断空间是否已满的代码:
@Overridepublic boolean haveEnoughSpace(MessageImpl<?> msg) { int messageSize = msg.getDataBuffer().readableBytes(); return ( (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize()) || (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch) ) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);}
复制代码
重复消息检查
在 Pulsar 中还包括消息去重的功能,这为 Batch 处理带来了一些要求。因为在 Broker 的角度来看,一个 Batch 其实就是一条普通的消息,只是这条消息内部包含有许多子消息。为了方便 Broker 进行消息去重,我们需要让重复的消息单独放在一个 batch 中。
那在 Producer 端如何检查消息是否重复呢?这里需要用到 Producer 的两个变量:代表上一个已发送到 Broker 的消息的 sequenceId 的 lastSequenceIdPushed;代表上一个已发送到并被 Broker 确认的消息的 sequenceId 的 lastSequenceIdPublished。在 Producer 中,sequence id 是单调递增的,如果发现当前消息的 sequence id 竟然比上一次发往 Broker 但还未确认的消息的小,那么代表消息可能是重复的,至少是重复发送的,只是 Broker 并不一定重复接收到和持久化;如果发现当前的消息的 sequence id 比上一次被 Broker 确认的消息的小,那么能肯定消息已经是重复了。不过对于这两种情况,都是将当前消息作为独立的 batch 发送来处理。
以下是上面逻辑的代码:
if (canAddToBatch(msg) && totalChunks <= 1) { if (canAddToCurrentBatch(msg)) { // should trigger complete the batch message, new message will add to a new batch and new batch // sequence id use the new message, so that broker can handle the message duplication if (sequenceId <= lastSequenceIdPushed) { isLastSequenceIdPotentialDuplicated = true; if (sequenceId <= lastSequenceIdPublished) { log.warn("Message with sequence id {} is definitely a duplicate", sequenceId); } else { log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.", sequenceId); } doBatchSendAndAdd(msg, callback, payload); } else { // Should flush the last potential duplicated since can't combine potential duplicated messages // and non-duplicated messages into a batch. if (isLastSequenceIdPotentialDuplicated) { doBatchSendAndAdd(msg, callback, payload); } else { // handle boundary cases where message being added would exceed // batch size and/or max message size boolean isBatchFull = batchMessageContainer.add(msg, callback); lastSendFuture = callback.getFuture(); payload.release(); if (isBatchFull) { batchMessageAndSend(); } } isLastSequenceIdPotentialDuplicated = false; } } else { doBatchSendAndAdd(msg, callback, payload); } }
复制代码
通过以上的处理,满足了 Broker 对于重复消息的要求,方便 Broker 端的处理。
未完待续,在后续的文章中我们将讨论这条不平凡的消息被发往 Broker 的过程。
评论