JUC源碼分析-線程池篇(五):ForkJoinPool - 2

通過(guò)上一篇(JUC源碼分析-線程池篇(四):ForkJoinPool - 1)的講解,相信同學(xué)們對(duì) ForkJoinPool 已經(jīng)有了一個(gè)大概的認(rèn)識(shí),本篇我們將通過(guò)分析源碼的方式來(lái)深入了解 ForkJoinPool 的具體實(shí)現(xiàn)原理胚宦。


概述

上篇提到過(guò),在 java.util.concurrent 包中绝骚,F(xiàn)ork/Join 框架主要由 ForkJoinPool田柔、ForkJoinWorkerThread 和 ForkJoinTask(包括RecursiveTask、RecursiveAction 和 CountedCompleter) 來(lái)實(shí)現(xiàn)耀石,它們之間的聯(lián)系如下:

  • ForkJoinPool 中使用 ForkJoinWorkerThread 來(lái)運(yùn)行 ForkJoinTask 任務(wù)牵囤,F(xiàn)orkJoinPool 只接收 ForkJoinTask 任務(wù)(在實(shí)際使用中,也可以接收 Runnable/Callable 任務(wù)娶牌,但在真正運(yùn)行時(shí)奔浅,也會(huì)把這些任務(wù)封裝成 ForkJoinTask 類型的任務(wù))馆纳,RecursiveTask 是 ForkJoinTask 的子類诗良,是一個(gè)可以遞歸執(zhí)行的 ForkJoinTask,RecursiveAction 是一個(gè)無(wú)返回值的 RecursiveTask鲁驶,CountedCompleter 在任務(wù)完成執(zhí)行后會(huì)觸發(fā)執(zhí)行一個(gè)自定義的鉤子函數(shù)鉴裹。
    在實(shí)際運(yùn)用中,我們一般都會(huì)繼承 RecursiveTask 钥弯、RecursiveAction 或 CountedCompleter 來(lái)實(shí)現(xiàn)我們的業(yè)務(wù)需求径荔,而不會(huì)直接繼承 ForkJoinTask 類。

在本篇的解析中主要包括三個(gè)類( ForkJoinPool脆霎、ForkJoinWorkerThread 和 ForkJoinTask )总处,我們通過(guò)穿插講解它們之間的聯(lián)系來(lái)分析 ForkJoinPool 的運(yùn)行原理。

數(shù)據(jù)結(jié)構(gòu)和核心參數(shù)

1. ForkJoinPool

ForkJoinPool 繼承關(guān)系

內(nèi)部類介紹:

  1. ForkJoinWorkerThreadFactory:內(nèi)部線程工廠接口睛蛛,用于創(chuàng)建工作線程ForkJoinWorkerThread

  2. DefaultForkJoinWorkerThreadFactory:ForkJoinWorkerThreadFactory 的默認(rèn)實(shí)現(xiàn)類

  3. InnocuousForkJoinWorkerThreadFactory:實(shí)現(xiàn)了 ForkJoinWorkerThreadFactory鹦马,無(wú)許可線程工廠,當(dāng)系統(tǒng)變量中有系統(tǒng)安全管理相關(guān)屬性時(shí)忆肾,默認(rèn)使用這個(gè)工廠創(chuàng)建工作線程荸频。

  4. EmptyTask:內(nèi)部占位類,用于替換隊(duì)列中 join 的任務(wù)客冈。

  5. ManagedBlocker:為 ForkJoinPool 中的任務(wù)提供擴(kuò)展管理并行數(shù)的接口旭从,一般用在可能會(huì)阻塞的任務(wù)(如在 Phaser 中用于等待 phase 到下一個(gè)generation)。

  6. WorkQueue:ForkJoinPool 的核心數(shù)據(jù)結(jié)構(gòu)场仲,本質(zhì)上是work-stealing 模式的雙端任務(wù)隊(duì)列和悦,內(nèi)部存放 ForkJoinTask 對(duì)象任務(wù),使用 @Contented 注解修飾防止偽共享渠缕。具體介紹見(jiàn)上篇摹闽。

    • 工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了 fork())時(shí),此時(shí)可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個(gè)棧褐健,新的任務(wù)會(huì)放入棧頂(top 位)付鹿;工作線程在處理自己工作隊(duì)列的任務(wù)時(shí)澜汤,按照 LIFO 的順序。

    • 工作線程在處理自己的工作隊(duì)列同時(shí)舵匾,會(huì)嘗試竊取一個(gè)任務(wù)(可能是來(lái)自于剛剛提交到 pool 的任務(wù)俊抵,或是來(lái)自于其他工作線程的隊(duì)列任務(wù)),此時(shí)可以把 WorkQueue 的數(shù)據(jù)結(jié)構(gòu)視為一個(gè) FIFO 的隊(duì)列坐梯,竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首(base位)徽诲。

偽共享狀態(tài):緩存系統(tǒng)中是以緩存行(cache line)為單位存儲(chǔ)的。緩存行是2的整數(shù)冪個(gè)連續(xù)字節(jié)吵血,一般為32-256個(gè)字節(jié)谎替。最常見(jiàn)的緩存行大小是64個(gè)字節(jié)。當(dāng)多線程修改互相獨(dú)立的變量時(shí)蹋辅,如果這些變量共享同一個(gè)緩存行钱贯,就會(huì)無(wú)意中影響彼此的性能,這就是偽共享侦另。

核心參數(shù)

在后面的源碼解析中秩命,我們會(huì)看到大量的位運(yùn)算,這些位運(yùn)算都是通過(guò)我們接下來(lái)介紹的一些常量參數(shù)來(lái)計(jì)算的褒傅。
例如弃锐,如果要更新活躍線程數(shù),使用公式(UC_MASK & (c + AC_UNIT)) | (SP_MASK & c)殿托;c 代表當(dāng)前 ctl霹菊,UC_MASK 和 SP_MASK 分別是高位和低位掩碼,AC_UNIT 為活躍線程的增量數(shù)支竹,使用(UC_MASK & (c + AC_UNIT))就可以計(jì)算出高32位旋廷,然后再加上低32位(SP_MASK & c),就拼接成了一個(gè)新的ctl唾戚。

這些運(yùn)算的可讀性很差柳洋,看起來(lái)有些復(fù)雜。在后面源碼解析中有位運(yùn)算的地方我都會(huì)加上注釋叹坦,大家只需要了解它們的作用即可熊镣。

  1. ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量:
// Constants shared across ForkJoinPool and WorkQueue

// 限定參數(shù)
static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位
static final int MAX_CAP = 0x7fff;        //  工作線程最大容量
static final int EVENMASK = 0xfffe;        //  偶數(shù)低位掩碼
static final int SQMASK = 0x007e;        //  workQueues 數(shù)組最多64個(gè)槽位

// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1;             // 標(biāo)記是否正在運(yùn)行任務(wù)
static final int INACTIVE = 1 << 31;       // 失活狀態(tài)  負(fù)數(shù)
static final int SS_SEQ = 1 << 16;       // 版本戳募书,防止ABA問(wèn)題

// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16;  // 模式掩碼
static final int LIFO_QUEUE = 0; //LIFO隊(duì)列
static final int FIFO_QUEUE = 1 << 16;//FIFO隊(duì)列
static final int SHARED_QUEUE = 1 << 31;       // 共享模式隊(duì)列绪囱,負(fù)數(shù)
  1. ForkJoinPool 中的相關(guān)常量和實(shí)例字段:
//  低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼

// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創(chuàng)建工作線程標(biāo)志

// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;

// 實(shí)例字段
volatile long ctl;                   // 主控制參數(shù)
volatile int runState;               // 運(yùn)行狀態(tài)鎖
final int config;                    // 并行度|模式
int indexSeed;                       // 用于生成工作線程索引
volatile WorkQueue[] workQueues;     // 主對(duì)象注冊(cè)信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh;  // 每個(gè)工作線程的異常信息
final String workerNamePrefix;       // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter;    // 偷取任務(wù)總數(shù)莹捡,也可作為同步監(jiān)視器

/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動(dòng)或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度鬼吵,對(duì)應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù),在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時(shí)的序號(hào)
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時(shí)值(以納秒為單位)篮赢,默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時(shí)時(shí)間齿椅,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù)琉挖,用在在awaitRunStateLock和awaitWork中
private static final int SPINS  = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;

說(shuō)明: ForkJoinPool 的內(nèi)部狀態(tài)都是通過(guò)一個(gè)64位的 long 型 變量ctl來(lái)存儲(chǔ),它由四個(gè)16位的子域組成:

  • AC:正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度涣脚,高16位
  • TC:總工作線程數(shù)減去目標(biāo)并行度示辈,中高16位
  • SS:棧頂?shù)却€程的版本計(jì)數(shù)和狀態(tài),中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex)遣蚀,低16位

在后面的源碼解析中矾麻,某些地方也提取了ctl的低32位(sp=(int)ctl)來(lái)檢查工作線程狀態(tài),例如芭梯,當(dāng)sp不為0時(shí)說(shuō)明當(dāng)前還有空閑工作線程险耀。

  1. ForkJoinPool.WorkQueue 中的相關(guān)屬性:
//初始隊(duì)列容量,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊(duì)列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// 實(shí)例字段
volatile int scanState;    // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred;             // 記錄前一個(gè)棧頂?shù)腸tl
int nsteals;               // 偷取任務(wù)數(shù)
int hint;                  // 記錄偷取者索引玖喘,初始為隨機(jī)索引
int config;                // 池索引和模式
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         //下一個(gè)poll操作的索引(棧底/隊(duì)列頭)
int top;                   //  下一個(gè)push操作的索引(棧頂/隊(duì)列尾)
ForkJoinTask<?>[] array;   // 任務(wù)數(shù)組
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊(duì)列的工作線程甩牺,共享模式下為null
volatile Thread parker;    // 調(diào)用park阻塞期間為owner,其他情況為null
volatile ForkJoinTask<?> currentJoin;  // 記錄被join過(guò)來(lái)的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊(duì)列偷取過(guò)來(lái)的任務(wù)

2. ForkJoinTask

ForkJoinTask 繼承關(guān)系
  • ForkJoinTask 實(shí)現(xiàn)了 Future 接口芒涡,說(shuō)明它也是一個(gè)可取消的異步運(yùn)算任務(wù)柴灯,實(shí)際上ForkJoinTask 是 Future 的輕量級(jí)實(shí)現(xiàn)卖漫,主要用在純粹是計(jì)算的函數(shù)式任務(wù)或者操作完全獨(dú)立的對(duì)象計(jì)算任務(wù)费尽。fork 是主運(yùn)行方法,用于異步執(zhí)行羊始;而 join 方法在任務(wù)結(jié)果計(jì)算完畢之后才會(huì)運(yùn)行旱幼,用來(lái)合并或返回計(jì)算結(jié)果。
  • 其內(nèi)部類都比較簡(jiǎn)單突委,ExceptionNode 是用于存儲(chǔ)任務(wù)執(zhí)行期間的異常信息的單向鏈表柏卤;其余四個(gè)類是為 Runnable/Callable 任務(wù)提供的適配器類,用于把 Runnable/Callable 轉(zhuǎn)化為 ForkJoinTask 類型的任務(wù)(因?yàn)?ForkJoinPool 只可以運(yùn)行 ForkJoinTask 類型的任務(wù))匀油。
核心參數(shù)
/** 任務(wù)運(yùn)行狀態(tài) */
volatile int status; // 任務(wù)運(yùn)行狀態(tài)
static final int DONE_MASK   = 0xf0000000;  // 任務(wù)完成狀態(tài)標(biāo)志位
static final int NORMAL      = 0xf0000000;  // must be negative
static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 等待信號(hào)
static final int SMASK       = 0x0000ffff;  //  低位掩碼

源碼解析

首先介紹一下構(gòu)造函數(shù):

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
            checkFactory(factory),
            handler,
            asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
            "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

說(shuō)明: 在 ForkJoinPool 中我們可以自定義四個(gè)參數(shù):

  • parallelism:并行度缘缚,默認(rèn)為CPU數(shù),最小為1
  • factory:工作線程工廠敌蚜;
  • handler:處理工作線程運(yùn)行任務(wù)時(shí)的異常情況類桥滨,默認(rèn)為null;
  • asyncMode:是否為異步模式弛车,默認(rèn)為 false齐媒。如果為true,表示子任務(wù)的執(zhí)行遵循 FIFO 順序并且任務(wù)不能被合并(join)纷跛,這種模式適用于工作線程只運(yùn)行事件類型的異步任務(wù)喻括。

在多數(shù)場(chǎng)景使用時(shí),如果沒(méi)有太強(qiáng)的業(yè)務(wù)需求贫奠,我們一般直接使用 ForkJoinPool 中的common池唬血,在JDK1.8之后提供了ForkJoinPool.commonPool()方法可以直接使用common池望蜡,來(lái)看一下它的構(gòu)造:

private static ForkJoinPool makeCommonPool() {
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {  // ignore exceptions in accessing/parsing
        String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
        String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");//線程工廠
        String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//異常處理類
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
                    getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler) ClassLoader.
                    getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    }
    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;//默認(rèn)并行度為1
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
}

使用common pool的優(yōu)點(diǎn)就是我們可以通過(guò)指定系統(tǒng)參數(shù)的方式定義“并行度、線程工廠和異常處理類”拷恨;并且它使用的是同步模式泣特,也就是說(shuō)可以支持任務(wù)合并(join)。


ForkJoinPool 中的任務(wù)執(zhí)行分兩種:

  1. 直接通過(guò) FJP 提交的外部任務(wù)(external/submissions task)挑随,存放在 workQueues 的偶數(shù)槽位状您;
  2. 通過(guò)內(nèi)部 fork 分割的子任務(wù)(Worker task),存放在 workQueues 的奇數(shù)槽位兜挨。

首先來(lái)看一下整個(gè)Fork/Join 框架的執(zhí)行流程膏孟,后面我們的源碼解析會(huì)完全按照這個(gè)流程圖來(lái)進(jìn)行

ForkJoinPool 任務(wù)執(zhí)行流程

在接下來(lái)的解析中,我們會(huì)分四個(gè)部分:首先介紹兩種任務(wù)的提交流程拌汇;再分析任務(wù)的執(zhí)行過(guò)程(ForkJoinWorkerThread.run()ForkJoinTask.doExec()這一部分)柒桑;最后介紹任務(wù)的結(jié)果獲取(ForkJoinTask.join()ForkJoinTask.invoke()

1. 外部任務(wù)(external/submissions task)提交

向 ForkJoinPool 提交任務(wù)有三種方式:invoke()會(huì)等待任務(wù)計(jì)算完畢并返回計(jì)算結(jié)果噪舀;execute()是直接向池提交一個(gè)任務(wù)來(lái)異步執(zhí)行魁淳,無(wú)返回結(jié)果;submit()也是異步執(zhí)行与倡,但是會(huì)返回提交的任務(wù)界逛,在適當(dāng)?shù)臅r(shí)候可通過(guò)task.get()獲取執(zhí)行結(jié)果。
這三種提交方式都都是調(diào)用externalPush()方法來(lái)完成纺座,所以接下來(lái)我們將從externalPush()方法開(kāi)始逐步分析外部任務(wù)的執(zhí)行過(guò)程息拜。

1.1 externalPush(ForkJoinTask<?> task)

//添加給定任務(wù)到submission隊(duì)列中
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue q;
    int m;
    int r = ThreadLocalRandom.getProbe();//探針值,用于計(jì)算WorkQueue槽位索引
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //獲取隨機(jī)偶數(shù)槽位的workQueue
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定workQueue
        ForkJoinTask<?>[] a;
        int am, n, s;
        if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;//計(jì)算任務(wù)索引位置
            U.putOrderedObject(a, j, task);//任務(wù)入列
            U.putOrderedInt(q, QTOP, s + 1);//更新push slot
            U.putIntVolatile(q, QLOCK, 0);//解除鎖定
            if (n <= 1)
                signalWork(ws, q);//任務(wù)數(shù)小于1時(shí)嘗試創(chuàng)建或激活一個(gè)工作線程
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
    }
    externalSubmit(task);//初始化workQueues及相關(guān)屬性
}

首先說(shuō)明一下externalPushexternalSubmit兩個(gè)方法的聯(lián)系:它們的作用都是把任務(wù)放到隊(duì)列中等待執(zhí)行净响。不同的是少欺,externalSubmit可以說(shuō)是完整版的externalPush,在任務(wù)首次提交時(shí)馋贤,需要初始化workQueues及其他相關(guān)屬性赞别,這個(gè)初始化操作就是externalSubmit來(lái)完成的;而后再向池中提交的任務(wù)都是通過(guò)簡(jiǎn)化版的externalSubmit-externalPush來(lái)完成配乓。

externalPush的執(zhí)行流程很簡(jiǎn)單:首先找到一個(gè)隨機(jī)偶數(shù)槽位的 workQueue仿滔,然后把任務(wù)放入這個(gè) workQueue 的任務(wù)數(shù)組中,并更新top位扰付。如果隊(duì)列的剩余任務(wù)數(shù)小于1堤撵,則嘗試創(chuàng)建或激活一個(gè)工作線程來(lái)運(yùn)行任務(wù)(防止在externalSubmit初始化時(shí)發(fā)生異常導(dǎo)致工作線程創(chuàng)建失敗)羽莺。

1.2 externalSubmit(ForkJoinTask<?> task)

//任務(wù)提交
private void externalSubmit(ForkJoinTask<?> task) {
    //初始化調(diào)用線程的探針值实昨,用于計(jì)算WorkQueue索引
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (; ; ) {
        WorkQueue[] ws;
        WorkQueue q;
        int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {// 池已關(guān)閉
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        //初始化workQueues
        else if ((rs & STARTED) == 0 ||     // initialize
                ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();//鎖定runState
            try {
                //初始化
                if ((rs & STARTED) == 0) {
                    //初始化stealCounter
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                            new AtomicLong());
                    //創(chuàng)建workQueues,容量為2的冪次方
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1;
                    n |= n >>> 2;
                    n |= n >>> 4;
                    n |= n >>> 8;
                    n |= n >>> 16;
                    n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);//解鎖并更新runState
            }
        } else if ((q = ws[k = r & m & SQMASK]) != null) {//獲取隨機(jī)偶數(shù)槽位的workQueue
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {//鎖定 workQueue
                ForkJoinTask<?>[] a = q.array;//當(dāng)前workQueue的全部任務(wù)
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {//擴(kuò)容
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);//放入給定任務(wù)
                        U.putOrderedInt(q, QTOP, s + 1);//修改push slot
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);//解除鎖定
                }
                if (submitted) {//任務(wù)提交成功盐固,創(chuàng)建或激活工作線程
                    signalWork(ws, q);//創(chuàng)建或激活一個(gè)工作線程來(lái)運(yùn)行任務(wù)
                    return;
                }
            }
            move = true;                   // move on failure 操作失敗荒给,重新獲取探針值
        } else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 && (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                ws[k] = q;                 // 更新索引k位值的workQueue
            //else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        } else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);//重新獲取線程探針值
    }
}

說(shuō)明externalSubmitexternalPush的完整版本丈挟,主要用于第一次提交任務(wù)時(shí)初始化workQueues及相關(guān)屬性,并且提交給定任務(wù)到隊(duì)列中志电。具體執(zhí)行步驟如下:

  1. 如果池為終止?fàn)顟B(tài)(runState<0)曙咽,調(diào)用tryTerminate來(lái)終止線程池,并拋出任務(wù)拒絕異常挑辆;
  2. 如果尚未初始化例朱,就為 FJP 執(zhí)行初始化操作:初始化stealCounter、創(chuàng)建workerQueues鱼蝉,然后繼續(xù)自旋洒嗤;
  3. 初始化完成后,執(zhí)行在externalPush中相同的操作:獲取 workQueue魁亦,放入指定任務(wù)渔隶。任務(wù)提交成功后調(diào)用signalWork方法創(chuàng)建或激活線程;
  4. 如果在步驟3中獲取到的 workQueue 為null洁奈,會(huì)在這一步中創(chuàng)建一個(gè) workQueue间唉,創(chuàng)建成功繼續(xù)自旋執(zhí)行第三步操作;
  5. 如果非上述情況利术,或者有線程爭(zhēng)用資源導(dǎo)致獲取鎖失敗呈野,就重新獲取線程探針值繼續(xù)自旋。

1.3 signalWork(WorkQueue[] ws, WorkQueue q)

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c;
    int sp, i;
    WorkQueue v;
    Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        if ((sp = (int) c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);//工作線程太少氯哮,添加新的工作線程
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        //計(jì)算ctl际跪,加上版本戳SS_SEQ避免ABA問(wèn)題
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        //計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);//喚醒阻塞線程
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

說(shuō)明:新建或喚醒一個(gè)工作線程商佛,在externalPush喉钢、externalSubmitworkQueue.push良姆、scan中調(diào)用肠虽。如果還有空閑線程,則嘗試喚醒索引到的 WorkQueue 的parker線程玛追;如果工作線程過(guò)少((ctl & ADD_WORKER) != 0L)税课,則調(diào)用tryAddWorker添加一個(gè)新的工作線程。

1.4 tryAddWorker(long c)

private void tryAddWorker(long c) {
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);//釋放鎖
            if (stop != 0)
                break;
            if (add) {
                createWorker();//創(chuàng)建工作線程
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

說(shuō)明:嘗試添加一個(gè)新的工作線程痊剖,首先更新ctl中的工作線程數(shù)韩玩,然后調(diào)用createWorker()創(chuàng)建工作線程。

1.5 createWorker()

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);//線程創(chuàng)建失敗處理
    return false;
}

說(shuō)明createWorker首先通過(guò)線程工廠創(chuàng)一個(gè)新的ForkJoinWorkerThread陆馁,然后啟動(dòng)這個(gè)工作線程(wt.start())找颓。如果期間發(fā)生異常,調(diào)用deregisterWorker處理線程創(chuàng)建失敗的邏輯(deregisterWorker在后面再詳細(xì)說(shuō)明)叮贩。

ForkJoinWorkerThread 的構(gòu)造函數(shù)如下:

protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

可以看到 ForkJoinWorkerThread 在構(gòu)造時(shí)首先調(diào)用父類 Thread 的方法击狮,然后為工作線程注冊(cè)poolworkQueue佛析,而workQueue的注冊(cè)任務(wù)由ForkJoinPool.registerWorker來(lái)完成。

1.6 registerWorker()

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    //設(shè)置為守護(hù)線程
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);//構(gòu)造新的WorkQueue
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws;
        int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            //生成新建WorkQueue的索引
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // Worker任務(wù)放在奇數(shù)索引位 odd-numbered indices
            if (ws[i] != null) {                  // collision 已存在彪蓬,重新計(jì)算索引位
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                //查找可用的索引位
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {//所有索引位都被占用寸莫,對(duì)workQueues進(jìn)行擴(kuò)容
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);//workQueues 擴(kuò)容
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

說(shuō)明registerWorker是 ForkJoinWorkerThread 構(gòu)造器的回調(diào)函數(shù),用于創(chuàng)建和記錄工作線程的 WorkQueue档冬。比較簡(jiǎn)單膘茎,就不多贅述了。注意在此為工作線程創(chuàng)建的 WorkQueue 是放在奇數(shù)索引的(代碼行:i = ((s << 1) | 1) & m;

1.7 小結(jié)

OK酷誓,外部任務(wù)的提交流程就先講到這里辽狈。在createWorker()中啟動(dòng)工作線程后(wt.start()),當(dāng)為線程分配到CPU執(zhí)行時(shí)間片之后會(huì)運(yùn)行 ForkJoinWorkerThread 的run方法開(kāi)啟線程來(lái)執(zhí)行任務(wù)呛牲。工作線程執(zhí)行任務(wù)的流程我們?cè)谥v完內(nèi)部任務(wù)提交之后會(huì)統(tǒng)一講解刮萌。


2. 子任務(wù)(Worker task)提交

子任務(wù)的提交相對(duì)比較簡(jiǎn)單,由任務(wù)的fork()方法完成娘扩。通過(guò)上面的流程圖可以看到任務(wù)被分割(fork)之后調(diào)用了ForkJoinPool.WorkQueue.push()方法直接把任務(wù)放到隊(duì)列中等待被執(zhí)行着茸。

2.1 ForkJoinTask.fork()

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

說(shuō)明:如果當(dāng)前線程是 Worker 線程,說(shuō)明當(dāng)前任務(wù)是fork分割的子任務(wù)琐旁,通過(guò)ForkJoinPool.workQueue.push()方法直接把任務(wù)放到自己的等待隊(duì)列中涮阔;否則調(diào)用ForkJoinPool.externalPush()提交到一個(gè)隨機(jī)的等待隊(duì)列中(外部任務(wù))。

2.2 ForkJoinPool.WorkQueue.push()

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a;
    ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {//首次提交灰殴,創(chuàng)建或喚醒一個(gè)工作線程
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        } else if (n >= m)
            growArray();
    }
}

說(shuō)明:首先把任務(wù)放入等待隊(duì)列并更新top位敬特;如果當(dāng)前 WorkQueue 為新建的等待隊(duì)列(top-base<=1),則調(diào)用signalWork方法為當(dāng)前 WorkQueue 新建或喚醒一個(gè)工作線程牺陶;如果 WorkQueue 中的任務(wù)數(shù)組容量過(guò)小伟阔,則調(diào)用growArray()方法對(duì)其進(jìn)行兩倍擴(kuò)容,growArray()方法源碼如下:

final ForkJoinTask<?>[] growArray() {
    ForkJoinTask<?>[] oldA = array;//獲取內(nèi)部任務(wù)列表
    int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
    if (size > MAXIMUM_QUEUE_CAPACITY)
        throw new RejectedExecutionException("Queue capacity exceeded");
    int oldMask, t, b;
    //新建一個(gè)兩倍容量的任務(wù)數(shù)組
    ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
    if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
            (t = top) - (b = base) > 0) {
        int mask = size - 1;
        //從老數(shù)組中拿出數(shù)據(jù)掰伸,放到新的數(shù)組中
        do { // emulate poll from old array, push to new array
            ForkJoinTask<?> x;
            int oldj = ((b & oldMask) << ASHIFT) + ABASE;
            int j = ((b & mask) << ASHIFT) + ABASE;
            x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
            if (x != null &&
                    U.compareAndSwapObject(oldA, oldj, x, null))
                U.putObjectVolatile(a, j, x);
        } while (++b != t);
    }
    return a;
}

2.3 小結(jié)

到此忧额,兩種任務(wù)的提交流程都已經(jīng)解析完畢抹腿,下一節(jié)我們來(lái)一起看看任務(wù)提交之后是如何被運(yùn)行的。


3. 任務(wù)執(zhí)行

回到我們開(kāi)始時(shí)的流程圖,在ForkJoinPool .createWorker()方法中創(chuàng)建工作線程后兢哭,會(huì)啟動(dòng)工作線程聋亡,系統(tǒng)為工作線程分配到CPU執(zhí)行時(shí)間片之后會(huì)執(zhí)行 ForkJoinWorkerThread 的run()方法正式開(kāi)始執(zhí)行任務(wù)汰蓉。

3.1 ForkJoinWorkerThread.run()

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();//鉤子方法珊佣,可自定義擴(kuò)展
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);//鉤子方法,可自定義擴(kuò)展
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);//處理異常
            }
        }
    }
}

說(shuō)明:方法很簡(jiǎn)單惯退,在工作線程運(yùn)行前后會(huì)調(diào)用自定義鉤子函數(shù)(onStartonTermination)赌髓,任務(wù)的運(yùn)行則是調(diào)用了ForkJoinPool.runWorker()。如果全部任務(wù)執(zhí)行完畢或者期間遭遇異常,則通過(guò)ForkJoinPool.deregisterWorker關(guān)閉工作線程并處理異常信息(deregisterWorker方法我們后面會(huì)詳細(xì)講解)春弥。

3.2 ForkJoinPool.runWorker(WorkQueue w)

final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t; ; ) {
        if ((t = scan(w, r)) != null)//掃描任務(wù)執(zhí)行
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5; // xorshift
    }
}

說(shuō)明runWorker是 ForkJoinWorkerThread 的主運(yùn)行方法呛哟,用來(lái)依次執(zhí)行當(dāng)前工作線程中的任務(wù)。函數(shù)流程很簡(jiǎn)單:調(diào)用scan方法依次獲取任務(wù)匿沛,然后調(diào)用WorkQueue .runTask運(yùn)行任務(wù)扫责;如果未掃描到任務(wù),則調(diào)用awaitWork等待逃呼,直到工作線程/線程池終止或等待超時(shí)鳖孤。

3.3 ForkJoinPool.scan(WorkQueue w, int r)

private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws;
    int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        //初始掃描起點(diǎn),自旋掃描
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
            WorkQueue q;
            ForkJoinTask<?>[] a;
            ForkJoinTask<?> t;
            int b, n;
            long c;
            if ((q = ws[k]) != null) {//獲取workQueue
                if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                    //計(jì)算偏移量
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                            U.getObjectVolatile(a, i))) != null && //取base位置任務(wù)
                            q.base == b) {//stable
                        if (ss >= 0) {  //scanning
                            if (U.compareAndSwapObject(a, i, t, null)) {//
                                q.base = b + 1;//更新base位
                                if (n < -1)       // signal others
                                    signalWork(ws, q);//創(chuàng)建或喚醒工作線程來(lái)運(yùn)行任務(wù)
                                return t;
                            }
                        } else if (oldSum == 0 &&   // try to activate 嘗試激活工作線程
                                w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);//喚醒棧頂工作線程
                    }
                    //base位置任務(wù)為空或base位置偏移抡笼,隨機(jī)移位重新掃描
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1;
                    r ^= r >>> 3;
                    r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;//隊(duì)列任務(wù)為空苏揣,記錄base位
            }
            //更新索引k 繼續(xù)向后查找
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                //運(yùn)行到這里說(shuō)明已經(jīng)掃描了全部的 workQueues,但并未掃描到任務(wù)

                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;// 已經(jīng)被滅活或終止,跳出循環(huán)

                    //對(duì)當(dāng)前WorkQueue進(jìn)行滅活操作
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                            (UC_MASK & ((c = ctl) - AC_UNIT)));//計(jì)算ctl為INACTIVE狀態(tài)并減少活躍線程數(shù)
                    w.stackPred = (int) c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);//修改scanState為inactive狀態(tài)
                    if (U.compareAndSwapLong(this, CTL, c, nc))//更新scanState為滅活狀態(tài)
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;//重置checkSum推姻,繼續(xù)循環(huán)
            }
        }
    }
    return null;
}

說(shuō)明:掃描并嘗試偷取一個(gè)任務(wù)平匈。使用w.hint進(jìn)行隨機(jī)索引 WorkQueue,也就是說(shuō)并不一定會(huì)執(zhí)行當(dāng)前 WorkQueue 中的任務(wù)藏古,而是偷取別的Worker的任務(wù)來(lái)執(zhí)行增炭。

函數(shù)的大概執(zhí)行流程如下:

  1. 取隨機(jī)位置的一個(gè) WorkQueue;
  2. 獲取base位的 ForkJoinTask拧晕,成功取到后更新base位并返回任務(wù)隙姿;如果取到的 WorkQueue 中任務(wù)數(shù)大于1,則調(diào)用signalWork創(chuàng)建或喚醒其他工作線程厂捞;
  3. 如果當(dāng)前工作線程處于不活躍狀態(tài)(INACTIVE)输玷,則調(diào)用tryRelease嘗試喚醒棧頂工作線程來(lái)執(zhí)行。tryRelease源碼如下:
private boolean tryRelease(long c, WorkQueue v, long inc) {
    int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE;
    Thread p;
    //ctl低32位等于scanState靡馁,說(shuō)明可以喚醒parker線程
    if (v != null && v.scanState == sp) {          // v is at top of stack
        //計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;
            if ((p = v.parker) != null)
                U.unpark(p);//喚醒線程
            return true;
        }
    }
    return false;
}
  1. 如果base位任務(wù)為空或發(fā)生偏移欲鹏,則對(duì)索引位進(jìn)行隨機(jī)移位,然后重新掃描奈嘿;
  2. 如果掃描整個(gè)workQueues之后沒(méi)有獲取到任務(wù)貌虾,則設(shè)置當(dāng)前工作線程為INACTIVE狀態(tài);然后重置checkSum裙犹,再次掃描一圈之后如果還沒(méi)有任務(wù)則跳出循環(huán)返回null

3.4 ForkJoinPool.awaitWork(WorkQueue w, int r)

private boolean awaitWork(WorkQueue w, int r) {
    if (w == null || w.qlock < 0)                 // w is terminating
        return false;
    for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
        if ((ss = w.scanState) >= 0)//正在掃描衔憨,跳出循環(huán)
            break;
        else if (spins > 0) {
            r ^= r << 6;
            r ^= r >>> 21;
            r ^= r << 7;
            if (r >= 0 && --spins == 0) {         // randomize spins
                WorkQueue v;
                WorkQueue[] ws;
                int s, j;
                AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                        (j = pred & SMASK) < ws.length &&
                        (v = ws[j]) != null &&        // see if pred parking
                        (v.parker == null || v.scanState >= 0))
                    spins = SPINS;                // continue spinning
            }
        } else if (w.qlock < 0)                     // 當(dāng)前workQueue已經(jīng)終止叶圃,返回false recheck after spins
            return false;
        else if (!Thread.interrupted()) {//判斷線程是否被中斷,并清除中斷狀態(tài)
            long c, prevctl, parkTime, deadline;
            int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);//活躍線程數(shù)
            if ((ac <= 0 && tryTerminate(false, false)) || //無(wú)active線程践图,嘗試終止
                    (runState & STOP) != 0)           // pool terminating
                return false;
            if (ac <= 0 && ss == (int) c) {        // is last waiter
                //計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位)
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short) (c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過(guò)量
                    return false;                 // else use timed wait
                //計(jì)算空閑超時(shí)時(shí)間
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            } else
                prevctl = parkTime = deadline = 0L;
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            w.parker = wt;//設(shè)置parker掺冠,準(zhǔn)備阻塞
            if (w.scanState < 0 && ctl == c)      // recheck before park
                U.park(false, parkTime);//阻塞指定的時(shí)間

            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)//正在掃描,說(shuō)明等到任務(wù),跳出循環(huán)
                break;
            if (parkTime != 0L && ctl == c &&
                    deadline - System.nanoTime() <= 0L &&
                    U.compareAndSwapLong(this, CTL, c, prevctl))//未等到任務(wù)德崭,更新ctl斥黑,返回false
                return false;                     // shrink pool
        }
    }
    return true;
}

說(shuō)明:回到runWorker方法,如果scan方法未掃描到任務(wù)眉厨,會(huì)調(diào)用awaitWork等待獲取任務(wù)锌奴。函數(shù)的具體執(zhí)行流程大家看源碼,這里簡(jiǎn)單說(shuō)一下:
在等待獲取任務(wù)期間憾股,如果工作線程或線程池已經(jīng)終止則直接返回false鹿蜀。如果當(dāng)前無(wú) active 線程,嘗試終止線程池并返回false服球,如果終止失敗并且當(dāng)前是最后一個(gè)等待的 Worker茴恰,就阻塞指定的時(shí)間(IDLE_TIMEOUT);等到屆期或被喚醒后如果發(fā)現(xiàn)自己是scanningscanState >= 0)狀態(tài)斩熊,說(shuō)明已經(jīng)等到任務(wù)往枣,跳出等待返回true繼續(xù) scan,否則的更新ctl并返回false粉渠。

3.5 WorkQueue.runTask()

final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();//更新currentSteal并執(zhí)行任務(wù)
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();//依次執(zhí)行本地任務(wù)
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);//增加偷取任務(wù)數(shù)
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();//執(zhí)行鉤子函數(shù)
    }
}

說(shuō)明:在scan方法掃描到任務(wù)之后婉商,調(diào)用WorkQueue.runTask()來(lái)執(zhí)行獲取到的任務(wù),大概流程如下:

  1. 標(biāo)記scanState為正在執(zhí)行狀態(tài)渣叛;
  2. 更新currentSteal為當(dāng)前獲取到的任務(wù)并執(zhí)行它丈秩,任務(wù)的執(zhí)行調(diào)用了ForkJoinTask.doExec()方法,源碼如下:
//ForkJoinTask.doExec()
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();//執(zhí)行我們定義的任務(wù)
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}
  1. 調(diào)用execLocalTasks依次執(zhí)行當(dāng)前WorkerQueue中的任務(wù)淳衙,源碼如下:
//執(zhí)行并移除所有本地任務(wù)
final void execLocalTasks() {
    int b = base, m, s;
    ForkJoinTask<?>[] a = array;
    if (b - (s = top - 1) <= 0 && a != null &&
            (m = a.length - 1) >= 0) {
        if ((config & FIFO_QUEUE) == 0) {//FIFO模式
            for (ForkJoinTask<?> t; ; ) {
                if ((t = (ForkJoinTask<?>) U.getAndSetObject
                        (a, ((m & s) << ASHIFT) + ABASE, null)) == null)//FIFO執(zhí)行蘑秽,取top任務(wù)
                    break;
                U.putOrderedInt(this, QTOP, s);
                t.doExec();//執(zhí)行
                if (base - (s = top - 1) > 0)
                    break;
            }
        } else
            pollAndExecAll();//LIFO模式執(zhí)行,取base任務(wù)
    }
}
  1. 更新偷取任務(wù)數(shù)箫攀;
  2. 還原scanState并執(zhí)行鉤子函數(shù)肠牲。

3.6 ForkJoinPool.deregisterWorker(ForkJoinWorkerThread wt, Throwable ex)

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    //1.移除workQueue
    if (wt != null && (w = wt.workQueue) != null) {//獲取ForkJoinWorkerThread的等待隊(duì)列
        WorkQueue[] ws;                           // remove index from array
        int idx = w.config & SMASK;//計(jì)算workQueue索引
        int rs = lockRunState();//獲取runState鎖和當(dāng)前池運(yùn)行狀態(tài)
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;//移除workQueue
        unlockRunState(rs, rs & ~RSLOCK);//解除runState鎖
    }
    //2.減少CTL數(shù)
    long c;                                       // decrement counts
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
    //3.處理被移除workQueue內(nèi)部相關(guān)參數(shù)
    if (w != null) {
        w.qlock = -1;                             // ensure set
        w.transferStealCount(this);
        w.cancelAll();                            // cancel remaining tasks
    }
    //4.如果線程未終止,替換被移除的workQueue并喚醒內(nèi)部線程
    for (;;) {                                    // possibly replace
        WorkQueue[] ws; int m, sp;
        //嘗試終止線程池
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        //喚醒被替換的線程靴跛,依賴于下一步
        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        //創(chuàng)建工作線程替換
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                      // create replacement
            break;
        }
        else                                      // don't need replacement
            break;
    }
    //5.處理異常
    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}

說(shuō)明deregisterWorker方法用于工作線程運(yùn)行完畢之后終止線程或處理工作線程異常缀雳,主要就是清除已關(guān)閉的工作線程或回滾創(chuàng)建線程之前的操作,并把傳入的異常拋給 ForkJoinTask 來(lái)處理梢睛。具體步驟見(jiàn)源碼注釋肥印。

3.7 小結(jié)

本節(jié)我們對(duì)任務(wù)的執(zhí)行流程進(jìn)行了說(shuō)明,后面我們將繼續(xù)介紹任務(wù)的結(jié)果獲染稀(join/invoke)深碱。


4. 獲取任務(wù)結(jié)果 - ForkJoinTask.join() / ForkJoinTask.invoke()

join() :

//合并任務(wù)結(jié)果
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

//join, get, quietlyJoin的主實(shí)現(xiàn)方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

invoke() :

//執(zhí)行任務(wù),并等待任務(wù)完成并返回結(jié)果
public final V invoke() {
    int s;
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

//invoke, quietlyInvoke的主實(shí)現(xiàn)方法
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

說(shuō)明: join()方法一般是在任務(wù)fork()之后調(diào)用藏畅,用來(lái)獲确蠊琛(或者叫“合并”)任務(wù)的執(zhí)行結(jié)果。

ForkJoinTask的join()invoke()方法都可以用來(lái)獲取任務(wù)的執(zhí)行結(jié)果(另外還有get方法也是調(diào)用了doJoin來(lái)獲取任務(wù)結(jié)果,但是會(huì)響應(yīng)運(yùn)行時(shí)異常)绞蹦,它們對(duì)外部提交任務(wù)的執(zhí)行方式一致力奋,都是通過(guò)externalAwaitDone方法等待執(zhí)行結(jié)果。不同的是invoke()方法會(huì)直接執(zhí)行當(dāng)前任務(wù)幽七;而join()方法則是在當(dāng)前任務(wù)在隊(duì)列 top 位時(shí)(通過(guò)tryUnpush方法判斷)才能執(zhí)行景殷,如果當(dāng)前任務(wù)不在 top 位或者任務(wù)執(zhí)行失敗調(diào)用ForkJoinPool.awaitJoin方法幫助執(zhí)行或阻塞當(dāng)前 join 任務(wù)。(所以在官方文檔中建議了我們對(duì)ForkJoinTask任務(wù)的調(diào)用順序锉走,一對(duì) fork-join操作一般按照如下順序調(diào)用:a.fork(); b.fork(); b.join(); a.join();滨彻。因?yàn)槿蝿?wù) b 是后面進(jìn)入隊(duì)列,也就是說(shuō)它是在棧頂?shù)模╰op 位)挪蹭,在它fork()之后直接調(diào)用join()就可以直接執(zhí)行而不會(huì)調(diào)用ForkJoinPool.awaitJoin方法去等待亭饵。

在這些方法中,join()相對(duì)比較全面梁厉,所以之后的講解我們將從join()開(kāi)始逐步向下分析辜羊,首先看一下join()的執(zhí)行流程:

join 執(zhí)行流程

后面的源碼分析中,我們首先講解比較簡(jiǎn)單的外部 join 任務(wù)(externalAwaitDone)词顾,然后再講解內(nèi)部 join 任務(wù)(從ForkJoinPool.awaitJoin()開(kāi)始)八秃。

4.1 ForkJoinTask.externalAwaitDone()

private int externalAwaitDone() {
    //執(zhí)行任務(wù)
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(  // CountedCompleter任務(wù)
                 (CountedCompleter<?>)this, 0) :
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);  // ForkJoinTask任務(wù)
    if (s >= 0 && (s = status) >= 0) {//執(zhí)行失敗,進(jìn)入等待
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {  //更新state
                synchronized (this) {
                    if (status >= 0) {//SIGNAL 等待信號(hào)
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}

說(shuō)明: 如果當(dāng)前join為外部調(diào)用肉盹,則調(diào)用此方法執(zhí)行任務(wù)昔驱,如果任務(wù)執(zhí)行失敗就進(jìn)入等待。方法本身是很簡(jiǎn)單的上忍,需要注意的是對(duì)不同的任務(wù)類型分兩種情況:

  • 如果我們的任務(wù)為 CountedCompleter 類型的任務(wù)骤肛,則調(diào)用externalHelpComplete方法來(lái)執(zhí)行任務(wù)。(后面筆者會(huì)開(kāi)新篇來(lái)專門講解CountedCompleter窍蓝,在此篇就不詳細(xì)介紹了腋颠,有興趣的同學(xué)們可以自己先看一下。)

  • 其他類型的 ForkJoinTask 任務(wù)調(diào)用tryExternalUnpush來(lái)執(zhí)行吓笙,源碼如下:

//為外部提交者提供 tryUnpush 功能(給定任務(wù)在top位時(shí)彈出任務(wù))
final boolean tryExternalUnpush(ForkJoinTask<?> task) {
    WorkQueue[] ws;
    WorkQueue w;
    ForkJoinTask<?>[] a;
    int m, s;
    int r = ThreadLocalRandom.getProbe();
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
            (w = ws[m & r & SQMASK]) != null &&
            (a = w.array) != null && (s = w.top) != w.base) {
        long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;  //取top位任務(wù)
        if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {  //加鎖
            if (w.top == s && w.array == a &&
                    U.getObject(a, j) == task &&
                    U.compareAndSwapObject(a, j, task, null)) {  //符合條件淑玫,彈出
                U.putOrderedInt(w, QTOP, s - 1);  //更新top
                U.putOrderedInt(w, QLOCK, 0); //解鎖,返回true
                return true;
            }
            U.compareAndSwapInt(w, QLOCK, 1, 0);  //當(dāng)前任務(wù)不在top位面睛,解鎖返回false
        }
    }
    return false;
}

tryExternalUnpush的作用就是判斷當(dāng)前任務(wù)是否在top位絮蒿,如果是則彈出任務(wù),然后在externalAwaitDone中調(diào)用doExec()執(zhí)行任務(wù)侮穿。


4.2 ForkJoinPool.awaitJoin()

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;  //獲取給定Worker的join任務(wù)
        U.putOrderedObject(w, QCURRENTJOIN, task);  //把currentJoin替換為給定任務(wù)
        //判斷是否為CountedCompleter類型的任務(wù)
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>) task : null;
        for (; ; ) {
            if ((s = task.status) < 0)  //已經(jīng)完成|取消|異常 跳出循環(huán)
                break;

            if (cc != null)//CountedCompleter任務(wù)由helpComplete來(lái)完成join
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))  //嘗試執(zhí)行
                helpStealer(w, task);  //隊(duì)列為空或執(zhí)行失敗歌径,任務(wù)可能被偷,幫助偷取者執(zhí)行該任務(wù)

            if ((s = task.status) < 0) //已經(jīng)完成|取消|異常亲茅,跳出循環(huán)
                break;
            //計(jì)算任務(wù)等待時(shí)間
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;

            if (tryCompensate(w)) {//執(zhí)行補(bǔ)償操作
                task.internalWait(ms);//補(bǔ)償執(zhí)行成功,任務(wù)等待指定時(shí)間
                U.getAndAddLong(this, CTL, AC_UNIT);//更新活躍線程數(shù)
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);//循環(huán)結(jié)束,替換為原來(lái)的join任務(wù)
    }
    return s;
}

說(shuō)明: 如果當(dāng)前 join 任務(wù)不在Worker等待隊(duì)列的top位克锣,或者任務(wù)執(zhí)行失敗茵肃,調(diào)用此方法來(lái)幫助執(zhí)行或阻塞當(dāng)前 join 的任務(wù)。函數(shù)執(zhí)行流程如下:

  • 由于每次調(diào)用awaitJoin都會(huì)優(yōu)先執(zhí)行當(dāng)前join的任務(wù)袭祟,所以首先會(huì)更新currentJoin為當(dāng)前join任務(wù)验残;
  • 進(jìn)入自旋:
  1. 首先檢查任務(wù)是否已經(jīng)完成(通過(guò)task.status < 0判斷),如果給定任務(wù)執(zhí)行完畢|取消|異常 則跳出循環(huán)返回執(zhí)行狀態(tài)s巾乳;
  2. 如果是 CountedCompleter 任務(wù)類型您没,調(diào)用helpComplete方法來(lái)完成join操作(后面筆者會(huì)開(kāi)新篇來(lái)專門講解CountedCompleter,本篇暫時(shí)不做詳細(xì)解析)胆绊;
  3. 非 CountedCompleter 任務(wù)類型調(diào)用WorkQueue.tryRemoveAndExec嘗試執(zhí)行任務(wù)氨鹏;
  4. 如果給定 WorkQueue 的等待隊(duì)列為空或任務(wù)執(zhí)行失敗,說(shuō)明任務(wù)可能被偷压状,調(diào)用helpStealer幫助偷取者執(zhí)行任務(wù)(也就是說(shuō)仆抵,偷取者幫我執(zhí)行任務(wù),我去幫偷取者執(zhí)行它的任務(wù))种冬;
  5. 再次判斷任務(wù)是否執(zhí)行完畢(task.status < 0)镣丑,如果任務(wù)執(zhí)行失敗,計(jì)算一個(gè)等待時(shí)間準(zhǔn)備進(jìn)行補(bǔ)償操作娱两;
  6. 調(diào)用tryCompensate方法為給定 WorkQueue 嘗試執(zhí)行補(bǔ)償操作莺匠。在執(zhí)行補(bǔ)償期間,如果發(fā)現(xiàn) 資源爭(zhēng)用|池處于unstable狀態(tài)|當(dāng)前Worker已終止十兢,則調(diào)用ForkJoinTask.internalWait()方法等待指定的時(shí)間趣竣,任務(wù)喚醒之后繼續(xù)自旋,ForkJoinTask.internalWait()源碼如下:
final void internalWait(long timeout) {
    int s;
    if ((s = status) >= 0 && // force completer to issue notify
        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//更新任務(wù)狀態(tài)為SIGNAL(等待喚醒)
        synchronized (this) {
            if (status >= 0)
                try { wait(timeout); } catch (InterruptedException ie) { }
            else
                notifyAll();
        }
    }
}

awaitJoin中纪挎,我們總共調(diào)用了三個(gè)比較復(fù)雜的方法:tryRemoveAndExec期贫、helpStealer和tryCompensate,下面我們依次講解异袄。

4.2.1 WorkQueue.tryRemoveAndExec(ForkJoinTask<?> task)

final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a;
    int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
            task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            //從top往下自旋查找
            for (ForkJoinTask<?> t; ; ) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;//計(jì)算任務(wù)索引
                if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) //獲取索引到的任務(wù)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) { //給定任務(wù)為索引任務(wù)
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) { //彈出任務(wù)
                            U.putOrderedInt(this, QTOP, s); //更新top
                            removed = true;
                        }
                    } else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                                a, j, task, new EmptyTask()); //join任務(wù)已經(jīng)被移除通砍,替換為一個(gè)占位任務(wù)
                    if (removed)
                        task.doExec(); //執(zhí)行
                    break;
                } else if (t.status < 0 && s + 1 == top) { //給定任務(wù)不是top任務(wù)
                    if (U.compareAndSwapObject(a, j, t, null)) //彈出任務(wù)
                        U.putOrderedInt(this, QTOP, s);//更新top
                    break;                  // was cancelled
                }
                if (--n == 0) //遍歷結(jié)束
                    return false;
            }
            if (task.status < 0) //任務(wù)執(zhí)行完畢
                return false;
        }
    }
    return true;
}

說(shuō)明:top位開(kāi)始自旋向下找到給定任務(wù),如果找到把它從當(dāng)前 Worker 的任務(wù)隊(duì)列中移除并執(zhí)行它烤蜕。注意返回的參數(shù):如果任務(wù)隊(duì)列為空或者任務(wù)執(zhí)行完畢返回true封孙;任務(wù)執(zhí)行完畢返回false

4.2.2 ForkJoinPool.helpStealer(WorkQueue w, ForkJoinTask<?> task)

private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
            task != null) {
        do {                                       // restart point
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            descent:
            for (subtask = task; subtask.status >= 0; ) {
                //1. 找到給定WorkQueue的偷取者v
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {//跳兩個(gè)索引讽营,因?yàn)閃orker在奇數(shù)索引位
                    if (k > m)                     // can't find stealer
                        break descent;
                    if ((v = ws[i = (h + k) & m]) != null) {
                        if (v.currentSteal == subtask) {//定位到偷取者
                            j.hint = i;//更新stealer索引
                            break;
                        }
                        checkSum += v.base;
                    }
                }
                //2. 幫助偷取者v執(zhí)行任務(wù)
                for (; ; ) {                         // help v or descend
                    ForkJoinTask<?>[] a;            //偷取者內(nèi)部的任務(wù)
                    int b;
                    checkSum += (b = v.base);
                    ForkJoinTask<?> next = v.currentJoin;//獲取偷取者的join任務(wù)
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                            v.currentSteal != subtask) // stale
                        break descent; // stale虎忌,跳出descent循環(huán)重來(lái)
                    if (b - v.top >= 0 || (a = v.array) == null) {
                        if ((subtask = next) == null)   //偷取者的join任務(wù)為null,跳出descent循環(huán)
                            break descent;
                        j = v;
                        break; //偷取者內(nèi)部任務(wù)為空橱鹏,可能任務(wù)也被偷走了膜蠢;跳出本次循環(huán)堪藐,查找偷取者的偷取者
                    }
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;//獲取base偏移地址
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                            U.getObjectVolatile(a, i));//獲取偷取者的base任務(wù)
                    if (v.base == b) {
                        if (t == null)             // stale
                            break descent; // stale,跳出descent循環(huán)重來(lái)
                        if (U.compareAndSwapObject(a, i, t, null)) {//彈出任務(wù)
                            v.base = b + 1;         //更新偷取者的base位
                            ForkJoinTask<?> ps = w.currentSteal;//獲取調(diào)用者偷來(lái)的任務(wù)
                            int top = w.top;
                            //首先更新給定workQueue的currentSteal為偷取者的base任務(wù)挑围,然后執(zhí)行該任務(wù)
                            //然后通過(guò)檢查top來(lái)判斷給定workQueue是否有自己的任務(wù)礁竞,如果有,
                            // 則依次彈出任務(wù)(LIFO)->更新currentSteal->執(zhí)行該任務(wù)(注意這里是自己偷自己的任務(wù)執(zhí)行)
                            do {
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                t.doExec();        // clear local tasks too
                            } while (task.status >= 0 &&
                                    w.top != top && //內(nèi)部有自己的任務(wù)杉辙,依次彈出執(zhí)行
                                    (t = w.pop()) != null);
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);//還原給定workQueue的currentSteal
                            if (w.base != w.top)//給定workQueue有自己的任務(wù)了模捂,幫助結(jié)束,返回
                                return;            // can't further help
                        }
                    }
                }
            }
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}

說(shuō)明: 如果隊(duì)列為空或任務(wù)執(zhí)行失敗蜘矢,說(shuō)明任務(wù)可能被偷狂男,調(diào)用此方法來(lái)幫助偷取者執(zhí)行任務(wù)∑犯梗基本思想是:偷取者幫助我執(zhí)行任務(wù)岖食,我去幫助偷取者執(zhí)行它的任務(wù)。
函數(shù)執(zhí)行流程如下:

  1. 循環(huán)定位偷取者珍昨,由于Worker是在奇數(shù)索引位县耽,所以每次會(huì)跳兩個(gè)索引位。定位到偷取者之后镣典,更新調(diào)用者 WorkQueue 的hint為偷取者的索引兔毙,方便下次定位;
  2. 定位到偷取者后兄春,開(kāi)始幫助偷取者執(zhí)行任務(wù)澎剥。從偷取者的base索引開(kāi)始,每次偷取一個(gè)任務(wù)執(zhí)行赶舆。在幫助偷取者執(zhí)行任務(wù)后哑姚,如果調(diào)用者發(fā)現(xiàn)本身已經(jīng)有任務(wù)(w.top != top),則依次彈出自己的任務(wù)(LIFO順序)并執(zhí)行(也就是說(shuō)自己偷自己的任務(wù)執(zhí)行)芜茵。

4.2.3 ForkJoinPool.tryCompensate(WorkQueue w)

//執(zhí)行補(bǔ)償操作:嘗試縮減活動(dòng)線程量叙量,可能釋放或創(chuàng)建一個(gè)補(bǔ)償線程來(lái)準(zhǔn)備阻塞
private boolean tryCompensate(WorkQueue w) {
    boolean canBlock;
    WorkQueue[] ws;
    long c;
    int m, pc, sp;
    if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false; //調(diào)用者已終止
    else if ((sp = (int) (c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);//喚醒等待的工作線程
    else {//沒(méi)有空閑線程
        int ac = (int) (c >> AC_SHIFT) + pc; //活躍線程數(shù)
        int tc = (short) (c >> TC_SHIFT) + pc;//總線程數(shù)
        int nbusy = 0;                        // validate saturation
        for (int i = 0; i <= m; ++i) {        // two passes of odd indices
            WorkQueue v;
            if ((v = ws[((i << 1) | 1) & m]) != null) {//取奇數(shù)索引位
                if ((v.scanState & SCANNING) != 0)//沒(méi)有正在運(yùn)行任務(wù),跳出
                    break;
                ++nbusy;//正在運(yùn)行任務(wù)九串,添加標(biāo)記
            }
        }
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;                 // unstable or stale
        else if (tc >= pc && ac > 1 && w.isEmpty()) {//總線程數(shù)大于并行度 && 活動(dòng)線程數(shù)大于1 && 調(diào)用者任務(wù)隊(duì)列為空绞佩,不需要補(bǔ)償
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                    (~AC_MASK & c));       // uncompensated
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);//更新活躍線程數(shù)
        } else if (tc >= MAX_CAP ||
                (this == common && tc >= pc + commonMaxSpares))//超出最大線程數(shù)
            throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
        else {                                // similar to tryAddWorker
            boolean add = false;
            int rs;      // CAS within lock
            long nc = ((AC_MASK & c) |
                    (TC_MASK & (c + TC_UNIT)));//計(jì)算總線程數(shù)
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);//更新總線程數(shù)
            unlockRunState(rs, rs & ~RSLOCK);
            //運(yùn)行到這里說(shuō)明活躍工作線程數(shù)不足,需要?jiǎng)?chuàng)建一個(gè)新的工作線程來(lái)補(bǔ)償
            canBlock = add && createWorker(); // throws on exception
        }
    }
    return canBlock;
}

說(shuō)明: 具體的執(zhí)行看源碼及注釋猪钮,這里我們簡(jiǎn)單總結(jié)一下需要和不需要補(bǔ)償?shù)膸追N情況:

  • 需要補(bǔ)償:
    • 調(diào)用者隊(duì)列不為空品山,并且有空閑工作線程,這種情況會(huì)喚醒空閑線程(調(diào)用tryRelease方法)
    • 池尚未停止烤低,活躍線程數(shù)不足肘交,這時(shí)會(huì)新建一個(gè)工作線程(調(diào)用createWorker方法)
  • 不需要補(bǔ)償:
    • 調(diào)用者已終止或池處于不穩(wěn)定狀態(tài)
    • 總線程數(shù)大于并行度 && 活動(dòng)線程數(shù)大于1 && 調(diào)用者任務(wù)隊(duì)列為空

總結(jié)

ForkJoinPool 內(nèi)部的代碼實(shí)現(xiàn)非常復(fù)雜,本篇文章筆者前前后后寫了近兩個(gè)月扑馁,有想挑戰(zhàn)一下自己的同學(xué)可以通篇仔細(xì)研究一下涯呻,如發(fā)現(xiàn)文中有錯(cuò)誤的地方凉驻,歡迎批評(píng)指正。一般來(lái)說(shuō)魄懂,我們只需要理解它的內(nèi)部思想即可沿侈。
本章重點(diǎn):

  • Fork/Join 任務(wù)運(yùn)行機(jī)制
  • ForkJoinPool 的 work-stealing 實(shí)現(xiàn)方式
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末闯第,一起剝皮案震驚了整個(gè)濱河市市栗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌咳短,老刑警劉巖填帽,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異咙好,居然都是意外死亡篡腌,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門勾效,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)嘹悼,“玉大人,你說(shuō)我怎么就攤上這事层宫⊙罨铮” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵萌腿,是天一觀的道長(zhǎng)限匣。 經(jīng)常有香客問(wèn)我,道長(zhǎng)毁菱,這世上最難降的妖魔是什么米死? 我笑而不...
    開(kāi)封第一講書人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮贮庞,結(jié)果婚禮上峦筒,老公的妹妹穿的比我還像新娘。我一直安慰自己窗慎,他們只是感情好物喷,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著捉邢,像睡著了一般脯丝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上伏伐,一...
    開(kāi)封第一講書人閱讀 51,198評(píng)論 1 299
  • 那天宠进,我揣著相機(jī)與錄音,去河邊找鬼藐翎。 笑死材蹬,一個(gè)胖子當(dāng)著我的面吹牛实幕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播堤器,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼昆庇,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了闸溃?” 一聲冷哼從身側(cè)響起整吆,我...
    開(kāi)封第一講書人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎辉川,沒(méi)想到半個(gè)月后表蝙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡乓旗,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年府蛇,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片屿愚。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡汇跨,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出妆距,到底是詐尸還是另有隱情穷遂,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布毅厚,位于F島的核電站塞颁,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏吸耿。R本人自食惡果不足惜祠锣,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望咽安。 院中可真熱鬧伴网,春花似錦、人聲如沸妆棒。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)糕珊。三九已至动分,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間红选,已是汗流浹背澜公。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留喇肋,地道東北人坟乾。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓迹辐,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親甚侣。 傳聞我的和親對(duì)象是個(gè)殘疾皇子明吩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容