写点什么

记一次 RabbitMQ 消费者莫名消失问题的排查

作者:EquatorCoco
  • 2024-09-23
    福建
  • 本文字数:10201 字

    阅读完需:约 33 分钟

问题回顾


某天下午,生产监控告警:消息积压,队列 xxx 消息数超过 100;我第一时间想到的是应用服务是不是停了,但应用服务存活监控又没有告警,但我还是找值班运维同事帮忙确认了下,确认结果是服务的 6 个节点都是存活的,然后我又让运维确认了下队列的消费者情况,结果发现消费者列表中只有 2 个节点的消费者,其他 4 个节点的消费者不见了,所以消息消费不过来,导致了消息积压!


所以问题来了


那 4 个节点的注册的消费者为何消失?


但当务之急是解决消息积压的问题,所以让运维重启那 4 个节点的服务,消费者重新注册上,消息得以快速消费,消息积压告警得以恢复


生产问题虽暂时得以解决,但未找到根因,还是存在复发风险;下面就请大家跟随我的脚本,来看看我是如何排查的


问题排查


直接查 ERROR 级别的日志,很容易就能就找到了关键日志


Consumer thread error, thread abort.


以及异常堆栈


java.lang.OutOfMemoryError: Requested array size exceeds VM limit	at java.lang.StringCoding$StringEncoder.encode(StringCoding.java:300)	at java.lang.StringCoding.encode(StringCoding.java:344)	at java.lang.String.getBytes(String.java:918)    ...
复制代码


Consumer thread error, thread abort 大家能看懂吧,就是字面意思


消费者线程错误,线程中止


消费者线程就是我们前面提到的队列消费者,一个队列消费者就是一个消费者线程,消费者线程中止那就意味着队列消费者中止,也就对应文章标题中的 消费者消失;是不是离真相越来越近了?


OutOfMemoryError 是不是很熟悉,内存溢出嘛


OutOfMemoryError 表示 Java 虚拟机在堆内存中没有足够的空间来分配对象


问你们一个问题:OOM 一定会导致 JVM 退出吗,这个问题你们先思考,后面答案会揭晓


回到正题,从关键日志以及异常堆栈,我们是不是可以得出以下推测


OOM 会导致消费者线程中止


有了推测,那就去验证呗;我先给大家模拟下案例,基于 SpringBootpom.xml 关键依赖


<dependencies>    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-amqp</artifactId>    </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency></dependencies>
复制代码


配置文件 application.yml


server:  port: 8088spring:  rabbitmq:    host: 192.168.2.118    port: 5672    username: admin    password: admin    virtual-host: /    listener:      simple:        acknowledge-mode: manual #设置确认模式手工确认        concurrency: 3 #消费者个数,线程个数        prefetch: 1
复制代码


RabbitMQ 配置 TaskRabbitConfig.java


package com.qsl.rabbit.config;
import com.qsl.rabbit.constant.Constant;import com.qsl.rabbit.listener.TaskMessageListener;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;
/** * @author: 青石路 */@Configurationpublic class TaskRabbitConfig {
@Value("${spring.rabbitmq.listener.simple.concurrency:3}") private int concurrency; @Value("${spring.rabbitmq.listener.simple.prefetch:1}") private int prefetch;
@Bean public DirectExchange taskExchange() { return new DirectExchange(Constant.TASK_EXCHANGE, true, false); }
@Bean public Queue taskQueue() { return new Queue(Constant.TASK_QUEUE, true, false, false); }
@Bean public Binding bindingTaskQueue() { return BindingBuilder.bind(taskQueue()).to(taskExchange()).with(Constant.TASK_QUEUE); }
@Bean public SimpleMessageListenerContainer taskMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //设置确认模式手工确认 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setQueueNames(Constant.TASK_QUEUE); //消费者个数,线程个数 container.setConcurrentConsumers(concurrency); //设置预处理个数 container.setPrefetchCount(prefetch);
container.setMessageListener(new TaskMessageListener()); return container; }}
复制代码


消息监听器 TaskMessageListener.java


/** * @author: 青石路 */@Slf4jpublic class TaskMessageListener implements ChannelAwareMessageListener {
@Override public void onMessage(Message message, Channel channel) { String content = new String(message.getBody(), StandardCharsets.UTF_8); log.info("消费者接收到消息:{}", content); handleTask(content); try { // 手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error("消息确认失败,异常:", e); } }
private void handleTask(String message) { try { // 业务处理 log.info("处理任务:{}", message); log.info("任务处理完成"); } catch (Exception e) { log.error("处理任务失败,异常:", e); } }}
复制代码


业务处理的时候进行 Exception 捕获,并且手动确认消息,我相信你们平时都是这么用的,这难道有什么问题?我调整下 handleTask 方法


/** * 业务处理 * @param message 消息内容 * @author: 青石路 */private void handleTask(String message) {    try {        // 业务处理        log.info("处理任务:{}", message);        int i = 3 / (message.length() % 10);        if (i == 1) {            throw new OutOfMemoryError("模拟内存溢出");        }        log.info("任务处理结果:{}", i);    } catch (Exception e) {        log.error("处理任务失败,异常:", e);    }}
复制代码


启动服务后,队列消费者情况如下



发送消息 a,日志输出如下


2024-09-22 20:15:55|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:a2024-09-22 20:15:55|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:a2024-09-22 20:15:55|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|42|任务处理结果:3
复制代码


相当于业务正常处理;我们再发送消息 abcdefghij,日志输出如下


2024-09-22 20:17:45|taskMessageListenerContainer-3|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:abcdefghij2024-09-22 20:17:45|taskMessageListenerContainer-3|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:abcdefghij2024-09-22 20:17:45|taskMessageListenerContainer-3|com.qsl.rabbit.listener.TaskMessageListener|ERROR|44|处理任务失败,异常:java.lang.ArithmeticException: / by zero	at com.qsl.rabbit.listener.TaskMessageListener.handleTask(TaskMessageListener.java:38)	at com.qsl.rabbit.listener.TaskMessageListener.onMessage(TaskMessageListener.java:21)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)	at java.lang.Thread.run(Thread.java:748)
复制代码


被除数为 0,出现 ArithmeticException



相当于业务处理出现了 Exception,而我们进行了 catch,所以日志打印也符合我们的代码逻辑,也不会对消费者线程造成影响,队列消费者还是最初的那 3 个



我们发送消息 ab,日志输出如下


2024-09-22 20:36:31|taskMessageListenerContainer-1|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:ab2024-09-22 20:36:31|taskMessageListenerContainer-1|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:ab2024-09-22 20:36:31|taskMessageListenerContainer-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|ERROR|1268|Consumer thread error, thread abort.java.lang.OutOfMemoryError: 模拟内存溢出	at com.qsl.rabbit.listener.TaskMessageListener.handleTask(TaskMessageListener.java:40)	at com.qsl.rabbit.listener.TaskMessageListener.onMessage(TaskMessageListener.java:21)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)	at java.lang.Thread.run(Thread.java:748)2024-09-22 20:36:31|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:ab2024-09-22 20:36:31|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:ab2024-09-22 20:36:31|taskMessageListenerContainer-2|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|ERROR|1268|Consumer thread error, thread abort.java.lang.OutOfMemoryError: 模拟内存溢出	at com.qsl.rabbit.listener.TaskMessageListener.handleTask(TaskMessageListener.java:40)	at com.qsl.rabbit.listener.TaskMessageListener.onMessage(TaskMessageListener.java:21)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)	at java.lang.Thread.run(Thread.java:748)2024-09-22 20:36:31|taskMessageListenerContainer-3|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:ab2024-09-22 20:36:31|taskMessageListenerContainer-3|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:ab2024-09-22 20:36:31|taskMessageListenerContainer-3|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|ERROR|1268|Consumer thread error, thread abort.java.lang.OutOfMemoryError: 模拟内存溢出	at com.qsl.rabbit.listener.TaskMessageListener.handleTask(TaskMessageListener.java:40)	at com.qsl.rabbit.listener.TaskMessageListener.onMessage(TaskMessageListener.java:21)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)	at java.lang.Thread.run(Thread.java:748)2024-09-22 20:36:31|taskMessageListenerContainer-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|ERROR|1415|Stopping container from aborted consumer2024-09-22 20:36:31|taskMessageListenerContainer-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|646|Waiting for workers to finish.2024-09-22 20:36:31|taskMessageListenerContainer-1|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|INFO|649|Successfully waited for workers to finish.
复制代码


可以看到,除了我们的业务日志,还有 spring 的日志;从日志可以看出,消息一共被消费了 3 次,但无一例外,都消费失败了,每次失败日志都包括


Consumer thread error, thread abort.

Stopping container from aborted consumer


我们再去看下队列消费者情况



我们把这个流程捋一捋


消费者线程 taskMessageListenerContainer-1 收到消息,业务处理的时候 OOM 了,Spring 中止该线程,消息未被手动确认,回到队列等待被消费消费者线程 taskMessageListenerContainer-2 收到消息,业务处理的时候又 OOM,Spring 中止该线程,消息未被手动确认,回到队列等待被消费消费者线程 taskMessageListenerContainer-3 收到消息,业务处理的时候扔 OOM,Spring 中止该线程,消息未被手动确认,回到队列等待被消费


全部的 3 个消费者线程都被 Spring 中止了,对应的 3 个队列消费者也就都无了,消息最终回到队列,等待下一个就绪的消费者消费


我们不是 catch 了 Exception 吗,为什么 OutOfMemoryError 还是向上抛给了 Spring ?



OutOfMemoryError 是 Error,并不是 Exception,所以我们的代码并不会捕获 OutOfMemoryError,继续往上抛给了 Spring,而


org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
复制代码


中有这么一段代码


publishConsumerFailedEvent 发布一个消费者失败事件,事件处理器收到该事件后会中止该线程;这里就不展开讲了,后续我再写一篇源码,给你们好好介绍下 Spring 的中止逻辑

至此,OutOfMemoryError 会导致消费者线程中止,大家都清楚了吧;细心的小伙伴可能会有这样的疑问了


照理来说,生产中 6 个节点的消费者线程不应该都被中止吗,为什么还剩 2 个节点的消费者?


这 2 个节点内存比较充足,所以 JVM 的堆内存配置的比较大,它们的消费者线程在处理消息的时候,并不会 OOM;而当天正好是业务人员在进行历史大数据量处理,几轮操作下来,把那 4 个内存比较小的节点的消费者全干没了,只剩下那 2 个内存比较大的节点的消费者了


根因其实是 OutOfMemoryError,当前只知道是


com.fasterxml.jackson.databind.ObjectMapper#writeValueAsString
复制代码


这个方法导致的,具体原因还待进一步排查


问题处理


因为 OutOfMemoryError 的原因没找到,并且是在操作历史大数据量这种很少出现的场景中触发 OutOfMemoryError,也没有导致服务重启,所以暂定方式是将 ERROR 也捕获


/** * 业务处理 * @param message 消息内容 * @author: 青石路 */private void handleTask(String message) {    try {        // 业务处理        log.info("处理任务:{}", message);        int i = 3 / (message.length() % 10);        if (i == 1) {            throw new OutOfMemoryError("模拟内存溢出");        }        log.info("任务处理结果:{}", i);    } catch (Exception | Error e) {        log.error("处理任务失败,异常:", e);    }}
复制代码


重新启动服务,继续消费队列中那条未被消费的消息 ab,此时日志输出如下


2024-09-22 21:38:57|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|20|消费者接收到消息:ab2024-09-22 21:38:57|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|INFO|37|处理任务:ab2024-09-22 21:38:57|taskMessageListenerContainer-2|com.qsl.rabbit.listener.TaskMessageListener|ERROR|44|处理任务失败,异常:java.lang.OutOfMemoryError: 模拟内存溢出	at com.qsl.rabbit.listener.TaskMessageListener.handleTask(TaskMessageListener.java:40)	at com.qsl.rabbit.listener.TaskMessageListener.onMessage(TaskMessageListener.java:21)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489)	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:975)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:921)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1296)	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1202)	at java.lang.Thread.run(Thread.java:748)2024-09-22 21:38:57|main|com.qsl.rabbit.RabbitmqApplication|INFO|61|Started RabbitmqApplication in 1.045 seconds (JVM running for 1.515)
复制代码


虽然业务处理仍然失败,但只有符合我们代码逻辑的错误日志输出,并没有 Spring 的错误日志,此时队列消费者情况如下



当然,这只是缓兵之计,最终解决方案还是要分析 OOM 的原因,然后对症下药


总结


1、示例代码:spring-boot-rabbitmq


2、OOM 不一定会导致 JVM 退出,但是 SimpleMessageListenerContainer 会捕获它,然后中止当前线程,对应的队列消费者也就无了



3、业务代码 catch Error 虽说只是缓兵之计,但从健壮性考虑的话,也是一个不错的解决办法


4、但 OOM 的原因还得继续排查,然后对症下药,这才是最终解决之道


文章转载自:青石路

原文链接:https://www.cnblogs.com/youzhibing/p/18426017

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
记一次 RabbitMQ 消费者莫名消失问题的排查_Rabbit MQ_EquatorCoco_InfoQ写作社区