技术分享 | SpringBoot 流式输出时,正常输出后为何突然报错?
- 2024-05-31 广东
本文字数:3886 字
阅读完需:约 13 分钟
一个 SpringBoot 项目同时使用了 Tomcat 的过滤器和 Spring 的拦截器,一些线程变量在过滤器中初始化并在拦截器中使用。
该项目需要调用大语言模型进行流式输出。
项目中,笔者使用 SpringBoot 的
ResponseEntity<StreamingResponseBody>
将流式输出返回前端。
问题出现
问题出现在上述第 3 点:正常输出一段内容后,后台突然报错,而报错内容由拦截器产生。
笔者仔细查看了报错日志,发现只是拦截器的问题:执行时由于某些线程变量不存在而报错。但是,这些线程变量已经在过滤器中初始化了。
那么问题来了:为什么这个接口明明可以正常通过过滤器和拦截器,并开始正常输出,却又突然在拦截器中报错呢?
场景重现
Filter
@Slf4j
@Component
@Order(1)
public class MyFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
// 要继续处理请求,必须添加 filterChain.doFilter()
log.info("doFilter method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), servletRequest.getDispatcherType());
filterChain.doFilter(servletRequest,servletResponse);
}
}
Interceptor
@Slf4j
public class MyInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(@NotNull HttpServletRequest request, @NotNull HttpServletResponse response, @NotNull Object handler) throws Exception {
log.info("preHandle method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), request.getDispatcherType());
if (DispatcherType.ASYNC == request.getDispatcherType()) {
log.info("preHandle dispatcherType={}", request.getDispatcherType());
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
log.info("postHandle method is running..., thread: {}", Thread.currentThread());
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
log.info("afterCompletion method is running..., thread: {}", Thread.currentThread());
}
}
WebMvcConfigurer
@Configuration
public class WebAppConfigurer implements WebMvcConfigurer {
@Bean
public MyInterceptor myInterceptor() {
return new MyInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(myInterceptor()).addPathPatterns("/**");
}
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setDefaultTimeout(120_000L);
configurer.registerCallableInterceptors();
configurer.registerDeferredResultInterceptors();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("web-async-");
executor.initialize();
configurer.setTaskExecutor(executor);
}
}
Controller
@Slf4j
@RestController
@RequestMapping("/test-stream")
public class TestStreamController {
@ApiOperation("流式输出示例")
@PostMapping(value = "/example", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> example() {
log.info("Stream method is running, thread: {}", Thread.currentThread());
return ResponseEntity.status(HttpStatus.OK)
.contentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8))
.body(outputStream -> {
log.info("Internal stream method is running, thread: {}", Thread.currentThread());
try (outputStream) {
String msg = "To be or not to be!";
outputStream.write(msg.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
}
});
}
}
根据以下运行日志,我们可以看到拦截器的 preHandle
确实执行了两次,并且此次调用过程共有 3 个线程(io-14000-exec-1
,web-async-1
,io-14000-exec-2
)参与了工作。
2024-05-06 07:35:27.362 INFO 209108 --- [io-14000-exec-1] o.a.c.c.C.[.[localhost].[/java-study] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-05-06 07:35:27.362 INFO 209108 --- [io-14000-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2024-05-06 07:35:27.365 INFO 209108 --- [io-14000-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
2024-05-06 07:35:27.402 INFO 209108 --- [io-14000-exec-1] com.peng.java.study.web.config.MyFilter : doFilter method is running..., thread: Thread[http-nio-14000-exec-1,5,main], dispatcherType: REQUEST
2024-05-06 07:35:28.107 INFO 209108 --- [io-14000-exec-1] c.p.java.study.web.config.MyInterceptor : preHandle method is running..., thread: Thread[http-nio-14000-exec-1,5,main], dispatcherType: REQUEST
2024-05-06 07:35:28.121 INFO 209108 --- [io-14000-exec-1] c.p.j.s.w.r.test.TestStreamController : Stream method is running, thread: Thread[http-nio-14000-exec-1,5,main]
2024-05-06 07:35:28.152 INFO 209108 --- [ web-async-1] c.p.j.s.w.r.test.TestStreamController : Internal stream method is running, thread: Thread[web-async-1,5,main]
2024-05-06 07:35:28.167 INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor : preHandle method is running..., thread: Thread[http-nio-14000-exec-2,5,main], dispatcherType: ASYNC
2024-05-06 07:35:28.167 INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor : preHandle dispatcherType=ASYNC
2024-05-06 07:35:28.174 INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor : postHandle method is running..., thread: Thread[http-nio-14000-exec-2,5,main]
2024-05-06 07:35:28.183 INFO 209108 --- [io-14000-exec-2] c.p.java.study.web.config.MyInterceptor : afterCompletion method is running..., thread: Thread[http-nio-14000-exec-2,5,main]
问题分析
1. 方法调用流程的差异
众所周知,SpringBoot 的普通输出接口调用流程图如图 1 所示。
结合日志,我们可以简单画出流式输出接口对应的流程图(图 2)。
2. 线程的差异
普通接口的执行时序图如图 3 所示。
而流式接口的时序图如图 4 所示。
解决问题
通过分析,对流式输出的情况提出两种解决方案:
将过滤器中的部分业务逻辑迁移到拦截器中。
根据条件,跳过第二次的拦截器 preHandle 方法。
笔者选择了第二个方案,实现代码如下。
@Slf4j
public class MyInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(@NotNull HttpServletRequest request, @NotNull HttpServletResponse response, @NotNull Object handler) throws Exception {
log.info("preHandle method is running..., thread: {}, dispatcherType: {}", Thread.currentThread(), request.getDispatcherType());
// 如果是异步请求,则跳过
if (DispatcherType.ASYNC == request.getDispatcherType()) {
log.info("preHandle dispatcherType={}", request.getDispatcherType());
return true;
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
log.info("postHandle method is running..., thread: {}", Thread.currentThread());
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
log.info("afterCompletion method is running..., thread: {}", Thread.currentThread());
}
}
需要注意,请求线程和回调线程都需考虑清理线程变量,不然会导致内存泄漏。
了解更多技术干货、研发管理实践等分享,请关注 LigaAI。
版权声明: 本文为 InfoQ 作者【LigaAI】的原创文章。
原文链接:【http://xie.infoq.cn/article/8078f5a38577e1cf6262beaf5】。未经作者许可,禁止转载。
LigaAI
新一代智能研发协作平台 2021-02-23 加入
AI赋能工作场景,想要做最懂开发者的智能研发管理平台~
评论