写点什么

大数据 -67 Kafka 分区分配策略详解与实战:Range、RoundRobin、Sticky 全面解析

作者:武子康
  • 2025-08-14
    山东
  • 本文字数:4689 字

    阅读完需:约 15 分钟

大数据-67 Kafka 分区分配策略详解与实战:Range、RoundRobin、Sticky 全面解析

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 11 日更新到:Java-94 深入浅出 MySQL EXPLAIN 详解:索引分析与查询优化详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下内容:现实中业务中我们遇到了分区副本数量想要调整的问题,假设起初我们的分区副本数只有 1,想要修改为 2、3 来保证当部分 Kafka 的 Broker 宕机时,仍然可以提供服务给我们,但是不可以用脚本直接修改,所以我们通过 JSON+脚本的方式,来达到 Kafka 副本分区的调整。


  • 启动服务、创建主题、查看主题

  • 修改分区副本因子(不允许)、修改分区副本因子(成功)

  • 查看结果


分区分配策略

在 Kafka 的消息队列架构中,Topic 作为消息的逻辑分类单位,会被划分为多个分区(Partition)以实现数据的分布式存储和并行处理。默认情况下,每个分区在同一消费组(Consumer Group)中只能被一个消费者(Consumer)消费,这就引出了分区分配策略(Partition Assignment Strategy)的关键问题。


Kafka 提供了以下几种核心的分区分配算法(PartitionAssignor),每种算法都有其特定的适用场景和权衡考量:


  1. RangeAssignor(范围分配器)

  2. 工作原理:按照分区编号范围进行分配

  3. 特点:将连续的分区范围分配给消费者

  4. 示例:假设有分区 0-9 和 3 个消费者,分配结果为:

  5. 消费者 1:0-3

  6. 消费者 2:4-6

  7. 消费者 3:7-9

  8. 适用场景:消费者数量固定的稳定环境

  9. 潜在问题:可能导致分配不均,特别是在分区数无法被消费者数整除时

  10. RoundRobinAssignor(轮询分配器)

  11. 工作原理:采用轮询方式均匀分配分区

  12. 特点:最大化实现负载均衡

  13. 示例:同样分区 0-9 和 3 个消费者,分配结果为:

  14. 消费者 1:0,3,6,9

  15. 消费者 2:1,4,7

  16. 消费者 3:2,5,8

  17. 适用场景:需要均衡负载的动态消费环境

  18. 优势:相比 RangeAssignor 能提供更均衡的分配

  19. StickyAssignor(粘性分配器)

  20. 工作原理:在保证均衡的前提下尽量减少分区重分配

  21. 特点:当消费者变动时,尽量保持原有分配关系

  22. 示例:新增消费者时,只从现有消费者处转移最少量的分区

  23. 适用场景:消费者频繁变更的弹性环境

  24. 核心价值:减少再平衡时的分区迁移开销,提高系统稳定性


这些分配策略可以通过修改消费者客户端的partition.assignment.strategy参数进行配置。在实际生产环境中,选择合适的分区分配策略需要综合考虑消费者数量、分区数量、消费者变动频率等因素,以优化消息处理的吞吐量和延迟表现。


RangeAssignor

  • PartionAssignor 接口用于用户自定义分区分配算法,以实现 Consumer 之间的分区分配。

  • 消费组的成员定义他们感兴趣的 Topic 并将这种订阅关系传递给作为订阅组协调者的 Broker,协调者选择其中一个消费者来执行这个消费组的分区,并将分配结果转发给消费组内所有的消费者。

  • Kafka 默认采用的是 RangeAssignor 的分配算法。



  • RangeAssignor 对每个 Topic 进行独立的分区分配,对于每一个 Topic,首先对分区按照分区 ID 进行数值排序,然后订阅这个 Topic 的消费组的消费者再进行字典排序,之后尽量均衡的将分区分配给消费者,这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,有一些消费者就会多分配到一些分区。

  • RangeAssignor 策略的原理是按照消费者总数和分区总数进行整除运行来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能的均匀的分配给所有的消费者。

  • 对于每一个 Topic,RangerAssignor 策略会将消费组内所有订阅这个 Topic 的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够分配均衡,那么字典序靠前的消费者会被多分配一个分区。


RoundRobinAssignor

  • RoundRobinAssignor 的分配策略是将消费组内订阅的所有 Topic 的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor 是针对单个 Topic 的分区进行排序分配的)。

  • 如果消费组内,消费订阅的 Topic 列表是相同的(每个消费者都订阅了相同的 Topic),那么分配结果尽量均衡。如果订阅的 Topic 列表是不同的,那么分配结果不保证尽量均衡。



  • 对于 RangeAssignor,在订阅多个 Topic 的情况下,RoundRobinAssignor 的方式能让消费者之间尽量均衡的分配到分区(分配到的分区的差值不会超过 1,而 RangeAssignor 的分配策略可能随着订阅的 Topic 越来越多,差值越来越大)

  • 对于消费组内消费者订阅 Topic 不一致的情况:假设有两个消费者分别为 C0 和 C1,有 2 个 TopicT1、T2,分别有 3 个分区、2 个分区,并且 C0 订阅了 T1 和 T2,那么 RoundRobinAssignor 的分配结果如下:


StickyAssignor

尽管 RoundRobinAssignor 已经在 RangeAssignoror 上做了一些优化来更均衡的分配分区,但是在一些情况下依旧会产生严重的分配偏差,比如消费组中订阅的 Topic 列表不相同的情况下。更核心的问题是无论是 RangeAssignor,还是 RoundRobinAssignor,当前的分区分配算法都没有考虑上一次的分配结果。显然,在执行一次新的分配之前,如果能考虑到上一次的分配结果,尽量少的调整分区分配的变动,显然是能减少很多开销的。Sticky 是“粘性的”,可以理解为分配是带粘性的:


  • 分区的分配尽量的均衡

  • 每一次重分配的结果尽量与上一次分配结果保持一致


当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个才是真正体现出 StickyAssignor 特性的。


假设当前有如下内容:


  • 3 个 Consumer C0、C1、C2

  • 4 个 Topic:T0、T1、T2、T3 每个 Topic 有 2 个分区

  • 所有 Consumer 都订阅了 4 个分区



如果 C1 宕机,此时 StickyAssignor 的结果:


自定义分区策略

基本概念

需要实现:org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口其中定义了两个内部类:


  • Subscription:用来表示消费者的订阅信息,类中有两个属性:topics、userData,分别表示消费者所订阅 Topic 列表和用户自定义信息。

  • PartitionAssignor 接口通过 subscription()方法来设置消费者自身相关的 Subscription 信息,注意此方法中只有一个参数 Topics,与 Subscription 类中的 topics 相互呼应,但是并没有有关 userData 的参数体现。为了增强用户对分配结果的控制,可以在 Subscription()方法内部添加一些影响分配的用户自定义信息赋予 userData,比如:权重、IP 地址、HOST 或者机架

  • Assignment:用来表示分配信息的,类中有两个属性:partitions、userData,分别表示所分配到的分区集合和用户自定义的数据,可以通过 PartitonAssignor 接口中的 onAssignment()方法是在每个消费者收到消费组 Leader 分配结果时的回调函数,例如在:StickyAssignor 策略中就是通过这个方法保存当前的分配方案,以备下次消费组再平衡(Rebalance)时可以提供分配参考依据。


Kafka 还提供了一个抽象类:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以简化 PartitionAssignor 接口的实现,对 assign() 方法进行了实现,其中将 Subscription 的 userData 信息去掉后,在进行分配。

代码实现

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.TopicPartition;
import java.nio.ByteBuffer;import java.util.*;
public class WeightedPartitionAssignor implements ConsumerPartitionAssignor {
@Override public Subscription subscription(Set<String> topics) { // 在这里添加权重信息到 userData ByteBuffer buffer = ByteBuffer.allocate(4); buffer.putInt(getWeight()); buffer.flip(); return new Subscription(new ArrayList<>(topics), buffer); }
@Override public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) { Map<String, Assignment> assignments = new HashMap<>(); Map<TopicPartition, List<String>> partitionConsumers = new HashMap<>();
// 遍历所有订阅的topics for (String topic : metadata.topics()) { List<TopicPartition> partitions = metadata.partitionsForTopic(topic); for (TopicPartition partition : partitions) { partitionConsumers.putIfAbsent(partition, new ArrayList<>()); } }
// 根据权重分配分区 for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) { String consumerId = subscriptionEntry.getKey(); Subscription subscription = subscriptionEntry.getValue(); int weight = subscription.userData().getInt();
for (String topic : subscription.topics()) { List<TopicPartition> partitions = metadata.partitionsForTopic(topic); for (TopicPartition partition : partitions) { List<String> consumers = partitionConsumers.get(partition); for (int i = 0; i < weight; i++) { consumers.add(consumerId); // 权重高的消费者多次添加,增加选中的机会 } } } }
// 随机分配分区给消费者 Random random = new Random(); for (Map.Entry<TopicPartition, List<String>> entry : partitionConsumers.entrySet()) { List<String> consumers = entry.getValue(); String assignedConsumer = consumers.get(random.nextInt(consumers.size())); assignments.computeIfAbsent(assignedConsumer, k -> new Assignment(new ArrayList<>())) .partitions().add(entry.getKey()); }
return assignments; }
@Override public void onAssignment(Assignment assignment, Cluster metadata) { // 可以在这里处理分配后的逻辑,比如保存当前分配的快照 }
@Override public String name() { return "weighted"; }
private int getWeight() { // 获取权重,可以从配置文件或环境变量中获取 return 10; // 默认权重为10 }}
复制代码

注册使用

Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, WeightedPartitionAssignor.class.getName());// 配置其他消费者属性
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic1", "topic2"));
复制代码


发布于: 刚刚阅读数: 5
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-67 Kafka 分区分配策略详解与实战:Range、RoundRobin、Sticky 全面解析_Java_武子康_InfoQ写作社区