RocketMQ High-Availability Design: Asynchronous Flush

Author: 周杰伦本人
  • August 19, 2022
    Guizhou

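Asynchronous flush is more efficient than synchronous flush and is the preferred flush strategy in production; it is also RocketMQ's default. Asynchronous flush comes in two modes: with the buffer pool enabled and with the buffer pool disabled.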

The handleDiskFlush method of CommitLog:


public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}


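This method shows that CommitLog's handleDiskFlush() has two branches: synchronous flush and asynchronous flush. Synchronous flush was analyzed in the previous article, so here we focus on the asynchronous flush logic.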
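The branching in handleDiskFlush() can be condensed into a small decision function. The following is only a sketch: FlushDispatchSketch and serviceToNotify are hypothetical names introduced for illustration, and the returned strings are the service class names discussed in this article, on the assumption that commitLogService is the CommitRealTimeService and that the asynchronous flushCommitLogService is the FlushRealTimeService.

```java
// Sketch only: FlushDispatchSketch and serviceToNotify are hypothetical names,
// not RocketMQ classes. The strings are the service names used in the article.
final class FlushDispatchSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    /** Returns the name of the service that handleDiskFlush() would notify. */
    static String serviceToNotify(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return "GroupCommitService";       // synchronous flush
        }
        // Asynchronous flush: the buffer pool decides which thread is woken up.
        return transientStorePoolEnable
            ? "CommitRealTimeService"          // commit to the file channel first, flush later
            : "FlushRealTimeService";          // flush the mapped file directly
    }

    public static void main(String[] args) {
        for (FlushDiskType type : FlushDiskType.values()) {
            for (boolean pool : new boolean[] {false, true}) {
                System.out.println(type + ", pool=" + pool + " -> " + serviceToNotify(type, pool));
            }
        }
    }
}
```

Note that in the synchronous branch the buffer pool flag is never consulted, which matches the structure of the method above.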

Buffer pool enabled

class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            // Interval between commits: 200 ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

            // Commit at least 4 pages of in-memory data (16 KB) each time
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            // Threshold on the time since the last commit: 200 ms
            int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}


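RocketMQ allocates off-heap memory of the same size as a CommitLog file to serve as a buffer pool. Data is written to the buffer pool first. The commit thread CommitRealTimeService then tries, every 200 ms, to commit the buffered data to the file channel, where it waits to be flushed. The flush itself is still performed by FlushRealTimeService, exactly as when the buffer pool is disabled. The purpose of the buffer pool is to merge many small writes into one larger write, which improves I/O performance.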
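The write, commit, flush split described above can be illustrated with plain Java NIO. This is a minimal sketch under stated assumptions, not RocketMQ's implementation: BufferPoolSketch, append, commit, flush and demo are hypothetical names, and a single small direct buffer stands in for the buffer pool.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch only: BufferPoolSketch is a hypothetical class, not RocketMQ code.
final class BufferPoolSketch {
    private final ByteBuffer writeBuffer;  // off-heap staging buffer
    private final FileChannel channel;
    private int committedPosition = 0;     // bytes already handed to the channel

    BufferPoolSketch(Path file, int capacity) throws IOException {
        this.writeBuffer = ByteBuffer.allocateDirect(capacity);
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    /** Stage 1: append a message to the off-heap buffer only. */
    void append(byte[] message) {
        writeBuffer.put(message);
    }

    /** Stage 2: hand everything appended since the last commit to the file channel. */
    int commit() throws IOException {
        ByteBuffer pending = writeBuffer.duplicate();
        pending.position(committedPosition);
        pending.limit(writeBuffer.position());
        int written = 0;
        while (pending.hasRemaining()) {
            written += channel.write(pending, committedPosition + written);
        }
        committedPosition += written;
        return written;
    }

    /** Stage 3: force the channel's content to the storage device. */
    void flush() throws IOException {
        channel.force(false);
    }

    /** Runs the three stages on a temp file; returns {size before commit, bytes committed, size after}. */
    static long[] demo() {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            BufferPoolSketch store = new BufferPoolSketch(file, 1024);
            store.append("msg-1".getBytes(StandardCharsets.UTF_8));
            store.append("msg-2".getBytes(StandardCharsets.UTF_8));
            long before = store.channel.size();  // nothing has reached the file yet
            int committed = store.commit();      // both messages go out in one pass
            store.flush();
            long after = store.channel.size();
            store.channel.close();
            Files.delete(file);
            return new long[] {before, committed, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In demo(), the two appended messages reach the file channel in a single commit pass, which is the merging effect the paragraph above attributes to the buffer pool.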


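The run() method of CommitRealTimeService works as follows:

  1. Check whether more than 200 ms have passed since the last commit; if so, force a commit

  2. Commit the data to the MappedFile; at this point it has not yet been flushed to disk

  3. Wake up the flush thread

  4. Before the Broker stops normally, commit the data remaining in the in-memory pages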
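Steps 1 and 2 above hinge on the "least pages" threshold, which drops to 0 once the thorough interval has elapsed. The sketch below shows one way such a check could work. CommitThresholdSketch and both method names are hypothetical, the 4 KB page size is inferred from the "4 pages (16 KB)" comment in the listing, and the real check inside mappedFileQueue.commit() is not shown in this article and may differ.

```java
// Sketch only: CommitThresholdSketch is a hypothetical helper. The page size
// and the decision rule are assumptions made for illustration.
final class CommitThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    /** Step 1: returns 0 (force) once the thorough interval has elapsed, else the configured minimum. */
    static int effectiveLeastPages(long now, long lastCommit, int thoroughIntervalMs, int leastPages) {
        return now >= lastCommit + thoroughIntervalMs ? 0 : leastPages;
    }

    /** Step 2: whether enough uncommitted data has accumulated to justify a commit. */
    static boolean shouldCommit(long writePos, long committedPos, int leastPages) {
        if (leastPages > 0) {
            return (writePos / OS_PAGE_SIZE) - (committedPos / OS_PAGE_SIZE) >= leastPages;
        }
        return writePos > committedPos; // forced: any pending byte qualifies
    }

    public static void main(String[] args) {
        int pages = effectiveLeastPages(1_000, 900, 200, 4);   // interval not yet elapsed
        System.out.println(shouldCommit(8 * 1024, 0, pages));  // only 2 dirty pages
        pages = effectiveLeastPages(1_100, 900, 200, 4);       // interval elapsed, forced
        System.out.println(shouldCommit(8 * 1024, 0, pages));
    }
}
```

The design choice is a trade-off: the page threshold batches small writes, while the time threshold bounds how long data can sit uncommitted when traffic is low.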

Buffer pool disabled

The buffer pool is disabled by default. In this mode the flush thread FlushRealTimeService attempts a flush every 500 ms.
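Both background threads pair a timed wait with the wakeup() call made from handleDiskFlush(), so a new message can cut the wait short. The following sketch shows one way to build such a pair with a Semaphore. WakeableTimer is a hypothetical class; how RocketMQ actually implements waitForRunning() and wakeup() is not shown in this article and may differ.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch only: WakeableTimer is a hypothetical stand-in for the
// waitForRunning()/wakeup() pair used by the service threads.
final class WakeableTimer {
    private final Semaphore signal = new Semaphore(0);

    /** Called by the writer thread after appending a message. */
    void wakeup() {
        // Best-effort coalescing: avoid piling up permits for repeated wakeups.
        if (signal.availablePermits() == 0) {
            signal.release();
        }
    }

    /** Blocks for up to intervalMs; returns true if woken early, false on timeout. */
    boolean waitForRunning(long intervalMs) {
        try {
            boolean woken = signal.tryAcquire(intervalMs, TimeUnit.MILLISECONDS);
            signal.drainPermits(); // discard any extra wakeups
            return woken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A flush loop built on this would call waitForRunning(500) on each iteration. A loop that uses Thread.sleep(interval) instead, as FlushRealTimeService does when flushCommitLogTimed is true, always waits the full interval regardless of wakeups.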

FlushRealTimeService

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            // Interval between flushes: 500 ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // Flush at least 4 pages of in-memory data (16 KB) each time
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

            // Threshold on the time since the last flush: 10 seconds
            int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}


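The run() method of FlushRealTimeService works as follows:

  1. Check whether more than 10 seconds have passed since the last flush; if so, force a flush

  2. Wait for the flush interval of 500 ms

  3. Flush to disk through the MappedFile

  4. Record the flush timestamp in the StoreCheckpoint

  5. Log any flush that takes longer than 500 ms

  6. Before the Broker stops normally, flush the data remaining in the in-memory pages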
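Steps 1, 3 and 4 above can be tried out on a memory-mapped temp file. This is a simplified sketch, not RocketMQ's MappedFile: MappedFlushSketch and its members are hypothetical, the 4 KB page size is an assumption, and a plain long field stands in for the StoreCheckpoint.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch only: MappedFlushSketch is hypothetical and far simpler than RocketMQ's MappedFile.
final class MappedFlushSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;  // assumed page size

    private final MappedByteBuffer mapped;
    private int flushedPosition = 0;
    private long checkpointTimestamp = 0;      // stands in for StoreCheckpoint

    MappedFlushSketch(FileChannel channel, int size) throws IOException {
        this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);
    }

    void append(byte[] message) {
        mapped.put(message);                   // lands in the page cache, not yet on disk
    }

    /** Flushes if at least leastPages dirty pages exist, or any dirty byte when leastPages == 0. */
    boolean flush(int leastPages, long storeTimestamp) {
        int write = mapped.position();
        boolean able = leastPages > 0
            ? (write / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages
            : write > flushedPosition;
        if (!able) {
            return false;
        }
        mapped.force();                        // push dirty pages to the storage device
        flushedPosition = write;
        checkpointTimestamp = storeTimestamp;  // record the flush checkpoint
        return true;
    }

    /** Returns {flushed below threshold, flushed when forced, flushed again, checkpoint updated}. */
    static boolean[] demo() {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                MappedFlushSketch f = new MappedFlushSketch(ch, 64 * 1024);
                f.append("hello".getBytes(StandardCharsets.UTF_8));
                boolean tooEarly = f.flush(4, 1L);     // fewer than 4 dirty pages
                boolean forced = f.flush(0, 2L);       // threshold dropped to 0
                boolean nothingLeft = f.flush(0, 3L);  // no new data since last flush
                return new boolean[] {tooEarly, forced, nothingLeft, f.checkpointTimestamp == 2L};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In demo(), the first flush is skipped because fewer than four pages are dirty, the second succeeds because the threshold is 0, and the third finds nothing new to flush.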

Summary

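This article walked through RocketMQ's asynchronous flush logic. It comes in two modes, with the buffer pool enabled and with it disabled, and the buffer pool is disabled by default.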

Published on the InfoQ 写作社区 (InfoQ writing community).