如何在 Spring 異步調(diào)用中傳遞上下文

image

什么是異步調(diào)用?

異步調(diào)用是相對于同步調(diào)用而言的秤茅,同步調(diào)用是指程序按預(yù)定順序一步步執(zhí)行,每一步必須等到上一步執(zhí)行完后才能執(zhí)行童叠,異步調(diào)用則無需等待上一步程序執(zhí)行完即可執(zhí)行框喳。異步調(diào)用指课幕,在程序在執(zhí)行時,無需等待執(zhí)行的返回值即可繼續(xù)執(zhí)行后面的代碼五垮。在我們的應(yīng)用服務(wù)中乍惊,有很多業(yè)務(wù)邏輯的執(zhí)行操作不需要同步返回(如發(fā)送郵件、冗余數(shù)據(jù)表等)放仗,只需要異步執(zhí)行即可润绎。

本文將介紹 Spring 應(yīng)用中,如何實現(xiàn)異步調(diào)用诞挨。在異步調(diào)用的過程中莉撇,會出現(xiàn)線程上下文信息的丟失,我們該如何解決線程上下文信息的傳遞惶傻。

Spring 應(yīng)用中實現(xiàn)異步

Spring 為任務(wù)調(diào)度與異步方法執(zhí)行提供了注解支持棍郎。通過在方法或類上設(shè)置 @Async 注解,可使得方法被異步調(diào)用银室。調(diào)用者會在調(diào)用時立即返回涂佃,而被調(diào)用方法的實際執(zhí)行是交給 Spring 的 TaskExecutor 來完成的。所以被注解的方法被調(diào)用的時候蜈敢,會在新的線程中執(zhí)行辜荠,而調(diào)用它的方法會在原線程中執(zhí)行,這樣可以避免阻塞抓狭,以及保證任務(wù)的實時性伯病。

引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

引入 Spring 相關(guān)的依賴即可。

入口類

@SpringBootApplication
@EnableAsync
public class AsyncApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncApplication.class, args);
    }

入口類增加了 @EnableAsync 注解辐宾,主要是為了掃描范圍包下的所有 @Async 注解狱从。

對外的接口

這里寫了一個簡單的接口:

@RestController
@Slf4j
public class TaskController {

    @Autowired
    private TaskService taskService;

    @GetMapping("/task")
    public String taskExecute() {
        try {
            taskService.doTaskOne();
            taskService.doTaskTwo();
            taskService.doTaskThree();
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }
        return "ok";
    }
}

調(diào)用 TaskService 執(zhí)行三個異步方法。

Service 方法

@Component
@Slf4j
//@Async
public class TaskService {

    @Async
    public void doTaskOne() throws Exception {
        log.info("開始做任務(wù)一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)一叠纹,耗時:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskTwo() throws Exception {
        log.info("開始做任務(wù)二");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)二季研,耗時:" + (end - start) + "毫秒");
    }

    @Async
    public void doTaskThree() throws Exception {
        log.info("開始做任務(wù)三");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)三,耗時:" + (end - start) + "毫秒");
    }
}

@Async 可以用于類上誉察,標識該類的所有方法都是異步方法与涡,也可以單獨用于某些方法。每個方法都會 sleep 1000 ms持偏。

結(jié)果展示

運行結(jié)果如下:

image

可以看到 TaskService 中的三個方法是異步執(zhí)行的驼卖,接口的結(jié)果快速返回,日志信息異步輸出鸿秆。異步調(diào)用酌畜,通過開啟新的線程調(diào)用的方法,不影響主線程卿叽。異步方法實際的執(zhí)行交給了 Spring 的 TaskExecutor 來完成桥胞。

Future:獲取異步執(zhí)行的結(jié)果

在上面的測試中我們也可以發(fā)現(xiàn)主調(diào)用方法并沒有等到調(diào)用方法執(zhí)行完就結(jié)束了當前的任務(wù)恳守。如果想要知道調(diào)用的三個方法全部執(zhí)行完該怎么辦呢,下面就可以用到異步回調(diào)贩虾。

異步回調(diào)就是讓每個被調(diào)用的方法返回一個 Future 類型的值催烘,Spring 中提供了一個 Future 接口的子類:AsyncResult,所以我們可以返回 AsyncResult 類型的值缎罢。

public class AsyncResult<V> implements ListenableFuture<V> {

    private final V value;

    private final ExecutionException executionException;
    //...
}

AsyncResult 實現(xiàn)了 ListenableFuture 接口伊群,該對象內(nèi)部有兩個屬性:返回值和異常信息。

public interface ListenableFuture<T> extends Future<T> {
    void addCallback(ListenableFutureCallback<? super T> var1);

    void addCallback(SuccessCallback<? super T> var1, FailureCallback var2);
}

ListenableFuture 接口繼承自 Future策精,在此基礎(chǔ)上增加了回調(diào)方法的定義舰始。Future 接口定義如下:

public interface Future<V> {
    // 是否可以打斷當前正在執(zhí)行的任務(wù)
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 任務(wù)取消的結(jié)果
    boolean isCancelled();
    
    // 異步方法中最后返回的那個對象中的值 
    V get() throws InterruptedException, ExecutionException;
    // 用來判斷該異步任務(wù)是否執(zhí)行完成,如果執(zhí)行完成蛮寂,則返回 true蔽午,如果未執(zhí)行完成,則返回false
    boolean isDone();
    // 與 get() 一樣酬蹋,只不過這里參數(shù)中設(shè)置了超時時間
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

#get() 方法及老,在執(zhí)行的時候是需要等待回調(diào)結(jié)果的,阻塞等待范抓。如果不設(shè)置超時時間骄恶,它就阻塞在那里直到有了任務(wù)執(zhí)行完成。我們設(shè)置超時時間匕垫,就可以在當前任務(wù)執(zhí)行太久的情況下中斷當前任務(wù)僧鲁,釋放線程,這樣就不會導(dǎo)致一直占用資源象泵。

#cancel(boolean) 方法寞秃,參數(shù)是一個 boolean 類型的值,用來傳入是否可以打斷當前正在執(zhí)行的任務(wù)偶惠。如果參數(shù)是 true 且當前任務(wù)沒有執(zhí)行完成 春寿,說明可以打斷當前任務(wù),那么就會返回 true忽孽;如果當前任務(wù)還沒有執(zhí)行绑改,那么不管參數(shù)是 true 還是 false,返回值都是 true兄一;如果當前任務(wù)已經(jīng)完成厘线,那么不管參數(shù)是 true 還是 false,那么返回值都是 false出革;如果當前任務(wù)沒有完成且參數(shù)是 false造壮,那么返回值也是 false。即:

  1. 如果任務(wù)還沒執(zhí)行骂束,那么如果想取消任務(wù)耳璧,就一定返回 true硝全,與參數(shù)無關(guān)。
  2. 如果任務(wù)已經(jīng)執(zhí)行完成楞抡,那么任務(wù)一定是不能取消的,所以此時返回值都是false析藕,與參數(shù)無關(guān)召廷。
  3. 如果任務(wù)正在執(zhí)行中,那么此時是否取消任務(wù)就看參數(shù)是否允許打斷(true/false)账胧。

獲取異步方法返回值的實現(xiàn)

    public Future<String> doTaskOne() throws Exception {
        log.info("開始做任務(wù)一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)一竞慢,耗時:" + (end - start) + "毫秒");
        return new AsyncResult<>("任務(wù)一完成,耗時" + (end - start) + "毫秒");
    }
    //...其他兩個方法類似治泥,省略

我們將 task 方法的返回值改為 Future<String>筹煮,將執(zhí)行的時間拼接為字符串返回。

    @GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
           log.error("error executing task for {}",e.getMessage());
        }

        return "ok";
    }
image

在調(diào)用異步方法之后居夹,可以通過循環(huán)判斷異步方法是否執(zhí)行完成败潦。結(jié)果正如我們所預(yù)期,future 所 get 到的是 AsyncResult 返回的字符串准脂。

配置線程池

前面是最簡單的使用方法劫扒,使用默認的 TaskExecutor。如果想使用自定義的 Executor狸膏,可以結(jié)合 @Configuration 注解的配置方式沟饥。


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class TaskPoolConfig {

    @Bean("taskExecutor") // bean 的名稱,默認為首字母小寫的方法名
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心線程數(shù)(默認線程數(shù))
        executor.setMaxPoolSize(20); // 最大線程數(shù)
        executor.setQueueCapacity(200); // 緩沖隊列數(shù)
        executor.setKeepAliveSeconds(60); // 允許線程空閑時間(單位:默認為秒)
        executor.setThreadNamePrefix("taskExecutor-"); // 線程池名前綴
        // 線程池對拒絕任務(wù)的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

線程池的配置很靈活湾戳,對核心線程數(shù)贤旷、最大線程數(shù)等屬性進行配置。其中砾脑,rejection-policy幼驶,當線程池已經(jīng)達到最大線程數(shù)的時候,如何處理新任務(wù)拦止∠厍玻可選策略有 CallerBlocksPolicy、CallerRunsPolicy 等汹族。CALLER_RUNS:不在新線程中執(zhí)行任務(wù)萧求,而是由調(diào)用者所在的線程來執(zhí)行。我們驗證下顶瞒,線程池的設(shè)置是否生效夸政,在 TaskService 中,打印當前的線程名稱:

    public Future<String> doTaskOne() throws Exception {
        log.info("開始做任務(wù)一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)一榴徐,耗時:" + (end - start) + "毫秒");
        log.info("當前線程為 {}", Thread.currentThread().getName());
        return new AsyncResult<>("任務(wù)一完成守问,耗時" + (end - start) + "毫秒");
    }
image

通過結(jié)果可以看到匀归,線程池配置的線程名前綴已經(jīng)生效。在 Spring @Async 異步線程使用過程中耗帕,需要注意的是以下的用法會使 @Async 失效:

  • 異步方法使用 static 修飾穆端;
  • 異步類沒有使用 @Component 注解(或其他注解)導(dǎo)致 Spring 無法掃描到異步類;
  • 異步方法不能與被調(diào)用的異步方法在同一個類中仿便;
  • 類中需要使用 @Autowired 或 @Resource 等注解自動注入体啰,不能手動 new 對象;
  • 如果使用 Spring Boot 框架必須在啟動類中增加 @EnableAsync 注解嗽仪。

線程上下文信息傳遞

很多時候荒勇,在微服務(wù)架構(gòu)中的一次請求會涉及多個微服務(wù)∥偶幔或者一個服務(wù)中會有多個處理方法沽翔,這些方法有可能是異步方法。有些線程上下文信息窿凤,如請求的路徑仅偎,用戶唯一的 userId,這些信息會一直在請求中傳遞卷玉。如果不做任何處理哨颂,我們看下是否能夠正常獲取這些信息。

@GetMapping("/task")
    public String taskExecute() {
        try {
            Future<String> r1 = taskService.doTaskOne();
            Future<String> r2 = taskService.doTaskTwo();
            Future<String> r3 = taskService.doTaskThree();

            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            log.info("當前線程為 {}相种,請求方法為 {}威恼,請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
            while (true) {
                if (r1.isDone() && r2.isDone() && r3.isDone()) {
                    log.info("execute all tasks");
                    break;
                }
                Thread.sleep(200);
            }
            log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get());
        } catch (Exception e) {
            log.error("error executing task for {}", e.getMessage());
        }

        return "ok";
    }

在 Spring Boot Web 中我們可以通過 RequestContextHolder 很方便的獲取 request。在接口方法中寝并,輸出請求的方法和請求的路徑箫措。

    public Future<String> doTaskOne() throws Exception {
        log.info("開始做任務(wù)一");
        long start = System.currentTimeMillis();
        Thread.sleep(1000);
        long end = System.currentTimeMillis();
        log.info("完成任務(wù)一,耗時:" + (end - start) + "毫秒");
        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        log.info("當前線程為 {}衬潦,請求方法為 {}斤蔓,請求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString());
        return new AsyncResult<>("任務(wù)一完成,耗時" + (end - start) + "毫秒");
    }

同時在 TaskService 中镀岛,驗證是不是也能輸出請求的信息弦牡。運行程序,結(jié)果如下:

image

在 TaskService 中漂羊,每個異步線程的方法獲取 RequestContextHolder 中的請求信息時驾锰,報了空指針異常。這說明了請求的上下文信息未傳遞到異步方法的線程中走越。RequestContextHolder 的實現(xiàn)椭豫,里面有兩個 ThreadLocal 保存當前線程下的 request。

    //得到存儲進去的request
    private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
            new NamedThreadLocal<RequestAttributes>("Request attributes");
    //可被子線程繼承的request
    private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
            new NamedInheritableThreadLocal<RequestAttributes>("Request context");

再看 #getRequestAttributes() 方法,相當于直接獲取 ThreadLocal 里面的值赏酥,這樣就使得每一次獲取到的 Request 是該請求的 request喳整。如何將上下文信息傳遞到異步線程呢账嚎?Spring 中的 ThreadPoolTaskExecutor 有一個配置屬性 TaskDecorator提陶,TaskDecorator 是一個回調(diào)接口,采用裝飾器模式霎箍。裝飾模式是動態(tài)的給一個對象添加一些額外的功能呵晨,就增加功能來說瞬项,裝飾模式比生成子類更為靈活。因此 TaskDecorator 主要用于任務(wù)的調(diào)用時設(shè)置一些執(zhí)行上下文何荚,或者為任務(wù)執(zhí)行提供一些監(jiān)視/統(tǒng)計。

public interface TaskDecorator {

    Runnable decorate(Runnable runnable);
}

#decorate 方法猪杭,裝飾給定的 Runnable餐塘,返回包裝的 Runnable 以供實際執(zhí)行。

下面我們定義一個線程上下文拷貝的 TaskDecorator皂吮。

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

public class ContextDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

實現(xiàn)較為簡單戒傻,將當前線程的 context 裝飾到指定的 Runnable,最后重置當前線程上下文蜂筹。

在線程池的配置中需纳,增加回調(diào)的 TaskDecorator 屬性的配置:

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        // 增加 TaskDecorator 屬性的配置
        executor.setTaskDecorator(new ContextDecorator());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

經(jīng)過如上配置,我們再次運行服務(wù)艺挪,并訪問接口不翩,控制臺日志信息如下:

image

由結(jié)果可知,線程的上下文信息傳遞成功麻裳。

小結(jié)

本文結(jié)合示例講解了 Spring 中實現(xiàn)異步方法口蝠,獲取異步方法的返回值。并介紹了配置 Spring 線程池的方式津坑。最后介紹如何在異步多線程中傳遞線程上下文信息妙蔗。線程上下文傳遞在分布式環(huán)境中會經(jīng)常用到,比如分布式鏈路追蹤中需要一次請求涉及到的 TraceId疆瑰、SpanId眉反。簡單來說,需要傳遞的信息能夠在不同線程中穆役。異步方法是我們在日常開發(fā)中用來多線程處理業(yè)務(wù)邏輯寸五,這些業(yè)務(wù)邏輯不需要嚴格的執(zhí)行順序。用好異步解決問題的同時孵睬,更要用對異步多線程的方式播歼。

源碼地址

推薦閱讀

微服務(wù)合集

訂閱最新文章,歡迎關(guān)注我的公眾號

微信公眾號
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市秘狞,隨后出現(xiàn)的幾起案子叭莫,更是在濱河造成了極大的恐慌,老刑警劉巖烁试,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雇初,死亡現(xiàn)場離奇詭異,居然都是意外死亡减响,警方通過查閱死者的電腦和手機靖诗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來支示,“玉大人刊橘,你說我怎么就攤上這事∷毯瑁” “怎么了促绵?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長嘴纺。 經(jīng)常有香客問我败晴,道長,這世上最難降的妖魔是什么栽渴? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任尖坤,我火速辦了婚禮,結(jié)果婚禮上闲擦,老公的妹妹穿的比我還像新娘慢味。我一直安慰自己,他們只是感情好墅冷,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布贮缕。 她就那樣靜靜地躺著,像睡著了一般俺榆。 火紅的嫁衣襯著肌膚如雪感昼。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天罐脊,我揣著相機與錄音定嗓,去河邊找鬼。 笑死萍桌,一個胖子當著我的面吹牛宵溅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播上炎,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼恃逻,長吁一口氣:“原來是場噩夢啊……” “哼雏搂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起寇损,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤凸郑,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后矛市,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芙沥,經(jīng)...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年浊吏,在試婚紗的時候發(fā)現(xiàn)自己被綠了而昨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡找田,死狀恐怖歌憨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情墩衙,我是刑警寧澤躺孝,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站底桂,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏惧眠。R本人自食惡果不足惜籽懦,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望氛魁。 院中可真熱鬧暮顺,春花似錦、人聲如沸秀存。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽或链。三九已至惫恼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間澳盐,已是汗流浹背祈纯。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留叼耙,地道東北人腕窥。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像筛婉,于是被迫代替她去往敵國和親簇爆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355

推薦閱讀更多精彩內(nèi)容