写点什么

RocketMQ—Producer(五)路由队列选择

作者:IT巅峰技术
  • 2022 年 5 月 17 日
  • 本文字数:5569 字

    阅读完需:约 18 分钟

前言

路由队列选择的作用在于发送消息时可以指定发送到某个 broker 队列,或均衡发送到 broker 队列,其作用就是在于选择合适的队列进行消息发送。


目前客户端队列选择分为三种方式:


  • 第一种:可根据 MessageQueueSelector 的实现或自扩展实现选择队列;

  • 第二种:未开启 Broker 故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制(默认是此种实现方式);

  • 第三种:开启 Broker 故障延迟机制(sendLatencyFaultEnable:true),会根据 brokerName 的可用性选择队列发送。


接下来我们就以这三种方式展开讨论。

一、队列选择

MessageQueueSelector 方式队列选择在了解 MessageQueueSelector 的方式进行队列选择时,我们先回顾下 MQProducer 接口:里面有多个方法签名带参数 MessageQueueSelector,其实就是表明使用此种方式选择消息队列需要显示穿参数才能使用;用下面这个接口方法进行举例分析:


SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
复制代码


接下来我们直接看内部实现源码如何实现的:


DefaultMQProducerImpl#sendSelectImpl


private SendResult sendSelectImpl(  Message msg, MessageQueueSelector selector,  Object arg, final CommunicationMode communicationMode,  final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException,   MQBrokerException, InterruptedException {  long beginStartTime = System.currentTimeMillis();  this.makeSureStateOK();     // 状态检测  Validators.checkMessage(msg, this.defaultMQProducer); // 消息验证
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-队列选择 } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); }
long costTime = System.currentTimeMillis() - beginStartTime; if (timeout < costTime) { throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); } if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已经分析 } else { throw new MQClientException("select message queue return null.", null); // 异常抛出 } } throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); }
复制代码


分析:


selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 队列选择;


其实现有以下三种:


  • SelectMessageQueueByHash(hash)

  • SelectMessageQueueByMachineRoom(机器随机)

  • SelectMessageQueueByRandom(队列随机)


当然自己也可以定制化扩展,你说简单不简单?我们可简单查看其中之一的实现源码:


SelectMessageQueueByRandom


public class SelectMessageQueueByRandom implements MessageQueueSelector {    private Random random = new Random(System.currentTimeMillis());    @Override    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {        int value = random.nextInt(mqs.size());        return mqs.get(value);    }}
复制代码

二、轮训机制

未开启 Broker 故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制;


来来来,我们直接上大餐,看到下面你就明白原来默认机制是如此的简单哈。


以下方法为入口:


DefaultMQProducerImpl#selectOneMessageQueue


/*** 选择一个消息队列, lastBrokerName 就是上 一 次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时, lastBrokerName 为 null* @param tpInfo* @param lastBrokerName* @return*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}
复制代码


重头戏:


MQFaultStrategy#selectOneMessageQueue


public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {    if (this.sendLatencyFaultEnable) {        ...下面分析...        return tpInfo.selectOneMessageQueue();    }    return tpInfo.selectOneMessageQueue(lastBrokerName);}
复制代码


备注:


TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由动态更新)selectOneMessageQueue 源码注释分析,可以简单理解为队列轮询。

三、队列发送

开启 Broker 故障延迟机制(sendLatencyFaultEnable:true),进行选择队列发送。

3.1 发送延迟故障

如果发送延迟故障打开[sendLatencyFaultEnable:true],则发送时会统计发送耗时和失败[updateFaultItem],当某个 broker 节点发送失败和发送耗时较长,则在一段时间内不再选择该 broker[selectOneMessageQueue]

3.2 流程图

简单流程图片如下:



描述:


  1. LatencyFaultToleranceImpl 包含一个 Map(key:brokerName,value:FaultItem 可用信息):ConcurrentHashMap faultItemTable

  2. FaultItem 的数据结构如下:


class FaultItem implements Comparable<FaultItem> {  //条目唯一键,这里为 brokerName。  private final String name;  //本次消息发送延迟 。(消耗时间)  private volatile long currentLatency;  //故障规避开始时间 = 发生的时间 + notAvailableDuration 。(故障恢复时间)  private volatile long startTimestamp;
.....}
复制代码


备注:


FaultItem 包含 currentLatency 发送耗时,brokerName 节点名称,startTimestamp 时间戳后 broker 可用。

3.3 元数据映射

从 MQFaultStrategy 可以看出:发送延时一可用性延时~元数据映射


private final static InternalLogger log = ClientLogger.getLog();private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();//启用 Broker故障延迟机制 ,默认不启用private boolean sendLatencyFaultEnable = false;private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
复制代码


简单总结如下:


  • latencyMax 数组:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 对应:发送耗时 50ms 100ms 550ms 1s 2s 3s 15s

  • notAvailableDuration 数组:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 对应:可用延时 0s 0s 30s 60s 2min 3min 10min


故可以理解为:


  1. 发送耗时 50 和 100 毫秒,则当前 broker 延迟 0 秒

  2. 发送耗时 550 和 1000 毫秒,则当前 broker 延迟 30 秒和 60 秒

  3. 发送耗时 2 秒,则当前 broker 延迟 2min

  4. 发送耗时 3 秒,则当前 broker 延迟 3min

  5. 发送耗时 15 秒,则当前 broker 延迟 10min6

  6. 如果发送失败,则直接延迟 10min

3.4 源码分析:

1:选择队列


MQFaultStrategy#selectOneMessageQueue


public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {  if (this.sendLatencyFaultEnable) { // 1> 默认false,等于true相当于开启      try {          int index = tpInfo.getSendWhichQueue().getAndIncrement();          for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {              int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();              if (pos < 0)                  pos = 0;              MessageQueue mq = tpInfo.getMessageQueueList().get(pos);              if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判断可用性-时间判断(存储了所有发送消息失败过的broker)                  if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                      return mq; // 3> 说明找到了可用的MessageQueue直接返回              }          }          //4> 尝试从规避的 Broker 中选择一个可用的 Broker(shuffle),如果没有找到,将返回 null。--          final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();          int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);          if (writeQueueNums > 0) {              final MessageQueue mq = tpInfo.selectOneMessageQueue();              if (notBestBroker != null) {                  mq.setBrokerName(notBestBroker);                  mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);              }              return mq;          } else {              latencyFaultTolerance.remove(notBestBroker);          }      } catch (Exception e) {          log.error("Error occurred when selecting message queue", e);      }      //5>兜底选择队列      return tpInfo.selectOneMessageQueue();  }  return tpInfo.selectOneMessageQueue(lastBrokerName);}
复制代码


备注:


此处主要是通过判断 brokerName 是否可用、不可用则该 brokerName 所有 queue 不可能、继续找下一个 brokerName、如果找不到则排序 shuffle 找一个可用的。如果最终找不到则调用 TopicPublishInfo.selectOneMessageQueue 兜底选择一个队列返回。


2:更新 broker 的可用性


(根据发送延时换算可用性延时):updateFaultItem#updateFaultItem


/** * 如果 isolation为 true,则使用 30s作为 computeNotAvailableDuration方法的参数; * 如果 isolation为 false,则使用本次消息发送时延作为 computeNotAvailableDuration方法的参数, * 那 computeNotAvailableDuration 的作用 是 计算因本次消息发送故障需要 将 Broker 规避的时长, * @param brokerName * @param currentLatency * @param isolation =true 表示10分钟后可用,参见MQFaultStrategy的数组元数据 */public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {    if (this.sendLatencyFaultEnable) {        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);    }}
private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; // 计算可用性延时 }
return 0;}
复制代码


备注:


此处逻辑相对简单、小知识点:记住:isolation=true 的特殊情况


3:更新 broker 可用性延时


LatencyFaultToleranceImpl#faultItemTable


/** * 根据 broker名称从 缓存表中获 取 Faultitem,如果找到则更新 Faultltem,否则创 建 Faultltem。 * 1) currentLatency、 startTimeStamp被volatile修饰。 * 2) startTimeStamp 为当前系统时间加上需要规避的时长 。startTimeStamp 是 判断 broker当前是否可用的直接一句,请看 Faultltem#isAvailable方法。 * @param name brokerName * @param currentLatency 消息发送故障延迟时间 。 * @param notAvailableDuration 不可用持续时辰, 在这个时间内, Broker 将被规避 。 */@Overridepublic void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {    FaultItem old = this.faultItemTable.get(name);    if (null == old) {        final FaultItem faultItem = new FaultItem(name);        faultItem.setCurrentLatency(currentLatency);        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//计算新的可用性延时 } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //计算新的可用性延时 }}
复制代码


备注:


此处逻辑就是更新计算 FaultItem 的 StartTimestamp

四、结论

  1. 通过本文分析我们已经清楚消息发送流程队列选择的三种方式,由于发送消息流程过程中不能动态切换此三种方式,故每种选择队列方式建议根据实际情况进行选择使用;

  2. 至此 Producer 的核心流程源码已经分析完、建议有兴趣可以回顾历史文章。




程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT 巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。


作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

发布于: 刚刚阅读数: 6
用户头像

一线架构师、二线开发、三线管理 2021.12.07 加入

Redis6.X、ES7.X、Kafka3.X、RocketMQ5.0、Flink1.X、ClickHouse20.X、SpringCloud、Netty5等热门技术分享;架构设计方法论与实践;作者热销新书《RocketMQ技术内幕》;

评论

发布
暂无评论
RocketMQ—Producer(五)路由队列选择_RocketMQ_IT巅峰技术_InfoQ写作社区