上文通过配置已经连接了上了 springboot 项目,但是现在市场环境一般都是微服务项目,所以需要 logstash 监控多个服务,同时也需要将监控过来的日志转换成固定的格式,方便我们接下来的查询和整理。下文将介绍如何配置。
一.监控多服务器
1.配置背景
从两台服务器(服务器名称分别 A,B)上分别获取日志,且日志索引不同。
A 服务日志进行拦截,如果没有带"登录日志"字样的日志丢弃。
2.配置知识点
判断语句,if 写法。
过滤器使用。
通过日期生成相同业务的不同索引。(可以通过 index template 查询)
3.修改 log_to_es.conf
修改/logstash/bin 中 log_to_es.conf 文件,具体配置见下文。
input{
//从服务器A获取日志,类型为A
tcp {
mode => "server"
host => "0.0.0.0"
port => 5000
codec => json_lines
type=> "A"
}
//从服务器B获取日志,类型为B
tcp {
mode => "server"
host => "0.0.0.0"
port => 4999
codec => json_lines
type=> "B"
}
}
//过滤器,如果为A类型,且没有带"登录日志"字样的日志丢弃。
filter{
if[type] == "A"{
if([message] =~ "^(?!.*?登录日志).*$") {
### 丢弃
drop{}
}
}
}
output{
//将A类型的日志传输给elasticsearch,且命名索引为A-当前日期
if[type] == "A"{
elasticsearch{
hosts=>["192.168.XX.XX:9200"]
index => "A-%{+YYYY.MM.dd}"
}
}
//将B类型的日志传输给elasticsearch,且命名索引为B-当前日期
if[type] == "B"{
elasticsearch{
hosts=>["192.168.XX.XX:9200"]
index => "B-%{+YYYY.MM.dd}"
}
}
}
复制代码
二.将日志拆分成规范的 Documents
看到上文的可以知道,springboot 打印的日志为已|分隔开来的字符集合,所以为了使用条件查询,我们将该字符串解析出来,并组成规范的 Documents。
1.修改 log_to_es.conf
修改/logstash/bin 中 log_to_es.conf 文件,在 filter 中添加以下代码,在具体配置见下文。
grok {
match => {"message" => "\|%{DATA:userName}\|%{GREEDYDATA:operationName}\|%{DATA:timeFormat}\|%{DATA:ip}\|%{DATA:systemType}\|%{GREEDYDATA:logType}\|%{GREEDYDATA:method}\|%{GREEDYDATA:input}"}
}
复制代码
上文的 %{DATA:systemType} 第一个参数为数据类型,第二个参数为值的 key。
DATA:为文字类型 不包含_,:等特殊符号
GREEDYDATA:为文字类型 可以包含_,:等符号
如果时间类型 如 2021-01-19 17:28:41 匹配不到 ,需要更改为 GREEDYDATA
前文 springboot 输出的日志为|test|日志-日志|2021-01-19 17:31:06|192.168.xx.xx|信息平台|接口日志|/system/LoginLog/selectLog|{"beginTime":"2021-01-19 17:28:41","endTime":"2021-01-19 17:28:43"},分隔方式是与前文向对应的。
2.测试配置
在保存配置后可以使用 kibana 测试是否可以成功分隔。
如果匹配成功会出现上图示例,否则会报错。
三.logstash 中 @timestamp 时区早 8 个小时解决方案
在博主的测试过程中,发现 @timestamp 的值会比真实时间早 8 个小时,所以在收集日志时,在上文配置文件的 filter 加入了以下配置,将时间弥补过来。
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
复制代码
四.logstash 将 @timestamp 映射为其他字段
博主在使用 springboot 查询 elasticsearch 时遇到一个问题,想使用时间进行筛选,但是在 spring boot 中 @timestamp 获取不到。所以博主的解决方案是将 @timestamp 复制一个字段。
在上文配置文件的 filter 加入了以下配置。就可以解决以上问题。
ruby {
code => "event['time'] = event['@timestamp']"
}
mutate
{
add_field => ["time", "%{@timestamp}"]
}
复制代码
这样就会会在索引添加一个 time 字段且类型为 date。
五.全配置解析
通过以上讲解,大家已经了解每种配置的使用方式了。下面将讲解一下博主的业务需求与实现配置。
博主这有现有两台服务器,loginlog 为认证服务器,打印的是登录日志,datalog 业务服务器,打印的是接口日志。所以打印的日志格式不同,需要判断做不同的解析。所以有此需求的小伙伴可以参考下文配置。
input{
tcp {
mode => "server"
host => "0.0.0.0"
port => 5000
codec => json_lines
type=> "datalog"
}
tcp {
mode => "server"
host => "0.0.0.0"
port => 4999
codec => json_lines
type=> "loginlog"
}
}
filter{
if[type] == "loginlog"{
grok {
match => {"message" => "\|%{GREEDYDATA:loginMsg}\|%{GREEDYDATA:timeFormat}\|%{GREEDYDATA:userName}"}
}
if([message] =~ "^(?!.*?登录系统).*$") {
### 丢弃
drop{}
}
}
if[type] == "datalog"{
grok {
match => {"message" => "\|%{DATA:userName}\|%{GREEDYDATA:operationName}\|%{DATA:timeFormat}\|%{DATA:ip}\|%{DATA:systemType}\|%{GREEDYDATA:logType}\|%{GREEDYDATA:method}\|%{GREEDYDATA:input}"}
}
}
ruby {
code => "event['time'] = event['@timestamp']"
}
mutate
{
add_field => ["time", "%{@timestamp}"]
}
}
output{
if[type] == "datalog"{
elasticsearch{
hosts=>["192.168.xx.xx:9200"]
user => "elastic"
password => "xxxx"
index => "xxxx-%{+YYYY.MM.dd}"
}
}
if[type] == "loginlog"{
elasticsearch{
hosts=>["192.168.xx.xx:9200"]
user => "elastic"
password => "xxxx"
index => "xxxx-%{+YYYY.MM.dd}"
}
}
}
复制代码
评论