RocketMQ 存储机制浅析
整体架构
RocketMQ 是一个典型的发布订阅系统,通过 Broker 节点中转和持久化数据、解耦上下游。Broker 是真实存储数据的节点,由多个水平部署但不一定完全对等的副本组构成,单个副本组的不同节点的数据会达到最终一致。RocketMQ 优异的性能表现,绕不开其优秀的存储模型 。
存储机制设计
在存储方式上,RocketMQ/Kafka/RabbitMQ 均采用的是消息刷盘至所部署虚拟机/物理机的文件系统做持久化。ActiveMQ(默认采用的 KahaDB 做消息存储)可选用 JDBC 做消息持久化,通过简单的 xml 配置信息即可实现 JDBC 消息存储。使用文件系统做持久化的情况下,可获得更高效的 I/O 读写。
Broker Store 目录结构
CommitLog
RocketMQ Broker 单个实例下所有的 Topic 都使用同一个 CommitLog 来存储,即单个实例消息整体有序。CommitLog 单个文件大小默认 1G,文件文件名是起始偏移量,总共 20 位,左边补零,起始偏移量是 0。假设文件按照默认大小 1G 来算:
第一个文件的文件名为 00000000000000000000 ,当第一个文件被写满之后,开始写入第二个文件;
第二个文件的文件名为 00000000001073741824 ,1G=1073741824=1024*1024*1024;
第三个文件的文件名是 00000000002147483648,(文件名相差 1G=1073741824=1024*1024*1024)。
CommitLog 按照上述命名的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。
消息存储格式
MagicCode:MagicCode 是一个特殊的字段,它可以标志 Buffer 中的某个 CommitLog 是一个正常的 CommitLog,还是因为 Buffer 没有多余的空间存放该 CommitLog,导致该 CommitLog 是一个空的 CommitLog。MagicCode 有两个值,如下所示:
BodyCRC:CRC 即循环冗余校验码,是数据通信领域中最常用的一种查错校验码,通过 CRC 就可以知道数据的正确性和完整性。RocketMQ 通过 CRC 来校验消息部分:
QueueId:消息发往哪个队列,QueueId 在 Producer 发送消息时会选择出来。
QueueOffset:存放了消息记录应该在 ConsumerQueue 中的位置,这样构建 ConsumerQueue 的时候,就知道该条记录在 ConsummerQueue 的位置顺序,在消费消息的时候很有用处。
PhysicalOffset:消息在 CommitLog 中的物理位置。需要注意的是,我们 CommitLog 对应着磁盘上的多个文件,这里的偏移量不是从某个文件开始算的,而是从第一个文件偏移开始算起的。
SysFlag:是 RocketMQ 内部使用的标记位,通过位运算进行标记。例如是否对消息进行了压缩、是否属于事务消息。SysFlag 初始值为 0,可与下面的标记进行位运算。
BornTimestamp:Producer 发送消息的时间。
BornHost:Producer 发送消息使用的套接字地址。
StoreTimestamp:消息在 Broker 上存储时间。
StoreHostAddress:Broker 的套接字地址,存储方式同 BornHost。
ReconsumeTimes:重复消费次数,初始为 0。Broker 重试的时候,这个 ReconsumeTimes 就会 +1,默认最大重试次数是 16 次。
PreparedTransactionOffset:事务消息相关的一个属性(RocketMQ 事务消息基于两阶段提交)。
Properties:存放了 RocketMQ 内部用到的一些属性,也存放了用户的一些属性。
顺序写
RocketMQ 的 Commitlog 文件、Consumequeue 文件都是顺序写入的。磁盘顺序写入速度可以达到几百兆/s,而随机写入速度只有几百 KB /s,相差上千倍。
PageCache 机制
Broker 在将消息顺序写入 CommitLog,大大提升性能。但还不够,毕竟仍是磁盘 I/O 操作,要想进一步提升性能,须利用内存。所以 Broker 将数据写入 CommitLog 文件时,不是直接写入物理磁盘文件,而是先进入 OS 的 PageCache 内存缓存,后续由 OS 后台线程异步将 PageCache 数据刷入底层磁盘文件。消费消息时,采用随机读的方式,由于 PageCache 局部性热点原理且整体情况下还是从旧到新的有序读,大部分 Case 消息还能直接从 Page Cache 读,不会产生太多缺页(Page Fault)中断而从磁盘读取。
异步刷盘若 Broker 将消息写入 PageCahe 并响应给生产者后突然宕机,此时消息在缓存中没有写入底层磁盘文件,就会造成消息丢失:生产者认为发送成功,实际上消息写入失败。
遇到 OS 进行脏页回写,内存回收,内存 Swap 等情况时,可能引起较大的消息读写延迟。
扩展: Java NIO 基于零拷贝的实现
mmap:
FileChannel#map():把文件对象映射到虚拟内存。
MappedByteBuffer/DirectByteBuffer.get(): 获取内存数据。
因其占用虚拟内存(非 JVM 的堆内存),不受 JVM -Xmx 参数限制,但其大小也受到 OS 虚拟内存大小限制。一般一次只能映射 1.5~2G 的文件至用户态的虚拟内存空间,这也是为何 RocketMQ 默认设置单 CommitLog 日志数据文件为 1G。
sendfile:
FileChannel.transferFrom()/transferTo():底层调用了 sendfile()内核函数。
RocketMQ 选择 mmap 原因:
(1) sendfile 在用户态不可见,而当前场景下有读有写。
(2) 在 Linux 系统中对于 1G 的文件,mmap 处理的性能要优于 sendfile。
ConsumeQueue
ConsumeQueue 不负责存储消息,只负责记录它所属 Topic 的消息在 CommitLog 中的偏移量,这样当消费者从 Broker 拉取消息的时候,就可以快速根据偏移量定位到消息。
ConsumeQueue 存储的格式如下,包含起始物理位置偏移量
,消息长度
,消息Tag的哈希值
,总共 20B:
每个 ConsumeQueue 都有一个 QueueId,QueueId 的值为 0 到 TopicConfig 配置的队列数量。比如某个 Topic 的消费队列数量为 4,那么四个 ConsumeQueue 的 QueueId 就分别为 0、1、2、3。
消费者消费时会先从 ConsumeQueue 中查找消息在 CommitLog 中的 Offset,再去 CommitLog 中找原始消息数据。如果某个消息只在 CommitLog 中有数据而没在 ConsumeQueue 中则消费者无法进行消费。
ConsumeQueue 类对应的是每个 topic 和 queuId 下面的所有文件。默认存储路径是 $HOME/store/consumequeue/{topic}/{queueId}/{fileName},每个文件由 30w 条数据组成,单个文件的大小是 30w x 20Byte,即每个文件为 600w 字节,单个消费队列的文件大小约为 5.722M=(600w/(1024*1024))。
ConsumeQueue 构建流程:
IndexFile
Broker 除了通过 ConsumeQueue 提供给 Consumer 消费之外,还支持通过 MsgID 或者 MessageKey 来查询消息;使用 ID 查询时,因为 ID 就是用 broker+offset 生成的(这里 MsgID 指的是服务端的),所以很容易就找到对应的 CommitLog 文件来读取消息。对于用 MessageKey 来查询消息,MessageStore 通过构建一个 Index 来提高读取速度。IndexFile 结构如下图:
Checkpoint
Checkpoint 主要记录 CommitLog、ConsumeQueue、Index 文件的刷盘时间点,如果在上一次 Broker 异常结束时,会根据 StoreCheckpoint 的数据进行恢复。
火山引擎基于字节跳动内部的大规模实践,推出的消息队列产品包括消息队列 Kafka / RabbitMQ / RocketMQ 版及云原生消息引擎 BMQ,欢迎咨询了解!
参考资料
https://zhuanlan.zhihu.com/p/574215600?utm_id=0
来源团队|字节跳动 IBF 业财研发部
版权声明: 本文为 InfoQ 作者【字节跳动云原生计算】的原创文章。
原文链接:【http://xie.infoq.cn/article/a8bde53339e362f19ae1d8c66】。文章转载请联系作者。
评论