RocketMQ—Producer(二)路由动态更新
一、Producer 路由信息
从 NameServer 章节分析得知,路由信息存储在 NameServer,生产端和消费端定时向 NameServer 获取 topic 相关的路由信息; 从生产者启动流程得知:路由信息的动态更新源码在 MQClientInstance#startScheduledTask 定时任务里面具体方法:updateTopicRouteInfoFromNameServer 下图为路由更新流程
接下来我们着重解析此段源码:
1 定时任务:频率-30s
2 updateTopicRouteInfoFromNameServer
分析如下:
1.1 getPublishTopicListgetPublishTopicList 方法分析:
备注:
细心的你可能发现从启动流程中得知:
topicPublishInfoTable(ConcurrentHashMap)只会默认注册 topic=TBW102 的信息,那正常业务发送的 topic 是如何注册进去的呢,建议直接观看理解以下代码,在发送流程中会体现出如何注册到 topicPublishInfoTable 中;topicPublishInfoTable 数据的初始化(value:第一次默认都是 new TopicPublishInfo())
分析
如果生产者中缓存了 topic 的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,
如果没有缓存或没有包含消息队列, 则向 NameServer 查询该 topic 的路由信息。
如果最终未找到路由信息,则抛出异常 : 无法找到主题相关路由信息异常 。
1.2 updateTopicRouteInfoFromNameServer 从 NameServer 更新 topic 路由信息
在分析之前,可先简单分析 MQClientInstance 核心属性:
备注:此处列出的属性仅跟生产端相关,其他的属性和方法大都我们会在消费端分析
接下来着重分析:updateTopicRouteInfoFromNameServer
备注:
判断:TopicRouteData 是否改变,topicRouteDataIsChange(old, topicRouteData); 代码很简单,直接分析如下:
(1) 判断 olddata 或 nowdata 是否为空
(2) TopicRouteData 的 equals 方法比较
继续判断是否需要更新 topic 路由信息 isNeedUpdateTopicRouteInfo(topic);
最终调用的代码为:
DefaultMQProducerImpl#isPublishTopicNeedUpdate(主要逻辑判断是 TopicPublishInfo 是否存在,或者 TopicPublishInfo 的 messageQueueList 是否为空)
topicRouteData2TopicPublishInfo 数据转换,你一定感兴趣,内容相当简单,
分析如下:
4.更新-路由发布信息:updateTopicPublishInfo(topic, publishInfo);调用的代码为:DefaultMQProducerImpl#updateTopicPublishInfo,本质就是维护 Map-topicPublishInfoTable
二、结论
路由更新虽然相对简单,但对于生产者来说至关重要,生产端需要知道路由信息才能进行计算选择将消息发送到哪台 broker;但从源码分析中,可以看出更新路由信息以 topic 为维度,组装更新数据,本质还是维护 Map(topicRouteTable、brokerAddrTable、topicPublishInfoTable)等,但是要注意是:ConcurrentHashMap。
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT 巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例,作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ 上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。
版权声明: 本文为 InfoQ 作者【IT巅峰技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/b44f73c0a528ba3f7b1e1e2cc】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论