写点什么

面试官:谈谈 RabbitMQ 的队头阻塞问题?

作者:王磊
  • 2025-02-18
    陕西
  • 本文字数:2113 字

    阅读完需:约 7 分钟

面试官:谈谈RabbitMQ的队头阻塞问题?

RabbitMQ 延迟消息的队头阻塞问题是指,在使用死信队列(DLX)和 TTL(消息过期时间)实现延迟消息时,由于队列的先进先出(FIFO)特性,在队列头部消息未过期的情况下,即使后续消息已经过期也不能及时处理的情况

实现原理

RabbitMQ 延迟消息的实现方式有以下两种:


  1. 死信队列+TTL

  2. 使用 rabbitmq-delayed-message-exchange 插件


而我们本文要讨论的“RabbitMQ 延迟消息的队头阻塞问题”只会发生在死信队列+TTL 的实现方式中。


死信队列 + TTL 的实现流程如下:



  1. 生产者先将设置了 TTL(过期时间)的消息发送到普通队列。

  2. 普通队列没有消息者,所以一定会过期,消息过期之后就会发送到死信队列。

  3. 消费者订阅死信队列获取消息,并执行延迟任务。

代码实现

死信队列 + TTL 在 Spring Boot 项目中的实现代码如下。


  1. 定义死信交换器(DLX)和死信队列


// Spring Boot 配置示例@Configurationpublic class RabbitConfig {    // 定义死信交换器    @Bean    public DirectExchange dlxExchange() {        return new DirectExchange("dlx.exchange");    }
// 定义死信队列 @Bean public Queue dlxQueue() { return new Queue("dlx.queue"); }
// 绑定死信队列到 DLX @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key"); }
// 定义普通队列,设置死信交换器和路由键 @Bean public Queue mainQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 可选:设置队列级别的 TTL(所有消息统一过期时间) args.put("x-message-ttl", 10000); // 10秒 return new Queue("main.queue", true, false, false, args); }
// 主队列绑定到默认交换器(根据需要调整) @Bean public Binding mainBinding() { return BindingBuilder.bind(mainQueue()).to(new DirectExchange("default.exchange")).with("main.routing.key"); }}
复制代码


  1. 发送消息时设置 TTL(消息级别)


// 发送延迟消息(消息级别 TTL)public void sendDelayedMessage(String message, int delayMs) {    rabbitTemplate.convertAndSend("default.exchange", "main.routing.key", message, msg -> {        // 设置消息过期时间(覆盖队列级别的 TTL)        msg.getMessageProperties().setExpiration(String.valueOf(delayMs));        return msg;    });}
复制代码


  1. 消费者监听死信队列


@RabbitListener(queues = "dlx.queue")public void handleDelayedMessage(String message) {System.out.println("处理延迟消息: " + message);}
复制代码


所以说消息的过期时间 TTL 的设置方式有以下两种:


  1. 队列级别:通过设置队列的 x-message-ttl 参数,设置队列统一的过期时间。


Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 设置队列消息过期时间为 60 秒channel.queueDeclare(queueName, true, false, false, args);
复制代码


  1. 消息级别:通过给每个消息设置 expiration 属性,为每个消息设置过期时间。


AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()        .deliveryMode(2) // 消息持久化        .expiration("60000") // 设置消息过期时间为 60 秒        .build();channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
复制代码


如果同时设置了消息级 TTL 和队列级 TTL,消息的实际过期时间会取两者中的最小值。

造成队头阻塞的原因

造成队头阻塞的原因有以下两个:


  1. 先进先出的队列特性:队列中的消息必须按顺序处理,即使后面的消息 TTL 较短且已过期,也必须等待队头的消息先被处理(或过期)。

  2. TTL 检查机制:RabbitMQ 默认仅在处理队头消息时检查其 TTL,如果队头消息的 TTL 较长(例如 10 分钟),即使后续消息的 TTL 更短(例如 1 分钟),这些消息也会被阻塞,直到队头消息过期或被移除。


如下图所示:


解决方案

  1. 为不同延迟时间创建独立队列:将相同 TTL 的消息放入同一队列,避免消息的过期时间不一致。

  2. 使用延迟插件:使用 RabbitMQ 的延迟插件 rabbitmq_delayed_message_exchange,直接通过延迟交换机实现延迟消息,绕过死信队列的 FIFO 限制。延迟插件是通过将消息存储到内置数据库 Mnesia,再通过不断判断过期消息,实现延迟消息的投递和执行的,因此它不存在队列的先进先出和队头阻塞的问题。

小结

队头阻塞问题是发生在使用死信队列加 TTL 实现 RabbitMQ 延迟消息的场景中,造成的原因是队列先进先出的特性,加上延迟消息的检查机制导致的,我们可以使用 RabbitMQ 的延迟插件来避免此问题。


那么问题来了,使用延迟插件如何实现延迟任务?它和死信队列的实现方式有哪些具体的区别呢?


本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:场景题、并发编程、MySQL、Redis、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、JVM、设计模式、消息队列等模块。

用户头像

王磊

关注

javacn.site 2018-08-25 加入

我的小站:javacn.site

评论

发布
暂无评论
面试官:谈谈RabbitMQ的队头阻塞问题?_王磊_InfoQ写作社区