参考资料
Rocketmq 官网:http://rocketmq.apache.org/
Rocketmq 的其它项目:https://github.com/apache/rocketmq-externals
Rocketmq-console 安装:https://blog.csdn.net/zzzgd_666/article/details/81387237
RocketMQ 的参数指南
NameServer 配置属性
#broker名字,注意此处不同的配置文件填写的不一样
brokerClusterName=rocketmqcluster
brokerName=broker-a
#0 表示 Master, >0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#这个配置可解决双网卡,发送消息走外网的问题,这里配上内网ip就可以了
brokerIP1=10.30.51.149
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=8
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=false
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 0点
deleteWhen=03
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=1000000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/app/data/rocketmq/data
#commitLog 存储路径
storePathCommitLog=/app/data/rocketmq/data/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/app/data/rocketmq/data/consumerqueue
#消息索引存储路径
storePathIndex=/app/data/rocketmq/data/index
#checkpoint 文件存储路径
storeCheckpoint=/app/data/rocketmq/data/checkpoint
#abort 文件存储路径
abortFile=/app/data/rocketmq/data/abort
#限制的消息大小 修改为16M
maxMessageSize=16777216
#发送队列等待时间
waitTimeMillsInSendQueue=3000
osPageCacheBusyTimeOutMills=5000
flushCommitLogLeastPages=12
flushConsumeQueueLeastPages=6
flushCommitLogThoroughInterval=30000
flushConsumeQueueThoroughInterval=180000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
sendMessageThreadPoolNums=80
#拉消息线程池数量
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
复制代码
Rocketmq 发送控制流程
针对前 4 种 broker busy ,主要是由于 Broker 在追加消息时持有的锁时间超过了设置的 1s,Broker 为了自我保护,会抛出错误,客户端会选择其他 broker 服务器进行重试。
如果对不是金融级服务,建议将 transientStorePoolEnable = true,可以有效避免前面 4 种 broker ,因为开启这个参数,消息首先会存储在堆外内存中,并且 RocketMQ 提供了内存锁定的功能,其追加性能能得到一定的保障,这样可以做到在内存使用层面的读写分离,即写消息是直接写入堆外内存,消费消息直接从 pagecache 中读,然后定时将堆外内存的消息写入 pagecache。
但这种方案随之带来的就是可能存在消息丢失,如果对消息非常严谨的话,建议扩容集群,或迁移 topic 到新的集群。
可以看出来,抛出这种错误,在 broker 还没有发送“严重”的 pagecache 繁忙,即消息追加到内存中的最大时延没有超过 1s,通常追加是很快的,绝大部分都会低于 1ms,但可能会由于出现一个超过 200ms 的追加时间,导致排队中的任务等待时间超过了 200ms,则此时会触发 broker 端的快速失败,让请求快速失败,便于客户端快速重试。但是这种请求并不是实时的,而是每隔 10s 检查一遍。
值得注意的是,一旦出现 TIMEOUT_CLEAN_QUEUE,可能在一个点会有多个这样的错误信息,具体多少与当前积压在待发送队列中的个数有关。
Rocketmq 发送时异常
system busy 和 broker busy 解决方案
[REJECTREQUEST]system busy too many requests and system thread pool busy
[PC_SYNCHRONIZED]broker busy
[PCBUSY_CLEAN_QUEUE]broker busy
[TIMEOUT_CLEAN_QUEUE]broker busy
之前写的解决方案,都是基于测试环境测试的.到生产环境之后,正常使用没有问题,生产环境压测时,又出现了 system busy 异常(简直崩溃)
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 2 DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 208ms, size of queue: 8
For more information, please visit the url, http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:455)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:272)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:253)
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:215)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:671)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:440)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1030)
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:989)
at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90)
at
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
复制代码
报错定位
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
if (!blockingQueue.isEmpty()) {
// 获取队列头元素
final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
final RequestTask rt = castRunnable(runnable);
if (rt == null || rt.isStopRun()) {
break;
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
// 如果头元素对应的任务处理时间超过设置的最大等待时间,则处理请求返回该错误,并移除掉该任务
if (behind >= maxWaitTimeMillsInQueue) {
if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
}
} else {
break;
}
} catch (Throwable ignored) {
}
}
}
复制代码
这段代码是 Broker 快速失败机制的核心代码,如果一个等待队列的头元素(也就是第一个要处理或者正在处理的元素)等待时间超过该队列设置的最大等待时间,则丢弃该元素对象的任务,并对这个请求返回[TIMEOUT_CLEAN_QUEUE]broker busy 异常信息。
发送 Topic 消息报该错误
sendThreadPoolQueue 取出头元素,转换成对应的任务,判断任务在队列存活时间是否超过了队列设置的最大等待时间,如果超过了则组装处理返回对象 response,response 的 code 为 RemotingSysResponseCode.SYSTEM_BUSY,内容为:
[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: [当前任务在队列存活时间], size of queue: [当前队列的长度]
复制代码
MQClientAPIImpl.processSendResponse 处理返回 response,根据 response.getCode()的处理分支,最终返回 MQBrokerException 异常,response 分支处理代码如下:
// 只有ResponseCode.SUCCESS的情况下返回结果,其他情况抛出MQBrokerException异常
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
case ResponseCode.SLAVE_NOT_AVAILABLE: {
}
case ResponseCode.SUCCESS: {
// 省略部分代码
return sendResult;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
复制代码
消息发送客户端接收到 MQBrokerException 异常信息,捕获异常处理中不符合消息重试逻辑,直接抛出该异常,也就是用户看到的;// timesTotal 为消息生产者设置的发送失败重试次数
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
// 省略部分代码
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
// 此处为MQBrokerException异常处理逻辑,RemotingSysResponseCode.SYSTEM_BUSY不符合分支条件,最终throw e抛出异常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
复制代码
生产环境各种参数:
#发送队列等待时间
waitTimeMillsInSendQueue=3000
#系统页面缓存繁忙超时时间(翻译),默认值 1000
osPageCacheBusyTimeOutMills=5000
复制代码
出现问题分析
出现异常的原因是因为我们同一台服务器部署的多个应用造成的。我们一台服务器上部署了 三个 ES、八个 redis、一个 rocketmq ,压力测试时这些都在使用,虽然 cpu、内存都还有很大剩余,但是磁盘 io 和内存频率毕竟只有那么多可能已经占满,或者还有其他都会有影响。
之前测试环境测试其他东西时,发现 mq 和 redis 同时大量使用时,redis 速度会降低三到四倍,由此可见应用分服务器部署的重要性。以前知道会有影响,没想到影响这么大。
最终结解决方案:应该给 rocketmq 单独部署性能较高的服务器.
记一次 rocketmq 使用时的异常。
问题分析总结
system busy , start flow control for a while
该异常会造成 消息丢失。
broker busy , start flow control for a while
该异常不会造成消息丢失。
问题解决过程
1、最开始时候 ,测试发现在性能好的服务器上只会出现 system busy,也就是说出现异常就会消息丢失。
所以:业务代码进行处理,出现异常就会重发到当前 topic 的 bak 队列,当时想的是既然这个 topic busy 了,就换到另外的 topic 去发,总不能都 busy 吧。也算是临时解决了。
2、发现有消息重复的现象。不用想肯定是报 broker busy 异常,重发到 topic 的 bak 队列了。又因为 broker busy 可能不会造成消息丢失,所以消息重复就出现了。
解决方案 1:
修改 rocketmq 配置文件:
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true
复制代码
sendMessageThreadPoolNums 这个属性是发送线程池大小, rocketmq4.1 版本之后默认为 1,之前版本默认什么不知道但是肯定大于 1。这个属性改成 1 的话,就不用管 useReentrantLockWhenPutMessage 这个属性了;
如果改成大于 1,就需要将 useReentrantLockWhenPutMessage 这个属性设置为 true;
目前测试 未发现这两个方案有什么区别,sendMessageThreadPoolNums=1 时也支持多线程发送,发送速度感觉和 sendMessageThreadPoolNums 大于 1 没有区别,都能跑满 100M 的网卡。
感觉如果 useReentrantLockWhenPutMessage=true 的时候,就是打开锁,然后关键代码其实还是单线程处理;
解决方案 2:
业务逻辑处理中进行异常捕获,如果捕获到异常为 MQBrokerException 并且 responseCode 为 2 则重发消息;
修改 broker 的默认发送消息任务队列等待时长 waitTimeMillsInSendQueue(单位: 毫秒);
除此之外,还可以观察报错时磁盘的 IO 情况,出现这种错误很有可能是当时的磁盘 IO 很高,导致消息落盘时间变长。
参考资料
http://thinkinjava.cn/2019/05/08/2019/2019-05-09-rmq-busy-exp/
https://www.cnblogs.com/zhyg/p/10255656.html
https://blog.csdn.net/prestigeding/article/details/92800672
评论