[Pulsar] 消息生命历程(三)——客户端发送消息
当消息经过一系列的前期处理,如拆分 chunk、整合 batch、整合压缩等等,接着就是客户端真正将消息发送到 Broker 的环节了。
生成 OpSendMsg
当 Producer 准备将当前消息发送出去,或者 BatchContainer 已经达到发送条件准备将整个 batch 的消息发送出去时,会首先创建一个 OpSendMsg 对象,这是一个操作消息发送的对象,内部存储了发送消息所需要的指令数据(ByteBuf 类型)、metadata 信息等等。如以下是创建 OpSendMsg 的例子:
首先是创建指令数据,pulsar 使用 protobuf 来进行各类 Client 和 Broker 通讯所使用的数据结构的序列化和反序列化。发送消息需要使用 CommandSend 结构,其 protobuf 定义如下:
ProducerImpl.sendMessage 方法用来构造 CommandSend 并将其序列化为 ByteBufPair。其中,producer_id 和 sequence_id 都是必要的参数,sequence_id 主要用于去重,numMessages 用于批量消息发送时指定当前消息的数量。
处理 OpSendMsg
在生成 OpSendMsg 后,会调用 ProducerImpl 的 processOpSendMsg 用来处理 OpSendMsg,这时,producer 会将当前的 OpSendMsg 加入到 pendingMessages 队列中,当后续消息发送成功后,Broker 会发送确认信息 ack,因为 producer 发送消息都是按顺序发送的,所以收到的 ack 也是按顺序的,这时就可以直接获取 pendingMessages 的第一个元素。
这时,会创建一个 WriteInEventLoopCallback 对象,让 client 底层的 connection 的 eventLoop 去完成具体的发送调度的任务。在 WriteInEventLoopCallback 中会调用如下的代码将刚刚生成的 cmd 数据写入到 TCP 连接中,从而发送给 Broker。
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/56baef945bc05b83be197356f】。
本文遵守【CC BY-NC-ND】协议,转载请保留原文出处及本版权声明。
评论