初始 RabbitMQ
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ 服务器是用 Erlang 语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ 优势
可靠性(Reliability):持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
消息集群(Clustering)
高可用(Highly Available Queues)
多种协议(Multi-protocol):支持多种消息队列协议,比如 STOMP、MQTT 等等。
多语言客户端(Many Clients):几乎支持所有常用语言 Java、.NET、Ruby 等等。
管理界面(Management UI):用户可以监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
插件机制(Plugin System):众多查件,方便扩展
RabbitMQ 作用
同步变异步
在用户下单的过程中,比如某东中,下单之后,如果开启微信消息推送和发送邮件。
高内聚低耦合
当多个系统进行交互时,为了多个业务系统相互独立互不影响,而且可以正常通信,只需推送服务系统去订阅订单服务系统在 RabbitMQ 上发布的信息,并完成推送服务。
流量削峰
当大量用户请求服务系统时,若不对用户请求进行数量控制,可能导致服务器崩溃,因此在中间新增一个 RabbitMQ 消息队列,直接将请求的数据信息放到消息队列中。然后将队列中请求依次发送到业务系统进行业务处理。常用的场景有:秒杀系统,
快速开始
POM
本次使用 Maven 进行项目构建,因此引入 pom 依赖。导入 maven 依赖需要 2 个,如下文所示。
<!-- RabbitMQ strat-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!-- RabbitMQ end-->
复制代码
application.properties
在 application.properties 文件当中引入 RabbitMQ 基本的配置信息,最简单的配置包含:RabbitMQ 的主机地址、端口号、基础的用户名和密码。
##rabbitmq 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
复制代码
RabbitMqConfig
RabbitMQ 配置,主要是配置队列,如果提前存在该队列,可以省略本配置类。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。 其中 durable: 是否持久化,exclusive: 是否排它。
Binding:绑定,它的作用就是把 exchange 和 queue 按照路由规则绑定起来。
Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。
RabbitMqConfig 把交换机,队列,通过路由关键字进行绑定。一个交换机可以绑定多个消息队列,也就是消息通过一个交换机
package com.example.demo.config;
import com.example.demo.constant.RabbitConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
/**
* @ClassName RabbitMqConfig
* @Description: RabbitMQ配置
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/12
* @Version V1.0
**/
@Slf4j
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message));
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
/**
* 队列
* */
@Bean
public Queue testQueue() {
return new Queue(RabbitConstant.TEST_QUEUE);
}
/**
* 交换机
* */
@Bean
public DirectExchange testExchange() {
return new DirectExchange(RabbitConstant.TEST_QUEUE_KEE_EXCHANGE,
true,
false);
}
/**
* 队列绑定路由和交换机
* */
@Bean
public Binding taskOrderHandleBinding() {
return BindingBuilder.bind(testQueue()).to(testExchange()).with(RabbitConstant.TEST_QUEUE_KEY);
}
}
复制代码
消息消费者
/**
* @ClassName TestConsumer
* @Description: 消息消费者
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/12
* @Version V1.0
**/
@Component
public class TestConsumer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.TEST_QUEUE),
key = RabbitConstant.TEST_QUEUE_KEY,
exchange = @Exchange(value = RabbitConstant.TEST_QUEUE_KEE_EXCHANGE)
))
@RabbitHandler()
public void getMsg(String messageBody, Message message, Channel channel) throws Exception {
System.out.println(messageBody);
}
}
复制代码
消息生产者
/**
* @ClassName 消息生产者
* @Description: TODO
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/12
* @Version V1.0
**/
@Component
public class TestSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String str){
rabbitTemplate.convertAndSend(RabbitConstant.TEST_QUEUE_KEE_EXCHANGE,RabbitConstant.TEST_QUEUE_KEY,str);
}
}
复制代码
RabbitConstant
RabbitMq 的基本配置信息,里面可以包含多个交换机、队列和路由。
package com.example.demo.constant;
/**
* @ClassName RabbitConstant
* @Description: TODO
* @Author JavaZhan @公众号:Java全栈架构师
* @Date 2020/6/12
* @Version V1.0
**/
public interface RabbitConstant {
/**
* 测试队列
* */
String TEST_QUEUE ="TEST_QUEUE";
/**
* 测试路由
* */
String TEST_QUEUE_KEY ="TEST_QUEUE_KEY";
/**
* 测试交换机
* */
String TEST_QUEUE_KEE_EXCHANGE ="TEST_QUEUE_KEY_EXCHANGE";
}
复制代码
测试类
@Test
void testMqSendMsg(){
for(int i =0 ;i<20;i++){
testSender.send("这是消息"+i+"这是一个测试的消息!来自【掘金,小阿杰】");
}
}
复制代码
启动本地 MQ
如下图所示,本地已经那种 RabbitMQ,启动并登陆访问正常。
运行测试用例之后,输出
好了,一个简单的 Spring Boot 集成 RabbitMQ 就搭建完成了。
结语
这样 RabbitMQ 与 Spring Boot 集成成功啦。更多的测试大家可以深入研究一下 RabbitMQ 集群配置的高可用性。
好了,感谢您的阅读,希望您喜欢,如对您有帮助,欢迎点赞收藏。如有不足之处,欢迎评论指正。下次见。
评论