1 topic 相关
1.1 查看 kafka 集群的所有 topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
复制代码
1.2 创建名称为 topic_test 的 topic,partitions 为 1 个,副本为 1 个
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test
复制代码
1.3 修改 topic_test 的 partition 为 10(只能改大不能改小)
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
复制代码
1.4 删除名称为 topic_test 的 topic(只删除 zookeeper 内的元数据,消息文件须手动删除)
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test
复制代码
1.5 查看 topic 为 topic_test 的详细信息
./bin/kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic_test
复制代码
2 consumer 相关
2.1 查看所有 consumer 的 group
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
复制代码
2.2 查看 group 为 group_test 消费的机器以及 partition
./bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
复制代码
2.3 查询__consumer_offsets topic 所有内容
需要注意的是,__consumer_offset 是内部使用的 topic,外部应用程序是没法读取的,需要在 config/consumer.properties 关闭 exclude.internal.topics 这个参数。
exclude.internal.topics=false
# 0.11.0.0之前版本
./bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
# 0.11.0.0之后版本(含)
./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
复制代码
__consumer_offsets topic 的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
2.4 计算指定 consumer group 在 consumer_offsets topic 中分区信息,Kafka 会使用下面公式计算该 group 位移保存在 consumer_offsets 的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
复制代码
默认情况下__consumer_offsets 有 50 个分区,如果你的系统中 consumer group 也很多的话,那么这个命令的输出结果会很多。
2.5 查询__consumer_offsets topic 所有某个 partition 的所有内容
# 0.11.0.0之前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --partition 20 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
# 0.11.0.0之后版本(含)
./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --partition 20 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
复制代码
3 控制台相关
3.1 控制台向 topic_test 发送数据
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test
复制代码
3.2 控制台接收 topic_test 数据
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test
复制代码
3.3 控制台接收 topic_test 最开始的数据
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning
复制代码
4 服务相关
4.1 启动 zookeeper 服务
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
复制代码
4.2 启动 kafka 服务
./bin/kafka-server-start.sh -daemon ./config/server.properties
复制代码
4.3 查看服务启动
5 遇到的问题
5.1 kafka 生产消息在 broker 上分配不均匀
__consumer_offser 大量提交写入导致 broker 负载不均匀,问题参考
Kafka 如何读取offset topic内容 (__consumer_offsets)
6 参考
https://kafka.apache.org
__consumer_offser大量提交写入导致broker负载不均匀
Kafka 如何读取offset topic内容 (__consumer_offsets)
评论