写点什么

云原生系列五:Kafka 集群数据迁移基于 Kubernetes 的内部

作者:叶秋学长
  • 2022 年 8 月 08 日
  • 本文字数:2731 字

    阅读完需:约 9 分钟


1.概述


Kafka 的使用场景非常广泛,一些实时流数据业务场景,均依赖 Kafka 来做数据分流。而在分布式应用场景中,数据迁移是一个比较常见的问题。关于 Kafka 集群数据如何迁移,今天叶秋学长将为大家详细介绍。

2.内容

本篇博客为大家介绍两种迁移场景,分别是同集群数据迁移、跨集群数据迁移。如下图所示:



点击并拖拽以移动

​编辑

 2.1 同集群迁移

同集群之间数据迁移,比如在已有的集群中新增了一个 Broker 节点,此时需要将原来集群中已有的 Topic 的数据迁移部分到新的集群中,缓解集群压力。

将新的节点添加到 Kafka 集群很简单,只需为它们分配一个唯一的 Broker ID,并在新服务器上启动 Kafka。但是,这些新服务器节点不会自动分配任何数据分区,因此除非将分区移动到新增的节点,否则在创建新 Topic 之前新节点不会执行任何操作。因此,通常在将新服务器节点添加到 Kafka 集群时,需要将一些现有数据迁移到这些新的节点。

迁移数据的过程是手动启动的,执行过程是完全自动化的。在 Kafka 后台服务中,Kafka 将添加新服务器作为其正在迁移的分区的 Follower,并允许新增节点完全复制该分区中的现有数据。当新服务器节点完全复制此分区的内容并加入同步副本(ISR)时,其中一个现有副本将删除其分区的数据。

Kafka 系统提供了一个分区重新分配工具(kafka-reassign-partitions.sh),该工具可用于在 Broker 之间迁移分区。理想情况下,将确保所有 Broker 的数据和分区均匀分配。分区重新分配工具无法自动分析 Kafka 群集中的数据分布并迁移分区以实现均匀的负载均衡。因此,管理员在操作的时候,必须弄清楚应该迁移哪些 Topic 或分区。

分区重新分配工具可以在 3 种互斥模式下运行:

  • --generate:在此模式下,给定 Topic 列表和 Broker 列表,该工具会生成候选重新​​分配,以将指定 Topic 的所有分区迁移到新 Broker 中。此选项仅提供了一种方便的方法,可在给定 Topic 和目标 Broker 列表的情况下生成分区重新分配计划。

  • --execute:在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用--reassignment-json-file 选项)。由管理员手动制定自定义重新分配计划,也可以使用--generate 选项提供。

  • --verify:在此模式下,该工具将验证最后一次--execute 期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。

2.1.1 迁移过程实现

分区重新分配工具可用于将一些 Topic 从当前的 Broker 节点中迁移到新添加的 Broker 中。这在扩展现有集群时通常很有用,因为将整个 Topic 移动到新的 Broker 变得更容易,而不是一次移动一个分区。当执行此操作时,用户需要提供已有的 Broker 节点的 Topic 列表,以及到新节点的 Broker 列表(源 Broker 到新 Broker 的映射关系)。然后,该工具在新的 Broker 中均匀分配给指定 Topic 列表的所有分区。在迁移过程中,Topic 的复制因子保持不变。

现有如下实例,将 Topic 为 ke01,ke02 的所有分区从 Broker1 中移动到新增的 Broker2 和 Broker3 中。由于该工具接受 Topic 的输入列表作为 JSON 文件,因此需要明确迁移的 Topic 并创建 json 文件,如下所示:

> cat topic-to-move.json{"topics": [{"topic": "ke01"},            {"topic": "ke02"}],"version":1}
复制代码

准备好 JSON 文件,然后使用分区重新分配工具生成候选分配,命令如下:

> bin/kafka-reassign-partitions.sh --zookeeper dn1:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
复制代码

执行命名之前,Topic(ke01、ke02)的分区如下图所示:


点击并拖拽以移动

编辑


点击并拖拽以移动

编辑

执行完成命令之后,控制台出现如下信息:


点击并拖拽以移动

编辑

该工具生成一个候选分配,将所有分区从 Topic ke01,ke02 移动到 Broker1 和 Broker2。需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它。新的赋值应保存在 JSON 文件(例如 expand-cluster-reassignment.json)中,以使用--execute 选项执行。JSON 文件如下:

{"version":1,"partitions":[{"topic":"ke02","partition":0,"replicas":[2]},{"topic":"ke02","partition":1,"replicas":[1]},{"topic":"ke02","partition":2,"replicas":[2]},{"topic":"ke01","partition":0,"replicas":[2]},{"topic":"ke01","partition":1,"replicas":[1]},{"topic":"ke01","partition":2,"replicas":[2]}]}
复制代码

执行命令如下所示:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
复制代码

最后,--verify 选项可与该工具一起使用,以检查分区重新分配的状态。需要注意的是,相同的 expand-cluster-reassignment.json(与--execute 选项一起使用)应与--verify 选项一起使用,执行命令如下:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
复制代码

执行结果如下图所示:


点击并拖拽以移动

编辑


 同时,我们可以通过Kafka Eagle工具来查看 Topic 的分区情况。


点击并拖拽以移动

编辑


点击并拖拽以移动

编辑

2.2 跨集群迁移

这里跨集群迁移,我们指的是在 Kafka 多个集群之间复制数据“镜像”的过程,以避免与单个集群中的节点之间发生的复制混淆。 Kafka 附带了一个用于在 Kafka 集群之间镜像数据的工具。该工具从源集群使用并生成到目标集群。这种镜像的一个常见用例是在另一个数据中心提供副本。

另外,你可以运行许多此类镜像进程以提高吞吐量和容错(如果一个进程终止,其他进程将占用额外负载)。将从源集群中的 Topic 读取数据,并将其写入目标集群中具有相同名称的主题。事实上,“镜像”数据只不过是一个 Kafka 将消费者和生产者联系在了一起。

源集群和目标集群是完全独立的实体,它们可以具有不同数量的分区,并且偏移量将不相同。出于这个原因,镜像集群并不是真正意图作为容错机制(因为消费者的位置会有所不同);为此,建议使用正常的集群内复制。但是,镜像进程将保留并使用消息 Key 进行分区,因此可以按 Key 保留顺序。

下面是一个跨集群的单 Topic 实例,命令如下:

> ./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist ke03
复制代码

需要注意的是,consumer.properties 文件配置源 Kafka 集群 Broker 地址,producer.properties 文件配置目标 Kafka 集群地址。如果需要迁移多个 Topic,可以使用 --whitelist 'A|B',如果需要迁移所有的 Topic,可以使用 --whitelist '*'。

3.结果预览

执行跨集群迁移命令后,目标集群中使用 Kafka Eagle 中查看 Topic Size 大小看是否与源集群的 Topic Size 大小相等,或者使用 SQL 语句,验证是否有数据迁移过来,结果如下图所示:


点击并拖拽以移动

编辑


4.总结

跨集群迁移数据的本质是,Kafka 启动了消费者读取源集群数据,并将消费后的数据写入到目标集群,在迁移的过程中,可以启动多个实例,提供迁出的吞吐量。

本期分享到此为止,关注博主不迷路,叶秋学长带你上高速

发布于: 刚刚阅读数: 3
用户头像

叶秋学长

关注

还未添加个人签名 2022.07.01 加入

全栈JAVA领域创作者

评论

发布
暂无评论
云原生系列五:Kafka 集群数据迁移基于Kubernetes的内部_kafka_叶秋学长_InfoQ写作社区