
RocketMQ High-Availability Design: The Fault-Avoidance Mechanism

Author: 周杰伦本人
  • August 17, 2022
    Guizhou


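To keep its communication with clients simple, NameServer does not notify clients immediately when it detects a Broker failure. The fault-avoidance mechanism addresses the problem this creates: when a Broker fails, the Producer does not learn of it in time, and message sends fail. The mechanism is disabled by default (sendLatencyFaultEnable is false). When it is enabled, a Broker on which a send has failed is temporarily excluded from the list of candidate queues.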

The MQFaultStrategy class

The MQFaultStrategy class is defined as follows:


public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}


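As its name suggests, MQFaultStrategy is the fault strategy class of the MQ client. Its selectOneMessageQueue() method first checks whether the fault-latency mechanism is enabled. If it is, the method walks the queues in round-robin order and checks whether each queue's Broker is available; if no queue passes that check, it asks pickOneAtLeast() for a fallback Broker. If the mechanism is disabled, the method uses the default round-robin selection.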


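When looking up the route, the key steps in selecting a message queue are:


  1. Pick a message queue using the round-robin algorithm.

  2. Check against the fault list whether that message queue is available.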
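
The two steps above can be sketched in isolation. The following is a minimal illustration and not RocketMQ's own code: the class QueueSelectionSketch, its select method, and the plain Set of avoided broker names are hypothetical stand-ins for TopicPublishInfo and LatencyFaultTolerance. It also uses Math.floorMod in place of the Math.abs(...) % size plus pos < 0 guard seen in the source, which is a substitution made here for brevity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for round-robin selection with fault avoidance. */
final class QueueSelectionSketch {
    // Shared counter that advances on every call, like sendWhichQueue in the source.
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    /**
     * queueBrokers.get(i) is the name of the broker hosting queue i.
     * Returns the index of the first queue, in round-robin order, whose broker
     * is not being avoided, or -1 if every broker is currently avoided.
     */
    int select(List<String> queueBrokers, Set<String> avoidedBrokers) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queueBrokers.size(); i++) {
            // floorMod keeps the position non-negative even after the counter overflows.
            int pos = Math.floorMod(index++, queueBrokers.size());
            if (!avoidedBrokers.contains(queueBrokers.get(pos))) {
                return pos;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> queueBrokers = Arrays.asList("broker-a", "broker-a", "broker-b", "broker-b");
        Set<String> avoided = new HashSet<>(Collections.singleton("broker-a"));
        QueueSelectionSketch selector = new QueueSelectionSketch();
        for (int i = 0; i < 4; i++) {
            System.out.println("selected queue index: " + selector.select(queueBrokers, avoided));
        }
    }
}
```

In this sketch every call skips the queues on broker-a while it is in the avoided set. The real method goes one step further: when nothing is available it falls back to pickOneAtLeast() instead of returning a sentinel value.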

LatencyFaultToleranceImpl

LatencyFaultToleranceImpl decides whether a Broker is available as follows:


@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

// In FaultItem, an inner class of LatencyFaultToleranceImpl
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}


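  1. Check whether the Broker is in the fault list. If it is not, it is available.

  2. If it is in the fault list, check whether the current time is greater than or equal to startTimestamp. Although the name suggests the start of avoidance, startTimestamp is set to the update time plus the avoidance duration, so it marks the moment from which the Broker may be used again.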


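updateFaultItem() is called both after a send completes and when a send throws an exception, in order to update the fault list. computeNotAvailableDuration() derives the avoidance duration from the response time: the longer the response time, the longer the duration. Network exceptions, Broker exceptions, and client exceptions are all treated as a fixed response time of 30 s (isolation is true), which maps to an avoidance duration of 10 minutes. For a successful send or a thread-interrupted exception the measured response time is used; with the default tables any latency below 550 ms, including the typical case of under 100 ms, maps to a duration of 0.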
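
To make the mapping concrete, here is a small worked sketch that copies the two default arrays and the lookup loop from the MQFaultStrategy listing into a standalone class. The class name NotAvailableDurationSketch and the sample latencies are chosen here for illustration only.

```java
/** Standalone copy of the latency-to-duration lookup, for experimentation only. */
final class NotAvailableDurationSketch {
    // Default tables as they appear in the MQFaultStrategy listing.
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest threshold down and returns the duration of the first one reached. */
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        // 30000 ms is the fixed latency substituted when isolation is true.
        long[] samples = {30L, 120L, 550L, 1200L, 30000L};
        for (long latency : samples) {
            System.out.println(latency + " ms latency -> avoid for "
                    + computeNotAvailableDuration(latency) + " ms");
        }
    }
}
```

Because the scan starts at the largest threshold, the 30 s isolation value matches the 15000 ms entry and yields 600000 ms (10 minutes), while 550 ms is the smallest latency that produces a non-zero duration.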

The updateFaultItem method of LatencyFaultToleranceImpl

The updateFaultItem method of the LatencyFaultToleranceImpl class:


@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        // Add to the fault list
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}


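FaultItem stores the Broker name, the response latency, and startTimestamp. The most important of these is startTimestamp, which is used to decide whether the Broker's queues are available.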
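
The interplay between updating an item and checking availability can be sketched as below. This is an illustration under stated assumptions and not the library's implementation: FaultTableSketch and Item are hypothetical names, the current time is passed in as a parameter rather than read from System.currentTimeMillis() so that the behaviour is reproducible, and computeIfAbsent stands in for the get / putIfAbsent sequence shown in the source.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified fault table keyed by broker name. */
final class FaultTableSketch {
    /** Per-broker record: last observed latency and the time from which the broker is usable again. */
    static final class Item {
        volatile long currentLatency;
        volatile long startTimestamp;
    }

    private final ConcurrentHashMap<String, Item> table = new ConcurrentHashMap<>();

    /** Records a send result; 'now' is an explicit clock value (an assumption of this sketch). */
    void update(String broker, long latency, long notAvailableDuration, long now) {
        Item item = table.computeIfAbsent(broker, k -> new Item());
        item.currentLatency = latency;
        item.startTimestamp = now + notAvailableDuration;
    }

    /** A broker is available if it has no entry, or if 'now' has reached its startTimestamp. */
    boolean isAvailable(String broker, long now) {
        Item item = table.get(broker);
        return item == null || now - item.startTimestamp >= 0;
    }

    public static void main(String[] args) {
        FaultTableSketch faults = new FaultTableSketch();
        long t0 = 1_000L;
        // A failed send at t0 with the 10-minute avoidance duration.
        faults.update("broker-a", 30_000L, 600_000L, t0);
        System.out.println("at t0:            " + faults.isAvailable("broker-a", t0));
        System.out.println("at t0 + 10 min:   " + faults.isAvailable("broker-a", t0 + 600_000L));
        System.out.println("unknown broker-b: " + faults.isAvailable("broker-b", t0));
    }
}
```

A later update with a duration of 0 moves startTimestamp back to the update time, so a Broker that responds quickly again becomes selectable without waiting out the earlier avoidance period.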


Published in the InfoQ Writing Community (August daily-update series) by 周杰伦本人.