Java并發(fā)編程源碼分析系列:
- 分析Java線程池的創(chuàng)建
- 分析Java線程池執(zhí)行原理
- 分析Java線程池Callable任務(wù)執(zhí)行原理
- 分析ReentrantLock的實(shí)現(xiàn)原理
- 分析CountDownLatch的實(shí)現(xiàn)原理
- 分析同步工具Semaphore和CyclicBarrier的實(shí)現(xiàn)原理
- 分析Java延遲與周期任務(wù)的實(shí)現(xiàn)原理
- 分析jdk-1.8-ForkJoinPool實(shí)現(xiàn)原理(上)
上篇介紹了ForkJoinPool的基本結(jié)構(gòu)和參數(shù),本篇進(jìn)入代碼細(xì)節(jié),一窺ForkJoinPool的實(shí)現(xiàn)原理颊亮。
整個(gè)流程和重要方法歸納如下:
任務(wù)提交
- 提交任務(wù)入口:submit,execute,invoke
- 完整版提交任務(wù):externalSubmit(包括初始化)
- 簡(jiǎn)單版提交任務(wù):externalPush
worker管理
- 激活或創(chuàng)建:signalWork
- 創(chuàng)建:tryAddWorker,createWorker
- 注冊(cè)、撤銷注冊(cè):registerWorker,deregisterWorker
worker執(zhí)行(runWorker三部曲)
- 獲日偷伞:scan
- 執(zhí)行:runTask
- 等待:awaitWork
Fork
- 等同于提交任務(wù)
Join(doJoin)
- 當(dāng)前不是worker:externalAwaitDone
- 當(dāng)前是worker:awaitJoin
awaitJoin等待兩種策略
- Helping:tryRemoveAndExec、helpStealer
- Compensating:tryCompensate
等待所有任務(wù)完成
- 靜止:awaitQuiescence
- 終止:awaitTermination
關(guān)閉
- shutdown,shutdownNow
- tryTerminate
異常處理
提交第一個(gè)task
提交任務(wù)默認(rèn)使用來(lái)自于接口的submit饲鄙,除此之外凄诞,F(xiàn)orkJoinPool還提供execute和invoke:
- submit:提交任務(wù)并返回任務(wù)
- execute:只提交任務(wù)
- invoke:提交并返回任務(wù)結(jié)果(return task.join())
它們?nèi)齻€(gè)內(nèi)部實(shí)現(xiàn)是一樣的,只是返回的東西不同忍级,我們來(lái)看submit的實(shí)現(xiàn)就夠:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
除了使用ForkJoinTask外帆谍,還支持Runnable和Callable,內(nèi)部使用Adapter最終轉(zhuǎn)為ForkJoinTask轴咱。submit很簡(jiǎn)單地調(diào)用externalPush汛蝙,這是個(gè)簡(jiǎn)化版的任務(wù)入隊(duì)方法,調(diào)用不成功時(shí)需要調(diào)用完整版的externalSubmit朴肺。
我們先來(lái)看externalSubmit窖剑,它處理非正常情況和進(jìn)行初始化。ForkJoinPool構(gòu)造函數(shù)只初始化一部分參數(shù)戈稿,包括WorkQueue[]等留到在externalSubmit初始化西土。
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
//1
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//2
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//3
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
//4
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
mark1檢查運(yùn)行狀態(tài)是否已經(jīng)進(jìn)入SHUTDOWN,拋出拒收的異常鞍盗。對(duì)于ForkJoinPool的關(guān)閉需了,見(jiàn)后文“關(guān)閉ForkJoinPool”一節(jié)。
第一次執(zhí)行externalSubmit時(shí)般甲,運(yùn)行狀態(tài)還沒(méi)有STARTED肋乍,執(zhí)行mark2進(jìn)行初始化操作:
- 按2的冪設(shè)置WorkQueue[]的長(zhǎng)度
- 設(shè)置原子對(duì)象stealCounter
- 運(yùn)行狀態(tài)進(jìn)入STARTED
第二次循環(huán)中,執(zhí)行mark4欣除,創(chuàng)建第一個(gè)WorkQueue住拭。
第三次循環(huán)中挪略,執(zhí)行mark3历帚,會(huì)找到剛才創(chuàng)建的WorkQueue,從隊(duì)列的top端加入任務(wù)杠娱,調(diào)用后面要講的signalWork激活或者創(chuàng)建worker挽牢。
WorkQueue在WorkQueue[]的下標(biāo),取的是k = r & m & SQMASK摊求。r是線程的probe禽拔,來(lái)自隨機(jī)數(shù)ThreadLocalRandom;m是WorkQueue[]的長(zhǎng)度減一;SQMASK是固定值0x007e睹栖,轉(zhuǎn)為二進(jìn)制是1111110硫惕,末尾是0,在&操作后野来,得出的k必定是偶數(shù)恼除。所以創(chuàng)建的第一個(gè)WorkQueue沒(méi)有對(duì)應(yīng)worker,保存的任務(wù)是submission曼氛,scanState默認(rèn)是INACTIVE豁辉。
externalSubmit是長(zhǎng)了點(diǎn),不過(guò)邏輯清晰舀患,不難理解徽级。除了初始化,大部分時(shí)間其實(shí)不需要externalSubmit聊浅,使用簡(jiǎn)單版的externalPush即可餐抢。
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
雖然進(jìn)入externalPush的if有一大堆條件,不過(guò)有了前面的分析低匙,我們很容易看懂:
- 線程經(jīng)過(guò)ThreadLocalRandom初始化弹澎;
- 運(yùn)行狀態(tài)正常;
- WorkQueue[]非空努咐;
- 隨機(jī)數(shù)取到的WorkQueue非空苦蒿,并鎖定成功。
滿足上面的條件后渗稍,任務(wù)從top端入隊(duì)佩迟。如果隊(duì)列里只有一個(gè)任務(wù),調(diào)用signalWork竿屹”ㄇ浚基本實(shí)現(xiàn)和externalSubmit的mark3差不多。
worker管理
worker的管理涉及創(chuàng)建拱燃、激活秉溉、注冊(cè)、撤銷注冊(cè)碗誉。
接上一節(jié)創(chuàng)建第一個(gè)WorkQueue并加入第一個(gè)任務(wù)召嘶,調(diào)用了signalWork,入?yún)⑹荳orkQueue[]和當(dāng)前操作的WorkQueue哮缺。
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
//1
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
//2
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
首先是進(jìn)入循環(huán)的條件弄跌,判斷了ctl的正負(fù),我們知道ctl的第一個(gè)16bit表示AC尝苇,為負(fù)時(shí)表示活動(dòng)的worker還未達(dá)到預(yù)定的Parallelism铛只,需要新增或者激活埠胖。mark1通過(guò)sp判斷現(xiàn)在沒(méi)有空閑worker,需要執(zhí)行增加淳玩,調(diào)用tryAddWorker直撤。
有空閑worker的情況進(jìn)入mark2,sp取棧頂WorkQueue的下標(biāo)蜕着,具體解掛worker的過(guò)程和tryRelease幾乎一樣谊惭,這里合起來(lái)介紹。
private boolean tryRelease(long c, WorkQueue v, long inc) {
int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
if (v != null && v.scanState == sp) { // v is at top of stack
long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
if (U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs;
if ((p = v.parker) != null)
U.unpark(p);
return true;
}
}
return false;
}
在sp上侮东,將狀態(tài)從inactive改為active圈盔,累加版本號(hào),解掛線程悄雅,通過(guò)stackPred取得前一個(gè)WorkQueue的index驱敲,設(shè)回sp里。
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
增加worker宽闲,需要將AC和TC都加1众眨,成功后調(diào)用createWorker。
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
createWorker的代碼很簡(jiǎn)單容诬,通過(guò)線程工廠創(chuàng)建worker的實(shí)例并啟動(dòng)娩梨。如果沒(méi)有異常,直接返回就行览徒;否則狈定,需要逆操作撤銷worker的注冊(cè)。worker什么時(shí)候注冊(cè)了习蓬?看ForkJoinWorkerThread的構(gòu)造函數(shù)纽什,里面調(diào)用ForkJoinPool.registerWorker。
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
//1
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
//2
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
一開(kāi)始躲叼,線程就被設(shè)置為守護(hù)線程芦缰。重溫知識(shí)點(diǎn),當(dāng)只剩下守護(hù)線程時(shí)枫慷,JVM就會(huì)退出让蕾,垃圾回收線程也是一個(gè)典型的守護(hù)線程。
mark1或听,前文講過(guò)有對(duì)應(yīng)worker的WorkQueue只能出現(xiàn)在WorkQueue[]奇數(shù)index探孝,代碼里取初始index用的是:
i = ((s << 1) | 1) & m;
seed左移再“或”1,是奇數(shù)神帅。m是WorkQueue[]長(zhǎng)度減1再姑,也是奇數(shù)。兩者再“與”找御,保證取得的i是奇數(shù)元镀。若該位置已經(jīng)存在其他WorkQueue,需要重新計(jì)算下一個(gè)位置霎桅,有需要還要擴(kuò)容WorkQueue[]栖疑。
mark2設(shè)置新創(chuàng)建WorkQueue的scanState為index,表示了兩種意思:
- 非負(fù)表示有對(duì)應(yīng)的worker滔驶;
- 默認(rèn)scanState使用SCANNING遇革。
就此描述清楚worker的創(chuàng)建、WorkQueue的創(chuàng)建和加入WorkQueue[]揭糕。
創(chuàng)建worker時(shí)會(huì)默認(rèn)注冊(cè)worker萝快,當(dāng)創(chuàng)建出現(xiàn)異常時(shí),需要執(zhí)行撤銷注冊(cè)著角。
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
//1
if (wt != null && (w = wt.workQueue) != null) {
WorkQueue[] ws; // remove index from array
int idx = w.config & SMASK;
int rs = lockRunState();
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
unlockRunState(rs, rs & ~RSLOCK);
}
//2
long c; // decrement counts
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
//3
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks
}
//4
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
else if (ex != null && (c & ADD_WORKER) != 0L) {
tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
//5
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
撤銷注冊(cè)過(guò)程按部就班的揪漩,肯定想到包括處理WorkQueue的善后和修改ctl:
- 將歸屬的WorkQueue從WorkQueue[]中置空,具體下標(biāo)從WorkQueue.config中獲壤艨凇奄容;
- AC和TC分別減一;
- WorkQueue的qlock置負(fù)产徊,表示要終止了昂勒,并且取消隊(duì)里所有任務(wù);
- 檢查運(yùn)行狀態(tài)舟铜,嘗試激活或者創(chuàng)建worker替代戈盈;
- 異常處理。
worker執(zhí)行
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
ForkJoinWorkerThread啟動(dòng)后調(diào)用了ForkJoinPool的runWorker:
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
worker執(zhí)行流程就是三部曲:
- scan:嘗試獲取一個(gè)任務(wù)谆刨;
- runTask:執(zhí)行取得的任務(wù)奕谭;
- awaitWork:沒(méi)有任務(wù)進(jìn)入等待。
如果awaitWork返回false痴荐,等不到任務(wù)血柳,跳出runWorker的循環(huán),回到run中執(zhí)行finally生兆,最后調(diào)用deregisterWorker撤銷注冊(cè)难捌。
首先是scan,掃描WorkQueue[]鸦难,嘗試steal一個(gè)任務(wù)根吁。
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
//1
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
if ((q = ws[k]) != null) {
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
//2
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
//3
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
mark1進(jìn)入循環(huán),通過(guò)隨機(jī)數(shù)從WorkQueue[]獲取WorkQueue合蔽,并嘗試從WorkQueue的base端steal任務(wù)击敌。到達(dá)mark2表示成功定位一個(gè)任務(wù),這時(shí)要看歸屬自己WorkQueue的scanState:
- active:從WorkQueue的base端出隊(duì)并返回拴事,常規(guī)地調(diào)用signalWork沃斤,結(jié)束圣蝎;
- inactive:這個(gè)狀態(tài)下,調(diào)用tryRelease衡瓶,如果WorkQueue正好在棧頂上徘公,激活它。
mark3處哮针,每次循環(huán)會(huì)校驗(yàn)新取的index是不是等于第一次取的index关面。如果相等,說(shuō)明遍歷了一圈還沒(méi)有steal到任務(wù)十厢,當(dāng)前worker是過(guò)剩的等太,執(zhí)行如下操作:
- 當(dāng)前WorkQueue的scanState修改為inactive;
- 當(dāng)前WorkQueue掛到棧頂蛮放,AC減一缩抡。
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
steal到一個(gè)任務(wù)后,就可以開(kāi)始執(zhí)行:
- 將WorkQueue的scanState從SCANNING轉(zhuǎn)為RUNNING筛武;
- 記錄當(dāng)前任務(wù)是steal來(lái)的缝其,保存在currentSteal,并執(zhí)行doExec徘六;
- 執(zhí)行自己WorkQueue里的任務(wù)execLocalTasks(根據(jù)mode控制取任務(wù)是LIFO還是FIFO内边,調(diào)用doExec執(zhí)行,直到WorkQueue為空)待锈;
- 累加steal數(shù)量漠其;
- 能執(zhí)行的都執(zhí)行了,scanState轉(zhuǎn)回SCANNING竿音。
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
doExec方法和屎,里面最終調(diào)用ForkJoinTask的核心方法exec,前文介紹過(guò)春瞬,RecursiveAction和RecursiveTask它們override的exce調(diào)用了compute柴信。這樣子,源碼和使用的方法關(guān)聯(lián)起來(lái)了宽气。
當(dāng)任務(wù)執(zhí)行完成随常,調(diào)用setCompletion,將任務(wù)狀態(tài)改為NORMAL萄涯。注意绪氛,使用CAS修改狀態(tài)時(shí),目標(biāo)狀態(tài)使用s|NORMAL涝影。
- 原狀態(tài)是NORMAL枣察,無(wú)符號(hào)右移為0;
- 原狀態(tài)是SIGNAL,無(wú)符號(hào)右移不為0序目。
如果任務(wù)原狀態(tài)是SIGNAL臂痕,表示有線程由于join而進(jìn)入了wait,等著任務(wù)完成宛琅,這時(shí)需要額外操作notify觸發(fā)喚醒刻蟹。
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
//1
if ((ss = w.scanState) >= 0)
break;
//2
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
}
else if (w.qlock < 0) // recheck after spins
return false;
//3
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
//4
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
//5
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
awaitWork里核心是一個(gè)無(wú)限循環(huán)逗旁,我們重點(diǎn)看里面的等待操作和跳出條件嘿辟。
mark1判斷WorkQueue的scanState,非負(fù)表示W(wǎng)orkQueue要不在RUNNING片效,要不在SCANNING红伦,直接跳出。mark2里淀衣,SPINS初始為0昙读,沒(méi)有啟用自旋等待的控制。
重點(diǎn)來(lái)看mark3膨桥,只要沒(méi)有中斷蛮浑,就會(huì)一直循環(huán)執(zhí)行(tryTerminate終止ForkJoinPool時(shí)會(huì)中斷所有worker)。啰嗦一句只嚣,要分清楚return和break的不同含義:
- break:回到runWorker繼續(xù)執(zhí)行scan沮稚、runTask、awaitWork册舞;
- return false:worker需要終止了蕴掏。
mark4檢查ForkJoinPool的狀態(tài),如果走向中止那邊调鲸,當(dāng)前worker也就無(wú)必要存在盛杰,return false。
mark5判斷worker的存在是否有必要藐石,如果滿足下面條件:
- AC為零即供;
- TC超過(guò)2個(gè);
- 當(dāng)前WorkQueue在棧頂于微。
說(shuō)明當(dāng)前worker過(guò)剩逗嫡,存在也沒(méi)有任務(wù)執(zhí)行,所以WorkQueue從棧頂釋放角雷,return false終止worker祸穷。
其他情況計(jì)算一個(gè)等待時(shí)間,掛起線程勺三,被喚醒有兩種可能:
- 外部喚醒:如果scanState非負(fù)雷滚,break出循環(huán),繼續(xù)執(zhí)行scan吗坚;
- 時(shí)間到達(dá)喚醒:還是老樣子祈远,自己過(guò)剩呆万,return false終止。
Fork
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
fork的代碼很簡(jiǎn)單车份,如果當(dāng)前線程是一個(gè)worker谋减,直接將任務(wù)從top端加入自己的WorkQueue。對(duì)于非worker提交的task扫沼,執(zhí)行externalPush出爹,這個(gè)前面詳細(xì)分析過(guò)了。
Join
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
join的目的是得到任務(wù)的運(yùn)行結(jié)果缎除,核心調(diào)用doJoin严就,根據(jù)任務(wù)狀態(tài)返回結(jié)果,或者拋出異常器罐。要注意的是梢为,任務(wù)在ForkJoinPool中可能處于各種各樣的狀況,有可能剛好要被執(zhí)行啊轰坊,有可能正在隊(duì)列里排隊(duì)啊铸董,有可能已經(jīng)被別人偷走啊。
doJoin的return是花一樣的一串判斷肴沫,先分解出頭兩個(gè)判斷:
- status為負(fù)表示任務(wù)執(zhí)行已經(jīng)有結(jié)果蜂厅,直接返回盟戏;
- 區(qū)分當(dāng)前線程是否worker。
先來(lái)說(shuō)當(dāng)前線程不是worker這種情況,調(diào)用externalAwaitDone:
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
先不講CountedCompleter的協(xié)作鹦肿,將任務(wù)狀態(tài)設(shè)置為SIGNAL黑滴,然后是使用wait/notify機(jī)制真友,線程進(jìn)入等待捍岳。既然不是worker,不屬于ForkJoinPool的管理范圍沉衣,你掛起等通知就是了郁副。
如果當(dāng)前線程是worker,那就復(fù)雜多了豌习。
(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L)
首先調(diào)用tryUnpush存谎,如果WorkQueue的top端任務(wù)正好是等待join的任務(wù),毫無(wú)疑問(wèn)肥隆,下個(gè)就是執(zhí)行它既荚,直接doExec;否則調(diào)用ForkJoinPool的awaitJoin栋艳。
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
//1
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
//2
if ((s = task.status) < 0)
break;
//3
if (cc != null)
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task);
if ((s = task.status) < 0)
break;
long ms, ns;
//4
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
mark1處恰聘,worker在WorkQueue中標(biāo)記正在等待的任務(wù),記在currentJoin。進(jìn)入循環(huán)晴叨,在mark2處校驗(yàn)任務(wù)的狀態(tài)凿宾,如果已經(jīng)完成,直接跳出兼蕊。
接下來(lái)重點(diǎn)是處理worker的等待初厚,直接閑著太浪費(fèi)了,awaitJoin里會(huì)分別嘗試兩種策略:
- Helping:嘗試安排別的任務(wù)孙技;
- Compensating:創(chuàng)建或者激活一個(gè)備用worker产禾,原worker進(jìn)入等待,由備用worker補(bǔ)償工作量绪杏,直到原worker恢復(fù)下愈。
mark3和mark4分別嘗試兩種策略纽绍。
Helping
兩個(gè)help方法意思十分明確蕾久,如果任務(wù)是CountedCompleter,調(diào)用helpComplete拌夏。接下來(lái)看自己的WorkQueue僧著,調(diào)用tryRemoveAndExec檢查隊(duì)列里所有任務(wù),看等待join的任務(wù)在不在里面障簿。如無(wú)所獲盹愚,最后調(diào)用helpStealer,幫助其他worker站故。
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
return s + 1 == top; // shorter than expected
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
if (removed)
task.doExec();
break;
}
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
}
return true;
}
tryRemoveAndExec功能就是遍歷WorkQueue皆怕,任務(wù)在隊(duì)列里的位置可以分兩種情況:
- 剛好在top端,取出來(lái)直接運(yùn)行西篓;
- 在隊(duì)列中間愈腾,使用EmptyTask替代原位置,也可以取任務(wù)出來(lái)運(yùn)行岂津。
最后讓tryRemoveAndExec返回false虱黄,不再參與helpStealer。
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent: for (subtask = task; subtask.status >= 0; ) {
//1
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {
j.hint = i;
break;
}
checkSum += v.base;
}
}
//2
for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;
//3
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent;
//4
if (b - v.top >= 0 || (a = v.array) == null) {
if ((subtask = next) == null)
break descent;
j = v;
break;
}
//5
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
if (v.base == b) {
if (t == null) // stale
break descent;
if (U.compareAndSwapObject(a, i, t, null)) {
v.base = b + 1;
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);
if (w.base != w.top)
return; // can't further help
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
helpStealer體現(xiàn)了互助的原則吮成,你steal了我剛好需要join的任務(wù)橱乱,我不會(huì)閑等著,我也幫你執(zhí)行任務(wù)粱甫∮镜看最外面do-while循環(huán)和標(biāo)記為descent的for循環(huán),它們的條件都是只要join的任務(wù)沒(méi)有執(zhí)行完成茶宵,就一直執(zhí)行幫助危纫。
首先要找到需要幫助的WorkQueue,給個(gè)代號(hào)叫A,依據(jù)是currentSteal正好是等待join的任務(wù)叶摄。在mark1属韧,遍歷WorkQueue[]奇數(shù)下標(biāo)的WorkQueue,檢查currentSteal蛤吓,如果是宵喂,表示它就是我們要找的人。
- 如果A中有任務(wù)等待執(zhí)行会傲,循環(huán)從base端取任務(wù)執(zhí)行锅棕,代碼是mark5處。注意到判斷w.base != w.top時(shí)需要return淌山,因?yàn)楫?dāng)原WorkQueue有新任務(wù)時(shí)裸燎,不能繼續(xù)幫助;
- 如果A中沒(méi)有任務(wù)泼疑,難道想幫也幫不了移稳?那就繼續(xù)幫助下家。在mark4中个粱,根據(jù)A的currentJoin,找到下個(gè)WorkQueue翻翩,邏輯依舊是從base端取任務(wù)執(zhí)行絮吵。
各種數(shù)據(jù)在不斷變化中暇昂,mark3會(huì)校驗(yàn)兩個(gè)WorkQueue的currentJoin和currentSteal是不是目標(biāo)任務(wù)名段,不是的直接跳出到descent,重新查找WorkQueue卡啰。
Compensating
回到awaitJoin的mark4押搪,嘗試第二種策略Compensating及穗。在timeout范圍里娃豹,tryCompensate會(huì)不斷調(diào)用民鼓,看能不能執(zhí)行補(bǔ)償饮亏。確定能夠執(zhí)行補(bǔ)償简肴,當(dāng)前任務(wù)狀態(tài)轉(zhuǎn)為SIGNAL赫粥,并進(jìn)入wait瀑粥。
private boolean tryCompensate(WorkQueue w) {
boolean canBlock;
WorkQueue[] ws; long c; int m, pc, sp;
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false;
//1
else if ((sp = (int)(c = ctl)) != 0) // release idle worker
canBlock = tryRelease(c, ws[sp & m], 0L);
else {
//2
int ac = (int)(c >> AC_SHIFT) + pc;
int tc = (short)(c >> TC_SHIFT) + pc;
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) { // two passes of odd indices
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) {
if ((v.scanState & SCANNING) != 0)
break;
++nbusy;
}
}
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
//3
else if (tc >= pc && ac > 1 && w.isEmpty()) {
long nc = ((AC_MASK & (c - AC_UNIT)) |
(~AC_MASK & c)); // uncompensated
canBlock = U.compareAndSwapLong(this, CTL, c, nc);
}
//4
else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
//5
else { // similar to tryAddWorker
boolean add = false; int rs; // CAS within lock
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT)));
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
canBlock = add && createWorker(); // throws on exception
}
}
return canBlock;
}
mark1如果棧頂有空閑的worker,激活即可弛矛。否則,考慮創(chuàng)建新的worker:
- mark2檢查RUNNING的worker是否等于TC(上面遍歷奇數(shù)WorkQueue時(shí)循環(huán)了兩次,所以TC需要乘以2),這種情況很明顯不需要?jiǎng)?chuàng)建worker補(bǔ)償闰靴;
- mark3如果發(fā)現(xiàn)WorkQueue空了系忙,調(diào)整AC的數(shù)量捺弦,減一柒爵;
- mark4檢查TC是否超過(guò)最大值;(TC最大值不是parallelism哦)
- mark5具體增加worker的代碼和tryAddWorker類似烈评,不過(guò)這里只有TC加一芽卿,AC不需要變動(dòng),因?yàn)榛顒?dòng)worker數(shù)量在補(bǔ)償下沒(méi)有改變瓶佳。
等待所有任務(wù)完成
向ForkJoinPool提交一堆任務(wù)后精钮,我們會(huì)希望等待所有任務(wù)執(zhí)行完成后蕉堰,繼續(xù)下一步操作导披。ForkJoinPool提供了兩個(gè)阻塞的await方法滓技。
- awaitQuiescence
- awaitTermination
前者等待線程池靜止,后者等待線程池終止棚潦,都很好理解令漂。
public boolean isQuiescent() {
return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
判斷靜止是通過(guò)判斷AC是否少于等于零,當(dāng)沒(méi)有活動(dòng)worker時(shí)丸边,也就說(shuō)明當(dāng)前所有任務(wù)都執(zhí)行完成叠必。
public boolean awaitQuiescence(long timeout, TimeUnit unit) {
long nanos = unit.toNanos(timeout);
ForkJoinWorkerThread wt;
Thread thread = Thread.currentThread();
if ((thread instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)thread).pool == this) {
helpQuiescePool(wt.workQueue);
return true;
}
long startTime = System.nanoTime();
WorkQueue[] ws;
int r = 0, m;
boolean found = true;
//1
while (!isQuiescent() && (ws = workQueues) != null &&
(m = ws.length - 1) >= 0) {
if (!found) {
//2
if ((System.nanoTime() - startTime) > nanos)
return false;
Thread.yield(); // cannot block
}
//3
found = false;
for (int j = (m + 1) << 2; j >= 0; --j) {
ForkJoinTask<?> t; WorkQueue q; int b, k;
if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
(b = q.base) - q.top < 0) {
found = true;
if ((t = q.pollAt(b)) != null)
t.doExec();
break;
}
}
}
return true;
}
awaitQuiescence又是區(qū)分當(dāng)前線程是否是worker,如果是worker妹窖,調(diào)用helpQuiescePool纬朝,接下來(lái)馬上就講。如果不是worker骄呼,有趣的是線程也參與到執(zhí)行當(dāng)中共苛。
非worker線程進(jìn)入mark1的循環(huán),條件是ForkJoinPool還沒(méi)有quiescent蜓萄。mark2在時(shí)間沒(méi)有timeout的情況下隅茎,先讓步,如果能夠得到執(zhí)行權(quán)嫉沽,進(jìn)入mark3尋找有任務(wù)的WorkQueue辟犀,從base端取出任務(wù)執(zhí)行。
不得不說(shuō)绸硕,F(xiàn)orkJoinPool極盡所能利用資源堂竟,加快任務(wù)的執(zhí)行速度。
從helpQuiescePool的方法名也能知道玻佩,在awaitQuiescence時(shí)出嘹,worker當(dāng)仁不讓會(huì)從別的WorkQueue取任務(wù),整體加快執(zhí)行速度咬崔。
final void helpQuiescePool(WorkQueue w) {
ForkJoinTask<?> ps = w.currentSteal; // save context
for (boolean active = true;;) {
long c; WorkQueue q; ForkJoinTask<?> t; int b;
//1
w.execLocalTasks(); // run locals before each scan
//2
if ((q = findNonEmptyStealQueue()) != null) {
if (!active) { // re-establish active count
active = true;
U.getAndAddLong(this, CTL, AC_UNIT);
}
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec();
if (++w.nsteals < 0)
w.transferStealCount(this);
}
}
//3
else if (active) { // decrement active count without queuing
long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
break; // bypass decrement-then-increment
if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
}
//4
else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
break;
}
U.putOrderedObject(w, QCURRENTSTEAL, ps);
}
第一步mark1疚漆,worker需要先execLocalTasks,將自己WorkQueue里的任務(wù)執(zhí)行完畢。
接著mark2娶聘,findNonEmptyStealQueue查找非空WorkQueue。這里使用了變量active標(biāo)記worker是否活動(dòng)甚脉,以便修改AC丸升。當(dāng)找到非空WorkQueue,worker當(dāng)前是inactive牺氨,重新變?yōu)閍ctive并將AC加一狡耻,并從非空WorkQueue的base端取任務(wù)執(zhí)行。
mark3猴凹,當(dāng)worker是active夷狰,但沒(méi)有非空WorkQueue,將worker變?yōu)閕nactive并將AC減一郊霎。如果變化前AC已經(jīng)為0沼头,表示整個(gè)ForkJoinPool所有任務(wù)都執(zhí)行完成進(jìn)入quiescent。OK這就是我們的目標(biāo)书劝,直接跳出循環(huán)进倍。
mark4,當(dāng)worker是inactive购对,沒(méi)有非空WorkQueue猾昆,AC又等于0,沒(méi)有東西可干骡苞,跳出循環(huán)垂蜗,保持AC至少為1。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (this == common) {
awaitQuiescence(timeout, unit);
return false;
}
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
if (nanos <= 0L)
return false;
long deadline = System.nanoTime() + nanos;
synchronized (this) {
for (;;) {
if (isTerminated())
return true;
if (nanos <= 0L)
return false;
long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
wait(millis > 0L ? millis : 1L);
nanos = deadline - System.nanoTime();
}
}
}
在時(shí)間范圍內(nèi)解幽,awaitTermination等待運(yùn)行狀態(tài)進(jìn)入TERMINATED贴见,沒(méi)有什么特別要講。
關(guān)閉ForkJoinPool
ForkJoinPool的關(guān)閉方法shutdown和shutdownNow都是調(diào)用tryTerminate亚铁,區(qū)別是now是否為true蝇刀。tryTerminate的代碼很長(zhǎng),我們將它分拆開(kāi)幾段來(lái)看徘溢。
int rs;
if (this == common) // cannot shut down
return false;
if ((rs = runState) >= 0) {
if (!enable)
return false;
rs = lockRunState(); // enter SHUTDOWN phase
unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}
shutdown和shutdownNow調(diào)用tryTerminate將運(yùn)行狀態(tài)設(shè)置為SHUTDOWN(enable==true)吞琐。其他代碼調(diào)用tryTerminate,enable是false然爆,僅僅檢測(cè)ForkJoinPool是否正在關(guān)閉或者已經(jīng)關(guān)閉站粟。
if ((rs & STOP) == 0) {
if (!now) { // check quiescence
for (long oldSum = 0L;;) { // repeat until stable
WorkQueue[] ws; WorkQueue w; int m, b; long c;
long checkSum = ctl;
if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
return false; // still active workers
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break; // check queues
for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
if ((b = w.base) != w.top || w.scanState >= 0 ||
w.currentSteal != null) {
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
return false; // arrange for recheck
}
checkSum += b;
if ((i & 1) == 0)
w.qlock = -1; // try to disable external
}
}
if (oldSum == (oldSum = checkSum))
break;
}
}
if ((runState & STOP) == 0) {
rs = lockRunState(); // enter STOP phase
unlockRunState(rs, (rs & ~RSLOCK) | STOP);
}
}
接下來(lái)一段很明確是為了進(jìn)入STOP狀態(tài),如果now是true曾雕,毫不猶豫修改狀態(tài)奴烙。否則需要進(jìn)行檢查,看是否真的能馬上進(jìn)入STOP。
檢查AC切诀,還有活動(dòng)worker當(dāng)然不能進(jìn)入STOP揩环;檢查所有WorkQueue,如果還在正常執(zhí)行任務(wù)幅虑,不能進(jìn)入STOP丰滑;偶數(shù)WorkQueue的qlock置為負(fù),攔截從外部提交任務(wù)倒庵。
int pass = 0; // 3 passes to help terminate
for (long oldSum = 0L;;) { // or until done or stable
WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
long checkSum = ctl;
//1
if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
(ws = workQueues) == null || (m = ws.length - 1) <= 0) {
if ((runState & TERMINATED) == 0) {
rs = lockRunState(); // done
unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
synchronized (this) { notifyAll(); } // for awaitTermination
}
break;
}
//2
for (int i = 0; i <= m; ++i) {
if ((w = ws[i]) != null) {
checkSum += w.base;
w.qlock = -1; // try to disable
if (pass > 0) {
w.cancelAll(); // clear queue
if (pass > 1 && (wt = w.owner) != null) {
if (!wt.isInterrupted()) {
try { // unblock join
wt.interrupt();
} catch (Throwable ignore) {
}
}
if (w.scanState < 0)
U.unpark(wt); // wake up
}
}
}
}
if (checkSum != oldSum) { // unstable
oldSum = checkSum;
pass = 0;
}
else if (pass > 3 && pass > m) // can't further help
break;
else if (++pass > 1) { // try to dequeue
long c; int j = 0, sp; // bound attempts
while (j++ <= m && (sp = (int)(c = ctl)) != 0)
tryRelease(c, ws[sp & m], AC_UNIT);
}
}
最后執(zhí)行真正終止ForkJoinPool的運(yùn)作褒墨。
mark1是循環(huán)的跳出條件,檢查TC和WorkQueue[]擎宝,如果為零或者為空郁妈,表示該停的都停了,進(jìn)入TERMINATED绍申。
mark2遍歷所有WorkQueue噩咪,pass記錄次數(shù),每次遍歷會(huì)執(zhí)行不同的操作:
- 第一次:qlock設(shè)置為負(fù)數(shù)失晴;
- 第二次:取消WorkQueue里所有任務(wù)剧腻,釋放棧頂WorkQueue;
- 第三次:如果有歸屬的worker涂屁,中斷并解鎖線程书在。
異常處理
ForkJoinPool正常流程講完了,再補(bǔ)充講下異常處理拆又。
出現(xiàn)的異常我們無(wú)辦法直接在主線程捕獲儒旬,所以ForkJoinTask提供了isCompletedAbnormally檢查任務(wù)狀態(tài),并且可以通過(guò)任務(wù)的getException方法獲取異常帖族。
if (task.isCompletedAbnormally) {
println(task.exception.message)
}
任務(wù)執(zhí)行doExec出現(xiàn)異常時(shí)栈源,會(huì)調(diào)用setExceptionalCompletion,里面繼續(xù)調(diào)用了recordExceptionalCompletion竖般。
final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this) // already present
break;
}
} finally {
lock.unlock();
}
s = setCompletion(EXCEPTIONAL);
}
return s;
}
ExceptionNode保存了異常任務(wù)和異常信息甚垦,由一個(gè)ExceptionNode數(shù)組(exceptionTable)統(tǒng)一保存,ExceptionNode之間通過(guò)next構(gòu)成一條鏈涣雕。
獲取任務(wù)的異常使用getException艰亮,方法里判斷狀態(tài)后繼續(xù)調(diào)用getThrowableException。CANCELLED狀態(tài)的異常直接創(chuàng)建CancellationException挣郭,和EXCEPTIONAL狀態(tài)的流程不同迄埃。
public final Throwable getException() {
int s = status & DONE_MASK;
return ((s >= NORMAL) ? null :
(s == CANCELLED) ? new CancellationException() :
getThrowableException());
}
getThrowableException的代碼不貼了,它從exceptionTable取出任務(wù)的異常信息并返回兑障。里面對(duì)拋出異常線程不是當(dāng)前線程這種情況進(jìn)行了處理侄非,為了得到更準(zhǔn)確的結(jié)果蕉汪,會(huì)讓當(dāng)前線程使用反射創(chuàng)建一樣的異常返回。
后記
本文僅過(guò)一遍流程代碼逞怨,很多設(shè)計(jì)思想沒(méi)有也很難寫清楚者疤,多看代碼注釋吧。ForkJoinPool細(xì)節(jié)復(fù)雜骇钦,文里肯定有很多錯(cuò)漏宛渐,望指正,謝謝眯搭。