写点什么

[Pulsar] Persistent Topic 持久化消息

作者:Zike Yang
  • 2021 年 11 月 26 日
  • 本文字数:1349 字

    阅读完需:约 4 分钟

在 Pulsar 中,有两种 Topic,Persistent Topic 和 NonPersistent Topic,前者会将消息持久化到 Bookie 的硬盘中,而后者则不会进行持久化,只会在内存中。本文将介绍 Persistent Topic 如何进行消息持久化。


ManagedLedger

在 Bookkeeper 中,最小的存储单位是 entry,对应的是 Pulsar 中的消息(非 batch 中的子消息),而这些 entry 都存储在 ledger 中,Ledger 存在一定的大小限制,而 Pulsar 中的 ManagedLedger 就是用来管理多个 Ledger,一个 Topic 往往对应一个 ManagedLedger。

PersistentTopic 在进行消息重复检查并去重后,会将去重后的消息持久化到 ManagedLedger 中。调用其 asyncAddEntry 的方法,

接着会检查当前的 ledger(Bookkeeper 的 ledger)是否已满。

private boolean currentLedgerIsFull() {    boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()            || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();
if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) {
boolean switchLedger = timeSinceLedgerCreationMs > config.getMinimumRolloverTimeMs(); return switchLedger; } else { return true; } } else { return false; }}
复制代码

如果 Ledger 中的 entry 数达到一定数量或者整个数据大小达到设置条件,就会触发容量限制,除此之外,如果 Ledger 达到一定时间,也会触发时间限制,这时 mleger 当前所使用的 ledger 会被判定为满,PersistentTopic 会新创建一个 ledger,并将后续的 entry(message)加入到新的 ledger 中。

在加入成功后,ledger 会将 entryId 返回给 mledger,mlegder 可以根据当前的 ledger 的 ID 来获取 ledgerId,并将其返回给 PersistentTopic,调用 PersistentTopic 的 addComplete 的 callback。

@Overridepublic void addComplete(Position pos, ByteBuf entryData, Object ctx) {    PublishContext publishContext = (PublishContext) ctx;    PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position);
if (!publishContext.isMarkerMessage()) { lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); }
// in order to sync the max position when cursor read entries transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); decrementPendingWriteOpsAndCheck();}
复制代码

这时,就代表消息已经被持久化成功,PersistentTopic 会通过 publishContext,调用 producer 的 callback,这时,producer 就得到了消息持久化的 ledgerId 和 entryId,可以作为 messageId 返回给用户。

发布于: 10 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] Persistent Topic持久化消息