写点什么

大数据 -185 Logstash 7 入门实战:stdin/file 采集、sincedb/start_position 机制与排障

作者:武子康
  • 2025-12-16
    山东
  • 本文字数:4657 字

    阅读完需:约 15 分钟

大数据-185 Logstash 7 入门实战:stdin/file 采集、sincedb/start_position 机制与排障

TL;DR

场景:在 ELK 7.3.x 环境用 Logstash 7.3.0 快速把控制台/日志文件数据跑通到 stdout(rubydebug)结论:file 输入是否“从头读”由 sincedb 决定,start_position 只对“首次见到的文件”生效产出:可复用的最小可跑命令、文件监听配置骨架、以及高频故障定位/修复速查表


版本矩阵


简要介绍

Logstash 是一个功能强大的开源服务器端数据处理管道工具,由 Elastic 公司开发维护。它采用基于插件的模块化架构,主要用于实时收集、解析、过滤、丰富和转发各种类型的数据。

核心功能

  1. 数据收集

  2. 支持从多种数据源采集数据,包括但不限于:

  3. 日志文件(通过 filebeat 或直接读取)

  4. 数据库(MySQL、PostgreSQL 等)

  5. 消息队列(Kafka、RabbitMQ)

  6. 网络协议(TCP/UDP)

  7. 云服务(AWS S3、Azure Blob)

  8. 示例:可以配置 input 插件监控 /var/log/nginx/access.log 获取网站访问日志

  9. 数据处理

  10. 提供丰富的过滤器插件进行数据转换:

  11. grok:模式匹配解析非结构化日志

  12. mutate:字段修改和类型转换

  13. date:时间戳处理

  14. geoip:IP 地址地理位置解析

  15. 典型处理流程:原始日志 → 解析字段 → 过滤无用数据 → 添加元数据

  16. 数据输出

  17. 支持多种目的地:

  18. Elasticsearch(最常用)

  19. 关系型数据库

  20. 文件系统

  21. 消息队列

  22. 监控系统(如 Prometheus)

在 ELK Stack 中的角色

作为 Elastic Stack(ELK)的关键组件,Logstash 承担数据预处理管道的角色:


  • 与 Beats 轻量级采集器配合,构建完整数据采集链

  • 为 Elasticsearch 提供结构化的索引数据

  • 通过 Kibana 实现数据可视化时,依赖 Logstash 处理后的规范数据格式

典型应用场景

  1. 日志集中管理

  2. 收集分散在多个服务器上的应用日志

  3. 统一进行错误分析和性能监控

  4. 安全信息与事件管理(SIEM)

  5. 处理安全设备日志(防火墙、IDS)

  6. 支持实时安全事件分析

  7. 业务数据分析

  8. 处理交易日志生成业务报表

  9. 用户行为分析

性能特点

  • 支持多线程处理,可通过 pipeline 配置提高吞吐量

  • 提供持久化队列防止数据丢失

  • 具有插件热加载能力,无需重启服务


配置文件采用简洁的 DSL 语法,主要包含 input、filter 和 output 三个部分,通过条件判断可以实现复杂的数据路由逻辑。其丰富的插件生态系统(200+官方和社区插件)使其能够适应各种数据处理需求。

Logstash 的架构

Logstash 由三大核心组件组成,构成了完整的数据处理流水线:


  1. 输入(Inputs):输入插件作为数据采集入口,支持从多种异构数据源实时或批量获取数据。典型应用场景包括:


  • 文件日志采集:通过 filebeat 或直接监控日志文件(如 Nginx 访问日志)

  • 数据库同步:支持 JDBC 输入插件定期轮询 MySQL/Oracle 等关系型数据库

  • 消息队列消费:从 Kafka/RabbitMQ 等消息中间件获取数据流

  • 网络协议:通过 TCP/UDP 套接字、HTTP API 接收数据

  • 系统输入:支持从标准输入(STDIN)直接读取数据


  1. 过滤(Filters):过滤器提供强大的数据处理能力,可组合使用多个插件形成处理链:


  • grok:使用正则表达式解析非结构化日志(如将 Apache 日志解析为独立字段)

  • mutate:字段操作(重命名/删除/类型转换),如将字符串"200"转为整型

  • date:时间格式化,将"Aug 13 15:33:22"转为 ISO8601 格式

  • json:自动解析 JSON 字符串为结构化字段

  • csv:解析逗号分隔的文本数据

  • geoip:根据 IP 地址添加地理位置信息


  1. 输出(Outputs):输出插件支持将处理后的数据路由到不同目的地:


  • Elasticsearch:最常用输出,支持批量写入和自动索引管理

  • 文件系统:按日期分片存储处理结果

  • Kafka:作为数据缓冲或分发中心

  • 关系型数据库:通过 JDBC 插件写入 MySQL/PostgreSQL

  • 监控系统:输出到 Prometheus 或 Graphite

  • 调试输出:stdout 用于开发测试


补充处理组件:


  • 编解码器(Codecs):

  • JSON 编解码:在输入输出时自动序列化/反序列化

  • 多行处理:合并异常堆栈等跨行日志

  • 压缩解压:支持 gzip 等压缩格式

  • 管道间通信:支持多个 pipeline 之间通过 Redis/Kafka 传递数据

  • 监控端点:提供 HTTP API 查看运行指标和队列状态


典型处理流程示例:


  1. 从 Kafka 消费原始日志(输入)

  2. 用 grok 解析日志格式(过滤)

  3. 添加处理时间戳(date 插件)

  4. 删除敏感字段(mutate 插件)

  5. 写入 Elasticsearch 集群(输出)

Logstash 的工作流程

Logstash 的工作流程如下:


  • 数据输入(Input): Logstash 从数据源中收集原始数据,如应用程序日志、系统日志或网络流量日志等。

  • 数据过滤(Filter): 收集到的数据通过配置的过滤器进行处理。这一步可以解析日志、过滤无用数据、格式化字段、添加或删除字段、进-行数据聚合等操作。

  • 数据输出(Output): 最终经过处理的数据被输出到一个或多个目标系统,如 Elasticsearch 用于存储和搜索,或者输出到文件用于归档。

Logstash 的功能特性

  • 多种输入插件支持: Logstash 支持多种输入源,几乎可以从任何类型的系统或文件中收集数据。这些输入源可以是本地的日志文件、数据库、HTTP、TCP、UDP 连接等。

  • 强大的数据处理能力: Logstash 提供了丰富的过滤插件,可以对数据进行复杂的操作。通过 grok 插件,用户可以解析复杂的日志格式,提取关键信息。Logstash 还支持条件语句,可以根据条件对不同的数据进行不同的处理。

  • 灵活的输出机制: Logstash 支持多种输出方式,可以将数据发送到多个目标系统。常见的输出包括 Elasticsearch、文件系统、标准输出、Kafka 等。

  • 扩展性: Logstash 通过插件架构设计,用户可以根据需要开发和使用自定义插件,从而满足各种特殊需求。

  • 实时性: Logstash 可以实时处理大量数据,适合于实时日志分析、事件响应、监控告警等场景。

  • 错误处理和重试机制: Logstash 支持对处理失败的数据进行重试和错误处理,确保数据不丢失,并提供可靠的传输保障。

官方网站

https://www.elastic.co/logstash
复制代码


对应的页面截图如下:



Logstash 就是一个具备实时数据传输能力的管道,负责将数据信息从管道的输入传出到管道的输出端,与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash 提供里很多功能强大的滤网以满足你的各种应用场景。是一个 input | filter | output 的数据流。

项目下载

下载指定版本的 Logstash,我这里适配着 ES 的版本,7.3 版本:


https://www.elastic.co/downloads/past-releases#logstash
复制代码


这里放一个直达链接,7.3 版本的:


https://www.elastic.co/downloads/past-releases/logstash-7-3-0
复制代码


我这里直接下载到服务器上 h121.wzk.icu 节点。


wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.0.tar.gz
复制代码


对应的截图如下图所示:


解压配置

cd /opt/softwaremv logstash-7.3.0 ../servers/cd /opt/servers/logstash-7.3.0/
复制代码


对应的截图如下所示:


Input 插件

stdin 与 stdout

标准输入和标准输出,实现我们的数据从控制台输入,从控制台输出。


cd /opt/servers/logstash-7.3.0/bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'{  "@version" => "1",  "host" => "h121.wzk.icu",  "@timestamp" => 2024-08-16T08:33:13.126Z,  "message" => "hello"}
复制代码


运行之后,可以看到控制台对应的输出为如下的内容:


监控日志文件变化

修改配置

Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化,这个库支持 glob 展开路径,而且会记录一个叫 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 Logstash 会漏掉你的数据。


cd /opt/servers/logstash-7.3.0/configvim monitor_file.conf
复制代码


向其中写入对应的内容,并退出保存:


input{  file{    path => "/opt/servers/es/logs/wzkicu-es.log"    type => "log"    start_position => "beginning"  }}output{  stdout{    codec=>rubydebug  }}
复制代码


写入的内容如下图所示:



补充一下:start_positon => beginning 或者是 end

检查配置

通过下面的指令进行检查:


cd /opt/servers/logstash-7.3.0bin/logstash -f /opt/servers/logstash-7.3.0/config/monitor_file.conf -t
复制代码


截图如下图所示:



可以看到:“Configuration OK”,说明我们的配置文件是正确的。

启动服务

cd /opt/servers/logstash-7.3.0bin/logstash -f /opt/servers/logstash-7.3.0/config/monitor_file.conf
复制代码


启动结果如下图所示:


发送数据

我们监听到是 ES 的日志,只要你进行一些操作,就会看到日志被解析了:


其他参数

以下是扩展后的内容,增加了更多细节说明和实际应用示例:

File Input 配置参数详解

Path

  • 定义:指定需要监控的文件路径,支持通配符匹配多个文件

  • 示例

  • /var/log/nginx/*.log 监控 nginx 目录下所有.log 文件

  • /var/log/**/*.log 递归监控所有子目录中的.log 文件

  • ["/var/log/app1.log", "/var/log/app2.log"] 监控多个具体文件

Type

  • 作用:为不同类型的日志文件打上标记,便于后续处理流程区分

  • 应用场景

  • 设置 type => "nginx" 标记 nginx 日志

  • 设置 type => "syslog" 标记系统日志

  • 在 filter 阶段可通过 if [type] == "nginx" 进行条件处理

Start_position

  • 行为说明

  • beginning:从文件开头读取,适用于首次导入历史日志

  • end:从文件末尾读取(默认值),适用于持续监控新日志

  • 注意事项:切换位置会导致重复读取,建议配合 sincedb 使用

Discover_interval

  • 工作机制

  • 定期扫描 path 模式匹配的新文件

  • 默认 15 秒的间隔是性能和实时性的平衡点

  • 调优建议

  • 高频扫描(如 5s):适用于日志文件快速轮转的场景

  • 低频扫描(如 30s):适用于稳定环境减少 IO 开销

Exclude

  • 排除规则

  • 支持 glob 模式匹配

  • 示例:exclude => "*.gz" 排除所有压缩文件

  • 典型应用

  • 排除归档日志:exclude => ["*.1", "*.bak"]

  • 排除临时文件:exclude => "*.tmp"

Close_older

  • 文件句柄管理

  • 默认 1 小时无更新则释放句柄

  • 影响:过长占用导致文件无法轮转,过短导致频繁重开

  • 特殊场景

  • 低频日志可设为 86400(1 天)

  • 高并发日志可设为 1800(30 分钟)

Sincedb

  • 元数据存储

  • 默认路径:/data/plugins/inputs/file/.sincedb_*

  • 记录内容:inode 号、设备号、读取位置等

  • 集群部署

  • 建议显式指定路径如:sincedb_path => "/var/lib/logstash/sincedb"

  • 避免多实例冲突

Sincedb_write_interval

  • 持久化策略

  • 默认 15 秒写入一次

  • 在 crash 时最多丢失 15 秒的读取进度

  • 可靠性权衡

  • 重要日志可设为 5 秒

  • 高性能场景可设为 30 秒

Start_interval

  • 文件检测机制

  • 默认每秒检查文件更新

  • 检测内容包括:文件大小变化、inode 变更等

  • 性能影响

  • 高频检测(0.1 秒):实时性高但 CPU 开销大

  • 低频检测(5 秒):适用于批量写入场景

最佳实践示例

input {  file {    path => ["/var/log/app/*.log", "/var/log/nginx/access.log"]    type => "combined_logs"    start_position => "beginning"    exclude => "*.swp"    close_older => 7200    sincedb_path => "/opt/logstash/sincedb/db"    discover_interval => 30  }}
复制代码

错误速查

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI 研究-132 Java 生态前沿 2025:Spring、Quarkus、GraalVM、CRaC 与云原生落地🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-196 消息队列选型:RabbitMQ vs RocketMQ vs KafkaMyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务已完结,Dubbo 已完结,MySQL 已完结,MongoDB 已完结,Neo4j 已完结,FastDFS 已完结,OSS 已完结,GuavaCache 已完结,EVCache 已完结,RabbitMQ 正在更新... 深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 2025-12-16阅读数: 2
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-185 Logstash 7 入门实战:stdin/file 采集、sincedb/start_position 机制与排障_Java_武子康_InfoQ写作社区