本文将从消息的角度,介绍消息在从 Broker 到 Cosumer 的过程,是《一个消息的生命历程》系列文章的部分整合。上一篇见:https://xie.infoq.cn/article/4a8c95f5019cf1ec769476790
CommandFlow
在 Consumer 从 Broker 拉取并消费消息前,即在建立连接后(ConsumerImpl 的 connectionOpened 回调方法),首先要介绍 Consumer 的流控。Consumer 在拉取消息前,都会向 Broker 发送 CommandFlow,告诉 Broker 我想要接收的消息数,Broker 就根据这个命令给 Consumer 至多发送多少条消息,当这个 permits 用完后,consumer 需要再次给 Broker 发送 CommandFlow。
如下是 CommandFlow 的结构:
message CommandFlow {
required uint64 consumer_id = 1;
required uint32 messagePermits = 2;
}
复制代码
需要注意的是 messagePermits 还包括已发送的消息数量。
Broker 处理 CommandFlow
Broker 在接收到 CommanFlow 指令后,首先会根据其中的 consumer_id 找到对应的 consumer,设置 consumer 的 permits,然后会通过 consumer 所对应的 subscription 调用相对应的 dispatcher 的 consumerFlow 方法,通过这个方法,可以根据目前所给的 permits 的大小去读取消息。
在 Broker 中有许多 Dispatcher 的实现,它们都是用来调度和分发给 consumer 的消息。这里我们介绍默认使用的 subscription(Exclusive)所对应的 dispatcher 实现:PersistentDispatcherSingleActiveConsumer,顾名思义,这个 dispatcher 只会将消息分发给单个 active 的 consumer。在处理 Flow 之后,会调用 readMoreEntries 方法来读取来自 Bookie 的消息(Bookie 中的 Entry)。
@Override
protected void readMoreEntries(Consumer consumer) {
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
return;
}
if (consumer.getAvailablePermits() > 0) {
Pair<Integer, Long> calculateResult = calculateToRead(consumer);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();
if (-1 == messagesToRead || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate.
return;
}
// Schedule read
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead,
bytesToRead, this, consumer, topic.getMaxReadPosition());
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
}
}
}
复制代码
在 dispatcher 的 readMoreEntries 方法中,首先会计算所需要读取的消息数和字节数。对于非压缩 topic(默认情况),会调用其对应的 manged cursor 的 asyncReadEntriesOrWait 方法,通过回调函数将调用 readEntriesComplete 方法,完成读取 entry 的过程。
KeyHashRange 原理
这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。
在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。
if (isKeyHashRangeFiltered) {
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
byte[] key = peekStickyKey(entry.getDataBuffer());
Consumer consumer = stickyKeyConsumerSelector.select(key);
// Skip the entry if it's not for current active consumer.
if (consumer == null || currentConsumer != consumer) {
entry.release();
iterator.remove();
}
}
}
复制代码
首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。
以下是 select 的具体实现
@Override
public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}
复制代码
Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。
dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。
Entry 过滤
在分发前,首先要对从 MangedLedger 获取的 entries 进行过滤,主要针对的过滤类型有:
校验和或者元数据损坏的
该消息是一个内部的标记,如是否是事务功能所使用的标记或者是服务端内部的标记消息
该消息不是马上分发的,即该消息被设置了具体的分发时间,属于 DelayMessage
这些过滤过程主要是调用 Dispatcher 的 filterEntriesForConsumer 方法。
消息分发
当消息过滤完成后,就进入到具体的消息分发,这时 Dispatcher 才会调用对应的 consumer 的 sendMessages 方法,在其中,会进行 metadataAndPayload 的数据包组建。
其核心逻辑如下:
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry == null) {
// Entry was filtered out
continue;
}
int batchSize = batchSizes.getBatchSize(i);
if (batchSize > 1 && !cnx.isBatchMessageCompatibleVersion()) {
log.warn("[{}-{}] Consumer doesn't support batch messages - consumerId {}, msg id {}-{}",
topicName, subscription,
consumerId, entry.getLedgerId(), entry.getEntryId());
ctx.close();
entry.release();
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
// increment ref-count of data and release at the end of process:
// so, we can get chance to call entry.release
metadataAndPayload.retain();
int redeliveryCount = 0;
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (redeliveryTracker.contains(position)) {
redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
ctx.write(
cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName),
ctx.voidPromise());
entry.release();
}
复制代码
如,会获取当前的 batch 大小,设置相应的 entry 的 ledgerId 和 entryId 等等。最终打包发送给 Consumer,由 client 端的 consumer 进行解包和处理。
在一次发送完成后,如下,将对当前 consumer 再次调用 readMoreEntries 方法,如果当前的 permit 足够,将继续读取更多的 entry 并分发给 cosumer。
// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
log.debug(
"[{}-{}] Ignoring write future complete."
+ " consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}));
复制代码
至此,Broker 完成了将消息发送给 Consumer 的操作,接下来将由客户端的 consumer 进行处理并将消息发送给用户端。
评论