写点什么

[Pulsar] Consumer 确认消息原理

作者:Zike Yang
  • 2021 年 12 月 09 日
  • 本文字数:1644 字

    阅读完需:约 5 分钟

在 Pulsar 中,Producer 生产的消息进入到队列后,由 Consumer 进行消费,Consumer 使用订阅来记录消费的位置,在消费完消息后,就需要对已消费的消息进行确认,以更新订阅的位置,接着再不断消费不断订阅。本文将介绍 Consumer 确认消息的原理。


确认消息分为两种,一种是正常的确认;另一种是 Negative 确认,同时还分为 IndividualAck 和 CumulativeAck 的情况,本文先讲述 IndividualAck 确认非 batch 消息的原理。


每个 Consumer 内部都有用来追踪 ack 的追踪器 acknowledgmentsGroupingTracker,当用户调用 consumer 的 acknowledge 方法时,就会调用 acknowledgmentsGroupingTracker 的 addAcknowledgment 方法,大部分确认消息逻辑都在其中完成。

首先会检查该消息是否为 batch,batch 消息的逻辑本文先略过。当检测到消息不在一个 Batch 中时,且是 IndividualAck 时,将进入 doIndividualAck 方法。

    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an            // uncommon condition since it's only used for the compaction subscription.            return doImmediateAck(messageId, AckType.Individual, properties, null);        } else {            if (isAckReceiptEnabled(consumer.getClientCnx())) {                // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't                // change currentFuture. but we can lock by the read lock, because the currentFuture is not change                // any ack operation is allowed.                this.lock.readLock().lock();                try {                    doIndividualAckAsync(messageId);                    return this.currentIndividualAckFuture;                } finally {                    this.lock.readLock().unlock();                    if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {                        flush();                    }                }            } else {                doIndividualAckAsync(messageId);                if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {                    flush();                }                return CompletableFuture.completedFuture(null);            }        }    }
复制代码

Consumer 默认不会马上发送 Ack 的命令给 Broker,会在积攒到一定时间或者达到一定大小(MAX_ACK_GROUP_SIZE)后再将所有 ack 统一发送,这个 delay 时间可通过 AcknowledgementsGroupTimeMicros 进行设置。

如果 AcknowledgementsGroupTimeMicros 为 0,则代表不进行统一发送(类似于不开启 batch),那么就马上发送。除此之外,如果 Ack 包含一些特殊的属性,也马上进行 ack。

在 doIndividualAckAsync 方法中,会将当前的 Ack 的 MessageId 添加进 pendingIndividualAcks 中。


在 flush 方法中,会为每个 ack 的消息创建 CommandAck 命令,其结构的主要部分如下:

message CommandAck {    enum AckType {        Individual = 0;        Cumulative = 1;    }
required uint64 consumer_id = 1; required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids repeated MessageIdData message_id = 3;
optional ValidationError validation_error = 4; repeated KeyLongValue properties = 5; optional uint64 request_id = 8;}
复制代码


接着 Broker 收到 CommandAck 就开始进行 Ack 的处理,至此,Consumer 对单条非 batch 消息的 ack 的大体过程结束。

发布于: 1 小时前阅读数: 6
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Consumer 确认消息原理