RocketMQ 核心技术
一、技术架构
1.领域模型
如上图所示,Apache RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费这三部分。生产者生产消息并发送至 Apache RocketMQ 服务端,消息被存储在服务端的主题中,消费者通过订阅主题消费消息。
消息生产
生产者(Producer):
Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。
消息存储
主题(Topic):
Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
队列(MessageQueue):
Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
消息(Message):
Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。
消息消费
消费者分组(ConsumerGroup):
Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
消费者(Consumer):
Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
订阅关系(Subscription):
Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。
2.部署架构
角色
Producer 消息的发送者
Consumer 消息的消费者
Broker 代理服务器,消息中转角色,负责存储、转发消息。
NameServer 支持 Broker 的动态注册与发现及 Topic 的路由。
集群工作流程
1.启动 NameServer 后,NameServer 会监听端口,等待 Broker,Producer,Consumer 连上来。2.Broker 启动,跟所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储的所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
3.收到消息前,会先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
4.Producer 发送消息,启动时先跟 NameServer 集群中的其中一个建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列。然后与队列所在 Broker 建立长连接,从而向 Broker 发送消息。
5.Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接,开始消费消息。
二、消息存储
1.存储结构
CommitLog
当生产者将消息发送到 RocketMQ 的 Broker 之后,需要将消息进行持久化存储,防止消息数据丢失。RocketMQ 将消息数据写入存储文件 CommitLog 中,按照消息的发送顺序写入文件当中,每个文件的大小约为 1G,当达到文件大小限制后,就会创建新的 CommitLog 文件。RocketMQ 作为消息中间件来说,最主要的数据流程就是基于主题的发布 - 订阅模式进行消息的发布以及消费,那么当消费者根据自己订阅的 Topic 进行消息消费的时候,Broker 怎么在那么多的 CommitLog 文件中找到对应 Topic 的消息数据呢?
大家可以想一想,CommitLog 文件中的消息数据是一条一条顺序写的,最笨的方法就是遍历文件,作为一款高性能的消息中间件,显然这不是一个好的解决方案。就像从数据库查询数据的时候,遍历的效率肯定是很低的。那么我们可不可以借助数据库提升数据查询的方式,使用索引来加快消息数据的查询呢?答案是肯定的。就像 Mysql 中的索引本身需要文件保存一样,在 RocketMQ 中页有单独保存索引的文件,就是 ConsumerQueue 文件。
ConsumerQueue
在 RocketMQ 中,每个 Topic 对应多个 MessageQueue,每个 MessageQueue 对应一组 ConsumerQueue 文件索引文件。ConsumerQueue 文件中存储了消息相对于 CommitLog 文件的 offset 偏移量,CommitLog 文件本身实际上也是通过偏移量来进行命名如第一个文件是 0000000000000,那么第二文件就是消息总量之和 00000001232321,往后新的文件再进行累计。为什么这么做呢?主要就是在进行消息查找的时候根据消息的偏移量通过二分查找快速定位具体的 CommitLog 文件,提升消息查找效率。需要说明的是,Broker 在进行消息写入 CommitLog 文件中就会异步将其对应的偏移量写入 ConsumerQueue 文件中。
在 ConsumerQueue 文件中实际存储了 CommitLog 文件的 offset 偏移量、消息长度以及 tag 的 hashcode,组成 20 字节的 block 块。其在 Broker 上面的存储路径大致是:…/store/consumequeue/{topic}/{queueid}/{file}。其中 topic 就是生产中订阅的主题,因此消费者在消费消息的时候,Broker 会根据其对应的 Topic 找到对应的 ConsumerQueue 文件,进而找到其索引位置,再到 CommitLog 文件中直接定位具体的消息。
IndexFile
上文中除了构建消费队列的索引外,还同时为每条消息根据 MessageID, MessageKey 构建了索引到 IndexFile。这是为了方便快速快速定位目标消息而产生的,支持通过 MessageID 或者 MessageKey 来查询消息。
整合后模型图
2.顺序写
消息进入 RocketMQ 之后,消息数据是通过顺序写的方式落到 CommitLog 文件中的。那么这里面就涉及两个问题,为什么进行顺序写以及是不是直接写磁盘文件。
为什么要顺序写?
当新的数据到来时,只要在之前的文件末尾进行数据追加就可以,这样的数据写入效率要比随机写入的效率高。
每次数据写入的时候是直接写磁盘文件吗?
我们可以反过想,如果每次都是落盘写入的话实际效率是不高的,无法满足消息中间件这种高吞吐的性能要求。
因此 RocketMQ 实际是借助操作系统的 page cache 来提升写入效率的,消息并不是直接写入磁盘,认识先写入操作系统的 page cache,然后再通过异步刷盘的方式,写入 CommitLog 文件中,这样借助顺序写以及系统的 page cache 可以时间近乎内存的数据写入效率。
3.同步刷盘和异步刷盘
以上所说的异步写入,其实就是 RocketMQ 的异步刷盘模式。但是这个模式有一个缺陷,就是如果 Broker 宕机了,那么此时在 page cache 的消息数据是容易丢失的。
所以虽然异步刷盘的写入效率高但是也存在数据丢失的风险。
同步刷盘
在同步刷盘的场景下,当 Broker 接受到对应的消息之后,Broker 将会把这条消息刷入磁盘的 CommitLog 中,才会返回确认消息给生产者。如果在进行消息写入的时候 Broker 挂了,那么生产者会感受到消息投递失败,一般都会都有消息重新发送的重试逻辑。
这样消息不会丢失了,但是由于每次都是先落盘,就会导致数据写入性能下降。
4.零拷贝技术
RocketMQ 消息每次的发送与接收,实际上都是对磁盘数据的写入与读取,而磁盘 I/O 性能的瓶颈是比较低的。现在很多主流的消息中间件比如 Kafka、RocketMQ,它们能做到百万消息的高性能读写,其实背后都少不了零拷贝的实现。
传统文件 IO
Linux 分为用户空间和内核空间,应用程序运行在用户空间,操作系统和驱动程序运行在内核空间,应用程序要读取磁盘等硬件设备,必须要先经过内核空间的中转处理,用户空间是无法直接访问到硬件层的。因此对于传统 IO 的工作流程,数据读取和写入都必须经过用户空间和内核空间的多次复制。
这个过程发生了 4 次上下文切换、 4 次数据拷贝
mmap+write
把内核空间和用户空间的虚拟地址映射到同一个物理地址,从而减少数据拷贝次数!mmap 就是用了虚拟内存这个特点,它将内核中的读缓冲区与用户空间的缓冲区进行映射,所有的 IO 都在内核中完成。
调用过程:
用户进程通过
mmap方法
向操作系统内核发起 IO 调用,上下文从用户态切换为内核态。CPU 利用 DMA 控制器,把数据从硬盘中拷贝到内核缓冲区。
上下文从内核态切换回用户态,mmap 方法返回。
用户进程通过
write
方法向操作系统内核发起 IO 调用,上下文从用户态切换为内核态。CPU 将内核缓冲区的数据拷贝到的 socket 缓冲区。
CPU 利用 DMA 控制器,把数据从 socket 缓冲区拷贝到网卡,上下文从内核态切换回用户态,write 调用返回。
这个过程发生了 4 次上下文切换、 3 次数据拷贝,减少了一次 CPU 拷贝。
三、核心技术原理
1.顺序消息
顺序消息是指消息的消费顺序和产生顺序相同,顺序消息分为全局顺序消息和部分顺序消息:
全局顺序消息指某个 Topic 下的所有消息都要保证顺序;
部分顺序消息只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 的消息能按顺序消费即可。
要保证全局顺序消息,需要先把 Topic 的读写队列数设置为 1,然后 Producer 和 Consumer 的并发设置也要是 1。简单来说,为了保证整个 Topic 的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。
部分消息有序
要保证部分消息有序,需要发送端和消费端配合处理。
在发送端,要做到把同一业务 ID 的消息发送到同一个 Message Queue。
通过 MessageQueueSelector 来实现分区的选择。
在消费过程中,要做到从同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序。
通过使用 MessageListenerOrderly 类来解决单 Message Queue 的消息被并发处理的问题。
在 MessageListenerOrderly 的实现中,为每个 Consumer Queue 加个锁,消费每个消息前,需要先获得这个消息对应的 Consumer Queue 所对应的锁,这样保证了同一时间,同一个 Consumer Queue 的消息不被并发消费,但不同 Consumer Queue 的消息可以并发处理。
2.事务消息
事务消息发送流程:
1.发送方发送半事务消息
2.Broker 收到半事务消息存储后返回结果
3.发送半事务消息方处理本地事务
4.发送方把本地事务处理结果以消息形式发送到 Broker
事务消息回查流程(异步):
5.Broker 在固定的时间内(默认 60 秒)未收到
4 的确认消息,Broker 为发送方发送回查消息
6.业务发送发收到 Broker 回查消息后,查询本地业务执行结果 7.业务方发送回查结果消息
3.可靠消息
RockerMQ 默认提供了至少消费一次的消费语义来保证消息的可靠消费。
3.1.发送端可靠性
消息发送一般有以下几种方式:同步发送、异步发送以及单向发送。
同步发送是指发送端在发送消息时,阻塞线程进行等待,直到服务器返回发送的结果。发送端可以通过检查 Brocker 返回的状态来判断消息是否持久化成功。
异步发送是指发送端在发送消息时,传入回调接口实现类,消息发送结果会回传给相应的回调函数。具体的业务实现可以根据发送的结果信息来判断是否需要重试来保证消息的可靠性。
单向发送是指发送端发送完成之后,调用该发送接口后立刻返回,并不返回发送的结果。单向发送相对前两种发送方式来说是一种不可靠的消息发送方式,因此要保证消息发送的可靠性,不推荐采用这种方式来发送消息。
3.2.存储消息可靠性
同步刷盘可以保证数据绝对安全,但是吞吐量不大。
异步刷盘吞吐量大,性能高,但是 PageCache 中的数据可能丢失,不能保证数据绝对的安全。
实际应用中要结合业务场景,合理设置刷盘方式,尤其是同步刷盘的方式,由于频繁的触发磁盘写动作,会明显降低性能。
3.3.消费端消息可靠性
RocketMQ 默认采取“先消费,消费成功后再提交”的策略,由各自 consumer 业务方保证幂等来解决重复消费问题。
消费者从 RocketMQ 拉取到消息之后,需要返回消费成功来表示业务方正常消费完成。因此只有返回 CONSUME_SUCCESS 才算消费完成,如果返回 CONSUME_LATER 则会按照不同的 messageDelayLevel 时间进行再次消费,时间分级从秒到小时,最长时间为 2 个小时后再次进行消费重试,如果消费满 16 次之后还是未能消费成功,则不再重试,会将消息发送到死信队列,从而保证消息存储的可靠性。
4.死信队列
死信消息有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
一个死信队列对应一个 ConsumerGroup ID,一个死信队列包含了对应 Group 产生的所有死信消息,不论该消息属于哪个 Topic。
5.消息过滤
RocketMQ 的消费者可以根据 Tag 进行消息过滤,也支持自定义属性过滤。消息过滤目前是在 Broker 端实现的,优点是减少了对于 Consumer 无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。
6.流量控制
消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。
Apache RocketMQ 的消息流控触发条件如下:
存储压力大:若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
参考
版权声明: 本文为 InfoQ 作者【苏格拉格拉】的原创文章。
原文链接:【http://xie.infoq.cn/article/00286110c9c9808dfbd43aa32】。文章转载请联系作者。
评论