写点什么

RocketMQ 高可用设计之消息发送重试

作者:周杰伦本人
  • 2022 年 8 月 16 日
    贵州
  • 本文字数:4259 字

    阅读完需:约 14 分钟

RocketMQ 高可用设计之消息发送重试

RocketMQ 高可用设计主要表现四个方面


  • 消息发送的高可用:NameServer 不会及时检查 Broker 和通知 Producer,Producer 不能及时感知 Broker 下线。因此有重试机制与故障延迟机制。

  • 集群管理的高可用:主要体现在 NameServer 宕机不影响已经运行的 Broker、Producer 和 Consumer

  • 消息消费的高可用:重试机制和消息 ACK 机制

  • 消息存储的高可用:消息持久化、主从复制、读写分离机制这篇文章我们就讲讲 RocketMQ 的消息发送重试机制

消息发送重试

DefaultMQProducerImpl 的 sendDefaultImpl 方法:


private SendResult sendDefaultImpl(    Message msg,    final CommunicationMode communicationMode,    final SendCallback sendCallback,    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    this.makeSureStateOK();    Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; }
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } }
return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; }
throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString());
log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } }
if (sendResult != null) { return sendResult; }
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); }
if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); }
throw mqClientException; }
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); }
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
复制代码


从源码我们能了解到,重试机制在消息发送出现异常时尝试再次发送,默认最多重试三次。重试机制仅支持同步发送方式,不支持异步和单向发送。根据发送失败的异常类型处理策略,如果是网络异常 RemotingException 和客户端异常 MQClientException 会进行重试,而 Broker 服务端异常 MQBrokerException 和线程中断异常 InterruptedException 不会重试,直接抛出异常。

总结

这就是 RocketMQ 高可用设计之消息发送重试,看源码我们就明白了怎么回事,其实就是个 for 循环,很简单的实现,如果你感觉这篇文章还不错的话,欢迎给我留言和点赞哟,您的赞和评论是对我最大的鼓励和支持。

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:


  1. 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏

  2. 关注盼盼小课堂,定期为你推送好文,还有群聊不定期抽奖活动,可以畅所欲言,与大神们一起交流,一起学习。

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ高可用设计之消息发送重试_8月月更_周杰伦本人_InfoQ写作社区