写点什么

Java 开发中常用的消息队列工具 ActiveMQ

作者:编程江湖
  • 2021 年 11 月 29 日
  • 本文字数:2980 字

    阅读完需:约 10 分钟

1 、简介 ActiveMQ


同类产品: RabbitMQ 、 Kafka、Redis(List)


1.1 对比 RabbitMQ 最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同 RabbitMQ 的协议是 AMQP(Advanced Message Queueing Protoco),而 ActiveMQ 使用的是 JMS(Java Messaging Service )协议。顾名思义 JMS 是针对 Java 体系的传输协议,队列两端必须有 JVM,所以如果开发环境都是 java 的话推荐使用 ActiveMQ,可以用 Java 的一些对象进行传递比如 Map、BLob、Stream 等。而 AMQP 通用行较强,非 java 环境经常使用,传输内容就是标准字符串。


另外一点就是 RabbitMQ 用 Erlang 开发,安装前要装 Erlang 环境,比较麻烦。ActiveMQ 解压即可用不用任何安装。


1.2 对比 KafKaKafka 性能超过 ActiveMQ 等传统 MQ 工具,集群扩展性好。


弊端是:


在传输过程中可能会出现消息重复的情况,


不保证发送顺序


一些传统 MQ 的功能没有,比如消息的事务功能。


所以通常用 Kafka 处理大数据日志。


1.3 对比 Redis 其实 Redis 本身利用 List 可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。


2 安装 ActiveMQ 拷贝 apache-activemq-5.14.4-bin.tar.gz 到 Linux 服务器的/opt 下


解压缩 tar -zxvf apache-activemq-5.14.4-bin.tar.gz


重命名 mv apache-activemq-5.14.4 activemq


vim /opt/activemq/bin/activemq


查看 java 环境:vim /etc/profile 或者 echo $JAVA_HOME


增加两行


JAVA_HOME=”/opt/jdk1.8.0_152″


JAVA_CMD=”/opt/jdk1.8.0_152/bin”


注册服务


ln -s /opt/activemq/bin/activemq /etc/init.d/activemq(软连接必须使用绝对路径)


chkconfig –add activemq


禁止使用 cp /opt/activemq/bin/activemq /etc/init.d/activemq


启动服务


service activemq start


关闭服务


service activemq stop


通过 netstat 查看端口


netstat -tlnpt:表示 tcp


l:表示监听


n: 将 ip 和端口转换成域名和服务名


p:显示的程序名


activemq 两个重要的端口,一个是提供消息队列的默认端口:61616


另一个是控制台端口 8161


通过控制台测试


启动消费端


进入网页控制台


账号/密码默认: admin/admin


点击 Queues




观察客户端


3、在 Java 中使用消息队列 3.1 在 gmall-service-util 中导入依赖坐标


<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.2</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency>3.2 在 payment 项目中添加 producer 端


public class ProducerTest {


public static void main(String[] args) throws JMSException {// 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(“tcp://192.168.67.201:61616”);Connection connection = connectionFactory.createConnection();connection.start();// 创建 session 第一个参数表示是否支持事务,false 时,第二个参数 Session.AUTO_ACKNOWLEDGE 自动签收,Session.CLIENT_ACKNOWLEDGE 手动签收,DUPS_OK_ACKNOWLEDGE 订阅时签收其中一个// 第一个参数设置为 true 时,第二个参数可以忽略 服务器设置为 SESSION_TRANSACTEDSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列 Queue queue = session.createQueue(“Atguigu”);


MessageProducer producer = session.createProducer(queue);// 创建消息对象ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();activeMQTextMessage.setText(“hello ActiveMq!”);// 发送消息producer.send(activeMQTextMessage);producer.close();connection.close();
复制代码


}}


注意:如果有事务需要先提交事务 session.commit();


Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数


Number Of Consumers 消费者 这个是消费者端的消费者数量


Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减


Messages Dequeued 出了队列的消息 可以理解为是消费者消费掉的数量


总结:


当有一个消息进入这个队列时,等待消费的消息是 1,进入队列的消息是 1。当消息消费后,等待消费的消息是 0,进入队列的消息是 1,出队列的消息是 1.在来一条消息时,等待消费的消息是 1,进入队列的消息就是 2.


3.3 在 payment 项目中添加 consumer 端 public class ConsumerTest {


public static void main(String[] args) throws JMSException {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, “tcp://192.168.67.201:61616”);// 创建连接 Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列 Queue queue = session.createQueue(“Atguigu”);// 创建 ConsumerMessageConsumer consumer = session.createConsumer(queue);// 接收消息 consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 参数就是收到的消息 if (message instanceof TextMessage){try {String text = ((TextMessage) message).getText();System.out.println(text+“接收的消息!”);} catch (JMSException e) {e.printStackTrace();}}}});consumer.close();session.close();}}


3.4 关于事务控制 producer 提交时的事务


事务开启 true


只执行 send 并不会提交到队列中,只有当执行 session.commit()时,消息才被真正的提交到队列中。


事务不开启 false


只要执行 send,就进入到队列中。


consumer 接收时的事务


事务开启,签收必须写


Session.SESSION_TRANSACTED


收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了 session.rollback()那么消息会释放,重新回到队列中被别的消费端再次消费。java培训


事务不开启,签收方式选择


Session.AUTO_ACKNOWLEDGE


只要调用 comsumer.receive 方法 ,自动确认。


事务不开启,签收方式选择


Session.CLIENT_ACKNOWLEDGE


需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。


这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。


手动签收


事务不开启,签收方式选择


Session.DUPS_OK_ACKNOWLEDGE


在 Topic 模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的 consumer 要可以处理重复提交的问题。


3.5 持久化与非持久化通过 producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置


持久化的好处就是当 activemq 宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。


持久化:当服务器宕机,消息依然存在。


非持久化:当服务器宕机,消息不存在。


在 zookeeper 中,有持久化-非持久化。

用户头像

编程江湖

关注

IT技术分享 2021.11.23 加入

还未添加个人简介

评论

发布
暂无评论
Java开发中常用的消息队列工具 ActiveMQ