1、Logstash 简介
Logstash 作为通用的数据转化工具,具备着动态采集、转化和传输数据的能力,可以灵活配置性和拓展性,其中利用 Grok 功能可将非结构化数据转化诚结构化数据
2、Logstash 配置结构
通常对于 Logstash 需要配置 input, filter, output 三块内容,分别对应着数据的流入篇日志,数据过滤以及处理,以及数据流出转化,样例如下:
input {
stdin {
type => "web"
}
}
filter {
if [type] == "web" {
grok {
match => ["message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status => "1"
}
} else {
elasticsearch {
}
}
}
复制代码
3、场景化解决方案
3.1、基于上游数据标识进行不同的解析
Logstash 提供了多 pipeline 和 pipeline-to-pipeline 方式进行支持,每个 pipeline 可以独立配置处理方式。如下配置为根据流入数据中 type 字段的值,后续分别由不同 pipeline 进行处理,并且每个处理逻辑都是可以独立描述。样例如下
# config/pipelines.yml
- pipeline.id: beats-server
config.string: |
input { beats { port => 5044 } }
output {
if [type] == apache {
pipeline { send_to => weblogs }
} else if [type] == system {
pipeline { send_to => syslog }
} else {
pipeline { send_to => fallback }
}
}
- pipeline.id: weblog-processing
config.string: |
input { pipeline { address => weblogs } }
filter {
# Weblog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_a_host] }
}
- pipeline.id: syslog-processing
config.string: |
input { pipeline { address => syslog } }
filter {
# Syslog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_b_host] }
}
- pipeline.id: fallback-processing
config.string: |
input { pipeline { address => fallback } }
output { elasticsearch { hosts => [es_cluster_b_host] } }
复制代码
3.2、配置自动加载
为了可以自动检测配置文件的变动和自动重新加载配置文件,需要在启动的时候使用以下命令:
./bin/lagstash -f configfile.conf --config.reload.automatic
复制代码
默认,检测配置文件的间隔时间是 3 秒,可以通过以下命令改变
--config.reload.interval <second>
复制代码
评论