SpringCloud 网关实现线程池异步批量保存请求日志
- 2023-04-13 湖南
本文字数:6420 字
阅读完需:约 21 分钟
本文章实现的是线程池异步批量保存请求日志,实现的是数据库中保存日志数据
日志过滤器添加
首先是在网关服务中添加日志过滤器
@Log4j2
public class LogFilter implements GlobalFilter, Ordered {
private static final String START_TIME = "startTime";
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Resource
VisitRecordService visitRecordService;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 请求方法
HttpMethod method = request.getMethod();
// 请求头
HttpHeaders headers = request.getHeaders();
// 设置startTime 用来计算响应的时间
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
// 构建日志记录
AccessRecord accessRecord = visitRecordService.build(exchange);
if (method != null) {
//设置请求方法
accessRecord.setMethod(method.name());
if (method == HttpMethod.GET) {
//获取get请求参数
MultiValueMap<String, String> formData = request.getQueryParams();
if (!formData.isEmpty()) {
//保存请求参数
accessRecord.setFormData(JSON.toJSONString(formData));
}
} else if (method == HttpMethod.POST) {
MediaType contentType = headers.getContentType();
if (contentType != null) {
Mono<Void> voidMono = null;
if (contentType.equals(MediaType.APPLICATION_JSON)) {
// JSON
voidMono = readBody(exchange, chain, accessRecord);
}
if (voidMono != null) {
//计算请求时间
cacueConsumTime(exchange);
return voidMono;
}
}
}
}
visitRecordService.put(exchange, accessRecord);
// 请求后执行保存
return chain.filter(exchange).then(saveRecord(exchange));
}
private Mono<Void> saveRecord(ServerWebExchange exchange) {
return Mono.fromRunnable(() -> {
cacueConsumTime(exchange);
});
}
/**
* 计算访问时间
*
* @param exchange
*/
private void cacueConsumTime(ServerWebExchange exchange) {
//请求开始时设置的自定义属性标识
Long startTime = exchange.getAttribute(START_TIME);
Long consumingTime = 0L;
if (startTime != null) {
consumingTime = System.currentTimeMillis() - startTime;
log.info(exchange.getRequest().getURI().getRawPath() + ": 耗时 " + consumingTime + "ms");
}
visitRecordService.add(exchange, consumingTime);
}
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
DataBufferUtils.retain(buffer);
return Mono.just(buffer);
});
// 重写请求体,因为请求体数据只能被消费一次
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
return ServerRequest.create(mutatedExchange, messageReaders)
.bodyToMono(String.class)
.doOnNext(objectValue -> {
accessRecord.setBody(objectValue);
visitRecordService.put(exchange, accessRecord);
}).then(
chain.filter(mutatedExchange)
);
});
}
@Override
public int getOrder() {
return 2;
}
}
HttpMessageReader 是用于读取 HTTP 消息的类。 ServerRequest.create 方法创建一个新的 ServerRequest 对象,该对象表示一个 HTTP 请求。 AccessRecord 是自定义的数据模型用来保存访问日志,代码如下:
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.net.URI;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "access_recoder_log")
public class AccessRecord implements Serializable {
private String formData;
private URI targetUri;
private String method;
private String scheme;
private String path;
private String body;
private String ip;
private Integer status;
private Long userId;
private Long consumingTime;
private LocalDateTime createTime;
}
VisitRecordService 就是我这里定义的异步保存日志的实现类
VisitRecordService 异步保存日志
ServerWebExchange 是 Spring WebFlux 中的一个接口,用于表示 HTTP 请求和响应的交换。它提供了访问请求和响应的方法,以及访问请求属性和响应属性的方法。可以使用它来处理 HTTP 请求和响应,例如修改请求头或响应体,或者将请求转发到另一个处理程序。
在过滤器的 filter 方法中获取到对应的 ServerWebExchange,再从其中读取访问信息
@Slf4j
@Service
public class VisitRecordService {
//自定义的一个标识
private final String attributeKey = "visitRecord";
/**
* 构建一个 VisitRecord 实体类,但仅适用于获取 request 信息
*
* @param exchange gateway访问
* @return 访问信息
*/
public AccessRecord build(ServerWebExchange exchange) {
// 获取请求信息
ServerHttpRequest request = exchange.getRequest();
String ip = RequestUtils.getIpAddress(request);
// 请求路径
String path = request.getPath().pathWithinApplication().value();
// 请求schema: http/https
String scheme = request.getURI().getScheme();
// 请求方法
HttpMethod method = request.getMethod();
// 路由服务地址
URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
// 请求头
HttpHeaders headers = request.getHeaders();
// 获取请求地址
InetSocketAddress remoteAddress = request.getRemoteAddress();
AccessRecord accessRecord = new AccessRecord();
accessRecord.setPath(path);
accessRecord.setScheme(scheme);
accessRecord.setTargetUri(targetUri);
accessRecord.setIp(ip);
accessRecord.setCreateTime(LocalDateTime.now());
return accessRecord;
}
/**
* 将访问信息存入 ServerWebExchange 当中,将会与当前请求关联起来,
* 以便于后续在任何地方均可获得
*
* @param exchange gateway访问合同
* @param visitRecord 访问信息
*/
public void put(ServerWebExchange exchange, AccessRecord visitRecord) {
Map<String, Object> attributes = exchange.getAttributes();
attributes.put(attributeKey, visitRecord);
}
}
然后在请求结束的时候保存一下日志
@Slf4j
@Service
public class VisitRecordService {
/**
* 缓存,在插入数据库前先存入此。
* 为防止数据被重复插入,故使用Set,但不能确保100%不会被重复存储。
*/
private HashSet<AccessRecord> visitCache = new HashSet<>();
/**
* 保存访问记录
*
* @param exchange gateway访问
* @param consumingTime 访问耗时
*/
public void add(ServerWebExchange exchange, Long consumingTime) {
// 获取数据
ServerHttpResponse response = exchange.getResponse();
ServerHttpRequest request = exchange.getRequest();
//获取保存的日志记录体
AccessRecord visitRecord = getOrBuild(exchange);
//设置访问时间 单位毫秒
visitRecord.setConsumingTime(consumingTime);
// 设置访问状态
if (response.getStatusCode() != null) {
visitRecord.setStatus(response.getStatusCode().value());
}
//设置访问的用户ID 我这里是保存在请求头中
String userId = request.getHeaders().getFirst("userId");
if(StringUtils.isNotEmpty(userId)) {
visitRecord.setUserId(Long.parseLong(userId));
}
// 打印访问情况
log.info(visitRecord.toString());
// 添加记录到缓存中
visitCache.add(visitRecord);
// 执行任务,保存数据
doTask();
}
}
doTask 在这里是使用线程池异步执行日志保存:
/**
* 信号量,用于标记当前是否有任务正在执行,{@code true}表示当前无任务进行。
*/
private volatile boolean taskFinish = true;
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
/**
* 单次批量插入的数据量
*/
private final int BATCH_SIZE = 500;
private void doTask() {
if (taskFinish) {
// 当前没有任务的情况下,加锁并执行任务
synchronized (this) {
if (taskFinish) {
taskFinish = false;
threadPool.execute(() -> {
try {
// 当数据量较小时,则等待一段时间再插入数据,从而做到将数据尽可能的批量插入数据库
if (visitCache.size() <= BATCH_SIZE) {
Thread.sleep(500);
}
//批量保存
batchSave();
} catch (InterruptedException e) {
log.error("睡眠时发生了异常: {}", e.getMessage());
} finally {
// 任务执行完毕后修改标志位
taskFinish = true;
}
});
}
}
}
}
ThreadPoolExecutor 是 Java 中的一个线程池实现,用于管理和复用线程,以提高应用程序的性能和响应能力。
它可以控制线程的数量,避免线程过多导致的资源浪费和性能下降,同时也可以避免线程不足导致的任务等待和响应延迟。
通过 ThreadPoolExecutor,我们可以将任务提交给线程池,由线程池中的线程来执行任务,从而实现任务的异步执行和并发处理。
然后 batchSave() 方法就是具体的实现数据保存:
@Slf4j
@Service
public class VisitRecordService {
@Resource
VisitLogService visitLogService;
/**
* 缩减因子,每次更新缓存Set时缩小的倍数,对应HashSet的扩容倍数
*/
private final float REDUCE_FACTOR = 0.5f;
private void batchSave() {
log.debug("访问记录准备插入数据库,当前数据量:{}", visitCache.size());
if (visitCache.size() == 0) {
return;
}
// 构造新对象来存储数据,旧对象保存到数据库后不再使用
HashSet<AccessRecord> oldCache = visitCache;
visitCache = new HashSet<>((int) (oldCache.size() * REDUCE_FACTOR));
boolean isSave = false;
try {
//批量保存
isSave = visitLogService.saveBatch(oldCache, BATCH_SIZE);
} finally {
if (!isSave) {
// 如果插入失败,则重新添加所有数据
visitCache.addAll(oldCache);
}
}
}
}
VisitLogService 就是 mybatis 的正常的数据增删改查范畴,我这里的定义如下:
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.biglead.gateway.pojo.AccessRecord;
import com.biglead.gateway.mapper.VisitLogMapper;
import org.springframework.stereotype.Service;
/**
* 访问日志Service类
*/
@Service
public class VisitLogService extends ServiceImpl<VisitLogMapper, AccessRecord> {
}
VisitLogMapper 定义如下:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.biglead.gateway.pojo.AccessRecord;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface VisitLogMapper extends BaseMapper<AccessRecord> {
}
然后启动项目,访问接口数据,就可以自动记录到数据库中
对应的 sql:
CREATE TABLE `access_recoder_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`form_data` varchar(500) DEFAULT NULL,
`body` varchar(500) DEFAULT NULL,
`path` varchar(255) DEFAULT NULL,
`ip` varchar(255) DEFAULT NULL,
`status` int(11) DEFAULT NULL,
`user_id` bigint(20) DEFAULT NULL,
`scheme` varchar(255) DEFAULT NULL,
`method` varchar(255) DEFAULT NULL,
`consuming_time` bigint(20) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8 COMMENT='日志记录表';
最后就是源码了:https://gitee.com/android.long/spring-cloud-biglead/tree/master/biglead-api-11-admin
Java你猿哥
一只在编程路上渐行渐远的程序猿 2023-03-09 加入
关注我,了解更多Java、架构、Spring等知识
评论