写点什么

SpringCloud 网关实现线程池异步批量保存请求日志

  • 2023-04-13
    湖南
  • 本文字数:6482 字

    阅读完需:约 21 分钟

本文章实现的是 线程池异步批量保存请求日志,实现的是数据库中保存日志数据

日志过滤器添加

首先是在网关服务中添加日志过滤器:


@Log4j2public class LogFilter implements GlobalFilter, Ordered { private static final String START_TIME = "startTime"; private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Resource VisitRecordService visitRecordService;
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest(); // 请求方法 HttpMethod method = request.getMethod(); // 请求头 HttpHeaders headers = request.getHeaders(); // 设置startTime 用来计算响应的时间 exchange.getAttributes().put(START_TIME, System.currentTimeMillis()); // 构建日志记录 AccessRecord accessRecord = visitRecordService.build(exchange);
if (method != null) { //设置请求方法 accessRecord.setMethod(method.name()); if (method == HttpMethod.GET) { //获取get请求参数 MultiValueMap<String, String> formData = request.getQueryParams(); if (!formData.isEmpty()) { //保存请求参数 accessRecord.setFormData(JSON.toJSONString(formData)); } } else if (method == HttpMethod.POST) { MediaType contentType = headers.getContentType(); if (contentType != null) { Mono<Void> voidMono = null; if (contentType.equals(MediaType.APPLICATION_JSON)) { // JSON voidMono = readBody(exchange, chain, accessRecord); } if (voidMono != null) { //计算请求时间 cacueConsumTime(exchange);
return voidMono; } } } }
visitRecordService.put(exchange, accessRecord); // 请求后执行保存 return chain.filter(exchange).then(saveRecord(exchange)); }
private Mono<Void> saveRecord(ServerWebExchange exchange) { return Mono.fromRunnable(() -> { cacueConsumTime(exchange); });
}
/** * 计算访问时间 * * @param exchange */ private void cacueConsumTime(ServerWebExchange exchange) { //请求开始时设置的自定义属性标识 Long startTime = exchange.getAttribute(START_TIME); Long consumingTime = 0L; if (startTime != null) { consumingTime = System.currentTimeMillis() - startTime; log.info(exchange.getRequest().getURI().getRawPath() + ": 耗时 " + consumingTime + "ms"); } visitRecordService.add(exchange, consumingTime); }

private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(bytes); DataBufferUtils.release(dataBuffer); Flux<DataBuffer> cachedFlux = Flux.defer(() -> { DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes); DataBufferUtils.retain(buffer); return Mono.just(buffer); });

// 重写请求体,因为请求体数据只能被消费一次 ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { @Override public Flux<DataBuffer> getBody() { return cachedFlux; } };
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
return ServerRequest.create(mutatedExchange, messageReaders) .bodyToMono(String.class) .doOnNext(objectValue -> { accessRecord.setBody(objectValue); visitRecordService.put(exchange, accessRecord); }).then( chain.filter(mutatedExchange) ); }); }
@Override public int getOrder() { return 2; } }
复制代码

HttpMessageReader 是用于读取 HTTP 消息的类。 ServerRequest.create 方法创建一个新的 ServerRequest 对象,该对象表示一个 HTTP 请求。 AccessRecord 是自定义的数据模型用来保存访问日志,代码如下:


import com.baomidou.mybatisplus.annotation.TableName;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;
import java.io.Serializable;import java.net.URI;import java.time.LocalDateTime;
@Data@AllArgsConstructor@NoArgsConstructor@TableName(value = "access_recoder_log")public class AccessRecord implements Serializable { private String formData; private URI targetUri; private String method; private String scheme; private String path; private String body; private String ip; private Integer status; private Long userId; private Long consumingTime; private LocalDateTime createTime;}
复制代码

VisitRecordService 就是我这里定义的异步保存日志的实现类

VisitRecordService 异步保存日志

ServerWebExchange 是 Spring WebFlux 中的一个接口,用于表示 HTTP 请求和响应的交换。它提供了访问请求和响应的方法,以及访问请求属性和响应属性的方法。可以使用它来处理 HTTP 请求和响应,例如修改请求头或响应体,或者将请求转发到另一个处理程序。


在过滤器的 filter 方法中获取到对应的 ServerWebExchange,再从其中读取访问信息。

@Slf4j@Servicepublic class VisitRecordService {    //自定义的一个标识    private final String attributeKey = "visitRecord";    /**     * 构建一个 VisitRecord 实体类,但仅适用于获取 request 信息     *     * @param exchange gateway访问     * @return 访问信息     */    public AccessRecord build(ServerWebExchange exchange) {        // 获取请求信息        ServerHttpRequest request = exchange.getRequest();        String ip = RequestUtils.getIpAddress(request);        // 请求路径        String path = request.getPath().pathWithinApplication().value();        // 请求schema: http/https        String scheme = request.getURI().getScheme();        // 请求方法        HttpMethod method = request.getMethod();        // 路由服务地址        URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);        // 请求头        HttpHeaders headers = request.getHeaders();        // 获取请求地址        InetSocketAddress remoteAddress = request.getRemoteAddress();
AccessRecord accessRecord = new AccessRecord(); accessRecord.setPath(path); accessRecord.setScheme(scheme); accessRecord.setTargetUri(targetUri); accessRecord.setIp(ip); accessRecord.setCreateTime(LocalDateTime.now()); return accessRecord; } /** * 将访问信息存入 ServerWebExchange 当中,将会与当前请求关联起来, * 以便于后续在任何地方均可获得 * * @param exchange gateway访问合同 * @param visitRecord 访问信息 */ public void put(ServerWebExchange exchange, AccessRecord visitRecord) { Map<String, Object> attributes = exchange.getAttributes(); attributes.put(attributeKey, visitRecord); } }
复制代码

然后在请求结束的时候保存一下日志

@Slf4j@Servicepublic class VisitRecordService {    /**     * 缓存,在插入数据库前先存入此。     * 为防止数据被重复插入,故使用Set,但不能确保100%不会被重复存储。     */    private HashSet<AccessRecord> visitCache = new HashSet<>();    /**     * 保存访问记录     *     * @param exchange      gateway访问     * @param consumingTime  访问耗时     */    public void add(ServerWebExchange exchange, Long consumingTime) {        // 获取数据        ServerHttpResponse response = exchange.getResponse();        ServerHttpRequest request = exchange.getRequest();        //获取保存的日志记录体        AccessRecord visitRecord = getOrBuild(exchange);        //设置访问时间 单位毫秒        visitRecord.setConsumingTime(consumingTime);
// 设置访问状态 if (response.getStatusCode() != null) { visitRecord.setStatus(response.getStatusCode().value()); } //设置访问的用户ID 我这里是保存在请求头中 String userId = request.getHeaders().getFirst("userId"); if(StringUtils.isNotEmpty(userId)) { visitRecord.setUserId(Long.parseLong(userId)); } // 打印访问情况 log.info(visitRecord.toString()); // 添加记录到缓存中 visitCache.add(visitRecord); // 执行任务,保存数据 doTask(); }}
复制代码

doTask 在这里是使用线程池异步执行日志保存

    /**     * 信号量,用于标记当前是否有任务正在执行,{@code true}表示当前无任务进行。     */    private volatile boolean taskFinish = true;    private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 3, 15, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());    /**     * 单次批量插入的数据量     */    private final int BATCH_SIZE = 500;        private void doTask() {        if (taskFinish) {            // 当前没有任务的情况下,加锁并执行任务            synchronized (this) {                if (taskFinish) {                    taskFinish = false;                    threadPool.execute(() -> {                        try {                            // 当数据量较小时,则等待一段时间再插入数据,从而做到将数据尽可能的批量插入数据库                            if (visitCache.size() <= BATCH_SIZE) {                                Thread.sleep(500);                            }                            //批量保存                            batchSave();                        } catch (InterruptedException e) {                            log.error("睡眠时发生了异常: {}", e.getMessage());                        } finally {                            // 任务执行完毕后修改标志位                            taskFinish = true;                        }                    });                }            }        }    }
复制代码

ThreadPoolExecutor 是 Java 中的一个线程池实现,用于管理和复用线程,以提高应用程序的性能和响应能力。


它可以控制线程的数量,避免线程过多导致的资源浪费和性能下降,同时也可以避免线程不足导致的任务等待和响应延迟。


通过 ThreadPoolExecutor,我们可以将任务提交给线程池,由线程池中的线程来执行任务,从而实现任务的异步执行和并发处理。


然后 batchSave() 方法就是具体的实现数据保存

@Slf4j@Servicepublic class VisitRecordService {    @Resource    VisitLogService visitLogService;    /**     * 缩减因子,每次更新缓存Set时缩小的倍数,对应HashSet的扩容倍数     */    private final float REDUCE_FACTOR = 0.5f;
private void batchSave() { log.debug("访问记录准备插入数据库,当前数据量:{}", visitCache.size()); if (visitCache.size() == 0) { return; } // 构造新对象来存储数据,旧对象保存到数据库后不再使用 HashSet<AccessRecord> oldCache = visitCache; visitCache = new HashSet<>((int) (oldCache.size() * REDUCE_FACTOR)); boolean isSave = false; try { //批量保存 isSave = visitLogService.saveBatch(oldCache, BATCH_SIZE); } finally { if (!isSave) { // 如果插入失败,则重新添加所有数据 visitCache.addAll(oldCache); } } } }
复制代码

VisitLogService 就是 mybatis 的正常的数据增删改查范畴,我这里的定义如下:

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.biglead.gateway.pojo.AccessRecord;import com.biglead.gateway.mapper.VisitLogMapper;import org.springframework.stereotype.Service;
/** * 访问日志Service类 */@Servicepublic class VisitLogService extends ServiceImpl<VisitLogMapper, AccessRecord> {
}
复制代码

VisitLogMapper 定义如下:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;import com.biglead.gateway.pojo.AccessRecord;import org.apache.ibatis.annotations.Mapper;
@Mapperpublic interface VisitLogMapper extends BaseMapper<AccessRecord> {}
复制代码

然后启动项目,访问接口数据,就可以自动记录到数据库中 

对应的 sql:

CREATE TABLE `access_recoder_log` (  `id` bigint(20) NOT NULL AUTO_INCREMENT,  `form_data` varchar(500) DEFAULT NULL,  `body` varchar(500) DEFAULT NULL,  `path` varchar(255) DEFAULT NULL,  `ip` varchar(255) DEFAULT NULL,  `status` int(11) DEFAULT NULL,  `user_id` bigint(20) DEFAULT NULL,  `scheme` varchar(255) DEFAULT NULL,  `method` varchar(255) DEFAULT NULL,  `consuming_time` bigint(20) DEFAULT NULL,  `create_time` datetime DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8 COMMENT='日志记录表';
复制代码


作者:早起的年轻人

链接:https://juejin.cn/post/7221093591295311930

来源:稀土掘金

用户头像

还未添加个人签名 2021-07-28 加入

公众号:该用户快成仙了

评论

发布
暂无评论
SpringCloud 网关实现线程池异步批量保存请求日志_Java_做梦都在改BUG_InfoQ写作社区