緒論
之前的幾章中情连,我們已經(jīng)看到了新的Stream接口可以讓你以聲明性方式處理數(shù)據(jù)集沸呐。我們還解釋了將外部迭代換為內(nèi)部迭代能夠讓原生Java庫控制流元素的處理呐伞。這種方法讓Java程序員無需顯示實現(xiàn)優(yōu)化來為數(shù)據(jù)集的處理加速颈渊。到目前為止,最重要的好處是可以對這些集合執(zhí)行操作流水線俐填,能夠自動利用計算機(jī)上的多個內(nèi)核安接。
在本篇中,你將了解Stream接口如何讓你不用太費(fèi)力氣就能對數(shù)據(jù)執(zhí)行并行操作英融。它允許你聲明性的將順序流變?yōu)椴⑿辛髡甸堋4送猓阒v看到Java是如何變戲法的驶悟,或者更實際地來說胡野,流是如何在幕后應(yīng)用java7引入的分支/合并框架的。你還會發(fā)現(xiàn)痕鳍,了解并行流內(nèi)部是如何工作的很重要硫豆,因為如果你忽視這一方面,就可能因誤用而得到意外(很可能是錯的)的結(jié)果。
為此够庙,會特別顯示恭应,在并行處理數(shù)據(jù)塊之前,并行流被劃分為數(shù)據(jù)塊的方式在某些情況下恰恰是這些錯誤且無法解釋的結(jié)果的根源耘眨。因此,你將會學(xué)習(xí)到如何通過實現(xiàn)和使用你自己的Spliterator來控制這個劃分過程境肾。
并行流
可以通過對收集源調(diào)用parallelStream方法來把集合轉(zhuǎn)換為并行流剔难。并行流就是一個把內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個數(shù)據(jù)塊的流奥喻。這樣一來偶宫,你就可以自動把給定操作的工作負(fù)荷分配給多核處理器的所有內(nèi)核,讓他們都忙起來环鲤。
假設(shè)要寫一個方法纯趋,接受數(shù)字n作為參數(shù),并返回從1到給定參數(shù)的的所有數(shù)字的和冷离。一個直接的方法是生成一個無窮大的數(shù)字流吵冒,把它限制到給定的數(shù)目,然后用對兩個數(shù)字求和的BinaryOperater來歸約這個流
public static long sequentialSum(long n){
return Stream.iterate(1L,i ->i + 1)
.limit(n).reduce(0L,long :: sum);
}
這似乎是利用并行處理的好機(jī)會西剥,特別是n很大的時候痹栖。那怎么入手呢?你要對結(jié)果變量進(jìn)行同步嗎瞭空?用多少個線程呢揪阿?誰負(fù)責(zé)生成數(shù)呢?誰來做加法呢咆畏?
根本不用擔(dān)心南捂。用并行流的話,這問題就簡單多了旧找。
將順序流轉(zhuǎn)化為并行流
可以把流轉(zhuǎn)化成并行流溺健,從而讓前面的函數(shù)歸約過程(也就是求和)并行運(yùn)行——對順序流調(diào)用parallel方法:
public static long parallelSum(long n){
return Stream.iterate(1L,i -> i +1)
.limit(n)
.parallel()
.reduce(0L,Long::sum);
}
上面的代碼的不同之處在于Stream在內(nèi)部分成了幾塊。因此可以對不同的快獨立并行進(jìn)行歸納操作钦讳。
請注意矿瘦,在現(xiàn)實中,對順序流調(diào)用parallel方法并不意味著流本身有任何實際的變化愿卒。它在內(nèi)部實際上就是設(shè)了一個boolean標(biāo)志缚去,表示你想讓調(diào)用parallel之后進(jìn)行的所有操作都并行執(zhí)行。類似的琼开,你只需要對并行流調(diào)用sequential方法就可以刻把它變成順序流易结。請注意,你可能以為吧這兩個方法都結(jié)合起來,就可以更細(xì)化地控制在遍歷流時那些操作要并行執(zhí)行搞动,哪些要順序執(zhí)行躏精。例如:
stream.parallel().filter(...).sequential().map(...).parallel().reduce();
但是最后一次parallel或者sequential調(diào)用會影響整個流水線。在本例中鹦肿,流水線會并行執(zhí)行矗烛,因為最后調(diào)用的是它。
那么箩溃,調(diào)用parallel方法瞭吃,你可能會想,并行流用的線程是從哪兒 來的呢涣旨?有多少個歪架?怎么定義這個過程?
并行流內(nèi)部使用了默認(rèn)的ForkJoinPool霹陡,它默認(rèn)的是線程數(shù)量就是處理器的數(shù)量和蚪,這個值是Runtime.getRuntime().availableProcessors()得到的。
但是你可一個通過系統(tǒng)屬性來改變烹棉。System.setProperty();這是一個全局設(shè)置攒霹,因此它將影響代碼中所有的的并行流。反過來說峦耘,目前還無法專為某個并行流指定這個值剔蹋。一般而言,讓ForkJoinPool的大小等于處理器數(shù)量是個不錯的默認(rèn)值辅髓,強(qiáng)烈不建議修改它泣崩。
測量流性能
我們聲稱并行求和方法應(yīng)該比順序求和迭代方法性能好。然而在軟件工程上洛口,靠猜絕對不是什么好辦法矫付!特別是在優(yōu)化性能時,要測量第焰,測量买优,再測量。
例子:測量對前n個自然數(shù)求和的函數(shù)的性能
public long measureSumPerf(Function<Long ,Long> adder,long n){
long fastest = Long.MAX_VALUE;
for(int i= 0;i<10;i++){
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime()-start)/1000000;
System.out.println("Result"+sum);
if(duration<fastest) fastest = duration;
}
return fastest;
}
現(xiàn)在就可以把先前開發(fā)的所有方法都放進(jìn)了一個名為ParallelStreams的類挺举,你就可以用這個框架來測試書序加法器函數(shù)對前已前往個自然數(shù)求和要多久:
System.out.println(meadureSumPerf(ParallelStreams::sequentialSum,10000000))
//結(jié)果:97
用傳統(tǒng)for循環(huán)的迭代版本執(zhí)行起來應(yīng)該會快很多杀赢,因為它更為底層,更重要的是不需要對原始類型做任何裝箱和拆箱的工作湘纵。
System.out.println(measureSumPerf(ParallelStreams::iterativeSum,10000000));
//結(jié)果: 2
現(xiàn)在我們對函數(shù)的并行版本做測試:
System.out.println(measureSumPerf(ParallelStreams::parallelSum,10000000));
//結(jié)果:164
令人相當(dāng)?shù)氖蓿蠛头椒ǖ牟⑿邪姹颈软樞驁?zhí)行版本要慢的多。這里有兩個實際問題:
- iterate生成的是裝箱的對象梧喷,必須拆箱成數(shù)字才能求和砌左;
- 我們很難吧iterate分成多個獨立塊來并行執(zhí)行脖咐。
iterate很難分割成能夠獨立執(zhí)行的小塊,因為每次應(yīng)用這個函數(shù)都要依賴前一次用用的結(jié)果:
這意味著汇歹,在這個特定的情況下屁擅,歸納進(jìn)程不是像將順序流轉(zhuǎn)化成并行流那樣進(jìn)行的;整張數(shù)字列表在歸納過程開始時沒有準(zhǔn)備好产弹,因而無法有效地把流劃分為小塊來并行處理派歌。把流標(biāo)記成并行,你其實是個順序處理增加了開銷取视,他還要八二每次求和操作分到一個不同的線程上硝皂。
這就說明了并行編程可能很復(fù)雜,有時候甚至有點違反直覺作谭。如果用的不對(比如才用了一個不易并行化的操作,如iterate)奄毡,它甚至可能讓程序的整體性能更差折欠,所以在調(diào)用那個看似神奇的aprallel操作時,了解背后到底發(fā)生了什么是很有必要的吼过。
使用更有針對性的方法
那到底要怎么利用多核處理器锐秦,用流來高效地并行求和呢?我們在第5章中討論了一個叫LongStream.rangeClosed的方法盗忱。這個方法與iterate相比有兩個優(yōu)點酱床。
- LongStream.rangeClosed直接產(chǎn)生原始類型的long數(shù)字,沒有裝箱拆箱的操作
- LongStream.rangeClosed會生成數(shù)字范圍趟佃,很容易拆分為獨立的小塊扇谣。列如,范圍120可分為15闲昭,610罐寨,1115,16~20序矩。
先看一下它用于順序流時的性能如何鸯绿,看看拆箱的開銷:
public static long rangedSum(long n){
return LongStream.rangeClosed(1,n).reduce(0L,Long::sum);
}
//輸出 17
這個數(shù)值流比前面那個用iterate工廠方法生成數(shù)字的順序執(zhí)行版本要快得多,因為數(shù)值流避免了非針對想流那些沒必要的自動裝箱和拆箱的操作簸淀。由此可見瓶蝴,選擇適當(dāng)?shù)臄?shù)據(jù)結(jié)構(gòu)往往比并行化算法更為有效。使用并行的效果呢租幕?
public static long rangedSum(long n){
return LongStream.rangeClosed(1,n).parallel().reduce(0L,Long::sum);
}
//輸出 1
終于有了一個比順序執(zhí)行更快的并行歸納舷手。這也表明,使用正確的數(shù)據(jù)結(jié)構(gòu)然后使其并行工作能夠保證最佳的性能令蛉。
盡管如此聚霜,請記住狡恬,并行化并不是沒有代價的。并行化過程本身需要對流做遞歸劃分蝎宇,把每個子流的歸納操作分配到不同的線程弟劲,然后把這些操作的結(jié)果合并成一個值。但在多個內(nèi)核之間移動數(shù)據(jù)的代價也可能比你想要的要大姥芥,所以很重要的一點是要辦證在內(nèi)核中并行執(zhí)行工作的時間比在內(nèi)核之間傳輸數(shù)據(jù)的時間長兔乞。
高效使用并行流
- 如果有疑問,測量凉唐。把順序轉(zhuǎn)成并行流輕而易舉庸追,但卻比一定是好事。因為并行流并不總是比順序快台囱。
- 留意裝箱淡溯。自動裝箱和拆箱操作會大大降低性能。java8中有原始類型流(IntStream,LongStream,DoubleStream)來避免這種操作簿训,但凡有可能都要用這些流咱娶。
- 有些操作本身在并行流上的性能就比順序流查。特別是limit和findFirst等依賴于元素順序的操作强品,他們在并行流上執(zhí)行的代價非常大膘侮。findAny會比findFirst性能好,因為它不一定要按順序執(zhí)行的榛。
- 對于較小的數(shù)據(jù)量琼了,選擇并行流幾乎從來都不是一個好的決定。并行處理少數(shù)幾個元素的好處還抵不上并行化造成額外的開銷夫晌。
-
看流的內(nèi)容是否可拆分
流背后使用的基礎(chǔ)架構(gòu)是java7中引入的分支/合并框架雕薪。并行匯總的實例證明了要想正確使用并行流,了解它的內(nèi)部原理至關(guān)重要慷丽。
分支/合并框架
分支/合并框架的目的是以遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù)蹦哼,然后將每個子任務(wù)的結(jié)果合并起來生成整體結(jié)果。它是ExecutorService接口的一個實現(xiàn)要糊,它把子任務(wù)分配給線程池(稱為ForkJoinPool)中的工作線程纲熏。