1树肃、ForkJoinPool簡(jiǎn)介
ForkJoinPool運(yùn)用fork-join的原理丐吓,使用分而治之的思想,將大任務(wù)進(jìn)行拆分柿汛,直到拆分成無法可再拆分的最小單元冗酿,并將拆分后的任務(wù)分配給多線程執(zhí)行,最終再將執(zhí)行結(jié)果進(jìn)行join络断。同時(shí)利用工作竊取算法裁替,使得任務(wù)能及時(shí)被空閑線程處理。故ForkJoinPool適于可將大任務(wù)分割成類似的小任務(wù)的場(chǎng)景貌笨。
1.1弱判、ForkJoinPool類繼承結(jié)構(gòu)
ForkJoinPool也繼承于AbstractExecutorService抽象類,實(shí)現(xiàn)ExecutorService相關(guān)接口锥惋。
1.2昌腰、ForkJoinPool體系主要類簡(jiǎn)介
ForkJoinTask:提交到ForkJoinPool中的任務(wù),在使用時(shí)主要有三個(gè)實(shí)現(xiàn)膀跌。RecursiveTask:可以遞歸執(zhí)行的ForkJoinTask遭商;RecursiveAction:無返回值的RecursiveTask;CountedCompleter:執(zhí)行完成后捅伤,觸發(fā)自定義鉤子函數(shù)劫流。
ForkJoinWorkerThread:運(yùn)行 ForkJoinTask 任務(wù)的工作線程。每個(gè)ForkJoinWorkerThread都關(guān)聯(lián)其所屬的ForkJoinPool及其工作隊(duì)列WorkQueue丛忆。
WorkQueue:任務(wù)隊(duì)列祠汇,支持LIFO的棧式操作和FIFO的隊(duì)列操作。
2蘸际、ForkJoinTask源碼解析
ForkJoinTask將任務(wù)fork成足夠小的任務(wù)座哩,并發(fā)解決這些小任務(wù),然后將這些小任務(wù)結(jié)果join粮彤。這種思想充分利用了CPU的多核系統(tǒng)根穷,使得CPU的利用率得到大幅度提升姜骡,減少了任務(wù)執(zhí)行時(shí)間。通常我們會(huì)利用ForkJoinTask的fork方法來分割任務(wù)屿良,利用join方法來合并任務(wù)圈澈。
2.1、ForkJoinTask任務(wù)狀態(tài)
statue為ForkJoinTask的狀態(tài)尘惧,其初始狀態(tài)為0康栈,標(biāo)識(shí)正則處理任務(wù)狀態(tài);NORMAL:標(biāo)識(shí)任務(wù)正常結(jié)束喷橙;CANCELLED:標(biāo)識(shí)任務(wù)被取消啥么;EXCEPTIONAL:標(biāo)識(shí)任務(wù)執(zhí)行異常;SIGNAL:表示有依賴當(dāng)前任務(wù)結(jié)果的任務(wù)贰逾,需要執(zhí)行完成后進(jìn)行通知悬荣。
//任務(wù)狀態(tài),初始值為0
volatile int status; // accessed directly by pool and workers
// 任務(wù)狀態(tài)的掩碼
static final int DONE_MASK = 0xf0000000;
// 正常狀態(tài)疙剑,負(fù)數(shù)氯迂,標(biāo)識(shí)任務(wù)已經(jīng)完成
static final int NORMAL = 0xf0000000;
// 任務(wù)取消,非負(fù)言缤,<NORMAL
static final int CANCELLED = 0xc0000000;
// 任務(wù)異常嚼蚀,非負(fù),<CANCELLED
static final int EXCEPTIONAL = 0x80000000;
// 通知狀態(tài)管挟,>= 1<<16轿曙,有其他任務(wù)依賴當(dāng)前任務(wù),任務(wù)結(jié)束前哮独,通知其他任務(wù)join當(dāng)前任務(wù)的結(jié)果拳芙。
static final int SIGNAL = 0x00010000;
// 低位掩碼
static final int SMASK = 0x0000ffff;
2.2、主要方法實(shí)現(xiàn)
ForkJoinTask的主要方法有異步執(zhí)行方法fork()皮璧,獲取結(jié)果方法join()舟扎,執(zhí)行任務(wù)方法invoke系列,其他獲取狀態(tài)即結(jié)果等方法悴务。
fork()源碼解析:
public final ForkJoinTask<V> fork() {
Thread t;
//如果線程類型為ForkJoinWorkerThread睹限,則將任務(wù)推入workQueue進(jìn)行處理,
//否則,交由ForkJoinPool的common線程池進(jìn)行處理
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
join()源碼解析:
public final V join() {
int s;
//調(diào)用doJoin()進(jìn)行任務(wù)的執(zhí)行讯檐,若任務(wù)結(jié)果為非正常完成羡疗,則根據(jù)狀態(tài)拋出不同的異常,
//如若狀態(tài)為CANCELLED别洪,則拋出CancellationException()叨恨,異常;
//若狀態(tài)為EXCEPTIONAL挖垛,則拋出包裝后的異常
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//1痒钝、若任務(wù)狀態(tài)為正常完成(status < 0)秉颗,則返回任務(wù)的正常完成狀態(tài);
//2送矩、若執(zhí)行任務(wù)的當(dāng)前線程類型為ForkJoinWorkerThread蚕甥,且將任務(wù)從線程的工作隊(duì)列中移除成功,
//則調(diào)用doExec()執(zhí)行任務(wù)栋荸,若任務(wù)執(zhí)行狀態(tài)為正常結(jié)束菇怀,則返回狀態(tài),否則awaitJoin()等待任務(wù)結(jié)束晌块。
//3爱沟、否則調(diào)用externalAwaitDone()等待任務(wù)執(zhí)行完成。
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
final int doExec() {
int s; boolean completed;
//任務(wù)未完成匆背?
if ((s = status) >= 0) {
try {
//執(zhí)行任務(wù)
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
//設(shè)置任務(wù)狀態(tài)為正常完成
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
private int externalAwaitDone() {
//任務(wù)處理
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
//等待任務(wù)完成
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
//任務(wù)完成后通知其他依賴的任務(wù)
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
invoke系列方法源碼解析:
public final V invoke() {
int s;
//執(zhí)行任務(wù)并返回狀態(tài)钥顽,處理同doJoin()類似
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
//執(zhí)行任務(wù)并獲取任務(wù)狀態(tài),狀態(tài)<0表示正常完成靠汁,否則等待任務(wù)完成
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
//處理兩個(gè)任務(wù)
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
//提交任務(wù)t2,交由線程池執(zhí)行
t2.fork();
//執(zhí)行任務(wù)t1并獲取直接結(jié)果的任務(wù)狀態(tài)
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
//獲取任務(wù)t2的直接結(jié)果狀態(tài)
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
//處理多個(gè)任務(wù)
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = tasks[i];
//任務(wù)為空則跑NPE異常
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
//非最后一個(gè)任務(wù)闽铐,則推入線程池執(zhí)行
else if (i != 0)
t.fork();
//最后一個(gè)任務(wù)直接調(diào)用doInvoke()執(zhí)行
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
//遍歷任務(wù)蝶怔,獲取任務(wù)執(zhí)行結(jié)果
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
//若有某個(gè)任務(wù)執(zhí)行有異常,則取消所有任務(wù)
if (ex != null)
t.cancel(false);
//獲取任務(wù)執(zhí)行結(jié)果兄墅,若結(jié)果非正常結(jié)束踢星,獲取異常結(jié)果
else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
}
獲取任務(wù)狀態(tài)及執(zhí)行結(jié)果等方法:
方法名 | 說明 |
---|---|
cancel | 取消任務(wù) |
isDone | 判斷任務(wù)是否正常完成 |
isCancelled | 判斷任務(wù)是否已取消 |
isCompletedAbnormally | 判斷任務(wù)是否非正常完成,如被取消或任務(wù)執(zhí)行異常等 |
isCompletedNormally | 判斷任務(wù)是否執(zhí)行正常完成隙咸,及任務(wù)狀態(tài)是否為NORMAL |
getException | 獲取任務(wù)執(zhí)行的異常結(jié)果 |
completeExceptionally | 將任務(wù)狀態(tài)設(shè)置為異常沐悦,并設(shè)置異常結(jié)果 |
complete | 將任務(wù)設(shè)置正常結(jié)束,并設(shè)置任務(wù)執(zhí)行結(jié)果 |
get | 獲取任務(wù)執(zhí)行結(jié)果五督,若任務(wù)取消或異常藏否,則拋出異常;否則返回任務(wù)執(zhí)行結(jié)果 |
3充包、WorkQueue源碼詳解
WorkQueue為ForkJoinPool的工作隊(duì)列副签,其封裝提交的任務(wù)ForkJoinTask、線程池ForkJoinPool基矮、執(zhí)行線程ForkJoinWorkerThread淆储、及其他任務(wù)相關(guān)數(shù)據(jù)等。
3.1家浇、主要屬性說明
//
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor
int nsteals; // number of steals
int hint; // randomization and stealer index hint
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
scanState:如果WorkQueue沒有屬于自己的owner(下標(biāo)為偶數(shù)的都沒有),該值為 inactive 也就是一個(gè)負(fù)數(shù)本砰;如果有自己的owner,該值的初始值為其在WorkQueue[]數(shù)組中的下標(biāo)钢悲,也肯定是個(gè)奇數(shù)点额;如果這個(gè)值舔株,變成了偶數(shù),說明該隊(duì)列所屬的Thread正在執(zhí)行Task咖楣。
stackPred:前任池(WorkQueue[])索引督笆,由此構(gòu)成一個(gè)棧;
config:index | mode诱贿。 如果下標(biāo)為偶數(shù)的WorkQueue,則其mode是共享類型娃肿。如果有自己的owner 默認(rèn)是 LIFO叶骨;
**qlock: **鎖標(biāo)識(shí),在多線程往隊(duì)列中添加數(shù)據(jù)像寒,會(huì)有競(jìng)爭(zhēng),使用此標(biāo)識(shí)搶占鎖琅翻。1: locked, < 0: terminate; else 0
base:worker steal的偏移量,因?yàn)槠渌木€程都可以偷該隊(duì)列的任務(wù),所有base使用volatile標(biāo)識(shí)焙蹭。
top:owner執(zhí)行任務(wù)的偏移量晒杈。
parker:如果 owner 掛起,則使用該變量做記錄掛起owner的線程孔厉。
**currentJoin: **當(dāng)前正在join等待結(jié)果的任務(wù)拯钻。
currentSteal:當(dāng)前執(zhí)行的任務(wù)是steal過來的任務(wù),該變量做記錄撰豺。
3.2污桦、主要方法說明
入隊(duì)方法:
//工作線程將任務(wù)提交到其對(duì)應(yīng)的工作隊(duì)列中
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
//計(jì)算放置任務(wù)的位置,并將任務(wù)保存到隊(duì)列中
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
//修改頂部數(shù)據(jù)
U.putOrderedInt(this, QTOP, s + 1);
//
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
//隊(duì)列已滿,則進(jìn)行擴(kuò)容
else if (n >= m)
growArray();
}
}
出隊(duì)方法:
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
if ((a = array) != null && (m = a.length - 1) >= 0) {
for (int s; (s = top - 1) - base >= 0;) {
long j = ((m & s) << ASHIFT) + ABASE;
//數(shù)組下班j處有任務(wù),則cas獲取獲取任務(wù),并修改top值
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s);
return t;
}
}
}
return null;
}
//從base到top獲取任務(wù)
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
while ((b = base) - top < 0 && (a = array) != null) {
//獲取base數(shù)據(jù)偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
//獲取base處任務(wù)
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
//base未被改動(dòng)?
if (base == b) {
if (t != null) {
//cas更改base處的數(shù)據(jù)螃概,同時(shí)base+1
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
//任務(wù)全部取完吊洼?
else if (b + 1 == top) // now empty
break;
}
}
return null;
}
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array; int m;
if (a == null || (m = a.length - 1) < 0)
return null;
//判斷任務(wù)隊(duì)列是FIFO或LILF冒窍?
int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
int j = ((i & m) << ASHIFT) + ABASE;
//獲取指定頂部或底部的任務(wù)
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}
執(zhí)行任務(wù):
final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
//隊(duì)列類型為FIFO?
if ((config & FIFO_QUEUE) == 0) {
//遍歷任務(wù)并執(zhí)行
for (ForkJoinTask<?> t;;) {
if ((t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
break;
U.putOrderedInt(this, QTOP, s);
t.doExec();
if (base - (s = top - 1) > 0)
break;
}
}
else
pollAndExecAll();
}
}
//執(zhí)行任務(wù)
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
//設(shè)置WorkQueue狀態(tài)為執(zhí)行任務(wù)狀態(tài)
scanState &= ~SCANNING; // mark as busy
//執(zhí)行竊取的任務(wù)
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//執(zhí)行所有本地
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
4桩了、ForkJoinWorkerThread源碼解析
ForkJoinWorkerThread為ForkJoinPool的運(yùn)行線程實(shí)現(xiàn)類,其關(guān)聯(lián)了對(duì)應(yīng)的ForkJoinPool及WorkQueue。
構(gòu)造函數(shù):
//構(gòu)造函數(shù)
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
執(zhí)行任務(wù)的鉤子函數(shù):
//線程啟動(dòng)時(shí)的鉤子函數(shù)
protected void onStart() {
}
//線程結(jié)束的鉤子函數(shù)
protected void onTermination(Throwable exception) {
}
執(zhí)行任務(wù):
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
//線程開始鉤子
onStart();
//執(zhí)行任務(wù)
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
//線程終止鉤子
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
//ForkJoinPool#runWorker()
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
//掃描任務(wù)洁段,并執(zhí)行任務(wù)
if ((t = scan(w, r)) != null)
w.runTask(t);
//等待竊取任務(wù)
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
5、ForkJoinPool源碼詳解
5.1除嘹、主要屬性
// Bounds
//低16位掩碼,索引的最大位數(shù)尉咕,
static final int SMASK = 0xffff; // short bits == max index
//工作線程的最大容量
static final int MAX_CAP = 0x7fff; // max #workers - 1
//偶數(shù)低位掩碼
static final int EVENMASK = 0xfffe; // even short bits
//偶數(shù)槽位數(shù),最多64個(gè)偶數(shù)槽位(0x007e = 0111 1110,有效的是中間6個(gè)1的位置悔捶,111111 = 63铃慷,再加上000000(0槽位)蜕该,總共64個(gè))
static final int SQMASK = 0x007e; // max 64 (even) slots
// Masks and units for WorkQueue.scanState and ctl sp subfield
//WorkQueue的狀態(tài):正在掃描任務(wù)
static final int SCANNING = 1; // false when running tasks
//WorkQueue的狀態(tài):非活動(dòng)狀態(tài)
static final int INACTIVE = 1 << 31; // must be negative
//版本號(hào)(防止CAS的ABA問題)
static final int SS_SEQ = 1 << 16; // version count
// Mode bits for ForkJoinPool.config and WorkQueue.config
//模式掩碼
static final int MODE_MASK = 0xffff << 16; // top half of int
//任務(wù)隊(duì)列模式為L(zhǎng)IFO
static final int LIFO_QUEUE = 0;
//任務(wù)隊(duì)列模式為FIFO
static final int FIFO_QUEUE = 1 << 16;
//任務(wù)隊(duì)列模式為共享模式
static final int SHARED_QUEUE = 1 << 31; // must be negative
//線程工廠類
public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;
//默認(rèn)的公共線程池
static final ForkJoinPool common;
//并行度
static final int commonParallelism;
//最大備用線程數(shù)
private static int commonMaxSpares;
//線程變化序列號(hào)
private static int poolNumberSequence;
//ctl的低32位掩碼
private static final long SP_MASK = 0xffffffffL;
//ctl的高32位掩碼
private static final long UC_MASK = ~SP_MASK;
// Active counts
//活躍線程的計(jì)算shift
private static final int AC_SHIFT = 48;
//活躍線程的最小單位
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
//活躍線程數(shù)的掩碼
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// Total counts
//工作線程shift
private static final int TC_SHIFT = 32;
//工作線程的最小單元
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
//工作線程掩碼
private static final long TC_MASK = 0xffffL << TC_SHIFT;
//創(chuàng)建工作線程的標(biāo)記
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
//線程池狀態(tài)
//鎖定
private static final int RSLOCK = 1;
//通知
private static final int RSIGNAL = 1 << 1;
//開始
private static final int STARTED = 1 << 2;
//停止
private static final int STOP = 1 << 29;
//終止
private static final int TERMINATED = 1 << 30;
//關(guān)閉
private static final int SHUTDOWN = 1 << 31;
// Instance fields
//線程池主控參數(shù)
volatile long ctl; // main pool control
//線程池運(yùn)行狀態(tài)
volatile int runState; // lockable status
//并行度|模式
final int config; // parallelism, mode
//用于生成線程池的索引
int indexSeed; // to generate worker index
//工作隊(duì)列池
volatile WorkQueue[] workQueues; // main registry
//線程工廠
final ForkJoinWorkerThreadFactory factory;
//工作線程異常處理
final UncaughtExceptionHandler ueh; // per-worker UEH
//工作線程名稱的前綴
final String workerNamePrefix; // to create worker name string
//偷取任務(wù)的總數(shù)
volatile AtomicLong stealCounter; // also used as sync monitor
5.2股囊、狀態(tài)說明
ctl參數(shù)說明:
字段ctl是ForkJoinPool的核心狀態(tài),它是一個(gè)64位的long類型數(shù)值祭务,包含4個(gè)16位子字段:
- AC: 活動(dòng)的工作線程數(shù)量減去目標(biāo)并行度(目標(biāo)并行度:最大的工作線程數(shù)量内狗,所以AC一般是負(fù)值,等于0時(shí)义锥,說明活動(dòng)線程已經(jīng)達(dá)到飽和了)
- TC: 總的工作線程數(shù)量總數(shù)減去目標(biāo)并行度(TC一般也是負(fù)值柳沙,等于0時(shí),說明總的工作線程已經(jīng)飽和拌倍,并且赂鲤,AC一般小于等于TC)
- SS: 棧頂工作線程狀態(tài)和版本數(shù)(每一個(gè)線程在掛起時(shí)都會(huì)持有前一個(gè)等待線程所在工作隊(duì)列的索引,由此構(gòu)成一個(gè)等待的工作線程棧柱恤,棧頂是最新等待的線程数初,第一位表示狀態(tài)1.不活動(dòng) 0.活動(dòng),后15表示版本號(hào)梗顺,標(biāo)識(shí)ID的版本-最后16位)泡孩。
- ID: 棧頂工作線程所在工作隊(duì)列的池索引。
runState狀態(tài)說明:
- STARTED 1
- STOP 1 << 1
- TERMINATED 1<<2
- SHUTDOWN 1<<29
- RSLOCK 1<<30
- RSIGNAL 1<<31
runState記錄了線程池的運(yùn)行狀態(tài)寺谤,特別地仑鸥,除了SHUTDOWN是負(fù)數(shù)外,其他值都是正數(shù)变屁,RSLOCK和RSIGNAL是跟鎖相關(guān)眼俊。
5.3、主要方法
5.3.1粟关、構(gòu)造方法
//parallelism:并行度
//factory:線程工廠泵琳;
//handler:異常處理
//asyncMode:隊(duì)列模式,true:FIFO,false:LIFO
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
5.3.2获列、提交任務(wù)
invoke/execute/submit任務(wù)提交:
//提交任務(wù)并等待任務(wù)執(zhí)行完成谷市,然后返回執(zhí)行結(jié)果
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
//只提交任務(wù)
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
//提交任務(wù)并返回任務(wù),F(xiàn)orkJoinTask可獲取任務(wù)的異步執(zhí)行結(jié)果
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
提交任務(wù)主要有三中方法击孩,invoke()迫悠,execute(),submit()巩梢,它們最終都是調(diào)用externalPush()進(jìn)行處理创泄,都屬于外部提交,置于偶數(shù)索引的工作隊(duì)列括蝠。
externalPush()添加任務(wù):
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
//探針值鞠抑,用于計(jì)算WorkQueue槽位索引
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null //線程池不為空
&& (m = (ws.length - 1)) >= 0 //線程池長(zhǎng)度大于0
&& (q = ws[m & r & SQMASK]) != null //獲取偶數(shù)槽位的WorkQueue
&& r != 0 && rs > 0 //探針值不為0
&&U.compareAndSwapInt(q, QLOCK, 0, 1)) { //加鎖
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null //線程的任務(wù)隊(duì)列不為空
&&(am = a.length - 1) > (n = (s = q.top) - q.base)) { //任務(wù)數(shù)組長(zhǎng)度大于數(shù)組中的任務(wù)個(gè)數(shù),則無需擴(kuò)容
int j = ((am & s) << ASHIFT) + ABASE; //計(jì)算任務(wù)的位置索引
U.putOrderedObject(a, j, task);//將任務(wù)放入任務(wù)數(shù)組中
U.putOrderedInt(q, QTOP, s + 1); //設(shè)置top為top+1
U.putIntVolatile(q, QLOCK, 0); //解鎖
//若之前的任務(wù)數(shù)<=1忌警,則此槽位的線程可能在等待搁拙,同時(shí)可能其他槽位的線程也在等,此時(shí)需要喚醒線程來執(zhí)行任務(wù)
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0); //添加任務(wù)失敗法绵,則解鎖
}
//若if條件中有不滿足的箕速,或是添加任務(wù)失敗,則通過externalSubmit()來添加任務(wù)
externalSubmit(task);
}
signalWork()喚醒worker線程:
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { //活躍線程數(shù)太少朋譬,則創(chuàng)建工作線程
if ((sp = (int)c) == 0) { //無空閑線程盐茎?
// (c & ADD_WORKER) != 0L,說明TC的最高位為1徙赢,為負(fù)值字柠,而TC = 總的線程數(shù) - 并行度 < 0,
// 表示總的線程數(shù) < 并行度狡赐,說明工作線程的個(gè)數(shù)還很少
if ((c & ADD_WORKER) != 0L)
tryAddWorker(c); //嘗試添加線程
break;
}
//未開始或已停止
if (ws == null) // unstarted/terminated
break;
// 空閑線程棧頂端線程的所屬工作隊(duì)列索引(正常來講募谎,應(yīng)該小于WorkQueue[]的長(zhǎng)度的)
if (ws.length <= (i = sp & SMASK)) // terminated
break;
//正則終止?
if ((v = ws[i]) == null) // terminating
break;
// 作為下一個(gè)scanState待更新的值(增加了版本號(hào)阴汇,并且調(diào)整為激活狀態(tài))
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
// 如果d為0,則說明scanState還為更新過节槐,然后才考慮CAS ctl
int d = sp - v.scanState; // screen CAS
// 下一個(gè)ctl的值搀庶,AC + 1 | 上一個(gè)等待線程的索引
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
// 如果線程阻塞了,喚醒它
if ((p = v.parker) != null)
U.unpark(p);
break;
}
// 沒有任務(wù)铜异,直接退出
if (q != null && q.base == q.top) // no more work
break;
}
}
externalSubmit()添加任務(wù):
externalSubmit()包含了三方面的操作:
- 若線程未初始化哥倔,則初始化線程池,長(zhǎng)度是2的冪次方揍庄;
- 若選中的槽位位空咆蒿,則初始化一個(gè)共享模式的工作隊(duì)列;
- 若選中槽位不為空,則獲取任務(wù)隊(duì)列沃测,并將任務(wù)提交到任務(wù)隊(duì)列缭黔,成功則喚醒沉睡的線程;若失敗則專業(yè)槽位蒂破。
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
//初始化當(dāng)前線程的探針值馏谨,用于計(jì)算WorkQueue的索引
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
//線程池已經(jīng)關(guān)閉?
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//1附迷、線程池狀態(tài)為還未初始化惧互?;
//2喇伯、線程池為空喊儡?
//3、線程池中工作線程數(shù)為0稻据?
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null ||
(m = ws.length - 1) < 0)) {
int ns = 0;
//加鎖
rs = lockRunState();
try {
//加鎖后再次判斷線程池狀態(tài)艾猜,避免重復(fù)初始化
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
//保證n是2的冪次方
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
//解鎖
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//獲取隨機(jī)偶數(shù)槽位的WorkQueue
else if ((q = ws[k = r & m & SQMASK]) != null) {
//對(duì)WorkQueue加鎖
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
//若WorkQueue的任務(wù)隊(duì)列為空,則初始化任務(wù)隊(duì)列(growArray)
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
//計(jì)算任務(wù)索引的下標(biāo)
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
//解鎖
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//喚醒掛起的線程
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
//在未加鎖的情況下攀甚,創(chuàng)建新線程
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
//共享模式
q.config = k | SHARED_QUEUE;
//未激活
q.scanState = INACTIVE;
//加鎖
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
//釋放鎖
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
5.3.3箩朴、執(zhí)行任務(wù)
runWorker()是在ForkJoinWorkerThread的run()方法中調(diào)用,即在啟動(dòng)worker線程調(diào)用的秋度。其主要工作是獲取任務(wù)并執(zhí)行任務(wù)炸庞,若線程池關(guān)閉,則等待任務(wù)隊(duì)列的任務(wù)執(zhí)行完成并退出荚斯。
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
//掃描任務(wù)
if ((t = scan(w, r)) != null)
//工作線程執(zhí)行任務(wù)
w.runTask(t);
//沒有任務(wù)執(zhí)行則等待
else if (!awaitWork(w, r))
break;
//隨機(jī)值更新
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
scan()掃描任務(wù):
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
//線程池不為空埠居?
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
//槽位k不為空,嘗試從該任務(wù)隊(duì)列里獲取任務(wù)
if ((q = ws[k]) != null) {
//有任務(wù)
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
//數(shù)組地址
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
//獲取任務(wù)并更新base等索引信息
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
//通知其他線程
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
//設(shè)置WorkQueue的狀態(tài)
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
//未掃描到任務(wù)事期,準(zhǔn)備inactive此工作隊(duì)列
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
awaitWork()為等待任務(wù)滥壕。若工作線程未獲取到任務(wù),則會(huì)執(zhí)行此方法兽泣。
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w已經(jīng)終止绎橘,返回false,不再掃描任務(wù)
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
if ((ss = w.scanState) >= 0) // 如果已經(jīng)active唠倦,跳出称鳞,返回true,繼續(xù)掃描任務(wù)
break;
else if (spins > 0) { // 如果spins > 0稠鼻,自旋等待
r ^= r << 6;
r ^= r >>> 21;
r ^= r << 7;
if (r >= 0 && --spins == 0) { // 隨機(jī)消耗自旋次數(shù)
WorkQueue v;
WorkQueue[] ws;
int s, j;
AtomicLong sc;
if (pred != 0 // 除了自己冈止,還有等待的線程-工作隊(duì)列
&& (ws = workQueues) != null // 線程池還在
&& (j = pred & SMASK) < ws.length // 前任索引還在池范圍內(nèi)
&& (v = ws[j]) != null // 前任任務(wù)隊(duì)列還在
&& (v.parker == null || v.scanState >= 0)) // 前任線程已經(jīng)喚醒,且工作隊(duì)列已經(jīng)激活
spins = SPINS; // 上面的一系列判斷表明候齿,很快就有任務(wù)了熙暴,先不park闺属,繼續(xù)自旋
}
} else if (w.qlock < 0) // 自旋之后,再次檢查工作隊(duì)列是否終止周霉,若是掂器,退出掃描
return false;
else if (!Thread.interrupted()) { // 如果線程中斷了,清除中斷標(biāo)記诗眨,不考慮park唉匾,否則進(jìn)入該分支
long c, prevctl, parkTime, deadline;
int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); // 計(jì)算活躍線程的個(gè)數(shù)
if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // 線程池正在終止,退出掃描
return false;
if (ac <= 0 && ss == (int) c) { // 自己是棧頂?shù)却? prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); // 設(shè)置為前一次的ctl
int t = (short) (c >>> TC_SHIFT); // 總的線程數(shù)
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) // 總線程數(shù)過多匠楚,直接退出掃描
return false;
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); // 計(jì)算等待時(shí)間
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
} else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // 加鎖
w.parker = wt; // 設(shè)置parker巍膘,準(zhǔn)備阻塞
if (w.scanState < 0 && ctl == c) // 阻塞前再次檢查狀態(tài)
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null); // 喚醒后,置空parker
U.putObject(wt, PARKBLOCKER, null); // 解鎖
if (w.scanState >= 0) // 已激活芋簿,跳出繼續(xù)掃描
break;
if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L
&& U.compareAndSwapLong(this, CTL, c, prevctl)) // 超時(shí)峡懈,未等到任務(wù),跳出与斤,不再執(zhí)行掃描任務(wù)肪康,削減工作線程
return false; // shrink pool
}
}
return true;
}