写点什么

深入剖析 RocketMQ 源码 - 负载均衡机制

  • 2022 年 4 月 07 日
  • 本文字数:14031 字

    阅读完需:约 46 分钟

一、引言


RocketMQ 是一款优秀的分布式消息中间件,在各方面的性能都比目前已有的消息队列要好,RocketMQ 默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。


RocketMQ 主要由 Producer、Broker、Consumer、Namesvr 等组件组成,其中 Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,Namesvr 负责存储元数据,各组件的主要功能如下:


  • 消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到 Broker 服务器。RocketMQ 提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要 Broker 返回确认信息,单向发送不需要。


  • 消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从 Broker 服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。


  • 代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在 RocketMQ 系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。


  • 名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。


  • 生产者组(Producer Group):同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。


  • 消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。


RocketMQ 整体消息处理逻辑上以 Topic 维度进行生产消费、物理上会存储到具体的 Broker 上的某个 MessageQueue 当中,正因为一个 Topic 会存在多个 Broker 节点上的多个 MessageQueue,所以自然而然就产生了消息生产消费的负载均衡需求。


本篇文章分析的核心在于介绍 RocketMQ 的消息生产者(Producer)和消息消费者(Consumer)在整个消息的生产消费过程中如何实现负载均衡以及其中的实现细节。


二、RocketMQ 的整体架构



(图片来自于 Apache RocketMQ)


RocketMQ 架构上主要分为四部分,如上图所示:


  • Producer:消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。


  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以 push 推,pull 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。


  • NameServer:NameServer 是一个非常简单的 Topic 路由注册中心,支持分布式集群方式部署,其角色类似 Dubbo 中的 zookeeper,支持 Broker 的动态注册与发现。


  • BrokerServer:Broker 主要负责消息的存储、投递和查询以及服务高可用保证,支持分布式集群方式部署。



RocketMQ 的 Topic 的物理分布如上图所示:


Topic 作为消息生产和消费的逻辑概念,具体的消息存储分布在不同的 Broker 当中。


Broker 中的 Queue 是 Topic 对应消息的物理存储单元。


在 RocketMQ 的整体设计理念当中,消息的生产消费以 Topic 维度进行,每个 Topic 会在 RocketMQ 的集群中的 Broker 节点创建对应的 MessageQueue。


producer 生产消息的过程本质上就是选择 Topic 在 Broker 的所有的 MessageQueue 并按照一定的规则选择其中一个进行消息发送,正常情况的策略是轮询。


consumer 消费消息的过程本质上就是一个订阅同一个 Topic 的 consumerGroup 下的每个 consumer 按照一定的规则负责 Topic 下一部分 MessageQueue 进行消费。


在 RocketMQ 整个消息的生命周期内,不管是生产消息还是消费消息都会涉及到负载均衡的概念,消息的生成过程中主要涉及到 Broker 选择的负载均衡,消息的消费过程主要涉及多 consumer 和多 Broker 之间的负责均衡。


三、producer 消息生产过程



producer 消息生产过程:


  • producer 首先访问 namesvr 获取路由信息,namesvr 存储 Topic 维度的所有路由信息(包括每个 topic 在每个 Broker 的队列分布情况)。


  • producer 解析路由信息生成本地的路由信息,解析 Topic 在 Broker 队列信息并转化为本地的消息生产的路由信息。


  • producer 根据本地路由信息向 Broker 发送消息,选择本地路由中具体的 Broker 进行消息发送。


3.1 路由同步过程


public class MQClientInstance {     public boolean updateTopicRouteInfoFromNameServer(final String topic) {        return updateTopicRouteInfoFromNameServer(topic, false, null);    }      public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,        DefaultMQProducer defaultMQProducer) {        try {            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {                try {                    TopicRouteData topicRouteData;                    if (isDefault && defaultMQProducer != null) {                        // 省略对应的代码                    } else {                        // 1、负责查询指定的Topic对应的路由信息                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                    }                     if (topicRouteData != null) {                        // 2、比较路由数据topicRouteData是否发生变更                        TopicRouteData old = this.topicRouteTable.get(topic);                        boolean changed = topicRouteDataIsChange(old, topicRouteData);                        if (!changed) {                            changed = this.isNeedUpdateTopicRouteInfo(topic);                        }                        // 3、解析路由信息转化为生产者的路由信息和消费者的路由信息                        if (changed) {                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                            }                             // 生成生产者对应的Topic信息                            {                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                                publishInfo.setHaveTopicRouterInfo(true);                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();                                while (it.hasNext()) {                                    Entry<String, MQProducerInner> entry = it.next();                                    MQProducerInner impl = entry.getValue();                                    if (impl != null) {                                        impl.updateTopicPublishInfo(topic, publishInfo);                                    }                                }                            }                            // 保存到本地生产者路由表当中                            this.topicRouteTable.put(topic, cloneTopicRouteData);                            return true;                        }                    }                } finally {                    this.lockNamesrv.unlock();                }            } else {            }        } catch (InterruptedException e) {        }         return false;    }}
复制代码


路由同步过程


  • 路由同步过程是消息生产者发送消息的前置条件,没有路由的同步就无法感知具体发往那个 Broker 节点。


  • 路由同步主要负责查询指定的 Topic 对应的路由信息,比较路由数据 topicRouteData 是否发生变更,最终解析路由信息转化为生产者的路由信息和消费者的路由信息。


public class TopicRouteData extends RemotingSerializable {    private String orderTopicConf;    // 按照broker维度保存的Queue信息    private List<QueueData> queueDatas;    // 按照broker维度保存的broker信息    private List<BrokerData> brokerDatas;    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;}  public class QueueData implements Comparable<QueueData> {    // broker的名称    private String brokerName;    // 读队列大小    private int readQueueNums;    // 写队列大小    private int writeQueueNums;    // 读写权限    private int perm;    private int topicSynFlag;}  public class BrokerData implements Comparable<BrokerData> {    // broker所属集群信息    private String cluster;    // broker的名称    private String brokerName;    // broker对应的ip地址信息    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;    private final Random random = new Random();}  --------------------------------------------------------------------------------------------------  public class TopicPublishInfo {    private boolean orderTopic = false;    private boolean haveTopicRouterInfo = false;    // 最细粒度的队列信息    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();    private TopicRouteData topicRouteData;} public class MessageQueue implements Comparable<MessageQueue>, Serializable {    private static final long serialVersionUID = 6191200464116433425L;    // Topic信息    private String topic;    // 所属的brokerName信息    private String brokerName;    // Topic下的队列信息Id    private int queueId;}
复制代码


路由解析过程:


  • TopicRouteData 核心变量 QueueData 保存每个 Broker 的队列信息,BrokerData 保存 Broker 的地址信息。


  • TopicPublishInfo 核心变量 MessageQueue 保存最细粒度的队列信息。


  • producer 负责将从 namesvr 获取的 TopicRouteData 转化为 producer 本地的 TopicPublishInfo。


public class MQClientInstance {     public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {         TopicPublishInfo info = new TopicPublishInfo();         info.setTopicRouteData(route);        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {          // 省略相关代码        } else {             List<QueueData> qds = route.getQueueDatas();             // 按照brokerName进行排序            Collections.sort(qds);             // 遍历所有broker生成队列维度信息            for (QueueData qd : qds) {                // 具备写能力的QueueData能够用于队列生成                if (PermName.isWriteable(qd.getPerm())) {                    // 遍历获得指定brokerData进行异常条件过滤                    BrokerData brokerData = null;                    for (BrokerData bd : route.getBrokerDatas()) {                        if (bd.getBrokerName().equals(qd.getBrokerName())) {                            brokerData = bd;                            break;                        }                    }                    if (null == brokerData) {                        continue;                    }                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {                        continue;                    }                     // 遍历QueueData的写队列的数量大小,生成MessageQueue保存指定TopicPublishInfo                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                        info.getMessageQueueList().add(mq);                    }                }            }             info.setOrderTopic(false);        }         return info;    }}
复制代码


路由生成过程:


  • 路由生成过程主要是根据 QueueData 的 BrokerName 和 writeQueueNums 来生成 MessageQueue 对象。


  • MessageQueue 是消息发送过程中选择的最细粒度的可发送消息的队列。


{    "TBW102": [{        "brokerName": "broker-a",        "perm": 7,        "readQueueNums": 8,        "topicSynFlag": 0,        "writeQueueNums": 8    }, {        "brokerName": "broker-b",        "perm": 7,        "readQueueNums": 8,        "topicSynFlag": 0,        "writeQueueNums": 8    }]}
复制代码


路由解析举例:


  • topic(TBW102)在 broker-a 和 broker-b 上存在队列信息,其中读写队列个数都为 8。


  • 先按照 broker-a、broker-b 的名字顺序针对 broker 信息进行排序。


  • 针对 broker-a 会生成 8 个 topic 为 TBW102 的 MessageQueue 对象,queueId 分别是 0-7。


  • 针对 broker-b 会生成 8 个 topic 为 TBW102 的 MessageQueue 对象,queueId 分别是 0-7。


  • topic(名为 TBW102)的 TopicPublishInfo 整体包含 16 个 MessageQueue 对象,其中有 8 个 broker-a 的 MessageQueue,有 8 个 broker-b 的 MessageQueue。


  • 消息发送过程中的路由选择就是从这 16 个 MessageQueue 对象当中获取一个进行消息发送。


3.2 负载均衡过程


public class DefaultMQProducerImpl implements MQProducerInner {     private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {                // 1、查询消息发送的TopicPublishInfo信息        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());         if (topicPublishInfo != null && topicPublishInfo.ok()) {                         String[] brokersSent = new String[timesTotal];            // 根据重试次数进行消息发送            for (; times < timesTotal; times++) {                // 记录上次发送失败的brokerName                String lastBrokerName = null == mq ? null : mq.getBrokerName();                // 2、从TopicPublishInfo获取发送消息的队列                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        // 3、执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                        switch (communicationMode) {                            case SYNC:                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                        continue;                                    }                                }                                 return sendResult;                            default:                                break;                        }                    } catch (MQBrokerException e) {                        // 省略相关代码                    } catch (InterruptedException e) {                        // 省略相关代码                    }                } else {                    break;                }            }             if (sendResult != null) {                return sendResult;            }        }    }}
复制代码


消息发送过程:


  • 查询 Topic 对应的路由信息对象 TopicPublishInfo。


  • 从 TopicPublishInfo 中通过 selectOneMessageQueue 获取发送消息的队列,该队列代表具体落到具体的 Broker 的 queue 队列当中。


  • 执行发送并判断发送结果,如果发送失败根据重试次数选择消息队列进行重新发送,重新选择队列会避开上一次发送失败的 Broker 的队列。


public class TopicPublishInfo {     public MessageQueue selectOneMessageQueue(final String lastBrokerName) {        if (lastBrokerName == null) {            return selectOneMessageQueue();        } else {            // 按照轮询进行选择发送的MessageQueue            for (int i = 0; i < this.messageQueueList.size(); i++) {                int index = this.sendWhichQueue.getAndIncrement();                int pos = Math.abs(index) % this.messageQueueList.size();                if (pos < 0)                    pos = 0;                MessageQueue mq = this.messageQueueList.get(pos);                // 避开上一次上一次发送失败的MessageQueue                if (!mq.getBrokerName().equals(lastBrokerName)) {                    return mq;                }            }            return selectOneMessageQueue();        }    }}
复制代码


路由选择过程:


  • MessageQueue 的选择按照轮询进行选择,通过全局维护索引进行累加取模选择发送队列。


  • MessageQueue 的选择过程中会避开上一次发送失败 Broker 对应的 MessageQueue。



Producer 消息发送示意图


  • 某 Topic 的队列分布为 Broker_A_Queue1、Broker_A_Queue2、Broker_B_Queue1、Broker_B_Queue2、Broker_C_Queue1、Broker_C_Queue2,根据轮询策略依次进行选择。


  • 发送失败的场景下如 Broker_A_Queue1 发送失败那么就会跳过 Broker_A 选择 Broker_B_Queue1 进行发送。


四、consumer 消息消费过程



consumer 消息消费过程


  • consumer 访问 namesvr 同步 topic 对应的路由信息。


  • consumer 在本地解析远程路由信息并保存到本地。


  • consumer 在本地进行 Reblance 负载均衡确定本节点负责消费的 MessageQueue。


  • consumer 访问 Broker 消费指定的 MessageQueue 的消息。


4.1 路由同步过程


public class MQClientInstance {     // 1、启动定时任务从namesvr定时同步路由信息    private void startScheduledTask() {        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {             @Override            public void run() {                try {                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();                } catch (Exception e) {                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);                }            }        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);    }     public void updateTopicRouteInfoFromNameServer() {        Set<String> topicList = new HashSet<String>();         // 遍历所有的consumer订阅的Topic并从namesvr获取路由信息        {            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();            while (it.hasNext()) {                Entry<String, MQConsumerInner> entry = it.next();                MQConsumerInner impl = entry.getValue();                if (impl != null) {                    Set<SubscriptionData> subList = impl.subscriptions();                    if (subList != null) {                        for (SubscriptionData subData : subList) {                            topicList.add(subData.getTopic());                        }                    }                }            }        }         for (String topic : topicList) {            this.updateTopicRouteInfoFromNameServer(topic);        }    }     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,        DefaultMQProducer defaultMQProducer) {         try {            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {                try {                    TopicRouteData topicRouteData;                    if (isDefault && defaultMQProducer != null) {                        // 省略代码                    } else {                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                    }                     if (topicRouteData != null) {                        TopicRouteData old = this.topicRouteTable.get(topic);                        boolean changed = topicRouteDataIsChange(old, topicRouteData);                        if (!changed) {                            changed = this.isNeedUpdateTopicRouteInfo(topic);                        }                         if (changed) {                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());                            }                             // 构建consumer侧的路由信息                            {                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();                                while (it.hasNext()) {                                    Entry<String, MQConsumerInner> entry = it.next();                                    MQConsumerInner impl = entry.getValue();                                    if (impl != null) {                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);                                    }                                }                            }                                 this.topicRouteTable.put(topic, cloneTopicRouteData);                            return true;                        }                    }                } finally {                    this.lockNamesrv.unlock();                }            }        } catch (InterruptedException e) {        }         return false;    }}
复制代码


路由同步过程


  • 路由同步过程是消息消费者消费消息的前置条件,没有路由的同步就无法感知具体待消费的消息的 Broker 节点。


  • consumer 节点通过定时任务定期从 namesvr 同步该消费节点订阅的 topic 的路由信息。


  • consumer 通过 updateTopicSubscribeInfo 将同步的路由信息构建成本地的路由信息并用以后续的负责均衡。


4.2 负载均衡过程


public class RebalanceService extends ServiceThread {     private static long waitInterval =        Long.parseLong(System.getProperty(            "rocketmq.client.rebalance.waitInterval", "20000"));     private final MQClientInstance mqClientFactory;     public RebalanceService(MQClientInstance mqClientFactory) {        this.mqClientFactory = mqClientFactory;    }     @Override    public void run() {         while (!this.isStopped()) {            this.waitForRunning(waitInterval);            this.mqClientFactory.doRebalance();        }     }}
复制代码


负载均衡过程


  • consumer 通过 RebalanceService 来定期进行重新负载均衡。


  • RebalanceService 的核心在于完成 MessageQueue 和 consumer 的分配关系。


public abstract class RebalanceImpl {     private void rebalanceByTopic(final String topic, final boolean isOrder) {        switch (messageModel) {            case BROADCASTING: {                // 省略相关代码                break;            }            case CLUSTERING: { // 集群模式下的负载均衡                // 1、获取topic下所有的MessageQueue                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);                 // 2、获取topic下该consumerGroup下所有的consumer对象                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);                                 // 3、开始重新分配进行rebalance                if (mqSet != null && cidAll != null) {                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();                    mqAll.addAll(mqSet);                     Collections.sort(mqAll);                    Collections.sort(cidAll);                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;                     List<MessageQueue> allocateResult = null;                    try {                        // 4、通过分配策略重新进行分配                        allocateResult = strategy.allocate(                            this.consumerGroup,                            this.mQClientFactory.getClientId(),                            mqAll,                            cidAll);                    } catch (Throwable e) {                         return;                    }                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();                    if (allocateResult != null) {                        allocateResultSet.addAll(allocateResult);                    }                    // 5、根据分配结果执行真正的rebalance动作                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);                    if (changed) {                        this.messageQueueChanged(topic, mqSet, allocateResultSet);                    }                }                break;            }            default:                break;        }    }
复制代码


重新分配流程


  • 获取 topic 下所有的 MessageQueue。


  • 获取 topic 下该 consumerGroup 下所有的 consumer 的 cid(如 192.168.0.8@15958)。


  • 针对 mqAll 和 cidAll 进行排序,mqAll 排序顺序按照先 BrokerName 后 BrokerId,cidAll 排序按照字符串排序。


  • 通过分配策略


  • AllocateMessageQueueStrategy 重新分配。


  • 根据分配结果执行真正的 rebalance 动作。


public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {    private final InternalLogger log = ClientLogger.getLog();     @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {                 List<MessageQueue> result = new ArrayList<MessageQueue>();                 // 核心逻辑计算开始         // 计算当前cid的下标        int index = cidAll.indexOf(currentCID);                 // 计算多余的模值        int mod = mqAll.size() % cidAll.size();         // 计算平均大小        int averageSize =            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                + 1 : mqAll.size() / cidAll.size());        // 计算起始下标        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;        // 计算范围大小        int range = Math.min(averageSize, mqAll.size() - startIndex);        // 组装结果        for (int i = 0; i < range; i++) {            result.add(mqAll.get((startIndex + i) % mqAll.size()));        }        return result;    }    // 核心逻辑计算结束     @Override    public String getName() {        return "AVG";    }} ------------------------------------------------------------------------------------ rocketMq的集群存在3个broker,分别是broker_a、broker_b、broker_c。 rocketMq上存在名为topic_demo的topic,writeQueue写队列数量为3,分布在3个broker。排序后的mqAll的大小为9,依次为[broker_a_0  broker_a_1  broker_a_2  broker_b_0  broker_b_1  broker_b_2  broker_c_0  broker_c_1  broker_c_2] rocketMq存在包含4个consumer的consumer_group,排序后cidAll依次为[192.168.0.6@15956  192.168.0.7@15957  192.168.0.8@15958  192.168.0.9@15959] 192.168.0.6@15956 的分配MessageQueue结算过程index:0mod:9%4=1averageSize:9 / 4 + 1 = 3startIndex:0range:3messageQueue:[broker_a_0、broker_a_1、broker_a_2]  192.168.0.6@15957 的分配MessageQueue结算过程index:1mod:9%4=1averageSize:9 / 4 = 2startIndex:3range:2messageQueue:[broker_b_0、broker_b_1]  192.168.0.6@15958 的分配MessageQueue结算过程index:2mod:9%4=1averageSize:9 / 4 = 2startIndex:5range:2messageQueue:[broker_b_2、broker_c_0]  192.168.0.6@15959 的分配MessageQueue结算过程index:3mod:9%4=1averageSize:9 / 4 = 2startIndex:7range:2messageQueue:[broker_c_1、broker_c_2]
复制代码


分配策略分析:


  • 整体分配策略可以参考上图的具体例子,可以更好的理解分配的逻辑。



consumer 的分配


  • 同一个 consumerGroup 下的 consumer 对象会分配到同一个 Topic 下不同的 MessageQueue。


  • 每个 MessageQueue 最终会分配到具体的 consumer 当中。


五、RocketMQ 指定机器消费设计思路


日常测试环境当中会存在多台 consumer 进行消费,但实际开发当中某台 consumer 新上了功能后希望消息只由该机器进行消费进行逻辑覆盖,这个时候 consumerGroup 的集群模式就会给我们造成困扰,因为消费负载均衡的原因不确定消息具体由那台 consumer 进行消费。当然我们可以通过介入 consumer 的负载均衡机制来实现指定机器消费。


public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {    private final InternalLogger log = ClientLogger.getLog();     @Override    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,        List<String> cidAll) {                 List<MessageQueue> result = new ArrayList<MessageQueue>();        // 通过改写这部分逻辑,增加判断是否是指定IP的机器,如果不是直接返回空列表表示该机器不负责消费        if (!cidAll.contains(currentCID)) {            return result;        }         int index = cidAll.indexOf(currentCID);        int mod = mqAll.size() % cidAll.size();        int averageSize =            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                + 1 : mqAll.size() / cidAll.size());        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;        int range = Math.min(averageSize, mqAll.size() - startIndex);        for (int i = 0; i < range; i++) {            result.add(mqAll.get((startIndex + i) % mqAll.size()));        }        return result;    }}
复制代码


consumer 负载均衡策略改写


  • 通过改写负载均衡策略 AllocateMessageQueueAveragely 的 allocate 机制保证只有指定 IP 的机器能够进行消费。


  • 通过 IP 进行判断是基于 RocketMQ 的 cid 格式是 192.168.0.6@15956,其中前面的 IP 地址就是对于的消费机器的 ip 地址,整个方案可行且可以实际落地。


六、小结


本文主要介绍了 RocketMQ 在生产和消费过程中的负载均衡机制,结合源码和实际案例力求给读者一个易于理解的技术普及,希望能对读者有参考和借鉴价值。囿于文章篇幅,有些方面未涉及,也有很多技术细节未详细阐述,如有疑问欢迎继续交流。


作者:vivo 互联网服务器团队-Wang Zhi

发布于: 刚刚阅读数: 2
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020.07.10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
深入剖析 RocketMQ 源码 - 负载均衡机制_负载均衡_vivo互联网技术_InfoQ写作平台