写点什么

教你 3 种 Kafka 的指定副本作为 Leader 的实现方式

  • 2022 年 3 月 04 日
  • 本文字数:3578 字

    阅读完需:约 12 分钟

摘要:因为在我们实际的运维过程中,需要指定某个副本为 ISR,但是 Kafka 中的 Leader 选举策略并不支持这个功能,所以需要我们自己来实现它。


本文分享自华为云社区《Kafka的指定副本作为Leader的三种实现方式》,作者:石臻臻的杂货铺。

前几天有个群友问到: kafka 如何修改优先副本? 他们有个需求是, 想指定某个分区中的其中一个副本为 Leader。

需求分析

对于这么一个问题,在我们生产环境还是挺常见的,经常有需要修改某个 Topic 中某分区的 Leader


比如 topic1-0 这个分区有 3 个副本[0,1,2], 按照「优先副本」的规则那么 0 号副本肯定就是 Leader 了 我们都知道分区中的只有 Leader 副本才会提供读写副本其他副本作为备份 假如在某些情况下,「0」 号副本性能资源不够,或者网络不太好,或者 IO 压力比较大那么肯定对 Topic 的整体读写性能有很大影响, 这个时候切换一台压力较小副本作为 Leader 就显得很重要;优先副本: 分区中的 AR(所有副本)信息, 优先选择排在第一位的副本作为 Leader Leader 机制: 分区中只有一个 Leader 来承担读写,其他副本只是作为备份


那么如何实现这样一个需求呢?

解决方案

知道了原理之后,我们就能想到对应的解决方案了 只要将 分区的 AR 中的第一个位置,替换成你指定副本就行了;AR = { 0,1,2 } ==> AR = {2,1,0}


一般能够达到这个目的有两种方案,下面我们来分析一下

方案一: 分区副本重分配 (低成本方案)

一般分区副本重分配主要有三个流程

  1. 生成推荐的迁移 Json 文件

  2. 执行迁移 Json 文件

  3. 验证迁移流程是否完成


这里我们主要看第 2 步骤, 来看看迁移文件一般是什么样子的

{	"version": 1,	"partitions": [{		"topic": "topic1",		"partition": 0,		"replicas": [0,1,2]	}]}
复制代码

这个迁移 Json 意思是, 把 topic1 的「0」号分区的副本分配成[0,1,2] ,也就是说 topic1-0 号分区最终有 3 个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ;


又根据 Leader 的选举策略得知,不管是什么策略的选择,都是按照 AR 的顺序来选的

修改 AR 顺序

AR: 副本的分配顺序


那么我们想要实现我们的需求是不是把这个 Json 文件 中的 “replicas”: [0,1,2] 改一下就行了比如改成 “replicas”: [2,1,0] ,改完 Json 后执行,执行 execute, 正式开始重分配流程!迁移完成之后, 就会发现,Leader 已经变成上面的第一个位置的副本「2」 了

执行 Leader 选举


修改完 AR 顺序就结束了吗?


可以说是结束了,也可以说没有结束。


上面只是修改了 AR 的顺序, 但是没有执行 Leader 选举呀,这个时候 Leader 还是原来的,所以我们需要主动触发一下 Leader 选举

## 石臻臻的杂货铺## 微信: szzdzhp001
sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1 --election-type PREFERRED --partition 0
复制代码

这样就会立马切换成我们想要的 Leader 了。

也可以不主动触发,等 Controller 自动均衡。

如果你觉得主动触发这个很麻烦,那么没有关系,那就不执行,如果你开启了自动均衡策略的话,默认是开启的。

延伸: 自动均机制

当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。这意味着在默认情况下,当这个 broker 重新启动之后,它的所有分区都将仅作为 follower,不再用于客户端的读写操作。


为了避免这种不平衡,Kafka 有一个优先副本的概念。如果一个分区的副本列表是 1,5,9,节点 1 将优先作为其他两个副本 5 和 9 的 leader。


Controller 会有一个定时任务,定期执行优先副本选举,这样就不会导致负载不均衡和资源浪费,这就是 leader 的自动均衡机制

优缺点

优点: 实现了需求, 不需要改源码,也没有额外的开发工作。

缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改 Json 文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务 !假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了。

方案二: 手动修改 AR 顺序(高成本方案)

  1. 从 zk 中获取/brokers/topics/{topic 名称}节点数据。

  2. 手动调整一下里面的顺序

  3. 将调整后的数据,重新覆盖掉之前的节点。

  4. 删除 zk 中的/Controller 节点,让它触发重新加载,并且同时触发 Leader 选举。

例如:

修改的时候请先用 get 获取数据,在那个基础上改,因为不同版本,里面的数据结构是不一样的,我们只需要改分区 AR 顺序就行了 “partitions”:{“0”:[0,1,2]}

## get zk 节点数据。
get /szz1/brokers/topics/Topic2
## zk中的修改命令set /szz1/brokers/topics/Topic2 {"version":2,"partitions":{"0":[0,1,2]},"adding_replicas":{},"removing_replicas":{}}
复制代码

为什么要删除 Controller 的 zk 节点?


之所以删除 Controller 节点,是因为我们手动修改了 zk 节点数据之后,因为没有副本的新增,是不会触发 Controller 去更新 AR 内存的,就算你主动触发 Leader 选举,AR 还是以前的,并不会达到想要的效果。


删除 zk 中的/Controller 节点,会触发 Controller 重新选举,重新选举会重新加载所有元数据,所以我们刚刚加载的数据就会生效, 同时 Controller 重新加载也会触发 Leader 选举


简单代码

当然上面功能,手动改起来麻烦,那么饿肯定是要集成到 LogiKM 3.0 中的咯;

优缺点

优点: 实现了目标需求, 简单, 操作方便

缺点: 频繁的 Controller 重选举对生产环境来说会有一些影响;

方案三:修改源码(高级方案推荐)

我们方案二中的问题就是需要删除/Controller 节点发送重新选举,我们能不能不重新选举 Controller 也能生效呢?

如何让修改后的 AR 立即生效 ?


Controller 会监听每一个 topic 的节点/brokers/topics/{topic 名称}

KafkaController#processPartitionModifications

/*** 石臻臻的杂货铺* 微信:szzdzhp001* 省略部分代码**/ private def processPartitionModifications(topic: String): Unit = {    def restorePartitionReplicaAssignment(      topic: String,      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]    ): Unit = {          val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>      controllerContext.partitionReplicaAssignment(topicPartition).isEmpty    }
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) { } else if (partitionsToBeAdded.nonEmpty) { info(s"New partitions to be added $partitionsToBeAdded") partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) => controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas) } onNewPartitionCreation(partitionsToBeAdded.keySet) } } }
复制代码

这段代码省略了很多,我想让你看到的是:

只有新增了副本,才会执行更新 Controller 的内存操作。

那么我们在这里面新增一段逻辑


新增逻辑:如果只是变更了 AR 的顺序,那么我们也更新一下内存。


来我们改一下源码

 // 1. 找到 AR 顺序有变更的 所有TopicPartition    val partitionsOrderChange = partitionReplicaAssignment.filter { case (topicPartition, _) =>      //这里自己写下过滤逻辑 把只是顺序变更的分区找出      true    }     if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {      if (partitionsToBeAdded.nonEmpty) {       } else {       }    } else if (partitionsToBeAdded.nonEmpty) {      info(s"New partitions to be added $partitionsToBeAdded")      partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)      }      onNewPartitionCreation(partitionsToBeAdded.keySet)    }else if (partitionsOrderChange.nonEmpty) {      // ② .在这里加个逻辑      info(s"OrderChange partitions to be updatecache $partitionsToBeAdded")      partitionsOrderChange.foreach { case (topicPartition, assignedReplicas) =>        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)      }    }
复制代码

改成这样之后,上面的流程就变成了

  1. 从 zk 中获取/brokers/topics/{topic 名称}节点数据。

  2. 手动调整一下里面的顺序

  3. 将调整后的数据,重新覆盖掉之前的节点。

  4. 手动执行一次,优先副本选举。

思考

方案三改了之后会对其他的流程有影响吗?


上面更改的方法,一般是在分区副本重分配或者新增分区的时候会触发。


上面新增的逻辑并不会对现有流程有影响,因为假设都是上面的场景的情况下,他们都是会主动更新内存的。

在我看来,这里的改动,完全可以向 kafka 社区提一个 Pr. 来“修复”这个问题。


因为提了这个 PR,对我们有收益,没有额外的开销!


点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 4
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
教你3种Kafka的指定副本作为Leader的实现方式_Leader_华为云开发者社区_InfoQ写作平台