1.簡(jiǎn)介
本文是CompletableFuture類(lèi)的功能和用例的指南- 作為Java 8 Concurrency API改進(jìn)而引入秀鞭。
2. Java中的異步計(jì)算
異步計(jì)算很難推理。通常我們希望將任何計(jì)算視為一系列步驟扛禽。但是在異步計(jì)算的情況下锋边,表示為回調(diào)的動(dòng)作往往分散在代碼中或者深深地嵌套在彼此內(nèi)部。當(dāng)我們需要處理其中一個(gè)步驟中可能發(fā)生的錯(cuò)誤時(shí)编曼,情況變得更糟豆巨。
Future接口是Java 5中添加作為異步計(jì)算的結(jié)果,但它沒(méi)有任何方法掐场,這些計(jì)算組合或處理可能出現(xiàn)的錯(cuò)誤往扔。
在Java 8中,引入了CompletableFuture類(lèi)熊户。與Future接口一起萍膛,它還實(shí)現(xiàn)了CompletionStage接口。此接口定義了可與其他步驟組合的異步計(jì)算步驟的契約嚷堡。
CompletableFuture同時(shí)是一個(gè)構(gòu)建塊和一個(gè)框架卦羡,具有大約50種不同的組合,兼容,執(zhí)行異步計(jì)算步驟和處理錯(cuò)誤的方法绿饵。
如此龐大的API可能會(huì)令人難以招架,但這些API大多屬于幾個(gè)明確且不同的用例瓶颠。
3.使用CompletableFuture作為簡(jiǎn)單的Future
首先拟赊,CompletableFuture類(lèi)實(shí)現(xiàn)Future接口,因此您可以將其用作Future實(shí)現(xiàn)粹淋,但具有額外的完成邏輯吸祟。
例如,您可以使用no-arg構(gòu)造函數(shù)創(chuàng)建此類(lèi)的實(shí)例桃移,以表示Future的某些結(jié)果屋匕,將其交給使用者,并在將來(lái)的某個(gè)時(shí)間使用complete方法完成借杰。消費(fèi)者可以使用get方法來(lái)阻止當(dāng)前線程过吻,直到提供此結(jié)果。
在下面的示例中蔗衡,我們有一個(gè)創(chuàng)建CompletableFuture實(shí)例的方法纤虽,然后在另一個(gè)線程中旋轉(zhuǎn)一些計(jì)算并立即返回Future。
計(jì)算完成后绞惦,該方法通過(guò)將結(jié)果提供給完整方法來(lái)完成Future:
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture;
}
為了分離計(jì)算逼纸,我們使用了“Java中的線程池簡(jiǎn)介”一文中描述的Executor API ,但是這種創(chuàng)建和完成CompletableFuture的方法可以與任何并發(fā)機(jī)制或API(包括原始線程)一起使用济蝉。
請(qǐng)注意杰刽,該calculateAsync方法返回一個(gè)未來(lái)的實(shí)例。
我們只是調(diào)用方法王滤,接收Future實(shí)例并在我們準(zhǔn)備阻塞結(jié)果時(shí)調(diào)用它的get方法贺嫂。
另請(qǐng)注意,get方法拋出一些已檢查的異常淑仆,即ExecutionException(封裝計(jì)算期間發(fā)生的異常)和InterruptedException(表示執(zhí)行方法的線程被中斷的異常):
Future<String> completableFuture = calculateAsync();
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
如果您已經(jīng)知道計(jì)算的結(jié)果涝婉,則可以將static completedFuture方法與表示此計(jì)算結(jié)果的參數(shù)一起使用。然后蔗怠,F(xiàn)uture的get方法永遠(yuǎn)不會(huì)阻塞墩弯,而是立即返回此結(jié)果。
Future<String> completableFuture = CompletableFuture.completedFuture("Hello");
// ...
String result = completableFuture.get();
assertEquals("Hello", result);
作為替代方案寞射,您可能希望取消Future的執(zhí)行渔工。
假設(shè)我們沒(méi)有設(shè)法找到結(jié)果并決定完全取消異步執(zhí)行。這可以通過(guò)Future的取消方法完成桥温。此方法接收布爾參數(shù)mayInterruptIfRunning引矩,但在CompletableFuture的情況下,它沒(méi)有任何效果笤虫,因?yàn)橹袛嗖挥糜诳刂艭ompletableFuture的處理毙沾。
這是異步方法的修改版本:
public Future<String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.cancel(false);
return null;
});
return completableFuture;
}
當(dāng)我們使用Future.get()方法阻塞結(jié)果時(shí)钙态,如果取消將來(lái)取消兽掰,它將拋出CancellationException:
Future<String> future = calculateAsyncWithCancellation();
future.get(); // CancellationException
4. 具有封裝計(jì)算邏輯的CompletableFuture
上面的代碼允許我們選擇任何并發(fā)執(zhí)行機(jī)制翎苫,但是如果我們想要跳過(guò)這個(gè)樣板并簡(jiǎn)單地異步執(zhí)行一些代碼呢师脂?
靜態(tài)方法runAsync和supplyAsync允許我們相應(yīng)地從Runnable和Supplier功能類(lèi)型中創(chuàng)建CompletableFuture實(shí)例乡数。
雙方可運(yùn)行和供應(yīng)商的功能接口晚唇,允許得益于通過(guò)他們實(shí)例作為lambda表達(dá)式新的Java 8的功能织盼。
該Runnable的接口是在線程使用相同的舊的接口杨何,它不允許返回值。
的供應(yīng)商接口與不具有參數(shù)沥邻,并返回參數(shù)化類(lèi)型的一個(gè)值的單個(gè)方法的通用功能接口危虱。
這允許將Supplier的實(shí)例作為lambda表達(dá)式提供,該表達(dá)式執(zhí)行計(jì)算并返回結(jié)果唐全。這很簡(jiǎn)單:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
5.處理異步計(jì)算的結(jié)果
處理計(jì)算結(jié)果的最通用方法是將其提供給函數(shù)埃跷。該thenApply方法正是這么做的:接受一個(gè)函數(shù)實(shí)例,用它來(lái)處理結(jié)果芦瘾,并返回一個(gè)未來(lái)的保存函數(shù)的返回值:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
如果您不需要在Future鏈中返回值捌蚊,則可以使用Consumer功能接口的實(shí)例。它的單個(gè)方法接受一個(gè)參數(shù)并返回void近弟。
在CompletableFuture中有一個(gè)用于此用例的方法- thenAccept方法接收Consumer并將計(jì)算結(jié)果傳遞給它缅糟。最后的future.get()調(diào)用返回Void類(lèi)型的實(shí)例。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最后祷愉,如果您既不需要計(jì)算的值也不想在鏈的末尾返回一些值窗宦,那么您可以將Runnable lambda 傳遞給thenRun方法。在下面的示例中二鳄,在調(diào)用future.get()方法之后赴涵,我們只需在控制臺(tái)中打印一行:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture.thenRun(() -> System.out.println("Computation finished."));
future.get();
6.CompletableFuture
CompletableFuture API 的最佳部分是能夠在一系列計(jì)算步驟中組合CompletableFuture實(shí)例。
這種鏈接的結(jié)果本身就是CompletableFuture订讼,允許進(jìn)一步鏈接和組合髓窜。這種方法在函數(shù)式語(yǔ)言中無(wú)處不在,通常被稱(chēng)為monadic設(shè)計(jì)模式欺殿。
在下面的示例中寄纵,我們使用thenCompose方法按順序鏈接兩個(gè)Futures。
請(qǐng)注意脖苏,此方法采用返回CompletableFuture實(shí)例的函數(shù)程拭。該函數(shù)的參數(shù)是先前計(jì)算步驟的結(jié)果。這允許我們?cè)谙乱粋€(gè)CompletableFuture的lambda中使用這個(gè)值:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
該thenCompose方法連同thenApply實(shí)現(xiàn)一元圖案的基本構(gòu)建塊棍潘。它們與Java 8中可用的Stream和Optional類(lèi)的map和flatMap方法密切相關(guān)恃鞋。
兩個(gè)方法都接收一個(gè)函數(shù)并將其應(yīng)用于計(jì)算結(jié)果崖媚,但thenCompose(flatMap)方法接收一個(gè)函數(shù),該函數(shù)返回相同類(lèi)型的另一個(gè)對(duì)象恤浪。此功能結(jié)構(gòu)允許將這些類(lèi)的實(shí)例組合為構(gòu)建塊畅哑。
如果要執(zhí)行兩個(gè)獨(dú)立的Futures并對(duì)其結(jié)果執(zhí)行某些操作,請(qǐng)使用接受Future的thenCombine方法和具有兩個(gè)參數(shù)的Function來(lái)處理兩個(gè)結(jié)果:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
更簡(jiǎn)單的情況是资锰,當(dāng)您想要使用兩個(gè)期貨結(jié)果時(shí)敢课,但不需要將任何結(jié)果值傳遞給Future鏈。該thenAcceptBoth方法是有幫助:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
7. thenApply()和thenCompose()之間的區(qū)別
在前面的部分中绷杜,我們展示了關(guān)于thenApply()和thenCompose()的示例。這兩個(gè)API都有助于鏈接不同的CompletableFuture調(diào)用濒募,但這兩個(gè)函數(shù)的使用是不同的鞭盟。
7.1 thenApply()
此方法用于處理先前調(diào)用的結(jié)果。但是瑰剃,要記住的一個(gè)關(guān)鍵點(diǎn)是返回類(lèi)型將合并所有調(diào)用齿诉。
因此,當(dāng)我們想要轉(zhuǎn)換CompletableFuture 調(diào)用的結(jié)果時(shí)晌姚,此方法很有用 :
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
7.2 thenCompose()
該thenCompose()方法類(lèi)似于thenApply()在都返回一個(gè)新的完成階段粤剧。但是,thenCompose()使用前一個(gè)階段作為參數(shù)挥唠。它會(huì)直接使結(jié)果變平并返回Future抵恋,而不是我們?cè)趖henApply()中觀察到的嵌套未來(lái):
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果想要鏈接CompletableFuture 方法宝磨,那么最好使用thenCompose()弧关。
另請(qǐng)注意,這兩種方法之間的差異類(lèi)似于map()和flatMap()之間的差異唤锉。
8. 并行運(yùn)行多個(gè)Futures
當(dāng)我們需要并行執(zhí)行多個(gè)Futures時(shí)世囊,我們通常希望等待所有它們執(zhí)行,然后處理它們的組合結(jié)果窿祥。
該CompletableFuture.allOf靜態(tài)方法允許等待所有的完成期貨作為一個(gè)變種-精氨酸提供:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
請(qǐng)注意株憾,CompletableFuture.allOf()的返回類(lèi)型是CompletableFuture <Void>。這種方法的局限性在于它不會(huì)返回所有期貨的綜合結(jié)果晒衩。相反嗤瞎,您必須手動(dòng)從Futures獲取結(jié)果。幸運(yùn)的是浸遗,CompletableFuture.join()方法和Java 8 Streams API使它變得簡(jiǎn)單:
String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
該CompletableFuture.join()方法類(lèi)似于GET方法猫胁,但它拋出一個(gè)未經(jīng)檢查的異常的情況下,在未來(lái)沒(méi)有正常完成跛锌。這使得它可以在Stream.map()方法中用作方法引用弃秆。
9.處理錯(cuò)誤
對(duì)于異步計(jì)算步驟鏈中的錯(cuò)誤處理届惋,必須以類(lèi)似的方式調(diào)整throw / catch慣用法。
CompletableFuture類(lèi)允許您在特殊的句柄方法中處理它菠赚,而不是在語(yǔ)法塊中捕獲異常脑豹。此方法接收兩個(gè)參數(shù):計(jì)算結(jié)果(如果成功完成)和拋出異常(如果某些計(jì)算步驟未正常完成)。
在下面的示例中衡查,我們使用handle方法在問(wèn)候語(yǔ)的異步計(jì)算完成時(shí)提供默認(rèn)值瘩欺,因?yàn)闆](méi)有提供名稱(chēng):
String name = null;
// ...
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", completableFuture.get());
作為替代方案,假設(shè)我們想要使用值手動(dòng)完成Future拌牲,如第一個(gè)示例中所示俱饿,但也可以使用異常來(lái)完成它。該completeExceptionally方法旨在用于這一點(diǎn)塌忽。以下示例中的completableFuture.get()方法拋出ExecutionException拍埠,并將RuntimeException作為其原因:
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
在上面的示例中,我們可以使用handle方法異步處理異常土居,但是使用get方法枣购,我們可以使用更典型的同步異常處理方法。
10.異步方法
CompletableFuture類(lèi)中的流體API的大多數(shù)方法都有兩個(gè)帶有Async后綴的附加變體擦耀。這些方法通常用于在另一個(gè)線程中運(yùn)行相應(yīng)的執(zhí)行步驟棉圈。
沒(méi)有Async后綴的方法使用調(diào)用線程運(yùn)行下一個(gè)執(zhí)行階段。不帶Executor參數(shù)的Async方法使用使用ForkJoinPool.commonPool()方法訪問(wèn)的Executor的公共fork / join池實(shí)現(xiàn)來(lái)運(yùn)行一個(gè)步驟眷蜓。帶有Executor參數(shù)的Async方法使用傳遞的Executor運(yùn)行一個(gè)步驟分瘾。
這是一個(gè)使用Function實(shí)例處理計(jì)算結(jié)果的修改示例。唯一可見(jiàn)的區(qū)別是thenApplyAsync方法账磺。但在幕后芹敌,函數(shù)的應(yīng)用程序被包裝到ForkJoinTask實(shí)例中(有關(guān)fork / join框架的更多信息,請(qǐng)參閱文章“Java中的Fork / Join Framework指南”)垮抗。這樣可以進(jìn)一步并行化您的計(jì)算并更有效地使用系統(tǒng)資源氏捞。
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture API
在Java 9中, CompletableFuture API通過(guò)以下更改得到了進(jìn)一步增強(qiáng):
- 新工廠方法增加了
- 支持延遲和超時(shí)
- 改進(jìn)了對(duì)子類(lèi)化的支持冒版。
引入了新的實(shí)例API:
- Executor defaultExecutor()
- CompletableFuture<U> newIncompleteFuture()
- CompletableFuture<T> copy()
- CompletionStage<T> minimalCompletionStage()
- CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor)
- CompletableFuture<T> completeAsync(Supplier<? extends T> supplier)
- CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
- CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
我們現(xiàn)在還有一些靜態(tài)實(shí)用方法:
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)
- <U> CompletionStage<U> completedStage(U value)
- <U> CompletionStage<U> failedStage(Throwable ex)
- <U> CompletableFuture<U> failedFuture(Throwable ex)
最后液茎,為了解決超時(shí)問(wèn)題,Java 9又引入了兩個(gè)新功能:
- orTimeout()
- completeOnTimeout()
歡迎大家關(guān)注公眾號(hào):「Java知己」辞嗡,關(guān)注公眾號(hào)捆等,回復(fù)「1024」你懂得,免費(fèi)領(lǐng)取 30 本經(jīng)典編程書(shū)籍续室。關(guān)注我栋烤,與 10 萬(wàn)程序員一起進(jìn)步。 每天更新Java知識(shí)哦挺狰,期待你的到來(lái)明郭!