什么是異步調(diào)用?
異步調(diào)用是相對于同步調(diào)用而言的秤茅,同步調(diào)用是指程序按預(yù)定順序一步步執(zhí)行,每一步必須等到上一步執(zhí)行完后才能執(zhí)行童叠,異步調(diào)用則無需等待上一步程序執(zhí)行完即可執(zhí)行框喳。異步調(diào)用指课幕,在程序在執(zhí)行時,無需等待執(zhí)行的返回值即可繼續(xù)執(zhí)行后面的代碼五垮。在我們的應(yīng)用服務(wù)中乍惊,有很多業(yè)務(wù)邏輯的執(zhí)行操作不需要同步返回(如發(fā)送郵件、冗余數(shù)據(jù)表等)放仗,只需要異步執(zhí)行即可润绎。
本文將介紹 Spring 應(yīng)用中,如何實現(xiàn)異步調(diào)用诞挨。在異步調(diào)用的過程中莉撇,會出現(xiàn)線程上下文信息的丟失,我們該如何解決線程上下文信息的傳遞惶傻。
Spring 應(yīng)用中實現(xiàn)異步
Spring 為任務(wù)調(diào)度與異步方法執(zhí)行提供了注解支持棍郎。通過在方法或類上設(shè)置 @Async
注解,可使得方法被異步調(diào)用银室。調(diào)用者會在調(diào)用時立即返回涂佃,而被調(diào)用方法的實際執(zhí)行是交給 Spring 的 TaskExecutor
來完成的。所以被注解的方法被調(diào)用的時候蜈敢,會在新的線程中執(zhí)行辜荠,而調(diào)用它的方法會在原線程中執(zhí)行,這樣可以避免阻塞抓狭,以及保證任務(wù)的實時性伯病。
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
引入 Spring 相關(guān)的依賴即可。
入口類
@SpringBootApplication
@EnableAsync
public class AsyncApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncApplication.class, args);
}
入口類增加了 @EnableAsync
注解辐宾,主要是為了掃描范圍包下的所有 @Async
注解狱从。
對外的接口
這里寫了一個簡單的接口:
@RestController
@Slf4j
public class TaskController {
@Autowired
private TaskService taskService;
@GetMapping("/task")
public String taskExecute() {
try {
taskService.doTaskOne();
taskService.doTaskTwo();
taskService.doTaskThree();
} catch (Exception e) {
log.error("error executing task for {}",e.getMessage());
}
return "ok";
}
}
調(diào)用 TaskService
執(zhí)行三個異步方法。
Service 方法
@Component
@Slf4j
//@Async
public class TaskService {
@Async
public void doTaskOne() throws Exception {
log.info("開始做任務(wù)一");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)一叠纹,耗時:" + (end - start) + "毫秒");
}
@Async
public void doTaskTwo() throws Exception {
log.info("開始做任務(wù)二");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)二季研,耗時:" + (end - start) + "毫秒");
}
@Async
public void doTaskThree() throws Exception {
log.info("開始做任務(wù)三");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)三,耗時:" + (end - start) + "毫秒");
}
}
@Async
可以用于類上誉察,標識該類的所有方法都是異步方法与涡,也可以單獨用于某些方法。每個方法都會 sleep 1000 ms持偏。
結(jié)果展示
運行結(jié)果如下:
可以看到 TaskService
中的三個方法是異步執(zhí)行的驼卖,接口的結(jié)果快速返回,日志信息異步輸出鸿秆。異步調(diào)用酌畜,通過開啟新的線程調(diào)用的方法,不影響主線程卿叽。異步方法實際的執(zhí)行交給了 Spring 的 TaskExecutor
來完成桥胞。
Future:獲取異步執(zhí)行的結(jié)果
在上面的測試中我們也可以發(fā)現(xiàn)主調(diào)用方法并沒有等到調(diào)用方法執(zhí)行完就結(jié)束了當前的任務(wù)恳守。如果想要知道調(diào)用的三個方法全部執(zhí)行完該怎么辦呢,下面就可以用到異步回調(diào)贩虾。
異步回調(diào)就是讓每個被調(diào)用的方法返回一個 Future 類型的值催烘,Spring 中提供了一個 Future 接口的子類:AsyncResult
,所以我們可以返回 AsyncResult
類型的值缎罢。
public class AsyncResult<V> implements ListenableFuture<V> {
private final V value;
private final ExecutionException executionException;
//...
}
AsyncResult
實現(xiàn)了 ListenableFuture
接口伊群,該對象內(nèi)部有兩個屬性:返回值和異常信息。
public interface ListenableFuture<T> extends Future<T> {
void addCallback(ListenableFutureCallback<? super T> var1);
void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}
ListenableFuture
接口繼承自 Future策精,在此基礎(chǔ)上增加了回調(diào)方法的定義舰始。Future 接口定義如下:
public interface Future<V> {
// 是否可以打斷當前正在執(zhí)行的任務(wù)
boolean cancel(boolean mayInterruptIfRunning);
// 任務(wù)取消的結(jié)果
boolean isCancelled();
// 異步方法中最后返回的那個對象中的值
V get() throws InterruptedException, ExecutionException;
// 用來判斷該異步任務(wù)是否執(zhí)行完成,如果執(zhí)行完成蛮寂,則返回 true蔽午,如果未執(zhí)行完成,則返回false
boolean isDone();
// 與 get() 一樣酬蹋,只不過這里參數(shù)中設(shè)置了超時時間
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
#get()
方法及老,在執(zhí)行的時候是需要等待回調(diào)結(jié)果的,阻塞等待范抓。如果不設(shè)置超時時間骄恶,它就阻塞在那里直到有了任務(wù)執(zhí)行完成。我們設(shè)置超時時間匕垫,就可以在當前任務(wù)執(zhí)行太久的情況下中斷當前任務(wù)僧鲁,釋放線程,這樣就不會導(dǎo)致一直占用資源象泵。
#cancel(boolean)
方法寞秃,參數(shù)是一個 boolean 類型的值,用來傳入是否可以打斷當前正在執(zhí)行的任務(wù)偶惠。如果參數(shù)是 true 且當前任務(wù)沒有執(zhí)行完成 春寿,說明可以打斷當前任務(wù),那么就會返回 true忽孽;如果當前任務(wù)還沒有執(zhí)行绑改,那么不管參數(shù)是 true 還是 false,返回值都是 true兄一;如果當前任務(wù)已經(jīng)完成厘线,那么不管參數(shù)是 true 還是 false,那么返回值都是 false出革;如果當前任務(wù)沒有完成且參數(shù)是 false造壮,那么返回值也是 false。即:
- 如果任務(wù)還沒執(zhí)行骂束,那么如果想取消任務(wù)耳璧,就一定返回 true硝全,與參數(shù)無關(guān)。
- 如果任務(wù)已經(jīng)執(zhí)行完成楞抡,那么任務(wù)一定是不能取消的,所以此時返回值都是false析藕,與參數(shù)無關(guān)召廷。
- 如果任務(wù)正在執(zhí)行中,那么此時是否取消任務(wù)就看參數(shù)是否允許打斷(true/false)账胧。
獲取異步方法返回值的實現(xiàn)
public Future<String> doTaskOne() throws Exception {
log.info("開始做任務(wù)一");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)一竞慢,耗時:" + (end - start) + "毫秒");
return new AsyncResult<>("任務(wù)一完成,耗時" + (end - start) + "毫秒");
}
//...其他兩個方法類似治泥,省略
我們將 task 方法的返回值改為 Future<String>
筹煮,將執(zhí)行的時間拼接為字符串返回。
@GetMapping("/task")
public String taskExecute() {
try {
Future<String> r1 = taskService.doTaskOne();
Future<String> r2 = taskService.doTaskTwo();
Future<String> r3 = taskService.doTaskThree();
while (true) {
if (r1.isDone() && r2.isDone() && r3.isDone()) {
log.info("execute all tasks");
break;
}
Thread.sleep(200);
}
log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
} catch (Exception e) {
log.error("error executing task for {}",e.getMessage());
}
return "ok";
}
在調(diào)用異步方法之后居夹,可以通過循環(huán)判斷異步方法是否執(zhí)行完成败潦。結(jié)果正如我們所預(yù)期,future 所 get 到的是 AsyncResult 返回的字符串准脂。
配置線程池
前面是最簡單的使用方法劫扒,使用默認的 TaskExecutor
。如果想使用自定義的 Executor狸膏,可以結(jié)合 @Configuration
注解的配置方式沟饥。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class TaskPoolConfig {
@Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 核心線程數(shù)(默認線程數(shù))
executor.setMaxPoolSize(20); // 最大線程數(shù)
executor.setQueueCapacity(200); // 緩沖隊列數(shù)
executor.setKeepAliveSeconds(60); // 允許線程空閑時間(單位:默認為秒)
executor.setThreadNamePrefix("taskExecutor-"); // 線程池名前綴
// 線程池對拒絕任務(wù)的處理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
線程池的配置很靈活湾戳,對核心線程數(shù)贤旷、最大線程數(shù)等屬性進行配置。其中砾脑,rejection-policy幼驶,當線程池已經(jīng)達到最大線程數(shù)的時候,如何處理新任務(wù)拦止∠厍玻可選策略有 CallerBlocksPolicy、CallerRunsPolicy 等汹族。CALLER_RUNS:不在新線程中執(zhí)行任務(wù)萧求,而是由調(diào)用者所在的線程來執(zhí)行。我們驗證下顶瞒,線程池的設(shè)置是否生效夸政,在 TaskService 中,打印當前的線程名稱:
public Future<String> doTaskOne() throws Exception {
log.info("開始做任務(wù)一");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)一榴徐,耗時:" + (end - start) + "毫秒");
log.info("當前線程為 {}", Thread.currentThread().getName());
return new AsyncResult<>("任務(wù)一完成守问,耗時" + (end - start) + "毫秒");
}
通過結(jié)果可以看到匀归,線程池配置的線程名前綴已經(jīng)生效。在 Spring @Async 異步線程使用過程中耗帕,需要注意的是以下的用法會使 @Async
失效:
- 異步方法使用 static 修飾穆端;
- 異步類沒有使用 @Component 注解(或其他注解)導(dǎo)致 Spring 無法掃描到異步類;
- 異步方法不能與被調(diào)用的異步方法在同一個類中仿便;
- 類中需要使用 @Autowired 或 @Resource 等注解自動注入体啰,不能手動 new 對象;
- 如果使用 Spring Boot 框架必須在啟動類中增加
@EnableAsync
注解嗽仪。
線程上下文信息傳遞
很多時候荒勇,在微服務(wù)架構(gòu)中的一次請求會涉及多個微服務(wù)∥偶幔或者一個服務(wù)中會有多個處理方法沽翔,這些方法有可能是異步方法。有些線程上下文信息窿凤,如請求的路徑仅偎,用戶唯一的 userId,這些信息會一直在請求中傳遞卷玉。如果不做任何處理哨颂,我們看下是否能夠正常獲取這些信息。
@GetMapping("/task")
public String taskExecute() {
try {
Future<String> r1 = taskService.doTaskOne();
Future<String> r2 = taskService.doTaskTwo();
Future<String> r3 = taskService.doTaskThree();
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
log.info("當前線程為 {}相种,請求方法為 {}威恼,請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
while (true) {
if (r1.isDone() && r2.isDone() && r3.isDone()) {
log.info("execute all tasks");
break;
}
Thread.sleep(200);
}
log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
} catch (Exception e) {
log.error("error executing task for {}", e.getMessage());
}
return "ok";
}
在 Spring Boot Web 中我們可以通過 RequestContextHolder
很方便的獲取 request。在接口方法中寝并,輸出請求的方法和請求的路徑箫措。
public Future<String> doTaskOne() throws Exception {
log.info("開始做任務(wù)一");
long start = System.currentTimeMillis();
Thread.sleep(1000);
long end = System.currentTimeMillis();
log.info("完成任務(wù)一,耗時:" + (end - start) + "毫秒");
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = requestAttributes.getRequest();
log.info("當前線程為 {}衬潦,請求方法為 {}斤蔓,請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
return new AsyncResult<>("任務(wù)一完成,耗時" + (end - start) + "毫秒");
}
同時在 TaskService 中镀岛,驗證是不是也能輸出請求的信息弦牡。運行程序,結(jié)果如下:
在 TaskService 中漂羊,每個異步線程的方法獲取 RequestContextHolder
中的請求信息時驾锰,報了空指針異常。這說明了請求的上下文信息未傳遞到異步方法的線程中走越。RequestContextHolder
的實現(xiàn)椭豫,里面有兩個 ThreadLocal 保存當前線程下的 request。
//得到存儲進去的request
private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
new NamedThreadLocal<RequestAttributes>("Request attributes");
//可被子線程繼承的request
private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
new NamedInheritableThreadLocal<RequestAttributes>("Request context");
再看 #getRequestAttributes()
方法,相當于直接獲取 ThreadLocal 里面的值赏酥,這樣就使得每一次獲取到的 Request 是該請求的 request喳整。如何將上下文信息傳遞到異步線程呢账嚎?Spring 中的 ThreadPoolTaskExecutor
有一個配置屬性 TaskDecorator
提陶,TaskDecorator
是一個回調(diào)接口,采用裝飾器模式霎箍。裝飾模式是動態(tài)的給一個對象添加一些額外的功能呵晨,就增加功能來說瞬项,裝飾模式比生成子類更為靈活。因此 TaskDecorator
主要用于任務(wù)的調(diào)用時設(shè)置一些執(zhí)行上下文何荚,或者為任務(wù)執(zhí)行提供一些監(jiān)視/統(tǒng)計。
public interface TaskDecorator {
Runnable decorate(Runnable runnable);
}
#decorate
方法猪杭,裝飾給定的 Runnable餐塘,返回包裝的 Runnable 以供實際執(zhí)行。
下面我們定義一個線程上下文拷貝的 TaskDecorator
皂吮。
import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
public class ContextDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
return () -> {
try {
RequestContextHolder.setRequestAttributes(context);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
}
};
}
}
實現(xiàn)較為簡單戒傻,將當前線程的 context 裝飾到指定的 Runnable,最后重置當前線程上下文蜂筹。
在線程池的配置中需纳,增加回調(diào)的 TaskDecorator 屬性的配置:
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("taskExecutor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
// 增加 TaskDecorator 屬性的配置
executor.setTaskDecorator(new ContextDecorator());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
經(jīng)過如上配置,我們再次運行服務(wù)艺挪,并訪問接口不翩,控制臺日志信息如下:
由結(jié)果可知,線程的上下文信息傳遞成功麻裳。
小結(jié)
本文結(jié)合示例講解了 Spring 中實現(xiàn)異步方法口蝠,獲取異步方法的返回值。并介紹了配置 Spring 線程池的方式津坑。最后介紹如何在異步多線程中傳遞線程上下文信息妙蔗。線程上下文傳遞在分布式環(huán)境中會經(jīng)常用到,比如分布式鏈路追蹤中需要一次請求涉及到的 TraceId疆瑰、SpanId眉反。簡單來說,需要傳遞的信息能夠在不同線程中穆役。異步方法是我們在日常開發(fā)中用來多線程處理業(yè)務(wù)邏輯寸五,這些業(yè)務(wù)邏輯不需要嚴格的執(zhí)行順序。用好異步解決問題的同時孵睬,更要用對異步多線程的方式播歼。