写点什么

使用 Logstash 实现 PostgreSQL 到 Elasticsearch 的数据摄取

作者:qife122
  • 2025-11-25
    福建
  • 本文字数:3439 字

    阅读完需:约 11 分钟

使用 Logstash 实现 PostgreSQL 到 Elasticsearch 的数据摄取

什么是 Logstash?

Logstash 是 Elastic 提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括 Elasticsearch、Kafka、平面文件等。


Logstash 管道包含三个不同的处理过程:


  • 输入:从中收集数据以进行摄取的数据源

  • 过滤器:使用 Grok、Mutate、Date 等插件转换(清理、聚合等)数据

  • 输出:摄取的目标(Elasticsearch、平面文件、数据库等)


以下是使用 Logstash 将数据发送到 Elasticsearch 的先决条件:


  • 系统上安装了 Logstash 和 Postgres 的 JDBC 驱动程序

  • 具有要同步的表或函数的 Postgres 数据库

  • 正在运行的 Elasticsearch 实例

Logstash 设置(Windows 版)

以下是本地安装和运行 Logstash 的简要步骤。

1. 安装 Java

从官方 Oracle 网站下载 JDK 包(Java 8 或更高版本)。下载完成后,将文件解压缩到首选位置。


解压缩文件后,需要添加环境变量以便系统识别 Java 命令。


转到环境变量,添加一个名为 JAVA_HOME 的新变量,并将其指向 Java 文件所在的目录。将 %JAVA_HOME%\bin 附加到路径中。


要验证安装是否成功,请转到命令提示符并运行以下命令:


java -version
复制代码


如果一切设置正确,它将显示 Java 版本。

2. 安装 Logstash

要安装 Logstash,请从官方 Elastic 网站下载包,并将其解压缩到首选位置。


要在本地测试,请打开命令提示符,导航到 Logstash 文件夹中的 bin 文件夹,并运行以下命令:


logstash -e "input { stdin {} } output { stdout {} }"
复制代码

Logstash 摄取管道

1. 安装所需的 JDBC 驱动程序

从官方 PostgreSQL 网站下载 Postgres 驱动程序。将 jar 文件放在可访问的位置。

2. 创建 Logstash 管道

以下是示例管道:


input {    jdbc {        jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"        jdbc_driver_class => "org.postgresql.Driver"        jdbc_connection_string => "${JDBC_HOST}"        jdbc_user => "${DB_USER}"        jdbc_password => "${DB_PWD}"        jdbc_paging_enabled => true        jdbc_page_size => 1000        schedule => "* * * * *"  # 计划每分钟运行一次        statement => "SELECT * FROM employee WHERE updated_at > :sql_last_value"        use_column_value => true        tracking_column => "updated_at"        tracking_column_type => "timestamp"        last_run_metadata_path => "c:/logstash/employee.tracker"    }}
filter { mutate { remove_field => ["date", "@timestamp", "host"] }
# 如果需要解析JSON字段的示例 json { source => "first_name" target => "name" }}
output { stdout { codec => json_lines } elasticsearch { hosts => ["http://localhost:9200"] index => "my_table_index" custom_headers => { "Authorization" => "${AUTH_KEY}" } document_id => "%{table_id}" # 表中的唯一标识符 timeout => 120 }}
复制代码


上述管道用于增量摄取。这意味着它会跟踪最后一次运行,并从最后一次运行开始获取记录,按照计划摄取数据。


以下是使用的关键概念:


输入


  • jdbc_driver_library - JDBC 驱动程序文件(.jar)的存储位置

  • jdbc_driver_class - 正在使用的驱动程序类

  • jdbc_connection_string - postgres 数据库连接字符串

  • jdbc_user - 数据库用户名

  • jdbc_password - 用户的数据库密码

  • paging - 数据将以多页形式发送,每页大小为 1000。这将提高管道的性能,并有助于跟踪发送到 Elasticsearch 的记录数

  • schedule - 上述管道计划每分钟运行一次。以下是计划的格式:

  • statement - 管道将执行的 SQL 语句。要执行复杂的语句,可以将其保存在单独的.sql 文件中,并将文件路径提及到 statement_filepath 而不是 statement。最好使用视图或物化视图而不是具有复杂连接的查询。


最后一部分用于增量摄取:


use_column_value => truetracking_column => "updated_dt"tracking_column_type => "timestamp"last_run_metadata_path => "c:/project/logstash/date.tracker"
复制代码


  • use_column_value设置为 true。它让 Logstash 知道跟踪在 tracking_column 中使用的列 updated_at 的实际值,而不是使用上次运行查询的时间。在这种情况下,:sql_last_value将使用 updated_dt 值。

  • 如果设置为 false,Logstash 将使用上次查询执行时间作为:sql_last_value

  • 最后一次运行时间将保存在 last_run_metadata_path 中提到的文件中。它将用于跟踪管道最后一次运行的时间。


过滤器这是一个可选部分,用于在将数据发送到目标之前操作数据。


在上述管道中,日期字段正在从摄取中删除。此外,它还将数据中的 first_name 发送到目标中的 name 字段。


输出此部分定义数据的目标。在这种情况下,它是 Elasticsearch 端点、授权密钥(如果有)、elastic 索引、document_id。document_id 是索引中 elastic 文档的唯一标识符。如果未提及此字段,Elasticsearch 将自动为文档分配唯一标识符。


在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch 将在索引中查找此字段;如果匹配,它将更新同一文档。


如果未定义该字段,它将在索引中创建一个新文档,从而导致重复记录。

运行管道

要运行此管道,请打开命令提示符,转到 Logstash 文件夹,并运行以下命令:


bin/logstash -f c:/logstash/sample_pipeline.conf
复制代码


以下是管道的输出。


来自 Elasticsearch 索引的输出。


{    "took": 1,    "timed_out": false,    "_shards": {        "total": 1,        "successful": 1,        "skipped": 0,        "failed": 0    },    "hits": {        "total": {            "value": 3,            "relation": "eq"        },        "max_score": 1.0,        "hits": [            {                "_index": "testing",                "_id": "1",                "_score": 1.0,                "_source": {                    "name": "James",                    "id": 1,                    "last_name": "Smith",                    "updated_dt": "2024-12-12T16:10:57.349Z",                    "@version": "1",                    "@timestamp": "2025-06-25T20:41:02.167442600Z"                }            },            {                "_index": "testing",                "_id": "2",                "_score": 1.0,                "_source": {                    "name": "John",                    "id": 2,                    "last_name": "Doe",                    "updated_dt": "2024-12-12T16:10:57.349Z",                    "@version": "1",                    "@timestamp": "2025-06-25T20:41:02.169021400Z"                }            },            {                "_index": "testing",                "_id": "3",                "_score": 1.0,                "_source": {                    "name": "Kate",                    "id": 3,                    "last_name": "Williams",                    "updated_dt": "2024-12-12T16:10:57.349Z",                    "@version": "1",                    "@timestamp": "2025-06-25T20:41:02.170098800Z"                }            }        ]    }}
复制代码


这种方法有几个优点:


  • Logstash 是一个开源工具,易于实现。

  • 有 200 多个可用于数据转换的插件。使用这些插件,可以使用过滤器解析和转换数据。

  • 它是数据源和 Elasticsearch 之间的解耦架构。

  • 与 Elasticsearch 无缝集成。


尽管这是一种开源的简单实现方法,但它也有一些缺点:


  • 延迟问题:对于需要极低延迟或实时数据的应用程序来说,它并不理想。随着管道的增长,加载、转换/过滤和发送数据需要时间。

  • 错误处理:除非明确监控,否则很难跟踪错误,这可能导致数据丢失。

  • 如果管道定义不当,可能会产生重复项。

  • 与其他工具相比,启动时间更长。

  • 它使用 YAML 风格的配置文件,这使其变得复杂且难以维护。

  • 资源利用:在重负载和复杂管道的情况下,它可能利用更多资源。


如果某人正在寻找更强大和集中化的数据流管道,可以使用上述管道。它不适用于实时数据传送。更多精彩内容 请关注我的个人公众号 公众号(办公 AI 智能小助手)对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)


公众号二维码


办公AI智能小助手


公众号二维码


网络安全技术点滴分享


用户头像

qife122

关注

还未添加个人签名 2021-05-19 加入

还未添加个人简介

评论

发布
暂无评论
使用Logstash实现PostgreSQL到Elasticsearch的数据摄取_elasticsearch_qife122_InfoQ写作社区