写点什么

[Pulsar] 一个消息的生命历程

作者:Zike Yang
  • 2021 年 12 月 11 日
  • 本文字数:13404 字

    阅读完需:约 44 分钟

本文将从消息的角度,介绍消息在 Pulsar 中传输的生命历程,介绍从生产者到 Pulsar 再到消费者的历程,让读者对 Pulsar 这一强大的消息系统有个更清晰的了解。


Producer

用户构建完一个 Message 后,调用 Producer 的 Send 方法,这时整个消息的生命历程开始。如果 Producer 所对应的 Topic 是 Partitioned Topic,那么首先 Producer 会调用消息路由器,通过用户在初始化 Producer 时所设置的消息路由选择消息所发往的分区,以下是选择分区的部分代码。

@OverrideCompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {    int partition = routerPolicy.choosePartition(message, topicMetadata);    checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),            "Illegal partition index chosen by the message routing policy: " + partition);
... ...
return producers.get(partition).internalSendWithTxnAsync(message, txn);}
复制代码

当选择分区完成后,则会调用相应的子 producer 完成消息的发送操作。

接下来会对消息的 payload 进行压缩,以此来减少带宽的占用量以及存储的占用空间。注意,如果此时 Producer 开启了 Batching(默认是开启的),那么这个压缩操作将在发送整一个 batch 消息时才会进行。

    // Batch will be compressed when closed    // If a message has a delayed delivery time, we'll always send it individually    if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {        compressedPayload = applyCompression(payload);        compressed = true;    }
复制代码

接下来将进行 chunk 的拆分,注意,chunk 拆分仅对无法加入到 batch 的消息有效,如 Producer 未开启 batching,或者已开启了 batching 但是 message 被指定了具体的分发时间 deliverAtTime,那么这些消息就会根据 Broker 中所设置的 MaxMessageSize 来进行拆分,保证每个消息的大小不会超过 Broker 所设置的消息大小的上限,否则,将过大的消息发送到 Broker 中,Broker 并不会接收。

// send in chunksint totalChunks = canAddToBatch(msg) ? 1        : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()                + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
复制代码

接下来,将为消息生成一个 Producer 范围内递增的 sequenceID,然后根据得出的总 chunk 数量,对消息的数据进行拆分,作为每一条子消息发送出去。

    try {        synchronized (this) {            int readStartIndex = 0;            long sequenceId;            if (!msgMetadata.hasSequenceId()) {                sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);                msgMetadata.setSequenceId(sequenceId);            } else {                sequenceId = msgMetadata.getSequenceId();            }            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {                serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,                        readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,                        compressedPayload.readableBytes(), uncompressedSize, callback);                readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());            }        }    } catch (PulsarClientException e) {        e.setSequenceId(msg.getSequenceId());        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e);    } catch (Throwable t) {        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, new PulsarClientException(t, msg.getSequenceId()));    }
复制代码

Batch Message

Producer 默认开启了 Batch,当消息积攒了一定大小或者累积了一小段时间后,才会触发消息发送,并且会将一个 batch 的消息打包发送给 Broker。因为 Producer 不能同时开启 Batch 和 Chunk,所以这两块逻辑都是分开处理的,本小节只讨论 Producer 开启 Batch 的情况。

当 Producer 接收到 Message 时,并不会马上进行发送,而是将它放入 batchMessageContainer 中。

如果当前的 batch Message container 已满了,就会直接发送当前的 batch 然后再把当前的消息加入到下一个 batch 中,这样可以实现 batch 达到一定消息大小后会触发消息发送的功能。

以下是 batch container 判断空间是否已满的代码:

@Overridepublic boolean haveEnoughSpace(MessageImpl<?> msg) {    int messageSize = msg.getDataBuffer().readableBytes();    return (        (maxBytesInBatch <= 0 && (messageSize + currentBatchSizeBytes) <= ClientCnx.getMaxMessageSize())        || (maxBytesInBatch > 0 && (messageSize + currentBatchSizeBytes) <= maxBytesInBatch)    ) && (maxNumMessagesInBatch <= 0 || numMessagesInBatch < maxNumMessagesInBatch);}
复制代码

重复消息检查

在 Pulsar 中还包括消息去重的功能,这为 Batch 处理带来了一些要求。因为在 Broker 的角度来看,一个 Batch 其实就是一条普通的消息,只是这条消息内部包含有许多子消息。为了方便 Broker 进行消息去重,我们需要让重复的消息单独放在一个 batch 中。

那在 Producer 端如何检查消息是否重复呢?这里需要用到 Producer 的两个变量:代表上一个已发送到 Broker 的消息的 sequenceId 的 lastSequenceIdPushed;代表上一个已发送到并被 Broker 确认的消息的 sequenceId 的 lastSequenceIdPublished。在 Producer 中,sequence id 是单调递增的,如果发现当前消息的 sequence id 竟然比上一次发往 Broker 但还未确认的消息的小,那么代表消息可能是重复的,至少是重复发送的,只是 Broker 并不一定重复接收到和持久化;如果发现当前的消息的 sequence id 比上一次被 Broker 确认的消息的小,那么能肯定消息已经是重复了。不过对于这两种情况,都是将当前消息作为独立的 batch 发送来处理。

以下是上面逻辑的代码:


if (canAddToBatch(msg) && totalChunks <= 1) {    if (canAddToCurrentBatch(msg)) {        // should trigger complete the batch message, new message will add to a new batch and new batch        // sequence id use the new message, so that broker can handle the message duplication        if (sequenceId <= lastSequenceIdPushed) {            isLastSequenceIdPotentialDuplicated = true;            if (sequenceId <= lastSequenceIdPublished) {                log.warn("Message with sequence id {} is definitely a duplicate", sequenceId);            } else {                log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.",                    sequenceId);            }            doBatchSendAndAdd(msg, callback, payload);        } else {            // Should flush the last potential duplicated since can't combine potential duplicated messages            // and non-duplicated messages into a batch.            if (isLastSequenceIdPotentialDuplicated) {                doBatchSendAndAdd(msg, callback, payload);            } else {                // handle boundary cases where message being added would exceed                // batch size and/or max message size                boolean isBatchFull = batchMessageContainer.add(msg, callback);                lastSendFuture = callback.getFuture();                payload.release();                if (isBatchFull) {                    batchMessageAndSend();                }            }            isLastSequenceIdPotentialDuplicated = false;        }    } else {        doBatchSendAndAdd(msg, callback, payload);    }} 
复制代码

通过以上的处理,满足了 Broker 对于重复消息的要求,方便 Broker 端的处理。


生成 OpSendMsg

当 Producer 准备将当前消息发送出去,或者 BatchContainer 已经达到发送条件准备将整个 batch 的消息发送出去时,会首先创建一个 OpSendMsg 对象,这是一个操作消息发送的对象,内部存储了发送消息所需要的指令数据(ByteBuf 类型)、metadata 信息等等。如以下是创建 OpSendMsg 的例子:

ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);op = OpSendMsg.create(msg, cmd, sequenceId, callback);op.setNumMessagesInBatch(numMessages);op.setBatchSizeByte(encryptedPayload.readableBytes());if (totalChunks > 1) {    op.totalChunks = totalChunks;    op.chunkId = chunkId;}
复制代码

首先是创建指令数据,pulsar 使用 protobuf 来进行各类 Client 和 Broker 通讯所使用的数据结构的序列化和反序列化。发送消息需要使用 CommandSend 结构,其 protobuf 定义如下:

message CommandSend {    required uint64 producer_id = 1;    required uint64 sequence_id = 2;    optional int32 num_messages = 3 [default = 1];    optional uint64 txnid_least_bits = 4 [default = 0];    optional uint64 txnid_most_bits = 5 [default = 0];
/// Add highest sequence id to support batch message with external sequence id optional uint64 highest_sequence_id = 6 [default = 0]; optional bool is_chunk =7 [default = false];
// Specify if the message being published is a Pulsar marker or not optional bool marker = 8 [default = false];}
复制代码

ProducerImpl.sendMessage 方法用来构造 CommandSend 并将其序列化为 ByteBufPair。其中,producer_id 和 sequence_id 都是必要的参数,sequence_id 主要用于去重,numMessages 用于批量消息发送时指定当前消息的数量。


处理 OpSendMsg

在生成 OpSendMsg 后,会调用 ProducerImpl 的 processOpSendMsg 用来处理 OpSendMsg,这时,producer 会将当前的 OpSendMsg 加入到 pendingMessages 队列中,当后续消息发送成功后,Broker 会发送确认信息 ack,因为 producer 发送消息都是按顺序发送的,所以收到的 ack 也是按顺序的,这时就可以直接获取 pendingMessages 的第一个元素。

pendingMessages.add(op);ClientCnx cnx = cnx();// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new// connection is establishedop.cmd.retain();cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
复制代码

这时,会创建一个 WriteInEventLoopCallback 对象,让 client 底层的 connection 的 eventLoop 去完成具体的发送调度的任务。在 WriteInEventLoopCallback 中会调用如下的代码将刚刚生成的 cmd 数据写入到 TCP 连接中,从而发送给 Broker。

cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
复制代码


Broker 处理 CommandSend

Client 的 Producer 将 CommandSend 发送给 Broker 进行处理,Broker 在处理完成后会返回 CommandSendReceipt 给 Client,其中 Response 会携带 MessageId,本小节详细介绍 Broker 如何处理从 Producer 进来的消息。

首先 Broker 会判断 Producer 所对应的 Topic 是否是 PersistentTopic,如果不是,则这条消息并不需要被持久化,因为不走 Bookkeeper,所以没有 MessageId,在 Response 中以及 Consumer 获得的 MessageId 的 ledgerId 和 entryId 都为-1,同时也因为没有持久化,可以直接将 Response 返回给 Client。除此之外,还做了消息限流的操作。

if (producer.isNonPersistentTopic()) {  // avoid processing non-persist message if reached max concurrent-message limit  if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {    final long producerId = send.getProducerId();    final long sequenceId = send.getSequenceId();    final long highestSequenceId = send.getHighestSequenceId();    service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {      commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);    }));    producer.recordMessageDrop(send.getNumMessages());    return;  } else {    nonPersistentPendingMessages++;  }}
复制代码

Broker 中的 Producer

Broker 中也存在的 Producer,与 Client 的 Producer 一一对应,当然其结构定义都是不一样的。Broker 会调用 Producer.publishMessageToTopic 的方法,将消息发送到对应的 topic 上。这个 topic 可能是 PersistentTopic 也可能是 NonPersistentTopic,走的则是不一样的逻辑。我们首先讨论 PersistentTopic 的逻辑。

PersistentTopic

消息进入 PersistentTopic 后,首先会先检查消息是否重复,重复消息检查和 Client 端中的类似。

PersistentTopic 内部有两个变量:highestSequencedPushed 和 highestSequencedPersisted 分别代表目前被发送到 Topic 上的消息的最高 sequenceId(最近被发送的)以及已经被持久化的最高 sequenceId。通过这两个变量可以判断当前消息是否已经被发送过来了以及是否已经被持久化了。如果消息已经被持久化了,那么消息肯定是重复的,这时消息的 MessageDupSttus 就是 Dup;如果消息已经被发送过来了,但是无法判断是否已经被持久化,则无法判断消息是否重复,因为在持久化的过程中也有可能出错,这时消息的 MessageDupStatus 则是 Unknown。

Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {  if (log.isDebugEnabled()) {    log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",              topic.getName(), producerName, sequenceId, lastSequenceIdPushed);  }
// Also need to check sequence ids that has been persisted. // If current message's seq id is smaller or equals to the // lastSequenceIdPersisted than its definitely a dup // If current message's seq id is between lastSequenceIdPersisted and // lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not // we should return an error to the producer for the latter case so that it can retry at a future time Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName); if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) { return MessageDupStatus.Dup; } else { return MessageDupStatus.Unknown; }}highestSequencedPushed.put(producerName, highestSequenceId);
复制代码

对于重复的消息,Broker 会拒绝掉,并返回 ledgerId 和 entryId 均为-1 的 messageId。

ManagedLedger

在 Bookkeeper 中,最小的存储单位是 entry,对应的是 Pulsar 中的消息(非 batch 中的子消息),而这些 entry 都存储在 ledger 中,Ledger 存在一定的大小限制,而 Pulsar 中的 ManagedLedger 就是用来管理多个 Ledger,一个 Topic 往往对应一个 ManagedLedger。

PersistentTopic 在进行消息重复检查并去重后,会将去重后的消息持久化到 ManagedLedger 中。调用其 asyncAddEntry 的方法,

接着会检查当前的 ledger(Bookkeeper 的 ledger)是否已满。

private boolean currentLedgerIsFull() {    boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()            || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();
if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) {
boolean switchLedger = timeSinceLedgerCreationMs > config.getMinimumRolloverTimeMs(); return switchLedger; } else { return true; } } else { return false; }}
复制代码

如果 Ledger 中的 entry 数达到一定数量或者整个数据大小达到设置条件,就会触发容量限制,除此之外,如果 Ledger 达到一定时间,也会触发时间限制,这时 mleger 当前所使用的 ledger 会被判定为满,PersistentTopic 会新创建一个 ledger,并将后续的 entry(message)加入到新的 ledger 中。

在加入成功后,ledger 会将 entryId 返回给 mledger,mlegder 可以根据当前的 ledger 的 ID 来获取 ledgerId,并将其返回给 PersistentTopic,调用 PersistentTopic 的 addComplete 的 callback。

@Overridepublic void addComplete(Position pos, ByteBuf entryData, Object ctx) {    PublishContext publishContext = (PublishContext) ctx;    PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted messageDeduplication.recordMessagePersisted(publishContext, position);
if (!publishContext.isMarkerMessage()) { lastDataMessagePublishedTimestamp = Clock.systemUTC().millis(); }
// in order to sync the max position when cursor read entries transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); publishContext.setMetadataFromEntryData(entryData); publishContext.completed(null, position.getLedgerId(), position.getEntryId()); decrementPendingWriteOpsAndCheck();}
复制代码

这时,就代表消息已经被持久化成功,PersistentTopic 会通过 publishContext,调用 producer 的 callback,这时,producer 就得到了消息持久化的 ledgerId 和 entryId,可以作为 messageId 返回给用户。

至此,消息就完成了从用户端调用 Producer 发送再到 Broker 处理并持久化到 Bookie 的过程。接下来就是 Broker 将消息发送到 Consumer 的过程。

Consumer

CommandFlow

在 Consumer 从 Broker 拉取并消费消息前,即在建立连接后(ConsumerImpl 的 connectionOpened 回调方法),首先要介绍 Consumer 的流控。Consumer 在拉取消息前,都会向 Broker 发送 CommandFlow,告诉 Broker 我想要接收的消息数,Broker 就根据这个命令给 Consumer 至多发送多少条消息,当这个 permits 用完后,consumer 需要再次给 Broker 发送 CommandFlow。

如下是 CommandFlow 的结构:

message CommandFlow {    required uint64 consumer_id       = 1;    required uint32 messagePermits     = 2;}
复制代码

需要注意的是 messagePermits 还包括已发送的消息数量。

Broker 处理 CommandFlow

Broker 在接收到 CommanFlow 指令后,首先会根据其中的 consumer_id 找到对应的 consumer,设置 consumer 的 permits,然后会通过 consumer 所对应的 subscription 调用相对应的 dispatcher 的 consumerFlow 方法,通过这个方法,可以根据目前所给的 permits 的大小去读取消息。

在 Broker 中有许多 Dispatcher 的实现,它们都是用来调度和分发给 consumer 的消息。这里我们介绍默认使用的 subscription(Exclusive)所对应的 dispatcher 实现:PersistentDispatcherSingleActiveConsumer,顾名思义,这个 dispatcher 只会将消息分发给单个 active 的 consumer。在处理 Flow 之后,会调用 readMoreEntries 方法来读取来自 Bookie 的消息(Bookie 中的 Entry)。

@Overrideprotected void readMoreEntries(Consumer consumer) {  // consumer can be null when all consumers are disconnected from broker.  // so skip reading more entries if currently there is no active consumer.  if (null == consumer) {    return;  }
if (consumer.getAvailablePermits() > 0) { Pair<Integer, Long> calculateResult = calculateToRead(consumer); int messagesToRead = calculateResult.getLeft(); long bytesToRead = calculateResult.getRight();
if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate. return; }
// Schedule read havePendingRead = true; if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, consumer, topic.getMaxReadPosition()); } } else { if (log.isDebugEnabled()) { log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer); } }}
复制代码

在 dispatcher 的 readMoreEntries 方法中,首先会计算所需要读取的消息数和字节数。对于非压缩 topic(默认情况),会调用其对应的 manged cursor 的 asyncReadEntriesOrWait 方法,通过回调函数将调用 readEntriesComplete 方法,完成读取 entry 的过程。


KeyHashRange 原理

这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。

在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。

if (isKeyHashRangeFiltered) {  Iterator<Entry> iterator = entries.iterator();  while (iterator.hasNext()) {    Entry entry = iterator.next();    byte[] key = peekStickyKey(entry.getDataBuffer());    Consumer consumer = stickyKeyConsumerSelector.select(key);    // Skip the entry if it's not for current active consumer.    if (consumer == null || currentConsumer != consumer) {      entry.release();      iterator.remove();    }  }}
复制代码

首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。

以下是 select 的具体实现

@Overridepublic Consumer select(int hash) {    if (rangeMap.size() > 0) {        int slot = hash % rangeSize;        Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);        Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);        Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;        Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;        if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {            return ceilingConsumer;        } else {            return null;        }    } else {        return null;    }}
复制代码

Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。

dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。


Entry 过滤

在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:

  1. 校验和或者元数据损坏的

  2. 该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息

  3. 该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage

这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。


消息分发

当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。

其核心逻辑如下:

for (int i = 0; i < entries.size(); i++) {  Entry entry = entries.get(i);  if (entry == null) {    // Entry was filtered out    continue;  }
int batchSize = batchSizes.getBatchSize(i);
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) { log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}", topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId()); ctx.close(); entry.release(); continue; }
ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: // so, we can get chance to call entry.release metadataAndPayload.retain();
int redeliveryCount = 0; PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); if (redeliveryTracker.contains(position)) { redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position); }
ctx.write( cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise()); entry.release();}
复制代码

如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。


在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。

                // Schedule a new read batch operation only after the previous batch has been written to the socket.                topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,                    SafeRun.safeRun(() -> {                        synchronized (PersistentDispatcherSingleActiveConsumer.this) {                            Consumer newConsumer = getActiveConsumer();                            if (newConsumer != null && !havePendingRead) {                                readMoreEntries(newConsumer);                            } else {                                log.debug(                                        "[{}-{}] Ignoring write future complete."                                                + " consumerAvailable={} havePendingRead={}",                                        name, newConsumer, newConsumer != null, havePendingRead);                            }                        }                    }));
复制代码

至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。

Consumer 消费消息

当 Broker 分发消息给 consumer 时,会向 consumer 发送 CommnadMessage 的命令,在其中指明 consumerId 和 messageId,CommnadMessage 的 protobuf 定义如下:

message CommandMessage {    required uint64 consumer_id       = 1;    required MessageIdData message_id = 2;    optional uint32 redelivery_count  = 3 [default = 0];    repeated int64 ack_set = 4;}
复制代码

在 CommandMessage 后将附带消息的 metadata 和 Payload(headersAndPayload)。


Client 接收到命令后,会通过 consumerId 寻找到对应的 consumer,然后调用 consumer 的 messageReceived 方法,让 consumer 去处理接收进来的消息,这也是 consumer 处理消息的核心逻辑。

if (!verifyChecksum(headersAndPayload, messageId)) {  // discard message with checksum error  discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);  return;}
BrokerEntryMetadata brokerEntryMetadata;MessageMetadata msgMetadata;try { brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload); msgMetadata = Commands.parseMessageMetadata(headersAndPayload);} catch (Throwable t) { discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return;}
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
复制代码

首先会进行校验和检验,扔掉损坏的消息,然后再从 headersAndPayload 中提取出 Metadata,这里有两种 metadata,一种是携带有 broker 处理该消息相关信息的 metadata,如该消息的 index(类似于 kafka 中的 offset),一种是消息的 metadata,其中携带各类消息元信息,如 chunk 相关、压缩相关、事务相关等等。

接着 Consumer 将对 Payload 进行各项处理,如解压缩消息等,如果该消息是 chunk message,那么会按照 chunk message 的逻辑进行处理。

最后,生成出 MessageImpl 对象,用于后续返回给用户。

final MessageImpl<T> message =  newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount);uncompressedPayload.release();
executeNotifyCallback(message);
复制代码

调用 executeNotifyCallback 方法,Consumer 会使用 internalPinnedExecutor 去调度另一个线程去将消息分发给用户端代码,这样可以防止阻塞网络线程。在另一个线程中,会调用 enqueueMessageAndCheckBatchReceive 方法:

protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {  int messageSize = message.size();  if (canEnqueueMessage(message) && incomingMessages.offer(message)) {    // After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance    // anymore, since for pooled messages, this instance was possibly already been released and recycled.    INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);  }  return hasEnoughMessagesForBatchReceive();}
复制代码

这时,会将消息放进 incomingMessages 队列中,在用户调用 consumer 的 receive 方法时,内部会调用 internalReceive 方法:

protected Message<T> internalReceive() throws PulsarClientException {  Message<T> message;  try {    message = incomingMessages.take();    messageProcessed(message);    return beforeConsume(message);  } catch (InterruptedException e) {    stats.incrementNumReceiveFailed();    throw PulsarClientException.unwrap(e);  }}
复制代码

在其中会将 incomingMessages 中的消息拿出来,并返回给用户,至此,Broker 分发消息到 Consumer 再到用户业务代码的过程就结束了。


至此,一个消息的生命历程结束了。消息从 Producer 中经过各种处理发送给 Broker,Broker 对消息进行各项处理后将消息持久化到 Bookie 中,然后 Broker 再利用 Dispathcer 将消息调度分发给 Consumer,Consumer 处理消息再将消息发到用户手中,一个消息的生命历程就这样完成了,希望能通过本系列文章,让你更清除消息在 Pulsar 中是如何流转处理的。当然,本系列文章所假设的各类场景和所假设的各类设置都是默认设置以及简单设置,也略过了许多功能原理介绍,只对最核心的部分进行讲解,后续将会有更多的文章介绍 Pulsar 的其他特性。

发布于: 4 小时前阅读数: 6
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 一个消息的生命历程