写点什么

走进 RocketMQ- 消息存储与消费

作者:白裤
  • 2023-03-01
    广东
  • 本文字数:3943 字

    阅读完需:约 13 分钟

走进RocketMQ-消息存储与消费

前言

Halo,我是白裤。


上一次我们学习了 RocketMQ 的几种部署模式,并且简单地对 RocketMQ 进行了 Dledger 模式的部署实战,今天,我们将了解 RocketMQ 的消息存储设计与消费原理,话不多说,让我们立即进入学习吧。

消息存储

消息存储过程

RocketMQ 在消息存储方面的设计相对于其他模块来说是比较复杂的,我们先看着图来理解一下消息存储的整个过程。



Producer 从 NameServer 中拉取 Topic 路由信息至本地,然后在本地中通过负载均衡的策略,选择其中一个 MessageQueue,并且向该 MessageQueue 所在的 Broker 进行消息的投递,投递的过程中,Broker 对投递的消息进行了一系列的操作与处理。


在上图中,我们可以看到 Broker 在收到消息的时候,会将消息存储到 CommitLog 中,然后找到 MessageQueue 对应的 ConsumeQueue,向 ConsumeQueue 同步信息,同步的信息内容包括:[commitLogOffset、messageSize、tagHashCode],commitLogOffset:消息在 CommitLog 中的存储位置,messageSize:消息的大小,tagHashCode:消息的 tag 哈希值。ConsumeQueue 会将这些信息进行存储,在 ConsumeQueue 中还会记录当前 ConsumeQueue 的消费进度 consumeOffset 以及存储的最大位置 maxOffset,介于 consumeOffset 与 maxOffset 之间的即是还未消费的内容,记录这些信息是用来提高 Consumer 进行消费时的性能,让 Consumer 迅速定位到当前消费的进度位置,继续消费后续未消费的内容。


Broker 除了同步消息信息到 ConsumeQueue 之外,还会同步消息信息到 IndexFile 中,分发到 IndexFile 的信息包括:[keyHash、commitLogOffset、timeStamp、nextIndexOffset],keyHash:消息 key 的哈希值,commitLogOffset:消息在 CommitLog 文件中的存储位置,timeStamp:消息存储的时间差,nextIndexOffset:消息索引数据的下个节点的位置值,IndexFile 会将这些信息存储起来便于消息的快速查询,比如完成通过 key 查询消息内容等这些需求。

消息存储设计

通过上文中的描述,我们可以知道 RocketMQ 的存储设计主要有 CommitLog、ConsumeQueue、IndexFile 三种文件,CommitLog 主要是存储消息的完整内容信息,是消息真正存储的地方,ConsumeQueue 主要存储消息消费信息,为消费者提供待消费查询并以此来提高消费性能,IndexFile 则是消息的索引信息,方便对消息的检索,下面我们来分别对三种文件进行详细的分析:


  • CommitLog

  • CommitLog 是 RocketMQ 真正存放消息内容的文件,CommitLog 文件的大小被设计成 1G(至于为什么默认是 1G 我们后面再讲),当文件被写满了,则开始写入下一个文件,文件的命名规则是 20 位长度的数字,记录文件的开始字节数,比如第一个文件命名是 00000000000000000000,当第一个文件写满之后,创建第二个文件继续写,由于一个 CommitLog 文件默认是 1G,则第二个文件命名为 00000000001073741824,以此类推下去。

  • ConsumeQueue

  • ConsumeQueue 文件结构如下图所示:



    • ConsumeQueue 是 RocketMQ 用来提高消费性能的,消费者可以直接从 ConsumeQueue 中查询到待消费的消息信息,并通过 commitLogOffset 从 CommitLog 中获取消息内容,我们知道一个 CommitLog 中是有多个 topic 的消息存储的,如果消费者自己通过 topic 去 CommitLog 中检索,那么效率会非常低下,有了 ConsumeQueue,消费者非常方便高效地从 ConsumeQueue 中检索出待消费的消息在 CommitLog 的物理偏移量,从而拉取完整的消息内容。ConsumeQueue 中保存了消息的相关信息条目,每个条目信息包括消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值,每个条目占 20 字节,物理偏移量 commitLogOffset 占 8 字节,消息大小 size 占 4 字节,Tag 的 hashCode 值占 8 字节。ConsumeQueue 单个文件存放 30W 个条目信息,文件大小为 8*30W=5.7M。

    • IndexFile

    • IndexFile 文件结构如下图所示:



    • IndexFile 是一个用于检索消息的文件存储,文件中有三大块:Header、SlotTable、IndexLinkedList。

    • Header:存放一些统计信息,其中包括 beginTimeStamp:第一个索引数据存储的时间戳,endTimeStamp:最后一个索引数据存储的时间戳,beginPhyOffset:第一个索引数据在 commitLogOffset,endPhyOffset:最后一个索引数据的 commitLogOffset,hashSlotCount:哈希槽位数,indexCount:索引数。

    • SlotTable:一个哈希槽表,每个槽保存单向链表的头,指向单链表头节点,这里的数据结构跟 HashMap 有点类似,解决冲突的方案用的链表。

    • IndexLinkedList:单向链表,链表中每个节点才是具体的索引数据,索引数据中包含了 keyHash、commitLogOffset、timeStamp、nextIndexOffset 值,其中 keyHash 占 4 字节,commitLogOffset 占 8 字节,timeStamp 占 4 字节,nextIndexOffset 占 4 字节,所以每个索引数据占 20 字节。

    • 在 IndexFile 中,Header 存放数据占 20 字节,SlotTable 可以存放 500W 的槽,每个槽存放的数据占 4 个字节,而 IndexLinkedList 则可以存放 2000W 的索引数据,所以整个 IndexFile 文件的大小是固定的,为 20+4 * 500W+20 * 2000W=420000040 字节。

    消息消费

    消息消费过程

    消费者从 Broker 中获取消息内容的大概过程如下图:



    首先 Consumer 对自己订阅的 Topic 的 MessageQueue 所在的 Broker 发起消息拉取请求,Broker 会根据 MessageQueue 对应的 ConsumeQueue 找到当前 ConsumeQueue 的待消费的位置 consumeOffset,然后根据 consumeOffset 来获取当前待消费的进度条目数据信息,从进度条目数据信息中的 commitLogOffset 去 CommitLog 中找到对应物理位置的消息内容,将消息内容交给 Consumer,最后 Consumer 消费成功后再标记出新的 consumeOffset 值。

    消息消费设计

    那么消息消费的过程 RocketMQ 是怎么设计的呢?我们来看看具体的消费设计。消费者消费消息在 RocketMQ 中,分为以下几个步骤:


    • 启动阶段

    • 首先 Consumer 启动的时候,会向 Broker 发送心跳,心跳信息包括了 Consumer 所属的消费者组名称、消费者 id 以及订阅关系信息等,Broker 收到后会将这些信息存储下来,这样 Broker 本地就会拥有所有 Consumer 的相关信息等,方便后续为 Consumer 在拉取消息前做负载均衡的时候提供数据支持,比如让 Consumer 能够知道当前订阅某个 topic 的 Consumer 有多少个等信息。

    • 队列分配

    • Consumer 在消费某个 Topic 的时候,会向 Broker 获取这个 Topic 下面的 MessageQueue 信息、订阅了这个 Topic 的 Consumer 信息,然后选择一定的策略(这里 RocketMQ 提供了几种策略:a.平均分配策略、b.环形平均分配、c.用户自定义配置、d.机房负载策略、e.附近机房策略、f.一致性哈希策略)进行 MessageQueue 的分配,假如 TopicA 分配有 4 个 MessageQueue,当前订阅 TopicA 的 Consumer 有 4 个,这个时候分两种消费模式:1.广播模式、2.集群模式,当消费模式是广播模式的时候,每个 Consumer 会负责所有 MessageQueue 的消费;当消费模式是集群模式的时候,假如此时队列分配策略用的是平均分配策略,那么此时 TopicA 的 MessageQueue 会平均的分配到每个 Consumer 上,即 4 个 MessageQueue,4 个 Consumer,那么每个 Consumer 都分配了 1 个 MessageQueue,此时 Consumer 只会消费分配给它的 MessageQueue 的消息。

    • 广播模式:消费者组中的所有消费者都会消费每个队列的消息集群模式:消费者组中消费者只会消费分配给它的队列的消息,即一个消息只会被同一个消费者组中的一个消费者所消费

    • 消息拉取

    • 当 MessageQueue 分配完成后,首先 Consumer 会为分配给自己的每个 MessageQueue 创建对应的消息处理队列 ProcessQueue,然后由消息拉取服务 PullMessageService 内部的消息拉取请求队列 pullRequestQueue 对 MessageQueue 进行消息的拉取,将拉取到的消息加入到消息处理队列 ProcessQueue,并且提交一个消费请求到消息消费服务 ConsumeMessageService,最后重新提交下一个消息的拉取任务。

    • 消费处理

    • 当消息被提交到消息消费服务 ConsumeMessageService 的时候,有两种形式对消息进行消费处理,主要作用是触发消费逻辑,将消息消费的结果进行处理,第一种是并发消费,第二种是顺序消费。

    • 并发消费

    • 并发消费是业务消息没有限定会被投递到哪个 MessageQueue,消息可能会被投递到不同的 MessageQueue,并且 Consumer 也可能会开启多线程模式进行消费。当消息消费返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 时,Broker 会对该消息复制出一个相同属性的全新消息,进入重试队列,该新消息跟其他消息一样也会存储进 CommitLog 文件,该新消息按照"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"的延迟进行重试,直至超过最大次数失败后进入死信队列。

    • 顺序消费

    • 顺序消费是同一业务的消息被投递到同一个 MessageQueue 中,由队列的天然先进先出的顺序来进行消费,并且消费端必须用单线程的形式进行消费,才能达到顺序消费的效果。当消息消费失败返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 的时候,此时 MessageQueue 会被挂起,当前消息会重试消费,超过消费的最大次数时将会进入死信队列,当消息进入死信队列时,Broker 会默认消息消费成功,此时,后续的消息才得以继续进行消费。

    小结

    本章节我们主要学习了 RocketMQ 的消息存储与消费,下面我们来做一个小结:


    首先我们了解了消息在 RocketMQ 中是怎么从 Producer 发送到 Broker 的,我们还对 Broker 中的存储设计进行了详细的讲解,知道了消息在 Broker 中是如何存储的,其主要由三种文件完成存储:


    • CommitLog

    • ConsumeQueue

    • IndexFile


    然后我们又继续学习了消息的消费过程,明白了 Consumer 是如何对消息进行拉取的,之后我们学习了消息消费的细节设计,通过这些设计我们对 Consumer 消费的一个工作原理更加清楚了,主要有以下工作内容:


    • Consumer 会向 Broker 发送心跳信息;

    • Topic 的队列会被负载均衡策略分配在 Consumer 上;

    • Consumer 的消费模式分为广播模式和集群模式;

    • Consumer 对消息的拉取有专门的拉取服务,对拉取到的消息有专门的处理队列任务来进行处理;

    • Consumer 的对消息的消费分为并发消费和顺序消费;


    好了,今天我们就学习到这里,下一次我们将学习 RocketMQ 的高性能网络设计,一起看看 RocketMQ 的网络通信是如何设计的。

    发布于: 刚刚阅读数: 5
    用户头像

    白裤

    关注

    还未添加个人签名 2021-04-25 加入

    一个很懒的人

    评论

    发布
    暂无评论
    走进RocketMQ-消息存储与消费_RocketMQ_白裤_InfoQ写作社区