Spring 应用中实现异步
Spring 为任务调度与异步方法执行提供了注解支持。通过在方法或类上设置 @Async 注解,可使得方法被异步调用。调用者会在调用时立即返回,而被调用方法的实际执行是交给 Spring 的 TaskExecutor 来完成的。所以被注解的方法被调用的时候,会在新的线程中执行,而调用它的方法会在原线程中执行,这样可以避免阻塞,以及保证任务的实时性。
简单回顾相关配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
复制代码
@EnableAsync 添加配置类
入口类增加了 @EnableAsync 注解,主要是为了扫描范围包下的所有 @Async 注解。
异步调用,通过开启新的线程调用的方法,不影响主线程。异步方法实际的执行交给了 Spring 的 TaskExecutor 来完成。
Future 获取异步执行的结果
public class AsyncResult<V> implements ListenableFuture<V> {
private final V value;
private final ExecutionException executionException;
//...
}
复制代码
AsyncResult 实现了 ListenableFuture 接口,该对象内部有两个属性:返回值和异常信息。
public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> var1);
void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}
复制代码
ListenableFuture 接口继承自 Future,在此基础上增加了回调方法的定义。Future 接口定义如下:
public interface Future<V> {
// 是否可以打断当前正在执行的任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务取消的结果
boolean isCancelled();
// 异步方法中最后返回的那个对象中的值
V get() throws InterruptedException, ExecutionException;
// 用来判断该异步任务是否执行完成,如果执行完成,则返回 true,如果未执行完成,则返回false
boolean isDone();
// 与 get() 一样,只不过这里参数中设置了超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
get()方法,在执行的时候是需要等待回调结果的,阻塞等待。如果不设置超时时间,它就阻塞在那里直到有了任务执行完成。我们设置超时时间,就可以在当前任务执行太久的情况下中断当前任务,释放线程,这样就不会导致一直占用资源。
cancel(boolean) 方法,参数是一个 boolean 类型的值,用来传入是否可以打断当前正在执行的任务。如果参数是 true 且当前任务没有执行完成 ,说明可以打断当前任务,那么就会返回 true;
如果当前任务还没有执行,那么不管参数是 true 还是 false,返回值都是 true;
如果当前任务已经完成,那么不管参数是 true 还是 false,那么返回值都是 false;
如果当前任务没有完成且参数是 false,那么返回值也是 false。
意思就是:
如果任务还没执行,那么如果想取消任务,就一定返回 true,与参数无关。
如果任务已经执行完成,那么任务一定是不能取消的,所以此时返回值都是 false,与参数无关。
如果任务正在执行中,那么此时是否取消任务就看参数是否允许打断(true/false)。
获取异步方法返回值的实现
public Future<String> test() throws Exception {
log.info("开始做任务");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任务,耗时:" + (end - start) + "毫秒");
return new AsyncResult<>("任务完成,耗时" + (end - start) + "毫秒");
}
复制代码
我们将 task 方法的返回值改为 Future<String>,将执行的时间拼接为字符串返回。
@GetMapping("/task")
public String taskExecute() {
try {
Future<String> r1 = taskService.test();
Future<String> r2 = taskService.test();
Future<String> r3 = taskService.test();
while (true) {
if (r1.isDone() && r2.isDone() && r3.isDone()) {
log.info("execute all tasks");
break;
}
Thread.sleep(200);
}
log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
} catch (Exception e) {
log.error("error executing task for {}",e.getMessage());
}
return "ok";
}
复制代码
另一种异步回调结果获取实现
统计一下三个任务并发执行共耗时多少,这就需要等到上述三个函数都完成调动之后记录时间,并计算结果。
也可以使用 CompletableFuture<T>来返回异步调用的结果
@Async
public CompletableFuture<String> doTaskOne() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(random.nextInt(10000));
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
return CompletableFuture.completedFuture("任务一完成");
}
复制代码
按照如上方式改造一下其他两个异步函数之后,下面我们改造一下测试用例,让测试在等待完成三个异步调用之后来做一些其他事情。
@Test
public void test() throws Exception {
long start = System.currentTimeMillis();
CompletableFuture<String> task1 = asyncTasks.test();
CompletableFuture<String> task2 = asyncTasks.test();
CompletableFuture<String> task3 = asyncTasks.test();
CompletableFuture.allOf(task1, task2, task3).join();
long end = System.currentTimeMillis();
log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
复制代码
在调用三个异步函数的时候,返回 CompletableFuture<String>类型的结果对象
通过 CompletableFuture.allOf(task1, task2, task3).join()实现三个异步任务都结束之前的阻塞效果
三个任务都完成之后,根据结束时间 - 开始时间,计算出三个任务并发执行的总耗时。
配置线程池
前面是最简单的使用方法,使用默认的 TaskExecutor。如果想使用自定义的 Executor,可以结合 @Configuration 注解的配置方式,Spring 基本上共有五大线程池。
SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
ConcurrentTaskExecutor:Executor 的适配类,不推荐使用。如果 ThreadPoolTaskExecutor 不满足要求时,才用考虑使用这个类
SimpleThreadPoolTaskExecutor:是 Quartz 的 SimpleThreadPool 的类。线程池同时被 quartz 和非 quartz 使用,才需要使用此类
ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对 java.util.concurrent.ThreadPoolExecutor 的包装,
public ThreadPoolTaskExecutor FebsShiroThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(5);
//配置最大线程数
executor.setMaxPoolSize(20);
//配置队列大小
executor.setQueueCapacity(200);
//线程池维护线程所允许的空闲时间
executor.setKeepAliveSeconds(30);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix(ConstantFiledUtil.KMALL_THREAD_NAME_PREFIX);
//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setWaitForTasksToCompleteOnShutdown(true);
//设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(60);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
复制代码
线程池的配置很灵活,对核心线程数、最大线程数等属性进行配置。其中,rejection-policy,当线程池已经达到最大线程数的时候,如何处理新任务。可选策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行。我们验证下,线程池的设置是否生效,在 TaskService 中,打印当前的线程名称:
public Future<String> doExecute() throws Exception {
log.info("开始做任务一");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任务一,耗时:" + (end - start) + "毫秒");
log.info("当前线程为 {}", Thread.currentThread().getName());
return new AsyncResult<>("任务一完成,耗时" + (end - start) + "毫秒");
}
复制代码
在 Spring @Async 异步线程使用过程中,需要注意的是以下的用法会使 @Async 失效:
异步方法使用 static 修饰;
异步类没有使用 @Component 注解(或其他注解)导致 Spring 无法扫描到异步类;
异步方法不能与被调用的异步方法在同一个类中;
类中需要使用 @Autowired 或 @Resource 等注解自动注入,不能手动 new 对象;
如果使用 Spring Boot 框架必须在启动类中增加 @EnableAsync 注解。
线程上下文信息传递
微服务架构中的一次请求会涉及多个微服务。或者一个服务中会有多个处理方法,这些方法有可能是异步方法。有些线程上下文信息,如请求的路径,用户唯一的 userId,这些信息会一直在请求中传递。如果不做任何处理,我们看下是否能够正常获取这些信息。
如果理由 RequestContextHolder 中的请求信息时,报了空指针异常。这说明了请求的上下文信息未传递到异步方法的线程中。RequestContextHolder 的实现,里面有两个 ThreadLocal 保存当前线程下的 request
//得到存储进去的request
private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
new NamedThreadLocal<RequestAttributes>("Request attributes");
//可被子线程继承的request
private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
new NamedInheritableThreadLocal<RequestAttributes>("Request context");
复制代码
如何将上下文信息传递到异步线程呢?
Spring 中的 ThreadPoolTaskExecutor 有一个配置属性 TaskDecorator,TaskDecorator 是一个回调接口,采用装饰器模式。
装饰模式是动态的给一个对象添加一些额外的功能,就增加功能来说,装饰模式比生成子类更为灵活。因此 TaskDecorator 主要用于任务的调用时设置一些执行上下文,或者为任务执行提供一些监视/统计。
public interface TaskDecorator {
Runnable decorate(Runnable runnable);
}
复制代码
decorate 方法,装饰给定的 Runnable,返回包装的 Runnable 以供实际执行。
下面我们定义一个线程上下文拷贝的 TaskDecorator。
public class ThreadLocalDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
return () -> {
try {
RequestContextHolder.setRequestAttributes(context);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
}
};
}
}
复制代码
实现较为简单,将当前线程的 context 装饰到指定的 Runnable,最后重置当前线程,上下文在线程池的配置中,增加回调的 TaskDecorator 属性的配置:
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
// 增加 TaskDecorator 属性的配置
executor.setTaskDecorator(new ThreadLocalDecorator());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
复制代码
异步方法返回类型只能有两种:
当返回类型为 void 的时候,方法调用过程产生的异常不会抛到调用者层面,可以通过注 AsyncUncaughtExceptionHandler 来捕获此类异常
当返回类型为 Future 的时候,方法调用过程产生的异常会抛到调用者层面
注意:如果不自定义异步方法的线程池默认使用 SimpleAsyncTaskExecutor。
SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用会创建一个新的线程。并发大的时候会产生严重的性能问题。
评论