一贵少、簡(jiǎn)述
JDK8為了讓處理大數(shù)據(jù)量集合更快速组砚,使用了并行的形式來進(jìn)行處理峻呕。在上面的例子中我們也看到了利职,如果我們需要一個(gè)并行流的話,只要對(duì)一個(gè)集合打開parallelStream即可瘦癌。在JDK7以前猪贪,想要對(duì)一個(gè)集合進(jìn)行并行處理似乎是一件困難的事情。所以這一篇文章我們可以看看JDK8是怎么實(shí)現(xiàn)并行處理的讯私。
二热押、將順序流轉(zhuǎn)換為并行流
有一個(gè)需求是,求從1開始到n的所有數(shù)字的和斤寇。
JDK7的做法是:
/**
* JDK7的做法
*/
@Test
public void test01(){
long l = iterativeSum(100);
System.out.println(l);
}
public long iterativeSum(long n){
long res = 0;
for(long i = 0; i <= n; i++){
res += i;
}
return res;
}
JDK8有兩種方式桶癣,一種是順序流,另外一種就是并行流了娘锁。
順序流的方式:
/**
* JDK8的順序流做法
*/
@Test
public void test02(){
long l = sequentialSum(100);
System.out.println(l);
}
/** 順序流的方式 */
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
然后我們只要對(duì)以上的代碼加以修改就可以變成并行流的方式了:
/**
* JDK8的并行流做法
*/
@Test
public void test03(){
long l = parallelSum(100);
System.out.println(l);
}
/** 并行流的方式 */
public long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()// 只需要對(duì)一個(gè)順序流調(diào)用此方法牙寞,就可以實(shí)現(xiàn)順序流到并行流的轉(zhuǎn)換
.reduce(0L, Long::sum);
}
注意
這里有個(gè)需要注意的地方就是,可能有人會(huì)以為我們可以通過調(diào)用不同的轉(zhuǎn)換(parallel轉(zhuǎn)換為并行莫秆,sequential轉(zhuǎn)換為順序流)來控制到每一步操作是否通過順序流還是并行流间雀,但是其實(shí)并不是這樣的,最后一次調(diào)用會(huì)影響到整個(gè)流的操作镊屎,例如:
Stream.parallel().filter(...). sequential().map(...).parallel().reduce(..)
這個(gè)語句其實(shí)整個(gè)操作都是并行操作的惹挟,因?yàn)樵谶@段代碼中,最后一次調(diào)用的方法是parallel
使用的核心數(shù)
并行流使用的核心數(shù)是默認(rèn)的處理器的數(shù)量杯道,如果需要更改匪煌,我們可以通過java.util.concurrent.ForkJoinPool.common.paramllelism來修改默認(rèn)的核心數(shù)量责蝠。比如通過調(diào)用的System.setProperty來更改党巾。但是默認(rèn)核心數(shù)量是一個(gè)很好的設(shè)置萎庭,如果沒有特別的需求的話并不建議修改。
/**
* 查看電腦核心數(shù)量
*/
@Test
public void test04(){
System.out.println(Runtime.getRuntime().availableProcessors());
}
正確使用并行
在使用并行的時(shí)候我們應(yīng)該盡量去避免對(duì)一個(gè)值進(jìn)行修改齿拂,因?yàn)樵诙嗑€程情況下驳规,這個(gè)值的修改會(huì)被競(jìng)爭(zhēng),這樣就會(huì)出現(xiàn)計(jì)算結(jié)果錯(cuò)誤的后果署海,但是當(dāng)我們?nèi)ソo這個(gè)值加上鎖的時(shí)候吗购,這個(gè)操作又失去了并行的意義。
/**
* 并行流錯(cuò)誤使用示范
*/
@Test
public void test05() {
/** 順序流的方式 */
Accmulator accmulator = new Accmulator();
LongStream.rangeClosed(1, 100).forEach(accmulator::add);
System.out.println(accmulator.total);// 5050
/** 并行流的方式 */
for (int i = 0; i < 10; i++) {
accmulator = new Accmulator();
LongStream.rangeClosed(1, 100).parallel().forEach(accmulator::add);
System.out.println(accmulator.total);
}
/*
4004
4872
4952
5050
4874
4719
4935
4798
5050
4392
*/
}
/**
* 創(chuàng)建一個(gè)類用于修改這個(gè)類里的變量
*/
class Accmulator {
long total = 0;
public void add(long v) {
total += v;
}
}
當(dāng)輸出來的值只有一次是正確的時(shí)候砸狞,性能似乎就變得不太那么重要了捻勉。
三、使用分支/合并框架進(jìn)行求和
JDK8并行流的使用我們需要考慮一定的因素刀森,并不是所有的操作都是用并行流就是好的踱启。如果數(shù)據(jù)量較小,使用并行流并不比使用順序流的速度要快研底,因?yàn)閯?chuàng)建線程是需要耗費(fèi)資源的埠偿。
接下來就來體驗(yàn)一下JDK7的ForkJoin框架。(說真的榜晦,我寫代碼這么久以來還沒用過這個(gè)框架...)
如果使用過線程池的同學(xué)就知道冠蒋,ExecutorService是用來配置系統(tǒng)中線程池信息的,而ForkJoin就是對(duì)該接口的一個(gè)實(shí)現(xiàn)乾胶,稱為ForkJoinPool抖剿,顧名思義就是一個(gè)分割線程的池。而這些任務(wù)就在這個(gè)池里面獲取線程資源進(jìn)行執(zhí)行识窿。
需求:定義一個(gè)函數(shù)牙躺,對(duì)Long類型的前n個(gè)數(shù)進(jìn)行求和。
package cn.liweidan.forkandjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
/**
* <p>Desciption:并行求和Long數(shù)組</p>
* CreateTime : 2017/7/17 上午11:25
* Author : Weidan
* Version : V1.0
*/
public class ForkJoinCaculator extends RecursiveTask<Long> {// 需要繼承RecursiveTask腕扶,傳遞并行處理后返回值的類型孽拷;如果執(zhí)行的沒有返回值,則繼承RecursiveAction
/** 需要進(jìn)行計(jì)算的long數(shù)組 */
private final long[] numbers;
/** 子任務(wù)處理數(shù)組的其實(shí)和終止位置 */
private final int start;
private final int end;
/** 進(jìn)行數(shù)組分割的閾值 */
public static final long THRESHOLD = 10_000;
public ForkJoinCaculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinCaculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
/**
* JDK會(huì)通過線程池中的一個(gè)線程調(diào)用該方法半抱,這里需要我們確定一個(gè)條件判斷脓恕,判斷這個(gè)任務(wù)是否需要繼續(xù)進(jìn)行切割
* 在這里我們運(yùn)行的時(shí)候,定義一個(gè)閾值窿侈,如果數(shù)組小于這個(gè)閾值炼幔,則直接進(jìn)行計(jì)算,如果大于這個(gè)閾值史简,則對(duì)這個(gè)數(shù)組進(jìn)行分割
* @return
*/
@Override
protected Long compute() {
/** 計(jì)算長(zhǎng)度 */
int length = end - start;
/** 如果長(zhǎng)度小于閾值則直接進(jìn)行計(jì)算 */
if(length <= THRESHOLD){
return computeSequentially();
}
/** 創(chuàng)建一個(gè)從開始到開始+長(zhǎng)度的一半的數(shù)組計(jì)算線程 */
ForkJoinCaculator leftTask =
new ForkJoinCaculator(numbers, start, start + length / 2);
/** 加入線程池 */
leftTask.fork();
/** 創(chuàng)建一個(gè)任務(wù)為后面剩余部分求和 */
ForkJoinCaculator rightTask =
new ForkJoinCaculator(numbers, start + length / 2, end);
/** 同步執(zhí)行 */
Long rightRes = rightTask.compute();
/** 讀取剛剛創(chuàng)建的第一個(gè)任務(wù)的結(jié)果乃秀,join方法會(huì)等待該線程返回結(jié)果,未完成線程會(huì)阻塞 */
Long leftRes = leftTask.join();
/** 返回左右兩個(gè)任務(wù)計(jì)算出來的結(jié)果 */
return leftRes + rightRes;
}
/**
* 用于順序計(jì)算
* @return
*/
private Long computeSequentially() {
long sum = 0;
for (long number : numbers) {
sum += number;
}
return sum;
}
public static long forkjoinSum(long n) {
long numbers[] = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinCaculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
注意代碼中的join方法是會(huì)進(jìn)行線程阻塞的,在那里需要等待每個(gè)任務(wù)的返回結(jié)果再進(jìn)行下去跺讯。
接下來就是運(yùn)行了枢贿。
public class ForkJoinCaculatorTest {
@Test
public void forkjoinSum() throws Exception {
long startTime = System.nanoTime();
long l = ForkJoinCaculator.forkjoinSum(10_000_000);
long endTime = System.nanoTime();
System.out.println("共耗時(shí):" + ((endTime -startTime)/1000000) + "毫秒, 結(jié)果是:" + l);
}
}
運(yùn)行完成以后,輸出共耗時(shí):3400毫秒, 結(jié)果是:51200005120000000
運(yùn)行以上程序的時(shí)候刀脏,Java會(huì)把長(zhǎng)度為10_000_000的long數(shù)組傳遞給了ForkJoinCaculator
局荚,期間通過compute
進(jìn)行分割,將數(shù)組分割到足夠小的長(zhǎng)度的時(shí)候進(jìn)行計(jì)算愈污,最后耀态,程序把每個(gè)線程計(jì)算出來的結(jié)果進(jìn)行合并,得出以上的結(jié)果暂雹。