写点什么

大数据 ELK(二十二):采集 Apache Web 服务器日志

作者:Lansonli
  • 2022-10-12
    广东
  • 本文字数:8049 字

    阅读完需:约 26 分钟

大数据ELK(二十二):采集Apache Web服务器日志

采集 Apache Web 服务器日志

一、需求

Apache 的 Web Server 会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到 Elasticsearch 中。此处,我们就可以使用 Logstash 来实现日志的采集


打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:



这个日志其实由一个个的字段拼接而成,参考以下表格



因为最终我们需要将这些日志数据存储在 Elasticsearch 中,而 Elasticsearch 是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在 Elasticsearch 中。所以,我们需要在 Logstash 中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到 Elasticsearch 中

二、准备日志数据

将 Apache 服务器日志上传到 /export/server/es/data/apache/ 目录


mkdir -p /export/server/es/data/apache/
复制代码

三、使用 FileBeats 将日志发送到 Logstash

在使用 Logstash 进行数据解析之前,我们需要使用 FileBeat 将采集到的数据发送到 Logstash。之前,我们使用的 FileBeat 是通过 FileBeat 的 Harvester 组件监控日志文件,然后将日志以一定的格式保存到 Elasticsearch 中,而现在我们需要配置 FileBeats 将数据发送到 Logstash。FileBeat 这一端配置以下即可:


#----------------------------- Logstash output ---------------------------------#output.logstash:  # Boolean flag to enable or disable the output module.  #enabled: true
# The Logstash hosts #hosts: ["localhost:5044"]
复制代码


hosts 配置的是 Logstash 监听的 IP 地址/机器名以及端口号。


准备 FileBeat 配置文件


cd /export/server/es/filebeat-7.6.1-linux-x86_64
vim filebeat-logstash.yml
复制代码


因为 Apache 的 web log 日志都是以 IP 地址开头的,所以我们需要修改下匹配字段


filebeat.inputs:- type: log  enabled: true  paths:    - /var/apache/log/access.*  multiline.pattern: '^\d+\.\d+\.\d+\.\d+ '  multiline.negate: true  multiline.match: after
output.logstash: enabled: true hosts: ["node1:5044"]
复制代码


启动 FileBeat,并指定使用新的配置文件


./filebeat -e -c filebeat-logstash.yml
复制代码


FileBeat 将尝试建立与 Logstash 监听的 IP 和端口号进行连接。但此时,我们并没有开启并配置 Logstash,所以 FileBeat 是无法连接到 Logstash 的。


2021-12-05T11:28:47.585+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused
复制代码

四、配置 Logstash 接收 FileBeat 数据并打印

Logstash 的配置文件和 FileBeat 类似,它也需要有一个 input、和 output。基本格式如下:


# #号表示添加注释# input表示要接收的数据input {}
# file表示对接收到的数据进行过滤处理filter {
}
# output表示将数据输出到其他位置output {}
复制代码


配置从 FileBeat 接收数据


cd /export/server/es/logstash-7.6.1/configvim filebeat-print.conf
复制代码


input {  beats {    port => 5044  }}
output { stdout { codec => rubydebug }}
复制代码


测试 logstash 配置是否正确


bin/logstash -f config/filebeat-print.conf --config.test_and_exit
复制代码


[2021-12-05T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
复制代码


启动 logstash


bin/logstash -f config/filebeat-print.conf --config.reload.automatic
复制代码


reload.automatic:修改配置文件时自动重新加载


测试


创建一个 access.log.1 文件,使用 cat test >> access.log.1 往日志文件中追加内容。


test 文件中只保存一条日志:


[root@node1 log]# cat test 235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249
复制代码


当我们启动 Logstash 之后,就可以发现 Logstash 会打印出来从 FileBeat 接收到的数据:


{           "log" => {          "file" => {            "path" => "/var/apache/log/access.log.1"        },        "offset" => 825    },         "input" => {        "type" => "log"    },         "agent" => {        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",             "version" => "7.6.1",                "type" => "filebeat",                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",            "hostname" => "node1.itcast.cn"    },    "@timestamp" => 2021-12-05T09:07:55.236Z,           "ecs" => {        "version" => "1.4.0"    },          "host" => {        "name" => "node1"    },          "tags" => [        [0] "beats_input_codec_plain_applied"    ],       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",      "@version" => "1"}
复制代码

五、Logstash 输出数据到 Elasticsearch

通过控制台,我们发现 Logstash input 接收到的数据没有经过任何处理就发送给了 output 组件。而其实我们需要将数据输出到 Elasticsearch。所以,我们修改 Logstash 的 output 配置。配置输出 Elasticsearch 只需要配置以下就可以了:


output {    elasticsearch {        hosts => [ "localhost:9200" ]    }}
复制代码


操作步骤:

1、重新拷贝一份配置文件

cp filebeat-print.conf filebeat-es.conf
复制代码

2、将 output 修改为 Elasticsearch

input {  beats {    port => 5044  }}
output { elasticsearch { hosts => [ "node1:9200","node2:9200","node3:9200"] }}
复制代码

3、重新启动 Logstash

bin/logstash -f config/filebeat-es.conf --config.reload.automatic
复制代码

4、追加一条日志到监控的文件中,并查看 Elasticsearch 中的索引、文档

cat test >> access.log.1 
复制代码


// 查看索引数据


GET /_cat/indices?v


我们在 Elasticsearch 中发现一个以 logstash 开头的索引


    {        "health": "green",        "status": "open",        "index": "logstash-2021.12.05-000001",        "uuid": "147Uwl1LRb-HMFERUyNEBw",        "pri": "1",        "rep": "1",        "docs.count": "2",        "docs.deleted": "0",        "store.size": "44.8kb",        "pri.store.size": "22.4kb"    }
复制代码


// 查看索引库的数据


GET /logstash-2021.12.05-000001/_search?format=txt


{
"from": 0,
"size": 1
}
复制代码


我们可以获取到以下数据:


                    "@timestamp": "2021-12-05T09:38:00.402Z",                    "tags": [                        "beats_input_codec_plain_applied"                    ],                    "host": {                        "name": "node1"                    },                    "@version": "1",                    "log": {                        "file": {                            "path": "/var/apache/log/access.log.1"                        },                        "offset": 1343                    },                    "agent": {                        "version": "7.6.1",                        "ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",                        "hostname": "node1",                        "type": "filebeat"                    },                    "input": {                        "type": "log"                    },                    "message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",                    "ecs": {                        "version": "1.4.0"                    }
复制代码


从输出返回结果,我们可以看到,日志确实已经保存到了 Elasticsearch 中,而且我们看到消息数据是封装在名为 message 中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP 字段、时间、请求方式、请求 URL、响应结果,这样

六、Logstash 过滤器

在 Logstash 中可以配置过滤器 Filter 对采集到的数据进行中间处理,在 Logstash 中,有大量的插件供我们使用。参考官网:


[Filter plugins | Logstash Reference 7.6] | Elastic


此处,我们重点来讲解 Grok 插件。

1、查看 Logstash 已经安装的插件

bin/logstash-plugin list
复制代码

2、Grok 插件

Grok 是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web 服务器日志、MySQL 或者是任意其他的日志格式。


Grok 官网:[Grok filter plugin | Logstash Reference 7.6] | Elastic

3、Grok 语法

Grok 是通过模式匹配的方式来识别日志中的数据,可以把 Grok 插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash 拥有 120 个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。


官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns


grok 模式的语法是:%{SYNTAX:SEMANTIC}


SYNTAX 指的是 Grok 模式名称,SEMANTIC 是给模式匹配到的文本字段名。例如:


%{NUMBER:duration} %{IP:client}


duration 表示:匹配一个数字,client 表示匹配一个 IP 地址


默认在 Grok 中,所有匹配到的的数据类型都是字符串,如果要转换成 int 类型(目前只支持 int 和 float),可以这样:%{NUMBER:duration:int} %{IP:client}


以下是常用的 Grok 模式:


4、用法


    filter {      grok {        match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }      }    }
复制代码

七、匹配日志中的 IP、日期并打印

235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249
复制代码


我们使用 IP 就可以把前面的 IP 字段匹配出来,使用 HTTPDATE 可以将后面的日期匹配出来


配置 Grok 过滤插件

1、配置 Logstash

input {    beats {        port => 5044    }}
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\]" } } }
output { stdout { codec => rubydebug }}
复制代码

2、启动 Logstash

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
复制代码


{           "log" => {        "offset" => 1861,          "file" => {            "path" => "/var/apache/log/access.log.1"        }    },         "input" => {        "type" => "log"    },          "tags" => [        [0] "beats_input_codec_plain_applied"    ],          "date" => "15/Apr/2015:00:27:19 +0849",           "ecs" => {        "version" => "1.4.0"    },    "@timestamp" => 2021-12-05T11:02:05.809Z,       "message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249",          "host" => {        "name" => "node1"    },            "ip" => "235.9.200.242",         "agent" => {            "hostname" => "node1",             "version" => "7.6.1",        "ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05",                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",                "type" => "filebeat"    },      "@version" => "1"}
复制代码


我们看到,经过 Grok 过滤器插件处理之后,我们已经获取到了 ip 和 date 两个字段。接下来,我们就可以继续解析其他的字段

八、解析所有字段

将日志解析成以下字段:


1、修改 Logstash 配置文件

input {    beats {        port => 5044    }}
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status} %{INT:length} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } }
output { stdout { codec => rubydebug }}
复制代码

2、测试并启动 Logstash

我们可以看到,8 个字段都已经成功解析。


{     "reference" => "www.baidu.com",      "@version" => "1",           "ecs" => {        "version" => "1.4.0"    },    "@timestamp" => 2021-12-05T03:30:10.048Z,            "ip" => "235.9.200.241",        "method" => "POST",           "uri" => "/it.cn/bigdata.html",         "agent" => {                  "id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",        "ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972",             "version" => "7.6.1",            "hostname" => "node1",                "type" => "filebeat"    },        "length" => "45",        "status" => "200",           "log" => {          "file" => {            "path" => "/var/apache/log/access.log"        },        "offset" => 1    },         "input" => {        "type" => "log"    },          "host" => {        "name" => "node1"    },          "tags" => [        [0] "beats_input_codec_plain_applied"    ],       "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",          "date" => "15/Apr/2015:00:27:19 +0849",       "message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html HTTP/1.1\" 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\""}
复制代码

九、将数据输出到 Elasticsearch

到目前为止,我们已经通过了 Grok Filter 可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到 Elasticsearch 中。我们看到了 Logstash 的输出中,有大量的字段,但如果我们只需要保存我们需要的 8 个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?

1、过滤出来需要的字段

要过滤出来我们需要的字段。我们需要使用 mutate 插件。mutate 插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。



官方文档:[Mutate filter plugin | Logstash Reference 7.6] | Elastic


例如,mutate 插件可以支持以下常用操作



配置:


注意:此处为了方便进行类型的处理,将 status、length 指定为 int 类型


input {    beats {        port => 5044    }}
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"] }}
output { stdout { codec => rubydebug }}
复制代码

2、转换日期格式

要将日期格式进行转换,我们可以使用 Date 插件来实现。该插件专门用来解析字段中的日期,官方说明文档:[Date filter plugin | Logstash Reference 7.6] | Elastic


用法如下:



将 date 字段转换为「年月日 时分秒」格式。默认字段经过 date 插件处理后,会输出到 @timestamp 字段,所以,我们可以通过修改 target 属性来重新定义输出字段。


Logstash 配置修改为如下:


input {    beats {        port => 5044    }}
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"] } date { match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] target => "date" }}
output { stdout { codec => rubydebug }}
复制代码


启动 Logstash:


bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
复制代码


{       "status" => "200",    "reference" => "www.baidu.com",       "method" => "POST",      "browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900",           "ip" => "235.9.200.241",       "length" => "45",          "uri" => "/it.cn/bigdata.html",         "date" => 2015-04-14T15:38:19.000Z}
复制代码

3、输出到 Elasticsearch 指定索引

我们可以通过


elasticsearch {
hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]
index => "xxx"
}
复制代码


index 来指定索引名称,默认输出的 index 名称为:logstash-%{+yyyy.MM.dd}。但注意,要在 index 中使用时间格式化,filter 的输出必须包含 @timestamp 字段,否则将无法解析日期。


input {    beats {        port => 5044    }}
filter { grok { match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" } } mutate { enable_metric => "false" remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"] } date { match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"] target => "date" }}
output { stdout { codec => rubydebug } elasticsearch { hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"] index => "apache_web_log_%{+YYYY-MM}" }}
复制代码


启动 Logstash


bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exit
bin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic
复制代码


注意:


  • index 名称中,不能出现大写字符


发布于: 刚刚阅读数: 3
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022-07-12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
大数据ELK(二十二):采集Apache Web服务器日志_ELK_Lansonli_InfoQ写作社区