写点什么

利用 Vector 将 Kafka 中的日志数据高效写入 GreptimeDB

  • 2024-10-29
    北京
  • 本文字数:9678 字

    阅读完需:约 32 分钟

利用 Vector 将 Kafka 中的日志数据高效写入 GreptimeDB

Kafka 是一款具备高吞吐量、高可靠性和高可扩展性的分布式消息队列,而 GreptimeDB 是专门用于存储时间序列数据的开源时序数据库。两者在各自的领域都表现出色,但如何高效地连接它们以实现数据的无缝传输和处理?


Vector 作为一个高速且可扩展的数据管道工具发挥了作用。它能够从多个来源(如应用日志、系统指标)收集、转换并传输数据,并将这些数据发送到不同的目标(如数据库、监控系统)。


而随着 GreptimeDB 现已全面支持日志数据的存储与分析,日志接收功能 greptime log sink 已被集成到 Vector 中,使用户可以通过 greptime_logs sink 将来自 Vector 的各种数据源轻松写入 GreptimeDB。详情和示例代码可参考此文:《Vector 增加 GreptimeDB 日志写入支持,连接数十种数据源》。


接下来,本文将详细介绍如何使用 Vector 从 Kafka 读取日志数据并将其写入 GreptimeDB,包括具体的实现步骤与示例代码。

准备工作

假设我们已经有一个 Kafka 集群,其中有一个名为 test_topic 的 topic,里面存储了日志数据。Kafka 中的示例数据内容如下:


127.0.0.1 - - [04/Sep/2024:15:46:13 -0700] "GET / HTTP/1.1" 200 615 "-" "Mozilla/5.0 (X11; Linux x86_64; rv:130.0) Gecko/20100101 Firefox/130.0"
复制代码


接下来,我们需要安装 Vector 和 GreptimeDB。

安装 & 配置 Vector

Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。我们可以使用 Vector 从 Kafka 读取数据,并将数据写入 GreptimeDB。


安装 Vector 非常简单,可通过二进制容器等进行安装,具体安装步骤请参考 Vector 官方文档


安装完成后,我们需要配置 Vector,使其能够从 Kafka 读取数据并写入 GreptimeDB。下面是一个简单的 Vector 配置文件:


[sources.mq]type = "kafka"group_id = "vector0"topics = ["test_topic"]bootstrap_servers = "kafka:9092"
[sinks.console]type = "console"inputs = [ "mq" ]encoding.codec = "text"
[sinks.sink_greptime_logs]type = "greptimedb_logs"table = "demo_logs"pipeline_name = "demo_pipeline"compression = "gzip"inputs = [ "mq" ]endpoint = "http://greptimedb:4000"
复制代码


上面的配置文件中,我们定义了一个名为 mq 的 source,用于从 Kafka 读取数据。我们还定义了一个名为 sink_greptime_logs 的 sink,用于将数据写入 GreptimeDB。

安装 & 配置 GreptimeDB

GreptimeDB 是一个开源的时序数据库,专门用于存储时间序列数据。我们可以使用 GreptimeDB 存储从 Kafka 读取的日志数据。


安装 GreptimeDB 同样非常简单,可通过二进制、容器等进行安装。具体安装步骤请参考 GreptimeDB 官方文档


安装完成后,我们使用默认配置即可。因为日志数据多种多样,我们提供了 Pipeline 功能来处理和过滤日志数据,只保留日志中我们关心的数据,我们将在后续技术博客中分享 Pipeline 引擎的实现原理和方案步骤,敬请期待。


如下例子对我们提供的 nginx 日志格式进行了解析,我们使用如下所示的 Pipeline 配置文件。


processors:  - dissect:      fields:        - message      patterns:        - '%{ip} - - [%{datetime}] "%{method} %{path} %{protocol}" %{status} %{size} "-" "%{user_agent}"'  - date:      fields:        - datetime      formats:        - "%d/%b/%Y:%H:%M:%S %z"  - date:      fields:        - timestamp      formats:        - "%Y-%m-%dT%H:%M:%S%.3fZ"
transform: - fields: - ip - path type: string - fields: - method - protocol type: string index: tag - fields: - user_agent type: string index: fulltext - fields: - status type: uint32 index: tag - fields: - size type: uint32 - fields: - datetime type: timestamp index: timestamp - fields: - timestamp type: timestamp
复制代码


在上面的 Pipeline 配置文件中,我们使用 dissect processor 对日志数据进行解析。本来非结构化的日志数据,被拆分并进行格式转化后,获得了一个结构化的数据,包含 ipdatatimemethodpathprotocolstatussizeuser_agent 。然后使用 date processor 对时间两个不同格式的时间字段进行解析。最后使用 transform 对字段进行转换,并设置 index。


关于 index,我们指定了 methodprotocolstatus 为 tag 字段,主要用于高效的查询,一些不确定值的数量的字段,或者值的数量特别多的,不建议设置为 tag,这会导致高基问题。所以 ipsize 均没有被设置为 tag 字段。


pathuser_agent 字段,我们增加了全文索引。以便可以使用模糊搜索来快速找到的关心的内容。详细的查询语法可参考此处


上述配置文件可通过 HTTP 接口上传到 GreptimeDB 中,以创建一个名为 demo_pipeline 的 Pipeline 用于日志的解析与修剪,然后存入 GreptimeDB 中。


curl -X 'POST' 'http://greptimedb:4000/v1/events/pipelines/demo_pipeline' -F 'file=@/config_data/pipeline.yaml' -v
复制代码

运行 Vector & GreptimeDB

现在,我们已经准备好了 Vector 和 GreptimeDB,现在就可以运行它们了。成功后,Vector 将从 Kafka 读取数据,并将数据写入 GreptimeDB。


我们可以通过 MySQL 协议连接 GreptimeDB,查看数据。


mysql> show tables;+-------------+| Tables      |+-------------+| demo_logs   || numbers     |+-------------+3 rows in set (0.00 sec)
mysql> select * from demo_logs order by timestamp desc limit 10;+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| ip | method | protocol | path | user_agent | status | size | datetime | timestamp |+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| 37.254.223.207 | DELETE | HTTP/2.0 | /about | Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World) | 201 | 495 | 2024-10-28 03:39:29 | 2024-10-28 03:39:29.982000 || 113.26.47.170 | PUT | HTTP/2.0 | /contact | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 404 | 183 | 2024-10-28 03:39:26 | 2024-10-28 03:39:26.977000 || 33.80.49.13 | PUT | HTTP/2.0 | /about | Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11 | 500 | 150 | 2024-10-28 03:39:23 | 2024-10-28 03:39:23.973000 || 240.14.156.37 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13 | 200 | 155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 || 210.90.39.41 | POST | HTTP/2.0 | /about | Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; .NET4.0C; .NET4.0E; LBBROWSER) | 201 | 188 | 2024-10-28 03:39:17 | 2024-10-28 03:39:17.964000 || 219.88.194.150 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0) | 404 | 704 | 2024-10-28 03:39:14 | 2024-10-28 03:39:14.963000 || 130.255.0.241 | DELETE | HTTP/1.1 | /contact | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1) | 500 | 816 | 2024-10-28 03:39:11 | 2024-10-28 03:39:11.959000 || 168.144.155.215 | POST | HTTP/1.1 | / | Mozilla/5.0 (iPhone; U; CPU iPhone OS 4_3_3 like Mac OS X; en-us) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8J2 Safari/6533.18.5 | 500 | 511 | 2024-10-28 03:39:08 | 2024-10-28 03:39:08.954000 || 28.112.30.158 | GET | HTTP/1.1 | /about | Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; en) Opera 9.50 | 200 | 842 | 2024-10-28 03:39:05 | 2024-10-28 03:39:05.950000 || 166.9.187.104 | GET | HTTP/2.0 | /blog | Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.71 Safari/537.36 | 201 | 970 | 2024-10-28 03:39:02 | 2024-10-28 03:39:02.946000 |+-----------------+--------+----------+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+10 rows in set (0.00 sec)
mysql> desc demo_logs;+------------+---------------------+------+------+---------+---------------+| Column | Type | Key | Null | Default | Semantic Type |+------------+---------------------+------+------+---------+---------------+| ip | String | | YES | | FIELD || method | String | PRI | YES | | TAG || protocol | String | PRI | YES | | TAG || path | String | | YES | | FIELD || user_agent | String | | YES | | FIELD || status | UInt32 | PRI | YES | | TAG || size | UInt32 | | YES | | FIELD || datetime | TimestampNanosecond | PRI | NO | | TIMESTAMP || timestamp | TimestampNanosecond | | YES | | FIELD |+------------+---------------------+------+------+---------+---------------+9 rows in set (0.00 sec)

mysql> show create table demo_logs;+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Table | Create Table |+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| demo_logs | CREATE TABLE IF NOT EXISTS `demo_logs` ( `ip` STRING NULL, `method` STRING NULL, `protocol` STRING NULL, `path` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'), `user_agent` STRING NULL FULLTEXT WITH(analyzer = 'English', case_sensitive = 'false'), `status` INT UNSIGNED NULL, `size` INT UNSIGNED NULL, `datetime` TIMESTAMP(9) NOT NULL, `timestamp` TIMESTAMP(9) NULL, TIME INDEX (`datetime`), PRIMARY KEY (`method`, `protocol`, `status`))
ENGINE=mitoWITH( append_mode = 'true') |+-----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.00 sec)
复制代码


现在我们的数据已经入库,可以利用 GreptimeDB 提供的一些功能来快速过滤我们关心的数据,比如通过全文搜索我们可以对 UA 进行模糊匹配,快速找到 UA 包含 Android 的数据。


mysql> SELECT * FROM demo_logs WHERE MATCHES(user_agent, 'Android') limit 10;+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| ip              | method | protocol | path     | user_agent                                                                                                                                                         | status | size | datetime            | timestamp                  |+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+| 240.14.156.37   | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 3.0; en-us; Xoom Build/HRI39) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13                                      |    200 |  155 | 2024-10-28 03:39:20 | 2024-10-28 03:39:20.969000 || 186.44.204.29   | DELETE | HTTP/1.1 | /        | Opera/9.80 (Android 2.3.4; Linux; Opera Mobi/build-1107180945; U; en-GB) Presto/2.8.149 Version/11.10                                                              |    201 |  343 | 2024-10-28 03:45:33 | 2024-10-28 03:45:33.459000 || 75.246.111.167  | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    404 |  869 | 2024-10-28 03:38:59 | 2024-10-28 03:38:59.942000 || 236.239.192.109 | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.2.1; zh-cn; HTC_Wildfire_A3333 Build/FRG83D) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                |    500 |  892 | 2024-10-28 03:38:53 | 2024-10-28 03:38:53.934000 || 232.232.14.176  | DELETE | HTTP/1.1 | /contact | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    500 |  644 | 2024-10-28 03:46:42 | 2024-10-28 03:46:42.550000 || 135.16.130.172  | DELETE | HTTP/2.0 | /        | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |    404 |  177 | 2024-10-28 03:47:27 | 2024-10-28 03:47:27.613000 || 69.23.7.123     | GET    | HTTP/1.1 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    201 |  770 | 2024-10-28 03:45:09 | 2024-10-28 03:45:09.425000 || 37.61.6.211     | GET    | HTTP/1.1 | /blog    | MQQBrowser/26 Mozilla/5.0 (Linux; U; Android 2.3.7; zh-cn; MB200 Build/GRJ22; CyanogenMod-7) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1 |    404 |  298 | 2024-10-28 03:45:21 | 2024-10-28 03:45:21.442000 || 244.166.255.46  | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    201 |  963 | 2024-10-28 03:45:48 | 2024-10-28 03:45:48.478000 || 35.169.107.238  | GET    | HTTP/2.0 | /blog    | Mozilla/5.0 (Linux; U; Android 2.3.7; en-us; Nexus One Build/FRF91) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1                          |    404 |  249 | 2024-10-28 03:46:48 | 2024-10-28 03:46:48.558000 |+-----------------+--------+----------+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+------+---------------------+----------------------------+10 rows in set (0.01 sec)
复制代码


在进行一些统计工作或者问题排查的时候,经常性的需要区分用户的渠道和 url。例如,我们可能需要筛选出 Android 渠道的用户,并查看访问 blog 页面时的 HTTP 状态码分布。通过以下 SQL 查询,可以快速获取所需结果,显著减少数据处理时间。


mysql> SELECT method,status,count(*) FROM demo_logs WHERE MATCHES(user_agent, 'Android') and MATCHES(path, 'blog') group by method,status;+--------+--------+----------+| method | status | COUNT(*) |+--------+--------+----------+| GET    |    404 |        2 || GET    |    201 |        3 || PUT    |    500 |        2 || POST   |    404 |        1 |+--------+--------+----------+4 rows in set (0.01 sec)
复制代码


我们已经将此过程打包成一个 docker compose 文件,欢迎前往 GitHub demo-scene repo 获取相关源码和指南:


https://github.com/GreptimeTeam/demo-scene/tree/main/kafka-ingestion

总结

本文介绍了如何利用 Vector 从 Kafka 读取日志数据并写入 GreptimeDB。Vector 是一个开源的数据收集工具,支持从多种数据源读取数据,并将数据写入多种数据目的地。目前已支持 GreptimeDB 的 sink,可以很方便的将原先系统中的监控数据导入 GreptimeDB 中。


本文介绍了如何利用 Vector 工具将 Kafka 中的日志数据无缝传输至 GreptimeDB 中,充分利用 GreptimeDB 在存储和分析时序数据上的优势,以及 Vector 的灵活性让数据处理更加高效。


GreptimeDB 强大的日志存储和查询功能为日志分析提供了可靠保障,无论是构建日志管理系统,还是进行实时监控与分析,Kafka + Vector + GreptimeDB 的组合能够帮助用户实现高效的数据流转与处理


未来我们将进一步介绍如何通过 GreptimeDB 的 Pipeline 引擎实现更加复杂的日志处理和数据过滤,敬请期待!


11 月 9 日我们将在深圳举办「云平台及 AI 时代下的可观测性技术演进」线下沙龙,欢迎报名: https://1965290734055.huodongxing.com/event/6777706662000


关于 Greptime

Greptime 格睿科技专注于为可观测、物联网及车联网等领域提供实时、高效的数据存储和分析服务,帮助客户挖掘数据的深层价值。目前基于云原生的时序数据库 GreptimeDB 已经衍生出多款适合不同用户的解决方案,更多信息或 demo 展示请联系下方小助手(微信号:greptime)。

欢迎对开源感兴趣的朋友们参与贡献和讨论,从带有 good first issue 标签的 issue 开始你的开源之旅吧~期待在开源社群里遇见你!添加小助手微信即可加入“技术交流群”与志同道合的朋友们面对面交流哦~

Star us on GitHub Now: https://github.com/GreptimeTeam/greptimedb

官网:https://greptime.cn/

文档:https://docs.greptime.cn/

Twitter: https://twitter.com/Greptime

Slack: https://greptime.com/slackLinkedIn: https://www.linkedin.com/company/greptime/

用户头像

专注于 Infra 技术分享 2022-09-23 加入

分布式、高性能、存储计算分离的开源云原生时序数据库

评论

发布
暂无评论
利用 Vector 将 Kafka 中的日志数据高效写入 GreptimeDB_kafka_Greptime 格睿科技_InfoQ写作社区