写点什么

🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」系统服务底层原理以及高性能存储设计分析

作者:浩宇天尚
  • 2022 年 1 月 23 日
  • 本文字数:9676 字

    阅读完需:约 32 分钟

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」系统服务底层原理以及高性能存储设计分析

设计背景

消息中间件的本身定义来考虑,应该尽量减少对于外部第三方中间件的依赖。一般来说依赖的外部系统越多,也会使得本身的设计越复杂,采用文件系统作为消息存储的方式。

RocketMQ 存储机制

消息中间件的存储一般都是利用磁盘,一般是使用机械硬盘,但机械硬盘的速度比访问内存慢了 n 个数量级,一款优秀的消息中间件必然会将硬件资源压榨到极致,接下来看看 rocketMq 是如何做到高效存储的。

RocketMQ 存储模型

CommitLog

消息主体以及元数据的存储媒介,存储 Producer 端写入的消息主体内容。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如,00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

ConsumeQueue

  • 消息消费的逻辑队列,其中包含了这个 MessageQueue 在 CommitLog 中的起始物理位置偏移量 offset,消息实体内容的大小和 Message Tag 的哈希值。

  • 实际物理存储来说,ConsumeQueue 对应每个 Topic 和 QueueId 下面的文件,单个文件大小约 5.72M,每个文件由 30W 条数据组成,每个文件默认大小为 600 万个字节,当一个 ConsumeQueue 类型的文件写满了,则写入下一个文件;

IndexFile

生成的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引;

MapedFileQueue

对连续物理存储的抽象封装类,可以通过消息存储的物理偏移量位置快速定位该 offset 所在 MappedFile(具体物理存储位置的抽象)、创建、删除 MappedFile 等操作;

MappedFile

文件存储的直接内存映射业务抽象封装类,通过操作该类,可以把消息字节写入 PageCache 缓存区(commit),或者原子性地将消息持久化的刷盘(flush);

RocketMQ 消息架构

集群有一个 Broker,Topic 为 binlog 的队列(Consume Queue)数量为 4,如下图所示,按顺序发送消息。Consumer 端线程采用的是负载订阅的方式进行消费。

Commit Log 和 Consume Queue

消息发送流程架构

发送到相关服务节点

生产到消费的转换

总体核心流程

RocketMQ 的消息整体是有序的,所以消息按顺序将内容持久化在 Commit Log 中。Consume Queue 则是用于将消息均衡地按序排列在不同的逻辑队列,集群模式下多个消费者就可以并行消费 Consume Queue 的消息。


  1. MappedFile 所有的 topic 数据都写到同一个文件中,文件的大小默认为 1G,使用 mmap 与磁盘文件做映射,初始化时使用 mlock 将内存锁定,防止 pagecache 被 os 交换到 swap 区域。数据是顺序写,数据写满后自动创建下个 MappedFile 顺序写入。

  2. MappedFileQueue MappedFile 的队列,存储封装了所有的 MappedFile 实例。

  3. CommitLog 封装了写入消息和读取消息的实现,根据 MappedFileQueue 找到正在写的 MappedFile,之后将消息写入到 pagecache,最后同步到硬盘上。

  4. ConsumerQueue 一个 topic 可以设置多个 queue,每个 consumerQueue 对应一个 topic 下的 queue,相当于 kafka 里的 partition 概念。里面存储了 msg 在 commitLog 中的 offset、size、tagsCode,固定长度是 20 字节,consumer 可以根据消息的 offset 在 commitLog 找到具体的消息。


详细分析 MQ 发送和消费流程

消息生产和消费通过 CommitLog

生产者发送消息最终写入的是 CommitLog(消息存储的日志数据文件),Consumer 端先从 ConsumeQueue(消息逻辑队列)读取持久化消息的起始物理位置偏移量 offset、大小 size 和消息 Tag 的 HashCode 值,随后再从 CommitLog 中进行读取待拉取消费消息的真正实体内容部分;

IndexFile(索引文件)

为了消息查询提供了一种通过 key 或时间区间来查询消息的方法, 通过 IndexFile 来查找消息的方法不影响发送与消费消息的主流程。

RocketMQ 的 CommitLog 文件采用混合型存储

所有 Topic 下的消息队列共用同一个 CommitLog 的日志数据文件,并通过建立类似索引文件—ConsumeQueue 的方式来区分不同 Topic 下面的不同 MessageQueue 的消息,同时为消费消息起到一定的缓冲作用。


  • 只有 ReputMessageService 异步服务线程通过 doDispatch 异步生成了 ConsumeQueue 队列的元素后,Consumer 端才能进行消费。

  • 只要消息写入并刷盘至 CommitLog 文件后,消息就不会丢失,即使 ConsumeQueue 中的数据丢失,也可以通过 CommitLog 来恢复。

RocketMQ 顺序读写

  • 发送消息时,生产者端的消息确实是顺序写入 CommitLog;

  • 消费消息时,消费者端也是顺序读取 ConsumeQueue,根据其中的起始物理位置偏移量 offset 读取消息是随机读取 CommitLog。


在 RocketMQ 集群整体的吞吐量、并发量非常高的情况下,随机读取文件带来的性能开销影响还是比较大的

RocketMQ 存储架构的优缺点:

优点:
  1. ConsumeQueue 消息逻辑队列较为轻量级;

  2. 磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致 IOWAIT 增高;

缺点:
  1. CommitLog 来说写入消息虽然是顺序写,但是读却变成了完全的随机读;

  2. Consumer 端订阅消费一条消息,需要先读 ConsumeQueue,再读 Commit Log,一定程度上增加了开销;

RocketMQ 存储模型

RocketMQ 文件存储模型,根据类别和作用从概念模型上大致可以划分为 5 层



  1. RocketMQ 业务处理器层: Broker 端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操作(根据解析 RemotingCommand 中的 RequestCode 来区分具体的业务操作类型,进而执行不同的业务处理流程),比如前置的检查和校验步骤、构造 MessageExtBrokerInner 对象、decode 反序列化、构造 Response 返回对象等;

  2. RocketMQ 数据存储组件层:该层主要是 RocketMQ 的存储核心类—DefaultMessageStore,其为 RocketMQ 消息数据文件的访问入口,通过该类的“putMessage()”和“getMessage()”方法完成对 CommitLog 消息存储的日志数据文件进行读写操作(具体的读写访问操作还是依赖下一层中 CommitLog 对象模型提供的方法);在该组件初始化时候,还会启动很多存储相关的后台服务线程:

  3. AllocateMappedFileService(MappedFile 预分配服务线程)

  4. ReputMessageService(回放存储消息服务线程)

  5. HAService(Broker 主从同步高可用服务线程)

  6. StoreStatsService(消息存储统计服务线程)

  7. IndexService(索引文件服务线程)等;

  8. RocketMQ 存储逻辑对象层: 该层主要包含了 RocketMQ 数据文件存储直接相关的三个模型类 IndexFile、ConsumerQueue 和 CommitLog。

  9. IndexFile 为索引数据文件提供访问服务

  10. ConsumerQueue 为逻辑消息队列提供访问服务

  11. CommitLog 则为消息存储的日志数据文件提供访问服务。


这三个模型类也是构成了 RocketMQ 存储层的整体结构;


  1. 封装的文件内存映射层: RocketMQ 主要采用 JDK NIO 中的 MappedByteBuffer 和 FileChannel 两种方式完成数据文件的读写。

  2. 采用 MappedByteBuffer 这种内存映射磁盘文件的方式完成对大文件的读写,在 RocketMQ 中将该类封装成 MappedFile 类。

  3. 对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个 IndexFile 文件大小约为 400M、单个 ConsumerQueue 文件大小约 5.72M、单个 CommitLog 文件大小为 1G),其中每个分隔文件的文件名为前面所有文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。


每一种类的单个文件均由 MappedFile 类提供读写操作服务(其中,MappedFile 类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务);


  1. 磁盘存储层: 主要指的是部署 RocketMQ 服务器所用的磁盘。

RocketMQ 存储技术

主要采用 mmap 与 PageCache,其中 mmap 内存映射技术—Java 中的 MappedByteBuffer。

先简单介绍下 mmap

mmap 一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上。内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。


mmap 内存映射和普通标准 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是可以直接将用户进程私有地址空间中的一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件读/写操作一样。


只有当缺页中断发生时,直接将文件从磁盘拷贝至用户态的进程空间内,只进行了一次数据拷贝。对于容量较大的文件来说(文件大小一般需要限制在 1.5~2G 以下,这也是 CommitLog 设置成 1G 的原因),采用 Mmap 的方式其读/写的效率和性能都非常高。



  • RocketMq 默认的文件大小为 1G,即将 1G 的文件映射到物理内存上。但 mmap 初始化时只是将文件磁盘地址和进程虚拟地址做了个映射,并没有真正的将整个文件都映射到内存中,当程序真正访问这片内存时产生缺页异常,这时候才会将文件的内容拷贝到 page cache。



如果一开始只是做个映射,而到具体写消息时才将文件的部分页加载到 pagecache,那效率将会是多么的低下。MappedFile 初始化的操作是由单独的线程(AllocateMappedFileService)实现的,就是对应的生产消费模型。RocketMq 在初始化 MappedFile 时做了内存预热,事先向 page cache 中写入一些数据 flush 到磁盘,使整个文件都加载到 page cache 中。

MappedByteBuffer 技术分析

MappedByteBuffer 继承自 ByteBuffer,其内部维护了一个逻辑地址变量—address。在建立映射关系时,


MappedByteBuffer 利用了 JDK NIO 的 FileChannel 类提供的 map()方法把文件对象映射到虚拟内存。


源码中 map()方法的实现,可以发现最终其通过调用 native 方法 map0()完成文件对象的映射工作,同时使用 Util.newMappedByteBuffer()方法初始化 MappedByteBuffer 实例,但最终返回的是 DirectByteBuffer 的实例。


在 Java 程序中使用 MappedByteBuffer 的 get()方法来获取内存数据是最终通过 DirectByteBuffer.get()方法实现(底层通过 unsafe.getByte()方法,以“地址 + 偏移量”的方式获取指定映射至内存中的数据)。

使用 Mmap 的限制
  • mmap 映射的内存空间释放的问题


由于映射的内存空间本身就不属于 JVM 的堆内存区(Java Heap),因此其不受 JVM GC 的控制,卸载这部分内存空间需要通过系统调用 unmap()方法来实现。


然而 unmap()方法是 FileChannelImpl 类里实现的私有方法,无法直接显示调用。RocketMQ 中的做法是,通过 Java 反射的方式调用“sun.misc”包下的 Cleaner 类的 clean()方法来释放映射占用的内存空间;


  • MappedByteBuffer 内存映射大小限制


因为其占用的是虚拟内存(非 JVM 的堆内存),大小不受 JVM 的-Xmx 参数限制,但其大小也受到 OS 虚拟内存大小的限制。一般来说,一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了;


  • 使用 MappedByteBuffe 的其他问题


会存在内存占用率较高和文件关闭不确定性的问题;

OS 的 PageCache 机制

PageCache 是 OS 对文件的缓存,用于加速对文件的读写。程序对文件进行顺序读写的速度几乎接近于内存的读写访问,这里的主要原因就是在于 OS 使用 PageCache 机制对读写访问操作进行了性能优化,将一部分的内存用作 PageCache。

对于数据文件的读取

如果一次读取文件时出现未命中 PageCache 的情况,OS 从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。这样,只要下次访问的文件已经被加载至 PageCache 时,读取操作的速度基本等于访问内存。

对于数据文件的写入

OS 会先写入至 Cache 内,随后通过异步的方式由 pdflush 内核线程将 Cache 内的数据刷盘至物理磁盘上。


对于文件的顺序读写操作来说,读和写的区域都在 OS 的 PageCache 内,此时读写性能接近于内存。


  • RocketMQ 的大致做法是,将数据文件映射到 OS 的虚拟内存中(通过 JDK NIO 的 MappedByteBuffer),写消息的时候首先写入 PageCache,并通过异步刷盘的方式将消息批量的做持久化(同时也支持同步刷盘);

  • 订阅消费消息时(对 CommitLog 操作是随机读取),由于 PageCache 的局部性热点原理且整体情况下还是从旧到新的有序读,因此大部分情况下消息还是可以直接从 Page Cache 中读取,不会产生太多的缺页(Page Fault)中断而从磁盘读取。



PageCache 机制也不是完全无缺点的,当遇到 OS 进行脏页回写,内存回收,内存 swap 等情况时,就会引起较大的消息读写延迟。


对于这些情况,RocketMQ 采用了多种优化技术,比如内存预分配,文件预热,mlock 系统调用等,来保证在最大可能地发挥 PageCache 机制优点的同时,尽可能地减少其缺点带来的消息读写延迟。

RocketMQ 存储优化技术

RocketMQ 存储层采用的几项优化技术方案在一定程度上可以减少 PageCache 的缺点带来的影响,主要包括内存预分配,文件预热和 mlock 系统调用。

预先分配 MappedFile

在消息写入过程中(调用 CommitLog 的 putMessage()方法),CommitLog 会先从 MappedFileQueue 队列中获取一个 MappedFile,如果没有就新建一个。


MappedFile 的创建过程是将构建好的一个 AllocateRequest 请求(具体做法是,将下一个文件的路径、下下个文件的路径、文件大小为参数封装为 AllocateRequest 对象)添加至队列中,后台运行的 AllocateMappedFileService 服务线程(在 Broker 启动时,该线程就会创建并运行),会不停地 run,只要请求队列里存在请求,就会去执行 MappedFile 映射文件的创建和预分配工作。

分配的时候有两种策略,

一种是使用 Mmap 的方式来构建 MappedFile 实例,另外一种是从 TransientStorePool 堆外内存池中获取相应的 DirectByteBuffer 来构建 MappedFile。并且,在创建分配完下个 MappedFile 后,还会将下下个 MappedFile 预先创建并保存至请求队列中等待下次获取时直接返回。RocketMQ 中预分配 MappedFile 的设计非常巧妙,下次获取时候直接返回就可以不用等待 MappedFile 创建分配所产生的时间延迟。

文件预热

预热的目的主要有两点;


  • 第一点,由于仅分配内存并进行 mlock 系统调用后并不会为程序完全锁定这些内存,因为其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。其中,RocketMQ 是在创建并分配 MappedFile 的过程中,预先写入一些随机值至 Mmap 映射出的内存空间里。

  • 第二,调用 Mmap 进行内存映射后,OS 只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。程序要访问数据时 OS 会检查该部分的分页是否已经在内存中,如果不在,则发出一次缺页中断。这里,可以想象下 1G 的 CommitLog 需要发生多少次缺页中断,才能使得对应的数据才能完全加载至物理内存中。


RocketMQ 的做法是,在做 Mmap 内存映射的同时进行 madvise 系统调用,目的是使 OS 做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。


public void warmMappedFile(FlushDiskType type, int pages) {        long beginTime = System.currentTimeMillis();        // mappedByteBuffer在java里面对应了mmap的实现        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();        int flush = 0;        long time = System.currentTimeMillis();        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {            byteBuffer.put(i, (byte) 0);            // force flush when flush disk type is sync            if (type == FlushDiskType.SYNC_FLUSH) {                // 同步刷盘机制,OS_PAGE_SIZE为4K                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {                    flush = i;                    mappedByteBuffer.force();                }            }
// prevent gc if (j % 1000 == 0) { log.info("j={}, costTime={}", j, System.currentTimeMillis() - time); time = System.currentTimeMillis(); try { Thread.sleep(0); } catch (InterruptedException e) { log.error("Interrupted", e); } } }
// force flush when prepare load finished if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}", this.getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(), System.currentTimeMillis() - beginTime); // 将page cache 这片内存锁定 this.mlock(); }
复制代码

mlock 内存锁定

  • OS 在内存充足的情况下,会将文件加载到 page cache 提高文件的读写效率,但是当内存不够用时,os 会将 page cache 回收掉。试想如果 MappedFile 对应的 pagecache 被 os 回收,那就又产生缺页异常再次从磁盘加载到 pagecache,会对系统性能产生很大的影响。

  • 将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到 swap 空间。对于 RocketMQ 这种的高吞吐量的分布式消息队列来说,追求的是消息读写低延迟,那么肯定希望尽可能地多使用物理内存,提高数据读写访问的操作效率。


RocketMq 在创建完 MappedFile 并且内存预热完成后调用了 c 的 mlock 函数将这片内存锁定了,具体来看下是怎么实现的


// java 调用cLibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);// 具体实现public void mlock() {        final long beginTime = System.currentTimeMillis();        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();        Pointer pointer = new Pointer(address);        {            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);        }
{ int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED); log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime); }}
复制代码

RocketMQ 刷盘机制

写消息时是先写入到 pagecache,rocketMq 提供了两种刷盘机制,同步刷盘和异步刷盘,同步刷盘适用于对消息可靠性比较高的场合,同步刷盘性能比较低下,这样即使系统宕机消息也不会丢失。


同步刷盘

  • RocketMQ 的 Broker 端才会真正地返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。

  • RocketMQ 同步刷盘的大致做法是,基于生产者消费者模型,主线程创建刷盘请求实例—GroupCommitRequest 并在放入刷盘写队列后唤醒同步刷盘线程—GroupCommitService,来执行刷盘动作(其中用了 CAS 变量和 CountDownLatch 来保证线程间的同步)。RocketMQ 中用读写双缓存队列(requestsWrite/requestsRead)来实现读写分离,其带来的好处在于内部消费生成的同步刷盘请求可以不用加锁,提高并发度。

  • 刷盘线程从阻塞队列中获取,刷盘其实就是调用了 mappedByteBuffer.force()方法,刷盘成功后通过 countdownlatch 唤醒刷盘等待的线程,原理很简单>



public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 对应一个单独的线程 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { // GroupCommitRequest 封装了CountDownLatch,GroupCommitService刷盘完毕后唤醒等待线程 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // 异步刷盘 else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } }
复制代码

异步刷盘

  • 异步刷盘原理 发送消息线程写到 pagecache 成功之后就返回,消息保存在 page cache 中,异步刷盘对应了一个单独线程,刷盘默认一次刷 4 个 pageSize,也就是 16k 的数据。异步刷盘有可能会丢失数据,当 jvm 程序死掉 但机器没有宕机,pagecache 中的脏页还是能人工刷到磁盘的,但是当机器宕机之后,数据就永远丢失了。

  • 能够充分利用 OS 的 PageCache 的优势,只要消息写入 PageCache 即可将成功的 ACK 返回给 Producer 端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ 的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程 wakeup 后,就会继续执行。

RocketMQ 的堆外存储机制

  • RocketMq 提供了堆外内存池机制即 TransientStorePool,TransientStorePool 初始化时实例化 5 个堆外内存,大小和 MappedFile 的大小 1G,然后 mlock 锁定此内存区域。

  • 发送消息时如果开启了堆外内存机制,MappedFile 在实例化时从堆外内存池中获取一个 directBuffer 实例,写消息先写到堆外内存中,然后有单独的线程(CommitRealTimeService)刷到 pagecache,之后再由单独的线程(FlushRealTimeService)从 pagecahce 刷到磁盘。


开启堆外内存池的好处:写消息时先写到堆外内存,纯内存操作非常快。读消息时是从 pagecache 中读,相当于实现了读写分离,但是会存在延时性机制问题,以及对外内存宕机了会丢失,数据一致性会存在问题。



消息生产

所有发送消息的线程是串行执行的,所有 topic 的数据放一块顺序写到 pagecache 中,因此效率十分的高。在写 page cache 成功后,再由单独的线程异步构建 consumerQueue 和 indexFile(基于磁盘实现的 hashMap,实现消息的查找),构建完成 consumerQueue 成功后 consumer 就能消费到最新的消息了,当然构建 consumerQueue 也是顺序写,每次只写入 20 个字节,占用的空间也不大。

消息消费

每个 topic 可以对应多个 consumerQueue,就相当于 kafka 里面的分区概念,Rocketmq 里面的消费者与 consumerQueue 的分配算法和 kafka 的相似。由于 consumerQueue 中只保存了消息在 commitLog 中的 offset、msgSize、tagsCode,因此需要拿到 offset 去 commitlog 中把这条消息捞出来,这时候读相当与随机读。


注意,由前面的 mlock 内存锁定再加上消费的数据一般是最近生产的,数据还在 pagecache 中,对性能的影响也不大,当 consumer 消费很远的数据时,pagecache 中肯定是没有缓存的,这时候 rocketMq 建议 consumer 去 slave 上读

总结

RocketMq 所有 topic 共用一个 commitLog,磁盘顺序写,这一点实现也是参考了 kafka,读消息时根据 consumerQueue 去 commitLog 中吧数据捞出来,虽然是随机读,但是最新的数据一般在 pagecahce 中也无关紧要。使用内存锁定避免内存 swap 交换,堆外内存和 pagecache 的读写分离。

发布于: 刚刚阅读数: 2
用户头像

浩宇天尚

关注

🏆InfoQ写作平台-签约作者🏆 2020.03.25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
🏆【Alibaba中间件技术系列】「RocketMQ技术专题」系统服务底层原理以及高性能存储设计分析