写点什么

[Pulsar] Broker 消息分发

作者:Zike Yang
  • 2021 年 12 月 04 日
  • 本文字数:1638 字

    阅读完需:约 5 分钟

当 Broker 接收到 Consumer 的接收消息的请求后,会从 ManagedLedger 读取消息,得到消息后,将进行消息的分发操作,本文将介绍 Broker 将消息发送到 Consumer 的分发过程。


Entry 过滤

在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:

  1. 校验和或者元数据损坏的

  2. 该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息

  3. 该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage

这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。


消息分发

当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。

其核心逻辑如下:

for (int i = 0; i < entries.size(); i++) {  Entry entry = entries.get(i);  if (entry == null) {    // Entry was filtered out    continue;  }
int batchSize = batchSizes.getBatchSize(i);
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}", topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId()); ctx.close(); entry.release(); continue; }
ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: // so, we can get chance to call entry.release metadataAndPayload.retain();
int redeliveryCount = 0; PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); if (redeliveryTracker.contains(position)) { redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); }
ctx.write( cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); entry.release();}
复制代码

如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。


在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。

                // Schedule a new read batch operation only after the previous batch has been written to the socket.                topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,                    SafeRun.safeRun(() -> {                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {                            Consumer newConsumer = getActiveConsumer();                            if (newConsumer != null && !havePendingRead) {                                readMoreEntries(newConsumer);                            } else {                                log.debug(                                        "[{}-{}] Ignoring write future complete."                                                + " consumerAvailable={} havePendingRead={}",                                        name, newConsumer, newConsumer != null, havePendingRead);                            }                        }                    }));
复制代码

至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。

发布于: 5 小时前阅读数: 6
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Broker 消息分发