1
rabbitmq 的死信队列
发布于: 2021 年 11 月 08 日

1.业务背景
如果有有错误消息,如果手动 nack 同时将消息放回到队列中,那么这条消息会反复消费,留在队列中 。
如果 nack 后将消息丢弃,那么如果碰到网络抖动,消息也会丢失 。所以 通过建立死信队列避免消息丢失。
2.实现
文件目录如下:
 
 1.原理
我们额外建立一条队列。当消息进入进入业务队列后,如果收到 nack 那么就将这条消息放入这条条队列中 。
2.修改 pom 文件
       <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>复制代码
 3.修改配置文件
server:  port: 8088spring:  rabbitmq:    host: 192.168.*.*    port: 5672    username: root    password: root    virtual-host: /    listener:      simple:        acknowledge-mode: manual  #手动应答        prefetch: 1 # 每次只处理一个信息    publisher-confirms: true #开启消息确认机制    publisher-returns: true #支持消息发送失败返回队列复制代码
 4.rabbitmq 的配置
@Configurationpublic class RabbitMqConfig {
    /**     * 连接工厂     */    @Autowired    private ConnectionFactory connectionFactory;
    /**     * 定制化amqp模版     *     * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack     * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack     */    @Bean    public RabbitTemplate rabbitTemplate() {        Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);        // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true        rabbitTemplate.setMandatory(true);        // 发送消息确认, yml需要配置 publisher-confirms: true        rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());        // 消息返回, yml需要配置 publisher-returns: true        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {            String correlationId = message.getMessageProperties().getCorrelationId().toString();            logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange,                routingKey);        });        return rabbitTemplate;    }
    /**     * 确认发送消息是否成功(调用util方法)     *     * @return     */    @Bean    public MsgSendConfirmCallBack msgSendConfirmCallBack() {        return new MsgSendConfirmCallBack();    }}复制代码
 5.util 类
发送是否成功的回调方法。
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**     * 回调方法     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {        System.out.println("MsgSendConfirmCallBack  , 回调id:" + correlationData);        if (ack) {            System.out.println("消息发送成功");        } else {            //可以将消息写入本地,使用定时任务重新发送            System.out.println("消息发送失败:" + cause + "\n重新发送");        }    }
}复制代码
 这里有一个点,如果想做实现消息失败重新发送,在注释处可以实现。需要将消息写入本地,如果失败从本地读取,然后发送,如果成功删除本地信息。
6.业务队列(如:订单业务)
这里声明了一个业务队列 ,关键点在于 x-dead-letter-exchange,x-dead-letter-routing-key 两个参数。
@Configurationpublic class BusinessConfig {
    /**     * 业务1模块direct交换机的名字     */    public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    /**     * 业务1 demo业务的队列名称     */    public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    /**     * 业务1 demo业务的routekey     */    public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
        @Bean    public Queue yewu1DemoDeadQueue() {        // 将普通队列绑定到死信队列交换机上        Map<String, Object> args = new HashMap<>(2);        args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);        args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);        return new Queue("yewu1_demo_dead_queue", true, false, false, args);    }
    /**     * 将消息队列和交换机进行绑定     */    @Bean    public Binding binding_one() {        return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())            .with("yewu1_demo_dead_key");    }}复制代码
 这里有一个点如果想持久化消息到磁盘,需要新建队列时,new Queue 将第二个参数输入为 true,但是面对大并发时效率会变低 。
7.死信队列
这里声明死信队列与绑定关系。
@Configurationpublic class DeadConfig {
    /**     * 死信队列     */    public final static String FAIL_QUEUE_NAME = "fail_queue";
    /**     * 死信交换机     */    public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    /**     * 死信routing     */    public final static String FAIL_ROUTING_KEY = "fail_routing";
    /**     * 创建配置死信队列     *     */    @Bean    public Queue deadQueue() {        return new Queue(FAIL_QUEUE_NAME, true, false, false);    }
    /**     * 死信交换机     *     * @return     */    @Bean    public DirectExchange deadExchange() {        DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);        return directExchange;    }
    /**     * 绑定关系     *      * @return     */    @Bean    public Binding failBinding() {        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);    }
}复制代码
 8.生产者消费者
生产者与消费者的代码实现。
public enum RabbitEnum {
    /**     * 处理成功     */    ACCEPT,
    /**     * 可以重试的错误     */    RETRY,
    /**     * 无需重试的错误     */    REJECT@RequestMapping("/sendDirectDead")        String sendDirectDead(@RequestBody String message) throws Exception {        System.out.println("开始生产");        CorrelationData data = new CorrelationData(UUID.randomUUID().toString());        rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",                message, data);        System.out.println("结束生产");        System.out.println("发送id:" + data);        return "OK,sendDirect:" + message;    }    @RabbitListener(queues = "yewu1_demo_dead_queue")    protected void consumerDead(Message message, Channel channel) throws Exception {        RabbitEnum ackSign = RabbitEnum.RETRY;        try {            int i = 10 / 0;            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        } catch (Exception e) {            ackSign = RabbitEnum.RETRY;            throw e;        } finally {            // 通过finally块来保证Ack/Nack会且只会执行一次            if (ackSign == RabbitEnum.ACCEPT) {                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            } else if (ackSign == RabbitEnum.RETRY) {                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);            }        }    }复制代码
 9.实验
当发送 yewu1_demo_dead_queue 队列时,如果抛出异常,会放入死信队列中。
划线
评论
复制
发布于: 2021 年 11 月 08 日阅读数: 4

小黄鸡1992
关注
小黄鸡加油 2021.07.13 加入
一位技术落地与应用的博主,带你从入门,了解和使用各项顶流开源项目。











 
    
评论