ELK 太重?试试 KFC 日志采集,2021 大厂 Java 面试经验
multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
multiline.negate: true
multiline.match: after
fields:
topic: 'dlbZcMySyslogs'
fields_under_root: true
调试输出
#output.console:
pretty: true
#output.file:
path: "D:/bigData"
filename: filebeat.log
-------------------------------- Kafka Output --------------------------------
output.kafka:
Boolean flag to enable or disable the output module.
enabled: true
hosts: ["192.168.1.10:9092"]
The Kafka topic used for produced events. The setting can be a format string
using any event field. To set the topic from document type use `%{[type]}`.
topic: '%{[topic]}'
Authentication details. Password is required if username is set.
#username: ''
#password: ''
The number of concurrent load-balanced Kafka output workers.
worker: 2
max_message_bytes: 10000000
================================= Processors =================================
processors:
add_host_metadata:
when.not.contains.tags: forwarded
add_cloud_metadata: ~
add_docker_metadata: ~
add_kubernetes_metadata: ~
script:
lang: javascript
id: my_filter
tag: enable
source: >
function process(event) {
var str = event.Get("message");
var sp = str.split(" ");
var log_datetime = sp.slice(0,2).join(" ");
var regEx = /^\d{4}-\d{2}-\d{2}$/;
var prefix_date = log_datetime.substring(0, 10);
if(prefix_date.match(regEx) != null)
{
event.Put("server","221");
log_datetime = log_datetime.replace(",",".");
log_datetime = log_datetime.replace("'","");
regEx = /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}$/;
if(log_datetime.match(regEx) != null)
{
event.Put("log_datetime",log_datetime);
event.Put("log_index",sp.slice(2,3).join(" ").replace("[","").replace("]",""));
event.Put("log_level",sp.slice(3,4).join(" "));
if(str.match(/(?<=time:)\S*(?=ms)/)!=null)
{
var spTime= str.split("time:");
var spPre = spTime[0].split(" ");
var spNext = spTime[1].split(" ");
event.Put("log_class",spPre.slice(4).join(" "));
var log_execTime= spNext.slice(0,1).join(" ").replace("ms","");
regEx = /^(\-|\+)?\d+(\.\d+)?$/;
if(regEx.test(log_execTime))
{
event.Put("log_execTime",log_execTime);
}
else
{
event.Put("log_execTime","-1");
}
event.Put("log_message",spNext.slice(1).join(" "));
}
else
{
event.Put("log_class",sp.slice(4,5).join(" "));
event.Put("log_execTime","-1");
event.Put("log_message",sp.slice(6).join(" "));
}
return;
}
}
event.Cancel();
}
drop_fields:
fields: ["@timestamp", "message", "host", "ecs", "agent", "@metadata", "log", "input"]
以上的配置说明:
max_procs:设置可以同时执行的最大 CPU 数;
queue :内部队列信息;
Filebeat inputs:日志数据源采集的入口;
其他字段如下说明:
#日志类型
type: log
#开启
enabled: true
#编码格式,有中文必须设置
encoding: GB2312
#路径
paths:
D:/IIS/www.A.com/logs/*.txt
#多行匹配前缀
multiline.pattern: '^\d{4}-\d{1,2}-
\d{1,2}'
#开启多行匹配
multiline.negate: true
#开启多行之后,匹配是合并到上一条信息
multiline.match: after
#增加一个字段,用于 kafka 的 topic 的识别
fields:
topic: 'dlbZcZGBSyslogs'
字段增加在输出 json 的根目录下
fields_under_root: true
//https://www.cnblogs.com/EminemJK/p/15165961.html
Kafka Output:kafka 的配置信息,主要是??topic: '%{[topic]}'?的设置,因为这里采集多个数据源,对于不同的 topic,在数据源输入的时候,已经设置好字段如?topic: 'dlbZcZGBSyslogs'?,所以此处使用占位符灵活设置;
Processors:配置处理器,即对采集的日志信息进行处理,处理是按行处理,当字符串处理即可,可以使用 js 语法进行对字符串进行处理;Filebeat 的处理器可以多种多样,具体可以看文档。
另外,在调试的时候,可以采用文件输出或 Console 输出来观察处理后输出的数据格式,再进行微调:
output.file:
path: "D:/bigData"
filename: filebeat.log
IIS 的日志也差不多,只是微调处理逻辑就可以了,一通百通。
其他配置可以参考官网文档:h__ttps://www.elastic.co/guide/en/beats/filebeat/current/index.html
Kafka 配置
Kafka 没有特别的处理,在这里只是进行消息的接收,新建好主题就可以。
//个人系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcMySyslogs
//单位系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcDWSyslogs
//管理系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcZGBSyslogs
partitions?分区数的大小,取决设置了多少个消费者,这里我们有三台服务器做了 Clickhouse 的集群作为消费者,所以分区数设置为 3,一般情况,消费者总数不应该大于分区数,每个分区只能分配一个消费者。
Clickhouse 配置
Clickhouse 三个分片的集群,如果你是单机的,只需要把语法相应的修改一下即可。
在每台服务器上创建 kafka 引擎表:
CREATE TABLE kafka_dlb_ZC_My_syslogs (
log_datetime DateTime64,
log_index String,
log_level String,
log_class String,
log_message String,
log_execTime Float32,
server String
) ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.1.10:9092',
kafka_topic_list = 'dlbZcMySyslogs',
kafka_group_name = 'dlbZcMySyslogs_sys',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 1;
创建实体表:
CREATE TABLE dlb_ZC_My_syslogs on cluster cluster_3s_1r
(
log_datetime DateTime64,
log_index String,
log_level String,
log_class String,
log_message String,
log_execTime Float32,
server String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/dlb_ZC_My_syslogs', '{replica}')
ORDER BY toDate(log_datetime)
PARTITION BY toYYYYMM(log_datetime);
//https://www.cnblogs.com/EminemJK/p/15165961.html
实体表是使用集群来创建的,如果是单机请删除?on?cluster cluster_3s_1r?,修改表引擎即可。如果已经开启了 zookeeper 且开启复制表,在任一一台服务器运行运行一次即可。
在每台服务器上创建物化视图:
CREATE MATERIALIZED VIEW viem_dlb_ZC_My_syslogs_consumer TO dlb_ZC_My_syslogs
AS SELECT *
FROM kafka_dlb_ZC_My_syslogs;
创建分布式视图(可选,单机请忽略):
CREATE TABLE Dis_dlb_ZC_My_syslogs ON CLUSTER cluster_3s_1r
AS LogsDataBase.dlb_ZC_My_syslogs
ENGINE = Distributed(cluster_3s_1r, 'LogsDataBase', 'dlb_ZC_My_syslogs',rand());
分布式表将聚合集群中每个分片的表信息,进行执行一次。
运行
--
顺便提供一个快速运行 Filebeat 和卸载的 bat 脚本:
运行服务:
//windows server2008 以上版本的服务器
cd %~dp0
.\install-service-filebeat.ps1
pause
//windows server 2008 和以下的服务器
cd %~dp0
PowerShell.exe -ExecutionPolicy RemoteSigned -File .\install-service-filebeat.ps1
pause
卸载服务:
//windows server2008 以上版本的服务器
cd %~dp0
.\uninstall-service-filebeat.ps1
pause
评论