logstash 是 ElasticStack(ELK)的一个重要技术组件,用于对数据进行转换处理。他可以接受各种输入 源,并按照记录对数据进行变换,并导出到输出源中。
安装
docker pull docker.elastic.co/logstash/logstash
复制代码
quick start
简单的输入一行内容,并发送给远程的 elastic search 服务器
docker run -it --rm logstash -e 'input { stdin { } }filter { json {source => "message"} }output {elasticsearch{ hosts=>["x.x.x.x"] }stdout {}}'
复制代码
输入源
#从csv文件读取file { path => "/data/ELK/data/*.csv" }#从kafka中读取kafka { topic_id => 'topic_name'; zk_connect => '${zookeeper的地址}:2181/kafka'}
复制代码
常见的 filter
#读取csv,并设置表头csv{columns =>[ "log_time", "real_ip", "status", "http_user_agent"]#读取json数据json {source => "message"}
复制代码
输出源
elasticsearch{ hosts=>["x.x.x.x"] }stdout {}
复制代码
完整配置实例
读取 csv 数据
input {file {path => "/data/ELK/data/*.csv"start_position => beginning}}filter {csv{columns =>[ "log_time", "real_ip", "status", "http_user_agent"]}date {match => ["log_time", "yyyy-MM-dd HH:mm:ss"]}}output {elasticsearch {}}
复制代码
读取 kafka 数据
input {kafka {topic_id => 'topic_name'zk_connect => '${zookeeper的地址}:2181/kafka'}}filter {csv{separator => "|"columns => [ "host", "request", "http_user_agent"]}date {match => ["log_time", "yyyy-MM-dd HH:mm:ss"]}}output {elasticsearch {index => "logstash-topic-%{+YYYY.MM.dd}"}}
复制代码
Filebeat 是一个高性能的日志采集框架,它主要是以 agent 模式工作,特点是高性能。用以解决 logstash 的性能问题,一般我们都会把数据先借助于 filebeat 采集,并存到 redis 里,再由 logstash 对数据进行 编辑变换,再输出到 es 中。
搜索微信公众号:TestingStudio 霍格沃兹的干货都很硬核
评论