写点什么

🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」Broker 配置介绍及发送流程、异常 (XX Busy) 问题分析

作者:浩宇天尚
  • 2021 年 12 月 10 日
  • 本文字数:7294 字

    阅读完需:约 24 分钟

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析

参考资料

  • Rocketmq 官网:http://rocketmq.apache.org/

  • Rocketmq 的其它项目:https://github.com/apache/rocketmq-externals

  • Rocketmq-console 安装:https://blog.csdn.net/zzzgd_666/article/details/81387237

RocketMQ 的参数指南

NameServer 配置属性

#broker名字,注意此处不同的配置文件填写的不一样brokerClusterName=rocketmqclusterbrokerName=broker-a#0 表示 Master, >0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#这个配置可解决双网卡,发送消息走外网的问题,这里配上内网ip就可以了brokerIP1=10.30.51.149#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=8#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=false#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 0点deleteWhen=03#文件保留时间,默认 48 小时fileReservedTime=48#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=1000000destroyMapedFileIntervalForcibly=120000redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/app/data/rocketmq/data#commitLog 存储路径storePathCommitLog=/app/data/rocketmq/data/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue#消息索引存储路径storePathIndex=/app/data/rocketmq/data/index#checkpoint 文件存储路径storeCheckpoint=/app/data/rocketmq/data/checkpoint#abort 文件存储路径abortFile=/app/data/rocketmq/data/abort#限制的消息大小 修改为16MmaxMessageSize=‭16777216‬#发送队列等待时间waitTimeMillsInSendQueue=3000osPageCacheBusyTimeOutMills=5000flushCommitLogLeastPages=12flushConsumeQueueLeastPages=6flushCommitLogThoroughInterval=30000flushConsumeQueueThoroughInterval=180000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=ASYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=ASYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量sendMessageThreadPoolNums=80#拉消息线程池数量pullMessageThreadPoolNums=128useReentrantLockWhenPutMessage=true
复制代码

Rocketmq 发送控制流程


针对前 4 种 broker busy ,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的 1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker 服务器进行重试。


如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ 提供了内存锁定的功能,其追加性能能得到一定的保障,这样可以做到在内存使用层面的读写分离,即写消息是直接写入堆外内存,消费消息直接从 pagecache 中读,然后定时将堆外内存的消息写入 pagecache。


但这种方案随之带来的就是可能存在消息丢失,如果对消息非常严谨的话,建议扩容集群,或迁移 topic 到新的集群。


可以看出来,抛出这种错误,在 broker 还没有发送“严重”的 pagecache 繁忙,即消息追加到内存中的最大时延没有超过 1s,通常追加是很快的,绝大部分都会低于 1ms,但可能会由于出现一个超过 200ms 的追加时间,导致排队中的任务等待时间超过了 200ms,则此时会触发 broker 端的快速失败,让请求快速失败,便于客户端快速重试。但是这种请求并不是实时的,而是每隔 10s 检查一遍。


值得注意的是,一旦出现 TIMEOUT_CLEAN_QUEUE,可能在一个点会有多个这样的错误信息,具体多少与当前积压在待发送队列中的个数有关。

Rocketmq 发送时异常

system busy 和 broker busy 解决方案


  • [REJECTREQUEST]system busy too many requests and system thread pool busy

  • [PC_SYNCHRONIZED]broker busy

  • [PCBUSY_CLEAN_QUEUE]broker busy

  • [TIMEOUT_CLEAN_QUEUE]broker busy


之前写的解决方案,都是基于测试环境测试的.到生产环境之后,正常使用没有问题,生产环境压测时,又出现了 system busy 异常(简直崩溃)


com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)  at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)  at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)  at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)  at   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)  at java.lang.Thread.run(Thread.java:748)
复制代码

报错定位

  • cleanExpiredRequestInQueue 会处理发送消息、拉取消息、心跳、事务消息队列中的数据,此次遇到的问题是发送 Topic 消息报出来的错误,所以接下来针对发送消息流程进行分析。

  • 报出此错误的源码位置为 broker 快速失败机制 BrokerFastFailure.java 类(该类在 Broker 启动时会启动一个定时任务,每 10 毫秒执行一次),报错位置代码如下:


void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {        while (true) {            try {                if (!blockingQueue.isEmpty()) {                    // 获取队列头元素                    final Runnable runnable = blockingQueue.peek();                    if (null == runnable) {                        break;                    }                    final RequestTask rt = castRunnable(runnable);                    if (rt == null || rt.isStopRun()) {                        break;                    }
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp(); // 如果头元素对应的任务处理时间超过设置的最大等待时间,则处理请求返回该错误,并移除掉该任务 if (behind >= maxWaitTimeMillsInQueue) { if (blockingQueue.remove(runnable)) { rt.setStopRun(true); rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size())); } } else { break; } } else { break; } } catch (Throwable ignored) { } } }
复制代码


这段代码是 Broker 快速失败机制的核心代码,如果一个等待队列的头元素(也就是第一个要处理或者正在处理的元素)等待时间超过该队列设置的最大等待时间,则丢弃该元素对象的任务,并对这个请求返回[TIMEOUT_CLEAN_QUEUE]broker busy 异常信息。

发送 Topic 消息报该错误

sendThreadPoolQueue 取出头元素,转换成对应的任务,判断任务在队列存活时间是否超过了队列设置的最大等待时间,如果超过了则组装处理返回对象 response,response 的 code 为 RemotingSysResponseCode.SYSTEM_BUSY,内容为:


[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [当前任务在队列存活时间], size of queue: [当前队列的长度]
复制代码


MQClientAPIImpl.processSendResponse 处理返回 response,根据 response.getCode()的处理分支,最终返回 MQBrokerException 异常,response 分支处理代码如下:


// 只有ResponseCode.SUCCESS的情况下返回结果,其他情况抛出MQBrokerException异常private SendResult processSendResponse(        final String brokerName,        final Message msg,        final RemotingCommand response    ) throws MQBrokerException, RemotingCommandException {        switch (response.getCode()) {            case ResponseCode.FLUSH_DISK_TIMEOUT:            case ResponseCode.FLUSH_SLAVE_TIMEOUT:            case ResponseCode.SLAVE_NOT_AVAILABLE: {            }            case ResponseCode.SUCCESS: {                // 省略部分代码                return sendResult;            }            default:                break;        }        throw new MQBrokerException(response.getCode(), response.getRemark());    }
复制代码


消息发送客户端接收到 MQBrokerException 异常信息,捕获异常处理中不符合消息重试逻辑,直接抛出该异常,也就是用户看到的;// timesTotal 为消息生产者设置的发送失败重试次数


for (; times < timesTotal; times++) {                String lastBrokerName = null == mq ? null : mq.getBrokerName();                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);                if (mqSelected != null) {                    mq = mqSelected;                    brokersSent[times] = mq.getBrokerName();                    try {                        // 省略部分代码                    } catch (RemotingException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQClientException e) {                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        continue;                    } catch (MQBrokerException e) {                        // 此处为MQBrokerException异常处理逻辑,RemotingSysResponseCode.SYSTEM_BUSY不符合分支条件,最终throw e抛出异常                        endTimestamp = System.currentTimeMillis();                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                        log.warn(msg.toString());                        exception = e;                        switch (e.getResponseCode()) {                            case ResponseCode.TOPIC_NOT_EXIST:                            case ResponseCode.SERVICE_NOT_AVAILABLE:                            case ResponseCode.SYSTEM_ERROR:                            case ResponseCode.NO_PERMISSION:                            case ResponseCode.NO_BUYER_ID:                            case ResponseCode.NOT_IN_CURRENT_UNIT:                                continue;                            default:                                if (sendResult != null) {                                    return sendResult;                                }
throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString());
log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } }
复制代码

生产环境各种参数:

  • broker busy 异常: 可通过增大 waitTimeMillsInSendQueue 解决

  • system busy 异常:可通过增大 osPageCacheBusyTimeOutMills 解决


#发送队列等待时间waitTimeMillsInSendQueue=3000#系统页面缓存繁忙超时时间(翻译),默认值 1000osPageCacheBusyTimeOutMills=5000
复制代码

出现问题分析

出现异常的原因是因为我们同一台服务器部署的多个应用造成的。我们一台服务器上部署了 三个 ES、八个 redis、一个 rocketmq ,压力测试时这些都在使用,虽然 cpu、内存都还有很大剩余,但是磁盘 io 和内存频率毕竟只有那么多可能已经占满,或者还有其他都会有影响。


之前测试环境测试其他东西时,发现 mq 和 redis 同时大量使用时,redis 速度会降低三到四倍,由此可见应用分服务器部署的重要性。以前知道会有影响,没想到影响这么大。


最终结解决方案:应该给 rocketmq 单独部署性能较高的服务器.


记一次 rocketmq 使用时的异常。

问题分析总结
  1. system busy , start flow control for a while


该异常会造成 消息丢失。


  1. broker busy , start flow control for a while


该异常不会造成消息丢失。

问题解决过程

1、最开始时候 ,测试发现在性能好的服务器上只会出现 system busy,也就是说出现异常就会消息丢失。


所以:业务代码进行处理,出现异常就会重发到当前 topic 的 bak 队列,当时想的是既然这个 topic busy 了,就换到另外的 topic 去发,总不能都 busy 吧。也算是临时解决了。


2、发现有消息重复的现象。不用想肯定是报 broker busy 异常,重发到 topic 的 bak 队列了。又因为 broker busy 可能不会造成消息丢失,所以消息重复就出现了。

解决方案 1:

修改 rocketmq 配置文件:


  • 方案一:sendMessageThreadPoolNums 改成 1 ,没有的话新增一行。sendMessageThreadPoolNums=1

  • 方案二:useReentrantLockWhenPutMessage 改成 true,没有的话新增一行。


sendMessageThreadPoolNums=32useReentrantLockWhenPutMessage=true
复制代码


sendMessageThreadPoolNums 这个属性是发送线程池大小, rocketmq4.1 版本之后默认为 1,之前版本默认什么不知道但是肯定大于 1。这个属性改成 1 的话,就不用管 useReentrantLockWhenPutMessage 这个属性了;


如果改成大于 1,就需要将 useReentrantLockWhenPutMessage 这个属性设置为 true;


目前测试 未发现这两个方案有什么区别,sendMessageThreadPoolNums=1 时也支持多线程发送,发送速度感觉和 sendMessageThreadPoolNums 大于 1 没有区别,都能跑满 100M 的网卡。


感觉如果 useReentrantLockWhenPutMessage=true 的时候,就是打开锁,然后关键代码其实还是单线程处理;

解决方案 2:
  1. 业务逻辑处理中进行异常捕获,如果捕获到异常为 MQBrokerException 并且 responseCode 为 2 则重发消息;

  2. 修改 broker 的默认发送消息任务队列等待时长 waitTimeMillsInSendQueue(单位: 毫秒);


除此之外,还可以观察报错时磁盘的 IO 情况,出现这种错误很有可能是当时的磁盘 IO 很高,导致消息落盘时间变长。

参考资料

  • http://thinkinjava.cn/2019/05/08/2019/2019-05-09-rmq-busy-exp/

  • https://www.cnblogs.com/zhyg/p/10255656.html

  • https://blog.csdn.net/prestigeding/article/details/92800672

发布于: 10 小时前阅读数: 12
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“ 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、APM专题及微服务/分布式体系等

评论

发布
暂无评论
🏆【Alibaba中间件技术系列】「RocketMQ技术专题」Broker配置介绍及发送流程、异常(XX Busy)问题分析