1近上、什么是ForkJoinPool
雖然目前處理器核心數(shù)已經(jīng)發(fā)展到很大數(shù)目,但是按任務(wù)并發(fā)處理并不能完全充分的利用處理器資源,因?yàn)橐话愕膽?yīng)用程序沒(méi)有那么多的并發(fā)處理任務(wù)谁鳍。基于這種現(xiàn)狀,考慮把一個(gè)任務(wù)拆分成多個(gè)單元倘潜,每個(gè)單元分別得到執(zhí)行绷柒,最后合并每個(gè)單元的結(jié)果。 有些線(xiàn)程一直處于忙碌狀態(tài)涮因,而有些線(xiàn)程卻一直空閑狀態(tài)废睦,這么可以充分的利用處理器資源
Fork/Join框架是JAVA7提供的一個(gè)用于并行執(zhí)行任務(wù)的框架,是一個(gè)把大任務(wù)分割成若干小任務(wù)养泡,最終匯總每個(gè)小任務(wù)結(jié)果得到大任務(wù)結(jié)果的框架嗜湃。
ForkJoinPool主要采用分治算法 和 工作竊取算法
2、分治法
分治法的基本思想是將一個(gè)規(guī)模為N的問(wèn)題分解為K個(gè)規(guī)模較小的子問(wèn)題澜掩,這些子問(wèn)題的相互獨(dú)立且與原問(wèn)題的性質(zhì)相同购披,求出子問(wèn)題的解之后,將這些解合并肩榕,就可以得到原有問(wèn)題的解刚陡。是一種分目標(biāo)完成的程序算法。
3株汉、工作竊取算法
一個(gè)大任務(wù)拆分成多個(gè)小任務(wù)筐乳,為了減少線(xiàn)程間的競(jìng)爭(zhēng),把這些子任務(wù)分別放到不同的隊(duì)列中乔妈,并且每個(gè)隊(duì)列都有單獨(dú)的線(xiàn)程來(lái)執(zhí)行隊(duì)列里的任務(wù)蝙云,線(xiàn)程和隊(duì)列一一對(duì)應(yīng)。
但是會(huì)出現(xiàn)這樣一種情況:A線(xiàn)程處理完了自己隊(duì)列的任務(wù)路召,B線(xiàn)程的隊(duì)列里還有很多任務(wù)要處理勃刨。
A是一個(gè)很熱情的線(xiàn)程,想過(guò)去幫忙股淡,但是如果兩個(gè)線(xiàn)程訪(fǎng)問(wèn)同一個(gè)隊(duì)列身隐,會(huì)產(chǎn)生競(jìng)爭(zhēng),所以A想了一個(gè)辦法揣非,從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行抡医。而B(niǎo)線(xiàn)程永遠(yuǎn)是從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行。
注意:線(xiàn)程池中的每個(gè)線(xiàn)程都有自己的工作隊(duì)列(PS早敬,這一點(diǎn)和ThreadPoolExecutor不同忌傻,ThreadPoolExecutor是所有線(xiàn)程公用一個(gè)工作隊(duì)列搞监,所有線(xiàn)程都從這個(gè)工作隊(duì)列中取任務(wù))琐驴,當(dāng)自己隊(duì)列中的任務(wù)都完成以后秤标,會(huì)從其它線(xiàn)程的工作隊(duì)列中偷一個(gè)任務(wù)執(zhí)行苍姜,這樣可以充分利用資源衙猪。(特別注意)
工作竊取算法的優(yōu)點(diǎn):
利用了線(xiàn)程進(jìn)行并行計(jì)算布近,減少了線(xiàn)程間的競(jìng)爭(zhēng)撑瞧。
工作竊取算法的缺點(diǎn):
1预伺、如果雙端隊(duì)列中只有一個(gè)任務(wù)時(shí),線(xiàn)程間會(huì)存在競(jìng)爭(zhēng)算谈。(只有一個(gè)任務(wù)的時(shí)候需要特殊處理)
2料滥、竊取算法消耗了更多的系統(tǒng)資源葵腹,如會(huì)創(chuàng)建多個(gè)線(xiàn)程和多個(gè)雙端隊(duì)列屿岂。
4、主要類(lèi)
1爷怀、ForkJoinTask:
使用該框架运授,需要?jiǎng)?chuàng)建一個(gè)ForkJoin任務(wù),它提供在任務(wù)中執(zhí)行fork和join操作的機(jī)制吁朦。一般情況下逗宜,我們并不需要直接繼承ForkJoinTask類(lèi),只需要繼承它的子類(lèi)囤屹,它的子類(lèi)有兩個(gè):
RecursiveAction:用于沒(méi)有返回結(jié)果的任務(wù)牺丙。
RecursiveTask:用于有返回結(jié)果的任務(wù)冲簿。
2亿昏、ForkJoinPool:
任務(wù)ForkJoinTask需要通過(guò)ForkJoinPool來(lái)執(zhí)行角钩。
5、例子
計(jì)算1到40的累加和惨险,當(dāng)間隔大于limit脊髓,則繼續(xù)遞歸
package com.concurrency2;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MyTest13 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
MyTask myTask = new MyTask(1, 40);
int result = forkJoinPool.invoke(myTask);
System.out.println(result);
forkJoinPool.shutdown();
}
}
class MyTask extends RecursiveTask<Integer> {
private int limit = 4;
private int firstIndex;
private int lastIndex;
public MyTask(int firstIndex, int lastIndex) {
this.firstIndex = firstIndex;
this.lastIndex = lastIndex;
}
@Override
protected Integer compute() {
int result = 0;
int gap = lastIndex - firstIndex;
boolean flag = gap <= limit;
if (flag) {
System.out.println(Thread.currentThread().getName());
for(int i = firstIndex;i <= lastIndex;i ++)
{
result += i;
}
} else {
int middleIndex = (firstIndex + lastIndex) / 2;
MyTask leftTask = new MyTask(firstIndex, middleIndex);
MyTask rightTask = new MyTask(middleIndex + 1, lastIndex);
invokeAll(leftTask, rightTask);
int leftTaskResult = leftTask.join();
int rightTaskResult = rightTask.join();
result = leftTaskResult + rightTaskResult;
}
return result;
}
}
輸出
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
820