写点什么

Kafka 集群缩容实战

发布于: 2020 年 08 月 04 日
Kafka集群缩容实战

CDH 版本5.15.1,Kafka版本1.0.1



由于历史原因,我们有两套Kafka集群,老集群的规模对于它所承载的流量有些过剩,为了充分利用资源,需要对老集群进行缩容。

本文记录了Kafka集群缩容的过程,仅供参考。



缩容过程中用到的脚本已经上传到github:https://github.com/iamabug/kafka-shrink-scripts

公众号:大数据学徒

缩容思路

假设集群有15台机器,预计缩到10台机器,那么需要做5次缩容操作,每次将一个节点下线,那么现在问题就是如何正确、安全地从Kafka集群中移除一台broker?搞定这个之后,重复5次即可。



众所周知,一个broker下线,它上面的所有partition都会处于副本不足的状态,并且Kafka集群不会在其它的broker上生成这些副本,因此,在将一个broker从集群中移除之前,需要将这个broker上的partition副本都转移到最终会保留的10台机器上,怎么实现这个呢?Kafka自带的分区重分配工具



在集群数据量较大的情况下,分区的转移可能会花费较长时间,那么在转移过程中最好不要创建新topic,不然新的topic有可能又创建到要被移除的broker上,当然如果实在无法避免的话,可以再对新的topic进行一次额外的转移。



总结一下,从Kafka集群中移除一个broker的步骤如下:

  1. 在Cloudera Manager里获取所有broker的broker id,选择一个待移除的broker(一般来说选取主机名序号靠后的broker,因为很有可能用户使用的bootstrap.servers是靠前的几台broker,如果移除了前面的broker,有可能造成用户无法连接集群)

  2. 使用 kafka-reassign-partitions 脚本将待移除broker上的partition均匀地转移到最终会留在集群的broker上。

  3. 确认待移除broker上没有任何partition之后,在Cloudera Manager里对这个broker进行停止和删除。

其中重点是partition的转移或者说重分配。

分区重分配

分区重分配主要使用的是kafka-reassign-partitions脚本,参考Kafka官方文档的说明,这个脚本有三种模式:

  • --generate,给定topic列表和broker列表,生成一个备选的重分配方案,重分配方案是一个JSON;

  • --execute,根据一个JSON文件里的重分配方案,进行分区重分配;

  • --verify,根据集群当前partition的分布情况,验证和JSON文件提供的重分配方案是否一致;

根据这个脚本的使用方式,分区重分配包括以下几步。

获取broker id

在测试集群的CM中Kafka的配置页面里,在搜索栏里输入“id”,可以看到broker id和主机名的对应关系:

可以看到,共有5个broker,broker id分别为150,151,152,165,294,选择 294 作为被移除的broker。

获取topic列表及输出格式转换

使用 kafka-topics 脚本获取集群所有的topic:

$ kafka-topics --list --zookeeper ${ZK_HOSTS}

把输出的topic列表保存在 topics.txt 里,然后用python脚本把它处理成json格式:

import json
obj = {}
obj["version"] = 1
obj["topics"] = []
with open("topics.txt") as f:
for line in f.readlines():
topic = {"topic": line.strip()}
obj["topics"].append(topic)
with open("topics.json", "w") as f:
json.dump(obj, f, indent=2)

输出的json文件是 topics.json,其中的内容格式如下:

{
"version": 1,
"topics": [
{
"topic": "first"
},
{
"topic": "second"
},
...
]
}

获取当前partition分配方案

使用 kafka-reassign-partitions 脚本的 --generate 来获取当前的partition分配方案:

$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --topics-to-move-json-file topics.json --broker-list "150,151,152,165" --generate
# 注意 --broker-list 这个参数传入的值里面没有 294 这个broker id

这个命令会有非常多的输出,分为两部分,一部分是当前partition的分配情况,一部分是这个脚本生成的重分配方案:

Current partition replica assignment
{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [150, 151, 294]
},
...
]
}
Proposed partition reassignment configuration
{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [150, 151, 165]
},
...
]
}

可以看到,脚本生成的重分配方案里面,replicas里面是没有 294 这个broker id的。



注意:通过比较可以发现,脚本生成的重分配方案不光将原来在294上的partition转移到了其它broker上,即使和294无关的partition也会被调整,这样既增大了重分配的耗时,也带来不必要的leader切换,不是特别理想,在这种情况下,可以手动生成重分配方案。

生成重分配方案

我希望的重分配方案是:把partition从待移除的broker上转移到不会被移除的broker上,且不存在同一partition的两个副本在同一个broker上的情况即可。



将上一步输出的当前分配方案保存到为 current.json,然后使用python脚本进行处理:

import json
import random
BROKER_ID = 294
# 不会被移除的broker id列表
BROKER_IDS = [150, 151, 152, 165]
n = len(BROKER_IDS)
topics = {}
with open("current.json") as f:
topics = json.load(f)
# 遍历每个partition
for partition in topics["partitions"]:
# 遍历当前partition的每个副本
replicas = partition['replicas']
for i in range(len(replicas)):
# 当前副本需要转移
if replicas[i] == BROKER_ID:
# 随机生成一个broker id,确保新的副本不和当前partition的其他副本在同一个broker上
choice = random.randint(0, n-1)
while BROKER_IDS[choice] in replicas:
choice = random.randint(0, n-1)
#print("before: ", replicas)
replicas[i] = BROKER_IDS[choice]
#print("after: ", replicas)
with open("reassigned.json", "w") as f:
json.dump(topics, f, indent=2)

生成的重分配方案保存在 reassigned.json 中。

执行分区重分配

执行分区重分配命令:

$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --reassignment-json-file reassigned.json --execute

这个命令是在后台运行的,如果你安装了kafka-manager的话,可以在“Reassign Partitions”页面看到状态:



pending 表示还未完成。

参考时间:15个节点、160T总磁盘占用的Kafka集群对一个节点做分区重分配大约花费时间为6个小时。

注意事项

在分区重分配的过程中,因为partition的leader会发生切换,客户端有可能报 NotLeaderForPartition 的错,这个报错一般来说是可恢复的,但还是需要密切关注。

验证

可以通过三种方式进行验证:

  1. 执行验证命令,过滤包含successfully的输出:

$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --reassignment-json-file reassigned.json --verify | grep -v successsfully

在重分配完成之前会得到很多类似 Reassignment of partition first-0 is still in progress 的输出,完成之后执行的话就没有太多输出。

  1. 通过kafka-manager进行验证,既可以检查“Reassign Partitions”页面的Completed是否还显示pending,也可以通过brokers页面进入到待移除broker的页面,查看topic数和分区数:



topic数、分区数都是0,符合预期。

  1. 登录到待移除broker的机器上,查看kafka的数据目录下是否有分区目录:

# 假设kafka的数据目录为/data*/kafka/data
$ ls /data*/kafka/data
/data01/kafka/data:
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint
log-start-offset-checkpoint recovery-point-offset-checkpoint
/data02/kafka/data:
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint
log-start-offset-checkpoint recovery-point-offset-checkpoint
...

可以看到,kafka的数据目录下只剩下kafka自己的数据文件,分区重分配成功。

broker下线

确认待移除的broker上没有任何分区之后,在CM里,先停止broker,再删除broker,broker下线完成。

根据需要下线的broker数量,重复上面的操作,就可以实现多台broker的缩容。



欢迎批评指正沟通建议。



公众号:大数据学徒



发布于: 2020 年 08 月 04 日阅读数: 186
用户头像

活到秃学到秃 2019.01.08 加入

专注大数据运维开发

评论

发布
暂无评论
Kafka集群缩容实战