使用 Logstash 实现 PostgreSQL 到 Elasticsearch 的数据摄取
使用 Logstash 实现 PostgreSQL 到 Elasticsearch 的数据摄取
什么是 Logstash?
Logstash 是 Elastic 提供的开源数据处理管道工具,用于摄取、转换和将数据发送到不同源,包括 Elasticsearch、Kafka、平面文件等。
Logstash 管道包含三个不同的处理过程:
输入:从中收集数据以进行摄取的数据源
过滤器:使用 Grok、Mutate、Date 等插件转换(清理、聚合等)数据
输出:摄取的目标(Elasticsearch、平面文件、数据库等)
以下是使用 Logstash 将数据发送到 Elasticsearch 的先决条件:
系统上安装了 Logstash 和 Postgres 的 JDBC 驱动程序
具有要同步的表或函数的 Postgres 数据库
正在运行的 Elasticsearch 实例
Logstash 设置(Windows 版)
以下是本地安装和运行 Logstash 的简要步骤。
1. 安装 Java
从官方 Oracle 网站下载 JDK 包(Java 8 或更高版本)。下载完成后,将文件解压缩到首选位置。
解压缩文件后,需要添加环境变量以便系统识别 Java 命令。
转到环境变量,添加一个名为 JAVA_HOME 的新变量,并将其指向 Java 文件所在的目录。将 %JAVA_HOME%\bin 附加到路径中。
要验证安装是否成功,请转到命令提示符并运行以下命令:
如果一切设置正确,它将显示 Java 版本。
2. 安装 Logstash
要安装 Logstash,请从官方 Elastic 网站下载包,并将其解压缩到首选位置。
要在本地测试,请打开命令提示符,导航到 Logstash 文件夹中的 bin 文件夹,并运行以下命令:
Logstash 摄取管道
1. 安装所需的 JDBC 驱动程序
从官方 PostgreSQL 网站下载 Postgres 驱动程序。将 jar 文件放在可访问的位置。
2. 创建 Logstash 管道
以下是示例管道:
上述管道用于增量摄取。这意味着它会跟踪最后一次运行,并从最后一次运行开始获取记录,按照计划摄取数据。
以下是使用的关键概念:
输入:
jdbc_driver_library- JDBC 驱动程序文件(.jar)的存储位置jdbc_driver_class- 正在使用的驱动程序类jdbc_connection_string- postgres 数据库连接字符串jdbc_user- 数据库用户名jdbc_password- 用户的数据库密码paging- 数据将以多页形式发送,每页大小为 1000。这将提高管道的性能,并有助于跟踪发送到 Elasticsearch 的记录数schedule- 上述管道计划每分钟运行一次。以下是计划的格式:statement- 管道将执行的 SQL 语句。要执行复杂的语句,可以将其保存在单独的.sql 文件中,并将文件路径提及到 statement_filepath 而不是 statement。最好使用视图或物化视图而不是具有复杂连接的查询。
最后一部分用于增量摄取:
use_column_value设置为 true。它让 Logstash 知道跟踪在 tracking_column 中使用的列 updated_at 的实际值,而不是使用上次运行查询的时间。在这种情况下,:sql_last_value将使用 updated_dt 值。如果设置为 false,Logstash 将使用上次查询执行时间作为
:sql_last_value。最后一次运行时间将保存在 last_run_metadata_path 中提到的文件中。它将用于跟踪管道最后一次运行的时间。
过滤器这是一个可选部分,用于在将数据发送到目标之前操作数据。
在上述管道中,日期字段正在从摄取中删除。此外,它还将数据中的 first_name 发送到目标中的 name 字段。
输出此部分定义数据的目标。在这种情况下,它是 Elasticsearch 端点、授权密钥(如果有)、elastic 索引、document_id。document_id 是索引中 elastic 文档的唯一标识符。如果未提及此字段,Elasticsearch 将自动为文档分配唯一标识符。
在增量摄取的情况下,建议定义此字段。在摄取期间,Elasticsearch 将在索引中查找此字段;如果匹配,它将更新同一文档。
如果未定义该字段,它将在索引中创建一个新文档,从而导致重复记录。
运行管道
要运行此管道,请打开命令提示符,转到 Logstash 文件夹,并运行以下命令:
以下是管道的输出。
来自 Elasticsearch 索引的输出。
这种方法有几个优点:
Logstash 是一个开源工具,易于实现。
有 200 多个可用于数据转换的插件。使用这些插件,可以使用过滤器解析和转换数据。
它是数据源和 Elasticsearch 之间的解耦架构。
与 Elasticsearch 无缝集成。
尽管这是一种开源的简单实现方法,但它也有一些缺点:
延迟问题:对于需要极低延迟或实时数据的应用程序来说,它并不理想。随着管道的增长,加载、转换/过滤和发送数据需要时间。
错误处理:除非明确监控,否则很难跟踪错误,这可能导致数据丢失。
如果管道定义不当,可能会产生重复项。
与其他工具相比,启动时间更长。
它使用 YAML 风格的配置文件,这使其变得复杂且难以维护。
资源利用:在重负载和复杂管道的情况下,它可能利用更多资源。
如果某人正在寻找更强大和集中化的数据流管道,可以使用上述管道。它不适用于实时数据传送。更多精彩内容 请关注我的个人公众号 公众号(办公 AI 智能小助手)对网络安全、黑客技术感兴趣的朋友可以关注我的安全公众号(网络安全技术点滴分享)
公众号二维码
公众号二维码







评论