写点什么

RocketMQ 高可用设计之同步刷盘

作者:周杰伦本人
  • 2022 年 8 月 18 日
    贵州
  • 本文字数:4124 字

    阅读完需:约 14 分钟

RocketMQ 高可用设计之同步刷盘

在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的 CommitLog 文件。

CommitLog 的 handleDiskFlush 方法:

顾名思义,这是一个同步刷盘的方法


CommitLog 的 handleDiskFlush 方法:


public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {    // Synchronization flush    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;        if (messageExt.isWaitStoreMsgOK()) {            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());            service.putRequest(request);            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());            if (!flushOK) {                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()                    + " client address: " + messageExt.getBornHostString());                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);            }        } else {            service.wakeup();        }    }    // Asynchronous flush    else {        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {            flushCommitLogService.wakeup();        } else {            commitLogService.wakeup();        }    }}
class GroupCommitService extends FlushCommitLogService { private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>(); private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
public synchronized void putRequest(final GroupCommitRequest request) { synchronized (this.requestsWrite) { this.requestsWrite.add(request); } if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); // notify } }
private void swapRequests() { List<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; }
private void doCommit() { synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; i < 2 && !flushOK; i++) { flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } }
req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); }
this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
public void run() { CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) { try { this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } }
// Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); }
synchronized (this) { this.swapRequests(); }
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end"); }
@Override protected void onWaitEnd() { this.swapRequests(); }
@Override public String getServiceName() { return GroupCommitService.class.getSimpleName(); }
@Override public long getJointime() { return 1000 * 60 * 5; } }
复制代码


GroupCommitRequest 是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程


GroupCommitService 每隔 10 毫秒写一批数据到磁盘。之所以不直接写是磁盘 io 压力大,写入性能低,每隔 10 毫秒写一次可以提升磁盘 io 效率和写入性能。


  1. putRequest(request) 提交刷盘任务到任务列表

  2. request.waitForFlush 同步等待 GroupCommitService 将任务列表中的任务刷盘完成。


两个队列读写分离,requestsWrite 是写队列,用户保存添加进来的刷盘任务,requestsRead 是读队列,在刷盘之前会把写队列的数据放入读队列。

CommitLog 的 doCommit 方法

CommitLog 的 doCommit 方法:


private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
if (!flushOK) { CommitLog.this.mappedFileQueue.flush(0); } }
req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
复制代码


  1. 刷盘的时候依次读取 requestsRead 中的数据写入磁盘,

  2. 写入完成后清空 requestsRead。


读写分离设计的目的是在刷盘时不影响任务提交到列表。

刷盘

CommitLog.this.mappedFileQueue.flush(0);是刷盘操作


CommitLog.this.mappedFileQueue.flush()方法:


public boolean flush(final int flushLeastPages) {    boolean result = true;    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);    if (mappedFile != null) {        long tmpTimeStamp = mappedFile.getStoreTimestamp();        int offset = mappedFile.flush(flushLeastPages);        long where = mappedFile.getFileFromOffset() + offset;        result = where == this.flushedWhere;        this.flushedWhere = where;        if (0 == flushLeastPages) {            this.storeTimestamp = tmpTimeStamp;        }    }
return result;}
复制代码


通过 MappedFile 映射的 CommitLog 文件写入磁盘


好了,这就是 RocketMQ 高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而 GroupCommitService 每隔 10 毫秒写一批数据到磁盘。

总结

这篇文章我们介绍了 RocketMQ 的同步刷盘机制,每隔 10 毫秒写一批数据到磁盘,最终通过 MappedFile 映射的 CommitLog 文件写入磁盘,采用读写分离的队列来刷盘

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:


  1. 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏

  2. 关注盼盼小课堂,定期为你推送好文,还有群聊不定期抽奖活动,可以畅所欲言,与大神们一起交流,一起学习。

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ高可用设计之同步刷盘_8月月更_周杰伦本人_InfoQ写作社区