本文介绍 Broker 收到 CommandSend 所进行的前期处理操作,以及 PersisentTopic 的重复消息判断机制等。
Broker 处理 CommandSend
Client 的 Producer 将 CommandSend 发送给 Broker 进行处理,Broker 在处理完成后会返回 CommandSendReceipt 给 Client,其中 Response 会携带 MessageId,本小节详细介绍 Broker 如何处理从 Producer 进来的消息。
首先 Broker 会判断 Producer 所对应的 Topic 是否是 PersistentTopic,如果不是,则这条消息并不需要被持久化,因为不走 Bookkeeper,所以没有 MessageId,在 Response 中以及 Consumer 获得的 MessageId 的 ledgerId 和 entryId 都为-1,同时也因为没有持久化,可以直接将 Response 返回给 Client。除此之外,还做了消息限流的操作。
if (producer.isNonPersistentTopic()) {
// avoid processing non-persist message if reached max concurrent-message limit
if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
final long highestSequenceId = send.getHighestSequenceId();
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
}));
producer.recordMessageDrop(send.getNumMessages());
return;
} else {
nonPersistentPendingMessages++;
}
}
复制代码
Broker 中的 Producer
Broker 中也存在的 Producer,与 Client 的 Producer 一一对应,当然其结构定义都是不一样的。Broker 会调用 Producer.publishMessageToTopic 的方法,将消息发送到对应的 topic 上。这个 topic 可能是 PersistentTopic 也可能是 NonPersistentTopic,走的则是不一样的逻辑。我们首先讨论 PersistentTopic 的逻辑。
PersistentTopic
消息进入 PersistentTopic 后,首先会先检查消息是否重复,重复消息检查和 Client 端中的类似。
PersistentTopic 内部有两个变量:highestSequencedPushed 和 highestSequencedPersisted 分别代表目前被发送到 Topic 上的消息的最高 sequenceId(最近被发送的)以及已经被持久化的最高 sequenceId。通过这两个变量可以判断当前消息是否已经被发送过来了以及是否已经被持久化了。如果消息已经被持久化了,那么消息肯定是重复的,这时消息的 MessageDupSttus 就是 Dup;如果消息已经被发送过来了,但是无法判断是否已经被持久化,则无法判断消息是否重复,因为在持久化的过程中也有可能出错,这时消息的 MessageDupStatus 则是 Unknown。
Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
if (log.isDebugEnabled()) {
log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
}
// Also need to check sequence ids that has been persisted.
// If current message's seq id is smaller or equals to the
// lastSequenceIdPersisted than its definitely a dup
// If current message's seq id is between lastSequenceIdPersisted and
// lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not
// we should return an error to the producer for the latter case so that it can retry at a future time
Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
}
}
highestSequencedPushed.put(producerName, highestSequenceId);
复制代码
对于重复的消息,Broker 会拒绝掉,并返回 ledgerId 和 entryId 均为-1 的 messageId。
未完待续。在下文中我们将介绍 PersistentTopic 如何持久化消息。
评论