写点什么

你知道 Kafka 创建 Topic 这个过程做了哪些事情吗?(附视频)

发布于: 2021 年 08 月 09 日
你知道Kafka创建Topic这个过程做了哪些事情吗?(附视频)

日常运维问题排查 怎么能够少了滴滴开源的 滴滴开源LogiKM一站式Kafka监控与管控平台


脚本参数

sh bin/kafka-topic -help 查看更具体参数

下面只是列出了跟--create 相关的参数


创建 Topic 脚本

zk 方式(不推荐)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
复制代码

需要注意的是--zookeeper 后面接的是 kafka 的 zk 配置, 假如你配置的是 localhost:2181/kafka 带命名空间的这种,不要漏掉了 

kafka 版本 >= 2.2 支持下面方式(推荐)

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
复制代码


更多 TopicCommand 相关命令请看

https://mp.weixin.qq.com/mp/appmsgalbum?__biz=Mzg4ODY1NTcxNg==&action=getalbum&album_id=1966026980307304450#wechat_redirect


当前分析的 kafka 源码版本为 kafka-2.5

创建 Topic 源码分析

温馨提示: 如果阅读源码略显枯燥,你可以直接看源码总结以及后面部分

首先我们找到源码入口处, 查看一下 kafka-topic.sh脚本的内容 exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@" 最终是执行了kafka.admin.TopicCommand这个类,找到这个地方之后就可以断点调试源码了,用 IDEA 启动 记得配置一下入参 比如: --create --bootstrap-server 127.0.0.1:9092 --partitions 3 --topic test_create_topic3

1. 源码入口

上面的源码主要作用是

  1. 根据是否有传入参数--zookeeper 来判断创建哪一种 对象topicService 如果传入了--zookeeper 则创建 类 ZookeeperTopicService的对象 否则创建类AdminClientTopicService的对象(我们主要分析这个对象)

  2. 根据传入的参数类型判断是创建 topic 还是删除等等其他 判断依据是 是否在参数里传入了--create

2. 创建 AdminClientTopicService 对象

val topicService = new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))

2.1 先创建 Admin

object AdminClientTopicService {    def createAdminClient(commandConfig: Properties, bootstrapServer: Option[String]): Admin = {      bootstrapServer match {        case Some(serverList) => commandConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, serverList)        case None =>      }      Admin.create(commandConfig)    }    def apply(commandConfig: Properties, bootstrapServer: Option[String]): AdminClientTopicService =      new AdminClientTopicService(createAdminClient(commandConfig, bootstrapServer))  }
复制代码
  1. 如果有入参--command-config ,则将这个文件里面的参数都放到 map commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖

  2. 将上面的commandConfig 作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的 Client 模块的代码了, 从这里我们就可以看出,我们调用kafka-topic.sh脚本实际上是 kafka 模拟了一个客户端Client来创建 Topic 的过程;


3. AdminClientTopicService.createTopic 创建 Topic

topicService.createTopic(opts)

  case class AdminClientTopicService private (adminClient: Admin) extends TopicService {    override def createTopic(topic: CommandTopicPartition): Unit = {      //如果配置了副本副本数--replication-factor 一定要大于0      if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))        throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")       //如果配置了--partitions 分区数 必须大于0      if (topic.partitions.exists(partitions => partitions < 1))        throw new IllegalArgumentException(s"The partitions must be greater than 0")    //查询是否已经存在该Topic      if (!adminClient.listTopics().names().get().contains(topic.name)) {        val newTopic = if (topic.hasReplicaAssignment)          //如果指定了--replica-assignment参数;则按照指定的来分配副本          new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))        else {          new NewTopic(            topic.name,            topic.partitions.asJava,            topic.replicationFactor.map(_.toShort).map(Short.box).asJava)        }        // 将配置--config 解析成一个配置map        val configsMap = topic.configsToAdd.stringPropertyNames()          .asScala          .map(name => name -> topic.configsToAdd.getProperty(name))          .toMap.asJava        newTopic.configs(configsMap)        //调用adminClient创建Topic        val createResult = adminClient.createTopics(Collections.singleton(newTopic))        createResult.all().get()        println(s"Created topic ${topic.name}.")      } else {        throw new IllegalArgumentException(s"Topic ${topic.name} already exists")      }    }
复制代码
  1. 检查各项入参是否有问题

  2. adminClient.listTopics(),然后比较是否已经存在待创建的 Topic;如果存在抛出异常;

  3. 判断是否配置了参数--replica-assignment ; 如果配置了,那么 Topic 就会按照指定的方式来配置副本情况

  4. 解析配置--config 配置放到configsMap中; configsMap给到NewTopic对象

  5. 调用adminClient.createTopics创建 Topic; 它是如何创建 Topic 的呢?往下分析源码

3.1 KafkaAdminClient.createTopics(NewTopic) 创建 Topic

    @Override    public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,                                           final CreateTopicsOptions options) {              //省略部分源码...        Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),            new ControllerNodeProvider()) {            @Override            public CreateTopicsRequest.Builder createRequest(int timeoutMs) {                return new CreateTopicsRequest.Builder(                    new CreateTopicsRequestData().                        setTopics(topics).                        setTimeoutMs(timeoutMs).                        setValidateOnly(options.shouldValidateOnly()));            }            @Override            public void handleResponse(AbstractResponse abstractResponse) {                //省略            }            @Override            void handleFailure(Throwable throwable) {                completeAllExceptionally(topicFutures.values(), throwable);            }        };            }
复制代码

这个代码里面主要看下 Call 里面的接口; 先不管 Kafka 如何跟服务端进行通信的细节; 我们主要关注创建 Topic 的逻辑;

  1. createRequest会构造一个请求参数CreateTopicsRequest 例如下图

  2. 选择 ControllerNodeProvider 这个节点发起网络请求 可以清楚的看到, 创建 Topic 这个操作是需要 Controller 来执行的;


4. 发起网络请求

==>服务端客户端网络模型

5. Controller 角色的服务端接受请求处理逻辑

首先找到服务端处理客户端请求的 源码入口KafkaRequestHandler.run()

主要看里面的 apis.handle(request) 方法; 可以看到客户端的请求都在request.bodyAndSize()里面

5.1 KafkaApis.handle(request) 根据请求传递 Api 调用不同接口

进入方法可以看到根据request.header.apiKey 调用对应的方法,客户端传过来的是CreateTopics

5.2 KafkaApis.handleCreateTopicsRequest 处理创建 Topic 的请求

def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {    // 部分代码省略  //如果当前Broker不是属于Controller的话,就抛出异常    if (!controller.isActive) {      createTopicsRequest.data.topics.asScala.foreach { topic =>        results.add(new CreatableTopicResult().setName(topic.name).          setErrorCode(Errors.NOT_CONTROLLER.code))      }      sendResponseCallback(results)    } else {     // 部分代码省略    }      adminManager.createTopics(createTopicsRequest.data.timeoutMs,          createTopicsRequest.data.validateOnly,          toCreate,          authorizedForDescribeConfigs,          handleCreateTopicsResults)    }  }
复制代码
  1. 判断当前处理的 broker 是不是 Controller,如果不是 Controller 的话直接抛出异常,从这里可以看出,CreateTopic 这个操作必须是 Controller 来进行, 出现这种情况有可能是客户端发起请求的时候 Controller 已经变更;

  2. 鉴权 【Kafka 源码】kafka 鉴权机制

  3. 调用adminManager.createTopics()

5.3 adminManager.createTopics()

创建主题并等等主题完全创建,回调函数将会在超时、错误、或者主题创建完成时触发

该方法过长,省略部分代码

def createTopics(timeout: Int,                   validateOnly: Boolean,                   toCreate: Map[String, CreatableTopic],                   includeConfigsAndMetatadata: Map[String, CreatableTopicResult],                   responseCallback: Map[String, ApiError] => Unit): Unit = {    // 1. map over topics creating assignment and calling zookeeper    val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }    val metadata = toCreate.values.map(topic =>      try {          //省略部分代码         //检查Topic是否存在         //检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用         // 如果(--partitions || --replication-factor ) 没有设置,则使用 Broker的配置(这个Broker肯定是Controller)    // 计算分区副本分配方式        createTopicPolicy match {          case Some(policy) =>          //省略部分代码            adminZkClient.validateTopicCreate(topic.name(), assignments, configs)            if (!validateOnly)              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)          case None =>            if (validateOnly)             //校验创建topic的参数准确性              adminZkClient.validateTopicCreate(topic.name, assignments, configs)            else              //把topic相关数据写入到zk中              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)        }         }
复制代码
  1. 做一些校验检查 ①.检查 Topic 是否存在 ②. 检查--replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用 ③.如果(--partitions || --replication-factor ) 没有设置,则使用 Broker 的配置(这个 Broker 肯定是 Controller) ④.计算分区副本分配方式

  2. createTopicPolicy 根据 Broker 是否配置了创建 Topic 的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置 create.topic.policy.class.name=自定义类; 比如我就想所有创建 Topic 的请求分区数都要大于 10; 那么这里就可以实现你的需求了

  3. createTopicWithAssignment把 topic 相关数据写入到 zk 中; 进去分析一下


5.4 写入 zookeeper 数据

我们进入到adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)看看有哪些数据写入到了 zk 中;

  def createTopicWithAssignment(topic: String,                                config: Properties,                                partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {    validateTopicCreate(topic, partitionReplicaAssignment, config)    // 将topic单独的配置写入到zk中    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)    // 将topic分区相关信息写入zk中    writeTopicPartitionAssignment(topic, partitionReplicaAssignment.mapValues(ReplicaAssignment(_)).toMap, isUpdate = false)  }
复制代码

源码就不再深入了,这里直接详细说明一下

写入 Topic 配置信息

  1. 先调用SetDataRequest请求往节点/config/topics/Topic名称 写入数据; 这里 一般这个时候都会返回 NONODE (NoNode);节点不存在; 假如 zk 已经存在节点就直接覆盖掉

  2. 节点不存在的话,就发起CreateRequest请求,写入数据; 并且节点类型是持久节点

这里写入的数据,是我们入参时候传的 topic 配置--config; 这里的配置会覆盖默认配置

写入 Topic 分区副本信息

  1. 将已经分配好的副本分配策略写入到 /brokers/topics/Topic名称 中; 节点类型 持久节点

具体跟 zk 交互的地方在 ZookeeperClient.send() 这里包装了很多跟 zk 的交互;

6. Controller 监听 /brokers/topics/Topic名称, 通知 Broker 将分区写入磁盘

Controller 有监听 zk 上的一些节点; 在上面的流程中已经在 zk 中写入了 /brokers/topics/Topic名称 ; 这个时候 Controller 就监听到了这个变化并相应;

KafkaController.processTopicChange

  private def processTopicChange(): Unit = {    //如果处理的不是Controller角色就返回    if (!isActive) return    //从zk中获取 `/brokers/topics 所有Topic    val topics = zkClient.getAllTopicsInCluster    //找出哪些是新增的    val newTopics = topics -- controllerContext.allTopics    //找出哪些Topic在zk上被删除了    val deletedTopics = controllerContext.allTopics -- topics    controllerContext.allTopics = topics        registerPartitionModificationsHandlers(newTopics.toSeq)    val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)    deletedTopics.foreach(controllerContext.removeTopic)    addedPartitionReplicaAssignment.foreach {      case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)    }    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +      s"[$addedPartitionReplicaAssignment]")    if (addedPartitionReplicaAssignment.nonEmpty)      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)  }
复制代码
  1. 从 zk 中获取 /brokers/topics 所有 Topic 跟当前 Broker 内存中所有 BrokercontrollerContext.allTopics的差异; 就可以找到我们新增的 Topic; 还有在 zk 中被删除了的 Broker(该 Topic 会在当前内存中 remove 掉)

  2. 从 zk 中获取/brokers/topics/{TopicName} 给定主题的副本分配。并保存在内存中

  3. 执行onNewPartitionCreation;分区状态开始流转

6.1 onNewPartitionCreation 状态流转

关于 Controller 的状态机 详情请看: 【kafka源码】Controller中的状态机

  /**   * This callback is invoked by the topic change callback with the list of failed brokers as input.   * It does the following -   * 1. Move the newly created partitions to the NewPartition state   * 2. Move the newly created partitions from NewPartition->OnlinePartition state   */  private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {    info(s"New partition creation callback for ${newPartitions.mkString(",")}")    partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)    partitionStateMachine.handleStateChanges(      newPartitions.toSeq,      OnlinePartition,      Some(OfflinePartitionLeaderElectionStrategy(false))    )    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)  }
复制代码
  1. 将待创建的分区状态流转为NewPartition;

  2. 将待创建的副本 状态流转为NewReplica;

  3. 将分区状态从刚刚的NewPartition流转为OnlinePartition

  4. 获取leaderIsrAndControllerEpochs; Leader 为副本的第一个; 1. 向 zk 中写入/brokers/topics/{topicName}/partitions/ 持久节点; 无数据 2. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号} 持久节点; 无数据 3. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号}/state 持久节点; 数据为leaderIsrAndControllerEpoch

    向副本所属 Broker 发送leaderAndIsrRequest请求

    向所有 Broker 发送UPDATE_METADATA 请求

  5. 将副本状态从刚刚的NewReplica流转为OnlineReplica ,更新下内存

关于分区状态机和副本状态机详情请看【kafka源码】Controller中的状态机

7. Broker 收到 LeaderAndIsrRequest 创建本地 Log

上面步骤中有说到向副本所属 Broker 发送leaderAndIsrRequest请求,那么这里做了什么呢 其实主要做的是 创建本地 Log

代码太多,这里我们直接定位到只跟创建 Topic 相关的关键代码来分析 KafkaApis.handleLeaderAndIsrRequest->replicaManager.becomeLeaderOrFollower->ReplicaManager.makeLeaders...LogManager.getOrCreateLog

  /**   * 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出 KafkaStorageException   */  def getOrCreateLog(topicPartition: TopicPartition, config: LogConfig, isNew: Boolean = false, isFuture: Boolean = false): Log = {    logCreationOrDeletionLock synchronized {      getLog(topicPartition, isFuture).getOrElse {        // create the log if it has not already been created in another thread        if (!isNew && offlineLogDirs.nonEmpty)          throw new KafkaStorageException(s"Can not create log for $topicPartition because log directories ${offlineLogDirs.mkString(",")} are offline")        val logDirs: List[File] = {          val preferredLogDir = preferredLogDirs.get(topicPartition)          if (isFuture) {            if (preferredLogDir == null)              throw new IllegalStateException(s"Can not create the future log for $topicPartition without having a preferred log directory")            else if (getLog(topicPartition).get.dir.getParent == preferredLogDir)              throw new IllegalStateException(s"Can not create the future log for $topicPartition in the current log directory of this partition")          }          if (preferredLogDir != null)            List(new File(preferredLogDir))          else            nextLogDirs()        }        val logDirName = {          if (isFuture)            Log.logFutureDirName(topicPartition)          else            Log.logDirName(topicPartition)        }        val logDir = logDirs          .toStream // to prevent actually mapping the whole list, lazy map          .map(createLogDirectory(_, logDirName))          .find(_.isSuccess)          .getOrElse(Failure(new KafkaStorageException("No log directories available. Tried " + logDirs.map(_.getAbsolutePath).mkString(", "))))          .get // If Failure, will throw        val log = Log(          dir = logDir,          config = config,          logStartOffset = 0L,          recoveryPoint = 0L,          maxProducerIdExpirationMs = maxPidExpirationMs,          producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,          scheduler = scheduler,          time = time,          brokerTopicStats = brokerTopicStats,          logDirFailureChannel = logDirFailureChannel)        if (isFuture)          futureLogs.put(topicPartition, log)        else          currentLogs.put(topicPartition, log)        info(s"Created log for partition $topicPartition in $logDir with properties " + s"{${config.originals.asScala.mkString(", ")}}.")        // Remove the preferred log dir since it has already been satisfied        preferredLogDirs.remove(topicPartition)        log      }    }  }
复制代码
  1. 如果日志已经存在,只返回现有日志的副本否则如果 isNew=true 或者如果没有离线日志目录,则为给定的主题和给定的分区创建日志 否则抛出KafkaStorageException

详细请看 【kafka 源码】LeaderAndIsrRequest 请求

源码总结

如果上面的源码分析,你不想看,那么你可以直接看这里的简洁叙述

  1. 根据是否有传入参数--zookeeper 来判断创建哪一种 对象topicService 如果传入了--zookeeper 则创建 类 ZookeeperTopicService的对象 否则创建类AdminClientTopicService的对象(我们主要分析这个对象)

  2. 如果有入参--command-config ,则将这个文件里面的参数都放到 mapl 类型 commandConfig里面, 并且也加入bootstrap.servers的参数;假如配置文件里面已经有了bootstrap.servers配置,那么会将其覆盖

  3. 将上面的commandConfig作为入参调用Admin.create(commandConfig)创建 Admin; 这个时候调用的 Client 模块的代码了, 从这里我们就可以猜测,我们调用kafka-topic.sh脚本实际上是 kafka 模拟了一个客户端 Client 来创建 Topic 的过程;

  4. 一些异常检查 ①.如果配置了副本副本数--replication-factor 一定要大于 0 ②.如果配置了--partitions 分区数 必须大于 0 ③.去 zk 查询是否已经存在该 Topic

  5. 判断是否配置了参数--replica-assignment ; 如果配置了,那么 Topic 就会按照指定的方式来配置副本情况

  6. 解析配置--config 配置放到configsMap中; configsMap 给到 NewTopic 对象

  7. 将上面所有的参数包装成一个请求参数CreateTopicsRequest ;然后找到是Controller的节点发起请求(ControllerNodeProvider)

  8. 服务端收到请求之后,开始根据CreateTopicsRequest来调用创建 Topic 的方法; 不过首先要判断一下自己这个时候是不是Controller; 有可能这个时候 Controller 重新选举了; 这个时候要抛出异常

  9. 服务端进行一下请求参数检查 ①.检查 Topic 是否存在 ②.检查 --replica-assignment参数和 (--partitions || --replication-factor ) 不能同时使用

  10. 如果(--partitions || --replication-factor ) 没有设置,则使用 Broker 的默认配置(这个 Broker 肯定是 Controller)

  11. 计算分区副本分配方式;如果是传入了 --replica-assignment;则会安装自定义参数进行组装;否则的话系统会自动计算分配方式; 具体详情请看 【kafka 源码】创建 Topic 的时候是如何分区和副本的分配规则

  12. createTopicPolicy根据 Broker 是否配置了创建 Topic 的自定义校验策略; 使用方式是自定义实现org.apache.kafka.server.policy.CreateTopicPolicy接口;并 在服务器配置 create.topic.policy.class.name=自定义类; 比如我就想所有创建 Topic 的请求分区数都要大于 10; 那么这里就可以实现你的需求了

  13. zk 中写入 Topic 配置信息 发起CreateRequest请求,这里写入的数据,是我们入参时候传的 topic 配置--config; 这里的配置会覆盖默认配置;并且节点类型是持久节点;path = /config/topics/Topic名称

  14. zk 中写入 Topic 分区副本信息 发起CreateRequest请求 ,将已经分配好的副本分配策略 写入到 /brokers/topics/Topic名称中; 节点类型 持久节点

  15. Controller监听 zk 上面的 topic 信息; 根据 zk 上变更的 topic 信息;计算出新增/删除了哪些 Topic; 然后拿到新增 Topic 的 副本分配信息; 并做一些状态流转

  16. 向新增 Topic 所在 Broker 发送leaderAndIsrRequest请求,

  17. Broker 收到发送leaderAndIsrRequest请求; 创建副本 Log 文件;


Q&A

创建 Topic 的时候 在 Zk 上创建了哪些节点

接受客户端请求阶段:

  1. topic 的配置信息 /config/topics/Topic名称 持久节点

  2. topic 的分区信息/brokers/topics/Topic名称 持久节点

Controller 监听 zk 节点/brokers/topics变更阶段

  1. /brokers/topics/{topicName}/partitions/持久节点; 无数据

  2. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号} 持久节点; 无数据

  3. 向 zk 中写入/brokers/topics/{topicName}/partitions/{分区号}/state 持久节点;

创建 Topic 的时候 什么时候在 Broker 磁盘上创建的日志文件

当 Controller 监听 zk 节点/brokers/topics变更之后,将新增的 Topic 解析好的分区状态流转 NonExistentPartition->NewPartition->OnlinePartition 当流转到OnlinePartition的时候会像分区分配到的 Broker 发送一个leaderAndIsrRequest请求,当 Broker 们收到这个请求之后,根据请求参数做一些处理,其中就包括检查自身有没有这个分区副本的本地 Log;如果没有的话就重新创建;

如果我没有指定分区数或者副本数,那么会如何创建

我们都知道,如果我们没有指定分区数或者副本数, 则默认使用 Broker 的配置, 那么这么多 Broker,假如不小心默认值配置不一样,那究竟使用哪一个呢? 那肯定是哪台机器执行创建 topic 的过程,就是使用谁的配置; 所以是谁执行的? 那肯定是 Controller 啊! 上面的源码我们分析到了,创建的过程,会指定 Controller 这台机器去进行;

如果我手动删除了/brokers/topics/下的某个节点会怎么样?

详情请看 【kafka 实战】一不小心删除了/brokers/topics/下的某个 Topic

如果我手动在 zk 中添加/brokers/topics/{TopicName}节点会怎么样

先说结论: 根据上面分析过的源码画出的时序图可以指定; 客户端发起创建 Topic 的请求,本质上是去 zk 里面写两个数据

  1. topic 的配置信息 /config/topics/Topic名称 持久节点

  2. topic 的分区信息/brokers/topics/Topic名称 持久节点 所以我们绕过这一步骤直接去写入数据,可以达到一样的效果;不过我们的数据需要保证准确 因为在这一步已经没有了一些基本的校验了; 假如这一步我们写入的副本 Brokerid 不存在会怎样,从时序图中可以看到,leaderAndIsrRequest请求; 就不会正确的发送的不存在的 BrokerId 上,那么那台机器就不会创建 Log 文件;

下面不妨让我们来验证一下; 创建一个节点/brokers/topics/create_topic_byhand_zk 节点数据为下面数据;

{"version":2,"partitions":{"2":[3],"1":[3],"0":[3]},"adding_replicas":{},"removing_replicas":{}}

这里我用的工具PRETTYZOO手动创建的,你也可以用命令行创建; 创建完成之后我们再看看本地有没有生成一个 Log 文件 可以看到我们指定的 Broker,已经生成了对应的分区副本 Log 文件; 而且 zk 中也写入了其他的数据 在我们写入zk数据的时候,就已经确定好了哪个每个分区的Leader是谁了,那就是第一个副本默认为Leader


如果写入/brokers/topics/{TopicName}节点之后 Controller 挂掉了会怎么样

先说结论:Controller 重新选举的时候,会有一些初始化的操作; 会把创建过程继续下去

然后我们来模拟这么一个过程,先停止集群,然后再 zk 中写入/brokers/topics/{TopicName}节点数据; 然后再启动一台 Broker; 源码分析: 我们之前分析过 Controller 的启动过程与选举 有提到过,这里再提一下 Controller 当选之后有一个地方处理这个事情

replicaStateMachine.startup()
partitionStateMachine.startup()

启动状态机的过程是不是跟上面的 6.1 onNewPartitionCreation 状态流转 的过程很像; 最终都把状态流转到了OnlinePartition; 伴随着是不发起了leaderAndIsrRequest请求; 是不是 Broker 收到请求之后,创建本地 Log 文件了


附件

--config 可生效参数

请以sh bin/kafka-topic -help 为准

configurations:                                        cleanup.policy                                          compression.type                                        delete.retention.ms                                     file.delete.delay.ms                                    flush.messages                                          flush.ms                                                follower.replication.throttled.       replicas                                             index.interval.bytes                                  leader.replication.throttled.replicas                 max.compaction.lag.ms                                 max.message.bytes                                     message.downconversion.enable                         message.format.version                                message.timestamp.difference.max.ms                   message.timestamp.type                                min.cleanable.dirty.ratio                             min.compaction.lag.ms                                 min.insync.replicas                                   preallocate                                           retention.bytes                                       retention.ms                                          segment.bytes                                         segment.index.bytes                                   segment.jitter.ms                                     segment.ms                                            unclean.leader.election.enable
复制代码



Tips:如果关于本篇文章你有疑问,可以在公众号给我留言 


PS: 文章阅读的源码版本是 kafka-2.5 

发布于: 2021 年 08 月 09 日阅读数: 6
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

有想进《滴滴LogI开源用户群》 的加我个人微信: jjdlmn_ 进群(备注:进群) 群里面主要交流 kakfa、es、agent、以及其他技术 群内有专人解答疑问,你所问的都能得到回应 同名公众号: 石臻臻的杂货铺

评论 (1 条评论)

发布
用户头像
全套视频请关注 公众号获取
2021 年 08 月 09 日 20:24
回复
没有更多了
你知道Kafka创建Topic这个过程做了哪些事情吗?(附视频)