JUC包含幾個部分?
1)Lock框架
2)并發(fā)集合
3) 原子類
4) 線程池
5)工具類砂豌、ThreadLocal
ScheduledThreadPoolExecutor簡介
ScheduledThreadPoolExecutor繼承自 ThreadPoolExecutor赚哗,為任務(wù)提供延遲或周期執(zhí)行湘今,屬于線程池的一種虑凛。和 ThreadPoolExecutor 相比翎碑,它還具有以下幾種特性:
- 使用專門的任務(wù)類型—ScheduledFutureTask 來執(zhí)行周期任務(wù),也可以接收不需要時間調(diào)度的任務(wù)(這些任務(wù)通過 ExecutorService 來執(zhí)行)鲁豪。
- 使用專門的存儲隊列—DelayedWorkQueue 來存儲任務(wù),DelayedWorkQueue 是無界延遲隊列DelayQueue 的一種律秃。相比ThreadPoolExecutor也簡化了執(zhí)行機制(delayedExecute方法爬橡,后面單獨分析)。
- 支持可選的run-after-shutdown參數(shù)棒动,在池被關(guān)閉(shutdown)之后支持可選的邏輯來決定是否繼續(xù)運行周期或延遲任務(wù)糙申。并且當(dāng)任務(wù)(重新)提交操作與 shutdown 操作重疊時,復(fù)查邏輯也不相同船惨。
ScheduledThreadPoolExecutor數(shù)據(jù)結(jié)構(gòu)
ScheduledThreadPoolExecutor繼承自 ThreadPoolExecutor:
ScheduledThreadPoolExecutor 內(nèi)部構(gòu)造了兩個內(nèi)部類 ScheduledFutureTask
和 DelayedWorkQueue
:
ScheduledFutureTask
: 繼承了FutureTask柜裸,說明是一個異步運算任務(wù);最上層分別實現(xiàn)了Runnable粱锐、Future疙挺、Delayed接口,說明它是一個可以延遲執(zhí)行的異步運算任務(wù)怜浅。DelayedWorkQueue
: 這是 ScheduledThreadPoolExecutor 為存儲周期或延遲任務(wù)專門定義的一個延遲隊列铐然,繼承了 AbstractQueue晴股,為了契合 ThreadPoolExecutor 也實現(xiàn)了 BlockingQueue 接口剩瓶。它內(nèi)部只允許存儲 RunnableScheduledFuture 類型的任務(wù)周荐。與 DelayQueue 的不同之處就是它只允許存放 RunnableScheduledFuture 對象形用,并且自己實現(xiàn)了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆結(jié)構(gòu))。
ScheduledThreadPoolExecutor源碼解析
內(nèi)部類ScheduledFutureTask
屬性
//為相同延時任務(wù)提供的順序編號
private final long sequenceNumber;
//任務(wù)可以執(zhí)行的時間险掀,納秒級
private long time;
//重復(fù)任務(wù)的執(zhí)行周期時間沪袭,納秒級。
private final long period;
//重新入隊的任務(wù)
RunnableScheduledFuture<V> outerTask = this;
//延遲隊列的索引樟氢,以支持更快的取消操作
int heapIndex;
-
sequenceNumber
: 當(dāng)兩個任務(wù)有相同的延遲時間時冈绊,按照 FIFO 的順序入隊。sequenceNumber 就是為相同延時任務(wù)提供的順序編號埠啃。 -
time
: 任務(wù)可以執(zhí)行時的時間死宣,納秒級,通過triggerTime方法計算得出碴开。 -
period
: 任務(wù)的執(zhí)行周期時間毅该,納秒級。正數(shù)表示固定速率執(zhí)行(為scheduleAtFixedRate提供服務(wù))潦牛,負(fù)數(shù)表示固定延遲執(zhí)行(為scheduleWithFixedDelay提供服務(wù))眶掌,0表示不重復(fù)任務(wù)。 -
outerTask
: 重新入隊的任務(wù)巴碗,通過reExecutePeriodic方法入隊重新排序朴爬。
核心方法run()
public void run() {
boolean periodic = isPeriodic();//是否為周期任務(wù)
if (!canRunInCurrentRunState(periodic))//當(dāng)前狀態(tài)是否可以執(zhí)行
cancel(false);
else if (!periodic)
//不是周期任務(wù),直接執(zhí)行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();//設(shè)置下一次運行時間
reExecutePeriodic(outerTask);//重排序一個周期任務(wù)
}
}
說明: ScheduledFutureTask 的run方法重寫了 FutureTask 的版本橡淆,以便執(zhí)行周期任務(wù)時重置/重排序任務(wù)召噩。任務(wù)的執(zhí)行通過父類 FutureTask 的run實現(xiàn)。內(nèi)部有兩個針對周期任務(wù)的方法:
- setNextRunTime(): 用來設(shè)置下一次運行的時間逸爵,源碼如下:
//設(shè)置下一次執(zhí)行任務(wù)的時間
private void setNextRunTime() {
long p = period;
if (p > 0) //固定速率執(zhí)行具滴,scheduleAtFixedRate
time += p;
else
time = triggerTime(-p); //固定延遲執(zhí)行,scheduleWithFixedDelay
}
//計算固定延遲任務(wù)的執(zhí)行時間
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
reExecutePeriodic(): 周期任務(wù)重新入隊等待下一次執(zhí)行痊银,源碼如下:
//重排序一個周期任務(wù)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {//池關(guān)閉后可繼續(xù)執(zhí)行
super.getQueue().add(task);//任務(wù)入列
//重新檢查run-after-shutdown參數(shù)抵蚊,如果不能繼續(xù)運行就移除隊列任務(wù),并取消任務(wù)的執(zhí)行
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();//啟動一個新的線程等待任務(wù)
}
}
reExecutePeriodic與delayedExecute的執(zhí)行策略一致溯革,只不過reExecutePeriodic不會執(zhí)行拒絕策略而是直接丟掉任務(wù)。
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
ScheduledFutureTask.cancel本質(zhì)上由其父類 FutureTask.cancel 實現(xiàn)谷醉。取消任務(wù)成功后會根據(jù)removeOnCancel參數(shù)決定是否從隊列中移除此任務(wù)致稀。
核心屬性
//關(guān)閉后繼續(xù)執(zhí)行已經(jīng)存在的周期任務(wù)
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//關(guān)閉后繼續(xù)執(zhí)行已經(jīng)存在的延時任務(wù)
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//取消任務(wù)后移除
private volatile boolean removeOnCancel = false;
//為相同延時的任務(wù)提供的順序編號,保證任務(wù)之間的FIFO順序
private static final AtomicLong sequencer = new AtomicLong();
continueExistingPeriodicTasksAfterShutdown
和executeExistingDelayedTasksAfterShutdown
是 ScheduledThreadPoolExecutor 定義的run-after-shutdown
參數(shù)俱尼,用來控制池關(guān)閉之后的任務(wù)執(zhí)行邏輯抖单。removeOnCancel
用來控制任務(wù)取消后是否從隊列中移除。當(dāng)一個已經(jīng)提交的周期或延遲任務(wù)在運行之前被取消,那么它之后將不會運行矛绘。默認(rèn)配置下耍休,這種已經(jīng)取消的任務(wù)在屆期之前不會被移除。 通過這種機制货矮,可以方便檢查和監(jiān)控線程池狀態(tài)羊精,但也可能導(dǎo)致已經(jīng)取消的任務(wù)無限滯留。為了避免這種情況的發(fā)生囚玫,我們可以通過setRemoveOnCancelPolicy
方法設(shè)置移除策略喧锦,把參數(shù)removeOnCancel
設(shè)為true可以在任務(wù)取消后立即從隊列中移除。sequencer
是為相同延時的任務(wù)提供的順序編號抓督,保證任務(wù)之間的 FIFO 順序燃少。與 ScheduledFutureTask 內(nèi)部的sequenceNumber參數(shù)作用一致。
構(gòu)造函數(shù)
首先看下構(gòu)造函數(shù)铃在,ScheduledThreadPoolExecutor 內(nèi)部有四個構(gòu)造函數(shù)阵具,這里我們只看這個最大構(gòu)造靈活度的:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
構(gòu)造函數(shù)都是通過super調(diào)用了ThreadPoolExecutor的構(gòu)造,并且使用特定等待隊列DelayedWorkQueue定铜。
核心方法:Schedule
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//構(gòu)造ScheduledFutureTask任務(wù)
delayedExecute(t);//任務(wù)執(zhí)行主方法
return t;
}
說明: schedule主要用于執(zhí)行一次性(延遲)任務(wù)阳液。函數(shù)執(zhí)行邏輯分兩步:
-
封裝 Callable/Runnable
: 首先通過triggerTime計算任務(wù)的延遲執(zhí)行時間,然后通過 ScheduledFutureTask 的構(gòu)造函數(shù)把 Runnable/Callable 任務(wù)構(gòu)造為ScheduledThreadPoolExecutor可以執(zhí)行的任務(wù)類型宿稀,最后調(diào)用decorateTask方法執(zhí)行用戶自定義的邏輯趁舀;decorateTask是一個用戶可自定義擴展的方法,默認(rèn)實現(xiàn)下直接返回封裝的RunnableScheduledFuture任務(wù)祝沸,源碼如下:
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
-
執(zhí)行任務(wù)
: 通過delayedExecute實現(xiàn)矮烹。下面我們來詳細(xì)分析。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);//池已關(guān)閉罩锐,執(zhí)行拒絕策略
else {
super.getQueue().add(task);//任務(wù)入隊
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&//判斷run-after-shutdown參數(shù)
remove(task))//移除任務(wù)
task.cancel(false);
else
ensurePrestart();//啟動一個新的線程等待任務(wù)
}
}
說明: delayedExecute是執(zhí)行任務(wù)的主方法奉狈,方法執(zhí)行邏輯如下:
- 如果池已關(guān)閉(ctl >= SHUTDOWN),執(zhí)行任務(wù)拒絕策略涩惑;
- 池正在運行仁期,首先把任務(wù)入隊排序;然后重新檢查池的關(guān)閉狀態(tài)竭恬,執(zhí)行如下邏輯:
A
: 如果池正在運行跛蛋,或者 run-after-shutdown 參數(shù)值為true,則調(diào)用父類方法ensurePrestart啟動一個新的線程等待執(zhí)行任務(wù)痊硕。ensurePrestart源碼如下:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
ensurePrestart是父類 ThreadPoolExecutor 的方法赊级,用于啟動一個新的工作線程等待執(zhí)行任務(wù),即使corePoolSize為0也會安排一個新線程岔绸。
B
: 如果池已經(jīng)關(guān)閉理逊,并且 run-after-shutdown 參數(shù)值為false橡伞,則執(zhí)行父類(ThreadPoolExecutor)方法remove移除隊列中的指定任務(wù),成功移除后調(diào)用ScheduledFutureTask.cancel取消任務(wù)
核心方法:scheduleAtFixedRate 和 scheduleWithFixedDelay
/**
* 創(chuàng)建一個周期執(zhí)行的任務(wù)晋被,第一次執(zhí)行延期時間為initialDelay兑徘,
* 之后每隔period執(zhí)行一次,不等待第一次執(zhí)行完成就開始計時
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//構(gòu)建RunnableScheduledFuture任務(wù)類型
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),//計算任務(wù)的延遲時間
unit.toNanos(period));//計算任務(wù)的執(zhí)行周期
RunnableScheduledFuture<Void> t = decorateTask(command, sft);//執(zhí)行用戶自定義邏輯
sft.outerTask = t;//賦值給outerTask羡洛,準(zhǔn)備重新入隊等待下一次執(zhí)行
delayedExecute(t);//執(zhí)行任務(wù)
return t;
}
/**
* 創(chuàng)建一個周期執(zhí)行的任務(wù)挂脑,第一次執(zhí)行延期時間為initialDelay,
* 在第一次執(zhí)行完之后延遲delay后開始下一次執(zhí)行
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//構(gòu)建RunnableScheduledFuture任務(wù)類型
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),//計算任務(wù)的延遲時間
unit.toNanos(-delay));//計算任務(wù)的執(zhí)行周期
RunnableScheduledFuture<Void> t = decorateTask(command, sft);//執(zhí)行用戶自定義邏輯
sft.outerTask = t;//賦值給outerTask翘县,準(zhǔn)備重新入隊等待下一次執(zhí)行
delayedExecute(t);//執(zhí)行任務(wù)
return t;
}
說明: scheduleAtFixedRate和scheduleWithFixedDelay方法的邏輯與schedule類似最域。
注意scheduleAtFixedRate和scheduleWithFixedDelay的區(qū)別: 乍一看兩個方法一模一樣,其實锈麸,在unit.toNanos這一行代碼中還是有區(qū)別的镀脂。沒錯,scheduleAtFixedRate傳的是正值忘伞,而scheduleWithFixedDelay傳的則是負(fù)值薄翅,這個值就是 ScheduledFutureTask 的period屬性。
核心方法:shutdown()
public void shutdown() {
super.shutdown();
}
//取消并清除由于關(guān)閉策略不應(yīng)該運行的所有任務(wù)
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
//獲取run-after-shutdown參數(shù)
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic) {//池關(guān)閉后不保留任務(wù)
//依次取消任務(wù)
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();//清除等待隊列
}
else {//池關(guān)閉后保留任務(wù)
// Traverse snapshot to avoid iterator exceptions
//遍歷快照以避免迭代器異常
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
//如果任務(wù)已經(jīng)取消氓奈,移除隊列中的任務(wù)
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate(); //終止線程池
}
說明: 池關(guān)閉方法調(diào)用了父類ThreadPoolExecutor的shutdown翘魄,具體分析見 ThreadPoolExecutor 篇。這里主要介紹以下在shutdown方法中調(diào)用的關(guān)閉鉤子onShutdown方法舀奶,它的主要作用是在關(guān)閉線程池后取消并清除由于關(guān)閉策略不應(yīng)該運行的所有任務(wù)暑竟,這里主要是根據(jù) run-after-shutdown 參數(shù)(continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown)來決定線程池關(guān)閉后是否關(guān)閉已經(jīng)存在的任務(wù)。
- 為什么ThreadPoolExecutor 的調(diào)整策略卻不適用于 ScheduledThreadPoolExecutor育勺?
例如: 由于 ScheduledThreadPoolExecutor 是一個固定核心線程數(shù)大小的線程池但荤,并且使用了一個無界隊列,所以調(diào)整maximumPoolSize對其沒有任何影響(所以 ScheduledThreadPoolExecutor 沒有提供可以調(diào)整最大線程數(shù)的構(gòu)造函數(shù)涧至,默認(rèn)最大線程數(shù)固定為Integer.MAX_VALUE)腹躁。此外,設(shè)置corePoolSize為0或者設(shè)置核心線程空閑后清除(allowCoreThreadTimeOut)同樣也不是一個好的策略南蓬,因為一旦周期任務(wù)到達(dá)某一次運行周期時纺非,可能導(dǎo)致線程池內(nèi)沒有線程去處理這些任務(wù)。
-
Executors 提供了哪幾種方法來構(gòu)造 ScheduledThreadPoolExecutor赘方?
- newScheduledThreadPool: 可指定核心線程數(shù)的線程池烧颖。
- newSingleThreadScheduledExecutor: 只有一個工作線程的線程池。如果內(nèi)部工作線程由于執(zhí)行周期任務(wù)異常而被終止窄陡,則會新建一個線程替代它的位置倒信。
注意: newScheduledThreadPool(1, threadFactory) 不等價于newSingleThreadScheduledExecutor。newSingleThreadScheduledExecutor創(chuàng)建的線程池保證內(nèi)部只有一個線程執(zhí)行任務(wù)泳梆,并且線程數(shù)不可擴展鳖悠;而通過newScheduledThreadPool(1, threadFactory)創(chuàng)建的線程池可以通過setCorePoolSize方法來修改核心線程數(shù)。
ScheduledThreadPoolExecutor要解決什么樣的問題?
現(xiàn)實案例:用到一個定時器處理藍(lán)牙設(shè)備接收的數(shù)據(jù)优妙,并且需要處理的頻率是很快的乘综,這就需要一個穩(wěn)定的定時器來保證數(shù)據(jù)的長久進(jìn)行。ScheduledThreadPoolExecutor這個類就是個很好的選擇套硼。ScheduledThreadPoolExecutor相比ThreadPoolExecutor有哪些特性?
1.ScheduledThreadPoolExecutor使用的是固定核心線程數(shù)大小的線程池
2.延時卡辰、周期ScheduledThreadPoolExecutor有什么樣的數(shù)據(jù)結(jié)構(gòu),核心內(nèi)部類和抽象類?
隊列邪意。run()九妈、cancel()、ScheduledThreadPoolExecutor有哪兩個關(guān)閉策略? 區(qū)別是什么?
關(guān)閉操作雾鬼,與線程池執(zhí)行器的關(guān)閉基本相同萌朱,不同的是,在onShutdown方法策菜,調(diào)度線程池執(zhí)行器晶疼,重寫了這個方法,這個方法主要是根據(jù)線程池關(guān)閉間歇性任務(wù)和延時任務(wù)的處理策略又憨,確定是否以不可中斷方式取消任務(wù)翠霍。ScheduledThreadPoolExecutor中scheduleAtFixedRate 和 scheduleWithFixedDelay區(qū)別是什么?
在unit.toNanos這一行代碼中還是有區(qū)別的。沒錯蠢莺,scheduleAtFixedRate傳的是正值寒匙,而scheduleWithFixedDelay傳的則是負(fù)值,這個值就是 ScheduledFutureTask 的period屬性為什么ThreadPoolExecutor 的調(diào)整策略卻不適用于 ScheduledThreadPoolExecutor?
由于 ScheduledThreadPoolExecutor 是一個固定核心線程數(shù)大小的線程池躏将,并且使用了一個無界隊列锄弱,所以調(diào)整maximumPoolSize對其沒有任何影響(所以 ScheduledThreadPoolExecutor 沒有提供可以調(diào)整最大線程數(shù)的構(gòu)造函數(shù),默認(rèn)最大線程數(shù)固定為Integer.MAX_VALUE)。此外,設(shè)置corePoolSize為0或者設(shè)置核心線程空閑后清除(allowCoreThreadTimeOut)同樣也不是一個好的策略苔可,因為一旦周期任務(wù)到達(dá)某一次運行周期時萨咳,可能導(dǎo)致線程池內(nèi)沒有線程去處理這些任務(wù)。Executors 提供了幾種方法來構(gòu)造 ScheduledThreadPoolExecutor?
newScheduledThreadPool: 可指定核心線程數(shù)的線程池。
newSingleThreadScheduledExecutor: 只有一個工作線程的線程池。如果內(nèi)部工作線程由于執(zhí)行周期任務(wù)異常而被終止敞临,則會新建一個線程替代它的位置窄俏。
Fork/Join
Fork/Join框架基本使用
計算1-1001累加后的值:
/**
* 這是一個簡單的Join/Fork計算過程缎岗,將1—1001數(shù)字相加
*/
public class TestForkJoinPool {
private static final Integer MAX = 200;
static class MyForkJoinTask extends RecursiveTask<Integer> {
// 子任務(wù)開始計算的值
private Integer startValue;
// 子任務(wù)結(jié)束計算的值
private Integer endValue;
public MyForkJoinTask(Integer startValue , Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
// 如果條件成立普舆,說明這個任務(wù)所需要計算的數(shù)值分為足夠小了
// 可以正式進(jìn)行累加計算了
if(endValue - startValue < MAX) {
System.out.println("開始計算的部分:startValue = " + startValue + ";endValue = " + endValue);
Integer totalValue = 0;
for(int index = this.startValue ; index <= this.endValue ; index++) {
totalValue += index;
}
return totalValue;
}
// 否則再進(jìn)行任務(wù)拆分轧膘,拆分成兩個任務(wù)
else {
MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
subTask1.fork();
MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
// 這是Fork/Join框架的線程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,1001));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}
開始計算的部分:startValue = 1;endValue = 126
開始計算的部分:startValue = 127;endValue = 251
開始計算的部分:startValue = 252;endValue = 376
開始計算的部分:startValue = 377;endValue = 501
開始計算的部分:startValue = 502;endValue = 626
開始計算的部分:startValue = 627;endValue = 751
開始計算的部分:startValue = 752;endValue = 876
開始計算的部分:startValue = 877;endValue = 1001
result = 501501
Fork/Join框架簡介
Fork/Join框架是Java并發(fā)工具包中的一種可以將一個大任務(wù)拆分為很多小任務(wù)來異步執(zhí)行的工具,自JDK1.7引入。
三個模塊及關(guān)系
Fork/Join框架主要包含三個模塊:
- 任務(wù)對象:
ForkJoinTask
(包括RecursiveTask
、RecursiveAction
和CountedCompleter
) - 執(zhí)行Fork/Join任務(wù)的線程:
ForkJoinWorkerThread
- 線程池:
ForkJoinPool
這三者的關(guān)系是: ForkJoinPool可以通過池中的ForkJoinWorkerThread來處理ForkJoinTask任務(wù)候学。
// from 《A Java Fork/Join Framework》Dong Lea
Result solve(Problem problem) {
if (problem is small)
directly solve problem
else {
split problem into independent parts
fork new subtasks to solve each part
join all subtasks
compose result from subresults
}
}
ForkJoinPool 只接收 ForkJoinTask 任務(wù)(在實際使用中,也可以接收 Runnable/Callable 任務(wù)焰坪,但在真正運行時趣倾,也會把這些任務(wù)封裝成 ForkJoinTask 類型的任務(wù)),RecursiveTask 是 ForkJoinTask 的子類某饰,是一個可以遞歸執(zhí)行的 ForkJoinTask儒恋,RecursiveAction 是一個無返回值的 RecursiveTask,CountedCompleter 在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)黔漂。
在實際運用中诫尽,我們一般都會繼承 RecursiveTask
、RecursiveAction
或 CountedCompleter
來實現(xiàn)我們的業(yè)務(wù)需求炬守,而不會直接繼承 ForkJoinTask 類牧嫉。
核心思想: 分治算法(Divide-and-Conquer)
分治算法(Divide-and-Conquer)把任務(wù)遞歸的拆分為各個子任務(wù),這樣可以更好的利用系統(tǒng)資源减途,盡可能的使用所有可用的計算能力來提升應(yīng)用性能酣藻。首先看一下 Fork/Join 框架的任務(wù)運行機制:
核心思想: work-stealing(工作竊取)算法
work-stealing(工作竊取)算法: 線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù),或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)鳍置。這種特性使得 ForkJoinPool 在運行多個可以產(chǎn)生子任務(wù)的任務(wù)辽剧,或者是提交的許多小任務(wù)時效率更高。尤其是構(gòu)建異步模型的 ForkJoinPool 時墓捻,對不需要合并(join)的事件類型任務(wù)也非常適用抖仅。
在 ForkJoinPool 中,線程池中每個工作線程(ForkJoinWorkerThread)都對應(yīng)一個任務(wù)隊列(WorkQueue)砖第,工作線程優(yōu)先處理來自自身隊列的任務(wù)(LIFO或FIFO順序撤卢,參數(shù) mode 決定),然后以FIFO的順序隨機竊取其他隊列中的任務(wù)梧兼。
具體思路如下:
- 每個線程都有自己的一個WorkQueue放吩,該工作隊列是一個雙端隊列。
- 隊列支持三個功能push羽杰、pop渡紫、poll
- push/pop只能被隊列的所有者線程調(diào)用,而poll可以被其他線程調(diào)用考赛。
- 劃分的子任務(wù)調(diào)用fork時惕澎,都會被push到自己的隊列中。
- 默認(rèn)情況下颜骤,工作線程從自己的雙端隊列獲出任務(wù)并執(zhí)行唧喉。
-
當(dāng)自己的隊列為空時,線程隨機從另一個線程的隊列末尾調(diào)用poll方法竊取任務(wù)。
Fork/Join 框架的執(zhí)行流程
上圖可以看出ForkJoinPool 中的任務(wù)執(zhí)行分兩種:
- 直接通過 FJP 提交的外部任務(wù)(external/submissions task)八孝,存放在 workQueues 的偶數(shù)槽位董朝;
- 通過內(nèi)部 fork 分割的子任務(wù)(Worker task),存放在 workQueues 的奇數(shù)槽位干跛。
那Fork/Join 框架的執(zhí)行流程是什么樣的?
Fork/Join類關(guān)系
ForkJoinPool繼承關(guān)系
內(nèi)部類介紹:
ForkJoinWorkerThreadFactory: 內(nèi)部線程工廠接口子姜,用于創(chuàng)建工作線程ForkJoinWorkerThread
DefaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory 的默認(rèn)實現(xiàn)類
InnocuousForkJoinWorkerThreadFactory: 實現(xiàn)了 ForkJoinWorkerThreadFactory,無許可線程工廠楼入,當(dāng)系統(tǒng)變量中有系統(tǒng)安全管理相關(guān)屬性時哥捕,默認(rèn)使用這個工廠創(chuàng)建工作線程。
EmptyTask: 內(nèi)部占位類浅辙,用于替換隊列中 join 的任務(wù)扭弧。
ManagedBlocker: 為 ForkJoinPool 中的任務(wù)提供擴展管理并行數(shù)的接口,一般用在可能會阻塞的任務(wù)(如在 Phaser 中用于等待 phase 到下一個generation)记舆。
-
WorkQueue: ForkJoinPool 的核心數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是work-stealing 模式的雙端任務(wù)隊列呼巴,內(nèi)部存放 ForkJoinTask 對象任務(wù)泽腮,使用 @Contented 注解修飾防止偽共享。
- 工作線程在運行中產(chǎn)生新的任務(wù)(通常是因為調(diào)用了 fork())時衣赶,此時可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個棧诊赊,新的任務(wù)會放入棧頂(top 位);工作線程在處理自己工作隊列的任務(wù)時府瞄,按照 LIFO 的順序碧磅。
- 工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(wù)(可能是來自于剛剛提交到 pool 的任務(wù)遵馆,或是來自于其他工作線程的隊列任務(wù))鲸郊,此時可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個 FIFO 的隊列,竊取的任務(wù)位于其他線程的工作隊列的隊首(base位)货邓。
偽共享狀態(tài): 緩存系統(tǒng)中是以緩存行(cache line)為單位存儲的秆撮。緩存行是2的整數(shù)冪個連續(xù)字節(jié),一般為32-256個字節(jié)换况。最常見的緩存行大小是64個字節(jié)职辨。當(dāng)多線程修改互相獨立的變量時,如果這些變量共享同一個緩存行戈二,就會無意中影響彼此的性能舒裤,這就是偽共享。
ForkJoinTask繼承關(guān)系
ForkJoinTask 實現(xiàn)了 Future 接口觉吭,說明它也是一個可取消的異步運算任務(wù)腾供,實際上ForkJoinTask 是 Future 的輕量級實現(xiàn),主要用在純粹是計算的函數(shù)式任務(wù)或者操作完全獨立的對象計算任務(wù)。fork 是主運行方法台腥,用于異步執(zhí)行宏赘;而 join 方法在任務(wù)結(jié)果計算完畢之后才會運行,用來合并或返回計算結(jié)果黎侈。 其內(nèi)部類都比較簡單察署,ExceptionNode 是用于存儲任務(wù)執(zhí)行期間的異常信息的單向鏈表;其余四個類是為 Runnable/Callable 任務(wù)提供的適配器類峻汉,用于把 Runnable/Callable 轉(zhuǎn)化為 ForkJoinTask 類型的任務(wù)(因為 ForkJoinPool 只可以運行 ForkJoinTask 類型的任務(wù))贴汪。
Fork/Join框架源碼解析
- 首先介紹任務(wù)的提交流程 - 外部任務(wù)(external/submissions task)提交
- 然后介紹任務(wù)的提交流程 - 子任務(wù)(Worker task)提交
- 再分析任務(wù)的執(zhí)行過程(ForkJoinWorkerThread.run()到ForkJoinTask.doExec()這一部分);
- 最后介紹任務(wù)的結(jié)果獲取(ForkJoinTask.join()和ForkJoinTask.invoke())
ForkJoinPool
核心參數(shù)
在后面的源碼解析中休吠,我們會看到大量的位運算扳埂,這些位運算都是通過我們接下來介紹的一些常量參數(shù)來計算的。
例如瘤礁,如果要更新活躍線程數(shù)阳懂,使用公式(UC_MASK & (c + AC_UNIT)) | (SP_MASK & c);c 代表當(dāng)前 ctl柜思,UC_MASK 和 SP_MASK 分別是高位和低位掩碼岩调,AC_UNIT 為活躍線程的增量數(shù),使用(UC_MASK & (c + AC_UNIT))就可以計算出高32位赡盘,然后再加上低32位(SP_MASK & c)号枕,就拼接成了一個新的ctl。
這些運算的可讀性很差陨享,看起來有些復(fù)雜葱淳。在后面源碼解析中有位運算的地方我都會加上注釋,大家只需要了解它們的作用即可抛姑。
ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量:
// Constants shared across ForkJoinPool and WorkQueue
// 限定參數(shù)
static final int SMASK = 0xffff; // 低位掩碼赞厕,也是最大索引位
static final int MAX_CAP = 0x7fff; // 工作線程最大容量
static final int EVENMASK = 0xfffe; // 偶數(shù)低位掩碼
static final int SQMASK = 0x007e; // workQueues 數(shù)組最多64個槽位
// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1; // 標(biāo)記是否正在運行任務(wù)
static final int INACTIVE = 1 << 31; // 失活狀態(tài) 負(fù)數(shù)
static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA問題
// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16; // 模式掩碼
static final int LIFO_QUEUE = 0; //LIFO隊列
static final int FIFO_QUEUE = 1 << 16;//FIFO隊列
static final int SHARED_QUEUE = 1 << 31; // 共享模式隊列途戒,負(fù)數(shù)
ForkJoinPool 中的相關(guān)常量和實例字段:
// 低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼
// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 創(chuàng)建工作線程標(biāo)志
// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// 實例字段
volatile long ctl; // 主控制參數(shù)
volatile int runState; // 運行狀態(tài)鎖
final int config; // 并行度|模式
int indexSeed; // 用于生成工作線程索引
volatile WorkQueue[] workQueues; // 主對象注冊信息坑傅,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh; // 每個工作線程的異常信息
final String workerNamePrefix; // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter; // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器
/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度喷斋,對應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù)唁毒,在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時的序號
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時值(以納秒為單位),默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時時間星爪,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù)浆西,用在在awaitRunStateLock和awaitWork中
private static final int SPINS = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;
說明: ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:
- AC: 正在運行工作線程數(shù)減去目標(biāo)并行度顽腾,高16位
- TC: 總工作線程數(shù)減去目標(biāo)并行度近零,中高16位
- SS: 棧頂?shù)却€程的版本計數(shù)和狀態(tài)诺核,中低16位
- ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位
在后面的源碼解析中久信,某些地方也提取了ctl的低32位(sp=(int)ctl)來檢查工作線程狀態(tài)窖杀,例如,當(dāng)sp不為0時說明當(dāng)前還有空閑工作線程裙士。
ForkJoinPool.WorkQueue 中的相關(guān)屬性:
//初始隊列容量入客,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 實例字段
volatile int scanState; // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred; // 記錄前一個棧頂?shù)腸tl
int nsteals; // 偷取任務(wù)數(shù)
int hint; // 記錄偷取者索引,初始為隨機索引
int config; // 池索引和模式
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; //下一個poll操作的索引(棧底/隊列頭)
int top; // 下一個push操作的索引(棧頂/隊列尾)
ForkJoinTask<?>[] array; // 任務(wù)數(shù)組
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊列的工作線程腿椎,共享模式下為null
volatile Thread parker; // 調(diào)用park阻塞期間為owner桌硫,其他情況為null
volatile ForkJoinTask<?> currentJoin; // 記錄被join過來的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務(wù)
ForkJoinTask
核心參數(shù)
/** 任務(wù)運行狀態(tài) */
volatile int status; // 任務(wù)運行狀態(tài)
static final int DONE_MASK = 0xf0000000; // 任務(wù)完成狀態(tài)標(biāo)志位
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16 等待信號
static final int SMASK = 0x0000ffff; // 低位掩碼
Fork/Join框架源碼解析
構(gòu)造函數(shù)
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
說明: 在 ForkJoinPool 中我們可以自定義四個參數(shù):
- parallelism: 并行度,默認(rèn)為CPU數(shù)啃炸,最小為1
- factory: 工作線程工廠铆隘;
- handler: 處理工作線程運行任務(wù)時的異常情況類,默認(rèn)為null南用;
- asyncMode: 是否為異步模式膀钠,默認(rèn)為 false。如果為true裹虫,表示子任務(wù)的執(zhí)行遵循 FIFO 順序并且任務(wù)不能被合并(join)托修,這種模式適用于工作線程只運行事件類型的異步任務(wù)。
在多數(shù)場景使用時恒界,如果沒有太強的業(yè)務(wù)需求,我們一般直接使用 ForkJoinPool 中的common池砚嘴,在JDK1.8之后提供了ForkJoinPool.commonPool()方法可以直接使用common池十酣,來看一下它的構(gòu)造:
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");//線程工廠
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//異常處理類
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler) ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;//默認(rèn)并行度為1
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
使用common pool的優(yōu)點就是我們可以通過指定系統(tǒng)參數(shù)的方式定義“并行度、線程工廠和異常處理類”际长;并且它使用的是同步模式耸采,也就是說可以支持任務(wù)合并(join)盆色。
執(zhí)行流程 - 外部任務(wù)(external/submissions task)提交
向 ForkJoinPool 提交任務(wù)有三種方式:
- invoke()會等待任務(wù)計算完畢并返回計算結(jié)果忠怖;
- execute()是直接向池提交一個任務(wù)來異步執(zhí)行,無返回結(jié)果簇搅;
- submit()也是異步執(zhí)行如绸,但是會返回提交的任務(wù)嘱朽,在適當(dāng)?shù)臅r候可通過task.get()獲取執(zhí)行結(jié)果。
這三種提交方式都都是調(diào)用externalPush()方法來完成怔接,所以接下來我們將從externalPush()方法開始逐步分析外部任務(wù)的執(zhí)行過程搪泳。
externalPush(ForkJoinTask<?> task)
//添加給定任務(wù)到submission隊列中
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue q;
int m;
int r = ThreadLocalRandom.getProbe();//探針值,用于計算WorkQueue槽位索引
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //獲取隨機偶數(shù)槽位的workQueue
U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定workQueue
ForkJoinTask<?>[] a;
int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;//計算任務(wù)索引位置
U.putOrderedObject(a, j, task);//任務(wù)入列
U.putOrderedInt(q, QTOP, s + 1);//更新push slot
U.putIntVolatile(q, QLOCK, 0);//解除鎖定
if (n <= 1)
signalWork(ws, q);//任務(wù)數(shù)小于1時嘗試創(chuàng)建或激活一個工作線程
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
}
externalSubmit(task);//初始化workQueues及相關(guān)屬性
}
首先說明一下externalPush和externalSubmit兩個方法的聯(lián)系: 它們的作用都是把任務(wù)放到隊列中等待執(zhí)行扼脐。不同的是岸军,externalSubmit可以說是完整版的externalPush,在任務(wù)首次提交時,需要初始化workQueues及其他相關(guān)屬性艰赞,這個初始化操作就是externalSubmit來完成的佣谐;而后再向池中提交的任務(wù)都是通過簡化版的externalSubmit-externalPush來完成。
externalPush的執(zhí)行流程很簡單: 首先找到一個隨機偶數(shù)槽位的 workQueue方妖,然后把任務(wù)放入這個 workQueue 的任務(wù)數(shù)組中狭魂,并更新top位。如果隊列的剩余任務(wù)數(shù)小于1吁断,則嘗試創(chuàng)建或激活一個工作線程來運行任務(wù)(防止在externalSubmit初始化時發(fā)生異常導(dǎo)致工作線程創(chuàng)建失敗)趁蕊。
externalSubmit(ForkJoinTask<?> task)
//任務(wù)提交
private void externalSubmit(ForkJoinTask<?> task) {
//初始化調(diào)用線程的探針值,用于計算WorkQueue索引
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (; ; ) {
WorkQueue[] ws;
WorkQueue q;
int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {// 池已關(guān)閉
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//初始化workQueues
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();//鎖定runState
try {
//初始化
if ((rs & STARTED) == 0) {
//初始化stealCounter
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
//創(chuàng)建workQueues仔役,容量為2的冪次方
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);//解鎖并更新runState
}
} else if ((q = ws[k = r & m & SQMASK]) != null) {//獲取隨機偶數(shù)槽位的workQueue
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定 workQueue
ForkJoinTask<?>[] a = q.array;//當(dāng)前workQueue的全部任務(wù)
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {//擴容
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);//放入給定任務(wù)
U.putOrderedInt(q, QTOP, s + 1);//修改push slot
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
}
if (submitted) {//任務(wù)提交成功掷伙,創(chuàng)建或激活工作線程
signalWork(ws, q);//創(chuàng)建或激活一個工作線程來運行任務(wù)
return;
}
}
move = true; // move on failure 操作失敗,重新獲取探針值
} else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // 更新索引k位值的workQueue
//else terminated
unlockRunState(rs, rs & ~RSLOCK);
} else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);//重新獲取線程探針值
}
}
說明: externalSubmit是externalPush的完整版本又兵,主要用于第一次提交任務(wù)時初始化workQueues及相關(guān)屬性任柜,并且提交給定任務(wù)到隊列中。具體執(zhí)行步驟如下:
- 如果池為終止?fàn)顟B(tài)(runState<0)沛厨,調(diào)用tryTerminate來終止線程池宙地,并拋出任務(wù)拒絕異常;
- 如果尚未初始化逆皮,就為 FJP 執(zhí)行初始化操作: 初始化stealCounter宅粥、創(chuàng)建workerQueues,然后繼續(xù)自旋电谣;
- 初始化完成后秽梅,執(zhí)行在externalPush中相同的操作: 獲取 workQueue,放入指定任務(wù)剿牺。任務(wù)提交成功后調(diào)用signalWork方法創(chuàng)建或激活線程企垦;
- 如果在步驟3中獲取到的 workQueue 為null,會在這一步中創(chuàng)建一個 workQueue晒来,創(chuàng)建成功繼續(xù)自旋執(zhí)行第三步操作钞诡;
- 如果非上述情況,或者有線程爭用資源導(dǎo)致獲取鎖失敗湃崩,就重新獲取線程探針值繼續(xù)自旋荧降。
signalWork(WorkQueue[] ws, WorkQueue q)
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c;
int sp, i;
WorkQueue v;
Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int) c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);//工作線程太少,添加新的工作線程
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
//計算ctl竹习,加上版本戳SS_SEQ避免ABA問題
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
//計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);//喚醒阻塞線程
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
說明: 新建或喚醒一個工作線程誊抛,在externalPush、externalSubmit整陌、workQueue.push拗窃、scan中調(diào)用瞎领。如果還有空閑線程,則嘗試喚醒索引到的 WorkQueue 的parker線程随夸;如果工作線程過少((ctl & ADD_WORKER) != 0L)九默,則調(diào)用tryAddWorker添加一個新的工作線程。
tryAddWorker(long c)
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);//釋放鎖
if (stop != 0)
break;
if (add) {
createWorker();//創(chuàng)建工作線程
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
說明: 嘗試添加一個新的工作線程宾毒,首先更新ctl中的工作線程數(shù)驼修,然后調(diào)用createWorker()創(chuàng)建工作線程。
createWorker()
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);//線程創(chuàng)建失敗處理
return false;
}
說明: createWorker首先通過線程工廠創(chuàng)一個新的ForkJoinWorkerThread诈铛,然后啟動這個工作線程(wt.start())乙各。如果期間發(fā)生異常,調(diào)用deregisterWorker處理線程創(chuàng)建失敗的邏輯(deregisterWorker在后面再詳細(xì)說明)幢竹。
ForkJoinWorkerThread 的構(gòu)造函數(shù)如下:
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
可以看到 ForkJoinWorkerThread 在構(gòu)造時首先調(diào)用父類 Thread 的方法耳峦,然后為工作線程注冊pool和workQueue,而workQueue的注冊任務(wù)由ForkJoinPool.registerWorker來完成焕毫。
registerWorker()
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
//設(shè)置為守護(hù)線程
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);//構(gòu)造新的WorkQueue
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws;
int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
//生成新建WorkQueue的索引
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // Worker任務(wù)放在奇數(shù)索引位 odd-numbered indices
if (ws[i] != null) { // collision 已存在蹲坷,重新計算索引位
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
//查找可用的索引位
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {//所有索引位都被占用,對workQueues進(jìn)行擴容
workQueues = ws = Arrays.copyOf(ws, n <<= 1);//workQueues 擴容
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
說明: registerWorker是 ForkJoinWorkerThread 構(gòu)造器的回調(diào)函數(shù)邑飒,用于創(chuàng)建和記錄工作線程的 WorkQueue循签。比較簡單,就不多贅述了疙咸。注意在此為工作線程創(chuàng)建的 WorkQueue 是放在奇數(shù)索引的(代碼行: i = ((s << 1) | 1) & m;)
小結(jié)
OK县匠,外部任務(wù)的提交流程就先講到這里。在createWorker()中啟動工作線程后(wt.start())撒轮,當(dāng)為線程分配到CPU執(zhí)行時間片之后會運行 ForkJoinWorkerThread 的run方法開啟線程來執(zhí)行任務(wù)聚唐。工作線程執(zhí)行任務(wù)的流程我們在講完內(nèi)部任務(wù)提交之后會統(tǒng)一講解。
執(zhí)行流程: 子任務(wù)(Worker task)提交
子任務(wù)的提交相對比較簡單腔召,由任務(wù)的fork()方法完成。通過上面的流程圖可以看到任務(wù)被分割(fork)之后調(diào)用了ForkJoinPool.WorkQueue.push()方法直接把任務(wù)放到隊列中等待被執(zhí)行扮惦。
ForkJoinTask.fork()
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
說明: 如果當(dāng)前線程是 Worker 線程臀蛛,說明當(dāng)前任務(wù)是fork分割的子任務(wù),通過ForkJoinPool.workQueue.push()方法直接把任務(wù)放到自己的等待隊列中崖蜜;否則調(diào)用ForkJoinPool.externalPush()提交到一個隨機的等待隊列中(外部任務(wù))浊仆。
ForkJoinPool.WorkQueue.push()
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {//首次提交,創(chuàng)建或喚醒一個工作線程
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
} else if (n >= m)
growArray();
}
}
說明: 首先把任務(wù)放入等待隊列并更新top位豫领;如果當(dāng)前 WorkQueue 為新建的等待隊列(top-base<=1)抡柿,則調(diào)用signalWork方法為當(dāng)前 WorkQueue 新建或喚醒一個工作線程;如果 WorkQueue 中的任務(wù)數(shù)組容量過小等恐,則調(diào)用growArray()方法對其進(jìn)行兩倍擴容洲劣,growArray()方法源碼如下:
final ForkJoinTask<?>[] growArray() {
ForkJoinTask<?>[] oldA = array;//獲取內(nèi)部任務(wù)列表
int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
int oldMask, t, b;
//新建一個兩倍容量的任務(wù)數(shù)組
ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
(t = top) - (b = base) > 0) {
int mask = size - 1;
//從老數(shù)組中拿出數(shù)據(jù)备蚓,放到新的數(shù)組中
do { // emulate poll from old array, push to new array
ForkJoinTask<?> x;
int oldj = ((b & oldMask) << ASHIFT) + ABASE;
int j = ((b & mask) << ASHIFT) + ABASE;
x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
if (x != null &&
U.compareAndSwapObject(oldA, oldj, x, null))
U.putObjectVolatile(a, j, x);
} while (++b != t);
}
return a;
}
小結(jié)
到此,兩種任務(wù)的提交流程都已經(jīng)解析完畢囱稽,下一節(jié)我們來一起看看任務(wù)提交之后是如何被運行的郊尝。
執(zhí)行流程: 任務(wù)執(zhí)行
回到我們開始時的流程圖,在ForkJoinPool .createWorker()方法中創(chuàng)建工作線程后战惊,會啟動工作線程流昏,系統(tǒng)為工作線程分配到CPU執(zhí)行時間片之后會執(zhí)行 ForkJoinWorkerThread 的run()方法正式開始執(zhí)行任務(wù)。
ForkJoinWorkerThread.run()
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();//鉤子方法吞获,可自定義擴展
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);//鉤子方法况凉,可自定義擴展
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);//處理異常
}
}
}
}
說明: 方法很簡單,在工作線程運行前后會調(diào)用自定義鉤子函數(shù)(onStart和onTermination)各拷,任務(wù)的運行則是調(diào)用了ForkJoinPool.runWorker()刁绒。如果全部任務(wù)執(zhí)行完畢或者期間遭遇異常,則通過ForkJoinPool.deregisterWorker關(guān)閉工作線程并處理異常信息(deregisterWorker方法我們后面會詳細(xì)講解)撤逢。
ForkJoinPool.runWorker(WorkQueue w)
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t; ; ) {
if ((t = scan(w, r)) != null)//掃描任務(wù)執(zhí)行
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13;
r ^= r >>> 17;
r ^= r << 5; // xorshift
}
}
說明: runWorker是 ForkJoinWorkerThread 的主運行方法膛锭,用來依次執(zhí)行當(dāng)前工作線程中的任務(wù)。函數(shù)流程很簡單: 調(diào)用scan方法依次獲取任務(wù)蚊荣,然后調(diào)用WorkQueue .runTask運行任務(wù)初狰;如果未掃描到任務(wù),則調(diào)用awaitWork等待互例,直到工作線程/線程池終止或等待超時奢入。
ForkJoinPool.scan(WorkQueue w, int r)
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws;
int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
//初始掃描起點,自旋掃描
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
WorkQueue q;
ForkJoinTask<?>[] a;
ForkJoinTask<?> t;
int b, n;
long c;
if ((q = ws[k]) != null) {//獲取workQueue
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
//計算偏移量
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null && //取base位置任務(wù)
q.base == b) {//stable
if (ss >= 0) { //scanning
if (U.compareAndSwapObject(a, i, t, null)) {//
q.base = b + 1;//更新base位
if (n < -1) // signal others
signalWork(ws, q);//創(chuàng)建或喚醒工作線程來運行任務(wù)
return t;
}
} else if (oldSum == 0 && // try to activate 嘗試激活工作線程
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);//喚醒棧頂工作線程
}
//base位置任務(wù)為空或base位置偏移媳叨,隨機移位重新掃描
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1;
r ^= r >>> 3;
r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;//隊列任務(wù)為空腥光,記錄base位
}
//更新索引k 繼續(xù)向后查找
if ((k = (k + 1) & m) == origin) { // continue until stable
//運行到這里說明已經(jīng)掃描了全部的 workQueues,但并未掃描到任務(wù)
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;// 已經(jīng)被滅活或終止,跳出循環(huán)
//對當(dāng)前WorkQueue進(jìn)行滅活操作
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));//計算ctl為INACTIVE狀態(tài)并減少活躍線程數(shù)
w.stackPred = (int) c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);//修改scanState為inactive狀態(tài)
if (U.compareAndSwapLong(this, CTL, c, nc))//更新scanState為滅活狀態(tài)
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;//重置checkSum糊秆,繼續(xù)循環(huán)
}
}
}
return null;
}
說明: 掃描并嘗試偷取一個任務(wù)武福。使用w.hint進(jìn)行隨機索引 WorkQueue,也就是說并不一定會執(zhí)行當(dāng)前 WorkQueue 中的任務(wù)痘番,而是偷取別的Worker的任務(wù)來執(zhí)行捉片。
函數(shù)的大概執(zhí)行流程如下:
取隨機位置的一個 WorkQueue;
獲取base位的 ForkJoinTask汞舱,成功取到后更新base位并返回任務(wù)伍纫;如果取到的 WorkQueue 中任務(wù)數(shù)大于1,則調(diào)用signalWork創(chuàng)建或喚醒其他工作線程昂芜;
-
如果當(dāng)前工作線程處于不活躍狀態(tài)(INACTIVE)莹规,則調(diào)用tryRelease嘗試喚醒棧頂工作線程來執(zhí)行。
tryRelease源碼如下:
private boolean tryRelease(long c, WorkQueue v, long inc) {
int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE;
Thread p;
//ctl低32位等于scanState泌神,說明可以喚醒parker線程
if (v != null && v.scanState == sp) { // v is at top of stack
//計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
if (U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs;
if ((p = v.parker) != null)
U.unpark(p);//喚醒線程
return true;
}
}
return false;
}
如果base位任務(wù)為空或發(fā)生偏移良漱,則對索引位進(jìn)行隨機移位舞虱,然后重新掃描;
如果掃描整個workQueues之后沒有獲取到任務(wù)债热,則設(shè)置當(dāng)前工作線程為INACTIVE狀態(tài)砾嫉;然后重置checkSum,再次掃描一圈之后如果還沒有任務(wù)則跳出循環(huán)返回null窒篱。
ForkJoinPool.awaitWork(WorkQueue w, int r)
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
if ((ss = w.scanState) >= 0)//正在掃描焕刮,跳出循環(huán)
break;
else if (spins > 0) {
r ^= r << 6;
r ^= r >>> 21;
r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v;
WorkQueue[] ws;
int s, j;
AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
} else if (w.qlock < 0) // 當(dāng)前workQueue已經(jīng)終止,返回false recheck after spins
return false;
else if (!Thread.interrupted()) {//判斷線程是否被中斷墙杯,并清除中斷狀態(tài)
long c, prevctl, parkTime, deadline;
int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);//活躍線程數(shù)
if ((ac <= 0 && tryTerminate(false, false)) || //無active線程配并,嘗試終止
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int) c) { // is last waiter
//計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short) (c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過量
return false; // else use timed wait
//計算空閑超時時間
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
} else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;//設(shè)置parker,準(zhǔn)備阻塞
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);//阻塞指定的時間
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)//正在掃描高镐,說明等到任務(wù)溉旋,跳出循環(huán)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))//未等到任務(wù),更新ctl嫉髓,返回false
return false; // shrink pool
}
}
return true;
}
說明: 回到runWorker方法观腊,如果scan方法未掃描到任務(wù),會調(diào)用awaitWork等待獲取任務(wù)算行。函數(shù)的具體執(zhí)行流程大家看源碼梧油,這里簡單說一下:
- 在等待獲取任務(wù)期間,如果工作線程或線程池已經(jīng)終止則直接返回false州邢。如果當(dāng)前無 active 線程儡陨,嘗試終止線程池并返回false,如果終止失敗并且當(dāng)前是最后一個等待的 Worker量淌,就阻塞指定的時間(IDLE_TIMEOUT)骗村;等到屆期或被喚醒后如果發(fā)現(xiàn)自己是scanning(scanState >= 0)狀態(tài),說明已經(jīng)等到任務(wù)呀枢,跳出等待返回true繼續(xù) scan胚股,否則的更新ctl并返回false。
WorkQueue.runTask()
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();//更新currentSteal并執(zhí)行任務(wù)
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();//依次執(zhí)行本地任務(wù)
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);//增加偷取任務(wù)數(shù)
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();//執(zhí)行鉤子函數(shù)
}
}
說明: 在scan方法掃描到任務(wù)之后裙秋,調(diào)用WorkQueue.runTask()來執(zhí)行獲取到的任務(wù)信轿,大概流程如下:
- 標(biāo)記scanState為正在執(zhí)行狀態(tài);
- 更新currentSteal為當(dāng)前獲取到的任務(wù)并執(zhí)行它残吩,任務(wù)的執(zhí)行調(diào)用了ForkJoinTask.doExec()方法,源碼如下:
//ForkJoinTask.doExec()
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();//執(zhí)行我們定義的任務(wù)
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
- 調(diào)用execLocalTasks依次執(zhí)行當(dāng)前WorkerQueue中的任務(wù)倘核,源碼如下:
//執(zhí)行并移除所有本地任務(wù)
final void execLocalTasks() {
int b = base, m, s;
ForkJoinTask<?>[] a = array;
if (b - (s = top - 1) <= 0 && a != null &&
(m = a.length - 1) >= 0) {
if ((config & FIFO_QUEUE) == 0) {//FIFO模式
for (ForkJoinTask<?> t; ; ) {
if ((t = (ForkJoinTask<?>) U.getAndSetObject
(a, ((m & s) << ASHIFT) + ABASE, null)) == null)//FIFO執(zhí)行泣侮,取top任務(wù)
break;
U.putOrderedInt(this, QTOP, s);
t.doExec();//執(zhí)行
if (base - (s = top - 1) > 0)
break;
}
} else
pollAndExecAll();//LIFO模式執(zhí)行,取base任務(wù)
}
}
- 更新偷取任務(wù)數(shù)紧唱;
- 還原scanState并執(zhí)行鉤子函數(shù)活尊。
ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
//1.移除workQueue
if (wt != null && (w = wt.workQueue) != null) {//獲取ForkJoinWorkerThread的等待隊列
WorkQueue[] ws; // remove index from array
int idx = w.config & SMASK;//計算workQueue索引
int rs = lockRunState();//獲取runState鎖和當(dāng)前池運行狀態(tài)
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;//移除workQueue
unlockRunState(rs, rs & ~RSLOCK);//解除runState鎖
}
//2.減少CTL數(shù)
long c; // decrement counts
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
//3.處理被移除workQueue內(nèi)部相關(guān)參數(shù)
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks
}
//4.如果線程未終止隶校,替換被移除的workQueue并喚醒內(nèi)部線程
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
//嘗試終止線程池
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
//喚醒被替換的線程,依賴于下一步
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
//創(chuàng)建工作線程替換
else if (ex != null && (c & ADD_WORKER) != 0L) {
tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
//5.處理異常
if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
說明: deregisterWorker方法用于工作線程運行完畢之后終止線程或處理工作線程異常蛹锰,主要就是清除已關(guān)閉的工作線程或回滾創(chuàng)建線程之前的操作深胳,并把傳入的異常拋給 ForkJoinTask 來處理苗分。具體步驟見源碼注釋矾策。
小結(jié)
本節(jié)我們對任務(wù)的執(zhí)行流程進(jìn)行了說明愿题,后面我們將繼續(xù)介紹任務(wù)的結(jié)果獲取(join/invoke)工闺。
獲取任務(wù)結(jié)果 - ForkJoinTask.join() / ForkJoinTask.invoke()
//合并任務(wù)結(jié)果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
//join, get, quietlyJoin的主實現(xiàn)方法
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
- invoke() :
//執(zhí)行任務(wù)投剥,并等待任務(wù)完成并返回結(jié)果
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
//invoke, quietlyInvoke的主實現(xiàn)方法
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
說明: join()方法一把是在任務(wù)fork()之后調(diào)用碰酝,用來獲取(或者叫“合并”)任務(wù)的執(zhí)行結(jié)果丐黄。
ForkJoinTask的join()和invoke()方法都可以用來獲取任務(wù)的執(zhí)行結(jié)果(另外還有g(shù)et方法也是調(diào)用了doJoin來獲取任務(wù)結(jié)果绊序,但是會響應(yīng)運行時異常)纷宇,它們對外部提交任務(wù)的執(zhí)行方式一致夸盟,都是通過externalAwaitDone方法等待執(zhí)行結(jié)果。不同的是invoke()方法會直接執(zhí)行當(dāng)前任務(wù)像捶;而join()方法則是在當(dāng)前任務(wù)在隊列 top 位時(通過tryUnpush方法判斷)才能執(zhí)行上陕,如果當(dāng)前任務(wù)不在 top 位或者任務(wù)執(zhí)行失敗調(diào)用ForkJoinPool.awaitJoin方法幫助執(zhí)行或阻塞當(dāng)前 join 任務(wù)。(所以在官方文檔中建議了我們對ForkJoinTask任務(wù)的調(diào)用順序拓春,一對 fork-join操作一般按照如下順序調(diào)用: a.fork(); b.fork(); b.join(); a.join();释簿。因為任務(wù) b 是后面進(jìn)入隊列,也就是說它是在棧頂?shù)?top 位)痘儡,在它fork()之后直接調(diào)用join()就可以直接執(zhí)行而不會調(diào)用ForkJoinPool.awaitJoin方法去等待辕万。)
在這些方法中,join()相對比較全面沉删,所以之后的講解我們將從join()開始逐步向下分析渐尿,首先看一下join()的執(zhí)行流程:
后面的源碼分析中,我們首先講解比較簡單的外部 join 任務(wù)(externalAwaitDone)矾瑰,然后再講解內(nèi)部 join 任務(wù)(從ForkJoinPool.awaitJoin()開始)砖茸。
ForkJoinTask.externalAwaitDone()
private int externalAwaitDone() {
//執(zhí)行任務(wù)
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete( // CountedCompleter任務(wù)
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); // ForkJoinTask任務(wù)
if (s >= 0 && (s = status) >= 0) {//執(zhí)行失敗,進(jìn)入等待
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //更新state
synchronized (this) {
if (status >= 0) {//SIGNAL 等待信號
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
說明: 如果當(dāng)前join為外部調(diào)用殴穴,則調(diào)用此方法執(zhí)行任務(wù)凉夯,如果任務(wù)執(zhí)行失敗就進(jìn)入等待。方法本身是很簡單的采幌,需要注意的是對不同的任務(wù)類型分兩種情況:
如果我們的任務(wù)為 CountedCompleter 類型的任務(wù)劲够,則調(diào)用externalHelpComplete方法來執(zhí)行任務(wù)。
其他類型的 ForkJoinTask 任務(wù)調(diào)用tryExternalUnpush來執(zhí)行休傍,源碼如下:
//為外部提交者提供 tryUnpush 功能(給定任務(wù)在top位時彈出任務(wù))
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
WorkQueue[] ws;
WorkQueue w;
ForkJoinTask<?>[] a;
int m, s;
int r = ThreadLocalRandom.getProbe();
if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
(w = ws[m & r & SQMASK]) != null &&
(a = w.array) != null && (s = w.top) != w.base) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; //取top位任務(wù)
if (U.compareAndSwapInt(w, QLOCK, 0, 1)) { //加鎖
if (w.top == s && w.array == a &&
U.getObject(a, j) == task &&
U.compareAndSwapObject(a, j, task, null)) { //符合條件征绎,彈出
U.putOrderedInt(w, QTOP, s - 1); //更新top
U.putOrderedInt(w, QLOCK, 0); //解鎖,返回true
return true;
}
U.compareAndSwapInt(w, QLOCK, 1, 0); //當(dāng)前任務(wù)不在top位磨取,解鎖返回false
}
}
return false;
}
tryExternalUnpush的作用就是判斷當(dāng)前任務(wù)是否在top位人柿,如果是則彈出任務(wù)柴墩,然后在externalAwaitDone中調(diào)用doExec()執(zhí)行任務(wù)。
ForkJoinPool.awaitJoin()
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin; //獲取給定Worker的join任務(wù)
U.putOrderedObject(w, QCURRENTJOIN, task); //把currentJoin替換為給定任務(wù)
//判斷是否為CountedCompleter類型的任務(wù)
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>) task : null;
for (; ; ) {
if ((s = task.status) < 0) //已經(jīng)完成|取消|異常 跳出循環(huán)
break;
if (cc != null)//CountedCompleter任務(wù)由helpComplete來完成join
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task)) //嘗試執(zhí)行
helpStealer(w, task); //隊列為空或執(zhí)行失敗凫岖,任務(wù)可能被偷江咳,幫助偷取者執(zhí)行該任務(wù)
if ((s = task.status) < 0) //已經(jīng)完成|取消|異常,跳出循環(huán)
break;
//計算任務(wù)等待時間
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {//執(zhí)行補償操作
task.internalWait(ms);//補償執(zhí)行成功哥放,任務(wù)等待指定時間
U.getAndAddLong(this, CTL, AC_UNIT);//更新活躍線程數(shù)
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);//循環(huán)結(jié)束歼指,替換為原來的join任務(wù)
}
return s;
}
說明: 如果當(dāng)前 join 任務(wù)不在Worker等待隊列的top位,或者任務(wù)執(zhí)行失敗婶芭,調(diào)用此方法來幫助執(zhí)行或阻塞當(dāng)前 join 的任務(wù)东臀。函數(shù)執(zhí)行流程如下:
- 由于每次調(diào)用awaitJoin都會優(yōu)先執(zhí)行當(dāng)前join的任務(wù),所以首先會更新currentJoin為當(dāng)前join任務(wù)犀农;
- 進(jìn)入自旋:
- 首先檢查任務(wù)是否已經(jīng)完成(通過task.status < 0判斷)惰赋,如果給定任務(wù)執(zhí)行完畢|取消|異常 則跳出循環(huán)返回執(zhí)行狀態(tài)s;
- 如果是 CountedCompleter 任務(wù)類型呵哨,調(diào)用helpComplete方法來完成join操作(后面筆者會開新篇來專門講解CountedCompleter赁濒,本篇暫時不做詳細(xì)解析);
- 非 CountedCompleter 任務(wù)類型調(diào)用WorkQueue.tryRemoveAndExec嘗試執(zhí)行任務(wù)孟害;
- 如果給定 WorkQueue 的等待隊列為空或任務(wù)執(zhí)行失敗拒炎,說明任務(wù)可能被偷,調(diào)用helpStealer幫助偷取者執(zhí)行任務(wù)(也就是說挨务,偷取者幫我執(zhí)行任務(wù)击你,我去幫偷取者執(zhí)行它的任務(wù));
- 再次判斷任務(wù)是否執(zhí)行完畢(task.status < 0)谎柄,如果任務(wù)執(zhí)行失敗丁侄,計算一個等待時間準(zhǔn)備進(jìn)行補償操作;
- 調(diào)用tryCompensate方法為給定 WorkQueue 嘗試執(zhí)行補償操作朝巫。在執(zhí)行補償期間鸿摇,如果發(fā)現(xiàn) 資源爭用|池處于unstable狀態(tài)|當(dāng)前Worker已終止,則調(diào)用ForkJoinTask.internalWait()方法等待指定的時間劈猿,任務(wù)喚醒之后繼續(xù)自旋拙吉,F(xiàn)orkJoinTask.internalWait()源碼如下:
final void internalWait(long timeout) {
int s;
if ((s = status) >= 0 && // force completer to issue notify
U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//更新任務(wù)狀態(tài)為SIGNAL(等待喚醒)
synchronized (this) {
if (status >= 0)
try { wait(timeout); } catch (InterruptedException ie) { }
else
notifyAll();
}
}
}
在awaitJoin中,我們總共調(diào)用了三個比較復(fù)雜的方法: tryRemoveAndExec揪荣、helpStealer和tryCompensate筷黔,下面我們依次講解。
WorkQueue.tryRemoveAndExec(ForkJoinTask<?> task)
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a;
int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
while ((n = (s = top) - (b = base)) > 0) {
//從top往下自旋查找
for (ForkJoinTask<?> t; ; ) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;//計算任務(wù)索引
if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) //獲取索引到的任務(wù)
return s + 1 == top; // shorter than expected
else if (t == task) { //給定任務(wù)為索引任務(wù)
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) { //彈出任務(wù)
U.putOrderedInt(this, QTOP, s); //更新top
removed = true;
}
} else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask()); //join任務(wù)已經(jīng)被移除佛舱,替換為一個占位任務(wù)
if (removed)
task.doExec(); //執(zhí)行
break;
} else if (t.status < 0 && s + 1 == top) { //給定任務(wù)不是top任務(wù)
if (U.compareAndSwapObject(a, j, t, null)) //彈出任務(wù)
U.putOrderedInt(this, QTOP, s);//更新top
break; // was cancelled
}
if (--n == 0) //遍歷結(jié)束
return false;
}
if (task.status < 0) //任務(wù)執(zhí)行完畢
return false;
}
}
return true;
}
說明: 從top位開始自旋向下找到給定任務(wù),如果找到把它從當(dāng)前 Worker 的任務(wù)隊列中移除并執(zhí)行它。注意返回的參數(shù): 如果任務(wù)隊列為空或者任務(wù)未執(zhí)行完畢返回true;任務(wù)執(zhí)行完畢返回false仆救。
ForkJoinPool.helpStealer(WorkQueue w, ForkJoinTask<?> task)
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent:
for (subtask = task; subtask.status >= 0; ) {
//1. 找到給定WorkQueue的偷取者v
for (int h = j.hint | 1, k = 0, i; ; k += 2) {//跳兩個索引碌补,因為Worker在奇數(shù)索引位
if (k > m) // can't find stealer
break descent;
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {//定位到偷取者
j.hint = i;//更新stealer索引
break;
}
checkSum += v.base;
}
}
//2. 幫助偷取者v執(zhí)行任務(wù)
for (; ; ) { // help v or descend
ForkJoinTask<?>[] a; //偷取者內(nèi)部的任務(wù)
int b;
checkSum += (b = v.base);
ForkJoinTask<?> next = v.currentJoin;//獲取偷取者的join任務(wù)
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent; // stale只恨,跳出descent循環(huán)重來
if (b - v.top >= 0 || (a = v.array) == null) {
if ((subtask = next) == null) //偷取者的join任務(wù)為null,跳出descent循環(huán)
break descent;
j = v;
break; //偷取者內(nèi)部任務(wù)為空,可能任務(wù)也被偷走了牛曹;跳出本次循環(huán)佛点,查找偷取者的偷取者
}
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;//獲取base偏移地址
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));//獲取偷取者的base任務(wù)
if (v.base == b) {
if (t == null) // stale
break descent; // stale,跳出descent循環(huán)重來
if (U.compareAndSwapObject(a, i, t, null)) {//彈出任務(wù)
v.base = b + 1; //更新偷取者的base位
ForkJoinTask<?> ps = w.currentSteal;//獲取調(diào)用者偷來的任務(wù)
int top = w.top;
//首先更新給定workQueue的currentSteal為偷取者的base任務(wù)黎比,然后執(zhí)行該任務(wù)
//然后通過檢查top來判斷給定workQueue是否有自己的任務(wù)超营,如果有,
// 則依次彈出任務(wù)(LIFO)->更新currentSteal->執(zhí)行該任務(wù)(注意這里是自己偷自己的任務(wù)執(zhí)行)
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top && //內(nèi)部有自己的任務(wù)阅虫,依次彈出執(zhí)行
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);//還原給定workQueue的currentSteal
if (w.base != w.top)//給定workQueue有自己的任務(wù)了演闭,幫助結(jié)束,返回
return; // can't further help
}
}
}
}
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
說明: 如果隊列為空或任務(wù)執(zhí)行失敗颓帝,說明任務(wù)可能被偷米碰,調(diào)用此方法來幫助偷取者執(zhí)行任務(wù)窝革。基本思想是: 偷取者幫助我執(zhí)行任務(wù)吕座,我去幫助偷取者執(zhí)行它的任務(wù)虐译。 函數(shù)執(zhí)行流程如下:
循環(huán)定位偷取者,由于Worker是在奇數(shù)索引位吴趴,所以每次會跳兩個索引位菱蔬。定位到偷取者之后,更新調(diào)用者 WorkQueue 的hint為偷取者的索引史侣,方便下次定位; 定位到偷取者后魏身,開始幫助偷取者執(zhí)行任務(wù)惊橱。從偷取者的base索引開始,每次偷取一個任務(wù)執(zhí)行箭昵。在幫助偷取者執(zhí)行任務(wù)后税朴,如果調(diào)用者發(fā)現(xiàn)本身已經(jīng)有任務(wù)(w.top != top),則依次彈出自己的任務(wù)(LIFO順序)并執(zhí)行(也就是說自己偷自己的任務(wù)執(zhí)行)家制。
ForkJoinPool.tryCompensate(WorkQueue w)
//執(zhí)行補償操作: 嘗試縮減活動線程量正林,可能釋放或創(chuàng)建一個補償線程來準(zhǔn)備阻塞
private boolean tryCompensate(WorkQueue w) {
boolean canBlock;
WorkQueue[] ws;
long c;
int m, pc, sp;
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false; //調(diào)用者已終止
else if ((sp = (int) (c = ctl)) != 0) // release idle worker
canBlock = tryRelease(c, ws[sp & m], 0L);//喚醒等待的工作線程
else {//沒有空閑線程
int ac = (int) (c >> AC_SHIFT) + pc; //活躍線程數(shù)
int tc = (short) (c >> TC_SHIFT) + pc;//總線程數(shù)
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) { // two passes of odd indices
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) {//取奇數(shù)索引位
if ((v.scanState & SCANNING) != 0)//沒有正在運行任務(wù),跳出
break;
++nbusy;//正在運行任務(wù)颤殴,添加標(biāo)記
}
}
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
else if (tc >= pc && ac > 1 && w.isEmpty()) {//總線程數(shù)大于并行度 && 活動線程數(shù)大于1 && 調(diào)用者任務(wù)隊列為空觅廓,不需要補償
long nc = ((AC_MASK & (c - AC_UNIT)) |
(~AC_MASK & c)); // uncompensated
canBlock = U.compareAndSwapLong(this, CTL, c, nc);//更新活躍線程數(shù)
} else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))//超出最大線程數(shù)
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
else { // similar to tryAddWorker
boolean add = false;
int rs; // CAS within lock
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT)));//計算總線程數(shù)
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);//更新總線程數(shù)
unlockRunState(rs, rs & ~RSLOCK);
//運行到這里說明活躍工作線程數(shù)不足付魔,需要創(chuàng)建一個新的工作線程來補償
canBlock = add && createWorker(); // throws on exception
}
}
return canBlock;
}
說明: 具體的執(zhí)行看源碼及注釋膳帕,這里我們簡單總結(jié)一下需要和不需要補償?shù)膸追N情況:
需要補償 :
- 調(diào)用者隊列不為空,并且有空閑工作線程企软,這種情況會喚醒空閑線程(調(diào)用tryRelease方法)
- 池尚未停止矮瘟,活躍線程數(shù)不足瞳脓,這時會新建一個工作線程(調(diào)用createWorker方法)
不需要補償 :
- 調(diào)用者已終止或池處于不穩(wěn)定狀態(tài)
- 總線程數(shù)大于并行度 && 活動線程數(shù)大于1 && 調(diào)用者任務(wù)隊列為空
Fork/Join的陷阱與注意事項
使用Fork/Join框架時,需要注意一些陷阱, 在下面 斐波那契數(shù)列
例子中你將看到示例:
避免不必要的fork()
劃分成兩個子任務(wù)后澈侠,不要同時調(diào)用兩個子任務(wù)的fork()方法劫侧。
表面上看上去兩個子任務(wù)都fork(),然后join()兩次似乎更自然哨啃。但事實證明烧栋,直接調(diào)用compute()效率更高。因為直接調(diào)用子任務(wù)的compute()方法實際上就是在當(dāng)前的工作線程進(jìn)行了計算(線程重用)棘催,這比“將子任務(wù)提交到工作隊列劲弦,線程又從工作隊列中拿任務(wù)”快得多。
當(dāng)一個大任務(wù)被劃分成兩個以上的子任務(wù)時醇坝,盡可能使用前面說到的三個衍生的invokeAll方法邑跪,因為使用它們能避免不必要的fork()次坡。
注意fork()、compute()画畅、join()的順序
為了兩個任務(wù)并行砸琅,三個方法的調(diào)用順序需要萬分注意。
right.fork(); // 計算右邊的任務(wù)
long leftAns = left.compute(); // 計算左邊的任務(wù)(同時右邊任務(wù)也在計算)
long rightAns = right.join(); // 等待右邊的結(jié)果
return leftAns + rightAns;
如果我們寫成:
left.fork(); // 計算完左邊的任務(wù)
long leftAns = left.join(); // 等待左邊的計算結(jié)果
long rightAns = right.compute(); // 再計算右邊的任務(wù)
return leftAns + rightAns;
或者
long rightAns = right.compute(); // 計算完右邊的任務(wù)
left.fork(); // 再計算左邊的任務(wù)
long leftAns = left.join(); // 等待左邊的計算結(jié)果
return leftAns + rightAns;
這兩種實際上都沒有并行轴踱。
選擇合適的子任務(wù)粒度
選擇劃分子任務(wù)的粒度(順序執(zhí)行的閾值)很重要症脂,因為使用Fork/Join框架并不一定比順序執(zhí)行任務(wù)的效率高: 如果任務(wù)太大,則無法提高并行的吞吐量淫僻;如果任務(wù)太小诱篷,子任務(wù)的調(diào)度開銷可能會大于并行計算的性能提升,我們還要考慮創(chuàng)建子任務(wù)雳灵、fork()子任務(wù)棕所、線程調(diào)度以及合并子任務(wù)處理結(jié)果的耗時以及相應(yīng)的內(nèi)存消耗。
官方文檔給出的粗略經(jīng)驗是: 任務(wù)應(yīng)該執(zhí)行100~10000
個基本的計算步驟悯辙。決定子任務(wù)的粒度的最好辦法是實踐琳省,通過實際測試結(jié)果來確定這個閾值才是“上上策”。
和其他Java代碼一樣躲撰,F(xiàn)ork/Join框架測試時需要“預(yù)熱”或者說執(zhí)行幾遍才會被JIT(Just-in-time)編譯器優(yōu)化针贬,所以測試性能之前跑幾遍程序很重要。
避免重量級任務(wù)劃分與結(jié)果合并
Fork/Join的很多使用場景都用到數(shù)組或者List等數(shù)據(jù)結(jié)構(gòu)拢蛋,子任務(wù)在某個分區(qū)中運行桦他,最典型的例子如并行排序和并行查找。拆分子任務(wù)以及合并處理結(jié)果的時候谆棱,應(yīng)該盡量避免System.arraycopy這樣耗時耗空間的操作瞬铸,從而最小化任務(wù)的處理開銷。
采用Fork/Join來異步計算1+2+3+…+10000的結(jié)果
public class Test {
static final class SumTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
final int start; //開始計算的數(shù)
final int end; //最后計算的數(shù)
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//如果計算量小于1000础锐,那么分配一個線程執(zhí)行if中的代碼塊嗓节,并返回執(zhí)行結(jié)果
if(end - start < 1000) {
System.out.println(Thread.currentThread().getName() + " 開始執(zhí)行: " + start + "-" + end);
int sum = 0;
for(int i = start; i <= end; i++)
sum += i;
return sum;
}
//如果計算量大于1000,那么拆分為兩個任務(wù)
SumTask task1 = new SumTask(start, (start + end) / 2);
SumTask task2 = new SumTask((start + end) / 2 + 1, end);
//執(zhí)行任務(wù)
task1.fork();
task2.fork();
//獲取任務(wù)執(zhí)行的結(jié)果
return task1.join() + task2.join();
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> task = new SumTask(1, 10000);
pool.submit(task);
System.out.println(task.get());
}
}