干货|SpringBoot JMS(ActiveMQ)API 实践应用详解
- 2023-04-27 江苏
本文字数:6393 字
阅读完需:约 21 分钟
前言
Active 是一种开源的,实现了 JMS1.1 规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。AC-tiveMQ 使用 Apache 提供的授权,任何人都可以对其实现代码进行修改。
ActiveMQ 的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。ActiveMQ 实现了 JMS 标准并提供了很多附加的特性。本文将带大家详细介绍 ActiveMQ 的 API 的使用。
1. JMS 的概念?
「什么是 JMS 呢:」
JMS---------JAVA Message Service
JAVA 的消息服务,是 sun 公司提供的接口,只是一个规范,这个规范就类似于 JDBC 是一样的,使用的时候是需要当前规范的实现产品的。
「JMS 能干什么呢:」
能够将信息发布到目的地
可以从目的地来消费这个消息
2、两种通信模型
「队列的通信概念:」
特点:当我们同一个队列有多个消费者的时候,多个消费者的数据之和才是原来队列中的所有数据
队列的通信模型最大的适用场景:流量的消峰,高并发的处理
「主题的通信模型:」
特点:当我们队列有多个消费者的时候,那么这多个消费者消费到的数据是一样的
主题消费者通信模型的适用场景:微服务下服务之间的异步通信
3. MQ 的实现产品
「实现产品:」
ActiveMQ
RabbitMQ
RockerMQ
Kafka(这个设计的初衷是做分布式的日志的,后来因为日志有严格的顺序问题,这个时候人们就用 Kafka 来做消息队列了)
4、JMS 中常见的名词
「常见的名词:」
ActiveMQConnectionFactory:这个是创建连接的工厂
ConnectionFactory:连接的工厂
Connection:连接 JAVA 对 MQ 的一个连接
Destination:目的地
生产者(Producer)
消费者(Consumer)
Session:会话(每一次对 MQ 的操作都称为一次会话)
Queue:队列
Topic:主题
5、什么是消息队列
「消息队列简单的说就是用来存放临时数据的地方:」
生产者----------->存储介质上
消费者----------->存储介质上
「消息队列类似于快递公司:」
你可以将东西交给快递公司
目标人也可以从快递公司去取东西
6. ActiveMQ 是什么
「含义:」
ActiveMQ 就是一个 JMS 的实现产品,它能够实现 JMS 下的所有功能
7、ActiveMQ 能干什么
「主要作用:」
流量消峰处理
微服务下模块的异步通信
处理高并发下的订单
处理第三方平台的高并发
协助消息表可以完成分布式事务的最终一致性
8、ActiveMQ 的安装
「ActiveMQ 的安装和配置:」
1、官网下载Linux版的ActiveMQ(最新版本为5.13.4) https://activemq.apache.org/download.html 2、解压安装 tar -zxvf apache-activemq-5.13.4-bin.tar.gz 3、配置(这里采用默认配置,无需修改) vim /usr/lical/activemq-1/conf/activemq.xml 4、启动 cd /usr/local/activemq-1/bin./activemq start 5、打开管理界面(管理界面可以查看并管理所有队列及消息) http://192.168.1.100:8161 启动成功后,可以浏览 http://localhost:8161/admin/ 默认用户名、密码:admin/admin 管理界面是用jetty做容器的,如果想修改管理界面的端口,可以编辑../conf/jetty.xml,找到下面这一段: <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean> 用户名/密码是在 ../conf/jetty-realm.properties 里,比如要增加一个管理员jimmy/123456,可参考下面修改: 123admin: admin, adminjimmy: 123456, adminuser: user, user 注:管理界面有一个小坑,ActiveMQ 5.13.2与jdk1.8兼容性有点问题,如果使用jdk1.8,管理界面进入Queues标签页时,偶尔会报错,但是并不影响消息正常收发,只是无法从界面上查看队列情况,如果出现该问题,可将jdk版本降至1.7,同时最好清空data目录下的所有数据,再重启activemq即可。9. ActiveMQ 的 API 的使用
「AcatveMQ 的 API 使用:」
队列的使用(生产者)
package com.qy.mq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.Message;import javax.jms.*;/** * @Auther: qianyu * @Date: 2020/11/04 14:12 * @Description:生产者 */public class Producer { //准备发布的这个地址 private static final String PATH="tcp://10.7.182.87:61616"; //ActiveMQ下的用户名 private static final String userName="admin"; //ActiveMQ下的密码 private static final String password="admin"; public static void main(String[] args) throws JMSException { //第一步:创建连接的工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); //通过这个工厂获取连接 Connection connection = activeMQConnectionFactory.createConnection(); //第三步:打开这个连接 connection.start(); //第四步:创建操作MQ的这个会话 /** * 第一个参数:是否使用事务 * 第二个参数:客户端的应答模式 * 第一种:自动应答 * 第二种:客户端手动应答 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //需要发送消息的目的地(queue操作的是队列) Destination destination=session.createQueue("wqqq"); //生产者来生产这个消息 //要有生产者 MessageProducer messageProducer = session.createProducer(destination); //发送很多的消息到消息队列中去// for (int i=0;i<100;i++){ //需要准备发送的消息// TextMessage textMessage = session.createTextMessage("我是浅羽:"+i); //研究消息的类型 /* BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.setByteProperty("www",new Byte("123")); //接下来就可以发送消息了 messageProducer.send(bytesMessage);*/ //创建map类型的message /*MapMessage mapMessage = session.createMapMessage(); mapMessage.setInt("www1",123); messageProducer.send(mapMessage);*/ ObjectMessage objectMessage = session.createObjectMessage(new User(1, "qianyu", "123")); messageProducer.send(objectMessage);// } }}队列的使用(消费者)
package com.qy.mq.queue;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;import java.io.Serializable;/** * @Auther: qianyu * @Date: 2020/11/04 14:13 * @Description:消费者 */public class Consumer { //准备发布的这个地址 private static final String PATH="tcp://10.7.182.87:61616"; //ActiveMQ下的用户名 private static final String userName="admin"; //ActiveMQ下的密码 private static final String password="admin"; public static void main(String[] args) throws JMSException { //第一步:创建连接的工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); //通过这个工厂获取连接 Connection connection = activeMQConnectionFactory.createConnection(); //第三步:打开这个连接 connection.start(); //第四步:创建操作MQ的这个会话 /** * 第一个参数:是否使用事务 * 第二个参数:客户端的应答模式 * 第一种:自动应答 * 第二种:客户端手动应答 */ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //需要发送消息的目的地(queue操作的是队列) Destination destination=session.createQueue("wqqq"); //创建我们的消费者了 MessageConsumer messageConsumer = session.createConsumer(destination); //接下来就可以接收我们的消息了 //Message message = messageConsumer.receive(); //接收消息并指定这个超时的时间// Message message = messageConsumer.receive(5000); //接收消息没有就不等待了 直接over了 不接收了// Message message = messageConsumer.receiveNoWait(); //给定当前的路径设置监听器 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { /* BytesMessage bytesMessage= (BytesMessage) message; try { System.out.println("获取到的数据是:"+bytesMessage.getByteProperty("www")); } catch (JMSException e) { e.printStackTrace(); }*/ /*MapMessage mapMessage= (MapMessage) message; try { System.out.println("获取到的数据是:"+mapMessage.getInt("www1")); } catch (JMSException e) { e.printStackTrace(); }*/ //测试对象类型的消息的发送和接收 ObjectMessage objectMessage= (ObjectMessage) message; try { User user = (User) objectMessage.getObject(); System.out.println("接收到的数据是:"+user); } catch (JMSException e) { e.printStackTrace(); } /* //我们知道是一个字符串类型的消息 TextMessage textMessage= (TextMessage) message; //接下来就可以打印这个消息了 try { System.out.println("消费者1---接收到的消息是:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); }*/ try { //这句话就表示的是客户端来手动的进行应答 message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); }}主题模型的生产者
package com.qy.mq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * @Auther: qianyu * @Date: 2020/11/04 15:17 * @Description: */public class Producer { //准备发布的这个地址 private static final String PATH="tcp://10.7.182.87:61616"; //ActiveMQ下的用户名 private static final String userName="admin"; //ActiveMQ下的密码 private static final String password="admin"; public static void main(String[] args) throws JMSException { //第一步:创建连接的工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); //通过这个工厂获取连接 Connection connection = activeMQConnectionFactory.createConnection(); //第三步:打开这个连接 connection.start(); //第四步:创建操作MQ的这个会话 /** * 第一个参数:是否使用事务 * 第二个参数:客户端的应答模式 * 第一种:自动应答 * 第二种:客户端手动应答 */ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //需要发送消息的目的地(下面创建的就应该是主题模型的地址) Destination destination=session.createTopic("topic222"); //生产者来生产这个消息 //要有生产者 MessageProducer messageProducer = session.createProducer(destination); //发送很多的消息到消息队列中去 for (int i=0;i<100;i++){ //需要准备发送的消息 TextMessage textMessage = session.createTextMessage("我是浅羽:"+i); //接下来就可以发送消息了 messageProducer.send(textMessage); } }}主题模型的消费者
package com.qy.mq.topic;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/** * @Auther: qianyu * @Date: 2020/11/04 15:19 * @Description: */public class Consumer { //准备发布的这个地址 private static final String PATH="tcp://10.7.182.87:61616"; //ActiveMQ下的用户名 private static final String userName="admin"; //ActiveMQ下的密码 private static final String password="admin"; public static void main(String[] args) throws JMSException { //第一步:创建连接的工厂 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(userName, password, PATH); //通过这个工厂获取连接 Connection connection = activeMQConnectionFactory.createConnection(); //第三步:打开这个连接 connection.start(); //第四步:创建操作MQ的这个会话 /** * 第一个参数:是否使用事务 * 第二个参数:客户端的应答模式 * 第一种:自动应答 * 第二种:客户端手动应答 */ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //需要发送消息的目的地(queue操作的是队列) Destination destination=session.createTopic("topic222"); //创建我们的消费者了 MessageConsumer messageConsumer = session.createConsumer(destination); //接下来就可以接收我们的消息了 //给定当前的路径设置监听器 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //我们知道是一个字符串类型的消息 TextMessage textMessage= (TextMessage) message; //接下来就可以打印这个消息了 try { System.out.println("消费者1---接收到的消息是:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } try { message.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); }}本篇关于 ActiveMQ 的介绍就先到这里结束了,后续会出更多关于 ActiveMQ 系列更多文章,谢谢大家支持!
版权声明: 本文为 InfoQ 作者【浅羽技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/41803f95fd24b7ad156497e2b】。文章转载请联系作者。
浅羽技术
才疏学浅,习习而为,编程羽录,与你同行。 2019-02-26 加入
分享一些计算机信息知识、理论技术、工具资源、软件介绍、后端开发、面试、工作感想以及生活随想等一系列文章。










评论