写点什么

【深度挖掘 RocketMQ 底层源码】「底层问题分析系列」深度挖掘 RocketMQ 底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]system busy, start flow control for a while)

作者:洛神灬殇
  • 2023-03-09
    江苏
  • 本文字数:3691 字

    阅读完需:约 12 分钟

【深度挖掘RocketMQ底层源码】「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]system busy, start flow control for a while)

常见问题汇总分析

最近因为项目的并发量以及数据的吞吐处理量越来越高,我们的 RocketMQ 的处理数据的能力,已经慢慢成为了我们的问题和瓶颈了,频繁会出现 OOM 的瓶颈问题,当然内存的问题我们可以扩充资源和调整配额就可以解决了,但是又出现了其他可怕的问题,消息会出现丢失的常见,其中有代码层面的失误和 bug,也有资源引起的问题,所以我们本期先重点分析说明一下因为资源所引起的问题,主要由以下这几个大场景所导致,看看你有没有遇到过?



接下来,我们就将对以上这几个场景进行分析和介绍它们出现的根本原因以及如何解决!本篇内容主要针对于[REJECTREQUEST]system busy, start flow control for a while进行分析说明。

问题介绍

[REJECTREQUEST] system busy, start flow control for a while

首先,我们限定为源码的未知,这样子才可以方便我们去分析问题,以及探究根本的因素所在,源码方法太长,只好截图了,如下图所示。



可以看到话红框的地方就是报错和打印的地方,大家可以使用自己的 IDE 进行搜索,这块的代码在org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand。从上图可以看出,抛出上述错误的关键原因是:pair.getObject1().rejectRequest()返回了 true 所引起的错误。如果想要看清楚这个地方的代码,需要先认识一下 NettyRequestProcessor、Pair、RequestCode。

Pair

Pair 主要用来封装 NettyRequestProcessor 与 ExecuteService 的绑定关系。在 RocketMQ 的网络处理模型中,会为每一个 NettyRequestProcessor 与特定的线程池绑定,所有该 NettyRequestProcessor 的处理逻辑都在该线程池中运行。



上述的pair.getObject1()的方法主要来自于上图的这个 Pair 的包装类,可以看出来 Rocket-Client 端将不同的请求定义不同的请求命令 CODE,服务端会将客户端请求进行分类,每个命令或每类请求命令定义一个处理器(NettyRequestProcessor),然后每一个 NettyRequestProcessor 绑定到一个单独的线程池,进行命令处理,不同类型的请求将使用不同的线程池进行处理,实现线程隔离。



  • object1:对应的是 NettyRequestProcessor 对象

  • object2:对应的是 ExecuteService 对象


所以可以得出结论 getObject1()说明获取的正是 NettyRequestProcessor 的接口的实现类。而对应的 NettyRequestProcessor 的实现类也有很多,如下图所示。

NettyRequestProcessor


RocketMQ 服务端请求处理器。例如 SendMessageProcessor 是消息发送处理器、PullMessageProcessor 是消息拉取命令处理器、ClientRemotingProcessor 等等。

RequestCode

请求 CODE,用来区分请求的类型,例如 SEND_MESSAGE:表示该请求为消息发送,PULL_MESSAGE:消息拉取请求。当然还有很多其他的编码,可以看看这个类org.apache.rocketmq.common.protocol.RequestCode。如下代码所示进行获取对应的处理器以及线程池。


final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
复制代码



这样子相信大家应该知道了整体的相关的结构和执行机制了,我们来看一下为什么会出现这个错误!

SendMessageProcessor#rejectRequest(broker 端源码)

public boolean rejectRequest() {    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||         this.brokerController.getMessageStore().isTransientStorePoolDeficient();}
复制代码


拒绝请求的条件有两个,只要其中任意一个满足,则返回 true。


  • Os PageCache busy:判断操作系统 PageCache 是否繁忙,如果忙,则返回 true。

  • transientStorePool 是否不足

isOSPageCacheBusy()的操作系统缓存页繁忙

DefaultMessageStore#isOSPageCacheBusy()主要的目的就是针对于 CommitLog 的锁定时间进行判定,OSPageCache 是否存在超时锁定的问题。


public boolean isOSPageCacheBusy() {    long begin = this.getCommitLog().getBeginTimeInLock();    long diff = this.systemClock.now() - begin;    return diff < 10000000                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); }
复制代码


begin、diff 两个局部变量



  • begin(开始时间)


将消息写入 Commitlog 文件所持有锁的开始时间,就是将消息体追加到内存映射文件(DirectByteBuffer)或 pageCache(FileChannel#map),在该过程中开始持有锁的时间戳。具体可以查看源码CommitLog#putMessage


  • diff(总长时间)


一次消息追加过程中持有锁的总时长,即往内存映射文件或 pageCache 追加一条消息所耗时间。如果一次消息追加过程的时间超过了 Broker 配置文件 osPageCacheBusyTimeOutMills,则认为 pageCache 繁忙,osPageCacheBusyTimeOutMills 默认值为 1000,表示 1s。

isTransientStorePoolDeficient

说到了isTransientStorePoolDeficient就要说一下 transientStorePoolEnable 机制。

transientStorePoolEnable 机制

Java NIO 的内存映射机制,提供了将文件系统中的文件映射到内存机制,实现对文件的操作转换对内存地址的操作,极大的提高了 IO 特性,但这部分内存并不是常驻内存,可以被置换到交换内存(虚拟内存),RocketMQ 为了提高消息发送的性能,引入了内存锁定机制,即将最近需要操作的 commitlog 文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是 transientStorePoolEnable。

DefaultMessageStore#isTransientStorePoolDeficient
public boolean isTransientStorePoolDeficient() {    return remainTransientStoreBufferNumbs() == 0;}
public int remainTransientStoreBufferNumbs() { return this.transientStorePool.remainBufferNumbs();}
复制代码


从上面可以看出来,最底层调用 TransientStorePool#remainBufferNumbs 方法。


public int remainBufferNumbs() {        if (storeConfig.isTransientStorePoolEnable()) {            return availableBuffers.size();        }        return Integer.MAX_VALUE;}
复制代码


如果启用 transientStorePoolEnable 机制,返回当前可用的 ByteBuffer 个数,即整个 isTransientStorePoolDeficient 方法的用意是是否还存在可用的 ByteBuffer,如果不存在,即表示 pageCache 繁忙。


transientStorePoolEnable 的 MappedFile 实现


MappedFile 的 ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer 这两个属性的初始化,两个方法是写消息与查消息操作的直接数据结构。



MappedFile 底层是使用 ByteBuffer 以及 MappedByteBuffer 进行实现的堆外直接内存的写入和读取操作当 RocketMQ 开启了 transientStorePoolEnable,则使用 ByteBuffer.allocateDirect(fileSize),创建(java.nio 的内存映射机制)。如果未开启,则为空。



MappedByteBuffer


MappedByteBuffer 使用 FileChannel#map 方法创建,即真正意义上的 PageCache。


transientStorePoolEnable 的读写分离模式

顺序写模式

RocketMQ 在消息写入时采用的顺序写模式,如果 writerBuffer 不为空,说明开启了 transientStorePoolEnable 机制,则消息首先写入 writerBuffer 中,如果其为空,则写入 mappedByteBuffer 中。


随机读模式

消息读取时,是从 mappedByteBuffer 中读(pageCache),并且采用的随机读的模式。开启 transientStorePoolEnable 机制,有点类似于读写分离的效果,先写入 writerBuffer 中,读却是从 mappedByteBuffer 中读取。



至此,我们基本分析了对应的出现的大概出现了([REJECTREQUEST]system busy, start flow control for a while)的原因了,因为要想解决 OSPageCacheBusy 锁的问题。我们尽可能减少 OsPageCache 锁以及读写冲突的问题,所以就是使用 transientStorePoolEnable 进行读写分离,从而减少对应的冲突问题。

开启 transientStorePoolEnable

我们先来对比以下两种模式的基底原理有什么不同?

mmap+PageCache 的模式

读写消息都走的是 pageCache,这样子读写都在 pagecache 里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁(锁超时)、并发冲突的问题。


transientStorePoolEnable 的模式

开启该模式的效果,从而使用了 DirectByteBuffer(堆外内存)+PageCache 的叠加效果,可以实现读写消息分离。



写入消息时候写到的是 DirectByteBuffer—堆外内存中,读消息走的是 PageCache,对于,DirectByteBuffer 是两步刷盘,一步是刷到 PageCache,之后才是刷到 commitLog 中,带来的好处就是,避免了内存操作所造成的 OsPageCache 锁等待的问题,降低了时延,比如说缺页中断降低,内存加锁。


引发的问题


带来的风险就是 OSPageCache 会保证即使在宕机的时候,也会将数据写入到磁盘中,但是堆外内存,一旦 Broker 宕机了,例如 OOM,还没来的及写入到磁盘(commitLog)中就会丢失。所以当存在高可用场景下,需要考虑该问题。如下图所示。


最终结论

问题:如何避免([REJECTREQUEST]system busy, start flow control for a while)的问题?


  • 开启对应的 transientStorePoolEnable 的配置开启堆外内存模式,实现读写分离,从而减少了直接对 OSPageCache 的读写分离冲突问题。

  • 但是也会带来一个负面问题


本身的 OSPageCache 会保证即使在宕机的时候,也会将数据写入到磁盘中,但是如果开启了 transientStorePoolEnable 的堆外内存,一旦 Broker 宕机了,例如 OOM,还没来的及写入到磁盘(commitLog)中就会丢失。

发布于: 刚刚阅读数: 3
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【深度挖掘RocketMQ底层源码】「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]system busy, start flow control for a while)_RocketMQ_洛神灬殇_InfoQ写作社区