哈喽大家好,我是阿 Q!
前几天领导突然宣布几年前停用的电商项目又重新启动了,带着复杂的心情仔细赏阅“儿时”的代码,心中的酸楚只有自己能够体会。
这不,昨天又被领导叫进了“小黑屋”,让我把代码重构下进行升级。看到这么“可爱”的代码,心中一万只“xx 马”疾驰而过。
让我最深恶痛觉的就是里边竟然用定时任务实现了“关闭超时订单”的功能,现在想来,哭笑不得。我们先分析一波为什么大家都在抵制用定时任务来实现该功能。
定时任务
关闭超时订单是在创建订单之后的一段时间内未完成支付而关闭订单的操作,该功能一般要求每笔订单的超时时间是一致的。
如果我们使用定时任务来进行该操作,很难把握定时任务轮询的时间间隔:
假设 30 分钟订单超时自动关闭,定时任务的执行间隔时间为 30 分钟:
我们在第 5 分钟进行下单操作;
当时间来到第 30 分钟时,定时任务执行一次,但是我们的订单未满足条件,不执行;
当时间来到第 35 分钟时,订单达到关闭条件,但是定时任务未执行,所以不执行;
当时间来到第 60 分钟时,开始执行我们的订单关闭操作,而此时,误差达到 25 分钟。
经此种种,我们需要舍弃该方式。
延时队列
为了满足领导的需求,我便将手伸向了消息队列:RabbitMQ。尽管它本身并没有提供延时队列的功能,但是我们可以利用它的存活时间和死信交换机的特性来间接实现。
首先我们先来简单介绍下什么是存活时间?什么是死信交换机?
存活时间
存活时间的全拼是Time To Live,简称 TTL。它既支持对消息本身进行设置(延迟队列的关键),又支持对队列进行设置(该队列中所有消息存在相同的过期时间)。
如果同时使用这两种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么该消息就“死了”,我们把它称为 死信 消息。
消息变为死信的条件:
队列设置注意事项
队列中该属性的设置要在第一次声明队列的时候设置才有效,如果队列一开始已存在且没有这个属性,则要删掉队列再重新声明才可以;
队列的 ttl 只能被设置为某个固定的值,一旦设置后则不能更改,否则会抛出异常;
死信交换机
死信交换机全拼Dead-Letter-Exchange,简称DLX。
当消息在一个队列中变成死信之后,如果这个消息所在的队列设置了x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换机上,这个交换机就称之为死信交换机,与这个死信交换器绑定的队列就是死信队列。
死信队列与普通队列的区别就是它的RoutingKey和Exchange需要作为参数,绑定到正常的队列上。
实战教学
先来张图感受下我们的整体思路
生产者发送带有 ttl 的消息放入交换机路由到延时队列中;
在延时队列中绑定死信交换机与死信转发的routing-key;
等延时队列中的消息达到延时时间之后变成死信转发到死信交换机并路由到死信队列中;
最后供消费者消费。
我们在上文的基础上进行代码实现:
配置类
@Configurationpublic class DelayQueueRabbitConfig {
public static final String DLX_QUEUE = "queue.dlx";//死信队列 public static final String DLX_EXCHANGE = "exchange.dlx";//死信交换机 public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信队列与死信交换机绑定的routing-key
public static final String ORDER_QUEUE = "queue.order";//订单的延时队列 public static final String ORDER_EXCHANGE = "exchange.order";//订单交换机 public static final String ORDER_ROUTING_KEY = "routingkey.order";//延时队列与订单交换机绑定的routing-key
/** * 定义死信队列 **/ @Bean public Queue dlxQueue(){ return new Queue(DLX_QUEUE,true); }
/** * 定义死信交换机 **/ @Bean public DirectExchange dlxExchange(){ return new DirectExchange(DLX_EXCHANGE, true, false); }
/** * 死信队列和死信交换机绑定 * 设置路由键:routingkey.dlx **/ @Bean Binding bindingDLX(){ return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY); }
/** * 订单延时队列 * 设置队列里的死信转发到的DLX名称 * 设置死信在转发时携带的 routing-key 名称 **/ @Bean public Queue orderQueue() { Map<String, Object> params = new HashMap<>(); params.put("x-dead-letter-exchange", DLX_EXCHANGE); params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, params); }
/** * 订单交换机 **/ @Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); }
/** * 把订单队列和订单交换机绑定在一起 **/ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY); }}
复制代码
发送消息
@RequestMapping("/order")public class OrderSendMessageController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage") public String sendMessage(){
String delayTime = "10000"; //将消息携带路由键值 rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY, "发送消息!",message->{ message.getMessageProperties().setExpiration(delayTime); return message; }); return "ok"; }
}
复制代码
消费消息
@Component@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//监听队列名称public class OrderMQReciever {
@RabbitHandler public void process(String message){ System.out.println("OrderMQReciever接收到的消息是:"+ message); }}
复制代码
测试
通过调用接口,发现 10 秒之后才会消费消息
问题升级
由于开发环境和测试环境使用的是同一个交换机和队列,所以发送的延时时间都是 30 分钟。但是为了在测试环境让测试同学方便测试,故手动将测试环境的时间改为了 1 分钟。
问题复现
接着问题就来了:延时时间为 1 分钟的消息并没有立即被消费,而是等 30 分钟的消息被消费完之后才被消费了。至于原因,我们下边再分析,先用代码来给大家复现下该问题。
@GetMapping("/sendManyMessage")public String sendManyMessage(){ send("延迟消息睡10秒",10000+""); send("延迟消息睡2秒",2000+""); send("延迟消息睡5秒",5000+""); return "ok";}
private void send(String msg, String delayTime){ rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY, msg,message->{ message.getMessageProperties().setExpiration(delayTime); return message; });}
复制代码
执行结果如下:
OrderMQReciever接收到的消息是:延迟消息睡10秒OrderMQReciever接收到的消息是:延迟消息睡2秒OrderMQReciever接收到的消息是:延迟消息睡5秒
复制代码
原因就是延时队列也满足队列先进先出的特征,当 10 秒的消息未出队列时,后边的消息不能顺利出队,造成后边的消息阻塞了,未能达到精准延时。
问题解决
我们可以利用x-delay-message插件来解决该问题
消息的延迟范围是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒)
生产者发送消息到交换机时,并不会立即进入,而是先将消息持久化到 Mnesia(一个分布式数据库管理系统);
插件将会尝试确认消息是否过期;
如果消息过期,消息会通过 x-delayed-type 类型标记的交换机投递至目标队列,供消费者消费;
实践
官网下载,我这边使用的是v3.8.0.ez,将文件下载下来放到服务器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路径下,执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。
出现如图所示,代表安装成功。
配置类
@Configurationpublic class XDelayedMessageConfig {
public static final String DIRECT_QUEUE = "queue.direct";//队列 public static final String DELAYED_EXCHANGE = "exchange.delayed";//延迟交换机 public static final String ROUTING_KEY = "routingkey.bind";//绑定的routing-key
/** * 定义队列 **/ @Bean public Queue directQueue(){ return new Queue(DIRECT_QUEUE,true); }
/** * 定义延迟交换机 * args:根据该参数进行灵活路由,设置为“direct”,意味着该插件具有与直连交换机具有相同的路由行为, * 如果想要不同的路由行为,可以更换现有的交换类型如:“topic” * 交换机类型为 x-delayed-message **/ @Bean public CustomExchange delayedExchange(){ Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args); }
/** * 队列和延迟交换机绑定 **/ @Bean public Binding orderBinding() { return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs(); }
}
复制代码
发送消息
@RestController@RequestMapping("/delayed")public class DelayedSendMessageController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendManyMessage") public String sendManyMessage(){
send("延迟消息睡10秒",10000); send("延迟消息睡2秒",2000); send("延迟消息睡5秒",5000); return "ok"; }
private void send(String msg, Integer delayTime){ //将消息携带路由键值 rabbitTemplate.convertAndSend( XDelayedMessageConfig.DELAYED_EXCHANGE, XDelayedMessageConfig.ROUTING_KEY, msg, message->{ message.getMessageProperties().setDelay(delayTime); return message; }); }}
复制代码
消费消息
@Component@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//监听队列名称public class DelayedMQReciever {
@RabbitHandler public void process(String message){ System.out.println("DelayedMQReciever接收到的消息是:"+ message); }}
复制代码
测试
DelayedMQReciever接收到的消息是:延迟消息睡2秒DelayedMQReciever接收到的消息是:延迟消息睡5秒DelayedMQReciever接收到的消息是:延迟消息睡10秒
复制代码
这样我们的问题就顺利解决了。
局限性
延迟的消息存储在一个Mnesia表中,当前节点上只有一个磁盘副本,它们将在节点重启后存活。
虽然触发计划交付的计时器不会持久化,但它将在节点启动时的插件激活期间重新初始化。显然,集群中只有一个预定消息的副本意味着丢失该节点或禁用其上的插件将丢失驻留在该节点上的消息。
该插件的当前设计并不适合延迟消息数量较多的场景(如数万条或数百万条),另外该插件的一个可变性来源是依赖于 Erlang 计时器,在系统中使用了一定数量的长时间计时器之后,它们开始争用调度程序资源,并且时间漂移不断累积。
回复“rabbitMQ”获取源码!
今天的内容就讲这了,阿 Q 将持续更新 java 实战方面的文章,如果你有不同的意见或者更好的 idea,欢迎联系阿 Q。
阿 Q 说代码,值得关注的公众号
文章风格多变,配图通俗易懂,故事生动有趣,来聊聊技术呀!
评论