1、Logstash 简介
Logstash 作为通用的数据转化工具,具备着动态采集、转化和传输数据的能力,可以灵活配置性和拓展性,其中利用 Grok 功能可将非结构化数据转化诚结构化数据
2、Logstash 配置结构
通常对于 Logstash 需要配置 input, filter, output 三块内容,分别对应着数据的流入篇日志,数据过滤以及处理,以及数据流出转化,样例如下:
input { stdin { type => "web" }}
filter { if [type] == "web" { grok { match => ["message", %{COMBINEDAPACHELOG}] } }}
output { if "_grokparsefailure" in [tags] { nagios_nsca { nagios_status => "1" } } else { elasticsearch { } }}
复制代码
3、场景化解决方案
3.1、基于上游数据标识进行不同的解析
Logstash 提供了多 pipeline 和 pipeline-to-pipeline 方式进行支持,每个 pipeline 可以独立配置处理方式。如下配置为根据流入数据中 type 字段的值,后续分别由不同 pipeline 进行处理,并且每个处理逻辑都是可以独立描述。样例如下
# config/pipelines.yml- pipeline.id: beats-server config.string: | input { beats { port => 5044 } } output { if [type] == apache { pipeline { send_to => weblogs } } else if [type] == system { pipeline { send_to => syslog } } else { pipeline { send_to => fallback } } }- pipeline.id: weblog-processing config.string: | input { pipeline { address => weblogs } } filter { # Weblog filter statements here... } output { elasticsearch { hosts => [es_cluster_a_host] } }- pipeline.id: syslog-processing config.string: | input { pipeline { address => syslog } } filter { # Syslog filter statements here... } output { elasticsearch { hosts => [es_cluster_b_host] } }- pipeline.id: fallback-processing config.string: | input { pipeline { address => fallback } } output { elasticsearch { hosts => [es_cluster_b_host] } }
复制代码
3.2、配置自动加载
为了可以自动检测配置文件的变动和自动重新加载配置文件,需要在启动的时候使用以下命令:
./bin/lagstash -f configfile.conf --config.reload.automatic
复制代码
默认,检测配置文件的间隔时间是 3 秒,可以通过以下命令改变
--config.reload.interval <second>
复制代码
评论