微服务 SpringCloud 项目:初步整合 rabbitmq
port: 5672
addresses: 地址
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启 ack
acknowledge-mode: auto
最多一次消费多少条数据 -限流
prefetch: 1
#开启 ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true
复制代码
3. direct exchange(直连型交换机)
创建
DirectRabbitConfig.java
(对于队列和交换机持久化以及连接使用设置,在注释里有说明,后面的不同交换机的配置就不做同样说明了):
package com.cyj.dream.test.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
@Description: 直连型交换机
接着我们先使用下 direct exchange(直连型交换机),创建 DirectRabbitConfig.java(对于队列和交换机持久化以及连接使用设置,
在注释里有说明,后面的不同交换机的配置就不做同样说明了):
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.test.config
@Author: ChenYongJia
@CreateTime: 2021-10-18
@Email: chen87647213@163.com
@Version: 1.0
*/
@Configuration
public class DirectRabbitConfig {
/**
队列 起名:TestDirectQueue
@return
*/
@Bean
public Queue TestDirectQueue() {
// durable:是否持久化,默认是 false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是 false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于 durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// return new Queue("TestDirectQueue",true,true,false);
//一般设置一下队列的持久化就好,其余两个就是默认 false
return new Queue("TestDirectQueue", true);
}
/**
Direct 交换机 起名:TestDirectExchange
@return
*/
@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange("TestDirectExchange",true,true);
return new DirectExchange("TestDirectExchange", true, false);
}
/**
绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
@return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange("lonelyDirectExchange");
}
}
复制代码
1. SendMessageController
推送消息
package com.cyj.dream.test.controller;
import com.cyj.dream.core.util.user.UUIDUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
/**
@Description: 消息推送接口
再写个简单的接口进行消息推送(根据需求也可以改为定时任务等等,具体看需求),SendMessageController.java:
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.test.controller
@Author: ChenYongJia
@CreateTime: 2021-10-18
@Email: chen87647213@163.com
@Version: 1.0
*/
@RestController
public class SendMessageController {
/**
使用 RabbitTemplate,这提供了接收/发送等等方法
*/
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "这是一条测试消息, 你好啊骚年!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机 TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
}
复制代码
2. 把生产者项目项目运行,调用下接口:
data:image/s3,"s3://crabby-images/8ffc5/8ffc5ffbd3e0f5943fabbeaef6f2528a4a45b3e4" alt=""
因为我们目前还没弄消费者
rabbitmq-consumer
,消息没有被消费的,我们可以去rabbitMq
管理页面看看,是否推送成功,这里就不多说了。
3. 接下来,创建 rabbitmq-consumer
项目:
pom.xml
的依赖加入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
复制代码
然后是
application.yml:
spring:
rabbitmq 消息队列配置
rabbitmq:
password: 账号
username: 密码
port: 5672
addresses: 地址
#虚拟 host 可以不设置,使用 server 默认 host
#virtual-host: JCcccHost
#开启发送失败返回
publisher-returns: true
#配置确认回调
publisher-confirm-type: correlated
listener:
simple:
#指定最小的消费者
数量.
concurrency: 5
#指定最大的消费者数量.
max-concurrency: 10
#开启 ack
acknowledge-mode: auto
最多一次消费多少条数据 -限流
prefetch: 1
#开启 ack
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true
复制代码
4. 创建消息接收监听类
DirectReceiver.java
package com.cyj.dream.file.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
@Description: 消息接收监听类,DirectReceiver.java:
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.file.listener
@Author: ChenYongJia
@CreateTime: 2021-10-19
@Email: chen87647213@163.com
@Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
public class DirectReceiver {
@RabbitHandler
public void process(Map testMessage) {
log.info("DirectReceiver 消费者收到消息 : {}", testMessage.toString());
}
}
复制代码
然后将
rabbitmq-consumer
项目运行起来,可以看到把之前推送的那条消息消费下来了:(继续调用rabbitmq-provider
项目的推送消息接口,你将可以看到消费者即时消费消息:)
data:image/s3,"s3://crabby-images/3b5f9/3b5f96ec0c9d15240754870da7a11828290080d4" alt=""
等你尝试:直连交换机既然是一对一,那如果咱们配置多台监听绑定到同一个直连交互的同一个队列,会怎么样?将以轮询的方式对消息进行消费,而且不存在重复消费。
4. Topic Exchange
主题交换机。
在生产者项目中创建
TopicRabbitConfig.java
package com.cyj.dream.test.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
@Description: 通配符(话题)模式
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.file.config
@Author: ChenYongJia
@CreateTime: 2021-10-19
@Email: chen87647213@163.com
@Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {
/**
绑定键
*/
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man
这样只要是消息携带的路由键是 topic.man,才会分发到该队列
@return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
/**
将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#
这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列
@return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
复制代码
1. 在接口添加方法
// ================================= 生产者使用 Topic 话题模式 ================================
@GetMapping("/sendTopicMessage1")
public String sendTopicMessage1() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: M A N ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);
return "ok";
}
@GetMapping("/sendTopicMessage2")
public String sendTopicMessage2() {
String messageId = String.valueOf(UUIDUtils.getUUIDNoHave());
String messageData = "message: woman is all ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> womanMap = new HashMap<>();
womanMap.put("messageId", messageId);
womanMap.put("messageData", messageData);
womanMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);
return "ok";
}
复制代码
2. 在消费者项目创建 TopicManReceiver
package com.cyj.dream.file.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
@Description: Topic 话题模式消费者
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.file.listener
@Author: ChenYongJia
@CreateTime: 2021-10-19
@Email: chen87647213@163.com
@Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map testMessage) {
log.info("TopicManReceiver 消费者收到消息 : " + testMessage.toString());
}
}
复制代码
3. 再建一个 TopicTotalReceiver
监听 topic.woman
package com.cyj.dream.file.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
@Description: Topic 话题模式总数监听器
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.file.listener
@Author: ChenYongJia
@CreateTime: 2021-10-19
@Email: chen87647213@163.com
@Version: 1.0
*/
@Slf4j
@Component
@RabbitListener(queues = "topic.woman")
public class TopicTotalReceiver {
@RabbitHandler
public void process(Map testMessage) {
log.info("TopicTotalReceiver 消费者收到消息 : " + testMessage.toString());
}
}
复制代码
4. 添加 主题交换机
配置 TopicRabbitConfig
package com.cyj.dream.file.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
@Description: 通配符(话题)模式--消费者一定要加这个配置吗? 不需要的其实
@BelongsProject: DreamChardonnay
@BelongsPackage: com.cyj.dream.file.config
@Author: ChenYongJia
@CreateTime: 2021-10-19
@Email: chen87647213@163.com
@Version: 1.0
*/
@Configuration
public class TopicRabbitConfig {
/**
绑定键
*/
public final static String man = "topic.man";
public final static String woman = "topic.woman";
@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
将 firstQueue 和 topicExchange 绑定,而且绑定的键值为 topic.man
这样只要是消息携带的路由键是 topic.man,才会分发到该队列
@return
*/
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
/**
将 secondQueue 和 topicExchange 绑定,而且绑定的键值为用上通配路由键规则 topic.#
这样只要是消息携带的路由键是以 topic.开头,都会分发到该队列
@return
*/
@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
}
}
复制代码
5. 重启生产者和消费者,然后分别调用两个新加的接口,然后看消费者 rabbitmq-consumer
的控制台输出情况:
data:image/s3,"s3://crabby-images/f59bf/f59bf71f311023ce5b1fec8b7bdcfcc9fccf7045" alt=""
TopicTotalReceiver 消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicManReceiver 消费者收到消息 : {createTime=2021-10-22 09:14:45, messageId=54d2732244db48a3af3e579ae8e18465, messageData=message: M A N }
TopicTotalReceiver 消费者收到消息 : {createTime=2021-10-22 09:15:11, messageId=0020ad91b22d49d28b49e5bc4d7b3550, messageData=message: woman is all }
复制代码
5. 消息回调
那么接下来我们继续讲讲消息的回调,其实就是消息确认(生产者推送消息成功,消费者接收消息成功)。
在
rabbitmq-provider
项目的application.yml
文件上,加上消息确认的配置项后:
评论