java8 -CompletableFuture

新穎的望抽、優(yōu)雅的異步處理數(shù)據(jù)的方法

Java SE 8為Java平臺帶來了許多新東西丑罪,其中很多已經(jīng)在生產(chǎn)環(huán)境當(dāng)中得到了應(yīng)用爪模。但是在異步編程方法沃琅,卻并不是每個程序員都能很好的使用殴蹄,也并非所有應(yīng)用程序都使用java.util.concurrent包究抓,即使此包中對于編寫正確的并發(fā)代碼提供的原語非常有用。
java.util.concurrent包在Java 8中增加了幾個非常好的補充接口和實現(xiàn)類袭灯。我們在本文中討論的是CompletionStage接口和CompletableFuture實現(xiàn)類刺下。 與Future接口一起,它們?yōu)闃?gòu)建異步系統(tǒng)提供了非常好的應(yīng)用模式稽荧。

Problem Statement

讓我們從下面的代碼開始橘茉。 我們不用關(guān)心是哪個API提供的方法,也不關(guān)心使用的類,甚至不用關(guān)心是不是正確的Java代碼。

queryEngine.select("select user from User user")  
            .forEach(user -> System.out.println(user));  

我們這里有一個查詢引擎畅卓,它從數(shù)據(jù)庫上執(zhí)行Java Persistence Query Language (JPQL)類型的請求擅腰。 查詢的結(jié)果,然后打印結(jié)果翁潘。 查詢數(shù)據(jù)庫的速度可能很慢趁冈,因此我們希望在單獨的線程中執(zhí)行此代碼,并在獲取結(jié)果以后觸發(fā)打印拜马。 一旦我們啟動了這項任務(wù)渗勘,我們真的不想再慢慢等待了。
我們在Java 7中有哪些API工具來實現(xiàn)此需求一膨? 眾所周知,Java 5中引入的Callable洒沦,我們可以將要執(zhí)行的任務(wù)包裝在Callable中豹绪,并將此對象提交給ExecutorService。 如下所示:

Callable<String> task = () -> "select user from User";  
Future<String> future = executorService.submit(task);  

future對象獲取結(jié)果的唯一方法是在提交Callable任務(wù)的線程中調(diào)用其get() 方法申眼。 此方法是阻塞的瞒津,因此此調(diào)用將阻止線程,直到結(jié)果查詢完畢括尸。
這正是
CompletionStage
的使用場景巷蚪。

第一個鏈模式

讓我們使用CompletionStage模式重寫任務(wù)提交。
原來:

executor.submit(() -> {  
           () -> "select user from User";  
}); 

改變:

CompletableFuture<List<User>> completableFuture =  
     CompletableFuture.supplyAsync(() -> {           () -> dbEngine.query("select user from User");    }, executor);  

我們將參數(shù)Callable傳遞給CompletableFuture的靜態(tài)supplyAsync()方法,而不是將它傳遞給ExecutorServicesubmit()方法濒翻。 此方法還可以將Executor作為第二個參數(shù)屁柏,使客戶端可以自定義選擇執(zhí)行Callable的線程池。
它返回一個CompletableFuture實例有送,這是一個Java 8的新類淌喻。在這個對象上,我們可以做以下操作:

completableFuture.thenAccept(System.out::println);  

thenAccept()方法接收的參數(shù)是一個Consumer雀摘,在結(jié)果可用時自動調(diào)用該方法裸删,無需編寫額外的等待代碼。 避免像前一種情況那樣造成線程阻塞阵赠。

CompletionStage是什么涯塔?

簡而言之,CompletionStage是一個承載任務(wù)的模型清蚀。接下來我們將看到匕荸,任務(wù)可以是任意的RunnableConsumerFunction的實例枷邪。 任務(wù)是鏈的一個要素每聪。 CompletionStage以不同的方式鏈接在一起。 “上游”元素是之前執(zhí)行的CompletionStage。 因此药薯,“下游”元素是在之后執(zhí)行的CompletionStage绑洛。
執(zhí)行完成一個或多個上游CompletionStageS后,將觸發(fā)當(dāng)前的CompletionStage執(zhí)行童本。 執(zhí)行完CompletionStageS可能會有返回值真屯,返回值可以傳遞給當(dāng)前的CompletionStage。 當(dāng)前CompletionStage執(zhí)行完會觸發(fā)其他下游的CompletionStageS,并且將生成返回值傳遞下去穷娱。
因此绑蔫,CompletionStage是鏈的一個元素。
CompletionStage接口的實現(xiàn)類是java.util.concurrent.CompletableFuture的實現(xiàn)泵额。 請注意配深,CompletableFuture也實現(xiàn)了Future接口。但是 CompletionStage沒有繼承Future嫁盲。
一個任務(wù)一般有如下三種狀態(tài):
1篓叶、正在執(zhí)行
2、執(zhí)行正常完成羞秤,并產(chǎn)生正確的結(jié)果
3缸托、執(zhí)行異常完成,可能會產(chǎn)生異常

Future的方法介紹

Future定義了三種類型五個方法:

  • cancel(), 試圖取消正在執(zhí)行的任務(wù)
  • isCanceled()isDone() 判斷任務(wù)的狀態(tài)(取消成功|完成)
  • get(), 兩個方法瘾蛋,一個不帶參數(shù)的俐镐,和一個帶超時參數(shù)的。

CompletableFuture增加了六種類似Future的新方法哺哼。

前兩個方法是join()getNow(value)佩抹。 第一個,join()取董,阻塞直到CompletableFuture完成匹摇,和Futureget()方法一樣。 主要區(qū)別在于join() 方法不會拋出顯式的異常(get()方法拋出InterruptedException, ExecutionException異常)甲葬,從而更簡單廊勃。getNow(value)類似,它會立即返回经窖,如果完成坡垫,則返回執(zhí)行結(jié)果,如果沒有完成画侣,則返回給定得默認(rèn)值value冰悠。 請注意,此調(diào)用不會阻塞配乱,不會等待CompletableFuture完成溉卓。
其余四種方法強制CompletableFuture結(jié)束皮迟,無論是使用默認(rèn)值還是異常,如果CompletableFuture已經(jīng)完成桑寨,它們可以覆蓋此CompletableFuture得返回值伏尼。

  • complete(value)方法完成CompletableFuture(如果CompletableFuture沒有結(jié)束),并返回value尉尾。 如果CompletableFuture已完成蔼两,則complete(value)方法不會執(zhí)行胰柑。 如果需要更改value入问,則需要調(diào)用obtrude(value) 方法践险。 此方法確實會更改CompletableFuture的值,即使它已經(jīng)完成肢藐。 但是使用的時候要小心故河,因為complete已經(jīng)觸發(fā)了客戶端,有可能導(dǎo)致客戶端會得到不期望的結(jié)果吆豹。
  • 另一對方法的工作方式相同鱼的,但它們強制CompletableFuture拋出異常并結(jié)束: completeExceptionally(throwable)和obtrudeExceptionally(throwable)。 如果CompletableFuture尚未完成瞻讽,則第一個拋出RuntimeException鸳吸,第二個強制CompletableFuture更改其狀態(tài),和*obtrude(value) *類似熏挎。

如何創(chuàng)建CompletableFuture

創(chuàng)建CompletableFutures有以下幾種方式速勇。

創(chuàng)建一個已完成的CompletableFuture

首先介紹的第一種方式,創(chuàng)建了一個已完成的CompletableFuture坎拐。 創(chuàng)建這樣的Future可能看起來很奇怪烦磁,但它在測試環(huán)境中非常有用。

CompletableFuture<String> cf =  
     CompletableFuture.completedFuture("I'm done!");  
cf.isDone(); // return true  
cf.join();   // return "I'm done"  

從任務(wù)創(chuàng)建一個CompletableFuture

CompletableFuture可以構(gòu)建在兩種任務(wù)上:一個不帶任何參數(shù)且沒有返回值的Runnable哼勇,另一個是不帶參數(shù)都伪,返回一個對象的Supplier。 在這兩種情況下积担,都可以傳遞Executor來設(shè)置執(zhí)行任務(wù)的線程池陨晶。如下所示:

CompletableFuture<Void> cf1 =  
     CompletableFuture.runAsync(Runnable runnable);  
CompletableFuture<T> cf2 =  
    CompletableFuture.supplyAsync(Supplier<T> supplier);  

如果未提供ExecutorService,則任務(wù)將在 ForkJoinPool.commonPool()線程池
中執(zhí)行帝璧,該池與Stream并行執(zhí)行的線程池相同先誉。
自定義線程池,如下所示:

Runnable runnable = () -> {  
       System.out.println("Executing in " +  
Thread.currentThread().getName());  
       };  
  
ExecutorService executor = Executors.newSingleThreadExecutor();  
  
CompletableFuture<Void> cf =  
CompletableFuture.runAsync(runnable, executor);  
cf.thenRun(() -> System.out.println("I'm done"));  
  
executor.shutdown(); 

執(zhí)行上面的代碼的烁,輸入如下:

Executing in pool-1-thread-1
I'm done

在這種情況下褐耳,Runnable在我們自定義的的SingleThreadExecutor線程池中執(zhí)行。

構(gòu)建CompletableFuture鏈

我們在本文的開始已經(jīng)介紹過渴庆,CompletableFuture是鏈的一個元素铃芦。 上一節(jié)中看到了如何從任務(wù)(RunnableSupplier)創(chuàng)建此鏈的第一個元素雅镊。 現(xiàn)在讓我們看看如何將其他任務(wù)鏈接到這個任務(wù)。 事實上刃滓,根據(jù)前面的例子仁烹,我們已經(jīng)猜到了該怎么做了。

鏈的任務(wù)

第一個任務(wù)是由RunnableSupplier構(gòu)建的注盈,兩個功能接口(你可以看成是functions)晃危,它們不帶任何參數(shù),但是可能會老客,也可能不會有返回值僚饭。
鏈的第二個元素和其他元素都可以獲取前一個元素的結(jié)果(如果有的話)。 所以我們需要不同的functions來構(gòu)建這些元素胧砰。 我們先嘗試簡單的理解一下鳍鸵。
鏈的前一個元素可能會,也可能不會有返回值尉间。 所以鏈的functions的入?yún)⒖梢杂幸粋€對象偿乖,或者沒有參數(shù)。 此鏈元素可能會有哲嘲,可也能不會有返回值贪薪。 所以鏈的函數(shù)應(yīng)該有一個返回值,或者沒有返回值眠副。 這有四種可能的情況画切。 在這四種可能的函數(shù)中,其中不帶結(jié)果參數(shù)囱怕,并產(chǎn)生返回值的函數(shù)是鏈的起點霍弹,我們已在上一節(jié)中看到過。
CompletableFuture API中使用的四種可能的functions的名稱

Takes a Parameters? Returns Void Returns R
Takes T Consumer<T> Function<T, R>
Does not take anything Runnable Not an element of a chain

鏈的類型

現(xiàn)在我們已經(jīng)對API支持的任務(wù)有所了解娃弓,讓我們來看看鏈的含義典格。 到目前為止,我們假設(shè)鏈?zhǔn)顷P(guān)于觸發(fā)另一個任務(wù)的任務(wù)台丛,將第一個的結(jié)果作為參數(shù)傳遞給第二個任務(wù)耍缴。 這是基本的一對一的鏈。
我們也可以組合元素而不是鏈接它們挽霉。 這僅適用于獲取前一任務(wù)結(jié)果并包裝成CompletableFuture對象提供給另一個任務(wù)防嗡。 這又是一對一的關(guān)系(不是鏈,因為這是組合)炼吴。
但我們也可以構(gòu)建一個樹狀結(jié)構(gòu)本鸣,其中由兩個上游任務(wù)而不是一個上游任務(wù)觸發(fā)的任務(wù)。 我們可以想象成兩個提供組合結(jié)果硅蹦,或者在第一個上游元素提供結(jié)果荣德,并觸發(fā)當(dāng)前任務(wù)闷煤。 這兩種情況都有意義,我們將會說到它們的例子涮瞻。

選擇一個執(zhí)行器

最后鲤拿,我們希望能夠根據(jù)不同的情形來選擇ExecutorService(即線程池)執(zhí)行我們的任務(wù)。 這有很多種情況需要我們來判斷:

  • 我們的任務(wù)之一可能是更新圖形用戶界面署咽。 在這種情況下近顷,我們希望它在人機界面(HMI)線程中運行。 Swing宁否,JavaFX和Android就屬于這種情況窒升。
  • 我們有些I/O任務(wù)或計算任務(wù)需要在專門的線程池中執(zhí)行。
  • 我們的變量中可能存在可見性問題慕匠,需要在同一個線程中執(zhí)行任務(wù)饱须。
  • 我們希望在默認(rèn)的fork/join池中異步執(zhí)行任務(wù)。

所有的這些情況下台谊,我們必須增加ExecutorService參數(shù)用來定制執(zhí)行器 蓉媳。

Note: 調(diào)整線程池的大小
《Java并發(fā)編程實戰(zhàn)》一書中,Brian Goetz和合著者們?yōu)榫€程池大小 的優(yōu)化提供了不少中肯的建議锅铅。這非常重要酪呻,如果線程池中線程的數(shù)量過多,最終它們會競爭 稀缺的處理器和內(nèi)存資源盐须,浪費大量的時間在上下文切換上玩荠。反之,如果線程的數(shù)目過少丰歌,正 如你的應(yīng)用所面臨的情況姨蟋,處理器的一些核可能就無法充分利用屉凯。Brian Goetz建議立帖,線程池大 小與處理器的利用率之比可以使用下面的公式進行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
1、NCPU是處理器的核的數(shù)目悠砚,可以通過Runtime.getRuntime().availableProce- ssors()得到
2晓勇、UCPU是期望的CPU利用率(該值應(yīng)該介于0和1之間)
3、W/C是等待時間與計算時間的比率

豐富的API實現(xiàn)

CompletableFuture類中有很多API方法灌旧! 三種類型的任務(wù)绑咱,四種類型的鏈接和組合,三種方式指定ExecutorService枢泰。36種鏈接任務(wù)的方法描融。大量可用的方法使這個類變得很復(fù)雜。
逐一的學(xué)習(xí)API方法將是非常繁瑣的衡蚂,所以讓我們看看該如何正確的選擇合適的API窿克。

模式選擇

以下是一些可用模式的描述骏庸。

一對一的模式

在這種情況下,從第一個CompletableFuture開始年叮,當(dāng)完成其任務(wù)執(zhí)行時具被,我們創(chuàng)建的第二個CompletableFuture開始執(zhí)行。如下所示:

CompletableFuture<String> cf1 =  
CompletableFuture.supplyAsync(() -> "Hello world");  
CompletableFuture<String> cf2 =  
cf1.thenApply(s -> s + " from the Future!"); 

有三種 "then-apply"的方法只损。 它們都有一個Function的參數(shù)一姿,T為上游元素的結(jié)果,并返回一個新的對象R跃惫。
我們再為流水線添加一個步驟叮叹。 這次,我們thenAccept()方法爆存,參數(shù)為Consumer<String>衬横,沒有返回值(Void)。

CompletableFuture<Void> cf3 =  
cf2.thenAccept(System.out::println); 

讓我們?yōu)檫@個流水線添加最后一步终蒂。 調(diào)用thenRun(),參數(shù)為Runnable(不帶參數(shù)蜂林,并且沒有返回值) .

CompletableFuture<Void> cf4 =
                cf3.thenRun(() -> System.out.println("Done processing this chain"));

這些方法的命名都很清晰:以then開頭,后面跟上函數(shù)接口的名稱(run的參數(shù)是Runnable拇泣,accept參數(shù)為Consumer噪叙,apply參數(shù)為Function)。所有這些方法都在與上游任務(wù)具有相同的執(zhí)行器(同一個線程池)霉翔。
然后睁蕾,這些方法還可以進一步的采用相同的后綴:async。 異步方法在默認(rèn)的fork/join池( ForkJoinPool.commonPool())中執(zhí)行其任務(wù)债朵,當(dāng)然你也可以指定任務(wù)執(zhí)行器Executor子眶。

我們用異步的方式重寫cf4,如下所示:

CompletableFuture<Void> cf4 =
                cf3.thenRunAsync(() -> System.out.println("Done processing this chain"));

在這種情況下序芦,Runnable任務(wù)將在默認(rèn)的fork/join池中執(zhí)行臭杰。

二對一的組合模式

組合模式是下一步任務(wù)接收兩個上游任務(wù)的結(jié)果的模式。 在這種情況下可以使用兩個函數(shù):BiFunctionBiConsumer谚中。 也可以在組合模式中執(zhí)行Runnable渴杆。 如下所示:
\color{#123456}{\normalsize\mathbf{二對一組合模式的三種基本方法}}

Method Description
<U,V> CompletableFuture<V>
thenCombine(CompletionStage<U> other, BiFunction<T, U, R> action)
當(dāng)前和另一個給定的階段都正常完成時,兩個結(jié)果作為BiFunction函數(shù)的參數(shù)執(zhí)行宪塔。
<U> CompletableFuture<Void>
thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action)
當(dāng)這個和另一個給定的階段都正常完成時磁奖,兩個結(jié)果作為提供的BiConsumer操作的參數(shù)被執(zhí)行。
CompletableFuture<Void>
runAfterBoth(CompletionStage<?> other, Runnable action)
當(dāng)這個和另一個給定的階段都正常完成時某筐,執(zhí)行給定的Runnable動作比搭。

這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義南誊。

二對一的選擇模式

最后一類模式還是二對一模式身诺。 但是這次蔽莱,不是完成兩個上游元素后再執(zhí)行下游元素,而是戚长,并且當(dāng)兩個上游元素中其中一個完成時盗冷,即可執(zhí)行下游元素。這非常有用, 例如同廉,當(dāng)我們想要解析域名時仪糖, 我們可能會發(fā)現(xiàn)查詢一組域名服務(wù)器的效率比只查詢一個域名服務(wù)器更高。 我們不想從不同的服務(wù)器獲得相同的結(jié)果迫肖,因此我們不只要其中一個服務(wù)器返回結(jié)果即可锅劝,所有其他查詢可以安全的取消。

該模式只需要在上游元素的一個結(jié)果蟆湖,這些方法的名稱中都有關(guān)鍵字故爵,因為只會選擇其中一個,所以組合元素應(yīng)該產(chǎn)生相同類型的結(jié)果隅津。

\color{#123456}{\normalsize\mathbf{二對一選擇模式的三種基本方法}}

Method Description
<U> CompletableFuture<U>
applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
當(dāng)這個或另一個給定階段正常完成時诬垂,執(zhí)行相應(yīng)的結(jié)果作為提供的Function函數(shù)的參數(shù)。伦仍。
CompletableFuture<Void>
acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
當(dāng)這個或另一個給定階段正常完成時结窘,執(zhí)行相應(yīng)的結(jié)果作為提供的Consumer操作的參數(shù)。
CompletableFuture<Void>
runAfterEither(CompletionStage<?> other, Runnable action)
當(dāng)這個或另一個給定階段正常完成時充蓝,執(zhí)行給定的Runnable動作隧枫。

這些方法也可以采用async后綴,與上一節(jié)方法集具有相同的語義谓苟。

示例

我們先看看幾個例子官脓。

在Jersey中測試一個耗時的請求

下面是 Jersey documentation中的一段代碼.


@Path("/resource")  
public class AsyncResource {  
  
      @Inject  
      private Executor executor;  
  
      @GET  
      public void asyncGet(@Suspended final AsyncResponse asyncResponse) {  
  
           executor.execute(() -> {  
              String result = longOperation();  
              asyncResponse.resume(result);  
         });  
     }   
}  

這是一段基本的REST服務(wù)代碼,它調(diào)用耗時的操作涝焙。 這種情況下卑笨,典型的處理方法是在另一個線程中的異步調(diào)用耗時操作。 此方法沒有返回值; 上面的代碼時Jersey的實現(xiàn)方式纱皆。

我們在這里又遇到了這么一個問題:我們?nèi)绾螌υ摲椒ㄟM行單元測試湾趾? 測試 longOperation()不是問題:我們可以單獨對該方法進行單元測試芭商。 我們需要在這里測試的是如何將result對象正確地傳遞給asyncResponse對象的 resume()方法派草。 這可以通過測試框架輕松完成,例如Mockito铛楣。 但是我們又面臨的問題如下:

  • 在 "main"主線程中執(zhí)行executor.execute() 近迁。
  • 但是asyncResponse.resume()是在另一個線程中異步調(diào)用的, 同時我們無法獲取到結(jié)果.

在測試中我們需要的是在asyncResponse.resume() 后執(zhí)行的某種回調(diào),以便我們可以模擬測試簸州。如下所示:

Mockito.verify(mockAsyncResponse).resume(result); 

我們運行這段簡單的代碼:

  • 調(diào)用resume()方法
  • 假設(shè)和執(zhí)行resume()是同一個線程; 那么鉴竭,確信我們的模擬測試中不會出現(xiàn)任何并發(fā)問題(特別是可見性)

這時候歧譬,CompletionStage框架終于排上用場了!我們依據(jù)Runnable創(chuàng)建一個CompletionStage對象搏存,而不是將Runnable傳遞給executor.execute()方法瑰步。

原來:

executor.submit(() -> {  
           String result = longOperation();  
           asyncResponse.resume(result);  
       });  

使用CompletionStage重寫:

CompletableFuture<Void> completableFuture =  
CompletableFuture.runAsync(() -> {  
           String result = longOperation();  
           asyncResponse.resume(result);  
       }, executor);  

因為CompletionStage可以觸發(fā)其他任務(wù),我們使用下面的代碼進行測試:

completableFuture  
    .thenRun(() -> {  
                                   Mockito.verify(mockAsyncResponse).resume(result);  
    }  
); 

這段代碼完全符合我們的要求:

  • 它由前一個CompletionStage的Runnable完成觸發(fā)璧眠。
  • 它在同一個線程中執(zhí)行缩焦。

要實現(xiàn)此該方案,我們需要在類中創(chuàng)建一個公共方法责静,該類返回CompletableFuture袁滥。 如果我們修改了Jersey方法的返回類型,那么Jersey將嘗試使用此返回類型構(gòu)建響應(yīng)灾螃,將其轉(zhuǎn)換為XML或JSON题翻。 對于CompletableFuture,可能會導(dǎo)致運行失敗腰鬼。

因此嵌赠,完整的測試模式如下:

  1. 在mocks中創(chuàng)建模擬對象:
String result = Mockito.mock(String.class);  
AsyncResponse response = Mockito.mock(AsyncResponse.class);  
Runnable train =  () -> {  
    Mockito.doReturn(result).when(response).longOperation();  
}  
Runnable verify = () -> Mockito.verify(response).resume(result);  

2、創(chuàng)建調(diào)用和驗證對象:

Runnable callAndVerify = () -> {  
    asyncResource.executeAsync(response).thenRun(verify); }  

3熄赡、最后創(chuàng)建要測試的任務(wù):

ExecutorService executor = Executors.newSingleThreadExecutor();  
AsyncResource asyncResource = new AsyncResource();  
asyncResource.setExecutorService(executor);  
CompletableFuture  
    .runAsync(train, executor)  
    .thenRun(callAndVerify);  

因為這是一個單元測試猾普,如果在給定的時間后沒有看到響應(yīng),我們可能希望失敗本谜。 我們可以使用CompletableFuture中對于Future接口的get()方法來實現(xiàn)初家。

異步分析網(wǎng)頁的鏈接

讓我們編碼實現(xiàn)如下需求:在Swing面板中顯示自動的分析網(wǎng)頁的鏈接(異步方式)

我們需要如下幾個步驟:

1、讀取網(wǎng)頁內(nèi)容
2乌助、獲取網(wǎng)頁鏈接
3溜在、Swing面板中顯示鏈接

當(dāng)然,修改Swing組件應(yīng)該從合適的線程完成他托,但是掖肋,我們不希望在此線程中運行長任務(wù)。
使用CompletableFuture赏参,很簡單就能實現(xiàn)了:


CompletableFuture.supplyAsync(  
    () -> readPage("http://whatever.com/")  
)  
.thenApply(page -> linkParser.getLinks(page))  
.thenAcceptAsync(  
    links -> displayPanel.display(links),  
    executor  
); 

第一步是創(chuàng)建異步執(zhí)行的Supplier志笼。 比如它以String形式返回網(wǎng)頁內(nèi)容。

第二步是將獲取到的頁面內(nèi)容傳遞給linkParser. 這是一個返回List<String>function函數(shù). 這前兩個任務(wù)在同一個線程中執(zhí)行.

最后一步只是獲取鏈接列表并顯示把篓。 這個任務(wù)需要訪問Swing組件纫溃,所以它應(yīng)該在Swing線程中執(zhí)行。 我們通過傳遞正確的executor作為參數(shù)來做到這一點韧掩。

有一點比較好:Executor接口是一個functional interface紊浩。 我們可以用lambda實現(xiàn)它:

Executor executor = runnable -> SwingUtilities.invokeLater(runnable); 

我們可以利用方法引用語法來編寫此模式的最終版本:

CompletableFuture.supplyAsync(  
    () -> readPage("http://whatever.com/")  
)  
.thenApply(Parser::getLinks)  
.thenAcceptAsync(  
    DisplayPanel::display,  
    SwingUtilities::invokeLater  
);  

CompletableFutures結(jié)合lambdas和方法引用可以編寫非常優(yōu)雅的代碼。

異常處理

CompletionStage API還提供了異常處理模式。 讓我們看一個例子坊谁。

假設(shè)我們有如圖所示的處理鏈:


Processing chain

所有這些CompletableFutures都使用我們在上面說到的模式鏈接在一起费彼。

現(xiàn)在假設(shè)CF21引發(fā)異常。 如果沒有對此異常做處理口芍,則所有下游的CompletableFutures都會出錯箍铲。 這意味著兩件事:

  • CF21, CF31, 和 CF41的CompletableFutures調(diào)用isCompletedExceptionally()都返回 true .
  • 這些對象調(diào)用get()方法都會拋出ExecutionException, 原因是因為CF21引發(fā)的根異常.

我們可以使用下圖所示的模式處理CompletableFutures鏈中的異常。

cf30 = cf21.exceptionally();  
handle exceptions

此模式創(chuàng)建的CompletableFuture具有以下屬性:

  • 如果CF21正常完成鬓椭,則CF30將透明地返回與CF21相同的值虹钮。
  • 如果CF21發(fā)生異常,則CF30能夠捕獲它膘融,并且可以將正常值傳輸?shù)紺F31芙粱。

有好幾種方法可以做到這一點,用不同方法的接受異常氧映。

exceptionally(Function<Throwable, T> function)是最簡單的方法調(diào)用. 它返回一個CompletionStage春畔,如果上游CompletionStage也正常完成,則返回的CompletionStage也會以相同的值正常完成岛都。 否則律姨,如果此上游CompletionStage發(fā)生異常,則將此異常傳遞給提供的函數(shù)臼疫。返回的CompletionStage正常完成择份,返回Function的結(jié)果。 此方法沒有異步版本烫堤。

handle(BiFunction<T, Throwable, R> bifunction) 具有相同的語義. 它返回一個CompletionStage荣赶,當(dāng)此階段正常或異常完成時鸽斟,將使用此階段的結(jié)果和異常作為所提供函數(shù)的參數(shù)執(zhí)行拔创。 如果上游CompletionStage正常完成,則Throwable為null調(diào)用BiFunction富蓄,如果異常完成剩燥,則R為null調(diào)用BiFunction。 在這兩種情況下立倍,都被能正常返回的CompletionStage灭红。該方法有兩個姐妹方法handleAsync(BiFunction<? super T,Throwable,? extends U> fn)handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor), 這兩種方法的工作方式相同口注,都是異步的变擒,只是執(zhí)行器不同。執(zhí)行器可以作為參數(shù)提供疆导。如果沒有提供赁项,則使用公共的fork/join 線程池葛躏。

第三種處理異常的方式是whenComplete(BiConsumer<T, Throwable> biconsumer)澈段。handle() 可以正常結(jié)束并返回CompletionStage悠菜,而whenComplete()則不盡然。 它遵循構(gòu)建的CompletionStage的流水線行為败富。 因此悔醋,如果上游CompletionStage發(fā)生異常,則whenComplete()返回的CompletionStage也會異常完成(結(jié)合exceptionally()理解)兽叮。使用上游CompletionStage的返回值及其此階段返回值調(diào)用BiConsumer. 與handle()情況一樣芬骄,將使用結(jié)果(或 null如果沒有))和此階段的異常(或 null如果沒有)調(diào)用BiConsumerBiConsumer沒有返回值鹦聪。 所以它只是一個不會影響CompletionStages流水線處理的回調(diào)账阻。 與handle()方法類似,該方法也有兩個姐妹方法whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)泽本。這兩種方法的工作方式都是異步的淘太。執(zhí)行器可以作為參數(shù)提供。如果沒有提供规丽,則使用公共的fork/join 線程池蒲牧。.

結(jié)論

CompletionStage 接口和CompletableFuture類帶來了異步處理數(shù)據(jù)的新方式。這個API非常復(fù)雜赌莺,主要是由于這個接口和類暴露的方法數(shù)量較多冰抢,但是,豐富的API使得我們處理異步數(shù)據(jù)流水線方面有了更多的選擇艘狭,以便更好的滿足應(yīng)用程序的需求挎扰。

這些API基于lambda表達式構(gòu)建,從而創(chuàng)造非常干凈且優(yōu)雅的代碼巢音。 它可以很好地控制哪個線程執(zhí)行每個任務(wù)鼓鲁。 它還允許以多種方式構(gòu)建流水線和組合任務(wù),并且在處理異常方面也提供對應(yīng)的方式方法港谊。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末骇吭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子歧寺,更是在濱河造成了極大的恐慌燥狰,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件斜筐,死亡現(xiàn)場離奇詭異龙致,居然都是意外死亡,警方通過查閱死者的電腦和手機顷链,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門目代,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事榛了≡谘龋” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵霜大,是天一觀的道長构哺。 經(jīng)常有香客問我,道長战坤,這世上最難降的妖魔是什么曙强? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮途茫,結(jié)果婚禮上碟嘴,老公的妹妹穿的比我還像新娘。我一直安慰自己囊卜,他們只是感情好娜扇,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著边败,像睡著了一般袱衷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上笑窜,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天致燥,我揣著相機與錄音,去河邊找鬼排截。 笑死嫌蚤,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的断傲。 我是一名探鬼主播脱吱,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼认罩!你這毒婦竟也來了箱蝠?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤垦垂,失蹤者是張志新(化名)和其女友劉穎宦搬,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體劫拗,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡间校,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了页慷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片憔足。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡胁附,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出滓彰,到底是詐尸還是另有隱情控妻,我是刑警寧澤,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布找蜜,位于F島的核電站饼暑,受9級特大地震影響稳析,放射性物質(zhì)發(fā)生泄漏洗做。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一彰居、第九天 我趴在偏房一處隱蔽的房頂上張望诚纸。 院中可真熱鬧,春花似錦陈惰、人聲如沸畦徘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽井辆。三九已至,卻和暖如春溶握,著一層夾襖步出監(jiān)牢的瞬間杯缺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工睡榆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留萍肆,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓胀屿,卻偏偏與公主長得像塘揣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子宿崭,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容