背書中(引用書上的話):Java7中提供了用于并行執(zhí)行任務的Fork/Join框架跛锌, 可以把任務分成若干個分任務产场,最終匯總每個分任務的結果得到總?cè)蝿盏慕Y果光戈。這篇我們來看看Fork/Join框架跃捣。
先舉個栗子
一個字符串數(shù)組泽论,需要把每個元素中的*字符的索引返回,并求和(自己編了個栗子,沒有撒實際意義)偷卧,用Fork/Join框架來實現(xiàn)豺瘤,可以定義一個處理字符串數(shù)組的總?cè)蝿眨缓蟀芽側(cè)蝿詹鸱痔睿褦?shù)組中每個字符串交給子任務去處理坐求,然后等待子任務執(zhí)行完畢,匯總結果晌梨,并返回:
package thread.ForkJoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @Description: .
* @Author: ZhaoWeiNan .
* @CreatedTime: 2017/6/21 .
* @Version: 1.0 .
*/
public class StringTask extends RecursiveTask<Integer>{
//要處理的字符串
private String dest;
public StringTask(String dest) {
this.dest = dest;
}
//父類RecursiveTask是一個抽象類桥嗤,所以需要實現(xiàn)compute方法
@Override
protected Integer compute() {
if (dest == null || "".equals(dest))
return 0;
//判斷字符串中 * 的索引,并返回
return dest.indexOf("*");
}
}
class ArrayTask extends RecursiveTask<Integer>{
//需要處理的字符串數(shù)組
private String[] array;
public ArrayTask(String[] array) {
this.array = array;
}
@Override
protected Integer compute() {
if (array == null || array.length < 1)
return 0;
//申明一個StringTask變量仔蝌,作為子任務
StringTask stringTask;
//定義一個子任務隊列泛领,用于任務執(zhí)行完畢后,獲取子任務的執(zhí)行結果
List<StringTask> list = new ArrayList<>();
int sum = 0;
//把字符串數(shù)組的中每一個字符串分給多個StringTask子任務去處理
for (String s : array){
//創(chuàng)建一個變量敛惊,作為子任務去處理字符串
stringTask = new StringTask(s);
//執(zhí)行子任務
stringTask.fork();
//加入子任務隊列
list.add(stringTask);
}
for (StringTask task : list){
//等子任務執(zhí)行完畢渊鞋,獲取子任務執(zhí)行的結果,并累加
sum += task.join();
}
return sum;
}
}
class Demo{
public static void main(String[] args){
//初始化字符串數(shù)組
String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
//創(chuàng)建一個總?cè)蝿涨萍罚幚碜址當?shù)組
ArrayTask arrayTask = new ArrayTask(array);
//創(chuàng)建執(zhí)行任務的線程池ForkJoinPool對象
ForkJoinPool forkJoinPool = new ForkJoinPool();
//執(zhí)行總?cè)蝿? ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);
//返回任務的結果
try {
System.out.println(forkJoinTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
代碼放到了開源中國:http://git.oschina.net/zhaoweinan/forkjoin锡宋,有興趣的小伙伴可以拿去
總的來說,F(xiàn)ork/Join框架就是一個用來并行執(zhí)行任務的框架特恬,可以把一個大任務执俩,分成若干個子任務,等各個子任務執(zhí)行完畢癌刽,可以把他們的執(zhí)行結果獲取到役首,并匯聚,起到了并行執(zhí)行任務作用显拜。
Fork/Join框架的構成
1.ForkJoinPool
ForkJoinPool繼承了AbstractExecutorService抽象類衡奥,AbstractExecutorService實現(xiàn)了ExecutorService接口,由此看來ForkJoinPool也是線程池家族的一員讼油,
ForkJoinPool使用invoke矮台、execute乏屯、submit用來執(zhí)行任務。
2.ForkJoinTask
ForkJoinTask是Fork/Join框架使用的任務類瘦赫,實現(xiàn)了Future接口辰晕,我們一般使用它的兩個子類RecursiveTask和RecursiveAction,
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
RecursiveAction適用于沒有返回結果的任務确虱,
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
RecursiveTask適用于有返回值的任務含友。
Work-Stealing (工作竊取)
粗略說一下Work-Stealing,F(xiàn)orkJoinPool具有 Work-Stealing (工作竊染轿省)的能力辆童,什么意思呢?就拿文章開頭的栗子來說惠赫,把處理字符串數(shù)組的大任務把鉴,分成了若干個處理字符串的子任務,這些子任務線程執(zhí)行完畢后儿咱,不會閑著庭砍,回去執(zhí)行別的子任務,通俗的來說混埠,Work-Stealing (工作竊鹊「住)就是線程從其他隊列里面獲取任務來執(zhí)行。
Work-Stealing的優(yōu)點
充分利用了線程钳宪,提高了線程并行執(zhí)行任務的效率揭北,并減少了線程間競爭帶來的系統(tǒng)開銷。
Work-Stealing的缺點
存在競爭的情況使套,而且占用了更多的系統(tǒng)資源罐呼。
Fork/Join框架原理
ForkJoinPool分析
貼一張ForkJoinPool的類圖
注意箭頭所指的兩個屬性侦高,
ForkJoinTask<?>數(shù)組submissionQueue,存放程序加到ForkJoinPool的任務
private ForkJoinTask<?>[] submissionQueue;
ForkJoinWorkerThread類繼承了Thread厌杜,是一個線程類奉呛, ForkJoinWorkerThread[] workers就是一個線程數(shù)組,負責去執(zhí)行submissionQueue中的任務
ForkJoinWorkerThread[] workers;
.....
public class ForkJoinWorkerThread extends Thread
ForkJoinTask分析
fork方法
獲取當前ForkJoinWorkerThread線程夯尽,調(diào)用ForkJoinWorkerThread的pushTask方法執(zhí)行ForkJoinTask任務
public final ForkJoinTask<V> fork() {
//獲取當前ForkJoinWorkerThread線程瞧壮,調(diào)用ForkJoinWorkerThread的pushTask方法執(zhí)行任務
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}
再來看看ForkJoinWorkerThread的pushTask方法:
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
//調(diào)用線程池ForkJoinPool的signalWork方法
pool.signalWork();
else if (s == m)
growQueue();
}
}
ForkJoinWorkerThread的pushTask方法把任務ForkJoinTask加到了ForkJoinTask[]任務數(shù)組中,并調(diào)用了ForkJoinPool線程池的signalWork方法喚醒線程或者創(chuàng)建一個線程去執(zhí)行任務匙握,粗略的貼一下signalWork的關鍵代碼:
private void addWorker() {
Throwable ex = null;
ForkJoinWorkerThread t = null;
try {
t = factory.newThread(this);
} catch (Throwable e) {
ex = e;
}
if (t == null) { // null or exceptional factory return
long c; // adjust counts
do {} while (!UNSAFE.compareAndSwapLong
(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// Propagate exception if originating from an external caller
if (!tryTerminate(false) && ex != null &&
!(Thread.currentThread() instanceof ForkJoinWorkerThread))
UNSAFE.throwException(ex);
}
else
t.start();
}
最終調(diào)用到了這里咆槽,來執(zhí)行任務。
join方法
從文章開頭的栗子來看圈纺,join方法會阻塞當前線程秦忿,等待獲取任務執(zhí)行的結果
//百度了這四種狀態(tài)的含義
private static final int NORMAL = -1; //NORMAL已完成
private static final int CANCELLED = -2; //CANCELLED已取消
private static final int EXCEPTIONAL = -3; //EXCEPTIONAL出現(xiàn)異常
private static final int SIGNAL = 1; //SIGNAL信號
public final V join() {
//先調(diào)用doJoin方法判斷上面定義的四個狀態(tài)
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
join方法先調(diào)用doJoin方法判斷任務的狀態(tài),看看doJoin方法蛾娶,
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
//獲取當前ForkJoinWorkerThread線程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
//如果狀態(tài)是小于0 也就是 -1灯谣,-2,-3 分別代表已完成蛔琅、已取消胎许、出現(xiàn)異常
//直接返回狀態(tài)
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
//如果狀態(tài)為1|SIGNAL|信號
//執(zhí)行任務
try {
completed = exec();
} catch (Throwable rex) {
//出現(xiàn)異常,把狀態(tài)改為-3|EXCEPTIONAL|出現(xiàn)異常,返回
return setExceptionalCompletion(rex);
}
if (completed)
//執(zhí)行成功辜窑,把狀態(tài)改為-1|NORMAL|已完成钩述,返回
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}
doJoin查看任務的狀態(tài),如果狀態(tài)是-1|NORMAL|已完成穆碎,-2|CANCELLED|已取消牙勘,-3|EXCEPTIONAL|出現(xiàn)異常,證明任務已經(jīng)執(zhí)行完畢惨远,返回狀態(tài)位谜悟,如果狀態(tài)是 1|SIGNAL|信號,則去執(zhí)行任務北秽,如果執(zhí)行成功返回-1|NORMAL|已完成葡幸,出現(xiàn)異常返回-3|EXCEPTIONAL|出現(xiàn)異常。
再來看看返回結果的reportResult方法和getRawResult方法:
private V reportResult() {
int s; Throwable ex;
//如果狀態(tài)為-2|CANCELLED|已取消贺氓,拋出一個CancellationException異常
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
//調(diào)用getRawResult方法返回結果
return getRawResult();
}
reportResult方法蔚叨,先會判斷狀態(tài),如果狀態(tài)為-2|CANCELLED|已取消辙培,則拋出一個CancellationException異常蔑水,否則調(diào)用getRawResult方法返回結果:
public abstract V getRawResult();
getRawResult方法在ForkJoinTask類是抽象方法,具體實現(xiàn)在他的兩子類中扬蕊。
RecursiveAction子類:
public final Void getRawResult() { return null; }
所以說RecursiveAction子類使用于沒有返回值的任務搀别。
RecursiveTask子類:
public final V getRawResult() {
return result;
}
RecursiveTask子類適用于有返回值的任務。
并行執(zhí)行任務的Fork/Join框架是說完了尾抑。
歡迎大家來交流歇父,指出文中一些說錯的地方,讓我加深認識再愈。
謝謝大家榜苫!