一张图进阶 RocketMQ - 消息存储
前言
三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息存储(视频版)
本文是“一张图进阶 RocketMQ”第 5 篇,对 RocketMQ 不了解的同学可以先看看前面 4 期:
前面两期我们主要分享了 RocketMQ 是如何将消息发送出去的,现在消息已经被 Netty 送上路了,接力棒已经交给了 Broker。如果我们自己来实现 Broker 会怎么实现呢?首先肯定得把消息存起来吧,不然宕机了,消息丢失了,那就离大谱了。
可是消息要以什么结构存储呢?二进制、JSON、PB?从功能上来看肯定都是可以的,那 RocketMQ 到底是怎么搞的?
解决了存储结构问题,那消息存到哪里呢?数据库,本地文件,还是对象存储服务器?从功能的角度肯定也都是可以的。可是,哪家数据库可以支持单机十万级吞吐量?那我直接统统存到数据库得了,瞎折腾些啥。难道存在本地文件就可以了?我们自己实现不可以,但是 RocketMQ 可以,那 RocketMQ 有什么黑科技呢?
所以我们今天就来聊一聊 Broker 如何存储消息,【首先明确我们的目标】我们需要先了解 RocketMQ 的存储结构,也就是消息是如何组织的。了解了存储结构,我们才能更好的理解存储流程,不然我们不知道为什么流程是这样的。最后我们需要了解有哪些机制支撑 RocketMQ 单机十万级吞吐量。
存储架构
消息在 Broker 上的存储结构如上图,所有相关文件放在 ROCKETMQ_HOME 下,有哪些文件呢?存放消息本身的 CommitLog,以及消息的索引文件 ConsumeQueue 和 IndexFile:
CommitLog
从物理结构上来看,所有的消息都存储在 CommitLog 里面,其实就是所有的消息按照“消息在 CommitLog 各字段示意图”所示,挨个按顺序存储到文件中。
单个 CommitLog 文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量。比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。CommitLog 顺序写,可以大大提高写入效率。
但是问题来了,消息发送的时候我们指定了 Topic,现在所有 Topic 都顺序个写入到 CommitLog,存入的时候是安逸了(顺序写),但是获取消息可就麻烦了。如果我要获取某个 Topic 的消息,需要遍历 commitlog 文件,根据 topic 过滤消息。CommitLog 这个渣男,只管自己爽。有什么办法可以提高消息查询效率呢?
ConsumeQueue
我们再回忆一下,消息存入的时候是指定了 Topic,同时我们也说了每个 Topic 会对应多个 ConsumeQueue( queueId 标识)。关键就在 ConsumeQueue 上,ConsumeQueue 是指定 Topic 消息的索引文件,怎么理解呢?从“消息在 ConsumeQueue 各字段示意图”可知,每个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。ConsumeQueue 文件可以看成是基于 topic 的 commitlog 索引文件。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。
因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 CommitLog 完全恢复出来。ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
IndexFile
IndexFile 是另一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法。 IndexFile 索引文件其底层实现为 hash 索引,类似于 Java 1.7 HashMap,计算 Key 的 hashcode,hashcode 取余得到 hash 槽,拉链法解决哈希冲突。Index 文件的存储位置是:{fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。
所以,RocketMQ 消息存储架构主要有 CommitLog,ConsumeQueue,IndexFile 构成。我们发送一条消息,会先格式化成“消息在 CommitLog 各字段示意图”中的样子,顺序写入 CommitLog 中,然后 Broker 会按照 ”消息在 ConsumeQueue 各字段示意图“所示构建一条索引记录,存入该消息所属 Topic 的 ConsumeQueue 索引文件中。如果有 IndexFile,还会构建 IndexFile。
现在我们已经知道了 RocketMQ 消息的存储结构,接下来我们的就要了解 RocketMQ 是如何构建 CommitLog、ConsumeQueue 和 IndexFile,以及 RocketMQ 如何保证性能,支撑单机十万级吞吐量的?这是本文的主要目标,一定要抓住主要目标,不要走丢咯。
启动流程
了解了 RocketMQ 消息在磁盘中是怎么存储的,我们就可以来看看具体的存储流程了。首先,还是先来看看 Broker 的启动流程。初始化过程都是这个鸟样,只看初始化过程完全不知所云,但是不看初始化过程,直接看具体执行流程也是摸不着头脑,一堆组件不知道从哪里来的,所以我们还是先耐着性子大致看看。但这并不是我们关注的重点,注意几个关键点即可。
初始化启动环境。部署好 RocketMQ 后,执行/bin/mqbroker 脚本,主要用于设置 RocketMQ 目录环境变量,例如 ROCKETMQ_HOME 。然后调用 ./bin/runbroker.sh 进入 RocketMQ 的启动入口,主要设置了 JVM 启动参数,比如 JAVA_HOME、Xms、Xmx。执行 main 函数。
初始化 BrokerController。该初始化主要包含 RocketMQ 启动命令行参数解析、NettyRemotingServer 初始化、Broker 各个模块配置参数解析、Broker 各个模块初始化、进程关机 Hook 初始化等过程。
启动 RocketMQ 的各个组件。但是这些组件并不是每一个都是核心组件,部分组件会在后面的流程中使用,这里混个眼熟,如果后面流程没有提及的大家可以暂且跳过,我们的目标是把握 RocketMQ 的核心内容,而不是每个细节。
MessageStore:存储层服务,比如 CommitLog、ConsumeQueue 存储管理,消息刷盘,索引构建等。
RemotingServer:普通通道请求处理服务。一般的请求都是在这里被处理的。
FastRemotingServer:VIP 通道请求处理服务。如果普通通道比较忙,那么可以使用 VIP 通道,一般作为客户端降级使用。
BrokerOuterAPI:Broker 访问对外接口的封装对象。
PullRequestHoldService:Pull 长轮询服务。
ClientHousekeepingService:清理心跳超时的生产者、消费者、过滤服务器。
FilterServerManager:过滤服务器管理。
存储流程
在前面 RocketMQ 存储结构中我们了解了 RocketMQ 将所有消息顺序写入 CommitLog,然后构建 ConsumeQueue/IndexFile 索引文件,所以这个小结我们主要的目标就是看看这些文件是如何构建的。
Broker 启动流程中很关键的一点是启动了 NettyRemotingServer,在 RocketMQ 通信机制(视频) 中我们介绍过 Broker(NettyRemotingServer) 初始化会监听端口等待客户端连接,当客户端发送请求的时,NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler.channelRead0() 处理数据。接着调用链到 processRequestCommand 方法,这个方法主要是根据请求中的 RequestCode,从本地缓存 processorTable 中获取相应的 Processor 来执行后续逻辑。处理器是什么?处理器的缓存从哪里来?
Processor 就是用来处理特定请求的执行者,例如,生产者存入消息使用 SendMessageProcessor,查询消息使用 QueryMessageProcessor,拉取消息使用 PullMessageProcessor。在 Broker 启动流程中有一步是注册 Processor,以 RequestCode 为 Key ,Processor 为值,添加到 processorTable 缓存中。接着 RocketMQ 消息发送(视频) 流程来看,当生产者的请求达到 Broker,Broker 获取的 Processor 应为 SendMessageProcessor。封装一个 Runable 对象,run 方法内调用 SendMessageProcessor.processRequest ,提交到线程池,继续后面的处理。
SendMessageProcessor.processRequest 调用 sendMessage 方法,主要包含消息的校验及重试逻辑处理,然后调用存储模块 DefaultMessageStore 存储消息。消息校验:校验 Broker 是否配置可写,校验 Topic 名字是否为默认值,获取或创建 topicConfig,判断 queueId 是否超过限制。重试消息处理:消费者消费失败后会将消息发回给 Broker,这里我们暂且认为就是生产者发送的请求,先看下面的流程。
DefaultMessageStore.putMessage 只是做了很多的校验,简单看看即可。包括:如果当前 Broker 停止工作则拒绝消息写入、Broker 为 SLAVE 角色则拒绝消息写入、当前 RocketMQ 不支持写入则拒绝消息写入、主题长度超过 256 个字符则拒绝消息写入、消息属性长度超过 65536 个字符则拒绝消息写入、PageCache 忙则报错。然后调用 CommitLog.putMessage 存入消息。
看到这里应该稍微熟悉一些了,终于到我们期待已久的 CommitLog 出场了。主要是延迟消息处理,然后获取可以写入的 CommitLog 进行写入。延迟消息处理:如果消息的延迟级别大于 0,将消息的原主题名称与原消息队列 ID 存入消息属性中,用延迟消息主题 SCHEDULE_TOPIC、消息队列 ID 更新原先消息的主题与队列,这是并发消息消费重试关键的一步。但不是这个本节的主要目标,后文会进一步分析。关键点在如何获取可以写入的 CommitLog。存储结构小节里面有提到每个 CommitLog 默认大小 1G,写完一个文件,以偏移量命名创建下一个文件。每个 1G 大小 CommitLog 的在代码层面对应的是 MappedFile,而多个 MappedFiled 组成 MappedFileQueue。逻辑上的 CommitLog 通过持有 MappedFileQueue 管理多个 MappedFile。所以,获取可以写入的 CommitLog 也就是获取 MappedFileQueue 最后一个 MappedFile,为什么是最后一个,因为前面的已经写完了呀。来看看 RocketMQ 逻辑与物理存储的对应关系应该能够更直观的理解。
获取到最后一个 MappedFile 后,调用 MappedFile.appendMessage 将消息追加到该文件中。可是尽管是顺序写入,但是连小学生都知道写磁盘还是很慢,难道想这样支撑 RocketMQ 单机十万吞吐量?too young too simple!从逻辑存储结构和物理存储结构的映射关系来看,MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),通过 fileChannel 可以访问物理 CommitLog 文件,但是 RocketMQ 并没有直接使用 fileChannel,而是映射到一个 MappedByteBuffer,我们的目的就是把消息写入这个 ByteBuffer 中,进而写入 MappedFile 对应的 CommitLog 文件。为什么需要这样做,还有哪些细节,会在”文件内存映射“小结中为大家解答。
继续看流程,得到 MappedFile 对应的 ByteBuffer,我们需要将消息序列化,写入 ByteBuffer 中。
构建消息 id, createMessageId
获取该消息在消息队列的偏移量,CommitLog 中保存了当前所有消息队列的当前待写入偏移量。
判断是否是事务消息:这里主要处理 Prepared 类型和 Rollback 类型的消息,设置消息 queueOffset 为 0
计算消息总大小,calMsgLength。
判断文件的剩余空间,是否足够写入当条消息,如果不可以,则将文件末尾写入剩余空间大小+固定魔数;然后返回一个 END_OF_FILE 的结果
如果空间足够,这将这条消息写入之前得到的 MappedFile 的 ByteBuffer 中。
将各字段按照”消息在 CommitLog 各字段示意图“存入 Bytebuffer,然后返回 PUT_OK 结果
总结
以上就是今天 RocketMQ 消息存储的主要内容,消息只是写入到 CommitLog 对应的 ByteBuffer 中,下一期就是我们重要的零拷贝即将登场。我们简单总结一下今天的内容:
要理解消息的存储流程需要先知道消息的存储结构:在物理上消息挨个顺序写入 CommitLog,为了提升消息查询效率需要构建消息的索引文件 ConsumeQueue/IndexFile;
Broker 启动时进行参数解析,并初始化了 NettyRemotingServer,启动存储服务用于消息存储及索引构建等;
Broker 收到消息存储请求,经过层层校验,获取 CommitLog 对应的 MappedFile,将消息写入 MappedFile 对应的内存映射 ByteBuffer;
以上就是今天全部的内容,如果觉得本期的内容对你有用的话记得点赞、关注、转发、收藏,这将是对我最大的支持。
如果你需要 RocketMQ 相关的所有资料,可以评论区留言,或者关注公众号:三此君。回复:mq,即可。
消息已经写入 ByteBuffer,写入 ByteBuffer 就可以了吗?那收到消息直接丢弃岂不是更好。消息要落在磁盘上才不会丢失,所以下一期我们要分享的就是消息的刷盘及索引构建,PageCache 及零拷贝也将闪亮登场。感谢观看,下期不见不散。
参考文献
丁威, 周继锋. RocketMQ 技术内幕:RocketMQ 架构设计与实现原理. 机械工业出版社, 2019-01.
李伟. RocketMQ 分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
杨开元. RocketMQ 实战与原理解析. 机械工业出版社, 2018-06.
版权声明: 本文为 InfoQ 作者【三此君】的原创文章。
原文链接:【http://xie.infoq.cn/article/1665cc49ecf3548fe80959e51】。文章转载请联系作者。
评论