写点什么

RocketMQ 原理—消息读写的性能优化

  • 2025-05-13
    福建
  • 本文字数:7863 字

    阅读完需:约 26 分钟

1.Producer 基于队列的消息分发机制


(1)消息是如何发送到 Broker 去的


Producer 发送消息时需要指定一个 Topic,需要知道 Topic 里有哪些 Queue,以及这些 Queue 分别分布在哪些 Broker 上。因此,Producer 发送消息到 Broker 的流程如下:

 

首先,Producer 会从 NameServer 中拉取 Topic 的路由信息,然后缓存本地的 Topic 缓存中,并每隔 30s 进行刷新。

 

然后,Producer 会对获取到的 Topic 的 Queues 进行负载均衡,选择其中的一个 Queue 并找出对应的 Broker 主节点。

 

最后,Producer 才会与这个 Broker 的主节点进行通信并发送消息过去。



(2)消息发送失败会如何处理


如果 Producer 往 Broker 的主节点写入消息失败,那么就会执行重试——重新选择一个 Queue 和 Broker 主节点。此外,故障的 Broker 在一段时间内也不会再次被选中。这就是重试机制 + 故障退避机制。

 

Broker 故障的延迟感知机制:Broker 故障后,虽然 NameServer 会通过心跳 + 定时任务可以感知并摘除掉它,但该故障的 Broker 信息无需实时通知 Producer。通过 Producer 端的重试机制 + 故障退避机制,就可以让 Broker 故障时也能做到高可用。



(3)发送消息时有哪些高阶特性可以使用


按照 Key 进行 Hash,比如让 orderId 相同的消息都进入同一个 Queue 里,以保证它们的顺序性。

 

2.Producer 基于 Hash 的有序消息分发


一个 Topic 会有很多个 Queue,这些 Queue 会落到各个 Broker 上。默认情况下,Producer 往一个 Topic 写入的数据,会均匀地分散到各个 Broker 的各个 Queue 里。所以各个 Broker 的各个 Queue 里都会有一些 Producer 的数据。

 

那么发送到 Topic 里的数据,是否是有顺序的?其实一个 Queue 就代表了一个队列,当消息进入到同一个 Queue 时,这些同一个 Queue 里的消息便是有顺序的,但是不同的 Queue 之间的消息则是没有顺序的。

 

如果需要让某一类数据有一定的特殊顺序性,比如同样 orderId 对应的多条消息(下单->支付->发券)是有顺序的,那么唯一的选择就是让同样 orderId 对应的所有消息都进入同一个 Queue,并保证它们在同一个 Queue 里是有顺序的。

 

具体的做法就是:根据消息的某个字段值对应的 Hash 值,对 Queue 的数量进行取模,按取模结果来选择 Queue。从而实现同样的字段值对应同样的 Queue,由此来实现顺序性。

 

3.Broker 如何实现高并发消息数据写入


写消息的方式有两种:一是随机写,二是顺序写。写消息的落地有两种:一是内存,二是磁盘。

 

(1)数据持久化


一般来说,如果要持久化保存写入的消息数据,消息必须要落地到磁盘。如果消息只落地到内存,为了避免内存里的数据丢失,此时就需要设计一套避免内存里数据不丢失的机制,而且这套机制一般都会基于 Write Ahead Log(预写日志),也是需要写磁盘的。所以写入 RocketMQ 的消息数据,都会写入到磁盘里来保证持久化存储。

 

(2)磁盘的随机写和顺序写


磁盘随机写:往磁盘文件里写的数据,其格式都是用户自定义的。用户每次写入数据都需要找到磁盘文件里的某个位置(在磁盘文件里随机寻址),才能在那个位置里插入最新的数据。

 

磁盘顺序写:每次写入数据都是在一个文件末尾去进行追加,绝对不会随机在文件里寻址来找到一个中间的位置进行插入。

 

磁盘文件随机写的性能:几十 ms 到几百 ms。

 

磁盘文件顺序写的性能:约等同于在内存里随机写数据,毫秒级,低于 1ms 或几 ms。

 

(3)内存的随机写和顺序写


如果把数据往内存里写,也分顺序写和随机写。

 

内存随机写:内存也是有地址空间的,在内存里随机写需要在内存地址里随机寻址,然后再去插入数据。

 

内存顺序写:在一块连续的内存空间里,顺序地追加数据,避免了内存地址的随机寻址。

 

(4)RocketMQ 的消息写入


首先会将所有消息数据都顺序写入到 CommitLog 磁盘文件。1 个 CommitLog 的大小为 1GB,写满一个 CommitLog 就切换下一个 CommitLog。这时 CommitLog 便会包含很多种数据,所以需要区分一条消息到底是属于哪个 Topic 哪个 Queue 的。

 

然后 RocketMQ 会有一个后台线程负责对 CommitLog 里的数据进行转发。该后台线程会监听 CommitLog 里的最新数据写入,负责把消息写入到所属的 Topic 对应的 Queue 里面。这个 Queue 本身也是属于一个磁盘文件,而 Queue 里面的内容则是每条数据在 CommitLog 磁盘文件里的 offset 偏移量。

 

RocketMQ 便是通过以上的方式来实现消息数据的高并发写入和存储。


 

4.RocketMQ 读写队列的运作原理分析


多个 Consumer 会组成一个消费者组来消费同一个 Topic,该 Topic 里的 ReadQueue 会均匀分配给同一个消费者组里的各个 Consumer。

 

创建一个 Topic 时会默认 4 个 WriteQueue 和 4 个 ReadQueue。其中 WriteQueue 会和磁盘里的文件对应起来,属于物理概念,而 ReadQueue 则属于虚拟概念。通常来说 WriteQueue 和 ReadQueue 会一一对应。

 

WriteQueue 用于 Producer 写消息时进行路由,ReadQueue 用于 Consumer 读消息时进行路由。

 

WriteQueue 其实只是对于 Producer 而言的,实际上只是逻辑上的一个名字而已。在物理上,WriteQueue 对应的是 ConsumeQueue。

 

RocketMQ 设计出 WriteQueue 和 ReadQueue 的原因是为了保持灵活性,方便扩容和缩容。

 

如果创建 Topic 时配置了 4 个 WriteQueue、8 个 ReadQueue:由于 WriteQueue 和 ReadQueue 是一一对应的,当这 8 个 ReadQueue 均匀下发到 4 个 Consumer 时,可能会导致某 Consumer 分到的 ReadQueue 是完全没有数据的。

 

如果创建 Topic 时配置了 8 个 WriteQueue、4 个 ReadQueue:由于 WriteQueue 和 ReadQueue 是一一对应的,可能会导致只有 4 个 WriteQueue 有对应的 ReadQueue 会进行下发到 Consumer,剩余 4 个 WriteQueue 的数据没法给到 Consumer 进行消费。

 

5.Consumer 拉取消息的流程原理分析


基于 RocketMQ 编写 Consumer 代码时,用户一般会定义一个 ConsumerListener 回调监听函数,并在函数里添加处理消息的具体逻辑。这样,当 Consumer 拉取到消息后,会调用用户定义的 ConsumerListener 回调监听函数,从而实现对消息的处理。

 

当 Consumer 分配到 Topic 的某些 ReadQueue 之后,会有一个专门的线程处理 ReadQueue 里的消息。这个线程叫做 PullMessageService,它会和对应的 Broker 建立好网络连接,然后不停地循环拉取 ReadQueue 里的消息。Consumer 和某 Broker 建立好连接后,便可以感知分配给该 Consumer 的 ReadQueue 所映射的 WriteQueue 里最新的数据。Broker 会根据最新数据的 offset 偏移量,从 CommitLog 中获取完整的消息数据,最后通过网络发送给 Consumer。

 

Consumer 的 PullMessageService 线程拉取到消息之后,会将消息写入一个叫 ProcessQueue 的内存数据结构中,这个 ProcessQueue 数据结构的作用其实是用来对消息进行中转用的。完成中转后,Consumer 便能够开辟多个 ConsumeMessageThread 线程(即消息消费线程),来对消息进行消费。ConsumeMessageThread 线程从 ProcessQueue 读取到消息后,便会调用用户实现的 consumeMessage()方法。这个 consumeMessage()方法就是用户自定义 ConsumerListener 回调监听函数里的方法,里面便是用户对消息的业务逻辑处理。

 

如果执行用户的业务逻辑处理成功了,便会返回 SUCCESS 标记给 ConsumeMessageThread 线程。如果执行用户的业务逻辑处理失败了,便会返回 RECONSUME_LATER 标记给 ConsumeMessageThread 线程。

 

当 ConsumeMessageThread 线程收到执行用户的业务逻辑处理的 SUCCESS 标记后,Consumer 便会上报某消息处理成功到 Broker 的 WriteQueue,这样下次该消息就不会重复分发给 Consumer 去消费了。

 

当 ConsumeMessageThread 线程收到执行用户的业务逻辑处理的 RECONSUME_LATER 标记后,Consumer 便会上报某消息处理失败到 Broker 的 WriteQueue,这样下次该消息就会继续分发给 Consumer 去消费。


 

6.ConsumeQueue 的随机位置读取需求分析


ConsumeQueue 应该如何设计才能让 Consumer 高性能地读取一条一条的消息?

 

首先,一个 Topic 是可以给多个 ConsumerGroup 去进行消费的。比如对于一个有 8 个 Queue 的 TestTopic,业务系统 A 和业务系统 B 都可以订阅 TestTopic 分别进行各自消费。其中业务系统 A 部署了 3 台机器,每台机器就是一个 Consumer,那么这 8 个 Queue 会分配给这 3 台机器去消费。而业务系统 B 部署了 5 台机器,每台机器就是一个 Consumer,那么这 8 个 Queue 会分配给这 5 台机器去消费。

 

然后,不同的 ConsumerGroup 对一个 Queue 的消费进度是不一样的。有的 ConsumerGroup 可能已经消费这个 Queue 的 500 条消息,有的 ConsumerGroup 可能只消费这个 Queue 的 100 条消息。

 

所以 Queue 需要能够随时根据要消费的消息序号,定位到某条消息在磁盘文件里的不同位置,然后从该位置去进行读取。

 

针对这样的读取需求,ConsumeQueue 磁盘文件应该如何设计才能支持高效的磁盘位置定位以及读取?

 

7.ConsumeQueue 的物理存储结构设计


首先会有一个目录来存放 ConsumeQueue 磁盘文件,比如~/topicName/queueId/多个磁盘文件,可见每个 Topic 都会有属于自己的目录。

 

然后每个 ConsumeQueue 磁盘文件里都会存储一条一条的消息数据,每条消息数据在磁盘文件里存储的内容如下:


8个字节的offset + 4个字节的消息大小 + 8个字节的Tag的Hash值
复制代码



按上述方式设计 ConsumeQueue 消息内容的好处是定长,可以让 ConsumeQueue 的每条消息在 ConsumeQueue 磁盘文件里存储的大小是固定的 20 字节。这样每个 ConsumeQueue 磁盘文件最多 30 万条数据,只占 5.72MB 大小。当消息定长和磁盘文件固定大小后,就可以方便快速地根据逻辑偏移量在 ConsumeQueue 里定位和读取某条消息,从而快速地获取物理偏移量,然后再从 CommitLog 里定位和读取到具体的消息。


8.ConsumeQueue 如何实现高性能消息读取


(1)从 ConsumeQueue 高效读取消息


ConsumerGroup 读取的消息,都会有一个自己的逻辑 offset。逻辑 offset 可以认为是逻辑上的偏移量,指的是 Queue 里的第几条消息。ConsumerGroup 里的一个 Consumer 会负责读取某个 Queue 里的消息。当 Consumer 从 Queue 里读取消息时,它会知道需要读取的是这个 Queue 在逻辑上的某个 offset,也就是第几条消息。

 

因为消息是定长的,而且 ConsumeQueue 磁盘文件也是固定大小的,以及会最多存放 30 万条消息。所以当 ConsumerGroup 的一个 Consumer 需要读取 ConsumeQueue 里的某逻辑偏移量 offset 位置的消息时,就会根据该消息的逻辑偏移量 offset,去定位到 ConsumeQueue 磁盘文件中的位置((offset - 1) * 20 字节)作为起始位置,然后再连续读取 20 字节即可,这样就可以快速地将消息读取出来。

 

这就是 ConsumeQueue 的高效率、高性能的读取方式,无需遍历读取磁盘文件里一条一条的消息来进行查找,类似于根据索引去数组定位元素。

 

(2)从 CommitLog 高效读取消息


一个 CommitLog 的大小就是 1G。CommitLog 的文件名就是这个文件里的第一条消息在整个 CommitLog 所有文件组成的文件组里的一个总的物理偏移量。也就是 Broker 会将 CommitLog 文件里第一条消息的这个总物理偏移量作为该 CommitLog 的文件名。比如:


00000000000000000000、00000000000004545345
复制代码


当 Broker 从 ConsumeQueue 里将消息在 CommitLog 的偏移量 offset + 大小 size 读取出来后,就可以在所有的 CommitLog 文件里根据偏移量 offset 进行二分查找,便能确定该消息是存在于哪个 CommitLog 文件里。然后用偏移量 offset 对 CommitLog 文件名进行减法运算,便能知道消息在该 CommitLog 文件里的起始位置。最后从该 CommitLog 文件的对应起始位置开始读取 size 大小的内容,便能获取到该消息。

 

(3)两次随机磁盘读


Broker 读取一条消息时只需要两次随机磁盘读。

 

第一次的随机磁盘读是针对 ConsumeQueue 文件,根据逻辑偏移量计算出具体位置后,再进行随机定位的读取。

 

第二次的随机磁盘读是针对 CommitLog 文件,根据读出的物理偏移量利用二分查找定位具体位置,再进行随机定位的读取。

 

9.CommitLog 基于内存的高并发写入优化


(1)Broker 写入性能优化


一.往 CommitLog 写入消息时会基于磁盘顺序写来提升写入的性能

二.往 ConsumeQueue 写入消息时会基于异步转发写入机制来提升写入的性能

 

(2)Broker 读取性能优化


一.从 ConsumeQueue 中读取消息时会基于定长消息 + 定长文件实现消息的一次定位和读取

二.从 CommitLog 中读取消息时会基于文件名(第一条消息的总物理偏移量)+消息偏移量实现消息的一次定位和读取

 

(3)基于内存来提升读取和写入的性能


无论是写入还是读取,此时最大的问题就是需要对物理磁盘文件进行顺序写或随机读。所以优化的方向就是:基于内存来进一步提升 Broker 的整体写入性能和读取性能。

 

实际上,RocketMQ 会基于内存来提升 CommitLog 的写入性能。虽然磁盘顺序写已经比磁盘随机写好很多,但也比内存写差。

 

具体上,Broker 会基于 MappedFile 这种 mapping 文件内存映射机制,实现把消息数据先写入到内存、再从内存刷到磁盘。MappedFile 可以把磁盘文件映射成一个内存空间,这里的内存是操作系统的 PageCache。

 

所以当 Broker 往 CommitLog 写入消息时,会先写入到 CommitLog 磁盘文件映射的操作系统的 PageCache 中,而 PageCache 里的消息会在后续通过异步刷盘写入到 CommitLog 磁盘文件里。



(4)Broker 写入性能优化的总结


一.往 CommitLog 写入消息时,会通过顺序写内存 + 异步顺序刷磁盘来提升性能

二.往 ConsumeQueue 写入消息时,会基于异步转发的写入机制来提升性能

 

10.Broker 数据丢失场景以及解决方案


(1)Broker 数据丢失场景分析


第一种情况:由于 RocketMQ 是用 Java 开发的中间件系统,Broker 启动后就是一个 JVM 进程。所以如果 Broker 这个 JVM 进程突然崩溃了,那么此时仅仅是 JVM 进程没了,其写到 PageCache 里的数据由于是 OS 管理的,因此数据不会丢失。这种情况发生的概率还是比较高的。

 

第二种情况:Broker 这个 JVM 进程所在的服务器故障宕机了,此时就可能导致 Broker 写入到 PageCache 里的数据丢失。这种情况发生的概率还是非常低的。

 

(2)解决方案


将异步刷盘改成同步刷盘:Broker 将消息顺序写入 PageCache 后,就等操作系统将 PageCache 的数据同步到磁盘文件后再返回响应给 Producer。

 

11.PageCache 内存高并发读写问题分析


如果可以忽略在小概率下的一点点数据丢失问题,将顺序写磁盘文件换成顺序写内存,那么就可以明显提升性能和吞吐量。而且即便是丢失数据,也是默认丢失 500ms 内的数据。

 

当 Producer 和 Consumer 都在高并发地往 Broker 写消息和读消息时:PageCache 的内存数据可能会出现一个经典的问题,就是 RocketMQ 里的 Broker Busy 异常,也就是 Broker 过于繁忙,这会导致一些操作阻塞甚至失败。

 

Broker Busy 就是在高并发的读写情况下,出现的竞争同一块 PageCache 数据太频繁太激烈的问题。为了解决这个问题,RocketMQ 提供了一个叫 TransientStorePoolEnabled 的机制。

 

12.基于 JVM OffHeap 的内存读写分离机制


TransientStorePoolEnabled 机制,可以理解为瞬时存储池启用机制。如果对 Broker 的读写压力真的大到出现 Broker Busy 异常,那么通过开启瞬时存储池,就可以实现内存级别的读写分离模式。

 

一般而言,在一个服务器上部署一个 Java 系统后,这个系统会作为一个 JVM 进程运行在操作系统上。其使用的内存会分为三种:第一种是 JVM Heap 内存(即 JVM 管理的堆内存),第二种是 OffHeap 内存(即 JVM 堆外的内存),第三种是 PageCache 内存(即由操作系统管理的页缓存)。

 

所以如果开启了 TransientStorePoolEnabled,那么 Broker 在写消息时,就会先把消息写到 JVM OffHeap 堆外内存里。然后会有一个额外的后台线程每隔一段时间定时把 JVM OffHeap 堆外内存里的数据写入到 PageCache 中。这样高并发写消息便往 JVM OffHeap 堆外内存里写,高并发读消息便从操作系统的 PageCache 中读,从而实现内存级别的读写分离。

 

所以,RocketMQ 为了解决高并发场景下对 PageCache 竞争读写导致的 Broker Busy 问题,引入了 JVM OffHeap 做了缓存分离,实现了内存级别的读写分离,解决了对一块内存空间的写和读出现频繁竞争的问题。

 

13.JVM OffHeap + PageCache 的数据丢失问题


系统设计里,凡事皆有利弊,没有什么方案是十全十美的。为了解决一个问题,往往会引入新的问题。

 

RocketMQ 为了解决高并发场景下对 PageCache 竞争读写导致的 Broker Busy 问题,引入了 JVM OffHeap 做了缓存分离,实现了内存级别的读写分离,解决了对一块内存空间的写和读出现频繁竞争的问题。但这会大大提高数据丢失的风险,数据丢失的情况主要有两种。

 

情况一:Broker JVM 进程关闭。比如 Broker 崩溃宕机、JVM 进程意外退出,或者正常关闭 Broker JVM 进程进行重启等。由于 OffHeap 堆外内存是由 JVM 管理的,所以 OffHeap 堆外内存的数据此时会丢失,而 OS 管理的 PageCache 则不会丢失。

 

情况二:Broker 所在的服务器宕机。此时 OffHeap 堆外内存和 PageCache 里的数据都会丢失。

 

因此没有一个技术方案是完美的,只能抓住当前场景里的主要矛盾。如果是金融级的数据绝对不能丢失,可能就要牺牲性能和吞吐量,让数据的每一次写入都直接刷盘到磁盘文件。如果是大部分的普通情况,数据允许丢一点点,也就在服务器宕机的极端场景下才会丢几百毫秒的数据,保持默认即可。

 

RocketMQ 默认就是只写 PageCache + 异步刷盘,如果出现高并发竞争 PageCache 的问题,那么可以开启写 JVM OffHeap。容忍一定的 JVM 崩溃也丢失一点数据,但实现了利用缓存分离抗高并发读写。

 

14.ConsumeQueue 异步写入失败的恢复机制


(1)ConsumeQueue 异步写入消息的两个线程


后台线程一:将写入到 PageCache 的数据异步刷盘到 CommitLog 磁盘文件里

后台线程二:监听 CommitLog 磁盘文件的最新数据写入到 ConsumeQueue 磁盘文件里

 

(2)ConsumeQueue 异步写入失败有两种情况


情况一:消息进入到 PageCache 时,Broker 对应的 JVM 进程就宕机了,对应上述的两个后台线程便停止工作。此时只要 Broker 对应的 JVM 进程重启后,便会继续让上述两个线程处理 PageCache 里的消息。

 

情况二:消息进入到 CommitLog 磁盘文件时,Broker 对应的 JVM 进程就宕机了,对应上述的两个后台线程便停止工作。此时只要 Broker 对应的 JVM 进程重启后,负责监听 CommitLog 磁盘文件的后台线程也会继续处理里面的新增消息。而且 Broker 重启时也会对比 CommitLog 的最新数据和 ConsumeQueue 的最新数据,保证被中断的消息写入能正常恢复。


 

15.Broker 写入与读取流程性能优化总结


(1)写入流程的优化


一.默认先写入 OS 的 PageCache 后就直接返回成功了,优化成内存级的顺序写


这里会基于 MappedFile 机制来实现,将磁盘文件映射为一块 OS 的 PageCache 内存,让写文件等同于写内存。其中的亮点是基于 OS 的 PageCache 来写入数据,Broker 的 JVM 进程崩溃(高概率)是不会导致 PageCache 的数据丢失的。只有服务器崩溃的小概率极端场景才会导致几百毫秒内写入的数据会丢失,所以丢数据的概率是很低的。

 

二.对 ConsumeQueue 文件和 IndexFile 文件的写入,是通过异步来进行写入的


虽然将消息写入这两种文件时是异步写的,但只要数据还在 CommitLog 中没有丢失,那么即便异步写入失败也没影响。

 

(2)存储结构的优化


一.ConsumeQueue 文件的存储结构是为了能够实现高性能的读取而设计的


在 ConsumeQueue 文件里存储的每条消息都是定长的 20 字节,每个 ConsumeQueue 文件满了是 30 万条消息,约 5.72MB。此外,一个 Topic 目录会有多个 MessageQueue 目录,一个 MessageQueue 目录会有多个 ConsumeQueue 磁盘文件。

 

二.CommitLog 文件默认满了就是 1GB


CommitLog 的物理存储结构核心就是其文件名,每一条消息都会有一个在所有 CommitLog 里的总的物理偏移量,每个文件的名称就是文件里第一条消息在所有 CommitLog 里的总物理偏移量。

 

(3)读取流程的优化


一.根据消息的逻辑偏移量 offset 来定位哪个磁盘文件的哪个物理位置


通过第一次定位,就能找到这条消息在 CommitLog 里的物理偏移量 offset。然后再通过第二次定位,也就是根据物理偏移量利用二分查找去对应的 CommitLog 中便能读取出该消息。

 

二.高并发对 PageCache 进行读写竞争时可能会出现 Broker Busy 问题


此时可以通过开启 TransientStorePoolEnabled,也就是启用 JVM OffHeap 堆外内存,来实现内存级的读写分离。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18691275

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
RocketMQ原理—消息读写的性能优化_RocketMQ_量贩潮汐·WholesaleTide_InfoQ写作社区