写点什么

[Pulsar] 一个消息的生命历程(一)

作者:Zike Yang
  • 2021 年 11 月 21 日
  • 本文字数:1721 字

    阅读完需:约 6 分钟

本文将从消息的角度,介绍消息在 Pulsar 中传输的生命历程,介绍从生产者到 Pulsar 再到消费者的历程,让读者对 Pulsar 这一强大的消息系统有个更清晰的了解。


Producer

用户构建完一个 Message 后,调用 Producer 的 Send 方法,这时整个消息的生命历程开始。如果 Producer 所对应的 Topic 是 Partitioned Topic,那么首先 Producer 会调用消息路由器,通过用户在初始化 Producer 时所设置的消息路由选择消息所发往的分区,以下是选择分区的部分代码。

@OverrideCompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {    int partition = routerPolicy.choosePartition(message, topicMetadata);    checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),            "Illegal partition index chosen by the message routing policy: " + partition);
... ...
return producers.get(partition).internalSendWithTxnAsync(message, txn);}
复制代码

当选择分区完成后,则会调用相应的子 producer 完成消息的发送操作。

接下来会对消息的 payload 进行压缩,以此来减少带宽的占用量以及存储的占用空间。注意,如果此时 Producer 开启了 Batching(默认是开启的),那么这个压缩操作将在发送整一个 batch 消息时才会进行。

    // Batch will be compressed when closed    // If a message has a delayed delivery time, we'll always send it individually    if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {        compressedPayload = applyCompression(payload);        compressed = true;    }
复制代码

接下来将进行 chunk 的拆分,注意,chunk 拆分仅对无法加入到 batch 的消息有效,如 Producer 未开启 batching,或者已开启了 batching 但是 message 被指定了具体的分发时间 deliverAtTime,那么这些消息就会根据 Broker 中所设置的 MaxMessageSize 来进行拆分,保证每个消息的大小不会超过 Broker 所设置的消息大小的上限,否则,将过大的消息发送到 Broker 中,Broker 并不会接收。

// send in chunksint totalChunks = canAddToBatch(msg) ? 1        : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize()                + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
复制代码

接下来,将为消息生成一个 Producer 范围内递增的 sequenceID,然后根据得出的总 chunk 数量,对消息的数据进行拆分,作为每一条子消息发送出去。

    try {        synchronized (this) {            int readStartIndex = 0;            long sequenceId;            if (!msgMetadata.hasSequenceId()) {                sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);                msgMetadata.setSequenceId(sequenceId);            } else {                sequenceId = msgMetadata.getSequenceId();            }            String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;            for (int chunkId = 0; chunkId < totalChunks; chunkId++) {                serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,                        readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,                        compressedPayload.readableBytes(), uncompressedSize, callback);                readStartIndex = ((chunkId + 1) * ClientCnx.getMaxMessageSize());            }        }    } catch (PulsarClientException e) {        e.setSequenceId(msg.getSequenceId());        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e);    } catch (Throwable t) {        completeCallbackAndReleaseSemaphore(uncompressedSize, callback, new PulsarClientException(t, msg.getSequenceId()));    }
复制代码


未完待续……


本文介绍了消息在 Producer 中产生的过程,后续将介绍消息是如何从 Producer 发送到 Broker 中的。

发布于: 2 小时前阅读数: 6
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 一个消息的生命历程(一)