写点什么

[Pulsar] ChunkMessageID 介绍及其原理

作者:Zike Yang
  • 2021 年 11 月 19 日
  • 本文字数:1167 字

    阅读完需:约 4 分钟

ChunkMessageID 是最近 Pulsar 提出的一个正在开发的新特性,旨在解决原先用 MessageIdImpl 处理 Chunked message 的 MessageId 所带来的一些问题。本文将对 ChunkMessageId 及其原理做简单的介绍。


使用 MessageIdImpl 处理 Chunked Message 的问题

在原先的实现中,Producer 发送 Chunked message 消息时,返回的是 MessageIdImpl,但一个 Chunked message 会被分成许多个 chunks,producer 实际上返回的是最后一个 chunk 的 messageId,而用户是无法拿到第一个 chunk 的 messageId 的。

如果我们拿这个 messageId 去进行 inclusive seek,在 consumer 端会带来一些问题。因为我们拿的是最后一个 chunk 的位置去 seek,那么 consumer 就会从最后一个 chunk 开始读取消息,而前面的 chunks 的消息都无法被读取到,整个 chunked message 将会作为异常消息而被丢弃掉,造成了数据的丢失。

如以下的代码可阐述这个问题:

var msgId = producer.send(...); // eg. 返回 0:1:-1
var otherMsg = producer.send(...); // 返回 0:2:-1
consumer.seek(msgId); // inclusive seek
var receiveMsgId = consumer.receive().getMessageId(); // 会丢弃掉第一个消息并返回 0:2:-1
Assert.assertEquals(msgId, receiveMsgId); // 错误
复制代码

引入 ChunkMessageId

以下是 ChunkMessgeId 的简易实现:

public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {private final MessageIdImpl firstChunkMsgId;


public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {    private final MessageIdImpl firstChunkMsgId;
public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) { super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex()); this.firstChunkMsgId = firstChunkMsgId; }
public MessageIdImpl getFirstChunkMessageId() { return firstChunkMsgId; }
public MessageIdImpl getLastChunkMessageId() { return this; }}
复制代码

可以看到,ChunkMessageId 包含了第一个 chunk 和最后一个 chunk 的 MessageId,而当它被转换为 MessageIdImpl 类型时,则会保持为 lastChunkMessageId 的行为,这样可以兼容原先的行为逻辑。

引入 ChunkMessageId 后,如果 Producer 发送 Message 时,因为开启了 chunking 并且该消息已被分片发送,那么返回给用户 MessageId 的实现将不再是 MessageIdImpl 而是 ChunkMessageIdImpl。在 Consumer 消费 Chunked message 时,返回的也是 ChunkMessageIdImpl。

在 consumer.seek 的时候,我们可以将 seek 所使用的 messageId 改为 ChunkMessageId 中的第一个 chunk 的 messageId。这样就可以解决上述的问题了。


目前这个特性还在开发中,感兴趣的小伙伴可以关注:https://github.com/apache/pulsar/issues/12402

发布于: 4 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] ChunkMessageID介绍及其原理