-
并行流
parallelStream
并行流就是一個(gè)把內(nèi)容分成多個(gè)數(shù)據(jù)塊肄扎,并用不同的線程分別處理每個(gè)數(shù)據(jù)塊的流嫌松。這樣就可以自動(dòng)把給定操作的工作負(fù)荷分配給多核處理器的所有內(nèi)核,讓這些處理器都忙起來(lái)恶耽。
例如:下面的方法,接收數(shù)字n作為參數(shù)衅鹿,返回從1到給定參數(shù)的所有數(shù)字的和public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1)//生成自然數(shù)無(wú)限流 .limit(n)//限制前n個(gè)數(shù) .reduce(0L, Long::sum);//對(duì)所有數(shù)字求和
-
iterate
,Stream API提供由兩個(gè)靜態(tài)方法操你個(gè)函數(shù)生成流(常見(jiàn)的還有三種方法構(gòu)建流过咬,由值構(gòu)建大渤、由數(shù)組構(gòu)建、由文件構(gòu)建)掸绞,Stream.iterate
和Stream.generate
可以創(chuàng)建無(wú)限流泵三,會(huì)根據(jù)給定的函數(shù)按需創(chuàng)建。注意與generate
區(qū)別:
generate
不是一次對(duì)每個(gè)新生成的值應(yīng)用函數(shù)衔掸,例如以下使用的供應(yīng)源是無(wú)狀態(tài)的烫幕,即它不會(huì)在任何地方記錄任何值,以備以后計(jì)算使用敞映。iterate
供應(yīng)源不一定是無(wú)狀態(tài)的较曼。可以創(chuàng)建存儲(chǔ)狀態(tài)的供應(yīng)源驱显,它可以修改狀態(tài)诗芜,并在為流生成下一個(gè)值時(shí)使用Stream.generate(Math::random) .limit(5) .forEach(System.out::println);
這時(shí),若n特別大的時(shí)候埃疫,就可以做并行處理伏恐,將順序流轉(zhuǎn)換為并行流:對(duì)順序流調(diào)用parallel
即可:在處理時(shí),會(huì)把Stream在內(nèi)部分成幾塊栓霜,在不同的塊獨(dú)立并行歸納操作翠桦,最后,同一個(gè)歸納操作會(huì)將各個(gè)子流的部分歸納結(jié)果合并起來(lái)胳蛮,得到整個(gè)原始流的歸納結(jié)果销凑。
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()//將流轉(zhuǎn)換為并行流
.reduce(0L, Long::sum);
}
但是,在實(shí)際操作時(shí)仅炊,對(duì)順序流調(diào)用parallel
方法并不意味著對(duì)流有任何實(shí)際的變化斗幼。它內(nèi)部實(shí)際上就是設(shè)了一個(gè)boolean
標(biāo)志,表示你想讓調(diào)用parallel
之后進(jìn)行的所有操作都并行執(zhí)行抚垄。對(duì)并行流調(diào)用sequential
方法就可以把它變成順序流蜕窿。將兩個(gè)方法結(jié)合起來(lái),就可以更細(xì)化地控制在遍歷流時(shí)哪些操作要并行執(zhí)行呆馁,哪些要順序執(zhí)行桐经。例如:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()//最后一次parallel或sequential調(diào)用會(huì)影響整個(gè)流水線
.reduce();
并行流默認(rèn)使用的線程使用了默認(rèn)的ForkJoinPool
,它默認(rèn)的線程數(shù)量就是你的處理器數(shù)量浙滤。
-
性能
實(shí)際上阴挣,上面利用并行版本會(huì)比順序版本滿很多,主要有兩方面原因:-
iterate
生成的是裝箱的對(duì)象纺腊,必須拆箱成數(shù)字才能求和畔咧; - 很難把
iterate
分成多個(gè)獨(dú)立塊來(lái)并行執(zhí)行茎芭,由于某些流操作會(huì)比其他車(chē)操作更容易并行化,之所以iterate
很難被分成獨(dú)立執(zhí)行的小塊盒卸,是因?yàn)槊看螒?yīng)用這個(gè)函數(shù)都要依賴前一次應(yīng)用的結(jié)果骗爆,也就說(shuō),在這種情況下蔽介,歸納進(jìn)程不像圖7.1那樣進(jìn)行摘投,整張數(shù)字列表在歸納過(guò)程開(kāi)始時(shí)沒(méi)有準(zhǔn)備好,因而無(wú)法有效地把流劃分為小塊來(lái)并行處理虹蓄。把流標(biāo)記成并行犀呼,其實(shí)是給順序處理增加了開(kāi)銷(xiāo),它還要把每次求和操作分到一個(gè)不同的線程上薇组。
所以說(shuō)外臂,并行編程可能很復(fù)雜,如果用的不對(duì)(
iterate
)律胀,會(huì)讓整體的性能變差宋光。 -
-
解決方法
使用LongStream.rangeClosed
方法,和iterate
相比有兩個(gè)優(yōu)點(diǎn):LongStream.rangeClosed
直接產(chǎn)生原始類(lèi)型的long
數(shù)字炭菌,沒(méi)有裝箱拆箱的開(kāi)銷(xiāo)罪佳。-
LongStream.rangeClosed
會(huì)生成數(shù)字范圍,很容易拆分為獨(dú)立的小塊黑低。public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
并行化是需要付出代價(jià)的赘艳,并行化過(guò)程本身需要對(duì)流做遞歸劃分,把每個(gè)子流的歸納操作分配到不同的線程克握,然后把這些操作的結(jié)果合并成一個(gè)值蕾管。但在多個(gè)內(nèi)核之間移動(dòng)數(shù)據(jù)的代價(jià)也可能比你想的要大,所以很重要的一點(diǎn)是要保證在內(nèi)核中并行執(zhí)行工作的時(shí)間比在內(nèi)核之間傳輸數(shù)據(jù)的時(shí)間長(zhǎng)菩暗。
-
正確使用并行流
上面直接使用并行化的錯(cuò)誤原因就是掰曾,使用的算法改變了某些共享狀態(tài)。- 留意裝箱停团。自動(dòng)裝箱和拆箱操作會(huì)大大降低性能旷坦。Java 8中有原始類(lèi)型流(
IntStream
、LongStream
客蹋、DoubleStream
)來(lái)避免這種操作, - 有些操作本身在并行流上的性能就比順序流差孽江。特別是
limit
和findFirst
等依賴于元素順序的操作讶坯,它們?cè)诓⑿辛魃蠄?zhí)行的代價(jià)非常大。例如岗屏,findAny
會(huì)比findFirst
性能好辆琅,因?yàn)樗灰欢ㄒ错樞騺?lái)執(zhí)行漱办。 - 對(duì)于較小的數(shù)據(jù)量,選擇并行流幾乎從來(lái)都不是一個(gè)好的決定婉烟。并行處理少數(shù)幾個(gè)元素的好處還抵不上并行化造成的額外開(kāi)銷(xiāo)娩井。
- 要考慮流背后的數(shù)據(jù)結(jié)構(gòu)是否易于分解。例如似袁,
ArrayList
的拆分效率比LinkedList
高得多洞辣,因?yàn)榍罢哂貌恢闅v就可以平均拆分,而后者則必須遍歷昙衅。另外扬霜,用range
工廠方法創(chuàng)建的原始類(lèi)型流也可以快速分解。 - 還要考慮終端操作中合并步驟的代價(jià)是大是卸妗(例如
Collector
中的combiner
方法)著瓶。如果這一步代價(jià)很大,那么組合每個(gè)子流產(chǎn)生的部分結(jié)果所付出的代價(jià)就可能會(huì)超出通過(guò)并行流得到的性能提升啼县。
- 留意裝箱停团。自動(dòng)裝箱和拆箱操作會(huì)大大降低性能旷坦。Java 8中有原始類(lèi)型流(
根據(jù)可分解性總結(jié)的一些流數(shù)據(jù)源適不適合并行
源 | 可分解性 |
---|---|
ArrayList |
極佳 |
LinkedList |
差 |
IntStream.range |
極差 |
Stream.iterate |
差 |
HashSet |
好 |
TreeSet |
好 |
分支/合并框架
分支/合并框架的目的是以遞歸方式將可以并行的任務(wù)拆分成更小的任務(wù)材原,然后將每個(gè)子任務(wù)的結(jié)果合并起來(lái)生成整體結(jié)果。它是ExecutorService
接口的一個(gè)實(shí)現(xiàn)季眷,它把子任務(wù)分配給線程池(稱為ForkJoinPool
)中的工作線程余蟹。
-
使用
RecursiveTask
把任務(wù)提交到這個(gè)池,必須創(chuàng)建RecursiveTask<R>的一個(gè)子類(lèi)瘟裸,其中R是并行化任務(wù)(以及所有子任務(wù))產(chǎn)生的結(jié)果類(lèi)型客叉,或者如果任務(wù)不返回結(jié)果,則是RecursiveAction類(lèi)型,則需要實(shí)現(xiàn)它的抽象方法compute
:
protected abstract R compute();
這個(gè)方法同時(shí)定義了將任務(wù)拆分成子任務(wù)的邏輯话告,以及無(wú)法再拆分或不方便再拆分時(shí)兼搏,生成單個(gè)子任務(wù)結(jié)果的邏輯。if (任務(wù)足夠小或不可分) { 順序計(jì)算該任務(wù) } else { 將任務(wù)分成兩個(gè)子任務(wù) 遞歸調(diào)用本方法沙郭,拆分每個(gè)子任務(wù)佛呻,等待所有子任務(wù)完成 合并每個(gè)子任務(wù)的結(jié)果 }
例如:利用分支/合并框架執(zhí)行并行求和
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;//不再將任務(wù)分成子任務(wù)的大小
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {//如果大小小于或等于閾值,順序計(jì)算結(jié)果
return computeSequentially();
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();//利用另一個(gè)ForkJoinPool線程異步執(zhí)行新創(chuàng)建的子任務(wù)
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);//創(chuàng)建一個(gè)任務(wù)為數(shù)組的后一半數(shù)組求和
Long rightResult = rightTask.compute();//同步執(zhí)行第二個(gè)子任務(wù)病线,有可能允許進(jìn)一步遞歸劃分
Long leftResult = leftTask.join();//讀取第一個(gè)子任務(wù)的結(jié)果吓著,如果尚未完成就等待
return leftResult + rightResult;
}
//在子任務(wù)不再可分時(shí)計(jì)算結(jié)果的簡(jiǎn)單算法
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {{
sum += numbers[i];
}
return sum;
}
}
把數(shù)字?jǐn)?shù)組傳給ForkJoinSumCalculator
的構(gòu)造函數(shù),即可實(shí)現(xiàn)并行對(duì)前n個(gè)自然數(shù)求和:
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
//ForkJoinTask為RecursiveTask的父類(lèi)
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);//invoke用來(lái)執(zhí)行某個(gè)對(duì)象的目標(biāo)方法
}
-
工作竊人吞簟:
分支/合并框架工程用一種稱為工作竊劝筝骸(work stealing)用于在線程池中的工作線程之間重新分配和平衡任務(wù)。在實(shí)際應(yīng)用中惕耕,這意味著這些任務(wù)差不多被平均分配到ForkJoinPool中的所有線程上纺裁。每個(gè)線程都為分配給它的任務(wù)保存一個(gè)雙向鏈?zhǔn)疥?duì)列,每完成一個(gè)任務(wù),就會(huì)從隊(duì)列頭上取出下一個(gè)任務(wù)開(kāi)始執(zhí)行欺缘《霸ィ基于前面所述的原因,某個(gè)線程可能早早完成了分配給它的所有任務(wù)谚殊,也就是它的隊(duì)列已經(jīng)空了丧鸯,而其他的線程還很忙。這時(shí)嫩絮,這個(gè)線程并沒(méi)有閑下來(lái)丛肢,而是隨機(jī)選了一個(gè)別的線程,從隊(duì)列的尾巴上“偷走”一個(gè)任務(wù)絮记。這個(gè)過(guò)程一直繼續(xù)下去摔踱,直到所有的任務(wù)都執(zhí)行完畢,所有的隊(duì)列都清空怨愤,有助于更好地在工作線程之間平衡負(fù)載派敷。
image.png -
Spliterator接口
和Iterator
一樣,Spliterator
也用于遍歷數(shù)據(jù)源中的元素撰洗,但它是為了并行執(zhí)行而設(shè)計(jì)的篮愉。public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action);//類(lèi)似于Iterator,按順序一個(gè)一個(gè)使用Spliterator中的元素差导,如果有其他元素需要遍歷就返回true Spliterator<T> trySplit();//可以把一些元素劃出去分給第二個(gè)Spliterator试躏,讓他們并行處理 long estimateSize();//估計(jì)剩下還有多少元素需要遍歷 int characteristics();//代表Spliterator本身特征集的編碼∩韬郑可以用這些特征來(lái)更好地控制和優(yōu)化它的使用颠蕴。 }
將
Stream
拆分成多個(gè)部分的算法是一個(gè)遞歸過(guò)程,如圖所示助析。第一步是對(duì)第一個(gè)Spliterator
調(diào)用trySplit
犀被,生成第二個(gè)Spliterator
。第二步對(duì)這兩個(gè)Spliterator
調(diào)用trysplit
外冀,這樣總共就有了四個(gè)Spliterator
寡键。這個(gè)框架不斷對(duì)Spliterator
調(diào)用trySplit
直到它返回null
,表明它處理的數(shù)據(jù)結(jié)構(gòu)不能再分割雪隧,如第三步所示西轩。最后,這個(gè)遞歸拆分過(guò)程到第四步就終止了脑沿,這時(shí)所有的Spliterator
在調(diào)用trySplit
時(shí)都返回了null
藕畔。
//7.3.2自定義Spliterator