本文将介绍从 Broker 分发出来的消息,如果在 Consumer 端进行初步处理。
当 Broker 分发消息给 consumer 时,会向 consumer 发送 CommnadMessage 的命令,在其中指明 consumerId 和 messageId,CommnadMessage 的 protobuf 定义如下:
message CommandMessage {
required uint64 consumer_id = 1;
required MessageIdData message_id = 2;
optional uint32 redelivery_count = 3 [default = 0];
repeated int64 ack_set = 4;
}
复制代码
在 CommandMessage 后将附带消息的 metadata 和 Payload(headersAndPayload)。
Client 接收到命令后,会通过 consumerId 寻找到对应的 consumer,然后调用 consumer 的 messageReceived 方法,让 consumer 去处理接收进来的消息,这也是 consumer 处理消息的核心逻辑。
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
BrokerEntryMetadata brokerEntryMetadata;
MessageMetadata msgMetadata;
try {
brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
复制代码
首先会进行校验和检验,扔掉损坏的消息,然后再从 headersAndPayload 中提取出 Metadata,这里有两种 metadata,一种是携带有 broker 处理该消息相关信息的 metadata,如该消息的 index(类似于 kafka 中的 offset),一种是消息的 metadata,其中携带各类消息元信息,如 chunk 相关、压缩相关、事务相关等等。
接着 Consumer 将对 Payload 进行各项处理,如解压缩消息等,如果该消息是 chunk message,那么会按照 chunk message 的逻辑进行处理。
最后,生成出 MessageImpl 对象,用于后续返回给用户。
final MessageImpl<T> message =
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount);
uncompressedPayload.release();
executeNotifyCallback(message);
复制代码
调用 executeNotifyCallback 方法,Consumer 会使用 internalPinnedExecutor 去调度另一个线程去将消息分发给用户端代码,这样可以防止阻塞网络线程。在另一个线程中,会调用 enqueueMessageAndCheckBatchReceive 方法:
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
int messageSize = message.size();
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
}
return hasEnoughMessagesForBatchReceive();
}
复制代码
这时,会将消息放进 incomingMessages 队列中,在用户调用 consumer 的 receive 方法时,内部会调用 internalReceive 方法:
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
}
}
复制代码
在其中会将 incomingMessages 中的消息拿出来,并返回给用户,至此,Broker 分发消息到 Consumer 再到用户业务代码的过程就结束了。
至此,《一个消息的生命历程》系列结束了。消息从 Producer 中经过各种处理发送给 Broker,Broker 对消息进行各项处理后将消息持久化到 Bookie 中,然后 Broker 再利用 Dispathcer 将消息调度分发给 Consumer,Consumer 处理消息再将消息发到用户手中,一个消息的生命历程就这样完成了,希望能通过本系列文章,让你更清除消息在 Pulsar 中是如何流转处理的。当然,本系列文章所假设的各类场景和所假设的各类设置都是默认设置以及简单设置,也略过了许多功能原理介绍,只对最核心的部分进行讲解,后续将会有更多的文章介绍 Pulsar 的其他特性。
评论