1.介紹
本文是 CompletableFuture 類的功能和用例的指南, CompletableFuture 類是作為 Java 8 中對 Concurrency 包中API的改進而引入的缭黔。全文較長宴胧,關鍵點用黑色字體加粗表示。
需要具備的前提知識點:
- Thread類扒接、Runnable接口、 Future接口们衙、Lambda表達式钾怔、函數式編程
2. Java中的異步計算
異步計算很難調試。 通常我們希望將任何復雜的計算拆開視為一系列步驟蒙挑。 但是在異步計算的情況下宗侦,通過回調表示的動作往往分散在代碼中,也可能相互嵌套在內部忆蚀。 當我們需要處理其中一個步驟中可能發(fā)生的錯誤時凝垛,情況會變得更加糟糕。
Java 5中蜓谋,引入了 Future 接口梦皮,以作為異步計算的結果,但是它沒有任何方法可以組合這些計算結果或處理可能的錯誤桃焕。
Java 8中剑肯,引入了 CompletableFuture 類。 除 Future 接口外观堂,它還實現了 CompletionStage 接口。 該接口定義了可以與其他步驟組合的異步計算步驟的協(xié)定。
同時眷蜈, CompletableFuture 類是一個構件和框架萨脑,具有大約50種不同的方法竿秆,用于組成,組合,執(zhí)行異步計算的步驟和處理過程中出現的錯誤。如此眾多的API可能會讓人不知所措泞辐,但這些API大多屬于幾種清晰明確的用例,下面會演示典型案例竞滓。
3.將 CompletableFuture 作為簡單的 Future
首先咐吼, CompletableFuture 類實現了 Future 接口,因此你可以將其用作 Future 來實現商佑,但需要額外的完成邏輯锯茄。
例如,你可以使用 CompletableFuture 的無參構造函數創(chuàng)建此類的實例茶没,像 Future 一樣表示將來的計算結果肌幽,將其返回給調用者,并在將來的某個時間使用 complete()
方法完成該過程得到結果抓半。調用者可以使用 get()
方法來阻塞自己的當前線程喂急,直到 get()
方法獲取到計算結果為止。
在下面的示例中琅关,我們有一個方法,該方法創(chuàng)建一個 CompletableFuture 實例讥蔽,然后在另一個線程中分離一些計算并立即返回 Future 涣易。
計算完成后,該方法通過將結果提供給 complete()
方法來完成 Future 冶伞。
public Future<String> calculateAsync() throws InterruptedException {
CompletableFuture <String> completableFuture
= new CompletableFuture <>();
// 為了簡化代碼新症,使用了Java線程池的Executor來創(chuàng)建和執(zhí)行線程
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
completableFuture.complete("Hello");
return null;
});
return completableFuture ;
}
注意,calculateAsync()
方法返回一個 Future 接口的實例响禽。
當我們準備好進入阻塞狀態(tài)來獲取計算結果時徒爹,我們只需調用定義好的calculateAsync()
方法,接收 Future 實例芋类,并在 Future 實例上調用 get()
方法隆嗅,直到得到計算結果。
還要注意的是侯繁, get()
方法會拋出一些受檢異常胖喳,即 ExecutionException(表示計算過程中發(fā)生的異常)和InterruptedException(表示執(zhí)行方法的線程被中斷的異常)。
Future <String> CompletableFuture = calculateAsync();
// ... do something else
String result = CompletableFuture.get();
assertEquals("Hello", result);
如果你已經知道計算結果贮竟,則可以調用 CompletableFuture 實例中的靜態(tài)方法 completedFuture()
丽焊,將結果作為參數傳入较剃。 之后調用 Future 的 get()
方法將不再會阻塞,而是立即返回此結果技健。
Future <String> CompletableFuture = CompletableFuture.completedFuture("Hello");
// ... do something else
String result = CompletableFuture.get();
assertEquals("Hello", result);
另一種情況是你可能要取消執(zhí)行 Future 任務写穴。
假設我們沒有設法找到計算結果,而是決定完全取消異步執(zhí)行雌贱。 這可以通過 Future 的 cancel()
方法 來完成啊送。 此方法接收一個 boolean
型參數 mayInterruptIfRunning,但是對于 CompletableFuture 而言這個方法是無效的帽芽,因為不使用中斷來控制 CompletableFuture 的處理删掀。
這是前面異步方法的修改版本
public Future <String> calculateAsyncWithCancellation() throws InterruptedException {
CompletableFuture<String> CompletableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Thread.sleep(500);
CompletableFuture.cancel(false);
return null;
});
return CompletableFuture;
}
當我們使用 Future 的 get()
方法來阻塞并嘗試獲取結果時,如果 Future 已經被取消執(zhí)行了导街,就會拋出CancellationException 異常披泪。
4.封裝計算邏輯的 CompletableFuture
上面的代碼允許我們選擇任意的并行執(zhí)行機制(Thread 執(zhí)行或 Runnable 執(zhí)行或 ThreadPool 執(zhí)行),但是如果我們想跳過這些樣板并簡單地異步執(zhí)行一些代碼搬瑰,該怎么做呢款票?
靜態(tài)方法 runAsync()
和 supplyAsync()
允許我們根據 Runnable 和 Supplier 功能類型分別創(chuàng)建 CompletableFuture 實例。
由于新的 Java 8功能泽论, Runnable 和 Supplier 都是 functional 函數式接口艾少,所以允許它們的實例作為 lambda 表達式傳遞。
Runnable 接口與線程中使用的舊接口相同翼悴,并且不允許返回值缚够。
Supplier 接口是具有單個方法的通用 functional 接口,該方法沒有參數鹦赎,并且返回參數化類型的值谍椅。
這樣的特性就允許我們提供 Supplier 的實例作為 lambda 表達式來執(zhí)行計算并返回結果。這十分簡單:
CompletableFuture<String> Future = CompletableFuture.supplyAsync(() -> "Hello");
// ... do sometiong else
assertEquals("Hello", Future.get());
5.處理異步計算的結果
處理計算結果的最通用方法是將其提供給另一個函數古话。 thenApply()
方法的作用正是:接受一個 Function 實例雏吭,使用這個函數來處理上一個 Future 的計算結果,并返回一個新的 Future 陪踩,該 Future 包含一個函數處理后的新值:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> Future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", Future.get());
如果不需要在 Future 鏈中返回值杖们,可以改用另一個函數式接口 Consumer。Consumer 中的方法是接受一個參數并返回 void肩狂。
在 CompletableFuture 中有一個針對該用例的方法 — thenAccept()
接收一個 Consumer 實例并將計算結果傳遞給它摘完。最后的 Future 的 get()
方法調用后返回 void。
CompletableFuture<String> CompletableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> Future = CompletableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));
Future.get();
最后傻谁,如果你既不需要計算的值描焰,又不想在 Future 鏈的末端返回某個值,則可以將 Runnable 通過 lambda 傳遞給 thenRun()
方法。 在以下示例中荆秦,在調用 get()
方法之后篱竭,我們僅在控制臺中打印出一行 "Computation finished.":
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> Future = completableFuture.thenRun(() -> System.out.println("Computation finished."));
Future.get();
6. Combining Futures
CompletableFuture API 最棒的部分是能夠在一系列計算步驟中組合 CompletableFuture 實例的功能。
這種 Future 鏈的結果本身就是 CompletableFuture 步绸,它允許進一步的鏈接和組合掺逼。這種做法在函數式編程語言中無處不在。
在以下示例中瓤介,我們將使用 thenCompose()
方法按順序鏈接兩個 Future 吕喘。
請注意,此方法會調用一個函數并返回 CompletableFuture 實例刑桑。 此函數的參數是上一個計算步驟的結果氯质。 這使我們可以在下一個 CompletableFuture 的 lambda 中使用該值:
CompletableFuture<String> CompletableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", CompletableFuture.get());
thenCompose()
方法與 thenApply()
一起實現了monadic模式的基礎構件。它們與Java 8中同樣可用的 Stream 的 map()
和 flatMap()
方法緊密相關祠斧。
這兩個方法都接收一個函數并將其應用于計算結果闻察,但是 thenCompose(flatMap)
方法接收一個函數,該函數返回另一個相同類型的對象琢锋。 這種功能結構允許將這些類的實例組成構件辕漂。
如果要執(zhí)行兩個獨立的 Future 并對其結果進行處理,請使用 thenCombine()
方法吴超,該方法接受帶有兩個參數的 Future 和 Function 來處理兩個結果:
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
7. thenApply() 和 thenCompose() 之間的區(qū)別
在前面的部分中钉嘹,我們顯示了有關 thenApply()
和 thenCompose()
的示例。這兩個方法都可以用來鏈接不同的 CompletableFuture 調用鲸阻,但是這兩個方法的用法不同跋涣。
7.1 thenApply()
這個方法用于處理上一個調用的結果。但是鸟悴,要記住的關鍵是返回類型將結合所有調用陈辱。
因此,當我們要轉換 CompletableFuture 調用的結果時遣臼,此方法很有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s -> s + 1);
7.2 thenCompose()
thenCompose()
方法類似于 thenApply()
性置,兩者均返回新的 Completion Stage拾并。
但是揍堰,thenCompose()
使用上一個 stage 作為參數。它將被 flatten 并直接返回帶有結果的 Future 嗅义,而不是如thenApply()
中觀察到的嵌套的 Future :
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此屏歹,如果是想要鏈接多個 CompletableFuture 方法,那么最好使用 thenCompose()
之碗。
另外注意蝙眶,這兩種方法之間的差異類似于 map()
和 flatMap()
之間的差異。
8. 并行運行多個 Future
當我們需要并行執(zhí)行多個 Future 時,我們通常要等待所有 Future 執(zhí)行完成幽纷,然后處理它們的合并結果式塌。
CompletableFuture 中的 allOf()
靜態(tài)方法允許等待以參數形式傳入的所有 Future 的完成:
CompletableFuture<String> Future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> Future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture <String> Future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture <Void> combinedFuture
= CompletableFuture.allOf(Future1, Future2, Future3);
// ...
combinedFuture.get();
assertTrue(Future1.isDone());
assertTrue(Future2.isDone());
assertTrue(Future3.isDone());
請注意,allOf()
的返回類型是 CompletableFuture <Void>友浸。 此方法的局限性在于它不會返回所有 Future 的合并結果峰尝。 所以當你需要合并多個 Future 結果時,你必須手動進行收恢。 幸運的是武学, join()
方法和 Java 8 Streams API 使其變得簡單:
String combined = Stream.of(Future1, Future2, Future3)
.map( CompletableFuture ::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
join() 方法類似于 get()
方法,但是如果 Future 無法正常完成伦意,它將拋出非受檢異常火窒。這樣就可以將其用作 Stream.map()
方法中的方法引用。
9. 處理錯誤
為了在一系列異步計算步驟中進行錯誤處理驮肉,必須通過 throw / catch 熏矿。與在語法塊中使用 try catch 捕獲異常不同的是, CompletableFuture 類使你可以使用特殊的 handle()
方法對其進行處理缆八。 此方法接收兩個參數:計算結果(如果成功完成)和引發(fā)的異常(如果某些計算步驟未正常完成)曲掰。
在下面的示例中,我們使用 handle()
方法來處理當缺少 name 參數時導致程序報錯的情況奈辰,并通過 handle()
方法輸出一個默認值:
String name = null;
// ...
CompletableFuture <String> CompletableFuture
= CompletableFuture .supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
})}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
assertEquals("Hello, Stranger!", CompletableFuture .get());
作為另一種方案栏妖,假設我們像第一個示例一樣,想用一個值手動通過 complet()
完成 Future 奖恰,同時也希望能夠在出現異常時也能夠完成 Future 吊趾。 completeExceptionally()
方法就是為此目的而設計的。
以下示例中的 get()
方法將拋出一個 ExecutionException瑟啃,其內部是 RuntimeException:
CompletableFuture <String> CompletableFuture = new CompletableFuture <>();
// ...
CompletableFuture .completeExceptionally(new RuntimeException("Calculation failed!"));
// ...
CompletableFuture .get(); // ExecutionException
在上面的示例中论泛,我們可以使用 handle()
方法異步處理異常,但是另一種方式通過 get()
方法蛹屿,我們也可以實現典型的同步異常處理屁奏。
10. 異步方法
CompletableFuture 類中的大多數 API 方法都有兩個額外的類似的方法,帶有 Asyn 后綴错负。這些方法通常用于在另一個線程中運行相應的執(zhí)行步驟坟瓢。
沒有 Asyn 后綴的方法使用當前線程運行下一個執(zhí)行階段。
不帶 Executor 參數的 Async 方法運行一個步驟犹撒,該步驟使用通過 ForkJoinPool.commonPool()
方法訪問的 Executor 的線程池實現折联。 具有 Executor 參數的 Async 方法使用傳遞的 Executor 運行一個步驟。
這是一個經過修改的示例识颊,該示例使用 Function 實例處理計算結果诚镰。 唯一可見的區(qū)別是 thenApplyAsync()
方法。 但是在幕后,函數的應用程序包裝到了 ForkJoinTask 實例中清笨。 這可以使你的計算更加并行化月杉,并可以更有效地使用系統(tǒng)資源。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> Future = completableFuture
.thenApplyAsync(s -> s + " World");
assertEquals("Hello World", Future.get());