本文介绍 Broker 如何处理从 Consumer 发送过来的消息确认命名。
首先我们需要看一下 Consumer 发送给 Broker 的 CommandAck 的结构:
message CommandAck {
enum AckType {
Individual = 0;
Cumulative = 1;
}
required uint64 consumer_id = 1;
required AckType ack_type = 2;
// In case of individual acks, the client can pass a list of message ids
repeated MessageIdData message_id = 3;
// Acks can contain a flag to indicate the consumer
// received an invalid message that got discarded
// before being passed on to the application.
enum ValidationError {
UncompressedSizeCorruption = 0;
DecompressionError = 1;
ChecksumMismatch = 2;
BatchDeSerializeError = 3;
DecryptionError = 4;
}
optional ValidationError validation_error = 4;
repeated KeyLongValue properties = 5;
optional uint64 txnid_least_bits = 6 [default = 0];
optional uint64 txnid_most_bits = 7 [default = 0];
optional uint64 request_id = 8;
}
复制代码
主要字段包括 AckType,用来指定当前的 Ack 类型;message_id 用来指定确认的消息的位置(如果是 CumulativeAck 则是确认的最后一条消息的位置),其中 message_id 中还包括 ack_sets,主要用于 chunk message 的中。
Broker 在收到 CommandAck 后,会调用相应的 subscription 的 ack 方法,在其中会调用 ManagedCursor 的 asyncDelete 方法,具体的 ack 逻辑都在其中。
在 Broker 中,使用 LongPairRangeSet 结构来维护 ack 的 range 关系。
if (individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())
|| position.compareTo(markDeletePosition) <= 0) {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Position was already deleted {}", ledger.getName(), name, position);
}
continue;
}
复制代码
如上,individualDeletedMessages 就是 LongPairRangeSet 结构,position 指代当前处理的消息的位置,首先会判断当前消息是否在 range 中,即代表该消息已经被 ack 了。
对于非 chunk 的 message,接着会进行如下的操作:
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
复制代码
主要就是将当前消息添加都 range set 中。而对于 batch message,则进行如下的操作:
BitSetRecyclable bitSet = batchDeletedIndexes.computeIfAbsent(position, (v) -> BitSetRecyclable.create().resetWords(position.ackSet));
BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(position.ackSet);
bitSet.and(givenBitSet);
givenBitSet.recycle();
if (bitSet.isEmpty()) {
PositionImpl previousPosition = ledger.getPreviousPosition(position);
individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(),
position.getLedgerId(), position.getEntryId());
MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this);
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
}
复制代码
不同的是,会首先设置当前消息的 bitSet,Consumer 传过来的 ack_sets 代表没有被 ack 的 batch,会对 Broker 所存的 ack_sets 取交集,这时就能过滤出剩下的没有被 ack 的 batch 了。只有当 bitSet 为空时,则代表该 batchMessage 下的所有 batch 都已经被 ack 了,那么就可以像普通消息那样将其添加到 range set 中了。
Range<PositionImpl> range = individualDeletedMessages.firstRange();
// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", ledger.getName(),
name, range);
}
newMarkDeletePosition = range.upperEndpoint();
}
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
复制代码
接下来的工作就是获取 range set 中的第一个 range,第一个 range 的最后的位置代表的就是我们希望订阅跳转到的位置,也就是 newMarkDeletePosition,最后我们让当前的 cursor(也就是 subscription)循环地移动到 newMarkDeletePosition,即可完成当前的 ack 操作。
评论