在 Pulsar 中,有两种 Topic,Persistent Topic 和 NonPersistent Topic,前者会将消息持久化到 Bookie 的硬盘中,而后者则不会进行持久化,只会在内存中。本文将介绍 Persistent Topic 如何进行消息持久化。
ManagedLedger
在 Bookkeeper 中,最小的存储单位是 entry,对应的是 Pulsar 中的消息(非 batch 中的子消息),而这些 entry 都存储在 ledger 中,Ledger 存在一定的大小限制,而 Pulsar 中的 ManagedLedger 就是用来管理多个 Ledger,一个 Topic 往往对应一个 ManagedLedger。
PersistentTopic 在进行消息重复检查并去重后,会将去重后的消息持久化到 ManagedLedger 中。调用其 asyncAddEntry 的方法,
接着会检查当前的 ledger(Bookkeeper 的 ledger)是否已满。
private boolean currentLedgerIsFull() {
boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();
if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
boolean switchLedger = timeSinceLedgerCreationMs > config.getMinimumRolloverTimeMs();
return switchLedger;
} else {
return true;
}
} else {
return false;
}
}
复制代码
如果 Ledger 中的 entry 数达到一定数量或者整个数据大小达到设置条件,就会触发容量限制,除此之外,如果 Ledger 达到一定时间,也会触发时间限制,这时 mleger 当前所使用的 ledger 会被判定为满,PersistentTopic 会新创建一个 ledger,并将后续的 entry(message)加入到新的 ledger 中。
在加入成功后,ledger 会将 entryId 返回给 mledger,mlegder 可以根据当前的 ledger 的 ID 来获取 ledgerId,并将其返回给 PersistentTopic,调用 PersistentTopic 的 addComplete 的 callback。
@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
if (!publishContext.isMarkerMessage()) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}
// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
}
复制代码
这时,就代表消息已经被持久化成功,PersistentTopic 会通过 publishContext,调用 producer 的 callback,这时,producer 就得到了消息持久化的 ledgerId 和 entryId,可以作为 messageId 返回给用户。
评论