[Pulsar] ChunkMessageID 介绍及其原理
ChunkMessageID 是最近 Pulsar 提出的一个正在开发的新特性,旨在解决原先用 MessageIdImpl 处理 Chunked message 的 MessageId 所带来的一些问题。本文将对 ChunkMessageId 及其原理做简单的介绍。
使用 MessageIdImpl 处理 Chunked Message 的问题
在原先的实现中,Producer 发送 Chunked message 消息时,返回的是 MessageIdImpl,但一个 Chunked message 会被分成许多个 chunks,producer 实际上返回的是最后一个 chunk 的 messageId,而用户是无法拿到第一个 chunk 的 messageId 的。
如果我们拿这个 messageId 去进行 inclusive seek,在 consumer 端会带来一些问题。因为我们拿的是最后一个 chunk 的位置去 seek,那么 consumer 就会从最后一个 chunk 开始读取消息,而前面的 chunks 的消息都无法被读取到,整个 chunked message 将会作为异常消息而被丢弃掉,造成了数据的丢失。
如以下的代码可阐述这个问题:
引入 ChunkMessageId
以下是 ChunkMessgeId 的简易实现:
public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {private final MessageIdImpl firstChunkMsgId;
可以看到,ChunkMessageId 包含了第一个 chunk 和最后一个 chunk 的 MessageId,而当它被转换为 MessageIdImpl 类型时,则会保持为 lastChunkMessageId 的行为,这样可以兼容原先的行为逻辑。
引入 ChunkMessageId 后,如果 Producer 发送 Message 时,因为开启了 chunking 并且该消息已被分片发送,那么返回给用户 MessageId 的实现将不再是 MessageIdImpl 而是 ChunkMessageIdImpl。在 Consumer 消费 Chunked message 时,返回的也是 ChunkMessageIdImpl。
在 consumer.seek 的时候,我们可以将 seek 所使用的 messageId 改为 ChunkMessageId 中的第一个 chunk 的 messageId。这样就可以解决上述的问题了。
目前这个特性还在开发中,感兴趣的小伙伴可以关注:https://github.com/apache/pulsar/issues/12402
版权声明: 本文为 InfoQ 作者【Zike Yang】的原创文章。
原文链接:【http://xie.infoq.cn/article/a8f7dd472ab02e818f393b66a】。文章转载请联系作者。
评论