這篇博客,我們將介紹Java中的并行流(Parallel Stream)。
Java 8
引入了并行流的概念來(lái)進(jìn)行并行處理。 隨著技術(shù)的發(fā)展,我們所使用的計(jì)算機(jī)硬件成本越來(lái)越廉價(jià)枢析,性能卻遵循者摩爾定律越來(lái)越快。無(wú)論是大型服務(wù)器還是個(gè)人計(jì)算機(jī)刃麸,多核CPU已不再是觸不可及的稀缺資源醒叁,因此可以使用并行處理來(lái)充分榨取CPU資源獲取更好的性能。
下面讓我們通過(guò)2個(gè)例子來(lái)做個(gè)對(duì)比以更好的了解并行技術(shù)
public class Java8ParallelStreamMain {
public static void main(String[] args) {
System.out.println("=================================");
System.out.println("Using Sequential Stream");
System.out.println("=================================");
int[] array= {1,2,3,4,5,6,7,8,9,10};
IntStream intArrStream=Arrays.stream(array);
intArrStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
System.out.println("=================================");
System.out.println("Using Parallel Stream");
System.out.println("=================================");
IntStream intParallelStream=Arrays.stream(array).parallel();
intParallelStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
}
}
在自己的電腦上執(zhí)行上述代碼泊业,你會(huì)看到控制臺(tái)的輸出如下:
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 main
8 ForkJoinPool.commonPool-worker-3
2 main
1 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-3
4 ForkJoinPool.commonPool-worker-5
9 ForkJoinPool.commonPool-worker-2
3 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-4
我們可以看到控制臺(tái)的輸出把沼,在順序執(zhí)行的情況下主線程(main thread)會(huì)完成所有工作。所有的工作串行執(zhí)行吁伺,直到等到當(dāng)前迭代執(zhí)行完成饮睬,才會(huì)進(jìn)行下一個(gè)迭代。
同樣篮奄,我們注意到在并行流的情況下捆愁,會(huì)同時(shí)生成6個(gè)線程,并在內(nèi)部使用Fork和Join池創(chuàng)建和管理線程窟却。并行流通過(guò)靜態(tài)ForkJoinPool.commonPool() 方法創(chuàng)建ForkJoinPool
實(shí)例昼丑。
并行流(Parallel Stream)利用所有可用CPU內(nèi)核的優(yōu)勢(shì),并并行處理任務(wù)夸赫。 如果任務(wù)數(shù)超過(guò)內(nèi)核數(shù)菩帝,則其余任務(wù)將等待當(dāng)前正在運(yùn)行的任務(wù)完成。
你可以通過(guò)Runtime.getRuntime().availableProcessors()
來(lái)獲取當(dāng)前計(jì)算機(jī)的CPU 內(nèi)核數(shù)量。
并行流優(yōu)勢(shì)如此大呼奢,那么我們應(yīng)該始終使用它嗎宜雀?
答案是否定的,
通過(guò)上述代碼我們可以看到僅通過(guò)添加parallel()
即可輕松將順序流轉(zhuǎn)換為并行流控妻,但是這并不意味著我們應(yīng)該始終使用它州袒。
在使用并行流時(shí)我們需要考慮很多因素揭绑,否則我們將會(huì)遭受并行流所帶來(lái)的負(fù)面影響弓候。
并行流比順序流具有更高的開銷,并且在線程之間的上下文進(jìn)行協(xié)調(diào)切換需要花費(fèi)大量時(shí)間他匪。
僅在以下情況下菇存,才需要考慮是否使用并行流:
- 需要處理大量數(shù)據(jù)集。
- 我們知道 Java 使用
ForkJoinPool
實(shí)現(xiàn)并行性邦蜜,ForkJoinPool
派生源流并提交執(zhí)行依鸥,因此源數(shù)據(jù)流應(yīng)該是可拆分的。例如:ArrayList
非常容易拆分悼沈,因?yàn)槲覀兛梢酝ㄟ^(guò)索引找到中間元素并將其拆分贱迟,但是LinkedList
很難拆分,并且在大多數(shù)情況下表現(xiàn)不佳絮供。在這種情況下就不適合用并行流衣吠。 - 我們?cè)谔幚韱栴}的時(shí)候確實(shí)遇到流性能問題,否則請(qǐng)不要為了并行而并行壤靶。
- 我們需要確保線程之間的所有共享資源都是正確同步缚俏,否則可能會(huì)產(chǎn)生數(shù)據(jù)不一致問題。
衡量并行度的最簡(jiǎn)單公式是Brian Goetz在其演講中提到的NQ模型贮乳。
N x Q >10000
N = 數(shù)據(jù)集中的項(xiàng)目數(shù)量
Q = 每個(gè)項(xiàng)目的工作量
這意味著如果您有大量數(shù)據(jù)集并且每個(gè)項(xiàng)目的工作量比較少(例如:求和)忧换,那么并行性可能會(huì)幫助您提升程序性能,反之亦然向拆。 因此亚茬,如果您有較少的數(shù)據(jù)集和每個(gè)項(xiàng)目更多的工作(做一些計(jì)算工作),那么并行性也可以幫助您提升程序性能浓恳。
下面才写,讓我們看一個(gè)相對(duì)復(fù)雜一點(diǎn)的用例。
即將展示的這個(gè)示例中奖蔓,我們將非常明顯的看到在并行流和順序流的情況下赞草,程序執(zhí)行的時(shí)長(zhǎng)和CPU的行為。
public class PerformanceComparisonMain {
public static void main(String[] args) {
long currentTime=System.currentTimeMillis();
List<Integer> data=new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
data.add(i);
}
long sum=data.stream()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
System.out.println(sum);
long endTime=System.currentTimeMillis();
System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
}
public static int performComputation(int number)
{
int sum=0;
for (int i = 1; i < 1000000; i++) {
int div=(number/i);
sum+=div;
}
return sum;
}
}
當(dāng)執(zhí)行上述代碼吆鹤,控制臺(tái)輸出如下:
117612733
Time taken to complete:5 minutes
上述代碼執(zhí)行時(shí)長(zhǎng)與我們電腦的CPU密切相關(guān)厨疙,我們來(lái)看看在執(zhí)行代碼過(guò)程中CPU的行為
如您所見,在順序流的情況下疑务,CPU沒有得到充分利用沾凄。
讓我們更改一下上面的代碼梗醇,在8行加上.parallel()
并行處理。
long sum=data.stream()
.parallel()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
再次執(zhí)行上述代碼撒蟀,您將會(huì)看到如下控制臺(tái)輸出
117612733
Time taken to complete:1 minutes
讓我們?cè)俅蝸?lái)看看在并行執(zhí)行的情況下CPU的運(yùn)行情況
綜上對(duì)比我們發(fā)現(xiàn)在并行執(zhí)行的情況下程序的執(zhí)行性能提升了將近5倍叙谨。前面我們已經(jīng)提到,并行流底層實(shí)現(xiàn)采用的ForkJoin
機(jī)制保屯,關(guān)于Fork-Join
的更多細(xì)節(jié)手负,下一篇博文呈現(xiàn)給大家。
祝您編碼愉快~