一珊佣、ForkJoinPool
ForkJoinPool 是 JDK7 引入的咒锻,由 Doug Lea 編寫的高性能線程池。核心思想是將大的任務拆分成多個小任務(即fork)滨巴,然后在將多個小任務處理匯總到一個結(jié)果上(即join)恭取,非常像MapReduce處理原理。同時攒发,它提供基本的線程池功能,支持設置最大并發(fā)線程數(shù)紊扬,支持任務排隊,支持線程池停止玩祟,支持線程池使用情況監(jiān)控,也是AbstractExecutorService的子類转锈,主要引入了“工作竊取”機制,在多CPU計算機上處理性能更佳砌溺。其廣泛用在java8的stream中规伐。
從圖中可以看出ForkJoinPool要先執(zhí)行完子任務才能執(zhí)行上一層任務鲜棠,所以ForkJoinPool適合在有限的線程數(shù)下完成有父子關系的任務場景,比如:快速排序献联,二分查找里逆,矩陣乘法,線性時間選擇等場景诸衔,以及數(shù)組和集合的運算。
Fork/Join Pool采用優(yōu)良的設計谒亦、代碼實現(xiàn)和硬件原子操作機制等多種思路保證其執(zhí)行性能。其中包括(但不限于):計算資源共享锁摔、高性能隊列孕豹、避免偽共享巩步、工作竊取機制等。
二竟闪、與ThreadPoolExecutor原生線程池的區(qū)別
ForkJoinPool和ThreadPoolExecutor都實現(xiàn)了Executor和ExecutorService接口,都可以通過構(gòu)造函數(shù)設置線程數(shù)理朋,threadFactory嗽上,可以查看ForkJoinPool.makeCommonPool()方法的源碼查看通用線程池的構(gòu)造細節(jié)。
在內(nèi)部結(jié)構(gòu)上我覺得兩個線程池最大的區(qū)別是在工作隊列的設計上浅萧,如下圖
ThreadPoolExecutor:
ForkJoinPool:
圖上細節(jié)畫的不嚴謹棚赔,但大致能看出區(qū)別:
- ForkJoinPool每個線程都有自己的隊列
- ThreadPoolExecutor共用一個隊列
使用ForkJoinPool可以在有限的線程數(shù)下來完成非常多的具有父子關系的任務,比如使用4個線程來完成超過2000萬個任務捆毫。
ForkJoinPool最適合計算密集型任務途样,而且最好是非阻塞任務何暇,之前的一篇文章:Java踩坑記系列之線程池 也說了線程池的不同使用場景和注意事項。
所以ForkJoinPool是ThreadPoolExecutor線程池的一種補充宏胯,是對計算密集型場景的加強。
三氛赐、工作竊取的實現(xiàn)原理
ForkJoinPool類中的WorkQueue正是實現(xiàn)工作竊取的隊列,javadoc中的注釋如下:
大意是大多數(shù)操作都發(fā)生在工作竊取隊列中(在嵌套類工作隊列中)。這些是特殊形式的Deques街图,主要有push餐济,pop,poll操作胆剧。
Deque是雙端隊列(double ended queue縮寫)絮姆,頭部和尾部任何一端都可以進行插入,刪除秩霍,獲取的操作篙悯,即支持FIFO(隊列)也支持LIFO(棧)順序铃绒。
Deque接口的實現(xiàn)最常見的是LinkedList鸽照,除此還有ArrayDeque、ConcurrentLinkedDeque等颠悬。
工作竊取模式主要分以下幾個步驟:
- 1矮燎、每個線程都有自己的雙端隊列定血。
- 2、當調(diào)用fork方法時诞外,將任務放進隊列頭部澜沟,線程以LIFO順序,使用push/pop方式處理隊列中的任務峡谊。
- 3茫虽、如果自己隊列里的任務處理完后,會從其他線程維護的隊列尾部使用poll的方式竊取任務靖苇,以達到充分利用CPU資源的目的席噩。
- 4、從尾部竊取可以減少同原線程的競爭贤壁。
- 5悼枢、當隊列中剩最后一個任務時,通過cas解決原線程和竊取線程的競爭脾拆。
流程大致如下所示:
工作竊取便是ForkJoinPool線程池的優(yōu)勢所在馒索,在一般的線程池比如ThreadPoolExecutor中,如果一個線程正在執(zhí)行的任務由于某種原因無法繼續(xù)運行名船,那么該線程會處于等待狀態(tài)绰上,包括singleThreadPool、fixedThreadPool渠驼、cachedThreadPool這幾種線程池蜈块。
而在ForkJoinPool中院刁,那么線程會主動尋找其他尚未被執(zhí)行的任務然后竊取過來執(zhí)行绒净,減少線程等待時間朗若。
JDK8中的并行流(parallelStream)功能是基于ForkJoinPool實現(xiàn)的爽柒,另外還有java.util.concurrent.CompletableFuture異步回調(diào)future,內(nèi)部使用的線程池也是ForkJoinPool枣耀。
四障涯、ForkJoinPool分析
4.1 ForkJoinPool成員變量
// 用來配置ctl在控制線程數(shù)量使用
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
//控制線程池數(shù)量(ctl & ADD_WORKER) != 0L 時創(chuàng)建線程纵朋,
// 也就是當ctl的第16位不為0時厨内,可以繼續(xù)創(chuàng)建線程
volatile long ctl; // main pool control
//全局鎖控制祈秕,全局運行狀態(tài)
volatile int runState; // lockable status
//config二進制形式的低16位表示parallelism,
//config二進制形式的第高16位表示mode,1表示異步模式, 使用先進先出隊列, 0表示同步模式, 使用先進后出棧
//低16位表示workerQueue在pool中的索引雏胃,高16位表示mode, 有FIFI LIFL
final int config; // parallelism, mode
//生成workerQueue索引的重要依據(jù)
int indexSeed; // to generate worker index
//工作者隊列數(shù)組请毛,內(nèi)部線程ForkJoinWorkerThread啟動時會注冊一個WorkerQueue對象到這個數(shù)組中
volatile WorkQueue[] workQueues; // main registry
//工作者線程線程工廠,創(chuàng)建ForkJoinWorkerThread的策略
final ForkJoinWorkerThreadFactory factory;
//在線程因未捕異常而退出時瞭亮,java虛擬機將回調(diào)的異常處理策略
final UncaughtExceptionHandler ueh; // per-worker UEH
//工作者線程名的前綴
final String workerNamePrefix; // to create worker name string
//執(zhí)行器所有線程竊取的任務總數(shù)获印,也作為監(jiān)視runState的鎖
volatile AtomicLong stealCounter; // also used as sync monitor
//通用的執(zhí)行器,它在靜態(tài)塊中初始化
static final ForkJoinPool common;
五、WorkQueue
5.1 類結(jié)構(gòu)及其成員變量
5.1.1 類結(jié)構(gòu)和注釋
WorkQueue是ForkJoinPool的核心內(nèi)部類兼丰,是一個Contented修飾的靜態(tài)內(nèi)部類。
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. The @Contended annotation alerts
* JVMs to try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
}
其注釋大意為:
workQUeue是一個支持任務竊取和外部提交任務的隊列唆缴,其實現(xiàn)參考ForkJoinPool描述的算法鳍征。在大多數(shù)平臺上的性能對工作隊列及其數(shù)組的實例都非常敏感。我們不希望多個工作隊列的實例和多個隊列數(shù)組共享緩存面徽。@Contented注釋用來提醒jvm將workQueue在執(zhí)行的時候與其他對象進行區(qū)別艳丛。
@Contented,實際上就是采用內(nèi)存對齊的方式避免偽共享趟紊,保證WorkQueue在執(zhí)行的時候氮双,其前后不會有其他對象干擾。
注:JVM 添加 -XX:-RestrictContended 參數(shù)后 @sun.misc.Contended 注解才有效)
5.1.2 MAXIMUM_QUEUE_CAPACITY
MAXIMUM_QUEUE_CAPACITY注釋如下:
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
MAXIMUM_QUEUE_CAPACITY是隊列支持的最大容量霎匈,必須是2的冪小于或等于1<<(31-數(shù)組項的寬度)戴差,但定義為一個略小于此值的值,以幫助用戶在飽和系統(tǒng)之前捕獲失控的程序铛嘱。
5.1.3 成員變量
成員變量區(qū)如下:
@sun.misc.Contended
static final class WorkQueue {
//隊列的初始容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
// 64M 隊列的最大容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor
int nsteals; // number of steals
int hint; // randomization and stealer index hint
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
}
scanState:它可以看作是樂觀鎖的版本號暖释,另外它還有此其他功能,它為負數(shù)時墨吓,表示工作者線程非活動球匕,它為奇數(shù)是表示,正在掃描(準備竊忍妗)任務亮曹,它為偶數(shù)是表示正在執(zhí)行任務。
stackPred:表示在線程池棧當前工作線程的前驅(qū)線程的索引秘症。在喚醒線程時常用到此屬性照卦。
nsteals:表示owner線程竊取的任務數(shù)。
hint:任務竊取時的隨機定位種子历极。
config:低16位表示窄瘟,當前WorkerQueue對象在外部類的數(shù)組屬性workQueues中的索引(下標) 。高16位表示當前WorkerQueue對象的模式趟卸。對于內(nèi)部任務蹄葱,若構(gòu)造方法配置為異步模式就將WorkQueue當作先進先出的隊列,反之將WorkQueue當作后進先出的棧锄列。對于外部任務图云,將WorkQueue視為共享隊列。
qlock:初始值為0邻邮,”=1“時表示當前WorkerQueue對象被鎖住竣况,” < 0“時 表示當前WorkerQueue對象已終止,隊列中的其他未完成任務將不再被執(zhí)行筒严。
base:表示下次對任務數(shù)組array進行poll出隊操作(竊取任務)的槽位索引(隊尾)丹泉。
top:表示下次任務數(shù)組array進行push入棧操作(添加任務)的槽位索引(棧頂)情萤。
array:非學重要的屬性,這用是保存任務的數(shù)組(容器)摹恨。
pool:與之關聯(lián)的ForkJoinPool執(zhí)行器,它可能為空筋岛。若為空,就使用靜態(tài)變量common作為執(zhí)行器晒哄。
owner:當前隊列對應的工作者線程睁宰,它一般不為空。若從外部提交任務時寝凌,當前WorkerQueue對象表示共享隊列柒傻,owner為空。
parker:阻塞的線程较木。在被阻塞的時候红符,它等于owner,其他時候它為空劫映。
currentJoin:表示當前正在join的任務违孝,主要在awaitJoin方法使用。
currentSteal:表示當前被竊取的任務泳赋,主要在helpStealer方法中使用雌桑。
5.2 構(gòu)造函數(shù)
WorkQueue就一個構(gòu)造函數(shù):
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
在這個構(gòu)造函數(shù)中,只會指定pool和owoner祖今,如果該隊列是共享隊列校坑,那么owoner此時是空的。此外千诬,base和top兩個指針分別都指向了數(shù)組的中值耍目,這個值是初始化容量右移一位。
那么結(jié)合前面的代碼徐绑,實際上初始化的時候邪驮,數(shù)組的長度為8192,那么base=top=4096傲茄。
這個數(shù)組在構(gòu)造函數(shù)被調(diào)用之后初始化如下:
5.3 重要的方法
5.3.1 push
當ForkJoinWorkerThread需要向雙端隊列中放入一個新的待執(zhí)行子任務時毅访,會調(diào)用WorkQueue中的push方法。來看看這個方法的主要執(zhí)行過程(請注意盘榨,源代碼來自JDK1.8喻粹,它和JDK1.7中的實現(xiàn)有顯著不同):
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
// 請注意,在執(zhí)行task.fork時草巡,觸發(fā)push情況下守呜,array不會為null
// 因為在這之前workqueue中的array已經(jīng)完成了初始化(在工作線程初始化時就完成了)
if ((a = array) != null) { // ignore if queue removed
//m為最高為位置的index
int m = a.length - 1; // fenced write for task visibility
// U常量是java底層的sun.misc.Unsafe操作類
// 這個類提供硬件級別的原子操作
// putOrderedObject方法在指定的對象a中,指定的內(nèi)存偏移量的位置,賦予一個新的元素
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
// putOrderedInt方法對當前指定的對象中的指定字段查乒,進行賦值操作
// 這里的代碼意義是將workQueue對象本身中的top標示的位置 + 1弥喉,
U.putOrderedInt(this, QTOP, s + 1);
//如果n小于等于1則 且poll不為空 則觸發(fā)worker竊取或者產(chǎn)生新的worker
if ((n = s - b) <= 1) {
if ((p = pool) != null)
// signalWork方法的意義在于,在當前活動的工作線程過少的情況下侣颂,創(chuàng)建新的工作線程
p.signalWork(p.workQueues, this);
}
//如果n大于等于了m 則說明需要擴容了, array的剩余空間不夠了
else if (n >= m)
growArray();
}
}
這個push方法是提供給工作隊列自己push任務來使用的档桃,共享隊列push任務是在外部externalPush和externalSubmit等方法來進行初始化和push。
這里需要注意的是憔晒,當隊列中的任務數(shù)小于1的時候,才會調(diào)用signalWork蔑舞,這個地方一開始并不理解拒担,實際上,我們需要注意的是攻询,這個方法是專門提供給工作隊列來使用的从撼,那么這個條件滿足的時候,說明工作隊列空閑钧栖。如果這個條件不滿足低零,那么工作隊列中有很多任務需要工作隊列來處理,就不會觸發(fā)對這個隊列的竊取操作拯杠。
5.3.2 growArray
這是擴容的方法掏婶。實際上這個方法有兩個作用,首先是初始化潭陪,其次是判斷雄妥,是否需要擴容,如果需要擴容則容量加倍依溯。
/**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
*/
final ForkJoinTask<?>[] growArray() {
//舊的數(shù)組 oldA
ForkJoinTask<?>[] oldA = array;
//如果oldA不為空老厌,則size就為oldA的長度*2,反之說明數(shù)組沒有被初始化黎炉,那么長度就應該為初始化的長度8192
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
//如果size比允許的最大容量還大枝秤,那么此時會拋出異常
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
//array a 為根據(jù)size new出來的一個新的數(shù)組
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
//如果oldA不為空且其長度大于等于0為有效數(shù)組,且top-base大于0 說明不為空
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
//按size定義掩碼
int mask = size - 1;
//從舊的數(shù)組中poll全部task慷嗜,然后push到新的array中
do { // emulate poll from old array, push to new array
ForkJoinTask<?> x;
//采用unsafe操作
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
//實際上直接進行的內(nèi)存對象copy淀弹,這樣效率比循環(huán)調(diào)用push和poll要高很多
x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
//判斷 x不為空 則使用unsafe進行操作
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null))
U.putObjectVolatile(a, j, x);
} while (++b != t);
}
//返回新的數(shù)組
return a;
}
需要注意的是,這個方法一旦調(diào)用進行擴容之后洪添,無論是來自于外部push操作觸發(fā)垦页,還是有工作線程worker觸發(fā),都將被鎖定干奢,之后痊焊,不能移動top指針,但是base指針是可以移動的。這也就是說薄啥,一旦處于擴容的過程中辕羽,就不能新增task,但是可以從base進行消費垄惧,這就只支持FIFO刁愿。因此同步模式將在此時被阻塞。
5.3.3 pop
同樣到逊,pop操作也僅限于工作線程铣口,對于共享對立中則不允許使用pop方法。這個方法將按LIFO后進先出的方式從隊列中觉壶。
/**
* Takes next task, if one exists, in LIFO order. Call only
* by owner in unshared queues.
*/
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
//如果array不為空切長度大于0
if ((a = array) != null && (m = a.length - 1) >= 0) {
//循環(huán)脑题,s為top的指針減1,即top減1之后要大于0 也就是說要存在task
for (int s; (s = top - 1) - base >= 0;) {
//計算unsafe的偏移量 得到s的位置
long j = ((m & s) << ASHIFT) + ABASE;
//如果這個索引處的對象為空铜靶,則退出
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
//反之用usafe的方法將這個值取走叔遂,之后返回,并更新top的指針
if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s);
return t;
}
}
}
return null;
}
pop方法争剿,這是僅限于owoner調(diào)用的方法已艰,將從top指針處取出task。這個方法對于整個隊列是LIFO的方式蚕苇。
5.3.4 poll
poll方法將從隊列中按FIFO的方式取出task哩掺。
/**
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
//判斷 base-top小于0說明存在task 切array不為空
while ((b = base) - top < 0 && (a = array) != null) {
//計算出unsafe操作的索引 實際上就是拿到b
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
//之后拿到這個task 用volatile的方式
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
//之后如果base和b相等
if (base == b) {
//如果拿到的task不為空
if (t != null) {
//那么將這個位置的元素移除 base+1 然后返回t
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
//在上述操作之后,如果base比top小1說明已經(jīng)為空了 直接退出循環(huán)
else if (b + 1 == top) // now empty
break;
}
}
//默認返回null
return null;
}
5.3.5 pollAt
這個方法將采用FIFO的方式捆蜀,從 隊列中獲得task疮丛。
/**
* Takes a task in FIFO order if b is base of queue and a task
* can be claimed without contention. Specialized versions
* appear in ForkJoinPool methods scan and helpStealer.
*/
final ForkJoinTask<?> pollAt(int b) {
ForkJoinTask<?> t; ForkJoinTask<?>[] a;
//數(shù)組不為空
if ((a = array) != null) {
//計算索引b的位置
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
//如果此處的task不為空,則將此處置為null然后將對象task返回
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
base == b && U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
return null;
}
通常情況下辆它,b指的是隊列的base指針誊薄。那么從底部獲取元素就能實現(xiàn)FIFO。特殊的版本出現(xiàn)在scan和helpStealer中用于對工作隊列的竊取操作的實現(xiàn)锰茉。
5.3.6 nextLocalTask
/**
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}
這個方法中對之前的MODE會起作用呢蔫,如果是FIFO則用pop方法,反之則用poll方法獲得下一個task飒筑。
5.3.7 peek
/**
* Returns next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> peek() {
ForkJoinTask<?>[] a = array; int m;
//判斷數(shù)組的合法性
if (a == null || (m = a.length - 1) < 0)
return null;
//根據(jù)mode決定從top還是base處獲得task
int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
int j = ((i & m) << ASHIFT) + ABASE;
//返回獲得的task
return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
}
peek則根據(jù)之前的mode定義片吊,從隊列的前面或者后面取得task。
5.3.8 tryUnpush
/**
* Pops the given task only if it is at the current top.
* (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
//判斷數(shù)組的合法性
if ((a = array) != null && (s = top) != base &&
//將top位置的task與t比較协屡,如果相等則將其改為null
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
//將top減1
U.putOrderedInt(this, QTOP, s);
//返回操作成功
return true;
}
//默認返回失敗
return false;
}
這個方法是將之前push的任務撤回俏脊。這個操作僅僅只有task位于top的時候操能成功。
5.3.9 runTask
在之前的文章分析外部提交task的時候肤晓,就提到了這個方法爷贫。實際上是runWorker調(diào)用的认然。
也就是說,線程在啟動之后漫萄,一旦worker獲取到task卷员,就會運行。
/**
* Executes the given task and any remaining local tasks.
*/
final void runTask(ForkJoinTask<?> task) {
//task不為空
if (task != null) {
//掃描狀態(tài)標記為busy 那么說明當前的worker正在處理本地任務 此時這個操作會將scanState改為0
scanState &= ~SCANNING; // mark as busy
//執(zhí)行這個task
(currentSteal = task).doExec();
//釋放已執(zhí)行任務的內(nèi)存
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//執(zhí)行其他本地的task
execLocalTasks();
ForkJoinWorkerThread thread = owner;
//增加增加steals的次數(shù)
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
//將scanState改為1 這樣就變得活躍可以被其他worker scan
scanState |= SCANNING;
//如果thread不為null說明為worker線程 則調(diào)用后續(xù)的exec方法
if (thread != null)
thread.afterTopLevelExec();
}
}
5.3.10 execLocalTasks
調(diào)用這個方法腾务,運行隊列中的全部task毕骡,如果采用了LIFO模式,則調(diào)用pollAndExecAll岩瘦,這是另外一種實現(xiàn)方法未巫。直到將隊列都執(zhí)行到empty
/**
* Removes and executes all local tasks. If LIFO, invokes
* pollAndExecAll. Otherwise implements a specialized pop loop
* to exec until empty.
*/
final void execLocalTasks() {
int b = base, m, s;
//拿到數(shù)組
ForkJoinTask<?>[] a = array;
//如果b-s小于0說明存在task,a不為空启昧,切a的長度大于0 這均是檢測方法的合法性
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
//如果沒有采用FIFO的mode 那么一定是LIFO 則從top處開始
if ((config & FIFO_QUEUE) == 0) {
//開始循環(huán)
for (ForkJoinTask<?> t;;) {
//從top開始取出task
if ((t = (ForkJoinTask<?>)U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)
break;
//修改top
U.putOrderedInt(this, QTOP, s);
//執(zhí)行task
t.doExec();
//如果沒有任務的了 則退出
if (base - (s = top - 1) > 0)
break;
}
}
else
//FIFO的方式調(diào)用pollAndExecAll
pollAndExecAll();
}
}
5.3.11 pollAndExecAll
此方法將用poll橱赠,F(xiàn)IFO的方式獲得task并執(zhí)行。
final void pollAndExecAll() {
for (ForkJoinTask<?> t; (t = poll()) != null;)
t.doExec();
}
可見箫津,當通過workQueue中調(diào)用runTask的方法的時候,會將這個隊列的scanState狀態(tài)修改為0宰啦,之后將這個隊列中的全部task根據(jù)定義的mode全部消費完畢苏遥。
5.3.12 tryRemoveAndExec
從注釋中可知,這個方法僅僅供awaitJoin方法調(diào)用赡模,在await的過程中田炭,將task從workQueue中移除并執(zhí)行。
/**
* If present, removes from queue and executes the given task,
* or any other cancelled task. Used only by awaitJoin.
*
* @return true if queue empty and task not known to be done
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
//判斷數(shù)組的合法性 task不能為空
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
//循環(huán) n為task的數(shù)量漓柑,必須大于0
while ((n = (s = top) - (b = base)) > 0) {
//死循環(huán) 從top遍歷到base
for (ForkJoinTask<?> t;;) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
return s + 1 == top; // shorter than expected
//如果task處于top位置
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { // pop
//pop的方式獲取task 然后替換為null
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
//用emptytask代替
else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
//如果remove成功 則執(zhí)行這個task
if (removed)
task.doExec();
break;
}
//如果task的status為負數(shù) 切 top=s=1
else if (t.status < 0 && s + 1 == top) {
//移除
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
}
return true;
}
5.3.13 popCC
如果pop CountedCompleter教硫。這方法支持共享和worker的隊列,但是僅僅通過helpComplete調(diào)用辆布。
CountedCompleter是jdk1.8中新增的一個ForkJoinTask的一個實現(xiàn)類瞬矩。
/**
* Pops task if in the same CC computation as the given task,
* in either shared or owned mode. Used only by helpComplete.
*/
final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
int s; ForkJoinTask<?>[] a; Object o;
//判斷隊列數(shù)組合法性
if (base - (s = top) < 0 && (a = array) != null) {
//從top處開始
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
//如果獲的的task不為null
if ((o = U.getObjectVolatile(a, j)) != null &&
//且為CountedCompleter對象
(o instanceof CountedCompleter)) {
//轉(zhuǎn)換為CountedCompleter
CountedCompleter<?> t = (CountedCompleter<?>)o;
//死循環(huán)
for (CountedCompleter<?> r = t;;) {
//如果task與獲得的r相等為同一對象
if (r == task) {
//如果mode小于0
if (mode < 0) { // must lock
//cas的方式加鎖
if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
//將這個對象清除 并修改top后解鎖
if (top == s && array == a &&
U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s - 1);
U.putOrderedInt(this, QLOCK, 0);
//返回t
return t;
}
//解鎖
U.compareAndSwapInt(this, QLOCK, 1, 0);
}
}
else if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s - 1);
return t;
}
break;
}
else if ((r = r.completer) == null) // try parent
break;
}
}
}
return null;
}
5.3.14 pollAndExecCC
pollAndExecCC 。竊取并運行與給定任務相同CountedCompleter計算任務(如果存在)锋玲,并且可以在不發(fā)生爭用的情況下執(zhí)行該任務景用。否則,返回一個校驗和/控制值惭蹂,供helpComplete方法使用伞插。
/**
* Steals and runs a task in the same CC computation as the
* given task if one exists and can be taken without
* contention. Otherwise returns a checksum/control value for
* use by method helpComplete.
*
* @return 1 if successful, 2 if retryable (lost to another
* stealer), -1 if non-empty but no matching task found, else
* the base index, forced negative.
*/
final int pollAndExecCC(CountedCompleter<?> task) {
int b, h; ForkJoinTask<?>[] a; Object o;
//判斷array的合法性
if ((b = base) - top >= 0 || (a = array) == null)
h = b | Integer.MIN_VALUE; // to sense movement on re-poll
else {
//從base開始獲得task
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((o = U.getObjectVolatile(a, j)) == null)
h = 2; // retryable
else if (!(o instanceof CountedCompleter))
h = -1; // unmatchable
else {
CountedCompleter<?> t = (CountedCompleter<?>)o;
//死循環(huán)
for (CountedCompleter<?> r = t;;) {
if (r == task) {
if (base == b &&
U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
t.doExec();
h = 1; // success
}
else
h = 2; // lost CAS
break;
}
else if ((r = r.completer) == null) {
h = -1; // unmatched
break;
}
}
}
}
return h;
}
externalPush方法中的“q = ws[m & r & SQMASK]”代碼非常重要。我們大致來分析一下作者的意圖盾碗,首先m是ForkJoinPool中的WorkQueue數(shù)組長度減1媚污,例如當前WorkQueue數(shù)組大小為16,那么m的值就為15廷雅;r是一個線程獨立的隨機數(shù)生成器耗美,關于java.util.concurrent.ThreadLocalRandom類的功能和使用方式可參見其它資料京髓;而SQMASK是一個常量,值為126 (0x7e)幽歼。以下是一種可能的計算過程和計算結(jié)果:
實際上任何數(shù)和126進行“與”運算朵锣,其結(jié)果只可能是0或者偶數(shù),即0甸私、2诚些、4、6皇型、8诬烹。也就是說以上代碼中從名為“ws”的WorkQueue數(shù)組中,取出的元素只可能是第0個或者第偶數(shù)個隊列弃鸦。
結(jié)論就是偶數(shù)是外部任務绞吁,奇數(shù)是需要拆解合并的任務。
ForkJoinWorkerThread需要從雙端隊列中取出下一個待執(zhí)行子任務唬格,就會根據(jù)設定的asyncMode調(diào)用雙端隊列的不同方法家破,代碼概要如下所示:
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
for (ForkJoinTask<?> t;;) {
WorkQueue q; int b;
// 該方法試圖從“w”這個隊列獲取下一個待處理子任務
if ((t = w.nextLocalTask()) != null)
return t;
// 如果沒有獲取到,則使用findNonEmptyStealQueue方法
// 隨機得到一個元素非空购岗,并且可以進行任務竊取的存在于ForkJoinPool中的其它隊列
// 這個隊列被記為“q”
if ((q = findNonEmptyStealQueue()) == null)
return null;
// 試圖從“q”這個隊列base位處取出待執(zhí)行任務
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
return t;
}
}
六汰聋、總結(jié)
本文對workQueue的源碼進行了分析,我們需要注意的是喊积,對于workQueue烹困,定義了三個操作,分別是push乾吻,poll和pop髓梅。
-
push
主要是操作top指針,將top進行移動绎签。
-
poll
如果top和base不等枯饿,則說明隊列有值,可以消費辜御,那么poll就從base指針處開始消費鸭你。這個方法實現(xiàn)了隊列的FIFO。
消費之后對base進行移動擒权。
-
pop
同樣袱巨,還可以從top開始消費,這就是pop碳抄。這個方法實際上實現(xiàn)了對隊列的LIFO愉老。
消費之后將top減1。
以上就是這三個方法對應的操作剖效。但是我們還需要注意的是嫉入,在所有的unsafe操作中焰盗,通過cas進行設置或者獲得task的時候,還有一個掩碼咒林。這個非常重要熬拒。
我們可以看在push方法中:
int m = a.length - 1;
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
在擴容的方法growArray中我們可以知道。每次擴容都是采用左移的方式來進行垫竞,這樣就保證了數(shù)組的長度為2的冪澎粟。
在這里,m=a.length-1欢瞪,那就說明活烙,m實際上其二進制格式將會有效位都為1,這個數(shù)字就可以做為掩碼。當m再與s取&計算的時候遣鼓⌒フ担可以想象,s大于m的部分將被去除骑祟,只會保留比m小的部分回懦。那么實際上,這就等價于次企,當我們一直再push元素到數(shù)組中的時候粉怕,實際上就從數(shù)組的索引底部開始:
參考上面這個過程,也就是說抒巢,實際上這個數(shù)組,base和top實際指向的index并不重要秉犹。只有二者的相對位移才是重要的蛉谜。這有點類似與RingBuffer的數(shù)據(jù)結(jié)構(gòu),但是還是有所不同崇堵。也就是說這個數(shù)組實際上是不會被浪費的型诚。之前有很多不理解的地方,為什么top減去base可能出現(xiàn)負數(shù)鸳劳。那么這樣實際上就會導致負數(shù)的產(chǎn)生狰贯。
這樣的話,如果我們采用異步模式赏廓,asyncMode為true的時候涵紊,workQueue則會采用FIFO_QUEUE的model,這樣workQueue本身就使用的時poll方法幔摸。反之如果使用LIFO_QUEUE的同步模式摸柄,則workQueue使用pop方法妄辩。默認情況下采用同步模式慈迈。同步的時候workQueue的指針都圍繞在數(shù)組的初始化的中間位置波動。而共享隊列則會一直循環(huán)片橡。
至此,我們分析了workQueue的源碼跃脊,對其內(nèi)部實現(xiàn)的雙端隊列本身的操作進行了分析宇挫。為什么作者會自己實現(xiàn)一個Deque,而不是使用juc中已存在的容器酪术。這就是因為這個隊列全程都是采用Unsafe來實現(xiàn)的器瘪,在開篇作者也說了,需要@Contented修飾拼缝,就是為了避免緩存的偽代共享娱局。這樣來實現(xiàn)一個高效的Deque,以供ForkJoinPool來操作咧七。
這與學習ConcurrentHashMap等容器的源碼一樣衰齐,可以看出作者為了性能的優(yōu)化,采用了很多獨特的方式來實現(xiàn)继阻。這些地方都是我們值得學習和借鑒之處耻涛。這也是ForkJoin性能高效的關鍵。在作者的論文中也可以看出瘟檩,java的實現(xiàn)抹缕,由于抽象在jvm之上,性能比c/c++的實現(xiàn)要低很多墨辛。這也是作者盡可能將性能做到最優(yōu)的原因之一卓研。
參考:
https://blog.csdn.net/Xiaowu_First/article/details/122407019
https://blog.csdn.net/tyrroo/article/details/81483608
https://www.cnblogs.com/juniorMa/articles/14234472.html