写点什么

八股 MQ002——说说 Rebalance?

作者:Codyida
  • 2023-05-04
    广东
  • 本文字数:3607 字

    阅读完需:约 12 分钟

写在前面

这是一道实际的面试题,而且在实际的工作场景中也是经常遇到。在不了解 Rebalance 时,MQ 会带来很多预期之外的问题,而当时的处理方式多少会有点头疼医头,脚痛医脚。因此这次希望能针对这个问题从根源思考,并结合实际的业务场景来重新理解这个问题。

说说 Rebalance

这个问题将从以下几个方面回答。


  1. Rebalance 是什么?

  2. 为什么会有 Rebalance?

  3. Rebalance 的原理是什么?

  4. Rebalance 会有哪些问题?

  5. 如何降低 Rebalance 的负面影响?

Rebalance 是什么?

Rebalance 是一种现象,是描述 MQ 重新分配 Partition 给 Consumer 的动作。


Rebalance 也是一种机制,是 MQ 保障监听 Topic 的 Consumer 都能正常消费的机制。


Rebalance 还可以看做是一种协议,是 MQ 设计用来确保 Consumer 如何达成一致的消费状态,进而确保 Topic 里的消息顺利投送到 Consumer 的方法流程。

为什么会有 Rebalance?

因为监听 MQ 的 ConsumerGroup 内的 Consumer 会动态变化,比如:新加入成员、成员离开、成员断链、成员处理超时等,还有可能 MQ 需要分配分区动态变化,比如增加分区,因此在不同的变化情况下,MQ 为了确保 Consumer 都能正常消费,设计了 Rebalance 机制。这也能解释什么时候会出现 Rebalance:


  1. ConsumerGroup 成员变化:上线/下线/故障异常

  2. Partition 分区数量变化:

  3. Coordinate 节点异常;

  4. 当 Consmer 使用正则订阅 Topic 后,有满足正则匹配的新 Topic 创建之后,也会触发一次 Rebalance,来保证 Consumer 能订阅到新 Topic。

Rebalance 的触发时机是什么?

以 Kafka 为例(目前主要看了 Kafka)。首先针对 Rebalance 出现的原因来看,原理可以归为以下两类:


  1. MQ 分区数量调整

  2. Consumer 动态变化

Partition 分区数量变化

从一条消息的视角来看,一条消息会被放置在投递的 Topic 的某个 Partition 中。而为了确保消息不会被 Consumer 重复消费,一个 Partition 只能被同一个 ConsumerGroup 里的一个 Consumer 消费到。因此当 MQ 增加了 Partition 数量,意味着新的消息会放在新的 Partition 里,而已有的 Consumer 订阅的是旧的 Partition,这就会有消息消费不到,所以这时需要一个 Rebalance,来重新分配现有的 Partition 给各个 Consumer。


这里可以引申两个问题:


  1. 消息是怎么决定要投放到哪个 Partition 中的?

  2. Kafka 调整分区数量有什么需要注意的么?


简单的说明一下:


  1. 有三种方式:a. Kafka 自定义投递规则,默认采用轮询的方式将生产的消息依次轮流放入 Topic 的各个 Partition 里。这样的好处是:没有热点问题,Partition 消息数量分布均匀;b. 设置 PartitionKey,Kafka 会根据这个 Key 来做 hash 进而路由到对应的 Partition 上。这样的好处是:可以指定消息投递的 Partition,进而保证消息局部的有序;缺点是:如果 Partition 分布不均,会有热点问题;c. 自定义规则。

  2. Kafka 分区数量对应到消费并发能力,因为一个 Partition 只能被同 Group 的一个 Consumer 消费,所以 Partition 的数量不会超过 Consumer 的数量(否则会有 Consumer 分配不到 Partition 而空跑)。Kafka 分区数量支持增加,但不支持减少(实现成本太高收益太少)。但还是建议预估 Topic 承载与消费的量级,而提前预估分区的数量。

ConsumerGroup 成员变化

主动的

  1. 新的 Consumer 加入 Group

  2. Consumer 离开 Group


Consumer 的加入和离开 Group 可以以 Consumer 视角为主,来用另一篇文章说明 Kafka 的 Consumer 的状态流转。这里先按下不表。

被动的

Kafka 主要通过三个参数来判断一个 Consumer 是否健康:


  1. session.timeout.ms:Consumer 与 Kafka 连接 Session 的最大超时时长。在这个时长内,Kafka 必须收到 Consumer 的心跳消息,才会认为 Consumer 是健康的。在 Kafka3.0 中,该参数默认值为 45s。

  2. heartbeat.interval.ms:Consumer 向 Kafka 发送心跳的间隔,也可理解为 Consumer 刷新与 Kafka 连接 Session 的间隔时间。该参数默认值为 3s。

  3. max.poll.interval.ms:Consumer 消费消息的最长处理时间,该参数默认为 5min。

Consumer 与 Kafka 的维活 Session

Cosnumer 需要每隔 heartbeat.interval.ms 向 broker 上的 Group Coordinator 来发送心跳,同时 Group Coordinator 也会按照 session.timeout.ms 时间来等待 Consumer 的心跳。考虑 Group Coordinator 是否有在 session.timeout.ms 内收到心跳:


  1. 若收到心跳,则重置等待心跳时间,重新计算等待心跳时间直到等待时长到达 session.timeout.ms

  2. 若没有收到心跳,则 Group Coordinator 会触发一次 Rebalance。


考虑到 Consumer 向 Kafka 发送的心跳有可能丢失,session.timeout.ms 可以设置为 heartbeat.interval.ms 的 3~5 倍。

Consumer 处理耗时

Consumer 通过 poll()方法获取批量待处理的消息后,Group Coordinator 便会等到计算下一次 poll()方法的耗时。考虑 Group Coordinator 是否有在 max.poll.interval.ms 内收到下一个 poll()方法的请求:


  1. 若收到,则重置等待处理完成时间,重新计算等待处理完成时间直到等待时长到达 max.poll.interval.ms

  2. 若没有收到,则 Group Coordinator 移除 Consumer 触发一次 Rebalance。

Rebalance 的流程是怎样的?

考虑一个 Consumer A 在正常消费时,同 Group 有新的 Consumer B 加入。则:


  1. Consumer B 向 Group Coordinator 发送 JoinGroup 请求,Group Coordinator 准备开启一次 Rebalance。

  2. Group Coordinator 会在开启 Rebalance 之后,会通过给 Consumer A 的 HeartBeat 响应,告知 Consumer A 即将开启 Rebalance。

  3. Consumer A 收到要开启 Rebalance 的 HeartBeat Response 后,还可以在 max.poll.interval.ms 时长内处理完 poll 获取的消息。然后向 Group Coordinator 发送自己的 JoinGroup 请求。

  4. Group Coordinator 会等待全部的 Consumer 发送 JoinGroupRequest,Group Coordinator 会根据 JoinGroup Request 中的 session.timeout.ms 和 rebalance_timeout_ms( max.poll.interval.ms)来做接受 Join 请求的等待。

  5. Group Coordinate 会根据所有收到的 JoinGroup 请求,来确认 Group 中可用的 Consumer,Group Leader 和 Partition 的分配策略,将这些信息通过 JoinGroupResponse 返回给 Consumer Leader

  6. Consumer A 与 Consumer B 收到 JoinGroupResponse 之后,非 Group Leader 收到的 Response 为空,不作处理,等待后续流程;Group Leader 收到的 Response 会包含所有 Consumer 与 Partition 分片策略,Group Leader 会根据这些信息计算 Partition 分配结果。

  7. 接下来所有 Consumer 会进入 Synchronizing Group State 阶段。所有 Consumer 会向 Group Coordinator 发送 SyncGroupRequest 请求,其中,非 Group Leader 发送的 Request 内包含信息为空,Group Leader 发送的 Request 包含自己计算的 Partition 分配结果。

  8. Group Coordinator 会将收到的来自 Group Leader 的 Request 里的 Partition 分配结果包装为 SyncGroupResponse 返回给所有 Consumer。

  9. 所有的 Consumer 收到 SyncGroupResposne 后,便可明确自己分配到的 Partition,至此 Rebalance 完成。

  10. Consumer 与 Group Coordinator 通过 poll()与 HeartBeat 交互,等待下一次的 Rebalance。

Rebalance 会有哪些问题?

  1. 重复消息。Rebalance 后,因为 Consumer 重新分配 Partition,Kafka 会重新投递消息,这会导致部分消息会被重复消费。

  2. Stop the world。Rebalance 期间,Group 内的 Consumer 是停止消费的,因此不合预期的 Rebalance 会导致 Group 内的消费停止,这会影响消费效率。

  3. Rebalance Storm。因为 Group Coordinator 在完成 Rebalance 时,会等待 max.poll.interval.ms,如果这时某个 Consumer 在处理 poll()的批消息时,超过了这个时间,那么当 Rebalance 完成后,这个 Consumer 再次 poll(),就又会触发一次 Rebalance。那么如果这种超时响应是由于某短时间网络固定的延迟波动,那么就会导致频繁的 rejoin,进而频繁的 Rebalance,从而产生 Rebalance Storm。

如何降低 Rebalance 的负面影响?

  1. 设置合理的参数。设置 session.timeout.ms 给 HeartBeat 维活一定的兼容性;关注 max.poll.interval.ms 与自身 Consumer 消费消息的处理时长,及时调整参数或者优化 Consumer 消费逻辑;

  2. 保证消息幂等,或者合理的消息滤重。Rebalance 不可完全避免(正常的服务发布更新时的 Consumer 上下线,或者偶发的 Consumer 实例故障而触发 Kafka 故障转移),需要考虑将重复消息的影响降低;

  3. 考虑 Static Membership。该机制可以通过设置 Consumer 的 group.instance.id 来标识 Consumer 为 static member。而后的 Rebalance 时,会将原来的 Partition 分配给原来的 Consumer。而且 Static Membership 限制了 Rebalance 的触发情况,会大大降低 Rebalance 触发的概率

  4. 升级 Kafka 版本,kafka2.4 支持 Incremental Cooperative Rebalance,该 Rebalance 协议尝试将全局的 Rebalance 分解为多次小的 Rebalance,降低 Stop the world 的影响。

写在后面

还需要优化:


  • 补充 RocketMQ 的 Rebalance 机制

  • 文章内容的再版与整理

  • 补充部分文字描述的图示

  • 补充 Rebalance 机制状态机流转说明

  • 补充实际场景中使用的 MQ Rebalance 是否存在其他优化思路

  • 补充 Static Membership 与 Incremental Cooperative Rebalance 在实际 MQ 组件中的使用场景。

参考资料

发布于: 刚刚阅读数: 2
用户头像

Codyida

关注

还未添加个人签名 2017-12-21 加入

还未添加个人简介

评论

发布
暂无评论
八股MQ002——说说Rebalance?_后端_Codyida_InfoQ写作社区