前提概述
Java 7開始引入了一種新的Fork/Join線程池夫啊,它可以執(zhí)行一種特殊的任務(wù):把一個大任務(wù)拆成多個小任務(wù)并行執(zhí)行刊苍。
我們舉個例子:如果要計(jì)算一個超大數(shù)組的和,最簡單的做法是用一個循環(huán)在一個線程內(nèi)完成:
算法原理介紹
相信大家此前或多或少有了解到ForkJoin框架,F(xiàn)orkJoin框架其實(shí)就是一個線程池ExecutorService的實(shí)現(xiàn),通過工作竊取(work-stealing)算法,獲取其他線程中未完成的任務(wù)來執(zhí)行移必。可以充分利用機(jī)器的多處理器優(yōu)勢毡鉴,利用空閑的線程去并行快速完成一個可拆分為小任務(wù)的大任務(wù)崔泵,類似于分治算法。
實(shí)現(xiàn)達(dá)成目標(biāo)
ForkJoin的目標(biāo)猪瞬,就是利用所有可用的處理能力來提高程序的響應(yīng)和性能憎瘸。本文將介紹ForkJoin框架,依次介紹基礎(chǔ)特性陈瘦、案例使用幌甘、源碼剖析和實(shí)現(xiàn)亮點(diǎn)。
java.util.concurrent.ForkJoinPool由Java大師Doug Lea主持編寫痊项,它可以將一個大的任務(wù)拆分成多個子任務(wù)進(jìn)行并行處理锅风,最后將子任務(wù)結(jié)果合并成最后的計(jì)算結(jié)果,并進(jìn)行輸出鞍泉。
基本使用
入門例子皱埠,用Fork/Join框架使用示例,在這個示例中我們計(jì)算了1-5000累加后的值:
public class TestForkAndJoinPlus {
private static final Integer MAX = 400;
static class WorkTask extends RecursiveTask<Integer> {
// 子任務(wù)開始計(jì)算的值
private Integer startValue;
// 子任務(wù)結(jié)束計(jì)算的值
private Integer endValue;
public WorkTask(Integer startValue , Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
// 如果小于最小分片閾值咖驮,則說明要進(jìn)行相關(guān)的數(shù)據(jù)操作
// 可以正式進(jìn)行累加計(jì)算了
if(endValue - startValue < MAX) {
System.out.println("開始計(jì)算的部分:startValue = " + startValue + ";endValue = " + endValue);
Integer totalValue = 0;
for(int index = this.startValue ; index <= this.endValue ; index++) {
totalValue += index;
}
return totalValue;
}
// 否則再進(jìn)行任務(wù)拆分边器,拆分成兩個任務(wù)
else {
// 因?yàn)椴捎枚址ㄑ凳啵鸱郑赃M(jìn)行1/2切分?jǐn)?shù)據(jù)量
WorkTask subTask1 = new WorkTask(startValue, (startValue + endValue) / 2);
subTask1.fork();//進(jìn)行拆分機(jī)制控制
WorkTask subTask2 = new WorkTask((startValue + endValue) / 2 + 1 , endValue);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
// 這是Fork/Join框架的線程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,1001));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}
對此我封裝了一個框架集合忘巧,基于JDK1.8+中的Fork/Join框架實(shí)現(xiàn)恒界,參考的Fork/Join框架主要源代碼也基于JDK1.8+。
WorkTaskCallable實(shí)現(xiàn)抽象模型層次操作轉(zhuǎn)換
@Accessors(chain = true)
public class WorkTaskCallable<T> extends RecursiveTask<T> {
/**
* 斷言操作控制
*/
@Getter
private Predicate<T> predicate;
/**
* 執(zhí)行參數(shù)化分割條件
*/
@Getter
private T splitParam;
/**
* 操作拆分方法操作機(jī)制
*/
@Getter
private Function<Object,Object[]> splitFunction;
/**
* 操作合并方法操作機(jī)制
*/
@Getter
private BiFunction<Object,Object,T> mergeFunction;
/**
* 操作處理機(jī)制
*/
@Setter
@Getter
private Function<T,T> processHandler;
/**
* 構(gòu)造器是否進(jìn)行分割操作
* @param predicate 判斷是否進(jìn)行下一步分割的條件關(guān)系
* @param splitParam 分割參數(shù)
* @param splitFunction 分割方法
* @param mergeFunction 合并數(shù)據(jù)操作
*/
public WorkTaskCallable(Predicate predicate,T splitParam,Function<Object,Object[]> splitFunction,BiFunction<Object,Object,T> mergeFunction,Function<T,T> processHandler){
this.predicate = predicate;
this.splitParam = splitParam;
this.splitFunction = splitFunction;
this.mergeFunction = mergeFunction;
this.processHandler = processHandler;
}
/**
* 實(shí)際執(zhí)行調(diào)用操作機(jī)制
* @return
*/
@Override
protected T compute() {
if(predicate.test(splitParam)){
Object[] result = splitFunction.apply(splitParam);
WorkTaskCallable workTaskCallable1 = new WorkTaskCallable(predicate,result[0],splitFunction,mergeFunction,processHandler);
workTaskCallable1.fork();
WorkTaskCallable workTaskCallable2 = new WorkTaskCallable(predicate,result[1],splitFunction,mergeFunction,processHandler);
workTaskCallable2.fork();
return mergeFunction.apply(workTaskCallable1.join(),workTaskCallable2.join());
}else{
return processHandler.apply(splitParam);
}
}
}
ArrayListWorkTaskCallable實(shí)現(xiàn)List集合層次操作轉(zhuǎn)換
/**
* @project-name:wiz-shrding-framework
* @package-name:com.wiz.sharding.framework.boot.common.thread.forkjoin
* @author:LiBo/Alex
* @create-date:2021-09-09 17:26
* @copyright:libo-alex4java
* @email:liboware@gmail.com
* @description:
*/
public class ArrayListWorkTaskCallable extends WorkTaskCallable<List>{
static Predicate<List> predicateFunction = param->param.size() > 3;
static Function<List,List[]> splitFunction = (param)-> {
if(predicateFunction.test(param)){
return new List[]{param.subList(0,param.size()/ 2),param.subList(param.size()/2,param.size())};
}else{
return new List[]{param.subList(0,param.size()+1),Lists.newArrayList()};
}
};
static BiFunction<List,List,List> mergeFunction = (param1,param2)->{
List datalist = Lists.newArrayList();
datalist.addAll(param2);
datalist.addAll(param1);
return datalist;
};
/**
* 構(gòu)造器是否進(jìn)行分割操作
* @param predicate 判斷是否進(jìn)行下一步分割的條件關(guān)系
* @param splitParam 分割參數(shù)
* @param splitFunction 分割方法
* @param mergeFunction 合并數(shù)據(jù)操作
*/
public ArrayListWorkTaskCallable(Predicate<List> predicate, List splitParam, Function splitFunction, BiFunction mergeFunction,
Function<List,List> processHandler) {
super(predicate, splitParam, splitFunction, mergeFunction,processHandler);
}
public ArrayListWorkTaskCallable(List splitParam, Function splitFunction, BiFunction mergeFunction,
Function<List,List> processHandler) {
super(predicateFunction, splitParam, splitFunction, mergeFunction,processHandler);
}
public ArrayListWorkTaskCallable(Predicate<List> predicate,List splitParam,Function<List,List> processHandler) {
this(predicate, splitParam, splitFunction, mergeFunction,processHandler);
}
public ArrayListWorkTaskCallable(List splitParam,Function<List,List> processHandler) {
this(predicateFunction, splitParam, splitFunction, mergeFunction,processHandler);
}
public static void main(String[] args){
List dataList = Lists.newArrayList(0,1,2,3,4,5,6,7,8,9);
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinTask<List> forkJoinResult = forkJoinPool.submit(new ArrayListWorkTaskCallable(dataList,param->Lists.newArrayList(param.size())));
try {
System.out.println(forkJoinResult.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
ForkJoin代碼分析
ForkJoinPool構(gòu)造函數(shù)
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
* java.lang.Runtime#availableProcessors}, using the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
* level, the {@linkplain
* #defaultForkJoinWorkerThreadFactory default thread factory},
* no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the given parameters.
*
* @param parallelism the parallelism level. For default value,
* use {@link java.lang.Runtime#availableProcessors}.
* @param factory the factory for creating new threads. For default value,
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
* tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
* For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
(asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
/**
* Creates a {@code ForkJoinPool} with the given parameters, without
* any security checks or parameter validation. Invoked directly by
* makeCommonPool.
*/
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.mode = (short)mode;
this.parallelism = (short)parallelism;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
parallelism:可并行級別砚嘴,F(xiàn)ork/Join框架將依據(jù)這個并行級別的設(shè)定仗处,決定框架內(nèi)并行執(zhí)行的線程數(shù)量。并行的每一個任務(wù)都會有一個線程進(jìn)行處理枣宫,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的線程數(shù)量婆誓。
factory:當(dāng)Fork/Join框架創(chuàng)建一個新的線程時,同樣會用到線程創(chuàng)建工廠也颤。只不過這個線程工廠不再需要實(shí)現(xiàn)ThreadFactory接口洋幻,而是需要實(shí)現(xiàn)ForkJoinWorkerThreadFactory接口。后者是一個函數(shù)式接口翅娶,只需要實(shí)現(xiàn)一個名叫newThread的方法文留。
在Fork/Join框架中有一個默認(rèn)的ForkJoinWorkerThreadFactory接口實(shí)現(xiàn):DefaultForkJoinWorkerThreadFactory。
handler:異常捕獲處理器竭沫。當(dāng)執(zhí)行的任務(wù)中出現(xiàn)異常燥翅,并從任務(wù)中被拋出時,就會被handler捕獲蜕提。
-
asyncMode:這個參數(shù)也非常重要森书,從字面意思來看是指的異步模式,它并不是說Fork/Join框架是采用同步模式還是采用異步模式工作谎势。Fork/Join框架中為每一個獨(dú)立工作的線程準(zhǔn)備了對應(yīng)的待執(zhí)行任務(wù)隊(duì)列凛膏,這個任務(wù)隊(duì)列是使用數(shù)組進(jìn)行組合的雙向隊(duì)列。即是說存在于隊(duì)列中的待執(zhí)行任務(wù)脏榆,即可以使用先進(jìn)先出的工作模式猖毫,也可以使用后進(jìn)先出的工作模式。
-
先進(jìn)先出
image -
后進(jìn)先出
image -
當(dāng)asyncMode設(shè)置為true的時候须喂,隊(duì)列采用先進(jìn)先出方式工作吁断;反之則是采用后進(jìn)先出的方式工作,該值默認(rèn)為false
- asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
-
需要注意點(diǎn)
ForkJoinPool 一個構(gòu)造函數(shù)只帶有parallelism參數(shù)坞生,既是可以設(shè)定Fork/Join框架的最大并行任務(wù)數(shù)量仔役;另一個構(gòu)造函數(shù)則不帶有任何參數(shù),對于最大并行任務(wù)數(shù)量也只是一個默認(rèn)值——當(dāng)前操作系統(tǒng)可以使用的CPU內(nèi)核數(shù)量(Runtime.getRuntime().availableProcessors())恨胚。實(shí)際上ForkJoinPool還有一個私有的骂因、原生構(gòu)造函數(shù)炎咖,之上提到的三個構(gòu)造函數(shù)都是對這個私有的赃泡、原生構(gòu)造函數(shù)的調(diào)用寒波。
如果你對Fork/Join框架沒有特定的執(zhí)行要求,可以直接使用不帶有任何參數(shù)的構(gòu)造函數(shù)升熊。也就是說推薦基于當(dāng)前操作系統(tǒng)可以使用的CPU內(nèi)核數(shù)作為Fork/Join框架內(nèi)最大并行任務(wù)數(shù)量俄烁,這樣可以保證CPU在處理并行任務(wù)時,盡量少發(fā)生任務(wù)線程間的運(yùn)行狀態(tài)切換(實(shí)際上單個CPU內(nèi)核上的線程間狀態(tài)切換基本上無法避免级野,因?yàn)椴僮飨到y(tǒng)同時運(yùn)行多個線程和多個進(jìn)程)页屠。
從上面的的類關(guān)系圖可以看出來,F(xiàn)orkJoin框架的核心是ForkJoinPool類蓖柔,基于AbstractExecutorService擴(kuò)展(@sun.misc.Contended注解)辰企。
ForkJoinPool中維護(hù)了一個隊(duì)列數(shù)組WorkQueue[],每個WorkQueue維護(hù)一個ForkJoinTask數(shù)組和當(dāng)前工作線程。ForkJoinPool實(shí)現(xiàn)了工作竊取(work-stealing)算法并執(zhí)行ForkJoinTask况鸣。
ForkJoinPool類的屬性介紹
ADD_WORKER: 100000000000000000000000000000000000000000000000 -> 1000 0000 0000 0000,用來配合ctl在控制線程數(shù)量時使用
ctl: 控制ForkJoinPool創(chuàng)建線程數(shù)量牢贸,(ctl & ADD_WORKER) != 0L 時創(chuàng)建線程,也就是當(dāng)ctl的第16位不為0時镐捧,可以繼續(xù)創(chuàng)建線程
defaultForkJoinWorkerThreadFactory: 默認(rèn)線程工廠潜索,默認(rèn)實(shí)現(xiàn)是DefaultForkJoinWorkerThreadFactory
runState: 全局鎖控制,全局運(yùn)行狀態(tài)
workQueues: 工作隊(duì)列數(shù)組WorkQueue[]
config: 記錄并行數(shù)量和ForkJoinPool的模式(異步或同步)
WorkQueue類
qlock: 并發(fā)控制懂酱,put任務(wù)時的鎖控制
array: 任務(wù)數(shù)組ForkJoinTask<?>[]
pool: ForkJoinPool竹习,所有線程和WorkQueue共享,用于工作竊取列牺、任務(wù)狀態(tài)和工作狀態(tài)同步
base: array數(shù)組中取任務(wù)的下標(biāo)
top: array數(shù)組中放置任務(wù)的下標(biāo)
owner: 所屬線程整陌,F(xiàn)orkJoin框架中,只有一個WorkQueue是沒有owner的瞎领,其他的均有具體線程owner
ForkJoinTask是能夠在ForkJoinPool中執(zhí)行的任務(wù)抽象類蔓榄,父類是Future,具體實(shí)現(xiàn)類有很多默刚,這里主要關(guān)注RecursiveAction和RecursiveTask甥郑。
- RecursiveAction是沒有返回結(jié)果的任務(wù)
- RecursiveTask是需要返回結(jié)果的任務(wù)。
ForkJoinTask類屬性的介紹
status: 任務(wù)的狀態(tài)荤西,對其他工作線程和pool可見澜搅,運(yùn)行正常則status為負(fù)數(shù),異常情況為正數(shù)邪锌。
ForkJoinTask功能介紹
ForkJoinTask任務(wù)是一種能在Fork/Join框架中運(yùn)行的特定任務(wù)勉躺,也只有這種類型的任務(wù)可以在Fork/Join框架中被拆分運(yùn)行和合并運(yùn)行。
ForkJoinWorkerThread線程是一種在Fork/Join框架中運(yùn)行的特性線程觅丰,它除了具有普通線程的特性外饵溅,最主要的特點(diǎn)是每一個ForkJoinWorkerThread線程都具有一個獨(dú)立的任務(wù)等待隊(duì)列(work queue),這個任務(wù)隊(duì)列用于存儲在本線程中被拆分的若干子任務(wù)妇萄。
只需要實(shí)現(xiàn)其compute()方法蜕企,在compute()中做最小任務(wù)控制咬荷,任務(wù)分解(fork)和結(jié)果合并(join)。
ForkJoinPool中執(zhí)行的默認(rèn)線程是ForkJoinWorkerThread轻掩,由默認(rèn)工廠產(chǎn)生幸乒,可以自己重寫要實(shí)現(xiàn)的工作線程。同時會將ForkJoinPool引用放在每個工作線程中唇牧,供工作竊取時使用罕扎。
ForkJoinWorkerThread類屬性介紹
- pool: ForkJoinPool,所有線程和WorkQueue共享丐重,用于工作竊取腔召、任務(wù)狀態(tài)和工作狀態(tài)同步。
- workQueue: 當(dāng)前線程的任務(wù)隊(duì)列扮惦,與WorkQueue的owner呼應(yīng)宴咧。
簡易執(zhí)行圖
實(shí)際上Fork/Join框架的內(nèi)部工作過程要比這張圖復(fù)雜得多,例如如何決定某一個recursive task是使用哪條線程進(jìn)行運(yùn)行径缅;再例如如何決定當(dāng)一個任務(wù)/子任務(wù)提交到Fork/Join框架內(nèi)部后掺栅,是創(chuàng)建一個新的線程去運(yùn)行還是讓它進(jìn)行隊(duì)列等待。
邏輯模型圖(盜一張圖:)
()
fork方法和join方法
Fork/Join框架中提供的fork方法和join方法纳猪,可以說是該框架中提供的最重要的兩個方法氧卧,它們和parallelism“可并行任務(wù)數(shù)量”配合工作。
Fork方法介紹
- Fork就是一個不斷分枝的過程氏堤,在當(dāng)前任務(wù)的基礎(chǔ)上長出n多個子任務(wù)沙绝,他將新創(chuàng)建的子任務(wù)放入當(dāng)前線程的work queue隊(duì)列中,F(xiàn)ork/Join框架將根據(jù)當(dāng)前正在并發(fā)執(zhí)行ForkJoinTask任務(wù)的ForkJoinWorkerThread線程狀態(tài)鼠锈,決定是讓這個任務(wù)在隊(duì)列中等待闪檬,還是創(chuàng)建一個新的ForkJoinWorkerThread線程運(yùn)行它,又或者是喚起其它正在等待任務(wù)的ForkJoinWorkerThread線程運(yùn)行它购笆。
當(dāng)一個ForkJoinTask任務(wù)調(diào)用fork()方法時粗悯,當(dāng)前線程會把這個任務(wù)放入到queue數(shù)組的queueTop位置,然后執(zhí)行以下兩句代碼:
if ((s -= queueBase) <= 2)
pool.signalWork();
else if (s == m)
growQueue();
當(dāng)調(diào)用signalWork()方法同欠。signalWork()方法做了兩件事:1样傍、喚配當(dāng)前線程;2、當(dāng)沒有活動線程時或者線程數(shù)較少時铺遂,添加新的線程衫哥。
Join方法介紹
Join是一個不斷等待,獲取任務(wù)執(zhí)行結(jié)果的過程襟锐。
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}
- 第4行撤逢,(s=status)<0表示這個任務(wù)被執(zhí)行完,直接返回執(zhí)行結(jié)果狀態(tài),上層捕獲到狀態(tài)后蚊荣,決定是要獲取結(jié)果還是進(jìn)行錯誤處理初狰;
- 第6行,從queue中取出這個任務(wù)來執(zhí)行妇押,如果執(zhí)行完了,就設(shè)置狀態(tài)為NORMAL姓迅;
- 前面unpushTask()方法在隊(duì)列中沒有這個任務(wù)時會返回false敲霍,15行調(diào)用joinTask等待這個任務(wù)完成。
- 由于ForkJoinPool中有一個數(shù)組叫submissionQueue丁存,通過submit方法調(diào)用而且非ForkJoinTask這種任務(wù)會被放到這個隊(duì)列中肩杈。這種任務(wù)有可能被非ForkJoinWorkerThread線程執(zhí)行,第18行表示如果是這種任務(wù)解寝,等待它執(zhí)行完成扩然。
下面來看joinTask方法
final int joinTask(ForkJoinTask<?> joinMe) {
ForkJoinTask<?> prevJoin = currentJoin;
currentJoin = joinMe;
for (int s, retries = MAX_HELP;;) {
if ((s = joinMe.status) < 0) {
currentJoin = prevJoin;
return s;
}
if (retries > 0) {
if (queueTop != queueBase) {
if (!localHelpJoinTask(joinMe))
retries = 0; // cannot help
}
else if (retries == MAX_HELP >>> 1) {
--retries; // check uncommon case
if (tryDeqAndExec(joinMe) >= 0)
Thread.yield(); // for politeness
}
else
retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
}
else {
retries = MAX_HELP; // restart if not done
pool.tryAwaitJoin(joinMe);
}
}
}
- (1)這里有個常量MAX_HELP=16,表示幫助join的次數(shù)。第11行聋伦,queueTop!=queueBase表示本地隊(duì)列中有任務(wù)夫偶,如果這個任務(wù)剛好在隊(duì)首,則嘗試自己執(zhí)行觉增;否則返回false兵拢。這時retries被設(shè)置為0,表示不能幫助,因?yàn)樽砸殃?duì)列不為空逾礁,自己并不空閑说铃。在下一次循環(huán)就會進(jìn)入第24行,等待這個任務(wù)執(zhí)行完成嘹履。
- (2)第20行helpJoinTask()方法返回false時腻扇,retries-1,連續(xù)8次都沒有幫到忙砾嫉,就會進(jìn)入第14行幼苛,調(diào)用yield讓權(quán)等待。沒辦法人口太差焕刮,想做點(diǎn)好事都不行蚓峦,只有停下來休息一下。
- (3)當(dāng)執(zhí)行到第20行济锄,表示自己隊(duì)列為空暑椰,可以去幫助這個任務(wù)了,下面來看是怎么幫助的荐绝?
outer:for (ForkJoinWorkerThread thread = this;;) {
// Try to find v, the stealer of task, by first using hint
ForkJoinWorkerThread v = ws[thread.stealHint & m];
if (v == null || v.currentSteal != task) {
for (int j = 0; ;) { // search array
if ((v = ws[j]) != null && v.currentSteal == task) {
thread.stealHint = j;
break; // save hint for next time
}
if (++j > m)
break outer; // can't find stealer
}
}
// Try to help v, using specialized form of deqTask
for (;;) {
ForkJoinTask<?>[] q; int b, i;
if (joinMe.status < 0)
break outer;
if ((b = v.queueBase) == v.queueTop ||
(q = v.queue) == null ||
(i = (q.length-1) & b) < 0)
break; // empty
long u = (i << ASHIFT) + ABASE;
ForkJoinTask<?> t = q[i];
if (task.status < 0)
break outer; // stale
if (t != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
v.queueBase = b + 1;
v.stealHint = poolIndex;
ForkJoinTask<?> ps = currentSteal;
currentSteal = t;
t.doExec();
currentSteal = ps;
helped = true;
}
}
// Try to descend to find v's stealer
ForkJoinTask<?> next = v.currentJoin;
if (--levels > 0 && task.status >= 0 &&
next != null && next != task) {
task = next;
thread = v;
}
}
- (1)通過查看stealHint這個字段的注釋可以知道一汽,它表示最近一次誰來偷過我的queue中的任務(wù)。因此通過stealHint并不能找到當(dāng)前任務(wù)被誰偷了?所以第4行v.currentSteal != task完全可能召夹。這時還有一個辦法找到這個任務(wù)被誰偷了岩喷,看看currentSteal這個字段的注釋表示最近偷的哪個任務(wù)。這里掃描所有偷來的任務(wù)與當(dāng)前任務(wù)比較监憎,如果相等纱意,就是這個線程偷的。如果這兩種方法都不能找到小偷鲸阔,只能等待了偷霉。
- (2)當(dāng)找到了小偷后,以其人之身還之其人之道褐筛,從小偷那里偷任務(wù)過來类少,相當(dāng)于你和小偷共同執(zhí)行你的任務(wù),會加速你的任務(wù)完成渔扎。
- (3)小偷也是爺硫狞,如果小偷也在等待一個任務(wù)完成氛雪,權(quán)利反轉(zhuǎn)(小偷等待的這個任務(wù)做為當(dāng)前任務(wù)乒躺,小偷扮演當(dāng)事人角色把前面的流程走一遍)房铭,這是一個遞歸的過程榆骚。