写点什么

大数据 ELK(十九):使用 FileBeat 采集 Kafka 日志到 Elasticsearch

作者:Lansonli
  • 2022 年 10 月 09 日
    广东
  • 本文字数:3504 字

    阅读完需:约 11 分钟

大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch

使用 FileBeat 采集 Kafka 日志到 Elasticsearch

一、需求分析

在资料中有一个 kafka_server.log.tar.gz 压缩包,里面包含了很多的 Kafka 服务器日志,现在我们为了通过在 Elasticsearch 中快速查询这些日志,定位问题。我们需要用 FileBeats 将日志数据上传到 Elasticsearch 中。


问题:


  • 首先,我们要指定 FileBeat 采集哪些 Kafka 日志,因为 FileBeats 中必须知道采集存放在哪儿的日志,才能进行采集。

  • 其次,采集到这些数据后,还需要指定 FileBeats 将采集到的日志输出到 Elasticsearch,那么 Elasticsearch 的地址也必须指定。

二、配置 FileBeats

FileBeats 配置文件主要分为两个部分。


  • inputs

  • output


从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。

1、input 配置

filebeat.inputs:- type: log  enabled: true  paths:    - /var/log/*.log    #- c:\programdata\elasticsearch\logs\*
复制代码


在 FileBeats 中,可以读取一个或多个数据源。


2、output 配置


默认 FileBeat 会将日志数据放入到名称为:filebeat-%filebeat 版本号 %-yyyy.MM.dd 的索引中。


PS:


FileBeats 中的 filebeat.reference.yml 包含了 FileBeats 所有支持的配置选项。

三、配置文件

1、创建配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64vim filebeat_kafka_log.yml
复制代码

2、一下到配置文件中

filebeat.inputs:- type: log  enabled: true  paths:    - /export/server/es/data/kafka/server.log.*
output.elasticsearch: hosts: ["node1:9200", "node2:9200", "node3:9200"]
复制代码

四、运行 FileBeat

1、运行 FileBeat

./filebeat -c filebeat_kafka_log.yml -e
复制代码

2、将日志数据上传到/var/kafka/log,并解压

mkdir -p /export/server/es/data/kafka/
tar -xvzf kafka_server.log.tar.gz
复制代码


注意: 文件权限的报错


如果在启动 fileBeat 的时候, 报了一个配置文件权限的错误, 请修改其权限为 -rw-r--r--

五、查询数据

1、查看索引信息

GET /_cat/indices?v


    {        "health": "green",        "status": "open",        "index": "filebeat-7.6.1-2021.12.05-000001",        "uuid": "dplqB_hTQq2XeSk6S4tccQ",        "pri": "1",        "rep": "1",        "docs.count": "213780",        "docs.deleted": "0",        "store.size": "71.9mb",        "pri.store.size": "35.8mb"    }
复制代码


GET /filebeat-7.6.1-2021.12.05-000001/_search


            {                "_index": "filebeat-7.6.1-2021.12.05-000001",                "_type": "_doc",                "_id": "-72pX3IBjTeClvZff0CB",                "_score": 1,                "_source": {                    "@timestamp": "2021-12-05T09:00:40.041Z",                    "log": {                        "offset": 55433,                        "file": {                            "path": "/var/kafka/log/server.log.2021-12-05-16"                        }                    },                    "message": "[2021-12-05 09:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)",                    "input": {                        "type": "log"                    },                    "ecs": {                        "version": "1.4.0"                    },                    "host": {                        "name": "node1"                    },                    "agent": {                        "id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64",                        "version": "7.6.1",                        "type": "filebeat",                        "ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63",                        "hostname": "node1"                    }                }            }
复制代码


FileBeat 自动给我们添加了一些关于日志、采集类型、Host 各种字段。

六、解决一个日志涉及到多行问题

我们在日常日志的处理中,经常会碰到日志中出现异常的情况。类似下面的情况:


[2021-12-05 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)java.io.IOException: Connection to node2:9092 (id: 1 rack: null) failed.        at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)        at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)        at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)        at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2021-12-05 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)[2021-12-05 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
复制代码


在 FileBeat 中,Harvest 是逐行读取日志文件的。但上述的日志会出现一条日志,跨多行的情况。有异常信息时,肯定会出现多行。我们先来看一下,如果默认不处理这种情况会出现什么问题。

1、导入错误日志

1)在/export/server/es/data/kafka/中创建名为 server.log.2021-12-05 的日志文件


2)将资料中的 err.txt 日志文本贴入到该文件中


观察 FileBeat,发现 FileBeat 已经针对该日志文件启动了 Harvester,并读取到数据数据。


2021-12-05T19:11:01.236+0800    INFO    log/harvester.go:297    Harvester started for file: /var/kafka/log/server.log.2021-12-05
复制代码


3)在 Elasticsearch 检索该文件


我们发现,原本是一条日志中的异常信息,都被作为一条单独的消息来处理了~


"message":"java.io.IOException:Connection to node2:9092 (id;


这明显是不符合我们的预期的,我们想要的是将所有的异常消息合并到一条日志中。那针对这种情况该如何处理呢?

2、问题分析

每条日志都是有统一格式的开头的,就拿 Kafka 的日志消息来说,2021-12-05 14:00:05,725 这是一个统一的格式,如果不是以这样的形式开头,说明这一行肯定是属于某一条日志,而不是独立的一条日志。所以,我们可以通过日志的开头来判断某一行是否为新的一条日志。

3、FileBeat 多行配置选项

在 FileBeat 的配置中,专门有一个解决一条日志跨多行问题的配置。主要为以下三个配置:


multiline.pattern: ^\[multiline.negate: falsemultiline.match: after
复制代码


multiline.pattern 表示能够匹配一条日志的模式,默认配置的是以[开头的才认为是一条新的日志。


multiline.negate:配置该模式是否生效,默认为 false。


multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。

4、重新配置 FileBeat

1)修改 filebeat.yml,并添加以下内容


filebeat.inputs:- type: log  enabled: true  paths:    - /var/kafka/log/server.log.*  multiline.pattern: '^\['  multiline.negate: true  multiline.match: after
output.elasticsearch: hosts: ["node1:9200", "node2:9200", "node3:9200"]
复制代码


2)修改「注册表」/data.json,将 server.log.2021-12-05 对应的 offset 设置为 0


cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeat
vim data.json
复制代码


3)删除之前创建的文档


// 删除指定文件的文档POST /filebeat-7.6.1-2021.12.05-000001/_delete_by_query{    "query": {        "match": {            "log.file.path": "/var/kafka/log/server.log.2021-12-05"        }    }}
复制代码


4)重新启动 FileBeat


./filebeat -e
复制代码


发布于: 刚刚阅读数: 7
用户头像

Lansonli

关注

微信公众号:三帮大数据 2022.07.12 加入

CSDN大数据领域博客专家,华为云享专家、阿里云专家博主、腾云先锋(TDP)核心成员、51CTO专家博主,全网六万多粉丝,知名互联网公司大数据高级开发工程师

评论

发布
暂无评论
大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch_Filebeat_Lansonli_InfoQ写作社区