昔日未來之星 Future接口
上一章講了并行流汰规,這一張也該講講流的并發(fā)了。要用到并發(fā)柏靶,無非就是多開些線程弃理,Thread、Runable之類的俗套咱就不多說了屎蜓,直接來簡單講講Future接口痘昌。沒有接觸過并發(fā)編程的同學可以先自行了解一下相關知識,以免對后面的內(nèi)容感到生澀難懂炬转。Future顧名思義辆苔,就是未來要產(chǎn)生的值,如果我要耗費較長時間來計算出一個值扼劈,并且在后面用到這個值之前還會做一些簡單的其他任務驻啤,就可以使用Future。
例5.0:
Future<Integer> future =
Executors.newSingleThreadExecutor().submit(() -> calculator());
//do something else ...
future.get();
上面的例子中荐吵,我們Executors的工廠方法實例化了一個單線程線程池并將其開啟骑冗,然后通過submit方法向其提交了一個任務calculator。后面這個Lambda表達式代表著一個匿名的Callable或Runnble對象先煎,Callable的call方法相對于Runnable的run方法多了一個返回值贼涩,在call方法執(zhí)行結束后會進行回調(diào),將值傳給我們在最前面聲明的Future薯蝎。在線程池執(zhí)行任務期間遥倦,我們可以繼續(xù)在當前線程中做其他事情,然后在最后需要用到計算結果的時候占锯,調(diào)用Future的get方法來獲取袒哥。如果這時已經(jīng)計算出結果缩筛,get會直接返回此結果,否則將會一直阻塞下去统诺,我們可以為其傳入第二個參數(shù)設置超時時間歪脏,超出時限還沒有計算出結果就會拋出異常疑俭,結束計算粮呢。我們也可以調(diào)用cancel方法來手動關停任務,另外還有兩個方法isCancel與isDone來檢查Future所對應任務的當前執(zhí)行狀態(tài)钞艇。了解了Furture啄寡,我們就來試試如何將其與函數(shù)式編程結合起來,和流結合起來哩照。算了挺物,不想試了,反正最后也不會用它飘弧,直接來看新東西吧识藤。
遲早要完 CompletableFuture類
Future固然很好,但是結合起Stream來還是不太方便次伶,為此Java8新加入了CompletableFuture類痴昧,該類實現(xiàn)了Future接口與CompletionStage接口,它的新方法會更好的與函數(shù)式編程相結合冠王。CompletableFuture意為可完成的未來赶撰,即未來要完成之事,由于名字較長柱彻,我們姑且稱其為<strong style="color:#FF0000;">要完</strong>豪娜。老Future一般只能通過向線程池提交任務的方式來獲取實例,而我們可愛的要完君則添加了supplyAsync工廠方法哟楷,可直接使用該類默認的線程池來異步執(zhí)行任務瘤载。
例5.1:
List<CompletableFuture<String>> completableFutures= Stream.iterate(1, i->i+1)
.map(i-> CompletableFuture.supplyAsync(()->i.toString()))
.collect(Collectors.toList());
List<String> strings=completableFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
為了方便大家理解,我沿用了上面迭代生成1~1000的例子卖擅,當然上面已經(jīng)說過這樣會帶來頻繁裝包拆包的問題鸣奔,但是我們不管它,直接看代碼磨镶。在map方法中溃蔫,要完類通過supplyAsync方法,以異步方式提交了一個將整數(shù)轉化為字符串的任務琳猫,并返回相應的要完對象伟叛,然后由collect方法收集成一個List<CompletableFuture<String>>。一套流程執(zhí)行完畢后脐嫂,又進行了第二次迭代统刮,通過join方法將要完映射成字符串并收集起來紊遵。join方法與get一樣都是用來獲取運算結果的,只不過不會拋出異常侥蒙,不然就需要在我們美麗的函數(shù)式語句中加入臃腫的try/catch塊暗膜。
可能有讀者看了上面的代碼會產(chǎn)生以下三個疑問:為何代碼要分成兩部分來寫,而不是直接再調(diào)用一次map方法鞭衩,然后收集成List<String>学搜?是否每一次的迭代都會單獨執(zhí)行在一個線程之中?相比并行论衍,并發(fā)代碼看起來很繁瑣瑞佩,效率上會比并發(fā)更高嗎?下面我就來一一解答坯台。
抽刀斷水 分步迭代
之所以拆成兩部分寫炬丸,是因為合在一起的寫法會影響并發(fā)。上述代碼中第二個map操作會通過join方法將要完映射為字符串蜒蕾,而這時如果這時第一個map所提交的異步任務還沒有計算出結果稠炬,join方法顯然會發(fā)生阻塞。一旦某一迭代元素的join方法發(fā)生了阻塞咪啡,那么后面的元素就只能苦苦等待首启,等待前面元素的join方法計算完畢再通過第一個map方法提交自己的異步任務,依次類推瑟匆。如果所有元素的join方法都阻塞闽坡,我們這個并發(fā)和串行還有什么區(qū)別呢?相比之下愁溜,分開的寫法會先將所有的任務提交而不進行求值疾嗅,直接在最后收集成一個List<CompletableFuture>,再在第二次迭代的時候調(diào)用join方法冕象,這樣不同線程上的任務就可以一并等待了代承。
化龍金池 Executor接口
前面說過Future對象一般只能通過向線程池提交任務來獲得,而要完則是通過supplyAsync工廠方法來獲得渐扮。為什么它可以這么拽呢论悴?我們通過分析源碼便可得知,它原來是內(nèi)置了ForkJoinPool墓律。ForkJoin的意思就是分支合并膀估,和我們講并行是提到的分支合并框架都是一路貨色,所以默認情況下的并發(fā)和并行在具體實現(xiàn)和運行效率沒有多大差別耻讽。這里可以回答我們后面的兩個問題察纯,我們的并發(fā)會開啟多少線程以及效率是否比并行要好。由于ForkJoinPool所提供的線程是有限的,所以肯定不會每次迭代都開啟一個新線程饼记,具體會開多少我也沒有深入研究香伴,大家還請繼續(xù)自行查驗,不過肯定不會很多具则,不能說你想提交多少個任務人家就給你開幾個進程即纲。效率方面,既然跟并行是一個路子的東西博肋,肯定也差不多了多少低斋。相比之下,并行只需要調(diào)用一個方法束昵,簡潔得很拔稳,那么并發(fā)還有用的必要嗎?答案當然是肯定的锹雏,不然我也不會在這里講,并發(fā)既然專門有個類了术奖,那肯定就比只有一個方法的并行靈活的多礁遵。
首先要解決的就是效率問題,之所以并發(fā)并行差不多采记,就是因為并發(fā)默認用了并行的那一套東西佣耐,如果我們想根據(jù)實際情況對并發(fā)進行優(yōu)化,就必須要革換線程池唧龄。為此兼砖,要完提供了supplyAsync的重載版本,允許我們在第二個參數(shù)位置傳入我們自己選擇的線程池既棺,即一個Executor類型的對象讽挟。可以將原代碼進行如下修改丸冕。
例5.2:
Executor executor = Executors.newFixedThreadPool(1000);
Stream.iterate(1, i -> i + 1)
.map(i -> CompletableFuture.supplyAsync(() -> i.toString(),executor))
...
上述代碼中耽梅,我新建了一個線程池,該線程池擁有固定數(shù)量為1000的線程池胖烛,平均每個元素都能分到一個線程專門給自己用來迭代眼姐。那么線程池中的線程是不是越多越好呢?當然不是佩番,若果你沒有那么多的任務众旗,開那么多線程不也是浪費啊,具體開多少才合適趟畏,就要涉及到任務密集類型與CPU線程相關的內(nèi)容了贡歧,這里就不多講了。有關Executor的更多原理與詳細用法,這里也不想多講艘款,真正要到的讀者可以專門去看看Java并發(fā)編程相關的書籍資料持际。
奔流相繼 then方法族
前面兩個例子中,都是只在map方法中簡單提交了一個異步任務哗咆,要完本身似乎沒有顯露出什么函數(shù)式的風范蜘欲。其實它的本事可大著呢,真操練起來絲毫不遜色于前面的Stream與Optional晌柬,它的可級聯(lián)的方法皆以then開頭姥份,很符合要完用于提交異步任務的本質。
例5.3:
CompletableFuture.supplyAsync(() -> "CF")
.thenApply(s -> s + ".thenApply")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ".thenCompose"))
.thenAccept(System.out::println);
這里用到了三個then族方法年碘,看名字與參數(shù)大致可以推測一下它們的含義澈歉。thenApply方法會在任務執(zhí)行完畢后再繼續(xù)執(zhí)行其他操作,消費之前的結果并產(chǎn)生新的結果屿衅,有一點像map類的方法埃难。thenCompose方法跟thenApply功能類似,只是傳入的表達式需要返回CompletionStage類型的值涤久。CompletionStage前面說過涡尘,是要完除了Future外所實現(xiàn)的另一個接口,可以看做是它的養(yǎng)父响迂。盡管在表達式的返回值上有區(qū)別考抄,thenCompose方法的返回值卻仍然和thenApply一樣是單層泛型參數(shù)的要完,而不會發(fā)生嵌套蔗彤,返回一個CompletableFuture<CompletableFuture<U>>川梅。這就很像flatMap類的方法。這兩個比喻然遏,已經(jīng)被寫在Javadoc中,這就說明啦鸣,要完的設計思想與Stream潮饱、Optional、Collector等接口是很類似的诫给,都體現(xiàn)了級聯(lián)式函數(shù)式編程的思想香拉。下面再來看看最后一個方法thenAccept,該方法的參數(shù)與前兩者不同中狂,前兩者都是傳入的Function可以產(chǎn)生新的運算結果凫碌,而它則傳入一個Consumer直接把之前算出來的結果給干掉了。盡管如此胃榕,為了和其他then族方法保持一致盛险,免得級聯(lián)在它這里掉隊瞄摊,它還是很誠實的返回一個CompletableFuture<Void>,注意這里的Void是開頭大寫的包裝類苦掘,我們?nèi)绻麑λ祷氐目找暝龠M行join换帜、get之類的求值操作就只能得到空空如也的null。
并駕齊驅 雙元then族
我們雖然可以通過thenCompose方法將兩個要完結合在一起鹤啡,但是畢竟作為方法調(diào)用者的要完與作為表達式結果的要完是不平等的關系惯驼,他們中間隔著一個then,就意味著后者需要在前者執(zhí)行完畢后递瑰,在處理它計算出來的結果祟牲,嚼人家吃剩的東西。而有些情況下抖部,兩個要完并不存在依賴關系说贝,二者可以同時進行,只需要在最后將他們的結果合并到一起慎颗,這就需要用到thenCombine方法乡恕,傳入一個BiFunction。
例5.4:
CompletableFuture.supplyAsync(() -> "former")
.thenCombine(CompletableFuture.supplyAsync(()->"latter"),
(former,latter)->former+latter)
.thenRun(()->{});
thenCombine方法有兩個參數(shù)哗总,第一個是要合并的要完几颜,第二個就是合并的規(guī)則,當然你也可以選擇無視前兩者的運算結果讯屈,當他們都運行完畢后直接返回一個空要完,當然這一般沒什么意義县习。上述代碼中涮母,級聯(lián)到最后調(diào)用了一個thenRun,該方法傳入一個Runable參數(shù)躁愿,來對級聯(lián)進行收尾工作叛本。我們知道Runnable接口中需要實現(xiàn)的run方法是沒有返回值的,所以thenRun和thenAccept一個德行彤钟,執(zhí)行后返回一個空要完来候,后面再接什么join、get甚至thenAccept都沒有什么意義了逸雹。不過thenAccept后面到是可以接一個thenRun营搅,在我們?nèi)f事皆畢之后,用它來做一些收尾工作梆砸,比如發(fā)送個完成任務的消息給控制臺或者用戶界面转质。thenCombine的第二個參數(shù)是個BiFunction(二元函數(shù)),類似的還有BiConsumer(二元消費者)帖世,如果把第二個參數(shù)換成它休蟹,就得到了thenAcceptBoth方法。這個方法名字和thenAccept很像,thenAccept方法吃掉了前面計算出來的值后就不再往外吐東西赂弓,thenAcceptBoth比它多了個Both就變本加厲绑榴,可以吃掉兩個要完計算出來的值。當然這話說得對于他們兩個有失公正盈魁,既然設計了他們翔怎,就必然會有用處,Consumer和BiConsumer沒有返回值备埃,所以就只能通過在代碼中產(chǎn)生副作用來對外部造成影響姓惑。
then族方法到這里只講了三分之一,不過不用擔心按脚,剩下的方法都是在前面方法的基礎上設計出來的于毙。他們的名字都是在前面講到的方法后面加上Async,顯然會異步的執(zhí)行新增的任務辅搬,比如thenCombineAsync就會單獨開啟一個線程來執(zhí)行兩個要完的合并操作唯沮。這些方法還有一種重載,在后面加上了Executor參數(shù)堪遂,讓我們自行選擇線程池來執(zhí)行異步操作介蛉。
更多驚喜 更多then族
正當我天真的以為then族方法到這里就講完了,卻發(fā)現(xiàn)要完還有很多不姓then但是仍然屬于then族的方法溶褪,比如下面這倆貨币旧。
例5.5:
CompletableFuture.supplyAsync(() -> "first task")
.applyToEither(CompletableFuture.supplyAsync(() -> "second task")
,s -> s+" is quickest")
.acceptEither(CompletableFuture.supplyAsync(() -> "third task is quickest")
,System.out::println);
applyToEither方法會在兩個要完有一個先完的時候,將它的計算結果作為參數(shù)進行下一步運算猿妈,acceptEither與之同理吹菱,會消費掉先完犢子那貨的計算結果。此外還有彭则,runAterBoth與runAfterEither鳍刷,看名字也知道一個是在都完成的時候run,一個是在有一個先完的時候run俯抖。與正統(tǒng)的then族方法一樣输瓜,這些方法也都有對應的兩種Async系方法》移迹
以上這些方法都沒有考慮計算出錯的問題尤揣,一旦在Lambda表達式中拋出了異常,后果便不堪設想担忧,為此就出現(xiàn)了以下三個救世方法:exceptionally芹缔、whenComplete、handle瓶盛。exceptionally方法用于對任務執(zhí)行時發(fā)生了異常的要完進行補償最欠,它的參數(shù)是一個Function<Throwable,T>示罗,需要傳入一個并返回一個補償后計算結果,有點像異常處理結構中的catch塊芝硬。whenComplete方法與之類似蚜点,參數(shù)是一個BiConsumer,傳入計算結果與異常拌阴,在計算結束的時候進行處理绍绘,如果順利計算出結果,那么異常就會是null迟赃,否則計算結果是null陪拘。與thenAccept系方法不同,whenComplete方法不會只吃不吐纤壁,它的返回值的泛型參數(shù)是計算結果的類型而不是Void左刽,調(diào)用該方法會返回要完自身。如果不想返回自身酌媒,而是在此基礎上對結果進行進一步的操作欠痴,就像thenApply做的那樣,我們可以調(diào)用handle方法秒咨,它的參數(shù)是一個BiFunction喇辽,我們在前面已經(jīng)講過它與BiConsumer的區(qū)別,相信大家看到這里已經(jīng)能領悟到此方法的功能了雨席。whenComplete與handle兩個方法也都有對應的Async系方法菩咨。
要完不完 取值方法
以上所有的方法都有一個共同的特點,就是返回值都是要完類型陡厘,所以它們才能進行級聯(lián)旦委。這些方法很像是Stream接口中的延時求值方法,而get和join就類似相應的及時求值方法雏亚。除了這兩個方法,我們還可以getNow方法來獲取要完的計算結果摩钙,它只有一個參數(shù)罢低,代表當缺少計算結果是要提供的默認值,如果你很心急胖笛,現(xiàn)在就要网持,那么可以試試getNow。除此之外长踊,還可以調(diào)用complete方法功舀,傳入一個計算結果給要完返回一個boolean,要玩這時候會處于完成狀態(tài)身弊,后面如果再調(diào)用join之類的方法辟汰,就會立即獲得我們傳入的值列敲。除了complete,要完還從老Future那里繼承來了一個cancel方法帖汞,兩個方法名字看上去很類似戴而,功能也很類似,但具體差別在哪里翩蘸,我這里沒做研究就不亂說了所意,以免誤人子弟,只是推薦大家優(yōu)先使用complete催首,畢竟新方法新氣象不是扶踊?
終于要完 其他方法
講了這么多,要完的方法還沒講完郎任,不過剩的也沒幾個秧耗。有的名字又長又生僻,讓人看了就不想用涝滴,我就不講了绣版,還有的諸如isCancel、isDone一看就知道什么意思我也不講了歼疮。最后就再提一個有意思的方法吧杂抽,名字叫做toCompletableFuture,該方法會返回要完本身韩脏。之所以會有這樣的奇葩方法缩麸,不是為了讓要完的方法再多一些,而是因為要完它養(yǎng)父CompletionStage里面有這么個方法要它來實現(xiàn)赡矢,所以也只好出此下策了杭朱。
還沒有完 靜態(tài)方法
本來以為到這里終于要把要完的方法講完了,卻發(fā)現(xiàn)漏了靜態(tài)方法吹散,吐血弧械。除了supplyAsync方法,要完類還有allOf空民、anyOf刃唐、CompleteFuture、runAsync四個工廠方法界轩。allOf與anyOf方法通過傳入不同泛型參數(shù)的要完數(shù)組來開啟多個要完画饥,不同的是allOf會在所有的要完都完了之后返回一個空要完,而anyOf會在任意一個要完完成后浊猾,返回一個包含了該要完計算結果的新要完抖甘。CompleteFuture直接返回一個包含了你想要的計算結果的新要完,而runAsync和supplyAsync很相似葫慎,只不過參數(shù)是一個Runable衔彻,所以按照慣例返回值也只能是個泛型參數(shù)為Void的空要完薇宠。
結語
看了這么多的方法,單獨使用他們其實還不夠米奸,最終的目的是要和流搭配起來昼接。具體的寫法很多樣,可以在map方法里傳入一個要完悴晰,再在后面級聯(lián)的map方法中寫入諸如cf->cf.thenCompose(CompletableFuture.supplyAsync(e->e))
的表達式慢睡,也可以直接在第一個map中就直接調(diào)用要完的各個級聯(lián)。具體的代碼我就先不展示了铡溪,等有空寫個大點的工程漂辐,挑點好的代碼給大家一起玩味一番。