RabbitMQ(五)死信队列文章目录 RabbitMQ(五)死信队列 6. 死信队列 死信:无法被消费的消息,在某些时候由于特定的原因到导致了 queue 中某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信,有了死信,就自然有了死信队列
应用场景,为了保证订单业务的数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息发生消费异常时,将消息投入死信队列中。场景:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。
6.1 死信产生的原因
消息 TTL 过期
队列达到了最大长度(队列已经排满了,无法再添加到 MQ 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false6.2 死信队列实战 6.2.1 代码架构图
6.2.2 消息 TTL 过期 我们可以在交换机声明处,或者队列声明处设置消息过期时间 TTL,当消息超过这个时间还没有被消费,则会转入死信队列。
我们在这次实战中通过关闭消费端,由生产者发送十条信息,消息发送后没有被消费,超时后就会发送到死信队列中,由死信队列消费。
生产者 Producer (设置过期时间 TTL,发送消息)
public class Producer {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
//死信消息 设置TTL时间 单位是ms
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 1; i <= 10; i++) {
String message = "info"+i;
channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",properties,message.getBytes("UTF-8"));
System.out.println("发送消息 "+message+" 成功");
}
复制代码
}}普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)
public class Consumer01 {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明普通交换机和死信交换机
//普通交换机
channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//过期时间 10s 队列处也可以设置ttl 但是一般在交换机处设置
// arguments.put("x-message-ttl",10000);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列
channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
/
//声明死信队列
channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列
channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列
channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");
//ttl接收消息
channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("ConsumerO1接收到消息:" + msg);
System.out.println("处理完成");
}, (consumerTag) -> {
System.out.println("Consumer01 错误消息被中断:" + consumerTag);
});
复制代码
}}死信队列消费者 Consumer02 (启动后关闭此队列,观察消息去向后打开消费掉死信)
/**
});
}操作步骤查看未发送消息状态下 队列状态
启动生产者代码 发送十条消息 此时正常队列中有十条未消费信息
等待时间过去 10s,等待消息过期,正常队列中的消息由于没有被消费,消息进入死信队列
启动死信队列,可以看到过期的消息被一条一条消费。
6.2.3 队列达到最大长度 我们可以在队列声明处设置队列的长度,当队列中消息超过这个长度时,则会转入死信队列。
我们在这次实战中通过给消费者设置队列长度,然后将它关闭,不消费信息,消息发送后超过这个长度,超过这个长度的消息就会发送到死信队列中,由死信队列消费。
生产者 Producer
public class Producer {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
for (int i = 1; i <= 10; i++) {
String message = "info"+i;
channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
System.out.println("发送消息 "+message+" 成功");
}
复制代码
}}普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)
public class Consumer01 {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明普通交换机和死信交换机
//普通交换机
channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//设置队列最大长度,达到这个长度将后续消息转发给c2消费者
arguments.put("x-max-length",6);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列
channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
/
//声明死信队列
channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列
channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列
channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");
//ttl接收消息
channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("ConsumerO1接收到消息:" + msg);
System.out.println("处理完成");
}, (consumerTag) -> {
System.out.println("Consumer01 错误消息被中断:" + consumerTag);
});
复制代码
}}死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)
/**
});
}操作步骤 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。
启动两个消费者 然后关闭,打开生产者(可以发现超出长度的消息自动转入死信队列中)
启动两个消费者,可以看到死信队列消费了其中四条消息
6.2.5 消息被拒绝 我们可以不使用自动应答,而采用手动应答,在手动应答中的拒绝应答,并且拒绝消息重新入队,若存在死信队列,则消息会转入死信队列。
我们在这次实战中通过在消息应答里拒绝一部分应答,并阻止消息重新入队,这条消息就会发送到死信队列中,由死信队列消费。
生产者 Producer
public class Producer {
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtil.getChannel();
for (int i = 1; i <= 10; i++) {
String message = "info"+i;
channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
System.out.println("发送消息 "+message+" 成功");
}
复制代码
}}普通消费者 Consumer01
public class Consumer01 {
public static void main(String[] args)throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明普通交换机和死信交换机
//普通交换机
channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
HashMap<String, Object> arguments = new HashMap<>();
//设置队列最大长度,达到这个长度将后续消息转发给c2消费者
arguments.put("x-max-length",6);
//正常队列设置死信交换机
arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key","lisi");
//声明普通队列
channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
/
//声明死信队列
channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
//绑定普通队列
channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
//绑定死信队列
channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
System.out.println("等待接收消息。。。。");
channel.basicConsume(QueueName.Normal_Queue.getName(), false,(consumerTag,message)->{
String msg = new String(message.getBody(),"UTF-8");
if ("info5".equals(msg)){
System.out.println(msg+"此消息是被拒绝的!");
//拒绝应答 1为消息标签,2为是否放回普通队列
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
System.out.println("Consumer01 接收到消息:"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
System.out.println("处理完成");
},(consumerTag)->{
System.out.println("Consumer01 错误消息被中断:"+consumerTag);
});
复制代码
}}死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)
/**
});
}操作步骤 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。
启动两个消费者 ,打开生产者(可以发现 info5 的消息自动转入死信队列中)
思考:如果拒绝了某一条消息,但是并没有拒绝它重新入队会导致什么?
if ("info5".equals(msg)) {
System.out.println(msg + "此消息是被拒绝的!");//拒绝应答 1 为消息标签,2 为是否放回普通队列//channel.basicReject(message.getEnvelope().getDeliveryTag(), false);channel.basicReject(message.getEnvelope().getDeliveryTag(), true); //拒绝但是允许重新入队}答案是:如果允许重新入队,队列会一直循环处理这一条消息,这条消息也会被循环入队
评论