在 Pulsar 中,我们能够为 Consumer 或者 Reader 指定相应的 KeyHashRange,使得 Broker 在进行消息分发的时候,能够对消息的 key 进行 hash 并根据 consumer 所设置的 keyHashRange 选择对应的 consumer,并将消息分发给 cosumer。本文将介绍 KeyHashRange 的原理。
设置 KeyHashRange
在 Reader 中,我们可以直接像下面这样设置 KeyHashRange
pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
.create();
复制代码
HashRange 默认大小是 65536,这意味着 65536 范围中,这个 reader 只会占用 0-10000 和 20001-30000 的范围,落入该范围的消息会被分发到这个 reader 上。
同样的,对于 consumer,我们可以这样设置:
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.keySharedPolicy(KeySharedPolicy
.stickyHashRange()
.ranges(Range.of(0, 10000), Range.of(20001, 30000))
.topic(topicName)
.subscribe();
复制代码
其效果和上面的 reader 是一样的。
KeyHashRange 原理
这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。
在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。
if (isKeyHashRangeFiltered) {
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
byte[] key = peekStickyKey(entry.getDataBuffer());
Consumer consumer = stickyKeyConsumerSelector.select(key);
// Skip the entry if it's not for current active consumer.
if (consumer == null || currentConsumer != consumer) {
entry.release();
iterator.remove();
}
}
}
复制代码
首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。
以下是 select 的具体实现
@Override
public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);
Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
return ceilingConsumer;
} else {
return null;
}
} else {
return null;
}
}
复制代码
Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。
dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。
评论