本文将介绍 Producer 如何处理 Batch 发送消息的情况已经当开启 Batch 后如何处理重复消息的情况。
Batch Message
Producer 默认开启了 Batch,当消息积攒了一定大小或者累积了一小段时间后,才会触发消息发送,并且会将一个 batch 的消息打包发送给 Broker。因为 Producer 不能同时开启 Batch 和 Chunk,所以这两块逻辑都是分开处理的,本小节只讨论 Producer 开启 Batch 的情况。
当 Producer 接收到 Message 时,并不会马上进行发送,而是将它放入 batchMessageContainer 中。
如果当前的 batch Message container 已满了,就会直接发送当前的 batch 然后再把当前的消息加入到下一个 batch 中,这样可以实现 batch 达到一定消息大小后会触发消息发送的功能。
以下是 batch container 判断空间是否已满的代码:
@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
int messageSize = msg.getDataBuffer().readableBytes();
return (
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())
|| (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)
) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);
}
复制代码
重复消息检查
在 Pulsar 中还包括消息去重的功能,这为 Batch 处理带来了一些要求。因为在 Broker 的角度来看,一个 Batch 其实就是一条普通的消息,只是这条消息内部包含有许多子消息。为了方便 Broker 进行消息去重,我们需要让重复的消息单独放在一个 batch 中。
那在 Producer 端如何检查消息是否重复呢?这里需要用到 Producer 的两个变量:代表上一个已发送到 Broker 的消息的 sequenceId 的 lastSequenceIdPushed;代表上一个已发送到并被 Broker 确认的消息的 sequenceId 的 lastSequenceIdPublished。在 Producer 中,sequence id 是单调递增的,如果发现当前消息的 sequence id 竟然比上一次发往 Broker 但还未确认的消息的小,那么代表消息可能是重复的,至少是重复发送的,只是 Broker 并不一定重复接收到和持久化;如果发现当前的消息的 sequence id 比上一次被 Broker 确认的消息的小,那么能肯定消息已经是重复了。不过对于这两种情况,都是将当前消息作为独立的 batch 发送来处理。
以下是上面逻辑的代码:
if (canAddToBatch(msg) && totalChunks <= 1) {
if (canAddToCurrentBatch(msg)) {
// should trigger complete the batch message, new message will add to a new batch and new batch
// sequence id use the new message, so that broker can handle the message duplication
if (sequenceId <= lastSequenceIdPushed) {
isLastSequenceIdPotentialDuplicated = true;
if (sequenceId <= lastSequenceIdPublished) {
log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
} else {
log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
sequenceId);
}
doBatchSendAndAdd(msg, callback, payload);
} else {
// Should flush the last potential duplicated since can't combine potential duplicated messages
// and non-duplicated messages into a batch.
if (isLastSequenceIdPotentialDuplicated) {
doBatchSendAndAdd(msg, callback, payload);
} else {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
batchMessageAndSend();
}
}
isLastSequenceIdPotentialDuplicated = false;
}
} else {
doBatchSendAndAdd(msg, callback, payload);
}
}
复制代码
通过以上的处理,满足了 Broker 对于重复消息的要求,方便 Broker 端的处理。
未完待续,在后续的文章中我们将讨论这条不平凡的消息被发往 Broker 的过程。
评论