写点什么

[Pulsar] 一个消息的生命历程(二)——Batch 和消息重复处理

作者:Zike Yang
  • 2021 年 11 月 22 日
  • 本文字数:1888 字

    阅读完需:约 6 分钟

本文将介绍 Producer 如何处理 Batch 发送消息的情况已经当开启 Batch 后如何处理重复消息的情况。


Batch Message

Producer 默认开启了 Batch,当消息积攒了一定大小或者累积了一小段时间后,才会触发消息发送,并且会将一个 batch 的消息打包发送给 Broker。因为 Producer 不能同时开启 Batch 和 Chunk,所以这两块逻辑都是分开处理的,本小节只讨论 Producer 开启 Batch 的情况。

当 Producer 接收到 Message 时,并不会马上进行发送,而是将它放入 batchMessageContainer 中。

如果当前的 batch Message container 已满了,就会直接发送当前的 batch 然后再把当前的消息加入到下一个 batch 中,这样可以实现 batch 达到一定消息大小后会触发消息发送的功能。

以下是 batch container 判断空间是否已满的代码:

@Overridepublic boolean haveEnoughSpace(MessageImpl<?> msg) {    int messageSize = msg.getDataBuffer().readableBytes();    return (        (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())        || (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)    ) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);}
复制代码

重复消息检查

在 Pulsar 中还包括消息去重的功能,这为 Batch 处理带来了一些要求。因为在 Broker 的角度来看,一个 Batch 其实就是一条普通的消息,只是这条消息内部包含有许多子消息。为了方便 Broker 进行消息去重,我们需要让重复的消息单独放在一个 batch 中。

那在 Producer 端如何检查消息是否重复呢?这里需要用到 Producer 的两个变量:代表上一个已发送到 Broker 的消息的 sequenceId 的 lastSequenceIdPushed;代表上一个已发送到并被 Broker 确认的消息的 sequenceId 的 lastSequenceIdPublished。在 Producer 中,sequence id 是单调递增的,如果发现当前消息的 sequence id 竟然比上一次发往 Broker 但还未确认的消息的小,那么代表消息可能是重复的,至少是重复发送的,只是 Broker 并不一定重复接收到和持久化;如果发现当前的消息的 sequence id 比上一次被 Broker 确认的消息的小,那么能肯定消息已经是重复了。不过对于这两种情况,都是将当前消息作为独立的 batch 发送来处理。

以下是上面逻辑的代码:

    if (canAddToBatch(msg) && totalChunks <= 1) {        if (canAddToCurrentBatch(msg)) {            // should trigger complete the batch message, new message will add to a new batch and new batch            // sequence id use the new message, so that broker can handle the message duplication            if (sequenceId <= lastSequenceIdPushed) {                isLastSequenceIdPotentialDuplicated = true;                if (sequenceId <= lastSequenceIdPublished) {                    log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);                } else {                    log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",                        sequenceId);                }                doBatchSendAndAdd(msg, callback, payload);            } else {                // Should flush the last potential duplicated since can't combine potential duplicated messages                // and non-duplicated messages into a batch.                if (isLastSequenceIdPotentialDuplicated) {                    doBatchSendAndAdd(msg, callback, payload);                } else {                    // handle boundary cases where message being added would exceed                    // batch size and/or max message size                    boolean isBatchFull = batchMessageContainer.add(msg, callback);                    lastSendFuture = callback.getFuture();                    payload.release();                    if (isBatchFull) {                        batchMessageAndSend();                    }                }                isLastSequenceIdPotentialDuplicated = false;            }        } else {            doBatchSendAndAdd(msg, callback, payload);        }    } 
复制代码

通过以上的处理,满足了 Broker 对于重复消息的要求,方便 Broker 端的处理。


未完待续,在后续的文章中我们将讨论这条不平凡的消息被发往 Broker 的过程。

发布于: 9 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 一个消息的生命历程(二)——Batch和消息重复处理