写点什么

RocketMQ 详解系列

作者:牧小农
  • 2022 年 8 月 08 日
  • 本文字数:4171 字

    阅读完需:约 14 分钟

RocketMQ 详解系列

什么是 RocketMQ

RocketMQ 作为一款纯 java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。主要功能是异步解耦和流量削峰:。



常见的 MQ 主要有:ActiveMQ、RabbitMQ、Kafka、RocketMQ


四种消息中间件的基本介绍:



消息中间件的使用场景:


异步与解耦:


当我们下了一个订单之后,订单系统会进行 RPC 同步调用 支付系统、库存系统、物流系统等,那么系统之间就会有耦合性,耦合性越高的话,容错性就越低,比如我们的支付系统如果宕机了,就会导致我们整个交易的异常,从而影响用户的体验。


如果我们中间加入了消息中间件,不管是支付还是库存等系统,都是通过异步的方式进行调用的,如果其中一个系统宕机了,不会影响我们用户下单的使用。


本质上 MQ 第一步完成了 异步 ,第二步完成了 解耦 。那么系统的容错性就越高。



流量削峰:


流量削峰也可以叫削峰填谷,比如一些互联网公司大促场景,双十一、店庆或者秒杀活动,都会使用到消息中间件。


如果在不使用消息中间件或者没有流量削峰,每秒是很高的并发,这个时候如果我们的 A 系统,如果要将数据写入到我们的 MYSQL 中,受限于 MYSQL 本身服务的上限,最大我们只能每秒处理 200 请求,这个时候会有大量的消息进行堆积,从而导致 A 系统的奔溃。


这个时候我们可以将用户的请求消息通过 MQ 进行写入,因为消息中间件本身是对数据量处理比较高的一个系统,所以对于每秒 2000 请求,消息中间件可以处理,然后 A 系统作为消息中间件的一个消费者,以固定的速度从 MQ 中拉取 200 个消息,完成我们的业务操作,用时间换空间 从而确保我们 A 系统的稳定性。



数据分发:


如果 S 系统,在对系统进行开发的时候,需要对接多个(A、B、C、D)系统,使用传统的接口调用,中间有改动就需要修改我们的代码,当新增了 A 系统,我们需要去修改代码去调用 A 系统来完成对应的业务逻辑,如果我们当中的 D 系统需要移除, 同样也需要修改代码删除对应的接口调用。


如果 S 系统使用了消息中间件,我们 S 系统只要将消息交给 MQ,剩下的不论是新增还是移除,还是原有的,他们都只是消息中间件的一个消费者,这个时候我们就便于数据的分发。


比如我们新增一个系统,我们只需要新增一个 MQ 的消费者,直接从 MQ 里面拿消息就可以,当我们需要移除一个系统的时候,只需要取消对 MQ 消息的监听即可。对于我们原有的 S 系统不需要进行额外的修改。如果使用 MQ 作为数据分发,减少数据的修改,提高开发的效率。


RocketMQ 基本概念

RocketMQ 主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。这些角色通常以集群的方式存在,RocketMQ 基于纯 Java 开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。



对于 RockerMQ 而言,我们想要启动,必须首先启动 NameServer,在启动 Brober 主机, Brober 会向 NameServer 注册对应的路由和服务(Broker 地址、主体和),Producer 会进行路由的发现,向 NameServer 请求 Broker 路由信息,进行消息的发送。


作为 Consumer 要连通 NameServer,获取到相关的路由信息,方便我们进行消息的订阅。


Broker 也是一个很重要的角色,主要负责消息的存储,不管是生产消息还是订阅消息,消息的来源都是 Broker,一般来说消息的发送(Producer)只会发到主节点,然后 Broker 会进行消息的同步,同步到从节点,作为消费者(Consumer)也只会优先从 Master 节点,获取消息,进行消费,除非主节点不可用或者非常繁忙,才会从从节点进行消费,Broker 除了消息的中转,还负责消息的持久化以及主从数据之间的复制


NameServer:


NameServer 是一个服务与注册的发现中心。也是整个 RocketMQ 的“大脑”,所以 RocketMQ 需要先启动 NameServer 再启动 RocketMQ 中的 Broker


NameServer 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。NameServer底层由 Netty 实现,是内存式存储,所以 NameServer 中的 broker、topic 不会持久化。


NameServer 其角色类似 Dubbo 和 zookeeper,主要负责 Broker 的动态注册与发现。为什么不使用 zookeeper?rocketmq 主要是在分布式情况下使用追求性能,因为 zookeeper 最求最终一致性,所以在性能上会有所折扣。


Broker:


消息服务器(Broker)是消息存储中心,主要作用是接收来自 Producer 的消息并存储,Consumer 从这里取得消息。存储与消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。从部署结构图中可以看出 BrokerMasterSlave 两种类型, Master 既可以写又可以读,Slave 不可以写只可以读。


Producer:


Producer 也称为消息发布者(生产者),负责生产并发送消息至 Topic。生产者向 broker 发送由业务应用程序系统生成的消息。RocketMQ 提供了发送:同步、异步和单向(one-way)的多种范例。


Consumer:


也称为消息订阅者,负责从 Topic 接收并消费消息。消费者从 brokers 那里拉取信息并将其输入应用程序。从 Master 拿到消息,执行完成后,会发送一个消息给 Broker 进行确认,这个就是 ACK 确认

RocketMQ 基本概念

分组(Group)

Group 分为两个部分 生产者和消费者


  • 生产者: 表示发送同一类消息的 Producer,通常情况下发送逻辑是一致的。发送普通消息时,用于标识使用,没有特别的用处。

  • 主要用来作用于事务消息,当事务消息中某条消息一直处于等待状态并超时,Broker 会回查同一个 Group 下的其他 producer,确定该消息是 commit 还是 rollback

  • 消费者: 消费者的分组就非常有意义了,消费者是标识一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。同一个 Consumer Group 下的各个实例将共同消费 topic 的消息,起到负载均衡的作用。

  • 消费进度以 Consumer Group 为粒度管理,不同 Consumer Group 之间消费进度彼此不受影响,即消息 A 被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。

主体(Topic)

用来区分消息的种类,表示一类消息的逻辑名字,消息的逻辑管理单位,无论生产还是消费消息,都需要执行 Topic。


一个发送者可以发送消息给一个或者多个 Topic;


一个消息接受者可以订阅一个或多个 Topic 消息;

消息队列(Message Queue)

消息队列 简称 Queue ,消息物理管理单位。用来并行发送和接收消息,相当于是 Topic 的分区。


一个 Topic 会有若干个 Queue,消息的生产一般会比消息消费的速度要快,消息进行消费的时会有对应的业务逻辑进行处理,这个时候就会降低消息消费的速度。所有一般 Topic 会有若干个 Queue。主要用来解决生产很快,消费很慢。


如果同一个 Topic 创建在不同的 Broker,那么不同的 Broker 有不同的 Queue,将物理存储在不同的 Broker 节点之上,具有水平扩展的能力。无论是生产者还是消费者,实际的操作都是针对 Queue 级别。

标签(Tag)

RocketMQ 支持在发送时给 topic 的消息设置 tag,用于同一主题下区分不同类型的消息。


来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。比如有一个 Topic 消息为水果,那么水果可以有其他的标签 可以是 香蕉、西瓜、草莓等等,我们可以把对应的消息,打上对应的标签(Tag),这个就是方便我们在消费的时候做对应的筛选。


标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

偏移量(Offset)

在 RocketMQ 中,有很多 offset 的概念。一般我们只关心暴露到客户端的 offset。不指定的话,一般指的是消费者消息的偏移量(ConsumerOffset)


Message queue 是无限长的数组。一条消息进来下标就会涨 1,而这个数组的下标就是 offset。


Message queue 中的 max offset 表示消息的最大 offset,Consumer offset 可以理解为标记 Consumer Group 在一条逻辑 Message Queue 上,消息消费到哪里即消费进度。

RocketMQ 下载安装

下载地址:https://rocketmq.apache.org/dowloading/releases/



环境要求:


  • Windows/Linux 64 位系统

  • JDK1.8(64 位)

  • 源码安装需要安装 Maven 3.2.x


这里我们用 rocketmq-4.9.2 来做演示案例。设置环境变量:


变量名: ROCKETMQ_HOME


变量值: MQ 解压路径\MQ 文件夹名


启动

rocketmq-4.9.2\bin目录下,打开 cmd 窗口


先启动 nameServer,启动命令:start mqnamesrv.cmd


然后在启动 Broker,启动命令:start mqbroker.cmd -n 127.0.0.1:7906 autoCreateTopicEnable=true


管理端插件安装:


老版本地址下载:https://codeload.github.com/apache/rocketmq-externals/zip/master


新版本地址:https://github.com/apache/rocketmq-dashboard


启动完成之后,浏览器中输入‘127.0.0.1:8089’,成功后即可进行管理端查看。


消息发送

RocketMQ 提供的原生客户端的 API,当然除了原生客户端外,SpringBoot、SpringCloudStream 也进行了集成,但本质上这些也是基于原生 API 的封装,所以只需掌握原生 API,其他的也会水到渠成。


导入 MQ 客户端依赖


<dependency>  <groupId>org.apache.rocketmq</groupId>  <artifactId>rocketmq-client</artifactId>  <version>4.9.2</version></dependency>
复制代码


消息发送:


/** * 同步发送 */public class SyncProducer {    public static void main(String[] args) throws Exception{        // 实例化消息生产者Producer        DefaultMQProducer producer = new DefaultMQProducer("group_test");
// 设置NameServer的地址 producer.setNamesrvAddr("127.0.0.1:9876"); //producer.setSendLatencyFaultEnable(true);
// 启动Producer实例 producer.start();

for (int i = 0; i < 10; i++) { // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //如果不再发送消息,关闭Producer实例。 producer.shutdown(); }}
复制代码

总结

这篇主要是带大家了解 RocketMQ 的基本原理和介绍,在后面的章节中,会带大家深入了解和使用 RocketMQ,如果觉得文章有帮助的,记得点赞关注,您的支持是我创作的最大动力。


怕什么真理无穷,进一步有进一步的欢喜,大家加油!

发布于: 刚刚阅读数: 3
用户头像

牧小农

关注

业精于勤荒于嬉,行成于思毁于随。 2019.02.13 加入

公众号【牧小农】

评论

发布
暂无评论
RocketMQ 详解系列_RocketMQ_牧小农_InfoQ写作社区