Pulsar 对于 Batch Message 的确认不能简单地像普通的 Individual message 一样进行确认,因为一个 batch 内部往往会有许多消息,如果像普通的确认处理方法,会导致对于 batch 的某个消息的确认变成整个 batch 的确认,所以,我们需要额外的逻辑来处理 batch message 的确认。
AckSet
我们在对 batch message 确认的时候,需要维护 batch 中每条消息的确认状态信息。在给 Broker 发送 Batch message 的 ack 命令时,我们需要在当前的 messageId 中指定具体需要 ack 哪些子消息。我们可以先看看 MessageIdData 的 proto 定义:
message MessageIdData {
required uint64 ledgerId = 1;
required uint64 entryId = 2;
optional int32 partition = 3 [default = -1];
optional int32 batch_index = 4 [default = -1];
repeated int64 ack_set = 5;
optional int32 batch_size = 6;
}
复制代码
MessageIdData 是 Cosumer 在 Ack 时需要传递给 Broker 的数据,其中 ledgerId 和 entryId 共同确认了当前整个 batch message 在 bookie 中的位置,而我们需要额外借助一个 int 数组 ack_set,来指明当前 batch 中还有哪些未被确认。
在实现中,Pulsar 使用 ConcurrentBitSet 结构,在 java 原有的 BitSet 的数据结构的基础上,添加了对多线程的支持。
Batch 消息确认流程
接下来我们讲解具体如何确认 Batch 消息。
consumer.onAcknowledge(msgId, null);
// ack this ack carry bitSet index and judge bit set are all ack
if (batchMessageId.ackIndividual()) {
MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
return doIndividualAck(messageId, properties);
} else if (batchIndexAckEnabled){
return doIndividualBatchAck(batchMessageId, properties);
} else {
// if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
// all ack complete
return CompletableFuture.completedFuture(null);
}
复制代码
在 consumer 判断当前消息为 batch message 后,会调用 batchMessageId 的 ackIndividual 方法,在每个 batchMessageId 中同时也维护了 BatchMessageAcker(也是 BitSet 的结构),当 Consumer 没有开启 batchIndexAck 时,这时 consumer 只有在 batch 中所有的消息都 ack 了才会发送 ack 的命令。
public synchronized boolean ackIndividual(int batchIndex) {
bitSet.clear(batchIndex);
return bitSet.isEmpty();
}
复制代码
同时会判断当前的 batch 是否已经确认完了,如果确认完了,就按照普通消息的 ack 流程进行确认。如果没有,则检查是否开启了 batchIndex,如果没开启就不需要做任何操作(不需要发送 ack 命令),开启了则需要发送针对子消息的 ack 命令(默认不是立马发送,而是会经过一次 batch 流程)。
private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
batchMessageId.getPartitionIndex()), (v) -> {
ConcurrentBitSetRecyclable value;
if (batchMessageId.getAcker() != null &&
!(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {
value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, batchMessageId.getBatchIndex());
}
return value;
});
bitSet.clear(batchMessageId.getBatchIndex());
}
复制代码
这时会用到 ConcurrentBitSetRecyclable(ConcurrentBitSet 的拓展)结构,并对对应 batchIndex 的消息进行 clear,原理同上。
最后,consumer 会在发送 ack 命令给 broker 的时候,计算出当前 batch 中的消息有哪些没有 ack,并在 MessageIdData 中进行设置,Broker 就能知道这些信息了,在 Client 端的 batch message ack 流程就完成了。
评论