java concurrent 之 ForkJoinPool
ForkJoinPool在Java 7中被引入徙硅。ForkJoinPool類(lèi)似于Java ExecutorService狡逢,但有一個(gè)區(qū)別傲隶。 ForkJoinPool可以輕松將任務(wù)分解成較小的任務(wù)遂赠,然后將其提交給ForkJoinPool艾少。 任務(wù)可以將其工作分成較小的子任務(wù)疚漆,只要它能夠分解任務(wù)即可。 它可能聽(tīng)起來(lái)有點(diǎn)抽象奈附,所以在這個(gè)fork和join教程中,我將解釋ForkJoinPool如何工作煮剧,以及分裂任務(wù)如何工作斥滤。
解釋Fork和Join
在我們看看ForkJoinPool之前将鸵,我想解釋一下fork和join的原理。
Fork和Join原則由遞歸執(zhí)行的兩個(gè)步驟組成佑颇。 這兩個(gè)步驟是fork步驟和join步驟顶掉。
Fork
使用fork和join原理的任務(wù)可以將(自己)分割成更小的子任務(wù)
通過(guò)將其自身分解為子任務(wù),每個(gè)子任務(wù)可以由不同的CPU或同一CPU上的不同線程并行執(zhí)行挑胸。
如果任務(wù)給出的工作足夠大痒筒,任務(wù)只會(huì)分解成子任務(wù),這樣才有意義茬贵。 將任務(wù)分解為子任務(wù)有一個(gè)開(kāi)銷(xiāo)簿透,因此對(duì)于少量工作,此開(kāi)銷(xiāo)可能大于通過(guò)并發(fā)執(zhí)行子任務(wù)而實(shí)現(xiàn)的加速解藻。
將任務(wù)分解為子任務(wù)的時(shí)間限制也稱(chēng)為閾值老充。 每個(gè)任務(wù)都由決定一個(gè)明智的門(mén)檻決定。 這在很大程度上取決于正在做的工作螟左。
其實(shí)閾值的問(wèn)題就是在遞歸算法中的退出條件相似
Join
當(dāng)一個(gè)任務(wù)已經(jīng)分裂成子任務(wù)時(shí)啡浊,任務(wù)等待直到子任務(wù)完成執(zhí)行。
子任務(wù)完成執(zhí)行后胶背,任務(wù)可以將所有結(jié)果加入(合并)為一個(gè)結(jié)果巷嚣。 如下圖所示:
ForkJoinPool
ForkJoinPool是一個(gè)特殊的線程池,旨在使用fork-and-join任務(wù)拆分工作钳吟。 ForkJoinPool位于java.util.concurrent包中涂籽,因此完整的類(lèi)名稱(chēng)為java.util.concurrent.ForkJoinPool。
創(chuàng)建ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
ForkJoinTask任務(wù)分為兩類(lèi)
RecursiveAction 用于沒(méi)有返回結(jié)果的任務(wù)砸抛。
RecursiveTask 用于有返回結(jié)果的任務(wù)
ForkJoinTask需要通過(guò)ForkJoinPool來(lái)執(zhí)行评雌,任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部直焙。當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒(méi)有任務(wù)時(shí)景东,它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)。
使用Fork/Join框架 Demo
package com.viashare.forkjoin;
import java.util.concurrent.*;
/**
* Created by Jeffy on 16/01/12.
*/
public class ForkJoinMain {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> future = forkJoinPool.submit(new CountTask(1, 5));
System.err.println(future.get());
}
static class CountTask extends RecursiveTask<Integer> {
private static final int Threshold = 3;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
int temp = end - start;
System.err.println(temp);
if (temp <= Threshold) {
for (int i = start; i <=end; i++) {
sum += i;
}
} else {
int millde = (end+start)/Threshold;
CountTask task1 = new CountTask(start, millde);
CountTask task2 = new CountTask(millde+1, end);
task1.fork();
task2.fork();
int sum1 = task1.join();
int sum2 = task2.join();
System.err.println("sum1 "+sum1);
System.err.println("sum2 "+sum2);
sum = sum1 + sum2;
}
return sum;
}
}
}
Demo2
package com.viashare.forkjoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* Created by Jeffy on 16/01/12.
*/
public class ForkJoinmain2 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);
}
static class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask(long workLoad) {
this.workLoad = workLoad;
}
protected Long compute() {
//if work is above threshold, break tasks up into smaller tasks
if (this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
subtasks.addAll(createSubtasks());
for (MyRecursiveTask subtask : subtasks) {
subtask.fork();
}
long result = 0;
for (MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
return workLoad * 3;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
}
Fork/Join框架的異常處理
ForkJoinTask在執(zhí)行的時(shí)候可能會(huì)拋出異常奔誓,但是我們沒(méi)辦法在主線程里直接捕獲異常斤吐,所以ForkJoinTask提供了isCompletedAbnormally()方法來(lái)檢查任務(wù)是否已經(jīng)拋出異常或已經(jīng)被取消了厨喂,并且可以通過(guò)ForkJoinTask的getException方法獲取異常和措。使用如下代碼:
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable對(duì)象,如果任務(wù)被取消了則返回CancellationException蜕煌。如果任務(wù)沒(méi)有完成或者沒(méi)有拋出異常則返回null派阱。