[Pulsar] Broker 消息分发
当 Broker 接收到 Consumer 的接收消息的请求后,会从 ManagedLedger 读取消息,得到消息后,将进行消息的分发操作,本文将介绍 Broker 将消息发送到 Consumer 的分发过程。
Entry 过滤
在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:
校验和或者元数据损坏的
该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息
该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage
这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。
消息分发
当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。
其核心逻辑如下:
如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。
在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。
至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/c3b0ea2254ce3a84ed8517f2e】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论