completableFuture終極指南

筆者所有文章第一時(shí)間發(fā)布于:
hhbbz的個(gè)人博客

一、簡介

java.util.concurrent.CompletableFuture繼承于java.util.concurrent.Future,它本身具備Future的所有特性打厘,并且基于JDK1.8的流式編程以及Lambda表達(dá)式等實(shí)現(xiàn)一元操作符、異步性以及事件驅(qū)動(dòng)編程模型,它的靈活性和更強(qiáng)大的功能是Future無法比擬的磅网。它提供了一共有50多種Api,這些Api的注釋比較少筷屡,命名也比較生澀涧偷,下面將會(huì)分類講解它們的使用方式簸喂。CompletableFuture吸收了所有ListenableFuture(Guava)和SettableFuture的優(yōu)點(diǎn)。此外燎潮,內(nèi)置的lambda表達(dá)式使它更接近于Scala/Akka futures喻鳄。CompletableFuture有兩個(gè)主要的方面優(yōu)于Future – 異步回調(diào)/轉(zhuǎn)換,這能使得任何線程在任何時(shí)刻都可以設(shè)置CompletableFuture的值确封。

二除呵、使用流程簡述

首先,簡單地創(chuàng)建新的CompletableFuture并且給你的客戶端:

public CompletableFuture<String> ask() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    //...
    return future;
}

客戶端代碼調(diào)用ask().get()爪喘,它將永遠(yuǎn)阻塞颜曾,直到CompletableFuture回調(diào),下面是一種回調(diào)方式:

future.complete("42");

此時(shí)此刻所有客戶端 Future(CompletableFuture).get() 將得到字符串的結(jié)果秉剑。注意的是: CompletableFuture.complete() 只能調(diào)用一次泛啸,后續(xù)調(diào)用將被忽略。但也有一個(gè)后門叫做 CompletableFuture.obtrudeValue() 覆蓋Future之前的值秃症,請(qǐng)小心使用候址。有時(shí)你想處理 CompletableFuture.complete() 調(diào)用過程拋出的異常,如果你想進(jìn)一步傳遞異常种柑,可以用 CompletableFuture.completeExceptionally(ex) (或者用 obtrudeException(ex) 這樣更強(qiáng)大的方法覆蓋前面的異常)岗仑。 completeExceptionally()也能解鎖所有等待的客戶端。前面說到get()(永久阻塞等待)聚请,其實(shí)還有 get(long timeout荠雕,TimeUnit unit) 可以設(shè)置超時(shí)時(shí)間(超時(shí)會(huì)拋出異常),也有 CompletableFuture.join( )(join方法在錯(cuò)誤處理方面有著細(xì)微的變動(dòng)驶赏。但總體上炸卑,它和get方法是一樣的)。然后也有 CompletableFuture.getNow(valueIfAbsent) 煤傍,這個(gè)方法沒有阻塞盖文,但是如果Future還沒完成將返回默認(rèn)值,這使得當(dāng)構(gòu)建那種我們不想等太久的健壯系統(tǒng)時(shí)非常有用蚯姆。
最后的方法靜態(tài) completedFuture(value) 來返回已經(jīng)完成Future的對(duì)象五续,當(dāng)測(cè)試或者寫一些適配器層時(shí)可能非常有用。

三龄恋、CompletableFuture主要Api詳述

聲明

如果CompletableFuture的方法沒有參數(shù)Executor并且以…Async結(jié)尾疙驾,它將會(huì)使用 ForkJoinPool.commonPool() (在JDK8中的通用線程池,基于Fork/join線程池和任務(wù)竊取實(shí)現(xiàn))郭毕,這適用于CompletableFuture類中的大多數(shù)的方法它碎。所以下面分析的時(shí)候可能會(huì)直接跳過命名為…Async的方法。

創(chuàng)建和獲取CompletableFuture

使用 new 關(guān)鍵字新建CompletableFuture實(shí)例并不是唯一的選擇,CompletableFuture提供了靜態(tài)工廠方法用于創(chuàng)建自身的實(shí)例:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

runAsync()易于理解扳肛,注意它需要Runnable傻挂,因此它返回CompletableFuture<Void>作為Runnable不返回任何值。如果你需要處理異步操作并返回結(jié)果敞峭,使用Supplier<U>踊谋,它是一個(gè)函數(shù)式接口,接口如下:

@FunctionalInterface
public interface Supplier<T> {
    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

可以這樣使用Supplier<U>:

final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        //...long running...
        return "42";
    }
}, executor);

換成Lambda表達(dá)式:

finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...long running...
    return "42";
}, executor)旋讹;

轉(zhuǎn)換–CompletableFuture.thenApply()

apply一般翻譯為’作用于’殖蚕,但是在CompletableFuture中,thenApply()起到轉(zhuǎn)換結(jié)果的作用沉迹,總結(jié)來說就是疊加多個(gè)CompletableFuture的功能睦疫,把多個(gè)CompletableFuture組合在一起,跨線程池進(jìn)行異步調(diào)用鞭呕,調(diào)用的過程就是結(jié)果轉(zhuǎn)換的過程蛤育。先看下這些方法:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

其中Function<? super T,? extends U>是函數(shù)式接口,此接口如下:

@FunctionalInterface
public interface Function<T, R> {
    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);
    ...//省略其他流式方法
}

使用例子:

CompletableFuture<String> f1 =  = CompletableFuture.supplyAsync(() -> {
    return "42";
}, executor)葫松;
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

或者直接使用流式編程:

CompletableFuture<Double> f3 = CompletableFuture.supplyAsync(() -> {
    return "42";
}, executor).thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

終端運(yùn)行(消費(fèi))–CompletableFuture.thenRun()/CompletableFuture.thenAccept()

CompletableFuture有兩種典型的”最終”階段方法瓦糕,其實(shí)就是Lambda的終端方法,使用的是Consumer接口(消費(fèi)操作的接口)腋么。當(dāng)CompletableFuture的結(jié)果已經(jīng)準(zhǔn)備好咕娄,thenAccept()執(zhí)行最終消費(fèi)操作,thenRun()執(zhí)行Runnable珊擂,沒有返回值(或者說返回結(jié)果為Void)圣勒。

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,Executor executor);
CompletableFuture<Void> thenRun(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

下面是一個(gè)例子:

future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");

thenAccept( )/thenRun( )方法并沒有發(fā)生阻塞(即使沒有明確的executor)。它們像事件偵聽器摧扇。上例中”Continuing”消息將立即出現(xiàn)圣贸,但是這個(gè)時(shí)候thenAcceptAsync()有可能尚未執(zhí)行完。thenAccept()和thenRun()的區(qū)別是:thenAccept()是針對(duì)結(jié)果進(jìn)行消費(fèi)扛稽,因?yàn)槿雲(yún)⑹荂onsumer函數(shù)式接口吁峻,有入?yún)o返回值,而thenRun()它的入?yún)⑹且粋€(gè)Runnable的實(shí)例庇绽,表示當(dāng)?shù)玫缴弦徊降慕Y(jié)果時(shí)的操作锡搜,也就是當(dāng)?shù)玫缴弦徊降慕Y(jié)果則異步執(zhí)行Runnable。

異常處理

CompletableFuture<String> future = new CompletableFuture<>();
        try {
            throw new RuntimeException("test exception");
        }catch (Exception e){
            future.completeExceptionally(e);
            future.complete("test success");
        }
System.out.println(future.get());
//結(jié)果(觸發(fā)了completeExceptionally后瞧掺,complete將會(huì)失效):
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: test exception
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.throwable.TestGitA.main(TestGitA.java:22)
Caused by: java.lang.RuntimeException: test exception
    at org.throwable.TestGitA.main(TestGitA.java:17)
...

補(bǔ)償型的例子:

String result = CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         if (1 == 1) {
             throw new RuntimeException("測(cè)試一下異常情況");
         }
         return "s1";
     }).exceptionally(e -> {
         System.out.println(e.getMessage());
         return "hello world";
     }).join();
System.out.println(result);
//結(jié)果
java.lang.RuntimeException: 測(cè)試一下異常情況
hello world

全方位型的例子:

//注意這里OK為String類型
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
    if (ok != null) {
        return Integer.parseInt(ok);
    } else {
        log.warn("Problem", ex);
        return -1;
    }
});

CompletableFuture之間建立關(guān)聯(lián)

CompletableFuture的”串聯(lián)”–CompletableFuture.thenCompose()

thenCompose()方法允許你對(duì)兩個(gè)異步操作進(jìn)行流水線,第一個(gè)操作完成時(shí)凡傅,將其結(jié)果作為參數(shù)傳遞給第二個(gè)操作辟狈。你可以創(chuàng)建兩個(gè)CompletableFuture對(duì)象,對(duì)第一個(gè)CompletableFuture對(duì)象調(diào)用thenCompose() ,并向其傳遞一個(gè)函數(shù)哼转。當(dāng)?shù)谝粋€(gè)CompletableFuture執(zhí)行完畢后明未,它的結(jié)果將作為該函數(shù)的參數(shù),這個(gè)函數(shù)的返回值是以第一個(gè)CompletableFuture的返回做輸入計(jì)算出的第二個(gè)CompletableFuture對(duì)象壹蔓。

<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,CompletableFuture<U>> fn);
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,CompletableFuture<U>> fn,Executor executor);

thenCompose()是一個(gè)重要的方法趟妥,它允許構(gòu)建健壯的和異步的管道,沒有阻塞和等待的中間步驟佣蓉。在下面的事例中披摄,仔細(xì)觀察thenApply()(map)和thenCompose()(flatMap)的類型和差異,calculateRelevance()方法返回CompletableFuture實(shí)例:

CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f =  docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc)  //...

CompletableFuture的”并聯(lián)”–CompletableFuture.thenCombine()

thenCombine()用于連接兩個(gè)獨(dú)立的CompletableFuture勇凭,它接收名為 BiFunction 的第二參數(shù)疚膊,這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture 對(duì)象完成計(jì)算后結(jié)果如何合并,返回?cái)y帶計(jì)算合并結(jié)果的一個(gè)新的CompletableFuture虾标。

<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
<U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
<U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,Executor executor);

假設(shè)你有兩個(gè)CompletableFuture寓盗,一個(gè)加載Customer另一個(gè)加載最近的Shop。他們彼此完全獨(dú)立璧函,但是當(dāng)他們完成時(shí)傀蚌,您想要使用它們的值來計(jì)算Route。下面是一個(gè)例子:

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);  //省略loadCustomerDetails方法代碼
CompletableFuture<Shop> shopFuture = closestShop();  //省略closestShop方法代碼
CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
   
//...
private Route findRoute(Customer customer, Shop shop) //...

新建customerFuture和shopFuture蘸吓。那么routeFuture包裝它們?nèi)缓蟆暗却彼鼈兺瓿缮旗拧.?dāng)它們的結(jié)果準(zhǔn)備好了,它會(huì)運(yùn)行我們提供的函數(shù)來結(jié)合所有的結(jié)果(findRoute())美澳。當(dāng)兩個(gè)基本的CompletableFuture實(shí)例完成并且findRoute()也完成時(shí)销部,這樣routeFuture將會(huì)完成。

結(jié)果記錄–CompletableFuture.whenComplete()

CompletableFuture.whenComplete()的作用是CompletableFuture運(yùn)行完成時(shí)制跟,對(duì)結(jié)果的記錄操作舅桩,記錄的操作由函數(shù)BiConsumer<? super T, ? super Throwable>完成,一般BiConsumer這種消費(fèi)操作應(yīng)該是終端操作雨膨,但是whenComplete返回的是CompletableFuture的接口的實(shí)例擂涛,這個(gè)實(shí)例就是調(diào)用whenComplete的原始CompletableFuture對(duì)象。


CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);

一個(gè)使用例子如下:

String result = CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         if (1 == 1) {
             throw new RuntimeException("測(cè)試一下異常情況");
         }
         return "s1";
     }).whenComplete((s, t) -> {
         System.out.println(s);
         System.out.println(t.getMessage());
     }).exceptionally(e -> {
         System.out.println(e.getMessage());
         return "hello world";
     }).join();
System.out.println(result);
//控制臺(tái)輸出
null
java.lang.RuntimeException: 測(cè)試一下異常情況
java.lang.RuntimeException: 測(cè)試一下異常情況
hello world

這里也可以看出聊记,如果使用了exceptionally撒妈,就會(huì)對(duì)最終的結(jié)果產(chǎn)生影響,也就證明了whenComplete返回的是原始的CompletableFuture對(duì)象排监。

結(jié)果處理–CompletableFuture.handle()

CompletableFuture.handle() 的作用是CompletableFuture運(yùn)行完成時(shí)狰右,對(duì)結(jié)果的處理。這里的完成時(shí)有兩種情況舆床,一種是正常執(zhí)行棋蚌,返回預(yù)期的值嫁佳。另外一種是遇到異常拋出造成程序的中斷。

<U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

一個(gè)出現(xiàn)異常時(shí)的例子:

String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //出現(xiàn)異常
        if (1 == 1) {
            throw new RuntimeException("測(cè)試一下異常情況");
        }
        return "s1";
    }).handle((s, t) -> {   //這里t的參數(shù)類型為Throwable谷暮。
        if (t != null) {
            return "hello world"; //這里是異常不為null時(shí)候的邏輯蒿往,可以選擇補(bǔ)償,也可以直接拋出異常t湿弦,一旦拋出異常瓤漏,調(diào)用join()的時(shí)候異常會(huì)外拋。
        }
        return s;
    }).join();
System.out.println(result);
//控制臺(tái)輸出
hello world

一個(gè)未出現(xiàn)異常時(shí)的例子:

String result = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "s1";
    }).handle((s, t) -> {
        if (t != null) {
            return "hello world";  
        }
        return s;   //未出現(xiàn)異常颊埃,實(shí)際上走到這一步
    }).join();
System.out.println(result);
//控制臺(tái)輸出
s1

合并消費(fèi)–CompletableFuture.thenAcceptBoth()

CompletableFuture.thenAcceptBoth() 用于連接兩個(gè)獨(dú)立的CompletableFuture蔬充,它接收名為BiConsumer的第二參數(shù),這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture對(duì)象完成計(jì)算后竟秫,結(jié)果如何消費(fèi)娃惯,有點(diǎn)像thenCombine,但是對(duì)于兩個(gè)CompletableFuture的計(jì)算操作是終端操作肥败,沒有返回值(或者說返回結(jié)果為Void類型)趾浅。

<U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);

一個(gè)例子如下,5000毫秒后控制臺(tái)輸出”hello world”:

CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "hello";
     }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "world";
     }), (s1, s2) -> System.out.println(s1 + " " + s2));

合并執(zhí)行–CompletableFuture.runAfterBoth()

CompletableFuture.runAfterBoth() 用于連接兩個(gè)獨(dú)立的CompletableFuture馒稍,不關(guān)心兩個(gè)CompletableFuture的計(jì)算結(jié)果皿哨,當(dāng)兩個(gè)CompletableFuture執(zhí)行完成后,執(zhí)行Runnable纽谒。

CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

一個(gè)例子如下证膨,5000毫秒后控制臺(tái)輸出”hello world”:

CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s1";
     }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s2";
     }), () -> System.out.println("hello world"));  //() -> System.out.println("hello world");就是Runnable的Lambda實(shí)現(xiàn)

時(shí)間優(yōu)先度執(zhí)行

CompletableFuture中有部分的API可以等待第一個(gè)完成的CompletableFuture再進(jìn)行后續(xù)操作鼓黔。當(dāng)你有兩個(gè)相同類型任務(wù)的結(jié)果時(shí)就顯得非常方便央勒,你只要關(guān)心響應(yīng)時(shí)間就行了,沒有哪個(gè)任務(wù)是優(yōu)先的(這類型的方法的好處是只關(guān)注響應(yīng)的時(shí)間澳化,除了時(shí)間這個(gè)優(yōu)先級(jí)限定崔步,沒有其他優(yōu)先級(jí))。

時(shí)間優(yōu)先轉(zhuǎn)換–CompletableFuture.applyToEither()

CompletableFuture.applyToEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture缎谷,選擇計(jì)算(返回結(jié)果)最快的一個(gè)CompletableFuture井濒,進(jìn)行轉(zhuǎn)換計(jì)算操作(Function<? super T, U>)并返回結(jié)果。

<U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

我們現(xiàn)實(shí)開發(fā)場(chǎng)景中列林,總會(huì)碰到有兩種渠道完成同一個(gè)事情瑞你,所以就可以調(diào)用這個(gè)方法,找一個(gè)最快的結(jié)果進(jìn)行處理希痴。一個(gè)例子如下:

String result = CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s1";
     }).applyToEither(CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "hello world";
     }), s -> s).join();  //2000毫秒后返回"hello world"

時(shí)間優(yōu)先消費(fèi)–CompletableFuture.acceptEither()

CompletableFuture.acceptEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture者甲,選擇計(jì)算(返回結(jié)果)最快的一個(gè)CompletableFuture,進(jìn)行消費(fèi)操作(Consumer<? super T> action)砌创,無返回值过牙。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

一個(gè)例子如下:

CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s1";
     }).acceptEither(CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "hello world";
     }), System.out::println); //2000毫秒后控制臺(tái)打印 "hello world"

時(shí)間優(yōu)先執(zhí)行–CompletableFuture.runAfterEither()

CompletableFuture.runAfterEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture甥厦,不關(guān)心任何CompletableFuture的返回值纺铭,任何一個(gè)CompletableFuture執(zhí)行完畢得到了結(jié)果后會(huì)馬上執(zhí)行Runable寇钉。

CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

一個(gè)例子如下:

CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(3000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s1";
     }).runAfterEither(CompletableFuture.supplyAsync(() -> {
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return "s2";
     }), () -> System.out.println("hello world")); //() -> System.out.println("hello world")是Runable的Lambda實(shí)現(xiàn)。 2000毫秒后控制臺(tái)打印 "hello world"

結(jié)果賦值

CompletableFuture的完計(jì)算結(jié)果直接賦值方法主要有以下幾個(gè):

  • boolean complete(T value)舶赔,通過CAS賦值計(jì)算結(jié)果扫倡,內(nèi)部會(huì)發(fā)送完成狀態(tài),再次調(diào)用無效竟纳。
  • boolean completeExceptionally(Throwable ex)撵溃,通過CAS賦值計(jì)算異常,內(nèi)部會(huì)發(fā)送完成狀態(tài)锥累,再次調(diào)用無效缘挑。
  • void obtrudeValue(T value),直接賦值計(jì)算結(jié)果桶略,內(nèi)部會(huì)發(fā)送完成狀態(tài)语淘,再次調(diào)用無效。
  • obtrudeException(Throwable ex)际歼,直接賦值計(jì)算異常惶翻,內(nèi)部會(huì)發(fā)送完成狀態(tài),再次調(diào)用無效鹅心。

只要上面四個(gè)方法之一被調(diào)用吕粗,CompletableFuture就會(huì)標(biāo)記為’完結(jié)狀態(tài)’,再次調(diào)用其他方法將不會(huì)起效旭愧,另外颅筋,obtrudeXXX方法屬于強(qiáng)制賦值,不建議使用输枯,因?yàn)樗鼈儠?huì)直接覆蓋當(dāng)前的值议泵。

一個(gè)例子如下:

CompletableFuture<String> future = new CompletableFuture<>();
        try {
            future.complete("test success");
        }catch (Exception e){
            future.completeExceptionally(e);
        }
System.out.println(future.get());

//輸出 test success

結(jié)果獲取

  • T get() throws InterruptedException, ExecutionException ,永久阻塞用押,直到返回結(jié)果值肢簿,允許中斷,計(jì)算過程中所有的異常會(huì)包裹為新的ExecutionException實(shí)例再拋出蜻拨。
  • T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException 池充,添加超時(shí)時(shí)間設(shè)定,如果超時(shí)會(huì)拋出TimeoutException缎讼,如果獲取到結(jié)果則釋放并返回收夸,允許中斷,計(jì)算過程中所有的異常會(huì)包裹為新的ExecutionException實(shí)例再拋出血崭。
  • T join() 卧惜,永久阻塞厘灼,直到返回結(jié)果值,不允許中斷咽瓷,計(jì)算過程中所有的異常會(huì)直接拋出设凹。
  • T getNow(T valueIfAbsent) ,如果當(dāng)前的計(jì)算結(jié)果為null茅姜,馬上返回valueIfAbsent闪朱,否則調(diào)用join()的邏輯。
    結(jié)果的獲取不做舉例钻洒,因?yàn)檫@個(gè)實(shí)在太常用奋姿,強(qiáng)烈建議使用T get(long timeout, TimeUnit unit),其他三個(gè)方法看場(chǎng)景選擇使用素标。

其它

取消–cancel()

調(diào)用CompletableFuture實(shí)例的 cancel() 方法可以取消當(dāng)前的CompletableFuture称诗,此時(shí)該CompletableFuture實(shí)例會(huì)進(jìn)入’完結(jié)狀態(tài)’,其結(jié)果會(huì)傳入一個(gè)新的CancellationException實(shí)例头遭,此時(shí)通過上一節(jié)的’結(jié)果獲取’中的Api調(diào)用就會(huì)按各自的處理模式拋出異常寓免。

所有完成

調(diào)用CompletableFuture的靜態(tài)方法 CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) ,當(dāng)所有的傳入的CompletableFuture實(shí)例都完成的時(shí)候任岸,會(huì)返回一個(gè)新建的CompletableFuture再榄,也就是程序?qū)?huì)阻塞在此方法調(diào)用,直到所有傳入CompletableFuture都完成享潜,這個(gè)時(shí)候返回值CompletableFuture實(shí)例也完成困鸥。舉個(gè)例子:CompletableFuture.allOf(cf1,cf2).join() ;,其中cf1剑按、cf2是兩個(gè)獨(dú)立的CompletableFuture實(shí)例疾就。如果你的程序有這么一段代碼,那么執(zhí)行的時(shí)候會(huì)阻塞在此艺蝴,直到cf1和cf2都完成了猬腰,才會(huì)釋放。

任一完成

調(diào)用CompletableFuture的靜態(tài)方法 CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)猜敢, 這個(gè)方法和上面的’所有完成’是相對(duì)的姑荷。當(dāng)所有的傳入的CompletableFuture實(shí)例中只要有一個(gè)實(shí)例完成的時(shí)候,會(huì)返回一個(gè)新建的CompletableFuture缩擂,也就是程序?qū)?huì)阻塞在此方法調(diào)用鼠冕,直到有一個(gè)傳入的CompletableFuture完成,這個(gè)時(shí)候返回值CompletableFuture實(shí)例也完成胯盯。舉個(gè)例子: CompletableFuture.anyOf(cf1,cf2).join(); 懈费,其中cf1、cf2是兩個(gè)獨(dú)立的CompletableFuture實(shí)例博脑。如果你的程序有這么一段代碼憎乙,那么執(zhí)行的時(shí)候會(huì)阻塞在此票罐,直到cf1或cf2其中一個(gè)完成了,才會(huì)釋放泞边。

四该押、實(shí)戰(zhàn)例子

個(gè)人認(rèn)為,CompletableFuture使用在API網(wǎng)關(guān)做接口的聚合是什么有優(yōu)勢(shì)的(不知道為什么身邊很少人使用這個(gè)神器)》北ぃ現(xiàn)在假設(shè)有一個(gè)API網(wǎng)關(guān)沈善,在調(diào)用查詢用戶某個(gè)訂單詳情的時(shí)候,需要分別從訂單服務(wù)的訂單信息接口椭蹄、用戶服務(wù)的用戶信息接口兩個(gè)接口拉取數(shù)據(jù),一般來說净赴,低效的偽代碼大概如下:

//這兩個(gè)參數(shù)從外部獲得
Long userId = 10006L;
String orderId = "XXXXXXXXXXXXXXXXXXXXXX";
//從用戶服務(wù)獲取用戶信息
UserInfo userInfo = userService.getUserInfo(userId);
//從用訂單務(wù)獲取訂單信息
OrderInfo orderInfo = orderService.getOrderInfo(orderId);
//返回兩者的聚合DTO
return new OrderDetailDTO(userInfo,orderInfo);

其實(shí)如果微服務(wù)設(shè)計(jì)得足夠好绳矩,下面三個(gè)外部接口的信息一定是不相關(guān)聯(lián)的,也就是可以并行獲取玖翅,三個(gè)接口的結(jié)果都獲取完畢之后做一次數(shù)據(jù)聚合到DTO即可翼馆,也就是聚合的耗時(shí)大致是這三個(gè)接口中耗時(shí)最長的接口的響應(yīng)時(shí)間。修改后的代碼如下:

@Service
public class OrderDetailService {
    /**
     * 建立一個(gè)線程池專門交給CompletableFuture使用
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100));
    @Autowired
    private UserService userService;
    @Autowired
    private OrderService orderService;
    public OrderDetailDTO getOrderDetail(Long userId, String orderId) throws Exception {
        CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor);
        CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(orderId), executor);
        CompletableFuture<OrderDetailDTO> result
                = userInfoCompletableFuture.thenCombineAsync(orderInfoCompletableFuture, OrderDetailDTO::new, executor);
        return result.get();
    }
}

上面的代碼還沒有考慮到外部的微服務(wù)異常的情況金度,但是相對(duì)串行的拉取外部信息的接口的操作方式应媚,這種并行的方式顯然是更加高效的,而且CompletableFuture的supplyAsync方法可以傳入Supplier接口實(shí)例猜极,也就是允許任何參數(shù)類型的表達(dá)式中姜,這點(diǎn)是什么方便的。當(dāng)然跟伏,其實(shí)用ExecutorService的invokeAll方法也可以達(dá)到相同的效果.

參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載丢胚,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者。
  • 序言:七十年代末受扳,一起剝皮案震驚了整個(gè)濱河市携龟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌勘高,老刑警劉巖峡蟋,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異华望,居然都是意外死亡蕊蝗,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門立美,熙熙樓的掌柜王于貴愁眉苦臉地迎上來匿又,“玉大人,你說我怎么就攤上這事建蹄÷蹈” “怎么了裕偿?”我有些...
    開封第一講書人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長痛单。 經(jīng)常有香客問我嘿棘,道長,這世上最難降的妖魔是什么旭绒? 我笑而不...
    開封第一講書人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任鸟妙,我火速辦了婚禮,結(jié)果婚禮上挥吵,老公的妹妹穿的比我還像新娘重父。我一直安慰自己,他們只是感情好忽匈,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開白布房午。 她就那樣靜靜地躺著,像睡著了一般丹允。 火紅的嫁衣襯著肌膚如雪郭厌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,021評(píng)論 1 291
  • 那天雕蔽,我揣著相機(jī)與錄音折柠,去河邊找鬼。 笑死批狐,一個(gè)胖子當(dāng)著我的面吹牛扇售,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播贾陷,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼缘眶,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了髓废?” 一聲冷哼從身側(cè)響起毅访,我...
    開封第一講書人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤哭廉,失蹤者是張志新(化名)和其女友劉穎窿祥,沒想到半個(gè)月后穗慕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡冈爹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年涌攻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片频伤。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡恳谎,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情因痛,我是刑警寧澤婚苹,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站鸵膏,受9級(jí)特大地震影響膊升,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜谭企,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一廓译、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧债查,春花似錦非区、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至速和,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間剥汤,已是汗流浹背颠放。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留吭敢,地道東北人碰凶。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像鹿驼,于是被迫代替她去往敵國和親欲低。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容