写点什么

[Pulsar] Batch message 的确认

作者:Zike Yang
  • 2021 年 12 月 10 日
  • 本文字数:1655 字

    阅读完需:约 5 分钟

Pulsar 对于 Batch Message 的确认不能简单地像普通的 Individual message 一样进行确认,因为一个 batch 内部往往会有许多消息,如果像普通的确认处理方法,会导致对于 batch 的某个消息的确认变成整个 batch 的确认,所以,我们需要额外的逻辑来处理 batch message 的确认。


AckSet

我们在对 batch message 确认的时候,需要维护 batch 中每条消息的确认状态信息。在给 Broker 发送 Batch message 的 ack 命令时,我们需要在当前的 messageId 中指定具体需要 ack 哪些子消息。我们可以先看看 MessageIdData 的 proto 定义:

message MessageIdData {    required uint64 ledgerId = 1;    required uint64 entryId  = 2;    optional int32 partition = 3 [default = -1];    optional int32 batch_index = 4 [default = -1];    repeated int64 ack_set = 5;    optional int32 batch_size = 6;}
复制代码

MessageIdData 是 Cosumer 在 Ack 时需要传递给 Broker 的数据,其中 ledgerId 和 entryId 共同确认了当前整个 batch message 在 bookie 中的位置,而我们需要额外借助一个 int 数组 ack_set,来指明当前 batch 中还有哪些未被确认。

在实现中,Pulsar 使用 ConcurrentBitSet 结构,在 java 原有的 BitSet 的数据结构的基础上,添加了对多线程的支持。


Batch 消息确认流程

接下来我们讲解具体如何确认 Batch 消息。

consumer.onAcknowledge(msgId, null);// ack this ack carry bitSet index and judge bit set are all ackif (batchMessageId.ackIndividual()) {  MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);  return doIndividualAck(messageId, properties);} else if (batchIndexAckEnabled){  return doIndividualBatchAck(batchMessageId, properties);} else {  // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are  // all ack complete  return CompletableFuture.completedFuture(null);}
复制代码

在 consumer 判断当前消息为 batch message 后,会调用 batchMessageId 的 ackIndividual 方法,在每个 batchMessageId 中同时也维护了 BatchMessageAcker(也是 BitSet 的结构),当 Consumer 没有开启 batchIndexAck 时,这时 consumer 只有在 batch 中所有的消息都 ack 了才会发送 ack 的命令。

public synchronized boolean ackIndividual(int batchIndex) {  bitSet.clear(batchIndex);  return bitSet.isEmpty();}
复制代码

同时会判断当前的 batch 是否已经确认完了,如果确认完了,就按照普通消息的 ack 流程进行确认。如果没有,则检查是否开启了 batchIndex,如果没开启就不需要做任何操作(不需要发送 ack 命令),开启了则需要发送针对子消息的 ack 命令(默认不是立马发送,而是会经过一次 batch 流程)。

private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {  ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(    new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),                      batchMessageId.getPartitionIndex()), (v) -> {      ConcurrentBitSetRecyclable value;      if (batchMessageId.getAcker() != null &&          !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {        value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());      } else {        value = ConcurrentBitSetRecyclable.create();        value.set(0, batchMessageId.getBatchIndex());      }      return value;    });  bitSet.clear(batchMessageId.getBatchIndex());}
复制代码

这时会用到 ConcurrentBitSetRecyclable(ConcurrentBitSet 的拓展)结构,并对对应 batchIndex 的消息进行 clear,原理同上。


最后,consumer 会在发送 ack 命令给 broker 的时候,计算出当前 batch 中的消息有哪些没有 ack,并在 MessageIdData 中进行设置,Broker 就能知道这些信息了,在 Client 端的 batch message ack 流程就完成了。

发布于: 2 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Batch message的确认