写点什么

kafka 日志写入 logstash

用户头像
Rubble
关注
发布于: 10 小时前
kafka日志写入logstash

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中

Logstash 是一个开源数据收集引擎,具有实时流水线功能。Logstash 可以动态统一来自不同来源的数据,并将数据规范化为您选择的目的地。为各种高级下游分析和可视化用例清理和民主化您的所有数据。



安装 Logstash

从下载的二进制文件安装

Logstash 二进制文件可从 www.elastic.co/cn/download…

下载适用于您的主机环境的 Logstash 安装文件 - TARG.GZ、DEB、ZIP 或 RPM。

启动 logstash

从控制台进行测试输入 stdin 输出 stdout

cd logstash-7.13.4bin/logstash -e 'input { stdin { } } output { stdout {} }'
复制代码

指定配置文件 --config.reload.automatic 自动重新加载配置文件

bin/logstash -f first-pipeline.conf --config.reload.automatic


配置文件主要有 3 部分 input filter output

Codec plugins

codec 插件可以在 input、output 流中处理数据,更改数据格式。

常用的 codec 有

  • json 读取 JSON 格式的内容,为 JSON 数组中的每个元素创建一个事件

  • json_lines 读取以换行符分隔的 JSON

  • plain 读取纯文本,事件之间没有分隔

  • mutiline 将多行消息合并为一个事件


将 kafka 日志消息输入到 logstash

topics 指定监听的 topic

input {  kafka {    id => "my_plugin_id"    bootstrap_servers => "localhost:9092"    topics => ["logger-channel"]    auto_offset_reset => "latest"  }}
filter { #json json { source => "message" } date { match => ["time", "yyyy-MM-dd HH:mm:ss.SSS"] remove_field => ["time"] }}
output { stdout {}}
复制代码


启动服务

./bin/logstash -f ./config/kafka-std.conf --config.reload.automatic

控制台接收到日志消息


{        "logger" => "com.paw.kafka.elk.controller.KafkaLogController",      "@version" => 1,        "thread" => "http-nio-8080-exec-7",      "levelVal" => 20000,    "@timestamp" => 2021-08-01T07:10:27.273Z,       "appName" => "paw-kelk",       "message" => "cost time: 23",           "env" => "dev",        "caller" => {          "file" => "KafkaLogController.java",        "method" => "kafka",         "class" => "com.paw.kafka.elk.controller.KafkaLogController",          "line" => 35    },         "level" => "INFO"}{        "logger" => "com.paw.kafka.elk.controller.KafkaLogController",      "@version" => 1,        "thread" => "http-nio-8080-exec-7",      "levelVal" => 10000,    "@timestamp" => 2021-08-01T07:10:27.273Z,       "appName" => "paw-kelk",       "message" => "debug time: 23",           "env" => "dev",        "caller" => {          "file" => "KafkaLogController.java",        "method" => "kafka",         "class" => "com.paw.kafka.elk.controller.KafkaLogController",          "line" => 36    },         "level" => "DEBUG"}
复制代码


至此 kafka 日志写入 logstash 完成。logsstash 作为 kafka 日志 topic 的一个消费端,kafka 将日志发往 logstash,logtash 以输入流方式结束日志数据,经过 filter 加工处理输出到输出流如 elasticsearch 中。


用户头像

Rubble

关注

还未添加个人签名 2021.06.01 加入

还未添加个人简介

评论

发布
暂无评论
kafka日志写入logstash