写点什么

【深度挖掘 RocketMQ 底层源码】「底层系列」深度挖掘 RocketMQ 底层导致消息丢失透析(Broker Busy 和 ToManyRequest)

作者:洛神灬殇
  • 2023-03-10
    江苏
  • 本文字数:2262 字

    阅读完需:约 7 分钟

【深度挖掘RocketMQ底层源码】「底层系列」深度挖掘RocketMQ底层导致消息丢失透析(Broker Busy和ToManyRequest)

承接上文

通过上一篇文章《【深度挖掘 RocketMQ 底层源码】「底层问题分析系列」深度挖掘 RocketMQ 底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]》,我们知道了对应的“[REJECTREQUEST]system busy, start flow control for a while”的主要原因。

回顾问题要点

再次追踪以下问题代码的未知,我们可以看到其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand。



上面的原理分析部分已经详细介绍其实现原理,总结如下:

主要由两种场景的考虑

  • 在不开启 transientStorePoolEnable 机制时,如果 Broker PageCache 繁忙时则抛出上述错误,判断 PageCache 繁忙的依据就是向 PageCache 追加消息时,如果持有锁的时间超过 1s,则会抛出该错误。

  • 在开启 transientStorePoolEnable 机制时,其判断依据是如果 TransientStorePool 中不存在可用的堆外内存时抛出该错误。

本章内容方向

  • too many requests and system thread pool busy, RejectedExecutionException

  • [PC_SYNCHRONIZED]broker busy, start flow control for a while

  • [PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d

问题:too many requests and system thread pool busy, RejectedExecutionException


其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,其调用地方紧跟 3.1,是在向线程池执行任务时,被线程池拒绝执行时抛出的,我们可以顺便看看 Broker 消息处理发送的线程信息:

BrokerController#registerProcessor


主要看的是队列的 BlockedQueue 的长度进行控制说明:



该线程池的队列长度默认为 10000,我们可以通过 sendThreadPoolQueueCapacity 来改变默认值,如下图所示。



private int sendThreadPoolQueueCapacity = 10000;
复制代码

[PC_SYNCHRONIZED]broker busy


其抛出的源码入口点:DefaultMessageStore#putMessage,在进行消息追加时,再一次判断 PageCache 是否繁忙,如果繁忙,则抛出上述错误。


    @Override    public boolean isOSPageCacheBusy() {        long begin = this.getCommitLog().getBeginTimeInLock();        long diff = this.systemClock.now() - begin;        return diff < 10000000            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();    }    @Override    public long lockTimeMills() {        return this.commitLog.lockTimeMills();    }    public SystemClock getSystemClock() {        return systemClock;    }    public CommitLog getCommitLog() {        return commitLog;    }
复制代码


上述的代码已经在上一章节已经完成了介绍,对应的服务处理和逻辑引发的问题,大家可以关注上一章的内容即可。

broker busy, period in queue: %sms, size of queue: %d

其源码的入口点


BrokerFastFailure#cleanExpiredRequest,该方法的调用频率为每隔 10s 中执行一次,不过有一个执行前提条件就是 Broker 端要开启快速失败,默认为开启,可以通过参数brokerFastFailureEnable来设置。该方法的实现要点是每隔 10s,检测一次,如果检测到 PageCache 繁忙,并且发送队列中还有排队的任务,则直接不再等待,直接抛出系统繁忙错误,使正在排队的线程快速失败,结束等待。

不靠谱得方案

消息发送时抛出system busy、broker busy的原因都是 PageCache 繁忙,那是不是可以通过调整上述提到的某些参数来避免抛出错误呢?例如,如下参数:

osPageCacheBusyTimeOutMills

设置 PageCache 系统超时的时间,默认为 1000,表示 1s,那是不是可以把增加这个值,例如设置为 2000 或 3000。不推荐

sendThreadPoolQueueCapacity

Broker 服务器处理的排队队列,默认为 10000,如果队列中积压了 10000 个请求,则会抛出 RejectExecutionException。不推荐

brokerFastFailureEnable

是否启用快速失败,默认为 true,表示当如果发现 Broker 服务器的 PageCache 繁忙,如果发现sendThreadPoolQueue队列中不为空,表示还有排队的发送请求在排队等待执行,则直接结束等待,返回 broker busy。那如果不开启快速失败,则同样可以避免抛出这个错误。不推荐

问题总结

修改上述参数,都不可取,原因是出现 system busy、broker busy 这个错误,其本质是系统的 PageCache 繁忙,通俗一点讲就是向 PageCache 追加消息时,单个消息发送占用的时间超过 1s 了,如果继续往该 Broker 服务器发送消息并等待,其 TPS 根本无法满足,哪还是高性能的消息中间了呀。

靠谱得方案

开启 transientStorePoolEnable,在 broker 配置文件中将 transientStorePoolEnable 设置为 true。这个方案在上一篇文章已经介绍过了,再次就不过多赘余。

它得问题重点是集中于

会增加数据丢失的可能性,如果 Broker JVM 进程异常退出,提交到 PageCache 中的消息是不会丢失的,但存在堆外内存(DirectByteBuffer)中但还未提交到 PageCache 中的这部分消息,将会丢失。但通常情况下,RocketMQ 进程退出的可能性不大。

扩容 Broker 服务器

当 Broker 服务器自身比较忙的时候,才会采用快速失败机制,直接给消息发送者返回错误,消息发送者默认情况会重试 2 次,将消息发往其他 Broker,保证其高可用,并且在接下来的一段时间内会规避该 Broker,这样该 Broker 恢复提供了时间保证,Broker 本身的架构是支持分布式水平扩容的,增加 Topic 的队列数,降低单台 Broker 服务器的负载,从而避免出现 PageCache。


Broker 扩容时候,可以复制集群中任意一台 Broker 服务下 ${ROCKETMQ_HOME}/store/config/topics.json 到新 Broker 服务器指定目录,避免在新 Broker 服务器上为 Broker 创建队列,然后,消息发送者、消息消费者都能动态获取 Topic 的路由信息

发布于: 2023-03-10阅读数: 4
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【深度挖掘RocketMQ底层源码】「底层系列」深度挖掘RocketMQ底层导致消息丢失透析(Broker Busy和ToManyRequest)_源码分析_洛神灬殇_InfoQ写作社区