新穎的望抽、優(yōu)雅的異步處理數(shù)據(jù)的方法
Java SE 8為Java平臺帶來了許多新東西丑罪,其中很多已經(jīng)在生產(chǎn)環(huán)境當(dāng)中得到了應(yīng)用爪模。但是在異步編程方法沃琅,卻并不是每個程序員都能很好的使用殴蹄,也并非所有應(yīng)用程序都使用java.util.concurrent包究抓,即使此包中對于編寫正確的并發(fā)代碼提供的原語非常有用。
java.util.concurrent包在Java 8中增加了幾個非常好的補充接口和實現(xiàn)類袭灯。我們在本文中討論的是CompletionStage接口和CompletableFuture實現(xiàn)類刺下。 與Future接口一起,它們?yōu)闃?gòu)建異步系統(tǒng)提供了非常好的應(yīng)用模式稽荧。
Problem Statement
讓我們從下面的代碼開始橘茉。 我們不用關(guān)心是哪個API提供的方法,也不關(guān)心使用的類,甚至不用關(guān)心是不是正確的Java代碼。
queryEngine.select("select user from User user")
.forEach(user -> System.out.println(user));
我們這里有一個查詢引擎畅卓,它從數(shù)據(jù)庫上執(zhí)行Java Persistence Query Language (JPQL)類型的請求擅腰。 查詢的結(jié)果,然后打印結(jié)果翁潘。 查詢數(shù)據(jù)庫的速度可能很慢趁冈,因此我們希望在單獨的線程中執(zhí)行此代碼,并在獲取結(jié)果以后觸發(fā)打印拜马。 一旦我們啟動了這項任務(wù)渗勘,我們真的不想再慢慢等待了。
我們在Java 7中有哪些API工具來實現(xiàn)此需求一膨? 眾所周知,Java 5中引入的Callable洒沦,我們可以將要執(zhí)行的任務(wù)包裝在Callable中豹绪,并將此對象提交給ExecutorService。 如下所示:
Callable<String> task = () -> "select user from User";
Future<String> future = executorService.submit(task);
從future對象獲取結(jié)果的唯一方法是在提交Callable任務(wù)的線程中調(diào)用其get() 方法申眼。 此方法是阻塞的瞒津,因此此調(diào)用將阻止線程,直到結(jié)果查詢完畢括尸。
這正是CompletionStage的使用場景巷蚪。
第一個鏈模式
讓我們使用CompletionStage模式重寫任務(wù)提交。
原來:
executor.submit(() -> {
() -> "select user from User";
});
改變:
CompletableFuture<List<User>> completableFuture =
CompletableFuture.supplyAsync(() -> { () -> dbEngine.query("select user from User"); }, executor);
我們將參數(shù)Callable傳遞給CompletableFuture的靜態(tài)supplyAsync()方法,而不是將它傳遞給ExecutorService的submit()方法濒翻。 此方法還可以將Executor作為第二個參數(shù)屁柏,使客戶端可以自定義選擇執(zhí)行Callable的線程池。
它返回一個CompletableFuture實例有送,這是一個Java 8的新類淌喻。在這個對象上,我們可以做以下操作:
completableFuture.thenAccept(System.out::println);
thenAccept()方法接收的參數(shù)是一個Consumer雀摘,在結(jié)果可用時自動調(diào)用該方法裸删,無需編寫額外的等待代碼。 避免像前一種情況那樣造成線程阻塞阵赠。
CompletionStage是什么涯塔?
簡而言之,CompletionStage是一個承載任務(wù)的模型清蚀。接下來我們將看到匕荸,任務(wù)可以是任意的Runnable,Consumer或Function的實例枷邪。 任務(wù)是鏈的一個要素每聪。 CompletionStage以不同的方式鏈接在一起。 “上游”元素是之前執(zhí)行的CompletionStage。 因此药薯,“下游”元素是在之后執(zhí)行的CompletionStage绑洛。
執(zhí)行完成一個或多個上游CompletionStageS后,將觸發(fā)當(dāng)前的CompletionStage執(zhí)行童本。 執(zhí)行完CompletionStageS可能會有返回值真屯,返回值可以傳遞給當(dāng)前的CompletionStage。 當(dāng)前CompletionStage執(zhí)行完會觸發(fā)其他下游的CompletionStageS,并且將生成返回值傳遞下去穷娱。
因此绑蔫,CompletionStage是鏈的一個元素。
CompletionStage接口的實現(xiàn)類是java.util.concurrent.CompletableFuture的實現(xiàn)泵额。 請注意配深,CompletableFuture也實現(xiàn)了Future接口。但是 CompletionStage沒有繼承Future嫁盲。
一個任務(wù)一般有如下三種狀態(tài):
1篓叶、正在執(zhí)行
2、執(zhí)行正常完成羞秤,并產(chǎn)生正確的結(jié)果
3缸托、執(zhí)行異常完成,可能會產(chǎn)生異常
Future的方法介紹
Future定義了三種類型五個方法:
- cancel(), 試圖取消正在執(zhí)行的任務(wù)
- isCanceled() 和 isDone() 判斷任務(wù)的狀態(tài)(取消成功|完成)
- get(), 兩個方法瘾蛋,一個不帶參數(shù)的俐镐,和一個帶超時參數(shù)的。
CompletableFuture增加了六種類似Future的新方法哺哼。
前兩個方法是join()和getNow(value)佩抹。 第一個,join()取董,阻塞直到CompletableFuture完成匹摇,和Future的get()方法一樣。 主要區(qū)別在于join() 方法不會拋出顯式的異常(get()方法拋出InterruptedException, ExecutionException異常)甲葬,從而更簡單廊勃。getNow(value)類似,它會立即返回经窖,如果完成坡垫,則返回執(zhí)行結(jié)果,如果沒有完成画侣,則返回給定得默認(rèn)值value冰悠。 請注意,此調(diào)用不會阻塞配乱,不會等待CompletableFuture完成溉卓。
其余四種方法強制CompletableFuture結(jié)束皮迟,無論是使用默認(rèn)值還是異常,如果CompletableFuture已經(jīng)完成桑寨,它們可以覆蓋此CompletableFuture得返回值伏尼。
- complete(value)方法完成CompletableFuture(如果CompletableFuture沒有結(jié)束),并返回value尉尾。 如果CompletableFuture已完成蔼两,則complete(value)方法不會執(zhí)行胰柑。 如果需要更改value入问,則需要調(diào)用obtrude(value) 方法践险。 此方法確實會更改CompletableFuture的值,即使它已經(jīng)完成肢藐。 但是使用的時候要小心故河,因為complete已經(jīng)觸發(fā)了客戶端,有可能導(dǎo)致客戶端會得到不期望的結(jié)果吆豹。
- 另一對方法的工作方式相同鱼的,但它們強制CompletableFuture拋出異常并結(jié)束: completeExceptionally(throwable)和obtrudeExceptionally(throwable)。 如果CompletableFuture尚未完成瞻讽,則第一個拋出RuntimeException鸳吸,第二個強制CompletableFuture更改其狀態(tài),和*obtrude(value) *類似熏挎。
如何創(chuàng)建CompletableFuture
創(chuàng)建CompletableFutures有以下幾種方式速勇。
創(chuàng)建一個已完成的CompletableFuture
首先介紹的第一種方式,創(chuàng)建了一個已完成的CompletableFuture坎拐。 創(chuàng)建這樣的Future可能看起來很奇怪烦磁,但它在測試環(huán)境中非常有用。
CompletableFuture<String> cf =
CompletableFuture.completedFuture("I'm done!");
cf.isDone(); // return true
cf.join(); // return "I'm done"
從任務(wù)創(chuàng)建一個CompletableFuture
CompletableFuture可以構(gòu)建在兩種任務(wù)上:一個不帶任何參數(shù)且沒有返回值的Runnable哼勇,另一個是不帶參數(shù)都伪,返回一個對象的Supplier。 在這兩種情況下积担,都可以傳遞Executor來設(shè)置執(zhí)行任務(wù)的線程池陨晶。如下所示:
CompletableFuture<Void> cf1 =
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture<T> cf2 =
CompletableFuture.supplyAsync(Supplier<T> supplier);
如果未提供ExecutorService,則任務(wù)將在 ForkJoinPool.commonPool()
線程池
中執(zhí)行帝璧,該池與Stream并行執(zhí)行的線程池相同先誉。
自定義線程池,如下所示:
Runnable runnable = () -> {
System.out.println("Executing in " +
Thread.currentThread().getName());
};
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> cf =
CompletableFuture.runAsync(runnable, executor);
cf.thenRun(() -> System.out.println("I'm done"));
executor.shutdown();
執(zhí)行上面的代碼的烁,輸入如下:
Executing in pool-1-thread-1
I'm done
在這種情況下褐耳,Runnable在我們自定義的的SingleThreadExecutor線程池中執(zhí)行。
構(gòu)建CompletableFuture鏈
我們在本文的開始已經(jīng)介紹過渴庆,CompletableFuture是鏈的一個元素铃芦。 上一節(jié)中看到了如何從任務(wù)(Runnable或Supplier)創(chuàng)建此鏈的第一個元素雅镊。 現(xiàn)在讓我們看看如何將其他任務(wù)鏈接到這個任務(wù)。 事實上刃滓,根據(jù)前面的例子仁烹,我們已經(jīng)猜到了該怎么做了。
鏈的任務(wù)
第一個任務(wù)是由Runnable或Supplier構(gòu)建的注盈,兩個功能接口(你可以看成是functions)晃危,它們不帶任何參數(shù),但是可能會老客,也可能不會有返回值僚饭。
鏈的第二個元素和其他元素都可以獲取前一個元素的結(jié)果(如果有的話)。 所以我們需要不同的functions來構(gòu)建這些元素胧砰。 我們先嘗試簡單的理解一下鳍鸵。
鏈的前一個元素可能會,也可能不會有返回值尉间。 所以鏈的functions的入?yún)⒖梢杂幸粋€對象偿乖,或者沒有參數(shù)。 此鏈元素可能會有哲嘲,可也能不會有返回值贪薪。 所以鏈的函數(shù)應(yīng)該有一個返回值,或者沒有返回值眠副。 這有四種可能的情況画切。 在這四種可能的函數(shù)中,其中不帶結(jié)果參數(shù)囱怕,并產(chǎn)生返回值的函數(shù)是鏈的起點霍弹,我們已在上一節(jié)中看到過。
CompletableFuture API中使用的四種可能的functions的名稱
Takes a Parameters? | Returns Void | Returns R |
---|---|---|
Takes T | Consumer<T> | Function<T, R> |
Does not take anything | Runnable | Not an element of a chain |
鏈的類型
現(xiàn)在我們已經(jīng)對API支持的任務(wù)有所了解娃弓,讓我們來看看鏈的含義典格。 到目前為止,我們假設(shè)鏈?zhǔn)顷P(guān)于觸發(fā)另一個任務(wù)的任務(wù)台丛,將第一個的結(jié)果作為參數(shù)傳遞給第二個任務(wù)耍缴。 這是基本的一對一的鏈。
我們也可以組合元素而不是鏈接它們挽霉。 這僅適用于獲取前一任務(wù)結(jié)果并包裝成CompletableFuture對象提供給另一個任務(wù)防嗡。 這又是一對一的關(guān)系(不是鏈,因為這是組合)炼吴。
但我們也可以構(gòu)建一個樹狀結(jié)構(gòu)本鸣,其中由兩個上游任務(wù)而不是一個上游任務(wù)觸發(fā)的任務(wù)。 我們可以想象成兩個提供組合結(jié)果硅蹦,或者在第一個上游元素提供結(jié)果荣德,并觸發(fā)當(dāng)前任務(wù)闷煤。 這兩種情況都有意義,我們將會說到它們的例子涮瞻。
選擇一個執(zhí)行器
最后鲤拿,我們希望能夠根據(jù)不同的情形來選擇ExecutorService(即線程池)執(zhí)行我們的任務(wù)。 這有很多種情況需要我們來判斷:
- 我們的任務(wù)之一可能是更新圖形用戶界面署咽。 在這種情況下近顷,我們希望它在人機界面(HMI)線程中運行。 Swing宁否,JavaFX和Android就屬于這種情況窒升。
- 我們有些I/O任務(wù)或計算任務(wù)需要在專門的線程池中執(zhí)行。
- 我們的變量中可能存在可見性問題慕匠,需要在同一個線程中執(zhí)行任務(wù)饱须。
- 我們希望在默認(rèn)的fork/join池中異步執(zhí)行任務(wù)。
所有的這些情況下台谊,我們必須增加ExecutorService參數(shù)用來定制執(zhí)行器 蓉媳。
Note: 調(diào)整線程池的大小
《Java并發(fā)編程實戰(zhàn)》一書中,Brian Goetz和合著者們?yōu)榫€程池大小 的優(yōu)化提供了不少中肯的建議锅铅。這非常重要酪呻,如果線程池中線程的數(shù)量過多,最終它們會競爭 稀缺的處理器和內(nèi)存資源盐须,浪費大量的時間在上下文切換上玩荠。反之,如果線程的數(shù)目過少丰歌,正 如你的應(yīng)用所面臨的情況姨蟋,處理器的一些核可能就無法充分利用屉凯。Brian Goetz建議立帖,線程池大 小與處理器的利用率之比可以使用下面的公式進行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
1、NCPU是處理器的核的數(shù)目悠砚,可以通過Runtime.getRuntime().availableProce- ssors()得到
2晓勇、UCPU是期望的CPU利用率(該值應(yīng)該介于0和1之間)
3、W/C是等待時間與計算時間的比率
豐富的API實現(xiàn)
CompletableFuture類中有很多API方法灌旧! 三種類型的任務(wù)绑咱,四種類型的鏈接和組合,三種方式指定ExecutorService枢泰。36種鏈接任務(wù)的方法描融。大量可用的方法使這個類變得很復(fù)雜。
逐一的學(xué)習(xí)API方法將是非常繁瑣的衡蚂,所以讓我們看看該如何正確的選擇合適的API窿克。
模式選擇
以下是一些可用模式的描述骏庸。
一對一的模式
在這種情況下,從第一個CompletableFuture開始年叮,當(dāng)完成其任務(wù)執(zhí)行時具被,我們創(chuàng)建的第二個CompletableFuture開始執(zhí)行。如下所示:
CompletableFuture<String> cf1 =
CompletableFuture.supplyAsync(() -> "Hello world");
CompletableFuture<String> cf2 =
cf1.thenApply(s -> s + " from the Future!");
有三種 "then-apply"的方法只损。 它們都有一個Function的參數(shù)一姿,T為上游元素的結(jié)果,并返回一個新的對象R跃惫。
我們再為流水線添加一個步驟叮叹。 這次,我們thenAccept()方法爆存,參數(shù)為Consumer<String>衬横,沒有返回值(Void)。
CompletableFuture<Void> cf3 =
cf2.thenAccept(System.out::println);
讓我們?yōu)檫@個流水線添加最后一步终蒂。 調(diào)用thenRun(),參數(shù)為Runnable(不帶參數(shù)蜂林,并且沒有返回值) .
CompletableFuture<Void> cf4 =
cf3.thenRun(() -> System.out.println("Done processing this chain"));
這些方法的命名都很清晰:以then開頭,后面跟上函數(shù)接口的名稱(run的參數(shù)是Runnable拇泣,accept參數(shù)為Consumer噪叙,apply參數(shù)為Function)。所有這些方法都在與上游任務(wù)具有相同的執(zhí)行器(同一個線程池)霉翔。
然后睁蕾,這些方法還可以進一步的采用相同的后綴:async。 異步方法在默認(rèn)的fork/join池( ForkJoinPool.commonPool()
)中執(zhí)行其任務(wù)债朵,當(dāng)然你也可以指定任務(wù)執(zhí)行器Executor子眶。
我們用異步的方式重寫cf4,如下所示:
CompletableFuture<Void> cf4 =
cf3.thenRunAsync(() -> System.out.println("Done processing this chain"));
在這種情況下序芦,Runnable任務(wù)將在默認(rèn)的fork/join池中執(zhí)行臭杰。
二對一的組合模式
組合模式是下一步任務(wù)接收兩個上游任務(wù)的結(jié)果的模式。 在這種情況下可以使用兩個函數(shù):BiFunction和BiConsumer谚中。 也可以在組合模式中執(zhí)行Runnable渴杆。 如下所示:
Method | Description |
---|---|
<U,V> CompletableFuture<V> thenCombine(CompletionStage<U> other, BiFunction<T, U, R> action) |
當(dāng)前和另一個給定的階段都正常完成時,兩個結(jié)果作為BiFunction函數(shù)的參數(shù)執(zhí)行宪塔。 |
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action) |
當(dāng)這個和另一個給定的階段都正常完成時磁奖,兩個結(jié)果作為提供的BiConsumer操作的參數(shù)被執(zhí)行。 |
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) |
當(dāng)這個和另一個給定的階段都正常完成時某筐,執(zhí)行給定的Runnable動作比搭。 |
這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義南誊。
二對一的選擇模式
最后一類模式還是二對一模式身诺。 但是這次蔽莱,不是完成兩個上游元素后再執(zhí)行下游元素,而是戚长,并且當(dāng)兩個上游元素中其中一個完成時盗冷,即可執(zhí)行下游元素。這非常有用, 例如同廉,當(dāng)我們想要解析域名時仪糖, 我們可能會發(fā)現(xiàn)查詢一組域名服務(wù)器的效率比只查詢一個域名服務(wù)器更高。 我們不想從不同的服務(wù)器獲得相同的結(jié)果迫肖,因此我們不只要其中一個服務(wù)器返回結(jié)果即可锅劝,所有其他查詢可以安全的取消。
該模式只需要在上游元素的一個結(jié)果蟆湖,這些方法的名稱中都有關(guān)鍵字故爵,因為只會選擇其中一個,所以組合元素應(yīng)該產(chǎn)生相同類型的結(jié)果隅津。
Method | Description |
---|---|
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) |
當(dāng)這個或另一個給定階段正常完成時诬垂,執(zhí)行相應(yīng)的結(jié)果作為提供的Function函數(shù)的參數(shù)。伦仍。 |
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
當(dāng)這個或另一個給定階段正常完成時结窘,執(zhí)行相應(yīng)的結(jié)果作為提供的Consumer操作的參數(shù)。 |
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) |
當(dāng)這個或另一個給定階段正常完成時充蓝,執(zhí)行給定的Runnable動作隧枫。 |
這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義谓苟。
示例
我們先看看幾個例子官脓。
在Jersey中測試一個耗時的請求
下面是 Jersey documentation中的一段代碼.
@Path("/resource")
public class AsyncResource {
@Inject
private Executor executor;
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
executor.execute(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
}
}
這是一段基本的REST服務(wù)代碼,它調(diào)用耗時的操作涝焙。 這種情況下卑笨,典型的處理方法是在另一個線程中的異步調(diào)用耗時操作。 此方法沒有返回值; 上面的代碼時Jersey的實現(xiàn)方式纱皆。
我們在這里又遇到了這么一個問題:我們?nèi)绾螌υ摲椒ㄟM行單元測試湾趾? 測試 longOperation()
不是問題:我們可以單獨對該方法進行單元測試芭商。 我們需要在這里測試的是如何將result
對象正確地傳遞給asyncResponse
對象的 resume()
方法派草。 這可以通過測試框架輕松完成,例如Mockito铛楣。 但是我們又面臨的問題如下:
- 在 "main"主線程中執(zhí)行executor.execute() 近迁。
- 但是asyncResponse.resume()是在另一個線程中異步調(diào)用的, 同時我們無法獲取到結(jié)果.
在測試中我們需要的是在asyncResponse.resume() 后執(zhí)行的某種回調(diào),以便我們可以模擬測試簸州。如下所示:
Mockito.verify(mockAsyncResponse).resume(result);
我們運行這段簡單的代碼:
- 調(diào)用resume()方法
- 假設(shè)和執(zhí)行resume()是同一個線程; 那么鉴竭,確信我們的模擬測試中不會出現(xiàn)任何并發(fā)問題(特別是可見性)
這時候歧譬,CompletionStage框架終于排上用場了!我們依據(jù)Runnable創(chuàng)建一個CompletionStage對象搏存,而不是將Runnable傳遞給executor.execute()方法瑰步。
原來:
executor.submit(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
使用CompletionStage重寫:
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(() -> {
String result = longOperation();
asyncResponse.resume(result);
}, executor);
因為CompletionStage可以觸發(fā)其他任務(wù),我們使用下面的代碼進行測試:
completableFuture
.thenRun(() -> {
Mockito.verify(mockAsyncResponse).resume(result);
}
);
這段代碼完全符合我們的要求:
- 它由前一個CompletionStage的Runnable完成觸發(fā)璧眠。
- 它在同一個線程中執(zhí)行缩焦。
要實現(xiàn)此該方案,我們需要在類中創(chuàng)建一個公共方法责静,該類返回CompletableFuture袁滥。 如果我們修改了Jersey方法的返回類型,那么Jersey將嘗試使用此返回類型構(gòu)建響應(yīng)灾螃,將其轉(zhuǎn)換為XML或JSON题翻。 對于CompletableFuture,可能會導(dǎo)致運行失敗腰鬼。
因此嵌赠,完整的測試模式如下:
- 在mocks中創(chuàng)建模擬對象:
String result = Mockito.mock(String.class);
AsyncResponse response = Mockito.mock(AsyncResponse.class);
Runnable train = () -> {
Mockito.doReturn(result).when(response).longOperation();
}
Runnable verify = () -> Mockito.verify(response).resume(result);
2、創(chuàng)建調(diào)用和驗證對象:
Runnable callAndVerify = () -> {
asyncResource.executeAsync(response).thenRun(verify); }
3熄赡、最后創(chuàng)建要測試的任務(wù):
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncResource asyncResource = new AsyncResource();
asyncResource.setExecutorService(executor);
CompletableFuture
.runAsync(train, executor)
.thenRun(callAndVerify);
因為這是一個單元測試猾普,如果在給定的時間后沒有看到響應(yīng),我們可能希望失敗本谜。 我們可以使用CompletableFuture中對于Future接口的get()方法來實現(xiàn)初家。
異步分析網(wǎng)頁的鏈接
讓我們編碼實現(xiàn)如下需求:在Swing面板中顯示自動的分析網(wǎng)頁的鏈接(異步方式)
我們需要如下幾個步驟:
1、讀取網(wǎng)頁內(nèi)容
2乌助、獲取網(wǎng)頁鏈接
3溜在、Swing面板中顯示鏈接
當(dāng)然,修改Swing組件應(yīng)該從合適的線程完成他托,但是掖肋,我們不希望在此線程中運行長任務(wù)。
使用CompletableFuture赏参,很簡單就能實現(xiàn)了:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(page -> linkParser.getLinks(page))
.thenAcceptAsync(
links -> displayPanel.display(links),
executor
);
第一步是創(chuàng)建異步執(zhí)行的Supplier志笼。 比如它以String形式返回網(wǎng)頁內(nèi)容。
第二步是將獲取到的頁面內(nèi)容傳遞給linkParser. 這是一個返回List<String> 的function函數(shù). 這前兩個任務(wù)在同一個線程中執(zhí)行.
最后一步只是獲取鏈接列表并顯示把篓。 這個任務(wù)需要訪問Swing組件纫溃,所以它應(yīng)該在Swing線程中執(zhí)行。 我們通過傳遞正確的executor作為參數(shù)來做到這一點韧掩。
有一點比較好:Executor接口是一個functional interface紊浩。 我們可以用lambda實現(xiàn)它:
Executor executor = runnable -> SwingUtilities.invokeLater(runnable);
我們可以利用方法引用語法來編寫此模式的最終版本:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(Parser::getLinks)
.thenAcceptAsync(
DisplayPanel::display,
SwingUtilities::invokeLater
);
CompletableFutures結(jié)合lambdas和方法引用可以編寫非常優(yōu)雅的代碼。
異常處理
CompletionStage API還提供了異常處理模式。 讓我們看一個例子坊谁。
假設(shè)我們有如圖所示的處理鏈:
所有這些CompletableFutures都使用我們在上面說到的模式鏈接在一起费彼。
現(xiàn)在假設(shè)CF21引發(fā)異常。 如果沒有對此異常做處理口芍,則所有下游的CompletableFutures都會出錯箍铲。 這意味著兩件事:
- CF21, CF31, 和 CF41的CompletableFutures調(diào)用isCompletedExceptionally()都返回 true .
- 這些對象調(diào)用get()方法都會拋出ExecutionException, 原因是因為CF21引發(fā)的根異常.
我們可以使用下圖所示的模式處理CompletableFutures鏈中的異常。
cf30 = cf21.exceptionally();
此模式創(chuàng)建的CompletableFuture具有以下屬性:
- 如果CF21正常完成鬓椭,則CF30將透明地返回與CF21相同的值虹钮。
- 如果CF21發(fā)生異常,則CF30能夠捕獲它膘融,并且可以將正常值傳輸?shù)紺F31芙粱。
有好幾種方法可以做到這一點,用不同方法的接受異常氧映。
exceptionally(Function<Throwable, T> function)是最簡單的方法調(diào)用. 它返回一個CompletionStage春畔,如果上游CompletionStage也正常完成,則返回的CompletionStage也會以相同的值正常完成岛都。 否則律姨,如果此上游CompletionStage發(fā)生異常,則將此異常傳遞給提供的函數(shù)臼疫。返回的CompletionStage正常完成择份,返回Function的結(jié)果。 此方法沒有異步版本烫堤。
handle(BiFunction<T, Throwable, R> bifunction) 具有相同的語義. 它返回一個CompletionStage荣赶,當(dāng)此階段正常或異常完成時鸽斟,將使用此階段的結(jié)果和異常作為所提供函數(shù)的參數(shù)執(zhí)行拔创。 如果上游CompletionStage正常完成,則Throwable為null調(diào)用BiFunction富蓄,如果異常完成剩燥,則R為null調(diào)用BiFunction。 在這兩種情況下立倍,都被能正常返回的CompletionStage灭红。該方法有兩個姐妹方法handleAsync(BiFunction<? super T,Throwable,? extends U> fn)和handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor), 這兩種方法的工作方式相同口注,都是異步的变擒,只是執(zhí)行器不同。執(zhí)行器可以作為參數(shù)提供疆导。如果沒有提供赁项,則使用公共的fork/join 線程池葛躏。
第三種處理異常的方式是whenComplete(BiConsumer<T, Throwable> biconsumer)澈段。handle() 可以正常結(jié)束并返回CompletionStage悠菜,而whenComplete()則不盡然。 它遵循構(gòu)建的CompletionStage的流水線行為败富。 因此悔醋,如果上游CompletionStage發(fā)生異常,則whenComplete()返回的CompletionStage也會異常完成(結(jié)合exceptionally()理解)兽叮。使用上游CompletionStage的返回值及其此階段返回值調(diào)用BiConsumer. 與handle()情況一樣芬骄,將使用結(jié)果(或 null如果沒有))和此階段的異常(或 null如果沒有)調(diào)用BiConsumer;BiConsumer沒有返回值鹦聪。 所以它只是一個不會影響CompletionStages流水線處理的回調(diào)账阻。 與handle()方法類似,該方法也有兩個姐妹方法whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)和whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)泽本。這兩種方法的工作方式都是異步的淘太。執(zhí)行器可以作為參數(shù)提供。如果沒有提供规丽,則使用公共的fork/join 線程池蒲牧。.
結(jié)論
CompletionStage 接口和CompletableFuture類帶來了異步處理數(shù)據(jù)的新方式。這個API非常復(fù)雜赌莺,主要是由于這個接口和類暴露的方法數(shù)量較多冰抢,但是,豐富的API使得我們處理異步數(shù)據(jù)流水線方面有了更多的選擇艘狭,以便更好的滿足應(yīng)用程序的需求挎扰。
這些API基于lambda表達式構(gòu)建,從而創(chuàng)造非常干凈且優(yōu)雅的代碼巢音。 它可以很好地控制哪個線程執(zhí)行每個任務(wù)鼓鲁。 它還允許以多種方式構(gòu)建流水線和組合任務(wù),并且在處理異常方面也提供對應(yīng)的方式方法港谊。