@[toc]
1.@EnableAsync
以及@Async
的說明
1.1 @Async
?spring從3.0版本開始支持異步的方法調(diào)用,只需要在方法或者類上面加上一個(gè)@Async
注解就可以了毯焕,當(dāng)注解在類上面的時(shí)候嘹狞,則表示整個(gè)類都作為異步方法岂膳。但是需要注意的是,當(dāng)一個(gè)方法所在類上面已經(jīng)存在@Configuration
注解的時(shí)候磅网,這個(gè)時(shí)候@Async
注解是不支持的谈截,至于為什么會(huì)不支持因?yàn)?code>@Configuration注解會(huì)對(duì)當(dāng)前類進(jìn)行代理增強(qiáng),這個(gè)時(shí)候返回的類是一個(gè)經(jīng)過代理的類涧偷,這個(gè)時(shí)候的方法可能已經(jīng)不是原方法了簸喂,至于代理增強(qiáng)的相關(guān)的東西可以看看spring源碼------@Configuration跟@Component及其派生注解@Service等的區(qū)別以及spring對(duì)其代理增強(qiáng)的原理
這個(gè)里面有對(duì)@Configuration
注解的解析。
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {
//指定對(duì)應(yīng)的Executor的beanName
String value() default "";
}
1.2 @EnableAsync
?spring在3.1版本的時(shí)候加上了是否開啟異步方法支持的注解@EnableAsync
燎潮。這個(gè)注解是基于@Import
注解進(jìn)行擴(kuò)展的喻鳄,關(guān)于@Import
可以看看spring源碼解析------@Import注解解析與ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector區(qū)別
了解以下确封。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//用戶定義的需要進(jìn)行異步執(zhí)行的注解除呵,默認(rèn)只有Async注解
Class<? extends Annotation> annotation() default Annotation.class;
//是否創(chuàng)建基于CGLIB的代理對(duì)象
boolean proxyTargetClass() default false;
//代理模式選擇 PROXY表示jdk的代理
AdviceMode mode() default AdviceMode.PROXY;
//對(duì)應(yīng)的AsyncAnnotationBeanPostProcessor的攔截順序
int order() default Ordered.LOWEST_PRECEDENCE;
}
2. 源碼分析
2.1 基于@Import
擴(kuò)展的AsyncConfigurationSelector
?AsyncConfigurationSelector
類繼承了AdviceModeImportSelector
類再菊,而AdviceModeImportSelector
類在前面的一篇文章spring源碼------@EnableCaching,@Cacheable颜曾,@CacheEvict纠拔,@CachePut的實(shí)現(xiàn)原理
中提到過。這個(gè)類主要是根據(jù)@Import
注解的擴(kuò)展注解的類型的mode
屬性來設(shè)置代理類型泛啸。
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";
protected String getAdviceModeAttributeName() {
return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
}
public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
//獲取AdviceModeImportSelector中的注解泛型的Class對(duì)象
Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
//從傳入的importingClassMetadata對(duì)象中獲取對(duì)應(yīng)的Class類型的注解的內(nèi)部屬性
AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (attributes == null) {
throw new IllegalArgumentException(String.format(
"@%s is not present on importing class '%s' as expected",
annType.getSimpleName(), importingClassMetadata.getClassName()));
}
//獲取屬性中的代理類型屬性“mode”的值
AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
//根據(jù)代理類型獲取需要注入的bean
String[] imports = selectImports(adviceMode);
if (imports == null) {
throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
}
return imports;
}
}
?其中selectImports
方法由子類來實(shí)現(xiàn)绿语,這里進(jìn)入AsyncConfigurationSelector
來看看實(shí)現(xiàn)的邏輯
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
//默認(rèn)的是jdk PROXY的模式
case PROXY:
//返回ProxyAsyncConfiguration類秃症,注入到容器中候址,配置異步方法相關(guān)的配置
return new String[] {ProxyAsyncConfiguration.class.getName()};
//使用AspectJ的模式
case ASPECTJ:
//返回AspectJ關(guān)于異步方法相關(guān)的配置類AspectJAsyncConfiguration
return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
?可以看到這里就是選擇不同的代理模式情況下,需要注入不同的配置類种柑。而默認(rèn)情況下@EnableAsync
注解中的是PROXY
模式岗仑,這里我們也就這種模式進(jìn)行分析,所以接下來就是ProxyAsyncConfiguration
類了聚请。
2.2 配置異步方法執(zhí)行相關(guān)配置的ProxyAsyncConfiguration
?spring對(duì)于方法的異步執(zhí)行的邏輯跟@EnableCache
注解類似的荠雕,都是基于對(duì)方法攔截完成的,這里的攔截涉及到了切點(diǎn)跟增強(qiáng)類驶赏。而ProxyAsyncConfiguration
中就配置了這兩點(diǎn)炸卑。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//創(chuàng)建AsyncAnnotationBeanPostProcessor,實(shí)現(xiàn)了AbstractAdvisingBeanPostProcessor(內(nèi)部定義了Advisor對(duì)象可以可以注冊(cè)到容器中)
AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
//設(shè)置executor跟exceptionHandler煤傍,spring都是空實(shí)現(xiàn)盖文,也就是這兩個(gè)值都是null,后面會(huì)設(shè)置默認(rèn)的對(duì)象蚯姆,也可以自己指定實(shí)現(xiàn)五续,
bpp.configure(this.executor, this.exceptionHandler);
//獲取@EnableAsync注解中的annotation屬性,這個(gè)annotation表示貼有這些注解的方法就是需要攔截的進(jìn)行異步執(zhí)行的方法
Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
//如果annotation值不是默認(rèn)的龄恋,則將這個(gè)加入到AsyncAnnotationBeanPostProcessor疙驾,后面進(jìn)行攔截用
if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
bpp.setAsyncAnnotationType(customAsyncAnnotation);
}
//是否對(duì)目標(biāo)類進(jìn)行代理
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
//設(shè)置AsyncAnnotationBeanPostProcessor的順序,默認(rèn)是最低的郭毕,這樣可以在其他的 post-processors執(zhí)行后在處理
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
?可以看到主要邏輯如下:
- 創(chuàng)建
AsyncAnnotationBeanPostProcessor
它碎, - 配置異步執(zhí)行方法的執(zhí)行器executor,以及錯(cuò)誤異常處理的exceptionHandler
- 然后解析
@EnableAsync
注解的annotation
屬性显押,或許需要攔截代理的方法扳肛,然后設(shè)置進(jìn)入。 - 設(shè)置是否代理目標(biāo)對(duì)象
- 設(shè)置攔截器所處的順序
?接下來就是進(jìn)入AsyncAnnotationBeanPostProcessor
了解相關(guān)的配置和處理異步相關(guān)的類煮落。
2.3 創(chuàng)建切點(diǎn)以及增強(qiáng)類的AsyncAnnotationBeanPostProcessor
及其父類
2.3.1 創(chuàng)建增強(qiáng)類的AsyncAnnotationBeanPostProcessor
?在AsyncAnnotationBeanPostProcessor
的構(gòu)造器中有一個(gè)邏輯就是將當(dāng)前類會(huì)創(chuàng)建的增強(qiáng)類作為所有增強(qiáng)類的第一個(gè)
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
public AsyncAnnotationBeanPostProcessor() {
//是否將這個(gè)advisor放在advisor列表的第一個(gè)敞峭,也就是最開始攔截
setBeforeExistingAdvisors(true);
}
}
?這個(gè)屬性是在其父類中被用到的,這里簡(jiǎn)單的展示以下部分的代碼蝉仇。
//實(shí)現(xiàn)了BeanPostProcessor旋讹,在bean初始化之后調(diào)用
public Object postProcessAfterInitialization(Object bean, String beanName) {
......
if (bean instanceof Advised) {
Advised advised = (Advised) bean;
if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
// Add our local Advisor to the existing proxy's Advisor chain...
//是否將當(dāng)前的Advised放在調(diào)用鏈的最前面
if (this.beforeExistingAdvisors) {
advised.addAdvisor(0, this.advisor);
}
else {
advised.addAdvisor(this.advisor);
}
return bean;
}
}
......
}
?AsyncAnnotationBeanPostProcessor
在這里還間接的實(shí)現(xiàn)了BeanFactoryAware
接口的setBeanFactory
方法殖蚕,而這個(gè)方法里面就創(chuàng)建了增強(qiáng)相關(guān)的advisor。
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//創(chuàng)建AsyncAnnotationAdvisor沉迹,
AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
//將需要攔截的方法上的注解加入到advisor
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
//設(shè)置容器到advisor
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
?這里的創(chuàng)建的AsyncAnnotationAdvisor
間接實(shí)現(xiàn)了PointcutAdvisor
跟Advisor
睦疫,會(huì)將內(nèi)部的Advice
以及Pointcut
注入到容器中,關(guān)于這兩個(gè)類可以看看6.1Spring的AOP的解析——AOP的自定義組件
了解一下鞭呕。這里直接進(jìn)入到AsyncAnnotationAdvisor
中查看蛤育。
2.3.2 創(chuàng)建增強(qiáng)跟切點(diǎn)的AsyncAnnotationAdvisor
?在AsyncAnnotationAdvisor
的構(gòu)造器中,就包含了advice跟pointcut的創(chuàng)建入口葫松。
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
//在用戶自定義的攔截注解上額外加入Async到需要攔截的注解集合
asyncAnnotationTypes.add(Async.class);
try {
//增加對(duì)java的ejv的Asynchronous注解的支持
asyncAnnotationTypes.add((Class<? extends Annotation>)
ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
}
catch (ClassNotFoundException ex) {
// If EJB 3.1 API not present, simply ignore.
}
//創(chuàng)建advice
this.advice = buildAdvice(executor, exceptionHandler);
//設(shè)置pointcut
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
?這里主要就是將表示需要攔截的注解加入到需要對(duì)應(yīng)的集合中瓦糕,然后就開始對(duì)advice跟pointcut的對(duì)象的創(chuàng)建。這里先看advice腋么。
2.3.3 創(chuàng)建攔截器AnnotationAsyncExecutionInterceptor
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//設(shè)置攔截器
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
//設(shè)置executor跟exceptionHandler
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
?這里的configure就是配置用來執(zhí)行異步調(diào)用方法的執(zhí)行器Executor
以及處理調(diào)用出異常的時(shí)候處理類AsyncUncaughtExceptionHandler
public void configure(@Nullable Supplier<Executor> defaultExecutor,
@Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
//如果defaultExecutor為null咕娄,也就是沒有指定Executor,設(shè)置默認(rèn)的SimpleAsyncTaskExecutor(在子類AsyncExecutionInterceptor中重載getDefaultExecutor方法)
this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
//如果沒有指定exceptionHandler默認(rèn)為SimpleAsyncUncaughtExceptionHandler
this.exceptionHandler = new SingletonSupplier<>(exceptionHandler, SimpleAsyncUncaughtExceptionHandler::new);
}
?其中getDefaultExecutor
方法子類AsyncExecutionInterceptor
進(jìn)行了重載
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
//先調(diào)用父類的方法從容器中獲取珊擂,
Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
//如果沒有設(shè)置Executor 則使用SimpleAsyncTaskExecutor
return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}
2.3.4 創(chuàng)建切點(diǎn)
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
//創(chuàng)建Pointcut圣勒,會(huì)設(shè)置classFilter為AnnotationClassFilter,methodMatcher為TrueMethodMatcher
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
//創(chuàng)建Pointcut摧扇,會(huì)設(shè)置classFilter為AnnotationClassFilter圣贸,methodMatcher為AnnotationMethodMatcher
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
if (result == null) {
result = new ComposablePointcut(cpc);
}
else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
?這里主要包含兩種,一種是AnnotationClassFilter
跟TrueMethodMatcher
結(jié)合的扛稽,另外一種是AnnotationClassFilter
跟AnnotationMethodMatcher
結(jié)合的吁峻。對(duì)于這三個(gè)類這里不進(jìn)行分析,只說明其作用庇绽。作用就是對(duì)當(dāng)前攔截的方法進(jìn)行匹配判斷锡搜,是否包含前面設(shè)置的那些需要攔截的注解,如果包含則說明瞧掺,這個(gè)方法是需要進(jìn)行異步執(zhí)行的耕餐,沒有則表示不匹配則會(huì)跳過這個(gè)方法。
2.4 方法攔截增強(qiáng)的邏輯
?先總結(jié)一下上面那些步驟做了啥辟狈,主要就是做了下面的幾點(diǎn):
- 設(shè)置需要攔截的注解集合
asyncAnnotationType
- 設(shè)置方法的攔截處理器
AsyncExecutionInterceptor
肠缔, - 進(jìn)行異步調(diào)用的執(zhí)行器
Executor
,錯(cuò)誤處理器SimpleAsyncUncaughtExceptionHandler
?接下來就對(duì)這些進(jìn)行分析哼转。
2.4.1 攔截方法的AsyncExecutionInterceptor
?這里直接進(jìn)入實(shí)現(xiàn)了MethodInterceptor
接口的invoke
方法明未。
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//根據(jù)方法相關(guān)信息,獲取executor壹蔓,這個(gè)executor可以指定也可以是默認(rèn)的
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
//進(jìn)入下一個(gè)攔截器鏈
Object result = invocation.proceed();
if (result instanceof Future) {
//獲取結(jié)果返回
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
//進(jìn)入到SimpleAsyncUncaughtExceptionHandler進(jìn)行錯(cuò)誤處理
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
//進(jìn)入到SimpleAsyncUncaughtExceptionHandler進(jìn)行錯(cuò)誤處理
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
//使用選定的執(zhí)行程序?qū)嶋H執(zhí)行給定任務(wù)的委托趟妥,使用的是CompletableFuture
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
?這里主要邏輯如下:
- 從目標(biāo)類中獲取最匹配的需要執(zhí)行的方法
- 獲取異步調(diào)度方法用的executor
- 執(zhí)行完攔截鏈獲取到最終結(jié)果,中間如果出錯(cuò)佣蓉,則最后交給
SimpleAsyncUncaughtExceptionHandler
處理 - 使用executor來完成異步調(diào)用
?這里進(jìn)入到determineAsyncExecutor
看看怎么選擇執(zhí)行器的披摄。
2.4.2 獲取方法調(diào)度用的執(zhí)行器
?determineAsyncExecutor
方法在MethodInterceptor
的父類AsyncExecutionAspectSupport
中亲雪。
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//緩存的AsyncTaskExecutor跟method的map集合
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
Executor targetExecutor;
//獲取貼有Async注解的方法,最終的是現(xiàn)在注冊(cè)的AnnotationAsyncExecutionInterceptor類中疚膊,尋找Async注解中的value字段
String qualifier = getExecutorQualifier(method);
if (StringUtils.hasLength(qualifier)) {
//如果value有指定Executor則在容器中尋找關(guān)聯(lián)的執(zhí)行器
targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
}
else {
//使用默認(rèn)的Executor,SimpleAsyncTaskExecutor
targetExecutor = this.defaultExecutor.get();
}
if (targetExecutor == null) {
return null;
}
//轉(zhuǎn)換為AsyncTaskExecutor類型
executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
//保存到緩存中
this.executors.put(method, executor);
}
return executor;
}
?這里主要邏輯就是先獲取配置指定义辕,如果沒有指定則用默認(rèn)的。而獲取配置的邏輯在AsyncExecutionInterceptor
的子類AnnotationAsyncExecutionInterceptor
中實(shí)現(xiàn)的寓盗。
protected String getExecutorQualifier(Method method) {
//獲取方法上的Async注解
Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
if (async == null) {
//注解為空灌砖,則獲取聲明這個(gè)方法的類上面有沒有這個(gè)注解
async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
}
//返回Async注解value或者null
return (async != null ? async.value() : null);
}
?其主要邏輯就是獲取方法或者類上面有沒有@Async
注解,有則代表是需要異步調(diào)用的方法傀蚌,沒有則不是基显,是的還需要進(jìn)一步獲取注解中value
屬性,看有沒有指定執(zhí)行器并返回喳张。
2.4.3 進(jìn)行方法的異步調(diào)用——使用線程池
?在AsyncExecutionInterceptor
中獲取到了AsyncTaskExecutor
類型的執(zhí)行器之后续镇,就是信息異步的方法調(diào)用了。這個(gè)邏輯在doSubmit
方法中销部。
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
//使用的java8 的新的Future相關(guān)API,CompletableFuture來完成
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
//提交任務(wù)
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
//提交任務(wù)
return executor.submit(task);
}
else {
//提交任務(wù)
executor.submit(task);
return null;
}
}
?可以看到制跟,最后的所有異步調(diào)用還是離不開線程池舅桩。到這里整個(gè)@EnableAsync
注解以及@Async
注解的分析就完結(jié)了。