在 Pulsar 中,Producer 生产的消息进入到队列后,由 Consumer 进行消费,Consumer 使用订阅来记录消费的位置,在消费完消息后,就需要对已消费的消息进行确认,以更新订阅的位置,接着再不断消费不断订阅。本文将介绍 Consumer 确认消息的原理。
确认消息分为两种,一种是正常的确认;另一种是 Negative 确认,同时还分为 IndividualAck 和 CumulativeAck 的情况,本文先讲述 IndividualAck 确认非 batch 消息的原理。
每个 Consumer 内部都有用来追踪 ack 的追踪器 acknowledgmentsGroupingTracker,当用户调用 consumer 的 acknowledge 方法时,就会调用 acknowledgmentsGroupingTracker 的 addAcknowledgment 方法,大部分确认消息逻辑都在其中完成。
首先会检查该消息是否为 batch,batch 消息的逻辑本文先略过。当检测到消息不在一个 Batch 中时,且是 IndividualAck 时,将进入 doIndividualAck 方法。
private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Individual, properties, null);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualAckAsync(messageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doIndividualAckAsync(messageId);
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
复制代码
Consumer 默认不会马上发送 Ack 的命令给 Broker,会在积攒到一定时间或者达到一定大小(MAX_ACK_GROUP_SIZE)后再将所有 ack 统一发送,这个 delay 时间可通过 AcknowledgementsGroupTimeMicros 进行设置。
如果 AcknowledgementsGroupTimeMicros 为 0,则代表不进行统一发送(类似于不开启 batch),那么就马上发送。除此之外,如果 Ack 包含一些特殊的属性,也马上进行 ack。
在 doIndividualAckAsync 方法中,会将当前的 Ack 的 MessageId 添加进 pendingIndividualAcks 中。
在 flush 方法中,会为每个 ack 的消息创建 CommandAck 命令,其结构的主要部分如下:
message CommandAck {
enum AckType {
Individual = 0;
Cumulative = 1;
}
required uint64 consumer_id = 1;
required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids
repeated MessageIdData message_id = 3;
optional ValidationError validation_error = 4;
repeated KeyLongValue properties = 5;
optional uint64 request_id = 8;
}
复制代码
接着 Broker 收到 CommandAck 就开始进行 Ack 的处理,至此,Consumer 对单条非 batch 消息的 ack 的大体过程结束。
评论