写点什么

微服务 SpringCloud 项目:初步整合 rabbitmq

  • 2021 年 11 月 12 日
  • 本文字数:6137 字

    阅读完需:约 20 分钟

port: 5672


addresses: 地址


#开启发送失败返回


publisher-returns: true


#配置确认回调


publisher-confirm-type: correlated


listener:


simple:


#指定最小的消费者数量.


concurrency: 5


#指定最大的消费者数量.


max-concurrency: 10


#开启 ack


acknowledge-mode: auto

最多一次消费多少条数据 -限流

prefetch: 1


#开启 ack


direct:


acknowledge-mode: auto


#支持消息的确认与返回


template:


mandatory: true


复制代码

3. direct exchange(直连型交换机)



创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):


package com.cyj.dream.test.config;


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.DirectExchange;


import org.springframework.amqp.core.Queue;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Description: 直连型交换机

  • 接着我们先使用下 direct exchange(直连型交换机),创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,

  • 在注释里有说明,后面的不同交换机的配置就不做同样说明了):

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.test.config

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-18

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Configuration


public class DirectRabbitConfig {


/**


  • 队列 起名:TestDirectQueue

  • @return


*/


@Bean


public Queue TestDirectQueue() {


// durable:是否持久化,默认是 false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效


// exclusive:默认也是 false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于 durable


// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。


// return new Queue("TestDirectQueue",true,true,false);


//一般设置一下队列的持久化就好,其余两个就是默认 false


return new Queue("TestDirectQueue", true);


}


/**


  • Direct 交换机 起名:TestDirectExchange

  • @return


*/


@Bean


DirectExchange TestDirectExchange() {


// return new DirectExchange("TestDirectExchange",true,true);


return new DirectExchange("TestDirectExchange", true, false);


}


/**


  • 绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting

  • @return


*/


@Bean


Binding bindingDirect() {


return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");


}


@Bean


DirectExchange lonelyDirectExchange() {


return new DirectExchange("lonelyDirectExchange");


}


}


复制代码


1. SendMessageController 推送消息


package com.cyj.dream.test.controller;


import com.cyj.dream.core.util.user.UUIDUtils;


import org.springframework.amqp.rabbit.core.RabbitTemplate;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.web.bind.annotation.GetMapping;


import org.springframework.web.bind.annotation.RestController;


import java.time.LocalDateTime;


import java.time.format.DateTimeFormatter;


import java.util.HashMap;


import java.util.Map;


/**


  • @Description: 消息推送接口

  • 再写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.test.controller

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-18

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@RestController


public class SendMessageController {


/**


  • 使用 RabbitTemplate,这提供了接收/发送等等方法


*/


@Autowired


RabbitTemplate rabbitTemplate;


@GetMapping("/sendDirectMessage")


public String sendDirectMessage() {


String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());


String messageData = "这是一条测试消息, 你好啊骚年!";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> map = new HashMap<>();


map.put("messageId", messageId);


map.put("messageData", messageData);


map.put("createTime", createTime);


//将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange


rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);


return "ok";


}


}


复制代码


2. 把生产者项目项目运行,调用下接口:



因为我们目前还没弄消费者 rabbitmq-consumer,消息没有被消费的,我们可以去 rabbitMq 管理页面看看,是否推送成功,这里就不多说了。


3. 接下来,创建 rabbitmq-consumer 项目:


pom.xml 的依赖加入


<dependency>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter-amqp</artifactId>


</dependency>


<dependency>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter</artifactId>


</dependency>


复制代码


然后是 application.yml:


spring:

rabbitmq 消息队列配置

rabbitmq:


password: 账号


username: 密码


port: 5672


addresses: 地址


#虚拟 host 可以不设置,使用 server 默认 host


#virtual-host: JCcccHost


#开启发送失败返回


publisher-returns: true


#配置确认回调


publisher-confirm-type: correlated


listener:


simple:


#指定最小的消费者


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


数量.


concurrency: 5


#指定最大的消费者数量.


max-concurrency: 10


#开启 ack


acknowledge-mode: auto

最多一次消费多少条数据 -限流

prefetch: 1


#开启 ack


direct:


acknowledge-mode: auto


#支持消息的确认与返回


template:


mandatory: true


复制代码


4. 创建消息接收监听类


DirectReceiver.java


package com.cyj.dream.file.listener;


import lombok.extern.slf4j.Slf4j;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Description: 消息接收监听类,DirectReceiver.java:

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.file.listener

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-19

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Slf4j


@Component


@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue


public class DirectReceiver {


@RabbitHandler


public void process(Map testMessage) {


log.info("DirectReceiver 消费者收到消息 : {}", testMessage.toString());


}


}


复制代码


然后将 rabbitmq-consumer 项目运行起来,可以看到把之前推送的那条消息消费下来了:(继续调用 rabbitmq-provider 项目的推送消息接口,你将可以看到消费者即时消费消息:)



等你尝试:直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?将以轮询的方式对消息进行消费,而且不存在重复消费。

4. Topic Exchange 主题交换机。



在生产者项目中创建TopicRabbitConfig.java


package com.cyj.dream.test.config;


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.Queue;


import org.springframework.amqp.core.TopicExchange;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Description: 通配符(话题)模式

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.file.config

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-19

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Configuration


public class TopicRabbitConfig {


/**


  • 绑定键


*/


public final static String man = "topic.man";


public final static String woman = "topic.woman";


@Bean


public Queue firstQueue() {


return new Queue(TopicRabbitConfig.man);


}


@Bean


public Queue secondQueue() {


return new Queue(TopicRabbitConfig.woman);


}


@Bean


TopicExchange exchange() {


return new TopicExchange("topicExchange");


}


/**


  • 将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man

  • 这样只要是消息携带的路由键是 topic.man,才会分发到该队列

  • @return


*/


@Bean


Binding bindingExchangeMessage() {


return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);


}


/**


  • 将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#

  • 这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列

  • @return


*/


@Bean


Binding bindingExchangeMessage2() {


return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");


}


}


复制代码


1. 在接口添加方法


// ================================= 生产者使用 Topic 话题模式 ================================


@GetMapping("/sendTopicMessage1")


public String sendTopicMessage1() {


String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());


String messageData = "message: M A N ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> manMap = new HashMap<>();


manMap.put("messageId", messageId);


manMap.put("messageData", messageData);


manMap.put("createTime", createTime);


rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);


return "ok";


}


@GetMapping("/sendTopicMessage2")


public String sendTopicMessage2() {


String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());


String messageData = "message: woman is all ";


String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));


Map<String, Object> womanMap = new HashMap<>();


womanMap.put("messageId", messageId);


womanMap.put("messageData", messageData);


womanMap.put("createTime", createTime);


rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);


return "ok";


}


复制代码


2. 在消费者项目创建 TopicManReceiver


package com.cyj.dream.file.listener;


import lombok.extern.slf4j.Slf4j;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Description: Topic 话题模式消费者

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.file.listener

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-19

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Slf4j


@Component


@RabbitListener(queues = "topic.man")


public class TopicManReceiver {


@RabbitHandler


public void process(Map testMessage) {


log.info("TopicManReceiver 消费者收到消息 : " + testMessage.toString());


}


}


复制代码


3. 再建一个 TopicTotalReceiver 监听 topic.woman


package com.cyj.dream.file.listener;


import lombok.extern.slf4j.Slf4j;


import org.springframework.amqp.rabbit.annotation.RabbitHandler;


import org.springframework.amqp.rabbit.annotation.RabbitListener;


import org.springframework.stereotype.Component;


import java.util.Map;


/**


  • @Description: Topic 话题模式总数监听器

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.file.listener

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-19

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Slf4j


@Component


@RabbitListener(queues = "topic.woman")


public class TopicTotalReceiver {


@RabbitHandler


public void process(Map testMessage) {


log.info("TopicTotalReceiver 消费者收到消息 : " + testMessage.toString());


}


}


复制代码


4. 添加 主题交换机 配置 TopicRabbitConfig


package com.cyj.dream.file.config;


import org.springframework.amqp.core.Binding;


import org.springframework.amqp.core.BindingBuilder;


import org.springframework.amqp.core.Queue;


import org.springframework.amqp.core.TopicExchange;


import org.springframework.context.annotation.Bean;


import org.springframework.context.annotation.Configuration;


/**


  • @Description: 通配符(话题)模式--消费者一定要加这个配置吗? 不需要的其实

  • @BelongsProject: DreamChardonnay

  • @BelongsPackage: com.cyj.dream.file.config

  • @Author: ChenYongJia

  • @CreateTime: 2021-10-19

  • @Email: chen87647213@163.com

  • @Version: 1.0


*/


@Configuration


public class TopicRabbitConfig {


/**


  • 绑定键


*/


public final static String man = "topic.man";


public final static String woman = "topic.woman";


@Bean


public Queue firstQueue() {


return new Queue(TopicRabbitConfig.man);


}


@Bean


public Queue secondQueue() {


return new Queue(TopicRabbitConfig.woman);


}


@Bean


TopicExchange exchange() {


return new TopicExchange("topicExchange");


}


/**


  • 将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man

  • 这样只要是消息携带的路由键是 topic.man,才会分发到该队列

  • @return


*/


@Bean


Binding bindingExchangeMessage() {


return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);


}


/**


  • 将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#

  • 这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列

  • @return


*/


@Bean


Binding bindingExchangeMessage2() {


return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");


}


}


复制代码


5. 重启生产者和消费者,然后分别调用两个新加的接口,然后看消费者 rabbitmq-consumer 的控制台输出情况:



TopicTotalReceiver 消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }


TopicManReceiver 消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }


TopicTotalReceiver 消费者收到消息 : {createTime=2021-10-22 09:15:11, messageId=0020ad91b22d49d28b49e5bc4d7b3550, messageData=message: woman is all }


复制代码

5. 消息回调



那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。



rabbitmq-provider 项目的 application.yml 文件上,加上消息确认的配置项后:

评论

发布
暂无评论
微服务SpringCloud项目:初步整合rabbitmq