這就是Fork/Join任務(wù)的原理:判斷一個(gè)任務(wù)是否足夠小滤否,如果是,直接計(jì)算钳枕,否則缴渊,就分拆成幾個(gè)小任務(wù)分別計(jì)算。這個(gè)過程可以反復(fù)“裂變”成一系列小任務(wù)鱼炒。
我們來看如何使用Fork/Join對(duì)大數(shù)據(jù)進(jìn)行并行求:
public class SumTask extends RecursiveTask<Integer> {
static final int THRESHOLD = 5;
int start = 0;
int end = 0;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if ((end-start)<=THRESHOLD){
for (int i =start;i<=end;i++){
sum=sum+i;
}
return sum;
}
int mid = (end+start)/2;
SumTask task1 = new SumTask(start,mid);
SumTask task2 = new SumTask(mid+1,end);
invokeAll(task1,task2);
int r1 = task1.join();
int r2 = task2.join();
return r1+r2;
}
public static void main(String[] args) {
ForkJoinTask<Integer> task= new SumTask(0,10);
System.out.println(ForkJoinPool.commonPool().invoke(task));
System.out.println(task);
Long sum = LongStream.rangeClosed(0L, 3L).parallel().reduce(0, Long::sum);
reduce的第一個(gè)初始值如過按照并行流parallel計(jì)算的話衔沼,會(huì)根據(jù)計(jì)算機(jī)的核數(shù)每次都多加一個(gè)初始值
}