揭秘 | RocketMQ 文件清理机制~
前言最近在公司的 MQ 论坛里,看到好几次 Broker 扩容消息,我心里就在想 RocketMq 是有文件清理机制的,咱们的 MQ 消息就那么多吗,hh~虽然我知道 RocketMQ 存在文件清理机制,但是具体的清理策略却不是很清楚,本文就来整理一下 RocketMQ 的清理机制及源码
RocketMQ 文件清理机制
每天凌晨 4 点会清理过期的文件当磁盘使用率达到 70%时,会立刻清理过期的文件。当磁盘使用率达到 85%时,会从最早创建的文件开始清理,不管文件是否已过期,直到磁盘空间充足。
源码 RocketMQ 的消息是存储在 Broker 上的,而 Broker 的消息存储是依赖于 DefaultMessageStoreBroker 启动时,也会 start DefaultMessageStorejava 复制代码// org.apache.rocketmq.store.DefaultMessageStore#startpublic void start() throws Exception {
// ......
// 添加一些定时任务 this.addScheduleTask();
}
private void addScheduleTask() {
// todo 清理过期文件 每隔 10s// this.messageStoreConfig.getCleanResourceInterval() == 10000this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {// 文件清理定时任务 DefaultMessageStore.this.cleanFilesPeriodically();}}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);}
DefaultMessageStore start 时,会开启一些定时任务,其中就包含文件清理的定时任务,每隔 10s 执行一次 java 复制代码 private void cleanFilesPeriodically() {// 清理 commitLogthis.cleanCommitLogService.run();// 清理 ConsumerQueuethis.cleanConsumeQueueService.run();}
清理 CommitLogjava 复制代码 class CleanCommitLogService {public void run() {try {// 删除过期文件 this.deleteExpiredFiles();
}}
java 复制代码 private void deleteExpiredFiles() {int deleteCount = 0;
// 文件保留时间(默认 72 个小时)long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// 两次删除文件的间隔时间(默认 100ms)int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();// 强制删除间隔时间 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 是否到达删除时间点,例如: 默认每天凌晨 4 点删除一次 boolean timeup = this.isTimeToDelete();// 磁盘空间是否快满了 boolean spacefull = this.isSpaceToDelete();// 手动删除(删除命令),可调用 executeDeleteFilesManually 进行删除 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
}}
文件清理存在三种情况
每天的定时删除磁盘容量达到阈值手动执行的删除命令
只要有一种满足,就需要去执行清理逻辑~
定时删除 isTimeToDelete 用于判断是否到达删除的时间点,例如: 默认每天凌晨 4 点进行删除
支持多个删除时间点,用;分隔即可
**在文章开头也提到了,会开启每隔 10s 的定时任务,所以如果当前时间符合定义的删除时间点,及满足清理条件~ **java 复制代码 private boolean isTimeToDelete() {// 默认是 "04"String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();// 是否到达可删除的事件点 if (UtilAll.isItTimeToDo(when)) {DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);return true;}
return false;}
public static boolean isItTimeToDo(final String when) {// 支持多个删除时间点,用;分隔 String[] whiles = when.split(";");if (whiles.length > 0) {Calendar now = Calendar.getInstance();for (String w : whiles) {int nowHour = Integer.parseInt(w);// 是否到达删除时间点 if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {return true;}}}
return false;}
磁盘使用量达到阈值删除有关阈值的定义,一共有三个
0.75: 默认的清理阈值,大于该阈值才会清理,但需要等到文件达到过期时间 0.85: 强制清理阈值,达到该阈值,不需要等到文件是否达到过期时间(72 小时)0.95: 告警阈值,逻辑与 0.85 阈值一样,只不过会多条 error`日志
java 复制代码 private boolean isSpaceToDelete() {// 清理阈值比例,默认 0.75,文件磁盘使用比例超过 75%,则需要进行清理 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
// 是否立即清除 cleanImmediately = false;
{// 多存储路径 String commitLogStorePath = DefaultMessageStore.this.getStorePathPhysic();String[] storePaths = commitLogStorePath.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);Set<String> fullStorePath = new HashSet<>();
}
// ......同上
return false;}
清理文件当满足定时删除、达到阈值删除、手动删除其中任何一种时,即可触发文件的清理逻辑 java 复制代码 public int deleteExpiredFile(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {// 清理过期文件 return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);}
java 复制代码// org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFileByTimepublic int deleteExpiredFileByTime(final long expiredTime,final int deleteFilesInterval,final long intervalForcibly,final boolean cleanImmediately) {// 拿到 mappedFile 引用 Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)return 0;
// -1, 因为最后一个文件肯定处于使用中,不需要清理 int mfsLength = mfs.length - 1;int deleteCount = 0;List<MappedFile> files = new ArrayList<MappedFile>();if (null != mfs) {for (int i = 0; i < mfsLength; i++) {MappedFile mappedFile = (MappedFile) mfs[i];
}
// 清除 mappedFiledeleteExpiredFile(files);
return deleteCount;}
流程如下
遍历 MappedFile,除了最后一个(最后一个处于使用中,不用清理)计算文件最大存活时间,即文件的最后一次修改时间 + 过期时间(72 小时)如果当前时间 > 最大存活时间,或者需要强制删除,才进行文件清理操作销毁 MappedFile,关闭文件 channel,删除 File,并且每次删除之间存在一定的时间间隔(默认 100ms)从 mappedFiles 中,删除当前 mappedFile
下面再看下 destroy 逻辑 java 复制代码 public boolean destroy(final long intervalForcibly) {// 扣减引用计数,超过 intervalForcibly(120s)this.shutdown(intervalForcibly);
// 是否清楚结束 if (this.isCleanupOver()) {try {// 关闭文件通道 this.fileChannel.close();log.info("close file channel " + this.fileName + " OK");
} else {log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName+ " Failed. cleanupOver: " + this.cleanupOver);}
return false;}
因为文件是会被使用的,所以当我们要删除文件的时候,文件可能正在被使用,即文件被引用次数 > 0,此时是不能删除的所以当引用次数 > 0 时删除,此时的第一次删除并不会真正删除,而是会记录第一次准备删除的时间只有当引用计数 <= 0 或者引用计数 > 0 但是超过了强制删除时间,才会去删除,即关闭文件 channel 和删除 file 主要删除流程 deleteExpiredFiles 看完了,直接看看 redeleteHangedFile 上面刚说,因为引用计数的原因,导致某个文件可能还无法被删除,当能删除的文件都删除后,那么未能删除的文件就成为了第一个 mappedFile 所以此时需要 redeleteHangedFile 再来兜个底,看看第一个 mappedFile 能否删除,如果可以就删除 java 复制代码 private void redeleteHangedFile() {int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();long currentTimestamp = System.currentTimeMillis();if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {this.lastRedeleteTimestamp = currentTimestamp;
}}
java 复制代码 public boolean retryDeleteFirstFile(final long intervalForcibly) {// 拿到第一个 mappedFileMappedFile mappedFile = this.getFirstMappedFile();if (mappedFile != null) {// 如果不可用 if (!mappedFile.isAvailable()) {log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());// 则执行 destroyboolean result = mappedFile.destroy(intervalForcibly);if (result) {log.info("the mappedFile re delete OK, " + mappedFile.getFileName());List<MappedFile> tmpFiles = new ArrayList<MappedFile>();tmpFiles.add(mappedFile);// destroy 成功,从 mappedFils 中删除该 mappedFilethis.deleteExpiredFile(tmpFiles);} else {log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());}
}
return false;}
清理 ConsumerQueuejava 复制代码 private void cleanFilesPeriodically() {// todo 清除 CommitLog 文件 this.cleanCommitLogService.run();// todo 清除 ConsumeQueue 文件 this.cleanConsumeQueueService.run();}
java 复制代码 class CleanConsumeQueueService {private long lastPhysicalMinOffset = 0;
public void run() {try {// 删除过期文件 this.deleteExpiredFiles();} catch (Throwable e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}}
java 复制代码 private void deleteExpiredFiles() {// 每次清理的间隔,默认 100msint deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 拿到 commitLog 的最小偏移量 long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
// 是否大于最后一次清理的最小偏移量,默认为 0if (minOffset > this.lastPhysicalMinOffset) {// 更新一下 this.lastPhysicalMinOffset = minOffset;
}}
当 CommitLog 清除完毕后,那么在 ConsumerQueue、IndexFile 中小于第一个 MappedFile 偏移量的记录都要被删除掉所以上述代码在拿到 minOffset 后,遍历 consumeQueueTable,挨个判断并删除 java 复制代码 public int deleteExpiredFileByOffset(long offset, int unitSize) {// 拿到引用 Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();int deleteCount = 0;if (null != mfs) {
}
// 从 mappedFiles 中清除该 mappedFiledeleteExpiredFile(files);
return deleteCount;}
删除逻辑大体与 CommitLog 删除逻辑一致,只不过 CommitLog 是根据文件过期时间来删除,而 ConsumerQueue 是根据 offset 来删除
清理 IndexFile 依旧是 deleteExpiredFiles 方法,在清理完 consumerQueue 后,会同步清理 IndexFilejava 复制代码 private void deleteExpiredFiles() {int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();if (minOffset > this.lastPhysicalMinOffset) {
}}
java 复制代码 public void deleteExpiredFile(long offset) {Object[] files = null;try {this.readWriteLock.readLock().lock();if (this.indexFileList.isEmpty()) {return;}
} catch (Exception e) {log.error("destroy exception", e);} finally {this.readWriteLock.readLock().unlock();}
if (files != null) {// 说明第一个 IndexFile 最后的偏移量小于 offsetList<IndexFile> fileList = new ArrayList<IndexFile>();
}}
逻辑还是与 commitLog、consumerQueue 基本一致,无非就是遍历文件比较,如果文件需要删除就加入到待删除集中中然后统一对 indexFile destroy,再移除掉该 indexFile 引用即可
总结源码学完总结一下首先 Broker 启动的时候会开启定时任务,每隔 10s 去清理过期文件,其中包含 commitLog、consumerQueue、indexFile 三种文件先清除 commitLog,当满足以下条件时,可执行 commitLog 删除逻辑
定时删除,达到符合删除的时间节点,即可触发达到阈值,阈值分为三种 0.75、0.85、0.95
当达到 0.75 时就可以触发清理逻辑了,但此时受限于文件是否过期当阈值超过 0.85 时,此时需强制删除,不受限于文件是否过期,0.95 与 0.85 的区别就是多了一条 error 日志
手动删除,执行删除命令
当 commitLog 清理完毕后,那么可以获取 minOffset 在 consumerQueue、indexFile 中,遍历判断小于该 minOffset 的文件都需要删除
评论