RocketMQ 高可用设计之异步刷盘
- 2022 年 8 月 19 日 贵州
- 本文字数:4158 字 - 阅读完需:约 14 分钟 
RocketMQ 高可用设计之异步刷盘
比起同步刷盘,异步刷盘效率更高,也是生产中首选使用的刷盘策略,而 RocketMQ 默认采用异步刷盘,异步刷盘两种策略,分为开启缓冲池和不开启缓冲池两种模式。
CommitLog 的 handleDiskFlush 方法:
CommitLog 的 handleDiskFlush 方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    // Synchronization flush    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;        if (messageExt.isWaitStoreMsgOK()) {            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());            service.putRequest(request);            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());            if (!flushOK) {                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                    + " client address: " + messageExt.getBornHostString());                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);            }        } else {            service.wakeup();        }    }    // Asynchronous flush    else {        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {            flushCommitLogService.wakeup();        } else {            commitLogService.wakeup();        }    }}
通过这个刷盘的方法我们知道,CommitLog 的 handleDiskFlush()这个方法分为同步刷盘和异步刷盘,而同步刷盘我们在上篇文章进行了分析,我们主要看一下异步刷盘的逻辑
开启缓冲池模式
class CommitRealTimeService extends FlushCommitLogService {
    private long lastCommitTimestamp = 0;
    @Override    public String getServiceName() {        return CommitRealTimeService.class.getSimpleName();    }
    @Override    public void run() {        CommitLog.log.info(this.getServiceName() + " service started");        while (!this.isStopped()) {            //每次提交间隔200毫秒            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            //每次提交最少4页内存数据(16KB)            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
            //距离上次提交时间阈值为200毫秒            int commitDataThoroughInterval =                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
            long begin = System.currentTimeMillis();            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {                this.lastCommitTimestamp = begin;                commitDataLeastPages = 0;            }
            try {                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);                long end = System.currentTimeMillis();                if (!result) {                    this.lastCommitTimestamp = end; // result = false means some data committed.                    //now wake up flush thread.                    flushCommitLogService.wakeup();                }
                if (end - begin > 500) {                    log.info("Commit data to file costs {} ms", end - begin);                }                this.waitForRunning(interval);            } catch (Throwable e) {                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);            }        }
        boolean result = false;        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {            result = CommitLog.this.mappedFileQueue.commit(0);            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));        }        CommitLog.log.info(this.getServiceName() + " service end");    }}
RocketMQ 申请一块和 CommitLog 文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程 CommitRealTimeService 也每间隔 500 毫秒尝试提交到文件通道等待刷盘,刷盘最终由 FlushRealTimeService 来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高 io 性能。
- 判断是否超过 200 毫秒没提交,需要强制提交 
- 提交到 MappedFile,此时还未刷盘 
- 然后唤醒刷盘线程 
- 在 Broker 正常停止前,提交内存 page 中的数据 
不开启缓冲池
不开启缓冲池:默认不开启,刷盘线程 FlushRealTimeService 会每间隔 500 毫秒尝试去刷盘。
FlushRealTimeService
class FlushRealTimeService extends FlushCommitLogService {    private long lastFlushTimestamp = 0;    private long printTimes = 0;
    public void run() {        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            //每次Flush间隔500毫秒            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();            //每次Flush最少4页内存数据(16KB)            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
             //距离上次刷盘时间阈值为10秒            int flushPhysicQueueThoroughInterval =                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
            boolean printFlushProgress = false;
            // Print flush progress            long currentTimeMillis = System.currentTimeMillis();            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {                this.lastFlushTimestamp = currentTimeMillis;                flushPhysicQueueLeastPages = 0;                printFlushProgress = (printTimes++ % 10) == 0;            }
            try {                if (flushCommitLogTimed) {                    Thread.sleep(interval);                } else {                    this.waitForRunning(interval);                }
                if (printFlushProgress) {                    this.printFlushProgress();                }
                long begin = System.currentTimeMillis();                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                if (storeTimestamp > 0) {                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                }                long past = System.currentTimeMillis() - begin;                if (past > 500) {                    log.info("Flush data to disk costs {} ms", past);                }            } catch (Throwable e) {                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                this.printFlushProgress();            }        }
        // Normal shutdown, to ensure that all the flush before exit        boolean result = false;        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {            result = CommitLog.this.mappedFileQueue.flush(0);            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));        }
        this.printFlushProgress();
        CommitLog.log.info(this.getServiceName() + " service end");    }
    @Override    public String getServiceName() {        return FlushRealTimeService.class.getSimpleName();    }
    private void printFlushProgress() {        // CommitLog.log.info("how much disk fall behind memory, "        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());    }
    @Override    public long getJointime() {        return 1000 * 60 * 5;    }}
- 判断是否超过 10 秒没刷盘了,如果超过强制刷盘 
- 等待 Flush 间隔 500ms 
- 通过 MappedFile 刷盘 
- 设置 StoreCheckpoint 刷盘时间点 
- 超过 500ms 的刷盘记录日志 
- Broker 正常停止前,把内存 page 中的数据刷盘 
总结
这篇文章我们讲了 RocketMQ 的异步刷盘的逻辑,总体上分为开启缓存池的策略和不开启缓存池的策略,默认是不开启缓冲池。
❤️ 感谢大家
如果你觉得这篇内容对你挺有有帮助的话:
- 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏 
- 关注 - 盼盼小课堂,定期为你推送好文,还有群聊不定期抽奖活动,可以畅所欲言,与大神们一起交流,一起学习。
版权声明: 本文为 InfoQ 作者【周杰伦本人】的原创文章。
原文链接:【http://xie.infoq.cn/article/9652b0921ddc64bd96075662c】。文章转载请联系作者。

周杰伦本人
还未添加个人签名 2020.02.29 加入
公众号《盼盼小课堂》,多平台优质博主









 
    
评论