如何用 RabbitMQ 实现延迟队列
@Configuration
public class TtlDelayRabbitConfig {
//路由 ttl 消息交换机
@Bean("ttlDelayFanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE");
}
//ttl 消息队列
@Bean("ttlDelayQueue")
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-message-ttl", 5000);//队列中所有消息 5 秒后过期
map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过期后进入死信队列
return new Queue("TTL_QUEUE",false,false,false,map);
}
//Fanout 交换机和 productQueue 绑定
@Bean
public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
//fanout 死信交换机
@Bean("ttlDelayDeadLetterExchange")
public FanoutExchange deadLetterExchange(){
return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");
}
//死信队列
@Bean("ttlDelayDeadLetterQueue")
public Queue ttlDelayDeadLetterQueue(){
return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE");
}
//死信队列和死信交换机绑定
@Bean
public Bindin
g deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
}
新建一个消费者
TtlDelayConsumer
类,监听死信队列,这里收到的消息都是生产者生产消息之后的5
秒,也就是延迟了5
秒的消息:
@Component
public class TtlDelayConsumer {
@RabbitHandler
@RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE")
public void fanoutConsumer(String msg){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("【延迟队列】【" + sdf.format(new Date()) + "】收到死信队列消息:" + msg);
}
}
新建一个
DelayQueueController
类做生产者来发送消息:
@RestController
@RequestMapping("/delay")
public class DelayQueueController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/ttl/send")
public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送成功【" + sdf.format(new Date()) + "】");
return "succ";
}
}
最后我们在浏览器输入地址
http://localhost:8080/delay/ttl/send?msg=测试ttl延迟队列
进行测试,可以看到每条消息都是在发送5
秒之后才能收到消息:
TTL 延迟队列的问题
假如我们实际中,有的消息是 10
分钟过期,有的是 20
分钟过期,这时候我们就需要建立多个队列,一旦时间维度非常庞大,那么就需要维护非常多的队列。说到这里,可能很多人会有疑问,我们可以针对单条信息设置过期时间,大可不必去定义多个队列?
然而事实真的是如此吗?接下来我们通过一个例子来验证下。
把上面示例中
TtlDelayRabbitConfig
类中的队列定义函数x-message-ttl
属性去掉,不过需要注意的是我们需要先把这个队列后台删除掉,否则同名队列重复创建无效:
@Bean("ttlDelayQueue")
public Queue ttlQueue(){
Map<String, Object> map = new HashMap<String, Object>();
// map.put("x-message-ttl", 5000);//注释掉这个属性,队列不设置过期时间
map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过期后进入死信队列
return new Queue("TTL_QUEUE",false,false,false,map);
}
然后将
DelayQueueController
类中的发送消息方法修改一下,对每条信息设置过期时间:
@GetMapping(value="/ttl/send")
public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg,
@RequestParam(value = "time") String millTimes){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration(millTimes);//单条消息设置过期时间,单位:毫秒
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息发送成功【" + sdf.format(new Date()) + "】");
return "succ";
}
然后执行
2
条消息发送,一条10
秒过期,一条5
秒过期,先发送10
秒的:
http://localhost:8080/delay/ttl/send?msg=10 秒过期消息 &time=10000
http://localhost:8080/delay/ttl/send?msg=5 秒过期消息 &time=5000
执行之后得到如下结果:
我们看到,两条消息都是 10
秒后过期,这是巧合吗?并不是,这是因为 RabbitMQ
中的机制就是如果前一条消息没有出队,那么即使后一条消息已经失效,也必须要等前一条消息出队之后才能出队,所以这就是为什么一般都尽量避免同一个队列单条消息设置不同过期时间的做法。
死信队列实现的延迟队列缺点
通过以上两个例子,使用死信队列来实现延迟队列,我们可以得到几个很明显的缺点:
如果有非常多的时间点(比如有的
10
分钟过期,有的20
分钟过期等),则需要创建不同的交换机和队列来实现消息的路由。单独设置消息的
TTL
时可能会造成消息的阻塞。因为当前一条消息没有出队,后一条消息即使到期了也不能出队。消息可能会有一定的延迟(上面的示例中就可以看到有一点延迟)。
为了避免 TTL
和死信队列可能造成的问题,所以就非常有必要用一种新的更好的方案来替代实现延迟队列,这就是延时队列插件。
在 RabbitMQ
的 3.5.7
版本之后,提供了一个插件(rabbitmq-delayed-message-exchange
)来实现延迟队列 ,同时需保证 Erlang/OPT
版本为 18.0
之后。
安装延迟队列插件
RabbitMQ
版本在3.5.7-3.7.x
的可以执行以下命令进行下载(也可以直接通过浏览器下载):
如果 RabbitMQ
是 3.8
之后的版本,可以点击这里,找到延迟队列对应版本的插件,然后下载。
下载好之后,将插件上传到
plugins
目录下,执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange
命令启动插件。如果要禁止该插件,则可以执行命令rabbitmq-plugins disable rabbitmq_delayed_message_exchange
(启用插件后需要重启RabbitMQ
才会生效)。
延迟队列插件示例
新建一个
PluginDelayRabbitConfig
配置类:
@Configuration
public class PluginDelayRabbitConfig {
@Bean("pluginDelayExchange")
public CustomExchange pluginDelayExchange() {
Map<String, Object> argMap = new HashMap<>();
argMap.put("x-delayed-type", "direct");//必须要配置这个类型,可以是 direct,topic 和 fanout
//第二个参数必须为 x-delayed-message
return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap);
}
@Bean("pluginDelayQueue")
评论