筆者所有文章第一時(shí)間發(fā)布于:
hhbbz的個(gè)人博客
一、簡介
java.util.concurrent.CompletableFuture繼承于java.util.concurrent.Future,它本身具備Future的所有特性打厘,并且基于JDK1.8的流式編程以及Lambda表達(dá)式等實(shí)現(xiàn)一元操作符、異步性以及事件驅(qū)動(dòng)編程模型,它的靈活性和更強(qiáng)大的功能是Future無法比擬的磅网。它提供了一共有50多種Api,這些Api的注釋比較少筷屡,命名也比較生澀涧偷,下面將會(huì)分類講解它們的使用方式簸喂。CompletableFuture吸收了所有ListenableFuture(Guava)和SettableFuture的優(yōu)點(diǎn)。此外燎潮,內(nèi)置的lambda表達(dá)式使它更接近于Scala/Akka futures喻鳄。CompletableFuture有兩個(gè)主要的方面優(yōu)于Future – 異步回調(diào)/轉(zhuǎn)換,這能使得任何線程在任何時(shí)刻都可以設(shè)置CompletableFuture的值确封。
二除呵、使用流程簡述
首先,簡單地創(chuàng)建新的CompletableFuture并且給你的客戶端:
public CompletableFuture<String> ask() {
final CompletableFuture<String> future = new CompletableFuture<>();
//...
return future;
}
客戶端代碼調(diào)用ask().get()爪喘,它將永遠(yuǎn)阻塞颜曾,直到CompletableFuture回調(diào),下面是一種回調(diào)方式:
future.complete("42");
此時(shí)此刻所有客戶端 Future(CompletableFuture).get() 將得到字符串的結(jié)果秉剑。注意的是: CompletableFuture.complete() 只能調(diào)用一次泛啸,后續(xù)調(diào)用將被忽略。但也有一個(gè)后門叫做 CompletableFuture.obtrudeValue() 覆蓋Future之前的值秃症,請(qǐng)小心使用候址。有時(shí)你想處理 CompletableFuture.complete() 調(diào)用過程拋出的異常,如果你想進(jìn)一步傳遞異常种柑,可以用 CompletableFuture.completeExceptionally(ex) (或者用 obtrudeException(ex) 這樣更強(qiáng)大的方法覆蓋前面的異常)岗仑。 completeExceptionally()也能解鎖所有等待的客戶端。前面說到get()(永久阻塞等待)聚请,其實(shí)還有 get(long timeout荠雕,TimeUnit unit) 可以設(shè)置超時(shí)時(shí)間(超時(shí)會(huì)拋出異常),也有 CompletableFuture.join( )(join方法在錯(cuò)誤處理方面有著細(xì)微的變動(dòng)驶赏。但總體上炸卑,它和get方法是一樣的)。然后也有 CompletableFuture.getNow(valueIfAbsent) 煤傍,這個(gè)方法沒有阻塞盖文,但是如果Future還沒完成將返回默認(rèn)值,這使得當(dāng)構(gòu)建那種我們不想等太久的健壯系統(tǒng)時(shí)非常有用蚯姆。
最后的方法靜態(tài) completedFuture(value) 來返回已經(jīng)完成Future的對(duì)象五续,當(dāng)測(cè)試或者寫一些適配器層時(shí)可能非常有用。
三龄恋、CompletableFuture主要Api詳述
聲明
如果CompletableFuture的方法沒有參數(shù)Executor并且以…Async結(jié)尾疙驾,它將會(huì)使用 ForkJoinPool.commonPool() (在JDK8中的通用線程池,基于Fork/join線程池和任務(wù)竊取實(shí)現(xiàn))郭毕,這適用于CompletableFuture類中的大多數(shù)的方法它碎。所以下面分析的時(shí)候可能會(huì)直接跳過命名為…Async的方法。
創(chuàng)建和獲取CompletableFuture
使用 new 關(guān)鍵字新建CompletableFuture實(shí)例并不是唯一的選擇,CompletableFuture提供了靜態(tài)工廠方法用于創(chuàng)建自身的實(shí)例:
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
runAsync()易于理解扳肛,注意它需要Runnable傻挂,因此它返回CompletableFuture<Void>作為Runnable不返回任何值。如果你需要處理異步操作并返回結(jié)果敞峭,使用Supplier<U>踊谋,它是一個(gè)函數(shù)式接口,接口如下:
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get();
}
可以這樣使用Supplier<U>:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
//...long running...
return "42";
}
}, executor);
換成Lambda表達(dá)式:
finalCompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//...long running...
return "42";
}, executor)旋讹;
轉(zhuǎn)換–CompletableFuture.thenApply()
apply一般翻譯為’作用于’殖蚕,但是在CompletableFuture中,thenApply()起到轉(zhuǎn)換結(jié)果的作用沉迹,總結(jié)來說就是疊加多個(gè)CompletableFuture的功能睦疫,把多個(gè)CompletableFuture組合在一起,跨線程池進(jìn)行異步調(diào)用鞭呕,調(diào)用的過程就是結(jié)果轉(zhuǎn)換的過程蛤育。先看下這些方法:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
其中Function<? super T,? extends U>是函數(shù)式接口,此接口如下:
@FunctionalInterface
public interface Function<T, R> {
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
...//省略其他流式方法
}
使用例子:
CompletableFuture<String> f1 = = CompletableFuture.supplyAsync(() -> {
return "42";
}, executor)葫松;
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
或者直接使用流式編程:
CompletableFuture<Double> f3 = CompletableFuture.supplyAsync(() -> {
return "42";
}, executor).thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
終端運(yùn)行(消費(fèi))–CompletableFuture.thenRun()/CompletableFuture.thenAccept()
CompletableFuture有兩種典型的”最終”階段方法瓦糕,其實(shí)就是Lambda的終端方法,使用的是Consumer接口(消費(fèi)操作的接口)腋么。當(dāng)CompletableFuture的結(jié)果已經(jīng)準(zhǔn)備好咕娄,thenAccept()執(zhí)行最終消費(fèi)操作,thenRun()執(zhí)行Runnable珊擂,沒有返回值(或者說返回結(jié)果為Void)圣勒。
CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block);
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,Executor executor);
CompletableFuture<Void> thenRun(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action);
CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
下面是一個(gè)例子:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor);
log.debug("Continuing");
thenAccept( )/thenRun( )方法并沒有發(fā)生阻塞(即使沒有明確的executor)。它們像事件偵聽器摧扇。上例中”Continuing”消息將立即出現(xiàn)圣贸,但是這個(gè)時(shí)候thenAcceptAsync()有可能尚未執(zhí)行完。thenAccept()和thenRun()的區(qū)別是:thenAccept()是針對(duì)結(jié)果進(jìn)行消費(fèi)扛稽,因?yàn)槿雲(yún)⑹荂onsumer函數(shù)式接口吁峻,有入?yún)o返回值,而thenRun()它的入?yún)⑹且粋€(gè)Runnable的實(shí)例庇绽,表示當(dāng)?shù)玫缴弦徊降慕Y(jié)果時(shí)的操作锡搜,也就是當(dāng)?shù)玫缴弦徊降慕Y(jié)果則異步執(zhí)行Runnable。
異常處理
CompletableFuture<String> future = new CompletableFuture<>();
try {
throw new RuntimeException("test exception");
}catch (Exception e){
future.completeExceptionally(e);
future.complete("test success");
}
System.out.println(future.get());
//結(jié)果(觸發(fā)了completeExceptionally后瞧掺,complete將會(huì)失效):
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: test exception
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.throwable.TestGitA.main(TestGitA.java:22)
Caused by: java.lang.RuntimeException: test exception
at org.throwable.TestGitA.main(TestGitA.java:17)
...
補(bǔ)償型的例子:
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測(cè)試一下異常情況");
}
return "s1";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
//結(jié)果
java.lang.RuntimeException: 測(cè)試一下異常情況
hello world
全方位型的例子:
//注意這里OK為String類型
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return Integer.parseInt(ok);
} else {
log.warn("Problem", ex);
return -1;
}
});
CompletableFuture之間建立關(guān)聯(lián)
CompletableFuture的”串聯(lián)”–CompletableFuture.thenCompose()
thenCompose()方法允許你對(duì)兩個(gè)異步操作進(jìn)行流水線,第一個(gè)操作完成時(shí)凡傅,將其結(jié)果作為參數(shù)傳遞給第二個(gè)操作辟狈。你可以創(chuàng)建兩個(gè)CompletableFuture對(duì)象,對(duì)第一個(gè)CompletableFuture對(duì)象調(diào)用thenCompose() ,并向其傳遞一個(gè)函數(shù)哼转。當(dāng)?shù)谝粋€(gè)CompletableFuture執(zhí)行完畢后明未,它的結(jié)果將作為該函數(shù)的參數(shù),這個(gè)函數(shù)的返回值是以第一個(gè)CompletableFuture的返回做輸入計(jì)算出的第二個(gè)CompletableFuture對(duì)象壹蔓。
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,CompletableFuture<U>> fn);
<U> CompletableFuture<U> thenComposeAsync(Function<? super T,CompletableFuture<U>> fn,Executor executor);
thenCompose()是一個(gè)重要的方法趟妥,它允許構(gòu)建健壯的和異步的管道,沒有阻塞和等待的中間步驟佣蓉。在下面的事例中披摄,仔細(xì)觀察thenApply()(map)和thenCompose()(flatMap)的類型和差異,calculateRelevance()方法返回CompletableFuture實(shí)例:
CompletableFuture<Document> docFuture = //...
CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance);
CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance);
//...
private CompletableFuture<Double> calculateRelevance(Document doc) //...
CompletableFuture的”并聯(lián)”–CompletableFuture.thenCombine()
thenCombine()用于連接兩個(gè)獨(dú)立的CompletableFuture勇凭,它接收名為 BiFunction 的第二參數(shù)疚膊,這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture 對(duì)象完成計(jì)算后結(jié)果如何合并,返回?cái)y帶計(jì)算合并結(jié)果的一個(gè)新的CompletableFuture虾标。
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
<U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
<U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn,Executor executor);
假設(shè)你有兩個(gè)CompletableFuture寓盗,一個(gè)加載Customer另一個(gè)加載最近的Shop。他們彼此完全獨(dú)立璧函,但是當(dāng)他們完成時(shí)傀蚌,您想要使用它們的值來計(jì)算Route。下面是一個(gè)例子:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123); //省略loadCustomerDetails方法代碼
CompletableFuture<Shop> shopFuture = closestShop(); //省略closestShop方法代碼
CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...
新建customerFuture和shopFuture蘸吓。那么routeFuture包裝它們?nèi)缓蟆暗却彼鼈兺瓿缮旗拧.?dāng)它們的結(jié)果準(zhǔn)備好了,它會(huì)運(yùn)行我們提供的函數(shù)來結(jié)合所有的結(jié)果(findRoute())美澳。當(dāng)兩個(gè)基本的CompletableFuture實(shí)例完成并且findRoute()也完成時(shí)销部,這樣routeFuture將會(huì)完成。
結(jié)果記錄–CompletableFuture.whenComplete()
CompletableFuture.whenComplete()的作用是CompletableFuture運(yùn)行完成時(shí)制跟,對(duì)結(jié)果的記錄操作舅桩,記錄的操作由函數(shù)BiConsumer<? super T, ? super Throwable>完成,一般BiConsumer這種消費(fèi)操作應(yīng)該是終端操作雨膨,但是whenComplete返回的是CompletableFuture的接口的實(shí)例擂涛,這個(gè)實(shí)例就是調(diào)用whenComplete的原始CompletableFuture對(duì)象。
CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
一個(gè)使用例子如下:
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測(cè)試一下異常情況");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println(s);
System.out.println(t.getMessage());
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
//控制臺(tái)輸出
null
java.lang.RuntimeException: 測(cè)試一下異常情況
java.lang.RuntimeException: 測(cè)試一下異常情況
hello world
這里也可以看出聊记,如果使用了exceptionally撒妈,就會(huì)對(duì)最終的結(jié)果產(chǎn)生影響,也就證明了whenComplete返回的是原始的CompletableFuture對(duì)象排监。
結(jié)果處理–CompletableFuture.handle()
CompletableFuture.handle() 的作用是CompletableFuture運(yùn)行完成時(shí)狰右,對(duì)結(jié)果的處理。這里的完成時(shí)有兩種情況舆床,一種是正常執(zhí)行棋蚌,返回預(yù)期的值嫁佳。另外一種是遇到異常拋出造成程序的中斷。
<U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
<U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
一個(gè)出現(xiàn)異常時(shí)的例子:
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出現(xiàn)異常
if (1 == 1) {
throw new RuntimeException("測(cè)試一下異常情況");
}
return "s1";
}).handle((s, t) -> { //這里t的參數(shù)類型為Throwable谷暮。
if (t != null) {
return "hello world"; //這里是異常不為null時(shí)候的邏輯蒿往,可以選擇補(bǔ)償,也可以直接拋出異常t湿弦,一旦拋出異常瓤漏,調(diào)用join()的時(shí)候異常會(huì)外拋。
}
return s;
}).join();
System.out.println(result);
//控制臺(tái)輸出
hello world
一個(gè)未出現(xiàn)異常時(shí)的例子:
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s; //未出現(xiàn)異常颊埃,實(shí)際上走到這一步
}).join();
System.out.println(result);
//控制臺(tái)輸出
s1
合并消費(fèi)–CompletableFuture.thenAcceptBoth()
CompletableFuture.thenAcceptBoth() 用于連接兩個(gè)獨(dú)立的CompletableFuture蔬充,它接收名為BiConsumer的第二參數(shù),這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture對(duì)象完成計(jì)算后竟秫,結(jié)果如何消費(fèi)娃惯,有點(diǎn)像thenCombine,但是對(duì)于兩個(gè)CompletableFuture的計(jì)算操作是終端操作肥败,沒有返回值(或者說返回結(jié)果為Void類型)趾浅。
<U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
一個(gè)例子如下,5000毫秒后控制臺(tái)輸出”hello world”:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> System.out.println(s1 + " " + s2));
合并執(zhí)行–CompletableFuture.runAfterBoth()
CompletableFuture.runAfterBoth() 用于連接兩個(gè)獨(dú)立的CompletableFuture馒稍,不關(guān)心兩個(gè)CompletableFuture的計(jì)算結(jié)果皿哨,當(dāng)兩個(gè)CompletableFuture執(zhí)行完成后,執(zhí)行Runnable纽谒。
CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
一個(gè)例子如下证膨,5000毫秒后控制臺(tái)輸出”hello world”:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world")); //() -> System.out.println("hello world");就是Runnable的Lambda實(shí)現(xiàn)
時(shí)間優(yōu)先度執(zhí)行
CompletableFuture中有部分的API可以等待第一個(gè)完成的CompletableFuture再進(jìn)行后續(xù)操作鼓黔。當(dāng)你有兩個(gè)相同類型任務(wù)的結(jié)果時(shí)就顯得非常方便央勒,你只要關(guān)心響應(yīng)時(shí)間就行了,沒有哪個(gè)任務(wù)是優(yōu)先的(這類型的方法的好處是只關(guān)注響應(yīng)的時(shí)間澳化,除了時(shí)間這個(gè)優(yōu)先級(jí)限定崔步,沒有其他優(yōu)先級(jí))。
時(shí)間優(yōu)先轉(zhuǎn)換–CompletableFuture.applyToEither()
CompletableFuture.applyToEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture缎谷,選擇計(jì)算(返回結(jié)果)最快的一個(gè)CompletableFuture井濒,進(jìn)行轉(zhuǎn)換計(jì)算操作(Function<? super T, U>)并返回結(jié)果。
<U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
<U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
我們現(xiàn)實(shí)開發(fā)場(chǎng)景中列林,總會(huì)碰到有兩種渠道完成同一個(gè)事情瑞你,所以就可以調(diào)用這個(gè)方法,找一個(gè)最快的結(jié)果進(jìn)行處理希痴。一個(gè)例子如下:
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> s).join(); //2000毫秒后返回"hello world"
時(shí)間優(yōu)先消費(fèi)–CompletableFuture.acceptEither()
CompletableFuture.acceptEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture者甲,選擇計(jì)算(返回結(jié)果)最快的一個(gè)CompletableFuture,進(jìn)行消費(fèi)操作(Consumer<? super T> action)砌创,無返回值过牙。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
一個(gè)例子如下:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), System.out::println); //2000毫秒后控制臺(tái)打印 "hello world"
時(shí)間優(yōu)先執(zhí)行–CompletableFuture.runAfterEither()
CompletableFuture.runAfterEither() 用于連接兩個(gè)獨(dú)立的CompletableFuture甥厦,不關(guān)心任何CompletableFuture的返回值纺铭,任何一個(gè)CompletableFuture執(zhí)行完畢得到了結(jié)果后會(huì)馬上執(zhí)行Runable寇钉。
CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
一個(gè)例子如下:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world")); //() -> System.out.println("hello world")是Runable的Lambda實(shí)現(xiàn)。 2000毫秒后控制臺(tái)打印 "hello world"
結(jié)果賦值
CompletableFuture的完計(jì)算結(jié)果直接賦值方法主要有以下幾個(gè):
- boolean complete(T value)舶赔,通過CAS賦值計(jì)算結(jié)果扫倡,內(nèi)部會(huì)發(fā)送完成狀態(tài),再次調(diào)用無效竟纳。
- boolean completeExceptionally(Throwable ex)撵溃,通過CAS賦值計(jì)算異常,內(nèi)部會(huì)發(fā)送完成狀態(tài)锥累,再次調(diào)用無效缘挑。
- void obtrudeValue(T value),直接賦值計(jì)算結(jié)果桶略,內(nèi)部會(huì)發(fā)送完成狀態(tài)语淘,再次調(diào)用無效。
- obtrudeException(Throwable ex)际歼,直接賦值計(jì)算異常惶翻,內(nèi)部會(huì)發(fā)送完成狀態(tài),再次調(diào)用無效鹅心。
只要上面四個(gè)方法之一被調(diào)用吕粗,CompletableFuture就會(huì)標(biāo)記為’完結(jié)狀態(tài)’,再次調(diào)用其他方法將不會(huì)起效旭愧,另外颅筋,obtrudeXXX方法屬于強(qiáng)制賦值,不建議使用输枯,因?yàn)樗鼈儠?huì)直接覆蓋當(dāng)前的值议泵。
一個(gè)例子如下:
CompletableFuture<String> future = new CompletableFuture<>();
try {
future.complete("test success");
}catch (Exception e){
future.completeExceptionally(e);
}
System.out.println(future.get());
//輸出 test success
結(jié)果獲取
- T get() throws InterruptedException, ExecutionException ,永久阻塞用押,直到返回結(jié)果值肢簿,允許中斷,計(jì)算過程中所有的異常會(huì)包裹為新的ExecutionException實(shí)例再拋出蜻拨。
- T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException 池充,添加超時(shí)時(shí)間設(shè)定,如果超時(shí)會(huì)拋出TimeoutException缎讼,如果獲取到結(jié)果則釋放并返回收夸,允許中斷,計(jì)算過程中所有的異常會(huì)包裹為新的ExecutionException實(shí)例再拋出血崭。
- T join() 卧惜,永久阻塞厘灼,直到返回結(jié)果值,不允許中斷咽瓷,計(jì)算過程中所有的異常會(huì)直接拋出设凹。
-
T getNow(T valueIfAbsent) ,如果當(dāng)前的計(jì)算結(jié)果為null茅姜,馬上返回valueIfAbsent闪朱,否則調(diào)用join()的邏輯。
結(jié)果的獲取不做舉例钻洒,因?yàn)檫@個(gè)實(shí)在太常用奋姿,強(qiáng)烈建議使用T get(long timeout, TimeUnit unit),其他三個(gè)方法看場(chǎng)景選擇使用素标。
其它
取消–cancel()
調(diào)用CompletableFuture實(shí)例的 cancel() 方法可以取消當(dāng)前的CompletableFuture称诗,此時(shí)該CompletableFuture實(shí)例會(huì)進(jìn)入’完結(jié)狀態(tài)’,其結(jié)果會(huì)傳入一個(gè)新的CancellationException實(shí)例头遭,此時(shí)通過上一節(jié)的’結(jié)果獲取’中的Api調(diào)用就會(huì)按各自的處理模式拋出異常寓免。
所有完成
調(diào)用CompletableFuture的靜態(tài)方法 CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) ,當(dāng)所有的傳入的CompletableFuture實(shí)例都完成的時(shí)候任岸,會(huì)返回一個(gè)新建的CompletableFuture再榄,也就是程序?qū)?huì)阻塞在此方法調(diào)用,直到所有傳入CompletableFuture都完成享潜,這個(gè)時(shí)候返回值CompletableFuture實(shí)例也完成困鸥。舉個(gè)例子:CompletableFuture.allOf(cf1,cf2).join() ;,其中cf1剑按、cf2是兩個(gè)獨(dú)立的CompletableFuture實(shí)例疾就。如果你的程序有這么一段代碼,那么執(zhí)行的時(shí)候會(huì)阻塞在此艺蝴,直到cf1和cf2都完成了猬腰,才會(huì)釋放。
任一完成
調(diào)用CompletableFuture的靜態(tài)方法 CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)猜敢, 這個(gè)方法和上面的’所有完成’是相對(duì)的姑荷。當(dāng)所有的傳入的CompletableFuture實(shí)例中只要有一個(gè)實(shí)例完成的時(shí)候,會(huì)返回一個(gè)新建的CompletableFuture缩擂,也就是程序?qū)?huì)阻塞在此方法調(diào)用鼠冕,直到有一個(gè)傳入的CompletableFuture完成,這個(gè)時(shí)候返回值CompletableFuture實(shí)例也完成胯盯。舉個(gè)例子: CompletableFuture.anyOf(cf1,cf2).join(); 懈费,其中cf1、cf2是兩個(gè)獨(dú)立的CompletableFuture實(shí)例博脑。如果你的程序有這么一段代碼憎乙,那么執(zhí)行的時(shí)候會(huì)阻塞在此票罐,直到cf1或cf2其中一個(gè)完成了,才會(huì)釋放泞边。
四该押、實(shí)戰(zhàn)例子
個(gè)人認(rèn)為,CompletableFuture使用在API網(wǎng)關(guān)做接口的聚合是什么有優(yōu)勢(shì)的(不知道為什么身邊很少人使用這個(gè)神器)》北ぃ現(xiàn)在假設(shè)有一個(gè)API網(wǎng)關(guān)沈善,在調(diào)用查詢用戶某個(gè)訂單詳情的時(shí)候,需要分別從訂單服務(wù)的訂單信息接口椭蹄、用戶服務(wù)的用戶信息接口兩個(gè)接口拉取數(shù)據(jù),一般來說净赴,低效的偽代碼大概如下:
//這兩個(gè)參數(shù)從外部獲得
Long userId = 10006L;
String orderId = "XXXXXXXXXXXXXXXXXXXXXX";
//從用戶服務(wù)獲取用戶信息
UserInfo userInfo = userService.getUserInfo(userId);
//從用訂單務(wù)獲取訂單信息
OrderInfo orderInfo = orderService.getOrderInfo(orderId);
//返回兩者的聚合DTO
return new OrderDetailDTO(userInfo,orderInfo);
其實(shí)如果微服務(wù)設(shè)計(jì)得足夠好绳矩,下面三個(gè)外部接口的信息一定是不相關(guān)聯(lián)的,也就是可以并行獲取玖翅,三個(gè)接口的結(jié)果都獲取完畢之后做一次數(shù)據(jù)聚合到DTO即可翼馆,也就是聚合的耗時(shí)大致是這三個(gè)接口中耗時(shí)最長的接口的響應(yīng)時(shí)間。修改后的代碼如下:
@Service
public class OrderDetailService {
/**
* 建立一個(gè)線程池專門交給CompletableFuture使用
*/
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100));
@Autowired
private UserService userService;
@Autowired
private OrderService orderService;
public OrderDetailDTO getOrderDetail(Long userId, String orderId) throws Exception {
CompletableFuture<UserInfo> userInfoCompletableFuture = CompletableFuture.supplyAsync(() -> userService.getUserInfo(userId), executor);
CompletableFuture<OrderInfo> orderInfoCompletableFuture = CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(orderId), executor);
CompletableFuture<OrderDetailDTO> result
= userInfoCompletableFuture.thenCombineAsync(orderInfoCompletableFuture, OrderDetailDTO::new, executor);
return result.get();
}
}
上面的代碼還沒有考慮到外部的微服務(wù)異常的情況金度,但是相對(duì)串行的拉取外部信息的接口的操作方式应媚,這種并行的方式顯然是更加高效的,而且CompletableFuture的supplyAsync方法可以傳入Supplier接口實(shí)例猜极,也就是允許任何參數(shù)類型的表達(dá)式中姜,這點(diǎn)是什么方便的。當(dāng)然跟伏,其實(shí)用ExecutorService的invokeAll方法也可以達(dá)到相同的效果.