写点什么

Kafka 常用工具脚本总结

用户头像
Se7en
关注
发布于: 5 小时前
Kafka 常用工具脚本总结

系统

启动 Kafka

-daemon 参数可以让 Kafka 在后台运行。


kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码

指定 JMX 端口启动

JMX 的全称为 Java Management Extensions。 顾名思义,是管理 Java 的一种扩展,通过 JMX 可以方便我们监控 Kafka 的内存,线程,CPU 的使用情况,以及生产和消费消息的指标。


JMX_PORT=9999 kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
复制代码

停止 Kafka

kafka-server-stop.sh 
复制代码

Topic

创建 Topic

kafka-topics.sh --create  --bootstrap-server 10.37.62.20:9092 --replication-factor 3 --partitions 3 --topic <topic-name>
复制代码

列出所有 Topic

kafka-topics.sh  --bootstrap-server 10.37.62.20:9092 --list
复制代码

查看指定 Topic

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --describe --topic <topic-name>
复制代码

删除指定 Topic

kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic <topic-name>
复制代码

扩展 Topic 的 Partition 数量

artition 数量只能扩大不能缩小。


kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --topic app --alter --partitions 30
复制代码

扩展 topic 每个 partition 的副本数量

replication factor 可以扩大也可以缩小,最多不能超过 broker 数量。先创建一个文件名为 increace-factor.json,这里要扩展的是 mysql-audit-log 这个 topic 的 partition 到 15 个:0,1,2 为 broker id。


{"version":1,"partitions":[{"topic":"mysql-audit-log","partition":0,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":1,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":2,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":3,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":4,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":5,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":6,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":7,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":8,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":9,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":10,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":11,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":12,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":13,"replicas":[0,1,2]},{"topic":"mysql-audit-log","partition":14,"replicas":[0,1,2]}]}
复制代码


kafka-reassign-partitions.sh --zookeeper 10.37.62.20:2181 --reassignment-json-file  increace-factor.json --execute 
复制代码

查看 Topic 数据大小

#方法一kafka-log-dirs.sh \  --bootstrap-server 192.168.1.87:9092 \  --topic-list mytopic \  --describe \  | grep -oP '(?<=size":)\d+'  \  | awk '{ sum += $1 } END { print sum }'  #返回结果,单位 Byte648
#方法二,需要安装 jqkafka-log-dirs.sh \ --bootstrap-server 192.168.1.87:9092 \ --topic-list mytopic \ --describe \ | grep '^{' \ | jq '[ ..|.size? | numbers ] | add'
#返回结果,单位 Byte648
复制代码

消费者组 Consumer Group

列出所有的 Consumer Group

 kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --list
复制代码

查看指定 Consumer Group 详情

  • GROUP:消费者 group

  • TOPIC:话题 id

  • PARTITION:分区 id

  • CURRENT-OFFSET:当前已消费的条数

  • LOG-END-OFFSET:总条数

  • LAG:未消费的条数

  • CONSUMER-ID:消费者 id

  • HOST:消费者 ip 地址

  • CLIENT-ID:客户端 id


#这里查看的是 logstash_mysql 这个消费者 group 的消费情况kafka-consumer-groups.sh --bootstrap-server 10.37.62.20:9092 --describe --group logstash_mysql
#返回结果GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDlogstash_mysql mysql-audit-log 11 1312115 1312857 742 logstash-5-0545a8a7-f7bd-430c-b619-7a2b206addd2 /10.37.62.24 logstash-5logstash_mysql mysql-audit-log 1 1312593 1313345 752 logstash-0-d86bd51a-d010-45de-aa6f-f6da8542b779 /10.37.62.23 logstash-0logstash_mysql mysql-audit-log 2 1309548 1310317 769 logstash-1-496340ea-935d-444d-a184-51d42e225054 /10.37.62.24 logstash-1logstash_mysql mysql-audit-log 12 1313083 1313194 111 logstash-6-806b20cb-a9af-49c1-b37d-ccb33a646ab2 /10.37.62.24 logstash-6logstash_mysql mysql-audit-log 6 1310984 1311192 208 logstash-13-8d474bf6-e8d0-4b8a-b319-cf5e2e6cc078 /10.37.62.24 logstash-13logstash_mysql mysql-audit-log 9 1312998 1313768 770 logstash-3-29863fb0-6708-4fb1-9e28-bd81c30ce8ef /10.37.62.24 logstash-3logstash_mysql mysql-audit-log 4 1315150 1315276 126 logstash-11-6d66a188-85b7-476b-bd89-5423ef48cd01 /10.37.62.24 logstash-11logstash_mysql mysql-audit-log 0 22770935522 22770935650 128 logstash-0-7be475d6-a49e-4ff9-bf83-6b83f6067306 /10.37.62.24 logstash-0logstash_mysql mysql-audit-log 8 1309956 1310103 147 logstash-2-3c313c6f-8c98-4333-8bad-2f9696457d7d /10.37.62.24 logstash-2logstash_mysql mysql-audit-log 13 1314659 1314775 116 logstash-7-e98fd14e-e7f6-45e5-8ccf-2442058f0bc9 /10.37.62.24 logstash-7logstash_mysql mysql-audit-log 14 1313145 1313250 105 logstash-8-2c3345a8-f8f1-4f08-a18e-333dff2f0d65 /10.37.62.24 logstash-8logstash_mysql mysql-audit-log 5 1314037 1314297 260 logstash-12-ce018227-9e59-4137-a23f-5ccc0c7d4f6a /10.37.62.24 logstash-12logstash_mysql mysql-audit-log 10 1312883 1312962 79 logstash-4-9eb84ae4-3351-4083-9b1f-288910a6c3b8 /10.37.62.24 logstash-4logstash_mysql mysql-audit-log 7 1312476 1313200 724 logstash-14-680c982e-5cf3-406b-810a-4d5c96b5bdee /10.37.62.24 logstash-14logstash_mysql mysql-audit-log 3 1313227 1313328 101 logstash-10-e212dc18-a2bb-42d9-9d0b-095a93841efc /10.37.62.24 logstash-10
复制代码

删除指定 Consumer Group

 kafka-topics.sh --bootstrap-server 10.37.62.20:9092 --delete --topic pgw-nginx
复制代码

消息

生产消息

普通生产消息

kafka-console-producer.sh --broker-list 11.8.36.125:9092 --topic mytopic>this is my topic
复制代码

生产消息指定 Key

key.separator=, 指定以逗号作为 key 和 value 的分隔符。


kafka-console-producer.sh --broker-list kafka1:9092 --topic cr7-topic --property parse.key=true --property key.separator=,
>mykey,{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
复制代码

消费消息

从头开始消费

从头开始消费是可以消费到之前的消息的,通过 --from-beginning 指定:


kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic --from-beginningthis is my topic
复制代码

从尾部开始消费

--offset latest 指定从尾部开始消费,另外还需要指定 partition,可以指定多个:


kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic  --offset latest  --partition 0 1 2 
复制代码

消费指定条数的消息

--max-messages指定取的个数:


kafka-console-consumer.sh --bootstrap-server 11.8.36.125:9092 --topic mytopic  --offset latest  --partition 0 1 2 --max-messages 2bobo1111Processed a total of 2 messages
复制代码

指定消费组进行消费

--consumer-property group.id=<消费者组名>执行消费者组进行消费:


kafka-console-consumer.sh --bootstrap-server  kafka1:9092 --topic test_partition --consumer-property  group.id=test_group --from-beginning 
复制代码

查看消息具体内容

 kafka-dump-log.sh --files cr7-topic-0/00000000000000000000.log  -deep-iteration --print-data-log  #输出结果| offset: 1080 CreateTime: 1615020877664 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 1 payload: {"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}| offset: 1081 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 5 payload: {"orderAmount":1000,"orderId":5,"productId":105,"productNum":5}| offset: 1082 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 7 payload: {"orderAmount":1000,"orderId":7,"productId":107,"productNum":7}| offset: 1083 CreateTime: 1615020877677 keysize: 1 valuesize: 63 sequence: -1 headerKeys: [] key: 8 payload: {"orderAmount":1000,"orderId":8,"productId":108,"productNum":8}| offset: 1084 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 11 payload: {"orderAmount":1000,"orderId":11,"productId":111,"productNum":11}| offset: 1085 CreateTime: 1615020877677 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 15 payload: {"orderAmount":1000,"orderId":15,"productId":115,"productNum":15}| offset: 1086 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 17 payload: {"orderAmount":1000,"orderId":17,"productId":117,"productNum":17}| offset: 1087 CreateTime: 1615020877678 keysize: 2 valuesize: 65 sequence: -1 headerKeys: [] key: 21 payload: {"orderAmount":1000,"orderId":21,"productId":121,"productNum":21}
复制代码

查看 Topic 中当前消息总数

Kafka 自带的命令没有直接提供这样的功能,要使用 Kafka 提供的工具类 GetOffsetShell 来计算给定 Topic 每个分区当前最早位移和最新位移,差值就是每个分区的当前的消息总数,将该 Topic 所有分区的消息总数累加就能得到该 Topic 总的消息数。


首先查询 Topic 中每个分区 offset 的最小值(起始位置),使用 --time -2 参数。一个分区的起始位置并不是每时每刻都为 0 ,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加。


kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 -topic test-topic  --time -2
#前面是分区号,后面是 offsettest-topic:0:0test-topic:1:0
复制代码


然后使用 --time -1 参数查询 Topic 各个分区的 offset 的最大值。


kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic test-topic
#输出结果test-topic:0:5500000test-topic:1:5500000
复制代码


对于本例来说,test-topic 中当前总的消息数为 (5500000 - 0) + (5500000 - 0),等于 1100 万条。如果只是要获取 Topic 中总的消息数(包括已经从 Kafka 删除的消息),那么只需要将 Topic 中每个 Partition 的 Offset 累加即可。

Offset

重置消费者 Offset

#查看消费者组消费情况#目前的 0 分区 CURRENT-OFFSET 是 4,2 分区 CURRENT-OFFSET 是 6kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#返回结果Consumer group 'my-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmy-consumer-group transaction-topic-msg 2 6 6 0 - - -my-consumer-group transaction-topic-msg 1 0 0 0 - - -my-consumer-group transaction-topic-msg 0 4 4 0 - - - -
#重置消费者组 offset 为 3,重置是所有分区一起重置kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group my-consumer-group --reset-offsets --execute --to-offset 3 --topic transaction-topic-msg
#返回结果[2021-06-25 10:44:51,848] WARN New offset (3) is higher than latest offset for topic partition transaction-topic-msg-1. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$)
GROUP TOPIC PARTITION NEW-OFFSET my-consumer-group transaction-topic-msg 0 3 my-consumer-group transaction-topic-msg 1 0 my-consumer-group transaction-topic-msg 2 3
#可以看到 0 分区和 2 分区的 CURRENT-OFFSET 都变为 3 了kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group my-consumer-group
#返回结果Consumer group 'my-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDmy-consumer-group transaction-topic-msg 2 3 6 3 - - -my-consumer-group transaction-topic-msg 1 0 0 0 - - -my-consumer-group transaction-topic-msg 0 3 4 1 - - -
#可以重新消费到之前的数据kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic transaction-topic-msg --group my-consumer-group
#返回结果message-111111message-333333
复制代码

性能测试

生产者性能测试

  • --num-records 10000000: 向指定主题发送了 1 千万条消息。

  • --record-size 1024: 每条消息的大小为 1024KB。

  • --throughput -1: 不限制吞吐量。

  • --producer-props: 指定生产者参数。

  • acks=-1: 这要求 ISR 列表里跟 leader 保持同步的那些 follower 都要把消息同步过去,才能认为这条消息是写入成功。

  • linger.ms=2000: batch.size 和 linger.ms 是对 kafka producer 性能影响比较大的两个参数。batch.size 是 producer 批量发送的基本单位,默认是 16384Bytes,即 16kB;lingger.ms 是 sender 线程在检查 batch 是否 ready 时候,判断有没有过期的参数,默认大小是 0ms。

  • compression.type=lz4: 使用 lz4 压缩算法。


[root@kafka1 ~]# kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka1:9092 acks=-1 linger.ms=2000 compression.type=lz4
#输出结果705600 records sent, 141063.6 records/sec (137.76 MB/sec), 54.8 ms avg latency, 557.0 ms max latency.1204178 records sent, 240739.3 records/sec (235.10 MB/sec), 44.1 ms avg latency, 402.0 ms max latency.1370938 records sent, 274187.6 records/sec (267.76 MB/sec), 27.9 ms avg latency, 311.0 ms max latency.1464605 records sent, 292628.4 records/sec (285.77 MB/sec), 19.2 ms avg latency, 139.0 ms max latency.1477239 records sent, 295447.8 records/sec (288.52 MB/sec), 31.8 ms avg latency, 290.0 ms max latency.1446682 records sent, 289336.4 records/sec (282.56 MB/sec), 26.4 ms avg latency, 281.0 ms max latency.1555098 records sent, 311019.6 records/sec (303.73 MB/sec), 37.6 ms avg latency, 344.0 ms max latency.10000000 records sent, 263894.020162 records/sec (257.71 MB/sec), 32.60 ms avg latency, 557.00 ms max latency, 12 ms 50th, 140 ms 95th, 262 ms 99th, 396 ms 99.9th.
复制代码


我们应该关心延时的概率分布情况,仅仅知道一个平均值是没有意义的。这就是这里计算分位数的原因。通常我们关注到 99th 分位就可以了。比如在上面的输出中,99th 值是 262 ms,这表明测试生产者生产的消息中,有 99% 消息的延时都在 262 ms 以内。你完全可以把这个数据当作这个生产者对外承诺的 SLA。

消费者性能测试

[root@kafka1 ~]# kafka-consumer-perf-test.sh --broker-list kafka1:9092 --messages 10000000 --topic test_producer_perf
#输出结果start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec2021-03-09 10:34:18:447, 2021-03-09 10:34:33:948, 9765.6250, 629.9997, 10000000, 645119.6697, 1615257259068, -1615257243567, -0.0000, -0.0062
复制代码


虽然输出格式有所差别,但该脚本也会打印出消费者的吞吐量数据。比如本例中的 629.9997MB/s。有点令人遗憾的是,它没有计算不同分位数下的分布情况。因此,在实际使用过程中,这个脚本的使用率要比生产者性能测试脚本的使用率低。

修改动态参数

查看支持的动态参数

如果你想要知道动态 Broker 参数都有哪些,一种方式是在 Kafka 官网中查看 Broker 端参数列表,另一种方式是直接运行无参数的 kafka-configs 脚本,该脚本的说明文档会告诉你当前动态 Broker 参数都有哪些。


[root@kafka1 ~]# kafka-configs.sh This tool helps to manipulate and describe entity config for a topic, client, user or brokerOption                                 Description                            ------                                 -----------                            --add-config <String>                  Key Value pairs of configs to add.                                              Square brackets can be used to group                                          values which contain commas: 'k1=v1,                                          k2=[v1,v2,v2],k3=v3'. The following                                           #Topic 动态参数                                         is a list of valid configurations:                                            For entity-type 'topics':                                                    cleanup.policy                                                                compression.type                                                              delete.retention.ms                                                           file.delete.delay.ms                                                          flush.messages                                                                flush.ms                                                                      follower.replication.throttled.                                                replicas                                                                     index.interval.bytes                                                          leader.replication.throttled.replicas                                         max.compaction.lag.ms                                                         max.message.bytes                                                             message.downconversion.enable                                                 message.format.version                                                        message.timestamp.difference.max.ms                                           message.timestamp.type                                                        min.cleanable.dirty.ratio                                                     min.compaction.lag.ms                                                         min.insync.replicas                                                           preallocate                                                                   retention.bytes                                                               retention.ms                                                                  segment.bytes                                                                 segment.index.bytes                                                           segment.jitter.ms                                                             segment.ms                                                                    unclean.leader.election.enable                                                #Broker 动态参数                                       For entity-type 'brokers':                                                     advertised.listeners                                                          background.threads                                                            compression.type                                                              follower.replication.throttled.rate                                           leader.replication.throttled.rate                                             listener.security.protocol.map                                                listeners                                                                     log.cleaner.backoff.ms                                                        log.cleaner.dedupe.buffer.size                                                log.cleaner.delete.retention.ms                                               log.cleaner.io.buffer.load.factor                                             log.cleaner.io.buffer.size                                                    log.cleaner.io.max.bytes.per.second                                           log.cleaner.max.compaction.lag.ms                                             log.cleaner.min.cleanable.ratio                                               log.cleaner.min.compaction.lag.ms                                             log.cleaner.threads                                                           log.cleanup.policy                                                            log.flush.interval.messages                                                   log.flush.interval.ms                                                         log.index.interval.bytes                                                      log.index.size.max.bytes                                                      log.message.downconversion.enable                                             log.message.timestamp.difference.max.                                          ms                                                                           log.message.timestamp.type                                                    log.preallocate                                                               log.retention.bytes                                                           log.retention.ms                                                              log.roll.jitter.ms                                                            log.roll.ms                                                                   log.segment.bytes                                                             log.segment.delete.delay.ms                                                   max.connection.creation.rate                                                  max.connections                                                               max.connections.per.ip                                                        max.connections.per.ip.overrides                                              message.max.bytes                                                             metric.reporters                                                              min.insync.replicas                                                           num.io.threads                                                                num.network.threads                                                           num.recovery.threads.per.data.dir                                             num.replica.fetchers                                                          principal.builder.class                                                       replica.alter.log.dirs.io.max.bytes.                                           per.second                                                                   sasl.enabled.mechanisms                                                       sasl.jaas.config                                                              sasl.kerberos.kinit.cmd                                                       sasl.kerberos.min.time.before.relogin                                         sasl.kerberos.principal.to.local.rules                                        sasl.kerberos.service.name                                                    sasl.kerberos.ticket.renew.jitter                                             sasl.kerberos.ticket.renew.window.                                             factor                                                                       sasl.login.refresh.buffer.seconds                                             sasl.login.refresh.min.period.seconds                                         sasl.login.refresh.window.factor                                              sasl.login.refresh.window.jitter                                              sasl.mechanism.inter.broker.protocol                                          ssl.cipher.suites                                                             ssl.client.auth                                                               ssl.enabled.protocols                                                         ssl.endpoint.identification.algorithm                                         ssl.engine.factory.class                                                      ssl.key.password                                                              ssl.keymanager.algorithm                                                      ssl.keystore.certificate.chain                                                ssl.keystore.key                                                              ssl.keystore.location                                                         ssl.keystore.password                                                         ssl.keystore.type                                                             ssl.protocol                                                                  ssl.provider                                                                  ssl.secure.random.implementation                                              ssl.trustmanager.algorithm                                                    ssl.truststore.certificates                                                   ssl.truststore.location                                                       ssl.truststore.password                                                       ssl.truststore.type                                                           unclean.leader.election.enable                                               For entity-type 'users':                                                       SCRAM-SHA-256                                                                 SCRAM-SHA-512                                                                 consumer_byte_rate                                                            controller_mutation_rate                                                      producer_byte_rate                                                            request_percentage                                                           For entity-type 'clients':                                                     consumer_byte_rate                                                            controller_mutation_rate                                                      producer_byte_rate                                                            request_percentage                                                           Entity types 'users' and 'clients' may                                          be specified together to update                                               config for clients of a specific                                              user.   
复制代码

修改 Broker 动态参数

修改动态参数无需重启 Broker,动态 Broker 参数的使用场景非常广泛,通常包括但不限于以下几种:


  • 动态调整 Broker 端各种线程池大小,实时应对突发流量。

  • 动态调整 Broker 端连接信息或安全配置信息。

  • 动态更新 SSL Keystore 有效期。

  • 动态调整 Broker 端 Compact 操作性能。

  • 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。


Kafka Broker Config 的参数有以下 3 种类型:


  • read-only:被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。

  • per-broker:被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。

  • cluster-wide:被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。


在集群层面设置全局值,即设置 cluster-wide 范围值,将 unclean.leader.election.enable 参数在集群层面设置为 true。


kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \--entity-type brokers --entity-default --alter \--add-config unclean.leader.election.enable=true
#返回结果Completed updating default config for brokers in the cluster.
复制代码


如果要设置 cluster-wide 范围的动态参数,需要显式指定 entity-default。现在,我们使用下面的命令来查看一下刚才的配置是否成功。


kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \--entity-type brokers --entity-default --describe
#返回结果Default configs for brokers in the cluster are: unclean.leader.election.enable=true sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true}
复制代码


在 Zookeeper 上查看 /config/brokers/<default> 节点可以查看 cluster-wide 的动态参数设置。


[zk: (CONNECTED) ] > get /config/brokers/<default>{"version":1,"config":{"unclean.leader.election.enable":"true"}}cZxid = 17179869570ctime = 1631246402937mZxid = 17179869570mtime = 1631246402937pZxid = 17179869570cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0dataLength = 64numChildren = 0
复制代码


设置 per-broker 范围参数。我们还是以 unclean.leader.election.enable 参数为例,我现在为 ID 为 1 的 Broker 设置一个不同的值。命令如下:


kafka-configs.sh --bootstrap-server  10.37.249.58:9092 --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
#返回结果Completed updating config for broker 1.
复制代码


我们使用下列命令查看 Broker ID 为 1 的节点动态参数,可以看到 DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false,表示我们刚才对 per-broker 参数的调整生效了。


kafka-configs.sh --bootstrap-server 10.37.249.58:9092 --entity-type brokers --entity-name 1 --describe
#返回结果Dynamic configs for broker 1 are: unclean.leader.election.enable=false sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:unclean.leader.election.enable=false, DYNAMIC_DEFAULT_BROKER_CONFIG:unclean.leader.election.enable=true, STATIC_BROKER_CONFIG:unclean.leader.election.enable=false, DEFAULT_CONFIG:unclean.leader.election.enable=false}
复制代码


在 Zookeeper 上查看 /config/brokers/1 节点可以查看 Broker ID 为 1 的节点的动态参数设置。


[zk: (CONNECTED) ] > get /config/brokers/1{"version":1,"config":{"unclean.leader.election.enable":"false"}}cZxid = 17179869574ctime = 1631246495120mZxid = 17179869574mtime = 1631246495120pZxid = 17179869574cversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0dataLength = 65numChildren = 0[zk: (CONNECTED) ] > get /config/brokers/<default>[zk: (CONNECTED) ] > get /config/brokers/1
复制代码


删除 cluster-wide 范围动态参数。


kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \--entity-type brokers --entity-default --alter \--delete-config unclean.leader.election.enable
#返回结果Completed updating default config for brokers in the cluster.
复制代码


删除 per-broker 范围参数。


kafka-configs.sh --bootstrap-server 10.37.249.58:9092 \--entity-type brokers --entity-name 1 --alter \--delete-config unclean.leader.election.enable
#返回结果Completed updating config for broker 1.
复制代码

修改 Topic 动态参数

设置 Topic test-topic 的 retention.ms 为 10000。


kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \--entity-type topics --entity-name test-topic --alter \--add-config retention.ms=10000
复制代码


查看设置的 Topic 动态参数。


kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \--entity-type topics --entity-name test-topic --describe
#返回结果Dynamic configs for topic test-topic are: retention.ms=10000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000}
复制代码


在 Zookeeper 上可以查看 /config/topics/<topic-name> 来查看 Topic 动态参数。


[zk: (CONNECTED) ] > get /config/topics/test-topic{"version":1,"config":{"retention.ms":"10000"}}cZxid = 17179869460ctime = 1631245744105mZxid = 17179869619mtime = 1631250116481pZxid = 17179869460cversion = 0dataVersion = 10aclVersion = 0ephemeralOwner = 0dataLength = 47numChildren = 0[zk: (CONNECTED) ] > get /config/topics/test-topic
复制代码


删除 Topic 动态参数。


kafka-configs.sh --bootstrap-server  10.37.249.58:9092 \--entity-type topics --entity-name test-topic --alter \--delete-config retention.ms
复制代码

Kafka 集群一键启动/停止脚本

环境变量设置:


#/etc/profile 文件export KAFKA_HOME=/usr/local/kafkaexport PATH=$PATH:$KAFKA_HOME/bin
复制代码


一键启动/停止脚本,查看状态需要安装 jps 工具。


#! /bin/bash# 填写 Kafka Broker 节点地址hosts=(kafka1 kafka2 kafka3)
# 打印启动分布式脚本信息mill=`date "+%N"`tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 执行分布式开启命令function start(){ for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" & sleep 1 done}
# 执行分布式关闭命令function stop(){ for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" & sleep 1 done}
# 查看 Kafka Broker 节点状态function status(){ for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh root@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" & sleep 1 done}
# 判断输入的 Kafka 命令参数是否有效case "$1" in start) start ;; stop) stop ;; status) status ;; *) echo "Usage: $0 {start|stop|status}" RETVAL=1esac
复制代码

参考资料

欢迎关注


发布于: 5 小时前阅读数: 9
用户头像

Se7en

关注

还未添加个人签名 2020.01.10 加入

还未添加个人简介

评论

发布
暂无评论
Kafka 常用工具脚本总结