1為什么引入CompletableFuture份殿?
1.1 回調(diào)
回調(diào)函數(shù)比較通用的解釋是氨鹏,它是一個(gè)通過函數(shù)指針調(diào)用的函數(shù)搀罢。如果你把函數(shù)的指針(地址)作為參數(shù)傳遞給另一個(gè)函數(shù)艘绍,當(dāng)這個(gè)指針被用為調(diào)用它所指向的函數(shù)時(shí),我們就說這是回調(diào)函數(shù)泉沾±搪欤回調(diào)函數(shù)不是由該函數(shù)的實(shí)現(xiàn)方直接調(diào)用,而是在特定的事件或條件發(fā)生時(shí)由另外一方調(diào)用的跷究,用于對(duì)該事件或條件進(jìn)行響應(yīng)。
回調(diào)函數(shù)的機(jī)制:
定義一個(gè)回調(diào)函數(shù)敲霍;
提供函數(shù)實(shí)現(xiàn)的一方在初始化時(shí)候俊马,將回調(diào)函數(shù)的函數(shù)指針注冊(cè)給調(diào)用者;
當(dāng)特定的事件或條件發(fā)生的時(shí)候肩杈,調(diào)用者使用函數(shù)指針調(diào)用回調(diào)函數(shù)對(duì)事件進(jìn)行處理柴我。
1.2 回調(diào)方式的異步編程
JDK5 新增了 Future 接口,用于描述一個(gè)異步計(jì)算的結(jié)果扩然。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力艘儒,但是對(duì)于結(jié)果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果夫偶。阻塞的方式顯然和我們的異步編程的初衷相違背界睁,輪詢的方式又會(huì)耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果兵拢。
為什么不能用觀察者設(shè)計(jì)模式呢翻斟?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽者。
有一些開源框架實(shí)現(xiàn)了我們的設(shè)想:
例如 Netty 的 ChannelFuture 類擴(kuò)展了 Future 接口说铃,通過提供 addListener 方法實(shí)現(xiàn)支持回調(diào)方式的異步編程访惜。Netty 中所有的 I/O 操作都是異步的,這意味著任何的 I/O 調(diào)用都將立即返回嘹履,而不保證這些被請(qǐng)求的 I/O 操作在調(diào)用結(jié)束的時(shí)候已經(jīng)完成。取而代之地债热,你會(huì)得到一個(gè)返回的 ChannelFuture 實(shí)例砾嫉,這個(gè)實(shí)例將給你一些關(guān)于 I/O 操作結(jié)果或者狀態(tài)的信息。當(dāng)一個(gè) I/O 操作開始的時(shí)候窒篱,一個(gè)新的 Future 對(duì)象就會(huì)被創(chuàng)建焰枢。在開始的時(shí)候,新的 Future 是未完成的狀態(tài)--它既非成功舌剂、失敗济锄,也非被取消,因?yàn)?I/O 操作還沒有結(jié)束霍转。如果 I/O 操作以成功荐绝、失敗或者被取消中的任何一種狀態(tài)結(jié)束了,那么這個(gè) Future 將會(huì)被標(biāo)記為已完成避消,并包含更多詳細(xì)的信息(例如:失敗的原因)低滩。請(qǐng)注意,即使是失敗和被取消的狀態(tài)岩喷,也是屬于已完成的狀態(tài)恕沫。
Google guava也提供了通用的擴(kuò)展Future:ListenableFuture、SettableFuture 以及輔助類Futures等纱意,方便異步編程婶溯。
Scala也提供了簡(jiǎn)單易用且功能強(qiáng)大的Future/Promise異步編程模式。
CompletableFuture提供了非常強(qiáng)大的 Future 的擴(kuò)展功能偷霉,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性迄委,并且提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果类少,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法叙身。
1.3 CompletableFuture
處理非阻塞調(diào)用的傳統(tǒng)方法是使用事件處理器,程序員為任務(wù)完成之后要出現(xiàn)的動(dòng)作注冊(cè)一個(gè)處理器硫狞。但是信轿,要嘗試在一組事件處理器中實(shí)現(xiàn)一個(gè)控制流會(huì)很困難。
CompletableFuture提供了一種候選方法残吩,與事件處理器不同财忽,CompletableFuture可以組合。利用CompletableFuture世剖,可以指定希望做什么定罢,以及希望以什么順序執(zhí)行這些工作。這些動(dòng)作不會(huì)立即發(fā)生旁瘫,不過重要的是將所有代碼放在一起祖凫。
CompletableFuture提供了非常強(qiáng)大的 Future 的擴(kuò)展功能琼蚯,可以幫助我們簡(jiǎn)化異步編程的復(fù)雜性,并且提供了函數(shù)式編程的能力惠况,可以通過回調(diào)的方式處理計(jì)算結(jié)果遭庶,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。
對(duì)于阻塞或者輪詢方式稠屠,依然可以通過 CompletableFuture 類的 CompletionStage 和 Future 接口方式支持峦睡。
CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺(tái)权埠,所以我們可以通過實(shí)現(xiàn)多個(gè) CompletionStage 命令榨了,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。
2.每一種方法的三種形式
public<U> CompletableFuture<U> thenApply(
? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(null,fn);}? ? public <U> CompletableFuture<U> thenApplyAsync(
? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(asyncPool,fn);}? ? public <U> CompletableFuture<U> thenApplyAsync(
? ? ? ? Function<? super T,? extends U> fn, Executor executor) {returnuniApplyStage(screenExecutor(executor),fn);}
privatestaticfinalbooleanuseCommonPool=(ForkJoinPool.getCommonPoolParallelism()>1);/**
? ? * Default executor -- ForkJoinPool.commonPool() unless it cannot
? ? * support parallelism.
? ? */privatestaticfinalExecutorasyncPool=useCommonPool?ForkJoinPool.commonPool():newThreadPerTaskExecutor();/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */staticfinalclassThreadPerTaskExecutorimplementsExecutor{publicvoidexecute(Runnabler){newThread(r).start();}}
非異步方法由當(dāng)前線程或調(diào)用線程執(zhí)行
不帶executor的異步方法使用asyncPool來執(zhí)行
1)如果不支持多線程攘蔽,則新建一個(gè)線程專門執(zhí)行
2)否則使用ForkJoinPool.commonPool()執(zhí)行
另一種異步方法使用executor執(zhí)行
3.創(chuàng)建CompletableFuture
publicCompletableFuture(){}privateCompletableFuture(Object r){this.result=r;}
publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier){returnasyncSupplyStage(asyncPool,supplier);}publicstatic<U>CompletableFuture<U>supplyAsync(Supplier<U>supplier,Executorexecutor){returnasyncSupplyStage(screenExecutor(executor),supplier);}
publicstaticCompletableFuture<Void>runAsync(Runnablerunnable){returnasyncRunStage(asyncPool,runnable);}publicstaticCompletableFuture<Void>runAsync(Runnablerunnable,Executorexecutor){returnasyncRunStage(screenExecutor(executor),runnable);}
4.中間組合操作
Runnable類型的參數(shù)會(huì)忽略計(jì)算的結(jié)果
Consumer是純消費(fèi)計(jì)算結(jié)果龙屉,BiConsumer會(huì)組合另外一個(gè)CompletionStage純消費(fèi)
Function會(huì)對(duì)計(jì)算結(jié)果做轉(zhuǎn)換,BiFunction會(huì)組合另外一個(gè)CompletionStage的計(jì)算結(jié)果做轉(zhuǎn)換满俗。
4.1 轉(zhuǎn)換
thenApply功能相當(dāng)于將CompletableFuture<T>轉(zhuǎn)換成CompletableFuture<U>转捕。
public<U> CompletableFuture<U> thenApply(
? ? ? ? Function<? super T,? extends U> fn) {returnuniApplyStage(null,fn);}
public<U>CompletableFuture<U>applyToEither(? ? ? ? CompletionStage<?extendsT>other, Function<?superT,U>fn) {? ? ? ? return orApplyStage(null, other, fn);? ? }
其實(shí)從功能上來講,thenCombine的功能更類似thenAcceptBoth唆垃,只不過thenAcceptBoth是純消費(fèi)五芝,它的函數(shù)參數(shù)沒有返回值,而thenCombine的函數(shù)參數(shù)fn有返回值辕万。
public<U,V>CompletableFuture<V> thenCombine(
? ? ? ? CompletionStage<? extends U> other,
? ? ? ? BiFunction<? super T,? super U,? extends V> fn) {returnbiApplyStage(null,other,fn);}
這一組方法接受一個(gè)Function作為參數(shù)枢步,這個(gè)Function的輸入是當(dāng)前的CompletableFuture的計(jì)算值,返回結(jié)果將是一個(gè)新的CompletableFuture蓄坏,這個(gè)新的CompletableFuture會(huì)組合原來的CompletableFuture和函數(shù)返回的CompletableFuture价捧。
public<U> CompletableFuture<U> thenCompose(
? ? ? ? Function<? super T, ? extends CompletionStage<U>> fn) {returnuniComposeStage(null,fn);}
示例:
publicclassTest3{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>f=future.thenApplyAsync(i->i*10).thenApply(i->i.toString());System.out.println(f.get());}}
結(jié)果:
1000
publicclassTest6{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>future2=CompletableFuture.supplyAsync(()->{return"abc";});CompletableFuture<String>f=future.thenCombine(future2,(x,y)->y+"-"+x);System.out.println(f.get());}}
結(jié)果:
abc-100
publicclassTest7{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<String>f=future.thenCompose(i->{returnCompletableFuture.supplyAsync(()->{return(i*10)+"";});});System.out.println(f.get());}}
結(jié)果:
1000
4.2 消費(fèi)
thenAccept只對(duì)結(jié)果執(zhí)行Action,而不返回新的計(jì)算值涡戳。
publicCompletableFuture<Void>thenAccept(Consumer<?superT>action){returnuniAcceptStage(null,action);}
public<U>CompletableFuture<Void>thenAcceptBoth(CompletionStage<?extends U>other,BiConsumer<?superT,?superU>action){returnbiAcceptStage(null,other,action);}
public<U>CompletableFuture<U>applyToEither(? ? ? ? CompletionStage<?extendsT>other, Function<?superT,U>fn) {? ? ? ? return orApplyStage(null, other, fn);? ? }
示例:
publicclassTest4{publicstatic voidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAccept(System.out::println);System.out.println(f.get());}}
結(jié)果:
100null
publicclassTest5{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{return100;});CompletableFuture<Void>f=future.thenAcceptBoth(CompletableFuture.completedFuture(10),(x,y)->System.out.println(x*y));System.out.println(f.get());}}
結(jié)果:
1000null
4.3 運(yùn)行
thenRun更徹底地,下面一組方法當(dāng)計(jì)算完成的時(shí)候會(huì)執(zhí)行一個(gè)Runnable脯倚,與thenAccept不同渔彰,Runnable并不使用CompletableFuture計(jì)算的結(jié)果。
publicCompletableFuture<Void>thenRun(Runnable action){returnuniRunStage(null,action);}
publicCompletableFuture<Void>runAfterBoth(CompletionStage<?>other,Runnable action){returnbiRunStage(null,other,action);}
publicCompletableFuture<Void>runAfterEither(CompletionStage<?>other,Runnable action){returnorRunStage(null,other,action);}
4.4 批量
publicstaticCompletableFuture<Void>allOf(CompletableFuture<?>...cfs){returnandTree(cfs,0,cfs.length-1);}
publicstaticCompletableFuture<Object>anyOf(CompletableFuture<?>...cfs){returnorTree(cfs,0,cfs.length-1);}
5.終止操作
public<U>CompletableFuture<U>handle(BiFunction<?superT,Throwable,?extendsU>fn){returnuniHandleStage(null,fn);}
publicCompletableFuture<T>whenComplete(BiConsumer<?superT,?superThrowable>action){returnuniWhenCompleteStage(null,action);}
publicCompletableFuture<T>exceptionally(Function<Throwable,?extendsT>fn){returnuniExceptionallyStage(fn);}
6.通過阻塞或者輪詢的方式獲得結(jié)果
get在遇到底層異常時(shí)推正,會(huì)拋出受查異常ExecutionException恍涂。
publicTget()throwsInterruptedException,ExecutionException{Objectr;returnreportGet((r=result)==null?waitingGet(true):r);}publicTget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{Objectr;longnanos=unit.toNanos(timeout);returnreportGet((r=result)==null?timedGet(nanos):r);}
join在遇到底層的異常時(shí),會(huì)拋出未受查的CompletionException植榕。
publicTjoin(){Objectr;returnreportJoin((r=result)==null?waitingGet(false):r);}
publicTgetNow(TvalueIfAbsent){Objectr;return((r=result)==null)?valueIfAbsent:reportJoin(r);}
示例:
publicclassTest1{publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{inti=1/0;return100;});System.out.println(future.get());}}
結(jié)果:
Exceptioninthread"main"java.util.concurrent.ExecutionException:java.lang.ArithmeticException:/by zero? ? at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)at com.enjoy.learn.core.concurrency.completablefuture.Test1.main(Test1.java:19)Causedby:java.lang.ArithmeticException:/by zero
publicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{CompletableFuture<Integer>future=CompletableFuture.supplyAsync(()->{inti=1/0;return100;});System.out.println(future.join());}
結(jié)果:
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero? ? at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)? ? at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)? ? at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)? ? at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)? ? at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)? ? at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)? ? at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)? ? at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)Caused by: java.lang.ArithmeticException: / by zero