ForkJoinPool源碼分析

概述

ForkJoinPool是Doug Lea 在JDK 1.7后加入的哈恰,為了充分利用多核CPU的計(jì)算能力察绷,采用分治算法,創(chuàng)建多個(gè)線程笨使、多個(gè)隊(duì)列镀裤,使用不同線程處理不同的隊(duì)列竞阐,且處理完自己的任務(wù)后,還會(huì)竊取其他線程的任務(wù)暑劝,達(dá)到充分使用CPU的目的骆莹。ForkJoinPool有很多使用場(chǎng)景,特別是JDK1.8中添加的parallel流處理和異步處理類CompletableFuture等中都有用到担猛。而且該類比較復(fù)雜幕垦,我們要戰(zhàn)術(shù)上重視它,耐下心看且放棄一些細(xì)枝末節(jié)傅联,先通覽整個(gè)流程先改。戰(zhàn)略上小看它,前面介紹類普通線程池和定時(shí)調(diào)度線程池蒸走,我們已經(jīng)知道套路了(最簡(jiǎn)單的一個(gè)流程:任務(wù)提交線程池->線程池創(chuàng)建線程->啟動(dòng)線程->線程run方法中又調(diào)用任務(wù)的run方法)仇奶,它也屬于線程池也是大概的邏輯。

看一下ForkJoinTask流程圖

image

ForkJoinPool使用例子

例子依然可以在github中找到

public class ForkJoinPoolTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //顯然使用IntStream.parallel().sum()可以方便得到結(jié)果
        // 且parallel也是使用的ForkJoinPool载碌,這是后話猜嘱,我們本例就是測(cè)試ForkJoinTask的分解
        int[] numbers = IntStream.rangeClosed(0, 1_000_000).toArray();
        long begin = System.currentTimeMillis();
        ForkJoinTask<Integer> submit = forkJoinPool.submit(new SumTask(numbers, 0, numbers.length - 1));
        System.out.println("累加結(jié)果為:" + submit.get());
        System.out.println("運(yùn)算耗時(shí):" + (System.currentTimeMillis() - begin));
    }

    private static class SumTask extends RecursiveTask<Integer> {
        private int[] numbers;
        private int from;
        private int to;

        public SumTask(int[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Integer compute() {
            if (to - from <= 2) {
                int total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskRight.join() + taskLeft.join();
                //return taskLeft.join() + taskRight.join() ;
            }
        }
    }
}

結(jié)果就不展示了衅枫,就是計(jì)算累加的和,這里有個(gè)注意點(diǎn)朗伶,可以看到compute方法中弦撩,對(duì)子任務(wù)taskLeft.fork()、taskRight.fork()后论皆,先執(zhí)行taskRight.join()再加上taskLeft.join()益楼,如果反過(guò)來(lái)寫,會(huì)發(fā)現(xiàn)慢將近一倍的時(shí)間点晴,為什么是這樣感凤?我們先留個(gè)疑問(wèn)在這,后面揭曉粒督。

提交任務(wù)

submit方法

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    //任務(wù)不允許為空
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    //probe是和線程相關(guān)的一個(gè)值陪竿,線程私有
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    //相當(dāng)于進(jìn)行一次快速入隊(duì),成功則返回屠橄,不成功externalSubmit執(zhí)行完整的入隊(duì)
    //當(dāng)隊(duì)列數(shù)組不為空且線程入隊(duì)的隊(duì)列不為空時(shí)族跛,加鎖入隊(duì)
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //隨機(jī)到某個(gè)偶數(shù)隊(duì)列中
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {//加鎖操作,鎖定workQueue
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task); //任務(wù)入隊(duì)
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0); //解鎖操作
            if (n <= 1) //當(dāng)任務(wù)數(shù)小于等于1時(shí)執(zhí)行喚醒空閑線程或者創(chuàng)建新線程執(zhí)行任務(wù)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    //完整版入隊(duì)操作锐墙,可以看到如果某個(gè)外部線程第一次submit礁哄,肯定是到這里的(因?yàn)樗玫降膔是0)
    externalSubmit(task);
}

externalSubmit方法

private void externalSubmit(ForkJoinTask<?> task) {
    int r; // initialize callers probe 
    //如果線程的probe沒有初始化,進(jìn)行初始化
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    //這是一個(gè)死循環(huán)溪北,所以可以保證WorkQueue[]數(shù)組的創(chuàng)建桐绒, 隊(duì)列的創(chuàng)建, 任務(wù)入隊(duì)
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        else if ((rs & STARTED) == 0 ||     // initialize WorkQueue[]數(shù)組的創(chuàng)建
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        else if ((q = ws[k = r & m & SQMASK]) != null) { //任務(wù)入隊(duì)
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) { //入隊(duì)成功后之拨,喚醒或者新建一個(gè)線程茉继,處理任務(wù)
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue 隊(duì)列的創(chuàng)建
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        //如果隊(duì)列加鎖失敗,說(shuō)明被別的線程處理了蚀乔,重新計(jì)算probe的值
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

可以看到不管是快速入隊(duì)方法馒疹,還是完整入隊(duì)方法,入隊(duì)成功后都會(huì)調(diào)用signalWork方法乙墙。
signalWork方法

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    //循環(huán)檢查:有空閑線程喚醒空閑線程颖变,工作線程數(shù)太少,則新建空閑線程
    while ((c = ctl) < 0L) {                       // too few active
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c); //如果工作線程太小听想,創(chuàng)建新的工作線程處理
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p); //喚醒阻塞線程
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

我們看看新建線程方法

private void tryAddWorker(long c) {
    boolean add = false;
    //也是同樣的套路腥刹,先嘗試CAS修改ctl值,增加工作線程數(shù)汉买,增加成功衔峰,調(diào)用createWorker方法
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;
            if (add) {
                createWorker(); //創(chuàng)建新線程
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

createWorker 方法

private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        //也是和ThreadPoolExecutor一樣的套路
        //創(chuàng)建線程成功,將線程start后方法返回, 否則執(zhí)行deregisterWorker進(jìn)行回退操作
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    //注銷工作線程和fac.newThread方法中的registerWorker相對(duì)
    //回退操作,會(huì)減少ctl值垫卤,移除工作線程的隊(duì)列威彰,另外如果工作線程數(shù)太少會(huì)再次調(diào)用tryAddWorker方法,嘗試新建線程
    deregisterWorker(wt, ex);
    return false;
}

我們看看ForkJoinWorkerThreadFactory.newThread做了什么穴肘?
ForkJoinWorkerThreadFactory.newThread方法

public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    return new ForkJoinWorkerThread(pool);
}
//將自己的工作隊(duì)列workQueue注冊(cè)到ForkJoinPool的WorkQueue[] 數(shù)組中
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // Use a placeholder until a useful name can be set in registerWorker
    super("aForkJoinWorkerThread");
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                           // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    //新建一個(gè)WorkQueue對(duì)象歇盼,這個(gè)是工作線程的WorkQueue
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                    // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            int m = n - 1;
            //得到一個(gè)奇數(shù)下標(biāo)
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            w.hint = s;                           // use as random seed
            w.config = i | mode;
            w.scanState = i;                      // publication fence
            //將工作線程的workWueue賦值給線程池的一個(gè)奇數(shù)下標(biāo)
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

上面我們看到createWorker方法中,線程創(chuàng)建成功后评抚,會(huì)進(jìn)行thread.start,我們照舊看ForkJoinWorkerThread類的run方法吧豹缀。
ForkJoinWorkerThread.run 方法

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

run方法又調(diào)用了ForkJoinPool的runWorker方法

final void runWorker(WorkQueue w) {
    //分配內(nèi)存
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {
        //進(jìn)行掃描,隨機(jī)竊取一個(gè)頂級(jí)任務(wù)
        if ((t = scan(w, r)) != null)
            w.runTask(t); //運(yùn)行任務(wù)
        else if (!awaitWork(w, r)) //如果竊取不到任務(wù)慨代,進(jìn)行等待
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}
private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    //當(dāng)線程池不為空邢笙,進(jìn)行掃描
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            if ((q = ws[k]) != null) {//獲取workQueue
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>) //獲取任務(wù)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1; //更新base位置
                                if (n < -1)       // signal others
                                    signalWork(ws, q); //喚醒空閑線程或新建線程,幫忙處理任務(wù)
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    //沒掃描到侍匙,掃描其他位置    
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            //更新workQueue下標(biāo)值k 繼續(xù)查找
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                //運(yùn)行到這里說(shuō)明已經(jīng)掃描了全部的 workQueues氮惯,但并未掃描到任務(wù)
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;
                    //對(duì)當(dāng)前WorkQueue進(jìn)行inactivate 處理
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int)c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}

掃描到任務(wù)以后,會(huì)調(diào)用任務(wù)的runTask方法

final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        //調(diào)用任務(wù)的doExec方法
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            //調(diào)用exec方法并將返回值賦值給completed
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

到了這里想暗,終于快看到我們測(cè)試?yán)恿藦?fù)寫的compute方法了筐骇,我們看下例子中繼承的RecursiveTask類

protected final boolean exec() {
    result = compute();
    return true;
}

小結(jié)
上面我們看到線程池提交任務(wù),放到一個(gè)workQueue數(shù)組的一個(gè)偶數(shù)下標(biāo)的隊(duì)列中江滨,然后新建一個(gè)工作線程,工作線程中初始化一個(gè)workQueue放入workQueue數(shù)組奇數(shù)下標(biāo)中厌均。\

fork方法

public final ForkJoinTask<V> fork() {
    Thread t;
    //如果是ForkJoinWorkerThread 線程fork出來(lái)的唬滑,push到自己的workQueue中
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else 
        ForkJoinPool.common.externalPush(this); //否則push到common池中
    return this;
}

push 方法

final void push(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //任務(wù)入隊(duì)
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m) //數(shù)組滿了,進(jìn)行擴(kuò)容
            growArray();
    }
}

compute中調(diào)用子任務(wù)的fork后棺弊,就會(huì)將子任務(wù)入隊(duì)了晶密,然后taskRight.join等待子任務(wù)處理完成。我們看看join方法的邏輯模她。

//等待任務(wù)執(zhí)行完成并返回結(jié)果
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    //tryUnpush判斷當(dāng)前任務(wù)是棧頂任務(wù)稻艰,直接進(jìn)行處理(即調(diào)子任務(wù)的compute方法),否則進(jìn)入awaitJoin方法
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}

await方法

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            if ((s = task.status) < 0)
                break;
            //如果是CountedCompleter任務(wù)侈净,執(zhí)行helpComplete    
            if (cc != null)
                helpComplete(w, cc, 0);
            //這里比較關(guān)鍵尊勿,如果隊(duì)列不為空,會(huì)再執(zhí)行tryRemoveAndExec    
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);//如果隊(duì)列是空或者遇到的任務(wù)都被別的線程執(zhí)行過(guò)了畜侦,就偷個(gè)任務(wù)做
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            //嘗試釋放一個(gè)線程或新建一個(gè)線程    
            if (tryCompensate(w)) {
                //阻塞自己
                task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

tryRemoveAndExec方法

final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
        task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            //遍歷整個(gè)隊(duì)列元扔,如果隊(duì)列中存在此子任務(wù),進(jìn)行調(diào)用doExec
            for (ForkJoinTask<?> t;;) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) {
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) {
                            U.putOrderedInt(this, QTOP, s);
                            removed = true;
                        }
                    }
                    else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                            a, j, task, new EmptyTask());
                    if (removed)
                        task.doExec();
                    break;
                }
                else if (t.status < 0 && s + 1 == top) {
                    if (U.compareAndSwapObject(a, j, t, null))
                        U.putOrderedInt(this, QTOP, s);
                    break;                  // was cancelled
                }
                if (--n == 0)
                    return false;
            }
            if (task.status < 0)
                return false;
        }
    }
    return true;
}

至此整個(gè)流程就串起來(lái)了旋膳,例子中的SumTask類的compute方法執(zhí)行后澎语,會(huì)創(chuàng)建子任務(wù),子任務(wù).fork()會(huì)將任務(wù)入隊(duì),子任務(wù).join()時(shí)擅羞,會(huì)執(zhí)行子任務(wù)的compute方法尸变。
join方法的分析完后,我們可以回答taskRight.join() + taskLeft.join()會(huì)更高效减俏?
因?yàn)檎{(diào)用taskLeft.fork會(huì)將taskLeft入隊(duì)召烂,taskRight.fork會(huì)將taskRight入隊(duì),接下來(lái)如果執(zhí)行taskRight.join()垄懂,taskRight這時(shí)候是棧頂任務(wù)骑晶,直接tryUnpush執(zhí)行,不需要再遍歷隊(duì)列草慧。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末桶蛔,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子漫谷,更是在濱河造成了極大的恐慌仔雷,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件舔示,死亡現(xiàn)場(chǎng)離奇詭異碟婆,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)惕稻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門竖共,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人俺祠,你說(shuō)我怎么就攤上這事公给。” “怎么了蜘渣?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵淌铐,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蔫缸,道長(zhǎng)腿准,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任拾碌,我火速辦了婚禮吐葱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘校翔。我一直安慰自己唇撬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布展融。 她就那樣靜靜地躺著窖认,像睡著了一般豫柬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扑浸,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天烧给,我揣著相機(jī)與錄音,去河邊找鬼喝噪。 笑死础嫡,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的酝惧。 我是一名探鬼主播榴鼎,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼晚唇!你這毒婦竟也來(lái)了巫财?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤哩陕,失蹤者是張志新(化名)和其女友劉穎平项,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體悍及,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡闽瓢,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了心赶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扣讼。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖缨叫,靈堂內(nèi)的尸體忽然破棺而出椭符,到底是詐尸還是另有隱情,我是刑警寧澤弯汰,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站湖雹,受9級(jí)特大地震影響咏闪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜摔吏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一鸽嫂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧征讲,春花似錦据某、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至,卻和暖如春筷狼,著一層夾襖步出監(jiān)牢的瞬間瓶籽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工埂材, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留塑顺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓俏险,卻偏偏與公主長(zhǎng)得像严拒,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子竖独,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355