写点什么

RabbitMQ 如何实现延迟队列?

作者:王磊
  • 2023-09-05
    陕西
  • 本文字数:2546 字

    阅读完需:约 8 分钟

RabbitMQ 如何实现延迟队列?

延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种:


  1. 未按时支付的订单,30 分钟过期之后取消订单。

  2. 给活跃度比较低的用户间隔 N 天之后推送消息,提高活跃度。

  3. 新注册会员的用户,等待几分钟之后发送欢迎邮件等。

1.如何实现延迟队列?

延迟队列有以下两种实现方式:


  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;

  2. 使用官方提供的延迟插件实现延迟功能。


早期,大部分公司都会采用第一种方式,而随着 RabbitMQ 3.5.7(2015 年底发布)的延迟插件的发布,因为其使用更简单、更方便,所以它现在才是大家普通会采用的,实现延迟队列的方式,所以本文也只讲第二种方式。

2.实现延迟队列

2.1 安装并启动延迟队列

2.1.1 下载延迟插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases


注意:需要根据你自己的 RabbitMQ 服务器端版本选择相同版本的延迟插件,可以在 RabbitMQ 控制台查看:



2.1.2 将插件放到插件目录

接下来,将上一步下载的插件放到 RabbitMQ 服务器安装目录,如果是 docker,使用一下命令复制:


docker cp 宿主机文件 容器名称或 ID:容器目录


如下图所示:



之后,进入 docker 容器,查看插件中是否包含延迟队列:


docker exec -it 容器名称或 ID /bin/bashrabbitmq-plugins list


如下图所示:


2.1.3 启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange


如下图所示:


2.1.4 重启 RabbitMQ 服务

安装完 RabbitMQ 插件之后,需要重启 RabbitMQ 服务才能生效。如果使用的是 Docker,只需要重启 Docker 容器即可:


docker restart 容器名称或 ID


如下图所示:


2.1.5 验收结果

在 RabbitMQ 控制台查看,新建交换机时是否有延迟消息选项,如果有就说明延迟消息插件已经正常运行了,如下图所示:


2.1.6 手动创建延迟交换器(可选)

此步骤可选(非必须),因为某些版本下通过程序创建延迟交换器可能会出错,如果出错了,手动创建延迟队列即可,如下图所示:


2.2 编写延迟消息实现代码

2.2.1 配置交换器和队列

import org.springframework.context.annotation.Configuration;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;
/** * 延迟交换器和队列 */@Configurationpublic class DelayedExchangeConfig { public static final String EXCHANGE_NAME = "myDelayedExchange"; public static final String QUEUE_NAME = "delayed.queue"; public static final String ROUTING_KEY = "delayed.routing.key";
@Bean public CustomExchange delayedExchange() { return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", // 消息类型 true, // 是否持久化 false); // 是否自动删除 }
@Bean public Queue delayedQueue() { return QueueBuilder.durable(QUEUE_NAME) .withArgument("x-delayed-type", "direct") .build(); }
@Bean public Binding delayedBinding(Queue delayedQueue,CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); }}
复制代码

2.1.2 定义消息发送方法

import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;
@Componentpublic class DelayedMessageProducer {
@Autowired private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 5000) public void sendDelayedMessage(String message) { rabbitTemplate.convertAndSend(DelayedExchangeConfig.EXCHANGE_NAME, DelayedExchangeConfig.ROUTING_KEY, message, messagePostProcessor -> { messagePostProcessor.getMessageProperties().setDelay(10000); // 设置延迟时间,单位毫秒 return messagePostProcessor; }); }}
复制代码

2.1.3 发送延迟消息

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
@RestController@RequestMapping("/delayed")public class DelayedMessageController { @Autowired private DelayedMessageProducer delayedMessageProducer;
@GetMapping("/send") public String sendDirectMessage(@RequestParam String message) { delayedMessageProducer.sendDelayedMessage(message); return "Delayed message sent to Exchange: " + message; }}
复制代码

2.1.4 接收延迟消息

import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;

@Componentpublic class DelayedMessageConsumer {
@RabbitListener(queues = DelayedExchangeConfig.QUEUE_NAME) public void receiveDelayedMessage(String message) { System.out.println("Received delayed message: " + message); }}
复制代码


PS:获取本文延迟队列的实现 Demo,请加我:GG_Stone【备注:延迟队列】

小结

实现 RabbitMQ 延迟队列目前主流的实现方式,是采用官方提供的延迟插件来实现。而延迟插件需要先下载插件、然后配置并重启 RabbitMQ 服务,之后就可以通过编写代码的方式实现延迟队列了。


本文已收录到我的面试小站 www.javacn.site,其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。

用户头像

王磊

关注

公众号:Java中文社群 2018-08-25 加入

公众号:Java中文社群

评论

发布
暂无评论
RabbitMQ 如何实现延迟队列?_Java_王磊_InfoQ写作社区