写点什么

[Pulsar] 消息生命历程(三)——客户端发送消息

作者:Zike Yang
  • 2021 年 11 月 23 日
  • 本文字数:1246 字

    阅读完需:约 4 分钟

当消息经过一系列的前期处理,如拆分 chunk、整合 batch、整合压缩等等,接着就是客户端真正将消息发送到 Broker 的环节了。


生成 OpSendMsg

当 Producer 准备将当前消息发送出去,或者 BatchContainer 已经达到发送条件准备将整个 batch 的消息发送出去时,会首先创建一个 OpSendMsg 对象,这是一个操作消息发送的对象,内部存储了发送消息所需要的指令数据(ByteBuf 类型)、metadata 信息等等。如以下是创建 OpSendMsg 的例子:

ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);op = OpSendMsg.create(msg, cmd, sequenceId, callback);op.setNumMessagesInBatch(numMessages);op.setBatchSizeByte(encryptedPayload.readableBytes());if (totalChunks > 1) {    op.totalChunks = totalChunks;    op.chunkId = chunkId;}
复制代码

首先是创建指令数据,pulsar 使用 protobuf 来进行各类 Client 和 Broker 通讯所使用的数据结构的序列化和反序列化。发送消息需要使用 CommandSend 结构,其 protobuf 定义如下:

message CommandSend {    required uint64 producer_id = 1;    required uint64 sequence_id = 2;    optional int32 num_messages = 3 [default = 1];    optional uint64 txnid_least_bits = 4 [default = 0];    optional uint64 txnid_most_bits = 5 [default = 0];
/// Add highest sequence id to support batch message with external sequence id optional uint64 highest_sequence_id = 6 [default = 0]; optional bool is_chunk =7 [default = false];
// Specify if the message being published is a Pulsar marker or not optional bool marker = 8 [default = false];}
复制代码

ProducerImpl.sendMessage 方法用来构造 CommandSend 并将其序列化为 ByteBufPair。其中,producer_id 和 sequence_id 都是必要的参数,sequence_id 主要用于去重,numMessages 用于批量消息发送时指定当前消息的数量。


处理 OpSendMsg

在生成 OpSendMsg 后,会调用 ProducerImpl 的 processOpSendMsg 用来处理 OpSendMsg,这时,producer 会将当前的 OpSendMsg 加入到 pendingMessages 队列中,当后续消息发送成功后,Broker 会发送确认信息 ack,因为 producer 发送消息都是按顺序发送的,所以收到的 ack 也是按顺序的,这时就可以直接获取 pendingMessages 的第一个元素。

pendingMessages.add(op);ClientCnx cnx = cnx();// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new// connection is establishedop.cmd.retain();cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
复制代码

这时,会创建一个 WriteInEventLoopCallback 对象,让 client 底层的 connection 的 eventLoop 去完成具体的发送调度的任务。在 WriteInEventLoopCallback 中会调用如下的代码将刚刚生成的 cmd 数据写入到 TCP 连接中,从而发送给 Broker。

cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
复制代码


发布于: 2 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 消息生命历程(三)——客户端发送消息