本文将介绍 Conusmer 消费的大体过程。
CommandFlow
在 Consumer 从 Broker 拉取并消费消息前,即在建立连接后(ConsumerImpl 的 connectionOpened 回调方法),首先要介绍 Consumer 的流控。Consumer 在拉取消息前,都会向 Broker 发送 CommandFlow,告诉 Broker 我想要接收的消息数,Broker 就根据这个命令给 Consumer 至多发送多少条消息,当这个 permits 用完后,consumer 需要再次给 Broker 发送 CommandFlow。
如下是 CommandFlow 的结构:
message CommandFlow {
required uint64 consumer_id = 1;
required uint32 messagePermits = 2;
}
复制代码
需要注意的是 messagePermits 还包括已发送的消息数量。
Broker 处理 CommandFlow
Broker 在接收到 CommanFlow 指令后,首先会根据其中的 consumer_id 找到对应的 consumer,设置 consumer 的 permits,然后会通过 consumer 所对应的 subscription 调用相对应的 dispatcher 的 consumerFlow 方法,通过这个方法,可以根据目前所给的 permits 的大小去读取消息。
在 Broker 中有许多 Dispatcher 的实现,它们都是用来调度和分发给 consumer 的消息。这里我们介绍默认使用的 subscription(Exclusive)所对应的 dispatcher 实现:PersistentDispatcherSingleActiveConsumer,顾名思义,这个 dispatcher 只会将消息分发给单个 active 的 consumer。在处理 Flow 之后,会调用 readMoreEntries 方法来读取来自 Bookie 的消息(Bookie 中的 Entry)。
@Override
protected void readMoreEntries(Consumer consumer) {
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
return;
}
if (consumer.getAvailablePermits() > 0) {
Pair<Integer, Long> calculateResult = calculateToRead(consumer);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();
if (-1 == messagesToRead || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate.
return;
}
// Schedule read
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, consumer, topic.getMaxReadPosition());
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
}
}
}
复制代码
在 dispatcher 的 readMoreEntries 方法中,首先会计算所需要读取的消息数和字节数。对于非压缩 topic(默认情况),会调用其对应的 manged cursor 的 asyncReadEntriesOrWait 方法,通过回调函数将调用 readEntriesComplete 方法,完成读取 entry 的过程。
Entry 过滤
在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:
校验和或者元数据损坏的
该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息
该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage
这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。
消息分发
当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。
其核心逻辑如下:
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry == null) {
// Entry was filtered out
continue;
}
int batchSize = batchSizes.getBatchSize(i);
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
topicName, subscription,
consumerId, entry.getLedgerId(), entry.getEntryId());
ctx.close();
entry.release();
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
// increment ref-count of data and release at the end of process:
// so, we can get chance to call entry.release
metadataAndPayload.retain();
int redeliveryCount = 0;
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (redeliveryTracker.contains(position)) {
redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
ctx.write(
cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName),
ctx.voidPromise());
entry.release();
}
复制代码
如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。
在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。
// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
log.debug(
"[{}-{}] Ignoring write future complete."
+ " consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}));
复制代码
至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。
Consumer 消费消息
当 Broker 分发消息给 consumer 时,会向 consumer 发送 CommnadMessage 的命令,在其中指明 consumerId 和 messageId,CommnadMessage 的 protobuf 定义如下:
message CommandMessage {
required uint64 consumer_id = 1;
required MessageIdData message_id = 2;
optional uint32 redelivery_count = 3 [default = 0];
repeated int64 ack_set = 4;
}
复制代码
在 CommandMessage 后将附带消息的 metadata 和 Payload(headersAndPayload)。
Client 接收到命令后,会通过 consumerId 寻找到对应的 consumer,然后调用 consumer 的 messageReceived 方法,让 consumer 去处理接收进来的消息,这也是 consumer 处理消息的核心逻辑。
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
BrokerEntryMetadata brokerEntryMetadata;
MessageMetadata msgMetadata;
try {
brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
复制代码
首先会进行校验和检验,扔掉损坏的消息,然后再从 headersAndPayload 中提取出 Metadata,这里有两种 metadata,一种是携带有 broker 处理该消息相关信息的 metadata,如该消息的 index(类似于 kafka 中的 offset),一种是消息的 metadata,其中携带各类消息元信息,如 chunk 相关、压缩相关、事务相关等等。
接着 Consumer 将对 Payload 进行各项处理,如解压缩消息等,如果该消息是 chunk message,那么会按照 chunk message 的逻辑进行处理。
最后,生成出 MessageImpl 对象,用于后续返回给用户。
final MessageImpl<T> message =
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload, schema, redeliveryCount);
uncompressedPayload.release();
executeNotifyCallback(message);
复制代码
调用 executeNotifyCallback 方法,Consumer 会使用 internalPinnedExecutor 去调度另一个线程去将消息分发给用户端代码,这样可以防止阻塞网络线程。在另一个线程中,会调用 enqueueMessageAndCheckBatchReceive 方法:
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
int messageSize = message.size();
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
}
return hasEnoughMessagesForBatchReceive();
}
复制代码
这时,会将消息放进 incomingMessages 队列中,在用户调用 consumer 的 receive 方法时,内部会调用 internalReceive 方法:
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
}
}
复制代码
在其中会将 incomingMessages 中的消息拿出来,并返回给用户,至此,Broker 分发消息到 Consumer 再到用户业务代码的过程就结束了。
评论