通過(guò)上一篇(JUC源碼分析-線程池篇(四):ForkJoinPool - 1)的講解,相信同學(xué)們對(duì) ForkJoinPool 已經(jīng)有了一個(gè)大概的認(rèn)識(shí),本篇我們將通過(guò)分析源碼的方式來(lái)深入了解 ForkJoinPool 的具體實(shí)現(xiàn)原理胚宦。
概述
上篇提到過(guò),在 java.util.concurrent 包中绝骚,F(xiàn)ork/Join 框架主要由 ForkJoinPool田柔、ForkJoinWorkerThread 和 ForkJoinTask(包括RecursiveTask、RecursiveAction 和 CountedCompleter) 來(lái)實(shí)現(xiàn)耀石,它們之間的聯(lián)系如下:
- ForkJoinPool 中使用 ForkJoinWorkerThread 來(lái)運(yùn)行 ForkJoinTask 任務(wù)牵囤,F(xiàn)orkJoinPool 只接收 ForkJoinTask 任務(wù)(在實(shí)際使用中,也可以接收 Runnable/Callable 任務(wù)娶牌,但在真正運(yùn)行時(shí)奔浅,也會(huì)把這些任務(wù)封裝成 ForkJoinTask 類型的任務(wù))馆纳,RecursiveTask 是 ForkJoinTask 的子類诗良,是一個(gè)可以遞歸執(zhí)行的 ForkJoinTask,RecursiveAction 是一個(gè)無(wú)返回值的 RecursiveTask鲁驶,CountedCompleter 在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)鉴裹。
在實(shí)際運(yùn)用中,我們一般都會(huì)繼承 RecursiveTask 钥弯、RecursiveAction 或 CountedCompleter 來(lái)實(shí)現(xiàn)我們的業(yè)務(wù)需求径荔,而不會(huì)直接繼承 ForkJoinTask 類。
在本篇的解析中主要包括三個(gè)類( ForkJoinPool脆霎、ForkJoinWorkerThread 和 ForkJoinTask )总处,我們通過(guò)穿插講解它們之間的聯(lián)系來(lái)分析 ForkJoinPool 的運(yùn)行原理。
數(shù)據(jù)結(jié)構(gòu)和核心參數(shù)
1. ForkJoinPool
內(nèi)部類介紹:
ForkJoinWorkerThreadFactory:內(nèi)部線程工廠接口睛蛛,用于創(chuàng)建工作線程ForkJoinWorkerThread
DefaultForkJoinWorkerThreadFactory:ForkJoinWorkerThreadFactory 的默認(rèn)實(shí)現(xiàn)類
InnocuousForkJoinWorkerThreadFactory:實(shí)現(xiàn)了 ForkJoinWorkerThreadFactory鹦马,無(wú)許可線程工廠,當(dāng)系統(tǒng)變量中有系統(tǒng)安全管理相關(guān)屬性時(shí)忆肾,默認(rèn)使用這個(gè)工廠創(chuàng)建工作線程荸频。
EmptyTask:內(nèi)部占位類,用于替換隊(duì)列中 join 的任務(wù)客冈。
ManagedBlocker:為 ForkJoinPool 中的任務(wù)提供擴(kuò)展管理并行數(shù)的接口旭从,一般用在可能會(huì)阻塞的任務(wù)(如在 Phaser 中用于等待 phase 到下一個(gè)generation)。
-
WorkQueue:ForkJoinPool 的核心數(shù)據(jù)結(jié)構(gòu)场仲,本質(zhì)上是work-stealing 模式的雙端任務(wù)隊(duì)列和悦,內(nèi)部存放 ForkJoinTask 對(duì)象任務(wù),使用 @Contented 注解修飾防止偽共享渠缕。具體介紹見(jiàn)上篇摹闽。
工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),此時(shí)可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個(gè)棧褐健,新的任務(wù)會(huì)放入棧頂(top 位)付鹿;工作線程在處理自己工作隊(duì)列的任務(wù)時(shí)澜汤,按照 LIFO 的順序。
工作線程在處理自己的工作隊(duì)列同時(shí)舵匾,會(huì)嘗試竊取一個(gè)任務(wù)(可能是來(lái)自于剛剛提交到 pool 的任務(wù)俊抵,或是來(lái)自于其他工作線程的隊(duì)列任務(wù)),此時(shí)可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個(gè) FIFO 的隊(duì)列坐梯,竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首(base位)徽诲。
偽共享狀態(tài):緩存系統(tǒng)中是以緩存行(cache line)為單位存儲(chǔ)的。緩存行是2的整數(shù)冪個(gè)連續(xù)字節(jié)吵血,一般為32-256個(gè)字節(jié)谎替。最常見(jiàn)的緩存行大小是64個(gè)字節(jié)。當(dāng)多線程修改互相獨(dú)立的變量時(shí)蹋辅,如果這些變量共享同一個(gè)緩存行钱贯,就會(huì)無(wú)意中影響彼此的性能,這就是偽共享侦另。
核心參數(shù)
在后面的源碼解析中秩命,我們會(huì)看到大量的位運(yùn)算,這些位運(yùn)算都是通過(guò)我們接下來(lái)介紹的一些常量參數(shù)來(lái)計(jì)算的褒傅。
例如弃锐,如果要更新活躍線程數(shù),使用公式(UC_MASK & (c + AC_UNIT)) | (SP_MASK & c)
殿托;c 代表當(dāng)前 ctl霹菊,UC_MASK 和 SP_MASK 分別是高位和低位掩碼,AC_UNIT 為活躍線程的增量數(shù)支竹,使用(UC_MASK & (c + AC_UNIT))
就可以計(jì)算出高32位旋廷,然后再加上低32位(SP_MASK & c)
,就拼接成了一個(gè)新的ctl
唾戚。
這些運(yùn)算的可讀性很差柳洋,看起來(lái)有些復(fù)雜。在后面源碼解析中有位運(yùn)算的地方我都會(huì)加上注釋叹坦,大家只需要了解它們的作用即可熊镣。
- ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量:
// Constants shared across ForkJoinPool and WorkQueue
// 限定參數(shù)
static final int SMASK = 0xffff; // 低位掩碼,也是最大索引位
static final int MAX_CAP = 0x7fff; // 工作線程最大容量
static final int EVENMASK = 0xfffe; // 偶數(shù)低位掩碼
static final int SQMASK = 0x007e; // workQueues 數(shù)組最多64個(gè)槽位
// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1; // 標(biāo)記是否正在運(yùn)行任務(wù)
static final int INACTIVE = 1 << 31; // 失活狀態(tài) 負(fù)數(shù)
static final int SS_SEQ = 1 << 16; // 版本戳募书,防止ABA問(wèn)題
// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16; // 模式掩碼
static final int LIFO_QUEUE = 0; //LIFO隊(duì)列
static final int FIFO_QUEUE = 1 << 16;//FIFO隊(duì)列
static final int SHARED_QUEUE = 1 << 31; // 共享模式隊(duì)列绪囱,負(fù)數(shù)
- ForkJoinPool 中的相關(guān)常量和實(shí)例字段:
// 低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼
// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 創(chuàng)建工作線程標(biāo)志
// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// 實(shí)例字段
volatile long ctl; // 主控制參數(shù)
volatile int runState; // 運(yùn)行狀態(tài)鎖
final int config; // 并行度|模式
int indexSeed; // 用于生成工作線程索引
volatile WorkQueue[] workQueues; // 主對(duì)象注冊(cè)信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh; // 每個(gè)工作線程的異常信息
final String workerNamePrefix; // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter; // 偷取任務(wù)總數(shù)莹捡,也可作為同步監(jiān)視器
/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動(dòng)或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度鬼吵,對(duì)應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù),在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時(shí)的序號(hào)
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時(shí)值(以納秒為單位)篮赢,默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時(shí)時(shí)間齿椅,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù)琉挖,用在在awaitRunStateLock和awaitWork中
private static final int SPINS = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;
說(shuō)明: ForkJoinPool 的內(nèi)部狀態(tài)都是通過(guò)一個(gè)64位的 long 型 變量ctl
來(lái)存儲(chǔ),它由四個(gè)16位的子域組成:
- AC:正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度涣脚,高16位
- TC:總工作線程數(shù)減去目標(biāo)并行度示辈,中高16位
- SS:棧頂?shù)却€程的版本計(jì)數(shù)和狀態(tài),中低16位
- ID: 棧頂 WorkQueue 在池中的索引(
poolIndex
)遣蚀,低16位
在后面的源碼解析中矾麻,某些地方也提取了ctl
的低32位(sp=(int)ctl
)來(lái)檢查工作線程狀態(tài),例如芭梯,當(dāng)sp不為0時(shí)說(shuō)明當(dāng)前還有空閑工作線程险耀。
- ForkJoinPool.WorkQueue 中的相關(guān)屬性:
//初始隊(duì)列容量,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊(duì)列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 實(shí)例字段
volatile int scanState; // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred; // 記錄前一個(gè)棧頂?shù)腸tl
int nsteals; // 偷取任務(wù)數(shù)
int hint; // 記錄偷取者索引玖喘,初始為隨機(jī)索引
int config; // 池索引和模式
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; //下一個(gè)poll操作的索引(棧底/隊(duì)列頭)
int top; // 下一個(gè)push操作的索引(棧頂/隊(duì)列尾)
ForkJoinTask<?>[] array; // 任務(wù)數(shù)組
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊(duì)列的工作線程甩牺,共享模式下為null
volatile Thread parker; // 調(diào)用park阻塞期間為owner,其他情況為null
volatile ForkJoinTask<?> currentJoin; // 記錄被join過(guò)來(lái)的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊(duì)列偷取過(guò)來(lái)的任務(wù)
2. ForkJoinTask
- ForkJoinTask 實(shí)現(xiàn)了 Future 接口芒涡,說(shuō)明它也是一個(gè)可取消的異步運(yùn)算任務(wù)柴灯,實(shí)際上ForkJoinTask 是 Future 的輕量級(jí)實(shí)現(xiàn)卖漫,主要用在純粹是計(jì)算的函數(shù)式任務(wù)或者操作完全獨(dú)立的對(duì)象計(jì)算任務(wù)费尽。fork 是主運(yùn)行方法,用于異步執(zhí)行羊始;而 join 方法在任務(wù)結(jié)果計(jì)算完畢之后才會(huì)運(yùn)行旱幼,用來(lái)合并或返回計(jì)算結(jié)果。
- 其內(nèi)部類都比較簡(jiǎn)單突委,ExceptionNode 是用于存儲(chǔ)任務(wù)執(zhí)行期間的異常信息的單向鏈表柏卤;其余四個(gè)類是為 Runnable/Callable 任務(wù)提供的適配器類,用于把 Runnable/Callable 轉(zhuǎn)化為 ForkJoinTask 類型的任務(wù)(因?yàn)?ForkJoinPool 只可以運(yùn)行 ForkJoinTask 類型的任務(wù))匀油。
核心參數(shù)
/** 任務(wù)運(yùn)行狀態(tài) */
volatile int status; // 任務(wù)運(yùn)行狀態(tài)
static final int DONE_MASK = 0xf0000000; // 任務(wù)完成狀態(tài)標(biāo)志位
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16 等待信號(hào)
static final int SMASK = 0x0000ffff; // 低位掩碼
源碼解析
首先介紹一下構(gòu)造函數(shù):
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
說(shuō)明: 在 ForkJoinPool 中我們可以自定義四個(gè)參數(shù):
- parallelism:并行度缘缚,默認(rèn)為CPU數(shù),最小為1
- factory:工作線程工廠敌蚜;
- handler:處理工作線程運(yùn)行任務(wù)時(shí)的異常情況類桥滨,默認(rèn)為null;
- asyncMode:是否為異步模式弛车,默認(rèn)為 false齐媒。如果為
true
,表示子任務(wù)的執(zhí)行遵循 FIFO 順序并且任務(wù)不能被合并(join)纷跛,這種模式適用于工作線程只運(yùn)行事件類型的異步任務(wù)喻括。
在多數(shù)場(chǎng)景使用時(shí),如果沒(méi)有太強(qiáng)的業(yè)務(wù)需求贫奠,我們一般直接使用 ForkJoinPool 中的common
池唬血,在JDK1.8之后提供了ForkJoinPool.commonPool()
方法可以直接使用common
池望蜡,來(lái)看一下它的構(gòu)造:
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");//線程工廠
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//異常處理類
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler) ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;//默認(rèn)并行度為1
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
使用common pool的優(yōu)點(diǎn)就是我們可以通過(guò)指定系統(tǒng)參數(shù)的方式定義“并行度、線程工廠和異常處理類”拷恨;并且它使用的是同步模式泣特,也就是說(shuō)可以支持任務(wù)合并(join)。
ForkJoinPool 中的任務(wù)執(zhí)行分兩種:
- 直接通過(guò) FJP 提交的外部任務(wù)(external/submissions task)挑随,存放在 workQueues 的偶數(shù)槽位状您;
- 通過(guò)內(nèi)部 fork 分割的子任務(wù)(Worker task),存放在 workQueues 的奇數(shù)槽位兜挨。
首先來(lái)看一下整個(gè)Fork/Join 框架的執(zhí)行流程膏孟,后面我們的源碼解析會(huì)完全按照這個(gè)流程圖來(lái)進(jìn)行:
在接下來(lái)的解析中,我們會(huì)分四個(gè)部分:首先介紹兩種任務(wù)的提交流程拌汇;再分析任務(wù)的執(zhí)行過(guò)程(ForkJoinWorkerThread.run()
到ForkJoinTask.doExec()
這一部分)柒桑;最后介紹任務(wù)的結(jié)果獲取(ForkJoinTask.join()
和ForkJoinTask.invoke()
)
1. 外部任務(wù)(external/submissions task)提交
向 ForkJoinPool 提交任務(wù)有三種方式:invoke()
會(huì)等待任務(wù)計(jì)算完畢并返回計(jì)算結(jié)果噪舀;execute()
是直接向池提交一個(gè)任務(wù)來(lái)異步執(zhí)行魁淳,無(wú)返回結(jié)果;submit()
也是異步執(zhí)行与倡,但是會(huì)返回提交的任務(wù)界逛,在適當(dāng)?shù)臅r(shí)候可通過(guò)task.get()
獲取執(zhí)行結(jié)果。
這三種提交方式都都是調(diào)用externalPush()
方法來(lái)完成纺座,所以接下來(lái)我們將從externalPush()
方法開(kāi)始逐步分析外部任務(wù)的執(zhí)行過(guò)程息拜。
1.1 externalPush(ForkJoinTask<?> task)
//添加給定任務(wù)到submission隊(duì)列中
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue q;
int m;
int r = ThreadLocalRandom.getProbe();//探針值,用于計(jì)算WorkQueue槽位索引
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //獲取隨機(jī)偶數(shù)槽位的workQueue
U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定workQueue
ForkJoinTask<?>[] a;
int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;//計(jì)算任務(wù)索引位置
U.putOrderedObject(a, j, task);//任務(wù)入列
U.putOrderedInt(q, QTOP, s + 1);//更新push slot
U.putIntVolatile(q, QLOCK, 0);//解除鎖定
if (n <= 1)
signalWork(ws, q);//任務(wù)數(shù)小于1時(shí)嘗試創(chuàng)建或激活一個(gè)工作線程
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
}
externalSubmit(task);//初始化workQueues及相關(guān)屬性
}
首先說(shuō)明一下externalPush
和externalSubmit
兩個(gè)方法的聯(lián)系:它們的作用都是把任務(wù)放到隊(duì)列中等待執(zhí)行净响。不同的是少欺,externalSubmit
可以說(shuō)是完整版的externalPush
,在任務(wù)首次提交時(shí)馋贤,需要初始化workQueues
及其他相關(guān)屬性赞别,這個(gè)初始化操作就是externalSubmit
來(lái)完成的;而后再向池中提交的任務(wù)都是通過(guò)簡(jiǎn)化版的externalSubmit
-externalPush
來(lái)完成配乓。
externalPush
的執(zhí)行流程很簡(jiǎn)單:首先找到一個(gè)隨機(jī)偶數(shù)槽位的 workQueue仿滔,然后把任務(wù)放入這個(gè) workQueue 的任務(wù)數(shù)組中,并更新top
位扰付。如果隊(duì)列的剩余任務(wù)數(shù)小于1堤撵,則嘗試創(chuàng)建或激活一個(gè)工作線程來(lái)運(yùn)行任務(wù)(防止在externalSubmit
初始化時(shí)發(fā)生異常導(dǎo)致工作線程創(chuàng)建失敗)羽莺。
1.2 externalSubmit(ForkJoinTask<?> task)
//任務(wù)提交
private void externalSubmit(ForkJoinTask<?> task) {
//初始化調(diào)用線程的探針值实昨,用于計(jì)算WorkQueue索引
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (; ; ) {
WorkQueue[] ws;
WorkQueue q;
int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {// 池已關(guān)閉
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//初始化workQueues
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();//鎖定runState
try {
//初始化
if ((rs & STARTED) == 0) {
//初始化stealCounter
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
//創(chuàng)建workQueues,容量為2的冪次方
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);//解鎖并更新runState
}
} else if ((q = ws[k = r & m & SQMASK]) != null) {//獲取隨機(jī)偶數(shù)槽位的workQueue
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定 workQueue
ForkJoinTask<?>[] a = q.array;//當(dāng)前workQueue的全部任務(wù)
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {//擴(kuò)容
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);//放入給定任務(wù)
U.putOrderedInt(q, QTOP, s + 1);//修改push slot
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
}
if (submitted) {//任務(wù)提交成功盐固,創(chuàng)建或激活工作線程
signalWork(ws, q);//創(chuàng)建或激活一個(gè)工作線程來(lái)運(yùn)行任務(wù)
return;
}
}
move = true; // move on failure 操作失敗荒给,重新獲取探針值
} else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // 更新索引k位值的workQueue
//else terminated
unlockRunState(rs, rs & ~RSLOCK);
} else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);//重新獲取線程探針值
}
}
說(shuō)明: externalSubmit
是externalPush
的完整版本丈挟,主要用于第一次提交任務(wù)時(shí)初始化workQueues
及相關(guān)屬性,并且提交給定任務(wù)到隊(duì)列中志电。具體執(zhí)行步驟如下:
- 如果池為終止?fàn)顟B(tài)(
runState<0
)曙咽,調(diào)用tryTerminate
來(lái)終止線程池,并拋出任務(wù)拒絕異常挑辆; - 如果尚未初始化例朱,就為 FJP 執(zhí)行初始化操作:初始化
stealCounter
、創(chuàng)建workerQueues
鱼蝉,然后繼續(xù)自旋洒嗤; - 初始化完成后,執(zhí)行在
externalPush
中相同的操作:獲取 workQueue魁亦,放入指定任務(wù)渔隶。任務(wù)提交成功后調(diào)用signalWork
方法創(chuàng)建或激活線程; - 如果在步驟3中獲取到的 workQueue 為
null
洁奈,會(huì)在這一步中創(chuàng)建一個(gè) workQueue间唉,創(chuàng)建成功繼續(xù)自旋執(zhí)行第三步操作; - 如果非上述情況利术,或者有線程爭(zhēng)用資源導(dǎo)致獲取鎖失敗呈野,就重新獲取線程探針值繼續(xù)自旋。
1.3 signalWork(WorkQueue[] ws, WorkQueue q)
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c;
int sp, i;
WorkQueue v;
Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int) c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);//工作線程太少氯哮,添加新的工作線程
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
//計(jì)算ctl际跪,加上版本戳SS_SEQ避免ABA問(wèn)題
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
//計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);//喚醒阻塞線程
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
說(shuō)明:新建或喚醒一個(gè)工作線程商佛,在externalPush
喉钢、externalSubmit
、workQueue.push
良姆、scan
中調(diào)用肠虽。如果還有空閑線程,則嘗試喚醒索引到的 WorkQueue 的parker
線程玛追;如果工作線程過(guò)少((ctl & ADD_WORKER) != 0L
)税课,則調(diào)用tryAddWorker
添加一個(gè)新的工作線程。
1.4 tryAddWorker(long c)
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);//釋放鎖
if (stop != 0)
break;
if (add) {
createWorker();//創(chuàng)建工作線程
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
說(shuō)明:嘗試添加一個(gè)新的工作線程痊剖,首先更新ctl
中的工作線程數(shù)韩玩,然后調(diào)用createWorker()
創(chuàng)建工作線程。
1.5 createWorker()
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);//線程創(chuàng)建失敗處理
return false;
}
說(shuō)明:createWorker
首先通過(guò)線程工廠創(chuàng)一個(gè)新的ForkJoinWorkerThread
陆馁,然后啟動(dòng)這個(gè)工作線程(wt.start()
)找颓。如果期間發(fā)生異常,調(diào)用deregisterWorker
處理線程創(chuàng)建失敗的邏輯(deregisterWorker
在后面再詳細(xì)說(shuō)明)叮贩。
ForkJoinWorkerThread 的構(gòu)造函數(shù)如下:
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
可以看到 ForkJoinWorkerThread 在構(gòu)造時(shí)首先調(diào)用父類 Thread 的方法击狮,然后為工作線程注冊(cè)pool
和workQueue
佛析,而workQueue
的注冊(cè)任務(wù)由ForkJoinPool.registerWorker
來(lái)完成。
1.6 registerWorker()
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
//設(shè)置為守護(hù)線程
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);//構(gòu)造新的WorkQueue
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws;
int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
//生成新建WorkQueue的索引
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // Worker任務(wù)放在奇數(shù)索引位 odd-numbered indices
if (ws[i] != null) { // collision 已存在彪蓬,重新計(jì)算索引位
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
//查找可用的索引位
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {//所有索引位都被占用寸莫,對(duì)workQueues進(jìn)行擴(kuò)容
workQueues = ws = Arrays.copyOf(ws, n <<= 1);//workQueues 擴(kuò)容
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
說(shuō)明:registerWorker
是 ForkJoinWorkerThread 構(gòu)造器的回調(diào)函數(shù),用于創(chuàng)建和記錄工作線程的 WorkQueue档冬。比較簡(jiǎn)單膘茎,就不多贅述了。注意在此為工作線程創(chuàng)建的 WorkQueue 是放在奇數(shù)索引的(代碼行:i = ((s << 1) | 1) & m;
)
1.7 小結(jié)
OK酷誓,外部任務(wù)的提交流程就先講到這里辽狈。在createWorker()
中啟動(dòng)工作線程后(wt.start()
),當(dāng)為線程分配到CPU執(zhí)行時(shí)間片之后會(huì)運(yùn)行 ForkJoinWorkerThread 的run
方法開(kāi)啟線程來(lái)執(zhí)行任務(wù)呛牲。工作線程執(zhí)行任務(wù)的流程我們?cè)谥v完內(nèi)部任務(wù)提交之后會(huì)統(tǒng)一講解刮萌。
2. 子任務(wù)(Worker task)提交
子任務(wù)的提交相對(duì)比較簡(jiǎn)單,由任務(wù)的fork()
方法完成娘扩。通過(guò)上面的流程圖可以看到任務(wù)被分割(fork)之后調(diào)用了ForkJoinPool.WorkQueue.push()
方法直接把任務(wù)放到隊(duì)列中等待被執(zhí)行着茸。
2.1 ForkJoinTask.fork()
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
說(shuō)明:如果當(dāng)前線程是 Worker 線程,說(shuō)明當(dāng)前任務(wù)是fork分割的子任務(wù)琐旁,通過(guò)ForkJoinPool.workQueue.push()
方法直接把任務(wù)放到自己的等待隊(duì)列中涮阔;否則調(diào)用ForkJoinPool.externalPush()
提交到一個(gè)隨機(jī)的等待隊(duì)列中(外部任務(wù))。
2.2 ForkJoinPool.WorkQueue.push()
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {//首次提交灰殴,創(chuàng)建或喚醒一個(gè)工作線程
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
} else if (n >= m)
growArray();
}
}
說(shuō)明:首先把任務(wù)放入等待隊(duì)列并更新top
位敬特;如果當(dāng)前 WorkQueue 為新建的等待隊(duì)列(top-base<=1
),則調(diào)用signalWork
方法為當(dāng)前 WorkQueue 新建或喚醒一個(gè)工作線程牺陶;如果 WorkQueue 中的任務(wù)數(shù)組容量過(guò)小伟阔,則調(diào)用growArray()
方法對(duì)其進(jìn)行兩倍擴(kuò)容,growArray()
方法源碼如下:
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;//獲取內(nèi)部任務(wù)列表
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
//新建一個(gè)兩倍容量的任務(wù)數(shù)組
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
//從老數(shù)組中拿出數(shù)據(jù)掰伸,放到新的數(shù)組中
do { // emulate poll from old array, push to new array
ForkJoinTask<?> x;
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null))
U.putObjectVolatile(a, j, x);
} while (++b != t);
}
return a;
}
2.3 小結(jié)
到此忧额,兩種任務(wù)的提交流程都已經(jīng)解析完畢抹腿,下一節(jié)我們來(lái)一起看看任務(wù)提交之后是如何被運(yùn)行的。
3. 任務(wù)執(zhí)行
回到我們開(kāi)始時(shí)的流程圖,在ForkJoinPool .createWorker()
方法中創(chuàng)建工作線程后兢哭,會(huì)啟動(dòng)工作線程聋亡,系統(tǒng)為工作線程分配到CPU執(zhí)行時(shí)間片之后會(huì)執(zhí)行 ForkJoinWorkerThread 的run()
方法正式開(kāi)始執(zhí)行任務(wù)汰蓉。
3.1 ForkJoinWorkerThread.run()
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();//鉤子方法珊佣,可自定義擴(kuò)展
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);//鉤子方法,可自定義擴(kuò)展
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);//處理異常
}
}
}
}
說(shuō)明:方法很簡(jiǎn)單惯退,在工作線程運(yùn)行前后會(huì)調(diào)用自定義鉤子函數(shù)(onStart
和onTermination
)赌髓,任務(wù)的運(yùn)行則是調(diào)用了ForkJoinPool.runWorker()
。如果全部任務(wù)執(zhí)行完畢或者期間遭遇異常,則通過(guò)ForkJoinPool.deregisterWorker
關(guān)閉工作線程并處理異常信息(deregisterWorker
方法我們后面會(huì)詳細(xì)講解)春弥。
3.2 ForkJoinPool.runWorker(WorkQueue w)
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t; ; ) {
if ((t = scan(w, r)) != null)//掃描任務(wù)執(zhí)行
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13;
r ^= r >>> 17;
r ^= r << 5; // xorshift
}
}
說(shuō)明:runWorker
是 ForkJoinWorkerThread 的主運(yùn)行方法呛哟,用來(lái)依次執(zhí)行當(dāng)前工作線程中的任務(wù)。函數(shù)流程很簡(jiǎn)單:調(diào)用scan
方法依次獲取任務(wù)匿沛,然后調(diào)用WorkQueue .runTask
運(yùn)行任務(wù)扫责;如果未掃描到任務(wù),則調(diào)用awaitWork
等待逃呼,直到工作線程/線程池終止或等待超時(shí)鳖孤。
3.3 ForkJoinPool.scan(WorkQueue w, int r)
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws;
int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
//初始掃描起點(diǎn),自旋掃描
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
WorkQueue q;
ForkJoinTask<?>[] a;
ForkJoinTask<?> t;
int b, n;
long c;
if ((q = ws[k]) != null) {//獲取workQueue
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
//計(jì)算偏移量
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null && //取base位置任務(wù)
q.base == b) {//stable
if (ss >= 0) { //scanning
if (U.compareAndSwapObject(a, i, t, null)) {//
q.base = b + 1;//更新base位
if (n < -1) // signal others
signalWork(ws, q);//創(chuàng)建或喚醒工作線程來(lái)運(yùn)行任務(wù)
return t;
}
} else if (oldSum == 0 && // try to activate 嘗試激活工作線程
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);//喚醒棧頂工作線程
}
//base位置任務(wù)為空或base位置偏移抡笼,隨機(jī)移位重新掃描
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1;
r ^= r >>> 3;
r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;//隊(duì)列任務(wù)為空苏揣,記錄base位
}
//更新索引k 繼續(xù)向后查找
if ((k = (k + 1) & m) == origin) { // continue until stable
//運(yùn)行到這里說(shuō)明已經(jīng)掃描了全部的 workQueues,但并未掃描到任務(wù)
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;// 已經(jīng)被滅活或終止,跳出循環(huán)
//對(duì)當(dāng)前WorkQueue進(jìn)行滅活操作
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));//計(jì)算ctl為INACTIVE狀態(tài)并減少活躍線程數(shù)
w.stackPred = (int) c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);//修改scanState為inactive狀態(tài)
if (U.compareAndSwapLong(this, CTL, c, nc))//更新scanState為滅活狀態(tài)
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;//重置checkSum推姻,繼續(xù)循環(huán)
}
}
}
return null;
}
說(shuō)明:掃描并嘗試偷取一個(gè)任務(wù)平匈。使用w.hint
進(jìn)行隨機(jī)索引 WorkQueue,也就是說(shuō)并不一定會(huì)執(zhí)行當(dāng)前 WorkQueue 中的任務(wù)藏古,而是偷取別的Worker的任務(wù)來(lái)執(zhí)行增炭。
函數(shù)的大概執(zhí)行流程如下:
- 取隨機(jī)位置的一個(gè) WorkQueue;
- 獲取
base
位的 ForkJoinTask拧晕,成功取到后更新base
位并返回任務(wù)隙姿;如果取到的 WorkQueue 中任務(wù)數(shù)大于1,則調(diào)用signalWork
創(chuàng)建或喚醒其他工作線程厂捞; - 如果當(dāng)前工作線程處于不活躍狀態(tài)(
INACTIVE
)输玷,則調(diào)用tryRelease
嘗試喚醒棧頂工作線程來(lái)執(zhí)行。tryRelease
源碼如下:
private boolean tryRelease(long c, WorkQueue v, long inc) {
int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE;
Thread p;
//ctl低32位等于scanState靡馁,說(shuō)明可以喚醒parker線程
if (v != null && v.scanState == sp) { // v is at top of stack
//計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
if (U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs;
if ((p = v.parker) != null)
U.unpark(p);//喚醒線程
return true;
}
}
return false;
}
- 如果
base
位任務(wù)為空或發(fā)生偏移欲鹏,則對(duì)索引位進(jìn)行隨機(jī)移位,然后重新掃描奈嘿; - 如果掃描整個(gè)
workQueues
之后沒(méi)有獲取到任務(wù)貌虾,則設(shè)置當(dāng)前工作線程為INACTIVE
狀態(tài);然后重置checkSum
裙犹,再次掃描一圈之后如果還沒(méi)有任務(wù)則跳出循環(huán)返回null
。
3.4 ForkJoinPool.awaitWork(WorkQueue w, int r)
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
if ((ss = w.scanState) >= 0)//正在掃描衔憨,跳出循環(huán)
break;
else if (spins > 0) {
r ^= r << 6;
r ^= r >>> 21;
r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v;
WorkQueue[] ws;
int s, j;
AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
} else if (w.qlock < 0) // 當(dāng)前workQueue已經(jīng)終止叶圃,返回false recheck after spins
return false;
else if (!Thread.interrupted()) {//判斷線程是否被中斷,并清除中斷狀態(tài)
long c, prevctl, parkTime, deadline;
int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);//活躍線程數(shù)
if ((ac <= 0 && tryTerminate(false, false)) || //無(wú)active線程践图,嘗試終止
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int) c) { // is last waiter
//計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short) (c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過(guò)量
return false; // else use timed wait
//計(jì)算空閑超時(shí)時(shí)間
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
} else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;//設(shè)置parker掺冠,準(zhǔn)備阻塞
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);//阻塞指定的時(shí)間
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)//正在掃描,說(shuō)明等到任務(wù),跳出循環(huán)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))//未等到任務(wù)德崭,更新ctl斥黑,返回false
return false; // shrink pool
}
}
return true;
}
說(shuō)明:回到runWorker
方法,如果scan
方法未掃描到任務(wù)眉厨,會(huì)調(diào)用awaitWork
等待獲取任務(wù)锌奴。函數(shù)的具體執(zhí)行流程大家看源碼,這里簡(jiǎn)單說(shuō)一下:
在等待獲取任務(wù)期間憾股,如果工作線程或線程池已經(jīng)終止則直接返回false
鹿蜀。如果當(dāng)前無(wú) active 線程,嘗試終止線程池并返回false
服球,如果終止失敗并且當(dāng)前是最后一個(gè)等待的 Worker茴恰,就阻塞指定的時(shí)間(IDLE_TIMEOUT
);等到屆期或被喚醒后如果發(fā)現(xiàn)自己是scanning
(scanState >= 0
)狀態(tài)斩熊,說(shuō)明已經(jīng)等到任務(wù)往枣,跳出等待返回true
繼續(xù) scan,否則的更新ctl
并返回false
粉渠。
3.5 WorkQueue.runTask()
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();//更新currentSteal并執(zhí)行任務(wù)
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();//依次執(zhí)行本地任務(wù)
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);//增加偷取任務(wù)數(shù)
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();//執(zhí)行鉤子函數(shù)
}
}
說(shuō)明:在scan
方法掃描到任務(wù)之后婉商,調(diào)用WorkQueue.runTask()
來(lái)執(zhí)行獲取到的任務(wù),大概流程如下:
- 標(biāo)記
scanState
為正在執(zhí)行狀態(tài)渣叛; - 更新
currentSteal
為當(dāng)前獲取到的任務(wù)并執(zhí)行它丈秩,任務(wù)的執(zhí)行調(diào)用了ForkJoinTask.doExec()
方法,源碼如下:
//ForkJoinTask.doExec()
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();//執(zhí)行我們定義的任務(wù)
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
- 調(diào)用
execLocalTasks
依次執(zhí)行當(dāng)前WorkerQueue中的任務(wù)淳衙,源碼如下:
//執(zhí)行并移除所有本地任務(wù)
final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
if ((config & FIFO_QUEUE) == 0) {//FIFO模式
for (ForkJoinTask<?> t; ; ) {
if ((t = (ForkJoinTask<?>) U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)//FIFO執(zhí)行蘑秽,取top任務(wù)
break;
U.putOrderedInt(this, QTOP, s);
t.doExec();//執(zhí)行
if (base - (s = top - 1) > 0)
break;
}
} else
pollAndExecAll();//LIFO模式執(zhí)行,取base任務(wù)
}
}
- 更新偷取任務(wù)數(shù)箫攀;
- 還原
scanState
并執(zhí)行鉤子函數(shù)肠牲。
3.6 ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
//1.移除workQueue
if (wt != null && (w = wt.workQueue) != null) {//獲取ForkJoinWorkerThread的等待隊(duì)列
WorkQueue[] ws; // remove index from array
int idx = w.config & SMASK;//計(jì)算workQueue索引
int rs = lockRunState();//獲取runState鎖和當(dāng)前池運(yùn)行狀態(tài)
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;//移除workQueue
unlockRunState(rs, rs & ~RSLOCK);//解除runState鎖
}
//2.減少CTL數(shù)
long c; // decrement counts
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
//3.處理被移除workQueue內(nèi)部相關(guān)參數(shù)
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks
}
//4.如果線程未終止,替換被移除的workQueue并喚醒內(nèi)部線程
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
//嘗試終止線程池
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
//喚醒被替換的線程靴跛,依賴于下一步
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
//創(chuàng)建工作線程替換
else if (ex != null && (c & ADD_WORKER) != 0L) {
tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
//5.處理異常
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
說(shuō)明:deregisterWorker
方法用于工作線程運(yùn)行完畢之后終止線程或處理工作線程異常缀雳,主要就是清除已關(guān)閉的工作線程或回滾創(chuàng)建線程之前的操作,并把傳入的異常拋給 ForkJoinTask 來(lái)處理梢睛。具體步驟見(jiàn)源碼注釋肥印。
3.7 小結(jié)
本節(jié)我們對(duì)任務(wù)的執(zhí)行流程進(jìn)行了說(shuō)明,后面我們將繼續(xù)介紹任務(wù)的結(jié)果獲染稀(join/invoke)深碱。
4. 獲取任務(wù)結(jié)果 - ForkJoinTask.join() / ForkJoinTask.invoke()
join() :
//合并任務(wù)結(jié)果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
//join, get, quietlyJoin的主實(shí)現(xiàn)方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
invoke() :
//執(zhí)行任務(wù),并等待任務(wù)完成并返回結(jié)果
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
//invoke, quietlyInvoke的主實(shí)現(xiàn)方法
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
說(shuō)明: join()
方法一般是在任務(wù)fork()
之后調(diào)用藏畅,用來(lái)獲确蠊琛(或者叫“合并”)任務(wù)的執(zhí)行結(jié)果。
ForkJoinTask的join()
和invoke()
方法都可以用來(lái)獲取任務(wù)的執(zhí)行結(jié)果(另外還有get
方法也是調(diào)用了doJoin
來(lái)獲取任務(wù)結(jié)果,但是會(huì)響應(yīng)運(yùn)行時(shí)異常)绞蹦,它們對(duì)外部提交任務(wù)的執(zhí)行方式一致力奋,都是通過(guò)externalAwaitDone
方法等待執(zhí)行結(jié)果。不同的是invoke()
方法會(huì)直接執(zhí)行當(dāng)前任務(wù)幽七;而join()
方法則是在當(dāng)前任務(wù)在隊(duì)列 top 位時(shí)(通過(guò)tryUnpush
方法判斷)才能執(zhí)行景殷,如果當(dāng)前任務(wù)不在 top 位或者任務(wù)執(zhí)行失敗調(diào)用ForkJoinPool.awaitJoin
方法幫助執(zhí)行或阻塞當(dāng)前 join 任務(wù)。(所以在官方文檔中建議了我們對(duì)ForkJoinTask任務(wù)的調(diào)用順序锉走,一對(duì) fork-join操作一般按照如下順序調(diào)用:a.fork(); b.fork(); b.join(); a.join();
滨彻。因?yàn)槿蝿?wù) b 是后面進(jìn)入隊(duì)列,也就是說(shuō)它是在棧頂?shù)模╰op 位)挪蹭,在它fork()
之后直接調(diào)用join()
就可以直接執(zhí)行而不會(huì)調(diào)用ForkJoinPool.awaitJoin
方法去等待亭饵。)
在這些方法中,join()
相對(duì)比較全面梁厉,所以之后的講解我們將從join()
開(kāi)始逐步向下分析辜羊,首先看一下join()
的執(zhí)行流程:
后面的源碼分析中,我們首先講解比較簡(jiǎn)單的外部 join 任務(wù)(externalAwaitDone
)词顾,然后再講解內(nèi)部 join 任務(wù)(從ForkJoinPool.awaitJoin()
開(kāi)始)八秃。
4.1 ForkJoinTask.externalAwaitDone()
private int externalAwaitDone() {
//執(zhí)行任務(wù)
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete( // CountedCompleter任務(wù)
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); // ForkJoinTask任務(wù)
if (s >= 0 && (s = status) >= 0) {//執(zhí)行失敗,進(jìn)入等待
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //更新state
synchronized (this) {
if (status >= 0) {//SIGNAL 等待信號(hào)
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
說(shuō)明: 如果當(dāng)前join
為外部調(diào)用肉盹,則調(diào)用此方法執(zhí)行任務(wù)昔驱,如果任務(wù)執(zhí)行失敗就進(jìn)入等待。方法本身是很簡(jiǎn)單的上忍,需要注意的是對(duì)不同的任務(wù)類型分兩種情況:
如果我們的任務(wù)為 CountedCompleter 類型的任務(wù)骤肛,則調(diào)用
externalHelpComplete
方法來(lái)執(zhí)行任務(wù)。(后面筆者會(huì)開(kāi)新篇來(lái)專門講解CountedCompleter
窍蓝,在此篇就不詳細(xì)介紹了腋颠,有興趣的同學(xué)們可以自己先看一下。)其他類型的 ForkJoinTask 任務(wù)調(diào)用
tryExternalUnpush
來(lái)執(zhí)行吓笙,源碼如下:
//為外部提交者提供 tryUnpush 功能(給定任務(wù)在top位時(shí)彈出任務(wù))
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue w;
ForkJoinTask<?>[] a;
int m, s;
int r = ThreadLocalRandom.getProbe();
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
(w = ws[m & r & SQMASK]) != null &&
(a = w.array) != null && (s = w.top) != w.base) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; //取top位任務(wù)
if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { //加鎖
if (w.top == s && w.array == a &&
U.getObject(a, j) == task &&
U.compareAndSwapObject(a, j, task, null)) { //符合條件淑玫,彈出
U.putOrderedInt(w, QTOP, s - 1); //更新top
U.putOrderedInt(w, QLOCK, 0); //解鎖,返回true
return true;
}
U.compareAndSwapInt(w, QLOCK, 1, 0); //當(dāng)前任務(wù)不在top位面睛,解鎖返回false
}
}
return false;
}
tryExternalUnpush
的作用就是判斷當(dāng)前任務(wù)是否在top
位絮蒿,如果是則彈出任務(wù),然后在externalAwaitDone
中調(diào)用doExec()
執(zhí)行任務(wù)侮穿。
4.2 ForkJoinPool.awaitJoin()
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin; //獲取給定Worker的join任務(wù)
U.putOrderedObject(w, QCURRENTJOIN, task); //把currentJoin替換為給定任務(wù)
//判斷是否為CountedCompleter類型的任務(wù)
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>) task : null;
for (; ; ) {
if ((s = task.status) < 0) //已經(jīng)完成|取消|異常 跳出循環(huán)
break;
if (cc != null)//CountedCompleter任務(wù)由helpComplete來(lái)完成join
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task)) //嘗試執(zhí)行
helpStealer(w, task); //隊(duì)列為空或執(zhí)行失敗歌径,任務(wù)可能被偷,幫助偷取者執(zhí)行該任務(wù)
if ((s = task.status) < 0) //已經(jīng)完成|取消|異常亲茅,跳出循環(huán)
break;
//計(jì)算任務(wù)等待時(shí)間
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {//執(zhí)行補(bǔ)償操作
task.internalWait(ms);//補(bǔ)償執(zhí)行成功,任務(wù)等待指定時(shí)間
U.getAndAddLong(this, CTL, AC_UNIT);//更新活躍線程數(shù)
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);//循環(huán)結(jié)束,替換為原來(lái)的join任務(wù)
}
return s;
}
說(shuō)明: 如果當(dāng)前 join 任務(wù)不在Worker等待隊(duì)列的top
位克锣,或者任務(wù)執(zhí)行失敗茵肃,調(diào)用此方法來(lái)幫助執(zhí)行或阻塞當(dāng)前 join 的任務(wù)。函數(shù)執(zhí)行流程如下:
- 由于每次調(diào)用
awaitJoin
都會(huì)優(yōu)先執(zhí)行當(dāng)前join的任務(wù)袭祟,所以首先會(huì)更新currentJoin
為當(dāng)前join任務(wù)验残; - 進(jìn)入自旋:
- 首先檢查任務(wù)是否已經(jīng)完成(通過(guò)
task.status < 0
判斷),如果給定任務(wù)執(zhí)行完畢|取消|異常 則跳出循環(huán)返回執(zhí)行狀態(tài)s
巾乳; - 如果是 CountedCompleter 任務(wù)類型您没,調(diào)用
helpComplete
方法來(lái)完成join操作(后面筆者會(huì)開(kāi)新篇來(lái)專門講解CountedCompleter,本篇暫時(shí)不做詳細(xì)解析)胆绊; - 非 CountedCompleter 任務(wù)類型調(diào)用
WorkQueue.tryRemoveAndExec
嘗試執(zhí)行任務(wù)氨鹏; - 如果給定 WorkQueue 的等待隊(duì)列為空或任務(wù)執(zhí)行失敗,說(shuō)明任務(wù)可能被偷压状,調(diào)用
helpStealer
幫助偷取者執(zhí)行任務(wù)(也就是說(shuō)仆抵,偷取者幫我執(zhí)行任務(wù),我去幫偷取者執(zhí)行它的任務(wù))种冬; - 再次判斷任務(wù)是否執(zhí)行完畢(
task.status < 0
)镣丑,如果任務(wù)執(zhí)行失敗,計(jì)算一個(gè)等待時(shí)間準(zhǔn)備進(jìn)行補(bǔ)償操作娱两; - 調(diào)用
tryCompensate
方法為給定 WorkQueue 嘗試執(zhí)行補(bǔ)償操作莺匠。在執(zhí)行補(bǔ)償期間,如果發(fā)現(xiàn) 資源爭(zhēng)用|池處于unstable狀態(tài)|當(dāng)前Worker已終止十兢,則調(diào)用ForkJoinTask.internalWait()
方法等待指定的時(shí)間趣竣,任務(wù)喚醒之后繼續(xù)自旋,ForkJoinTask.internalWait()
源碼如下:
final void internalWait(long timeout) {
int s;
if ((s = status) >= 0 && // force completer to issue notify
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//更新任務(wù)狀態(tài)為SIGNAL(等待喚醒)
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
notifyAll();
}
}
}
在awaitJoin
中纪挎,我們總共調(diào)用了三個(gè)比較復(fù)雜的方法:tryRemoveAndExec期贫、helpStealer和tryCompensate
,下面我們依次講解异袄。
4.2.1 WorkQueue.tryRemoveAndExec(ForkJoinTask<?> task)
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
//從top往下自旋查找
for (ForkJoinTask<?> t; ; ) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;//計(jì)算任務(wù)索引
if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) //獲取索引到的任務(wù)
return s + 1 == top; // shorter than expected
else if (t == task) { //給定任務(wù)為索引任務(wù)
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) { //彈出任務(wù)
U.putOrderedInt(this, QTOP, s); //更新top
removed = true;
}
} else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask()); //join任務(wù)已經(jīng)被移除通砍,替換為一個(gè)占位任務(wù)
if (removed)
task.doExec(); //執(zhí)行
break;
} else if (t.status < 0 && s + 1 == top) { //給定任務(wù)不是top任務(wù)
if (U.compareAndSwapObject(a, j, t, null)) //彈出任務(wù)
U.putOrderedInt(this, QTOP, s);//更新top
break; // was cancelled
}
if (--n == 0) //遍歷結(jié)束
return false;
}
if (task.status < 0) //任務(wù)執(zhí)行完畢
return false;
}
}
return true;
}
說(shuō)明: 從top
位開(kāi)始自旋向下找到給定任務(wù),如果找到把它從當(dāng)前 Worker 的任務(wù)隊(duì)列中移除并執(zhí)行它烤蜕。注意返回的參數(shù):如果任務(wù)隊(duì)列為空或者任務(wù)未執(zhí)行完畢返回true
封孙;任務(wù)執(zhí)行完畢返回false
。
4.2.2 ForkJoinPool.helpStealer(WorkQueue w, ForkJoinTask<?> task)
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent:
for (subtask = task; subtask.status >= 0; ) {
//1. 找到給定WorkQueue的偷取者v
for (int h = j.hint | 1, k = 0, i; ; k += 2) {//跳兩個(gè)索引讽营,因?yàn)閃orker在奇數(shù)索引位
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {//定位到偷取者
j.hint = i;//更新stealer索引
break;
}
checkSum += v.base;
}
}
//2. 幫助偷取者v執(zhí)行任務(wù)
for (; ; ) { // help v or descend
ForkJoinTask<?>[] a; //偷取者內(nèi)部的任務(wù)
int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;//獲取偷取者的join任務(wù)
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent; // stale虎忌,跳出descent循環(huán)重來(lái)
if (b - v.top >= 0 || (a = v.array) == null) {
if ((subtask = next) == null) //偷取者的join任務(wù)為null,跳出descent循環(huán)
break descent;
j = v;
break; //偷取者內(nèi)部任務(wù)為空橱鹏,可能任務(wù)也被偷走了膜蠢;跳出本次循環(huán)堪藐,查找偷取者的偷取者
}
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;//獲取base偏移地址
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));//獲取偷取者的base任務(wù)
if (v.base == b) {
if (t == null) // stale
break descent; // stale,跳出descent循環(huán)重來(lái)
if (U.compareAndSwapObject(a, i, t, null)) {//彈出任務(wù)
v.base = b + 1; //更新偷取者的base位
ForkJoinTask<?> ps = w.currentSteal;//獲取調(diào)用者偷來(lái)的任務(wù)
int top = w.top;
//首先更新給定workQueue的currentSteal為偷取者的base任務(wù)挑围,然后執(zhí)行該任務(wù)
//然后通過(guò)檢查top來(lái)判斷給定workQueue是否有自己的任務(wù)礁竞,如果有,
// 則依次彈出任務(wù)(LIFO)->更新currentSteal->執(zhí)行該任務(wù)(注意這里是自己偷自己的任務(wù)執(zhí)行)
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top && //內(nèi)部有自己的任務(wù)杉辙,依次彈出執(zhí)行
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);//還原給定workQueue的currentSteal
if (w.base != w.top)//給定workQueue有自己的任務(wù)了模捂,幫助結(jié)束,返回
return; // can't further help
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
說(shuō)明: 如果隊(duì)列為空或任務(wù)執(zhí)行失敗蜘矢,說(shuō)明任務(wù)可能被偷狂男,調(diào)用此方法來(lái)幫助偷取者執(zhí)行任務(wù)∑犯梗基本思想是:偷取者幫助我執(zhí)行任務(wù)岖食,我去幫助偷取者執(zhí)行它的任務(wù)。
函數(shù)執(zhí)行流程如下:
- 循環(huán)定位偷取者珍昨,由于Worker是在奇數(shù)索引位县耽,所以每次會(huì)跳兩個(gè)索引位。定位到偷取者之后镣典,更新調(diào)用者 WorkQueue 的
hint
為偷取者的索引兔毙,方便下次定位; - 定位到偷取者后兄春,開(kāi)始幫助偷取者執(zhí)行任務(wù)澎剥。從偷取者的
base
索引開(kāi)始,每次偷取一個(gè)任務(wù)執(zhí)行赶舆。在幫助偷取者執(zhí)行任務(wù)后哑姚,如果調(diào)用者發(fā)現(xiàn)本身已經(jīng)有任務(wù)(w.top != top
),則依次彈出自己的任務(wù)(LIFO順序)并執(zhí)行(也就是說(shuō)自己偷自己的任務(wù)執(zhí)行)芜茵。
4.2.3 ForkJoinPool.tryCompensate(WorkQueue w)
//執(zhí)行補(bǔ)償操作:嘗試縮減活動(dòng)線程量叙量,可能釋放或創(chuàng)建一個(gè)補(bǔ)償線程來(lái)準(zhǔn)備阻塞
private boolean tryCompensate(WorkQueue w) {
boolean canBlock;
WorkQueue[] ws;
long c;
int m, pc, sp;
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false; //調(diào)用者已終止
else if ((sp = (int) (c = ctl)) != 0) // release idle worker
canBlock = tryRelease(c, ws[sp & m], 0L);//喚醒等待的工作線程
else {//沒(méi)有空閑線程
int ac = (int) (c >> AC_SHIFT) + pc; //活躍線程數(shù)
int tc = (short) (c >> TC_SHIFT) + pc;//總線程數(shù)
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) { // two passes of odd indices
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) {//取奇數(shù)索引位
if ((v.scanState & SCANNING) != 0)//沒(méi)有正在運(yùn)行任務(wù),跳出
break;
++nbusy;//正在運(yùn)行任務(wù)九串,添加標(biāo)記
}
}
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
else if (tc >= pc && ac > 1 && w.isEmpty()) {//總線程數(shù)大于并行度 && 活動(dòng)線程數(shù)大于1 && 調(diào)用者任務(wù)隊(duì)列為空绞佩,不需要補(bǔ)償
long nc = ((AC_MASK & (c - AC_UNIT)) |
(~AC_MASK & c)); // uncompensated
canBlock = U.compareAndSwapLong(this, CTL, c, nc);//更新活躍線程數(shù)
} else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))//超出最大線程數(shù)
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
else { // similar to tryAddWorker
boolean add = false;
int rs; // CAS within lock
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT)));//計(jì)算總線程數(shù)
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);//更新總線程數(shù)
unlockRunState(rs, rs & ~RSLOCK);
//運(yùn)行到這里說(shuō)明活躍工作線程數(shù)不足,需要?jiǎng)?chuàng)建一個(gè)新的工作線程來(lái)補(bǔ)償
canBlock = add && createWorker(); // throws on exception
}
}
return canBlock;
}
說(shuō)明: 具體的執(zhí)行看源碼及注釋猪钮,這里我們簡(jiǎn)單總結(jié)一下需要和不需要補(bǔ)償?shù)膸追N情況:
- 需要補(bǔ)償:
- 調(diào)用者隊(duì)列不為空品山,并且有空閑工作線程,這種情況會(huì)喚醒空閑線程(調(diào)用
tryRelease
方法) - 池尚未停止烤低,活躍線程數(shù)不足肘交,這時(shí)會(huì)新建一個(gè)工作線程(調(diào)用
createWorker
方法)
- 調(diào)用者隊(duì)列不為空品山,并且有空閑工作線程,這種情況會(huì)喚醒空閑線程(調(diào)用
- 不需要補(bǔ)償:
- 調(diào)用者已終止或池處于不穩(wěn)定狀態(tài)
- 總線程數(shù)大于并行度 && 活動(dòng)線程數(shù)大于1 && 調(diào)用者任務(wù)隊(duì)列為空
總結(jié)
ForkJoinPool 內(nèi)部的代碼實(shí)現(xiàn)非常復(fù)雜,本篇文章筆者前前后后寫了近兩個(gè)月扑馁,有想挑戰(zhàn)一下自己的同學(xué)可以通篇仔細(xì)研究一下涯呻,如發(fā)現(xiàn)文中有錯(cuò)誤的地方凉驻,歡迎批評(píng)指正。一般來(lái)說(shuō)魄懂,我們只需要理解它的內(nèi)部思想即可沿侈。
本章重點(diǎn):
- Fork/Join 任務(wù)運(yùn)行機(jī)制
- ForkJoinPool 的 work-stealing 實(shí)現(xiàn)方式