写点什么

[Pulsar] Cumulative Ack 原理

作者:Zike Yang
  • 2021 年 12 月 12 日
  • 本文字数:1149 字

    阅读完需:约 4 分钟

在我们使用 Consumer 的过程中,有时候我们并不希望一条一条地发送对每条消息的确认命令,这时我们可以使用 Cumulative Ack,对当前消息及以前的消息都进行确认。本文将介绍 Cumulative Ack 在客户端的实现原理。


Cumulative Ack 大体的实现逻辑和 Individual Ack 相同,只是在发送 Ack 命令的时候,只需要发送最后一个消息的 Ack 即可。如下是 cumulative ack 的主要逻辑:

private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {    // Handle concurrent updates from different threads    LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);    while (true) {        LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;        if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {            if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {                if (lastCumulativeAck.bitSetRecyclable != null) {                    try {                        lastCumulativeAck.bitSetRecyclable.recycle();                    } catch (Exception ignore) {                        // no-op                    }                    lastCumulativeAck.bitSetRecyclable = null;                }                lastCumulativeAck.recycle();                // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.                cumulativeAckFlushRequired = true;                return;            }        } else {            currentCumulativeAck.recycle();            // message id acknowledging an before the current last cumulative ack            return;        }    }}
复制代码

在 PersistentAcknowledgmentsGroupingTracker 中会维护一个变量 lastCumulativeAck,用来保存用户所需要 Ack 的消息中,messageId 最大的一条消息的 ack 信息,其中包括 messageId 和 bitset。

所以在进行 cumulativeAck 时,则判断当前消息的 messageId 是否比 lastCumulativeAck 大,是的话就进行替换,如果不是则代表当前消息已经被调用过 ack 了(可能已经发往 broker,也可能还没有),那么就不需要做改变。

consumer 同样会批量发送 ack 命令,如果需要 flush 的时候,就会用 lastCumulativeAck 中的 messageId 和 bitset 发送 ack 命令。


cumulativeAck 逻辑上大体和 individual ack 相同,不同的就是只维护了最后一个 ack 的 messageid,且如果在 lastCumulativeAck 之前的消息没有被调用 ack,在给 broker 发送 ack 命令时也会被 ack。cumulativeAck 可以大大减少所需要的 ack 操作,适合一些业务使用。

发布于: 4 小时前阅读数: 7
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Cumulative Ack原理