CompletableFuture初識(shí)

1為什么引入CompletableFuture份殿?

1.1 回調(diào)

回調(diào)函數(shù)比較通用的解釋是氨鹏,它是一個(gè)通過函數(shù)指針調(diào)用的函數(shù)搀罢。如果你把函數(shù)的指針(地址)作為參數(shù)傳遞給另一個(gè)函數(shù)艘绍,當(dāng)這個(gè)指針被用為調(diào)用它所指向的函數(shù)時(shí),我們就說這是回調(diào)函數(shù)泉沾±搪欤回調(diào)函數(shù)不是由該函數(shù)的實(shí)現(xiàn)方直接調(diào)用,而是在特定的事件或條件發(fā)生時(shí)由另外一方調(diào)用的跷究,用于對(duì)該事件或條件進(jìn)行響應(yīng)。

回調(diào)函數(shù)的機(jī)制:

定義一個(gè)回調(diào)函數(shù)敲霍;

提供函數(shù)實(shí)現(xiàn)的一方在初始化時(shí)候俊马,將回調(diào)函數(shù)的函數(shù)指針注冊(cè)給調(diào)用者;

當(dāng)特定的事件或條件發(fā)生的時(shí)候肩杈,調(diào)用者使用函數(shù)指針調(diào)用回調(diào)函數(shù)對(duì)事件進(jìn)行處理柴我。

1.2 回調(diào)方式的異步編程

JDK5 新增了 Future 接口,用于描述一個(gè)異步計(jì)算的結(jié)果扩然。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力艘儒,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果夫偶。阻塞的方式顯然和我們的異步編程的初衷相違背界睁,輪詢的方式又會(huì)耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果兵拢。

為什么不能用觀察者設(shè)計(jì)模式呢翻斟?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽者。

有一些開源框架實(shí)現(xiàn)了我們的設(shè)想:

例如 Netty 的 ChannelFuture 類擴(kuò)展了 Future 接口说铃,通過提供 addListener 方法實(shí)現(xiàn)支持回調(diào)方式的異步編程访惜。Netty 中所有的 I/O 操作都是異步的,這意味著任何的 I/O 調(diào)用都將立即返回嘹履,而不保證這些被請(qǐng)求的 I/O 操作在調(diào)用結(jié)束的時(shí)候已經(jīng)完成。取而代之地债热,你會(huì)得到一個(gè)返回的 ChannelFuture 實(shí)例砾嫉,這個(gè)實(shí)例將給你一些關(guān)于 I/O 操作結(jié)果或者狀態(tài)的信息。當(dāng)一個(gè) I/O 操作開始的時(shí)候窒篱,一個(gè)新的 Future 對(duì)象就會(huì)被創(chuàng)建焰枢。在開始的時(shí)候,新的 Future 是未完成的狀態(tài)--它既非成功舌剂、失敗济锄,也非被取消,因?yàn)?I/O 操作還沒有結(jié)束霍转。如果 I/O 操作以成功荐绝、失敗或者被取消中的任何一種狀態(tài)結(jié)束了,那么這個(gè) Future 將會(huì)被標(biāo)記為已完成避消,并包含更多詳細(xì)的信息(例如:失敗的原因)低滩。請(qǐng)注意,即使是失敗和被取消的狀態(tài)岩喷,也是屬于已完成的狀態(tài)恕沫。

Google guava也提供了通用的擴(kuò)展Future:ListenableFuture、SettableFuture 以及輔助類Futures等纱意,方便異步編程婶溯。

Scala也提供了簡(jiǎn)單易用且功能強(qiáng)大的Future/Promise異步編程模式。

CompletableFuture提供了非常強(qiáng)大的 Future 的擴(kuò)展功能偷霉,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性迄委,并且提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果类少,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法叙身。

1.3 CompletableFuture

處理非阻塞調(diào)用的傳統(tǒng)方法是使用事件處理器,程序員為任務(wù)完成之后要出現(xiàn)的動(dòng)作注冊(cè)一個(gè)處理器硫狞。但是信轿,要嘗試在一組事件處理器中實(shí)現(xiàn)一個(gè)控制流會(huì)很困難。

CompletableFuture提供了一種候選方法残吩,與事件處理器不同财忽,CompletableFuture可以組合。利用CompletableFuture世剖,可以指定希望做什么定罢,以及希望以什么順序執(zhí)行這些工作。這些動(dòng)作不會(huì)立即發(fā)生旁瘫,不過重要的是將所有代碼放在一起祖凫。

CompletableFuture提供了非常強(qiáng)大的 Future 的擴(kuò)展功能琼蚯,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,并且提供了函數(shù)式編程的能力惠况,可以通過回調(diào)的方式處理計(jì)算結(jié)果遭庶,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。

對(duì)于阻塞或者輪詢方式稠屠,依然可以通過 CompletableFuture 類的 CompletionStage 和 Future 接口方式支持峦睡。

CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺(tái)权埠,所以我們可以通過實(shí)現(xiàn)多個(gè) CompletionStage 命令榨了,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。

2.每一種方法的三種形式

public<U> CompletableFuture<U> thenApply(

? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(null,fn);}? ? public <U> CompletableFuture<U> thenApplyAsync(

? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(asyncPool,fn);}? ? public <U> CompletableFuture<U> thenApplyAsync(

? ? ? ? Function<? super T,? extends U> fn, Executor executor) {returnuniApplyStage(screenExecutor(executor),fn);}

privatestaticfinalbooleanuseCommonPool=(ForkJoinPool.getCommonPoolParallelism()>1);/**

? ? * Default executor -- ForkJoinPool.commonPool() unless it cannot

? ? * support parallelism.

? ? */privatestaticfinalExecutorasyncPool=useCommonPool?ForkJoinPool.commonPool():newThreadPerTaskExecutor();/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */staticfinalclassThreadPerTaskExecutorimplementsExecutor{publicvoidexecute(Runnabler){newThread(r).start();}}

非異步方法由當(dāng)前線程或調(diào)用線程執(zhí)行

不帶executor的異步方法使用asyncPool來執(zhí)行

1)如果不支持多線程攘蔽,則新建一個(gè)線程專門執(zhí)行

2)否則使用ForkJoinPool.commonPool()執(zhí)行

另一種異步方法使用executor執(zhí)行

3.創(chuàng)建CompletableFuture

publicCompletableFuture(){}privateCompletableFuture(Object r){this.result=r;}

publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier){returnasyncSupplyStage(asyncPool,supplier);}publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier,Executorexecutor){returnasyncSupplyStage(screenExecutor(executor),supplier);}

publicstaticCompletableFuture<Void>runAsync(Runnablerunnable){returnasyncRunStage(asyncPool,runnable);}publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor){returnasyncRunStage(screenExecutor(executor),runnable);}

4.中間組合操作

Runnable類型的參數(shù)會(huì)忽略計(jì)算的結(jié)果

Consumer是純消費(fèi)計(jì)算結(jié)果龙屉,BiConsumer會(huì)組合另外一個(gè)CompletionStage純消費(fèi)

Function會(huì)對(duì)計(jì)算結(jié)果做轉(zhuǎn)換,BiFunction會(huì)組合另外一個(gè)CompletionStage的計(jì)算結(jié)果做轉(zhuǎn)換满俗。

4.1 轉(zhuǎn)換

thenApply功能相當(dāng)于將CompletableFuture<T>轉(zhuǎn)換成CompletableFuture<U>转捕。

public<U> CompletableFuture<U> thenApply(

? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(null,fn);}

public<U>CompletableFuture<U>applyToEither(? ? ? ? CompletionStage<?extendsT>other, Function<?superT,U>fn) {? ? ? ? return orApplyStage(null, other, fn);? ? }

其實(shí)從功能上來講,thenCombine的功能更類似thenAcceptBoth唆垃,只不過thenAcceptBoth是純消費(fèi)五芝,它的函數(shù)參數(shù)沒有返回值,而thenCombine的函數(shù)參數(shù)fn有返回值辕万。

public<U,V>CompletableFuture<V> thenCombine(

? ? ? ? CompletionStage<? extends U> other,

? ? ? ? BiFunction<? super T,? super U,? extends V> fn) {returnbiApplyStage(null,other,fn);}

這一組方法接受一個(gè)Function作為參數(shù)枢步,這個(gè)Function的輸入是當(dāng)前的CompletableFuture的計(jì)算值,返回結(jié)果將是一個(gè)新的CompletableFuture蓄坏,這個(gè)新的CompletableFuture會(huì)組合原來的CompletableFuture和函數(shù)返回的CompletableFuture价捧。

public<U> CompletableFuture<U> thenCompose(

? ? ? ? Function<? super T, ? extends CompletionStage<U>> fn) {returnuniComposeStage(null,fn);}

示例:

publicclassTest3{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>f=future.thenApplyAsync(i->i*10).thenApply(i->i.toString());System.out.println(f.get());}}

結(jié)果:

1000

publicclassTest6{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>future2=CompletableFuture.supplyAsync(()->{return"abc";});CompletableFuture<String>f=future.thenCombine(future2,(x,y)->y+"-"+x);System.out.println(f.get());}}

結(jié)果:

abc-100

publicclassTest7{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>f=future.thenCompose(i->{returnCompletableFuture.supplyAsync(()->{return(i*10)+"";});});System.out.println(f.get());}}

結(jié)果:

1000

4.2 消費(fèi)

thenAccept只對(duì)結(jié)果執(zhí)行Action,而不返回新的計(jì)算值涡戳。

publicCompletableFuture<Void>thenAccept(Consumer<?superT>action){returnuniAcceptStage(null,action);}

public<U>CompletableFuture<Void>thenAcceptBoth(CompletionStage<?extends U>other,BiConsumer<?superT,?superU>action){returnbiAcceptStage(null,other,action);}

public<U>CompletableFuture<U>applyToEither(? ? ? ? CompletionStage<?extendsT>other, Function<?superT,U>fn) {? ? ? ? return orApplyStage(null, other, fn);? ? }

示例:

publicclassTest4{publicstatic voidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAccept(System.out::println);System.out.println(f.get());}}

結(jié)果:

100null

publicclassTest5{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAcceptBoth(CompletableFuture.completedFuture(10),(x,y)->System.out.println(x*y));System.out.println(f.get());}}

結(jié)果:

1000null

4.3 運(yùn)行

thenRun更徹底地,下面一組方法當(dāng)計(jì)算完成的時(shí)候會(huì)執(zhí)行一個(gè)Runnable脯倚,與thenAccept不同渔彰,Runnable并不使用CompletableFuture計(jì)算的結(jié)果。

publicCompletableFuture<Void>thenRun(Runnable action){returnuniRunStage(null,action);}

publicCompletableFuture<Void>runAfterBoth(CompletionStage<?>other,Runnable action){returnbiRunStage(null,other,action);}

publicCompletableFuture<Void>runAfterEither(CompletionStage<?>other,Runnable action){returnorRunStage(null,other,action);}

4.4 批量

publicstaticCompletableFuture<Void>allOf(CompletableFuture<?>...cfs){returnandTree(cfs,0,cfs.length-1);}

publicstaticCompletableFuture<Object>anyOf(CompletableFuture<?>...cfs){returnorTree(cfs,0,cfs.length-1);}

5.終止操作

public<U>CompletableFuture<U>handle(BiFunction<?superT,Throwable,?extendsU>fn){returnuniHandleStage(null,fn);}

publicCompletableFuture<T>whenComplete(BiConsumer<?superT,?superThrowable>action){returnuniWhenCompleteStage(null,action);}

publicCompletableFuture<T>exceptionally(Function<Throwable,?extendsT>fn){returnuniExceptionallyStage(fn);}

6.通過阻塞或者輪詢的方式獲得結(jié)果

get在遇到底層異常時(shí)推正,會(huì)拋出受查異常ExecutionException恍涂。

publicTget()throwsInterruptedException,ExecutionException{Objectr;returnreportGet((r=result)==null?waitingGet(true):r);}publicTget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{Objectr;longnanos=unit.toNanos(timeout);returnreportGet((r=result)==null?timedGet(nanos):r);}

join在遇到底層的異常時(shí),會(huì)拋出未受查的CompletionException植榕。

publicTjoin(){Objectr;returnreportJoin((r=result)==null?waitingGet(false):r);}

publicTgetNow(TvalueIfAbsent){Objectr;return((r=result)==null)?valueIfAbsent:reportJoin(r);}

示例:

publicclassTest1{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{inti=1/0;return100;});System.out.println(future.get());}}

結(jié)果:

Exceptioninthread"main"java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/by zero? ? at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.enjoy.learn.core.concurrency.completablefuture.Test1.main(Test1.java:19)Causedby:java.lang.ArithmeticException:/by zero

publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{inti=1/0;return100;});System.out.println(future.join());}

結(jié)果:

Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero? ? at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)? ? at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)? ? at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)? ? at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)? ? at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)? ? at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)? ? at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)? ? at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: java.lang.ArithmeticException: / by zero

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末再沧,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尊残,更是在濱河造成了極大的恐慌炒瘸,老刑警劉巖淤堵,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異顷扩,居然都是意外死亡拐邪,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門隘截,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扎阶,“玉大人,你說我怎么就攤上這事婶芭《危” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵犀农,是天一觀的道長(zhǎng)惰赋。 經(jīng)常有香客問我,道長(zhǎng)井赌,這世上最難降的妖魔是什么谤逼? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮仇穗,結(jié)果婚禮上流部,老公的妹妹穿的比我還像新娘。我一直安慰自己纹坐,他們只是感情好枝冀,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耘子,像睡著了一般果漾。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谷誓,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天绒障,我揣著相機(jī)與錄音,去河邊找鬼捍歪。 笑死户辱,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的糙臼。 我是一名探鬼主播庐镐,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼变逃!你這毒婦竟也來了必逆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎名眉,沒想到半個(gè)月后粟矿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡璧针,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年嚷炉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片探橱。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡申屹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出隧膏,到底是詐尸還是另有隱情哗讥,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布胞枕,位于F島的核電站杆煞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏腐泻。R本人自食惡果不足惜决乎,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望派桩。 院中可真熱鬧构诚,春花似錦、人聲如沸铆惑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽员魏。三九已至丑蛤,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間撕阎,已是汗流浹背受裹。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留虏束,地道東北人名斟。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像魄眉,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子闷袒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361