写点什么

日志归一管理的一种解决方案

作者:为自己带盐
  • 2021 年 12 月 04 日
  • 本文字数:3376 字

    阅读完需:约 11 分钟

日志归一管理的一种解决方案

在一个开发团队中,由于业务不同,会开发多种针对不同业务的系统。不论是单体应用,还是微服务,都是根据或模糊或精确的服务边界来进行业务划分,进而开发出各种各样的功能。

​ 当我们开发的系统多起来以后,维护的成本也会越来越高,而系统维护的一个非常重要的判定指标就是系统日志。也就是说,我们有多少业务系统,就会产生多少种对应的日志数据,而每种日志又会化分成不同类型的日志,比如 Informatica,warning,debug,error 等。其实日志数据就像我们身体的新陈代谢一样,随着系统的运行,每时每刻都在产生,如果每个系统都单独的管理这些数据,那无疑是一个难以想象的工作,所以我们需要对这些数据进行统一管理。

​ ElasticSearch 从诞生之初,就被很多团队用来做日志管理,当然它的能力远不止这些。这次简单分享一下用 Elastic 公司旗下的 ElasticSearch+Logstash+Kibana,也就是常说的 ELK 组件,来管理多个系统日志的方案(dotnetcore 5.0 环境)。

我们整体的开发思路是,分别搭建 redis 和 elasticsearch 集群,然后通过日志组件,将系统的日志写入到 redis 队列,再通过 logstash 数据管道来将日志数据迁移到 es 集群,如下图。



搭建 ElasticSearch 和 Kibana 环境

关于环境的搭建,我认为没有比官网的介绍更简洁,明了的地方了,直接看官方文档即可 https://www.elastic.co/cn/elastic-stack/

这里我再介绍一个插件,elasticvue,支持 chrome,Firefox,edge,我觉得比 head 要好用也更美观。



kibana 安装成功后,的截图



日志写入到队列

这里我用的是 Serilog 组件,然后没有用 Kafka 之列的消息队列,而是使用的 redis 的队列类型(list),redis 的客户端用的是 csredis。


public class Logger: interfaces.ILogger{        public Logger()    {            }    private CSRedisClient csRedis = new CSRedisClient("10.185.3.130:6379,defaultDatabase=7");
const string log1Name = "ApiLog"; const string log2Name = "WebLog"; const string log3Name = "ErrorLog"; const string log4Name = "DebugLog"; const string log5Name = "WarningLog"; /// <summary> /// 初始化日志 /// </summary> public void InitLog() { RedisHelper.Initialization(csRedis); string LogFilePath(string FileName) => $@"{AppContext.BaseDirectory}ALL_Logs\{FileName}\log.log"; string SerilogOutputTemplate = "{NewLine}Date:{Timestamp:yyyy-MM-dd HH:mm:ss.fff}{NewLine}LogLevel:{Level}{NewLine}Message:{Message}{NewLine}{Exception}" + new string('-', 100); Log.Logger = new LoggerConfiguration() .Enrich.FromLogContext() .MinimumLevel.Debug() // 所有Sink的最小记录级别 .WriteTo.Logger(lg => lg.Filter.ByIncludingOnly(Matching.WithProperty<string>("position", p => p == log1Name)).WriteTo.Async(a => a.File(LogFilePath(log1Name), rollingInterval: RollingInterval.Day, outputTemplate: SerilogOutputTemplate))) .WriteTo.Logger(lg => lg.Filter.ByIncludingOnly(Matching.WithProperty<string>("position", p => p == log2Name)).WriteTo.Async(a => a.File(LogFilePath(log2Name), rollingInterval: RollingInterval.Day, outputTemplate: SerilogOutputTemplate))) .WriteTo.Logger(lg => lg.Filter.ByIncludingOnly(Matching.WithProperty<string>("position", p => p == log3Name)).WriteTo.Async(a => a.File(LogFilePath(log3Name), rollingInterval: RollingInterval.Day, outputTemplate: SerilogOutputTemplate))) .WriteTo.Logger(lg => lg.Filter.ByIncludingOnly(Matching.WithProperty<string>("position", p => p == log4Name)).WriteTo.Async(a => a.File(LogFilePath(log4Name), rollingInterval: RollingInterval.Day, outputTemplate: SerilogOutputTemplate))) .WriteTo.Logger(lg => lg.Filter.ByIncludingOnly(Matching.WithProperty<string>("position", p => p == log5Name)).WriteTo.Async(a => a.File(LogFilePath(log5Name), rollingInterval: RollingInterval.Day, outputTemplate: SerilogOutputTemplate))) .CreateLogger();
}
/*****************************下面是不同日志级别*********************************************/ // FATAL(致命错误) > ERROR(一般错误) > Warning(警告) > Information(一般信息) > DEBUG(调试信息)>Verbose(详细模式,即全部) /// <summary> /// 普通日志 /// </summary> /// <param name="msg"></param> /// <param name="fileName"></param> public void Info(string msg, string fileName = "") { if (fileName == "" || fileName == log1Name) { Log.Information($"{{position}}:{msg}", log1Name); } else if (fileName == log2Name) { Log.Information($"{{position}}:{msg}", log2Name); } else { //输入其他的话,还是存放到第一个文件夹 Log.Information($"{{position}}:{msg}", log1Name); } Task.Run(() => writeLogToRedis(msg, "infomation")); } /// <summary> /// 调试日志 /// </summary> /// <param name="msg"></param> public void Debug(string msg) { Log.Debug($"{{position}}:{msg}", log4Name); Task.Run(() => writeLogToRedis(msg, "debug")); } /// <summary> /// 警告日志或一些关键日志 /// </summary> /// <param name="msg"></param> public void Warning(string msg) { Log.Warning($"{{position}}:{msg}", log5Name); Task.Run(() => writeLogToRedis(msg, "warning")); } /// <summary> /// Error方法统一存放到ErrorLog文件夹 /// </summary> /// <param name="msg"></param> public void Error(Exception ex) { Log.Error(ex, "{position}:" + ex.Message, log3Name); Task.Run(() => writeLogToRedis(ex.Message, "error")); }
public void Error(string msg) { Log.Error("{position}:" + msg, log3Name); Task.Run(() => writeLogToRedis(msg, "error")); } /// <summary> /// 日志写入到redis队列 /// </summary> /// <param name="msg"></param> /// <param name="logLevel"></param> /// <returns></returns> public async Task writeLogToRedis(string msg,string logLevel) { string ret = $"{logLevel}\"{msg}\""; await RedisHelper.LPushAsync(ConfigurationHelper.GetSectionValue("redislogkey"), ret); }}
复制代码

写入之后的效果如下



Logstash 同步日志数据到 ES

在官网下载好 logstash 之后,设定好配置文件,这里需要了解一些数据解析的相关知识,官网也都有介绍,我这里是用的 grok 模式。

配置文件代码如下

input {  redis {        codec => plain        host => "{redis地址}"        port => 6379        data_type => list        key => "{键值}"        db => 7    }}
filter { grok { match=>{"message"=>"%{DATA:level}\"%{DATA:info}\""} #改成你需要的匹配模式 remove_field => ["message"] #去掉message属性 }}output { elasticsearch { hosts => ["你的es出口地址"] index=>"索引名称" #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" #user => "elastic" #password => "changeme" }}
复制代码

配置完成后,执行 logstash 指令

bin/logstash -f conf/logstash-redis.conf
复制代码

运行之后无报错,再到 kibana 面板里检查一下索引数据,一切正常则日志同步成功





好了,大概就这些,之后我们就可以在 Kibana 面板里监控所有系统的日志了,可以通过设置不同的指标来进行监控.

发布于: 1 小时前阅读数: 9
用户头像

学着码代码,学着码人生。 2019.04.11 加入

狂奔的小码农

评论

发布
暂无评论
日志归一管理的一种解决方案