Pulsar 的 Chunk Message 原理剖析
在使用 Pulsar 的过程中,有时候会遇到需要处理单个消息包含大量数据的情况,如 Producer 需要发送一个大文件给 Consumer,如果文件的大小超过了 Broker 所设置的单条消息大小的限制maxMessageSize
(在broker.conf
中定义,默认为 5MB),则会导致发送失败。这时我们就需要将这个大文件拆分成多条消息进行发送,Pulsar 提供了 Chunk Messege 的功能,当开启后,客户端能够自动对大消息进行拆分,并能够保证消息的完整性,在 Consumer 端能够自动整合消息,让用户能够无感知地发送大消息。
本文将通过从 ChunkMessage 的生产到消费的过程来讲解 Chunk Message 的原理。实际上,Chunk Message 功能的实现主要是在客户端中,在 Broker 看来,每个 Chunk 其实和普通的消息没有大的差别,所以本文主要是介绍 Producer 和 Consumer 是如何生产和消费 Chunk Message 的。
注:下文提到的 Chunk Message 指的是用户所发送的大消息,而 chunk 则代表被拆分出来的小消息。
Producer 发送 Chunk Message
Producer 在发送 Chunk Message 时,会首先计算出所需要的 chunk 数量totalChunks
。主要是通过从 Broker 获得的参数maxMessageSize
计算,这个参数可在broker.conf
中设置,Broker 在与PulsarClient
建立连接时则会传递这个参数。以下是计算totalChunks
的代码:
获取到totalChunks
后,会对消息的数据进行拆分,然后逐个按顺序地发送每个 chunk 的数据,与发送普通消息不同的是,这里会为每个 chunk 都设置相同的 uuid,这样可以标记哪些消息是在同一个 chunk message 中的。
在 Producer 发送每一个 chunk 时,Broker 也会异步并按顺序地给 Producer 发送每个 chunk 的确认接收的 ack,并返回MessageId
给 Producer,在 Producer 接收到最后一个 chunk 的 MessageId 时,则代表整个 Chunk Message 发送完成。
Consumer 消费 Chunk Message
Consumer 通过每个 chunk 所携带的 uuid 来整合 Chunk Message。在 Consumer 中维护了一个 Chunk Message 的上下文结构ChunkedMessageCtx
,如下
在接收到第一个 chunk 时,则会创建 Chunk Message 上下文,做好接收后续 chunks 的准备,当后续 chunk 来到的时候,会通过如下的chunkedMessagesMap
并利用 uuid 的映射找到相应的ChunkedMessageCtx
,并将消息整合进上下文中。
在创建 Ctx 的时候,会要求 Consumer 等待后续的 chunks,Consumer 会记录正在等待的 chunks 的个数,如果超过了 Consumer 所设置的maxPendingChunkedMessage
大小,则会移除最老的ChunkedMessageCtx
,这使得可能存在数据丢失的情况。
当接收到最后一个 chunk 时,则代表 Chunk Message 接收成功,这时可整合成MessageImpl
并返回给用户。
实际上,Chunk Message 的功能主要是在 Client 端实现,在 Broker 的视角来看,每个 chunk 除了 Metadata 不同(携带了如 uuid 等信息),与其他普通消息无异。Producer 通过将大消息拆分成多个消息发送数据,而 Consumer 则通过ChunkedMessageCtx
作为上下文整合多个消息并返回给用户。这里需要注意的是,如果 Consumer 的maxPendingChunkedMessage
参数设置得过小,可能导致丢消息的情况发生,需要根据实际业务进行合理设置。
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/fa1450b51d8a65ebb2ba55c3c】。文章转载请联系作者。
评论