可取消的異步任務——FutureTask用法及解析

網(wǎng)上一搜取消正在執(zhí)行的異步任務,會出現(xiàn)很多Future戚嗅,FutureTask相關的文章雨涛,最近我也用了一下FutureTask枢舶,這里記錄一下使用中遇到的問題,最后結合源碼分析一下替久。

  1. FutureTask的用法凉泄。
  2. 開發(fā)中我遇到的問題。
  3. 結合FutureTask的源碼分析問題蚯根。

1. FutureTask的用法

在Java中后众,一般是通過繼承Thread類或者實現(xiàn)Runnable接口來創(chuàng)建多線程, Runnable接口不能返回結果颅拦,如果要獲取子線程的執(zhí)行結果蒂誉,一般都是在子線程執(zhí)行結束之后,通過Handler將結果返回到調(diào)用線程距帅,jdk1.5之后右锨,Java提供了Callable接口來封裝子任務,Callable接口可以獲取返回結果碌秸。

FutureTask相關的類或接口绍移,有RunnableCallable讥电,Future蹂窖,直接從Callable開始。

Callable接口

下面可以看一下Callable接口的定義:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable接口很簡單恩敌,是一個泛型接口瞬测,就是定義了一個call()方法,與Runnablerun()方法相比纠炮,這個有返回值月趟,泛型V就是要返回的結果類型,可以返回子任務的執(zhí)行結果抗碰。

Future接口

Future接口表示異步計算的結果狮斗,通過Future接口提供的方法绽乔,可以很方便的查詢異步計算任務是否執(zhí)行完成弧蝇,獲取異步計算的結果,取消未執(zhí)行的異步任務折砸,或者中斷異步任務的執(zhí)行看疗,接口定義如下:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}
  1. cancel(boolean mayInterruptIfRunning):取消子任務的執(zhí)行,如果這個子任務已經(jīng)執(zhí)行結束睦授,或者已經(jīng)被取消两芳,或者不能被取消,這個方法就會執(zhí)行失敗并返回false去枷;如果子任務還沒有開始執(zhí)行怖辆,那么子任務會被取消是复,不會再被執(zhí)行;如果子任務已經(jīng)開始執(zhí)行了竖螃,但是還沒有執(zhí)行結束淑廊,根據(jù)mayInterruptIfRunning的值,如果mayInterruptIfRunning = true特咆,那么會中斷執(zhí)行任務的線程季惩,然后返回true,如果參數(shù)為false腻格,會返回true画拾,不會中斷執(zhí)行任務的線程。這個方法在FutureTask的實現(xiàn)中有很多值得關注的地方菜职,后面再細說青抛。
    需要注意,這個方法執(zhí)行結束酬核,返回結果之后脂凶,再調(diào)用isDone()會返回true。
  2. isCancelled()愁茁,判斷任務是否被取消蚕钦,如果任務執(zhí)行結束(正常執(zhí)行結束和發(fā)生異常結束,都算執(zhí)行結束)前被取消鹅很,也就是調(diào)用了cancel()方法嘶居,并且cancel()返回true,則該方法返回true促煮,否則返回false.
  3. isDone():判斷任務是否執(zhí)行結束邮屁,正常執(zhí)行結束,或者發(fā)生異常結束菠齿,或者被取消佑吝,都屬于結束,該方法都會返回true.
  4. V get():獲取結果绳匀,如果這個計算任務還沒有執(zhí)行結束芋忿,該調(diào)用線程會進入阻塞狀態(tài)。如果計算任務已經(jīng)被取消疾棵,調(diào)用get()會拋出CancellationException戈钢,如果計算過程中拋出異常,該方法會拋出ExecutionException是尔,如果當前線程在阻塞等待的時候被中斷了殉了,該方法會拋出InterruptedException
  5. V get(long timeout, TimeUnit unit):帶超時限制的get()拟枚,等待超時之后薪铜,該方法會拋出TimeoutException众弓。
FutureTask

FutureTask可以像Runnable一下,封裝異步任務隔箍,然后提交給Thread或線程池執(zhí)行田轧,然后獲取任務執(zhí)行結果。原因在于FutureTask實現(xiàn)了RunnableFuture接口鞍恢,RunnableFuture是什么呢傻粘,其實就是RunnableCallable的結合,它繼承自RunnableCallable帮掉。繼承關系如下:

public class FutureTask<V> implements RunnableFuture<V> {

public interface RunnableFuture<V> extends Runnable, Future<V> {

FutureTask使用

  1. FutureTask + Thread
    上面介紹過弦悉,F(xiàn)utureTask有Runnable接口和Callable接口的特征,可以被Thread執(zhí)行蟆炊。
//step1:封裝一個計算任務稽莉,實現(xiàn)Callable接口   
class Task implements Callable<Boolean> {

    @Override
    public Boolean call() throws Exception {
        try {
            for (int i = 0; i < 10; i++) {
                Log.d(TAG, "task......." + Thread.currentThread().getName() + "...i = " + i);
                //模擬耗時操作
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            Log.e(TAG, " is interrupted when calculating, will stop...");
            return false; // 注意這里如果不return的話,線程還會繼續(xù)執(zhí)行涩搓,所以任務超時后在這里處理結果然后返回
        }
        return true;
    }
}

//step2:創(chuàng)建計算任務污秆,作為參數(shù),傳入FutureTask
Task task = new Task();
FutureTask futureTask = new FutureTask(task);
//step3:將FutureTask提交給Thread執(zhí)行
Thread thread1 = new Thread(futureTask);
thread1.setName("task thread 1");
thread1.start();

//step4:獲取執(zhí)行結果昧甘,由于get()方法可能會阻塞當前調(diào)用線程良拼,如果子任務執(zhí)行時間不確定,最好在子線程中獲取執(zhí)行結果
try {
    // boolean result = (boolean) futureTask.get();
    boolean result = (boolean) futureTask.get(5, TimeUnit.SECONDS);
    Log.d(TAG, "result:" + result);
} catch (InterruptedException e) {
    Log.e(TAG, "守護線程阻塞被打斷...");
    e.printStackTrace();
} catch (ExecutionException e) {
    Log.e(TAG, "執(zhí)行任務時出錯...");
    e.printStackTrace();
} catch (TimeoutException e) {
    Log.e(TAG, "執(zhí)行超時...");
    futureTask.cancel(true);
    e.printStackTrace();
} catch (CancellationException e) {
    //如果線程已經(jīng)cancel了充边,再執(zhí)行get操作會拋出這個異常
    Log.e(TAG, "future已經(jīng)cancel了...");
    e.printStackTrace();
}
  1. Future + ExecutorService
//step1 ......
//step2:創(chuàng)建計算任務
Task task = new Task();
//step3:創(chuàng)建線程池庸推,將Callable類型的task提交給線程池執(zhí)行,通過Future獲取子任務的執(zhí)行結果
ExecutorService executorService = Executors.newCachedThreadPool();
final Future<Boolean> future = executorService.submit(task);
//step4:通過future獲取執(zhí)行結果
boolean result = (boolean) future.get();
  1. FutureTask + ExecutorService
//step1 ......
//step2 ......
//step3:將FutureTask提交給線程池執(zhí)行
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(futureTask);
//step4 ......

2. 開發(fā)中我遇到的問題浇冰。

FutureTask使用還是比較簡單的贬媒,FutureTaskRunnable,最大的區(qū)別有兩個肘习,一個是可以獲取執(zhí)行結果际乘,另一個是可以取消,使用方法可以參考以上步驟漂佩,不過我在項目中使用FutureTask出現(xiàn)了以下兩個問題:

  1. 有的情況下脖含,使用 futuretask.cancel(true)方法并不能真正的結束子任務執(zhí)行。

  2. FutureTaskget(long timeout, TimeUnit unit)方法仅仆,是等待timeout時間后器赞,獲取子線程的執(zhí)行結果垢袱,但是如果子任務執(zhí)行結束了墓拜,但是超時時間還沒有到,這個方法也會返回結果请契。

3. 結合FutureTask的源碼分析問題咳榜。

成員變量

下面夏醉,結合FutureTask的源碼,分析一下以上兩個問題涌韩。在此之前畔柔,先看一下FutureTask內(nèi)部比較值得關注的幾個成員變量。

  1. private volatile int state臣樱,state用來標識當前任務的運行狀態(tài)靶擦。FutureTask的所有方法都是圍繞這個狀態(tài)進行的,需要注意雇毫,這個值用volatile(易變的)來標記玄捕,如果有多個子線程在執(zhí)行FutureTask,那么它們看到的都會是同一個state棚放,有如下幾個值:

     private volatile int state;
     private static final int NEW          = 0;
     private static final int COMPLETING   = 1;
     private static final int NORMAL       = 2;
     private static final int EXCEPTIONAL  = 3;
     private static final int CANCELLED    = 4;
     private static final int INTERRUPTING = 5;
     private static final int INTERRUPTED  = 6;
    

NEW:表示這是一個新的任務枚粘,或者還沒有執(zhí)行完的任務,是初始狀態(tài)飘蚯。
COMPLETING:表示任務執(zhí)行結束(正常執(zhí)行結束馍迄,或者發(fā)生異常結束),但是還沒有將結果保存到outcome中局骤。是一個中間狀態(tài)攀圈。
NORMAL:表示任務正常執(zhí)行結束,并且已經(jīng)把執(zhí)行結果保存到outcome字段中峦甩。是一個最終狀態(tài)量承。
EXCEPTIONAL:表示任務發(fā)生異常結束,異常信息已經(jīng)保存到outcome中穴店,這是一個最終狀態(tài)撕捍。
CANCELLED:任務在新建之后,執(zhí)行結束之前被取消了泣洞,但是不要求中斷正在執(zhí)行的線程忧风,也就是調(diào)用了cancel(false),任務就是CANCELLED狀態(tài)球凰,這時任務狀態(tài)變化是NEW -> CANCELLED狮腿。
INTERRUPTING:任務在新建之后,執(zhí)行結束之前被取消了呕诉,并要求中斷線程的執(zhí)行缘厢,也就是調(diào)用了cancel(true),這時任務狀態(tài)就是INTERRUPTING甩挫。這是一個中間狀態(tài)贴硫。
INTERRUPTED:調(diào)用cancel(true)取消異步任務,會調(diào)用interrupt()中斷線程的執(zhí)行,然后狀態(tài)會從INTERRUPTING變到INTERRUPTED英遭。

狀態(tài)變化有如下4種情況:
NEW -> COMPLETING -> NORMAL --------------------------------------- 正常執(zhí)行結束的流程
NEW -> COMPLETING -> EXCEPTIONAL ---------------------執(zhí)行過程中出現(xiàn)異常的流程
NEW -> CANCELLED -------------------------------------------被取消间护,即調(diào)用了cancel(false)
NEW -> INTERRUPTING -> INTERRUPTED -------------被中斷,即調(diào)用了cancel(true)

  1. private Callable<V> callable挖诸,一個Callable類型的變量汁尺,封裝了計算任務,可獲取計算結果多律。從上面的用法中可以看到痴突,FutureTask的構造函數(shù)中,我們傳入的就是實現(xiàn)了Callable的接口的計算任務狼荞。

  2. private Object outcome苞也,Object類型的變量outcome,用來保存計算任務的返回結果粘秆,或者執(zhí)行過程中拋出的異常如迟。

  3. private volatile Thread runner,指向當前在運行Callable任務的線程攻走,runner在FutureTask中的賦值變化很值得關注殷勘,后面源碼會詳細介紹這個。

  4. private volatile WaitNode waiters昔搂,WaitNode是FutureTask的內(nèi)部類玲销,表示一個阻塞隊列,如果任務還沒有執(zhí)行結束摘符,那么調(diào)用get()獲取結果的線程會阻塞贤斜,在這個阻塞隊列中排隊等待。

成員函數(shù)

下面從構造函數(shù)說起逛裤,看一下FutureTask的源碼瘩绒。

1. 構造函數(shù)
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

FutureTask的第一個構造函數(shù),參數(shù)是Callable類型的變量带族。將傳入的參數(shù)賦值給this.callable锁荔,然后設置state狀態(tài)為NEW,表示這是新任務蝙砌。

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

FutureTask還有一個構造函數(shù)阳堕,接收Runnable類型的參數(shù),通過Executors.callable(runnable, result)將傳入的Runnableresult轉換成Callable類型择克。使用該構造方法恬总,可以定制返回結果。

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}

可以看一下Executors.callable(runnable, result)方法肚邢,這里通過適配器模式進行適配壹堰,創(chuàng)建一個RunnableAdapter適配器。

private static final class RunnableAdapter<T> implements Callable<T> {
    private final Runnable task;
    private final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

RunnableAdapterExecutors的內(nèi)部類,實現(xiàn)也比較簡單缀旁,實現(xiàn)了適配對象Callable接口记劈,在call()方法中執(zhí)行Runnablerun()勺鸦,然后返回result并巍。

2. 任務被執(zhí)行——run()

FutureTask封裝了計算任務,無論是提交給Thread執(zhí)行换途,或者線程池執(zhí)行懊渡,調(diào)用的都是FutureTaskrun()

public void run() {
    //1.判斷狀態(tài)是否是NEW军拟,不是NEW剃执,說明任務已經(jīng)被其他線程執(zhí)行,甚至執(zhí)行結束懈息,或者被取消了肾档,直接返回
    //2.調(diào)用CAS方法,判斷RUNNER為null的話辫继,就將當前線程保存到RUNNER中怒见,設置RUNNER失敗,就直接返回
    if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //3.執(zhí)行Callable任務姑宽,結果保存到result中
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                //3.1 如果執(zhí)行任務過程中發(fā)生異常遣耍,將調(diào)用setException()設置異常
                result = null;
                ran = false;
                setException(ex);
            }
            //3.2 任務正常執(zhí)行結束調(diào)用set(result)保存結果
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //4. 任務執(zhí)行結束,runner設置為null炮车,表示當前沒有線程在執(zhí)行這個任務了
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        //5. 讀取狀態(tài)舵变,判斷是否在執(zhí)行的過程中,被中斷了瘦穆,如果被中斷纪隙,處理中斷
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
  1. 首先,判斷state的值是不是NEW扛或,如果不是NEW瘫拣,說明線程已經(jīng)被執(zhí)行了,可能已經(jīng)執(zhí)行結束告喊,或者被取消了麸拄,直接返回。
  2. 這里其實是調(diào)用了Unsafe的CAS方法黔姜,讀取并設置runner的值拢切,將當前線程保存到runner中,表示當前正在執(zhí)行任務的線程秆吵』匆可以看到,這里設置的其實是RUNNER,和前面介紹的Thread類型的runner變量不一樣的主穗,那為什么還說設置的是runner的值泻拦?RUNNER在FutureTask中定義如下:
private static final long RUNNER;
//RUNNER是一個long類型的變量,指向runner字段的偏移地址忽媒,相當于指針
RUNNER = U.objectFieldOffset
        (FutureTask.class.getDeclaredField("runner"));

關于Unsafe的CAS方法争拐,簡單介紹一下,它提供了一種對runner進行原子操作的方法晦雨,原子操作架曹,意味著,這個操作不會被打斷闹瞧。runnervolatile字段修飾绑雄,只能保證,當多個子線程在執(zhí)行FutureTask的時候奥邮,它們讀取到的runner的值是同一個万牺,但是不能保證原子操作,所以很容易讀到臟數(shù)據(jù)(舉個例子:線程A準備對runner進行讀和寫操作洽腺,讀取到runner的值為null脚粟,這是,cpu切換執(zhí)行線程B已脓,線程B讀取到runner的值也是null珊楼,然后又切換到線程A執(zhí)行,線程A對runner賦值thread-A度液,此時runner的值已經(jīng)不再是null深浮,線程B讀取到的runner=null就是臟數(shù)據(jù))烤蜕,用Unsafe的CAS方法孕锄,來對runner進行讀寫座掘,就能保證原子操作。多個線程訪問run()方法時霹购,會在這里同步佑惠。

  1. 讀取callable變量,執(zhí)行call()齐疙,并獲取執(zhí)行結果膜楷。
    如果執(zhí)行call()的過程中發(fā)生異常,就調(diào)用setException()設置異常贞奋,setException()定義如下:
protected void setException(Throwable t) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = t;
        U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
//a. 調(diào)用Unsafe的CAS方法赌厅,state從NEW --> COMPLETING,這里的STATE和上面的RUNNER定義類似轿塔,指向state字段的偏移地址特愿。
//b. 將異常信息保存到outcome字段仲墨,state變成EXCEPTIONAL。
//c. 調(diào)用finishCompletion()揍障。
//NEW --> COMPLETING --> EXCEPTIONAL目养。

如果任務正常執(zhí)行結束,就調(diào)用set(result)保存結果毒嫡,定義如下:

protected void set(V v) {
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = v;
        U.putOrderedInt(this, STATE, NORMAL); // final state
        finishCompletion();
    }
}
//a. 和setException()類似癌蚁,state從NEW --> COMPLETING。
//b. 將正常執(zhí)行的結果result保存到outcome审胚,state變成NORMAL匈勋。
//c. 調(diào)用finishCompletion()礼旅。NEW --> COMPLETING --> NORMAL膳叨。
  1. 任務執(zhí)行結束,runner設置為null痘系,表示當前沒有線程在執(zhí)行這個任務了菲嘴。
  2. 讀取state狀態(tài),判斷是否在執(zhí)行的過程中被中斷了汰翠,如果被中斷龄坪,處理中斷,看一下這個中斷處理:
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

如果狀態(tài)是INTERRUPTING复唤,表示正在被中斷健田,這時就讓出線程的執(zhí)行權,給其他線程來執(zhí)行佛纫。

3. 獲取任務的執(zhí)行結果——get()

一般情況下妓局,執(zhí)行任務的線程和獲取結果的線程不會是同一個,當我們在主線程或者其他線程中呈宇,獲取計算任務的結果時好爬,就會調(diào)用get方法,如果這時計算任務還沒有執(zhí)行完成甥啄,調(diào)用get()的線程就會阻塞等待存炮。get()實現(xiàn)如下:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
  1. 讀取任務的執(zhí)行狀態(tài) state ,如果 state <= COMPLETING蜈漓,說明線程還沒有執(zhí)行完(run()中可以看到穆桂,只有任務執(zhí)行結束,或者發(fā)生異常的時候融虽,state才會被設置成COMPLETING)享完。
  2. 調(diào)用awaitDone(false, 0L),進入阻塞狀態(tài)衣形⊥障溃看一下awaitDone(false, 0L)的實現(xiàn):
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    long startTime = 0L;    // Special value 0L means not yet parked
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //1. 讀取狀態(tài)
        //1.1 如果s > COMPLETING姿鸿,表示任務已經(jīng)執(zhí)行結束,或者發(fā)生異常結束了倒源,就不會阻塞苛预,直接返回
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //1.2 如果s == COMPLETING,表示任務結束(正常/異常)笋熬,但是結果還沒有保存到outcome字段热某,當前線程讓出執(zhí)行權,給其他線程先執(zhí)行
        else if (s == COMPLETING)
            // We may have already promised (via isDone) that we are done
            // so never return empty-handed or throw InterruptedException
            Thread.yield();
        //2. 如果調(diào)用get()的線程被中斷了胳螟,就從等待的線程棧中移除這個等待節(jié)點昔馋,然后拋出中斷異常
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        //3. 如果等待節(jié)點q=null,就創(chuàng)建一個等待節(jié)點
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        //4. 如果這個等待節(jié)點還沒有加入等待隊列,就加入隊列頭
        else if (!queued)
            queued = U.compareAndSwapObject(this, WAITERS,
                    q.next = waiters, q);
        //5. 如果設置了超時等待時間
        else if (timed) {
            //5.1 設置startTime,用于計算超時時間糖耸,如果超時時間到了秘遏,就等待隊列中移除當前節(jié)點
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            //5.2 如果超時時間還沒有到,而且任務還沒有結束嘉竟,就阻塞特定時間
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        //6. 阻塞邦危,等待喚醒
        else
            LockSupport.park(this);
    }
}

這里主要有幾個步驟:
a. 讀取state,如果s > COMPLETING舍扰,表示任務已經(jīng)執(zhí)行結束倦蚪,或者發(fā)生異常結束了,此時边苹,調(diào)用get()的線程就不會阻塞陵且;如果s == COMPLETING,表示任務結束(正常/異常)个束,但是結果還沒有保存到outcome字段慕购,當前線程讓出執(zhí)行權,給其他線程先執(zhí)行播急。
b. 判斷Thread.interrupted()脓钾,如果調(diào)用get()的線程被中斷了,就從等待的線程棧(其實就是一個WaitNode節(jié)點隊列或者說是棧)中移除這個等待節(jié)點桩警,然后拋出中斷異常可训。
c. 判斷q == null,如果等待節(jié)點q為null捶枢,就創(chuàng)建等待節(jié)點握截,這個節(jié)點后面會被插入阻塞隊列。
d. 判斷queued烂叔,這里是將c中創(chuàng)建節(jié)點q加入隊列頭谨胞。使用Unsafe的CAS方法,對waiters進行賦值蒜鸡,waiters也是一個WaitNode節(jié)點胯努,相當于隊列頭牢裳,或者理解為隊列的頭指針。通過WaitNode可以遍歷整個阻塞隊列叶沛。
e. 之后蒲讯,判斷timed,這是從get()傳入的值灰署,表示是否設置了超時時間判帮。設置超時時間之后,調(diào)用get()的線程最多阻塞nanos溉箕,就會從阻塞狀態(tài)醒過來晦墙。如果沒有設置超時時間,就直接進入阻塞狀態(tài)肴茄,等待被其他線程喚醒晌畅。

awaitDone()方法內(nèi)部有一個無限循環(huán),看似有很多判斷独郎,比較難理解踩麦,其實這個循環(huán)最多循環(huán)3次枚赡。
假設Thread A執(zhí)行了get()獲取計算任務執(zhí)行結果氓癌,但是子任務還沒有執(zhí)行完,而且Thread A沒有被中斷贫橙,它會進行以下步驟贪婉。
step1:Thread A執(zhí)行了awaitDone(),1卢肃,2兩次判斷都不成立疲迂,Thread A判斷q=null,會創(chuàng)建一個WaitNode節(jié)點q莫湘,然后進入第二次循環(huán)尤蒿。
step2:第二次循環(huán),判斷4不成立幅垮,此時將step1創(chuàng)建的節(jié)點q加入隊列頭腰池。
step3:第三次循環(huán),判斷是否設置了超時時間忙芒,如果設置了超時時間示弓,就阻塞特定時間,否則呵萨,一直阻塞奏属,等待被其他線程喚醒。

  1. awaitDone()返回潮峦,最后調(diào)用report(int s)囱皿,這個后面再介紹勇婴。
4. 取消任務——cancel(boolean mayInterruptIfRunning)

通常調(diào)用cancel()的線程和執(zhí)行子任務的線程不會是同一個。當FutureTaskcancel(boolean mayInterruptIfRunning)方法被調(diào)用時嘱腥,如果子任務還沒有執(zhí)行咆耿,那么這個任務就不會執(zhí)行了,如果子任務已經(jīng)執(zhí)行爹橱,且mayInterruptIfRunning=true萨螺,那么執(zhí)行子任務的線程會被中斷(注意:這里說的是線程被中斷,不是任務被取消)愧驱,下面看一下這個方法的實現(xiàn):

public boolean cancel(boolean mayInterruptIfRunning) {
    //1.判斷state是否為NEW慰技,如果不是NEW,說明任務已經(jīng)結束或者被取消了组砚,該方法會執(zhí)行返回false
    //state=NEW時吻商,判斷mayInterruptIfRunning,如果mayInterruptIfRunning=true糟红,說明要中斷任務的執(zhí)行艾帐,NEW->INTERRUPTING
    //如果mayInterruptIfRunning=false,不需要中斷,狀態(tài)改為CANCELLED
    if (!(state == NEW &&
            U.compareAndSwapInt(this, STATE, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                //2.讀取當前正在執(zhí)行子任務的線程runner,調(diào)用t.interrupt()盆偿,中斷線程執(zhí)行
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //3.修改狀態(tài)為INTERRUPTED
                U.putOrderedInt(this, STATE, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

cancel()分析:

  1. 判斷state柒爸,保證state = NEW才能繼續(xù)cancel()的后續(xù)操作。state=NEWmayInterruptIfRunning=true事扭,說明要中斷任務的執(zhí)行捎稚,此時,NEW->INTERRUPTING求橄。然后讀取當前執(zhí)行任務的線程runner今野,調(diào)用t.interrupt(),中斷線程執(zhí)行罐农,NEW->INTERRUPTING->INTERRUPTED条霜,最后調(diào)用finishCompletion()
  2. 如果NEW->INTERRUPTING涵亏,那么cancel()方法宰睡,只是修改了狀態(tài),NEW->CANCELLED溯乒,然后直接調(diào)用finishCompletion()夹厌。

所以cancel(true)方法,只是調(diào)用t.interrupt()裆悄,此時矛纹,如果t因為sleep(),wait()等方法進入阻塞狀態(tài)光稼,那么阻塞的地方會拋出InterruptedException或南;如果線程正常運行孩等,需要結合Threadinterrupted()方法進行判斷,才能結束采够,否則肄方,cancel(true)不能結束正在執(zhí)行的任務。
這也就可以解釋前面我遇到的問題蹬癌,有的情況下权她,使用 futuretask.cancel(true)方法并不能真正的結束子任務執(zhí)行。

5. 子線程返回結果前的最后一步——finishCompletion()

前面多次出現(xiàn)過這個方法逝薪,set(V v)(保存執(zhí)行結果隅要,設置狀態(tài)為NORMAL),setException(Throwable t)(保存結果董济,設置狀態(tài)為EXCEPTIONAL)和cancel(boolean mayInterruptIfRunning)(設置狀態(tài)為CANCELLED/INTERRUPTED)步清,該方法在state變成最終態(tài)之后,會被調(diào)用虏肾。

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (U.compareAndSwapObject(this, WAITERS, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

finishCompletion()主要做了三件事情:

  1. 遍歷waiters等待隊列廓啊,調(diào)用LockSupport.unpark(t)喚醒等待返回結果的線程,釋放資源封豪。
  2. 調(diào)用done()谴轮,這個方法什么都沒有做,不過子類可以實現(xiàn)這個方法撑毛,做一些額外的操作书聚。
  3. 設置callable為null,callableFutureTask封裝的任務藻雌,任務執(zhí)行完,釋放資源斩个。

這里可以解答上面的第二個問題了胯杭。FutureTask的get(long timeout, TimeUnit unit)方法,表示阻塞timeout時間后受啥,獲取子線程的執(zhí)行結果做个,但是如果子任務執(zhí)行結束了,但是超時時間還沒有到滚局,這個方法也會返回結果居暖。因為任務執(zhí)行完之后,會遍歷阻塞隊列藤肢,喚醒阻塞的線程太闺。LockSupport.unpark(t)執(zhí)行之后,阻塞的線程會從LockSupport.park(this)/LockSupport.parkNanos(this, parkNanos)醒來嘁圈,然后會繼續(xù)進入awaitDone(boolean timed, long nanos)while循環(huán)省骂,此時蟀淮,state >= COMPLETING,然后從awaitDone()返回钞澳。此時怠惶,get()/get(long timeout, TimeUnit unit)會繼續(xù)執(zhí)行,return report(s)轧粟,上面介紹get()的時候沒介紹的方法策治。看一下report(int s)

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

其實就是讀取outcome兰吟,將state映射到最后返回的結果中览妖,s == NORMAL說明任務正常結束,返回正常結果揽祥,s >= CANCELLED讽膏,就拋出CancellationException

6.其他方法

FutureTask的還有兩個方法isCancelled()isDone()拄丰,其實就是判斷state府树,沒有過多的步驟。

public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}

總結

到此FutureTask分析完畢料按,其中感受最深的是Unsafe的用法奄侠,對于多線程共享的對象,采用volatile + Unsafe的方法载矿,代替鎖操作垄潮,進行同步;其次闷盔,是LockSupportpark(Object blocker)unpark(Thread thread)的使用

  1. park(Object blocker):線程進入阻塞狀態(tài)弯洗,告訴線程調(diào)度,當前線程不可用逢勾,直到線程再次獲取permit(允許)牡整;如果在調(diào)用park(Object blocker)之前,線程已經(jīng)獲得了permit(比如說溺拱,已經(jīng)調(diào)用了unpark(t))逃贝,那么該方法會返回。
  2. unpark(Thread thread):使得傳入的線程再次獲得permit.這里的permit可以理解為一個信號量迫摔。

LockSupport在這里的作用沐扳,類似于wait(),notify()/notifyAll(),關于二者的區(qū)別句占,可以看一下
Java的LockSupport.park()實現(xiàn)分析 沪摄。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子卓起,更是在濱河造成了極大的恐慌和敬,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件戏阅,死亡現(xiàn)場離奇詭異昼弟,居然都是意外死亡,警方通過查閱死者的電腦和手機奕筐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門舱痘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人离赫,你說我怎么就攤上這事芭逝。” “怎么了渊胸?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵旬盯,是天一觀的道長。 經(jīng)常有香客問我翎猛,道長胖翰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任切厘,我火速辦了婚禮萨咳,結果婚禮上,老公的妹妹穿的比我還像新娘疫稿。我一直安慰自己培他,他們只是感情好,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布遗座。 她就那樣靜靜地躺著舀凛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪员萍。 梳的紋絲不亂的頭發(fā)上腾降,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天,我揣著相機與錄音碎绎,去河邊找鬼。 笑死抗果,一個胖子當著我的面吹牛筋帖,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播冤馏,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼日麸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起代箭,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤墩划,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后嗡综,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體乙帮,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年极景,在試婚紗的時候發(fā)現(xiàn)自己被綠了察净。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡盼樟,死狀恐怖氢卡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情晨缴,我是刑警寧澤译秦,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布,位于F島的核電站击碗,受9級特大地震影響筑悴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜延都,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一雷猪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧晰房,春花似錦求摇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至猖吴,卻和暖如春摔刁,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背海蔽。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工共屈, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人党窜。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓拗引,卻偏偏與公主長得像,于是被迫代替她去往敵國和親幌衣。 傳聞我的和親對象是個殘疾皇子矾削,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345