前言
路由队列选择的作用在于发送消息时可以指定发送到某个 broker 队列,或均衡发送到 broker 队列,其作用就是在于选择合适的队列进行消息发送。
目前客户端队列选择分为三种方式:
第一种:可根据 MessageQueueSelector 的实现或自扩展实现选择队列;
第二种:未开启 Broker 故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制(默认是此种实现方式);
第三种:开启 Broker 故障延迟机制(sendLatencyFaultEnable:true),会根据 brokerName 的可用性选择队列发送。
接下来我们就以这三种方式展开讨论。
一、队列选择
MessageQueueSelector 方式队列选择在了解 MessageQueueSelector 的方式进行队列选择时,我们先回顾下 MQProducer 接口:里面有多个方法签名带参数 MessageQueueSelector,其实就是表明使用此种方式选择消息队列需要显示穿参数才能使用;用下面这个接口方法进行举例分析:
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
复制代码
接下来我们直接看内部实现源码如何实现的:
DefaultMQProducerImpl#sendSelectImpl
private SendResult sendSelectImpl(
Message msg, MessageQueueSelector selector,
Object arg, final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK(); // 状态检测
Validators.checkMessage(msg, this.defaultMQProducer); // 消息验证
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); // 1-队列选择
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); // 熟悉的配方,前面已经分析
} else {
throw new MQClientException("select message queue return null.", null); // 异常抛出
}
}
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
复制代码
分析:
selector.select(topicPublishInfo.getMessageQueueList(), msg, arg) 队列选择;
其实现有以下三种:
SelectMessageQueueByHash(hash)
SelectMessageQueueByMachineRoom(机器随机)
SelectMessageQueueByRandom(队列随机)
当然自己也可以定制化扩展,你说简单不简单?我们可简单查看其中之一的实现源码:
SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
复制代码
二、轮训机制
未开启 Broker 故障延迟机制(sendLatencyFaultEnable:false),会采用默认轮训机制;
来来来,我们直接上大餐,看到下面你就明白原来默认机制是如此的简单哈。
以下方法为入口:
DefaultMQProducerImpl#selectOneMessageQueue
/**
* 选择一个消息队列, lastBrokerName 就是上 一 次选择的执行发送消息失败的 Broker。第一次执行消息队列选择时, lastBrokerName 为 null
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
复制代码
重头戏:
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
...下面分析...
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
复制代码
备注:
TopicPublishInfo 熟悉不熟悉哈?可以翻翻文章(路由动态更新)selectOneMessageQueue 源码注释分析,可以简单理解为队列轮询。
三、队列发送
开启 Broker 故障延迟机制(sendLatencyFaultEnable:true),进行选择队列发送。
3.1 发送延迟故障
如果发送延迟故障打开[sendLatencyFaultEnable:true],则发送时会统计发送耗时和失败[updateFaultItem],当某个 broker 节点发送失败和发送耗时较长,则在一段时间内不再选择该 broker[selectOneMessageQueue]
3.2 流程图
简单流程图片如下:
描述:
LatencyFaultToleranceImpl 包含一个 Map(key:brokerName,value:FaultItem 可用信息):ConcurrentHashMap faultItemTable
FaultItem 的数据结构如下:
class FaultItem implements Comparable<FaultItem> {
//条目唯一键,这里为 brokerName。
private final String name;
//本次消息发送延迟 。(消耗时间)
private volatile long currentLatency;
//故障规避开始时间 = 发生的时间 + notAvailableDuration 。(故障恢复时间)
private volatile long startTimestamp;
.....
}
复制代码
备注:
FaultItem 包含 currentLatency 发送耗时,brokerName 节点名称,startTimestamp 时间戳后 broker 可用。
3.3 元数据映射
从 MQFaultStrategy 可以看出:发送延时一可用性延时~元数据映射
private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
//启用 Broker故障延迟机制 ,默认不启用
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
复制代码
简单总结如下:
latencyMax 数组:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L 对应:发送耗时 50ms 100ms 550ms 1s 2s 3s 15s
notAvailableDuration 数组:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L 对应:可用延时 0s 0s 30s 60s 2min 3min 10min
故可以理解为:
发送耗时 50 和 100 毫秒,则当前 broker 延迟 0 秒
发送耗时 550 和 1000 毫秒,则当前 broker 延迟 30 秒和 60 秒
发送耗时 2 秒,则当前 broker 延迟 2min
发送耗时 3 秒,则当前 broker 延迟 3min
发送耗时 15 秒,则当前 broker 延迟 10min6
如果发送失败,则直接延迟 10min
3.4 源码分析:
1:选择队列
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) { // 1> 默认false,等于true相当于开启
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 2> 判断可用性-时间判断(存储了所有发送消息失败过的broker)
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq; // 3> 说明找到了可用的MessageQueue直接返回
}
}
//4> 尝试从规避的 Broker 中选择一个可用的 Broker(shuffle),如果没有找到,将返回 null。--
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//5>兜底选择队列
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
复制代码
备注:
此处主要是通过判断 brokerName 是否可用、不可用则该 brokerName 所有 queue 不可能、继续找下一个 brokerName、如果找不到则排序 shuffle 找一个可用的。如果最终找不到则调用 TopicPublishInfo.selectOneMessageQueue 兜底选择一个队列返回。
2:更新 broker 的可用性
(根据发送延时换算可用性延时):updateFaultItem#updateFaultItem
/**
* 如果 isolation为 true,则使用 30s作为 computeNotAvailableDuration方法的参数;
* 如果 isolation为 false,则使用本次消息发送时延作为 computeNotAvailableDuration方法的参数,
* 那 computeNotAvailableDuration 的作用 是 计算因本次消息发送故障需要 将 Broker 规避的时长,
* @param brokerName
* @param currentLatency
* @param isolation =true 表示10分钟后可用,参见MQFaultStrategy的数组元数据
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i]; // 计算可用性延时
}
return 0;
}
复制代码
备注:
此处逻辑相对简单、小知识点:记住:isolation=true 的特殊情况
3:更新 broker 可用性延时
LatencyFaultToleranceImpl#faultItemTable
/**
* 根据 broker名称从 缓存表中获 取 Faultitem,如果找到则更新 Faultltem,否则创 建 Faultltem。
* 1) currentLatency、 startTimeStamp被volatile修饰。
* 2) startTimeStamp 为当前系统时间加上需要规避的时长 。startTimeStamp 是 判断 broker当前是否可用的直接一句,请看 Faultltem#isAvailable方法。
* @param name brokerName
* @param currentLatency 消息发送故障延迟时间 。
* @param notAvailableDuration 不可用持续时辰, 在这个时间内, Broker 将被规避 。
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);//计算新的可用性延时
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //计算新的可用性延时
}
}
复制代码
备注:
此处逻辑就是更新计算 FaultItem 的 StartTimestamp
四、结论
通过本文分析我们已经清楚消息发送流程队列选择的三种方式,由于发送消息流程过程中不能动态切换此三种方式,故每种选择队列方式建议根据实际情况进行选择使用;
至此 Producer 的核心流程源码已经分析完、建议有兴趣可以回顾历史文章。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT 巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
评论