写点什么

[Pulsar] 消息的消费

作者:Zike Yang
  • 2021 年 11 月 29 日
  • 本文字数:1349 字

    阅读完需:约 4 分钟

本文将以一个消息的角度,介绍消息的消费过程,包括 Consumer 流控、Broker 中的 dispatcher 等等。


CommandFlow

在介绍 Consumer 从 Broker 拉取并消费消息前,即在建立连接后(ConsumerImpl 的 connectionOpened 回调方法),首先要介绍 Consumer 的流控。Consumer 在拉取消息前,都会向 Broker 发送 CommandFlow,告诉 Broker 我想要接收的消息数,Broker 就根据这个命令给 Consumer 至多发送多少条消息,当这个 permits 用完后,consumer 需要再次给 Broker 发送 CommandFlow。

如下是 CommandFlow 的结构:

message CommandFlow {    required uint64 consumer_id       = 1;    required uint32 messagePermits     = 2;}
复制代码

需要注意的是 messagePermits 还包括已发送的消息数量。


Broker 处理 CommandFlow

Broker 在接收到 CommanFlow 指令后,首先会根据其中的 consumer_id 找到对应的 consumer,设置 consumer 的 permits,然后会通过 consumer 所对应的 subscription 调用相对应的 dispatcher 的 consumerFlow 方法,通过这个方法,可以根据目前所给的 permits 的大小去读取消息。

在 Broker 中有许多 Dispatcher 的实现,它们都是用来调度和分发给 consumer 的消息。这里我们介绍默认使用的 subscription(Exclusive)所对应的 dispatcher 实现:PersistentDispatcherSingleActiveConsumer,顾名思义,这个 dispatcher 只会将消息分发给单个 active 的 consumer。在处理 Flow 之后,会调用 readMoreEntries 方法来读取来自 Bookie 的消息(Bookie 中的 Entry)。

@Overrideprotected void readMoreEntries(Consumer consumer) {  // consumer can be null when all consumers are disconnected from broker.  // so skip reading more entries if currently there is no active consumer.  if (null == consumer) {    return;  }
if (consumer.getAvailablePermits() > 0) { Pair<Integer, Long> calculateResult = calculateToRead(consumer); int messagesToRead = calculateResult.getLeft(); long bytesToRead = calculateResult.getRight();
if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate. return; }
// Schedule read havePendingRead = true; if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); } } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer); } }}
复制代码

在 dispatcher 的 readMoreEntries 方法中,首先会计算所需要读取的消息数和字节数。对于非压缩 topic(默认情况),会调用其对应的 manged cursor 的 asyncReadEntriesOrWait 方法,通过回调函数将调用 readEntriesComplete 方法,完成读取 entry 的过程。


后续 Dispatcher、consumer 会将这些 entry 整合成 CommandMessage 发往客户端中的 consumer,具体细节将在后续的文章中介绍。

发布于: 1 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 消息的消费