ELK Too Heavy? Try KFC (Kafka + Filebeat + ClickHouse) for Log Collection
Stack trace:
   at System.Web.Mvc.DefaultControllerFactory.GetControllerInstance(RequestContext requestContext, Type controllerType)
   at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName)
   at System.Web.Mvc.MvcHandler.ProcessRequestInit(HttpContextBase httpContext, IController& controller, IControllerFactory& factory)
   at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state)
   at System.Web.HttpApplication.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute()
   at System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously)
Filebeat configuration:
max_procs: 2
queue:
  mem:
    events: 2048
    flush.min_events: 2048
# ============================== Filebeat inputs ===============================
filebeat.inputs:
# Management system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.A.com/logs/*.txt
  # Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcZGBSyslogs'
  fields_under_root: true
# Organization system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.B.com/logs/*.txt
  # Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcDWSyslogs'
  fields_under_root: true
# Personal system
- type: log
  enabled: true
  encoding: GB2312
  paths:
    - D:/IIS/www.C.com/logs/*.txt
  # Multiline options
  multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
  multiline.negate: true
  multiline.match: after
  fields:
    topic: 'dlbZcMySyslogs'
  fields_under_root: true
# Debug output
#output.console:
#  pretty: true
#output.file:
#  path: "D:/bigData"
#  filename: filebeat.log
# -------------------------------- Kafka Output --------------------------------
output.kafka:
  # Boolean flag to enable or disable the output module.
  enabled: true
  hosts: ["192.168.1.10:9092"]
  # The Kafka topic used for produced events. The setting can be a format string
  # using any event field. To set the topic from document type use `%{[type]}`.
  topic: '%{[topic]}'
  # Authentication details. Password is required if username is set.
  #username: ''
  #password: ''
  # The number of concurrent load-balanced Kafka output workers.
  worker: 2
  max_message_bytes: 10000000
# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      source: >
        function process(event) {
            var str = event.Get("message");
            var sp = str.split(" ");
            // The first two tokens are the date and the time.
            var log_datetime = sp.slice(0, 2).join(" ");
            var regEx = /^\d{4}-\d{2}-\d{2}$/;
            var prefix_date = log_datetime.substring(0, 10);
            if (prefix_date.match(regEx) != null) {
                event.Put("server", "221");
                // Normalize "HH:mm:ss,SSS" to "HH:mm:ss.SSS" and strip stray quotes.
                log_datetime = log_datetime.replace(",", ".");
                log_datetime = log_datetime.replace("'", "");
                regEx = /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3}$/;
                if (log_datetime.match(regEx) != null) {
                    event.Put("log_datetime", log_datetime);
                    event.Put("log_index", sp.slice(2, 3).join(" ").replace("[", "").replace("]", ""));
                    event.Put("log_level", sp.slice(3, 4).join(" "));
                    // Lines carrying an execution time look like "... time:15ms ...".
                    if (str.match(/(?<=time:)\S*(?=ms)/) != null) {
                        var spTime = str.split("time:");
                        var spPre = spTime[0].split(" ");
                        var spNext = spTime[1].split(" ");
                        event.Put("log_class", spPre.slice(4).join(" "));
                        var log_execTime = spNext.slice(0, 1).join(" ").replace("ms", "");
                        regEx = /^(\-|\+)?\d+(\.\d+)?$/;
                        if (regEx.test(log_execTime)) {
                            event.Put("log_execTime", log_execTime);
                        } else {
                            event.Put("log_execTime", "-1");
                        }
                        event.Put("log_message", spNext.slice(1).join(" "));
                    } else {
                        event.Put("log_class", sp.slice(4, 5).join(" "));
                        event.Put("log_execTime", "-1");
                        event.Put("log_message", sp.slice(6).join(" "));
                    }
                    return;
                }
            }
            // Drop events that do not start with a well-formed timestamp.
            event.Cancel();
        }
  - drop_fields:
      fields: ["@timestamp", "message", "host", "ecs", "agent", "@metadata", "log", "input"]
Notes on the configuration above:
max_procs: the maximum number of CPUs that may be used concurrently;
queue: the internal queue settings;
Filebeat inputs: where the log data sources are defined;
The remaining fields are explained inline below:
# Log type
type: log
# Enable this input
enabled: true
# Encoding; must be set when the logs contain Chinese text
encoding: GB2312
# Paths to collect
paths:
  - D:/IIS/www.A.com/logs/*.txt
# Prefix pattern for multiline matching
multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
# Negate the pattern: lines not matching the prefix are treated as continuations
multiline.negate: true
# Append continuation lines to the preceding event
multiline.match: after
# Add a field used to pick the Kafka topic
fields:
  topic: 'dlbZcZGBSyslogs'
# Place the added field at the root of the output JSON
fields_under_root: true
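With this pattern, any line that does not begin with a yyyy-MM-dd date is treated as a continuation of the previous line, which is exactly what stack traces like the one at the top of this post need. A small illustration (the log content here is hypothetical): the three raw lines

2021-08-20 10:15:30,123 [5] ERROR Request failed: System.NullReferenceException
   at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName)
   at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state)

arrive in Kafka as a single event, because only the first line matches '^\d{4}-\d{1,2}-\d{1,2}', and negate: true plus match: after glue the other two onto it.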
Kafka Output: the Kafka connection settings. The key one is topic: '%{[topic]}'. Since several data sources are collected here, each input already tags its events with a topic field (e.g. topic: 'dlbZcZGBSyslogs'), so a placeholder is used to route each event to the matching topic;
Processors: the processing applied to the collected log events. Processing happens one event at a time, so treating the message as a plain string is enough, and you can manipulate it with JavaScript. Filebeat ships many kinds of processors; see the documentation for details.
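As a concrete illustration, take a hypothetical message in the log4net-style layout the script expects:

2021-08-20 10:15:30,123 [5] INFO Web.Controllers.HomeController time:15ms request processed ok

After the script runs and drop_fields removes the bookkeeping fields, the event shipped to Kafka carries roughly:

{
  "server": "221",
  "log_datetime": "2021-08-20 10:15:30.123",
  "log_index": "5",
  "log_level": "INFO",
  "log_class": "Web.Controllers.HomeController",
  "log_execTime": "15",
  "log_message": "request processed ok",
  "topic": "dlbZcZGBSyslogs"
}

Note that topic comes from the input's fields setting, not from the script, and any event whose message does not start with a well-formed timestamp is cancelled and never reaches Kafka.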
Also, while debugging you can switch to the file or console output to inspect the processed data format and fine-tune from there:
output.file:
  path: "D:/bigData"
  filename: filebeat.log
IIS logs are handled in much the same way; only the processing logic needs small tweaks. Learn one, and the rest follow.
Other options are covered in the official documentation: https://www.elastic.co/guide/en/beats/filebeat/current/index.html
Kafka Configuration
Kafka needs no special treatment here; it only receives the messages. Creating the topics is enough.
# Personal system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcMySyslogs
# Organization system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcDWSyslogs
# Management system
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcZGBSyslogs
partitions controls the number of partitions, which should match how many consumers you plan to run. Here three servers form a ClickHouse cluster acting as consumers, so it is set to 3. In general the total number of consumers should not exceed the number of partitions, because each partition can be assigned to at most one consumer in a group.
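Before wiring up ClickHouse, it is worth sanity-checking the pipeline with the stock Kafka tools (same broker and ZooKeeper addresses as above):

# Confirm the topic exists and has the expected partition count
bin/kafka-topics.sh --describe --zookeeper 192.168.1.10:2181 --topic dlbZcMySyslogs
# Tail the JSON events that Filebeat has shipped so far
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.10:9092 --topic dlbZcMySyslogs --from-beginning

If the console consumer prints one JSON object per log record, the Filebeat side is done.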
ClickHouse Configuration
ClickHouse here is a three-shard cluster; if you run a single node, just adjust the statements accordingly.
Create the Kafka engine table on each server:
CREATE TABLE kafka_dlb_ZC_My_syslogs (
    log_datetime DateTime64,
    log_index String,
    log_level String,
    log_class String,
    log_message String,
    log_execTime Float32,
    server String
) ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.1.10:9092',
         kafka_topic_list = 'dlbZcMySyslogs',
         kafka_group_name = 'dlbZcMySyslogs_sys',
         kafka_format = 'JSONEachRow',
         kafka_num_consumers = 1;
Create the physical table:
CREATE TABLE dlb_ZC_My_syslogs ON CLUSTER cluster_3s_1r
(
    log_datetime DateTime64,
    log_index String,
    log_level String,
    log_class String,
    log_message String,
    log_execTime Float32,
    server String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/dlb_ZC_My_syslogs', '{replica}')
ORDER BY toDate(log_datetime)
PARTITION BY toYYYYMM(log_datetime);
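The Kafka engine table does not persist anything by itself; ClickHouse normally drains it into the storage table with a materialized view. A minimal sketch for the pair above (the view name is my own choice; this step is not shown in the configuration above):

-- Continuously read from the Kafka engine table and write into the MergeTree table
CREATE MATERIALIZED VIEW kafka_dlb_ZC_My_syslogs_mv TO dlb_ZC_My_syslogs
AS SELECT * FROM kafka_dlb_ZC_My_syslogs;

Once the view exists, the logs can be queried from dlb_ZC_My_syslogs like any other table.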