在 Pulsar 中,我们能够为 Consumer 或者 Reader 指定相应的 KeyHashRange,使得 Broker 在进行消息分发的时候,能够对消息的 key 进行 hash 并根据 consumer 所设置的 keyHashRange 选择对应的 consumer,并将消息分发给 cosumer。本文将介绍 KeyHashRange 的原理。
设置 KeyHashRange
在 Reader 中,我们可以直接像下面这样设置 KeyHashRange
pulsarClient.newReader() .topic(topic) .startMessageId(MessageId.earliest) .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000)) .create();
复制代码
HashRange 默认大小是 65536,这意味着 65536 范围中,这个 reader 只会占用 0-10000 和 20001-30000 的范围,落入该范围的消息会被分发到这个 reader 上。
同样的,对于 consumer,我们可以这样设置:
Consumer<String> consumer = client.newConsumer(Schema.STRING) .keySharedPolicy(KeySharedPolicy .stickyHashRange() .ranges(Range.of(0, 10000), Range.of(20001, 30000)) .topic(topicName) .subscribe();
复制代码
其效果和上面的 reader 是一样的。
KeyHashRange 原理
这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。
在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。
if (isKeyHashRangeFiltered) { Iterator<Entry> iterator = entries.iterator(); while (iterator.hasNext()) { Entry entry = iterator.next(); byte[] key = peekStickyKey(entry.getDataBuffer()); Consumer consumer = stickyKeyConsumerSelector.select(key); // Skip the entry if it's not for current active consumer. if (consumer == null || currentConsumer != consumer) { entry.release(); iterator.remove(); } }}
复制代码
首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。
以下是 select 的具体实现
@Overridepublic Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot); Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot); Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null; Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null; if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) { return ceilingConsumer; } else { return null; } } else { return null; }}
复制代码
Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。
dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。
评论