写点什么

ELK 太重?试试 KFC 日志采集

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:3286 字

    阅读完需:约 11 分钟

堆栈跟踪: at System.Web.Mvc.DefaultControllerFactory.GetControllerInstance(RequestContext requestContext, Type controllerType)


at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName)


at System.Web.Mvc.MvcHandler.ProcessRequestInit(HttpContextBase httpContext, IController& controller, IControllerFactory& factory)


at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state)


at System.Web.HttpApplication.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute()


at System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously)



FileBeat 配置:



max_procs: 2


queue:


mem:


events: 2048


flush.min_events: 2048

============================== Filebeat inputs ===============================

filebeat.inputs:

管理系统

  • type: log


enabled: true


encoding: GB2312


paths:


  • D:/IIS/www.A.com/logs/*.txt


multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'


multiline.negate: true


multiline.match: after


fields:


topic: 'dlbZcZGBSyslogs'


fields_under_root: true

单位系统

  • type: log


enabled: true


encoding: GB2312


paths:


  • D:/IIS/www.B.com/logs/*.txt

Multiline options

multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'


multiline.negate: true


multiline.match: after


fields:


topic: 'dlbZcDWSyslogs'


fields_under_root: true

个人系统

  • type: log


enabled: true


encoding: GB2312


paths:


  • D:/IIS/www.C.com/logs/*.txt

Multiline options

multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'


multiline.negate: true


multiline.match: after


fields:


topic: 'dlbZcMySyslogs'


fields_under_root: true

调试输出

#output.console:

pretty: true

#output.file:

path: "D:/bigData"

filename: filebeat.log

-------------------------------- Kafka Output --------------------------------

output.kafka:

Boolean flag to enable or disable the output module.

enabled: true


hosts: ["192.168.1.10:9092"]

The Kafka topic used for produced events. The setting can be a format string

using any event field. To set the topic from document type use `%{[type]}`.

topic: '%{[topic]}'

Authentication details. Password is required if username is set.

#username: ''


#password: ''

The number of concurrent load-balanced Kafka output workers.

worker: 2


max_message_bytes: 10000000

================================= Processors =================================

processors:


  • add_host_metadata:


when.not.contains.tags: forwarded


  • add_cloud_metadata: ~

  • add_docker_metadata: ~

  • add_kubernetes_metadata: ~

  • script:


lang: javascript


id: my_filter


tag: enable


source: >


function process(event) {


var str = event.Get("message");


var sp = str.split(" ");


var log_datetime = sp.slice(0,2).join(" ");


var regEx = /^\d{4}-\d{2}-\d{2}$/;


var prefix_date = log_datetime.substring(0, 10);


if(prefix_date.match(regEx) != null)


{


event.Put("server","221");


log_datetime = log_datetime.replace(",",".");


log_datetime = log_datetime.replace("'","");


regEx = /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}$/;


if(log_datetime.match(regEx) != null)


{


event.Put("log_datetime",log_datetime);


event.Put("log_index",sp.slice(2,3).join(" ").replace("[","").replace("]",""));


event.Put("log_level",sp.slice(3,4).join(" "));


if(str.match(/(?<=time:)\S*(?=ms)/)!=null)


{


var spTime= str.split("time:");


var spPre = spTime[0].split(" ");


var spNext = spTime[1].split(" ");


event.Put("log_class",spPre.slice(4).join(" "));


var log_execTime= spNext.slice(0,1).join(" ").replace("ms","");


regEx = /^(\-|\+)?\d+(\.\d+)?$/;


if(regEx.test(log_execTime))


{


event.Put("log_execTime",log_execTime);


}


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


else


{


event.Put("log_execTime","-1");


}


event.Put("log_message",spNext.slice(1).join(" "));


}


else


{


event.Put("log_class",sp.slice(4,5).join(" "));


event.Put("log_execTime","-1");


event.Put("log_message",sp.slice(6).join(" "));


}


return;


}


}


event.Cancel();


}


  • drop_fields:


fields: ["@timestamp", "message", "host", "ecs", "agent", "@metadata", "log", "input"]



以上的配置说明:


max_procs:设置可以同时执行的最大 CPU 数;


queue :内部队列信息;


Filebeat inputs:日志数据源采集的入口;


其他字段如下说明:



#日志类型


  • type: log


#开启


enabled: true


#编码格式,有中文必须设置


encoding: GB2312


#路径


paths:


  • D:/IIS/www.A.com/logs/*.txt


#多行匹配前缀


multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'


#开启多行匹配


multiline.negate: true


#开启多行之后,匹配是合并到上一条信息


multiline.match: after


#增加一个字段,用于 kafka 的 topic 的识别


fields:


topic: 'dlbZcZGBSyslogs'

字段增加在输出 json 的根目录下

fields_under_root: true


//https://www.cnblogs.com/EminemJK/p/15165961.html



Kafka Output:kafka 的配置信息,主要是??topic: '%{[topic]}'?的设置,因为这里采集多个数据源,对于不同的 topic,在数据源输入的时候,已经设置好字段如?topic: 'dlbZcZGBSyslogs'?,所以此处使用占位符灵活设置;


Processors:配置处理器,即对采集的日志信息进行处理,处理是按行处理,当字符串处理即可,可以使用 js 语法进行对字符串进行处理;Filebeat 的处理器可以多种多样,具体可以看文档。


另外,在调试的时候,可以采用文件输出或 Console 输出来观察处理后输出的数据格式,再进行微调:


output.file:


path: "D:/bigData"


filename: filebeat.log


IIS 的日志也差不多,只是微调处理逻辑就可以了,一通百通。


其他配置可以参考官网文档:h__ttps://www.elastic.co/guide/en/beats/filebeat/current/index.html


Kafka 配置




Kafka 没有特别的处理,在这里只是进行消息的接收,新建好主题就可以。



//个人系统


bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcMySyslogs


//单位系统


bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcDWSyslogs


//管理系统


bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcZGBSyslogs



partitions?分区数的大小,取决设置了多少个消费者,这里我们有三台服务器做了 Clickhouse 的集群作为消费者,所以分区数设置为 3,一般情况,消费者总数不应该大于分区数,每个分区只能分配一个消费者。


Clickhouse 配置




Clickhouse 三个分片的集群,如果你是单机的,只需要把语法相应的修改一下即可。


在每台服务器上创建 kafka 引擎表:



CREATE TABLE kafka_dlb_ZC_My_syslogs (


log_datetime DateTime64,


log_index String,


log_level String,


log_class String,


log_message String,


log_execTime Float32,


server String


) ENGINE = Kafka


SETTINGS kafka_broker_list = '192.168.1.10:9092',


kafka_topic_list = 'dlbZcMySyslogs',


kafka_group_name = 'dlbZcMySyslogs_sys',


kafka_format = 'JSONEachRow',


kafka_num_consumers = 1;



创建实体表:



CREATE TABLE dlb_ZC_My_syslogs on cluster cluster_3s_1r


(


log_datetime DateTime64,


log_index String,


log_level String,


log_class String,


log_message String,


log_execTime Float32,


server String


) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/dlb_ZC_My_syslogs', '{replica}')


ORDER BY toDate(log_datetime)


PARTITION BY toYYYYMM(log_datetime);

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
ELK太重?试试KFC日志采集