使用線程池框架可以大大提高資源利用率陨帆,做到任務(wù)的提交與執(zhí)行分離,每一個(gè)worker線程執(zhí)行完自己的任務(wù)就算結(jié)束了责静,這里面有一種情況,假如某一個(gè)線程執(zhí)行的特別慢齿税,因?yàn)樗鼉?nèi)部又不斷的生成新的task牵舱,所以任務(wù)比較大,在這種情況下就算其他線程池里面有空閑的線程也幫不上忙窖铡,在這類問題下TheadPoolEexcutor處理起來就比較乏力箍铲,所以在Java7之后,新增了一種并行計(jì)算框架ForkJoinPool今穿,當(dāng)然這個(gè)類也歸屬Java線程池里面的一個(gè)實(shí)現(xiàn),作為ForkJoinPool不會(huì)為每個(gè)子任務(wù)創(chuàng)建單獨(dú)的線程。相反毫缆,池中的每個(gè)線程都有自己的雙端隊(duì)列(或deque,發(fā)音甲板)撇贺,用于存儲(chǔ)任務(wù)。
TheadPoolEexcutor一個(gè)有力補(bǔ)充,其在解決分治問題場景下的效率比較好禁舷,因?yàn)檫@個(gè)框架采用了work-steal算法,可以充分利用線程池里面的cpu資源。
什么是ForkJoin
ForkJoin框架的核心思想分治思想,對于非常大的問題错沃,把它的切分成多個(gè)小問題醒叁,然后在多個(gè)小問題里面繼續(xù)切分成更小的部分篮奄,遞歸切分直到問題的粒度足夠小,可以被直接處理他匪。這種切分任務(wù)的操作我們就叫fork,然后對于這些小問題因?yàn)槭强梢圆⑿羞\(yùn)行在多核處理上夸研,所以我們需要聯(lián)合這些問題的結(jié)果從而得到最終的計(jì)算結(jié)果邦蜜,這種方式我們叫做join,這就是Fork/Join的名字含義亥至。
采用偽代碼來描述這一過程如下:
ForkJoin的API介紹
在Java的api里面ForkJoinPool線程池是ForkJoin框架的核心悼沈,這個(gè)線程池首先維護(hù)了一個(gè)總的任務(wù)隊(duì)列贱迟,每個(gè)線程會(huì)從總的任務(wù)隊(duì)列里面獲取任務(wù)執(zhí)行,得到自己的任務(wù)之后絮供,由該任務(wù)產(chǎn)生的子任務(wù)會(huì)放在自己線程維護(hù)的一個(gè)雙端隊(duì)列里面衣吠,然后從頭部獲取任務(wù)依次處理,直到完畢壤靶,此時(shí)如果別的線程處理完了自己隊(duì)列的任務(wù)缚俏,那么會(huì)幫著還沒有處理完任務(wù)的線程處理任務(wù),具體是從尾部讀取任務(wù)萍肆,依次處理袍榆,這樣就達(dá)到了work-steal的理念,從而有效的利用了cpu資源塘揣。
work stealing算法的優(yōu)點(diǎn):
利用了線程進(jìn)行并行計(jì)算包雀,減少了線程間的競爭。
work stealing算法的缺點(diǎn):
如果雙端隊(duì)列中只有一個(gè)任務(wù)時(shí)亲铡,線程間會(huì)存在競爭才写。
額外的開銷,例如雙端隊(duì)列
ForkJoin框架的api支持:
- ForkJoinPool: 管理ForkJoin框架里面線程的執(zhí)行
- ForkJoinTask:一個(gè)抽象的類定義了task在ForkJoinPool里面的運(yùn)行
- RecursiveAction: ForkJoinTask的子類運(yùn)行一個(gè)沒有返回值的任務(wù)
- RecursiveTask: ForkJoinTask的子類運(yùn)行一個(gè)有返回值的任務(wù)
其中ForkJoinTask類代表了抽象的切成的任務(wù)奖蔓,而不是實(shí)際的執(zhí)行的線程赞草,
這種機(jī)制允許一小部分?jǐn)?shù)量的線程去管理執(zhí)行一大部分任務(wù)。
ForkJoinTask的關(guān)鍵方法有:
final ForkJoinTask < V > fork ()
final V join ()
final V invoke ()
fork方法提交一個(gè)任務(wù)異步的去執(zhí)行吆鹤,這個(gè)方法返回this會(huì)持續(xù)的調(diào)用線程去運(yùn)行厨疙。
join方法,會(huì)等待直到任務(wù)結(jié)束并返回其結(jié)果疑务,對于沒有返回值的任務(wù)返回的值Void類型沾凄。
invoke方法,包裝了fork和join方法知允,我們只需要調(diào)用invoke方法撒蟀,它就會(huì)啟動(dòng)任務(wù)然后返回結(jié)果。
額外的這里面還有兩個(gè)靜態(tài)方法:
static void invokeAll ( ForkJoinTask <?> task1 , ForkJoinTask <?> task2 ): static void invokeAll ( ForkJoinTask <?>… taskList )
前者執(zhí)行兩個(gè)任務(wù)温鸽,后者執(zhí)行一個(gè)任務(wù)列表保屯。
ForkJoinTask還有一個(gè)compute方法,這個(gè)方法用來編寫主要的計(jì)算邏輯:
ForkJoinPool類主要有兩個(gè)提交任務(wù)的方法:
第一個(gè)是invoke會(huì)同步等待直到所有的任務(wù)返回并得到執(zhí)行結(jié)果涤垫。
第二個(gè)是異步的提交使用execute方法姑尺,對于結(jié)果的獲取需要調(diào)用任務(wù)的get方法來阻塞獲取◎疴或者使用get超時(shí)方法來輪詢獲取結(jié)果股缸。
最后值得一提的是,對于ForkJoinPool方法吱雏,我們不需要顯式的調(diào)用shutdown來關(guān)閉敦姻,其使用的是守護(hù)線程瘾境,只要所有的線程運(yùn)行完畢,線程池會(huì)自動(dòng)退出镰惦。
ForkJoinPool組成類:
ForkJoinPool
充當(dāng)fork/join框架里面的管理者迷守,最原始的任務(wù)都要交給它才能處理。它負(fù)責(zé)控制整個(gè)fork/join有多少個(gè)workerThread旺入,workerThread的創(chuàng)建兑凿,激活都是由它來掌控。它還負(fù)責(zé)workQueue隊(duì)列的創(chuàng)建和分配茵瘾,每當(dāng)創(chuàng)建一個(gè)workerThread礼华,它負(fù)責(zé)分配相應(yīng)的workQueue。然后它把接到的活都交給workerThread去處理拗秘,它可以說是整個(gè)frok/join的容器圣絮。
ForkJoinWorkerThread
fork/join里面真正干活的"工人",本質(zhì)是一個(gè)線程雕旨。里面有一個(gè)ForkJoinPool.WorkQueue的隊(duì)列存放著它要干的活扮匠,接活之前它要向ForkJoinPool注冊(registerWorker),拿到相應(yīng)的workQueue凡涩。然后就從workQueue里面拿任務(wù)出來處理棒搜。它是依附于ForkJoinPool而存活,如果ForkJoinPool的銷毀了,它也會(huì)跟著結(jié)束活箕。
ForkJoinPool.WorkQueue
雙端隊(duì)列就是它力麸,它負(fù)責(zé)存儲(chǔ)接收的任務(wù)。
這是一個(gè)雙端隊(duì)列(Deque)育韩,里面存放的對象是任務(wù)(ForkJoinTask)克蚂。
每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾座慰,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是 LIFO 方式翠拣,也就是說每次從隊(duì)尾取出任務(wù)來執(zhí)行版仔。
每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來自于剛剛提交到 pool的任務(wù)误墓,或是來自于其他工作線程的工作隊(duì)列)蛮粮,竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說工作線程在竊取其他工作線程的任務(wù)時(shí)谜慌,使用的是 FIFO 方式然想。
Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
控制是FIFO還是LIFO
/**
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}
- ForkJoinTask:代表fork/join里面任務(wù)類型,我們一般用它的兩個(gè)子類RecursiveTask欣范、RecursiveAction变泄。這兩個(gè)區(qū)別在于RecursiveTask任務(wù)是有返回值令哟,RecursiveAction沒有返回值。任務(wù)的處理邏輯包括任務(wù)的切分都集中在compute()方法里面妨蛹。
配置參數(shù)
通過代碼指定屏富,必須得在commonPool初始化之前(parallel的stream被調(diào)用之前,一般可在系統(tǒng)啟動(dòng)后設(shè)置)注入進(jìn)去蛙卤,否則無法生效狠半。
通過啟動(dòng)參數(shù)指定無此限制,較為安全
- parallelism(即配置線程池個(gè)數(shù))
可以通過java.util.concurrent.ForkJoinPool.common.parallelism進(jìn)行配置颤难,最大值不能超過MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
如果沒有指定神年,則默認(rèn)為Runtime.getRuntime().availableProcessors() - 1.
代碼指定(必須得在commonPool初始化之前注入進(jìn)去,否則無法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
或者參數(shù)指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
Fork Join注意的點(diǎn)
protected Long compute() {
if (任務(wù)足夠小?) {
return computeDirect();
}
// 任務(wù)太大,一分為二:
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// 分別對子任務(wù)調(diào)用fork():
subtask1.fork();
subtask2.fork();
// 合并結(jié)果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
return subresult1 + subresult2;
}
很遺憾行嗤,這種寫法是錯(cuò)已日!誤!的昂验!這樣寫沒有正確理解Fork/Join模型的任務(wù)執(zhí)行邏輯捂敌。
JDK用來執(zhí)行Fork/Join任務(wù)的工作線程池大小等于CPU核心數(shù)。在一個(gè)4核CPU上既琴,最多可以同時(shí)執(zhí)行4個(gè)子任務(wù)占婉。對400個(gè)元素的數(shù)組求和,執(zhí)行時(shí)間應(yīng)該為1秒甫恩。但是逆济,換成上面的代碼,執(zhí)行時(shí)間卻是兩秒磺箕。
這是因?yàn)閳?zhí)行compute()方法的線程本身也是一個(gè)Worker線程奖慌,當(dāng)對兩個(gè)子任務(wù)調(diào)用fork()時(shí),這個(gè)Worker線程就會(huì)把任務(wù)分配給另外兩個(gè)Worker松靡,但是它自己卻停下來等待不干活了简僧!這樣就白白浪費(fèi)了Fork/Join線程池中的一個(gè)Worker線程,導(dǎo)致了4個(gè)子任務(wù)至少需要7個(gè)線程才能并發(fā)執(zhí)行雕欺。
正確的寫法:
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
其實(shí)岛马,我們查看JDK的invokeAll()方法的源碼就可以發(fā)現(xiàn),invokeAll的N個(gè)任務(wù)中屠列,其中N-1個(gè)任務(wù)會(huì)使用fork()交給其它線程執(zhí)行啦逆,但是,它還會(huì)留一個(gè)任務(wù)自己執(zhí)行笛洛,這樣夏志,就充分利用了線程池,保證沒有空閑的不干活的線程苛让。
https://www.liaoxuefeng.com/article/1146802219354112
例子
下面的代碼是測試forkjoin和for循環(huán)的耗時(shí)沟蔑。
測試結(jié)果:首次forkjoin的耗時(shí)會(huì)遠(yuǎn)遠(yuǎn)比for循環(huán)的耗時(shí)高湿诊,這是因?yàn)槭状涡枰コ跏蓟Y源,比如創(chuàng)建線程池等溉贿。所以耗時(shí)會(huì)遠(yuǎn)遠(yuǎn)比for循環(huán)高枫吧,但是第二次的耗時(shí)會(huì)比for循環(huán)少很多,這是因?yàn)樗枰馁Y源都已經(jīng)創(chuàng)建好了宇色,所以耗時(shí)會(huì)很低九杂。
結(jié)論:forkjoin首次運(yùn)行會(huì)需要?jiǎng)?chuàng)建線程池等資源所以會(huì)比實(shí)際運(yùn)行任務(wù)的時(shí)間會(huì)高一些,接下里在使用forkjoin運(yùn)行任務(wù)就會(huì)和運(yùn)行任務(wù)本身耗時(shí)差不多宣蠕。
public static void main(String[] args) {
//測試forkjoin耗時(shí)
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 8; i++) {
list.add(i);
}
long s = System.currentTimeMillis();
list.parallelStream().forEach(i -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long e = System.currentTimeMillis();
System.out.println("forkjoin:" + (e - s));
//測試for循環(huán)耗時(shí)
s = System.currentTimeMillis();
for (int i : list) {
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e = System.currentTimeMillis();
System.out.println("for:" + (e - s));
//再次測試forkjoin耗時(shí)
s = System.currentTimeMillis();
list.parallelStream().forEach(i -> {
try {
Thread.sleep(10);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
});
e = System.currentTimeMillis();
System.out.println("forkjoin:" + (e - s));
}
結(jié)果
forkjoin:81
for:69
forkjoin:12
上面的代碼有一個(gè)細(xì)節(jié)要注意的是例隆,由于我本機(jī)的機(jī)器是8核,通過以下代碼可以獲取forkjoin默認(rèn)啟動(dòng)線程池大小:
ForkJoinPool.getCommonPoolParallelism();
可以得出結(jié)果是7抢蚀,也就是可以同時(shí)運(yùn)行7個(gè)任務(wù)镀层。
但是我上面的循環(huán)次數(shù)確實(shí)8,也就是有8個(gè)任務(wù)同時(shí)運(yùn)行皿曲,按理來說任務(wù)耗時(shí)要大于20ms唱逢,可實(shí)際打印結(jié)果確實(shí)12ms。這是為什么呢屋休?原因是因?yàn)閙ain線程也會(huì)運(yùn)行任務(wù)坞古,因?yàn)樽铋_始分解任務(wù)的線程是main線程,main線程分解任務(wù)后自己也會(huì)運(yùn)行任務(wù)劫樟,這樣就不會(huì)白白浪費(fèi)一個(gè)線程痪枫,所以這也就是為什么 ForkJoinPool.getCommonPoolParallelism();得到可以同時(shí)并行運(yùn)行任務(wù)的是 = 本機(jī)CPU核數(shù)-1 的原因,這是因?yàn)閙ain線程已經(jīng)占用了一個(gè)線程運(yùn)行任務(wù)叠艳。
我把上面的循環(huán)次數(shù)改為9奶陈,然后再次運(yùn)行,得到結(jié)果:
forkjoin:116
for:98
forkjoin:23
可以看到第二次forkjoin耗費(fèi)了23s附较。因?yàn)槲业腃PU是8核吃粒,每次最多可以運(yùn)行8個(gè)任務(wù),這次有9個(gè)任務(wù)拒课,所以只能等其他8個(gè)任務(wù)運(yùn)行完了之后徐勃,第9個(gè)任務(wù)在運(yùn)行,所以耗時(shí)會(huì)是23ms捕发。
參考
https://mp.weixin.qq.com/s/yvmWS4cCwTPzyEB3AxK5MQ