[懷舊并發(fā)11]分析jdk-1.8-ForkJoinPool實(shí)現(xiàn)原理(下)

Java并發(fā)編程源碼分析系列:

上篇介紹了ForkJoinPool的基本結(jié)構(gòu)和參數(shù),本篇進(jìn)入代碼細(xì)節(jié),一窺ForkJoinPool的實(shí)現(xiàn)原理颊亮。

整個(gè)流程和重要方法歸納如下:

任務(wù)提交

  • 提交任務(wù)入口:submit,execute,invoke
  • 完整版提交任務(wù):externalSubmit(包括初始化)
  • 簡(jiǎn)單版提交任務(wù):externalPush

worker管理

  • 激活或創(chuàng)建:signalWork
  • 創(chuàng)建:tryAddWorker,createWorker
  • 注冊(cè)、撤銷注冊(cè):registerWorker,deregisterWorker

worker執(zhí)行(runWorker三部曲)

  • 獲日偷伞:scan
  • 執(zhí)行:runTask
  • 等待:awaitWork

Fork

  • 等同于提交任務(wù)

Join(doJoin)

  • 當(dāng)前不是worker:externalAwaitDone
  • 當(dāng)前是worker:awaitJoin

awaitJoin等待兩種策略

  • Helping:tryRemoveAndExec、helpStealer
  • Compensating:tryCompensate

等待所有任務(wù)完成

  • 靜止:awaitQuiescence
  • 終止:awaitTermination

關(guān)閉

  • shutdown,shutdownNow
  • tryTerminate

異常處理

提交第一個(gè)task

提交任務(wù)默認(rèn)使用來(lái)自于接口的submit饲鄙,除此之外凄诞,F(xiàn)orkJoinPool還提供execute和invoke:

  • submit:提交任務(wù)并返回任務(wù)
  • execute:只提交任務(wù)
  • invoke:提交并返回任務(wù)結(jié)果(return task.join())

它們?nèi)齻€(gè)內(nèi)部實(shí)現(xiàn)是一樣的,只是返回的東西不同忍级,我們來(lái)看submit的實(shí)現(xiàn)就夠:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

除了使用ForkJoinTask外帆谍,還支持Runnable和Callable,內(nèi)部使用Adapter最終轉(zhuǎn)為ForkJoinTask轴咱。submit很簡(jiǎn)單地調(diào)用externalPush汛蝙,這是個(gè)簡(jiǎn)化版的任務(wù)入隊(duì)方法,調(diào)用不成功時(shí)需要調(diào)用完整版的externalSubmit朴肺。

我們先來(lái)看externalSubmit窖剑,它處理非正常情況和進(jìn)行初始化。ForkJoinPool構(gòu)造函數(shù)只初始化一部分參數(shù)戈稿,包括WorkQueue[]等留到在externalSubmit初始化西土。

private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        //1
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        //2
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        //3
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
       //4
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

mark1檢查運(yùn)行狀態(tài)是否已經(jīng)進(jìn)入SHUTDOWN,拋出拒收的異常鞍盗。對(duì)于ForkJoinPool的關(guān)閉需了,見(jiàn)后文“關(guān)閉ForkJoinPool”一節(jié)。

第一次執(zhí)行externalSubmit時(shí)般甲,運(yùn)行狀態(tài)還沒(méi)有STARTED肋乍,執(zhí)行mark2進(jìn)行初始化操作:

  • 按2的冪設(shè)置WorkQueue[]的長(zhǎng)度
  • 設(shè)置原子對(duì)象stealCounter
  • 運(yùn)行狀態(tài)進(jìn)入STARTED

第二次循環(huán)中,執(zhí)行mark4欣除,創(chuàng)建第一個(gè)WorkQueue住拭。

第三次循環(huán)中挪略,執(zhí)行mark3历帚,會(huì)找到剛才創(chuàng)建的WorkQueue,從隊(duì)列的top端加入任務(wù)杠娱,調(diào)用后面要講的signalWork激活或者創(chuàng)建worker挽牢。

WorkQueue在WorkQueue[]的下標(biāo),取的是k = r & m & SQMASK摊求。r是線程的probe禽拔,來(lái)自隨機(jī)數(shù)ThreadLocalRandom;m是WorkQueue[]的長(zhǎng)度減一;SQMASK是固定值0x007e睹栖,轉(zhuǎn)為二進(jìn)制是1111110硫惕,末尾是0,在&操作后野来,得出的k必定是偶數(shù)恼除。所以創(chuàng)建的第一個(gè)WorkQueue沒(méi)有對(duì)應(yīng)worker,保存的任務(wù)是submission曼氛,scanState默認(rèn)是INACTIVE豁辉。

externalSubmit是長(zhǎng)了點(diǎn),不過(guò)邏輯清晰舀患,不難理解徽级。除了初始化,大部分時(shí)間其實(shí)不需要externalSubmit聊浅,使用簡(jiǎn)單版的externalPush即可餐抢。

final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

雖然進(jìn)入externalPush的if有一大堆條件,不過(guò)有了前面的分析低匙,我們很容易看懂:

  • 線程經(jīng)過(guò)ThreadLocalRandom初始化弹澎;
  • 運(yùn)行狀態(tài)正常;
  • WorkQueue[]非空努咐;
  • 隨機(jī)數(shù)取到的WorkQueue非空苦蒿,并鎖定成功。

滿足上面的條件后渗稍,任務(wù)從top端入隊(duì)佩迟。如果隊(duì)列里只有一個(gè)任務(wù),調(diào)用signalWork竿屹”ㄇ浚基本實(shí)現(xiàn)和externalSubmit的mark3差不多。

worker管理

worker的管理涉及創(chuàng)建拱燃、激活秉溉、注冊(cè)、撤銷注冊(cè)碗誉。

接上一節(jié)創(chuàng)建第一個(gè)WorkQueue并加入第一個(gè)任務(wù)召嘶,調(diào)用了signalWork,入?yún)⑹荳orkQueue[]和當(dāng)前操作的WorkQueue哮缺。

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        //1
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);
            break;
        }
        //2
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

首先是進(jìn)入循環(huán)的條件弄跌,判斷了ctl的正負(fù),我們知道ctl的第一個(gè)16bit表示AC尝苇,為負(fù)時(shí)表示活動(dòng)的worker還未達(dá)到預(yù)定的Parallelism铛只,需要新增或者激活埠胖。mark1通過(guò)sp判斷現(xiàn)在沒(méi)有空閑worker,需要執(zhí)行增加淳玩,調(diào)用tryAddWorker直撤。

有空閑worker的情況進(jìn)入mark2,sp取棧頂WorkQueue的下標(biāo)蜕着,具體解掛worker的過(guò)程和tryRelease幾乎一樣谊惭,這里合起來(lái)介紹。

private boolean tryRelease(long c, WorkQueue v, long inc) {
    int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
    if (v != null && v.scanState == sp) {          // v is at top of stack
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;
            if ((p = v.parker) != null)
                U.unpark(p);
            return true;
        }
    }
    return false;
}

在sp上侮东,將狀態(tài)從inactive改為active圈盔,累加版本號(hào),解掛線程悄雅,通過(guò)stackPred取得前一個(gè)WorkQueue的index驱敲,設(shè)回sp里。


private void tryAddWorker(long c) {
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;
            if (add) {
                createWorker();
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

增加worker宽闲,需要將AC和TC都加1众眨,成功后調(diào)用createWorker。

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);
    return false;
}

createWorker的代碼很簡(jiǎn)單容诬,通過(guò)線程工廠創(chuàng)建worker的實(shí)例并啟動(dòng)娩梨。如果沒(méi)有異常,直接返回就行览徒;否則狈定,需要逆操作撤銷worker的注冊(cè)。worker什么時(shí)候注冊(cè)了习蓬?看ForkJoinWorkerThread的構(gòu)造函數(shù)纽什,里面調(diào)用ForkJoinPool.registerWorker。


final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
           //1
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            //2
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

一開(kāi)始躲叼,線程就被設(shè)置為守護(hù)線程芦缰。重溫知識(shí)點(diǎn),當(dāng)只剩下守護(hù)線程時(shí)枫慷,JVM就會(huì)退出让蕾,垃圾回收線程也是一個(gè)典型的守護(hù)線程。

mark1或听,前文講過(guò)有對(duì)應(yīng)worker的WorkQueue只能出現(xiàn)在WorkQueue[]奇數(shù)index探孝,代碼里取初始index用的是:

i = ((s << 1) | 1) & m; 

seed左移再“或”1,是奇數(shù)神帅。m是WorkQueue[]長(zhǎng)度減1再姑,也是奇數(shù)。兩者再“與”找御,保證取得的i是奇數(shù)元镀。若該位置已經(jīng)存在其他WorkQueue,需要重新計(jì)算下一個(gè)位置霎桅,有需要還要擴(kuò)容WorkQueue[]栖疑。

mark2設(shè)置新創(chuàng)建WorkQueue的scanState為index,表示了兩種意思:

  • 非負(fù)表示有對(duì)應(yīng)的worker滔驶;
  • 默認(rèn)scanState使用SCANNING遇革。

就此描述清楚worker的創(chuàng)建、WorkQueue的創(chuàng)建和加入WorkQueue[]揭糕。


創(chuàng)建worker時(shí)會(huì)默認(rèn)注冊(cè)worker萝快,當(dāng)創(chuàng)建出現(xiàn)異常時(shí),需要執(zhí)行撤銷注冊(cè)著角。

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    //1
    if (wt != null && (w = wt.workQueue) != null) {
        WorkQueue[] ws;                           // remove index from array
        int idx = w.config & SMASK;
        int rs = lockRunState();
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;
        unlockRunState(rs, rs & ~RSLOCK);
    }
    //2
    long c;                                       // decrement counts
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
    //3
    if (w != null) {
        w.qlock = -1;                             // ensure set
        w.transferStealCount(this);
        w.cancelAll();                            // cancel remaining tasks
    }
    //4
    for (;;) {                                    // possibly replace
        WorkQueue[] ws; int m, sp;
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                      // create replacement
            break;
        }
        else                                      // don't need replacement
            break;
    }
    //5
    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}

撤銷注冊(cè)過(guò)程按部就班的揪漩,肯定想到包括處理WorkQueue的善后和修改ctl:

  1. 將歸屬的WorkQueue從WorkQueue[]中置空,具體下標(biāo)從WorkQueue.config中獲壤艨凇奄容;
  2. AC和TC分別減一;
  3. WorkQueue的qlock置負(fù)产徊,表示要終止了昂勒,并且取消隊(duì)里所有任務(wù);
  4. 檢查運(yùn)行狀態(tài)舟铜,嘗試激活或者創(chuàng)建worker替代戈盈;
  5. 異常處理。

worker執(zhí)行

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

ForkJoinWorkerThread啟動(dòng)后調(diào)用了ForkJoinPool的runWorker:

final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

worker執(zhí)行流程就是三部曲:

  • scan:嘗試獲取一個(gè)任務(wù)谆刨;
  • runTask:執(zhí)行取得的任務(wù)奕谭;
  • awaitWork:沒(méi)有任務(wù)進(jìn)入等待。

如果awaitWork返回false痴荐,等不到任務(wù)血柳,跳出runWorker的循環(huán),回到run中執(zhí)行finally生兆,最后調(diào)用deregisterWorker撤銷注冊(cè)难捌。


首先是scan,掃描WorkQueue[]鸦难,嘗試steal一個(gè)任務(wù)根吁。

private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        //1
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            if ((q = ws[k]) != null) {
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        //2
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;
                                if (n < -1)       // signal others
                                    signalWork(ws, q);
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            //3
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int)c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}

mark1進(jìn)入循環(huán),通過(guò)隨機(jī)數(shù)從WorkQueue[]獲取WorkQueue合蔽,并嘗試從WorkQueue的base端steal任務(wù)击敌。到達(dá)mark2表示成功定位一個(gè)任務(wù),這時(shí)要看歸屬自己WorkQueue的scanState:

  • active:從WorkQueue的base端出隊(duì)并返回拴事,常規(guī)地調(diào)用signalWork沃斤,結(jié)束圣蝎;
  • inactive:這個(gè)狀態(tài)下,調(diào)用tryRelease衡瓶,如果WorkQueue正好在棧頂上徘公,激活它。

mark3處哮针,每次循環(huán)會(huì)校驗(yàn)新取的index是不是等于第一次取的index关面。如果相等,說(shuō)明遍歷了一圈還沒(méi)有steal到任務(wù)十厢,當(dāng)前worker是過(guò)剩的等太,執(zhí)行如下操作:

  • 當(dāng)前WorkQueue的scanState修改為inactive;
  • 當(dāng)前WorkQueue掛到棧頂蛮放,AC減一缩抡。

final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}

steal到一個(gè)任務(wù)后,就可以開(kāi)始執(zhí)行:

  • 將WorkQueue的scanState從SCANNING轉(zhuǎn)為RUNNING筛武;
  • 記錄當(dāng)前任務(wù)是steal來(lái)的缝其,保存在currentSteal,并執(zhí)行doExec徘六;
  • 執(zhí)行自己WorkQueue里的任務(wù)execLocalTasks(根據(jù)mode控制取任務(wù)是LIFO還是FIFO内边,調(diào)用doExec執(zhí)行,直到WorkQueue為空)待锈;
  • 累加steal數(shù)量漠其;
  • 能執(zhí)行的都執(zhí)行了,scanState轉(zhuǎn)回SCANNING竿音。
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

private int setCompletion(int completion) {
   for (int s;;) {
       if ((s = status) < 0)
           return s;
       if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
           if ((s >>> 16) != 0)
               synchronized (this) { notifyAll(); }
           return completion;
       }
   }
}

doExec方法和屎,里面最終調(diào)用ForkJoinTask的核心方法exec,前文介紹過(guò)春瞬,RecursiveAction和RecursiveTask它們override的exce調(diào)用了compute柴信。這樣子,源碼和使用的方法關(guān)聯(lián)起來(lái)了宽气。

當(dāng)任務(wù)執(zhí)行完成随常,調(diào)用setCompletion,將任務(wù)狀態(tài)改為NORMAL萄涯。注意绪氛,使用CAS修改狀態(tài)時(shí),目標(biāo)狀態(tài)使用s|NORMAL涝影。

  • 原狀態(tài)是NORMAL枣察,無(wú)符號(hào)右移為0;
  • 原狀態(tài)是SIGNAL,無(wú)符號(hào)右移不為0序目。

如果任務(wù)原狀態(tài)是SIGNAL臂痕,表示有線程由于join而進(jìn)入了wait,等著任務(wù)完成宛琅,這時(shí)需要額外操作notify觸發(fā)喚醒刻蟹。


private boolean awaitWork(WorkQueue w, int r) {
    if (w == null || w.qlock < 0)                 // w is terminating
        return false;
    for (int pred = w.stackPred, spins = SPINS, ss;;) {
        //1
        if ((ss = w.scanState) >= 0)
            break;
       //2
        else if (spins > 0) {
            r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
            if (r >= 0 && --spins == 0) {         // randomize spins
                WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                    (j = pred & SMASK) < ws.length &&
                    (v = ws[j]) != null &&        // see if pred parking
                    (v.parker == null || v.scanState >= 0))
                    spins = SPINS;                // continue spinning
            }
        }
        else if (w.qlock < 0)                     // recheck after spins
            return false;
       //3
        else if (!Thread.interrupted()) {
            long c, prevctl, parkTime, deadline;
            int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
           //4
            if ((ac <= 0 && tryTerminate(false, false)) ||
                (runState & STOP) != 0)           // pool terminating
                return false;
            //5
            if (ac <= 0 && ss == (int)c) {        // is last waiter
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                 // else use timed wait
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            }
            else
                prevctl = parkTime = deadline = 0L;
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            w.parker = wt;
            if (w.scanState < 0 && ctl == c)      // recheck before park
                U.park(false, parkTime);
            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)
                break;
            if (parkTime != 0L && ctl == c &&
                deadline - System.nanoTime() <= 0L &&
                U.compareAndSwapLong(this, CTL, c, prevctl))
                return false;                     // shrink pool
        }
    }
    return true;
}

awaitWork里核心是一個(gè)無(wú)限循環(huán)逗旁,我們重點(diǎn)看里面的等待操作和跳出條件嘿辟。

mark1判斷WorkQueue的scanState,非負(fù)表示W(wǎng)orkQueue要不在RUNNING片效,要不在SCANNING红伦,直接跳出。mark2里淀衣,SPINS初始為0昙读,沒(méi)有啟用自旋等待的控制。

重點(diǎn)來(lái)看mark3膨桥,只要沒(méi)有中斷蛮浑,就會(huì)一直循環(huán)執(zhí)行(tryTerminate終止ForkJoinPool時(shí)會(huì)中斷所有worker)。啰嗦一句只嚣,要分清楚return和break的不同含義:

  • break:回到runWorker繼續(xù)執(zhí)行scan沮稚、runTask、awaitWork册舞;
  • return false:worker需要終止了蕴掏。

mark4檢查ForkJoinPool的狀態(tài),如果走向中止那邊调鲸,當(dāng)前worker也就無(wú)必要存在盛杰,return false。

mark5判斷worker的存在是否有必要藐石,如果滿足下面條件:

  • AC為零即供;
  • TC超過(guò)2個(gè);
  • 當(dāng)前WorkQueue在棧頂于微。

說(shuō)明當(dāng)前worker過(guò)剩逗嫡,存在也沒(méi)有任務(wù)執(zhí)行,所以WorkQueue從棧頂釋放角雷,return false終止worker祸穷。

其他情況計(jì)算一個(gè)等待時(shí)間,掛起線程勺三,被喚醒有兩種可能:

  • 外部喚醒:如果scanState非負(fù)雷滚,break出循環(huán),繼續(xù)執(zhí)行scan吗坚;
  • 時(shí)間到達(dá)喚醒:還是老樣子祈远,自己過(guò)剩呆万,return false終止。

Fork

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

fork的代碼很簡(jiǎn)單车份,如果當(dāng)前線程是一個(gè)worker谋减,直接將任務(wù)從top端加入自己的WorkQueue。對(duì)于非worker提交的task扫沼,執(zhí)行externalPush出爹,這個(gè)前面詳細(xì)分析過(guò)了。

Join

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

join的目的是得到任務(wù)的運(yùn)行結(jié)果缎除,核心調(diào)用doJoin严就,根據(jù)任務(wù)狀態(tài)返回結(jié)果,或者拋出異常器罐。要注意的是梢为,任務(wù)在ForkJoinPool中可能處于各種各樣的狀況,有可能剛好要被執(zhí)行啊轰坊,有可能正在隊(duì)列里排隊(duì)啊铸董,有可能已經(jīng)被別人偷走啊。

doJoin的return是花一樣的一串判斷肴沫,先分解出頭兩個(gè)判斷:

  • status為負(fù)表示任務(wù)執(zhí)行已經(jīng)有結(jié)果蜂厅,直接返回盟戏;
  • 區(qū)分當(dāng)前線程是否worker。

先來(lái)說(shuō)當(dāng)前線程不是worker這種情況,調(diào)用externalAwaitDone:

private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}

先不講CountedCompleter的協(xié)作鹦肿,將任務(wù)狀態(tài)設(shè)置為SIGNAL黑滴,然后是使用wait/notify機(jī)制真友,線程進(jìn)入等待捍岳。既然不是worker,不屬于ForkJoinPool的管理范圍沉衣,你掛起等通知就是了郁副。


如果當(dāng)前線程是worker,那就復(fù)雜多了豌习。

(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L)

首先調(diào)用tryUnpush存谎,如果WorkQueue的top端任務(wù)正好是等待join的任務(wù),毫無(wú)疑問(wèn)肥隆,下個(gè)就是執(zhí)行它既荚,直接doExec;否則調(diào)用ForkJoinPool的awaitJoin栋艳。

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        //1
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            //2
            if ((s = task.status) < 0)
                break;
            //3
            if (cc != null)
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            //4
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            if (tryCompensate(w)) {
                task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

mark1處恰聘,worker在WorkQueue中標(biāo)記正在等待的任務(wù),記在currentJoin。進(jìn)入循環(huán)晴叨,在mark2處校驗(yàn)任務(wù)的狀態(tài)凿宾,如果已經(jīng)完成,直接跳出兼蕊。

接下來(lái)重點(diǎn)是處理worker的等待初厚,直接閑著太浪費(fèi)了,awaitJoin里會(huì)分別嘗試兩種策略:

  • Helping:嘗試安排別的任務(wù)孙技;
  • Compensating:創(chuàng)建或者激活一個(gè)備用worker产禾,原worker進(jìn)入等待,由備用worker補(bǔ)償工作量绪杏,直到原worker恢復(fù)下愈。

mark3和mark4分別嘗試兩種策略纽绍。

Helping

兩個(gè)help方法意思十分明確蕾久,如果任務(wù)是CountedCompleter,調(diào)用helpComplete拌夏。接下來(lái)看自己的WorkQueue僧著,調(diào)用tryRemoveAndExec檢查隊(duì)列里所有任務(wù),看等待join的任務(wù)在不在里面障簿。如無(wú)所獲盹愚,最后調(diào)用helpStealer,幫助其他worker站故。

final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
        task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            for (ForkJoinTask<?> t;;) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) {
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) {
                            U.putOrderedInt(this, QTOP, s);
                            removed = true;
                        }
                    }
                    else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                            a, j, task, new EmptyTask());
                    if (removed)
                        task.doExec();
                    break;
                }
                else if (t.status < 0 && s + 1 == top) {
                    if (U.compareAndSwapObject(a, j, t, null))
                        U.putOrderedInt(this, QTOP, s);
                    break;                  // was cancelled
                }
                if (--n == 0)
                    return false;
            }
            if (task.status < 0)
                return false;
        }
    }
    return true;
}

tryRemoveAndExec功能就是遍歷WorkQueue皆怕,任務(wù)在隊(duì)列里的位置可以分兩種情況:

  • 剛好在top端,取出來(lái)直接運(yùn)行西篓;
  • 在隊(duì)列中間愈腾,使用EmptyTask替代原位置,也可以取任務(wù)出來(lái)運(yùn)行岂津。

最后讓tryRemoveAndExec返回false虱黄,不再參與helpStealer。


private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
        task != null) {
        do {                                       // restart point
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            descent: for (subtask = task; subtask.status >= 0; ) {
                 //1
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                    if (k > m)                     // can't find stealer
                        break descent;
                    if ((v = ws[i = (h + k) & m]) != null) {
                        if (v.currentSteal == subtask) {
                            j.hint = i;
                            break;
                        }
                        checkSum += v.base;
                    }
                }
                //2
                for (;;) {                         // help v or descend
                    ForkJoinTask<?>[] a; int b;
                    checkSum += (b = v.base);
                    ForkJoinTask<?> next = v.currentJoin;
                    //3
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                        v.currentSteal != subtask) // stale
                        break descent;
                    //4
                    if (b - v.top >= 0 || (a = v.array) == null) {
                        if ((subtask = next) == null)
                            break descent;
                        j = v;
                        break;
                    }
                    //5
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                         U.getObjectVolatile(a, i));
                    if (v.base == b) {
                        if (t == null)             // stale
                            break descent;
                        if (U.compareAndSwapObject(a, i, t, null)) {
                            v.base = b + 1;
                            ForkJoinTask<?> ps = w.currentSteal;
                            int top = w.top;
                            do {
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                t.doExec();        // clear local tasks too
                            } while (task.status >= 0 &&
                                     w.top != top &&
                                     (t = w.pop()) != null);
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);
                            if (w.base != w.top)
                                return;            // can't further help
                        }
                    }
                }
            }
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}

helpStealer體現(xiàn)了互助的原則吮成,你steal了我剛好需要join的任務(wù)橱乱,我不會(huì)閑等著,我也幫你執(zhí)行任務(wù)粱甫∮镜看最外面do-while循環(huán)和標(biāo)記為descent的for循環(huán),它們的條件都是只要join的任務(wù)沒(méi)有執(zhí)行完成茶宵,就一直執(zhí)行幫助危纫。

首先要找到需要幫助的WorkQueue,給個(gè)代號(hào)叫A,依據(jù)是currentSteal正好是等待join的任務(wù)叶摄。在mark1属韧,遍歷WorkQueue[]奇數(shù)下標(biāo)的WorkQueue,檢查currentSteal蛤吓,如果是宵喂,表示它就是我們要找的人。

  • 如果A中有任務(wù)等待執(zhí)行会傲,循環(huán)從base端取任務(wù)執(zhí)行锅棕,代碼是mark5處。注意到判斷w.base != w.top時(shí)需要return淌山,因?yàn)楫?dāng)原WorkQueue有新任務(wù)時(shí)裸燎,不能繼續(xù)幫助;
  • 如果A中沒(méi)有任務(wù)泼疑,難道想幫也幫不了移稳?那就繼續(xù)幫助下家。在mark4中个粱,根據(jù)A的currentJoin,找到下個(gè)WorkQueue翻翩,邏輯依舊是從base端取任務(wù)執(zhí)行絮吵。

各種數(shù)據(jù)在不斷變化中暇昂,mark3會(huì)校驗(yàn)兩個(gè)WorkQueue的currentJoin和currentSteal是不是目標(biāo)任務(wù)名段,不是的直接跳出到descent,重新查找WorkQueue卡啰。

Compensating

回到awaitJoin的mark4押搪,嘗試第二種策略Compensating及穗。在timeout范圍里娃豹,tryCompensate會(huì)不斷調(diào)用民鼓,看能不能執(zhí)行補(bǔ)償饮亏。確定能夠執(zhí)行補(bǔ)償简肴,當(dāng)前任務(wù)狀態(tài)轉(zhuǎn)為SIGNAL赫粥,并進(jìn)入wait瀑粥。

private boolean tryCompensate(WorkQueue w) {
    boolean canBlock;
    WorkQueue[] ws; long c; int m, pc, sp;
    if (w == null || w.qlock < 0 ||           // caller terminating
        (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
        (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false;
    //1
    else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);
    else {
        //2
        int ac = (int)(c >> AC_SHIFT) + pc;
        int tc = (short)(c >> TC_SHIFT) + pc;
        int nbusy = 0;                        // validate saturation
        for (int i = 0; i <= m; ++i) {        // two passes of odd indices
            WorkQueue v;
            if ((v = ws[((i << 1) | 1) & m]) != null) {
                if ((v.scanState & SCANNING) != 0)
                    break;
                ++nbusy;
            }
        }
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;                 // unstable or stale
        //3
        else if (tc >= pc && ac > 1 && w.isEmpty()) {
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                       (~AC_MASK & c));       // uncompensated
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);
        }
        //4
        else if (tc >= MAX_CAP ||
                 (this == common && tc >= pc + commonMaxSpares))
            throw new RejectedExecutionException(
                "Thread limit exceeded replacing blocked worker");
        //5
        else {                                // similar to tryAddWorker
            boolean add = false; int rs;      // CAS within lock
            long nc = ((AC_MASK & c) |
                       (TC_MASK & (c + TC_UNIT)));
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            canBlock = add && createWorker(); // throws on exception
        }
    }
    return canBlock;
}

mark1如果棧頂有空閑的worker,激活即可弛矛。否則,考慮創(chuàng)建新的worker:

  • mark2檢查RUNNING的worker是否等于TC(上面遍歷奇數(shù)WorkQueue時(shí)循環(huán)了兩次,所以TC需要乘以2),這種情況很明顯不需要?jiǎng)?chuàng)建worker補(bǔ)償闰靴;
  • mark3如果發(fā)現(xiàn)WorkQueue空了系忙,調(diào)整AC的數(shù)量捺弦,減一柒爵;
  • mark4檢查TC是否超過(guò)最大值;(TC最大值不是parallelism哦)
  • mark5具體增加worker的代碼和tryAddWorker類似烈评,不過(guò)這里只有TC加一芽卿,AC不需要變動(dòng),因?yàn)榛顒?dòng)worker數(shù)量在補(bǔ)償下沒(méi)有改變瓶佳。

等待所有任務(wù)完成

向ForkJoinPool提交一堆任務(wù)后精钮,我們會(huì)希望等待所有任務(wù)執(zhí)行完成后蕉堰,繼續(xù)下一步操作导披。ForkJoinPool提供了兩個(gè)阻塞的await方法滓技。

  • awaitQuiescence
  • awaitTermination

前者等待線程池靜止,后者等待線程池終止棚潦,都很好理解令漂。

public boolean isQuiescent() {
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}

判斷靜止是通過(guò)判斷AC是否少于等于零,當(dāng)沒(méi)有活動(dòng)worker時(shí)丸边,也就說(shuō)明當(dāng)前所有任務(wù)都執(zhí)行完成叠必。

public boolean awaitQuiescence(long timeout, TimeUnit unit) {
    long nanos = unit.toNanos(timeout);
    ForkJoinWorkerThread wt;
    Thread thread = Thread.currentThread();
    if ((thread instanceof ForkJoinWorkerThread) &&
        (wt = (ForkJoinWorkerThread)thread).pool == this) {
        helpQuiescePool(wt.workQueue);
        return true;
    }
    long startTime = System.nanoTime();
    WorkQueue[] ws;
    int r = 0, m;
    boolean found = true;
    //1
    while (!isQuiescent() && (ws = workQueues) != null &&
           (m = ws.length - 1) >= 0) {
        if (!found) {
            //2
            if ((System.nanoTime() - startTime) > nanos)
                return false;
            Thread.yield(); // cannot block
        }
        //3
        found = false;
        for (int j = (m + 1) << 2; j >= 0; --j) {
            ForkJoinTask<?> t; WorkQueue q; int b, k;
            if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
                (b = q.base) - q.top < 0) {
                found = true;
                if ((t = q.pollAt(b)) != null)
                    t.doExec();
                break;
            }
        }
    }
    return true;
}

awaitQuiescence又是區(qū)分當(dāng)前線程是否是worker,如果是worker妹窖,調(diào)用helpQuiescePool纬朝,接下來(lái)馬上就講。如果不是worker骄呼,有趣的是線程也參與到執(zhí)行當(dāng)中共苛。

非worker線程進(jìn)入mark1的循環(huán),條件是ForkJoinPool還沒(méi)有quiescent蜓萄。mark2在時(shí)間沒(méi)有timeout的情況下隅茎,先讓步,如果能夠得到執(zhí)行權(quán)嫉沽,進(jìn)入mark3尋找有任務(wù)的WorkQueue辟犀,從base端取出任務(wù)執(zhí)行。

不得不說(shuō)绸硕,F(xiàn)orkJoinPool極盡所能利用資源堂竟,加快任務(wù)的執(zhí)行速度。


從helpQuiescePool的方法名也能知道玻佩,在awaitQuiescence時(shí)出嘹,worker當(dāng)仁不讓會(huì)從別的WorkQueue取任務(wù),整體加快執(zhí)行速度咬崔。

final void helpQuiescePool(WorkQueue w) {
    ForkJoinTask<?> ps = w.currentSteal; // save context
    for (boolean active = true;;) {
        long c; WorkQueue q; ForkJoinTask<?> t; int b;
        //1
        w.execLocalTasks();     // run locals before each scan
        //2
        if ((q = findNonEmptyStealQueue()) != null) {
            if (!active) {      // re-establish active count
                active = true;
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
                U.putOrderedObject(w, QCURRENTSTEAL, t);
                t.doExec();
                if (++w.nsteals < 0)
                    w.transferStealCount(this);
            }
        }
        //3
        else if (active) {      // decrement active count without queuing
            long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
            if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
                break;          // bypass decrement-then-increment
            if (U.compareAndSwapLong(this, CTL, c, nc))
                active = false;
        }
        //4
        else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
                 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
            break;
    }
    U.putOrderedObject(w, QCURRENTSTEAL, ps);
}

第一步mark1疚漆,worker需要先execLocalTasks,將自己WorkQueue里的任務(wù)執(zhí)行完畢。

接著mark2娶聘,findNonEmptyStealQueue查找非空WorkQueue。這里使用了變量active標(biāo)記worker是否活動(dòng)甚脉,以便修改AC丸升。當(dāng)找到非空WorkQueue,worker當(dāng)前是inactive牺氨,重新變?yōu)閍ctive并將AC加一狡耻,并從非空WorkQueue的base端取任務(wù)執(zhí)行。

mark3猴凹,當(dāng)worker是active夷狰,但沒(méi)有非空WorkQueue,將worker變?yōu)閕nactive并將AC減一郊霎。如果變化前AC已經(jīng)為0沼头,表示整個(gè)ForkJoinPool所有任務(wù)都執(zhí)行完成進(jìn)入quiescent。OK這就是我們的目標(biāo)书劝,直接跳出循環(huán)进倍。

mark4,當(dāng)worker是inactive购对,沒(méi)有非空WorkQueue猾昆,AC又等于0,沒(méi)有東西可干骡苞,跳出循環(huán)垂蜗,保持AC至少為1。


public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (this == common) {
        awaitQuiescence(timeout, unit);
        return false;
    }
    long nanos = unit.toNanos(timeout);
    if (isTerminated())
        return true;
    if (nanos <= 0L)
        return false;
    long deadline = System.nanoTime() + nanos;
    synchronized (this) {
        for (;;) {
            if (isTerminated())
                return true;
            if (nanos <= 0L)
                return false;
            long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
            wait(millis > 0L ? millis : 1L);
            nanos = deadline - System.nanoTime();
        }
    }
}

在時(shí)間范圍內(nèi)解幽,awaitTermination等待運(yùn)行狀態(tài)進(jìn)入TERMINATED贴见,沒(méi)有什么特別要講。

關(guān)閉ForkJoinPool

ForkJoinPool的關(guān)閉方法shutdown和shutdownNow都是調(diào)用tryTerminate亚铁,區(qū)別是now是否為true蝇刀。tryTerminate的代碼很長(zhǎng),我們將它分拆開(kāi)幾段來(lái)看徘溢。

int rs;
if (this == common)                       // cannot shut down
    return false;
if ((rs = runState) >= 0) {
    if (!enable)
        return false;
    rs = lockRunState();                  // enter SHUTDOWN phase
    unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}

shutdown和shutdownNow調(diào)用tryTerminate將運(yùn)行狀態(tài)設(shè)置為SHUTDOWN(enable==true)吞琐。其他代碼調(diào)用tryTerminate,enable是false然爆,僅僅檢測(cè)ForkJoinPool是否正在關(guān)閉或者已經(jīng)關(guān)閉站粟。

if ((rs & STOP) == 0) {
    if (!now) {                           // check quiescence
        for (long oldSum = 0L;;) {        // repeat until stable
            WorkQueue[] ws; WorkQueue w; int m, b; long c;
            long checkSum = ctl;
            if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                return false;             // still active workers
            if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                break;                    // check queues
            for (int i = 0; i <= m; ++i) {
                if ((w = ws[i]) != null) {
                    if ((b = w.base) != w.top || w.scanState >= 0 ||
                        w.currentSteal != null) {
                        tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        return false;     // arrange for recheck
                    }
                    checkSum += b;
                    if ((i & 1) == 0)
                        w.qlock = -1;     // try to disable external
                }
            }
            if (oldSum == (oldSum = checkSum))
                break;
        }
    }
    if ((runState & STOP) == 0) {
        rs = lockRunState();              // enter STOP phase
        unlockRunState(rs, (rs & ~RSLOCK) | STOP);
    }
}

接下來(lái)一段很明確是為了進(jìn)入STOP狀態(tài),如果now是true曾雕,毫不猶豫修改狀態(tài)奴烙。否則需要進(jìn)行檢查,看是否真的能馬上進(jìn)入STOP。

檢查AC切诀,還有活動(dòng)worker當(dāng)然不能進(jìn)入STOP揩环;檢查所有WorkQueue,如果還在正常執(zhí)行任務(wù)幅虑,不能進(jìn)入STOP丰滑;偶數(shù)WorkQueue的qlock置為負(fù),攔截從外部提交任務(wù)倒庵。

int pass = 0;                             // 3 passes to help terminate
for (long oldSum = 0L;;) {                // or until done or stable
    WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
    long checkSum = ctl;
    //1
    if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
        (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
        if ((runState & TERMINATED) == 0) {
            rs = lockRunState();          // done
            unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
            synchronized (this) { notifyAll(); } // for awaitTermination
        }
        break;
    }
    //2
    for (int i = 0; i <= m; ++i) {
        if ((w = ws[i]) != null) {
            checkSum += w.base;
            w.qlock = -1;                 // try to disable
            if (pass > 0) {
                w.cancelAll();            // clear queue
                if (pass > 1 && (wt = w.owner) != null) {
                    if (!wt.isInterrupted()) {
                        try {             // unblock join
                            wt.interrupt();
                        } catch (Throwable ignore) {
                        }
                    }
                    if (w.scanState < 0)
                        U.unpark(wt);     // wake up
                }
            }
        }
    }
    if (checkSum != oldSum) {             // unstable
        oldSum = checkSum;
        pass = 0;
    }
    else if (pass > 3 && pass > m)        // can't further help
        break;
    else if (++pass > 1) {                // try to dequeue
        long c; int j = 0, sp;            // bound attempts
        while (j++ <= m && (sp = (int)(c = ctl)) != 0)
            tryRelease(c, ws[sp & m], AC_UNIT);
    }
}

最后執(zhí)行真正終止ForkJoinPool的運(yùn)作褒墨。

mark1是循環(huán)的跳出條件,檢查TC和WorkQueue[]擎宝,如果為零或者為空郁妈,表示該停的都停了,進(jìn)入TERMINATED绍申。

mark2遍歷所有WorkQueue噩咪,pass記錄次數(shù),每次遍歷會(huì)執(zhí)行不同的操作:

  • 第一次:qlock設(shè)置為負(fù)數(shù)失晴;
  • 第二次:取消WorkQueue里所有任務(wù)剧腻,釋放棧頂WorkQueue;
  • 第三次:如果有歸屬的worker涂屁,中斷并解鎖線程书在。

異常處理

ForkJoinPool正常流程講完了,再補(bǔ)充講下異常處理拆又。

出現(xiàn)的異常我們無(wú)辦法直接在主線程捕獲儒旬,所以ForkJoinTask提供了isCompletedAbnormally檢查任務(wù)狀態(tài),并且可以通過(guò)任務(wù)的getException方法獲取異常帖族。

if (task.isCompletedAbnormally) {
    println(task.exception.message)
}

任務(wù)執(zhí)行doExec出現(xiàn)異常時(shí)栈源,會(huì)調(diào)用setExceptionalCompletion,里面繼續(xù)調(diào)用了recordExceptionalCompletion竖般。

final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);
                    break;
                }
                if (e.get() == this) // already present
                    break;
            }
        } finally {
            lock.unlock();
        }
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

ExceptionNode保存了異常任務(wù)和異常信息甚垦,由一個(gè)ExceptionNode數(shù)組(exceptionTable)統(tǒng)一保存,ExceptionNode之間通過(guò)next構(gòu)成一條鏈涣雕。


獲取任務(wù)的異常使用getException艰亮,方法里判斷狀態(tài)后繼續(xù)調(diào)用getThrowableException。CANCELLED狀態(tài)的異常直接創(chuàng)建CancellationException挣郭,和EXCEPTIONAL狀態(tài)的流程不同迄埃。

public final Throwable getException() {
    int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() :
            getThrowableException());
}

getThrowableException的代碼不貼了,它從exceptionTable取出任務(wù)的異常信息并返回兑障。里面對(duì)拋出異常線程不是當(dāng)前線程這種情況進(jìn)行了處理侄非,為了得到更準(zhǔn)確的結(jié)果蕉汪,會(huì)讓當(dāng)前線程使用反射創(chuàng)建一樣的異常返回。

后記

本文僅過(guò)一遍流程代碼逞怨,很多設(shè)計(jì)思想沒(méi)有也很難寫清楚者疤,多看代碼注釋吧。ForkJoinPool細(xì)節(jié)復(fù)雜骇钦,文里肯定有很多錯(cuò)漏宛渐,望指正,謝謝眯搭。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市业岁,隨后出現(xiàn)的幾起案子鳞仙,更是在濱河造成了極大的恐慌,老刑警劉巖笔时,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棍好,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡允耿,警方通過(guò)查閱死者的電腦和手機(jī)借笙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)较锡,“玉大人业稼,你說(shuō)我怎么就攤上這事÷煸蹋” “怎么了低散?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)骡楼。 經(jīng)常有香客問(wèn)我熔号,道長(zhǎng),這世上最難降的妖魔是什么鸟整? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任引镊,我火速辦了婚禮,結(jié)果婚禮上篮条,老公的妹妹穿的比我還像新娘弟头。我一直安慰自己,他們只是感情好兑燥,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布亮瓷。 她就那樣靜靜地躺著,像睡著了一般降瞳。 火紅的嫁衣襯著肌膚如雪嘱支。 梳的紋絲不亂的頭發(fā)上蚓胸,一...
    開(kāi)封第一講書(shū)人閱讀 52,475評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音除师,去河邊找鬼沛膳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛汛聚,可吹牛的內(nèi)容都是我干的锹安。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼倚舀,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼叹哭!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起痕貌,我...
    開(kāi)封第一講書(shū)人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤风罩,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后舵稠,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體超升,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年哺徊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了室琢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡落追,死狀恐怖盈滴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情淋硝,我是刑警寧澤雹熬,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站谣膳,受9級(jí)特大地震影響竿报,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜继谚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一烈菌、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧花履,春花似錦芽世、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至妹卿,卻和暖如春旺矾,著一層夾襖步出監(jiān)牢的瞬間蔑鹦,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工箕宙, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留嚎朽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓柬帕,卻偏偏與公主長(zhǎng)得像哟忍,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子陷寝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容