Kafka 业务日志采集最佳实践
简介
Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据流管道和应用程序。在收集业务日志的场景中,Kafka 可以作为一个消息中间件,用于接收、存储和转发大量的日志数据。将 Kafka 与其他系统(如 Elasticsearch、Flume、Spark Streaming 等)集成,以提供更丰富的日志处理和分析功能。本文提到的是和观测云集成,即通过观测云的采集器 Datakit 采集 Kafka 中的业务日志,下面通过一些例子了解下观测云的快速集成效果。
实践环境
前置条件
软件和中间件
Kafka3.2.0
Datakit 采集器
JDK 8
硬件
云服务器 CentOS7.9 64 位 4vCPU,8GB 内存,100GB 云盘一台。
接入方案
准备 Kafka 环境
安装 Kafka
下载 3.2.0 版本,解压即可使用。
wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
注:目前 Datakit 支持的 Kafka 版本有[version:0.8.2 ~ 3.2.0]
启动 Zookeeper 服务
启动 KafkaServer
创建 Topic
创建名为 testlog 的 Topic 。
启动 Producer
安装 DataKit
参考官网文档安装 DataKit 采集器
开启 Kafka 采集器
进入 DataKit 安装目录下 (默认是 /usr/local/datakit/conf.d/
) 的 conf.d/kafkamq
目录,复制 kafkamq.conf.sample
并命名为 kafkamq.conf
。
类似如下:
调制 kafka 采集器配置如下:
addrs = ["localhost:9092"],该文采集器 DataKit 和 Kafka 安装到同一台操作系统中,localhost 即可。
kafka_version = "3.2.0",该文使用 Kafka 的版本。
[inputs.kafkamq.custom],删除注释符号“#”。
[inputs.kafkamq.custom.log_topic_map],删除注释符号“#”。
"testlog"="log.p",testlog 为 Topic 的名字,log.p 为观测云 Pipeline 可编程数据处理器的日志字段提取规则配置。涉及的业务日志和 log.p 的内容详细见下面的《使用 Pipeline》。
注意:开启或调整 DataKit 的配置,需重启采集器(shell 下执行 datakit service -R)。
使用 Pipeline
log.p 规则内容
效果展示
发送业务日志样例
业务日志样例文件如下:
日志发送命令
在 Producer 启动后,分别发送如下三条日志内容,三条日志一条为 info 级别("response_code":204),另一条为 error 级别("response_code":504),最后一条为 warn 级别日志("response_code":404)。
通过 DataKit 采集到 Kafka 的三条业务日志
使用 Pipeline 对业务日志进行字段提取
下图 protocol、response_code 以及 time 都是使用 Pipeline 提取后的效果。
评论