写点什么

RocketMQ 一行代码造成大量消息发送失败

  • 2021 年 11 月 11 日
  • 本文字数:1983 字

    阅读完需:约 7 分钟

首先我们根据关键字:TIMEOUT_CLEAN_QUEUE 去 RocketMQ 中查询,去探究在什么时候会抛出如上错误。根据全文搜索如下图所示:



该方法是在 BrokerFastFailure 中定义的,通过名称即可以看成其设计目的:Broker 端快速失败机制。


Broker 端快速失败其原理图如下:


![在这里插入图片描述](https://img-blog.csdnimg.cn/20200517091356312.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70#


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


pic_center)


  • 消息发送者向 Broker 发送消息写入请求,Broker 端在接收到请求后会首先放入一个队列中(SendThreadPoolQueue),默认容量为 10000。

  • Broker 会专门使用一个线程池(SendMessageExecutor)去从队列中获取任务并执行消息写入请求,为了保证消息的顺序处理,该线程池默认线程个数为 1。


如果 Broker 端受到垃圾回收等等因素造成单条写入数据发生抖动,单个 Broker 端积压的请求太多从而得不到及时处理,会极大的造成客户端消息发送的时间延长。


设想一下,如果由于 Broker 压力增大,写入一条消息需要 500ms 甚至超过 1s,并且队列中积压了 5000 条消息,消息发送端的默认超时时间为 3s,如果按照这样的速度,这些请求在轮到 Broker 执行写入请求时,客户端已经将这个请求超时了,这样不仅会造成大量的无效处理,还会导致客户端发送超时。


故 RocketMQ 为了解决该问题,引入 Broker 端快速失败机制,即开启一个定时调度线程,每隔 10 毫秒去检查队列中的第一个排队节点,如果该节点的排队时间已经超过了 200ms,就会取消该队列中所有已超过 200ms 的请求,立即向客户端返回失败,这样客户端能尽快进行重试,因为 Broker 都是集群部署,下次重试可以发送到其他 Broker 上,这样能最大程度保证消息发送在默认 3s 的时间内经过重试机制,能有效避免某一台 Broker 由于瞬时压力大而造成的消息发送不可用,从而实现消息发送的高可用。


从 Broker 端快速失败机制引入的初衷来看,快速失败后会发起重试,除非同一深刻集群内所有的 Broker 都繁忙,不然消息会发送成功,用户是不会感知这个错误的,那为什么用户感知了呢?难道 TIMEOUT_ CLEAN _ QUEUE 错误,Broker 不重试?


为了解开这个谜团,接下来会采用源码分析的手段去探究真相。接下来将以消息同步发送为例揭示其消息发送处理流程中的核心关键点。


MQ Client 消息发送端首先会利用网络通道将请求发送到 Broker,然后接收到请求结果后并调用 processSendResponse 方法对响应结果进行解析,如下图所示:



在这里返回的 code 为 RemotingSysResponseCode . SYSTEM_BUSY。


我们从 proccessSendResponse 方法中可以得知,如果 code 为 SYSTEM_BUSY,该方法会抛出 MQBrokerException,响应 code 为 SYSTEM_BUSY,其错误描述为开头部分的错误信息。


那我们沿着该方法的调用链,可以找到其直接调用方为:DefaultMQProducerImpl 的 sendKernelImpl,我们重点考虑如果底层方法抛出 MQBrokerException 该方法会如何处理。


其关键代码如下图所示:



可以看出在 sendKernelImpl 方法中首先会捕捉异常,先执行注册的钩子函数,即就算执行失败,对应的消息发送后置钩子函数也会执行,然后再原封不动的将该异常向上抛出。


sendKernelImpl 方法被 DefaultMQProducerImpl 的 sendDefaultImpl 方法调用,下面是其核心实现截图:



从这里可以看出 RocketMQ 消息发送高可用设计一个非常关键的点,重试机制,其实现是在 for 循环中 使用 try catch 将 sendKernelImpl 方法包裹,就可以保证该方法抛出异常后能继续重试。从上文可知,如果 SYSTEM_BUSY 会抛出 MQBrokerException,但发现只有上述几个错误码才会重试,因为如果不是上述错误码,会继续向外抛出异常,此时 for 循环会被中断,即不会重试。


这里非常令人意外的是连 SYSTEM_ERROR 都会重试,却没有包含 SYSTEM_BUSY,显然违背了快速失败的设计初衷,故笔者断定,这是 RocketMQ 的一个 BUG,将 SYSTEM_BUSY 遗漏了,后面与 RocketMQ 核心成员进行过沟通,也印证了这点,后续会提一个 PR,在上面增加一行代码,将 SYSTEM_BUSY 加上即可。


问题分析到这里,该问题应该就非常明了。


3、解决方案




如果大家在网上搜索 TIMEOUT_CLEAN_QUEUE 的解决方法,大家不约而同提出的解决方案是增加 waitTimeMillsInSendQueue 的值,该值默认为 200ms,例如将其设置为 1000s 等等,以前我是反对的,因为我的认知里 Broker 会重试,但现在发现 Broker 不会重试,所以我现在认为该 BUG 未解决的情况下适当提高该值能有效的缓解。


但这是并不是好的解决方案,我会在近期向官方提交一个 PR,将这个问题修复,建议大家在公司尽量对自己使用的版本进行修改,重新打一个包即可,因为这已经违背了 Broker 端快速失败的设计初衷。

评论

发布
暂无评论
RocketMQ一行代码造成大量消息发送失败