写点什么

RocketMQ 你不得不了解的 Rebalance 机制源码分析

作者:Java你猿哥
  • 2023-05-19
    湖南
  • 本文字数:4175 字

    阅读完需:约 14 分钟

RocketMQ 版本

  • version: 5.1.0

RocketMQ 中 consumer 消费模型

在了解 RocketMQ 的 Rebalance 机制之前,我们必须先简单了解下 rocketmq 的消费模型

我们知道在我们创建 topic 的时候需要指定一个参数就是读队列数


这里假设我们的 topic 是 xiaozoujishu-topic,我们的读队列数 是 4 个,我们同一 gid 下的集群消费模式的消费者有两个,那么我们消费者是如何消费消息的呢 首先需要明确的是:

  1. 这里我们的消费模式是集群消费

  2. queue 的负载均衡算法是使用默认的 AllocateMessageQueueAveragely(平均分配) 假设我们项目刚开始只有一个消费者,那么我们的消费队列分配就如下:


四个队列分配给一个消费者

此时如果我们再启动一个消费者,那么这时候就会进行 Rebalance,然后此时我们的队列分配就变成如下:


所以通过上面的队列分配我就知道 Rebalance 是个啥了,我们下面对 Rebalance 进行一些定义

RocketMQ 的 Rebalance 是什么

Rebalance(重新平衡)机制指的是:将一个 Topic 下的多个队列(queue),在同一个消费者组(consumer group)(gid)下的多个消费者实例(consumer instance)之间进行重新分配

Rebalance 的目的

从上面可以看出 Rebalance 的本意是把一个 topic 的 queue 分配给合适的 consumer,本意其实是为了提升消息的并行处理能力

但是 Rebalance 也带来了一些危害,后面我们会重点分析下

Rebalance 的触发原因

我们这里先说结论

  1. 订阅 Topic 的队列数量变化

  2. 消费者组信息变化

这里是最深层的原因,就是 topic 的队列数量、消费组信息 实际我们可以将这些归结为 Rebalance 的元数据,这些元数据的变更,就会引起 clinet 的 Rebalance

注意 RocketMQ 的 Rebalance 是发生在 client

这些元数据都在管 broker 管理 核心就是这三个类

  • TopicConfigManager

  • SubscriptionGroupManager

  • ConsumerManager

只要这个三个类的信息有变化,client 就会进行 Rebalance。 下面我们可以具体说下什么情况下会让这三个类变化

订阅 Topic 的队列数量变化

什么情况下订阅 Topic 的队列数量会变化呢?

  1. broker 扩容

  2. broker 缩容

  3. broker 宕机(本质也是类似缩容)

消费者组信息变化

什么时候消费者组信息会变化呢?

核心就是 consumer 的上下线,具体细分又可以分为如下原因:

  1. 服务日常滚动升级

  2. 服务扩容

  3. 服务订阅消息发生变化

源码分析

上面大致介绍了 Rebalance 的触发原因,现在我们结合源码来具体分析下

我们就从 consumer 的启动开始分析吧

这里我们以最简单的 demo 为例

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);consumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);    //return ConsumeConcurrentlyStatus.RECONSUME_LATER;    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();
System.out.printf("Consumer Started.%n");
复制代码

这里我们直接注意到 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); 这个方法,看名字就知道是 client 向所有的 broker 发送心跳


我们进入到 sendHeartbeatToAllBrokerWithLock 方法看看

private void sendHeartbeatToAllBroker() {    final HeartbeatData heartbeatData = this.prepareHeartbeatData();    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();    if (producerEmpty && consumerEmpty) {        log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);        return;    }
if (this.brokerAddrTable.isEmpty()) { return; } long times = this.sendHeartbeatTimesTotal.getAndIncrement(); for (Entry<String, HashMap<Long, String>> brokerClusterInfo : this.brokerAddrTable.entrySet()) { String brokerName = brokerClusterInfo.getKey(); HashMap<Long, String> oneTable = brokerClusterInfo.getValue(); if (oneTable == null) { continue; } for (Entry<Long, String> singleBrokerInstance : oneTable.entrySet()) { Long id = singleBrokerInstance.getKey(); String addr = singleBrokerInstance.getValue(); if (addr == null) { continue; } if (consumerEmpty && MixAll.MASTER_ID != id) { continue; }
try { int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout()); if (!this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap<>(4)); } this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e); } else { log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e); } } }}
复制代码

这段代码主要是通过 this.brokerAddrTable.entrySet()获取到所有的 master broker 地址,然后进行心跳发送

具体的心跳发送代码实际是在下面代码中进行的

int version = this.mQClientAPIImpl.sendHeartbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
复制代码

我们进入到该方法会发现和我们之前分析的一样,就是发送一个请求到 broker,请求码是 RequestCode.HEART_BEAT


我们看看 RequestCode.HEART_BEAT 的调用找到`broker 的处理逻辑

很快我们通过方法名就能定位到处理 client 的请求的方法是 ClientManageProcessor 类的 processRequest


我们具体进去看看这个方法


可以看到具体的逻辑被封装在 return this.heartBeat(ctx, request);这个方法中,所以我们需要再进去看看

进去这个方法我们能看到一个比较核心的方法 registerConsumer


很明显这个方法就是注册 consumer 的方法


这个方法里面和 Rebalance 相关比较核心的方法就是这三个

  1. consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);

这里我们可以看看 clientChannelInfo 里面是个啥玩意


具体深入到 updateChannel 方法里面就是判断是否为新的 client,是就更新 channelInfoTable


  1. updateSubscription

这个方法就是判断订阅关系是否发生了变化并更新订阅关系


  1. callConsumerIdsChangeListener

  • callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); 这个方法就是通知 client 进行 Rebalance,具体的实现是参考了类似事件订阅的方式去实现的,这里是发送了一个 CHANGE 事件


这里我们可以简单看看事件定义的类型有哪些


我们直接看看具体的事件处理类


可以看到实现类有多个,我们直接看 broker 模块的 DefaultConsumerIdsChangeListener 类即可


可以看到这里是给该 group 所有的 client 发送 Rebalance 消息


具体的消息状态码是 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED


client Rebalance

通过上面我们大致找到了整个通信过程,但是实际的 Rebalance 是发生在 client,所以我们还是需要继续回到 client 的代码

我们通过状态码 RequestCode.NOTIFY_CONSUMER_IDS_CHANGED 找到 client 的处理类 ClientRemotingProcessor


实际处理方法就是

this.mqClientFactory.rebalanceImmediately();
复制代码

我们进入这个方法看看这里最终就是唤醒阻塞的 Rebalance 线程


所以实际的方法调用还是在 RebalanceService 的 run 方法


最终还是调用的是 MQConsumerInner 接口中的 doRebalance 方法

这里有个细节,为什么不是直接调用一个静态方法,要搞这么多花里胡哨的唤醒线程操作?

原来是 cleint 也会定时去 Rebalance 默认是 20s 一次,可以配置


可以通过参数 rocketmq.client.rebalance.waitInterval 去配置

那么为什么 client 还要自己去循环 Rebalance

原来这里是防止因为网络等其他原因丢失了 broker 的请求,后续网络回复了,也能进行进行 Rebalance

下面我们继续看看 Rebalance 的实现细节

这里我们以常用的 DefaultMQPushConsumerImpl 为例

实际这里最终调用的还是抽象类 RebalanceImpl 的 doRebalance 方法


可以看到这里的 Rebalance 是按照 topic 的维度

我们先理解订阅单个 topic 的原理


这里的就是先对 topic 的 queue 排序,然后对 consumer 排序, 然后调用 AllocateMessageQueueStrategy 的 allocate 方法 这里我们暂时只分析默认的平均分配算法(AllocateMessageQueueAveragely),也就是我们最先说的分配算法。其他算法可以详细分析

这里的分配方式就是我们前面画图的,比如 4 个 queue,2 个 consumer,那么就是每个 consumer2 个 queue。简单举例就是我们的 queue 有 q1、q2、q3、q4consumer 有 c1、c2

那么就是 c1:q1、q2 c2:q2、q3

需要注意的是如果 consumer 大于 queue 数量,多出的 consumer 就不会被分配到 queue

client 什么时候触发 Rebalance

上面分析了这么多原理,这里我们总结下 client 什么时候会触发 Rebalance

  1. consumer 启动时会向所有 master broker 发送心跳,然后 broker 发送信息通知所有 consumer 触发 Rebalance

  2. 启动完成后 consumer 会周期的触发 Rebalance,防止因为网络等问题丢失 broker 的通知而没有 Rebalance

  3. 当 consumer 停止时,也会通过之前分析的事件机制,触发注销 comsuer 事件然后通知所有的 comsuer 触发 Rebalance

总结

这里我们详细介绍了 client 是如何触发 Rebalance 的,以及触发 Rebalance 的时机,也介绍了 Rebalance 的好处。 实际还有很多细节我们限于篇幅暂未分析。 后面我们会继续分析 Rebalance 的坏处和一些详细的 Rebalance 算法

用户头像

Java你猿哥

关注

一只在编程路上渐行渐远的程序猿 2023-03-09 加入

关注我,了解更多Java、架构、Spring等知识

评论

发布
暂无评论
RocketMQ你不得不了解的 Rebalance机制源码分析_Java_Java你猿哥_InfoQ写作社区