RocketMQ 你不得不了解的 Rebalance 机制源码分析
RocketMQ 版本
version: 5.1.0
RocketMQ 中 consumer 消费模型
在了解 RocketMQ 的 Rebalance 机制之前,我们必须先简单了解下 rocketmq 的消费模型
我们知道在我们创建 topic 的时候需要指定一个参数就是读队列数
这里假设我们的 topic 是 xiaozoujishu-topic,我们的读队列数 是 4 个,我们同一 gid 下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢 首先需要明确的是:
这里我们的消费模式是集群消费
queue 的负载均衡算法是使用默认的 AllocateMessageQueueAveragely(平均分配) 假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:
四个队列分配给一个消费者
此时如果我们再启动一个消费者,那么这时候就会进行 Rebalance,然后此时我们的队列分配就变成如下:
所以通过上面的队列分配我就知道 Rebalance 是个啥了,我们下面对 Rebalance 进行一些定义
RocketMQ 的 Rebalance 是什么
Rebalance(重新平衡)机制指的是:将一个 Topic 下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配
Rebalance 的目的
从上面可以看出 Rebalance 的本意是把一个 topic 的 queue 分配给合适的 consumer,本意其实是为了提升消息的并行处理能力
但是 Rebalance 也带来了一些危害,后面我们会重点分析下
Rebalance 的触发原因
我们这里先说结论
订阅 Topic 的队列数量变化
消费者组信息变化
这里是最深层的原因,就是 topic 的队列数量、消费组信息 实际我们可以将这些归结为 Rebalance 的元数据,这些元数据的变更,就会引起 clinet 的 Rebalance
注意 RocketMQ 的 Rebalance 是发生在 client
这些元数据都在管 broker 管理 核心就是这三个类
TopicConfigManager
SubscriptionGroupManager
ConsumerManager
只要这个三个类的信息有变化,client 就会进行 Rebalance。 下面我们可以具体说下什么情况下会让这三个类变化
订阅 Topic 的队列数量变化
什么情况下订阅 Topic 的队列数量会变化呢?
broker 扩容
broker 缩容
broker 宕机(本质也是类似缩容)
消费者组信息变化
什么时候消费者组信息会变化呢?
核心就是 consumer 的上下线,具体细分又可以分为如下原因:
服务日常滚动升级
服务扩容
服务订阅消息发生变化
源码分析
上面大致介绍了 Rebalance 的触发原因,现在我们结合源码来具体分析下
我们就从 consumer 的启动开始分析吧
这里我们以最简单的 demo 为例
这里我们直接注意到 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 这个方法,看名字就知道是 client 向所有的 broker 发送心跳
我们进入到 sendHeartbeatToAllBrokerWithLock 方法看看
这段代码主要是通过 this.brokerAddrTable.entrySet()获取到所有的 master broker 地址,然后进行心跳发送
具体的心跳发送代码实际是在下面代码中进行的
我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到 broker,请求码是 RequestCode.HEART_BEAT
我们看看 RequestCode.HEART_BEAT 的调用找到`broker 的处理逻辑
很快我们通过方法名就能定位到处理 client 的请求的方法是 ClientManageProcessor 类的 processRequest
我们具体进去看看这个方法
可以看到具体的逻辑被封装在 return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看
进去这个方法我们能看到一个比较核心的方法 registerConsumer
很明显这个方法就是注册 consumer 的方法
这个方法里面和 Rebalance 相关比较核心的方法就是这三个
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
这里我们可以看看 clientChannelInfo 里面是个啥玩意
具体深入到 updateChannel 方法里面就是判断是否为新的 client,是就更新 channelInfoTable
updateSubscription
这个方法就是判断订阅关系是否发生了变化并更新订阅关系
callConsumerIdsChangeListener
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); 这个方法就是通知 client 进行 Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个 CHANGE 事件
这里我们可以简单看看事件定义的类型有哪些
我们直接看看具体的事件处理类
可以看到实现类有多个,我们直接看 broker 模块的 DefaultConsumerIdsChangeListener 类即可
可以看到这里是给该 group 所有的 client 发送 Rebalance 消息
具体的消息状态码是 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED
client Rebalance
通过上面我们大致找到了整个通信过程,但是实际的 Rebalance 是发生在 client,所以我们还是需要继续回到 client 的代码
我们通过状态码 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED 找到 client 的处理类 ClientRemotingProcessor
实际处理方法就是
我们进入这个方法看看这里最终就是唤醒阻塞的 Rebalance 线程
所以实际的方法调用还是在 RebalanceService 的 run 方法
最终还是调用的是 MQConsumerInner 接口中的 doRebalance 方法
这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?
原来是 cleint 也会定时去 Rebalance 默认是 20s 一次,可以配置
可以通过参数 rocketmq.client.rebalance.waitInterval 去配置
那么为什么 client 还要自己去循环 Rebalance
原来这里是防止因为网络等其他原因丢失了 broker 的请求,后续网络回复了,也能进行进行 Rebalance
下面我们继续看看 Rebalance 的实现细节
这里我们以常用的 DefaultMQPushConsumerImpl 为例
实际这里最终调用的还是抽象类 RebalanceImpl 的 doRebalance 方法
可以看到这里的 Rebalance 是按照 topic 的维度
我们先理解订阅单个 topic 的原理
这里的就是先对 topic 的 queue 排序,然后对 consumer 排序, 然后调用 AllocateMessageQueueStrategy 的 allocate 方法 这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析
这里的分配方式就是我们前面画图的,比如 4 个 queue,2 个 consumer,那么就是每个 consumer2 个 queue。简单举例就是我们的 queue 有 q1、q2、q3、q4consumer 有 c1、c2
那么就是 c1:q1、q2 c2:q2、q3
需要注意的是如果 consumer 大于 queue 数量,多出的 consumer 就不会被分配到 queue
client 什么时候触发 Rebalance
上面分析了这么多原理,这里我们总结下 client 什么时候会触发 Rebalance
consumer 启动时会向所有 master broker 发送心跳,然后 broker 发送信息通知所有 consumer 触发 Rebalance
启动完成后 consumer 会周期的触发 Rebalance,防止因为网络等问题丢失 broker 的通知而没有 Rebalance
当 consumer 停止时,也会通过之前分析的事件机制,触发注销 comsuer 事件然后通知所有的 comsuer 触发 Rebalance
总结
这里我们详细介绍了 client 是如何触发 Rebalance 的,以及触发 Rebalance 的时机,也介绍了 Rebalance 的好处。 实际还有很多细节我们限于篇幅暂未分析。 后面我们会继续分析 Rebalance 的坏处和一些详细的 Rebalance 算法
评论