參考資料:《實(shí)戰(zhàn)Java高并發(fā)程序設(shè)計(jì)》
1. 簡(jiǎn)介
分而治之是一個(gè)非常有效的處理大量數(shù)據(jù)的方法。
Fork一詞原義是吃飯用的叉子,也有分叉的意思橘茉。Linux中使用fork()函數(shù)來(lái)創(chuàng)建子進(jìn)程,從而使系統(tǒng)進(jìn)程可以多一個(gè)執(zhí)行分支。Java中也沿用了類(lèi)似的命名映跟。
Join的含義和Thread.join()類(lèi)似,表示等待扬虚。也就是使用fork()后系統(tǒng)多了一個(gè)執(zhí)行分支(線(xiàn)程)努隙,所以需要等待這個(gè)線(xiàn)程執(zhí)行完畢,才有可能得到最終結(jié)果辜昵。
在實(shí)際使用中荸镊,如果毫無(wú)顧忌地使用fork()開(kāi)啟線(xiàn)程,那么很有可能導(dǎo)致系統(tǒng)因開(kāi)啟過(guò)多線(xiàn)程而嚴(yán)重影響性能。所以JDK給出了一個(gè)ForkJoinPool線(xiàn)程池躬存,對(duì)于fork方法并不急著開(kāi)啟線(xiàn)程张惹,而是提交給ForkJoinPool線(xiàn)程池處理,以節(jié)省系統(tǒng)資源岭洲。
-
使用Fork&Join進(jìn)行進(jìn)行數(shù)據(jù)處理的總體結(jié)構(gòu)如下:
-
由于線(xiàn)程池的優(yōu)化宛逗,提交的任務(wù)和線(xiàn)程數(shù)量并不是一對(duì)一的關(guān)系。在絕大多數(shù)情況下盾剩,一個(gè)物理線(xiàn)程實(shí)際上是需要處理多個(gè)邏輯任務(wù)的雷激。因此,每個(gè)線(xiàn)程必然需要擁有一個(gè)任務(wù)隊(duì)列告私。所以屎暇,可能遇到這樣一種情況:線(xiàn)程A已經(jīng)把自己的任務(wù)都執(zhí)行完成了,而線(xiàn)程B還有一堆任務(wù)等著處理德挣。此時(shí)恭垦,線(xiàn)程A就會(huì)“幫助”線(xiàn)程B,從線(xiàn)程B的任務(wù)隊(duì)列中拿一個(gè)任務(wù)過(guò)來(lái)處理格嗅,盡可能地達(dá)到平衡番挺。示意圖:
值得注意的是:當(dāng)一個(gè)線(xiàn)程試圖幫助另一個(gè)線(xiàn)程時(shí),總是從任務(wù)隊(duì)列的尾部拿數(shù)據(jù)屯掖,而線(xiàn)程試圖執(zhí)行自己的任務(wù)時(shí)玄柏,則是從頭部開(kāi)始拿。這是為了避免數(shù)據(jù)競(jìng)爭(zhēng)贴铜。
2.ForkJoinPool 和 ForkJoinTask
- 先來(lái)看一下ForkJoinPool的一個(gè)重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
- 我們可以向ForkJoinPool線(xiàn)程池提交一個(gè)ForkJoinTask任務(wù)粪摘。
- 所謂ForkJoinTask任務(wù)就是支持fork()分解和join()等待的任務(wù)。
- ForkJoinTask有兩個(gè)重要的子類(lèi):
- RecursiveAction:沒(méi)有返回值的任務(wù)
- RecursiveTask:攜帶返回值的任務(wù)
- 下面通過(guò)一個(gè)計(jì)算數(shù)列求和的demo绍坝,來(lái)展示Fork&Join的使用:
public class Test {
public static class CountTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 分成100個(gè)小任務(wù)
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end) {
lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
subTasks.add(subTask);
subTask.fork();
}
for (CountTask t : subTasks) {
sum += t.join();
}
}
return sum;
}
}
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task = forkJoinPool.submit(new CountTask(0, 200000L));
try {
long result = task.get();
System.out.println("sum=" + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 輸出:
// sum=20000100000
- 上述代碼在執(zhí)行g(shù)et()方法時(shí)徘意,如果任務(wù)沒(méi)有結(jié)束,那么主線(xiàn)程就會(huì)在get()方法上等待轩褐。
- 使用ForkJoin時(shí)要注意椎咧,如果任務(wù)的劃分層次很深,一直得不到返回把介,那么可能出現(xiàn)兩種情況:
- 系統(tǒng)內(nèi)的線(xiàn)程數(shù)量越積越多勤讽,導(dǎo)致性能?chē)?yán)重下降。
- 函數(shù)的調(diào)用層次變得很深拗踢,最終導(dǎo)致棧溢出脚牍。
- 此外,F(xiàn)orkJoin線(xiàn)程池使用一個(gè)無(wú)鎖的棧來(lái)管理空閑線(xiàn)程巢墅。如果一個(gè)工作線(xiàn)程暫時(shí)取不到可用的任務(wù)诸狭,則可能會(huì)被掛起券膀,掛起的線(xiàn)程將會(huì)被壓入由線(xiàn)程池維護(hù)的棧中。待將來(lái)有任務(wù)可用時(shí)作谚,再?gòu)?strong>棧中喚醒這些線(xiàn)程三娩。
end