AOP 之 @Async
使用
注意
- 如果方法返回 Future 对象,方法就会阻塞,知道执行完成(类似于变成单线程执行了)。
- @Async 可以指定一个参数,这个参数名字代表着对应线程池在 Spring 容器中的 bean name, 如果不指定,默认是 taskExecutor 线程池,当然,是可以配置默认线程池大小的。
- org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
yaml
spring:
task:
# @Async Spring异步任务 SpringBean=taskExecutor
execution:
pool:
# default 8 TaskExecutionProperties
core-size: 8
java
// 启用异步注解
@EnableAsync
@Configuration
public class Configuration {}
@Component
public class AsyncFactory {
@Async
public Future<String> doTaskInAsync(String name) {
// do something
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(name + " doTaskIn Async");
return new AsyncResult<>("SUCCESS " + name);
}
}
源码分析
@Async 也是使用 AOP 来实现的,但跟缓存和事务使用的不是一种方式,@Async 自己又实现了 BeanPostProcessor, 重新定义了另外一套实现。但是切面有的 Advisor(ClassFilter, MethodMatcher), Pointcut, Advice 这些东西肯定都是存在的。
@EnableAsync
主要导入了 AsyncConfigurationSelector.class
java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
Class<? extends Annotation> annotation() default Annotation.class;
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
AsyncConfigurationSelector
主要实例化了 ProxyAsyncConfiguration.class
java
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {ProxyAsyncConfiguration.class.getName()};
case ASPECTJ:
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
ProxyAsyncConfiguration
主要实例化了 AsyncAnnotationBeanPostProcessor.class
java
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
bpp.configure(this.executor, this.exceptionHandler);
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
AsyncAnnotationBeanPostProcessor
├─AsyncAnnotationBeanPostProcessor
│ ├─extends AbstractBeanFactoryAwareAdvisingPostProcessor
│ │ ├─extends AbstractAdvisingBeanPostProcessor implements BeanFactoryAware
│ │ │ ├─ extends ProxyProcessorSupport implements BeanPostProcessor
可以看出,是一个后置处理器,那么它就会在 Bean 实例化的前后被调用。由于 AbstractAdvisingBeanPostProcessor 实现的是 postProcessAfterInitialization 方法, 那么,就意味着这个后置处理器会在 Bean 实例化完成后,再调用。
java
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
// 这里设置了 Advisor 为 AsyncAnnotationAdvisor,而这个里面就有 Advice 和 Pointcut
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
}
java
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
public AsyncAnnotationAdvisor() {
this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
}
public AsyncAnnotationAdvisor(
@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
}
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
// 这里终于出现了 @Async 注解
asyncAnnotationTypes.add(Async.class);
try {
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
// 通知
this.advice = buildAdvice(executor, exceptionHandler);
// 切点
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
asyncAnnotationTypes.add(asyncAnnotationType);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
if (this.advice instanceof BeanFactoryAware) {
((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
}
}
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
/**
* 通知 Advice = AnnotationAsyncExecutionInterceptor
* 主要拿到 AsyncTaskExecutor 线程池,然后往里面提交任务
*/
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// 匹配类上面的 @Async
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// 匹配方法上面的 @Async
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
}
AbstractAdvisingBeanPostProcessor
java
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (this.advisor == null || bean instanceof AopInfrastructureBean) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
// 类和方法上面是否有 @Async,最终会调用到 Advisor 中的方法匹配
if (isEligible(bean, beanName)) {
ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
if (!proxyFactory.isProxyTargetClass()) {
evaluateProxyInterfaces(bean.getClass(), proxyFactory);
}
proxyFactory.addAdvisor(this.advisor);
customizeProxyFactory(proxyFactory);
// Use original ClassLoader if bean class not locally loaded in overriding class loader
ClassLoader classLoader = getProxyClassLoader();
if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
}
// 创建代理对象,并返回
return proxyFactory.getProxy(classLoader);
}
// No proxy needed.
return bean;
}
}