写点什么

Pulsar 的 Chunk Message 原理剖析

作者:Zike Yang
  • 2021 年 11 月 16 日
  • 本文字数:1987 字

    阅读完需:约 7 分钟

在使用 Pulsar 的过程中,有时候会遇到需要处理单个消息包含大量数据的情况,如 Producer 需要发送一个大文件给 Consumer,如果文件的大小超过了 Broker 所设置的单条消息大小的限制maxMessageSize(在broker.conf中定义,默认为 5MB),则会导致发送失败。这时我们就需要将这个大文件拆分成多条消息进行发送,Pulsar 提供了 Chunk Messege 的功能,当开启后,客户端能够自动对大消息进行拆分,并能够保证消息的完整性,在 Consumer 端能够自动整合消息,让用户能够无感知地发送大消息。

本文将通过从 ChunkMessage 的生产到消费的过程来讲解 Chunk Message 的原理。实际上,Chunk Message 功能的实现主要是在客户端中,在 Broker 看来,每个 Chunk 其实和普通的消息没有大的差别,所以本文主要是介绍 Producer 和 Consumer 是如何生产和消费 Chunk Message 的。

注:下文提到的 Chunk Message 指的是用户所发送的大消息,而 chunk 则代表被拆分出来的小消息。

Producer 发送 Chunk Message


Producer 在发送 Chunk Message 时,会首先计算出所需要的 chunk 数量totalChunks。主要是通过从 Broker 获得的参数maxMessageSize计算,这个参数可在broker.conf中设置,Broker 在与PulsarClient建立连接时则会传递这个参数。以下是计算totalChunks的代码:

int totalChunks = canAddToBatch(msg) ? 1        : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()                + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
复制代码


获取到totalChunks后,会对消息的数据进行拆分,然后逐个按顺序地发送每个 chunk 的数据,与发送普通消息不同的是,这里会为每个 chunk 都设置相同的 uuid,这样可以标记哪些消息是在同一个 chunk message 中的。

String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;for (int chunkId = 0; chunkId < totalChunks; chunkId++) {    serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,            readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,            compressedPayload.readableBytes(), uncompressedSize, callback);    readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());}
复制代码

在 Producer 发送每一个 chunk 时,Broker 也会异步并按顺序地给 Producer 发送每个 chunk 的确认接收的 ack,并返回MessageId给 Producer,在 Producer 接收到最后一个 chunk 的 MessageId 时,则代表整个 Chunk Message 发送完成。

// if message is chunked then call callback only on last chunkif (op.totalChunks <= 1 || (op.chunkId == op.totalChunks - 1)) {    try {        // Need to protect ourselves from any exception being thrown in the future handler from the        // application        op.sendComplete(null);    } catch (Throwable t) {        log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic,                producerName, sequenceId, t);    }}
复制代码

Consumer 消费 Chunk Message

Consumer 通过每个 chunk 所携带的 uuid 来整合 Chunk Message。在 Consumer 中维护了一个 Chunk Message 的上下文结构ChunkedMessageCtx,如下

static class ChunkedMessageCtx {
protected int totalChunks = -1; protected ByteBuf chunkedMsgBuffer; protected int lastChunkedMessageId = -1; protected MessageIdImpl[] chunkedMessageIds; protected long receivedTime = 0; ... ...}
复制代码

在接收到第一个 chunk 时,则会创建 Chunk Message 上下文,做好接收后续 chunks 的准备,当后续 chunk 来到的时候,会通过如下的chunkedMessagesMap并利用 uuid 的映射找到相应的ChunkedMessageCtx,并将消息整合进上下文中。

protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
复制代码

在创建 Ctx 的时候,会要求 Consumer 等待后续的 chunks,Consumer 会记录正在等待的 chunks 的个数,如果超过了 Consumer 所设置的maxPendingChunkedMessage大小,则会移除最老的ChunkedMessageCtx,这使得可能存在数据丢失的情况。

当接收到最后一个 chunk 时,则代表 Chunk Message 接收成功,这时可整合成MessageImpl并返回给用户。


实际上,Chunk Message 的功能主要是在 Client 端实现,在 Broker 的视角来看,每个 chunk 除了 Metadata 不同(携带了如 uuid 等信息),与其他普通消息无异。Producer 通过将大消息拆分成多个消息发送数据,而 Consumer 则通过ChunkedMessageCtx作为上下文整合多个消息并返回给用户。这里需要注意的是,如果 Consumer 的maxPendingChunkedMessage参数设置得过小,可能导致丢消息的情况发生,需要根据实际业务进行合理设置。

发布于: 3 小时前阅读数: 14
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

还未添加个人简介

评论

发布
暂无评论
Pulsar的Chunk Message原理剖析