写点什么

大数据 -62 Kafka Topic 管理与运维实战:命令详解 + 副本分配策略 + JavaAPI

作者:武子康
  • 2025-08-07
    山东
  • 本文字数:3422 字

    阅读完需:约 11 分钟

大数据-62 Kafka Topic 管理与运维实战:命令详解 + 副本分配策略 + JavaAPI

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-30-新发布【1T 万亿】参数量大模型!Kimi‑K2 开源大模型解读与实践,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 08 月 04 日更新到:Java-89 深入浅出 MySQL 搞懂 MySQL Undo/Redo Log,彻底掌握事务回滚与持久化 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容,基本都是特性概念相关的:


  • Kafka 主题与分区

  • Kafka 自定义反序列化

  • Kafka 拦截器

  • Kafka 位移提交

  • Kafka 位移管理

  • Kafka 重平衡


kafka-topics

相关参数


创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic topic_test_1 --partitions 1 --replication-factor 1
复制代码

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --list
复制代码


kafak-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic topic_test_1
复制代码

修改主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic topic_test_1 --partitions 2 --replication-factor 1
复制代码

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic topic_test_1
复制代码

增加分区

通过命令行工具操作,只能够增加,不能够减少。


kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic topic_test_1 --partitions 1 --replication-factor 1
复制代码


kafka-topics.sh --zookeeper h121.wzk.icu:2181 --alter --topic topic_test_2 --partions 2
复制代码

分区副本分配策略详解

Kafka 的分区副本分配遵循三个核心原则,以确保集群的高可用性和数据可靠性:

副本分配目标

  1. Broker 间均衡分布

  2. 副本尽可能均匀地分布在所有可用的 Broker 上,避免单个 Broker 负载过重

  3. 示例:在 6 个 Broker 的集群中,每个分区的 3 个副本应该尽可能分散在 3 个不同的 Broker 上

  4. 跨 Broker 冗余

  5. 同一分区的多个副本必须分布在不同的 Broker 上

  6. 这样当某个 Broker 宕机时,该 Broker 上的所有主分区副本都可以从其他 Broker 上的副本中选举出新的主副本

  7. 跨机架容灾(可选)

  8. 如果配置了机架信息(rack.id),会尽量将同一分区的副本分散在不同机架

  9. 这可以防止单个机架故障导致数据不可用

  10. 示例:2 个副本在不同机架,第三个副本在第三个机架或随机分布

无机架信息的分配算法

当集群未配置机架信息时,采用以下分配逻辑:


  1. 第一个副本分配

  2. 从 Broker 列表中随机选择一个起始位置开始轮询

  3. 例如:Broker 列表为[1,2,3,4,5,6],随机选择起始位置 3,则分配顺序为 3→4→5→6→1→2

  4. 这种随机起始的轮询方式避免了每次都从固定位置开始导致的分配不均

  5. 后续副本分配

  6. 对第一个副本的 Broker ID 增加特定偏移量来确定其他副本位置

  7. 偏移量计算公式:(前一个副本Broker ID + 固定增量) % Broker总数

  8. 固定增量通常取质数(如 3),以确保良好的分布性

  9. 示例:第一个副本在 Broker3,Broker 总数 6,固定增量 3

  10. 第二个副本:(3+3)%6=0 → Broker6

  11. 第三个副本:(6+3)%6=3 → 回到 Broker3 时会跳过,选择下一个可用 Broker

  12. 分配调整

  13. 如果计算出的 Broker 已包含该分区的副本,则顺延选择下一个可用 Broker

  14. 确保同一分区的多个副本不会分配到同一个 Broker 上


这种分配策略在保证数据安全性的同时,也维持了集群的负载均衡,是 Kafka 高可用架构的重要基础。

必要参数

KafkaAdminClient

除了使用 Kafka 的 Bin 目录下的脚本来管理 Kafka 集群,开发者还可以通过编程方式调用 Kafka 提供的管理 API 将这些管理功能集成到自己的系统中,实现更灵活的集群管理方案。


在 Kafka 0.11 版本之前,主要可以通过以下两种方式实现编程化管理:


  1. 使用 kafka-core 包(Kafka 服务端实现,基于 Scala)中的 AdminClient 类

  2. 使用 AdminUtils 工具类这些方式可以完成基本的集群管理操作,如创建/删除 Topic、查看 Broker 状态等。


在 Kafka 0.11 版本之后,官方引入了一个新的管理客户端架构:


  1. 新增了位于 kafka-client 包下的 AdminClient 抽象类

  2. 其具体实现类为 KafkaAdminClient

  3. 这个新的 API 设计更加完善,提供了更丰富的管理功能,包括:

  4. 完整的 Topic 管理(创建/删除/修改配置)

  5. 分区管理(增加分区、重新分配分区)

  6. 配置管理(动态更新配置)

  7. ACL 权限管理

  8. 消费者组管理

  9. 副本管理


使用示例:


Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");try (AdminClient adminClient = AdminClient.create(props)) {    // 创建Topic    NewTopic newTopic = new NewTopic("my-topic", 3, (short)1);    CreateTopicsResult result = adminClient.createTopics(        Collections.singleton(newTopic));    // 等待创建完成    result.all().get();}
复制代码


新的 AdminClient API 相比之前的方案具有以下优势:


  1. 完全基于 Java 实现,不再依赖 Scala

  2. 异步非阻塞设计

  3. 更全面的管理功能覆盖

  4. 更好的错误处理和返回结果

  5. 与 Kafka 其他客户端 API 保持一致的编程风格


这种编程化的管理方式特别适合需要将 Kafka 管理功能集成到运维系统或自动化平台的场景,可以实现:


  • 自动化部署和配置管理

  • 动态资源调配

  • 监控告警系统集成

  • 自助式管理 Portal 等企业级功能

基本功能介绍

Kafka 提供了一系列基础管理功能,用于集群和主题的日常运维管理。以下是详细功能说明:

主题管理功能

  1. 创建主题 (createTopics)

  2. 可指定分区数、副本数、副本分配策略等参数

  3. 示例:创建名为"test_topic"的主题,设置 3 个分区和 2 个副本

  4. 支持批量创建多个主题

  5. 删除主题 (deleteTopics)

  6. 可指定立即删除或延迟删除

  7. 支持批量删除多个主题

  8. 删除前会自动检查主题是否正在被使用

  9. 列出所有主题 (listTopics)

  10. 返回集群中所有主题的列表

  11. 可配合正则表达式过滤主题名称

  12. 支持分页查询大量主题

  13. 查询主题详情 (describeTopics)

  14. 获取主题的完整配置信息

  15. 包括分区分布、副本位置、ISR 列表等

  16. 支持查询多个主题的详细信息

  17. 增加分区 (createPartitions)

  18. 动态扩展主题的分区数量

  19. 需指定新增分区数及可选的分区分配方案

  20. 注意:分区数只能增加不能减少

集群管理功能

  1. 查询集群信息 (describeCluster)

  2. 获取集群 ID、控制器节点等元数据

  3. 列出所有 Broker 节点及其状态

  4. 查看集群当前资源使用情况

  5. 配置管理

  6. 查询配置 (describeConfigs):获取 Broker、主题或客户端的配置详情

  7. 修改配置 (alterConfigs):动态更新配置参数

  8. 支持批量修改多个资源配置

  9. 可设置临时或永久配置变更

日志目录管理

  1. 修改副本日志目录 (alterReplicaLogDirs)

  2. 将分区副本迁移到指定的日志目录

  3. 支持在线迁移不影响可用性

  4. 可用于磁盘空间均衡或故障恢复

  5. 日志目录查询

  6. describeLogDirs:查询 Broker 节点上所有日志目录的使用情况

  7. describedReplicaLogDirs:获取特定副本的日志目录信息

应用场景示例

  • 容量规划时使用describeTopics分析分区分布

  • 磁盘扩容后使用alterReplicaLogDirs重新平衡存储

  • 业务高峰前用createPartitions扩展主题容量

  • 日常维护使用describeCluster监控集群健康状态


这些功能共同构成了 Kafka 基础管理 API,可通过 Kafka 命令行工具或 AdminClient API 调用。

操作步骤

  • 客户端根据方法的调用,创建出相应的协议请求,比如创建:Topic 的 createTopics 方法,其内部就是发送 CreateTopicRequest 请求。

  • 客户端发送请求到 KafkaBroker

  • KafkaBroker 处理相应的请求并回执,CreateTopicResponse,客户端接受并解析处理。


如果要自己实现一个功能,需要:


  • 自定义 XXOptions

  • 自定义 XXResult 返回值

  • 自定义 Call,挑选合适的 XXRequest 和 XXResponse

偏移量管理

在 Kafka 1.0.2 版本,在 __consumer_offsets 主题中保存各个消费组的偏移量。早期是子啊 ZooKeeper 中管理消费偏移量。

脚本参数

脚本名称为:bin/kafka-consumer-groups.sh




查看 GroupID 消费

查看当前有哪些 GroupID 正在进行消费,这里没有指定 Topic,查看的是所有 Topic 消费的 GroupId 的列表。


kafka-consumer-groups.sh --bootstrap-server h121.wzk.icu:9092 --list
复制代码

查看指定 ID 消费

kafka-consumer-groups.sh --bootstrap-server h121.wzk.icu:9092 --describe --group group 
复制代码

设置偏移量为最早

kafka-consumer-groups.sh --bootstrap-server h121.wzk.icu:9092 --reset-offsets --group group --to-earliest --topic test_demo
复制代码

设置偏移量为最新

kafka-consumer-groups.sh --bootstrap-server h121.wzk.icu:9092  --reset-offsets --group group --to-latest --topic test_demo
复制代码


发布于: 刚刚阅读数: 4
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-62 Kafka Topic 管理与运维实战:命令详解 + 副本分配策略 + JavaAPI_Java_武子康_InfoQ写作社区