写点什么

[Pulsar] Broker 处理消息确认

作者:Zike Yang
  • 2021 年 12 月 13 日
  • 本文字数:2403 字

    阅读完需:约 8 分钟

本文介绍 Broker 如何处理从 Consumer 发送过来的消息确认命名。


首先我们需要看一下 Consumer 发送给 Broker 的 CommandAck 的结构:

message CommandAck {    enum AckType {        Individual = 0;        Cumulative = 1;    }
required uint64 consumer_id = 1; required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids repeated MessageIdData message_id = 3;
// Acks can contain a flag to indicate the consumer // received an invalid message that got discarded // before being passed on to the application. enum ValidationError { UncompressedSizeCorruption = 0; DecompressionError = 1; ChecksumMismatch = 2; BatchDeSerializeError = 3; DecryptionError = 4; }
optional ValidationError validation_error = 4; repeated KeyLongValue properties = 5;
optional uint64 txnid_least_bits = 6 [default = 0]; optional uint64 txnid_most_bits = 7 [default = 0]; optional uint64 request_id = 8;}
复制代码

主要字段包括 AckType,用来指定当前的 Ack 类型;message_id 用来指定确认的消息的位置(如果是 CumulativeAck 则是确认的最后一条消息的位置),其中 message_id 中还包括 ack_sets,主要用于 chunk message 的中。


Broker 在收到 CommandAck 后,会调用相应的 subscription 的 ack 方法,在其中会调用 ManagedCursor 的 asyncDelete 方法,具体的 ack 逻辑都在其中。

在 Broker 中,使用 LongPairRangeSet 结构来维护 ack 的 range 关系。

if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())                    || position.compareTo(markDeletePosition) <= 0) {                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {                        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);                        if (bitSetRecyclable != null) {                            bitSetRecyclable.recycle();                        }                    }                    if (log.isDebugEnabled()) {                        log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);                    }                    continue;                }
复制代码

如上,individualDeletedMessages 就是 LongPairRangeSet 结构,position 指代当前处理的消息的位置,首先会判断当前消息是否在 range 中,即代表该消息已经被 ack 了。

对于非 chunk 的 message,接着会进行如下的操作:

PositionImpl previousPosition = ledger.getPreviousPosition(position);individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),                                        position.getLedgerId(), position.getEntryId());
复制代码

主要就是将当前消息添加都 range set 中。而对于 batch message,则进行如下的操作:

BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);bitSet.and(givenBitSet);givenBitSet.recycle();if (bitSet.isEmpty()) {  PositionImpl previousPosition = ledger.getPreviousPosition(position);  individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),                                          position.getLedgerId(), position.getEntryId());  MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);  BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);  if (bitSetRecyclable != null) {    bitSetRecyclable.recycle();  }}
复制代码

不同的是,会首先设置当前消息的 bitSet,Consumer 传过来的 ack_sets 代表没有被 ack 的 batch,会对 Broker 所存的 ack_sets 取交集,这时就能过滤出剩下的没有被 ack 的 batch 了。只有当 bitSet 为空时,则代表该 batchMessage 下的所有 batch 都已经被 ack 了,那么就可以像普通消息那样将其添加到 range set 中了。

Range<PositionImpl> range = individualDeletedMessages.firstRange();
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-betweenif (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) { log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(), name, range); }
newMarkDeletePosition = range.upperEndpoint(); }
if (newMarkDeletePosition != null) { newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition); } else { newMarkDeletePosition = markDeletePosition; }
复制代码

接下来的工作就是获取 range set 中的第一个 range,第一个 range 的最后的位置代表的就是我们希望订阅跳转到的位置,也就是 newMarkDeletePosition,最后我们让当前的 cursor(也就是 subscription)循环地移动到 newMarkDeletePosition,即可完成当前的 ack 操作。

发布于: 1 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Broker处理消息确认