JAVA基礎之Fork/Join框架

1简僧、核心思想

Fork/Join框架是Java 7提供的一個用于并行執(zhí)行任務的框架澄惊, 核心思想就是把大任務分割成若干個小任務紊馏,最終匯總每個小任務結果后得到大任務結果牺弄,其實現(xiàn)思想與MapReduce有異曲同工之妙姻几。

Fork就是把一個大任務切分為若干子任務并行的執(zhí)行,Join就是合并這些子任務的執(zhí)行結果势告,最后得到這個大任務的結果蛇捌。比如計算1+2+…+10000,可以分割成10個子任務咱台,每個子任務分別對1000個數(shù)進行求和络拌,最終匯總這10個子任務的結果。Fork/Join的運行流程圖如下:

1.png

Fork/Join框架使用一個巧妙的算法來平衡線程的負載回溺,稱為工作竊取(work-stealing)算法春贸。工作竊取的運行流程圖如下:

2.png

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務遗遵,為了減少線程間的競爭萍恕,于是把這些子任務分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨的線程來執(zhí)行隊列里的任務车要,線程和隊列一一對應允粤,比如A線程負責處理A隊列里的任務。但是有的線程會先把自己隊列里的任務干完翼岁,而其他線程對應的隊列里還有任務等待處理维哈。干完活的線程與其等著,不如去幫其他線程干活登澜,于是它就去其他線程的隊列里竊取一個任務來執(zhí)行。而在這時它們會訪問同一個隊列飘庄,所以為了減少竊取任務線程和被竊取任務線程之間的競爭脑蠕,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執(zhí)行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執(zhí)行谴仙。

工作竊取算法的優(yōu)點是充分利用線程進行并行計算迂求,并減少了線程間的競爭,其缺點是在某些情況下還是存在競爭晃跺,比如雙端隊列里只有一個任務時揩局。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列掀虎。

2凌盯、應用實例

Fork/Join框架主要由兩部分組成:

  • 分割任務。首先我們需要有一個fork類來把大任務分割成子任務烹玉,有可能子任務還是很大驰怎,所以還需要不停的分割,直到分割出的子任務足夠小二打。
  • 執(zhí)行任務并合并結果县忌。分割的子任務分別放在雙端隊列里,然后幾個啟動線程分別從雙端隊列里獲取任務執(zhí)行继效。子任務執(zhí)行完的結果都統(tǒng)一放在一個隊列里症杏,啟動一個線程從隊列里拿數(shù)據(jù),然后合并這些數(shù)據(jù)瑞信。

Fork/Join使用兩個類來完成以上兩件事情:

  • ForkJoinTask
    我們要使用ForkJoin框架厉颤,必須首先創(chuàng)建一個ForkJoin任務。它提供在任務中執(zhí)行fork()join()操作的機制喧伞,通常情況下我們不需要直接繼承ForkJoinTask類走芋,而只需要繼承它的子類,F(xiàn)ork/Join框架提供了以下兩個子類:
    • RecursiveAction:用于沒有返回結果的任務潘鲫。
    • RecursiveTask :用于有返回結果的任務翁逞。
  • ForkJoinPool
    ForkJoinTask需要通過ForkJoinPool來執(zhí)行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中溉仑,進入隊列的頭部挖函。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務浊竟。

讓我們通過一個簡單的需求來使用下Fork/Join框架怨喘,需求是:計算1~8的累加結果。

使用Fork/Join框架首先要考慮到的是如何分割任務振定,如果我們希望每個子任務最多執(zhí)行兩個數(shù)的相加必怜,那么設置分割的閾值是2,由于是8個數(shù)字相加后频,所以Fork/Join框架會把這個任務fork成兩個子任務梳庆,子任務一負責計算1+2+3+4暖途,子任務二負責計算3+4+5+6,然后子任務會繼續(xù)分隔膏执,直到累加的數(shù)字將為兩個驻售,最后逐層join子任務的結果。

public class CountTask extends RecursiveTask<Integer> {

    private static final int THRESHHOLD = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        System.out.println(start + " - " + end + " begin");
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHHOLD;
        if (canCompute) { // 達到了計算條件更米,則直接執(zhí)行
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else { // 不滿足計算條件欺栗,則分割任務
            int middle = (start + end) / 2;

            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            leftTask.fork(); // 執(zhí)行子任務
            rightTask.fork();
            int leftResult = leftTask.join(); // 等待子任務執(zhí)行完畢
            int rightResult = rightTask.join();

            sum = leftResult + rightResult; // 合并子任務的計算結果
        }
        System.out.println(start + " - " + end + " end");
        return sum;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        CountTask task = new CountTask(1, 8);
        Future<Integer> future = pool.submit(task);
        if (task.isCompletedAbnormally()) {
            System.out.println(task.getException());
        } else {
            System.out.println("result: " + future.get());
        }

    }

}

打印結果:

1 - 8 begin
1 - 4 begin
5 - 8 begin
5 - 6 begin
5 - 6 end
1 - 2 begin
1 - 2 end
3 - 4 begin
3 - 4 end
7 - 8 begin
7 - 8 end
5 - 8 end
1 - 4 end
1 - 8 end
result: 36

由于每個任務是由線程池執(zhí)行的,每次的執(zhí)行順序會有不同征峦,但是迟几,父任務肯定在所有子任務之后完成,比如1-8的計算肯定在子任務1-4眶痰、5-8之后完成瘤旨,但是1-4、5-8的完成順序是不確定的竖伯。

ForkJoinTask在執(zhí)行的時候可能會拋出異常存哲,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經(jīng)拋出異称哂ぃ或已經(jīng)被取消了祟偷,并且可以通過ForkJoinTaskgetException方法獲取異常。

getException方法返回Throwable對象打厘,如果任務被取消了則返回CancellationException修肠。如果任務沒有完成或者沒有拋出異常則返回null。

3户盯、 源碼解讀

3.png

3.1 ForkJoinPool

ForkJoinTask代表一個需要執(zhí)行的任務嵌施,真正執(zhí)行這些任務的線程放在一個ForkJoinPool里面。ForkJoinPool是一個可以執(zhí)行ForkJoinTaskExcuteService莽鸭,與ExcuteService不同的是它采用了work-stealing模式:所有在池中的空閑線程嘗試去執(zhí)行其他線程創(chuàng)建的子任務吗伤,這樣就很少有線程處于空閑狀態(tài),非常高效硫眨。

池中維護著ForkJoinWorkerThread對象數(shù)組:

    /**
     * Array holding all worker threads in the pool.  Initialized upon
     * construction. Array size must be a power of two.  Updates and
     * replacements are protected by scanGuard, but the array is
     * always kept in a consistent enough state to be randomly
     * accessed without locking by workers performing work-stealing,
     * as well as other traversal-based methods in this class, so long
     * as reads memory-acquire by first reading ctl. All readers must
     * tolerate that some array slots may be null.
          */
     ForkJoinWorkerThread[] workers;

ForkJoinWorkerThread為任務的執(zhí)行線程足淆,workers數(shù)組在構造方法中初始化,其大小必須為2的n次方(方便將取模轉換為移位運算)礁阁。

ForkJoinPool初始化方法:

// initialize workers array with room for 2*parallelism if possible
int n = parallelism << 1;
if (n >= MAX_ID)
  n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
  n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
workers = new ForkJoinWorkerThread[n + 1];

可見巧号,workers數(shù)組大小由parallelism屬性決定,parallelism默認為處理器個數(shù)姥闭,workers數(shù)組默認大小為處理器數(shù)量2*丹鸿,但是不能超過MAX_ID

private static final int MAX_ID = 0x7fff; // max poolIndex

什么情況下需要添加線程呢?當新的任務到來棚品,線程池會通知其他線程前去處理卜高,如果這時沒有處于等待的線程或者處于活動的線程非常少(這是通過ctl屬性來判斷的)弥姻,就會往線程池中添加線程:

/**
 * Tries to create and start a worker; minimally rolls back counts
 * on failure.
      */
    private void addWorker() {
    Throwable ex = null;
    ForkJoinWorkerThread t = null;
    try {
        t = factory.newThread(this);
    } catch (Throwable e) {
        ex = e;
    }
    if (t == null) {  // null or exceptional factory return
        long c;       // adjust counts
        do {} while (!UNSAFE.compareAndSwapLong
                     (this, ctlOffset, c = ctl,
                      (((c - AC_UNIT) & AC_MASK) |
                       ((c - TC_UNIT) & TC_MASK) |
                       (c & ~(AC_MASK|TC_MASK)))));
        // Propagate exception if originating from an external caller
        if (!tryTerminate(false) && ex != null &&
            !(Thread.currentThread() instanceof ForkJoinWorkerThread))
            UNSAFE.throwException(ex);
    }
    else
        t.start();
    }

增加線程通過ForkJoinWorkerThreadFactory來實現(xiàn),底層實現(xiàn)方法為:

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super(pool.nextWorkerName());
        this.pool = pool;
        int k = pool.registerWorker(this);
        poolIndex = k;
        eventCount = ~k & SMASK; // clear wait count
        locallyFifo = pool.locallyFifo;
        Thread.UncaughtExceptionHandler ueh = pool.ueh;
        if (ueh != null)
            setUncaughtExceptionHandler(ueh);
        setDaemon(true);
    }

可見掺涛,該線程生成后需要回調ForkJoinPool. registerWorker在線程池中完成注冊:

    /**
     * Callback from ForkJoinWorkerThread constructor to
     * determine its poolIndex and record in workers array.
     *
     * @param w the worker
     * @return the worker's pool index
     */
    final int registerWorker(ForkJoinWorkerThread w) {
        /*
         * In the typical case, a new worker acquires the lock, uses
         * next available index and returns quickly.  Since we should
         * not block callers (ultimately from signalWork or
         * tryPreBlock) waiting for the lock needed to do this, we
         * instead help release other workers while waiting for the
         * lock.
         */
        for (int g;;) {
            ForkJoinWorkerThread[] ws;
            if (((g = scanGuard) & SG_UNIT) == 0 &&
                UNSAFE.compareAndSwapInt(this, scanGuardOffset,
                                         g, g | SG_UNIT)) {
                int k = nextWorkerIndex;
                try {
                    if ((ws = workers) != null) { // ignore on shutdown
                        int n = ws.length;
                        if (k < 0 || k >= n || ws[k] != null) {
                            for (k = 0; k < n && ws[k] != null; ++k)
                                ;
                            if (k == n)
                                ws = workers = Arrays.copyOf(ws, n << 1);
                        }
                        ws[k] = w;
                        nextWorkerIndex = k + 1;
                        int m = g & SMASK;
                        g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
                    }
                } finally {
                    scanGuard = g;
                }
                return k;
            }
            else if ((ws = workers) != null) { // help release others
                for (ForkJoinWorkerThread u : ws) {
                    if (u != null && u.queueBase != u.queueTop) {
                        if (tryReleaseWaiter())
                            break;
                    }
                }
            }
        }
    }

整個框架大量采用順序鎖,好處是不用阻塞疼进,不好的地方是會有額外的循環(huán)薪缆。這里也是通過循環(huán)來注冊這個線程,在循環(huán)的過程中有兩種情況發(fā)生:

  • compareAndSwapInt操作成功伞广,掃描workers數(shù)組拣帽,找到一個為空的項,并把新創(chuàng)建的線程放在這個位置嚼锄;如果沒有找到减拭,表示數(shù)組大小不夠,則將數(shù)組擴大2倍区丑;
  • compareAndSwapInt操作失敗拧粪,需要循環(huán)重新嘗試直接成功為止,從代碼中可以看出沧侥,即使是失敗了可霎,也不忘做一些額外的事:通知其他線程去執(zhí)行沒有完成的任務

ForkJoinPool可以通過execute提交ForkJoinTask任務,然后通過ForkJoinWorkerThread. pushTask實現(xiàn)宴杀。

    /**
     * Unless terminating, forks task if within an ongoing FJ computation in the
     * current pool, else submits as external task.
     */
    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread) t).pool == this)
            w.pushTask(task);
        else
            addSubmission(task);
    }

    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task
     *            the task
     * @throws NullPointerException
     *             if the task is null
     * @throws RejectedExecutionException
     *             if the task cannot be scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
    }

除此之外癣朗,ForkJoinPool還覆蓋并重載了從ExecutorService繼承過來的executesubmit方法外,可以接受Runnable``與Callable類型的任務旺罢。

ExecutorService一樣旷余,ForkJoinPool可以調用shutdown()shutdownNow()來終止線程,會先設置每個線程的任務狀態(tài)為CANCELLED扁达,然后調用Threadinterrupt方法來終止每個線程正卧。

ExcuteService不同的是,ForkJoinPool除了可以執(zhí)行Runnable任務外罩驻,還可以執(zhí)行ForkJoinTask任務穗酥; ExcuteService中處于后面的任務需要等待前面任務執(zhí)行后才有機會執(zhí)行,而ForkJoinPool會采用work-stealing模式幫助其他線程執(zhí)行任務惠遏,即ExcuteService解決的是并發(fā)問題砾跃,而ForkJoinPool解決的是并行問題

3.2 ForkJoinWorkerThread

ForkJoinWorkerThread繼承自Thread节吮,受ForkJoinPool支配用以執(zhí)行ForkJoinTask抽高。

該類中有幾個重要的域:

    /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two. Initial size must be at least 4, but is
     * padded to minimize cache effects.
     */
    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue array. Must be a power of two
     * less than or equal to 1 << (31 - width of array entry) to
     * ensure lack of index wraparound, but is capped at a lower
     * value to help users trap runaway computations.
     */
    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    /**
     * The work-stealing queue array. Size must be a power of two.
     * Initialized when started (as oposed to when constructed), to
     * improve memory locality.
     */
    ForkJoinTask<?>[] queue;

    /**
     * The pool this thread works in. Accessed directly by ForkJoinTask.
     */
    final ForkJoinPool pool;

    /**
     * Index (mod queue.length) of next queue slot to push to or pop
     * from. It is written only by owner thread, and accessed by other
     * threads only after reading (volatile) queueBase.  Both queueTop
     * and queueBase are allowed to wrap around on overflow, but
     * (queueTop - queueBase) still estimates size.
     */
    int queueTop;

    /**
     * Index (mod queue.length) of least valid queue slot, which is
     * always the next position to steal from if nonempty.
     */
    volatile int queueBase;
    /**
     * The index of most recent stealer, used as a hint to avoid
     * traversal in method helpJoinTask. This is only a hint because a
     * worker might have had multiple steals and this only holds one
     * of them (usually the most current). Declared non-volatile,
     * relying on other prevailing sync to keep reasonably current.
     */
    int stealHint;

ForkJoinWorkerThread使用數(shù)組實現(xiàn)雙端隊列,用來盛放ForkJoinTask透绩,queueTop指向對頭翘骂,queueBase指向隊尾壁熄。本地線程插入任務、獲取任務都在隊頭進行碳竟,其他線程“竊取”任務則在隊尾進行草丧。

poolIndex本線程在ForkJoinPool中工作線程數(shù)組中的下標,stealHint保存了最近的竊取者(來竊取任務的工作線程)的下標(poolIndex)莹桅。注意這個值不準確昌执,因為可能同時有很多竊取者來竊取任務,這個值只能記錄其中之一诈泼。

添加任務:

    /**
     * Pushes a task. Call only from this thread.
     *
     * @param t the task. Caller must ensure non-null.
     */
    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

首先將任務放在queueTop指向的隊列位置懂拾,再將queueTop加1。

然后分析隊列容量情況铐达,當數(shù)組元素比較少時(1或者2)岖赋,就調用signalWork()方法。signalWork()方法做了兩件事:

  • 喚醒當前線程;
  • 當沒有活動線程時或者線程數(shù)較少時瓮孙,添加新的線程唐断。

else if部分表示隊列已滿(隊頭指針=隊列長度減1),調用growQueue()擴容衷畦。

join任務:

    /**
     * Possibly runs some tasks and/or blocks, until joinMe is done.
     *
     * @param joinMe the task to join
     * @return completion status on exit
     */
    final int joinTask(ForkJoinTask<?> joinMe) {
        ForkJoinTask<?> prevJoin = currentJoin;
        currentJoin = joinMe;
        for (int s, retries = MAX_HELP;;) {
            if ((s = joinMe.status) < 0) {
                currentJoin = prevJoin;
                return s;
            }
            if (retries > 0) {
                if (queueTop != queueBase) {
                    if (!localHelpJoinTask(joinMe))
                        retries = 0;           // cannot help
                }
                else if (retries == MAX_HELP >>> 1) {
                    --retries;                 // check uncommon case
                    if (tryDeqAndExec(joinMe) >= 0)
                        Thread.yield();        // for politeness
                }
                else
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
            }
            else {
                retries = MAX_HELP;           // restart if not done
                pool.tryAwaitJoin(joinMe);
            }
        }
    }

join操作類似插隊栗涂,確保入?yún)?code>joinMe執(zhí)行完畢后再進行后續(xù)操作。

這里面有個變量retries祈争,表示可以重試的次數(shù)斤程,最大值為MAX_HELP=16。重試的過程如下:

  • 判斷joinMe是否已完成(joinMe.status < 0)菩混,如果是忿墅,則直接返回。
  • 判斷retries是否用完了沮峡,如果是疚脐,則調用pool.tryAwaitJoin()阻塞當前新城,等待joinMe完成
  • retries大于0邢疙,首先判斷當前線程的任務隊列queue是否為空(queueTop != queueBase)棍弄,如果不為空,調用localHelpJoinTask()方法疟游,判斷joinMe任務是否在自己的queue的隊首位置呼畸,如果正好在,執(zhí)行該任務颁虐;同時蛮原,由于queue不為空,則證明自己并不是沒事干另绩,無法幫助別的線程干活(工作竊热逶伞)花嘶,retries置零
  • 如果自己的queue為空了,調用helpJoinTask()方法進行工作竊取蹦漠,幫助其他線程干活椭员,反正閑著也是閑著。
  • 幫別人干活也不是每次都能成功笛园,如果連續(xù)8次都失敗了(retries == MAX_HELP >>> 1)拆撼,說明人品不行,自己還是歇會吧喘沿,調用Thread.yield()讓權。不過竭贩,讓權之前還會做最有一次努力蚜印,調用tryDeqAndExec(),看看自己在等的任務是否在某個線程的隊尾留量,在的話偷過來執(zhí)行掉窄赋。

3.3 ForkJoinTask

當我們調用ForkJoinTaskfork方法時,程序會調用ForkJoinWorkerThreadpushTask方法異步的執(zhí)行這個任務楼熄,然后立即返回結果忆绰。

    /**
     * Arranges to asynchronously execute this task.  While it is not
     * necessarily enforced, it is a usage error to fork a task more
     * than once unless it has completed and been reinitialized.
     * Subsequent modifications to the state of this task or any data
     * it operates on are not necessarily consistently observable by
     * any thread other than the one executing it unless preceded by a
     * call to {@link #join} or related methods, or a call to {@link
     * #isDone} returning {@code true}.
     *
     * <p>This method may be invoked only from within {@code
     * ForkJoinPool} computations (as may be determined using method
     * {@link #inForkJoinPool}).  Attempts to invoke in other contexts
     * result in exceptions or errors, possibly including {@code
     * ClassCastException}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

可見,fork()操作是通過調用ForkJoinWorkerThread.pushTask()實現(xiàn)的可岂。該方法在上面已做分析错敢,不再贅述。

join方法的主要作用是阻塞當前線程并等待獲取結果缕粹。代碼如下:

    /**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

    /**
     * Report the result of invoke or join; called only upon
     * non-normal return of internal versions.
     */
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }

首先稚茅,它調用了doJoin()方法,通過doJoin()方法得到當前任務的狀態(tài)來判斷返回什么結果平斩,任務狀態(tài)有四種:

private static final int NORMAL      = -1;
private static final int CANCELLED   = -2;
private static final int EXCEPTIONAL = -3;
private static final int SIGNAL      =  1;
  • 如果任務狀態(tài)是已完成亚享,則直接返回任務結果。
  • 如果任務狀態(tài)是被取消绘面,則直接拋出CancellationException欺税。
  • 如果任務狀態(tài)是拋出異常,則直接拋出對應的異常揭璃。

再來看doJoin方法:

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                    (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                            // 執(zhí)行任務
                            tryUnpush(this) && (s = doExec()) < 0 ? s :
                            wt.pool.awaitJoin(w, this, 0L) :
                    // 阻塞非工作線程晚凿,直到工作線程執(zhí)行完畢
                    externalAwaitDone();
}

final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

doJoin()方法里,首先通過查看任務的狀態(tài)塘辅,看任務是否已經(jīng)執(zhí)行完成晃虫,如果執(zhí)行完成, 則直接返回任務狀態(tài)扣墩;如果沒有執(zhí)行完哲银,則從任務數(shù)組里取出任務并執(zhí)行扛吞。如果任務順利執(zhí)行完成,則設置任務狀態(tài)為NORMAL荆责,如果出現(xiàn)異常滥比,則記錄異常,并將任務狀態(tài)設置為EXCEPTIONAL做院。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末盲泛,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子键耕,更是在濱河造成了極大的恐慌寺滚,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件屈雄,死亡現(xiàn)場離奇詭異村视,居然都是意外死亡,警方通過查閱死者的電腦和手機酒奶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門蚁孔,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人惋嚎,你說我怎么就攤上這事杠氢。” “怎么了另伍?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵鼻百,是天一觀的道長。 經(jīng)常有香客問我质况,道長愕宋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任结榄,我火速辦了婚禮中贝,結果婚禮上,老公的妹妹穿的比我還像新娘臼朗。我一直安慰自己邻寿,他們只是感情好,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布视哑。 她就那樣靜靜地躺著绣否,像睡著了一般。 火紅的嫁衣襯著肌膚如雪挡毅。 梳的紋絲不亂的頭發(fā)上蒜撮,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機與錄音,去河邊找鬼段磨。 笑死取逾,一個胖子當著我的面吹牛,可吹牛的內容都是我干的苹支。 我是一名探鬼主播砾隅,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼债蜜!你這毒婦竟也來了晴埂?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤寻定,失蹤者是張志新(化名)和其女友劉穎儒洛,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狼速,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡晶丘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了唐含。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡沫浆,死狀恐怖捷枯,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情专执,我是刑警寧澤淮捆,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站本股,受9級特大地震影響攀痊,放射性物質發(fā)生泄漏。R本人自食惡果不足惜拄显,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一苟径、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧躬审,春花似錦棘街、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至博助,卻和暖如春险污,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背富岳。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工蛔糯, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留拯腮,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓渤闷,卻偏偏與公主長得像疾瓮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子飒箭,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內容