Springboot & RabbitMQ 延时队列的使用
一、问题来源
针对业务需求当 MQ 异步推送数据上链时,消费服务端通过业务主键 ID 查询业务记录时,由于生产业务逻辑处理,在消费者消费上链数据时,生产者未及时进行事务提交,导致消费者无法查询到业务记录,导致上链失败,需要对上链数据进行延迟处理,确认数据处理完成事务提交在进行上链。
二、什么是延迟队列
针对特殊场景需要进行延时处理的消息,放入延时队列中会被延时消费处理,普通消息队列对于加入的消息会及时消费。使用场景以下几个
延时消费:订单类状态超时处理,数据消费延后补发处理;
延时重试:数据消费失败时想进行过段时间重试消费;
三、实现方案
实现上述 2 种消费方式,需要用到 RabbitMQ 种的两个特性 死信交换器(dead letter exchange) 与 Time-To-Live(ttl 过期时间)
设置队列过期时间实现延迟消费
TTL 指过期时间,rabbitmq 可以通过设置队列的过期时间或者消息的过期时间实现延时消费。
x-message-ttl -过期时间;
x-dead-letter-exchange -到期转发的交换机;
x-dead-letter-routing-key -绑定交换机实际消费 Queue;
1.引入相关的 maven 依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
2.配置文件
rabbitMQ 相关配置
spring:
#配置 rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
3.代码实现
DirectRabbitConfig 配置类中: oneExchange() -是用于重新分配队列的交换机; repeatTradeQueue() -是延时消费的实际队列; repeatTradeBinding() -绑定交换机并指定 routing key,队列数据分配是通过 routingKey -> 交换机 -> BindingKey -> 队列 Queue; deadLetterQueue() -死信队列到期时会通过配置的指定交换机参数(x-dead-letter-exchange) 与分配实际消费队列的 routing key(x-dead-letter-routing-key)分配对应的交换机;
DirectRabbitConfig MQ 配置类
package com.easy.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitMq 配置
*/
@Configuration
public class RabbitmqConfig {
/**
* 交换机用于重新分配队列
*
* @return
*/
@Bean
DirectExchange oneExchange() {
return new DirectExchange("exchange");
}
/**
* 用于延时消费的队列
*
* @return
*/
@Bean
public Queue repeatTradeQueue() {
// durable:是否持久化,默认是 false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是 false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于 durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
Queue queue = new Queue("repeatTradeQueue", true, false, false);
return queue;
}
/**
* 绑定交换机并指定 routing key
*
* @return
*/
@Bean
public Binding repeatTradeBinding() {
return BindingBuilder.bind(repeatTradeQueue()).to(oneExchange()).with("repeatTradeQueue");
}
/**
* 配置死信队列
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 3000);
args.put("x-dead-letter-exchange", "exchange");
args.put("x-dead-letter-routing-key", "repeatTradeQueue");
return new Queue("deadLetterQueue", true, false, false, args);
}
}
RabbitProducer 生产者
通过死信队列 发送方法,向队列添加消息
package com.easy.rabbitmq.util;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 生产者
*/
@Component
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 死信队列 消息发送
* @param msg
*/
public void deadLetterSend(String msg) {
System.out.println("DeadLetterSender 发送时间:" + LocalDateTime.now().toString() + " msg 内容:" + msg);
rabbitTemplate.convertAndSend("deadLetterQueue", msg);
}
}
RabbitConsumer 消费者
接受生产者发送过来的数据,此处接受为实际消费队列 repeatTradeQueue
package com.easy.rabbitmq.util;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 消费者
*/
@Component
@RabbitListener(queues = "repeatTradeQueue")
public class RabbitConsumer {
@RabbitHandler
public void process(String msg) {
System.out.println("repeatTradeQueue 接收时间:" + LocalDateTime.now().toString() + " 接收内容:" + msg);
}
}
RabbitTestApi 测试接口
测试发送消息接口,需要设置 expiration 过期时间参数
package com.easy.rabbitmq.controller;
import com.easy.rabbitmq.util.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rabbit")
public class RabbitTestApi {
@Autowired
private RabbitProducer rabbitProducer;
@GetMapping("/deadTest")
public void deadTest() {
rabbitProducer.deadLetterSend("队列设置过期时间测试");
}
}
测试 log 打印
可以看出实际消费时间为延迟了 5 秒钟
2021-11-30 10:24:02.716 INFO 15276 --- [nio-8083-exec-1] o.a.c.c.C.[.[localhost].[/rabbitmq] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-30 10:24:02.717 INFO 15276 --- [nio-8083-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-30 10:24:02.717 INFO 15276 --- [nio-8083-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms
DeadLetterSender 发送时间:2021-11-30T10:24:02.731585800 msg 内容:队列设置过期时间测试
repeatTradeQueue 接收时间:2021-11-30T10:24:05.774777500 接收内容:队列设置过期时间测试
设置消息过期时间实现延迟消费
基本的 config 配置都一样,只是此时针对过期配置时间由队列转变成对单个消息设定过期时间,同时如果队列和消息都设定了过期时间,取 2 者最小的时间来计算过期时间
DirectRabbitConfig MQ 配置类
deadLetterMsgQueue 新的死信队列配置去除了 x-message-ttl 配置,通过对单个消息设定过期时间
package com.easy.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* rabbitMq 配置
*/
@Configuration
public class DirectRabbitConfig {
/**
* 交换机用于重新分配队列
*
* @return
*/
@Bean
DirectExchange oneExchange() {
return new DirectExchange("exchange");
}
/**
* 用于延时消费的实际队列
*
* @return
*/
@Bean
public Queue repeatTradeQueue() {
// durable:是否持久化,默认是 false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是 false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于 durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
Queue queue = new Queue("repeatTradeQueue", true, false, false);
return queue;
}
/**
* 绑定交换机并指定 routing key
*
* @return
*/
@Bean
public Binding repeatTradeBinding() {
return BindingBuilder.bind(repeatTradeQueue()).to(oneExchange()).with("repeatTradeQueue");
}
/**
* 配置死信队列-指定队列过期时间,单位毫秒
*
* @return
*/
@Bean
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 3000);
args.put("x-dead-letter-exchange", "exchange");
args.put("x-dead-letter-routing-key", "repeatTradeQueue");
return new Queue("deadLetterQueue", true, false, false, args);
}
/**
* 配置死信队列-不指定队列过期时间,单位毫秒
*
* @return
*/
@Bean
public Queue deadLetterMsgQueue() {
Map<String, Object> args = new HashMap<>();
// args.put("x-message-ttl", 3000);
args.put("x-dead-letter-exchange", "exchange");
args.put("x-dead-letter-routing-key", "repeatTradeQueue");
return new Queue("deadLetterMsgQueue", true, false, false, args);
}
}
RabbitProducer 生产者
发送方法多了个过期时间设置,单位毫秒,指定当前消息的过期时间
package com.easy.rabbitmq.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 生产者
*/
@Component
@Slf4j
public class RabbitProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
/**
* 死信队列,指定单个消息过期时间,单位毫秒
*
* @param msg
* @param times 过期时间,单位毫秒
*/
public void deadLetterTimeSend(String msg, long times) {
log.info("DeadLetterSender 发送时间: {} msg 内容:{}", LocalDateTime.now().toString(), msg);
MessagePostProcessor processor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(times + "");
return message;
}
};
rabbitTemplate.convertAndSend("deadLetterMsgQueue", (Object) msg, processor);
}
}
RabbitConsumer 消费者
package com.easy.rabbitmq.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 消费者
*/
@Component
@Slf4j
@RabbitListener(queues = "repeatTradeQueue")
public class RabbitConsumer {
@RabbitHandler
public void process(String msg) {
log.info("repeatTradeQueue 接收时间:" + LocalDateTime.now().toString() + " 接收内容:" + msg);
}
}
RabbitTestApi 测试接口
测试发送消息接口,需要设置 expiration 过期时间参数
package com.easy.rabbitmq.controller;
import com.easy.rabbitmq.util.RabbitProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/rabbit")
public class RabbitTestApi {
@Autowired
private RabbitProducer rabbitProducer;
@GetMapping("/deadTest")
public void deadTest() {
// rabbitProducer.deadLetterSend("队列设置过期时间测试");
rabbitProducer.deadLetterTimeSend("消息设置过期时间测试",5000);
}
}
测试 log 打印
可以看出实际消费时间为延迟了 5 秒钟
2021-11-30 10:50:01.266 INFO 1400 --- [nio-8083-exec-1] o.a.c.c.C.[.[localhost].[/rabbitmq] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-30 10:50:01.266 INFO 1400 --- [nio-8083-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2021-11-30 10:50:01.267 INFO 1400 --- [nio-8083-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2021-11-30 10:50:01.280 INFO 1400 --- [nio-8083-exec-1] com.easy.rabbitmq.util.RabbitProducer : DeadLetterSender 发送时间: 2021-11-30T10:50:01.280346600 msg 内容:消息设置过期时间测试
2021-11-30 10:50:06.347 INFO 1400 --- [ntContainer#0-1] com.easy.rabbitmq.util.RabbitConsumer : repeatTradeQueue 接收时间:2021-11-30T10:50:06.342772500 接收内容:消息设置过期时间测试
通过上述的操作可以看出,大数据培训通过设置延迟队列和消息延迟都可以实现延迟消费的需求,我们也可以通过 rabbitMQ 看到所配置的队列的状态信息:
deadLetterQueue 队列我们设置了过期消息,相比较 deadLetterMsgQueue 队列它多了 TTL 设置,也就是过期时间,同时
他们都是 DLX 队列;
TTL:RabbitMQ 的 TTL 全称为 Time-To-Live,表示的是消息的有效期。消息如果在队列中一直没有被消费并且存在时间超过了 TTL,消息就会变成了”死信” (Dead Message),后续无法再被消费了。设置 TTL 有两种方式:
1.第一种是声明队列的时候,在队列的属性中设置,这样该队列中的消息都会有相同的有效期;
2.第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的 TTL。
DLX:DLX 是 Dead-Letter-Exchange 的简写,意思是死信交换机。它的作用其实是用来接收死信消息(dead message)的。
评论