写点什么

图解 Kafka 消费者客户端分区分配策略

  • 2023-09-11
    江西
  • 本文字数:5586 字

    阅读完需:约 18 分钟

提示:本文可能已过期,请点击原文查看:图解Kafka消费者客户端分区分配策略

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming PMC, 《Kafka 运维与实战宝典》电子书作者。领取《Kafka 运维与实战宝典》PDF 请联系石臻臻


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,可以联系我呀

@

  • 1. 分配策略的作用

  • 2. 分配策略的选择 2.1 分配策略配置 2.2 选择合适的策略

  • 3. 分配策略计算和传播 3.1 分配策略计算时机 3.2 分配策略传播

  • 4. 图解所有分配策略 4.1 RangeAssignor 范围分区分配策略 4.2 RoundRobinAssignor 轮询分区策略 4.3 StickyAssignor 粘性分区策略 4.4 CooperativeStickyAssignor 策略 4.5 自定义分配策略

  • 5. 重平衡协议


11. 分配策略的作用

我们在分析生产者的时候有专门写过文章分析生产者的分区分配策略

Kafka中生产消息时的三种分区分配策略

生成者的分配策略是把我们产生的消息选择一个合适的分区去发送,

那么今天我们要讲解一下 消费者的分区分配策略 他要做的事情是

同一个消费组中 给不同消费者分配能够消费的分区数;

同一个消费组中,一个分区只会被一个消费者消费。

22. 分配策略的选择

2.1 分配策略配置

每个消费组客户端都可以配置一个partition.assignment.strategy属性并且可以配置多个自己支持的分配策略,例如:


partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor

复制代码

默认策略是 org.apache.kafka.clients.consumer.RoundRobinAssignor

2.2 选择合适的策略

既然每个客户端成员都可以配置多个自己支持的分配策略, 那么 GroupCoordinator(消费组协调器)使用哪个分配策略去分配这些资源呢?

肯定是需要消费组下面的所有成员都使用同一种分配策略来进行分配。 所以 GroupCoordinator 就面临着选择哪个分配策略。

选择的逻辑如下

  1. 选择所有 Member 都支持的分配策略

  2. 在 1 的基础上,优先选择每个partition.assignment.strategy配置靠前的策略。

请看下面的 2 个例子

Case-1

  1. 所有支持的分配策略为:roundrobin,rang

  2. 每个 consumer 都在 1 的基础上,给自己排最前面的投票, consumer-0 投 roundrobin, consumer-1 投 rang, consumer-3 投 roundrobin; 这样算下来 roundrobin 是有 2 票的, 那么久选择 roundrobin 为分配策略;

Case-2

  1. 所有支持的分配策略为:rang

  2. 都不用投票, 直接选择 rang 当选

如果新 Member 加入 Group 的时候, 带上的分配策略跟现有 Group 中所有 Member(Group 有 Member 的情况下)都支持的协议都不交叉

那么就会抛出异常:INCONSISTENT_GROUP_PROTOCOL


[2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)

复制代码

这个协议的选择的代码逻辑在 GroupMetadata#selectProtocol

调用的时机是当前发起 JoinGroup 的 Member 都完成 JoinGroup,并调用 onCompleteJoin

具体详情可以看 : Kafka消费者JoinGroupRequest流程解析

33. 分配策略计算和传播

3.1 分配策略计算时机

既然我们已经知道了分区分配策略的选择, 那么什么时候会触发这个策略的逻辑计算呢?

如果你有看过之前的文章: Kafka消费者JoinGroupRequest流程解析 那么对此就肯定会有一定的了解

当所有的 Member(成员)发起 JoinGroup 请求, 并且组协调器(GroupCoordinator)也都处理正常,就会回调当前发起 JoinGroup 请求的 Member(成员)

其中有个最特别的就是, 组协调器(GroupCoordinator)会把所有的 Member(成员)的元信息打包一并返回给那个 Leader Member, 而 Follow Member 是不会返回的。

Leader Member 接受到回调并拿到这个元信息之后, 就开始去计算每个成员应该被分配到的分区。

代码定位

ConsumerCoordinator#performAssignment


@Override    protected Map<String, ByteBuffer> performAssignment(String leaderId,                                                        String assignmentStrategy,                                                        List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {        ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);        if (assignor == null)            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        //省略部分代码...                // 更新一下所有订阅的Topic的元信息        // 如果有变更的元信息则更新一下        updateGroupSubscription(allSubscribedTopics);
         //省略部分代码...
        Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
        if (protocol == RebalanceProtocol.COOPERATIVE) {            validateCooperativeAssignment(ownedPartitions, assignments);        }
          //省略部分代码...    }

复制代码

上面的代码主要是 根据分配策略,获取分配策略实例, 然后调用 assign方法进行计算,得到分配方式。

但是最终调用的计算逻辑是每个 AbstractPartitionAssignor 实现类的assign方法。

并且也可以实现自定义的分配策略.只需要实现接口 AbstractPartitionAssignor 就行。

3.2 分配策略传播

3.1 分配策略计算时机 中我们知道分配策略的计算时机, 那么计算好了之后如何告知其他的 Member, 他们对应的分配状态呢?

当每个 Member 收到 JoinGroup 的回调之后, 他们会发起一个 SyncGroupRequest, 其中 Leader Member 就会把刚刚计算好的分配策略, 一起当做入参发起请求。请看下图

上面发起的请求也只是告知了组协调器(GroupCoordinator)分配的情况, 最终还是需要组协调器(GroupCoordinator)来告知每个 Member 的。

那么这个告知的过程就是所有 Member 都同步完成后的回调 ;

具体请看:KafkaConsumer SyncGroupRequest详解

44. 图解所有分配策略

上面所有的铺垫都讲解清楚了,那么目前 Kafka 支持哪些分配策略呢?

我们来一一分析一下

4.1 RangeAssignor 范围分区分配策略


partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor

复制代码

这也是默认的分配策略

它是以单个 Topic 为一个维度来计算分配的, 他只负责将每一个 Topic 的分区尽可能均衡的分配给消费者

  1. 消费组里面所有消费者(Member)按照字母排序, 给 Topic 的分区按照分区号排序。

  2. 先计算每个分区最少平均分配多少个分区数, 然后余下的逐个分举个例子:Topic 为 Topic1 有 11 个分区;有 3 个消费者订阅那么平均每个 11/3=3 余 2, 那么前面两个可以分到 4 个分区,最后一个分到 3 个;[ 4, 4, 3 ]

    他们最终分配方式如下

    分配是先分完一个消费者再分配下一个的,跟遍历是有区别。clientId-1 先分到 [ 0 , 1 , 2 , 3 ] 号分区, 后面的接着分。


图里面的 Member 就是消费者, 对消费组来说他内部的对象是 Member

Range 弊端

Range 针对单个 Topic 的情况下显得比较均衡, 但是假如 Topic 很多的话, Member 排序靠前的可能会比 Member 排序靠后的负载多很多。

看,像这种情况, 3 个 Member 都订阅了这 4 个 Topic, 可是 Member 这么多分区愣是没有分配到 1 个

4.2 RoundRobinAssignor 轮询分区策略

把所有 Member 排序, 所有 TopicPartition 排序。轮训遍历分配

Member-3 下线



RoundRobin 的一些弊端

如果成员订阅的 Topic 不尽相同的时候, 最终结果也不可能会完全均衡的。

如果图中的 Memner-3 比另外两个多订阅了 Topic-4,那他总共就消费了 6 个分区了, 但是另外两个分别只消费了 2 个分区。

如果这里的 Member-3 把分区 Topic2-0、Topic3-1 分给另外两个那才是最均衡的情况。

那么有什么策略能解决这个问题吗?接下来我们另外一个分区策略 -- 粘性分区

4.3 StickyAssignor 粘性分区策略

上面介绍的两种分区分配方式,多多少少都会有一些分配上的偏差, 而且每次重新分配的时候都是把所有的都重新来计算并分配一遍, 那么每次分配的结果都会偏差很多, 如果我们在计算的时候能够考虑上一次的分配情况,来尽量的减少分配的变动,这样我们将尽可能地撤销更少的分区,因为撤销过程是昂贵的

我们之前在讲生产者的时候也讲过粘性分区: Kafka中生产消息时的三种分区分配策略

那么消费者的粘性分区策略是什么样子的呢?

目标:

  1. 分区的分配尽量的均衡

  2. 每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出 StickyAssignor 特性的。

首先, StickyAssignor 粘性分区在进行分配的时候,是以 RoundRobinAssignor 的分配逻辑来计算的,但是它又弥补了 RoundRobinAssignor 的一些可能造成不均衡的弊端。

比如在讲 RoundRobinAssignor 弊端的那种 case, 但是在 StickyAssignor 中就是下图的分配情况

把 RoundRobinAssignor 的弊端给优化了

体现粘性分区地方就在于重新分配的时候了, 还是上面的 case(上图右边的 StickAssignor), 假如 Member-2 离线了

粘性分区的计算方式把把离线的那个 Member 所属的分区分配给其他的 Member, 在其他的 Member 已拥有的分区不变的前提下,尽量的均衡。

Member-2 有 3 个分区, 可以分两个分区给 Member-1,分 1 个分区给 Member-3 最终分配图如下:

4.4 CooperativeStickyAssignor 策略

上面分析的 StickyAssignor 粘性分区策略,主要作用是保证消费者客户端在重平衡之后能够维持原本的分配方案。

但是 StickyAssignor 还是属于 RebalanceProtocol.EAGER 协议, 重平衡的时候需要每个客户端都要先放弃当前持有的资源。

为了解决这个问题, 所以就有了 CooperativeStickyAssignor 分配策略

你可以理解为 CooperativeStickyAssignor 的分配策略跟 StickyAssignor 的策略差不多。

但是它在此基础上是用的 RebalanceProtocol.COOPERATIVE 协议。渐进式的重平衡。

后续专门写一篇文章来讲解一下这一块内容,挖个坑 0.0

4.5 自定义分配策略

我们先看一下分区策略的类图

我们想要自定义分配策略,只需要实现接口:

public interface ConsumerPartitionAssignor {
    /**     * 返回序列化后的自定义数据     */    default ByteBuffer subscriptionUserData(Set<String> topics) {        return null;    }
    /**     * 分区分配的计算逻辑     */    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
    /**     * 当组成员从领导者那里收到其分配时调用的回调     */    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {    }
    /**     * 指明使用的再平衡协议     * 默认使用RebalanceProtocol.EAGER协议, 另外一个可选项为 RebalanceProtocol.COOPERATIVE     */    default List<RebalanceProtocol> supportedProtocols() {        return Collections.singletonList(RebalanceProtocol.EAGER);    }
    /**     * Return the version of the assignor which indicates how the user metadata encodings     * and the assignment algorithm gets evolved.     */    default short version() {        return (short) 0;    }
    /**     * 分配器的名字     * 例如 RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor     * 对应的名字为     * range、roundrobin、sticky、cooperative-sticky     */    String name();
复制代码

当然我们也可以根据自己的需求来实现其他的抽象类

比如:AbstractStickyAssignor 抽象类就是专门给粘性分区使用的抽象类

55. 重平衡协议

上面我们讲的是分区策略, 但是分区策略本质上又分为两大类

  1. RebalanceProtocol.EAGER

  2. RebalanceProtocol.COOPERATIVE 协作重平衡,kafak2.4 出的功能。

这两个区别是

EAGER 重新平衡协议要求消费者在参与重新平衡事件之前始终撤销其拥有的所有分区。因此,它允许完全改组分配

COOPERATIVE 协议允许消费者在参与再平衡事件之前保留其当前拥有的分区。分配者不应该立即重新分配任何拥有的分区,而是可以指示消费者需要撤销分区,以便可以在下一次重新平衡事件中将被撤销的分区重新分配给其他消费者

COOPERATIVE 协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。

COOPERATIVE 有效的改进来在此之前 EAGER 协议重平衡而触发的 stop-the-world(STW)

我们上面讲的分配策略 3 种策略都是 RebalanceProtocol.EAGER 协议

  1. RangeAssignor 范围分区分配策略

  2. RoundRobinAssignor 轮询分区策略

  3. StickyAssignor 粘性分区策略

CooperativeStickyAssignor 分配策略是使用的 RebalanceProtocol.COOPERATIVE 协议

关于更多的关于重平衡协议的讲解,请看: Kafka 重平衡的两种协议讲解

发布于: 刚刚阅读数: 3
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
图解Kafka消费者客户端分区分配策略_Kafk_石臻臻的杂货铺_InfoQ写作社区