24. 并發(fā)終結(jié)之FutureTask

關(guān)于Thread和Runnable的區(qū)別:

1)從對象編程來說:Thread是繼承;Runnable是組合,會比繼承耦合性低,更加靈活捉撮。
2)從對象共享角度:Runnable實例可以由過個線程實例共享彤断,會產(chǎn)生并發(fā)問題野舶。
3)對象創(chuàng)建成本:Thread在創(chuàng)建的時候JVM就會為其分配調(diào)用棧空間宰衙,內(nèi)核線程等資源平道;而Runnable是普通的類,作為參數(shù)傳給Thread供炼,所以Runnable創(chuàng)建成本相對較低一屋。

關(guān)于用戶線程(User)和守護(hù)線程(Daemon):

用戶線程會阻止JVM正常停止;
守護(hù)線程不會影響JVM正常停止袋哼,所以守護(hù)線程通常用于執(zhí)行一些重要性不高的任務(wù)冀墨。

關(guān)于Callable和Runnable的區(qū)別:

Callable方式需要FutureTask實現(xiàn)類的支持,用于接收運(yùn)算結(jié)果涛贯。
FutureTask是Future接口的實現(xiàn)類诽嘉,且FutureTask也可用于閉鎖的操作,因為get() 會阻塞當(dāng)前線程直到Callable返回結(jié)果弟翘。

FutureTask

FutureTask提供了支持cancel的異步計算方式虫腋,它實現(xiàn)了RunnableFuture接口,而RunnableFuture接口繼承了Runnable和Future接口衅胀。
FutureTask可以用來包裝一個Callable和Runnable對象岔乔,且因為實現(xiàn)了Runnable,所以FutureTask可以被提交到線程池處理滚躯。

源碼分析

因為FutureTask實現(xiàn)了Future接口,所以它會實現(xiàn)Future接口的相關(guān)方法嘿歌,比如說get(), cancel()等等掸掏。

成員變量以及構(gòu)造函數(shù)

有三個volatile成員變量,會通過UNSAFE類來進(jìn)行CAS操作宙帝。
另外定義了state變量對應(yīng)的7中狀態(tài)丧凤。
callable變量意味著FutureTask會將Runnable也封裝成Callable來處理。
outcome則是返回這或者異常的信息

private volatile int state;
private static final int NEW          = 0;//初始是NEW
private static final int COMPLETING   = 1;//在set()和setException()里面先CAS更新成COMPLETING榜配,加鎖操作
private static final int NORMAL       = 2;//set()方法執(zhí)行成功從COMPLETING到NORMAL
private static final int EXCEPTIONAL  = 3;//setException()方法執(zhí)行成功從COMPLETING到EXCEPTIONAL
private static final int CANCELLED    = 4;//cancel(false)的時候
private static final int INTERRUPTING = 5;//cancel(true)的時候
private static final int INTERRUPTED  = 6;//cancel(true)最終狀態(tài)

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
構(gòu)造函數(shù)

FutureTask內(nèi)部有Callable類型的成員變量忽孽,所以Runnable會通過一個適配器RunnableAdapter实撒,轉(zhuǎn)換成Callable,內(nèi)部還是調(diào)用的Runnable的run()方法仍侥。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
//接受Runnable的構(gòu)造函數(shù)
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
//適配器將Runnable轉(zhuǎn)換成Callable
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
//Adapter類,Callable的call方法調(diào)用Runnable的run方法鸳君。
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
run()

run()方法主要是調(diào)用Callable的call方法农渊,如果有異常則通過setException(Throwable t)設(shè)置異常;如果沒有異常或颊,則通過set(V result)方法砸紊,來設(shè)置返回值传于。且這兩個set方法最終都會調(diào)用finishCompletion()來unpark喚醒WaitNode節(jié)點里的等待線程去獲得結(jié)果。
run()方法涉及到的方法有setException(), set(), finishCompletion(), handlePossibleCancellationInterrupt()醉顽。
涉及到的state有COMPLETING, EXCEPTIONAL, NORMAL
且在call()方法返回之前沼溜,都是NEW狀態(tài),這樣cancel()才會有機(jī)會游添。
且run方法最好檢查INTERRUPTING狀態(tài)盛末,會在handlePossibleCancellationInterrupt自旋直到cancel()方法結(jié)束,state變成INTERRUPTED狀態(tài)(下面代碼里分析)否淤。

public void run() {
    //如果state不是NEW或者CAS更新runner成員變量失敗悄但,則直接return。
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //直接調(diào)用Callable的call方法并拿到result
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                //如果call()方法拋出異常石抡,ran=false
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
//如果拋出異常檐嚣,則調(diào)用該方法,更新state狀態(tài)到COMPLETING啰扛,
//將Throwable賦值給outcome變量嚎京,再更新state到EXCEPTIONAL.
//最后finishCompletion()喚醒WaitNode中等待線程節(jié)點
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        //因為上面更新到COMPLETING成功,這邊更新EXCEPTIONAL則不需要CAS操作隐解,而是putOrderedInt內(nèi)存操作
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        //先通過CAS獲得更新資格鞍帝,將waiter變量更新為null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    //拿到q對應(yīng)的線程,更新為null煞茫,并喚醒該線程
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                //然后操作q的后繼節(jié)點
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    //done()方法由子類實現(xiàn)
    done();

    callable = null;        // to reduce footprint
}
//如果沒有異常帕涌,更新state到COMPLETING先,然后將result設(shè)置到outcome续徽,更新state到NORMAL
//最后finishCompletion喚醒等待線程
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}
//final的時候會檢查一下state是不是被中斷蚓曼,如果一直在中斷過程中,則當(dāng)前線程yield讓出CPU
//這邊自旋等待調(diào)用中斷的線程執(zhí)行完畢钦扭,因為此時run方法已經(jīng)結(jié)束纫版,runner也被重置為null
//   所以別的線程可能有機(jī)會提交這個FutureTask,而執(zhí)行run方法客情,這時候cancel可能被應(yīng)用到不同的Task上其弊。
//   所以這里要自旋直到cancel()方法執(zhí)行結(jié)束
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}
cancel()

cancel的時候有幾種情況:
1.任務(wù)還沒開始,直接返回false
2.任務(wù)已經(jīng)開始:
2.1調(diào)用cancel(false)膀斋,就是最后調(diào)用finishCompletion()來喚醒等待線程
2.2調(diào)用cancel(true)梭伐,則通過runner獲得當(dāng)前運(yùn)行線程,調(diào)用運(yùn)行線程的Interrupt()方法來設(shè)置目標(biāo)線程的中斷標(biāo)記位為true
這里涉及到的state狀態(tài)有INTERRUPTING, CANCELLED, INTERRUPTED概页。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          //如果參數(shù)是true籽御,則更新為INTERRUPTING狀態(tài),否則CANCELLED狀態(tài)
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();//通過設(shè)置中斷標(biāo)志位來控制目標(biāo)線程的生命周期
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
get()

如果state的狀態(tài)還沒到COMPLETING,就在awaitDone()里面通過LockSupport.park()掛起當(dāng)前線程技掏,直到run()方法執(zhí)行結(jié)束喚醒等待線程铃将。
異常:
1.get()方法響應(yīng)異常中斷,會拋出InterruptedException哑梳。
2.且如果是get(long timeout, TimeUnit unit)方法劲阎,還會拋出TimeoutException。
3.另外如果Callable的call方法沒有捕捉異常而拋出異常鸠真,則get()方法會拋出ExecutionException悯仙。FutureTask的run()方法在執(zhí)行Callable的call()方法時,會將call拋出的異常捕獲包裝成ExecutionException吠卷,這意味著call()方法出現(xiàn)異常锡垄,不會直接導(dǎo)致執(zhí)行線程運(yùn)行結(jié)束(相比正常的Thread的run()方法拋出異常可能導(dǎo)致執(zhí)行線程提前終止生命周期)祭隔。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //如果state還沒到COMPLETING货岭,則開始掛起當(dāng)前線程
        s = awaitDone(false, 0L);
    //run()方法結(jié)束之后會喚醒當(dāng)前線程,去拿到執(zhí)行結(jié)果疾渴。
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //判斷當(dāng)前線程是否被中斷
        if (Thread.interrupted()) {
            //如果被中斷千贯,則需要把waitNode的節(jié)點都垃圾回收掉,拋出InterruptedException
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果已經(jīng)結(jié)束搞坝,則將waitNode的thread設(shè)置為null搔谴,并返回state值
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //COMPLETING表示set開始,快結(jié)束了桩撮,則讓執(zhí)行線程讓出CPU等待一下
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //這里s就還沒到COMPLETING敦第,如果waitNode是null,則創(chuàng)建WaitNode
        else if (q == null)
            q = new WaitNode();
        //如果q不為null距境,則需要添加到WaitNode的next后繼節(jié)點
        else if (!queued)
            //這個UNSAFE操作申尼,先更新q的next=waiter,再將qCAS更新到waiters變量
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果是有等待時間的垫桂,則在等待時間過后,removeWaiter()來處理等待的node
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
//處理最終結(jié)果
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        //如果state是NORMAL粟按,則直接返回outcome值
        return (V)x;
    if (s >= CANCELLED)
        //則就是cancel()使得線程被Interrupt了或者cancel了诬滩,拋出CancellationException
        throw new CancellationException();
    //否則就是拋出異常了
    throw new ExecutionException((Throwable)x);
}

因為是無線循環(huán),所以里面的q = new WaitNode()和UNSAFE更新操作在park之前都會做一邊灭将,即park之前會將當(dāng)前線程封裝到WaitNode鏈表里疼鸟。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市庙曙,隨后出現(xiàn)的幾起案子空镜,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吴攒,死亡現(xiàn)場離奇詭異张抄,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)洼怔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門署惯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人镣隶,你說我怎么就攤上這事极谊。” “怎么了安岂?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵轻猖,是天一觀的道長。 經(jīng)常有香客問我域那,道長咙边,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任琉雳,我火速辦了婚禮样眠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘翠肘。我一直安慰自己檐束,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布束倍。 她就那樣靜靜地躺著被丧,像睡著了一般。 火紅的嫁衣襯著肌膚如雪绪妹。 梳的紋絲不亂的頭發(fā)上甥桂,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天,我揣著相機(jī)與錄音邮旷,去河邊找鬼黄选。 笑死,一個胖子當(dāng)著我的面吹牛婶肩,可吹牛的內(nèi)容都是我干的办陷。 我是一名探鬼主播,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼律歼,長吁一口氣:“原來是場噩夢啊……” “哼民镜!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起险毁,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤制圈,失蹤者是張志新(化名)和其女友劉穎们童,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鲸鹦,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡慧库,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了亥鬓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片完沪。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖嵌戈,靈堂內(nèi)的尸體忽然破棺而出覆积,到底是詐尸還是另有隱情,我是刑警寧澤熟呛,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布宽档,位于F島的核電站,受9級特大地震影響庵朝,放射性物質(zhì)發(fā)生泄漏吗冤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一九府、第九天 我趴在偏房一處隱蔽的房頂上張望椎瘟。 院中可真熱鬧,春花似錦侄旬、人聲如沸肺蔚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宣羊。三九已至,卻和暖如春汰蜘,著一層夾襖步出監(jiān)牢的瞬間仇冯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工族操, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留苛坚,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓色难,卻偏偏與公主長得像炕婶,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子莱预,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,901評論 2 355