写点什么

通过 nginx 日志做监控

作者:Arch
  • 2021 年 12 月 08 日
  • 本文字数:8716 字

    阅读完需:约 29 分钟

一、背景

在 ToB 的交付项目中,服务器资源比较紧张,即使采用微服务的架构,一般也是所有服务都部署在一台机器上。在这种背景下,像 prometheus、cat 之类的开源监控其实不是很适合。

二、采用的中间件

TDengine 时序性数据库

官网地址 TDengine 是一款开源、高效的物联网大数据平台,具体的细节可以进官网看

三、功能实现

流程图

顶层抽象

1、nginx 日志格式配置

log_format  main    '$msec]-[$remote_addr]-[$request]-[$request_length]-[$bytes_sent]-[$status]-[$request_time';
复制代码


配置 nginx 的日志格式主要是为了解析

2、功能抽象

  • 日志监控器:负责日志的读取,日志文件游标的维护

  • 日志解析器:解析日志

监控器业务逻辑

  • 最顶层是日志监控器的接口(LogMonitor)

  • 日志监控器实现一个抽象类(AbstractLogMonitor),提供日志读取、游标维护、启动方式、参数初始化、加载日志解析器的基础能力

  • nginx 日志文件监控器(NginxMonitor)继承 AbstractLogMonitor,提供具体的文件路径、参数初始化、游标相关的一些策略

日志监控器代码

LogMonitor

import org.springframework.boot.context.event.ApplicationStartedEvent;import org.springframework.context.ApplicationListener;
/**
 * Log-monitor contract. A monitor listens for container startup
 * ({@link ApplicationStartedEvent}) and then tails a log file, handing each
 * line to a parser.
 *
 * @author Arch
 * @since 2021-11-15
 */
public interface LogMonitor extends ApplicationListener<ApplicationStartedEvent> {

    /**
     * Continuously read the monitored log file and dispatch lines to parsers.
     */
    void readLog();

    /**
     * Resolve the name of the parser that should handle the given line.
     * (Parameter renamed from {@code parserName} to {@code logLine} to match
     * the javadoc and the {@code NginxMonitor} override — the argument is a
     * raw log line, not a parser name.)
     *
     * @param logLine a single raw log line
     * @return the key used to look up a {@code LogParser}
     */
    String getParserName(String logLine);

    /**
     * Whether the given line should be skipped entirely.
     *
     * @param logLine a single raw log line
     * @return true to skip the line
     */
    boolean ignoreLine(String logLine);

    /**
     * Whether the read cursor must be reset, e.g. because the file was rotated
     * and reading restarts from the beginning.
     * NOTE(review): name is a typo for "resetCur"; kept because implementors
     * depend on it.
     *
     * @return true to reset the cursor to 0
     */
    boolean restCur();

    /**
     * Whether yesterday's cursor carries over. Files split per day start fresh;
     * files that are never split need a continuous cursor.
     * NOTE(review): name is a typo for "continueLastDayCur"; kept because
     * implementors depend on it.
     *
     * @return true to reuse the previous day's cursor
     */
    boolean contuineLastDayCur();
}
复制代码


AbstractLogMonitor

import java.io.File;import java.io.IOException;import java.io.RandomAccessFile;import java.util.Map;import java.util.Map.Entry;import java.util.concurrent.ConcurrentHashMap;import org.springframework.boot.context.event.ApplicationStartedEvent;import cn.hutool.core.map.MapUtil;import cn.hutool.core.util.ObjectUtil;import cn.hutool.extra.spring.SpringUtil;import lombok.Data;import lombok.extern.slf4j.Slf4j;
/** * @Author: Arch * @Date: 2021-11-15 16:35:19 * @Description: 日志监控器抽象类 */@Slf4j@Datapublic abstract class AbstractLogMonitor implements LogMonitor {
/** 日志路径 */ private String logPath;
/** 上一次读取日志的游标位置 */ private long lastReadCur = 0;
/** 游标刷盘周期;子类如果需要改变,请重写 getSyncCurTime() 方法 */ private int syncCurTime = 0;
/** 读取日志的次数 */ private int bufferReadTimes = 0;
/** 日志解析器集合,key->name; value->parser */ private Map<String, LogParser> parsers = new ConcurrentHashMap<>();
// TODO 停顿时间需要按场景计算 /** 日志读取不到时线程停顿时间 */ private int sleepTime = 10;
/** 日志读取沉睡次数 */ private int sleepTimes = 0;
@SuppressWarnings({ "squid:S2189", "squid:S3776", "squid:S1141" }) @Override public void readLog() { if (!monitorSwitch()) { log.info("nginx日志监控开关为关"); return; } // 初始化参数 initParams(); // TODO 检测线程是否存活 Thread.currentThread().isAlive(); File logFile = new File(this.logPath); if (!logFile.exists()) { log.warn("{} 文件不存在!", this.logPath); // TODO 如何处理? return; } try (RandomAccessFile logReader = new RandomAccessFile(logFile, "r")) { // TODO buffer动态变化 byte[] buffer = new byte[1024 * 10]; while (true) { logReader.seek(this.getLastReadCur()); int len = readBuffer(logReader, buffer, this.sleepTime, this.sleepTimes, this.parsers); if (len <= 0) { this.sleepTimes++; continue; } else { sleepTimes = 0; }
// 判断第一行是不是起始位置,不是一行的起始位置就需要 seek 这一行的起始位置 String logContent = new String(buffer, 0, len); boolean needOffset = !logContent.endsWith("\n"); String[] lines = logContent.split("\n"); int iterLines = needOffset ? lines.length - 1 : lines.length; for (int i = 0; i < iterLines; i++) { String line = lines[i]; if (this.ignoreLine(line)) { continue; } String parserName = this.getParserName(line); LogParser logParser = parsers.get(parserName); log.debug("根据parserName:{} 获取解析器:{}", parserName, logParser.getClass().getName()); if (ObjectUtil.isNotNull(logParser)) { try { logParser.parse(line); } catch (Exception e) { log.error("日志解析异常", e); } } else { log.warn("未获取到解析器"); }
} if (needOffset) { int offsetForward = lines[lines.length - 1].getBytes().length; // 文件指针当前的位置 this.setLastReadCur(logReader.getFilePointer()); // 文件指针向前偏移 logReader.seek(lastReadCur - offsetForward); } // 文件指针当前的位置 this.setLastReadCur(logReader.getFilePointer()); } } catch (IOException ioe) { log.error("读取 log:{} 文件失败", logPath, ioe); } }
/** * 读取文件内容 * * @param logReader * @param buffer * @return */ private static int readBuffer(RandomAccessFile logReader, byte[] buffer, int sleepTime, int sleepTimes, Map<String, LogParser> parsers) { int len = 0; try { len = logReader.read(buffer); if (len <= 0) { if (sleepTimes == 1 && MapUtil.isNotEmpty(parsers)) { parsers.entrySet().stream() .filter(entry -> ObjectUtil.isNotEmpty(entry) && ObjectUtil.isNotEmpty(entry.getValue())) .map(Entry::getValue).forEach(LogParser::flushLogs); } log.info("未读取到日志,读取日志线程开始沉睡: {}秒", sleepTime); Thread.sleep(Math.multiplyExact(1000, sleepTime)); } } catch (IOException ioe) { log.error("读取日志文件异常", ioe); } catch (InterruptedException ie) { log.error("sleep thread interrupted", ie); Thread.currentThread().interrupt(); } return len; }
@Override public void onApplicationEvent(ApplicationStartedEvent event) { log.info("the application already startup, start monitor log: {}", this.getClass().getName()); // 启动监听日志文件的线程 this.readLog(); }
/** * 初始化参数 */ private void initParams() { this.loadParsers(); this.setLogPath(providerLogPath()); log.info("{} 监控器设置的日志路径为:{}", this.getClass().getName(), this.getLogPath()); this.initTaosDB(); }
/** * 初始化 tdengine */ public abstract void initTaosDB();
/** * 设置日志解析器 * * @param parsers */ private void loadParsers() { Map<String, LogParser> logParsers = this.providerParsers(); // 初始化 parser logParsers.entrySet().stream().forEach(item -> item.getValue().init()); log.info("{} 监控器设置解析器: {}", this.getClass().getName(), logParsers.toString()); this.parsers.putAll(logParsers); }
/** * 监控开关 * * @return */ public abstract boolean monitorSwitch();
/** * 设置监控日志的路径 * * @return */ public abstract String providerLogPath();
/** * 设置解析器 */ public abstract Map<String, LogParser> providerParsers();
/** * 添加日志解析器 * * @param parserName * @param parser */ public void addParser(String parserName, LogParser parser) { this.parsers.put(parserName, parser); }
/** * 移除日志解析器 * * @param parserName */ public void removeParser(String parserName) { this.parsers.remove(parserName); }
public void setLastReadCur(long lastReadCur) { bufferReadTimes++; this.lastReadCur = lastReadCur; // lastReadCur 标记几次之后需要刷盘 if (bufferReadTimes > this.getSyncCurTime()) { LogMonitorService logMonitorService = SpringUtil.getBean(LogMonitorService.class); logMonitorService.updateCur(logPath, lastReadCur); bufferReadTimes = 0; } }
public long getLastReadCur() { LogMonitorService logMonitorService = SpringUtil.getBean(LogMonitorService.class); if (this.restCur()) { this.lastReadCur = 0; logMonitorService.updateCur(logPath, this.lastReadCur); } if (this.lastReadCur > 0) { return this.lastReadCur; } else { this.lastReadCur = logMonitorService.getCur(this.logPath, this.contuineLastDayCur()); return this.lastReadCur; } }
}
复制代码


NginxMonitor

import java.net.InetAddress;import java.net.UnknownHostException;import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.stereotype.Component;
import cn.hutool.core.map.MapUtil;import cn.hutool.core.text.CharSequenceUtil;import cn.hutool.extra.spring.SpringUtil;import lombok.EqualsAndHashCode;import lombok.extern.slf4j.Slf4j;
/**
 * Nginx access-log monitor. Supplies the log path, the ignore rules, the
 * parser wiring and the TDengine bootstrap for the generic
 * {@code AbstractLogMonitor} machinery.
 *
 * @author Arch
 * @since 2021-11-15
 */
@Slf4j
@EqualsAndHashCode(callSuper = false)
@EnableConfigurationProperties(ToolsMonitorNginxConfig.class)
@Component
public class NginxMonitor extends AbstractLogMonitor {

    @Autowired
    private ToolsMonitorNginxConfig toolsMonitorNginxConfig;

    @Autowired
    private LogMonitorService logMonitorService;

    /** The monitored file is the configured nginx access log. */
    @Override
    public String providerLogPath() {
        return toolsMonitorNginxConfig.getNginxLogFilePath();
    }

    /** A line is skipped when it matches the built-in or the configured ignore patterns. */
    @Override
    public boolean ignoreLine(String logLine) {
        if (CharSequenceUtil.containsAny(logLine, ToolsMonitorNginxConfig.DEFAULT_IGNORE_URL_PARTTERN)) {
            return true;
        }
        return CharSequenceUtil.containsAny(logLine, toolsMonitorNginxConfig.getIgnoreNginxUrlPattern());
    }

    /** Every line goes to the single access-log parser. */
    @Override
    public String getParserName(String logLine) {
        return AccessLogParser.class.getName();
    }

    /** Driven by the nginx-monitor switch in configuration. */
    @Override
    public boolean monitorSwitch() {
        return toolsMonitorNginxConfig.isMonitorNginx();
    }

    /** Single parser, keyed by its class name, resolved from the Spring context. */
    @Override
    public Map<String, LogParser> providerParsers() {
        AccessLogParser accessLogParser = SpringUtil.getBean(AccessLogParser.class);
        return MapUtil.of(AccessLogParser.class.getName(), accessLogParser);
    }

    /** The access log is never rotated mid-run from this monitor's view. */
    @Override
    public boolean restCur() {
        return false;
    }

    /** The access log is continuous across days, so yesterday's cursor carries over. */
    @Override
    public boolean contuineLastDayCur() {
        return true;
    }

    /** Externally reset the cursor (e.g. via a REST endpoint) to re-read from the start. */
    public void resetCurByRest() {
        this.setLastReadCur(0);
        logMonitorService.updateCur(this.getLogPath(), 0);
    }

    /**
     * Bootstrap the TDengine schema: database, super table, and one sub-table
     * per host (named after the host's IPv4 with dots replaced by underscores).
     */
    @Override
    @SuppressWarnings({ "squid:S5361" })
    public void initTaosDB() {
        NginxAccessLogMapper nginxAccessLogMapper = SpringUtil.getBean(NginxAccessLogMapper.class);
        nginxAccessLogMapper.createDataBase();
        log.info("创建数据库:nginx_access_log_db");
        nginxAccessLogMapper.createSuperTable();
        log.info("创建超级表:nginx_access_log_st");
        try {
            InetAddress localAddr = InetAddress.getLocalHost();
            String hostIp = localAddr.getHostAddress();
            String hostTable = "t_" + hostIp.replaceAll("\\.", "_");
            nginxAccessLogMapper.createTable(hostTable, IPV4util.ip2Int(hostIp));
            log.info("创建表:{}", hostTable);
        } catch (UnknownHostException uhe) {
            log.warn("未获取到本机 IPV4 地址");
            nginxAccessLogMapper.createTable("t_ip_unknow", 0);
        }
    }
}
复制代码

解析器业务逻辑

  • 顶层是解析器接口(LogParser)

  • 业务解析器直接实现解析器接口(AccessLogParser)

解析器代码

LogParser

import java.net.InetAddress;import java.util.ArrayList;import java.util.List;import java.util.Map;import java.util.stream.Collectors;import org.apache.commons.lang3.StringUtils;import org.springframework.scheduling.annotation.Async;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import cn.hutool.core.collection.CollUtil;import lombok.extern.slf4j.Slf4j;
/** * @Author: Arch * @Date: 2021-11-16 14:29:23 * @Description: nginx access.log日志解析器 */@SuppressWarnings({ "squid:S5861", "squid:S5361" })@Slf4j@Componentpublic class AccessLogParser implements LogParser {
@Autowired private NginxAccessLogMapper nginxAccessLogMapper;
@Autowired private ToolsTaosUrlDictMapper toolsTaosUrlDictMapper;
@SuppressWarnings({ "squid:S5856" }) public static final String SPEATOR = "]-\\[";
/** Taos Request Url 映射 */ private Map<String, Integer> urlDicts;
/** Taos Request Url 映射 */ private Map<String, String> convertUrlDicts;
private List<NginxAccessLog> accessLogs = new ArrayList<>();
@Override @Async public void parse(String logLine) { if (StringUtils.isBlank(logLine)) { return; } String[] logFields = logLine.split(SPEATOR); // 加工 log NginxAccessLog log = processLog(logFields); this.addLog(log); }
private synchronized void addLog(NginxAccessLog nginxAccessLog) { this.accessLogs.add(nginxAccessLog); // TODO 这个阈值应该要动态计算 if (accessLogs.size() > 200) { this.flushLogs(); } }
@Override public synchronized void flushLogs() { if (CollUtil.isEmpty(this.accessLogs)) { return; } String tableName = "t_ip_unknow"; try { InetAddress addr = InetAddress.getLocalHost(); tableName = "t_" + addr.getHostAddress().replaceAll("\\.", "_"); } catch (Exception e) { log.info("获取 taos 表名时异常", e); } // 批量插入 nginxAccessLogMapper.batchInsert(this.accessLogs, tableName); this.accessLogs.clear(); }
/** * 加工 nginxAccessLog * * @param nginxAccessLog */ private NginxAccessLog processLog(String[] logFields) { String requestStr = logFields[2]; String[] requestStrFields = requestStr.split(" "); RequestMethodEnum requestMethod = RequestMethodEnum.getRequestMethod(requestStrFields[0]); Integer requestUrl = 0; synchronized (this) { // 去除 url 中的参数部分 String originUrl = requestStrFields[1].split("\\?")[0]; Integer urlCode = this.urlDicts.get(originUrl); if (null == urlCode) { ToolsTaosUrlDict toolsTaosUrlDict = new ToolsTaosUrlDict().setRequestUrl(originUrl); toolsTaosUrlDictMapper.insert(toolsTaosUrlDict); this.urlDicts.put(toolsTaosUrlDict.getRequestUrl(), toolsTaosUrlDict.getRequestUrlCode()); this.convertUrlDicts.put(String.valueOf(toolsTaosUrlDict.getRequestUrlCode()), toolsTaosUrlDict.getRequestUrl()); requestUrl = toolsTaosUrlDict.getRequestUrlCode(); } else { requestUrl = urlCode; } } return new NginxAccessLog().setTs(logFields[0]).setRemoteAddr(logFields[1]) .setRequestMethod(requestMethod.getRequestMethodCode()) .setRequestUrl(requestUrl) .setRequestLength(logFields[3]).setBytesSent(logFields[4]).setStatus(logFields[5]) .setRequestTime(logFields[6]); }
/** * 初始化 url 映射缓存 */ @Override public void init() { log.info("init url dicts"); List<ToolsTaosUrlDict> urls = toolsTaosUrlDictMapper.selectList(null); this.urlDicts = urls.stream() .collect(Collectors.toMap(ToolsTaosUrlDict::getRequestUrl, ToolsTaosUrlDict::getRequestUrlCode)); this.convertUrlDicts = urls.stream().collect( Collectors.toMap(url -> String.valueOf(url.getRequestUrlCode()), ToolsTaosUrlDict::getRequestUrl)); log.info("url dicts: {}", urlDicts.size()); }
public Map<String, String> getConvertUrlDicts() { return this.convertUrlDicts; }
}
复制代码

四、一些细节

1、数据压缩

  • 将 IPV4 地址转换成 int 进行存储

  • 将 requestPath 映射成 int 存储

2、缓存

  • 在将 requestPath 转成 int 类型的 code 时用到了缓存,直接看代码更清晰

  • 在监控指标刷盘的时候会进行缓冲,到达阈值之后再进行刷盘

  • 缓冲刷盘是存在问题的,就是最后一次没有到达阈值,然后又没有新增,那么最后一个的缓冲就不会进行刷盘,这个是在 AbstractLogMonitor 的 readBuffer 方法中做了控制,线程沉睡次数达到阈值之后会进行刷盘

3、异步

AccessLogParser 的解析操作使用的是 spring 提供的 @Async 进行的异步执行

发布于: 刚刚阅读数: 3
用户头像

Arch

关注

还未添加个人签名 2019.02.18 加入

还未添加个人简介

评论

发布
暂无评论
通过 nginx 日志做监控