
[Pulsar] 消息从 Broker 到 Consumer 的历程

作者:Zike Yang
  • 2021 年 12 月 05 日
本文将从消息的角度,介绍消息在从 Broker 到 Cosumer 的过程,是《一个消息的生命历程》系列文章的部分整合。上一篇见:https://xie.infoq.cn/article/4a8c95f5019cf1ec769476790


在 Consumer 从 Broker 拉取并消费消息前,即在建立连接后(ConsumerImpl 的 connectionOpened 回调方法),首先要介绍 Consumer 的流控。Consumer 在拉取消息前,都会向 Broker 发送 CommandFlow,告诉 Broker 我想要接收的消息数,Broker 就根据这个命令给 Consumer 至多发送多少条消息,当这个 permits 用完后,consumer 需要再次给 Broker 发送 CommandFlow。

如下是 CommandFlow 的结构:

message CommandFlow {    required uint64 consumer_id       = 1;    required uint32 messagePermits     = 2;}

需要注意的是 messagePermits 还包括已发送的消息数量。

Broker 处理 CommandFlow

Broker 在接收到 CommanFlow 指令后,首先会根据其中的 consumer_id 找到对应的 consumer,设置 consumer 的 permits,然后会通过 consumer 所对应的 subscription 调用相对应的 dispatcher 的 consumerFlow 方法,通过这个方法,可以根据目前所给的 permits 的大小去读取消息。

在 Broker 中有许多 Dispatcher 的实现,它们都是用来调度和分发给 consumer 的消息。这里我们介绍默认使用的 subscription(Exclusive)所对应的 dispatcher 实现:PersistentDispatcherSingleActiveConsumer,顾名思义,这个 dispatcher 只会将消息分发给单个 active 的 consumer。在处理 Flow 之后,会调用 readMoreEntries 方法来读取来自 Bookie 的消息(Bookie 中的 Entry)。

@Overrideprotected void readMoreEntries(Consumer consumer) {  // consumer can be null when all consumers are disconnected from broker.  // so skip reading more entries if currently there is no active consumer.  if (null == consumer) {    return;  }
if (consumer.getAvailablePermits() > 0) { Pair<Integer, Long> calculateResult = calculateToRead(consumer); int messagesToRead = calculateResult.getLeft(); long bytesToRead = calculateResult.getRight();
if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate. return; }
// Schedule read havePendingRead = true; if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); } } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer); } }}

在 dispatcher 的 readMoreEntries 方法中,首先会计算所需要读取的消息数和字节数。对于非压缩 topic(默认情况),会调用其对应的 manged cursor 的 asyncReadEntriesOrWait 方法,通过回调函数将调用 readEntriesComplete 方法,完成读取 entry 的过程。

KeyHashRange 原理

这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。

在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。

if (isKeyHashRangeFiltered) {  Iterator<Entry> iterator = entries.iterator();  while (iterator.hasNext()) {    Entry entry = iterator.next();    byte[] key = peekStickyKey(entry.getDataBuffer());    Consumer consumer = stickyKeyConsumerSelector.select(key);    // Skip the entry if it's not for current active consumer.    if (consumer == null || currentConsumer != consumer) {      entry.release();      iterator.remove();    }  }}

首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。

以下是 select 的具体实现

@Overridepublic Consumer select(int hash) {    if (rangeMap.size() > 0) {        int slot = hash % rangeSize;        Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);        Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);        Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;        Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;        if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {            return ceilingConsumer;        } else {            return null;        }    } else {        return null;    }}

Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。

dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。

Entry 过滤

在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:

  1. 校验和或者元数据损坏的

  2. 该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息

  3. 该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage

这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。


当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。


for (int i = 0; i < entries.size(); i++) {  Entry entry = entries.get(i);  if (entry == null) {    // Entry was filtered out    continue;  }
int batchSize = batchSizes.getBatchSize(i);
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}", topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId()); ctx.close(); entry.release(); continue; }
ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: // so, we can get chance to call entry.release metadataAndPayload.retain();
int redeliveryCount = 0; PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); if (redeliveryTracker.contains(position)) { redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); }
ctx.write( cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); entry.release();}

如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。

在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。

                // Schedule a new read batch operation only after the previous batch has been written to the socket.                topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,                    SafeRun.safeRun(() -> {                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {                            Consumer newConsumer = getActiveConsumer();                            if (newConsumer != null && !havePendingRead) {                                readMoreEntries(newConsumer);                            } else {                                log.debug(                                        "[{}-{}] Ignoring write future complete."                                                + " consumerAvailable={} havePendingRead={}",                                        name, newConsumer, newConsumer != null, havePendingRead);                            }                        }                    }));

至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。

发布于: 1 小时前

[Pulsar] 消息从Broker到Consumer的历程