数据存储,消息队列的高可用保障
1 介绍
在之前的章节中,我们介绍了消息的发送 和 消息通信 的原理。但是这边有一个比较核心的关键点,那就是如果已经把消息传递给 Broker。在 Broker 在被消费之前,如何保证消息的稳定性,避免消息丢失和数据。这时候就需要数据持久化数据来进行保障了。根据之前我们 MQ 系列 2:消息中间件的技术选型 章节做的分析,RabbitMQ 支持 1W+ 级别的吞吐,Kafka 和 Rocket 支持 10W+ 级别的吞吐,想要实现这么大的吞吐,必须具备一个很强悍的存储功能。下面我们来看看。
2 Broker 存储架构
RocketMQ 采用文件存储机制(类似 Kafka),即直接在磁盘上使用文件来保存消息,而不是采用 Redis 或者 MySQL 之类的持久化工具。它会把消息存储所属相关的文件存储在 ROCKETMQ_HOME 下,包含三个部分:
2.1 CommitLog 消息元数据
存储消息的元数据,所有消息都会顺序存入到 CommitLog 文件中。CommitLog 由多个文件组成,每个文件固定大小 1G。它有如下特征:
单个文件默认大小为 1G
文件名称长度 20,保存偏移量,偏移量不够 20 位的补 0。
如第 1 个文件没有偏移量,则为:00000000000000000000
第 2 个文件起始偏移量为 1073741824(1G=1073740842),则文件名为 00000000001073741824。
第一个 1G 文件文件写满之后之后转入第 2 个文件,如此反复,因为是顺序的,所以写入效率较高。
2.2 ConsumeQueue 消息逻辑队列
ConsumeQueue 是指存储消息在 CommitLog 上的索引,一个 MessageQueue 一个文件,记录当前 MessageQueue 被哪些消费者组消费到了哪一条 CommitLog。它有如下特征:
ComsumeQueue 的结构组成共 20 个字节,包含 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode
ConsumeQueue 里只存偏移量信息,内容精悍。加载到内存中,操作效率非常高。
一致性保障,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,在 ConsumeQueue 丢失或者故障时候,数据可快速回复。
因为每个 Topic 下可能有多个 Queueu,所以存储结构为:HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
2.3 IndexFile 索引文件
IndexFile 是一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法,并且这种查找消息的方法不影响发送与消费消息的主流程。它的特征如下:
算法原理:IndexFile 索引文件的底层实现 为 hash 索引,可以对照 Java 的 HashMap 比较,通过计算 Key 的 hashcode, 取余获得 hash 槽,并通过拉链法解决哈希冲突。
大小限制:IndexFile 以创建时间戳命名,单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。
通过上面的三个部件说明可以了解到,RocketMQ 消息存储结构主要是由 CommitLog,ConsumeQueue,IndexFile 三部分组成的。当我们发送消息的时候,会执行如下过程:
消息格式化成 CommitLog 的字段结构,并按照顺序写入到 CommitLog 文件中。
Broker 会按照 ConsumeQueue 的字段结构的要求创建一条索引记录。
按需创建 IndexFile 索引文件。
3 存储的执行过程
通过上面我们已经了解到了,Kafka 和 Rocket 均支持 10W+ 级别的吞吐,那么上述的存储结构是如何保持这样的高超性能的呢?
之前的章节我们已经了解到,Broker 启动时同步启动 NettyRemotingServer 进行端口监听,等坐等客户端的连接。
当客户端发送请求时,NettyRemotingServer WorkerGroup 处理可读事件,执行 processRequestCommand 处理来源消息数据。
接收到消息之后就需要存储下来了,DefaultMessageStore 对数据进行校验,校验如下,校验完成之后发送存储指令。
Broker 无响应时拒绝消息写入
Broker 角色 为 SLAVE 时也拒绝写入
判断是否支持写入,不支持写入时也拒绝
topic length 小于等于 256 字符,否则拒绝消息写入
消息 length 小于等于 65536 字符,否则拒绝消息写入
PageCache 繁忙时报错误消息,无法写入
DefaultMessageStore 调用 CommitLog.putMessage 存入消息
获取可以写入的 CommitLog 进行写入
CommitLog(每个 CommitLog 默认 1G 大小) 对应 MappedFile(程序视角),当有多个 MappedFiled 时,组成 MappedFileQueue。
MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),但并没有通过 fileChannel 直接访问物理 CommitLog 文件,而是映射到一个 MappedByteBuffer,并把序列化后的消息写入这个 ByteBuffer 中,已达到提升执行效率的目的。
最后写入 MappedFile 相对应的 CommitLog 文件中。
4 总结
理解好 RabbitMQ 中 Broker 存储的组成要素 CommitLog,ConsumeQueue,IndexFile。
当 Broker 收到消息存储请求时,通过调用 CommitLog 对应的 MappedFile,把消息写入 MappedFile 的 MeppedByteBuffer(内存映射)。
5 参考资料
更多 C++后台开发技术点知识内容包括 C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,MongoDB,ZK,流媒体,音视频开发,Linux 内核,TCP/IP,协程,DPDK 多个高级知识点。
C/C++Linux服务器开发高级架构师/C++后台开发架构师免费学习地址
【文章福利】另外还整理一些C++后台开发架构师 相关学习资料,面试题,教学视频,以及学习路线图,免费分享有需要的可以点击领取
原文链接:MQ 系列 8:数据存储,消息队列的高可用保障 - Hello-Brand - 博客园
评论