写点什么

[Pulsar] 使用 Partitioned Topic 提高吞吐

作者:Zike Yang
  • 2021 年 11 月 20 日
  • 本文字数:1470 字

    阅读完需:约 5 分钟

在 Pulsar 中,一个 Topic(非 partitioned topic)只能在一个 Broker 中,所以客户端往一个 topic 生产和消费数据会受限于单个 Broker 的带宽和性能。Pulsar 提供的 Partitioned Topic 功能能够将一个 topic 被分成多个子 topic 并可被多个 Brokers 服务,本文将对 Parititoned Topic 进行简要介绍。


创建 Partitioned Topic

Partitioned Topic 实际上是一个逻辑概念,其底层存储和资源分配均是由多个非 Partitioned Topic 组成,比如我们设置一个有 12 个 partitions 的 Partitioned Topic,那么其实里面有 12 个子 topic,它们都可以被不同的 Broker 服务,大大提高了 Topic 的吞吐量。

我们可以使用一下的 Pulsar Admin 命令来创建 Partitioned Topic:

pulsar-admin topics create-partitioned-topic my-topic --partitions 12
复制代码

Broker 在创建这个 Partitioned Topic 的时候,还会创建 12 个子 Topic,每个子 Topic 都会带有后缀,如第一个字 topic 带的后缀是:my-topic-partition-0。我们还可以通过 Partition Lookup 的命令去查询 Partitioned Topic 的分区数。

Partitioned Producer

Partitioned Producer 是 Producer 的一个实现,当在创建 Producer 时,Client 会从 Broker 查询该 Topic 的分区数,如果分区数大于 0,则代表创建的是 Partitioned Topic,那么会使用 Partitioned Producer 的实现。Partitioned Producer 的内部同样维护了多个子 Producer,每个 Producer 对应一个子 topic。

以下是 Partitioned Producer 在初始化子 Producer 的相关代码:

private void start(List<Integer> indexList) {    AtomicReference<Throwable> createFail = new AtomicReference<Throwable>();    AtomicInteger completed = new AtomicInteger();
for (int partitionIndex : indexList) { createProducer(partitionIndex).producerCreatedFuture().handle((prod, createException) -> { if (createException != null) { setState(State.Failed); createFail.compareAndSet(null, createException); }
return null; }); }}
复制代码

Routing Mode

Parititoned Producer 在发送消息的时候,将会根据用设置的不同的 routing 规则去调度消息,如默认使用的 RoudRobinRouter,会将消息按顺序循环地分发给子 topics。用户也可以自定义实现 Router 来实现自定义的路由规则。如一下是 RoudRobinRouter 的实现的部分代码:

public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {  @Override  public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {      // If the message has a key, it supersedes the round robin routing policy      if (msg.hasKey()) {          return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());      }
if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary. long currentMs = clock.millis(); return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions()); } else { return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions()); } }}
复制代码

用户在实现的时候只需要实现 choosePartition 方法即可。


本文介绍了 Partitioned Topic 的基本原理,使用 Partition Topic 可以提高系统的吞吐量,我们也可以使用不同的 Routing Mode 去设置不同的消息分发规则,这些模式需要根据业务使用。

发布于: 27 分钟前阅读数: 4
用户头像

Zike Yang

关注

还未添加个人签名 2020.10.20 加入

Apache Pulsar Contributor

评论

发布
暂无评论
[Pulsar] 使用Partitioned Topic提高吞吐