写点什么

通过源码分析 RocketMQ 主从复制原理

  • 2023-03-02
    四川
  • 本文字数:12310 字

    阅读完需:约 40 分钟

通过源码分析RocketMQ主从复制原理

作者:京东物流 宫丙来


一、主从复制概述


  • RocketMQ Broker 的主从复制主要包括两部分内容:CommitLog 的消息复制和 Broker 元数据的复制。

  • CommitLog 的消息复制是发生在消息写入时,当消息写完 Broker Master 时,会通过单独的线程,将消息写入到从服务器,在写入的时候支持同步写入、异步写入两种方式。

  • Broker 元数据的写入,则是 Broker 从服务器通过单独的线程每隔 10s 从主 Broker 上获取,然后更新从的配置,并持久化到相应的配置文件中。

  • RocketMQ 主从同步一个重要的特征:主从同步不具备主从切换功能,即当主节点宕机后,从不会接管消息发送,但可以提供消息读取。


二、CommitLog 消息复制


2.1、整体概述



CommitLog 主从复制的流程如下:


1.Producer 发送消息到 Broker Master,Broker 进行消息存储,并调用 handleHA 进行主从同步; 2.如果是同步复制的话,参考 2.6 章节的同步复制; 3.如果是异步复制的话,流程如下:


1. Broker Master启动,并在指定端口监听;2. Broker Slave启动,主动连接Broker Master,通过Java NIO建立TCP连接;3.  Broker Slave以每隔5s的间隔时间向服务端拉取消息,如果是第一次拉取的话,先获取本地CommitLog文件中最大的偏移量,以该偏移量向服务端拉取消息4.  Broker Master 解析请求,并返回数据给Broker Slave;5.Broker Slave收到一批消息后,将消息写入本地CommitLog文件中,然后向Master汇报拉取进度,并更新下一次待拉取偏移量;
复制代码


我们先看下异步复制的整体流程,最后再看下同步复制的流程,异步复制的入口为 HAService.start();


public void start() throws Exception { //broker master启动,接收slave请求,并处理    this.acceptSocketService.beginAccept();    this.acceptSocketService.start(); //同步复制线程启动    this.groupTransferService.start(); //broker slave启动    this.haClient.start();}
复制代码


下面分别对上面的每一步做详细说明。


2.2、HAService Master 启动


public void beginAccept() throws Exception {    this.serverSocketChannel = ServerSocketChannel.open();    this.selector = RemotingUtil.openSelector();    this.serverSocketChannel.socket().setReuseAddress(true);    this.serverSocketChannel.socket().bind(this.socketAddressListen);    this.serverSocketChannel.configureBlocking(false);    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}
复制代码


在 beginAccept 方法中主要创建了 ServerSocketChannel、Selector、设置 TCP reuseAddress、绑定监听端口、设置为非阻塞模式,并注册 OP_ACCEPT(连接事件)。可以看到在这里是通过 Java 原生的 NIO 来实现的,并没有通过 Netty 框架来实现。


acceptSocketService.start()启动方法代码如下:


while (!this.isStopped()) {    try {   //获取事件        this.selector.select(1000);        Set<SelectionKey> selected = this.selector.selectedKeys();        if (selected != null) {            for (SelectionKey k : selected) {//处理OP_ACCEPT事件,并创建HAConnection                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                    SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();                    if (sc != null) {                       HAConnection conn = new HAConnection(HAService.this, sc);                       //主要是启动readSocketService,writeSocketService这两个线程 conn.start();                       HAService.this.addConnection(conn);                    }                }            }            selected.clear();        }    } catch (Exception e) {        log.error(this.getServiceName() + " service has exception.", e);    }}
复制代码


选择器每 1s 处理一次处理一次连接就绪事件。连接事件就绪后,调用 ServerSocketChannel 的 accept()方法创建 SocketChannel,与服务端数据传输的通道。然后为每一个连接创建一个 HAConnection 对象,该 HAConnection 将负责 Master-Slave 数据同步逻辑。HAConnection.start 方法如下:


public void start() {  this.readSocketService.start();  this.writeSocketService.start();}
复制代码


2.3、HAClient 启动


while (!this.isStopped()) {  try {    //和broker master建立连接,通过java nio来实现    if (this.connectMaster()) {      //在心跳的同时,上报offset      if (this.isTimeToReportOffset()) {        //上报offset        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);        if (!result) {          this.closeMaster();        }      }      this.selector.select(1000);      //处理网络读请求,也就是处理从Master传回的消息数据      boolean ok = this.processReadEvent();      if (!ok) {        this.closeMaster();      }      if (!reportSlaveMaxOffsetPlus()) {        continue;      }      long interval =        HAService.this.getDefaultMessageStore().getSystemClock().now()          - this.lastWriteTimestamp;      if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()        .getHaHousekeepingInterval()) {        log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress          + "] expired, " + interval);        this.closeMaster();        log.warn("HAClient, master not response some time, so close connection");      }    } else {      this.waitForRunning(1000 * 5);    }  } catch (Exception e) {    log.warn(this.getServiceName() + " service has exception. ", e);    this.waitForRunning(1000 * 5);  }}
复制代码


2.3.1、HAService 主从建立连接


如果 socketChannel 为空,则尝试连接 Master,如果 Master 地址为空,返回 false。


private boolean connectMaster() throws ClosedChannelException {  if (null == socketChannel) {    String addr = this.masterAddress.get();    if (addr != null) {      SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);      if (socketAddress != null) {        this.socketChannel = RemotingUtil.connect(socketAddress);        if (this.socketChannel != null) {          //注册读事件,监听broker master返回的数据          this.socketChannel.register(this.selector, SelectionKey.OP_READ);        }      }    }    //获取当前的offset    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();    this.lastWriteTimestamp = System.currentTimeMillis();  }  return this.socketChannel != null;}
复制代码


  1. Broker 主从连接


Broker Slave 通过 NIO 来进行 Broker Master 连接,代码如下:


SocketChannel sc = null;sc = SocketChannel.open();sc.configureBlocking(true);sc.socket().setSoLinger(false, -1);sc.socket().setTcpNoDelay(true);sc.socket().setReceiveBufferSize(1024 * 64);sc.socket().setSendBufferSize(1024 * 64);sc.socket().connect(remote, timeoutMillis);sc.configureBlocking(false);
复制代码


  1. Slave 获取当前 offset


public long getMaxPhyOffset() {  return this.commitLog.getMaxOffset();}public long getMaxOffset() {  return this.mappedFileQueue.getMaxOffset();}public long getMaxOffset() {  MappedFile mappedFile = getLastMappedFile();  if (mappedFile != null) {    return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();  }  return 0;}
复制代码


可以看到最终还是通过读取 MappedFile 的 position 来获取从的 offset。


2.3.2、上报 offset 时间判断


private boolean isTimeToReportOffset() {  //当前时间-上次写的时间  long interval =    HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;  boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()    .getHaSendHeartbeatInterval();

return needHeart;}
复制代码


判断逻辑为当前时间-上次写的时间>haSendHeartbeatInterval 时,则进行心跳和 offset 的上报。haSendHeartbeatInterval 默认为 5s,可配置。


2.3.3、上报 offset


private boolean reportSlaveMaxOffset(final long maxOffset) {  this.reportOffset.position(0);  this.reportOffset.limit(8);  this.reportOffset.putLong(maxOffset);  this.reportOffset.position(0);  this.reportOffset.limit(8);  //最多发送三次,reportOffset是否有剩余  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {    try {      this.socketChannel.write(this.reportOffset);    } catch (IOException e) {      log.error(this.getServiceName()        + "reportSlaveMaxOffset this.socketChannel.write exception", e);      return false;    }  }  return !this.reportOffset.hasRemaining();}
复制代码


主要还是通过 NIO 发送请求。


2.4、Broker Master 处理请求


在主从建立连接时创建了 HAConnection 对象,该对象主要包含了如下两个重要的线程服务类:


//负责写,将commitlog数据发送到从private WriteSocketService writeSocketService;//负责读,读取从上报的offset,并根据offset从Broker Master读取commitlogprivate ReadSocketService readSocketService;
复制代码


2.4.1、ReadSocketService 接收读请求


readSocketService.run 方法如下:


while (!this.isStopped()) {  try {    this.selector.select(1000);    //处理读事件    boolean ok = this.processReadEvent();    if (!ok) {      HAConnection.log.error("processReadEvent error");      break;    }    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {      log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);      break;    }  } catch (Exception e) {    HAConnection.log.error(this.getServiceName() + " service has exception.", e);    break;  }}
复制代码


processReadEvent 的逻辑如下:


int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {  readSizeZeroTimes = 0;  this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();  if ((this.byteBufferRead.position() - this.processPostion) >= 8) {    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);    //获取slave 请求的offset    long readOffset = this.byteBufferRead.getLong(pos - 8);    this.processPostion = pos;

HAConnection.this.slaveAckOffset = readOffset; if (HAConnection.this.slaveRequestOffset < 0) { HAConnection.this.slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); } //如果是同步复制的话,判断请求的offset是否push2SlaveMaxOffset相同,相同的话则唤醒master GroupTransferService HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); }}
复制代码


可以看到 processReadEvent 逻辑很简单,就是从 ByteBuffer 中解析出 offset,然后设置 HAConnection.this.slaveRequestOffset;


2.4.2、WriteSocketService 进行写处理


Broker Master 通过 HAConnection.WriteSocketService 进行 CommitLog 的读取,run 方法主逻辑如下:


this.selector.select(1000);//nextTransferFromWhere下次传输commitLog的起始位置if (-1 == this.nextTransferFromWhere) {  if (0 == HAConnection.this.slaveRequestOffset) {    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();    masterOffset =      masterOffset        - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()        .getMapedFileSizeCommitLog());

if (masterOffset < 0) { masterOffset = 0; }

this.nextTransferFromWhere = masterOffset; } else { this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; }

log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr + "], and slave request " + HAConnection.this.slaveRequestOffset);}

//获取commitLog数据SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);//获取commitLog数据SelectMappedBufferResult selectResult = HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);if (selectResult != null) { int size = selectResult.getSize(); if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); }

long thisOffset = this.nextTransferFromWhere; this.nextTransferFromWhere += size;

selectResult.getByteBuffer().limit(size); this.selectMappedBufferResult = selectResult;

// Build Header this.byteBufferHeader.position(0); this.byteBufferHeader.limit(headerSize); this.byteBufferHeader.putLong(thisOffset); this.byteBufferHeader.putInt(size); this.byteBufferHeader.flip(); //nio发送commitlog this.lastWriteOver = this.transferData();} else { //如果没有获取到commitLog数据,等待100ms HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(1
复制代码


这里面主要包括获取 CommitLog 数据、发送 CommitLog 数据这两个步骤。


2.4.2.1、获取 CommitLog 数据


public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {  int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();  MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);  if (mappedFile != null) {    int pos = (int) (offset % mappedFileSize);    SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);    return result;  }  return null;}public SelectMappedBufferResult selectMappedBuffer(int pos) {  int readPosition = getReadPosition();  if (pos < readPosition && pos >= 0) {    if (this.hold()) {      ByteBuffer byteBuffer = this.mappedByteBuffer.slice();      byteBuffer.position(pos);      int size = readPosition - pos;      ByteBuffer byteBufferNew = byteBuffer.slice();      byteBufferNew.limit(size);      return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);    }  }  return null;}
复制代码


可以看到最终还是根据 offset 从 MappedFile 读取数据。


2.4.2.2、发送 CommitLog 数据


数据主要包括 header、body 两部分,数据发送的话还是通过 NIO 来实现,主要代码如下:


// Build Headerthis.byteBufferHeader.position(0);this.byteBufferHeader.limit(headerSize);this.byteBufferHeader.putLong(thisOffset);this.byteBufferHeader.putInt(size);this.byteBufferHeader.flip();

int writeSize = this.socketChannel.write(this.byteBufferHeader);// Write Bodyif (!this.byteBufferHeader.hasRemaining()) { while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); if (writeSize > 0) { writeSizeZeroTimes = 0; this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); } else if (writeSize == 0) { if (++writeSizeZeroTimes >= 3) { break; } } else { throw new Exception("ha master write body error < 0"); } }}
复制代码


CommitLog 主从发送完成后,Broker Slave 则会监听读事件、获取 CommitLog 数据,并进行 CommitLog 的写入。


2.5、HAClient processReadEvent


在主从建立连接后,从注册了可读事件,目的就是读取从 Broker Master 返回的 CommitLog 数据,对应的方法为 HAClient.processReadEvent:


int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {  lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();  readSizeZeroTimes = 0;  boolean result = this.dispatchReadRequest();  if (!result) {    log.error("HAClient, dispatchReadRequest error");    return false;  }} 
复制代码


dispatchReadRequest 方法如下:


 //读取返回的body databyte[] bodyData = new byte[bodySize];this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);this.byteBufferRead.get(bodyData);

HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

this.byteBufferRead.position(readSocketPos);this.dispatchPostion += msgHeaderSize + bodySize;

//上报从的offsetif (!reportSlaveMaxOffsetPlus()) { return false;
复制代码


里面的核心逻辑主要包括如下三个步骤:


  1. 从 byteBufferRead 中读取 CommitLog 数据;


  1. 调用 defaultMessageStore.appendToCommitLog 方法,将数据写入到 MappedFile 文件,写入方法如下:


public boolean appendToCommitLog(long startOffset, byte[] data) {  //将数据写到commitlog,同普通消息的存储  boolean result = this.commitLog.appendData(startOffset, data);  //唤醒reputMessageService,构建consumeQueue,index  this.reputMessageService.wakeup();  return result;}
复制代码


  1. 上报从新的 offset,也是读取 MappedFile 的 offset,然后上报 Broker Master;


2.6、同步复制


上面主要介绍了 Broker 的异步复制,下面再来看下 Broker 的同步复制的实现。同步复制的整体流程图如下:



大概说明如下:


  1. producer 发送消息到 broker,broker 进行消息的存储,将消息写入到 commitLog;

  2. broker master 写消息线程唤醒 WriteSocketService 线程,查询 commitLog 数据,然后发送到从。在 WriteSocketService 获取 commitLog 时,如果没有获取到 commitLog 数据,会等待 100ms。所以当 commitLog 新写入数据的时候,会唤醒 WriteSocketService,然后查询 commitLog 数据,发送到从。

  3. broker master 创建 GroupCommitRequest,同步等待主从复制完成;

  4. 从接受新的 commitLog 数据,然后写 commitLog 数据,并返回新的 slave offset 到主;

  5. 主更新 push2SlaveMaxOffset,并判断 push2SlaveMaxOffset 是否大于等于主从复制请求的 offset,如果大于等于的话,则认为主从复制完成,返回 commitLog.handleHA 方法成功,从而返回消息保存成功。


对应的代码入口为 CommitLog.handleHA 方法。


public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {  //如果是broker主,并且是同步复制的话  if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {    //获取HAService    HAService service = this.defaultMessageStore.getHaService();    //获取Message上的MessageConst.PROPERTY_WAIT_STORE_MSG_OK,默认是需要等待主从复制完成    if (messageExt.isWaitStoreMsgOK()) {      /**       * 判断从是否可用,判断的逻辑是:(主offset-push2SlaveMaxOffset<1024 * 1024 * 256),也就是如果主从的offset差的太多,       * 则认为从不可用, Tell the producer, slave not available       * 这里的result = mappedFile.appendMessage(msg, this.appendMessageCallback);       */      if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {        //组装GroupCommitRequest,nextOffset=result.getWroteOffset() + result.getWroteBytes(),这里的nextOffset指的就是从要写到的offset        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());                            /**                * 调用的是this.groupTransferService.putRequest(request);将request放到requestsWrite list中。                  * HAService持有GroupTransferService groupTransferService引用;                */        service.putRequest(request);         /**                     * 唤醒的是WriteSocketService,查询commitLog数据,然后发送到从。                     * 在WriteSocketService获取commitLog时,如果没有获取到commitLog数据,等待100ms                     * HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);                     * 所以当commitLog新写入数据的时候,会唤醒WriteSocketService,然后查询commitLog数据,发送到从。                     */        service.getWaitNotifyObject().wakeupAll();

//等待同步复制完成,判断逻辑是: HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

//如果同步复制失败的话,设置putMessageResult中的状态为同步从超时 if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } }
复制代码


2.6.1、GroupTransferService 启动


在 HAService 启动的时候,启动了 GroupTransferService 线程,代码如下:


public void run() {  while (!this.isStopped()) {    this.waitForRunning(10);    this.doWaitTransfer();  }}private void doWaitTransfer() {  synchronized (this.requestsRead) {    if (!this.requestsRead.isEmpty()) {      for (CommitLog.GroupCommitRequest req : this.requestsRead) {        /**         * req.getNextOffset:result.getWroteOffset() + result.getWroteBytes()         * push2SlaveMaxOffset:         */        boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();        //在这循环5次,最多等待5s,因为slave 心跳间隔默认5s        for (int i = 0; !transferOK && i < 5; i++) {          this.notifyTransferObject.waitForRunning(1000);          transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();        }        if (!transferOK) {          log.warn("transfer messsage to slave timeout, " + req.getNextOffset());        }        //主从复制完成,唤醒handleHA后续操作                req.wakeupCustomer(transferOK);      }      this.requestsRead.clear();    }  }}
复制代码


wakeupCustomer:


public void wakeupCustomer(final boolean flushOK) {    this.flushOK = flushOK;    this.countDownLatch.countDown();}
复制代码


2.6.2、唤醒 WriteSocketService


service.getWaitNotifyObject().wakeupAll();


唤醒的是 WriteSocketService,查询 commitLog 数据,然后发送到从。在 WriteSocketService 获取 commitLog 时,如果没有获取到 commitLog 数据,等待 100ms。HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);所以当 commitLog 新写入数据的时候,会唤醒 WriteSocketService,然后查询 commitLog 数据,发送到从。


2.6.3、同步等待,直到复制完成


boolean flushOK =  request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

public boolean waitForFlush(long timeout) { try { //等待同步复制完成 this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return this.flushOK; } catch (InterruptedException e) { log.error("Interrupted", e); return false; }}}
复制代码


三、元数据的复制


broker 元数据的复制,主要包括 topicConfig、consumerOffset、delayOffset、subscriptionGroup 这几部分,整体流程图如下:



从 broker 通过单独的线程,每隔 10s 进行一次元数据的复制 ,代码入口为:BrokerController.start -> SlaveSynchronize.syncAll:


slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {    @Override    public void run() {        try {            //10s 进行一次主从同步            BrokerController.this.slaveSynchronize.syncAll();        }        catch (Throwable e) {            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);        }    }}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);

public void syncAll() { this.syncTopicConfig(); this.syncConsumerOffset(); this.syncDelayOffset(); this.syncSubscriptionGroupConfig();}
复制代码


3.1、syncTopicConfig


//从Master获取TopicConfig信息,最终调用的是AdminBrokerProcessor.getAllTopicConfigTopicConfigSerializeWrapper topicWrapper =    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);if (!this.brokerController.getTopicConfigManager().getDataVersion()    .equals(topicWrapper.getDataVersion())) {    this.brokerController.getTopicConfigManager().getDataVersion()        .assignNewOne(topicWrapper.getDataVersion());    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();    this.brokerController.getTopicConfigManager().getTopicConfigTable()        .putAll(topicWrapper.getTopicConfigTable()); //将topicConfig进行持久化,对应的文件为topics.json    this.brokerController.getTopicConfigManager().persist();    log.info("Update slave topic config from master, {}", masterAddrBak)
复制代码


3.2、syncConsumerOffset


//从"主Broker"获取ConsumerOffsetConsumerOffsetSerializeWrapper offsetWrapper =        this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);//设置从的offsetTablethis.brokerController.getConsumerOffsetManager().getOffsetTable()                    .putAll(offsetWrapper.getOffsetTable());//并持久化到从的consumerOffset.json文件中this.brokerController.getConsumerOffsetManager().persist(); 
复制代码


3.3、syncDelayOffset


String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);String fileName = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); MixAll.string2File(delayOffset, fileName);
复制代码


3.4、syncSubscriptionGroupConfig


SubscriptionGroupWrapper subscriptionWrapper =this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak);SubscriptionGroupManager subscriptionGroupManager =this.brokerController.getSubscriptionGroupManager();subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());subscriptionGroupManager.getSubscriptionGroupTable().clear();subscriptionGroupManager.getSubscriptionGroupTable().putAll(subscriptionWrapper.getSubscriptionGroupTable());subscriptionGroupManager.persist();
复制代码


四、思考与收获


通过上面的分享,我们基本上了解了 RocketMQ 的主从复制原理,其中有些思想我们可以后续借鉴下:


  1. 在功能设计的时候将元数据、程序数据分开管理;

  2. 主从复制的时候,基本思想都是从请求主,请求时带上 offset,然后主查询数据返回从,从再执行;mysql 的主从复制、redis 的主从复制基本也是这样;

  3. 主从复制包括异步复制、同步复制两种方式,可以通过配置来决定使用哪种同步方式,这个需要根据实际业务场景来决定;

  4. 主从复制线程尽量和消息写线程或者主线程分开;


由于时间、精力有限,难免会有纰漏、考虑不到之处,如有问题欢迎沟通、交流。

发布于: 刚刚阅读数: 6
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
通过源码分析RocketMQ主从复制原理_Java_京东科技开发者_InfoQ写作社区