JUC詳解(四)線程池-ScheduledThreadPoolExecutor與Fork/Join

JUC包含幾個部分?

1)Lock框架
2)并發(fā)集合
3) 原子類

4) 線程池

5)工具類砂豌、ThreadLocal

ScheduledThreadPoolExecutor簡介

ScheduledThreadPoolExecutor繼承自 ThreadPoolExecutor赚哗,為任務(wù)提供延遲或周期執(zhí)行湘今,屬于線程池的一種虑凛。和 ThreadPoolExecutor 相比翎碑,它還具有以下幾種特性:

  • 使用專門的任務(wù)類型—ScheduledFutureTask 來執(zhí)行周期任務(wù),也可以接收不需要時間調(diào)度的任務(wù)(這些任務(wù)通過 ExecutorService 來執(zhí)行)鲁豪。
  • 使用專門的存儲隊列—DelayedWorkQueue 來存儲任務(wù),DelayedWorkQueue 是無界延遲隊列DelayQueue 的一種律秃。相比ThreadPoolExecutor也簡化了執(zhí)行機制(delayedExecute方法爬橡,后面單獨分析)。
  • 支持可選的run-after-shutdown參數(shù)棒动,在池被關(guān)閉(shutdown)之后支持可選的邏輯來決定是否繼續(xù)運行周期或延遲任務(wù)糙申。并且當(dāng)任務(wù)(重新)提交操作與 shutdown 操作重疊時,復(fù)查邏輯也不相同船惨。

ScheduledThreadPoolExecutor數(shù)據(jù)結(jié)構(gòu)

image.png

ScheduledThreadPoolExecutor繼承自 ThreadPoolExecutor:

ScheduledThreadPoolExecutor 內(nèi)部構(gòu)造了兩個內(nèi)部類 ScheduledFutureTaskDelayedWorkQueue:

  • ScheduledFutureTask: 繼承了FutureTask柜裸,說明是一個異步運算任務(wù);最上層分別實現(xiàn)了Runnable粱锐、Future疙挺、Delayed接口,說明它是一個可以延遲執(zhí)行的異步運算任務(wù)怜浅。

  • DelayedWorkQueue: 這是 ScheduledThreadPoolExecutor 為存儲周期或延遲任務(wù)專門定義的一個延遲隊列铐然,繼承了 AbstractQueue晴股,為了契合 ThreadPoolExecutor 也實現(xiàn)了 BlockingQueue 接口剩瓶。它內(nèi)部只允許存儲 RunnableScheduledFuture 類型的任務(wù)周荐。與 DelayQueue 的不同之處就是它只允許存放 RunnableScheduledFuture 對象形用,并且自己實現(xiàn)了二叉堆(DelayQueue 是利用了 PriorityQueue 的二叉堆結(jié)構(gòu))。

ScheduledThreadPoolExecutor源碼解析

內(nèi)部類ScheduledFutureTask

屬性

//為相同延時任務(wù)提供的順序編號
private final long sequenceNumber;

//任務(wù)可以執(zhí)行的時間险掀,納秒級
private long time;

//重復(fù)任務(wù)的執(zhí)行周期時間沪袭,納秒級。
private final long period;

//重新入隊的任務(wù)
RunnableScheduledFuture<V> outerTask = this;

//延遲隊列的索引樟氢,以支持更快的取消操作
int heapIndex;

  • sequenceNumber: 當(dāng)兩個任務(wù)有相同的延遲時間時冈绊,按照 FIFO 的順序入隊。sequenceNumber 就是為相同延時任務(wù)提供的順序編號埠啃。
  • time: 任務(wù)可以執(zhí)行時的時間死宣,納秒級,通過triggerTime方法計算得出碴开。
  • period: 任務(wù)的執(zhí)行周期時間毅该,納秒級。正數(shù)表示固定速率執(zhí)行(為scheduleAtFixedRate提供服務(wù))潦牛,負(fù)數(shù)表示固定延遲執(zhí)行(為scheduleWithFixedDelay提供服務(wù))眶掌,0表示不重復(fù)任務(wù)。
  • outerTask: 重新入隊的任務(wù)巴碗,通過reExecutePeriodic方法入隊重新排序朴爬。

核心方法run()

public void run() {
    boolean periodic = isPeriodic();//是否為周期任務(wù)
    if (!canRunInCurrentRunState(periodic))//當(dāng)前狀態(tài)是否可以執(zhí)行
        cancel(false);
    else if (!periodic)
        //不是周期任務(wù),直接執(zhí)行
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();//設(shè)置下一次運行時間
        reExecutePeriodic(outerTask);//重排序一個周期任務(wù)
    }
}

說明: ScheduledFutureTask 的run方法重寫了 FutureTask 的版本橡淆,以便執(zhí)行周期任務(wù)時重置/重排序任務(wù)召噩。任務(wù)的執(zhí)行通過父類 FutureTask 的run實現(xiàn)。內(nèi)部有兩個針對周期任務(wù)的方法:

  • setNextRunTime(): 用來設(shè)置下一次運行的時間逸爵,源碼如下:
//設(shè)置下一次執(zhí)行任務(wù)的時間
private void setNextRunTime() {
    long p = period;
    if (p > 0)  //固定速率執(zhí)行具滴,scheduleAtFixedRate
        time += p;
    else
        time = triggerTime(-p);  //固定延遲執(zhí)行,scheduleWithFixedDelay
}
//計算固定延遲任務(wù)的執(zhí)行時間
long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

reExecutePeriodic(): 周期任務(wù)重新入隊等待下一次執(zhí)行痊银,源碼如下:

//重排序一個周期任務(wù)
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {//池關(guān)閉后可繼續(xù)執(zhí)行
        super.getQueue().add(task);//任務(wù)入列
        //重新檢查run-after-shutdown參數(shù)抵蚊,如果不能繼續(xù)運行就移除隊列任務(wù),并取消任務(wù)的執(zhí)行
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();//啟動一個新的線程等待任務(wù)
    }
}

reExecutePeriodic與delayedExecute的執(zhí)行策略一致溯革,只不過reExecutePeriodic不會執(zhí)行拒絕策略而是直接丟掉任務(wù)。
cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    if (cancelled && removeOnCancel && heapIndex >= 0)
        remove(this);
    return cancelled;
}

ScheduledFutureTask.cancel本質(zhì)上由其父類 FutureTask.cancel 實現(xiàn)谷醉。取消任務(wù)成功后會根據(jù)removeOnCancel參數(shù)決定是否從隊列中移除此任務(wù)致稀。

核心屬性

//關(guān)閉后繼續(xù)執(zhí)行已經(jīng)存在的周期任務(wù) 
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

//關(guān)閉后繼續(xù)執(zhí)行已經(jīng)存在的延時任務(wù) 
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

//取消任務(wù)后移除 
private volatile boolean removeOnCancel = false;

//為相同延時的任務(wù)提供的順序編號,保證任務(wù)之間的FIFO順序
private static final AtomicLong sequencer = new AtomicLong();


  • continueExistingPeriodicTasksAfterShutdownexecuteExistingDelayedTasksAfterShutdown是 ScheduledThreadPoolExecutor 定義的 run-after-shutdown 參數(shù)俱尼,用來控制池關(guān)閉之后的任務(wù)執(zhí)行邏輯抖单。

  • removeOnCancel用來控制任務(wù)取消后是否從隊列中移除。當(dāng)一個已經(jīng)提交的周期或延遲任務(wù)在運行之前被取消,那么它之后將不會運行矛绘。默認(rèn)配置下耍休,這種已經(jīng)取消的任務(wù)在屆期之前不會被移除。 通過這種機制货矮,可以方便檢查和監(jiān)控線程池狀態(tài)羊精,但也可能導(dǎo)致已經(jīng)取消的任務(wù)無限滯留。為了避免這種情況的發(fā)生囚玫,我們可以通過setRemoveOnCancelPolicy方法設(shè)置移除策略喧锦,把參數(shù)removeOnCancel設(shè)為true可以在任務(wù)取消后立即從隊列中移除。

  • sequencer是為相同延時的任務(wù)提供的順序編號抓督,保證任務(wù)之間的 FIFO 順序燃少。與 ScheduledFutureTask 內(nèi)部的sequenceNumber參數(shù)作用一致。

構(gòu)造函數(shù)

首先看下構(gòu)造函數(shù)铃在,ScheduledThreadPoolExecutor 內(nèi)部有四個構(gòu)造函數(shù)阵具,這里我們只看這個最大構(gòu)造靈活度的:

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

構(gòu)造函數(shù)都是通過super調(diào)用了ThreadPoolExecutor的構(gòu)造,并且使用特定等待隊列DelayedWorkQueue定铜。

核心方法:Schedule

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));//構(gòu)造ScheduledFutureTask任務(wù)
    delayedExecute(t);//任務(wù)執(zhí)行主方法
    return t;
}

說明: schedule主要用于執(zhí)行一次性(延遲)任務(wù)阳液。函數(shù)執(zhí)行邏輯分兩步:

  • 封裝 Callable/Runnable: 首先通過triggerTime計算任務(wù)的延遲執(zhí)行時間,然后通過 ScheduledFutureTask 的構(gòu)造函數(shù)把 Runnable/Callable 任務(wù)構(gòu)造為ScheduledThreadPoolExecutor可以執(zhí)行的任務(wù)類型宿稀,最后調(diào)用decorateTask方法執(zhí)行用戶自定義的邏輯趁舀;decorateTask是一個用戶可自定義擴展的方法,默認(rèn)實現(xiàn)下直接返回封裝的RunnableScheduledFuture任務(wù)祝沸,源碼如下:
protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

  • 執(zhí)行任務(wù): 通過delayedExecute實現(xiàn)矮烹。下面我們來詳細(xì)分析。
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);//池已關(guān)閉罩锐,執(zhí)行拒絕策略
    else {
        super.getQueue().add(task);//任務(wù)入隊
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&//判斷run-after-shutdown參數(shù)
            remove(task))//移除任務(wù)
            task.cancel(false);
        else
            ensurePrestart();//啟動一個新的線程等待任務(wù)
    }
}

說明: delayedExecute是執(zhí)行任務(wù)的主方法奉狈,方法執(zhí)行邏輯如下:

  • 如果池已關(guān)閉(ctl >= SHUTDOWN),執(zhí)行任務(wù)拒絕策略涩惑;
  • 池正在運行仁期,首先把任務(wù)入隊排序;然后重新檢查池的關(guān)閉狀態(tài)竭恬,執(zhí)行如下邏輯:

A: 如果池正在運行跛蛋,或者 run-after-shutdown 參數(shù)值為true,則調(diào)用父類方法ensurePrestart啟動一個新的線程等待執(zhí)行任務(wù)痊硕。ensurePrestart源碼如下:

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

ensurePrestart是父類 ThreadPoolExecutor 的方法赊级,用于啟動一個新的工作線程等待執(zhí)行任務(wù),即使corePoolSize為0也會安排一個新線程岔绸。

B: 如果池已經(jīng)關(guān)閉理逊,并且 run-after-shutdown 參數(shù)值為false橡伞,則執(zhí)行父類(ThreadPoolExecutor)方法remove移除隊列中的指定任務(wù),成功移除后調(diào)用ScheduledFutureTask.cancel取消任務(wù)

核心方法:scheduleAtFixedRate 和 scheduleWithFixedDelay

/**
 * 創(chuàng)建一個周期執(zhí)行的任務(wù)晋被,第一次執(zhí)行延期時間為initialDelay兑徘,
 * 之后每隔period執(zhí)行一次,不等待第一次執(zhí)行完成就開始計時
 */
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //構(gòu)建RunnableScheduledFuture任務(wù)類型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),//計算任務(wù)的延遲時間
                                      unit.toNanos(period));//計算任務(wù)的執(zhí)行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//執(zhí)行用戶自定義邏輯
    sft.outerTask = t;//賦值給outerTask羡洛,準(zhǔn)備重新入隊等待下一次執(zhí)行
    delayedExecute(t);//執(zhí)行任務(wù)
    return t;
}

/**
 * 創(chuàng)建一個周期執(zhí)行的任務(wù)挂脑,第一次執(zhí)行延期時間為initialDelay,
 * 在第一次執(zhí)行完之后延遲delay后開始下一次執(zhí)行
 */
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    //構(gòu)建RunnableScheduledFuture任務(wù)類型
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),//計算任務(wù)的延遲時間
                                      unit.toNanos(-delay));//計算任務(wù)的執(zhí)行周期
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);//執(zhí)行用戶自定義邏輯
    sft.outerTask = t;//賦值給outerTask翘县,準(zhǔn)備重新入隊等待下一次執(zhí)行
    delayedExecute(t);//執(zhí)行任務(wù)
    return t;
}

說明: scheduleAtFixedRate和scheduleWithFixedDelay方法的邏輯與schedule類似最域。

注意scheduleAtFixedRate和scheduleWithFixedDelay的區(qū)別: 乍一看兩個方法一模一樣,其實锈麸,在unit.toNanos這一行代碼中還是有區(qū)別的镀脂。沒錯,scheduleAtFixedRate傳的是正值忘伞,而scheduleWithFixedDelay傳的則是負(fù)值薄翅,這個值就是 ScheduledFutureTask 的period屬性。

核心方法:shutdown()

public void shutdown() {
    super.shutdown();
}
//取消并清除由于關(guān)閉策略不應(yīng)該運行的所有任務(wù)
@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    //獲取run-after-shutdown參數(shù)
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {//池關(guān)閉后不保留任務(wù)
        //依次取消任務(wù)
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();//清除等待隊列
    }
    else {//池關(guān)閉后保留任務(wù)
        // Traverse snapshot to avoid iterator exceptions
        //遍歷快照以避免迭代器異常
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    //如果任務(wù)已經(jīng)取消氓奈,移除隊列中的任務(wù)
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    tryTerminate(); //終止線程池
}

說明: 池關(guān)閉方法調(diào)用了父類ThreadPoolExecutor的shutdown翘魄,具體分析見 ThreadPoolExecutor 篇。這里主要介紹以下在shutdown方法中調(diào)用的關(guān)閉鉤子onShutdown方法舀奶,它的主要作用是在關(guān)閉線程池后取消并清除由于關(guān)閉策略不應(yīng)該運行的所有任務(wù)暑竟,這里主要是根據(jù) run-after-shutdown 參數(shù)(continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown)來決定線程池關(guān)閉后是否關(guān)閉已經(jīng)存在的任務(wù)。
  • 為什么ThreadPoolExecutor 的調(diào)整策略卻不適用于 ScheduledThreadPoolExecutor育勺?

例如: 由于 ScheduledThreadPoolExecutor 是一個固定核心線程數(shù)大小的線程池但荤,并且使用了一個無界隊列,所以調(diào)整maximumPoolSize對其沒有任何影響(所以 ScheduledThreadPoolExecutor 沒有提供可以調(diào)整最大線程數(shù)的構(gòu)造函數(shù)涧至,默認(rèn)最大線程數(shù)固定為Integer.MAX_VALUE)腹躁。此外,設(shè)置corePoolSize為0或者設(shè)置核心線程空閑后清除(allowCoreThreadTimeOut)同樣也不是一個好的策略南蓬,因為一旦周期任務(wù)到達(dá)某一次運行周期時纺非,可能導(dǎo)致線程池內(nèi)沒有線程去處理這些任務(wù)。

  • Executors 提供了哪幾種方法來構(gòu)造 ScheduledThreadPoolExecutor赘方?

    • newScheduledThreadPool: 可指定核心線程數(shù)的線程池烧颖。
    • newSingleThreadScheduledExecutor: 只有一個工作線程的線程池。如果內(nèi)部工作線程由于執(zhí)行周期任務(wù)異常而被終止窄陡,則會新建一個線程替代它的位置倒信。

注意: newScheduledThreadPool(1, threadFactory) 不等價于newSingleThreadScheduledExecutor。newSingleThreadScheduledExecutor創(chuàng)建的線程池保證內(nèi)部只有一個線程執(zhí)行任務(wù)泳梆,并且線程數(shù)不可擴展鳖悠;而通過newScheduledThreadPool(1, threadFactory)創(chuàng)建的線程池可以通過setCorePoolSize方法來修改核心線程數(shù)。

  • ScheduledThreadPoolExecutor要解決什么樣的問題?
    現(xiàn)實案例:用到一個定時器處理藍(lán)牙設(shè)備接收的數(shù)據(jù)优妙,并且需要處理的頻率是很快的乘综,這就需要一個穩(wěn)定的定時器來保證數(shù)據(jù)的長久進(jìn)行。ScheduledThreadPoolExecutor這個類就是個很好的選擇套硼。

  • ScheduledThreadPoolExecutor相比ThreadPoolExecutor有哪些特性?
    1.ScheduledThreadPoolExecutor使用的是固定核心線程數(shù)大小的線程池
    2.延時卡辰、周期

  • ScheduledThreadPoolExecutor有什么樣的數(shù)據(jù)結(jié)構(gòu),核心內(nèi)部類和抽象類?
    隊列邪意。run()九妈、cancel()、

  • ScheduledThreadPoolExecutor有哪兩個關(guān)閉策略? 區(qū)別是什么?
    關(guān)閉操作雾鬼,與線程池執(zhí)行器的關(guān)閉基本相同萌朱,不同的是,在onShutdown方法策菜,調(diào)度線程池執(zhí)行器晶疼,重寫了這個方法,這個方法主要是根據(jù)線程池關(guān)閉間歇性任務(wù)和延時任務(wù)的處理策略又憨,確定是否以不可中斷方式取消任務(wù)翠霍。

  • ScheduledThreadPoolExecutor中scheduleAtFixedRate 和 scheduleWithFixedDelay區(qū)別是什么?
    在unit.toNanos這一行代碼中還是有區(qū)別的。沒錯蠢莺,scheduleAtFixedRate傳的是正值寒匙,而scheduleWithFixedDelay傳的則是負(fù)值,這個值就是 ScheduledFutureTask 的period屬性

  • 為什么ThreadPoolExecutor 的調(diào)整策略卻不適用于 ScheduledThreadPoolExecutor?
    由于 ScheduledThreadPoolExecutor 是一個固定核心線程數(shù)大小的線程池躏将,并且使用了一個無界隊列锄弱,所以調(diào)整maximumPoolSize對其沒有任何影響(所以 ScheduledThreadPoolExecutor 沒有提供可以調(diào)整最大線程數(shù)的構(gòu)造函數(shù),默認(rèn)最大線程數(shù)固定為Integer.MAX_VALUE)。此外,設(shè)置corePoolSize為0或者設(shè)置核心線程空閑后清除(allowCoreThreadTimeOut)同樣也不是一個好的策略苔可,因為一旦周期任務(wù)到達(dá)某一次運行周期時萨咳,可能導(dǎo)致線程池內(nèi)沒有線程去處理這些任務(wù)。

  • Executors 提供了幾種方法來構(gòu)造 ScheduledThreadPoolExecutor?

  • newScheduledThreadPool: 可指定核心線程數(shù)的線程池。

  • newSingleThreadScheduledExecutor: 只有一個工作線程的線程池。如果內(nèi)部工作線程由于執(zhí)行周期任務(wù)異常而被終止敞临,則會新建一個線程替代它的位置窄俏。

Fork/Join

Fork/Join框架基本使用

計算1-1001累加后的值:

/**
 * 這是一個簡單的Join/Fork計算過程缎岗,將1—1001數(shù)字相加
 */
public class TestForkJoinPool {

    private static final Integer MAX = 200;

    static class MyForkJoinTask extends RecursiveTask<Integer> {
        // 子任務(wù)開始計算的值
        private Integer startValue;

        // 子任務(wù)結(jié)束計算的值
        private Integer endValue;

        public MyForkJoinTask(Integer startValue , Integer endValue) {
            this.startValue = startValue;
            this.endValue = endValue;
        }

        @Override
        protected Integer compute() {
            // 如果條件成立普舆,說明這個任務(wù)所需要計算的數(shù)值分為足夠小了
            // 可以正式進(jìn)行累加計算了
            if(endValue - startValue < MAX) {
                System.out.println("開始計算的部分:startValue = " + startValue + ";endValue = " + endValue);
                Integer totalValue = 0;
                for(int index = this.startValue ; index <= this.endValue  ; index++) {
                    totalValue += index;
                }
                return totalValue;
            }
            // 否則再進(jìn)行任務(wù)拆分轧膘,拆分成兩個任務(wù)
            else {
                MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
                subTask1.fork();
                MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
                subTask2.fork();
                return subTask1.join() + subTask2.join();
            }
        }
    }

    public static void main(String[] args) {
        // 這是Fork/Join框架的線程池
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture =  pool.submit(new MyForkJoinTask(1,1001));
        try {
            Integer result = taskFuture.get();
            System.out.println("result = " + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace(System.out);
        }
    }
}
開始計算的部分:startValue = 1;endValue = 126
開始計算的部分:startValue = 127;endValue = 251
開始計算的部分:startValue = 252;endValue = 376
開始計算的部分:startValue = 377;endValue = 501
開始計算的部分:startValue = 502;endValue = 626
開始計算的部分:startValue = 627;endValue = 751
開始計算的部分:startValue = 752;endValue = 876
開始計算的部分:startValue = 877;endValue = 1001
result = 501501

Fork/Join框架簡介

Fork/Join框架是Java并發(fā)工具包中的一種可以將一個大任務(wù)拆分為很多小任務(wù)來異步執(zhí)行的工具,自JDK1.7引入。

三個模塊及關(guān)系

Fork/Join框架主要包含三個模塊:

  • 任務(wù)對象: ForkJoinTask (包括RecursiveTaskRecursiveActionCountedCompleter)
  • 執(zhí)行Fork/Join任務(wù)的線程: ForkJoinWorkerThread
  • 線程池: ForkJoinPool
    這三者的關(guān)系是: ForkJoinPool可以通過池中的ForkJoinWorkerThread來處理ForkJoinTask任務(wù)候学。
// from 《A Java Fork/Join Framework》Dong Lea
Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

ForkJoinPool 只接收 ForkJoinTask 任務(wù)(在實際使用中,也可以接收 Runnable/Callable 任務(wù)焰坪,但在真正運行時趣倾,也會把這些任務(wù)封裝成 ForkJoinTask 類型的任務(wù)),RecursiveTask 是 ForkJoinTask 的子類某饰,是一個可以遞歸執(zhí)行的 ForkJoinTask儒恋,RecursiveAction 是一個無返回值的 RecursiveTask,CountedCompleter 在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)黔漂。

在實際運用中诫尽,我們一般都會繼承 RecursiveTaskRecursiveActionCountedCompleter 來實現(xiàn)我們的業(yè)務(wù)需求炬守,而不會直接繼承 ForkJoinTask 類牧嫉。

核心思想: 分治算法(Divide-and-Conquer)

分治算法(Divide-and-Conquer)把任務(wù)遞歸的拆分為各個子任務(wù),這樣可以更好的利用系統(tǒng)資源减途,盡可能的使用所有可用的計算能力來提升應(yīng)用性能酣藻。首先看一下 Fork/Join 框架的任務(wù)運行機制:

image.png

核心思想: work-stealing(工作竊取)算法

work-stealing(工作竊取)算法: 線程池內(nèi)的所有工作線程都嘗試找到并執(zhí)行已經(jīng)提交的任務(wù),或者是被其他活動任務(wù)創(chuàng)建的子任務(wù)(如果不存在就阻塞等待)鳍置。這種特性使得 ForkJoinPool 在運行多個可以產(chǎn)生子任務(wù)的任務(wù)辽剧,或者是提交的許多小任務(wù)時效率更高。尤其是構(gòu)建異步模型的 ForkJoinPool 時墓捻,對不需要合并(join)的事件類型任務(wù)也非常適用抖仅。

在 ForkJoinPool 中,線程池中每個工作線程(ForkJoinWorkerThread)都對應(yīng)一個任務(wù)隊列(WorkQueue)砖第,工作線程優(yōu)先處理來自自身隊列的任務(wù)(LIFO或FIFO順序撤卢,參數(shù) mode 決定),然后以FIFO的順序隨機竊取其他隊列中的任務(wù)梧兼。

具體思路如下:

  • 每個線程都有自己的一個WorkQueue放吩,該工作隊列是一個雙端隊列。
  • 隊列支持三個功能push羽杰、pop渡紫、poll
  • push/pop只能被隊列的所有者線程調(diào)用,而poll可以被其他線程調(diào)用考赛。
  • 劃分的子任務(wù)調(diào)用fork時惕澎,都會被push到自己的隊列中。
  • 默認(rèn)情況下颜骤,工作線程從自己的雙端隊列獲出任務(wù)并執(zhí)行唧喉。
  • 當(dāng)自己的隊列為空時,線程隨機從另一個線程的隊列末尾調(diào)用poll方法竊取任務(wù)。


    image.png

Fork/Join 框架的執(zhí)行流程

上圖可以看出ForkJoinPool 中的任務(wù)執(zhí)行分兩種:

  • 直接通過 FJP 提交的外部任務(wù)(external/submissions task)八孝,存放在 workQueues 的偶數(shù)槽位董朝;
  • 通過內(nèi)部 fork 分割的子任務(wù)(Worker task),存放在 workQueues 的奇數(shù)槽位干跛。

那Fork/Join 框架的執(zhí)行流程是什么樣的?

image.png

Fork/Join類關(guān)系

ForkJoinPool繼承關(guān)系

image.png

內(nèi)部類介紹:

  • ForkJoinWorkerThreadFactory: 內(nèi)部線程工廠接口子姜,用于創(chuàng)建工作線程ForkJoinWorkerThread

  • DefaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory 的默認(rèn)實現(xiàn)類

  • InnocuousForkJoinWorkerThreadFactory: 實現(xiàn)了 ForkJoinWorkerThreadFactory,無許可線程工廠楼入,當(dāng)系統(tǒng)變量中有系統(tǒng)安全管理相關(guān)屬性時哥捕,默認(rèn)使用這個工廠創(chuàng)建工作線程。

  • EmptyTask: 內(nèi)部占位類浅辙,用于替換隊列中 join 的任務(wù)扭弧。

  • ManagedBlocker: 為 ForkJoinPool 中的任務(wù)提供擴展管理并行數(shù)的接口,一般用在可能會阻塞的任務(wù)(如在 Phaser 中用于等待 phase 到下一個generation)记舆。

  • WorkQueue: ForkJoinPool 的核心數(shù)據(jù)結(jié)構(gòu),本質(zhì)上是work-stealing 模式的雙端任務(wù)隊列呼巴,內(nèi)部存放 ForkJoinTask 對象任務(wù)泽腮,使用 @Contented 注解修飾防止偽共享。

    • 工作線程在運行中產(chǎn)生新的任務(wù)(通常是因為調(diào)用了 fork())時衣赶,此時可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個棧诊赊,新的任務(wù)會放入棧頂(top 位);工作線程在處理自己工作隊列的任務(wù)時府瞄,按照 LIFO 的順序碧磅。
    • 工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(wù)(可能是來自于剛剛提交到 pool 的任務(wù)遵馆,或是來自于其他工作線程的隊列任務(wù))鲸郊,此時可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個 FIFO 的隊列,竊取的任務(wù)位于其他線程的工作隊列的隊首(base位)货邓。
  • 偽共享狀態(tài): 緩存系統(tǒng)中是以緩存行(cache line)為單位存儲的秆撮。緩存行是2的整數(shù)冪個連續(xù)字節(jié),一般為32-256個字節(jié)换况。最常見的緩存行大小是64個字節(jié)职辨。當(dāng)多線程修改互相獨立的變量時,如果這些變量共享同一個緩存行戈二,就會無意中影響彼此的性能舒裤,這就是偽共享。

ForkJoinTask繼承關(guān)系

image.png

ForkJoinTask 實現(xiàn)了 Future 接口觉吭,說明它也是一個可取消的異步運算任務(wù)腾供,實際上ForkJoinTask 是 Future 的輕量級實現(xiàn),主要用在純粹是計算的函數(shù)式任務(wù)或者操作完全獨立的對象計算任務(wù)。fork 是主運行方法台腥,用于異步執(zhí)行宏赘;而 join 方法在任務(wù)結(jié)果計算完畢之后才會運行,用來合并或返回計算結(jié)果黎侈。 其內(nèi)部類都比較簡單察署,ExceptionNode 是用于存儲任務(wù)執(zhí)行期間的異常信息的單向鏈表;其余四個類是為 Runnable/Callable 任務(wù)提供的適配器類峻汉,用于把 Runnable/Callable 轉(zhuǎn)化為 ForkJoinTask 類型的任務(wù)(因為 ForkJoinPool 只可以運行 ForkJoinTask 類型的任務(wù))贴汪。

Fork/Join框架源碼解析

  • 首先介紹任務(wù)的提交流程 - 外部任務(wù)(external/submissions task)提交
  • 然后介紹任務(wù)的提交流程 - 子任務(wù)(Worker task)提交
  • 再分析任務(wù)的執(zhí)行過程(ForkJoinWorkerThread.run()到ForkJoinTask.doExec()這一部分);
  • 最后介紹任務(wù)的結(jié)果獲取(ForkJoinTask.join()和ForkJoinTask.invoke())

ForkJoinPool

核心參數(shù)

在后面的源碼解析中休吠,我們會看到大量的位運算扳埂,這些位運算都是通過我們接下來介紹的一些常量參數(shù)來計算的。

例如瘤礁,如果要更新活躍線程數(shù)阳懂,使用公式(UC_MASK & (c + AC_UNIT)) | (SP_MASK & c);c 代表當(dāng)前 ctl柜思,UC_MASK 和 SP_MASK 分別是高位和低位掩碼岩调,AC_UNIT 為活躍線程的增量數(shù),使用(UC_MASK & (c + AC_UNIT))就可以計算出高32位赡盘,然后再加上低32位(SP_MASK & c)号枕,就拼接成了一個新的ctl。

這些運算的可讀性很差陨享,看起來有些復(fù)雜葱淳。在后面源碼解析中有位運算的地方我都會加上注釋,大家只需要了解它們的作用即可抛姑。

ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量:

// Constants shared across ForkJoinPool and WorkQueue

// 限定參數(shù)
static final int SMASK = 0xffff;        //  低位掩碼赞厕,也是最大索引位
static final int MAX_CAP = 0x7fff;        //  工作線程最大容量
static final int EVENMASK = 0xfffe;        //  偶數(shù)低位掩碼
static final int SQMASK = 0x007e;        //  workQueues 數(shù)組最多64個槽位

// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1;             // 標(biāo)記是否正在運行任務(wù)
static final int INACTIVE = 1 << 31;       // 失活狀態(tài)  負(fù)數(shù)
static final int SS_SEQ = 1 << 16;       // 版本戳,防止ABA問題

// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16;  // 模式掩碼
static final int LIFO_QUEUE = 0; //LIFO隊列
static final int FIFO_QUEUE = 1 << 16;//FIFO隊列
static final int SHARED_QUEUE = 1 << 31;       // 共享模式隊列途戒,負(fù)數(shù)

ForkJoinPool 中的相關(guān)常量和實例字段:

//  低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼

// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創(chuàng)建工作線程標(biāo)志

// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;

// 實例字段
volatile long ctl;                   // 主控制參數(shù)
volatile int runState;               // 運行狀態(tài)鎖
final int config;                    // 并行度|模式
int indexSeed;                       // 用于生成工作線程索引
volatile WorkQueue[] workQueues;     // 主對象注冊信息坑傅,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh;  // 每個工作線程的異常信息
final String workerNamePrefix;       // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter;    // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器

/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度喷斋,對應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù)唁毒,在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時的序號
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時值(以納秒為單位),默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時時間星爪,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù)浆西,用在在awaitRunStateLock和awaitWork中
private static final int SPINS  = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;

說明: ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:

  • AC: 正在運行工作線程數(shù)減去目標(biāo)并行度顽腾,高16位
  • TC: 總工作線程數(shù)減去目標(biāo)并行度近零,中高16位
  • SS: 棧頂?shù)却€程的版本計數(shù)和狀態(tài)诺核,中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

在后面的源碼解析中久信,某些地方也提取了ctl的低32位(sp=(int)ctl)來檢查工作線程狀態(tài)窖杀,例如,當(dāng)sp不為0時說明當(dāng)前還有空閑工作線程裙士。
ForkJoinPool.WorkQueue 中的相關(guān)屬性:

//初始隊列容量入客,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// 實例字段
volatile int scanState;    // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred;             // 記錄前一個棧頂?shù)腸tl
int nsteals;               // 偷取任務(wù)數(shù)
int hint;                  // 記錄偷取者索引,初始為隨機索引
int config;                // 池索引和模式
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         //下一個poll操作的索引(棧底/隊列頭)
int top;                   //  下一個push操作的索引(棧頂/隊列尾)
ForkJoinTask<?>[] array;   // 任務(wù)數(shù)組
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊列的工作線程腿椎,共享模式下為null
volatile Thread parker;    // 調(diào)用park阻塞期間為owner桌硫,其他情況為null
volatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務(wù)

ForkJoinTask

核心參數(shù)

/** 任務(wù)運行狀態(tài) */
volatile int status; // 任務(wù)運行狀態(tài)
static final int DONE_MASK   = 0xf0000000;  // 任務(wù)完成狀態(tài)標(biāo)志位
static final int NORMAL      = 0xf0000000;  // must be negative
static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 等待信號
static final int SMASK       = 0x0000ffff;  //  低位掩碼

Fork/Join框架源碼解析

構(gòu)造函數(shù)

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
            checkFactory(factory),
            handler,
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

說明: 在 ForkJoinPool 中我們可以自定義四個參數(shù):

  • parallelism: 并行度,默認(rèn)為CPU數(shù)啃炸,最小為1
  • factory: 工作線程工廠铆隘;
  • handler: 處理工作線程運行任務(wù)時的異常情況類,默認(rèn)為null南用;
  • asyncMode: 是否為異步模式膀钠,默認(rèn)為 false。如果為true裹虫,表示子任務(wù)的執(zhí)行遵循 FIFO 順序并且任務(wù)不能被合并(join)托修,這種模式適用于工作線程只運行事件類型的異步任務(wù)。

在多數(shù)場景使用時恒界,如果沒有太強的業(yè)務(wù)需求,我們一般直接使用 ForkJoinPool 中的common池砚嘴,在JDK1.8之后提供了ForkJoinPool.commonPool()方法可以直接使用common池十酣,來看一下它的構(gòu)造:

private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing
        String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
        String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");//線程工廠
        String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//異常處理類
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
                    getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler) ClassLoader.
                    getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;//默認(rèn)并行度為1
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
}

使用common pool的優(yōu)點就是我們可以通過指定系統(tǒng)參數(shù)的方式定義“并行度、線程工廠和異常處理類”际长;并且它使用的是同步模式耸采,也就是說可以支持任務(wù)合并(join)盆色。

執(zhí)行流程 - 外部任務(wù)(external/submissions task)提交

向 ForkJoinPool 提交任務(wù)有三種方式:

  • invoke()會等待任務(wù)計算完畢并返回計算結(jié)果忠怖;
  • execute()是直接向池提交一個任務(wù)來異步執(zhí)行,無返回結(jié)果簇搅;
  • submit()也是異步執(zhí)行如绸,但是會返回提交的任務(wù)嘱朽,在適當(dāng)?shù)臅r候可通過task.get()獲取執(zhí)行結(jié)果。

這三種提交方式都都是調(diào)用externalPush()方法來完成怔接,所以接下來我們將從externalPush()方法開始逐步分析外部任務(wù)的執(zhí)行過程搪泳。

externalPush(ForkJoinTask<?> task)

//添加給定任務(wù)到submission隊列中
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue q;
    int m;
    int r = ThreadLocalRandom.getProbe();//探針值,用于計算WorkQueue槽位索引
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //獲取隨機偶數(shù)槽位的workQueue
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定workQueue
        ForkJoinTask<?>[] a;
        int am, n, s;
        if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;//計算任務(wù)索引位置
            U.putOrderedObject(a, j, task);//任務(wù)入列
            U.putOrderedInt(q, QTOP, s + 1);//更新push slot
            U.putIntVolatile(q, QLOCK, 0);//解除鎖定
            if (n <= 1)
                signalWork(ws, q);//任務(wù)數(shù)小于1時嘗試創(chuàng)建或激活一個工作線程
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
    }
    externalSubmit(task);//初始化workQueues及相關(guān)屬性
}

首先說明一下externalPush和externalSubmit兩個方法的聯(lián)系: 它們的作用都是把任務(wù)放到隊列中等待執(zhí)行扼脐。不同的是岸军,externalSubmit可以說是完整版的externalPush,在任務(wù)首次提交時,需要初始化workQueues及其他相關(guān)屬性艰赞,這個初始化操作就是externalSubmit來完成的佣谐;而后再向池中提交的任務(wù)都是通過簡化版的externalSubmit-externalPush來完成。

externalPush的執(zhí)行流程很簡單: 首先找到一個隨機偶數(shù)槽位的 workQueue方妖,然后把任務(wù)放入這個 workQueue 的任務(wù)數(shù)組中狭魂,并更新top位。如果隊列的剩余任務(wù)數(shù)小于1吁断,則嘗試創(chuàng)建或激活一個工作線程來運行任務(wù)(防止在externalSubmit初始化時發(fā)生異常導(dǎo)致工作線程創(chuàng)建失敗)趁蕊。

externalSubmit(ForkJoinTask<?> task)

//任務(wù)提交
private void externalSubmit(ForkJoinTask<?> task) {
    //初始化調(diào)用線程的探針值,用于計算WorkQueue索引
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (; ; ) {
        WorkQueue[] ws;
        WorkQueue q;
        int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {// 池已關(guān)閉
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        //初始化workQueues
        else if ((rs & STARTED) == 0 ||     // initialize
                ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();//鎖定runState
            try {
                //初始化
                if ((rs & STARTED) == 0) {
                    //初始化stealCounter
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                            new AtomicLong());
                    //創(chuàng)建workQueues仔役,容量為2的冪次方
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1;
                    n |= n >>> 2;
                    n |= n >>> 4;
                    n |= n >>> 8;
                    n |= n >>> 16;
                    n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);//解鎖并更新runState
            }
        } else if ((q = ws[k = r & m & SQMASK]) != null) {//獲取隨機偶數(shù)槽位的workQueue
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定 workQueue
                ForkJoinTask<?>[] a = q.array;//當(dāng)前workQueue的全部任務(wù)
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {//擴容
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);//放入給定任務(wù)
                        U.putOrderedInt(q, QTOP, s + 1);//修改push slot
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
                }
                if (submitted) {//任務(wù)提交成功掷伙,創(chuàng)建或激活工作線程
                    signalWork(ws, q);//創(chuàng)建或激活一個工作線程來運行任務(wù)
                    return;
                }
            }
            move = true;                   // move on failure 操作失敗,重新獲取探針值
        } else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 && (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                ws[k] = q;                 // 更新索引k位值的workQueue
            //else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        } else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);//重新獲取線程探針值
    }
}

說明: externalSubmit是externalPush的完整版本又兵,主要用于第一次提交任務(wù)時初始化workQueues及相關(guān)屬性任柜,并且提交給定任務(wù)到隊列中。具體執(zhí)行步驟如下:

  • 如果池為終止?fàn)顟B(tài)(runState<0)沛厨,調(diào)用tryTerminate來終止線程池宙地,并拋出任務(wù)拒絕異常;
  • 如果尚未初始化逆皮,就為 FJP 執(zhí)行初始化操作: 初始化stealCounter宅粥、創(chuàng)建workerQueues,然后繼續(xù)自旋电谣;
  • 初始化完成后秽梅,執(zhí)行在externalPush中相同的操作: 獲取 workQueue,放入指定任務(wù)剿牺。任務(wù)提交成功后調(diào)用signalWork方法創(chuàng)建或激活線程企垦;
  • 如果在步驟3中獲取到的 workQueue 為null,會在這一步中創(chuàng)建一個 workQueue晒来,創(chuàng)建成功繼續(xù)自旋執(zhí)行第三步操作钞诡;
  • 如果非上述情況,或者有線程爭用資源導(dǎo)致獲取鎖失敗湃崩,就重新獲取線程探針值繼續(xù)自旋荧降。

signalWork(WorkQueue[] ws, WorkQueue q)

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c;
    int sp, i;
    WorkQueue v;
    Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        if ((sp = (int) c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);//工作線程太少,添加新的工作線程
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        //計算ctl竹习,加上版本戳SS_SEQ避免ABA問題
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        //計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);//喚醒阻塞線程
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

說明: 新建或喚醒一個工作線程誊抛,在externalPush、externalSubmit整陌、workQueue.push拗窃、scan中調(diào)用瞎领。如果還有空閑線程,則嘗試喚醒索引到的 WorkQueue 的parker線程随夸;如果工作線程過少((ctl & ADD_WORKER) != 0L)九默,則調(diào)用tryAddWorker添加一個新的工作線程。

tryAddWorker(long c)

private void tryAddWorker(long c) {
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);//釋放鎖
            if (stop != 0)
                break;
            if (add) {
                createWorker();//創(chuàng)建工作線程
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

說明: 嘗試添加一個新的工作線程宾毒,首先更新ctl中的工作線程數(shù)驼修,然后調(diào)用createWorker()創(chuàng)建工作線程。

createWorker()

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);//線程創(chuàng)建失敗處理
    return false;
}

說明: createWorker首先通過線程工廠創(chuàng)一個新的ForkJoinWorkerThread诈铛,然后啟動這個工作線程(wt.start())乙各。如果期間發(fā)生異常,調(diào)用deregisterWorker處理線程創(chuàng)建失敗的邏輯(deregisterWorker在后面再詳細(xì)說明)幢竹。

ForkJoinWorkerThread 的構(gòu)造函數(shù)如下:

protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

可以看到 ForkJoinWorkerThread 在構(gòu)造時首先調(diào)用父類 Thread 的方法耳峦,然后為工作線程注冊pool和workQueue,而workQueue的注冊任務(wù)由ForkJoinPool.registerWorker來完成焕毫。

registerWorker()

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    //設(shè)置為守護(hù)線程
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);//構(gòu)造新的WorkQueue
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws;
        int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            //生成新建WorkQueue的索引
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // Worker任務(wù)放在奇數(shù)索引位 odd-numbered indices
            if (ws[i] != null) {                  // collision 已存在蹲坷,重新計算索引位
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                //查找可用的索引位
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {//所有索引位都被占用,對workQueues進(jìn)行擴容
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);//workQueues 擴容
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

說明: registerWorker是 ForkJoinWorkerThread 構(gòu)造器的回調(diào)函數(shù)邑飒,用于創(chuàng)建和記錄工作線程的 WorkQueue循签。比較簡單,就不多贅述了疙咸。注意在此為工作線程創(chuàng)建的 WorkQueue 是放在奇數(shù)索引的(代碼行: i = ((s << 1) | 1) & m;)

小結(jié)

OK县匠,外部任務(wù)的提交流程就先講到這里。在createWorker()中啟動工作線程后(wt.start())撒轮,當(dāng)為線程分配到CPU執(zhí)行時間片之后會運行 ForkJoinWorkerThread 的run方法開啟線程來執(zhí)行任務(wù)聚唐。工作線程執(zhí)行任務(wù)的流程我們在講完內(nèi)部任務(wù)提交之后會統(tǒng)一講解。

執(zhí)行流程: 子任務(wù)(Worker task)提交

子任務(wù)的提交相對比較簡單腔召,由任務(wù)的fork()方法完成。通過上面的流程圖可以看到任務(wù)被分割(fork)之后調(diào)用了ForkJoinPool.WorkQueue.push()方法直接把任務(wù)放到隊列中等待被執(zhí)行扮惦。

ForkJoinTask.fork()

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

說明: 如果當(dāng)前線程是 Worker 線程臀蛛,說明當(dāng)前任務(wù)是fork分割的子任務(wù),通過ForkJoinPool.workQueue.push()方法直接把任務(wù)放到自己的等待隊列中崖蜜;否則調(diào)用ForkJoinPool.externalPush()提交到一個隨機的等待隊列中(外部任務(wù))浊仆。

ForkJoinPool.WorkQueue.push()

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a;
    ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {//首次提交,創(chuàng)建或喚醒一個工作線程
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        } else if (n >= m)
            growArray();
    }
}

說明: 首先把任務(wù)放入等待隊列并更新top位豫领;如果當(dāng)前 WorkQueue 為新建的等待隊列(top-base<=1)抡柿,則調(diào)用signalWork方法為當(dāng)前 WorkQueue 新建或喚醒一個工作線程;如果 WorkQueue 中的任務(wù)數(shù)組容量過小等恐,則調(diào)用growArray()方法對其進(jìn)行兩倍擴容洲劣,growArray()方法源碼如下:

final ForkJoinTask<?>[] growArray() {
    ForkJoinTask<?>[] oldA = array;//獲取內(nèi)部任務(wù)列表
    int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
    if (size > MAXIMUM_QUEUE_CAPACITY)
        throw new RejectedExecutionException("Queue capacity exceeded");
    int oldMask, t, b;
    //新建一個兩倍容量的任務(wù)數(shù)組
    ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
    if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
            (t = top) - (b = base) > 0) {
        int mask = size - 1;
        //從老數(shù)組中拿出數(shù)據(jù)备蚓,放到新的數(shù)組中
        do { // emulate poll from old array, push to new array
            ForkJoinTask<?> x;
            int oldj = ((b & oldMask) << ASHIFT) + ABASE;
            int j = ((b & mask) << ASHIFT) + ABASE;
            x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
            if (x != null &&
                    U.compareAndSwapObject(oldA, oldj, x, null))
                U.putObjectVolatile(a, j, x);
        } while (++b != t);
    }
    return a;
}

小結(jié)

到此,兩種任務(wù)的提交流程都已經(jīng)解析完畢囱稽,下一節(jié)我們來一起看看任務(wù)提交之后是如何被運行的郊尝。

執(zhí)行流程: 任務(wù)執(zhí)行

回到我們開始時的流程圖,在ForkJoinPool .createWorker()方法中創(chuàng)建工作線程后战惊,會啟動工作線程流昏,系統(tǒng)為工作線程分配到CPU執(zhí)行時間片之后會執(zhí)行 ForkJoinWorkerThread 的run()方法正式開始執(zhí)行任務(wù)。

ForkJoinWorkerThread.run()

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();//鉤子方法吞获,可自定義擴展
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);//鉤子方法况凉,可自定義擴展
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);//處理異常
            }
        }
    }
}

說明: 方法很簡單,在工作線程運行前后會調(diào)用自定義鉤子函數(shù)(onStart和onTermination)各拷,任務(wù)的運行則是調(diào)用了ForkJoinPool.runWorker()刁绒。如果全部任務(wù)執(zhí)行完畢或者期間遭遇異常,則通過ForkJoinPool.deregisterWorker關(guān)閉工作線程并處理異常信息(deregisterWorker方法我們后面會詳細(xì)講解)撤逢。

ForkJoinPool.runWorker(WorkQueue w)

final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t; ; ) {
        if ((t = scan(w, r)) != null)//掃描任務(wù)執(zhí)行
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5; // xorshift
    }
}

說明: runWorker是 ForkJoinWorkerThread 的主運行方法膛锭,用來依次執(zhí)行當(dāng)前工作線程中的任務(wù)。函數(shù)流程很簡單: 調(diào)用scan方法依次獲取任務(wù)蚊荣,然后調(diào)用WorkQueue .runTask運行任務(wù)初狰;如果未掃描到任務(wù),則調(diào)用awaitWork等待互例,直到工作線程/線程池終止或等待超時奢入。

ForkJoinPool.scan(WorkQueue w, int r)

private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws;
    int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        //初始掃描起點,自旋掃描
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
            WorkQueue q;
            ForkJoinTask<?>[] a;
            ForkJoinTask<?> t;
            int b, n;
            long c;
            if ((q = ws[k]) != null) {//獲取workQueue
                if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                    //計算偏移量
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                            U.getObjectVolatile(a, i))) != null && //取base位置任務(wù)
                            q.base == b) {//stable
                        if (ss >= 0) {  //scanning
                            if (U.compareAndSwapObject(a, i, t, null)) {//
                                q.base = b + 1;//更新base位
                                if (n < -1)       // signal others
                                    signalWork(ws, q);//創(chuàng)建或喚醒工作線程來運行任務(wù)
                                return t;
                            }
                        } else if (oldSum == 0 &&   // try to activate 嘗試激活工作線程
                                w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);//喚醒棧頂工作線程
                    }
                    //base位置任務(wù)為空或base位置偏移媳叨,隨機移位重新掃描
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1;
                    r ^= r >>> 3;
                    r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;//隊列任務(wù)為空腥光,記錄base位
            }
            //更新索引k 繼續(xù)向后查找
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                //運行到這里說明已經(jīng)掃描了全部的 workQueues,但并未掃描到任務(wù)

                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;// 已經(jīng)被滅活或終止,跳出循環(huán)

                    //對當(dāng)前WorkQueue進(jìn)行滅活操作
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                            (UC_MASK & ((c = ctl) - AC_UNIT)));//計算ctl為INACTIVE狀態(tài)并減少活躍線程數(shù)
                    w.stackPred = (int) c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);//修改scanState為inactive狀態(tài)
                    if (U.compareAndSwapLong(this, CTL, c, nc))//更新scanState為滅活狀態(tài)
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;//重置checkSum糊秆,繼續(xù)循環(huán)
            }
        }
    }
    return null;
}

說明: 掃描并嘗試偷取一個任務(wù)武福。使用w.hint進(jìn)行隨機索引 WorkQueue,也就是說并不一定會執(zhí)行當(dāng)前 WorkQueue 中的任務(wù)痘番,而是偷取別的Worker的任務(wù)來執(zhí)行捉片。

函數(shù)的大概執(zhí)行流程如下:

  • 取隨機位置的一個 WorkQueue;

  • 獲取base位的 ForkJoinTask汞舱,成功取到后更新base位并返回任務(wù)伍纫;如果取到的 WorkQueue 中任務(wù)數(shù)大于1,則調(diào)用signalWork創(chuàng)建或喚醒其他工作線程昂芜;

  • 如果當(dāng)前工作線程處于不活躍狀態(tài)(INACTIVE)莹规,則調(diào)用tryRelease嘗試喚醒棧頂工作線程來執(zhí)行。

    tryRelease源碼如下:

private boolean tryRelease(long c, WorkQueue v, long inc) {
    int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE;
    Thread p;
    //ctl低32位等于scanState泌神,說明可以喚醒parker線程
    if (v != null && v.scanState == sp) {          // v is at top of stack
        //計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;
            if ((p = v.parker) != null)
                U.unpark(p);//喚醒線程
            return true;
        }
    }
    return false;
}

  • 如果base位任務(wù)為空或發(fā)生偏移良漱,則對索引位進(jìn)行隨機移位舞虱,然后重新掃描;

  • 如果掃描整個workQueues之后沒有獲取到任務(wù)债热,則設(shè)置當(dāng)前工作線程為INACTIVE狀態(tài)砾嫉;然后重置checkSum,再次掃描一圈之后如果還沒有任務(wù)則跳出循環(huán)返回null窒篱。

ForkJoinPool.awaitWork(WorkQueue w, int r)

private boolean awaitWork(WorkQueue w, int r) {
    if (w == null || w.qlock < 0)                 // w is terminating
        return false;
    for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
        if ((ss = w.scanState) >= 0)//正在掃描焕刮,跳出循環(huán)
            break;
        else if (spins > 0) {
            r ^= r << 6;
            r ^= r >>> 21;
            r ^= r << 7;
            if (r >= 0 && --spins == 0) {         // randomize spins
                WorkQueue v;
                WorkQueue[] ws;
                int s, j;
                AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                        (j = pred & SMASK) < ws.length &&
                        (v = ws[j]) != null &&        // see if pred parking
                        (v.parker == null || v.scanState >= 0))
                    spins = SPINS;                // continue spinning
            }
        } else if (w.qlock < 0)                     // 當(dāng)前workQueue已經(jīng)終止,返回false recheck after spins
            return false;
        else if (!Thread.interrupted()) {//判斷線程是否被中斷墙杯,并清除中斷狀態(tài)
            long c, prevctl, parkTime, deadline;
            int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);//活躍線程數(shù)
            if ((ac <= 0 && tryTerminate(false, false)) || //無active線程配并,嘗試終止
                    (runState & STOP) != 0)           // pool terminating
                return false;
            if (ac <= 0 && ss == (int) c) {        // is last waiter
                //計算活躍線程數(shù)(高32位)并更新為下一個棧頂?shù)膕canState(低32位)
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short) (c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過量
                    return false;                 // else use timed wait
                //計算空閑超時時間
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            } else
                prevctl = parkTime = deadline = 0L;
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            w.parker = wt;//設(shè)置parker,準(zhǔn)備阻塞
            if (w.scanState < 0 && ctl == c)      // recheck before park
                U.park(false, parkTime);//阻塞指定的時間

            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)//正在掃描高镐,說明等到任務(wù)溉旋,跳出循環(huán)
                break;
            if (parkTime != 0L && ctl == c &&
                    deadline - System.nanoTime() <= 0L &&
                    U.compareAndSwapLong(this, CTL, c, prevctl))//未等到任務(wù),更新ctl嫉髓,返回false
                return false;                     // shrink pool
        }
    }
    return true;
}

說明: 回到runWorker方法观腊,如果scan方法未掃描到任務(wù),會調(diào)用awaitWork等待獲取任務(wù)算行。函數(shù)的具體執(zhí)行流程大家看源碼梧油,這里簡單說一下:

  • 在等待獲取任務(wù)期間,如果工作線程或線程池已經(jīng)終止則直接返回false州邢。如果當(dāng)前無 active 線程儡陨,嘗試終止線程池并返回false,如果終止失敗并且當(dāng)前是最后一個等待的 Worker量淌,就阻塞指定的時間(IDLE_TIMEOUT)骗村;等到屆期或被喚醒后如果發(fā)現(xiàn)自己是scanning(scanState >= 0)狀態(tài),說明已經(jīng)等到任務(wù)呀枢,跳出等待返回true繼續(xù) scan胚股,否則的更新ctl并返回false。

WorkQueue.runTask()

final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();//更新currentSteal并執(zhí)行任務(wù)
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();//依次執(zhí)行本地任務(wù)
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);//增加偷取任務(wù)數(shù)
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();//執(zhí)行鉤子函數(shù)
    }
}

說明: 在scan方法掃描到任務(wù)之后裙秋,調(diào)用WorkQueue.runTask()來執(zhí)行獲取到的任務(wù)信轿,大概流程如下:

  • 標(biāo)記scanState為正在執(zhí)行狀態(tài);
  • 更新currentSteal為當(dāng)前獲取到的任務(wù)并執(zhí)行它残吩,任務(wù)的執(zhí)行調(diào)用了ForkJoinTask.doExec()方法,源碼如下:
//ForkJoinTask.doExec()
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();//執(zhí)行我們定義的任務(wù)
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

  • 調(diào)用execLocalTasks依次執(zhí)行當(dāng)前WorkerQueue中的任務(wù)倘核,源碼如下:
//執(zhí)行并移除所有本地任務(wù)
final void execLocalTasks() {
    int b = base, m, s;
    ForkJoinTask<?>[] a = array;
    if (b - (s = top - 1) <= 0 && a != null &&
            (m = a.length - 1) >= 0) {
        if ((config & FIFO_QUEUE) == 0) {//FIFO模式
            for (ForkJoinTask<?> t; ; ) {
                if ((t = (ForkJoinTask<?>) U.getAndSetObject
                        (a, ((m & s) << ASHIFT) + ABASE, null)) == null)//FIFO執(zhí)行泣侮,取top任務(wù)
                    break;
                U.putOrderedInt(this, QTOP, s);
                t.doExec();//執(zhí)行
                if (base - (s = top - 1) > 0)
                    break;
            }
        } else
            pollAndExecAll();//LIFO模式執(zhí)行,取base任務(wù)
    }
}

  • 更新偷取任務(wù)數(shù)紧唱;
  • 還原scanState并執(zhí)行鉤子函數(shù)活尊。

ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    //1.移除workQueue
    if (wt != null && (w = wt.workQueue) != null) {//獲取ForkJoinWorkerThread的等待隊列
        WorkQueue[] ws;                           // remove index from array
        int idx = w.config & SMASK;//計算workQueue索引
        int rs = lockRunState();//獲取runState鎖和當(dāng)前池運行狀態(tài)
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;//移除workQueue
        unlockRunState(rs, rs & ~RSLOCK);//解除runState鎖
    }
    //2.減少CTL數(shù)
    long c;                                       // decrement counts
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
    //3.處理被移除workQueue內(nèi)部相關(guān)參數(shù)
    if (w != null) {
        w.qlock = -1;                             // ensure set
        w.transferStealCount(this);
        w.cancelAll();                            // cancel remaining tasks
    }
    //4.如果線程未終止隶校,替換被移除的workQueue并喚醒內(nèi)部線程
    for (;;) {                                    // possibly replace
        WorkQueue[] ws; int m, sp;
        //嘗試終止線程池
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        //喚醒被替換的線程,依賴于下一步
        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        //創(chuàng)建工作線程替換
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                      // create replacement
            break;
        }
        else                                      // don't need replacement
            break;
    }
    //5.處理異常
    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}

說明: deregisterWorker方法用于工作線程運行完畢之后終止線程或處理工作線程異常蛹锰,主要就是清除已關(guān)閉的工作線程或回滾創(chuàng)建線程之前的操作深胳,并把傳入的異常拋給 ForkJoinTask 來處理苗分。具體步驟見源碼注釋矾策。

小結(jié)

本節(jié)我們對任務(wù)的執(zhí)行流程進(jìn)行了說明愿题,后面我們將繼續(xù)介紹任務(wù)的結(jié)果獲取(join/invoke)工闺。

獲取任務(wù)結(jié)果 - ForkJoinTask.join() / ForkJoinTask.invoke()

//合并任務(wù)結(jié)果
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

//join, get, quietlyJoin的主實現(xiàn)方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

  • invoke() :
//執(zhí)行任務(wù)投剥,并等待任務(wù)完成并返回結(jié)果
public final V invoke() {
    int s;
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

//invoke, quietlyInvoke的主實現(xiàn)方法
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

說明: join()方法一把是在任務(wù)fork()之后調(diào)用碰酝,用來獲取(或者叫“合并”)任務(wù)的執(zhí)行結(jié)果丐黄。

ForkJoinTask的join()和invoke()方法都可以用來獲取任務(wù)的執(zhí)行結(jié)果(另外還有g(shù)et方法也是調(diào)用了doJoin來獲取任務(wù)結(jié)果绊序,但是會響應(yīng)運行時異常)纷宇,它們對外部提交任務(wù)的執(zhí)行方式一致夸盟,都是通過externalAwaitDone方法等待執(zhí)行結(jié)果。不同的是invoke()方法會直接執(zhí)行當(dāng)前任務(wù)像捶;而join()方法則是在當(dāng)前任務(wù)在隊列 top 位時(通過tryUnpush方法判斷)才能執(zhí)行上陕,如果當(dāng)前任務(wù)不在 top 位或者任務(wù)執(zhí)行失敗調(diào)用ForkJoinPool.awaitJoin方法幫助執(zhí)行或阻塞當(dāng)前 join 任務(wù)。(所以在官方文檔中建議了我們對ForkJoinTask任務(wù)的調(diào)用順序拓春,一對 fork-join操作一般按照如下順序調(diào)用: a.fork(); b.fork(); b.join(); a.join();释簿。因為任務(wù) b 是后面進(jìn)入隊列,也就是說它是在棧頂?shù)?top 位)痘儡,在它fork()之后直接調(diào)用join()就可以直接執(zhí)行而不會調(diào)用ForkJoinPool.awaitJoin方法去等待辕万。)

在這些方法中,join()相對比較全面沉删,所以之后的講解我們將從join()開始逐步向下分析渐尿,首先看一下join()的執(zhí)行流程:


image.png

后面的源碼分析中,我們首先講解比較簡單的外部 join 任務(wù)(externalAwaitDone)矾瑰,然后再講解內(nèi)部 join 任務(wù)(從ForkJoinPool.awaitJoin()開始)砖茸。

ForkJoinTask.externalAwaitDone()

private int externalAwaitDone() {
    //執(zhí)行任務(wù)
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(  // CountedCompleter任務(wù)
                 (CountedCompleter<?>)this, 0) :
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);  // ForkJoinTask任務(wù)
    if (s >= 0 && (s = status) >= 0) {//執(zhí)行失敗,進(jìn)入等待
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {  //更新state
                synchronized (this) {
                    if (status >= 0) {//SIGNAL 等待信號
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}

說明: 如果當(dāng)前join為外部調(diào)用殴穴,則調(diào)用此方法執(zhí)行任務(wù)凉夯,如果任務(wù)執(zhí)行失敗就進(jìn)入等待。方法本身是很簡單的采幌,需要注意的是對不同的任務(wù)類型分兩種情況:

  • 如果我們的任務(wù)為 CountedCompleter 類型的任務(wù)劲够,則調(diào)用externalHelpComplete方法來執(zhí)行任務(wù)。

  • 其他類型的 ForkJoinTask 任務(wù)調(diào)用tryExternalUnpush來執(zhí)行休傍,源碼如下:

//為外部提交者提供 tryUnpush 功能(給定任務(wù)在top位時彈出任務(wù))
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue w;
    ForkJoinTask<?>[] a;
    int m, s;
    int r = ThreadLocalRandom.getProbe();
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
            (w = ws[m & r & SQMASK]) != null &&
            (a = w.array) != null && (s = w.top) != w.base) {
        long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;  //取top位任務(wù)
        if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {  //加鎖
            if (w.top == s && w.array == a &&
                    U.getObject(a, j) == task &&
                    U.compareAndSwapObject(a, j, task, null)) {  //符合條件征绎,彈出
                U.putOrderedInt(w, QTOP, s - 1);  //更新top
                U.putOrderedInt(w, QLOCK, 0); //解鎖,返回true
                return true;
            }
            U.compareAndSwapInt(w, QLOCK, 1, 0);  //當(dāng)前任務(wù)不在top位磨取,解鎖返回false
        }
    }
    return false;
}

tryExternalUnpush的作用就是判斷當(dāng)前任務(wù)是否在top位人柿,如果是則彈出任務(wù)柴墩,然后在externalAwaitDone中調(diào)用doExec()執(zhí)行任務(wù)。

ForkJoinPool.awaitJoin()

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;  //獲取給定Worker的join任務(wù)
        U.putOrderedObject(w, QCURRENTJOIN, task);  //把currentJoin替換為給定任務(wù)
        //判斷是否為CountedCompleter類型的任務(wù)
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>) task : null;
        for (; ; ) {
            if ((s = task.status) < 0)  //已經(jīng)完成|取消|異常 跳出循環(huán)
                break;

            if (cc != null)//CountedCompleter任務(wù)由helpComplete來完成join
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))  //嘗試執(zhí)行
                helpStealer(w, task);  //隊列為空或執(zhí)行失敗凫岖,任務(wù)可能被偷江咳,幫助偷取者執(zhí)行該任務(wù)

            if ((s = task.status) < 0) //已經(jīng)完成|取消|異常,跳出循環(huán)
                break;
            //計算任務(wù)等待時間
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;

            if (tryCompensate(w)) {//執(zhí)行補償操作
                task.internalWait(ms);//補償執(zhí)行成功哥放,任務(wù)等待指定時間
                U.getAndAddLong(this, CTL, AC_UNIT);//更新活躍線程數(shù)
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);//循環(huán)結(jié)束歼指,替換為原來的join任務(wù)
    }
    return s;
}

說明: 如果當(dāng)前 join 任務(wù)不在Worker等待隊列的top位,或者任務(wù)執(zhí)行失敗婶芭,調(diào)用此方法來幫助執(zhí)行或阻塞當(dāng)前 join 的任務(wù)东臀。函數(shù)執(zhí)行流程如下:

  • 由于每次調(diào)用awaitJoin都會優(yōu)先執(zhí)行當(dāng)前join的任務(wù),所以首先會更新currentJoin為當(dāng)前join任務(wù)犀农;
  • 進(jìn)入自旋:
    • 首先檢查任務(wù)是否已經(jīng)完成(通過task.status < 0判斷)惰赋,如果給定任務(wù)執(zhí)行完畢|取消|異常 則跳出循環(huán)返回執(zhí)行狀態(tài)s;
    • 如果是 CountedCompleter 任務(wù)類型呵哨,調(diào)用helpComplete方法來完成join操作(后面筆者會開新篇來專門講解CountedCompleter赁濒,本篇暫時不做詳細(xì)解析);
    • 非 CountedCompleter 任務(wù)類型調(diào)用WorkQueue.tryRemoveAndExec嘗試執(zhí)行任務(wù)孟害;
    • 如果給定 WorkQueue 的等待隊列為空或任務(wù)執(zhí)行失敗拒炎,說明任務(wù)可能被偷,調(diào)用helpStealer幫助偷取者執(zhí)行任務(wù)(也就是說挨务,偷取者幫我執(zhí)行任務(wù)击你,我去幫偷取者執(zhí)行它的任務(wù));
    • 再次判斷任務(wù)是否執(zhí)行完畢(task.status < 0)谎柄,如果任務(wù)執(zhí)行失敗丁侄,計算一個等待時間準(zhǔn)備進(jìn)行補償操作;
    • 調(diào)用tryCompensate方法為給定 WorkQueue 嘗試執(zhí)行補償操作朝巫。在執(zhí)行補償期間鸿摇,如果發(fā)現(xiàn) 資源爭用|池處于unstable狀態(tài)|當(dāng)前Worker已終止,則調(diào)用ForkJoinTask.internalWait()方法等待指定的時間劈猿,任務(wù)喚醒之后繼續(xù)自旋拙吉,F(xiàn)orkJoinTask.internalWait()源碼如下:
final void internalWait(long timeout) {
    int s;
    if ((s = status) >= 0 && // force completer to issue notify
        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//更新任務(wù)狀態(tài)為SIGNAL(等待喚醒)
        synchronized (this) {
            if (status >= 0)
                try { wait(timeout); } catch (InterruptedException ie) { }
            else
                notifyAll();
        }
    }
}

在awaitJoin中,我們總共調(diào)用了三個比較復(fù)雜的方法: tryRemoveAndExec揪荣、helpStealer和tryCompensate筷黔,下面我們依次講解。

WorkQueue.tryRemoveAndExec(ForkJoinTask<?> task)

final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a;
    int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
            task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            //從top往下自旋查找
            for (ForkJoinTask<?> t; ; ) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;//計算任務(wù)索引
                if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) //獲取索引到的任務(wù)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) { //給定任務(wù)為索引任務(wù)
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) { //彈出任務(wù)
                            U.putOrderedInt(this, QTOP, s); //更新top
                            removed = true;
                        }
                    } else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                                a, j, task, new EmptyTask()); //join任務(wù)已經(jīng)被移除佛舱,替換為一個占位任務(wù)
                    if (removed)
                        task.doExec(); //執(zhí)行
                    break;
                } else if (t.status < 0 && s + 1 == top) { //給定任務(wù)不是top任務(wù)
                    if (U.compareAndSwapObject(a, j, t, null)) //彈出任務(wù)
                        U.putOrderedInt(this, QTOP, s);//更新top
                    break;                  // was cancelled
                }
                if (--n == 0) //遍歷結(jié)束
                    return false;
            }
            if (task.status < 0) //任務(wù)執(zhí)行完畢
                return false;
        }
    }
    return true;
}

說明: 從top位開始自旋向下找到給定任務(wù),如果找到把它從當(dāng)前 Worker 的任務(wù)隊列中移除并執(zhí)行它。注意返回的參數(shù): 如果任務(wù)隊列為空或者任務(wù)未執(zhí)行完畢返回true;任務(wù)執(zhí)行完畢返回false仆救。

ForkJoinPool.helpStealer(WorkQueue w, ForkJoinTask<?> task)

private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
            task != null) {
        do {                                       // restart point
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            descent:
            for (subtask = task; subtask.status >= 0; ) {
                //1. 找到給定WorkQueue的偷取者v
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {//跳兩個索引碌补,因為Worker在奇數(shù)索引位
                    if (k > m)                     // can't find stealer
                        break descent;
                    if ((v = ws[i = (h + k) & m]) != null) {
                        if (v.currentSteal == subtask) {//定位到偷取者
                            j.hint = i;//更新stealer索引
                            break;
                        }
                        checkSum += v.base;
                    }
                }
                //2. 幫助偷取者v執(zhí)行任務(wù)
                for (; ; ) {                         // help v or descend
                    ForkJoinTask<?>[] a;            //偷取者內(nèi)部的任務(wù)
                    int b;
                    checkSum += (b = v.base);
                    ForkJoinTask<?> next = v.currentJoin;//獲取偷取者的join任務(wù)
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                            v.currentSteal != subtask) // stale
                        break descent; // stale只恨,跳出descent循環(huán)重來
                    if (b - v.top >= 0 || (a = v.array) == null) {
                        if ((subtask = next) == null)   //偷取者的join任務(wù)為null,跳出descent循環(huán)
                            break descent;
                        j = v;
                        break; //偷取者內(nèi)部任務(wù)為空,可能任務(wù)也被偷走了牛曹;跳出本次循環(huán)佛点,查找偷取者的偷取者
                    }
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;//獲取base偏移地址
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                            U.getObjectVolatile(a, i));//獲取偷取者的base任務(wù)
                    if (v.base == b) {
                        if (t == null)             // stale
                            break descent; // stale,跳出descent循環(huán)重來
                        if (U.compareAndSwapObject(a, i, t, null)) {//彈出任務(wù)
                            v.base = b + 1;         //更新偷取者的base位
                            ForkJoinTask<?> ps = w.currentSteal;//獲取調(diào)用者偷來的任務(wù)
                            int top = w.top;
                            //首先更新給定workQueue的currentSteal為偷取者的base任務(wù)黎比,然后執(zhí)行該任務(wù)
                            //然后通過檢查top來判斷給定workQueue是否有自己的任務(wù)超营,如果有,
                            // 則依次彈出任務(wù)(LIFO)->更新currentSteal->執(zhí)行該任務(wù)(注意這里是自己偷自己的任務(wù)執(zhí)行)
                            do {
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                t.doExec();        // clear local tasks too
                            } while (task.status >= 0 &&
                                    w.top != top && //內(nèi)部有自己的任務(wù)阅虫,依次彈出執(zhí)行
                                    (t = w.pop()) != null);
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);//還原給定workQueue的currentSteal
                            if (w.base != w.top)//給定workQueue有自己的任務(wù)了演闭,幫助結(jié)束,返回
                                return;            // can't further help
                        }
                    }
                }
            }
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}

說明: 如果隊列為空或任務(wù)執(zhí)行失敗颓帝,說明任務(wù)可能被偷米碰,調(diào)用此方法來幫助偷取者執(zhí)行任務(wù)窝革。基本思想是: 偷取者幫助我執(zhí)行任務(wù)吕座,我去幫助偷取者執(zhí)行它的任務(wù)虐译。 函數(shù)執(zhí)行流程如下:

循環(huán)定位偷取者,由于Worker是在奇數(shù)索引位吴趴,所以每次會跳兩個索引位菱蔬。定位到偷取者之后,更新調(diào)用者 WorkQueue 的hint為偷取者的索引史侣,方便下次定位; 定位到偷取者后魏身,開始幫助偷取者執(zhí)行任務(wù)惊橱。從偷取者的base索引開始,每次偷取一個任務(wù)執(zhí)行箭昵。在幫助偷取者執(zhí)行任務(wù)后税朴,如果調(diào)用者發(fā)現(xiàn)本身已經(jīng)有任務(wù)(w.top != top),則依次彈出自己的任務(wù)(LIFO順序)并執(zhí)行(也就是說自己偷自己的任務(wù)執(zhí)行)家制。

ForkJoinPool.tryCompensate(WorkQueue w)

//執(zhí)行補償操作: 嘗試縮減活動線程量正林,可能釋放或創(chuàng)建一個補償線程來準(zhǔn)備阻塞
private boolean tryCompensate(WorkQueue w) {
    boolean canBlock;
    WorkQueue[] ws;
    long c;
    int m, pc, sp;
    if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false; //調(diào)用者已終止
    else if ((sp = (int) (c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);//喚醒等待的工作線程
    else {//沒有空閑線程
        int ac = (int) (c >> AC_SHIFT) + pc; //活躍線程數(shù)
        int tc = (short) (c >> TC_SHIFT) + pc;//總線程數(shù)
        int nbusy = 0;                        // validate saturation
        for (int i = 0; i <= m; ++i) {        // two passes of odd indices
            WorkQueue v;
            if ((v = ws[((i << 1) | 1) & m]) != null) {//取奇數(shù)索引位
                if ((v.scanState & SCANNING) != 0)//沒有正在運行任務(wù),跳出
                    break;
                ++nbusy;//正在運行任務(wù)颤殴,添加標(biāo)記
            }
        }
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;                 // unstable or stale
        else if (tc >= pc && ac > 1 && w.isEmpty()) {//總線程數(shù)大于并行度 && 活動線程數(shù)大于1 && 調(diào)用者任務(wù)隊列為空觅廓,不需要補償
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                    (~AC_MASK & c));       // uncompensated
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);//更新活躍線程數(shù)
        } else if (tc >= MAX_CAP ||
                (this == common && tc >= pc + commonMaxSpares))//超出最大線程數(shù)
            throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
        else {                                // similar to tryAddWorker
            boolean add = false;
            int rs;      // CAS within lock
            long nc = ((AC_MASK & c) |
                    (TC_MASK & (c + TC_UNIT)));//計算總線程數(shù)
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);//更新總線程數(shù)
            unlockRunState(rs, rs & ~RSLOCK);
            //運行到這里說明活躍工作線程數(shù)不足付魔,需要創(chuàng)建一個新的工作線程來補償
            canBlock = add && createWorker(); // throws on exception
        }
    }
    return canBlock;
}

說明: 具體的執(zhí)行看源碼及注釋膳帕,這里我們簡單總結(jié)一下需要和不需要補償?shù)膸追N情況:

需要補償 :

  • 調(diào)用者隊列不為空,并且有空閑工作線程企软,這種情況會喚醒空閑線程(調(diào)用tryRelease方法)
  • 池尚未停止矮瘟,活躍線程數(shù)不足瞳脓,這時會新建一個工作線程(調(diào)用createWorker方法)

不需要補償 :

  • 調(diào)用者已終止或池處于不穩(wěn)定狀態(tài)
  • 總線程數(shù)大于并行度 && 活動線程數(shù)大于1 && 調(diào)用者任務(wù)隊列為空

Fork/Join的陷阱與注意事項

使用Fork/Join框架時,需要注意一些陷阱, 在下面 斐波那契數(shù)列例子中你將看到示例:

避免不必要的fork()

劃分成兩個子任務(wù)后澈侠,不要同時調(diào)用兩個子任務(wù)的fork()方法劫侧。

表面上看上去兩個子任務(wù)都fork(),然后join()兩次似乎更自然哨啃。但事實證明烧栋,直接調(diào)用compute()效率更高。因為直接調(diào)用子任務(wù)的compute()方法實際上就是在當(dāng)前的工作線程進(jìn)行了計算(線程重用)棘催,這比“將子任務(wù)提交到工作隊列劲弦,線程又從工作隊列中拿任務(wù)”快得多。

當(dāng)一個大任務(wù)被劃分成兩個以上的子任務(wù)時醇坝,盡可能使用前面說到的三個衍生的invokeAll方法邑跪,因為使用它們能避免不必要的fork()次坡。

注意fork()、compute()画畅、join()的順序

為了兩個任務(wù)并行砸琅,三個方法的調(diào)用順序需要萬分注意。

right.fork(); // 計算右邊的任務(wù)
long leftAns = left.compute(); // 計算左邊的任務(wù)(同時右邊任務(wù)也在計算)
long rightAns = right.join(); // 等待右邊的結(jié)果
return leftAns + rightAns;

如果我們寫成:

left.fork(); // 計算完左邊的任務(wù)
long leftAns = left.join(); // 等待左邊的計算結(jié)果
long rightAns = right.compute(); // 再計算右邊的任務(wù)
return leftAns + rightAns;

或者
long rightAns = right.compute(); // 計算完右邊的任務(wù)
left.fork(); // 再計算左邊的任務(wù)
long leftAns = left.join(); // 等待左邊的計算結(jié)果
return leftAns + rightAns;

這兩種實際上都沒有并行轴踱。

選擇合適的子任務(wù)粒度

選擇劃分子任務(wù)的粒度(順序執(zhí)行的閾值)很重要症脂,因為使用Fork/Join框架并不一定比順序執(zhí)行任務(wù)的效率高: 如果任務(wù)太大,則無法提高并行的吞吐量淫僻;如果任務(wù)太小诱篷,子任務(wù)的調(diào)度開銷可能會大于并行計算的性能提升,我們還要考慮創(chuàng)建子任務(wù)雳灵、fork()子任務(wù)棕所、線程調(diào)度以及合并子任務(wù)處理結(jié)果的耗時以及相應(yīng)的內(nèi)存消耗。

官方文檔給出的粗略經(jīng)驗是: 任務(wù)應(yīng)該執(zhí)行100~10000個基本的計算步驟悯辙。決定子任務(wù)的粒度的最好辦法是實踐琳省,通過實際測試結(jié)果來確定這個閾值才是“上上策”。

和其他Java代碼一樣躲撰,F(xiàn)ork/Join框架測試時需要“預(yù)熱”或者說執(zhí)行幾遍才會被JIT(Just-in-time)編譯器優(yōu)化针贬,所以測試性能之前跑幾遍程序很重要。

避免重量級任務(wù)劃分與結(jié)果合并

Fork/Join的很多使用場景都用到數(shù)組或者List等數(shù)據(jù)結(jié)構(gòu)拢蛋,子任務(wù)在某個分區(qū)中運行桦他,最典型的例子如并行排序和并行查找。拆分子任務(wù)以及合并處理結(jié)果的時候谆棱,應(yīng)該盡量避免System.arraycopy這樣耗時耗空間的操作瞬铸,從而最小化任務(wù)的處理開銷。

采用Fork/Join來異步計算1+2+3+…+10000的結(jié)果

public class Test {
    static final class SumTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        
        final int start; //開始計算的數(shù)
        final int end; //最后計算的數(shù)
        
        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            //如果計算量小于1000础锐,那么分配一個線程執(zhí)行if中的代碼塊嗓节,并返回執(zhí)行結(jié)果
            if(end - start < 1000) {
                System.out.println(Thread.currentThread().getName() + " 開始執(zhí)行: " + start + "-" + end);
                int sum = 0;
                for(int i = start; i <= end; i++)
                    sum += i;
                return sum;
            }
            //如果計算量大于1000,那么拆分為兩個任務(wù)
            SumTask task1 = new SumTask(start, (start + end) / 2);
            SumTask task2 = new SumTask((start + end) / 2 + 1, end);
            //執(zhí)行任務(wù)
            task1.fork();
            task2.fork();
            //獲取任務(wù)執(zhí)行的結(jié)果
            return task1.join() + task2.join();
        }
    }
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Integer> task = new SumTask(1, 10000);
        pool.submit(task);
        System.out.println(task.get());
    }
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末皆警,一起剝皮案震驚了整個濱河市拦宣,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌信姓,老刑警劉巖鸵隧,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異意推,居然都是意外死亡豆瘫,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進(jìn)店門菊值,熙熙樓的掌柜王于貴愁眉苦臉地迎上來外驱,“玉大人育灸,你說我怎么就攤上這事£怯睿” “怎么了磅崭?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長瓦哎。 經(jīng)常有香客問我砸喻,道長,這世上最難降的妖魔是什么蒋譬? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任割岛,我火速辦了婚禮,結(jié)果婚禮上犯助,老公的妹妹穿的比我還像新娘蜂桶。我一直安慰自己,他們只是感情好也切,可當(dāng)我...
    茶點故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著腰湾,像睡著了一般雷恃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上费坊,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天倒槐,我揣著相機與錄音,去河邊找鬼附井。 笑死讨越,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的永毅。 我是一名探鬼主播把跨,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼沼死!你這毒婦竟也來了着逐?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤意蛀,失蹤者是張志新(化名)和其女友劉穎耸别,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體县钥,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡秀姐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了若贮。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片省有。...
    茶點故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡痒留,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出锥咸,到底是詐尸還是另有隱情狭瞎,我是刑警寧澤,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布搏予,位于F島的核電站熊锭,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏雪侥。R本人自食惡果不足惜碗殷,卻給世界環(huán)境...
    茶點故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望速缨。 院中可真熱鬧锌妻,春花似錦、人聲如沸旬牲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽原茅。三九已至吭历,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間擂橘,已是汗流浹背晌区。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留通贞,地道東北人朗若。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像昌罩,于是被迫代替她去往敵國和親哭懈。 傳聞我的和親對象是個殘疾皇子下梢,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內(nèi)容