写点什么

[Pulsar] Acknowledgement 原理

作者:Zike Yang
  • 2021 年 12 月 15 日
  • 本文字数:4820 字

    阅读完需:约 16 分钟

在 Pulsar 中,Producer 生产的消息进入到队列后,由 Consumer 进行消费,Consumer 使用订阅来记录消费的位置,在消费完消息后,就需要对已消费的消息进行确认,以更新订阅的位置,接着再不断消费不断订阅。本文将介绍 Pulsar 中 Ack 消息的原理。


确认消息分为两种,一种是正常的确认;另一种是 Negative 确认,同时还分为 IndividualAck 和 CumulativeAck 的情况。

Consumer Individual Ack

每个 Consumer 内部都有用来追踪 ack 的追踪器 acknowledgmentsGroupingTracker,当用户调用 consumer 的 acknowledge 方法时,就会调用 acknowledgmentsGroupingTracker 的 addAcknowledgment 方法,大部分确认消息逻辑都在其中完成。

首先会检查该消息是否为 batch,batch 消息的逻辑本文先略过。当检测到消息不在一个 Batch 中时,且是 IndividualAck 时,将进入 doIndividualAck 方法。

    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an            // uncommon condition since it's only used for the compaction subscription.            return doImmediateAck(messageId, AckType.Individual, properties, null);        } else {            if (isAckReceiptEnabled(consumer.getClientCnx())) {                // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't                // change currentFuture. but we can lock by the read lock, because the currentFuture is not change                // any ack operation is allowed.                this.lock.readLock().lock();                try {                    doIndividualAckAsync(messageId);                    return this.currentIndividualAckFuture;                } finally {                    this.lock.readLock().unlock();                    if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {                        flush();                    }                }            } else {                doIndividualAckAsync(messageId);                if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {                    flush();                }                return CompletableFuture.completedFuture(null);            }        }    }
复制代码

Consumer 默认不会马上发送 Ack 的命令给 Broker,会在积攒到一定时间或者达到一定大小(MAX_ACK_GROUP_SIZE)后再将所有 ack 统一发送,这个 delay 时间可通过 AcknowledgementsGroupTimeMicros 进行设置。

如果 AcknowledgementsGroupTimeMicros 为 0,则代表不进行统一发送(类似于不开启 batch),那么就马上发送。除此之外,如果 Ack 包含一些特殊的属性,也马上进行 ack。

在 doIndividualAckAsync 方法中,会将当前的 Ack 的 MessageId 添加进 pendingIndividualAcks 中。


在 flush 方法中,会为每个 ack 的消息创建 CommandAck 命令,其结构的主要部分如下:

message CommandAck {    enum AckType {        Individual = 0;        Cumulative = 1;    }
required uint64 consumer_id = 1; required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids repeated MessageIdData message_id = 3;
optional ValidationError validation_error = 4; repeated KeyLongValue properties = 5; optional uint64 request_id = 8;}
复制代码

接着 Broker 收到 CommandAck 就开始进行 Ack 的处理。


Consumer Cumulative Ack

在我们使用 Consumer 的过程中,有时候我们并不希望一条一条地发送对每条消息的确认命令,这时我们可以使用 Cumulative Ack,对当前消息及以前的消息都进行确认。Cumulative Ack 大体的实现逻辑和 Individual Ack 相同,只是在发送 Ack 命令的时候,只需要发送最后一个消息的 Ack 即可。如下是 cumulative ack 的主要逻辑:

private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {    // Handle concurrent updates from different threads    LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);    while (true) {        LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;        if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {            if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {                if (lastCumulativeAck.bitSetRecyclable != null) {                    try {                        lastCumulativeAck.bitSetRecyclable.recycle();                    } catch (Exception ignore) {                        // no-op                    }                    lastCumulativeAck.bitSetRecyclable = null;                }                lastCumulativeAck.recycle();                // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.                cumulativeAckFlushRequired = true;                return;            }        } else {            currentCumulativeAck.recycle();            // message id acknowledging an before the current last cumulative ack            return;        }    }}
复制代码

在 PersistentAcknowledgmentsGroupingTracker 中会维护一个变量 lastCumulativeAck,用来保存用户所需要 Ack 的消息中,messageId 最大的一条消息的 ack 信息,其中包括 messageId 和 bitset。

所以在进行 cumulativeAck 时,则判断当前消息的 messageId 是否比 lastCumulativeAck 大,是的话就进行替换,如果不是则代表当前消息已经被调用过 ack 了(可能已经发往 broker,也可能还没有),那么就不需要做改变。

consumer 同样会批量发送 ack 命令,如果需要 flush 的时候,就会用 lastCumulativeAck 中的 messageId 和 bitset 发送 ack 命令。


cumulativeAck 逻辑上大体和 individual ack 相同,不同的就是只维护了最后一个 ack 的 messageid,且如果在 lastCumulativeAck 之前的消息没有被调用 ack,在给 broker 发送 ack 命令时也会被 ack。cumulativeAck 可以大大减少所需要的 ack 操作,适合一些业务使用。

Broker 处理消息确认

在 Consumer 发送给 Broker 的 CommandAck 的结构中,主要字段包括 AckType,用来指定当前的 Ack 类型;message_id 用来指定确认的消息的位置(如果是 CumulativeAck 则是确认的最后一条消息的位置),其中 message_id 中还包括 ack_sets,主要用于 chunk message 的中。


Broker 在收到 CommandAck 后,会调用相应的 subscription 的 ack 方法,在其中会调用 ManagedCursor 的 asyncDelete 方法,具体的 ack 逻辑都在其中。

在 Broker 中,使用 LongPairRangeSet 结构来维护 ack 的 range 关系。


if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())                    || position.compareTo(markDeletePosition) <= 0) {                    if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {                        BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);                        if (bitSetRecyclable != null) {                            bitSetRecyclable.recycle();                        }                    }                    if (log.isDebugEnabled()) {                        log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);                    }                    continue;                }
复制代码

如上,individualDeletedMessages 就是 LongPairRangeSet 结构,position 指代当前处理的消息的位置,首先会判断当前消息是否在 range 中,即代表该消息已经被 ack 了。

对于非 chunk 的 message,接着会进行如下的操作:

PositionImpl previousPosition = ledger.getPreviousPosition(position);individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),                                        position.getLedgerId(), position.getEntryId());
复制代码

主要就是将当前消息添加都 range set 中。而对于 batch message,则进行如下的操作:

BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);bitSet.and(givenBitSet);givenBitSet.recycle();if (bitSet.isEmpty()) {  PositionImpl previousPosition = ledger.getPreviousPosition(position);  individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),                                          position.getLedgerId(), position.getEntryId());  MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);  BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);  if (bitSetRecyclable != null) {    bitSetRecyclable.recycle();  }}
复制代码

不同的是,会首先设置当前消息的 bitSet,Consumer 传过来的 ack_sets 代表没有被 ack 的 batch,会对 Broker 所存的 ack_sets 取交集,这时就能过滤出剩下的没有被 ack 的 batch 了。只有当 bitSet 为空时,则代表该 batchMessage 下的所有 batch 都已经被 ack 了,那么就可以像普通消息那样将其添加到 range set 中了。

Range<PositionImpl> range = individualDeletedMessages.firstRange();
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-betweenif (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) { log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(), name, range); }
newMarkDeletePosition = range.upperEndpoint(); }
if (newMarkDeletePosition != null) { newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition); } else { newMarkDeletePosition = markDeletePosition; }
复制代码

接下来的工作就是获取 range set 中的第一个 range,第一个 range 的最后的位置代表的就是我们希望订阅跳转到的位置,也就是 newMarkDeletePosition,最后我们让当前的 cursor(也就是 subscription)循环地移动到 newMarkDeletePosition,即可完成当前的 ack 操作。


本文讲解了从 Consumer 到 Broker 的消息确认过程包括 individual ack 和 cumulative ack。

发布于: 2 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Acknowledgement原理