FutureTask原理解析

Runnable

Runnable接口只有一個run函數改鲫,該函數沒有返回值。Thread類在調用start()函數后就是執(zhí)行的Runnable的run()方法林束。其聲明如下:

public interface Runnable {
 
    public abstract void run();
}

但是, 可以發(fā)現(xiàn), 這個方法并沒有任何返回值.
如果我們希望執(zhí)行某種類型的操作并拿到它的執(zhí)行結果, 該怎么辦呢?

Callable

Callable與Runnable的功能大致相似像棘,它有一個call方法,該方法有返回值诊县,其申明如下:

public interface Callable<V> {
    V call() throws Exception;
}

使用Callable接口解決了返回執(zhí)行結果的問題, 但是也帶來了一個新的問題:
如何獲得執(zhí)行結果?
有的同學可能就要說了, 這還不簡單? 直接拿不就好了, 看我的:

public static void main(String[] args) {
    Callable<String> myCallable = () -> "This is the results.";
    try {
        String result = myCallable.call();
        System.out.println("Callable 執(zhí)行的結果是: " + result);
    } catch (Exception e) {
        System.out.println("There is a exception.");
    }
}

這種方法確實可以, 但是它存在幾個問題:

  1. call方法是在當前線程中直接調用的, 無法利用多線程讲弄。
  2. call方法可能是一個特別耗時的操作, 這將導致程序停在myCallable.call()調用處, 無法繼續(xù)運行, 直到call方法返回。
  3. 如果call方法始終不返回, 我們沒辦法中斷它的運行依痊。

因此, 理想的操作應當是, 我們將call方法提交給另外一個線程執(zhí)行, 并在合適的時候, 判斷任務是否完成, 然后獲取線程的執(zhí)行結果或者撤銷任務, 這種思路的實現(xiàn)就是Future接口:

Future

Future就是對于具體的Runnable和Callable任務的執(zhí)行結果進行取消避除、查詢是否完成、獲取結果胸嘁、設置結果等瓶摆。其中get方法會阻塞知道任務返回結果。其聲明如下:

public interface Future<V> {
 
    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.     *
     */
    boolean cancel(boolean mayInterruptIfRunning);
 
    /**
     * Returns <tt>true</tt> if this task was cancelled before it completed
     * normally.
     */
    boolean isCancelled();
 
    /**
     * Returns <tt>true</tt> if this task completed.
     *
     */
    boolean isDone();
 
    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     */
    V get() throws InterruptedException, ExecutionException;
 
    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中聲明了5個方法性宏,下面依次解釋每個方法的作用:

  • cancel方法用來取消任務群井,如果取消任務成功則返回true,取消任務失敗則返回false毫胜。參數mayInterruptIfRunning表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務书斜,如果設置true,則表示可以取消正在執(zhí)行過程中的任務酵使。如果任務已經完成荐吉,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false口渔,如果任務正在執(zhí)行样屠,若mayInterruptIfRunning設置為true,則返回true,若mayInterruptIfRunning設置為false痪欲,則返回false悦穿;如果任務還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false业踢,肯定返回true栗柒。

  • isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功知举,則返回 true傍衡。

  • isDone方法表示任務是否已經完成,若任務完成负蠕,則返回true蛙埂;

  • get()方法用來獲取執(zhí)行結果,這個方法會產生阻塞遮糖,會一直等到任務執(zhí)行完畢才返回绣的;

  • get(long timeout, TimeUnit unit)用來獲取執(zhí)行結果屡江,如果在指定時間內惩嘉,還沒獲取到結果文黎,就直接返回null殿较。
    實際上Future提供了三種功能:

  1. 判斷任務是否完成淋纲;
  2. 中斷任務
    3 獲取任務的執(zhí)行結果

FutureTask

Future只是一個接口,無法直接創(chuàng)建對象洽瞬,所以有了FutureTask本涕,F(xiàn)utureTask實現(xiàn)了RunnableFuture<V>,而RunnableFuture又實現(xiàn)了Runnable和Future這兩個接口,其中Runnable接口對應了FutureTask名字中的Task伙窃,代表FutureTask本質上也是表征了一個任務菩颖。而Future接口就對應了FutureTask名字中的Future,表示了我們對于這個任務可以執(zhí)行某些操作对供,例如位他,判斷任務是否執(zhí)行完畢,獲取任務的執(zhí)行結果产场,取消任務的執(zhí)行等鹅髓。
所以簡單來說,F(xiàn)utureTask本質上就是一個“Task”京景,我們可以把它當做簡單的Runnable對象來使用窿冯。但是它又同時實現(xiàn)了Future接口,因此我們可以對它所代表的“Task”進行額外的控制操作确徙。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask還可以包裝Runnable和Callable醒串,由構造函數注入依賴。

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
 
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

Runnable注入會被Executors.callable()函數轉換為Callable類型鄙皇,也就是說FutureTask最終都是執(zhí)行Callable類型的任務伴逸,該適配函數實現(xiàn)如下:

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

    /**
     * A callable that runs given task and returns given result
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

FutureTask實現(xiàn)了Runnable洲愤,因此它既可以通過Thread包裝來直接執(zhí)行,也可以提交給ExecutorService來執(zhí)行肛宋。

Unsafe

Unsafe類對于并發(fā)編程來說是個很重要的類,如果你稍微看過J.U.C里的源碼,你會發(fā)現(xiàn)到處充斥著這個類的方法調用。這個類的最大的特點在于遇西,它提供了硬件級別的CAS原子操作。

CAS可以說是實現(xiàn)了最輕量級的鎖压彭,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中的一個線程能成功地更新變量的值询一,而其他的線程將失敗。然而缩功,失敗的線程并不會被掛起脆丁。

CAS操作包含了三個操作數: 需要讀寫的內存位置跟压,進行比較的原值,擬寫入的新值。

在Unsafe類中笋庄,實現(xiàn)CAS操作的方法是: compareAndSwapXXX

public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
obj是我們要操作的目標對象
offset表示了目標對象中,對應的屬性的內存偏移量
expect是進行比較的原值
update是擬寫入的新值。

所以該方法實現(xiàn)了對目標對象obj中的某個成員變量(field)進行CAS操作的功能洽蛀。

那么峡碉,要怎么獲得目標field的內存偏移量offset呢头岔? Unsafe類為我們提供了一個方法:

public native long objectFieldOffset(Field field);

該方法的參數是我們要進行CAS操作的field對象,要怎么獲得這個field對象呢适掰?最直接的辦法就是通過反射了:

Class<?> k = FutureTask.class;
Field stateField = k.getDeclaredField("state");

這樣一波下來肌似,我們就能對FutureTask的state屬性進行CAS操作了.
除了compareAndSwapObject力细,Unsafe類還提供了更為具體的對int和long類型的CAS操作:

public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
public native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);

從方法簽名可以看出,這里只是把目標field的類型限定成int和long類型,而不是通用的Object.

原理講解

??????Runnable和Callable描述的都是抽象的計算任務馋艺,這些任務通常是有生命周期的,由于有些任務可能要執(zhí)行很長的時間踱蛀,因此通常希望可以取消這些任務崩泡。而Future用來表示一個任務的生命周期,并提供方法來判斷任務是否已經完成或取消谒所,以及獲取任務的結果等。Future是接口,無法直接創(chuàng)建對象村生,所以才有了FutureTask,而FutureTask之所以能支持cancel操作镇辉,是因為FutureTask有兩個很重要的屬性state和runner。

關于Java并發(fā)工具類的三板斧: 狀態(tài)屹逛,隊列,CAS
以這三個方面為切入點來看源碼,有助于我們快速的看清FutureTask的概貌:
狀態(tài)
在FutureTask中抛腕,狀態(tài)是由state屬性來表示的摔敛,不出所料刹悴,它是volatile類型的敢伸,確保了不同線程對它修改的可見性:

  private volatile int state; // 注意volatile關鍵字
    /**
     * 在構建FutureTask時設置,同時也表示內部成員callable已成功賦值,
     * 一直到worker thread完成FutureTask中的run();
     */
    private static final int NEW = 0;

    /**
     * woker thread在處理task時設定的中間狀態(tài)携丁,處于該狀態(tài)時,
     * 說明worker thread正準備設置result.
     */
    private static final int COMPLETING = 1;

    /**
     * 當設置result結果完成后肥橙,F(xiàn)utureTask處于該狀態(tài),代表過程結果,
     * 該狀態(tài)為最終狀態(tài)final state,(正確完成的最終狀態(tài))
     */
    private static final int NORMAL = 2;

    /**
     * 同上善茎,只不過task執(zhí)行過程出現(xiàn)異常,此時結果設值為exception,
     * 也是final state
     */
    private static final int EXCEPTIONAL = 3;

    /**
     * final state, 表明task被cancel(task還沒有執(zhí)行就被cancel的狀態(tài)).
     */
    private static final int CANCELLED = 4;

    /**
     * 中間狀態(tài)考阱,task運行過程中被interrupt時当娱,設置的中間狀態(tài)
     */
    private static final int INTERRUPTING = 5;

    /**
     * final state, 中斷完畢的最終狀態(tài),幾種情況震叙,下面具體分析
     */
    private static final int INTERRUPTED = 6;

state屬性是貫穿整個FutureTask的最核心的屬性戚丸,該屬性的值代表了任務在運行過程中的狀態(tài)夺颤,隨著任務的執(zhí)行,狀態(tài)將不斷地進行轉變宜狐,從上面的定義中可以看出俭驮,總共有7種狀態(tài):包括了1個初始態(tài)逸嘀,2個中間態(tài)和4個終止態(tài)。
state有四種可能的狀態(tài)轉換:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
image.png

任務的初始狀態(tài)都是NEW, 這一點是構造函數保證的,我們后面分析構造函數的時候再講;
任務的終止狀態(tài)有4種:

NORMAL:任務正常執(zhí)行完畢
EXCEPTIONAL:任務執(zhí)行過程中發(fā)生異常
CANCELLED:任務被取消
INTERRUPTED:任務被中斷

任務的中間狀態(tài)有2種:

COMPLETING 正在設置任務結果
INTERRUPTING 正在中斷運行任務的線程

值得一提的是,任務的中間狀態(tài)是一個瞬態(tài)晃跺,它非常的短暫揩局。而且任務的中間態(tài)并不代表任務正在執(zhí)行毫玖,而是任務已經執(zhí)行完了,正在設置最終的返回結果凌盯,所以可以這么說:
只要state不處于 NEW 狀態(tài)付枫,就說明任務已經執(zhí)行完畢

注意,這里的執(zhí)行完畢是指傳入的Callable對象的call方法執(zhí)行完畢掂榔,或者拋出了異常精肃。所以這里的COMPLETING的名字顯得有點迷惑性,它并不意味著任務正在執(zhí)行中后频,而意味著call方法已經執(zhí)行完畢毫痕,正在設置任務執(zhí)行的結果。

而將一個任務的狀態(tài)設置成終止態(tài)只有三種方法:

set
setException
cancel

隊列

在FutureTask中贺辰,隊列的實現(xiàn)是一個單向鏈表巢块,它表示所有等待任務執(zhí)行完畢的線程的集合铜跑。我們知道蔽豺,F(xiàn)utureTask實現(xiàn)了Future接口拾因,可以獲取“Task”的執(zhí)行結果图仓,那么如果獲取結果時,任務還沒有執(zhí)行完畢怎么辦呢檬果?那么獲取結果的線程就會在一個等待隊列中掛起,直到任務執(zhí)行完畢被喚醒颁虐。

在并發(fā)編程中使用隊列通常是將當前線程包裝成某種類型的數據結構扔到等待隊列中,我們先來看看隊列中的每一個節(jié)點是怎么個結構:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

相比于AQS的sync queue所使用的雙向鏈表中的Node翰灾,這個WaitNode要簡單多了箩祥,它只包含了一個記錄線程的thread屬性和指向下一個節(jié)點的next屬性寺滚。

值得一提的是,F(xiàn)utureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的取逾,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧耗绿,它使用CAS來完成入棧出棧操作。為啥要使用一個線程安全的棧呢砾隅,因為同一時刻可能有多個線程都在獲取任務的執(zhí)行結果误阻,如果任務還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂晴埂,即完成入棧操作究反,這樣就有可能出現(xiàn)多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全儒洛,對于出棧的情況也是同理精耐。

由于FutureTask中的隊列本質上是一個Treiber棧,那么使用這個隊列就只需要一個指向棧頂節(jié)點的指針就行了琅锻,在FutureTask中卦停,就是waiters屬性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事實上,它就是整個單向鏈表的頭節(jié)點恼蓬。
綜上惊完,F(xiàn)utureTask中所使用的隊列的結構如下:


image.png

CAS操作

CAS操作大多數是用來改變狀態(tài)的,在FutureTask中也不例外处硬。我們一般在靜態(tài)代碼塊中初始化需要CAS操作的屬性的偏移量:

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

從這個靜態(tài)代碼塊中我們也可以看出小槐,CAS操作主要針對3個屬性,包括state郁油、runner和waiters本股,說明這3個屬性基本是會被多個線程同時訪問的攀痊。其中state屬性代表了任務的狀態(tài)桐腌,waiters屬性代表了指向棧頂節(jié)點的指針拄显,這兩個我們上面已經分析過了。runner屬性代表了執(zhí)行FutureTask中的“Task”的線程案站。為什么需要一個屬性來記錄執(zhí)行任務的線程呢躬审?這是為了中斷或者取消任務做準備的,只有知道了執(zhí)行任務的線程是誰蟆盐,我們才能去中斷它承边。

定義完屬性的偏移量之后,接下來就是CAS操作本身了石挂。在FutureTask博助,CAS操作最終調用的還是Unsafe類的compareAndSwapXXX方法.

核心屬性

首先我們先來看看FutureTask的幾個核心屬性:

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

可以看出,F(xiàn)utureTask的核心屬性只有5個:

state
callable
outcome
runner
waiters

關于 state waiters runner三個屬性我們上面已經解釋過了痹愚。剩下的callable屬性代表了要執(zhí)行的任務本身富岳,即FutureTask中的“Task”部分,為Callable類型拯腮,這里之所以用Callable而不用Runnable是因為FutureTask實現(xiàn)了Future接口窖式,需要獲取任務的執(zhí)行結果。

outcome屬性代表了任務的執(zhí)行結果或者拋出的異常动壤,為Object類型萝喘,也就是說outcome可以是任意類型的對象,所以當我們將正常的執(zhí)行結果返回給調用者時琼懊,需要進行強制類型轉換阁簸,返回由Callable定義的V類型。
這5個屬性綜合起來就完成了整個FutureTask的工作哼丈,使用關系如下

任務本尊:callable
任務的執(zhí)行者:runner
任務的結果:outcome
獲取任務的結果:state + outcome + waiters
中斷或者取消任務:state + runner + waiters

???? 創(chuàng)建一個FutureTask首先調用構造方法, 這是state設置為初始態(tài)NEW, 當創(chuàng)建完一個Task通常會提交給Executors或者Thread來執(zhí)行启妹,最終會調用Task的run方法,

 public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
 }
public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

???FutureTask的run方法首先會判斷任務的狀態(tài)削祈,如果任務狀態(tài)不是NEW,說明任務狀態(tài)已經改變翅溺,說明已經走了上面4種可能變化的一種,比如調用了cancel髓抑,此時狀態(tài)為Interrupting咙崎。
???如果狀態(tài)是NEW,判斷runner是否為null,如果為null吨拍,則把當前執(zhí)行任務的線程賦值給runner褪猛,如果runner不為null,說明已經有線程在執(zhí)行羹饰,直接返回伊滋。這里使用cas來賦值worker thread是保證多個線程同時提交同一個FutureTask時碳却,確保該FutureTask的run方法只被調用一次。

!UNSAFE.compareAndSwapObject(this, runnerOffset,                                   null, Thread.currentThread())
語義相當于
if (this.runner == null ){
    this.runner = Thread.currentThread();
}

????接著開始執(zhí)行任務笑旺,如果要執(zhí)行的任務不為空昼浦,并且state為New就執(zhí)行,調用Callable的call方法筒主,如果執(zhí)行成功則set結果锤躁,如果出現(xiàn)異常則setException它匕。最后把runner設為null。
set方法:如果現(xiàn)在的狀態(tài)是NEW就把狀態(tài)設置成cCOMPLETING,然后再設置成NORMAL,state的狀態(tài)變化就是:NEW->COMPLETING->NORMAL.最后執(zhí)行finishCompletion()方法。

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

這個方法一開始通過CAS操作將state屬性由原來的NEW狀態(tài)修改為COMPLETING狀態(tài)庐杨,我們在一開始介紹state狀態(tài)的時候說過掺冠,COMPLETING是一個非常短暫的中間態(tài)狮腿,表示正在設置執(zhí)行的結果杨名。
接下來我們再來看看發(fā)生了異常的版本setException(ex):

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

可見,除了將outcome屬性賦值為異常對象泽艘,以及將state的終止狀態(tài)修改為EXCEPTIONAL欲险,其余都和set方法類似。在方法的最后悉盆,都調用了 finishCompletion()來完成執(zhí)行結果的設置盯荤。那么我們就來看看 finishCompletion()干了點啥:

private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); //喚醒阻塞隊列
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
 }

這個方法事實上完成了一個“善后”工作。我們先來看看if條件語句中的CAS操作:

UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)

該方法是將waiters屬性的值由原值設置為null, 我們知道焕盟,waiters屬性指向了Treiber棧的棧頂節(jié)點秋秤,可以說是代表了整個Treiber棧,將該值設為null的目的就是清空整個棧脚翘。如果設置不成功灼卢,則if語句塊不會被執(zhí)行,又進行下一輪for循環(huán)来农,而下一輪for循環(huán)的判斷條件又是waiters!=null 鞋真,由此我們知道,雖然最外層的for循環(huán)乍一看好像是什么遍歷節(jié)點的操作沃于,其實只是為了確保waiters屬性被成功設置成null涩咖,本質上相當于一個自旋操作。

將waiters屬性設置成null以后繁莹,接下了 for (;;)死循環(huán)才是真正的遍歷節(jié)點檩互,可以看出,循環(huán)內部就是一個普通的遍歷鏈表的操作咨演,我們前面講屬性的時候說過闸昨,Treiber棧里面存放的WaitNode代表了當前等待任務執(zhí)行結束的線程,這個循環(huán)的作用也正是遍歷鏈表中所有等待的線程,并喚醒他們饵较。

將Treiber棧中所有掛起的線程都喚醒后拍嵌,下面就是執(zhí)行done方法:

/**
 * Protected method invoked when this task transitions to state
 * {@code isDone} (whether normally or via cancellation). The
 * default implementation does nothing.  Subclasses may override
 * this method to invoke completion callbacks or perform
 * bookkeeping. Note that you can query status inside the
 * implementation of this method to determine whether this task
 * has been cancelled.
 */
protected void done() { }

這個方法是一個空方法,從注釋上看循诉,它是提供給子類覆寫的横辆,以實現(xiàn)一些任務執(zhí)行結束前的額外操作。

run方法最后還有一個finally塊:

finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

在finally塊中打洼,我們將runner屬性置為null龄糊,并且檢查有沒有遺漏的中斷逆粹,如果發(fā)現(xiàn)s >= INTERRUPTING, 說明執(zhí)行任務的線程有可能被中斷了募疮,因為s >= INTERRUPTING 只有兩種可能,state狀態(tài)為INTERRUPTING和INTERRUPTED僻弹。

有的同學可能就要問了阿浓,咱前面已經執(zhí)行過的set方法或者setException方法不是已經將state狀態(tài)設置成NORMAL或者EXCEPTIONAL了嗎?

怎么會出現(xiàn)INTERRUPTING或者INTERRUPTED狀態(tài)呢蹋绽?別忘了芭毙,咱們在多線程的環(huán)境中,在當前線程執(zhí)行run方法的同時卸耘,有可能其他線程取消了任務的執(zhí)行退敦,此時其他線程就可能對state狀態(tài)進行改寫,這也就是我們在設置終止狀態(tài)的時候用putOrderedInt方法蚣抗,而沒有用CAS操作的原因——我們無法確信在設置state前是處于COMPLETING中間態(tài)還是INTERRUPTING中間態(tài)侈百。

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

可見該方法是一個自旋操作,如果當前的state狀態(tài)是INTERRUPTING翰铡,我們在原地自旋钝域,直到state狀態(tài)轉換成終止態(tài)。

run方法重點做了以下幾件事:

將runner屬性設置成當前正在執(zhí)行run方法的線程
調用callable成員變量的call方法來執(zhí)行任務
設置執(zhí)行結果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結果锭魔;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常例证,設置結果之前,先將state狀態(tài)設為中間態(tài)
對outcome的賦值完成后迷捧,設置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)
喚醒Treiber棧中所有等待的線程
善后清理(waiters, callable织咧,runner設為null)
檢查是否有遺漏的中斷,如果有漠秋,等待中斷狀態(tài)完成笙蒙。

我們前面說“state只要不是NEW狀態(tài),就說明任務已經執(zhí)行完成了”就體現(xiàn)在這里膛堤,因為run方法中手趣,我們是在c.call()執(zhí)行完畢或者拋出了異常之后才開始設置中間態(tài)和終止態(tài)的。

Future接口的實現(xiàn)

Future接口一共定義了5個方法,我們一個個來看:
cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最后绿渣,我們提到了任務可能被別的線程取消朝群,那我們就趁熱打鐵,看看怎么取消一個任務的執(zhí)行:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. 如果發(fā)起cancel時任務還沒有開始運行中符,則隨后任務就不會被執(zhí)行姜胖;
  2. 如果發(fā)起cancel時任務已經在運行了,則這時就需要看mayInterruptIfRunning參數了:
    2.1 如果mayInterruptIfRunning 為true, 則當前在執(zhí)行的任務會被中斷
    2.2 如果mayInterruptIfRunning 為false, 則可以允許正在執(zhí)行的任務繼續(xù)運行淀散,直到它執(zhí)行完

對于“任務已經執(zhí)行完成了或者任務已經被取消過了右莱,則cancel操作一定是失敗的(返回false)”這兩條,是通過簡單的判斷state值是否為NEW實現(xiàn)的档插,因為我們前面說過了慢蜓,只要state不為NEW,說明任務已經執(zhí)行完畢了郭膛。從代碼中可以看出晨抡,只要state不為NEW,則直接返回false则剃。

如果state還是NEW狀態(tài)耘柱,我們再往下看:

UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

這一段是根據mayInterruptIfRunning的值將state的狀態(tài)由NEW設置成INTERRUPTING或者CANCELLED,當這一操作也成功之后棍现,就可以執(zhí)行后面的try語句了调煎,但無論怎么,該方法最后都返回了true己肮。

try {    // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
} finally {
    finishCompletion();
}

我們知道士袄,runner屬性中存放的是當前正在執(zhí)行任務的線程,因此朴肺,這個try塊的目的就是中斷當前正在執(zhí)行任務的線程窖剑,最后將state的狀態(tài)設為INTERRUPTED,當然戈稿,中斷操作完成后西土,還需要通過finishCompletion()來喚醒所有在Treiber棧中等待的線程。

我們現(xiàn)在總結一下鞍盗,cancel方法實際上完成以下兩種狀態(tài)轉換之一:

  • NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
  • NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)

對于第一條路徑需了,雖說cancel方法最終返回了true,但它只是簡單的把state狀態(tài)設為CANCELLED般甲,并不會中斷線程的執(zhí)行肋乍。但是這樣帶來的后果是,任務即使執(zhí)行完畢了敷存,也無法設置任務的執(zhí)行結果墓造,因為前面分析run方法的時候我們知道堪伍,設置任務結果有一個中間態(tài),而這個中間態(tài)的設置觅闽,是以當前state狀態(tài)為NEW為前提的帝雇。

對于第二條路徑,則會中斷執(zhí)行任務的線程蛉拙,我們在倒回上面的run方法看看:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

雖然第二條路徑中斷了當前正在執(zhí)行的線程尸闸,但是,響不響應這個中斷是由執(zhí)行任務的線程自己決定的孕锄,更具體的說吮廉,這取決于c.call()方法內部是否對中斷進行了響應,是否將中斷異常拋出畸肆。

然而宦芦,值得一提的是,即使這里進入了catch (Throwable ex){}代碼塊恼除,setException(ex)的操作一定是失敗的踪旷,因為在我們取消任務執(zhí)行的線程中,我們已經先把state狀態(tài)設為INTERRUPTING了豁辉,而setException(ex)的操作要求設置前線程的狀態(tài)為NEW。

既然這個setException(ex)的操作一定是失敗的舀患,那放在這里有什么用呢徽级?事實上,這個setException(ex)是用來處理任務自己在正常執(zhí)行過程中產生的異常的聊浅,在我們沒有主動去cancel任務時餐抢,任務的state狀態(tài)在執(zhí)行過程中就會始終是NEW,如果任務此時自己發(fā)生了異常低匙,則這個異常就會被setException(ex)方法成功的記錄到outcome中旷痕。

反正無論如何,run方法最終都會進入finally塊顽冶,而這時候它會發(fā)現(xiàn)s >= INTERRUPTING,如果檢測發(fā)現(xiàn)s = INTERRUPTING强重,說明cancel方法還沒有執(zhí)行到中斷當前線程的地方志鹃,那就等待它將state狀態(tài)設置成INTERRUPTED.

說完了cancel,我們再來看看 isCancelled()方法,相較而言政模,它就簡單多了:

public boolean isCancelled() {
    return state >= CANCELLED;
}

那么state >= CANCELLED 包含了那些狀態(tài)呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED

image.png

isDone()
與 isCancelled方法類似顿颅,isDone方法也是簡單地通過state狀態(tài)來判斷缸濒。

public boolean isDone() {
    return state != NEW;
}

只要state狀態(tài)不是NEW,則任務已經執(zhí)行完畢了粱腻,因為state狀態(tài)不存在類似“任務正在執(zhí)行中”這種狀態(tài)庇配,即使是短暫的中間態(tài),也是發(fā)生在任務已經執(zhí)行完畢绍些,正在設置任務結果的時候捞慌。
????接下來分析FutureTask非常重要的get方法:

 public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

????首先判斷FutureTask的狀態(tài)是否為完成狀態(tài),如果是完成狀態(tài)柬批,說明已經執(zhí)行過set或setException方法啸澡,返回report(s)。

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

???可以看到氮帐,若FutureTask的狀態(tài)是Normal嗅虏,即正確執(zhí)行了set方法,get方法直接返回處理的結果揪漩,如果是取消狀態(tài)旋恼,即執(zhí)行了setException,則拋出CancellationException異常。

????如果get時奄容,F(xiàn)utureTask的狀態(tài)為未完成狀態(tài),則調用awaitDone方法進行阻塞产徊。awaitDone():

/**
     * Awaits completion or aborts on interrupt or timeout
     * 調用 awaitDone 進行線程的自旋
     * 自旋一般調用步驟
     *  1) 若支持線程中斷, 判斷當前的線程是否中斷
     *      a. 中斷, 退出自旋, 在線程隊列中移除對應的節(jié)點
     *      b. 進行下面的步驟
     *  2) 將當前的線程構造成一個 WaiterNode 節(jié)點, 加入到當前對象的隊列里面 (進行 cas 操作)
     *  3) 判斷當前的調用是否設置阻塞超時時間
     *      a. 有 超時時間, 調用 LockSupport.parkNanos; 阻塞結束后, 再次進行 自旋 , 還是到同一個if, 但 nanos = 0L, 刪除鏈表中對應的 WaiterdNode, 返回 state值
     *      b. 沒 超時時間, 調用 LockSupport.park
     *
     * @param timed true if use timed waits
     * @param nanos time to waits, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos) throws InterruptedException{
        // default timed = false, nanos = 0, so deadline = 0
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for(;;){
            // Thread.interrupted 判斷當前的線程是否中斷(調用兩次會清楚對應的狀態(tài)位)
            // Thread.interrupt 將當前的線程設置成中斷狀態(tài)
            if(Thread.interrupted()){
                removeWaiter(q, Thread.currentThread().getId());
                throw new InterruptedException();
            }

            int s = state;
            /** 1. s = NORMAL, 說明程序執(zhí)行成功, 直接獲取對應的 V
             */
            if(s > COMPLETING){
                if(q != null){
                    q.thread = null;
                }
                return s;
            }
            // s = COMPLETING ; 看了全部的代碼說明整個任務在處理的中間狀態(tài), s緊接著會進行改變
            // s 變成 NORMAL 或 EXCEPTION
            // 所以調用 yield 讓線程狀態(tài)變更, 重新進行CPU時間片競爭, 并且進行下次循環(huán)
            else if(s == COMPLETING){ // cannot time out yet
                Thread.yield();
            }
            // 當程序調用 get 方法時, 一定會調用一次下面的方法, 對 q 進行賦值
            else if(q == null){
                q = new WaitNode();
            }
            // 判斷有沒將當前的線程構造成一個節(jié)點, 賦值到對象對應的屬性里面
            // 第一次 waiters 一定是 null 的, 進行賦值的是一個以 q 為首節(jié)點的棧
            else if(!queued){
                queued = unsafe.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            }
            // 調用默認的 get()時, timed = false, 所以不執(zhí)行這一步
            else if(timed){
                // 進行阻塞時間的判斷, 第二次循環(huán)時, nanos = 0L, 直接 removeWaiter 返回現(xiàn)在 FutureTask 的 state
                nanos = deadline - System.nanoTime();
                if(nanos <= 0L){
                    removeWaiter(q, Thread.currentThread().getId());
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            // 進行線程的阻塞
            else{
                LockSupport.park(this);
            }
        }
    }

有一點我們先特別說明一下昂勒,F(xiàn)utureTask中會涉及到兩類線程,一類是執(zhí)行任務的線程舟铜,它只有一個戈盈,F(xiàn)utureTask的run方法就由該線程來執(zhí)行;一類是獲取任務執(zhí)行結果的線程谆刨,它可以有多個塘娶,這些線程可以并發(fā)執(zhí)行,每一個線程都是獨立的痊夭,都可以調用get方法來獲取任務的執(zhí)行結果刁岸。如果任務還沒有執(zhí)行完,則這些線程就需要進入Treiber棧中掛起她我,直到任務執(zhí)行結束虹曙,或者等待的線程自身被中斷迫横。

理清了這一點后,我們再來詳細看看awaitDone方法酝碳》猓可以看出,該方法的大框架是一個自旋操作疏哗,我們一段一段來看:

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }
    // ...
}

我們先檢測當前線程是否被中斷了呛讲,這是因為get方法是阻塞式的,如果等待的任務還沒有執(zhí)行完返奉,則調用get方法的線程會被扔到Treiber棧中掛起等待贝搁,直到任務執(zhí)行完畢。但是衡瓶,如果任務遲遲沒有執(zhí)行完畢徘公,則我們也有可能直接中斷在Treiber棧中的線程,以停止等待哮针。

當檢測到線程被中斷后关面,我們調用了removeWaiter:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        ...
    }
}

removeWaiter的作用是將參數中的node從等待隊列(即Treiber棧)中移除。如果此時線程還沒有進入Treiber棧十厢,則 q=null等太,那么removeWaiter(q)啥也不干。在這之后蛮放,我們就直接拋出了InterruptedException異常缩抡。

接著往下看:

for (;;) {
    /*if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }*/
    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this);
}
  • 如果任務已經進入終止態(tài)(s > COMPLETING),我們就直接返回任務的狀態(tài);
  • 否則包颁,如果任務正在設置執(zhí)行結果(s == COMPLETING)瞻想,我們就讓出當前線程的CPU資源繼續(xù)等待
  • 否則,就說明任務還沒有執(zhí)行娩嚼,或者任務正在執(zhí)行過程中蘑险,那么這時,如果q現(xiàn)在還為null, 說明當前線程還沒有進入等待隊列岳悟,于是我們新建了一個WaitNode, WaitNode的構造函數我們之前已經看過了佃迄,就是生成了一個記錄了當前線程的節(jié)點;
  • 如果q不為null贵少,說明代表當前線程的WaitNode已經被創(chuàng)建出來了呵俏,則接下來如果queued=false,表示當前線程還沒有入隊滔灶,所以我們執(zhí)行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

這行代碼的作用是通過CAS操作將新建的q節(jié)點添加到waiters鏈表的頭節(jié)點之前普碎,其實就是Treiber棧的入棧操作,寫的還是很簡潔的宽气,一行代碼就搞定了.
這個CAS操作就是為了保證同一時刻如果有多個線程在同時入棧随常,則只有一個能夠操作成功潜沦,也即Treiber棧的規(guī)范。

如果以上的條件都不滿足绪氛,則再接下來因為現(xiàn)在是不帶超時機制的get唆鸡,timed為false,則else if代碼塊跳過枣察,然后來到最后一個else, 把當前線程掛起争占,此時線程就處于阻塞等待的狀態(tài)。

至此序目,在任務沒有執(zhí)行完畢的情況下臂痕,獲取任務執(zhí)行結果的線程就會在Treiber棧中被LockSupport.park(this)掛起了。

那么這個掛起的線程什么時候會被喚醒呢猿涨?有兩種情況:

  1. 任務執(zhí)行完畢了握童,在finishCompletion方法中會喚醒所有在Treiber棧中等待的線程
  2. 等待的線程自身因為被中斷等原因而被喚醒。

我們接下來就繼續(xù)看看線程被喚醒后的情況叛赚,此時澡绩,線程將回到for(;;)循環(huán)的開頭,繼續(xù)下一輪循環(huán):

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this); // 掛起的線程從這里被喚醒
}

首先自然還是檢測中斷俺附,所不同的是肥卡,此時q已經不為null了,因此在有中斷發(fā)生的情況下事镣,在拋出中斷之前步鉴,多了一步removeWaiter(q)操作,該操作是將當前線程從等待的Treiber棧中移除璃哟,相比入棧操作氛琢,這個出棧操作要復雜一點,這取決于節(jié)點是否位于棧頂随闪。下面我們來仔細分析這個出棧操作:

removeWaiter
實現(xiàn)并發(fā)鏈表中移除隊列節(jié)點的一個操作

private void  removeWaiter(WaitNode node, long i){
        logger.info("removeWaiter node"  + node +", i: "+ i +" begin");
        if(node != null){
            node.thread = null; // 將移除的節(jié)點的thread=null, 為移除做標示

            retry:
            for(;;){ // restart on removeWaiter race
                for(WaitNode pred = null, q = waiters, s; q != null; q = s){
                    logger.info("q : " + q +", i:"+i);
                    s = q.next;
                    // 通過 thread 判斷當前 q 是否是需要移除的 q節(jié)點
                    if(q.thread != null){
                        pred = q;
                        logger.info("q : " + q +", i:"+i);
                    }
                    // 何時執(zhí)行到這個if條件 ?
                    // hehe 只有第一步不滿足時, 也就是q.thread=null (p就是應該移除的節(jié)點)
                    else if(pred != null){
                        logger.info("q : " + q +", i:"+i);
                        pred.next = s; // 將前一個節(jié)點的 next 指向當前節(jié)點的 next 節(jié)點
                        // pred.thread == null 這種情況是在多線程進行并發(fā) removeWaiter 時產生的
                        // 而此時真好移除節(jié)點 node 和 pred, 所以loop跳到retry, 在進行一次
                        if(pred.thread == null){ // check for race
                            continue retry;
                        }
                    }
                    // 這一步何時操作呢?
                    // 想想 若p是頭節(jié)點
                    else if(!unsafe.compareAndSwapObject(this, waitersOffset, q, s)){
                        logger.info("q : " + q +", i:"+i);
                        continue retry; // 這一步還是 cheak for race
                    }
                }
                break ;
            }
            logger.info("removeWaiter node"  + node +", i: "+ i +" end");
        }
    }

首先艺沼,我們把要出棧的WaitNode的thread屬性設置為null, 這相當于一個標記,是我們后面在waiters鏈表中定位該節(jié)點的依據蕴掏。

(1) 要移除的節(jié)點就在棧頂

我們先來看看該節(jié)點就位于棧頂的情況,這說明在該節(jié)點入棧后调鲸,并沒有別的線程再入棧了盛杰。由于一開始我們就將該節(jié)點的thread屬性設為了null,因此藐石,前面的q.thread != null 和 pred != null都不滿足即供,我們直接進入到最后一個else if 分支:

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
    continue retry;

這一段是棧頂節(jié)點出棧的操作,和入棧類似于微,采用了CAS比較逗嫡,將棧頂元素設置成原棧頂節(jié)點的下一個節(jié)點青自。

值得注意的是,當CAS操作不成功時驱证,程序會回到retry處重來延窜,但即使CAS操作成功了,程序依舊會遍歷完整個鏈表抹锄,找尋node.thread == null 的節(jié)點逆瑞,并將它們一并從鏈表中剔除。

(2) 要移除的節(jié)點不在棧頂
當要移除的節(jié)點不在棧頂時伙单,我們會一直遍歷整個鏈表获高,直到找到q.thread == null的節(jié)點,找到之后吻育,我們將進入

else if (pred != null) {
    pred.next = s;
    if (pred.thread == null) // check for race
        continue retry;
}

這是因為節(jié)點不在棧頂念秧,則其必然是有前驅節(jié)點pred的,這時布疼,我們只是簡單的讓前驅節(jié)點指向當前節(jié)點的下一個節(jié)點摊趾,從而將目標節(jié)點從鏈表中剔除。

注意缎除,后面多加的那個if判斷是很有必要的严就,因為removeWaiter方法并沒有加鎖,所以可能有多個線程在同時執(zhí)行器罐,WaitNode的兩個成員變量thread和next都被設置成volatile梢为,這保證了它們的可見性,如果我們在這時發(fā)現(xiàn)了pred.thread == null轰坊,那就意味著它已經被另一個線程標記了铸董,將在另一個線程中被拿出waiters鏈表,而我們當前目標節(jié)點的原后繼節(jié)點現(xiàn)在是接在這個pred節(jié)點上的肴沫,因此粟害,如果pred已經被其他線程標記為要拿出去的節(jié)點,我們現(xiàn)在這個線程再繼續(xù)往后遍歷就沒有什么意義了颤芬,所以這時就調到retry處悲幅,從頭再遍歷。

如果pred節(jié)點沒有被其他線程標記站蝠,那我們就接著往下遍歷汰具,直到整個鏈表遍歷完。

在該方法中菱魔,會傳入一個需要移除的節(jié)點留荔,我們會將這個節(jié)點的thread屬性設置成null,以標記該節(jié)點澜倦。然后無論如何聚蝶,我們會遍歷整個鏈表杰妓,清除那些被標記的節(jié)點(只是簡單的將節(jié)點從鏈表中剔除)。如果要清除的節(jié)點就位于棧頂碘勉,則還需要注意重新設置waiters的值巷挥,指向新的棧頂節(jié)點。所以可以看出恰聘,雖說removeWaiter方法傳入了需要剔除的節(jié)點句各,但是事實上它可能剔除的不止是傳入的節(jié)點,而是所有已經被標記了的節(jié)點晴叨,這樣不僅清除操作容易了些(不需要專門去定位傳入的node在哪里)凿宾,而且提升了效率(可以同時清除所有已經被標記的節(jié)點)。

我們再回到awaitDone方法里:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q); // 剛剛分析到這里了兼蕊,我們接著往下看
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

如果線程不是因為中斷被喚醒初厚,則會繼續(xù)往下執(zhí)行,此時會再次獲取當前的state狀態(tài)孙技。所不同的是产禾,此時q已經不為null, queued已經為true了,所以已經不需要將當前節(jié)點再入waiters棧了牵啦。

至此我們知道亚情,除非被中斷,否則get方法會在原地自旋等待(用的是Thread.yield哈雏,對應于s == COMPLETING)或者直接掛起(對應任務還沒有執(zhí)行完的情況)楞件,直到任務執(zhí)行完成。而我們前面分析run方法和cancel方法的時候知道裳瘪,在run方法結束后土浸,或者cancel方法取消完成后,都會調用finishCompletion()來喚醒掛起的線程彭羹,使它們得以進入下一輪循環(huán)黄伊,獲取任務執(zhí)行結果。

最后派殷,等awaitDone函數返回后还最,get方法返回了report(s),以根據任務的狀態(tài)毡惜,匯報執(zhí)行結果:

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

可見憋活,report方法非常簡單,它根據當前state狀態(tài)虱黄,返回正常執(zhí)行的結果,或者拋出指定的異常吮成。

get(long timeout, TimeUnit unit)

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

它和上面不帶超時時間的get方法很類似橱乱,只是在awaitDone方法中多了超時檢測:

else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
        removeWaiter(q);
        return state;
    }
    LockSupport.parkNanos(this, nanos);
}

即辜梳,如果指定的超時時間到了,則直接返回泳叠,如果返回時作瞄,任務還沒有進入終止狀態(tài),則直接拋出TimeoutException異常危纫,否則就像get()方法一樣宗挥,正常的返回執(zhí)行結果。

小結

FutureTask實現(xiàn)了Runnable和Future接口种蝶,它表示了一個帶有任務狀態(tài)和任務結果的任務契耿,它的各種操作都是圍繞著任務的狀態(tài)展開的,值得注意的是螃征,在所有的7個任務狀態(tài)中搪桂,只要不是NEW狀態(tài),就表示任務已經執(zhí)行完畢或者不再執(zhí)行了盯滚,并沒有表示“任務正在執(zhí)行中”的狀態(tài)踢械。

除了代表了任務的Callable對象、代表任務執(zhí)行結果的outcome屬性魄藕,F(xiàn)utureTask還包含了一個代表所有等待任務結束的線程的Treiber棧内列,這一點其實和各種鎖的等待隊列特別像,即如果拿不到鎖背率,則當前線程就會被扔進等待隊列中话瞧;這里則是如果任務還沒有執(zhí)行結束,則所有等待任務執(zhí)行完畢的線程就會被扔進Treiber棧中退渗,直到任務執(zhí)行完畢了移稳,才會被喚醒。

FutureTask雖然為我們提供了獲取任務執(zhí)行結果的途徑会油,遺憾的是个粱,在獲取任務結果時,如果任務還沒有執(zhí)行完成翻翩,則當前線程會自旋或者掛起等待都许,這和我們實現(xiàn)異步的初衷是相違背的,我們后面將繼續(xù)介紹另一個同步工具類CompletableFuture, 它解決了這個問題嫂冻。

FutureTask的使用場景

????FutureTask可用于異步獲取執(zhí)行結果或者取消執(zhí)行任務的場景胶征,通過傳入Runnable或者Callable的任務給FutureTask,直接調用其run方法或者放入線程池執(zhí)行桨仿,之后通過FutureTask的get方法異步獲取執(zhí)行結果睛低,它非常適合用于耗時的計算,主線程可以在完成自己的任務后,再去獲取結果钱雷,并且FutureTask還可以確保即使調動了多次run方法骂铁,它都只執(zhí)行一次Runnable或者Callable任務,或者通過cancel取消FuturTask的執(zhí)行罩抗。

public class FutureTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("====進入主線程執(zhí)行任務");

        //通過線程池管理多線程
        ExecutorService threadPool = Executors.newCachedThreadPool();

        //線程池提交一個異步任務
        System.out.println("====提交異步任務");
        Future<HashMap<String,String>> future = threadPool.submit(new Callable<HashMap<String,String>>() {

            @Override
            public HashMap<String,String> call() throws Exception {

                System.out.println("異步任務開始執(zhí)行....");
                Thread.sleep(2000);
                System.out.println("異步任務執(zhí)行完畢拉庵,返回執(zhí)行結果!!!!");

                return new HashMap<String,String>(){
                    {this.put("futureKey", "成功獲取future異步任務結果");}
                };
            }

        });

        System.out.println("====提交異步任務之后,立馬返回到主線程繼續(xù)往下執(zhí)行");
        Thread.sleep(1000);

        System.out.println("====此時需要獲取上面異步任務的執(zhí)行結果");

        boolean flag = true;
        while(flag){
            //異步任務完成并且未被取消套蒂,則獲取返回的結果
            if(future.isDone() && !future.isCancelled()){
                HashMap<String,String> futureResult = future.get();
                System.out.println("====異步任務返回的結果是:"+futureResult.get("futureKey"));
                flag = false;
            }
        }

        //關閉線程池
        if(!threadPool.isShutdown()){
            threadPool.shutdown();
        }
    }
}
====進入主線程執(zhí)行任務
====提交異步任務
====提交異步任務之后钞支,立馬返回到主線程繼續(xù)往下執(zhí)行
異步任務開始執(zhí)行....
====此時需要獲取上面異步任務的執(zhí)行結果
異步任務執(zhí)行完畢,返回執(zhí)行結果!!!!
====異步任務返回的結果是:成功獲取future異步任務結果
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末操刀,一起剝皮案震驚了整個濱河市烁挟,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌馍刮,老刑警劉巖信夫,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異卡啰,居然都是意外死亡静稻,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門匈辱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來振湾,“玉大人,你說我怎么就攤上這事亡脸⊙禾拢” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵浅碾,是天一觀的道長大州。 經常有香客問我,道長垂谢,這世上最難降的妖魔是什么厦画? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮滥朱,結果婚禮上根暑,老公的妹妹穿的比我還像新娘。我一直安慰自己徙邻,他們只是感情好排嫌,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著缰犁,像睡著了一般淳地。 火紅的嫁衣襯著肌膚如雪怖糊。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天薇芝,我揣著相機與錄音蓬抄,去河邊找鬼。 笑死夯到,一個胖子當著我的面吹牛,可吹牛的內容都是我干的饮亏。 我是一名探鬼主播耍贾,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼路幸!你這毒婦竟也來了荐开?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤简肴,失蹤者是張志新(化名)和其女友劉穎晃听,沒想到半個月后,有當地人在樹林里發(fā)現(xiàn)了一具尸體砰识,經...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡能扒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了辫狼。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片初斑。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖膨处,靈堂內的尸體忽然破棺而出见秤,到底是詐尸還是另有隱情,我是刑警寧澤真椿,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布鹃答,位于F島的核電站,受9級特大地震影響突硝,放射性物質發(fā)生泄漏测摔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一狞换、第九天 我趴在偏房一處隱蔽的房頂上張望避咆。 院中可真熱鬧,春花似錦修噪、人聲如沸查库。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽樊销。三九已至整慎,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間围苫,已是汗流浹背裤园。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留剂府,地道東北人拧揽。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像腺占,于是被迫代替她去往敵國和親淤袜。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內容