写点什么

揭秘 | RocketMQ 文件清理机制~

作者:java易二三
  • 2023-08-25
    湖南
  • 本文字数:8607 字

    阅读完需:约 28 分钟

前言最近在公司的 MQ 论坛里,看到好几次 Broker 扩容消息,我心里就在想 RocketMq 是有文件清理机制的,咱们的 MQ 消息就那么多吗,hh~虽然我知道 RocketMQ 存在文件清理机制,但是具体的清理策略却不是很清楚,本文就来整理一下 RocketMQ 的清理机制及源码


RocketMQ 文件清理机制


每天凌晨 4 点会清理过期的文件当磁盘使用率达到 70%时,会立刻清理过期的文件。当磁盘使用率达到 85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。


源码 RocketMQ 的消息是存储在 Broker 上的,而 Broker 的消息存储是依赖于 DefaultMessageStoreBroker 启动时,也会 start DefaultMessageStorejava 复制代码// org.apache.rocketmq.store.DefaultMessageStore#startpublic void start() throws Exception {


// ......


// 添加一些定时任务 this.addScheduleTask();


}


private void addScheduleTask() {


// todo 清理过期文件 每隔 10s// this.messageStoreConfig.getCleanResourceInterval() == 10000this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 文件清理定时任务 DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);}


DefaultMessageStore start 时,会开启一些定时任务,其中就包含文件清理的定时任务,每隔 10s 执行一次 java 复制代码 private void cleanFilesPeriodically() {// 清理 commitLogthis.cleanCommitLogService.run();// 清理 ConsumerQueuethis.cleanConsumeQueueService.run();}


清理 CommitLogjava 复制代码 class CleanCommitLogService {public void run() {try {// 删除过期文件 this.deleteExpiredFiles();


  // 兜底删除  this.redeleteHangedFile();} catch (Throwable e) {  DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}
复制代码


}}


java 复制代码 private void deleteExpiredFiles() {int deleteCount = 0;


// 文件保留时间(默认 72 个小时)long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();


// 两次删除文件的间隔时间(默认 100ms)int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();// 强制删除间隔时间 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();


// 是否到达删除时间点,例如: 默认每天凌晨 4 点删除一次 boolean timeup = this.isTimeToDelete();// 磁盘空间是否快满了 boolean spacefull = this.isSpaceToDelete();// 手动删除(删除命令),可调用 executeDeleteFilesManually 进行删除 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;


if (timeup || spacefull || manualDelete) {


if (manualDelete)  this.manualDeleteFileSeveralTimes--;
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
// todo 清理过期文件deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);if (deleteCount > 0) {} else if (spacefull) { log.warn("disk space will be full soon, but delete file failed.");}
复制代码


}}


文件清理存在三种情况


每天的定时删除磁盘容量达到阈值手动执行的删除命令


只要有一种满足,就需要去执行清理逻辑~


定时删除 isTimeToDelete 用于判断是否到达删除的时间点,例如: 默认每天凌晨 4 点进行删除


支持多个删除时间点,用;分隔即可


**在文章开头也提到了,会开启每隔 10s 的定时任务,所以如果当前时间符合定义的删除时间点,及满足清理条件~ **java 复制代码 private boolean isTimeToDelete() {// 默认是 "04"String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();// 是否到达可删除的事件点 if (UtilAll.isItTimeToDo(when)) {DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);return true;}


return false;}


public static boolean isItTimeToDo(final String when) {// 支持多个删除时间点,用;分隔 String[] whiles = when.split(";");if (whiles.length > 0) {Calendar now = Calendar.getInstance();for (String w : whiles) {int nowHour = Integer.parseInt(w);// 是否到达删除时间点 if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {return true;}}}


return false;}


磁盘使用量达到阈值删除有关阈值的定义,一共有三个


0.75: 默认的清理阈值,大于该阈值才会清理,但需要等到文件达到过期时间 0.85: 强制清理阈值,达到该阈值,不需要等到文件是否达到过期时间(72 小时)0.95: 告警阈值,逻辑与 0.85 阈值一样,只不过会多条 error`日志


java 复制代码 private boolean isSpaceToDelete() {// 清理阈值比例,默认 0.75,文件磁盘使用比例超过 75%,则需要进行清理 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;


// 是否立即清除 cleanImmediately = false;


{// 多存储路径 String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);Set<String> fullStorePath = new HashSet<>();


// 最小的磁盘使用比例及对应的文件pathdouble minPhysicRatio = 100;String minStorePath = null;
// 遍历所有文件,拿到最小的磁盘使用比例及对应的文件pathfor (String storePathPhysic : storePaths) { // 计算磁盘的使用率 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); if (minPhysicRatio > physicRatio) { minPhysicRatio = physicRatio; minStorePath = storePathPhysic; } // 是否超过磁盘空间清理强制比例 if (physicRatio > diskSpaceCleanForciblyRatio) { fullStorePath.add(storePathPhysic); }}
// 如果最小的磁盘使用比例超过了磁盘告警比例(0.95) or 磁盘强制清除比例(0.85)// 那么cleanImmediately = true; 需立即清除DefaultMessageStore.this.commitLog.setFullStorePaths(fullStorePath);if (minPhysicRatio > diskSpaceWarningLevelRatio) { // 超过了磁盘告警比例(0.95) boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + minPhysicRatio + ", so mark disk full, storePathPhysic=" + minStorePath); }
cleanImmediately = true;} else if (minPhysicRatio > diskSpaceCleanForciblyRatio) { // 超过了磁盘强制清除比例(0.85) cleanImmediately = true;} else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + minPhysicRatio + ", so mark disk ok, storePathPhysic=" + minStorePath); }}
// 最小的磁盘使用比例 < 0 或者 > 默认清除阈值(0.75),则需要清理文件if (minPhysicRatio < 0 || minPhysicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + minPhysicRatio + ", storePathPhysic=" + minStorePath); return true;}
复制代码


}


// ......同上


return false;}


清理文件当满足定时删除、达到阈值删除、手动删除其中任何一种时,即可触发文件的清理逻辑 java 复制代码 public int deleteExpiredFile(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {// 清理过期文件 return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);}


java 复制代码// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTimepublic int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {// 拿到 mappedFile 引用 Object[] mfs = this.copyMappedFiles(0);


if (null == mfs)return 0;


// -1, 因为最后一个文件肯定处于使用中,不需要清理 int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];


  // 计算文件的最大存活时间,文件最后一次修改时间 + 过期时间(默认72小时)  long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;    // 如果当前时间 > 最大存活时间,或者需要立即删除  if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {        // 销毁mappedFile    if (mappedFile.destroy(intervalForcibly)) {      // 添加到待删除文件集合中      files.add(mappedFile);      deleteCount++;
// 每一批次,最多删除十个文件 if (files.size() >= DELETE_FILES_BATCH_MAX) { break; }
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { // 多次删除文件之间停顿一下(默认100ms) Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; }}
复制代码


}


// 清除 mappedFiledeleteExpiredFile(files);


return deleteCount;}


流程如下


遍历 MappedFile,除了最后一个(最后一个处于使用中,不用清理)计算文件最大存活时间,即文件的最后一次修改时间 + 过期时间(72 小时)如果当前时间 > 最大存活时间,或者需要强制删除,才进行文件清理操作销毁 MappedFile,关闭文件 channel,删除 File,并且每次删除之间存在一定的时间间隔(默认 100ms)从 mappedFiles 中,删除当前 mappedFile


下面再看下 destroy 逻辑 java 复制代码 public boolean destroy(final long intervalForcibly) {// 扣减引用计数,超过 intervalForcibly(120s)this.shutdown(intervalForcibly);


// 是否清楚结束 if (this.isCleanupOver()) {try {// 关闭文件通道 this.fileChannel.close();log.info("close file channel " + this.fileName + " OK");


  long beginTime = System.currentTimeMillis();    // 删除文件  boolean result = this.file.delete();  log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName           + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"           + this.getFlushedPosition() + ", "           + UtilAll.computeElapsedTimeMilliseconds(beginTime));} catch (Exception e) {  log.warn("close file channel " + this.fileName + " Failed. ", e);}
return true;
复制代码


} else {log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName+ " Failed. cleanupOver: " + this.cleanupOver);}


return false;}


因为文件是会被使用的,所以当我们要删除文件的时候,文件可能正在被使用,即文件被引用次数 > 0,此时是不能删除的所以当引用次数 > 0 时删除,此时的第一次删除并不会真正删除,而是会记录第一次准备删除的时间只有当引用计数 <= 0 或者引用计数 > 0 但是超过了强制删除时间,才会去删除,即关闭文件 channel 和删除 file 主要删除流程 deleteExpiredFiles 看完了,直接看看 redeleteHangedFile 上面刚说,因为引用计数的原因,导致某个文件可能还无法被删除,当能删除的文件都删除后,那么未能删除的文件就成为了第一个 mappedFile 所以此时需要 redeleteHangedFile 再来兜个底,看看第一个 mappedFile 能否删除,如果可以就删除 java 复制代码 private void redeleteHangedFile() {int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();long currentTimestamp = System.currentTimeMillis();if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {this.lastRedeleteTimestamp = currentTimestamp;


// 上面看到了的,强制删除间隔时间int destroyMapedFileIntervalForcibly =  DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();// 重试删除第一个mappedFileif (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {}
复制代码


}}


java 复制代码 public boolean retryDeleteFirstFile(final long intervalForcibly) {// 拿到第一个 mappedFileMappedFile mappedFile = this.getFirstMappedFile();if (mappedFile != null) {// 如果不可用 if (!mappedFile.isAvailable()) {log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());// 则执行 destroyboolean result = mappedFile.destroy(intervalForcibly);if (result) {log.info("the mappedFile re delete OK, " + mappedFile.getFileName());List<MappedFile> tmpFiles = new ArrayList<MappedFile>();tmpFiles.add(mappedFile);// destroy 成功,从 mappedFils 中删除该 mappedFilethis.deleteExpiredFile(tmpFiles);} else {log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());}


  return result;}
复制代码


}


return false;}


清理 ConsumerQueuejava 复制代码 private void cleanFilesPeriodically() {// todo 清除 CommitLog 文件 this.cleanCommitLogService.run();// todo 清除 ConsumeQueue 文件 this.cleanConsumeQueueService.run();}


java 复制代码 class CleanConsumeQueueService {private long lastPhysicalMinOffset = 0;


public void run() {try {// 删除过期文件 this.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}}


java 复制代码 private void deleteExpiredFiles() {// 每次清理的间隔,默认 100msint deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();


// 拿到 commitLog 的最小偏移量 long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();


// 是否大于最后一次清理的最小偏移量,默认为 0if (minOffset > this.lastPhysicalMinOffset) {// 更新一下 this.lastPhysicalMinOffset = minOffset;


ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// 遍历删除for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) {
// 删除 int deleteCount = logic.deleteExpiredFile(minOffset);
// 如果删除过文件,且需要进行间隔 if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { // 休眠一下,间隔 Thread.sleep(deleteLogicsFilesInterval); } catch (InterruptedException ignored) { } } }}
// todo 清理IndexFileDefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
复制代码


}}


当 CommitLog 清除完毕后,那么在 ConsumerQueue、IndexFile 中小于第一个 MappedFile 偏移量的记录都要被删除掉所以上述代码在拿到 minOffset 后,遍历 consumeQueueTable,挨个判断并删除 java 复制代码 public int deleteExpiredFileByOffset(long offset, int unitSize) {// 拿到引用 Object[] mfs = this.copyMappedFiles(0);


List<MappedFile> files = new ArrayList<MappedFile>();int deleteCount = 0;if (null != mfs) {


// 最后一个不清楚int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) { boolean destroy; MappedFile mappedFile = (MappedFile) mfs[i]; // 其实就是拿到最后一个消息的buffer SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); if (result != null) { // 然后取这个消息的偏移量,即就是这个文件的最大偏移量了 long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); result.release(); // 是否小于需要删除的偏移量,小于就要删除 destroy = maxOffsetInLogicQueue < offset; if (destroy) { log.info("physic min offset " + offset + ", logics in current mappedFile max offset " + maxOffsetInLogicQueue + ", delete it"); } } else if (!mappedFile.isAvailable()) { // Handle hanged file. log.warn("Found a hanged consume queue file, attempting to delete it."); destroy = true; } else { log.warn("this being not executed forever."); break; }
// 进行销毁,间隔强制删除时间间隔为60s if (destroy && mappedFile.destroy(1000 * 60)) { files.add(mappedFile); deleteCount++; } else { break; }}
复制代码


}


// 从 mappedFiles 中清除该 mappedFiledeleteExpiredFile(files);


return deleteCount;}


删除逻辑大体与 CommitLog 删除逻辑一致,只不过 CommitLog 是根据文件过期时间来删除,而 ConsumerQueue 是根据 offset 来删除


清理 IndexFile 依旧是 deleteExpiredFiles 方法,在清理完 consumerQueue 后,会同步清理 IndexFilejava 复制代码 private void deleteExpiredFiles() {int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();


long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {


// ...... 清理consumerQueue
// todo 清理IndexFileDefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
复制代码


}}


java 复制代码 public void deleteExpiredFile(long offset) {Object[] files = null;try {this.readWriteLock.readLock().lock();if (this.indexFileList.isEmpty()) {return;}


// 拿到第一个IndexFile,进行判断最后的偏移量是否小于offsetlong endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();if (endPhyOffset < offset) {  files = this.indexFileList.toArray();}
复制代码


} catch (Exception e) {log.error("destroy exception", e);} finally {this.readWriteLock.readLock().unlock();}


if (files != null) {// 说明第一个 IndexFile 最后的偏移量小于 offsetList<IndexFile> fileList = new ArrayList<IndexFile>();


// 遍历,将小于offset的file加入到待删除集合中for (int i = 0; i < (files.length - 1); i++) {  IndexFile f = (IndexFile) files[i];  if (f.getEndPhyOffset() < offset) {    fileList.add(f);  } else {    break;  }}
// 删除过期文件,逻辑与之前基本一致,即destroy,然后从IndexFileList中移除即可this.deleteExpiredFile(fileList);
复制代码


}}


逻辑还是与 commitLog、consumerQueue 基本一致,无非就是遍历文件比较,如果文件需要删除就加入到待删除集中中然后统一对 indexFile destroy,再移除掉该 indexFile 引用即可


总结源码学完总结一下首先 Broker 启动的时候会开启定时任务,每隔 10s 去清理过期文件,其中包含 commitLog、consumerQueue、indexFile 三种文件先清除 commitLog,当满足以下条件时,可执行 commitLog 删除逻辑


定时删除,达到符合删除的时间节点,即可触发达到阈值,阈值分为三种 0.75、0.85、0.95


当达到 0.75 时就可以触发清理逻辑了,但此时受限于文件是否过期当阈值超过 0.85 时,此时需强制删除,不受限于文件是否过期,0.95 与 0.85 的区别就是多了一条 error 日志


手动删除,执行删除命令


当 commitLog 清理完毕后,那么可以获取 minOffset 在 consumerQueue、indexFile 中,遍历判断小于该 minOffset 的文件都需要删除

用户头像

java易二三

关注

还未添加个人签名 2021-11-23 加入

还未添加个人简介

评论

发布
暂无评论
揭秘 | RocketMQ文件清理机制~_Java_java易二三_InfoQ写作社区