通过 nginx 日志做监控
- 2021 年 12 月 08 日
本文字数:8716 字
阅读完需:约 29 分钟
一、背景
在 ToB 的交付项目中,服务器资源比较紧张,即使采用微服务的架构,一般也是所有服务都部署在一台机器上。在这种背景下,像 prometheus、cat 之类的开源监控其实不是很适合。
二、采用的中间件
TDengine 时序性数据库
官网地址 TDengine 是一款开源、高效的物联网大数据平台,具体的细节可以进官网看
三、功能实现
流程图
顶层抽象
1、nginx 日志格式配置
log_format main '$msec]-[$remote_addr]-[$request]-[$request_length]-[$bytes_sent]-[$status]-[$request_time';
配置 nginx 的日志格式主要是为了解析
2、功能抽象
日志监控器:负责日志的读取,日志文件游标的维护
日志解析器:解析日志
监控器业务逻辑
最顶层是日志监控器的接口(LogMonitor)
日志监控器实现一个抽象类(AbstractLogMonitor),提供日志读取、游标维护、启动方式、参数初始化、加载日志解析器的基础能力
nginx 日志文件监控器(NginxMonitor)继承 AbstractLogMonitor,提供具体的文件路径、参数初始化、游标相关的一些策略
日志监控器代码
LogMonitor
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
/**
 * @Author: Arch
 * @Date: 2021-11-15 16:27:43
 * @Description: Log monitor contract. Listens for application (container)
 *               startup and then tails a log file, dispatching each line to
 *               a named parser.
 */
public interface LogMonitor extends ApplicationListener<ApplicationStartedEvent> {
    /**
     * Read the log file continuously and dispatch complete lines to parsers.
     */
    void readLog();

    /**
     * Resolve the name of the parser that should handle a given log line.
     * (FIX: the parameter was previously misnamed {@code parserName}; it
     * receives a raw log line, as the javadoc and implementations show.)
     *
     * @param logLine a single raw log line
     * @return the parser name used to look up the parser in the registry
     */
    String getParserName(String logLine);

    /**
     * Whether the given line should be skipped entirely.
     *
     * @param logLine a single raw log line
     * @return true to skip the line
     */
    boolean ignoreLine(String logLine);

    /**
     * Whether the read cursor must be reset, e.g. because the log file has
     * been rotated and reading restarts from the beginning of the new file.
     * NOTE(review): the name is a typo of "resetCur"; kept unchanged so
     * existing implementations still compile.
     *
     * @return true to reset the cursor to 0
     */
    boolean restCur();

    /**
     * Whether to keep using yesterday's cursor. Many log files are rotated
     * daily; files that are NOT rotated need a continuous cursor across days.
     * NOTE(review): the name is a typo of "continueLastDayCur"; kept
     * unchanged so existing implementations still compile.
     *
     * @return true to continue from the previous day's cursor
     */
    boolean contuineLastDayCur();
}
AbstractLogMonitor
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.extra.spring.SpringUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
 * @Author: Arch
 * @Date: 2021-11-15 16:35:19
 * @Description: Abstract log monitor. Provides the shared machinery for all
 *               monitors: reading the log file, maintaining and persisting
 *               the read cursor, startup wiring, parameter initialization,
 *               and loading/dispatching to the registered log parsers.
 */
@Slf4j
@Data
public abstract class AbstractLogMonitor implements LogMonitor {
    /** Path of the monitored log file (supplied by providerLogPath()). */
    private String logPath;
    /** Byte offset of the last fully-read, line-aligned position in the file. */
    private long lastReadCur = 0;
    /** Cursor persistence period; subclasses override getSyncCurTime() to change it. */
    private int syncCurTime = 0;
    /** Number of buffer reads since the cursor was last persisted. */
    private int bufferReadTimes = 0;
    /** Registered log parsers, key -> parser name; value -> parser. */
    private Map<String, LogParser> parsers = new ConcurrentHashMap<>();
    // TODO the sleep time should be computed per scenario
    /** Seconds the reader sleeps when no new log data is available. */
    private int sleepTime = 10;
    /** Number of consecutive empty reads (reset to 0 on any successful read). */
    private int sleepTimes = 0;

    @SuppressWarnings({ "squid:S2189", "squid:S3776", "squid:S1141" })
    @Override
    public void readLog() {
        if (!monitorSwitch()) {
            log.info("nginx日志监控开关为关");
            return;
        }
        // Initialize parsers, the log path, and the TDengine schema.
        initParams();
        // TODO check whether the thread is alive (currently a no-op)
        Thread.currentThread().isAlive();
        File logFile = new File(this.logPath);
        if (!logFile.exists()) {
            log.warn("{} 文件不存在!", this.logPath);
            // TODO how should a missing file be handled?
            return;
        }
        try (RandomAccessFile logReader = new RandomAccessFile(logFile, "r")) {
            // TODO buffer size should adapt dynamically
            byte[] buffer = new byte[1024 * 10];
            while (true) {
                logReader.seek(this.getLastReadCur());
                int len = readBuffer(logReader, buffer, this.sleepTime, this.sleepTimes, this.parsers);
                if (len <= 0) {
                    this.sleepTimes++;
                    continue;
                } else {
                    sleepTimes = 0;
                }
                // If the buffer does not end on a line boundary, the trailing
                // partial line is skipped now and re-read on the next pass.
                // NOTE(review): this assumes the platform charset round-trips
                // the partial line to the same byte count it was decoded from;
                // a multi-byte character split across the buffer edge could
                // skew the offset — confirm the log content is single-byte.
                String logContent = new String(buffer, 0, len);
                boolean needOffset = !logContent.endsWith("\n");
                String[] lines = logContent.split("\n");
                int iterLines = needOffset ? lines.length - 1 : lines.length;
                for (int i = 0; i < iterLines; i++) {
                    String line = lines[i];
                    if (this.ignoreLine(line)) {
                        continue;
                    }
                    String parserName = this.getParserName(line);
                    LogParser logParser = parsers.get(parserName);
                    if (ObjectUtil.isNotNull(logParser)) {
                        // FIX: the debug log used to dereference logParser
                        // BEFORE the null check above, which could NPE when
                        // no parser was registered for the name.
                        log.debug("根据parserName:{} 获取解析器:{}", parserName, logParser.getClass().getName());
                        try {
                            logParser.parse(line);
                        } catch (Exception e) {
                            log.error("日志解析异常", e);
                        }
                    } else {
                        log.warn("未获取到解析器");
                    }
                }
                if (needOffset) {
                    int offsetForward = lines[lines.length - 1].getBytes().length;
                    // Rewind the file pointer to the start of the partial line.
                    // FIX: the cursor used to be persisted here as well, BEFORE
                    // the rewind, briefly storing a position inside a half-read
                    // line (and double-counting bufferReadTimes); it is now
                    // persisted only once, below, after the rewind.
                    logReader.seek(logReader.getFilePointer() - offsetForward);
                }
                // Persist the current (line-aligned) file pointer.
                this.setLastReadCur(logReader.getFilePointer());
            }
        } catch (IOException ioe) {
            log.error("读取 log:{} 文件失败", logPath, ioe);
        }
    }

    /**
     * Read file content into the buffer. When nothing is read, sleeps the
     * calling thread; on the second consecutive empty read (sleepTimes == 1)
     * it flushes all parsers so a tail batch below the flush threshold is
     * not stranded in memory.
     *
     * @param logReader  the open log file
     * @param buffer     destination buffer
     * @param sleepTime  seconds to sleep when no data is available
     * @param sleepTimes consecutive empty reads observed so far
     * @param parsers    parsers to flush on the idle threshold
     * @return number of bytes read, or <= 0 when nothing was available
     */
    private static int readBuffer(RandomAccessFile logReader, byte[] buffer, int sleepTime, int sleepTimes,
            Map<String, LogParser> parsers) {
        int len = 0;
        try {
            len = logReader.read(buffer);
            if (len <= 0) {
                if (sleepTimes == 1 && MapUtil.isNotEmpty(parsers)) {
                    parsers.entrySet().stream()
                            .filter(entry -> ObjectUtil.isNotEmpty(entry) && ObjectUtil.isNotEmpty(entry.getValue()))
                            .map(Entry::getValue).forEach(LogParser::flushLogs);
                }
                log.info("未读取到日志,读取日志线程开始沉睡: {}秒", sleepTime);
                Thread.sleep(Math.multiplyExact(1000, sleepTime));
            }
        } catch (IOException ioe) {
            log.error("读取日志文件异常", ioe);
        } catch (InterruptedException ie) {
            log.error("sleep thread interrupted", ie);
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
        }
        return len;
    }

    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        log.info("the application already startup, start monitor log: {}", this.getClass().getName());
        // Start tailing the log file.
        // NOTE(review): readLog() loops forever, so this blocks the event
        // listener thread rather than spawning a dedicated one — confirm
        // that is intended.
        this.readLog();
    }

    /**
     * Initialize runtime parameters: load parsers, resolve the log path,
     * and create the TDengine database/tables.
     */
    private void initParams() {
        this.loadParsers();
        this.setLogPath(providerLogPath());
        log.info("{} 监控器设置的日志路径为:{}", this.getClass().getName(), this.getLogPath());
        this.initTaosDB();
    }

    /**
     * Initialize TDengine (database, super table, per-host table).
     */
    public abstract void initTaosDB();

    /**
     * Load the parsers supplied by the subclass, initialize each one, and
     * register them in the shared parser map.
     */
    private void loadParsers() {
        Map<String, LogParser> logParsers = this.providerParsers();
        // Initialize every parser before registering it.
        logParsers.values().forEach(LogParser::init);
        log.info("{} 监控器设置解析器: {}", this.getClass().getName(), logParsers.toString());
        this.parsers.putAll(logParsers);
    }

    /**
     * Monitor on/off switch.
     *
     * @return true when monitoring is enabled
     */
    public abstract boolean monitorSwitch();

    /**
     * Supply the path of the log file to monitor.
     *
     * @return absolute path of the log file
     */
    public abstract String providerLogPath();

    /**
     * Supply the parsers for this monitor, keyed by parser name.
     */
    public abstract Map<String, LogParser> providerParsers();

    /**
     * Register an additional log parser.
     *
     * @param parserName parser key
     * @param parser     parser instance
     */
    public void addParser(String parserName, LogParser parser) {
        this.parsers.put(parserName, parser);
    }

    /**
     * Unregister a log parser.
     *
     * @param parserName parser key
     */
    public void removeParser(String parserName) {
        this.parsers.remove(parserName);
    }

    public void setLastReadCur(long lastReadCur) {
        bufferReadTimes++;
        this.lastReadCur = lastReadCur;
        // Persist the cursor after the configured number of buffer reads.
        if (bufferReadTimes > this.getSyncCurTime()) {
            LogMonitorService logMonitorService = SpringUtil.getBean(LogMonitorService.class);
            logMonitorService.updateCur(logPath, lastReadCur);
            bufferReadTimes = 0;
        }
    }

    public long getLastReadCur() {
        LogMonitorService logMonitorService = SpringUtil.getBean(LogMonitorService.class);
        if (this.restCur()) {
            // The file was rotated: restart from the beginning and persist.
            this.lastReadCur = 0;
            logMonitorService.updateCur(logPath, this.lastReadCur);
        }
        if (this.lastReadCur > 0) {
            return this.lastReadCur;
        } else {
            // First read (or just reset): recover the cursor from storage.
            this.lastReadCur = logMonitorService.getCur(this.logPath, this.contuineLastDayCur());
            return this.lastReadCur;
        }
    }
}
NginxMonitor
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.stereotype.Component;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.extra.spring.SpringUtil;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
/**
 * @Author: Arch
 * @Date: 2021-11-15 16:19:57
 * @Description: Nginx access-log monitor. Supplies the nginx-specific log
 *               path, parser registry, cursor strategy, and TDengine schema
 *               setup to AbstractLogMonitor.
 */
@Slf4j
@EqualsAndHashCode(callSuper = false)
@EnableConfigurationProperties(ToolsMonitorNginxConfig.class)
@Component
public class NginxMonitor extends AbstractLogMonitor {
    @Autowired
    private ToolsMonitorNginxConfig toolsMonitorNginxConfig;
    @Autowired
    private LogMonitorService logMonitorService;

    /** The monitored file is the configured nginx access log. */
    @Override
    public String providerLogPath() {
        return toolsMonitorNginxConfig.getNginxLogFilePath();
    }

    /** A line is skipped when it matches either the built-in or the configured ignore patterns. */
    @Override
    public boolean ignoreLine(String logLine) {
        boolean hitsDefault = CharSequenceUtil.containsAny(logLine,
                ToolsMonitorNginxConfig.DEFAULT_IGNORE_URL_PARTTERN);
        boolean hitsConfigured = CharSequenceUtil.containsAny(logLine,
                toolsMonitorNginxConfig.getIgnoreNginxUrlPattern());
        return hitsDefault || hitsConfigured;
    }

    /** Every access-log line is handled by the single AccessLogParser. */
    @Override
    public String getParserName(String logLine) {
        return AccessLogParser.class.getName();
    }

    /** Monitoring is gated on the nginx switch from configuration. */
    @Override
    public boolean monitorSwitch() {
        return toolsMonitorNginxConfig.isMonitorNginx();
    }

    /** Single-entry parser registry keyed by the parser's class name. */
    @Override
    public Map<String, LogParser> providerParsers() {
        return MapUtil.of(AccessLogParser.class.getName(), SpringUtil.getBean(AccessLogParser.class));
    }

    /** The nginx access log is tailed continuously; the cursor is never auto-reset. */
    @Override
    public boolean restCur() {
        return false;
    }

    /** The access log is not rotated per day here, so the cursor carries across days. */
    @Override
    public boolean contuineLastDayCur() {
        return true;
    }

    /** Manually reset the cursor (e.g. via a REST endpoint) and persist the reset. */
    public void resetCurByRest() {
        this.setLastReadCur(0);
        logMonitorService.updateCur(this.getLogPath(), 0);
    }

    /**
     * Create the TDengine database, super table, and this host's table.
     * The table name encodes the host's IPv4 address with dots replaced
     * by underscores; a fallback table is used when the address is unknown.
     */
    @Override
    @SuppressWarnings({ "squid:S5361" })
    public void initTaosDB() {
        NginxAccessLogMapper mapper = SpringUtil.getBean(NginxAccessLogMapper.class);
        mapper.createDataBase();
        log.info("创建数据库:nginx_access_log_db");
        mapper.createSuperTable();
        log.info("创建超级表:nginx_access_log_st");
        try {
            String hostIp = InetAddress.getLocalHost().getHostAddress();
            String tableName = "t_" + hostIp.replace('.', '_');
            mapper.createTable(tableName, IPV4util.ip2Int(hostIp));
            log.info("创建表:{}", tableName);
        } catch (UnknownHostException uhe) {
            log.warn("未获取到本机 IPV4 地址");
            mapper.createTable("t_ip_unknow", 0);
        }
    }
}
解析器业务逻辑
顶层是解析器接口(LogParser)
业务解析器直接实现解析器接口(AccessLogParser)
解析器代码
LogParser
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
/**
 * @Author: Arch
 * @Date: 2021-11-16 14:29:23
 * @Description: Parser for nginx access.log lines. Splits each line on the
 *               "]-[" delimiter from the configured log_format, maps the
 *               request URL to a compact int code via a dictionary table,
 *               buffers the rows, and batch-inserts them into TDengine.
 */
@SuppressWarnings({ "squid:S5861", "squid:S5361" })
@Slf4j
@Component
public class AccessLogParser implements LogParser {
    @Autowired
    private NginxAccessLogMapper nginxAccessLogMapper;
    @Autowired
    private ToolsTaosUrlDictMapper toolsTaosUrlDictMapper;
    /**
     * Field separator regex matching the "]-[" delimiter of the log_format.
     * NOTE(review): the name is a typo of "SEPARATOR"; kept because it is a
     * public constant other code may reference.
     */
    @SuppressWarnings({ "squid:S5856" })
    public static final String SPEATOR = "]-\\[";
    /** Cache: request URL -> int code (backed by the Taos URL dict table). */
    private Map<String, Integer> urlDicts;
    /** Reverse cache: int code (as String) -> request URL. */
    private Map<String, String> convertUrlDicts;
    /** Buffered rows awaiting batch insert; guarded by the synchronized methods below. */
    private List<NginxAccessLog> accessLogs = new ArrayList<>();

    /**
     * Parse one raw access-log line (asynchronously, via Spring's @Async)
     * and buffer the resulting row. Blank lines are ignored.
     *
     * @param logLine raw line in the configured nginx log_format
     */
    @Override
    @Async
    public void parse(String logLine) {
        if (StringUtils.isBlank(logLine)) {
            return;
        }
        String[] logFields = logLine.split(SPEATOR);
        // FIX: this local was previously named "log", shadowing the
        // lombok @Slf4j logger field of the same name.
        NginxAccessLog accessLog = processLog(logFields);
        this.addLog(accessLog);
    }

    /**
     * Buffer a row; flush once the buffer exceeds the threshold.
     */
    private synchronized void addLog(NginxAccessLog nginxAccessLog) {
        this.accessLogs.add(nginxAccessLog);
        // TODO the threshold should be computed dynamically
        if (accessLogs.size() > 200) {
            this.flushLogs();
        }
    }

    /**
     * Batch-insert all buffered rows into this host's table and clear the
     * buffer. Also invoked by the monitor when the reader goes idle, so a
     * tail batch below the threshold is not stranded.
     * NOTE(review): the host lookup runs on every flush; if it is slow in
     * this deployment, consider caching the table name.
     */
    @Override
    public synchronized void flushLogs() {
        if (CollUtil.isEmpty(this.accessLogs)) {
            return;
        }
        String tableName = "t_ip_unknow";
        try {
            InetAddress addr = InetAddress.getLocalHost();
            tableName = "t_" + addr.getHostAddress().replaceAll("\\.", "_");
        } catch (Exception e) {
            log.info("获取 taos 表名时异常", e);
        }
        // Batch insert
        nginxAccessLogMapper.batchInsert(this.accessLogs, tableName);
        this.accessLogs.clear();
    }

    /**
     * Build a NginxAccessLog row from the split log fields. The request URL
     * (query string stripped) is mapped to an int code; unknown URLs are
     * inserted into the dictionary table and both caches under the lock.
     *
     * @param logFields fields split from the line:
     *                  [msec, remote_addr, request, request_length,
     *                   bytes_sent, status, request_time]
     * @return the assembled row
     */
    private NginxAccessLog processLog(String[] logFields) {
        String requestStr = logFields[2];
        String[] requestStrFields = requestStr.split(" ");
        // NOTE(review): getRequestMethod may return null for an unknown
        // method, which would NPE below — confirm the enum covers all inputs.
        RequestMethodEnum requestMethod = RequestMethodEnum.getRequestMethod(requestStrFields[0]);
        Integer requestUrl = 0;
        synchronized (this) {
            // Strip the query-string portion of the URL.
            String originUrl = requestStrFields[1].split("\\?")[0];
            Integer urlCode = this.urlDicts.get(originUrl);
            if (null == urlCode) {
                // First sighting: persist the URL and update both caches.
                ToolsTaosUrlDict toolsTaosUrlDict = new ToolsTaosUrlDict().setRequestUrl(originUrl);
                toolsTaosUrlDictMapper.insert(toolsTaosUrlDict);
                this.urlDicts.put(toolsTaosUrlDict.getRequestUrl(), toolsTaosUrlDict.getRequestUrlCode());
                this.convertUrlDicts.put(String.valueOf(toolsTaosUrlDict.getRequestUrlCode()),
                        toolsTaosUrlDict.getRequestUrl());
                requestUrl = toolsTaosUrlDict.getRequestUrlCode();
            } else {
                requestUrl = urlCode;
            }
        }
        return new NginxAccessLog().setTs(logFields[0]).setRemoteAddr(logFields[1])
                .setRequestMethod(requestMethod.getRequestMethodCode())
                .setRequestUrl(requestUrl)
                .setRequestLength(logFields[3]).setBytesSent(logFields[4]).setStatus(logFields[5])
                .setRequestTime(logFields[6]);
    }

    /**
     * Load the URL dictionary from the database into the forward and
     * reverse caches. Called once by the monitor before parsing starts.
     */
    @Override
    public void init() {
        log.info("init url dicts");
        List<ToolsTaosUrlDict> urls = toolsTaosUrlDictMapper.selectList(null);
        this.urlDicts = urls.stream()
                .collect(Collectors.toMap(ToolsTaosUrlDict::getRequestUrl, ToolsTaosUrlDict::getRequestUrlCode));
        this.convertUrlDicts = urls.stream().collect(
                Collectors.toMap(url -> String.valueOf(url.getRequestUrlCode()), ToolsTaosUrlDict::getRequestUrl));
        log.info("url dicts: {}", urlDicts.size());
    }

    /** Expose the reverse (code -> URL) cache for reporting/queries. */
    public Map<String, String> getConvertUrlDicts() {
        return this.convertUrlDicts;
    }
}
四、一些细节
1、数据压缩
将 IPV4 地址转换成 int 进行存储
将 requestPath 映射成 int 存储
2、缓存
在将 requestPath 转成 int 类型的 code 时,用到缓存的,直接看代码更清晰
在监控指标刷盘的时候会进行缓冲,到达阈值之后再进行刷盘
缓冲刷盘是存在问题的,就是最后一次没有到达阈值,然后又没有新增,那么最后一个的缓冲就不会进行刷盘,这个是在 AbstractLogMonitor 的 readBuffer 方法中做了控制,线程沉睡次数达到阈值之后会进行刷盘
3、异步
AccessLogParser 的解析操作使用的是 spring 提供的 @Async 进行的异步执行
版权声明: 本文为 InfoQ 作者【Arch】的原创文章。
原文链接:【http://xie.infoq.cn/article/fb4f6e3c1815914f218981dd0】。文章转载请联系作者。
Arch
还未添加个人签名 2019.02.18 加入
还未添加个人简介
评论