在 Pulsar 中,Producer 生产的消息进入到队列后,由 Consumer 进行消费,Consumer 使用订阅来记录消费的位置,在消费完消息后,就需要对已消费的消息进行确认,以更新订阅的位置,接着再不断消费不断订阅。本文将介绍 Pulsar 中 Ack 消息的原理。
确认消息分为两种,一种是正常的确认;另一种是 Negative 确认,同时还分为 IndividualAck 和 CumulativeAck 的情况。
Consumer Individual Ack
每个 Consumer 内部都有用来追踪 ack 的追踪器 acknowledgmentsGroupingTracker,当用户调用 consumer 的 acknowledge 方法时,就会调用 acknowledgmentsGroupingTracker 的 addAcknowledgment 方法,大部分确认消息逻辑都在其中完成。
首先会检查该消息是否为 batch,batch 消息的逻辑本文先略过。当检测到消息不在一个 Batch 中时,且是 IndividualAck 时,将进入 doIndividualAck 方法。
private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Individual, properties, null);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualAckAsync(messageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doIndividualAckAsync(messageId);
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
复制代码
Consumer 默认不会马上发送 Ack 的命令给 Broker,会在积攒到一定时间或者达到一定大小(MAX_ACK_GROUP_SIZE)后再将所有 ack 统一发送,这个 delay 时间可通过 AcknowledgementsGroupTimeMicros 进行设置。
如果 AcknowledgementsGroupTimeMicros 为 0,则代表不进行统一发送(类似于不开启 batch),那么就马上发送。除此之外,如果 Ack 包含一些特殊的属性,也马上进行 ack。
在 doIndividualAckAsync 方法中,会将当前的 Ack 的 MessageId 添加进 pendingIndividualAcks 中。
在 flush 方法中,会为每个 ack 的消息创建 CommandAck 命令,其结构的主要部分如下:
message CommandAck {
enum AckType {
Individual = 0;
Cumulative = 1;
}
required uint64 consumer_id = 1;
required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids
repeated MessageIdData message_id = 3;
optional ValidationError validation_error = 4;
repeated KeyLongValue properties = 5;
optional uint64 request_id = 8;
}
复制代码
接着 Broker 收到 CommandAck 就开始进行 Ack 的处理。
Consumer Cumulative Ack
在我们使用 Consumer 的过程中,有时候我们并不希望一条一条地发送对每条消息的确认命令,这时我们可以使用 Cumulative Ack,对当前消息及以前的消息都进行确认。Cumulative Ack 大体的实现逻辑和 Individual Ack 相同,只是在发送 Ack 命令的时候,只需要发送最后一个消息的 Ack 即可。如下是 cumulative ack 的主要逻辑:
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
while (true) {
LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
if (lastCumulativeAck.bitSetRecyclable != null) {
try {
lastCumulativeAck.bitSetRecyclable.recycle();
} catch (Exception ignore) {
// no-op
}
lastCumulativeAck.bitSetRecyclable = null;
}
lastCumulativeAck.recycle();
// Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
cumulativeAckFlushRequired = true;
return;
}
} else {
currentCumulativeAck.recycle();
// message id acknowledging an before the current last cumulative ack
return;
}
}
}
复制代码
在 PersistentAcknowledgmentsGroupingTracker 中会维护一个变量 lastCumulativeAck,用来保存用户所需要 Ack 的消息中,messageId 最大的一条消息的 ack 信息,其中包括 messageId 和 bitset。
所以在进行 cumulativeAck 时,则判断当前消息的 messageId 是否比 lastCumulativeAck 大,是的话就进行替换,如果不是则代表当前消息已经被调用过 ack 了(可能已经发往 broker,也可能还没有),那么就不需要做改变。
consumer 同样会批量发送 ack 命令,如果需要 flush 的时候,就会用 lastCumulativeAck 中的 messageId 和 bitset 发送 ack 命令。
cumulativeAck 逻辑上大体和 individual ack 相同,不同的就是只维护了最后一个 ack 的 messageid,且如果在 lastCumulativeAck 之前的消息没有被调用 ack,在给 broker 发送 ack 命令时也会被 ack。cumulativeAck 可以大大减少所需要的 ack 操作,适合一些业务使用。
Broker 处理消息确认
在 Consumer 发送给 Broker 的 CommandAck 的结构中,主要字段包括 AckType,用来指定当前的 Ack 类型;message_id 用来指定确认的消息的位置(如果是 CumulativeAck 则是确认的最后一条消息的位置),其中 message_id 中还包括 ack_sets,主要用于 chunk message 的中。
Broker 在收到 CommandAck 后,会调用相应的 subscription 的 ack 方法,在其中会调用 ManagedCursor 的 asyncDelete 方法,具体的 ack 逻辑都在其中。
在 Broker 中,使用 LongPairRangeSet 结构来维护 ack 的 range 关系。
if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
continue;
}
复制代码
如上,individualDeletedMessages 就是 LongPairRangeSet 结构,position 指代当前处理的消息的位置,首先会判断当前消息是否在 range 中,即代表该消息已经被 ack 了。
对于非 chunk 的 message,接着会进行如下的操作:
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
复制代码
主要就是将当前消息添加都 range set 中。而对于 batch message,则进行如下的操作:
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
bitSet.and(givenBitSet);
givenBitSet.recycle();
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
复制代码
不同的是,会首先设置当前消息的 bitSet,Consumer 传过来的 ack_sets 代表没有被 ack 的 batch,会对 Broker 所存的 ack_sets 取交集,这时就能过滤出剩下的没有被 ack 的 batch 了。只有当 bitSet 为空时,则代表该 batchMessage 下的所有 batch 都已经被 ack 了,那么就可以像普通消息那样将其添加到 range set 中了。
Range<PositionImpl> range = individualDeletedMessages.firstRange();
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
}
newMarkDeletePosition = range.upperEndpoint();
}
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
复制代码
接下来的工作就是获取 range set 中的第一个 range,第一个 range 的最后的位置代表的就是我们希望订阅跳转到的位置,也就是 newMarkDeletePosition,最后我们让当前的 cursor(也就是 subscription)循环地移动到 newMarkDeletePosition,即可完成当前的 ack 操作。
本文讲解了从 Consumer 到 Broker 的消息确认过程包括 individual ack 和 cumulative ack。
评论