写点什么

Springboot 整合 RabbitMq ,用心看完这一篇就够了

  • 2021 年 11 月 11 日
  • 本文字数:10632 字

    阅读完需:约 35 分钟

//Direct 交换机 起名:TestDirectExchange


@Bean


DirectExchange TestDirectExchange() {


// return new DirectExchange("TestDirectExchange",true,true);


return new DirectExchange("TestDirectExchange",true,false);


}


//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting


@Bean


Binding bindingDirect() {


return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");


}


@Bean


DirectExchange lonelyDirectExchange() {


return new DirectExchange("lonelyDirectExchange");


}


}


然后写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:


import org.springframework.amqp.rabbit.core.RabbitTemplate;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.web.bind.annotation.GetMapping;


import org.springframework.web.bind.annotation.RestController;


import java.time.LocalDateTime;


import java.time.format.DateTimeFormatter;


import java.util.HashMap;


import java.util.Map;


import java.util.UUID;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@RestController


public class SendMessageController {


@Autowired


RabbitTemplate rabbitTemplate; //使用 RabbitTemplate,这提供了接收/发送等等方法


@GetMapping("/sendDirectMessage")


public String sendDirectMessage() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "test message, hello!";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String,Object> map=new HashMap<>();


map.put("messageId",messageId);


map.put("messageData",messageData);


map.put("createTime",createTime);


//将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange


rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);


return "ok";


}


}


把 rabbitmq-provider 项目运行,调用下接口:



因为我们目前还没弄消费者?rabbitmq-consumer,消息没有被消费的,我们去 rabbitMq 管理页面看看,是否推送成功:



再看看队列(界面上的各个英文项代表什么意思,可以自己查查哈,对理解还是有帮助的):



很好,消息已经推送到 rabbitMq 服务器上面了。


接下来,创建 rabbitmq-consumer 项目:


pom.xml 里的 jar 依赖:


<dependency>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter-amqp</artifactId>


</dependency>


<dependency>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter</artifactId>


</dependency>


然后是?application.yml:


server:


port: 8022


spring:


#给项目来个名字


application:


name: rabbitmq-consumer


#配置 rabbitMq 服务器


rabbitmq:


host: 127.0.0.1


port: 5672


username: root


password: root


#虚拟 host 可以不设置,使用 server 默认 host


virtual-host: JCcccHost


然后一样,创建 DirectRabbitConfig.java(消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。配置上了的话,其实消费者也是生成者的身份,也能推送该消息。):


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.DirectExchange;


import org.springframework.amqp.core.Queue;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class DirectRabbitConfig {


//队列 起名:TestDirectQueue


@Bean


public Queue TestDirectQueue() {


return new Queue("TestDirectQueue",true);


}


//Direct 交换机 起名:TestDirectExchange


@Bean


DirectExchange TestDirectExchange() {


return new DirectExchange("TestDirectExchange");


}


//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting


@Bean


Binding bindingDirect() {


return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");


}


}


然后是创建消息接收监听类,DirectReceiver.java:


@Component


@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue


public class DirectReceiver {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("DirectReceiver 消费者收到消息 : " + testMessage.toString());


}


}


然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:



然后可以再继续调用 rabbitmq-provider 项目的推送消息接口,可以看到消费者即时消费消息:



那么直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?



可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费。


接着,我们使用 Topic Exchange 主题交换机。


在 rabbitmq-provider 项目里面创建 TopicRabbitConfig.java:




import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.Queue;


import org.springframework.amqp.core.TopicExchange;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class TopicRabbitConfig {


//绑定键


public final static String man = "topic.man";


public final static String woman = "topic.woman";


@Bean


public Queue firstQueue() {


return new Queue(TopicRabbitConfig.man);


}


@Bean


public Queue secondQueue() {


return new Queue(TopicRabbitConfig.woman);


}


@Bean


TopicExchange exchange() {


return new TopicExchange("topicExchange");


}


//将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man


//这样只要是消息携带的路由键是 topic.man,才会分发到该队列


@Bean


Binding bindingExchangeMessage() {


return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);


}


//将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#


// 这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列


@Bean


Binding bindingExchangeMessage2() {


return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");


}


}


然后添加多 2 个接口,用于推送消息到主题交换机:


@GetMapping("/sendTopicMessage1")


public String sendTopicMessage1() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "message: M A N ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> manMap = new HashMap<>();


manMap.put("messageId", messageId);


manMap.put("messageData", messageData);


manMap.put("createTime", createTime);


rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);


return "ok";


}


@GetMapping("/sendTopicMessage2")


public String sendTopicMessage2() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "message: woman is all ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> womanMap = new HashMap<>();


womanMap.put("messageId", messageId);


womanMap.put("messageData", messageData);


womanMap.put("createTime", createTime);


rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);


return "ok";


}


}


生产者这边已经完事,先不急着运行,在 rabbitmq-consumer 项目上,创建 TopicManReceiver.java:


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Component


@RabbitListener(queues = "topic.man")


public class TopicManReceiver {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("TopicManReceiver 消费者收到消息 : " + testMessage.toString());


}


}


再创建一个 TopicTotalReceiver.java:


package com.elegant.rabbitmqconsumer.receiver;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Component


@RabbitListener(queues = "topic.woman")


public class TopicTotalReceiver {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("TopicTotalReceiver 消费者收到消息 : " + testMessage.toString());


}


}


同样,加主题交换机的相关配置,TopicRabbitConfig.java(消费者一定要加这个配置吗? 不需要的其实,理由在前面已经说过了。):


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.Queue;


import org.springframework.amqp.core.TopicExchange;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class TopicRabbitConfig {


//绑定键


public final static String man = "topic.man";


public final static String woman = "topic.woman";


@Bean


public Queue firstQueue() {


return new Queue(TopicRabbitConfig.man);


}


@Bean


public Queue secondQueue() {


return new Queue(TopicRabbitConfig.woman);


}


@Bean


TopicExchange exchange() {


return new TopicExchange("topicExchange");


}


//将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man


//这样只要是消息携带的路由键是 topic.man,才会分发到该队列


@Bean


Binding bindingExchangeMessage() {


return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);


}


//将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#


// 这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列


@Bean


Binding bindingExchangeMessage2() {


return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");


}


}


然后把 rabbitmq-provider,rabbitmq-consumer 两个项目都跑起来,先调用/sendTopicMessage1? 接口:



然后看消费者 rabbitmq-consumer 的控制台输出情况:


TopicManReceiver 监听队列 1,绑定键为:topic.man


TopicTotalReceiver 监听队列 2,绑定键为:topic.#


而当前推送的消息,携带的路由键为:topic.man??


所以可以看到两个监听消费者 receiver 都成功消费到了消息,因为这两个 recevier 监听的队列的绑定键都能与这条消息携带的路由键匹配上。



接下来调用接口/sendTopicMessage2:



然后看消费者 rabbitmq-consumer 的控制台输出情况:


TopicManReceiver 监听队列 1,绑定键为:topic.man


TopicTotalReceiver 监听队列 2,绑定键为:topic.#


而当前推送的消息,携带的路由键为:topic.woman


所以可以看到两个监听消费者只有 TopicTotalReceiver 成功消费到了消息。



接下来是使用 Fanout Exchang 扇型交换机。


同样地,先在 rabbitmq-provider 项目上创建 FanoutRabbitConfig.java:


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.FanoutExchange;


import org.springframework.amqp.core.Queue;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class FanoutRabbitConfig {


/**


  • 创建三个队列 :fanout.A fanout.B fanout.C

  • 将三个队列都绑定在交换机 fanoutExchange 上

  • 因为是扇型交换机, 路由键无需配置,配置也不起作用


*/


@Bean


public Queue queueA() {


return new Queue("fanout.A");


}


@Bean


public Queue queueB() {


return new Queue("fanout.B");


}


@Bean


public Queue queueC() {


return new Queue("fanout.C");


}


@Bean


FanoutExchange fanoutExchange() {


return new FanoutExchange("fanoutExchange");


}


@Bean


Binding bindingExchangeA() {


return BindingBuilder.bind(queueA()).to(fanoutExchange());


}


@Bean


Binding bindingExchangeB() {


return BindingBuilder.bind(queueB()).to(fanoutExchange());


}


@Bean


Binding bindingExchangeC() {


return BindingBuilder.bind(queueC()).to(fanoutExchange());


}


}


然后是写一个接口用于推送消息,


@GetMapping("/sendFanoutMessage")


public String sendFanoutMessage() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "message: testFanoutMessage ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> map = new HashMap<>();


map.put("messageId", messageId);


map.put("messageData", messageData);


map.put("createTime", createTime);


rabbitTemplate.convertAndSend("fanoutExchange", null, map);


return "ok";


}


接着在 rabbitmq-consumer 项目里加上消息消费类,


FanoutReceiverA.java:


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Component


@RabbitListener(queues = "fanout.A")


public class FanoutReceiverA {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("FanoutReceiverA 消费者收到消息 : " +testMessage.toString());


}


}


FanoutReceiverB.java:


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


impo


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


rt org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Component


@RabbitListener(queues = "fanout.B")


public class FanoutReceiverB {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("FanoutReceiverB 消费者收到消息 : " +testMessage.toString());


}


}


FanoutReceiverC.java:


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Component


@RabbitListener(queues = "fanout.C")


public class FanoutReceiverC {


@RabbitHandler


public void process(Map testMessage) {


System.out.println("FanoutReceiverC 消费者收到消息 : " +testMessage.toString());


}


}


然后加上扇型交换机的配置类,FanoutRabbitConfig.java(消费者真的要加这个配置吗? 不需要的其实,理由在前面已经说过了):


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.FanoutExchange;


import org.springframework.amqp.core.Queue;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class FanoutRabbitConfig {


/**


  • 创建三个队列 :fanout.A fanout.B fanout.C

  • 将三个队列都绑定在交换机 fanoutExchange 上

  • 因为是扇型交换机, 路由键无需配置,配置也不起作用


*/


@Bean


public Queue queueA() {


return new Queue("fanout.A");


}


@Bean


public Queue queueB() {


return new Queue("fanout.B");


}


@Bean


public Queue queueC() {


return new Queue("fanout.C");


}


@Bean


FanoutExchange fanoutExchange() {


return new FanoutExchange("fanoutExchange");


}


@Bean


Binding bindingExchangeA() {


return BindingBuilder.bind(queueA()).to(fanoutExchange());


}


@Bean


Binding bindingExchangeB() {


return BindingBuilder.bind(queueB()).to(fanoutExchange());


}


@Bean


Binding bindingExchangeC() {


return BindingBuilder.bind(queueC()).to(fanoutExchange());


}


}


最后将 rabbitmq-provider 和 rabbitmq-consumer 项目都跑起来,调用下接口/sendFanoutMessage :



然后看看 rabbitmq-consumer 项目的控制台情况:



可以看到只要发送到?fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息。


到了这里其实三个常用的交换机的使用我们已经完毕了,那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。


在 rabbitmq-provider 项目的 application.yml 文件上,加上消息确认的配置项后:


ps: 本篇文章使用 springboot 版本为?2.1.7.RELEASE ;?


如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为版本导致的配置项不起效,


可以把 publisher-confirms:?true 替换为? publisher-confirm-type: correlated


server:


port: 8021


spring:


#给项目来个名字


application:


name: rabbitmq-provider


#配置 rabbitMq 服务器


rabbitmq:


host: 127.0.0.1


port: 5672


username: root


password: root


#虚拟 host 可以不设置,使用 server 默认 host


virtual-host: JCcccHost


#消息确认配置项


#确认消息已发送到交换机(Exchange)


publisher-confirms: true


#确认消息已发送到队列(Queue)


publisher-returns: true


然后是配置相关的消息确认回调函数,RabbitConfig.java:


import org.springframework.amqp.core.Message;


import org.springframework.amqp.rabbit.connection.ConnectionFactory;


import org.springframework.amqp.rabbit.connection.CorrelationData;


import org.springframework.amqp.rabbit.core.RabbitTemplate;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Author : JCccc

  • @CreateTime : 2019/9/3

  • @Description :


**/


@Configuration


public class RabbitConfig {


@Bean


public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){


RabbitTemplate rabbitTemplate = new RabbitTemplate();


rabbitTemplate.setConnectionFactory(connectionFactory);


//设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数


rabbitTemplate.setMandatory(true);


rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {


@Override


public void confirm(CorrelationData correlationData, boolean ack, String cause) {


System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);


System.out.println("ConfirmCallback: "+"确认情况:"+ack);


System.out.println("ConfirmCallback: "+"原因:"+cause);


}


});


rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {


@Override


public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {


System.out.println("ReturnCallback: "+"消息:"+message);


System.out.println("ReturnCallback: "+"回应码:"+replyCode);


System.out.println("ReturnCallback: "+"回应信息:"+replyText);


System.out.println("ReturnCallback: "+"交换机:"+exchange);


System.out.println("ReturnCallback: "+"路由键:"+routingKey);


}


});


return rabbitTemplate;


}


}


到这里,生产者推送消息的消息确认调用回调函数已经完毕。


可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;


那么以上这两种回调函数都是在什么情况会触发呢?


先从总体的情况分析,推送消息存在四种情况:


①消息推送到 server,但是在 server 里找不到交换机


②消息推送到 server,找到交换机了,但是没找到队列


③消息推送到 sever,交换机和队列啥都没找到


④消息推送成功


那么我先写几个接口来分别测试和认证下以上 4 种情况,消息确认触发回调函数的情况:


①消息推送到 server,但是在 server 里找不到交换机


写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):


@GetMapping("/TestMessageAck")


public String TestMessageAck() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "message: non-existent-exchange test message ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> map = new HashMap<>();


map.put("messageId", messageId);


map.put("messageData", messageData);


map.put("createTime", createTime);


rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);


return "ok";


}


调用接口,查看 rabbitmq-provuder 项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):


2019-09-04 09:37:45.197 ERROR 8172 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)


ConfirmCallback: 相关数据:null


ConfirmCallback: 确认情况:false


ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost 'JCcccHost', class-id=60, method-id=40)


结论: ①这种情况触发的是?ConfirmCallback 回调函数。


②消息推送到 server,找到交换机了,但是没找到队列??


这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在 DirectRabitConfig 里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:


@Bean


DirectExchange lonelyDirectExchange() {


return new DirectExchange("lonelyDirectExchange");


}


然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):


@GetMapping("/TestMessageAck2")


public String TestMessageAck2() {


String messageId = String.valueOf(UUID.randomUUID());


String messageData = "message: lonelyDirectExchange test message ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> map = new HashMap<>();


map.put("messageId", messageId);


map.put("messageData", messageData);


map.put("createTime", createTime);


rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);


return "ok";


}


调用接口,查看 rabbitmq-provuder 项目的控制台输出情况:


ReturnCallback: 消息:(Body:'{createTime=2019-09-04 09:48:01, messageId=563077d9-0a77-4c27-8794-ecfb183eac80, messageData=message: lonelyDirectExchange test message }' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])


ReturnCallback: 回应码:312


ReturnCallback: 回应信息:NO_ROUTE


ReturnCallback: 交换机:lonelyDirectExchange


ReturnCallback: 路由键:TestDirectRouting


ConfirmCallback: 相关数据:null


ConfirmCallback: 确认情况:true


ConfirmCallback: 原因:null


可以看到这种情况,两个函数都被调用了;


这种情况下,消息是推送成功到服务器了的,所以 ConfirmCallback 对消息确认情况是 true;


而在 RetrunCallback 回调函数的打印参数里面可以看到,消息是推送到了交换机成功了,但是在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。


结论:②这种情况触发的是?ConfirmCallback 和 RetrunCallback 两个回调函数。


③消息推送到 sever,交换机和队列啥都没找到?


这种情况其实一看就觉得跟①很像,没错 ,③和①情况回调是一致的,所以不做结果说明了。


结论: ③这种情况触发的是?ConfirmCallback 回调函数。


④消息推送成功


那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendFanoutMessage 接口,可以看到控制台输出:


ConfirmCallback: 相关数据:null


ConfirmCallback: 确认情况:true


ConfirmCallback: 原因:null


结论: ④这种情况触发的是?ConfirmCallback 回调函数。


以上是生产者推送消息的消息确认 回调函数的使用介绍(可以在回调函数根据需求做对应的扩展或者业务数据处理)。

接下来我们继续, 消费者接收到消息的消息确认机制。

评论

发布
暂无评论
Springboot 整合RabbitMq ,用心看完这一篇就够了