Skip to content

AOP 之 @Async

使用

注意

  • 如果方法返回 Future 对象,方法就会阻塞,知道执行完成(类似于变成单线程执行了)。
  • @Async 可以指定一个参数,这个参数名字代表着对应线程池在 Spring 容器中的 bean name, 如果不指定,默认是 taskExecutor 线程池,当然,是可以配置默认线程池大小的。
  • org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
yaml
spring:
  task:
    # @Async Spring异步任务 SpringBean=taskExecutor
    execution:
      pool:
        # default 8  TaskExecutionProperties
        core-size: 8
java
// 启用异步注解
@EnableAsync
@Configuration
public class Configuration {}

@Component
public class AsyncFactory {

    @Async
    public Future<String> doTaskInAsync(String name) {
        // do something
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug(name + " doTaskIn Async");
        return new AsyncResult<>("SUCCESS " + name);
    }
}

源码分析

@Async 也是使用 AOP 来实现的,但跟缓存和事务使用的不是一种方式,@Async 自己又实现了 BeanPostProcessor, 重新定义了另外一套实现。但是切面有的 Advisor(ClassFilter, MethodMatcher), Pointcut, Advice 这些东西肯定都是存在的。

@EnableAsync

主要导入了 AsyncConfigurationSelector.class

java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	Class<? extends Annotation> annotation() default Annotation.class;

	boolean proxyTargetClass() default false;

	AdviceMode mode() default AdviceMode.PROXY;

	int order() default Ordered.LOWEST_PRECEDENCE;
}

AsyncConfigurationSelector

主要实例化了 ProxyAsyncConfiguration.class

java
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

ProxyAsyncConfiguration

主要实例化了 AsyncAnnotationBeanPostProcessor.class

java
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}
}

AsyncAnnotationBeanPostProcessor

├─AsyncAnnotationBeanPostProcessor
│ ├─extends AbstractBeanFactoryAwareAdvisingPostProcessor
│ │ ├─extends AbstractAdvisingBeanPostProcessor implements BeanFactoryAware
│ │ │ ├─ extends ProxyProcessorSupport implements BeanPostProcessor

可以看出,是一个后置处理器,那么它就会在 Bean 实例化的前后被调用。由于 AbstractAdvisingBeanPostProcessor 实现的是 postProcessAfterInitialization 方法, 那么,就意味着这个后置处理器会在 Bean 实例化完成后,再调用。

java
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        super.setBeanFactory(beanFactory);
        // 这里设置了 Advisor 为 AsyncAnnotationAdvisor,而这个里面就有 Advice 和 Pointcut
        AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
        if (this.asyncAnnotationType != null) {
            advisor.setAsyncAnnotationType(this.asyncAnnotationType);
        }
        advisor.setBeanFactory(beanFactory);
        this.advisor = advisor;
    }
}
java
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
	private Advice advice;
	private Pointcut pointcut;
	public AsyncAnnotationAdvisor() {
		this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
	}
	public AsyncAnnotationAdvisor(
			@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {
		this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
	}
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        // 这里终于出现了 @Async 注解
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
        // 通知
		this.advice = buildAdvice(executor, exceptionHandler);
        // 切点
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
		asyncAnnotationTypes.add(asyncAnnotationType);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
		}
	}

	protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
        /**
         * 通知 Advice = AnnotationAsyncExecutionInterceptor
         * 主要拿到 AsyncTaskExecutor 线程池,然后往里面提交任务
         */
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}

	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            // 匹配类上面的 @Async
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            // 匹配方法上面的 @Async
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}
}

AbstractAdvisingBeanPostProcessor

java
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (this.advisor == null || bean instanceof AopInfrastructureBean) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        if (bean instanceof Advised) {
            Advised advised = (Advised) bean;
            if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
                // Add our local Advisor to the existing proxy's Advisor chain...
                if (this.beforeExistingAdvisors) {
                    advised.addAdvisor(0, this.advisor);
                }
                else {
                    advised.addAdvisor(this.advisor);
                }
                return bean;
            }
        }
        // 类和方法上面是否有 @Async,最终会调用到 Advisor 中的方法匹配
        if (isEligible(bean, beanName)) {
            ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            if (!proxyFactory.isProxyTargetClass()) {
                evaluateProxyInterfaces(bean.getClass(), proxyFactory);
            }
            proxyFactory.addAdvisor(this.advisor);
            customizeProxyFactory(proxyFactory);

            // Use original ClassLoader if bean class not locally loaded in overriding class loader
            ClassLoader classLoader = getProxyClassLoader();
            if (classLoader instanceof SmartClassLoader && classLoader != bean.getClass().getClassLoader()) {
                classLoader = ((SmartClassLoader) classLoader).getOriginalClassLoader();
            }
            // 创建代理对象,并返回
            return proxyFactory.getProxy(classLoader);
        }

        // No proxy needed.
        return bean;
    }
}