Springboot 整合 ActiveMQ(Queue 和 Topic 两种模式),Java 开发者跳槽指
1、ActiveMQ 简介
Apache ActiveMQ 是 Apache 软件基金会所研发的开放源代码消息中间件;由于 ActiveMQ 是一个纯 Java 程序,因此只需要操作系统支持 Java 虚拟机,ActiveMQ 便可执行。
2、ActiveMQ 下载
下载地址:http://activemq.apache.org/components/classic/download/
下载完成后解压双击 activemq.bat 文件打开(不用安装,直接使用),目录和打开后效果如下:
运行后,浏览器访问 http://localhost:8161/地址进入一下界面。
点击 Manage ActiveMQ broker 登录到 ActiveMQ 管理页面,默认账号和密码都是 admin。管理页面如下:
1、新建 SpringBoot 项目
新建 Springboot 项目,添加对应的依赖。项目完整的 pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
</parent>
<groupId>com.mcy</groupId>
<artifactId>springboot-mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、项目结构
3、相关配置信息
在 application.properties 类中添加 ActiveMQ 相关的配置信息
server.port=8080
server.servlet.context-path=/mq
#MQ 服务器地址
spring.activemq.broker-url=tcp://localhost:61616
#用户名
spring.activemq.user=admin
#密码
spring.activemq.password=admin
#设置是 Queue 队列还是 Topic,false 为 Queue,true 为 Topic,默认 false-Queue
spring.jms.pub-sub-domain=false
#spring.jms.pub-sub-domain=true
#变量,定义队列和 topic 的名称
myqueue: activemq-queue
mytopic: activemq-topic
4、ActiveMQ 配置类
ActiveMQ 配置类 ConfigBean,配置了 Queue 队列和 topic 两种模式,代码如下:
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Topic;
/**
MQ 配置类
*/
@Component
@EnableJms
public class ConfigBean {
@Value("${myqueue}")
private String myQueue;
@Value("${mytopic}")
private String topicName;
//队列
@Bean
public ActiveMQQueue queue(){
return new ActiveMQQueue(myQueue);
}
//topic
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
}
队列模式即点对点传输。
点对点消息传递域的特点如下:
每个消息只能有一个消费者,类似于 1 对 1 的关系。好比个人快递自己领自己的。
消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
1、队列生产者
QueueProducerController 类为队列生产者控制器,主要向消息队列中发送消息。代码如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue;
/*
队列消息生产者
*/
@RestController
public class QueueProducerController {
@Autowired
p
rivate JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
/*
消息生产者
*/
@RequestMapping("/sendmsg")
public void sendmsg(String msg) {
System.out.println("发送消息到队列:" + msg);
// 指定消息发送的目的地及内容
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
}
}
2、队列消费者
QueueConsumerController 类为队列消费者控制器,具体代码如下:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;
/*
队列 queue 消费者控制器
*/
@RestController
public class QueueConsumerController {
/*
消费者接收消息
*/
@JmsListener(destination="${myqueue}")
public void readActiveQueue(String message) {
System.out.println("接受到:" + message);
}
}
3、测试效果
运行项目在浏览器中访问 http://localhost:8080/mq/sendmsg?msg=123。向消息队列中发送 123。控制台输出效果:
ActiveMQ 控制台显示:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量
【注】队列模式时,配置文件 application.properties 中 spring.jms.pub-sub-domain 属性必须设置为 false。
topic 模式基于发布/订阅模式的传输。
基于发布/订阅模式的传输的特点如下:
评论