写点什么

RocketMQ 主从同步读写分离机制

  • 2021 年 11 月 11 日
  • 本文字数:1831 字

    阅读完需:约 6 分钟

}


}


if (found) {


return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));


}


return null;


}


  • brokerName:broker 名称;brokerId:brokerId;onlyThisBroker:是否必须返回 brokerId 的 broker 对应的服务器信息。

  • brokerAddrTable 地址缓存表中根据 brokerName 获取所有的 broker 信息。brokerAddrTable 的存储格式如:brokerName:{brokerId:brokerAddress}。

  • 根据 brokerId 从 broker 主从缓存表中获取指定 broker 名称,如果根据 brokerId 未找到相关条目,此时如果 onlyThisBroker 为 false,则随机返回 broker 中任意一个 Broker,否则返回 null。

  • 组装 FindBrokerResult 时,需要设置是否是 slave 这个属性。如果 brokerId=0 表示返回的 broker 是主节点,否则返回的是从节点。


上述方法,根据 brokerName 是如何获取 brokerId 的呢?


请看 MQClientInstance#recalculatePullFromWhichNode:


public long recalculatePullFromWhichNode(final MessageQueue mq) {


if (this.isConnectBrokerByUser()) {


return this.defaultBrokerId;


}


AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);


if (suggest != null) {


return suggest.get();


}


return MixAll.MASTER_ID;


}


首先从 pullFromWhichNodeTable 缓存表中获取该消息消费队列的 brokerId,如果找到,则返回,否则返回 brokerName 的主节点。由此可以看出 pullFromWhichNodeTable 中存放的是消息队列建议从从哪个 Broker 服务器拉取消息的缓存表,其存储结构:MessageQueue:AtomicLong,那该信息从何而来呢?


原来消息消费拉取线程 PullMessageService 根据 PullRequest 请求从主服务器拉取消息后会返回下一次建议拉取的 brok


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


erId,消息消费者线程在收到消息后,会根据主服务器的建议拉取 brokerId 来更新 pullFromWhichNodeTable,消息消费者线程更新 pullFromWhichNodeTable 的代码如下:


PullAPIWrapper#processPullResult


this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());


public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {


AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);


if (null == suggest) {


this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));


} else {


suggest.set(brokerId);


}


}


那服务端是如何计算下一次拉取建议从哪台 Broker 服务器拉取消息呢?


请看:DefaultMessageStore#getMessage


long diff = maxOffsetPy - maxPhyOffsetPulling;


long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));


getResult.setSuggestPullingFromSlave(diff > memory);


  • maxOffsetPy:代表当前主服务器消息存储文件最大偏移量,maxPhyOffsetPulling:此次拉取消息最大偏移量。

  • diff:对于 PullMessageService 线程来说,当前未被拉取到消息消费端的消息长度。

  • TOTAL_PHYSICAL_MEMORY_SIZE:RocketMQ 所在服务器总内存大小;accessMessageInMemoryMaxRatio:表示 RocketMQ 所能使用的最大内存比例,超过该内存,消息将被置换出内存;memory 表示 RocketMQ 消息常驻内存的大小,超过该大小,RocketMQ 会将旧的消息置换会磁盘。

  • 如果 diff 大于 memory,表示当前需要拉取的消息已经超出了常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。


PullMessageProcessor#processRequest


if (getMessageResult.isSuggestPullingFromSlave()) {


responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());


} else {


responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);


}


当 GetResult 的 suggestPullingFromSlave 为真是,将会直接返回消息消费组的配置信息 whichBrokerWhenConsumeSlowly,默认为 1,可以通过客户端命令 updateSubGroup 配置当主服务器繁忙时,建议从哪个从服务器读取消息。


注意:RocketMQ 读写分离不按套路出牌,并不是主服务器只负责消息发送,消息从服务器主要负责消息拉取,而是只有当主服务器消息拉取出现堆积时才将拉取任务转向从服务器。




备注:本文是《RocketMQ 技术内幕》的前期素材,建议关注笔者的书籍:《RocketMQ 技术内幕》。




见文如面,我是威哥,热衷于成体系剖析 JAVA 主流中间件,关注公众号『中间件兴趣圈』,回复专栏可获取成体系专栏导航,回复资料可以获取笔者的学习思维导图



评论

发布
暂无评论
RocketMQ 主从同步读写分离机制