写点什么

Apache Kafka 动态配置的原理与应用

  • 2022 年 9 月 20 日
    江苏
  • 本文字数:3551 字

    阅读完需:约 12 分钟

本文作者为中国移动云能力中心大数据团队软件开发工程师孙大鹏,本文从 Apache Kafka 动态参数功能的概念及分类出发,对动态配置的实现原理进行介绍,并提供了应用案例,供大家参考。

诞生背景

配置 Kafka 集群的参数,普通做法是,一次性在 server.properties 文件中配置好所有参数后,启动 Broker。但是在需要变更任何参数后,就必须要重启 Broker,这很不方便,所以在 1.1.0 版本中正是引入了动态 Broker 参数(Dynamic Broker Configs)。

动态参数的概念及分类

在配置文件 server.properties 中配置的参数称之为静态参数(Static Configs)。所谓动态,就是指修改参数值后,无需重启服务进程就能立即生效的参数。Kafka 的动态配置,是借助 zookeeper 来实现的。Kafka 将动态参数保存在 Zookeeper 的/config 下,并根据不同的维度写入到不同的路径。动态配置参数,可以分为如下四个维度:


brokers

brokers 级动态参数配置,相关内容记录在 zk 的/config/brokers 下,用来保存 broker 级别参数。其下又可以分为两类:针对集群所有 broker 节点(cluster-wide)和指定某一 broker(per-broker)。

•per-broker:被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。记录在/config/brokers/${Broker.id}路径下

•cluster-wide:被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效。记录在 zk 中的/config/brokers/default 下

topics

topic 维度的动态参数,保存在/config/topics 路径下,再以 topic 的名字作为子路径存储 topic 级的相关配置。

clients

client 维度的动态参数,保存在/config/clients 路径下,再以 client_id 作为子路径存储 client 的配额信息,所谓配额是指限制连入集群的客户端的吞吐量或是限定他们的使用 CPU 资源。

users

users 维度的动态参数,保存在/config/users 路径下,再以 user_id 作为子路径存储 user 的配额信息,即限制连入集群的用户的吞吐量或是限定他们的使用 CPU 资源。

参数生效优先级

参数的生效优先级别如图所示,环越小优先级越高,整体符合如下规则:



•动态的高于静态的

•作用范围小的高于作用范围大的

•topic 的高于 broker 的在/config 下除了以上四个 znode 外还会存在一个 changes 节点,用来实时监控动态参数变更的,不会保存参数值,具体用途参加下面实现原理部分。

实现原理

broker 启动时加载配置

broker 启动时初始化 broker 级配置

启动 kafka broker 时,在 Kafka 主类中调用 config.dynamicConfig.initialize(zkClient)方法,首先通过读取启动命令中指定的静态配置文件,来创建一个初始的配置类对象 currentConfig,然后根据 zkClient 读取 zk 中 broker 级的 cluster-wide 配置(/config/brokers/<default>)更新配置 updateDefaultConfig;再读取 zk 中 per-broker 的配置 /config/brokers/{当前 BrokerId},根据指定 broker 的配置覆盖原有的配置,通过 updateBrokerConfig 再次更新内存中的 currentConfig。

broker 启动时遍历加载所有历史的动态配置

前面概念部分介绍过,动态配置可以在四个维度进行配置,针对这四个维度分别创建对应的 ConfigHandler,统一添加到 DynamicConfigManager,在启动 DynamicConfigManager 时,会遍历 zk 中动态配置 /config 下的对应四种维度的 z 所有 znode 及其子节点(brokers/clients/topics/users),根据其中记录的内容更新内存中的配置,并维护一个 lastExecutedChange,新的变化可以通过和 lastExecutedChange 对比来确认是否已经处理过。至此 broker 在启动时就可以加载到静态及全部的动态配置。

内存中配置值的更新

broker 启动时会遍历所有维度的动态配置并调用 processConfigChanges()方法去更新内存中的配置,这里对应于四种维度,processConfigChanges()有四种实现:

1.BrokerConfigHandler.processConfigChanges(brokerId: String, properties: Properties)2.TopicConfigHandler.processConfigChanges(topic: String, topicConfig: Properties)3.UserConfigHandler.processConfigChanges(quotaEntityPath: String, config: Properties)

4.ClientIdConfigHandler.processConfigChanges(sanitizedClientId: String, clientConfig: Properties)

客户端触发动态配置

客户端可以通过 Kafka 自带的 Kafka-configs 脚本通过命令行的方式发起动态配置参数的请求,根据传入的 entity-type 和 entity-name 等信息,从 zk 中找到对应的 znode,读取现有的配置,根据现有配置及传入的变更需求,进行相关参数的校验及配置参数的格式化,并将更改后的参数配置写回到响应的 znode 中,最终在/config/change 下创建一个新的以 config_change_ 为前缀的 znode,通过此 znode 可以索引到变更配置的详细内容。

broker 感知动态配置

kafka 动态配置的实现是基于 zk 的 watcher 实现的,前面介绍过,zk 中分别用/config/brokers、/config/clients、/config/topics、/config/users 来保存不同维度的动态参数配置,为了减少 broker 监听的 znode,kafka 在/config 下创建了另一个/config/changes,用来记录具体是哪个配置发生了变更,这样 broker 只需要监听/config/changes 即可。broker 在启动时通过 ZkNodeChangeNotificationListener 对/config/change 注册了一个子节点变化监听器。监听/config/changes 下所有子节点的变化,/config/changes 的子节点是有统一前缀(config_change_)的有序 znode,当检测到/config/changes 下有新增的 znode 时,就会校验上次处理过的 znode 的编号,确认有新的 znode 出现,就会读取该新添加的 znode 并根据其内容所引到真实变化的配置的 znode 节点,读取其最新的内容并调用通过对应的 processConfigChanges()去更新内存中的配置。

流程总结



常用动态参数及应对场景

应对场景

1.动态调整 Broker 端各种线程池大小,实时应对突发流量,相关参数如 num.io.threads、num.network.threads 等

2.动态修改日志留存时间/大小等,实时应对磁盘使用率,相关参数如 log.retention.bytes、log.retention.ms 等

3.与 SSL 相关的参数(灵活控制 SSL 证书有效期提升安全性),相关参数如 ssl.keystore.type、ssl.keystore.location 等 4.客户端/用户配额等,相关参数如 consumer_byte_rate、producer_byte_rate 等

实际应用案例

一个生产环境,发现磁盘使用率达到了 75%,查看磁盘数据,发现 __consumer_offset 的日志占用了 70%,检查 zk 中 __consumer_offset 的配置:

[zk: localhost:2181(CONNECTED) 8] get /kafka/config/topics/__consumer_offsets{  "version": 1,  "config": {    "segment.bytes": "104857600",    "cleanup.policy": "compact"  }}
复制代码

从上面的结果中可以看到 __consumer_offset 的日志保留策略为 compact,再结合静态配置文件 server.properties 中 log.cleaner.enable=false,可以确认 __consumer_offset 的日志一直不会被回收。对此存在两种方案:

1.更改静态配置 log.cleaner.enable=true,重启集群所有 broker

2.更改 __consumer_offset 的日志保存策略为 delete,通过 retention.bytes 和 retention.ms 控制对应日志的保留时间及大小

为了减少对客户的影响,采用第二种方案,动态配置 __consumer_offset 的保留策略为 delete,在客户侧无感知的情况下,合理的回收了磁盘空间。

./bin/kafka-configs.sh --zookeeper wj-test-003:2181/kafka --entity-type topics --entity-name __consumer_offsets --alter --add-config retention.ms=604800000./bin/kafka-configs.sh --zookeeper wj-test-003:2181/kafka --entity-type topics --entity-name __consumer_offsets --alter --add-config cleanup.policy=delete
复制代码

变更后,观察 zk 中相关 znode 的内容如下:


[zk: localhost:2181(CONNECTED) 9] get /kafka/config/changes/config_change_0000000189{ "version": 2, "entity_path": "topics/__consumer_offsets"}
[zk: localhost:2181(CONNECTED) 10] get /kafka/config/topics/__consumer_offsets{ "version": 1, "config": { "segment.bytes": "104857600", "retention.ms": "604800000", "cleanup.policy": "delete" }}
复制代码

配置完成后,可以看到,/config/changes/config_change_0000000189 记录了本次变化涉及到的的是 topic 维度的 __consumer_offset,再查看/config/topics/__consumer_offsets,可以获取到变更后的配置完成内容,同时查看磁盘路径,超过 7 天的日志文件,很快就被回收,释放了对应的磁盘空间,并且对用户侧完全没有影响。

参考链接

https://blog.51cto.com/szzdzhp/5301872

https://www.jianshu.com/p/9522bcf66c4b

https://time.geekbang.org/column/article/113504

https://www.cnblogs.com/lizherui/p/12271285.html

用户头像

移动云,5G时代你身边的智慧云 2019.02.13 加入

移动云大数据产品团队,在移动云上提供云原生大数据分析LakeHouse,消息队列Kafka/Pulsar,云数据库HBase,弹性MapReduce,数据集成与治理等PaaS服务。 微信公众号:人人都学大数据

评论

发布
暂无评论
Apache Kafka 动态配置的原理与应用_移动云大数据_InfoQ写作社区