写点什么

🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」Broker 服务端自动创建 topic 的原理分析和问题要点指南

作者:浩宇天尚
  • 2022 年 1 月 19 日
  • 本文字数:8527 字

    阅读完需:约 28 分钟

🏆【Alibaba中间件技术系列】「RocketMQ技术专题」Broker服务端自动创建topic的原理分析和问题要点指南

前提背景


使用 RocketMQ 进行发消息时,一般我们是必须要指定 topic,此外 topic 必须要提前建立,但是 topic 的创建(自动或者手动方式)的设置有一个开关 autoCreateTopicEnable,此部分主要会在 broker 节点的配置文件的时候进行设置,运行环境中会使用默认设置 autoCreateTopicEnable = true,但是这样就会导致 topic 的设置不容易规范管理,所以在生产环境中会在 Broker 设置参数 autoCreateTopicEnable = false。那么如果此参数稍有偏差,或者没有提前手动创建 topic,则会频繁出现 No route info of this topic 这个错误,那么接下来我们探索一下此问题的出现原因以及系统如何进行创建 topic。

No route info of this topic

相信做过 RocketMQ 项目的小伙伴们,可能对 No route info of this topic 一点都不陌生,说明的含义起始就是无法解析或者路由这个 topic,但是造成的原因有很多种。

没有配置 NameServer 服务

Broker 启动时我们没有配置 NameSrv 地址,发送程序会报错:No route info of this topic。但当我们配上 NameSrv 地址后,再次启动,可以正常发送消息。

没有建立 autoCreateTopicEnable=true 且没有创建该 topic

当 autoCreateTopicEnable=false 时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于 topic 的一些信息,比如有几个消息队列,是不时有序 topic,有这个 topic 的 Broker 列表等,当获取不到正确的信息时,就会抛出异常

RocketMQ 的客户端版本与服务端版本不一致

RocketMQ Java 客户端调用 No route info of this topic 错误(原因版本不一致)。此时,即使启动 broker 的时候设置 autoCreateTopicEnable=true 也没有用,假如,使用的 rocketmq 的版本是 4.9.0,java client 端版本 4.3.0



RocketMQ 4.3.0 版本的自动创建(autoCreateTopicEnable),客户端传递使用的 AUTO_CREATE_TOPIC_KEY_TOPIC 是”AUTO_CREATE_TOPIC_KEY”,新版本的 client,客户端传递的默认 AUTO_CREATE_TOPIC_KEY_TOPIC 是“TBW102”。


org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKeyorg.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPICpublic static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
复制代码

实际代码

> 4.4.0 版本
<=4.3.0 版本

方案 1:要不就进行调整 client 客户端版本的 version

<dependency>    <groupId>org.apache.rocketmq</groupId>    <artifactId>rocketmq-client</artifactId>    <version>4.5.1</version></dependency>
复制代码

方案 2:调整自动创建代码为 AUTO_CREATE_TOPIC_KEY

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");//设置自动创建topic的key值producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
复制代码


Topic 之前并未创建过,Broker 未配置 NameSrv 地址,无法发送,而配置 NameSrv 后则可以正常发送。这中间有 2 个问题:1、topic 是怎么自动创建的?2、topic 自动创建过程中 Broker、NameSrv 如何协作配合的?

分析以下如何自动创建 topic 的源码流程

RocketMQ 基本路由规则


  1. Broker 在启动时向 Nameserver 注册存储在该服务器上的路由信息,并每隔 30s 向 Nameserver 发送心跳包,并更新路由信息。Nameserver 每隔 10s 扫描路由表,如果检测到 Broker 服务宕机,则移除对应的路由信息。

  2. 消息生产者每隔 30s 会从 Nameserver 重新拉取 Topic 的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向 Nameserver 拉取该主题的消息。

  3. 如果 autoCreateTopicEnable 设置为 true,消息发送者向 NameServer 查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者得到的路由信息为:

默认 Topic 的路由信息是如何创建的?

Nameserver?broker?当 autoCreateTopicEnable=false 时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于 topic 的一些信息,比如有几个消息队列,是不时有序 topic,有这个 topic 的 Broker 列表等,当获取不到正确的信息时,就会抛出异常


 private SendResult sendDefaultImpl(        Message msg,        final CommunicationMode communicationMode,        final SendCallback sendCallback,        final long timeout    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.makeSureStateOK();        Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();        long beginTimestampFirst = System.currentTimeMillis();        long beginTimestampPrev = beginTimestampFirst;        long endTimestamp = beginTimestampFirst;        // 如果获取到topic的路由信息,则发送,否则抛异常        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());        if (topicPublishInfo != null && topicPublishInfo.ok()) {           ... ...        }        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();        if (null == nsList || nsList.isEmpty()) {            throw new MQClientException(                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);        }        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);    }
复制代码


tryToFindTopicPublishInfo 是发送的关键,如果获取到 topic 的信息,则发送,否则就异常;因此之前 No route info of this topic 的异常,就是 Producer 获取不到 Topic 的信息,导致发送失败。

先从 topicPublishInfoTable 缓存中获取

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {    // topicPublishInfoTable是Producer本地缓存的topic信息表    // Producer启动后,会添加默认的topic:TBW102    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);    if (null == topicPublishInfo || !topicPublishInfo.ok()) {        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());        // 未获取到,从NameSrv获取该topic的信息        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);        topicPublishInfo = this.topicPublishInfoTable.get(topic);    }    // 获取到了,则返回    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {        return topicPublishInfo;    } else {        // 没获取到,再换种方式从NameSrv获取        // 如果再获取不到,那后续就无法发送了        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);        topicPublishInfo = this.topicPublishInfoTable.get(topic);        return topicPublishInfo;    }}
复制代码


  1. Producer 本地 topicPublishInfoTable 变量中没有 topic 的信息,只缓存了 TBW102。

  2. 尝试从 NameSrv 获取 Topic 的信息。获取失败,NameSrv 中根本没有 Topic,因为这个 topic 是 Producer 发送时设置的,没有同步到 NameSrv。

  3. 再换种方式从 NameSrv 获取,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛 No route info of this topic 的异常了。

再从 NameServer 服务中进行获取

public boolean updateTopicRouteInfoFromNameServer(final String topic) {        return updateTopicRouteInfoFromNameServer(topic, false, null);}
复制代码


  1. 第 1 次获取时,isDefault 传的 false,defaultMQProducer 传的 null,因此在 updateTopicRouteInfoFromNameServer 会走 else 分支,用 Topic 去获取

  2. 第 2 次获取时,isDefault 传的 true,defaultMQProducer 也传值了,因此会走 if 分支,将入参的 topic 转换为默认的 TBW102,获取 TBW102 的信息

  3. 不管 Broker 配没配 NameSrv 地址,获取 Topic 的信息,必失败

  4. 获取 TBW102 信息:

  5. 2.1 Broker 配置了 NameSrv 地址,成功

  6. 2.2 Broker 没有配置 NameSrv 地址,失败


生产者首先向 NameServer 查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,RocketMQ 会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer 会向生产者返回默认主题的路由信息。


然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从 Nameserver 获取到默认的 Topic 的队列信息后,队列的个数会改变吗?


从 NameServer 中获取,注意这个 isDefault=false,defaultMQProducer=null


温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择 DefaultMQProducer#defaultTopicQueueNums 与 Nameserver 返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums 默认值为 4,故自动创建的主题,其队列数量默认为 4。

获取消息对应的 topic 信息

发请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader),但是因为没有任何一个 Broker 有关于这个 topic 的信息,所以 namesrv 就会返回 topic 不存在,处理请求的代码在 DefaultRequestProcessor 的。


case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);
复制代码


也就是回应码 ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回 false。

从 NameServer 获取相关的 Topic 信息数据

updateTopicRouteInfoFromNameServer 最终会发给 NameSrv 一个 GET_ROUTEINTO_BY_TOPIC 请求


public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,        DefaultMQProducer defaultMQProducer) {        try {            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {                try {                    TopicRouteData topicRouteData;                    if (isDefault && defaultMQProducer != null) {                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),                            1000 * 3);                        if (topicRouteData != null) {                            for (QueueData data : topicRouteData.getQueueDatas()) {                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());                                data.setReadQueueNums(queueNums);                                data.setWriteQueueNums(queueNums);                            }                        }                    } else {                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);                    }                } catch (Exception e) {                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);                    }                } finally {                    this.lockNamesrv.unlock();                }            } else {                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);            }        } catch (InterruptedException e) {            log.warn("updateTopicRouteInfoFromNameServer Exception", e);        }        return false;    }
复制代码


因为 if 条件不满足,所以获取默认的 topic 信息,注意 isDefault=true,defaultMQProducer=defaultMQProducer


if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {            return topicPublishInfo;        } else {            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);            topicPublishInfo = this.topicPublishInfoTable.get(topic);            return topicPublishInfo;}
复制代码


默认的 topic 为"TBW102",这个时候如果 namesrv 中如果还是没有这个 topic 的信息的话,就会抛出异常 No route info of this topic。autoCreateTopicEnable=true 的作用。

Broker 启动流程自动创建 topic

  • 在 Broker 启动流程中,会构建 TopicConfigManager 对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向 topicConfigTable 中添加默认主题的路由信息。

  • 当 Broker 启动时,TopicConfigManager 初始化,这里会判断该标识,创建 TBW102topic,并且在后续的心跳中把信息更新到 namesrv 中,这样在发消息的时候就不会抛出不存在的异常。


 // MixAll.DEFAULT_TOPIC            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {                String topic = MixAll.DEFAULT_TOPIC;                TopicConfig topicConfig = new TopicConfig(topic);                this.systemTopicList.add(topic);                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()                    .getDefaultTopicQueueNums());                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()                    .getDefaultTopicQueueNums());                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;                topicConfig.setPerm(perm);                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);            }
复制代码


该 topicConfigTable 中所有的路由信息,会随着 Broker 向 Nameserver 发送心跳包中,Nameserver 收到这些信息后,更新对应 Topic 的路由信息表。


BrokerConfig 的 defaultTopicQueueNum 默认为 8。两台 Broker 服务器都会运行上面的过程,故最终 Nameserver 中关于默认主题的路由信息中,会包含两个 Broker 分别各 8 个队列信息。

TopicConfigManager 构造方法

当从 namesrv 查出 Topic 相关的信息时,在 topicRouteData2TopicPublishInfo 设置消息队列数量 info.getMessageQueueList().add(mq);,调用 updateTopicPublishInfo 方法更新缓存 topicPublishInfoTable


 // Update Pub info                            {                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);                                publishInfo.setHaveTopicRouterInfo(true);                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();                                while (it.hasNext()) {                                    Entry<String, MQProducerInner> entry = it.next();                                    MQProducerInner impl = entry.getValue();                                    if (impl != null) {                                        impl.updateTopicPublishInfo(topic, publishInfo);                                    }                                }                            }
复制代码


然后 if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。

当 autoCreateTopicEnable=false 时

  1. 创建 topic 的类 UpdateTopicSubCommand(),设置相应的信息,最后调用 defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

  2. 发消息 RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor 处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);

  3. 同步给其他 Broker


this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);this.brokerController.registerBrokerAll(false, true);
复制代码

Broker 端收到消息后的处理流程

服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用 super.msgCheck 方法:

AbstractSendMessageProcessor#msgCheck

在 Broker 端,首先会使用 TopicConfigManager 根据 topic 查询路由信息,如果 Broker 端不存在该主题的路由配置(路由信息),此时如果 Broker 中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在 Broker 创建新 Topic 的路由信息。这样 Broker 服务端就会存在主题的路由信息。


在 Broker 端的 topic 配置管理器中存在的路由信息,一会向 Nameserver 发送心跳包,汇报到 Nameserver,另一方面会有一个定时任务,定时存储在 broker 端,具体路径为 ${ROCKET_HOME}/store/config/topics.json 中,这样在 Broker 关闭后再重启,并不会丢失路由信息。

TBW102 是为何物?

TBW102 是 Broker 启动时,当 autoCreateTopicEnable 的配置为 true 时,会自动创建该默认 topic。


public TopicConfigManager(BrokerController brokerController) {    this.brokerController = brokerController;    // ...    {        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;            TopicConfig topicConfig = new TopicConfig(topic);            this.systemTopicList.add(topic);            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()                .getDefaultTopicQueueNums());            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()                .getDefaultTopicQueueNums());            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;            topicConfig.setPerm(perm);            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);        }    }    // ...}
复制代码


autoCreateTopicEnable 的默认值是 true,可以同步外部配置文件,让 Broker 启动时加载,来改变该值。我理解的 TBW102 的作用是当开启自动创建 topic 功能,发送时用了未配置的 topic,可以让该 topic 继承默认 TBW102 的配置,实现消息的发送。

总结分析

  1. client 本地首先没有缓存对应 topic 的路由信息,然后先去 nameserver 去查找,nameserver 中也没有此 topic 的路由信息,然后返回给 client。client 接收到返回后再向 nameserver 请求 topic 为 tbw102 的路由信息。

  2. 如果有 broker 设置了 autocreateTopic,则 broker 在启动的时候会在 topicManager 中创建对应的 topicconfig 通过心跳发送给 nameserver,namerserver 会将其保存。nameserver 将之前保存的 tbw102 的路由信息返回给请求的 client。

  3. client 拿到了 topic 为 tbw102 的路由信息后返回,client 根据返回的 tbw102 路由信息(里面包含所有设置了 autocreateTopic 为 true 的 broker,默认每个 broker 会在 client 本地创建 DefaultTopicQueueNums=4 个读写队列选择,假设两个 broker 则会有 8 个队列让你选择)先缓存到本地的 topicPublishInfoTable 表中,key 为此 topic ,value 为此 topicRouteData,轮询选择一个队列进行发送。


根据选择到的队列对应的 broker 发送该 topic 消息。

broker 在接收到此消息后会在 msgcheck 方法中调用 createTopicInSendMessageMethod 方法创建 topicConfig 信息塞进 topicConfigTable 表中,然后就跟发送已经创建的 topic 的流程一样发送消息了。


同时 topicConfigTable 会通过心跳将新的这个 topicConfig 信息发送给 nameserver。


nameserver 接收到后会更新 topic 的路由信息,如果之前接收到消息的 broker 没有全部覆盖到,因为 broker 会 30S 向 nameserver 发送一次心跳,心跳包里包含 topicconfig,覆盖到的 broker 会将自动创建好的 topicconfig 信息发送给 nameserver,从而在 nameserver 那边接收到后会注册这个新的 topic 信息,因为消费者每 30S 也会到 nameserver 去更新本地的 topicrouteinfo,请求发送到 nameserver 得到了之前覆盖到的 broker 发送的心跳包更新后的最新 topic 路由信息,那么未被覆盖的 broker 就永远不会加入到这个负载均衡了,就会造成负载均衡达不到预期了,即所有能自动创建 topic 的 broker 不能全部都参与进来。

参考资料

https://www.cnblogs.com/dingwpmz/p/11809404.html


https://www.pianshen.com/article/24191855587/


https://www.jianshu.com/p/c8fd57a7f741

发布于: 2022 年 01 月 19 日阅读数: 53
用户头像

浩宇天尚

关注

🏆InfoQ写作平台-签约作者🏆 2020.03.25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
🏆【Alibaba中间件技术系列】「RocketMQ技术专题」Broker服务端自动创建topic的原理分析和问题要点指南_RocketMQ_浩宇天尚_InfoQ写作社区