你知道 Kafka 创建 Topic 这个过程做了哪些事情吗?(附视频)
日常运维 、问题排查 怎么能够少了滴滴开源的 滴滴开源LogiKM一站式Kafka监控与管控平台
脚本参数
sh bin/kafka-topic -help
查看更具体参数
下面只是列出了跟--create
相关的参数
创建 Topic 脚本
zk 方式(不推荐)
需要注意的是--zookeeper 后面接的是 kafka 的 zk 配置, 假如你配置的是 localhost:2181/kafka 带命名空间的这种,不要漏掉了
kafka 版本 >= 2.2 支持下面方式(推荐)
更多 TopicCommand 相关命令请看
当前分析的 kafka 源码版本为 kafka-2.5
创建 Topic 源码分析
温馨提示: 如果阅读源码略显枯燥,你可以直接看源码总结以及后面部分
首先我们找到源码入口处, 查看一下 kafka-topic.sh
脚本的内容 exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
最终是执行了kafka.admin.TopicCommand
这个类,找到这个地方之后就可以断点调试源码了,用 IDEA 启动 记得配置一下入参 比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3
1. 源码入口
上面的源码主要作用是
根据是否有传入参数
--zookeeper
来判断创建哪一种 对象topicService
如果传入了--zookeeper
则创建 类ZookeeperTopicService
的对象 否则创建类AdminClientTopicService
的对象(我们主要分析这个对象)根据传入的参数类型判断是创建 topic 还是删除等等其他 判断依据是 是否在参数里传入了
--create
2. 创建 AdminClientTopicService 对象
val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))
2.1 先创建 Admin
如果有入参
--command-config
,则将这个文件里面的参数都放到 mapcommandConfig
里面, 并且也加入bootstrap.servers
的参数;假如配置文件里面已经有了bootstrap.servers
配置,那么会将其覆盖将上面的
commandConfig
作为入参调用Admin.create(commandConfig)
创建 Admin; 这个时候调用的 Client 模块的代码了, 从这里我们就可以看出,我们调用kafka-topic.sh
脚本实际上是 kafka 模拟了一个客户端Client
来创建 Topic 的过程;
3. AdminClientTopicService.createTopic 创建 Topic
topicService.createTopic(opts)
检查各项入参是否有问题
adminClient.listTopics()
,然后比较是否已经存在待创建的 Topic;如果存在抛出异常;判断是否配置了参数
--replica-assignment
; 如果配置了,那么 Topic 就会按照指定的方式来配置副本情况解析配置
--config
配置放到configsMap
中;configsMap
给到NewTopic
对象调用
adminClient.createTopics
创建 Topic; 它是如何创建 Topic 的呢?往下分析源码
3.1 KafkaAdminClient.createTopics(NewTopic) 创建 Topic
这个代码里面主要看下 Call 里面的接口; 先不管 Kafka 如何跟服务端进行通信的细节; 我们主要关注创建 Topic 的逻辑;
createRequest
会构造一个请求参数CreateTopicsRequest
例如下图选择 ControllerNodeProvider 这个节点发起网络请求 可以清楚的看到, 创建 Topic 这个操作是需要 Controller 来执行的;
4. 发起网络请求
5. Controller 角色的服务端接受请求处理逻辑
首先找到服务端处理客户端请求的 源码入口 ⇒ KafkaRequestHandler.run()
主要看里面的 apis.handle(request)
方法; 可以看到客户端的请求都在request.bodyAndSize()
里面
5.1 KafkaApis.handle(request) 根据请求传递 Api 调用不同接口
进入方法可以看到根据request.header.apiKey
调用对应的方法,客户端传过来的是CreateTopics
5.2 KafkaApis.handleCreateTopicsRequest 处理创建 Topic 的请求
判断当前处理的 broker 是不是 Controller,如果不是 Controller 的话直接抛出异常,从这里可以看出,CreateTopic 这个操作必须是 Controller 来进行, 出现这种情况有可能是客户端发起请求的时候 Controller 已经变更;
鉴权 【Kafka 源码】kafka 鉴权机制
调用
adminManager.createTopics()
5.3 adminManager.createTopics()
创建主题并等等主题完全创建,回调函数将会在超时、错误、或者主题创建完成时触发
该方法过长,省略部分代码
做一些校验检查 ①.检查 Topic 是否存在 ②. 检查
--replica-assignment
参数和 (--partitions || --replication-factor
) 不能同时使用 ③.如果(--partitions || --replication-factor
) 没有设置,则使用 Broker 的配置(这个 Broker 肯定是 Controller) ④.计算分区副本分配方式createTopicPolicy
根据 Broker 是否配置了创建 Topic 的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy
接口;并 在服务器配置create.topic.policy.class.name=自定义类
; 比如我就想所有创建 Topic 的请求分区数都要大于 10; 那么这里就可以实现你的需求了createTopicWithAssignment
把 topic 相关数据写入到 zk 中; 进去分析一下
5.4 写入 zookeeper 数据
我们进入到adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
看看有哪些数据写入到了 zk 中;
源码就不再深入了,这里直接详细说明一下
写入 Topic 配置信息
先调用
SetDataRequest
请求往节点/config/topics/Topic名称
写入数据; 这里 一般这个时候都会返回NONODE (NoNode)
;节点不存在; 假如 zk 已经存在节点就直接覆盖掉节点不存在的话,就发起
CreateRequest
请求,写入数据; 并且节点类型是持久节点
这里写入的数据,是我们入参时候传的 topic 配置--config
; 这里的配置会覆盖默认配置
写入 Topic 分区副本信息
将已经分配好的副本分配策略写入到
/brokers/topics/Topic名称
中; 节点类型 持久节点
具体跟 zk 交互的地方在 ZookeeperClient.send()
这里包装了很多跟 zk 的交互;
6. Controller 监听 /brokers/topics/Topic名称
, 通知 Broker 将分区写入磁盘
Controller 有监听 zk 上的一些节点; 在上面的流程中已经在 zk 中写入了
/brokers/topics/Topic名称
; 这个时候 Controller 就监听到了这个变化并相应;
KafkaController.processTopicChange
从 zk 中获取
/brokers/topics
所有 Topic 跟当前 Broker 内存中所有 BrokercontrollerContext.allTopics
的差异; 就可以找到我们新增的 Topic; 还有在 zk 中被删除了的 Broker(该 Topic 会在当前内存中 remove 掉)从 zk 中获取
/brokers/topics/{TopicName}
给定主题的副本分配。并保存在内存中执行
onNewPartitionCreation
;分区状态开始流转
6.1 onNewPartitionCreation 状态流转
关于 Controller 的状态机 详情请看: 【kafka源码】Controller中的状态机
将待创建的分区状态流转为
NewPartition
;将待创建的副本 状态流转为
NewReplica
;将分区状态从刚刚的
NewPartition
流转为OnlinePartition
获取
leaderIsrAndControllerEpochs
; Leader 为副本的第一个; 1. 向 zk 中写入/brokers/topics/{topicName}/partitions/
持久节点; 无数据 2. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号}
持久节点; 无数据 3. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号}/state
持久节点; 数据为leaderIsrAndControllerEpoch
向副本所属 Broker 发送
leaderAndIsrRequest
请求向所有 Broker 发送
UPDATE_METADATA
请求将副本状态从刚刚的
NewReplica
流转为OnlineReplica
,更新下内存
关于分区状态机和副本状态机详情请看【kafka源码】Controller中的状态机
7. Broker 收到 LeaderAndIsrRequest 创建本地 Log
上面步骤中有说到向副本所属 Broker 发送
leaderAndIsrRequest
请求,那么这里做了什么呢 其实主要做的是 创建本地 Log代码太多,这里我们直接定位到只跟创建 Topic 相关的关键代码来分析
KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog
如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出
KafkaStorageException
详细请看 【kafka 源码】LeaderAndIsrRequest 请求
源码总结
如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述
根据是否有传入参数
--zookeeper
来判断创建哪一种 对象topicService
如果传入了--zookeeper
则创建 类ZookeeperTopicService
的对象 否则创建类AdminClientTopicService
的对象(我们主要分析这个对象)如果有入参
--command-config
,则将这个文件里面的参数都放到 mapl 类型commandConfig
里面, 并且也加入bootstrap.servers
的参数;假如配置文件里面已经有了bootstrap.servers
配置,那么会将其覆盖将上面的
commandConfig
作为入参调用Admin.create(commandConfig)
创建 Admin; 这个时候调用的 Client 模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh
脚本实际上是 kafka 模拟了一个客户端 Client 来创建 Topic 的过程;一些异常检查 ①.如果配置了副本副本数--replication-factor 一定要大于 0 ②.如果配置了--partitions 分区数 必须大于 0 ③.去 zk 查询是否已经存在该 Topic
判断是否配置了参数
--replica-assignment
; 如果配置了,那么 Topic 就会按照指定的方式来配置副本情况解析配置
--config
配置放到configsMap
中; configsMap 给到 NewTopic 对象将上面所有的参数包装成一个请求参数
CreateTopicsRequest
;然后找到是Controller
的节点发起请求(ControllerNodeProvider
)服务端收到请求之后,开始根据
CreateTopicsRequest
来调用创建 Topic 的方法; 不过首先要判断一下自己这个时候是不是Controller
; 有可能这个时候 Controller 重新选举了; 这个时候要抛出异常服务端进行一下请求参数检查 ①.检查 Topic 是否存在 ②.检查
--replica-assignment
参数和 (--partitions
||--replication-factor
) 不能同时使用如果(
--partitions
||--replication-factor
) 没有设置,则使用 Broker 的默认配置(这个 Broker 肯定是 Controller)计算分区副本分配方式;如果是传入了
--replica-assignment
;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka 源码】创建 Topic 的时候是如何分区和副本的分配规则createTopicPolicy
根据 Broker 是否配置了创建 Topic 的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy
接口;并 在服务器配置create.topic.policy.class.name
=自定义类; 比如我就想所有创建 Topic 的请求分区数都要大于 10; 那么这里就可以实现你的需求了zk 中写入 Topic 配置信息 发起
CreateRequest
请求,这里写入的数据,是我们入参时候传的 topic 配置--config
; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path =/config/topics/Topic名称
zk 中写入 Topic 分区副本信息 发起
CreateRequest
请求 ,将已经分配好的副本分配策略 写入到/brokers/topics/Topic名称
中; 节点类型 持久节点Controller
监听 zk 上面的 topic 信息; 根据 zk 上变更的 topic 信息;计算出新增/删除了哪些 Topic; 然后拿到新增 Topic 的 副本分配信息; 并做一些状态流转向新增 Topic 所在 Broker 发送
leaderAndIsrRequest
请求,Broker 收到
发送leaderAndIsrRequest请求
; 创建副本 Log 文件;
Q&A
创建 Topic 的时候 在 Zk 上创建了哪些节点
接受客户端请求阶段:
topic 的配置信息
/config/topics/Topic名称
持久节点topic 的分区信息
/brokers/topics/Topic名称
持久节点Controller 监听 zk 节点
/brokers/topics
变更阶段
/brokers/topics/{topicName}/partitions/
持久节点; 无数据向 zk 中写入
/brokers/topics/{topicName}/partitions/{分区号}
持久节点; 无数据向 zk 中写入
/brokers/topics/{topicName}/partitions/{分区号}/state
持久节点;
创建 Topic 的时候 什么时候在 Broker 磁盘上创建的日志文件
当 Controller 监听 zk 节点
/brokers/topics
变更之后,将新增的 Topic 解析好的分区状态流转NonExistentPartition
->NewPartition
->OnlinePartition
当流转到OnlinePartition
的时候会像分区分配到的 Broker 发送一个leaderAndIsrRequest
请求,当 Broker 们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地 Log;如果没有的话就重新创建;
如果我没有指定分区数或者副本数,那么会如何创建
我们都知道,如果我们没有指定分区数或者副本数, 则默认使用 Broker 的配置, 那么这么多 Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建 topic 的过程,就是使用谁的配置; 所以是谁执行的? 那肯定是 Controller 啊! 上面的源码我们分析到了,创建的过程,会指定 Controller 这台机器去进行;
如果我手动删除了/brokers/topics/
下的某个节点会怎么样?
详情请看 【kafka 实战】一不小心删除了
/brokers/topics/
下的某个 Topic
如果我手动在 zk 中添加/brokers/topics/{TopicName}
节点会怎么样
先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建 Topic 的请求,本质上是去 zk 里面写两个数据
topic 的配置信息
/config/topics/Topic名称
持久节点topic 的分区信息
/brokers/topics/Topic名称
持久节点 所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确 因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本 Brokerid 不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求
; 就不会正确的发送的不存在的 BrokerId 上,那么那台机器就不会创建 Log 文件;下面不妨让我们来验证一下; 创建一个节点
/brokers/topics/create_topic_byhand_zk
节点数据为下面数据;{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}
这里我用的工具
PRETTYZOO
手动创建的,你也可以用命令行创建; 创建完成之后我们再看看本地有没有生成一个 Log 文件 可以看到我们指定的 Broker,已经生成了对应的分区副本 Log 文件; 而且 zk 中也写入了其他的数据在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader
如果写入/brokers/topics/{TopicName}
节点之后 Controller 挂掉了会怎么样
先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去
然后我们来模拟这么一个过程,先停止集群,然后再 zk 中写入
/brokers/topics/{TopicName}
节点数据; 然后再启动一台 Broker; 源码分析: 我们之前分析过 Controller 的启动过程与选举 有提到过,这里再提一下 Controller 当选之后有一个地方处理这个事情replicaStateMachine.startup() partitionStateMachine.startup()
启动状态机的过程是不是跟上面的 6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了
OnlinePartition
; 伴随着是不发起了leaderAndIsrRequest
请求; 是不是 Broker 收到请求之后,创建本地 Log 文件了
附件
--config 可生效参数
请以sh bin/kafka-topic -help
为准
Tips:如果关于本篇文章你有疑问,可以在公众号给我留言
PS: 文章阅读的源码版本是 kafka-2.5
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/b01322190bc161d6be34637fc】。文章转载请联系作者。
评论 (1 条评论)