KafkaMQ 日志采集最佳实践

概述
Kafka 是由 LinkedIn 开发、后由 Apache 软件基金会维护的分布式流处理平台,采用 Scala 和 Java 编写。它本质是一个高吞吐、持久化的发布-订阅消息系统,专注于处理实时数据流(如用户行为日志、点击流等)。在收集日志的场景中,Kafka 可以作为一个消息中间件,用于接收、存储和转发大量的日志,链路,指标数据。
观测云
观测云是一款专为 IT 工程师打造的全链路可观测产品,它集成了基础设施监控、应用程序性能监控和日志管理,为整个技术栈提供实时可观察性。这款产品能够帮助工程师全面了解端到端的用户体验追踪,了解应用内函数的每一次调用,以及全面监控云时代的基础设施。此外,观测云还具备快速发现系统安全风险的能力,为数字化时代提供安全保障。
本实践主要是通过观测云消费 Kafka 队列收集到的日志数据,并将数据通过 Pipeline 进行字段提取和分类,便于用户对日志数据进行可视化分析。
部署 Kafka
目前 DataKit 支持的 Kafka 版本有 [ version:0.8.2 ~ 3.2.0 ]。
下载 3.2.0 版本,解压即可使用。
1、启动 Zookeeper 服务
2、启动 KafkaServer
3、创建 Topic
创建名为 testlog 的 Topic 。
4、启动 Producer
部署 DataKit
DataKit 是一个开源的、跨平台的数据收集和监控工具,由观测云开发并维护。它旨在帮助用户收集、处理和分析各种数据源,如日志、指标和事件,以便进行有效的监控和故障排查。DataKit 支持多种数据输入和输出格式,可以轻松集成到现有的监控系统中。
登录观测云控制台,在「集成」 - 「DataKit」选择对应安装方式,当前采用 Linux 主机部署 DataKit。

开启 KafkaMQ 采集器
进入 DataKit 安装目录下 (默认是 /usr/local/datakit/conf.d/
) 的 conf.d/kafkamq
目录,复制 kafkamq.conf.sample
并命名为 kafkamq.conf
。
类似如下:
调整 kafkamq 采集器配置如下:
addrs = ["localhost:9092"],该文采集器 DataKit 和 Kafka 安装到同一台操作系统中,localhost 即可。
kafka_version = "3.2.0",该文使用 Kafka 的版本。
[inputs.kafkamq.custom],删除注释符号“#”。
[inputs.kafkamq.custom.log_topic_map],删除注释符号“#”。
"testlog"="log.p",testlog 为 Topic 的名字,log.p 为观测云 Pipeline 可编程数据处理器的日志字段提取规则配置。涉及的业务日志和 log.p 的内容详细见下面的《使用 Pipeline》。
其他一些配置说明:
group_id = "datakit-group":消费者组名称,相同组内消费者共享分区消费进度。不同消费者组可独立消费同一主题。
assignor = "roundrobin":分区轮询分配给消费者,适合组内消费者订阅相同主题列表,实现负载均衡。
注意:开启或调整 DataKit 的配置,需重启采集器(shell 下执行 datakit service -R)。
编写 Pipeline
log.p
规则内容:
效果展示
发送业务日志样例
业务日志样例文件如下:
日志发送命令
在 Producer 启动后,分别发送如下三条日志内容,三条日志一条为 info 级别("response_code":204),另一条为 error 级别("response_code":504),最后一条为 warn 级别日志("response_code":404)。
效果
通过 DataKit 采集到 Kafka 的三条业务日志

使用 Pipeline 对日志进行字段提取的效果展示
下图 protocol、response_code 以及 time 都是使用 Pipeline 提取后的效果。

结语
观测云通过集成 KafkaMQ ,实现了 Kafka 队列日志数据的高效采集和处理,并结合观测云的 Pipeline 功能,能够实时采集业务日志并进行字段提取和分类,便于后续分析和可视化;此外,DataKit 的 KafkaMQ 采集器可扩展应用于其他数据处理场景,如还支持链路(如开源 otel,skywalking,jaeger),指标,RUM 等数据的消费,这种集成方案提升了系统的可观测性,同时反映了观测云平台的开放和包容性,加速了企业的数字化转型。
评论