写点什么

快速尝鲜:RabbitMQ 搭建完就得用起来

作者:阿Q说代码
  • 2022 年 4 月 11 日
  • 本文字数:4839 字

    阅读完需:约 16 分钟

快速尝鲜:RabbitMQ 搭建完就得用起来

哈喽大家好,我是阿 Q!

上文我们已经完成了RabbitMQ的安装,安完就要让它发挥点作用,今天就在SpringBoot项目里集成一下子,尝尝鲜!

在项目真正开始之前我们先来简单介绍下RabbitMQ的工作流程:


  • 生产者往交换机中发送消息;

  • 交换机通过规则绑定队列,通过路由键将消息存储到队列中;

  • 消费者获取队列中的消息进行消费;

环境:SpringBoot 2.6.3、JDK 1.8

项目搭建

首先创建SpringBoot项目 rabbit-mq

  1. 引入依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
复制代码
  1. yml 文件配置

spring:  rabbitmq:    host: 127.0.0.1     //rabbitMQ服务地址    port: 15672   //这个地方暂时先用我们之前配置的15672    username: cheetah   //自己的账户名    password: 123456    //自己的密码
复制代码
  1. 直连交换机

本项目以直连交换机为例,至于其他的交换机类型将在后文中给出详细介绍。

@Configurationpublic class DirectRabbitConfig {
    /**     * 定义交换机     **/    @Bean    public DirectExchange directExchange(){        /**         * 交换机名称         * 持久性标志:是否持久化,默认是 true 即声明一个持久的 exchange,该exchange将在服务器重启后继续运行         * 自动删除标志:是否自动删除,默认为 false, 如果服务器想在 exchange不再使用时删除它,则设置为 true         **/        return new DirectExchange("directExchange", true, false);    }
    /**     * 定义队列     **/    @Bean    public Queue directQueue(){        /**         * name:队列名称         * durable:是否持久化,默认是 true,持久化队列,会被存储在磁盘上,当消息代理重启时仍然存在         * exclusive:是否排他,默认为 false,true则表示声明了一个排他队列(该队列将仅由声明者连接使用),如果连接关闭,则队列被删除。此参考优先级高于durable         * autoDelete:是否自动删除, 默认是 false,true则表示当队列不再使用时,服务器删除该队列         **/        return new Queue("directQueue",true);    }
    /**     * 队列和交换机绑定     * 设置路由键:directRouting     **/    @Bean    Binding bindingDirect(){        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");    }

}
复制代码
  1. 消息发送

@RestControllerpublic class SendMessageController {
    @Autowired    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMessage")    public String sendMessage(){        //将消息携带路由键值        rabbitTemplate.convertAndSend("directExchange", "directRouting", "发送消息!");        return "ok";    }
}
复制代码

我们先启动程序,在浏览器访问下

http://127.0.0.1:9001/sendMessage

报错如下:


我们之前已经给该用户分配过权限了,如果之前未分配,直接在客户端中配置:



之所以访问不到,是因为我们使用的端口号不正确


所以我们需要将端口改为 5672(如果是阿里云服务器实例,需要将该端口开放权限

我们再来访问下

http://127.0.0.1:9001/sendMessage

请求返回"OK",控制台输出


客户端相关页面截图如下:






  1. 消息消费

@Component@RabbitListener(queues = "directQueue")//监听队列名称public class MQReciever {
    @RabbitHandler    public void process(String message){        System.out.println("接收到的消息是:"+ message);    }}
复制代码

启动项目,发现消息已经被消费。


为了防止消息丢失,RabbitMQ增加了消息确认机制:生产者消息确认机制和消费者消息确认机制。

确认机制

一、生产者消息确认机制

  1. yml中增加配置信息

spring:  rabbitmq:    #确认消息已发送到交换机(Exchange)    publisher-confirm-type: correlated    #确认消息已发送到队列(Queue)    publisher-returns: true
复制代码

spring.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果

  1. 增加回调

@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true);
 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {  @Override  public void confirm(CorrelationData correlationData, boolean ack, String cause) {   System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);   System.out.println("ConfirmCallback:     "+"确认情况:"+ack);   System.out.println("ConfirmCallback:     "+"原因:"+cause);  } });
 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){  @Override  public void returnedMessage(ReturnedMessage returned) {   System.out.println("ReturnCallback:     "+"消息:"+returned.getMessage());   System.out.println("ReturnCallback:     "+"回应码:"+returned.getReplyCode());   System.out.println("ReturnCallback:     "+"回应信息:"+returned.getReplyText());   System.out.println("ReturnCallback:     "+"交换机:"+returned.getExchange());   System.out.println("ReturnCallback:     "+"路由键:"+returned.getRoutingKey());  } }); return rabbitTemplate;}
复制代码
  • confirm机制是只保证消息到达exchange,并不保证消息可以路由到正确的queue

  • 当前的exchange不存在或者指定的路由key路由不到才会触发return机制

大家可以自行演示以下情况的执行结果:

  • 不存在交换机和队列

  • 存在交换机,不存在队列

  • 消息推送成功

二、消费者消息的确认机制

默认情况下如果一个消息被消费者正确接收则会从队列中移除。如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存,当有消费者订阅时则会立即发送,进而从队列中移除。

消费者消息的确认机制可以分为以下 3 种:

  1. 自动确认

AcknowledgeMode.NONE 默认为自动确认,不管消费者是否成功处理了消息,消息都会从队列中被移除。

  1. 根据情况确认

AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入队列)

  • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认

  • 当抛出AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且消息不会重回队列

  • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认

  • 其他的异常,则消息会被拒绝,并且该消息会重回队列,如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的。可以通过 setDefaultRequeueRejected(默认是true)去设置

可能造成消息丢失,一般是需要我们在try-catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

  1. 手动确认

AcknowledgeMode.MANUAL对于手动确认,也是我们工作中最常用到的,它的用法如下:

/* * 肯定确认 * deliveryTag:消息队列数据的唯一id * multiple:是否批量  * true :一次性确认所有小于等于deliveryTag的消息 * false:对当前消息进行确认; */channel.basicAck(long deliveryTag, boolean multiple); 
复制代码


/* * 否定确认 * multiple:是否批量  *   true:一次性拒绝所有小于deliveryTag的消息 *   false:对当前消息进行确认; * requeue:被拒绝的是否重新入列, *   true:就是将数据重新丢回队列里,那么下次还会消费这消息; *   false:就是拒绝处理该消息,服务器把该消息丢掉即可。  */channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);
复制代码


/* * 用于否定确认,但与basicNack相比有一个限制,一次只能拒绝单条消息 */channel.basicReject(long deliveryTag, boolean requeue);  
复制代码

手动确认

在 yml 配置中开启手动确认模式

spring:  rabbitmq:    listener:      simple:        acknowledge-mode: manual
复制代码

或者在代码中开启

@Configurationpublic class MessageListenerConfig {
    @Autowired    private CachingConnectionFactory connectionFactory;
    @Autowired    private MQReciever mqReciever;//消息接收处理类
    @Bean    public SimpleMessageListenerContainer simpleMessageListenerContainer(){        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);        //并发使用者的数量        container.setConcurrentConsumers(1);        //消费者人数上限        container.setMaxConcurrentConsumers(1);        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息        //设置一个队列,此处支持设置多个        container.setQueueNames("directQueue");        container.setMessageListener(mqReciever);        return container;    }}
复制代码

消息消费类

@Component@RabbitListener(queues = "directQueue")//监听队列名称public class MQReciever implements ChannelAwareMessageListener {
    @Override    public void onMessage(Message message, Channel channel) throws Exception {        long deliveryTag = message.getMessageProperties().getDeliveryTag();        try {            String msg = message.toString();            String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据            System.out.println("消费的消息内容:"+msgArray[1]);            System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());                        //业务处理            ......                        channel.basicAck(deliveryTag, true);                    } catch (Exception e) {            //拒绝重新入队列            channel.basicReject(deliveryTag, false);               e.printStackTrace();        }    }}
复制代码

无 ack:效率高,存在丢失大量消息的风险;有 ack:效率低,不会丢消息。

以上就是今天的全部内容了。阿 Q 将持续更新java实战方面的文章,感兴趣的可以关注下。


阿Q说代码,值得关注的公众号

文章风格多变,配图通俗易懂,故事生动有趣,来聊聊技术呀!


发布于: 2022 年 04 月 11 日阅读数: 38
用户头像

阿Q说代码

关注

公众号:阿Q说代码 | 🏆 签约作者 🏆 2021.06.08 加入

目前就职于世界五百强企业公司,担任技术leader,文章风格多变,配图通俗易懂,故事生动有趣!

评论

发布
暂无评论
快速尝鲜:RabbitMQ 搭建完就得用起来_RabbitMQ_阿Q说代码_InfoQ写作平台