写点什么

[Pulsar] 按照 KeyHashRange 读取消息

作者:Zike Yang
  • 2021 年 11 月 30 日
  • 本文字数:1389 字

    阅读完需:约 5 分钟

在 Pulsar 中,我们能够为 Consumer 或者 Reader 指定相应的 KeyHashRange,使得 Broker 在进行消息分发的时候,能够对消息的 key 进行 hash 并根据 consumer 所设置的 keyHashRange 选择对应的 consumer,并将消息分发给 cosumer。本文将介绍 KeyHashRange 的原理。


设置 KeyHashRange

在 Reader 中,我们可以直接像下面这样设置 KeyHashRange

pulsarClient.newReader()  .topic(topic)  .startMessageId(MessageId.earliest)  .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))  .create();
复制代码

HashRange 默认大小是 65536,这意味着 65536 范围中,这个 reader 只会占用 0-10000 和 20001-30000 的范围,落入该范围的消息会被分发到这个 reader 上。

同样的,对于 consumer,我们可以这样设置:

Consumer<String> consumer = client.newConsumer(Schema.STRING)  .keySharedPolicy(KeySharedPolicy                    .stickyHashRange()                   .ranges(Range.of(0, 10000), Range.of(20001, 30000))                   .topic(topicName)                   .subscribe();
复制代码

其效果和上面的 reader 是一样的。


KeyHashRange 原理

这里以订阅模型为 Exclusive 的 subscription 为例,Broker 会使用 PersistentDispatcherSingleActiveConsumer 的 Dispatcher 实现来进行消息的调度和分发。

在 ManagedCurosr 读取完消息后,会调用 PersistentDispatcherSingleActiveConsumer 的 readEntriesComplete 的回调方法。接着会检查是否开启了 keyHashRange 过滤,如果是则会进入 keyHashRange 逻辑。

if (isKeyHashRangeFiltered) {  Iterator<Entry> iterator = entries.iterator();  while (iterator.hasNext()) {    Entry entry = iterator.next();    byte[] key = peekStickyKey(entry.getDataBuffer());    Consumer consumer = stickyKeyConsumerSelector.select(key);    // Skip the entry if it's not for current active consumer.    if (consumer == null || currentConsumer != consumer) {      entry.release();      iterator.remove();    }  }}
复制代码

首先会获取消息的 key(byte[]类型),然后根据这个 key 选择对应的 consumer。

以下是 select 的具体实现

@Overridepublic Consumer select(int hash) {    if (rangeMap.size() > 0) {        int slot = hash % rangeSize;        Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);        Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(slot);        Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;        Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;        if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {            return ceilingConsumer;        } else {            return null;        }    } else {        return null;    }}
复制代码

Broker 在添加 consumer 的时候,会将 consumer 及其对应的 range 加入到 rangeMap 中,在 select 时,首先会利用上面所得到的 key 得到对应的 hash,然后对 rangeSize 取余(默认是 65536),再在 rangeMap 中进行比对,找到符合范围的 consumer。

dispatcher 在 select 后,如果发现所得到的 consumer 是当前的 consumer,就会将该消息发送给当前的 consumer,完成 keyHashRange 分发的流程。

发布于: 2 小时前阅读数: 5
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 按照KeyHashRange读取消息