走进 RocketMQ- 消息存储与消费
前言
Halo,我是白裤。
上一次我们学习了 RocketMQ 的几种部署模式,并且简单地对 RocketMQ 进行了 Dledger 模式的部署实战,今天,我们将了解 RocketMQ 的消息存储设计与消费原理,话不多说,让我们立即进入学习吧。
消息存储
消息存储过程
RocketMQ 在消息存储方面的设计相对于其他模块来说是比较复杂的,我们先看着图来理解一下消息存储的整个过程。
Producer 从 NameServer 中拉取 Topic 路由信息至本地,然后在本地中通过负载均衡的策略,选择其中一个 MessageQueue,并且向该 MessageQueue 所在的 Broker 进行消息的投递,投递的过程中,Broker 对投递的消息进行了一系列的操作与处理。
在上图中,我们可以看到 Broker 在收到消息的时候,会将消息存储到 CommitLog 中,然后找到 MessageQueue 对应的 ConsumeQueue,向 ConsumeQueue 同步信息,同步的信息内容包括:[commitLogOffset、messageSize、tagHashCode],commitLogOffset:消息在 CommitLog 中的存储位置,messageSize:消息的大小,tagHashCode:消息的 tag 哈希值。ConsumeQueue 会将这些信息进行存储,在 ConsumeQueue 中还会记录当前 ConsumeQueue 的消费进度 consumeOffset 以及存储的最大位置 maxOffset,介于 consumeOffset 与 maxOffset 之间的即是还未消费的内容,记录这些信息是用来提高 Consumer 进行消费时的性能,让 Consumer 迅速定位到当前消费的进度位置,继续消费后续未消费的内容。
Broker 除了同步消息信息到 ConsumeQueue 之外,还会同步消息信息到 IndexFile 中,分发到 IndexFile 的信息包括:[keyHash、commitLogOffset、timeStamp、nextIndexOffset],keyHash:消息 key 的哈希值,commitLogOffset:消息在 CommitLog 文件中的存储位置,timeStamp:消息存储的时间差,nextIndexOffset:消息索引数据的下个节点的位置值,IndexFile 会将这些信息存储起来便于消息的快速查询,比如完成通过 key 查询消息内容等这些需求。
消息存储设计
通过上文中的描述,我们可以知道 RocketMQ 的存储设计主要有 CommitLog、ConsumeQueue、IndexFile 三种文件,CommitLog 主要是存储消息的完整内容信息,是消息真正存储的地方,ConsumeQueue 主要存储消息消费信息,为消费者提供待消费查询并以此来提高消费性能,IndexFile 则是消息的索引信息,方便对消息的检索,下面我们来分别对三种文件进行详细的分析:
CommitLog
CommitLog 是 RocketMQ 真正存放消息内容的文件,CommitLog 文件的大小被设计成 1G(至于为什么默认是 1G 我们后面再讲),当文件被写满了,则开始写入下一个文件,文件的命名规则是 20 位长度的数字,记录文件的开始字节数,比如第一个文件命名是 00000000000000000000,当第一个文件写满之后,创建第二个文件继续写,由于一个 CommitLog 文件默认是 1G,则第二个文件命名为 00000000001073741824,以此类推下去。
ConsumeQueue
ConsumeQueue 文件结构如下图所示:
ConsumeQueue 是 RocketMQ 用来提高消费性能的,消费者可以直接从 ConsumeQueue 中查询到待消费的消息信息,并通过 commitLogOffset 从 CommitLog 中获取消息内容,我们知道一个 CommitLog 中是有多个 topic 的消息存储的,如果消费者自己通过 topic 去 CommitLog 中检索,那么效率会非常低下,有了 ConsumeQueue,消费者非常方便高效地从 ConsumeQueue 中检索出待消费的消息在 CommitLog 的物理偏移量,从而拉取完整的消息内容。ConsumeQueue 中保存了消息的相关信息条目,每个条目信息包括消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值,每个条目占 20 字节,物理偏移量 commitLogOffset 占 8 字节,消息大小 size 占 4 字节,Tag 的 hashCode 值占 8 字节。ConsumeQueue 单个文件存放 30W 个条目信息,文件大小为 8*30W=5.7M。
IndexFile
IndexFile 文件结构如下图所示:
IndexFile 是一个用于检索消息的文件存储,文件中有三大块:Header、SlotTable、IndexLinkedList。
Header:存放一些统计信息,其中包括 beginTimeStamp:第一个索引数据存储的时间戳,endTimeStamp:最后一个索引数据存储的时间戳,beginPhyOffset:第一个索引数据在 commitLogOffset,endPhyOffset:最后一个索引数据的 commitLogOffset,hashSlotCount:哈希槽位数,indexCount:索引数。
SlotTable:一个哈希槽表,每个槽保存单向链表的头,指向单链表头节点,这里的数据结构跟 HashMap 有点类似,解决冲突的方案用的链表。
IndexLinkedList:单向链表,链表中每个节点才是具体的索引数据,索引数据中包含了 keyHash、commitLogOffset、timeStamp、nextIndexOffset 值,其中 keyHash 占 4 字节,commitLogOffset 占 8 字节,timeStamp 占 4 字节,nextIndexOffset 占 4 字节,所以每个索引数据占 20 字节。
在 IndexFile 中,Header 存放数据占 20 字节,SlotTable 可以存放 500W 的槽,每个槽存放的数据占 4 个字节,而 IndexLinkedList 则可以存放 2000W 的索引数据,所以整个 IndexFile 文件的大小是固定的,为 20+4 * 500W+20 * 2000W=420000040 字节。
消息消费
消息消费过程
消费者从 Broker 中获取消息内容的大概过程如下图:
首先 Consumer 对自己订阅的 Topic 的 MessageQueue 所在的 Broker 发起消息拉取请求,Broker 会根据 MessageQueue 对应的 ConsumeQueue 找到当前 ConsumeQueue 的待消费的位置 consumeOffset,然后根据 consumeOffset 来获取当前待消费的进度条目数据信息,从进度条目数据信息中的 commitLogOffset 去 CommitLog 中找到对应物理位置的消息内容,将消息内容交给 Consumer,最后 Consumer 消费成功后再标记出新的 consumeOffset 值。
消息消费设计
那么消息消费的过程 RocketMQ 是怎么设计的呢?我们来看看具体的消费设计。消费者消费消息在 RocketMQ 中,分为以下几个步骤:
启动阶段
首先 Consumer 启动的时候,会向 Broker 发送心跳,心跳信息包括了 Consumer 所属的消费者组名称、消费者 id 以及订阅关系信息等,Broker 收到后会将这些信息存储下来,这样 Broker 本地就会拥有所有 Consumer 的相关信息等,方便后续为 Consumer 在拉取消息前做负载均衡的时候提供数据支持,比如让 Consumer 能够知道当前订阅某个 topic 的 Consumer 有多少个等信息。
队列分配
Consumer 在消费某个 Topic 的时候,会向 Broker 获取这个 Topic 下面的 MessageQueue 信息、订阅了这个 Topic 的 Consumer 信息,然后选择一定的策略(这里 RocketMQ 提供了几种策略:a.平均分配策略、b.环形平均分配、c.用户自定义配置、d.机房负载策略、e.附近机房策略、f.一致性哈希策略)进行 MessageQueue 的分配,假如 TopicA 分配有 4 个 MessageQueue,当前订阅 TopicA 的 Consumer 有 4 个,这个时候分两种消费模式:1.广播模式、2.集群模式,当消费模式是广播模式的时候,每个 Consumer 会负责所有 MessageQueue 的消费;当消费模式是集群模式的时候,假如此时队列分配策略用的是平均分配策略,那么此时 TopicA 的 MessageQueue 会平均的分配到每个 Consumer 上,即 4 个 MessageQueue,4 个 Consumer,那么每个 Consumer 都分配了 1 个 MessageQueue,此时 Consumer 只会消费分配给它的 MessageQueue 的消息。
广播模式:消费者组中的所有消费者都会消费每个队列的消息集群模式:消费者组中消费者只会消费分配给它的队列的消息,即一个消息只会被同一个消费者组中的一个消费者所消费
消息拉取
当 MessageQueue 分配完成后,首先 Consumer 会为分配给自己的每个 MessageQueue 创建对应的消息处理队列 ProcessQueue,然后由消息拉取服务 PullMessageService 内部的消息拉取请求队列 pullRequestQueue 对 MessageQueue 进行消息的拉取,将拉取到的消息加入到消息处理队列 ProcessQueue,并且提交一个消费请求到消息消费服务 ConsumeMessageService,最后重新提交下一个消息的拉取任务。
消费处理
当消息被提交到消息消费服务 ConsumeMessageService 的时候,有两种形式对消息进行消费处理,主要作用是触发消费逻辑,将消息消费的结果进行处理,第一种是并发消费,第二种是顺序消费。
并发消费
并发消费是业务消息没有限定会被投递到哪个 MessageQueue,消息可能会被投递到不同的 MessageQueue,并且 Consumer 也可能会开启多线程模式进行消费。当消息消费返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 时,Broker 会对该消息复制出一个相同属性的全新消息,进入重试队列,该新消息跟其他消息一样也会存储进 CommitLog 文件,该新消息按照"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"的延迟进行重试,直至超过最大次数失败后进入死信队列。
顺序消费
顺序消费是同一业务的消息被投递到同一个 MessageQueue 中,由队列的天然先进先出的顺序来进行消费,并且消费端必须用单线程的形式进行消费,才能达到顺序消费的效果。当消息消费失败返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 的时候,此时 MessageQueue 会被挂起,当前消息会重试消费,超过消费的最大次数时将会进入死信队列,当消息进入死信队列时,Broker 会默认消息消费成功,此时,后续的消息才得以继续进行消费。
小结
本章节我们主要学习了 RocketMQ 的消息存储与消费,下面我们来做一个小结:
首先我们了解了消息在 RocketMQ 中是怎么从 Producer 发送到 Broker 的,我们还对 Broker 中的存储设计进行了详细的讲解,知道了消息在 Broker 中是如何存储的,其主要由三种文件完成存储:
CommitLog
ConsumeQueue
IndexFile
然后我们又继续学习了消息的消费过程,明白了 Consumer 是如何对消息进行拉取的,之后我们学习了消息消费的细节设计,通过这些设计我们对 Consumer 消费的一个工作原理更加清楚了,主要有以下工作内容:
Consumer 会向 Broker 发送心跳信息;
Topic 的队列会被负载均衡策略分配在 Consumer 上;
Consumer 的消费模式分为广播模式和集群模式;
Consumer 对消息的拉取有专门的拉取服务,对拉取到的消息有专门的处理队列任务来进行处理;
Consumer 的对消息的消费分为并发消费和顺序消费;
好了,今天我们就学习到这里,下一次我们将学习 RocketMQ 的高性能网络设计,一起看看 RocketMQ 的网络通信是如何设计的。
版权声明: 本文为 InfoQ 作者【白裤】的原创文章。
原文链接:【http://xie.infoq.cn/article/acc8af2d4d3674f4d9d6fbfb3】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论