写点什么

RocketMQ 之 Store 模块

用户头像
邱学喆
关注
发布于: 1 小时前
RocketMQ之Store模块

一. 概述

需要存储消息到磁盘,避免消息被丢失。在 RocketMQ 框架中除了消息需要保存到磁盘,还有哪些数据需要存放到磁盘,其数据结构是什么样的?磁盘中的数据又是如何跟其他模块进行交互的,例如,Broker 模块等;在官方文档中提供了这么一张图,如下:

从上面的图来看,生产者生产消息会保存到磁盘文件 CommitLog 中;意味着所有消息都保存到 CommitLog 文件中;那么问题来了,CommitLog 的文件结构是什么样的

消费者是通过 ConsumerQueue 拉取消息进行消费,从而记录每个 Topic 对应的 queue 的消费情况;那么问题来了,为什么不直接通过 CommitLog 进行拉取消息进行消费;主要还是 CommitLog 文件的数据量错终复杂,如果从 CommitLog 去拉取消息,会花大量的时间去检索到对应的消息,所以才提出了这么 ConsumerQueue 这么一个概念,我们可以理解为 CommitLog 文件的索引信息,且会记录消费情况;那么 ConsumerQueue 的文件结构是怎么样的

其里面隐藏了一类文件,叫做索引文件,IndexFile。

RocketMQ 是如何管理这些文件的。这篇只是围绕 CommitLog 进行展开,并没有对 DLedgerCommitLog 进行解读,后续会对其进行分析;

二. CommitLog

消息主题以及元数据的存储主题,存储 Producer 端写入的消息主题内容,消息内容不是定长的。单个文件默认 1G,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

我们会首先看单个文件所存储的内容结构,接着会解读 RocketMQ 是如何对 CommitLog 进行管理的。

1. 文件结构

如下图:

我们重点查看消息记录格式:

  • TOTAL_SIZE 该消息条目的总长度

  • MAGIC_CODE 魔术,固定值:0x255CDF59

  • BODY_CRC 消息体 crc 校验码

  • QUEUE_ID 消息消费队列 ID

  • FLAG 消息 FLAG,供应用程序使用

  • QUEUE_OFFSET 消息在消费队列的偏移量

  • PHYSICAL_OFFSET 消息在 CommitLog 文件中的偏移量

  • SYS_FLAG 消息系统 FLAG,例如是否压缩,是否是事务消息等

  • BORN_TIMESTAMP 消息生产者调用消息发送 API 的时间戳

  • BORN_HOST 消息生产者 IP、端口信息

  • STORE_TIMESTAMP 消息存储时间戳

  • STORE_HOST broker 服务器 IP、端口信息

  • RECONSUME_TIMES 消息重试次数

  • PREPARED_TRANSACTION_OFFSET 事务消息物理偏移量

  • BODY_LENGTH 消息体长度

  • BODY 消息体内容

  • TOPIC_LENGTH 主题名称长度

  • TOPIC 主题名称

  • PROPERTIES_LENGTH 消息属性长度

  • PROPERTIES 消息属性

我们在生产端发送单个消息以及批量消息时,Broker 是如何对其处理,从而保存到 CommitLog 文件中的;

  • 单个消息,到 Broker 后,会由 DefaultAppendMessageCallback.doAppend 进行处理;

  • 批量消息,到 broker 后,先由 MessageExtBatchEncoder.encode 方法对批量消息 MessageExtBatch 处理后,再由 DefaultAppendMessageCallback.doAppend 进行处理;

当文件快满了,如何保存完整的消息时,会在末尾添加 8 个字节的信息,来确定文件结尾;

2. 文件管理

主要围绕对 CommitLog 文件管理进行解读,主要围绕几大关键动作进行了解:

  • 创建文件,基于什么样的机制会创建文件;

  • 删除文件,基于什么样的策略会将文件进行删除;

  • 加载文件,RocketMQ 是如何加载文件的,例如 Broker 重新启动时会去加载 CommitLog 文件;这里需要特别说一下,为什么会出现加载动作:主要有两种情况会出现:开发人员手动停止 Broker 服务,另外一种是 Broker 服务器异常;所以需要重新启动 Broker 服务器,那么就会触发加载文件动作。

  • 关闭文件,RocketMQ 是如何关闭

  • 添加消息,这里的添加主要是向 CommitLog 文件中插入消息

  • 查询消息,向 CommitLog 文件查询到对应的消息

有关 commit 基础的配置项如下:

  • storePathCommitLog CommitLog 存储目录,默认为 ${Broker 存储目录}/commitlog

  • mappedFileSizeCommitLog 单个 commitlog 文件的大小,默认为 1G

2.1 创建文件

当新的 Broker 启动时,其并没有创建任何 CommitLog 文件,然而并没有立马创建;而是在需要时才创建该文件;而创建文件,则交由 AllocateMappedFileService 去创建文件;AllocateMappedFileService 是线程对象,会在后台一直循环处理,除非被中断打断该执行。所以创建文件是属于异步创建;

通过调用 AllocateMappedFileService.putRequestAndReturnMappedFile 方法,将创建 MappedFile 文件请求保存到队列里面,然后交由线程去自行处理创建文件;RocketMQ 针对 AllocateMappedFileService 线程对象,提供了相关的配置项:

  • transientStorePoolEnable 是否开启 transientStorePool 机制,默认值为 false。

  • transientStorePoolSize transientStorePool 中缓存 byteBuffer 个数,默认 5 个

  • fastFailIfNoBufferInStorePool 从 transientStorePool 中获取 ByteBuffer 是否支持快速失败,默认为 false.

  • warmMapedFileEnable 是否温和地使用 MappedFile,默认为 false;我们可以理解为预热,是为了避免缺页中断导致用户态与内核态切换导致性能降低;

  • flushLeastPagesWhenWarmMapedFile 用字节 0 填充整个文件,每多少页刷盘一次。默认 4096 页,异步刷盘模式生效

  • flushDiskType 刷盘类型,有异步刷盘和同步刷盘两种类型;

具体的代码在 AllocateMappedFileService 类中的 putRequestAndReturnMappedFile 方法与 mmapOperation 方法;

2.2 删除文件

删除文件,主要是哪些过期文件;那么在 RocketMQ 是如何定义哪些文件是过期的。在 RocketMQ 是会定义一个阈值,来确定文件的存活时间;在 RocketMQ 是提供这样的配置项:

  • fileReservedTime 文件保留时间,默认 72 小时,表示非当前写文件最后一次更新时间加上 fileReservedTime 小于当前时间,该文件将被清理

删除文件动作并不是交由主线程来进行操作,而是异步定时去清理,频率是通过配置来决定;

  • cleanResourceInterval 清除过期文件线程调度频率,默认每隔 10s 检测一下是否需要清除过期文件


然而就算发现了过期文件存在,也不会立即去清除;而是满足下列条件之一,才能做清理文件动作

  • 是否到了指定时间 -> 配置项 deleteWhen: 磁盘文件充足的情况下,默认每天的什么时候执行删除过期文件。默认为 04,表示凌晨 4 点

  • 空间是否满了 -> 当 commitlog 目录所在分区的最大使用比例,如果 commitlog 目录所在的分区使用比例大于该值,则触发过期文件删除,配置由 diskMaxUsedSpaceRatio 进行配置,默认值为 75,一般我们都不用做特殊处理;

  • 手工删除 -> 目前并没有找到对应哪类接口去手工删除;可以先暂时忽略

值得注意的是清理文件动作,并不意味已经发现了过期文件,进行删除;而是执行 CommitLog 类中的 deleteExpiredFile 方法;接下来我们对其方法进行解读,该方法暴露的方法参数如下:

  • expiredTime 过期时间,也就是我们配置项中的 fileReservedTime

  • deletePhysicFilesInterval 删除文件频率,意味着不是一次性删除完,而是没删除一份文件,先休息一段在接着删除;我们通过配置 deleteCommitLogFilesInterval:删除 CommitLog 文件的间隔时间,删除一个文件后,等一下再删除下一个文件。默认为 100ms

  • destroyMapedFileIntervalForcibly 这是是配置项,销毁 MappedFile 被拒绝的最大从存活时间,默认为 120s。清除过期文件线程在初次销毁 MappedFile 时,如果该文件被其他线程(引用次数大于 0),则设置 MappedFile 的可用状态为 false,并设置第一次删除时间,下一次清理任务到达时,如果系统时间大于初次删除时间加上 destroyMappedFileIntervalForcibly,则将 ref 次数一次减 1000,直到引用次数小于 0,则释放物理资源。

  • cleanImmediately 立即清除,表示:无条件清理文件。当为 true 时,会忽略过期文件条件,直接删除文件,具体的代码 MappedFileQueue.deleteExpiredFileByTime。其通过两个值进行与换算得来;

  • 配置项 cleanFileForciblyEnable 是否支持强行删除过期文件,默认为 true

  • 是否立即清除标志,其表示是通过磁盘空间使用率来设置,方法在 CleanCommitLogService.isSpaceToDelete。磁盘空间使用率超过 85%,就会设置为 true。磁盘空间使用率有两个分区,一个是 CommitLog 所在目录所在的分区,另外一个是 RocketMQ 主目录所在的分区;


我们上面介绍了 destroyMapedFileIntervalForcibly 配置项,说明并不会成功删除文件;那么 RocketMQ 会定时的尝试删除该未删除成功的文件;相关的配置项如下:

  • redeleteHangedFileInterval 重试删除文件间隔,默认为 120s,配合 destroyMappedFileIntervalForcibly 使用

上面的删除文件的定时主要是 CleanCommitLogService 对象;详细细节可查阅其代码

2.3 加载文件

当 Broker 服务重新启动时,会触发加载 CommitLog 目录下的 commitlog 文件;

其会检验每份文件的大小,如果不等于 mappedFileSizeCommitLog,则认为加载失败,那么启动就会失败;具体的代码在 CommitLog.load;

同时会去做校验动作,来校验 commitlog 是否合法;

正常启动时,会去调用 CommitLog.recoverNormally 方法;异常宕机时,会去调用 CommitLog.recoverAbnormally 方法;详细的细节查阅相关的代码;

2.4 关闭文件

当关闭服务器,并没有执行文件关闭动作,只有在删除过期文件时会关闭文件,具体代码如下:

//MappedFilepublic boolean destroy(final long intervalForcibly) {  this.shutdown(intervalForcibly);    if (this.isCleanupOver()) {    try {      //关闭文件句柄      this.fileChannel.close();      log.info("close file channel " + this.fileName + " OK");            long beginTime = System.currentTimeMillis();      boolean result = this.file.delete();      //......    } catch (Exception e) {      log.warn("close file channel " + this.fileName + " Failed. ", e);    }        return true;  } else {    log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName             + " Failed. cleanupOver: " + this.cleanupOver);  }    return false;}
复制代码

2.5 添加消息

终于到了关键的操作,消息中间件中生产者生产消息发送给 broker 服务器时,会向 Commitlog 文件上追加消息,以确保消息真正的持久化到磁盘中,避免了消息丢失问题;

然而添加消息时,做了关键的几个动作,具体代码如下:

//CommitLogpublic PutMessageResult putMessage(final MessageExtBrokerInner msg) {  // .......  // Back to Results  AppendMessageResult result = null;    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();    String topic = msg.getTopic();  int queueId = msg.getQueueId();    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE      || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {        if (msg.getDelayTimeLevel() > 0) {      //处理定时消息      if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());      }            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;      queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());            // Backup real topic, queueId      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());      MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));      msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));            msg.setTopic(topic);      msg.setQueueId(queueId);    }  }    //...    long elapsedTimeInLock = 0;    MappedFile unlockMappedFile = null;  //获取最新的CommitLog文件对象  MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();  //先获取锁  putMessageLock.lock(); //spin or ReentrantLock ,depending on store config  try {    //.......            if (null == mappedFile) {      //如果没有文件,则直接报错.      log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());      beginTimeInLock = 0;      return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);    }    //往commitlog文件添加消息    result = mappedFile.appendMessage(msg, this.appendMessageCallback);    switch (result.getStatus()) {      case PUT_OK:        break;      case END_OF_FILE: //到文件尾部,则新建commitlog文件在进行添加消息        unlockMappedFile = mappedFile;        // Create a new file, re-write the message        mappedFile = this.mappedFileQueue.getLastMappedFile(0);        if (null == mappedFile) {          // XXX: warn and notify me          log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());          beginTimeInLock = 0;          return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);        }        result = mappedFile.appendMessage(msg, this.appendMessageCallback);        break;      case MESSAGE_SIZE_EXCEEDED:      case PROPERTIES_SIZE_EXCEEDED:        beginTimeInLock = 0;        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);      case UNKNOWN_ERROR:        beginTimeInLock = 0;        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);      default:        beginTimeInLock = 0;        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);    }         } finally {    putMessageLock.unlock();  }    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {    //释放之前文件所占用的内存    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);  }    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);    //统计数据  storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();  storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());  //磁盘刷盘-交由异步线程去处理-GroupCommitService或者FlushCommitLogService的子类去处理commit  handleDiskFlush(result, putMessageResult, msg);  //同步消息到其他从节点-交由异步线程去处理。- GroupTransferService去处理  handleHA(result, putMessageResult, msg);    return putMessageResult;}
复制代码

有关锁 putMessageLock 对象,是根据配置项来决定;

  • useReentrantLockWhenPutMessage 消息存储到 commitlog 文件时获取锁类型,如果为 true,使用 ReentrantLock;否则使用自旋锁,默认为 false


这里单独列了这么一个方法,其他的方法都大同小异,要么是异步,要么是同步的形式添加消息;

添加消息的主要过程如下:

  • 处理消息

  • 向文件追加消息

  • 统计数据

  • 磁盘刷盘

  • 同步消息到其他从节点


磁盘刷盘,会根据我们的配置项来决定,配置项:

  • flushDiskType 刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值:SYNC_FLUSH(同步刷盘)

如果是同步刷盘,其会根据消息中的属性是否有设置了

WAIT(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)为 true,如果没有设置,默认为 true;

  • syncFlushTimeout 同步刷盘超时时间

详细代码如下:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {    //同步刷盘    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;    if (messageExt.isWaitStoreMsgOK()) {      //会把写入消息的对应的文件偏移量以及写入的字节数封装成一个request,接着交由GroupCommitService去处理      //其最终会调用MappedFile.flush      //接着刷新完,会更新StoreCheckpoint的physicMsgTimestamp,该值时commitlog更新时间      GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());      service.putRequest(request);      CompletableFuture<PutMessageStatus> flushOkFuture = request.future();      PutMessageStatus flushStatus = null;      try {        flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),                                        TimeUnit.MILLISECONDS);      } catch (InterruptedException | ExecutionException | TimeoutException e) {        //flushOK=false;      }      if (flushStatus != PutMessageStatus.PUT_OK) {        log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                  + " client address: " + messageExt.getBornHostString());        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);      }    } else {      service.wakeup();    }  } else {    //异步刷盘    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {      //如果没有开启transientStorePool机制,则由FlushRealTimeService线程去处理刷盘操作      //其最终会调用MappedFile.flush      flushCommitLogService.wakeup();    } else {      //如果开启了,则由CommitRealTimeService线程去处理      //其会先写入文件系统的内存中,然后激活FlushRealTimeService线程去执行刷盘操作      commitLogService.wakeup();    }  }}
复制代码

有关刷盘,RocketMQ 提供了相关的配置项,如下:

  • flushIntervalCommitLog commitlog 刷盘频率,默认为 500ms

  • flushCommitLogLeastPages 一次刷盘至少需要脏页的数量,默认 4 页,针对 CommitLog 文件

  • flushCommitLogThoroughInterval commitlog 两次刷盘的最大间隔,如果超过该间隔,将忽略 flushCommitLogLeastPages 要求直接执行刷盘操作,默认为 10s.

    commitIntervalCommitLog commitlog 提交频率,默认为 200ms

  • commitCommitLogLeastPages 一次提交至少需要脏页的数量,默认 4 页,针对 CommitLog 文件

  • commitCommitLogThoroughInterval CommitLog 两次提交的最大间隔,如果超过该间隔,将忽略 commitCommitLogLeastPages 直接提交。默认 200ms.

  • flushCommitLogTimed 默认为 false,表示 await 方法等待 flushIntervalCommitLog 。如果为 true,表示使用 Thread.sleep 方法等待

这里稍微注意的是,在追加信息,commit,以及 flush 操作时,会记录文件的指针;

对象 MappedFileQueue 中属性

  • flushedWhere 已经执行 flush 操作的位置指针

  • committedWhere 已经执行 commit 操作的位置指针

  • storeTimestamp 记录 commitlog 更新时间

对象 MappedFile 中属性

  • wrotePosition 已经写入的位置指针

  • committedPosition 已经 commit 的位置指针

  • flushedPosition 已经 flush 的位置指针


同步消息,主节点的数据同步到从节点,会根据我们的配置项来决定是否同步;

  • brokerRole Broker 角色,分为 ASYNC_MASTER, SYNC_MASTER, SLAVE, 默认为异步 Master(ASYNC_MASTER)

  • ASYNC_MASTER 主从异步复制

  • SYNC_MASTER 主从同步复制

  • SLAVE 从节点角色

public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {  if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {    //主从同步复制    HAService service = this.defaultMessageStore.getHaService();    if (messageExt.isWaitStoreMsgOK()) {      // 等待数据同步完成后,回复客户端。      if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());        service.putRequest(request);        service.getWaitNotifyObject().wakeupAll();        PutMessageStatus replicaStatus = null;        try {          replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),                                               TimeUnit.MILLISECONDS);        } catch (InterruptedException | ExecutionException | TimeoutException e) {        }        if (replicaStatus != PutMessageStatus.PUT_OK) {          log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "                    + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());          putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);        }      }      else {        putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);      }    }  }  }
复制代码

从上面的代码来看,创建 GroupCommitRequest 请求添加到 HAService 中的 GroupTransferService 线程去执行;然而 GroupTransferService 并没有真正执行数据同步到从节点;真正执行同步操作的是交由 HAConnection.ReadSocketService 线程去操作;


在上面的主从同步复制模式下,会去校验从节点与主节点是否偏移量差,从而判断从节点是否可用,配置项如下:

  • haSlaveFallBehindMax 允许从服务器落户的最大偏移字节数,默认为 256M。超过该值则表示该 Slave 不可用


HAService 启动时,如果是从节点,会启动时 HAClient 会向主节点发起连接;如果是主节点,AcceptSocketService 线程接收到请求后,会创建 HAConnection 对象;

相关的配置项如下:

  • haListenPort Master 监听端口,从服务器连接该端口,默认为 10912;

  • haMasterAddress Master 服务器 IP 地址与端口号,如果从节点没有配置该主节点 IP,则会从集群中获取;

  • haHousekeepingInterval Master 与 Slave 长链接空闲时间,超过该时间将关闭连接;这里关闭的连接主要是 HAConnection 以及 HAClient 对象的中的连接;

其中有一个关键的属性值,会与 GroupTransferService 线程相关;

  • push2SlaveMaxOffset 已经推送给从节点的最大偏移量;


HAConnection 有两个内部线程;

  • ReadSocketService 读取从节点返回的"已接收"的偏移量,会记录该偏移量到 HAConnection 的 slaveAckOffset 值,然后更新 HAService 的 push2SlaveMaxOffset 属性值;

  • WriteSocketService 向从节点同步数据;


WriteSocketService 同步数据有限制,提供了相关的配置项,如下:

  • haTransferBatchSize 一次 HA 主从同步传输的最大字节长度,默认为 32K

同步数据的协议如下:


HAClient 对象,会定时发送心跳给主节点,告知目前从节点"已接收"的偏移量;同时接收主节点发送过来的同步数据;相关配置项如下:

  • haSendHeartbeatInterval Master 与 Slave 心跳包发送间隔,默认为 5s

相关的细节,可具体查阅相关代码;目前我也只是对其大体粗略看了一下;并没有过多详细梳理;

2.6 查询消息

查询消息,CommitLog 提供了相关的方法进行查询;在这里我们主要讲解的是 getData 方法的入参 offset 偏移量。意味着从 commitlog 文件查询对应的消息,是根据偏移量来查找;具体的细节可查阅其代码;

那么问题来了,offset 偏移量是如何计算出来的?那么就衍生了另外的两份文件,ConsumerQueue 以及 IndexFile;

三. ConsumerQueue

消息消费队列,引入的目的主要是提高消息消费的性能,由于 RocketMQ 是基于主题 topic 的订阅模式,消息消费是针对主题进行的,如果要遍历 commitlog 文件中根据 topic 检索消息是非常低效的。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定 Topic 下的队列消息在 CommitLog 中的起始物理偏移量 offset,消息大小 size 和消息 Tag 的 HashCode 值。consumequeue 文件可以看成是基于 topic 的 commitlog 索引文件,故 consumequeue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样 consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M;

上面的文件描述来自官方文档对其 ConsumerQueue 介绍;

我们按照 CommitLog 文件的思路去解读 ConsumerQueue 文件;

有关 ConsumeQueue 拓展属性就不再进行解读;相关的可以查阅代码;

1. 文件结构

  • offset 消息在 commitlog 物理偏移量

  • size 消息的长度

  • tagsCode tag 的 hashcode

2. 文件管理

RocketMQ 提供了相关的配置项:

  • mappedFileSizeConsumeQueue 单个 consumequeue 文件的大小,默认为 30w*20(字节)。表示单个 ConsumeQueue 文件中存储 30W 个 ConsumeQueue。

    enableConsumeQueueExt 是否启用 ConsumeQueue 拓展属性,默认为 false

    mappedFileSizeConsumeQueueExt ConsumeQueue 拓展文件大小,默认位 48M

    bitMapLengthConsumeQueueExt ConsumeQueue 拓展过滤 bitmap 大小,默认为 64M

    flushIntervalConsumeQueue consumeQueue 文件刷盘频率,默认为 1s.

    flushConsumeQueueLeastPages 一次刷盘至少需要脏页的数量,默认 2 页,针对 Consume 文件

    flushConsumeQueueThoroughInterval Consume 两次刷盘的最大间隔,如果超过该间隔,将忽略 flushConsumeQueueLeastPages 直接刷盘,默认 60s.

    deleteConsumeQueueFilesInterval 删除 ConsumeQueue 文件的时间间隔,默认为 100ms.

2.1 创建文件

当添加 consumequeue 时,发现没有会创建对应的文件,相关的代码:DefaultMessageStore.findConsumeQueue

2.2 删除文件

有关删除文件的,跟 CommitLog 类似,具体的线程类 CleanConsumeQueueService;

只是定位过期的规则不一样,规则是根据现有的 Commitlog 最小偏移量 minOffset,跟 Consumequeue 文件中最后一个 item 中的 offset 进行比较;如果 minOffset>offset,则删除该 consumequeue 文件;

2.3 加载文件

Broker 启动时会去加载配置项指定的目录下 ${storePathRootDir}/consumequeue 的文件,配置项如下:

该目录有两级目录,第一级目录是 topic 名称目录,第二级目录是队列名称目录;

相关的代码如下:

加载: DefaultMessageStore.load -> DefaultMessageStore.loadConsumeQueue -> ConsumeQueue.load

恢复: DefaultMessageStore.load -> DefaultMessageStore.recover ->DefaultMessageStore.recoverConsumeQueue ->ConsumeQueue.recoverConsumeQueue

2.4 关闭文件

同样,这里没有关闭文件操作,具体可以参考 commitlog 中的关闭文件的介绍;

2.5 添加信息

文件结构中介绍了添加信息的结构,具体的代码查阅 ConsumeQueue.putMessagePositionInfoWrapper;

这里没有 commit 机制,只有刷盘动作,具体交由 FlushConsumeQueueService 线程去操作;


这里值得注意的是,什么时候会触发添加消息;

当消息保存到 commitlog 时,是没有直接调用 consumequeue 去将相关的信息保存到 consumequeue 里面的;而是通过线程 ReputMessageService 定时的抓取的;相关的属性为 reputFromOffset;每个消息封装成 DispatchRequest,进行分发给具体的处理器 CommitLogDispatcher,在 consumequeue 中就有对应的 CommitLogDispatcherBuildConsumeQueue 对象;具体可查阅其代码;


另外,需要注意的是 CommitLogDispatcherBuildConsumeQueue 中的事务类型;

2.6 查询信息

查询主要是围绕 topic 和 queueId,从而定位到具体某个 Consumequeue 对象,接着在根据偏移量去定位到具体哪个 item;这里的偏移量是个数,即 item 的数量;


四. IndexFile

IndexFile(索引文件)提供了一种可以通过 key 或时间区间来查询消息的方法。Index 文件的存储位置是:$HOME \store\index${fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引,IndexFile 的底层存储设计为在文件系统中实现 HashMap 结构,故 rocketmq 的索引文件其底层实现为 hash 索引。

上面的文件描述来自官方文档对其 ConsumerQueue 介绍;

相关的配置项如下:

  • messageIndexEnable 是否支持消息索引文件,默认为 true。

    maxHashSlotNum 单个索引文件 Hash 槽的个数,默认为 5 百万

    maxIndexNum 单个索引文件索引条目的个数,默认为 2 千万

1. 文件结构

按照同样的思路去解读 IndexFile 文件;这里就不在过多解读,相关的逻辑细节可查阅代码;

稍微值得一提的是,IndexFile 没有定时刷盘动作,而是等到 IndexFile 写满后,才触发刷盘动作;

五. 总结

通过对 CommitLog 文件、consumequeue 文件、IndexFile 文件的粗糙的了解,大体知道 RocketMQ 是如何持久化消息,以及对文件的管理有一定的了解,这三个文件的关系;想要过多的了解细节,可查阅其代码;

当然我这里还有其他特性没有解读,如监听器 MessageArrivingListener,StoreCheckpoint 等;


发布于: 1 小时前阅读数: 3
用户头像

邱学喆

关注

计算机原理的深度解读,源码分析。 2018.08.26 加入

在IT领域keep Learning。要知其然,也要知其所以然。原理的爱好,源码的阅读。输出我对原理以及源码解读的理解。个人的仓库:https://gitee.com/Michael_Chan

评论

发布
暂无评论
RocketMQ之Store模块