本文将从消息的角度,介绍消息在从 Producer 处理,发送到 Broker,再到 Broker 进行处理和持久化到 Bookie 的过程,是《一个消息的生命历程》系列文章的部分整合。
Producer
用户构建完一个 Message 后,调用 Producer 的 Send 方法,这时整个消息的生命历程开始。如果 Producer 所对应的 Topic 是 Partitioned Topic,那么首先 Producer 会调用消息路由器,通过用户在初始化 Producer 时所设置的消息路由选择消息所发往的分区,以下是选择分区的部分代码。
@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy: " + partition);
... ...
return producers.get(partition).internalSendWithTxnAsync(message, txn);
}
复制代码
当选择分区完成后,则会调用相应的子 producer 完成消息的发送操作。
接下来会对消息的 payload 进行压缩,以此来减少带宽的占用量以及存储的占用空间。注意,如果此时 Producer 开启了 Batching(默认是开启的),那么这个压缩操作将在发送整一个 batch 消息时才会进行。
// Batch will be compressed when closed
// If a message has a delayed delivery time, we'll always send it individually
if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
compressedPayload = applyCompression(payload);
compressed = true;
}
复制代码
接下来将进行 chunk 的拆分,注意,chunk 拆分仅对无法加入到 batch 的消息有效,如 Producer 未开启 batching,或者已开启了 batching 但是 message 被指定了具体的分发时间 deliverAtTime,那么这些消息就会根据 Broker 中所设置的 MaxMessageSize 来进行拆分,保证每个消息的大小不会超过 Broker 所设置的消息大小的上限,否则,将过大的消息发送到 Broker 中,Broker 并不会接收。
// send in chunks
int totalChunks = canAddToBatch(msg) ? 1
: Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()
+ (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
复制代码
接下来,将为消息生成一个 Producer 范围内递增的 sequenceID,然后根据得出的总 chunk 数量,对消息的数据进行拆分,作为每一条子消息发送出去。
try {
synchronized (this) {
int readStartIndex = 0;
long sequenceId;
if (!msgMetadata.hasSequenceId()) {
sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
msgMetadata.setSequenceId(sequenceId);
} else {
sequenceId = msgMetadata.getSequenceId();
}
String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
compressedPayload.readableBytes(), uncompressedSize, callback);
readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());
}
}
} catch (PulsarClientException e) {
e.setSequenceId(msg.getSequenceId());
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e);
} catch (Throwable t) {
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, new PulsarClientException(t, msg.getSequenceId()));
}
复制代码
Batch Message
Producer 默认开启了 Batch,当消息积攒了一定大小或者累积了一小段时间后,才会触发消息发送,并且会将一个 batch 的消息打包发送给 Broker。因为 Producer 不能同时开启 Batch 和 Chunk,所以这两块逻辑都是分开处理的,本小节只讨论 Producer 开启 Batch 的情况。
当 Producer 接收到 Message 时,并不会马上进行发送,而是将它放入 batchMessageContainer 中。
如果当前的 batch Message container 已满了,就会直接发送当前的 batch 然后再把当前的消息加入到下一个 batch 中,这样可以实现 batch 达到一定消息大小后会触发消息发送的功能。
以下是 batch container 判断空间是否已满的代码:
@Override
public boolean haveEnoughSpace(MessageImpl<?> msg) {
int messageSize = msg.getDataBuffer().readableBytes();
return (
(maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())
|| (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)
) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);
}
复制代码
重复消息检查
在 Pulsar 中还包括消息去重的功能,这为 Batch 处理带来了一些要求。因为在 Broker 的角度来看,一个 Batch 其实就是一条普通的消息,只是这条消息内部包含有许多子消息。为了方便 Broker 进行消息去重,我们需要让重复的消息单独放在一个 batch 中。
那在 Producer 端如何检查消息是否重复呢?这里需要用到 Producer 的两个变量:代表上一个已发送到 Broker 的消息的 sequenceId 的 lastSequenceIdPushed;代表上一个已发送到并被 Broker 确认的消息的 sequenceId 的 lastSequenceIdPublished。在 Producer 中,sequence id 是单调递增的,如果发现当前消息的 sequence id 竟然比上一次发往 Broker 但还未确认的消息的小,那么代表消息可能是重复的,至少是重复发送的,只是 Broker 并不一定重复接收到和持久化;如果发现当前的消息的 sequence id 比上一次被 Broker 确认的消息的小,那么能肯定消息已经是重复了。不过对于这两种情况,都是将当前消息作为独立的 batch 发送来处理。
以下是上面逻辑的代码:
if (canAddToBatch(msg) && totalChunks <= 1) {
if (canAddToCurrentBatch(msg)) {
// should trigger complete the batch message, new message will add to a new batch and new batch
// sequence id use the new message, so that broker can handle the message duplication
if (sequenceId <= lastSequenceIdPushed) {
isLastSequenceIdPotentialDuplicated = true;
if (sequenceId <= lastSequenceIdPublished) {
log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);
} else {
log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",
sequenceId);
}
doBatchSendAndAdd(msg, callback, payload);
} else {
// Should flush the last potential duplicated since can't combine potential duplicated messages
// and non-duplicated messages into a batch.
if (isLastSequenceIdPotentialDuplicated) {
doBatchSendAndAdd(msg, callback, payload);
} else {
// handle boundary cases where message being added would exceed
// batch size and/or max message size
boolean isBatchFull = batchMessageContainer.add(msg, callback);
lastSendFuture = callback.getFuture();
payload.release();
if (isBatchFull) {
batchMessageAndSend();
}
}
isLastSequenceIdPotentialDuplicated = false;
}
} else {
doBatchSendAndAdd(msg, callback, payload);
}
}
复制代码
通过以上的处理,满足了 Broker 对于重复消息的要求,方便 Broker 端的处理。
生成 OpSendMsg
当 Producer 准备将当前消息发送出去,或者 BatchContainer 已经达到发送条件准备将整个 batch 的消息发送出去时,会首先创建一个 OpSendMsg 对象,这是一个操作消息发送的对象,内部存储了发送消息所需要的指令数据(ByteBuf 类型)、metadata 信息等等。如以下是创建 OpSendMsg 的例子:
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
op = OpSendMsg.create(msg, cmd, sequenceId, callback);
op.setNumMessagesInBatch(numMessages);
op.setBatchSizeByte(encryptedPayload.readableBytes());
if (totalChunks > 1) {
op.totalChunks = totalChunks;
op.chunkId = chunkId;
}
复制代码
首先是创建指令数据,pulsar 使用 protobuf 来进行各类 Client 和 Broker 通讯所使用的数据结构的序列化和反序列化。发送消息需要使用 CommandSend 结构,其 protobuf 定义如下:
message CommandSend {
required uint64 producer_id = 1;
required uint64 sequence_id = 2;
optional int32 num_messages = 3 [default = 1];
optional uint64 txnid_least_bits = 4 [default = 0];
optional uint64 txnid_most_bits = 5 [default = 0];
/// Add highest sequence id to support batch message with external sequence id
optional uint64 highest_sequence_id = 6 [default = 0];
optional bool is_chunk =7 [default = false];
// Specify if the message being published is a Pulsar marker or not
optional bool marker = 8 [default = false];
}
复制代码
ProducerImpl.sendMessage 方法用来构造 CommandSend 并将其序列化为 ByteBufPair。其中,producer_id 和 sequence_id 都是必要的参数,sequence_id 主要用于去重,numMessages 用于批量消息发送时指定当前消息的数量。
处理 OpSendMsg
在生成 OpSendMsg 后,会调用 ProducerImpl 的 processOpSendMsg 用来处理 OpSendMsg,这时,producer 会将当前的 OpSendMsg 加入到 pendingMessages 队列中,当后续消息发送成功后,Broker 会发送确认信息 ack,因为 producer 发送消息都是按顺序发送的,所以收到的 ack 也是按顺序的,这时就可以直接获取 pendingMessages 的第一个元素。
pendingMessages.add(op);
ClientCnx cnx = cnx();
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
// connection is established
op.cmd.retain();
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
复制代码
这时,会创建一个 WriteInEventLoopCallback 对象,让 client 底层的 connection 的 eventLoop 去完成具体的发送调度的任务。在 WriteInEventLoopCallback 中会调用如下的代码将刚刚生成的 cmd 数据写入到 TCP 连接中,从而发送给 Broker。
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
复制代码
Broker 处理 CommandSend
Client 的 Producer 将 CommandSend 发送给 Broker 进行处理,Broker 在处理完成后会返回 CommandSendReceipt 给 Client,其中 Response 会携带 MessageId,本小节详细介绍 Broker 如何处理从 Producer 进来的消息。
首先 Broker 会判断 Producer 所对应的 Topic 是否是 PersistentTopic,如果不是,则这条消息并不需要被持久化,因为不走 Bookkeeper,所以没有 MessageId,在 Response 中以及 Consumer 获得的 MessageId 的 ledgerId 和 entryId 都为-1,同时也因为没有持久化,可以直接将 Response 返回给 Client。除此之外,还做了消息限流的操作。
if (producer.isNonPersistentTopic()) {
// avoid processing non-persist message if reached max concurrent-message limit
if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
final long producerId = send.getProducerId();
final long sequenceId = send.getSequenceId();
final long highestSequenceId = send.getHighestSequenceId();
service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
}));
producer.recordMessageDrop(send.getNumMessages());
return;
} else {
nonPersistentPendingMessages++;
}
}
复制代码
Broker 中的 Producer
Broker 中也存在的 Producer,与 Client 的 Producer 一一对应,当然其结构定义都是不一样的。Broker 会调用 Producer.publishMessageToTopic 的方法,将消息发送到对应的 topic 上。这个 topic 可能是 PersistentTopic 也可能是 NonPersistentTopic,走的则是不一样的逻辑。我们首先讨论 PersistentTopic 的逻辑。
PersistentTopic
消息进入 PersistentTopic 后,首先会先检查消息是否重复,重复消息检查和 Client 端中的类似。
PersistentTopic 内部有两个变量:highestSequencedPushed 和 highestSequencedPersisted 分别代表目前被发送到 Topic 上的消息的最高 sequenceId(最近被发送的)以及已经被持久化的最高 sequenceId。通过这两个变量可以判断当前消息是否已经被发送过来了以及是否已经被持久化了。如果消息已经被持久化了,那么消息肯定是重复的,这时消息的 MessageDupSttus 就是 Dup;如果消息已经被发送过来了,但是无法判断是否已经被持久化,则无法判断消息是否重复,因为在持久化的过程中也有可能出错,这时消息的 MessageDupStatus 则是 Unknown。
Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
if (log.isDebugEnabled()) {
log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
}
// Also need to check sequence ids that has been persisted.
// If current message's seq id is smaller or equals to the
// lastSequenceIdPersisted than its definitely a dup
// If current message's seq id is between lastSequenceIdPersisted and
// lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not
// we should return an error to the producer for the latter case so that it can retry at a future time
Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
}
}
highestSequencedPushed.put(producerName, highestSequenceId);
复制代码
对于重复的消息,Broker 会拒绝掉,并返回 ledgerId 和 entryId 均为-1 的 messageId。
ManagedLedger
在 Bookkeeper 中,最小的存储单位是 entry,对应的是 Pulsar 中的消息(非 batch 中的子消息),而这些 entry 都存储在 ledger 中,Ledger 存在一定的大小限制,而 Pulsar 中的 ManagedLedger 就是用来管理多个 Ledger,一个 Topic 往往对应一个 ManagedLedger。
PersistentTopic 在进行消息重复检查并去重后,会将去重后的消息持久化到 ManagedLedger 中。调用其 asyncAddEntry 的方法,
接着会检查当前的 ledger(Bookkeeper 的 ledger)是否已满。
private boolean currentLedgerIsFull() {
boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();
if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
boolean switchLedger = timeSinceLedgerCreationMs > config.getMinimumRolloverTimeMs();
return switchLedger;
} else {
return true;
}
} else {
return false;
}
}
复制代码
如果 Ledger 中的 entry 数达到一定数量或者整个数据大小达到设置条件,就会触发容量限制,除此之外,如果 Ledger 达到一定时间,也会触发时间限制,这时 mleger 当前所使用的 ledger 会被判定为满,PersistentTopic 会新创建一个 ledger,并将后续的 entry(message)加入到新的 ledger 中。
在加入成功后,ledger 会将 entryId 返回给 mledger,mlegder 可以根据当前的 ledger 的 ID 来获取 ledgerId,并将其返回给 PersistentTopic,调用 PersistentTopic 的 addComplete 的 callback。
@Override
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
if (!publishContext.isMarkerMessage()) {
lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
}
// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
}
复制代码
这时,就代表消息已经被持久化成功,PersistentTopic 会通过 publishContext,调用 producer 的 callback,这时,producer 就得到了消息持久化的 ledgerId 和 entryId,可以作为 messageId 返回给用户。
至此,消息就完成了从用户端调用 Producer 发送再到 Broker 处理并持久化到 Bookie 的过程。
评论