写点什么

RabbitMQ(五)死信队列

作者:JAVA活菩萨
  • 2022 年 8 月 04 日
  • 本文字数:4555 字

    阅读完需:约 15 分钟

RabbitMQ(五)死信队列文章目录 RabbitMQ(五)死信队列 6. 死信队列​ 死信:无法被消费的消息,在某些时候由于特定的原因到导致了 queue 中某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信,有了死信,就自然有了死信队列


​ 应用场景,为了保证订单业务的数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息发生消费异常时,将消息投入死信队列中。场景:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。


6.1 死信产生的原因


  1. 消息 TTL 过期

  2. 队列达到了最大长度(队列已经排满了,无法再添加到 MQ 中)

  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false6.2 死信队列实战 6.2.1 代码架构图


6.2.2 消息 TTL 过期​ 我们可以在交换机声明处,或者队列声明处设置消息过期时间 TTL,当消息超过这个时间还没有被消费,则会转入死信队列。


​ 我们在这次实战中通过关闭消费端,由生产者发送十条信息,消息发送后没有被消费,超时后就会发送到死信队列中,由死信队列消费。


生产者 Producer (设置过期时间 TTL,发送消息)


public class Producer {


public static void main(String[] args) throws Exception{


Channel channel = RabbitMqUtil.getChannel();
//死信消息 设置TTL时间 单位是msAMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",properties,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功");}
复制代码


}}普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)


public class Consumer01 {


public static void main(String[] args)throws Exception {


Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机//普通交换机channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//过期时间 10s 队列处也可以设置ttl 但是一般在交换机处设置// arguments.put("x-message-ttl",10000);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
///声明死信队列channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");//ttl接收消息channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8"); System.out.println("ConsumerO1接收到消息:" + msg); System.out.println("处理完成");}, (consumerTag) -> {
System.out.println("Consumer01 错误消息被中断:" + consumerTag);});
复制代码


}}死信队列消费者 Consumer02 (启动后关闭此队列,观察消息去向后打开消费掉死信)


/**


  • 死信队列, 消费者 2*/public class Consumer02 {

  • public static void main(String[] args)throws Exception {


});


}
复制代码


}​操作步骤查看未发送消息状态下 队列状态


启动生产者代码 发送十条消息 此时正常队列中有十条未消费信息


等待时间过去 10s,等待消息过期,正常队列中的消息由于没有被消费,消息进入死信队列


启动死信队列,可以看到过期的消息被一条一条消费。


6.2.3 队列达到最大长度​ 我们可以在队列声明处设置队列的长度,当队列中消息超过这个长度时,则会转入死信队列。


​ 我们在这次实战中通过给消费者设置队列长度,然后将它关闭,不消费信息,消息发送后超过这个长度,超过这个长度的消息就会发送到死信队列中,由死信队列消费。


生产者 Producer


public class Producer {


public static void main(String[] args) throws Exception{


Channel channel = RabbitMqUtil.getChannel();
for (int i = 1; i <= 10; i++) {
String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功");}
复制代码


}}普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)


public class Consumer01 {


public static void main(String[] args)throws Exception {


Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机//普通交换机channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//设置队列最大长度,达到这个长度将后续消息转发给c2消费者arguments.put("x-max-length",6);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
///声明死信队列channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");//ttl接收消息channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8"); System.out.println("ConsumerO1接收到消息:" + msg); System.out.println("处理完成");}, (consumerTag) -> {
System.out.println("Consumer01 错误消息被中断:" + consumerTag);});
复制代码


}}死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)


/**


  • 死信队列, 消费者 2*/public class Consumer02 {

  • public static void main(String[] args)throws Exception {


});


}
复制代码


}​操作步骤​ 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。


启动两个消费者 然后关闭,打开生产者(可以发现超出长度的消息自动转入死信队列中)


启动两个消费者,可以看到死信队列消费了其中四条消息


6.2.5 消息被拒绝​ 我们可以不使用自动应答,而采用手动应答,在手动应答中的拒绝应答,并且拒绝消息重新入队,若存在死信队列,则消息会转入死信队列。


​ 我们在这次实战中通过在消息应答里拒绝一部分应答,并阻止消息重新入队,这条消息就会发送到死信队列中,由死信队列消费。


生产者 Producer


public class Producer {


public static void main(String[] args) throws Exception{


Channel channel = RabbitMqUtil.getChannel();
for (int i = 1; i <= 10; i++) {
String message = "info"+i; channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8")); System.out.println("发送消息 "+message+" 成功");}
复制代码


}}普通消费者 Consumer01


public class Consumer01 {


public static void main(String[] args)throws Exception {


Channel channel = RabbitMqUtil.getChannel();//声明普通交换机和死信交换机//普通交换机channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//设置队列最大长度,达到这个长度将后续消息转发给c2消费者arguments.put("x-max-length",6);//正常队列设置死信交换机arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
///声明死信队列channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");channel.basicConsume(QueueName.Normal_Queue.getName(), false,(consumerTag,message)->{
String msg = new String(message.getBody(),"UTF-8"); if ("info5".equals(msg)){
System.out.println(msg+"此消息是被拒绝的!"); //拒绝应答 1为消息标签,2为是否放回普通队列 channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else {
System.out.println("Consumer01 接收到消息:"+new String(message.getBody(),"UTF-8")); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }
System.out.println("处理完成");},(consumerTag)->{
System.out.println("Consumer01 错误消息被中断:"+consumerTag);});
复制代码


}}死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)


/**


  • 死信队列, 消费者 2*/public class Consumer02 {

  • public static void main(String[] args)throws Exception {


});


}
复制代码


}​操作步骤​ 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。


启动两个消费者 ,打开生产者(可以发现 info5 的消息自动转入死信队列中)


思考:如果拒绝了某一条消息,但是并没有拒绝它重新入队会导致什么?


if ("info5".equals(msg)) {


System.out.println(msg + "此消息是被拒绝的!");//拒绝应答 1 为消息标签,2 为是否放回普通队列//channel.basicReject(message.getEnvelope().getDeliveryTag(), false);channel.basicReject(message.getEnvelope().getDeliveryTag(), true); //拒绝但是允许重新入队}答案是:如果允许重新入队,队列会一直循环处理这一条消息,这条消息也会被循环入队

用户头像

JAVA活菩萨

关注

还未添加个人签名 2022.07.25 加入

还未添加个人简介

评论

发布
暂无评论
RabbitMQ(五)死信队列_Java_JAVA活菩萨_InfoQ写作社区