我的心血全在这了,这种方式讲 @Async 原理,你别再不懂 Spring 了
1.前言想你在看这篇文章之前有过使用 @Async 注解进行任务异步处理的经历,在项目开发过程中,针对非主流程、非实时、耗时的任务,往往会进行异步处理,这样既不会影响主流程,还会提高主流程的响应时间。
在使用 @Async 注解进行异步处理的过程中,相信你也踩过不少的坑,比如:任务并没有异步执行,由于共用线程池导致任务之间相互影响、异步任务出现异常不知道如何处理等等。今天我将带着你去了解它的真面目,以便下次再遇到问题的时候可以游刃有余,不至于慌慌张张、无从下手。
2.探秘之旅 2.1 实现原理
2.1.1 寻找异步注解后置处理器
你应该知道,要想在项目中使用 @Async 注解来执行异步任务,需要我们手动去开启异步功能,开启的方式就是需要添加 @EnableAsync
@SpringBootApplication@EnableAsyncpublic class SpringBootAsyncApplication {
}既然通过 @EnableAsync 注解可以开启异步功能,那么该注解就是我们探秘的入口
进入 @EnableAsync 注解,就会看到另一个熟悉的注解 @Import,该注解的功能就是在程序中引入相关功能对应的配置类
@Import(AsyncConfigurationSelector.class)public @interface EnableAsync {}点开 AsyncConfigurationSelector,可以看到此次引入的是 ProxyAsyncConfiguration 配置类
public String[] selectImports(AdviceMode adviceMode) {switch (adviceMode) {case PROXY:return new String[] {ProxyAsyncConfiguration.class.getName()};case ASPECTJ:return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};default:return null;}}进入 ProxyAsyncConfiguration 配置类
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public AsyncAnnotationBeanPostProcessor asyncAdvisor() {Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();bpp.configure(this.executor, this.exceptionHandler);Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {bpp.setAsyncAnnotationType(customAsyncAnnotation);}bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));return bpp;}可以看到 ProxyAsyncConfiguration 配置类中声明 AsyncAnnotationBeanPostProcessor 这样一个 Bean,从字面意思也可以猜出该 Bean 应该就是异步处理的主角,接下来就来看看这个主角做了哪些工作
进入 AsyncAnnotationBeanPostProcessor 中,可以看到该类实现了 BeanFactoryAware、BeanPostProcessor 这两个与 Bean 生命周期息息相关的接口,由 Bean 的生命周期特性可以得知 BeanFactoryAware 接口的实现方法先于 BeanPostProcessor 接口的实现方法执行。
2.1.2 BeanFactoryAware 实现
2.1.2.1 定义切面
@Overridepublic void setBeanFactory(BeanFactory beanFactory) {super.setBeanFactory(beanFactory);
}在 setBeanFactory()实现方法中定义了切面对象,看到切面这两个字,相信你的脑海中会立马浮现出与之有关的两个概念:切点、通知
切点:用来声明切入的目标通知:针对切入目标的相应处理 2.1.3 定义切点
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);asyncAnnotationTypes.add(Async.class);try {asyncAnnotationTypes.add((Class<? extends Annotation>)ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));}catch (ClassNotFoundException ex) {// If EJB 3.1 API not present, simply ignore.}protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {ComposablePointcut result = null;for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {// 定义在类上标注 @Async、@Asynchronous 注解的切点 Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);// 定义在方法上标注 @Async、@Asynchronous 注解的切点 Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);if (result == null) {result = new ComposablePointcut(cpc);}else {result.union(cpc);}result = result.union(mpc);}return (result != null ? result : Pointcut.TRUE);}2.1.4 定义通知
protected Advice buildAdvice(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {// 定义通知 AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);interceptor.configure(executor, exceptionHandler);return interceptor;}通知就是最终要执行的,也是相当重要的一部分,既然很重要,那就需要我们来看看具体的实现
public Object invoke(final MethodInvocation invocation) throws Throwable {Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
...return null;};
}protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {// 异步任务的返回值类型是 CompletableFutureif (CompletableFuture.class.isAssignableFrom(returnType)) {return CompletableFuture.supplyAsync(() -> {try {return task.call();}catch (Throwable ex) {throw new CompletionException(ex);}}, executor);}// 异步任务的返回值类型是 ListenableFutureelse if (ListenableFuture.class.isAssignableFrom(returnType)) {return ((AsyncListenableTaskExecutor) executor).submitListenable(task);}// 异步任务的返回值类型是 Futureelse if (Future.class.isAssignableFrom(returnType)) {return executor.submit(task);}// 否则交由线程池来处理,没有返回值 else {executor.submit(task);return null;}}通知的具体实现如下:
第一步获取异步任务线程池,用来执行异步任务使用 Callable 包裹目标方法执行异步异步任务,根据不同的返回值类型做相应的处理通过通知可以了解到异步任务最终实现的原理,你可能还有疑问,那就是如何告知通知来执行异步任务呢?
不知道,你是否还记得上文提到的 BeanPostProcessor 接口,下面就来看看它的具体实现
2.1.3 BeanPostProcessor 实现
提到 BeanPostProcessor 接口,你就应该立刻意识到它的处理方法肯定对 Bean 做了某些处理,比如生成代理
有了基础的意识后就来看看此处对应的后置处理实现
@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) {// 判断当前 Bean 是否满足之前定义的切点,如果满足则生成代理对象 if (isEligible(bean, beanName)) {ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);if (!proxyFactory.isProxyTargetClass()) {evaluateProxyInterfaces(bean.getClass(), proxyFactory);}proxyFactory.addAdvisor(this.advisor);customizeProxyFactory(proxyFactory);return proxyFactory.getProxy(getProxyClassLoader());}
}通过 BeanPostProcessor 的后置处理对满足切点的 Bean 生成代理,在调用目标方法的时候,会执行通知的 invoke()方法
到此,异步实现原理部分就结束了,其实原理很简单。我们需要做的就是定义切点、通知;要想实现对目标方法的增强,自然而然想到的就是反向代理;最后就是如何对原有的 Bean 进行改变呢?此刻就需要联系到与 Bean 生命周期相关的 BeanPostProcessor 接口
2.2 线程池使用
线程池这部分还是相当重要的,使用不当可能会导致意向不到的问题发生,比如内存溢出、无限制创建线程、业务之间相互影响等等
<p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@linkorg.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. 复制代码
根据官方文档的说明,可以得知 Spring 会从上下文中获取唯一的 TaskExecutor 或者名称"taskExecutor"的 Bean 作为线程池,默认的线程池在 TaskExecutionAutoConfiguration 自动配置类中定义,默认的线程池相关配置如下
可以看到默认线程池的队列大小和最大线程数都是 Integer 的最大值,显然会给系统留下一定的风险隐患,因此我们需要针对每个异步任务自定义线程池,然后在 @Async()注解上指明对应线程池的 Bean 名称
2.3 异常处理
异步任务的异常处理默认情况下只会打印日志信息,不会做任何额外的额处理,官方文档也有相关的说明
Besides, annotated methods having a * {@code void} return type cannot transmit any exception back to the caller. By default, * such uncaught exceptions are only logged. 复制代码
SimpleAsyncUncaughtExceptionHandler 就是异步任务异常处理的默认实现,如果想要自定义异常处理,只需要 AsyncConfigurer 接口即可
2.4 返回值类型
关于返回值类型,首先来看下官方的相关说明
<p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@linkjava.util.concurrent.Future}. In the latter case, you may declare the * more specific {@link org.springframework.util.concurrent.ListenableFuture} or * {@link java.util.concurrent.CompletableFuture} types which allow for richer * interaction with the asynchronous task and for immediate composition with * further processing steps. 复制代码
从官方说明上可以看出返回值类型只支持 4 种类型:
voidFutureListenableFutureCompletableFuture 再来看看具体的源码
不管从官方说明还是源码分析都可以得出异步任务只支持 4 种返回类型的结论,以后就不用再问别人 String 返回类型为什么返回的是 null 这样的傻瓜问题了!
评论