在主流的編程語言中蝌诡,Java一直走在簡化并發(fā)編程任務(wù)的最前沿。1996年Java發(fā)布時(shí)枫吧,就通過同步和wait/notify內(nèi)置了對線程的支持浦旱。Java5引入了java.util.concurrent類庫,提供了并行集合(concurrent collection)和執(zhí)行者框架(executor framework)九杂。Java7引入fork-join包颁湖,這是一個(gè)處理并行分解的高性能框架。Java8引入Stream例隆,只需要調(diào)用一次parallel方法就可以實(shí)現(xiàn)并行處理甥捺。在Java中編寫并發(fā)程序變得越來越容易,但是要編寫出正確又快速的并發(fā)程序镀层,則一向沒那么簡單涎永。安全性和活性失敗是并發(fā)編程中需要面對的問題,Stream pipeline并行也不例外鹿响。
請看摘自第45條的這段程序:
// Stream-based program to generate the first 20 Mersenne primes
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.forEach(System.out::println);
}
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
在我的機(jī)器上羡微,這段程序會立即開始打印素?cái)?shù),完成運(yùn)行花了12.5秒惶我。假設(shè)我天真的想通過在Stream pipeline上添加一個(gè)parallel()調(diào)用來提速妈倔。你認(rèn)為這樣會對其性能產(chǎn)生什么樣的影響?運(yùn)行速度會稍微塊一點(diǎn)嗎绸贡?還是會慢一點(diǎn)盯蝴?遺憾的是,其結(jié)果是根本不打印任何內(nèi)容听怕,CPU的使用率卻定在90%一動不動了(活性失斉跬Α)。程序最后可能會終止尿瞭,但是我們不想一探究竟闽烙,半個(gè)小時(shí)后就強(qiáng)行把它終止了。
這是怎么回事呢声搁?簡單的說黑竞,Stream類庫不知道如何并行這個(gè)pipeline,以及如何探索失敗疏旨。即便在最佳環(huán)境下很魂,如果源頭是來自Stream.iterate,或者使用了中間操作的limit檐涝,那么并行pipeline也不可能提升性能
遏匆。這個(gè)pipeline必須同時(shí)滿足這兩個(gè)條件法挨。更糟糕的是,默認(rèn)的并行策略在處理limit的不可預(yù)知性時(shí)幅聘,是假設(shè)額外多處理幾個(gè)元素凡纳,并放棄任何不需要的結(jié)果,這些都不會影響性能喊暖。在這種情況下,他查找每個(gè)梅森素?cái)?shù)時(shí)撕瞧,所花費(fèi)的時(shí)間大概是查找之前元素的兩倍陵叽。因而,額外多計(jì)算一個(gè)元素的成本丛版,大概相當(dāng)于計(jì)算所有之前元素總和的時(shí)間巩掺,這個(gè)貌似無傷大雅的pipeline,卻使得自動并行算法瀕臨崩潰页畦。這個(gè)故事的寓意很簡單:千萬不要任意的并行Stream pipeline
胖替。它造成的性能后果有可能是災(zāi)難性的。
總之豫缨,在Stream上通過并行獲得性能独令,最好是通過ArrayList、HashMap好芭、HashSet和ConcurrentHashMap實(shí)例燃箭,數(shù)組,int范圍和long范圍等
舍败。這些數(shù)據(jù)結(jié)構(gòu)的共性是招狸,都可以被精確、輕松的分成任意大小的子范圍邻薯,使并行線程的分工變得更加輕松裙戏。Stream類庫用來執(zhí)行這個(gè)任務(wù)的抽象是分割迭代器,它是由Stream和Iterable中的spliterator方法返回的厕诡。
這些數(shù)據(jù)結(jié)構(gòu)共有的另一項(xiàng)重要特性是累榜,在進(jìn)行順序處理時(shí),它們提供了優(yōu)異的引用局部性:序列化的元素引用一起保存在內(nèi)存中灵嫌。被那些引用訪問到的對象在內(nèi)存中可能不是一個(gè)緊挨著一個(gè)信柿,這降低了引用的局部性。事實(shí)證明醒第,引用局部性對于并發(fā)批處理來說至關(guān)重要:沒有它渔嚷,線程就會出現(xiàn)閑置,需要等待數(shù)據(jù)從內(nèi)存轉(zhuǎn)移到處理器的緩存稠曼。具有最佳引用局部性的數(shù)據(jù)結(jié)構(gòu)是基本類型數(shù)組形病,因?yàn)閿?shù)據(jù)本身是相鄰的保存在內(nèi)存中的。
Stream pipeline的終止操作本質(zhì)上也影響了并發(fā)執(zhí)行的效率。如果大量的工作在終止操作中完成漠吻,而不是全部工作在pipeline中完成量瓜,并且這個(gè)操作是固有的順序,那么并行pipeline的效率就會受到限制途乃。并行的最佳終止操作是做減法(reduction)绍傲,用一個(gè)Stream的reduce方法,將所有從pipeline產(chǎn)生的元素都合并在一起耍共,或者預(yù)先打包像min烫饼、max、count和sum這類方法试读。短路操作anyMatch杠纵、allMatch和noneMatch也都可以并行。由Stream的collect方法執(zhí)行的操作钩骇,都是可變的減法比藻,不是并行的最好選擇,因?yàn)楹喜⒓系某杀痉浅8摺?/p>
如果是自己編寫Stream倘屹、Iterable或者Collection實(shí)現(xiàn)银亲,并且想要得到適當(dāng)?shù)牟⑿行阅埽捅仨毟采wspliterator方法纽匙,并廣泛的測試結(jié)果Stream的并行性能群凶。編寫高質(zhì)量的分割迭代器很困難,并且超出了本書的討論范疇哄辣。
并行Stream不僅可能降低性能请梢,包括活性失敗,還可以導(dǎo)致結(jié)果出錯(cuò)力穗,以及難以預(yù)計(jì)的行為
(如安全性失斠慊 )。安全性失敗可能是因?yàn)椴⑿械膒ipeline使用了映射当窗、過濾器或者程序員自己編寫的其他函數(shù)對象够坐,并且沒有遵守它們的規(guī)范。Stream規(guī)范對于這些函數(shù)對象有著嚴(yán)格的要求條件崖面。例如元咙,傳到Stream的reduce操作的收集器函數(shù)和組合器函數(shù),必須是有關(guān)聯(lián)巫员、互不干擾庶香,并且是無狀態(tài)的。如果不滿足這些條件(在第46條提到了一些)简识,但是按序列運(yùn)行pipeline赶掖,可能會得到正確的結(jié)果感猛;如果并發(fā)運(yùn)行,則可能會突發(fā)性失敗奢赂。
以上值得注意的是陪白,并行的梅森素?cái)?shù)程序雖然運(yùn)行完成了,但是并沒有按正確的順序(升序)打印出素?cái)?shù)膳灶。為了保存序列化版本程序顯示的順序咱士,必須用forEachOrdered代替終止操作的forEach,它可以確保按enconuter順序遍歷并行的Stream轧钓。
假如在使用的是一個(gè)可以有效分割的源Stream序厉,一個(gè)可行的或者簡單的終止操作,以及互不干擾的函數(shù)對象聋迎,那么將無法獲得通過并行實(shí)現(xiàn)的提速脂矫,除非pipeline完成了足夠的實(shí)際工作枣耀,抵消了與并行相關(guān)的成本霉晕。據(jù)不完全估計(jì),Stream中的元素?cái)?shù)量捞奕,是每個(gè)元素所執(zhí)行的代碼行數(shù)的很多倍牺堰,至少是十萬倍。
切記:并行Stream是一項(xiàng)嚴(yán)格的性能優(yōu)化颅围。對于任何優(yōu)化都必須在改變前后對性能進(jìn)行測試伟葫,以確保值得這么做(詳見第67條)。最理想的是在實(shí)現(xiàn)的系統(tǒng)設(shè)置中進(jìn)行測試院促。一般來說筏养,程序中所有的并行Stream pipeline都是在一個(gè)通用的fork-join池中運(yùn)行的。只要有一個(gè)pipeline運(yùn)行異常常拓,都會損害到系統(tǒng)中其他不相關(guān)部分的性能渐溶。
聽起來貌似在并行Stream pipeline時(shí)怪事連連,其實(shí)正是如此弄抬。我有一個(gè)朋友茎辐,他發(fā)現(xiàn)在大量使用Stream的幾百萬行代碼中,只有少數(shù)幾個(gè)并行Stream是有效的掂恕。這并不意味著應(yīng)該避免使用并行Stream拖陆。在適當(dāng)?shù)臈l件下,給Stream pipeline添加parallel調(diào)用懊亡,確實(shí)可以在多處理器核的情況下實(shí)現(xiàn)近乎線性的倍增
依啰。某些域如機(jī)器學(xué)習(xí)和數(shù)據(jù)處理,尤其適用于這樣的提速店枣。
簡單舉一個(gè)并行Stream pipeline有效的例子孔飒。假設(shè)下面這個(gè)函數(shù)是用來計(jì)算π(n)灌闺,素?cái)?shù)的數(shù)量少于或者等于n:
// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
在我的機(jī)器上,這個(gè)函數(shù)花31秒完成了計(jì)算π(108)坏瞄。只要添加一個(gè)parallel()調(diào)用桂对,就把調(diào)用時(shí)間減到了9.2秒:
// Prime-counting stream pipeline - parallel version
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.parallel()
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
換句話說,并行計(jì)算在我的四核機(jī)器上添加了parallel()調(diào)用后鸠匀,速度加快了3.7倍蕉斜。值得注意的是,這并不是在實(shí)踐計(jì)算n值很大時(shí)的π(n)的方法缀棍。還有更加高效的算法宅此,如著名的Lehmer公式。
如果要并行一個(gè)隨機(jī)數(shù)的Stream爬范,應(yīng)該從SplittableRandom實(shí)例開始父腕,而不是從ThreadLocalRandom(或?qū)嶋H上已經(jīng)過時(shí)的Random)開始。SplittableRandom正是專門為此設(shè)計(jì)的青瀑,還有線性提速的可能璧亮。ThreadLocalRandom則只用于單線程,它將自身當(dāng)作一個(gè)并行的Stream源運(yùn)用到函數(shù)中斥难,但是沒有SplittableRandom那么快枝嘶。Random在每個(gè)操作上都進(jìn)行同步,因此會導(dǎo)致濫用哑诊,扼殺了并行的優(yōu)勢群扶。
總而言之,盡量不要并行Stream pipelien镀裤,除非有足夠的理由相信它能保證計(jì)算的正確性竞阐,并且能加快程序的運(yùn)行速度。如果對Stream進(jìn)行不恰當(dāng)?shù)牟⑿胁僮魇钊埃赡軐?dǎo)致程序運(yùn)行失敗骆莹,或者造成性能災(zāi)難。如果確信并行是可行的铃岔,并行運(yùn)行時(shí)一定要確保代碼正確汪疮,并在真實(shí)環(huán)境下認(rèn)真的進(jìn)行性能測量。如果代碼正確毁习,這些實(shí)驗(yàn)也證明它有助于提升性能智嚷,只有這時(shí)候,才可以在編寫代碼時(shí)并行Stream纺且。