1. Introduction
RocketMQ is an excellent distributed messaging middleware with strong performance compared with other message queues. It uses a long-polling pull model by default and supports tens of millions of accumulated messages on a single machine, which makes it well suited to massive-scale messaging systems.
RocketMQ consists mainly of the Producer, Broker, Consumer, and NameServer (Namesvr) components: the Producer produces messages, the Consumer consumes messages, the Broker stores messages, and the NameServer stores metadata.
Logically, RocketMQ produces and consumes messages at the Topic level; physically, messages are stored in a specific MessageQueue on a specific Broker. Because a single Topic spans multiple MessageQueues on multiple Broker nodes, the need for load balancing in both message production and consumption arises naturally.
This article focuses on how the message producer (Producer) and the message consumer (Consumer) achieve load balancing throughout the production and consumption of messages, and on the implementation details involved.
2. The Overall Architecture of RocketMQ
(Image from Apache RocketMQ)
As shown in the figure above, the RocketMQ architecture is divided into four main parts: Producer, Consumer, Broker, and NameServer.
The physical distribution of a RocketMQ Topic is shown in the figure above:
A Topic is a logical concept for message production and consumption; the actual messages are stored across different Brokers.
A Queue on a Broker is the physical storage unit for a Topic's messages.
In RocketMQ's overall design, messages are produced and consumed at the Topic level, and each Topic has corresponding MessageQueues created on the Broker nodes of the RocketMQ cluster.
Producing a message essentially means selecting one of the Topic's MessageQueues across all Brokers according to some rule and sending the message to it; the normal strategy is round robin.
Consuming messages essentially means that each consumer in a consumerGroup subscribed to the same Topic is responsible, according to some rule, for consuming a subset of the Topic's MessageQueues.
Throughout a message's life cycle in RocketMQ, load balancing is involved in both production and consumption: the production side mainly involves load balancing across Brokers when selecting a queue, while the consumption side mainly involves load balancing between multiple consumers and multiple Brokers.
3. The Producer Message Production Process
The producer's message production process:
3.1 Route Synchronization
public class MQClientInstance {
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // Code omitted
                    } else {
                        // 1. Query the route information for the given Topic
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        // 2. Check whether the route data topicRouteData has changed
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        // 3. Convert the route information into producer-side and consumer-side route info
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Build the producer-side Topic information
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                            // Save into the local producer route table
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
        return false;
    }
}
The route synchronization process:
public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    // Queue information, stored per broker
    private List<QueueData> queueDatas;
    // Broker information, stored per broker
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

public class QueueData implements Comparable<QueueData> {
    // Broker name
    private String brokerName;
    // Number of read queues
    private int readQueueNums;
    // Number of write queues
    private int writeQueueNums;
    // Read/write permissions
    private int perm;
    private int topicSynFlag;
}

public class BrokerData implements Comparable<BrokerData> {
    // Cluster the broker belongs to
    private String cluster;
    // Broker name
    private String brokerName;
    // The broker's IP address information
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    private final Random random = new Random();
}

--------------------------------------------------------------------------------------------------

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    // The finest-grained queue information
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private TopicRouteData topicRouteData;
}

public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    // Topic name
    private String topic;
    // Name of the broker this queue belongs to
    private String brokerName;
    // Queue id under the Topic
    private int queueId;
}
The route parsing process:
public class MQClientInstance {
    public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            // Code omitted
        } else {
            List<QueueData> qds = route.getQueueDatas();
            // Sort by brokerName
            Collections.sort(qds);
            // Iterate over all brokers to generate queue-level information
            for (QueueData qd : qds) {
                // Only writable QueueData can be used to generate queues
                if (PermName.isWriteable(qd.getPerm())) {
                    // Find the matching BrokerData and filter out abnormal cases
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }
                    if (null == brokerData) {
                        continue;
                    }
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // For each of the QueueData's write queues, generate a MessageQueue and save it into the TopicPublishInfo
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
            info.setOrderTopic(false);
        }
        return info;
    }
}
The route generation process:
{
    "TBW102": [
        {
            "brokerName": "broker-a",
            "perm": 7,
            "readQueueNums": 8,
            "topicSynFlag": 0,
            "writeQueueNums": 8
        },
        {
            "brokerName": "broker-b",
            "perm": 7,
            "readQueueNums": 8,
            "topicSynFlag": 0,
            "writeQueueNums": 8
        }
    ]
}
Route parsing example:
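To make the parsing concrete, here is a minimal, self-contained sketch (not the real RocketMQ classes; the class name RouteParseDemo and the string encoding of queues are illustrative) that expands the route data shown above the way topicRouteData2TopicPublishInfo does: sort by brokerName, then emit one queue per writeQueueNums slot.

```java
import java.util.*;

public class RouteParseDemo {
    // Expand a map of brokerName -> writeQueueNums into per-queue entries,
    // mirroring the sort-then-enumerate logic of topicRouteData2TopicPublishInfo.
    static List<String> toMessageQueues(String topic, Map<String, Integer> writeQueueNumsByBroker) {
        List<String> brokerNames = new ArrayList<>(writeQueueNumsByBroker.keySet());
        Collections.sort(brokerNames); // QueueData is sorted by brokerName
        List<String> queues = new ArrayList<>();
        for (String brokerName : brokerNames) {
            int writeQueueNums = writeQueueNumsByBroker.get(brokerName);
            for (int i = 0; i < writeQueueNums; i++) {
                // Stand-in for new MessageQueue(topic, brokerName, i)
                queues.add(topic + "@" + brokerName + "#" + i);
            }
        }
        return queues;
    }

    public static void main(String[] args) {
        Map<String, Integer> route = new HashMap<>();
        route.put("broker-a", 8);
        route.put("broker-b", 8);
        List<String> queues = toMessageQueues("TBW102", route);
        System.out.println(queues.size());  // 16
        System.out.println(queues.get(0));  // TBW102@broker-a#0
        System.out.println(queues.get(8));  // TBW102@broker-b#0
    }
}
```

With the JSON above (two brokers, 8 write queues each), the producer ends up with 16 sendable MessageQueues, ordered broker-a first.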
3.2 Load Balancing
public class DefaultMQProducerImpl implements MQProducerInner {
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 1. Look up the TopicPublishInfo for the message's topic
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            // (Declarations of times, timesTotal, mq, sendResult and costTime are omitted in this excerpt)
            String[] brokersSent = new String[timesTotal];
            // Send the message, retrying up to timesTotal times
            for (; times < timesTotal; times++) {
                // Record the brokerName of the previous failed attempt
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 2. Pick the queue to send to from the TopicPublishInfo
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 3. Perform the send and check the result; on failure, reselect a queue and retry
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        switch (communicationMode) {
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (MQBrokerException e) {
                        // Code omitted
                    } catch (InterruptedException e) {
                        // Code omitted
                    }
                } else {
                    break;
                }
            }
            if (sendResult != null) {
                return sendResult;
            }
        }
    }
}
The message sending process:
public class TopicPublishInfo {
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            // Select the MessageQueue to send to in round-robin order
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Avoid the MessageQueue on the broker that failed last time
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
}
The route selection process:
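The selection above boils down to advancing a counter, taking it modulo the queue count, and skipping queues on the broker that just failed. Here is a minimal sketch under those assumptions (QueueSelectDemo and the "brokerName#queueId" string encoding are illustrative, not RocketMQ API):

```java
import java.util.*;

public class QueueSelectDemo {
    // In the real client this is a ThreadLocal counter (sendWhichQueue)
    private int counter = 0;

    // Round-robin pick, avoiding queues on lastBrokerName when possible
    String select(List<String> queues, String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(counter++) % queues.size();
            String mq = queues.get(pos);
            String brokerName = mq.split("#")[0];
            if (lastBrokerName == null || !brokerName.equals(lastBrokerName)) {
                return mq;
            }
        }
        // All queues live on the failed broker: fall back to plain round robin
        return queues.get(Math.abs(counter++) % queues.size());
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a#0", "broker-a#1", "broker-b#0", "broker-b#1");
        QueueSelectDemo demo = new QueueSelectDemo();
        System.out.println(demo.select(queues, null));        // broker-a#0
        System.out.println(demo.select(queues, "broker-a"));  // skips broker-a#1, returns broker-b#0
    }
}
```

Note how a failure on broker-a makes the next attempt land on broker-b even though plain round robin would have picked broker-a#1; this is the producer-side retry load balancing.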
Producer message sending diagram:
4. The Consumer Message Consumption Process
The consumer's message consumption process:
4.1 Route Synchronization
public class MQClientInstance {
    // 1. Start a scheduled task that periodically syncs route info from the NameServer
    private void startScheduledTask() {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    }

    public void updateTopicRouteInfoFromNameServer() {
        Set<String> topicList = new HashSet<String>();
        // Collect all topics subscribed by consumers, then fetch their routes from the NameServer
        {
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    Set<SubscriptionData> subList = impl.subscriptions();
                    if (subList != null) {
                        for (SubscriptionData subData : subList) {
                            topicList.add(subData.getTopic());
                        }
                    }
                }
            }
        }
        for (String topic : topicList) {
            this.updateTopicRouteInfoFromNameServer(topic);
        }
    }

    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // Code omitted
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        }
                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            // Build the consumer-side route information
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            }
        } catch (InterruptedException e) {
        }
        return false;
    }
}
The route synchronization process:
4.2 Load Balancing
public class RebalanceService extends ServiceThread {
    private static long waitInterval = Long.parseLong(System.getProperty(
        "rocketmq.client.rebalance.waitInterval", "20000"));
    private final MQClientInstance mqClientFactory;

    public RebalanceService(MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    @Override
    public void run() {
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}
The load balancing process:
public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // Code omitted
                break;
            }
            case CLUSTERING: {
                // Load balancing in clustering mode
                // 1. Get all MessageQueues under the topic
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 2. Get all consumers of this consumerGroup under the topic
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // 3. Start the rebalance reallocation
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                    List<MessageQueue> allocateResult = null;
                    try {
                        // 4. Reallocate through the allocation strategy
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    // 5. Carry out the actual rebalance based on the allocation result
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
}
The reallocation process:
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // Core allocation logic begins
        // Index of the current cid
        int index = cidAll.indexOf(currentCID);
        // Remainder of queues that do not divide evenly
        int mod = mqAll.size() % cidAll.size();
        // Average share per consumer
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        // Start index for this consumer
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // Number of queues assigned
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        // Assemble the result
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        // Core allocation logic ends
        return result;
    }

    @Override
    public String getName() {
        return "AVG";
    }
}

------------------------------------------------------------------------------------

Example: the RocketMQ cluster contains 3 brokers: broker_a, broker_b, broker_c.
A topic named topic_demo has a writeQueueNums of 3 on each of the 3 brokers. The sorted mqAll contains 9 queues: [broker_a_0, broker_a_1, broker_a_2, broker_b_0, broker_b_1, broker_b_2, broker_c_0, broker_c_1, broker_c_2].
A consumer group contains 4 consumers; the sorted cidAll is [192.168.0.6@15956, 192.168.0.7@15957, 192.168.0.8@15958, 192.168.0.9@15959].

Allocation for 192.168.0.6@15956:
index: 0
mod: 9 % 4 = 1
averageSize: 9 / 4 + 1 = 3
startIndex: 0
range: 3
messageQueues: [broker_a_0, broker_a_1, broker_a_2]

Allocation for 192.168.0.7@15957:
index: 1
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 3
range: 2
messageQueues: [broker_b_0, broker_b_1]

Allocation for 192.168.0.8@15958:
index: 2
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 5
range: 2
messageQueues: [broker_b_2, broker_c_0]

Allocation for 192.168.0.9@15959:
index: 3
mod: 9 % 4 = 1
averageSize: 9 / 4 = 2
startIndex: 7
range: 2
messageQueues: [broker_c_1, broker_c_2]
Allocation strategy analysis:
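The averaging arithmetic can be replayed with a self-contained sketch (the class AvgAllocateDemo and the plain string names are illustrative stand-ins for MessageQueue objects and consumer IDs; the arithmetic mirrors AllocateMessageQueueAveragely):

```java
import java.util.*;

public class AvgAllocateDemo {
    // Same index/mod/averageSize/startIndex/range arithmetic as the strategy above
    static List<String> allocate(String currentCID, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        // 9 queues across 3 brokers, 4 consumers: shares come out as 3/2/2/2
        List<String> mqAll = Arrays.asList(
                "broker_a_0", "broker_a_1", "broker_a_2",
                "broker_b_0", "broker_b_1", "broker_b_2",
                "broker_c_0", "broker_c_1", "broker_c_2");
        List<String> cidAll = Arrays.asList("c0", "c1", "c2", "c3");
        for (String cid : cidAll) {
            System.out.println(cid + " -> " + allocate(cid, mqAll, cidAll));
        }
        // c0 -> [broker_a_0, broker_a_1, broker_a_2]
        // c1 -> [broker_b_0, broker_b_1]
        // c2 -> [broker_b_2, broker_c_0]
        // c3 -> [broker_c_1, broker_c_2]
    }
}
```

Running it reproduces the 3/2/2/2 split of the worked example: the first mod consumers each take one extra queue, and the ranges are contiguous over the sorted queue list.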
Consumer allocation:
5. A Design for Consuming on a Designated Machine in RocketMQ
In a typical test environment, multiple consumer machines are consuming at the same time. In practice, after deploying a new feature to one consumer machine, we may want messages consumed only by that machine so the new logic is exercised. The clustering mode of the consumerGroup then gets in our way, because consumer-side load balancing makes it uncertain which consumer will handle a given message. We can, however, hook into the consumer's load balancing mechanism to direct consumption to a designated machine.
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        // Rewrite this part of the logic: check whether this machine has one of the designated IPs;
        // if not, return an empty list so this machine does not consume
        if (!cidAll.contains(currentCID)) {
            return result;
        }
        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
Rewriting the consumer's load balancing strategy:
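One subtlety of this rewrite: if non-designated machines simply return an empty list, the designated machine must also allocate over the designated consumers only, or some queues would be left unconsumed. A hedged, self-contained sketch of that idea follows (TargetedAllocateDemo, allocateTo, and the whitelist parameter are all illustrative names, not RocketMQ API):

```java
import java.util.*;

public class TargetedAllocateDemo {
    // Restrict allocation to a whitelist of consumer IDs: non-whitelisted
    // consumers get nothing, whitelisted ones split ALL queues among themselves.
    static List<String> allocateTo(Set<String> allowedCids, String currentCID,
                                   List<String> mqAll, List<String> cidAll) {
        if (!allowedCids.contains(currentCID)) {
            return Collections.emptyList(); // this machine opts out of consumption
        }
        // Re-run the average allocation over the whitelisted consumers only
        List<String> effectiveCids = new ArrayList<>();
        for (String cid : cidAll) {
            if (allowedCids.contains(cid)) {
                effectiveCids.add(cid);
            }
        }
        List<String> result = new ArrayList<>();
        int index = effectiveCids.indexOf(currentCID);
        int mod = mqAll.size() % effectiveCids.size();
        int averageSize = mqAll.size() <= effectiveCids.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / effectiveCids.size() + 1
                                          : mqAll.size() / effectiveCids.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> mqAll = Arrays.asList("q0", "q1", "q2", "q3");
        List<String> cidAll = Arrays.asList("192.168.0.6@15956", "192.168.0.7@15957");
        Set<String> allowed = Collections.singleton("192.168.0.6@15956");
        System.out.println(allocateTo(allowed, "192.168.0.6@15956", mqAll, cidAll)); // [q0, q1, q2, q3]
        System.out.println(allocateTo(allowed, "192.168.0.7@15957", mqAll, cidAll)); // []
    }
}
```

With a single whitelisted machine, that machine receives every queue of the topic while the others receive none, which is exactly the targeted-consumption behavior the section describes.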
6. Summary
This article introduced RocketMQ's load balancing mechanisms in both message production and consumption, combining source code with concrete examples to keep the material approachable; hopefully it serves as a useful reference. Due to space constraints, some aspects were not covered and many technical details were not elaborated; questions and further discussion are welcome.
Author: Wang Zhi, vivo Internet Server Team