java.util.concurrent解析——FutureTask源碼解析

1. Runnable、Callable球散、Future、FutureTask的區(qū)別與聯(lián)系

和Java異步打交道就不能回避掉Runnable,Callable,Future,FutureTask等類摇肌,首先來介紹下這幾個類的區(qū)別烟零。

1.1 Runnable

Runnable接口是我們最熟悉的,它只有一個run函數(shù)暴备。然后使用某個線程去執(zhí)行該runnable即可實(shí)現(xiàn)多線程悠瞬,Thread類在調(diào)用start()函數(shù)后就是執(zhí)行的是Runnable的run()函數(shù)。Runnable最大的缺點(diǎn)在于run函數(shù)沒有返回值涯捻。

1.2 Callable

Callable接口和Runnable接口類似浅妆,它有一個call函數(shù)。使用某個線程執(zhí)行Callable接口實(shí)質(zhì)就是執(zhí)行其call函數(shù)障癌。call方法和run方法最大的區(qū)別就是call方法有返回值:

public interface Callable<V> {  
    /** 
     * Computes a result, or throws an exception if unable to do so. 
     * 
     * @return computed result 
     * @throws Exception if unable to compute a result 
     */  
    V call() throws Exception;  
} 

1.3 Future

Future就是對于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消凌外、查詢是否完成、獲取結(jié)果涛浙、設(shè)置結(jié)果操作康辑。get方法會阻塞,直到任務(wù)返回結(jié)果(Future簡介)轿亮。

1.4 FutureTask

Future只是一個接口疮薇,在實(shí)際使用過程中,諸如ThreadPoolExecutor返回的都是一個FutureTask實(shí)例我注。

public class FutureTask<V> implements RunnableFuture<V>  

public interface RunnableFuture<V> extends Runnable, Future<V> {  
    /** 
     * Sets this Future to the result of its computation 
     * unless it has been cancelled. 
     */  
    void run();  
}  

可以看到按咒,F(xiàn)utureTask是一個RunnableFuture<V>,而RunnableFuture實(shí)現(xiàn)了Runnbale又實(shí)現(xiàn)了Futrue<V>這兩個接口但骨。

2 FutureTask的構(gòu)造過程

事實(shí)上励七,通過ExecutorService接口的相關(guān)submit方法,實(shí)際上都是提交的Callable或者Runnable嗽冒,包裝成一個FutureTask對象呀伙。

public abstract class AbstractExecutorService implements ExecutorService {
  ...
  //將Runable包裝成FutureTask之后,再調(diào)用execute方法
  public Future<?> submit(Runnable task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<Void> ftask = newTaskFor(task, null);
      execute(ftask);
      return ftask;
  }
  
  //調(diào)用newTaskFor方法添坊,利用Callable構(gòu)造一個FutureTask對象
  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
      return new FutureTask<T>(callable);
  }
}

可以看到AbstractExecutorService的submit方法調(diào)用后返回的就是一個FutureTask對象剿另,接下來看下FutureTask的構(gòu)造方法:

//接受Callable對象作為參數(shù)
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    
}
//接受Runnable對象作為參數(shù)
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);//將Runnable轉(zhuǎn)為Callable對象
    this.state = NEW;     
}

//callable方法,將Runnable轉(zhuǎn)為一個Callable對象贬蛙,包裝設(shè)計(jì)模式
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
 
//RunnableAdapter是Executors的一個內(nèi)部類雨女,實(shí)現(xiàn)了Callable接口
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

可以看到,構(gòu)造FutureTask時(shí)阳准,無論傳入的是Runnable還是Callable氛堕,最終都實(shí)現(xiàn)了Callable接口。

3 FutureTask主要成員

接下來看下FutureTask類的主要成員變量:

public class FutureTask<V> implements RunnableFuture<V> {
     /*
     * FutureTask中定義了一個state變量野蝇,用于記錄任務(wù)執(zhí)行的相關(guān)狀態(tài) 讼稚,狀態(tài)的變化過程如下
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    //主流程狀態(tài)
    private static final int NEW = 0; //當(dāng)FutureTask實(shí)例剛剛創(chuàng)建到callbale的call方法執(zhí)行完成前括儒,處于此狀態(tài)
    private static final int COMPLETING  = 1; //callable的call方法執(zhí)行完成或出現(xiàn)異常時(shí),首先進(jìn)行此狀態(tài)
    private static final int NORMAL    = 2;//callable的call方法正常結(jié)束時(shí)锐想,進(jìn)入此狀態(tài)帮寻,將outcom設(shè)置為正常結(jié)果
    private static final int EXCEPTIONAL = 3;//callable的call方法異常結(jié)束時(shí),進(jìn)入此狀態(tài)赠摇,將outcome設(shè)置為拋出的異常
    //取消任務(wù)執(zhí)行時(shí)可能處于的狀態(tài)
    private static final int CANCELLED= 4;// FutureTask任務(wù)尚未執(zhí)行固逗,即還在任務(wù)隊(duì)列的時(shí)候,調(diào)用了cancel方法藕帜,進(jìn)入此狀態(tài)
    private static final int INTERRUPTING = 5;// FutureTask的run方法已經(jīng)在執(zhí)行烫罩,收到中斷信號,進(jìn)入此狀態(tài)
    private static final int INTERRUPTED  = 6;// 任務(wù)成功中斷后洽故,進(jìn)入此狀態(tài)
    
    private Callable<V> callable;//需要執(zhí)行的任務(wù)贝攒,提示:如果提交的是Runnable對象,會先轉(zhuǎn)換為Callable對象收津,這是構(gòu)造方法參數(shù)
    private Object outcome; //任務(wù)運(yùn)行的結(jié)果
    private volatile Thread runner;//執(zhí)行此任務(wù)的線程
  
    //等待該FutureTask的線程鏈表饿这,對于同一個FutureTask,如果多個線程調(diào)用了get方法撞秋,對應(yīng)的線程都會加入到waiters鏈表中长捧,同時(shí)當(dāng)FutureTask執(zhí)行完成后,也會告知所有waiters中的線程
    private volatile WaitNode waiters;
    ......
}

FutureTask的成員變量并不復(fù)雜吻贿,主要記錄以下幾部分信息:

  • 狀態(tài)
  • 任務(wù)(callable)
  • 結(jié)果(outcome)
  • 等待線程(waiters)

4 FutureTask的執(zhí)行過程

4.1 run

接下來開始看一個FutureTask的執(zhí)行過程串结,F(xiàn)utureTask執(zhí)行任務(wù)的方法當(dāng)然還是run方法:

public void run() {
    //保證callable任務(wù)只被運(yùn)行一次
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //執(zhí)行任務(wù)
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        //判斷該任務(wù)是否正在響應(yīng)中斷,如果中斷沒有完成舅列,則等待中斷操作完成
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
  • 如果狀態(tài)不為new或者運(yùn)行線程runner失敗肌割,說明當(dāng)前任務(wù)已經(jīng)被其他線程啟動或者已經(jīng)被執(zhí)行過,直接返回false
  • 調(diào)用call方法執(zhí)行核心任務(wù)邏輯帐要。如果調(diào)用成功則執(zhí)行set(result)方法把敞,將state狀態(tài)設(shè)置成NORMAL。如果調(diào)用失敗拋出異常則執(zhí)行setException(ex)方法榨惠,將state狀態(tài)設(shè)置成EXCEPTIONAL奋早,喚醒所有在get()方法上等待的線程
  • 如果當(dāng)前狀態(tài)為INTERRUPTING(步驟2已CAS失敗),則一直調(diào)用Thread.yield()直至狀態(tài)不為INTERRUPTING

4.2 set赠橙、setException方法

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

兩個方法的邏輯基本一致耽装,先通過CAS操作將狀態(tài)從NEW置為COMPLETING,然后再將最終狀態(tài)分別置為NORMAL或者EXCEPTIONAL期揪,最后再調(diào)用finishCompletion方法掉奄。

4.3 finishCompletion

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        //通過CAS把棧頂?shù)脑刂脼閚ull,相當(dāng)于彈出棧頂元素
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null;        // to reduce footprint
}

finishCompletion的邏輯也比較簡單:

  • 遍歷waiters鏈表凤薛,取出每一個節(jié)點(diǎn):每個節(jié)點(diǎn)都代表一個正在等待該FutureTask結(jié)果(即調(diào)用過get方法)的線程
  • 通過 LockSupport.unpark(t)喚醒每一個節(jié)點(diǎn)姓建,通知每個線程诞仓,該任務(wù)執(zhí)行完成

4.4 get

在finishCompletion方法中,F(xiàn)utureTask會通知waiters鏈表中的每一個等待線程速兔,那么這些線程是怎么被加入到waiters鏈表中的呢狂芋?上文已經(jīng)講過,當(dāng)在一個線程中調(diào)用了get方法憨栽,該線程就會被加入到waiters鏈表中。所以接下來看下get方法:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

get方法很簡答翼虫,主要就是調(diào)用awaitDone方法:

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //如果該線程執(zhí)行interrupt()方法屑柔,則從隊(duì)列中移除該節(jié)點(diǎn),并拋出異常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        //如果state狀態(tài)大于COMPLETING 則說明任務(wù)執(zhí)行完成珍剑,或取消
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果state=COMPLETING掸宛,則使用yield,因?yàn)榇藸顟B(tài)的時(shí)間特別短招拙,通過yield比掛起響應(yīng)更快唧瘾。
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //構(gòu)建節(jié)點(diǎn)
        else if (q == null)
            q = new WaitNode();
        //把當(dāng)前節(jié)點(diǎn)入棧
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
        //如果需要阻塞指定時(shí)間,則使用LockSupport.parkNanos阻塞指定時(shí)間
        //如果到指定時(shí)間還沒執(zhí)行完别凤,則從隊(duì)列中移除該節(jié)點(diǎn)饰序,并返回當(dāng)前狀態(tài)
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            //阻塞當(dāng)前線程
            else
                LockSupport.park(this);
        }
}

整個方法的大致邏輯主要分為以下幾步:

  • 如果當(dāng)前狀態(tài)值大于COMPLETING,說明已經(jīng)執(zhí)行完成或者取消规哪,直接返回
  • 如果state=COMPLETING求豫,則使用yield,因?yàn)榇藸顟B(tài)的時(shí)間特別短诉稍,通過yield比掛起響應(yīng)更快
  • 如果當(dāng)前線程是首次進(jìn)入循環(huán)蝠嘉,為當(dāng)前線程創(chuàng)建wait節(jié)點(diǎn)加入到waiters鏈表中
  • 根據(jù)是否定時(shí)將當(dāng)前線程掛起(LockSupport.parkNanos LockSupport.park)來阻塞當(dāng)前線程,直到超時(shí)或者線程被finishCompletion方法喚醒
  • 當(dāng)線程掛起超時(shí)或者被喚醒后杯巨,重新循環(huán)執(zhí)行上述邏輯

get方法是FutureTask中的關(guān)鍵方法蚤告,了解了get方法邏輯也就了解為什么當(dāng)調(diào)用get方法時(shí)線程會被阻塞直到任務(wù)運(yùn)行完成。

4.5 cancel

cancel方法用于結(jié)束當(dāng)前任務(wù):

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  • 根據(jù)mayInterruptIfRunning是否為true服爷,CAS設(shè)置狀態(tài)為INTERRUPTING或CANCELLED杜恰,設(shè)置成功,繼續(xù)第二步层扶,否則返回false
  • 如果mayInterruptIfRunning為true箫章,調(diào)用runner.interupt(),設(shè)置狀態(tài)為INTERRUPTED
  • 喚醒所有在get()方法等待的線程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末镜会,一起剝皮案震驚了整個濱河市檬寂,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌戳表,老刑警劉巖桶至,帶你破解...
    沈念sama閱讀 216,997評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件昼伴,死亡現(xiàn)場離奇詭異,居然都是意外死亡镣屹,警方通過查閱死者的電腦和手機(jī)圃郊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,603評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來女蜈,“玉大人持舆,你說我怎么就攤上這事∥苯眩” “怎么了逸寓?”我有些...
    開封第一講書人閱讀 163,359評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長覆山。 經(jīng)常有香客問我竹伸,道長,這世上最難降的妖魔是什么簇宽? 我笑而不...
    開封第一講書人閱讀 58,309評論 1 292
  • 正文 為了忘掉前任勋篓,我火速辦了婚禮,結(jié)果婚禮上魏割,老公的妹妹穿的比我還像新娘譬嚣。我一直安慰自己,他們只是感情好见妒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,346評論 6 390
  • 文/花漫 我一把揭開白布孤荣。 她就那樣靜靜地躺著,像睡著了一般须揣。 火紅的嫁衣襯著肌膚如雪盐股。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,258評論 1 300
  • 那天耻卡,我揣著相機(jī)與錄音疯汁,去河邊找鬼。 笑死卵酪,一個胖子當(dāng)著我的面吹牛幌蚊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播溃卡,決...
    沈念sama閱讀 40,122評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼溢豆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了瘸羡?” 一聲冷哼從身側(cè)響起漩仙,我...
    開封第一講書人閱讀 38,970評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后队他,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體卷仑,經(jīng)...
    沈念sama閱讀 45,403評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,596評論 3 334
  • 正文 我和宋清朗相戀三年麸折,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了锡凝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,769評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡垢啼,死狀恐怖窜锯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芭析,我是刑警寧澤衬浑,帶...
    沈念sama閱讀 35,464評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站放刨,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏尸饺。R本人自食惡果不足惜进统,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,075評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浪听。 院中可真熱鬧螟碎,春花似錦、人聲如沸迹栓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,705評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽克伊。三九已至酥郭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間愿吹,已是汗流浹背不从。 一陣腳步聲響...
    開封第一講書人閱讀 32,848評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留犁跪,地道東北人椿息。 一個月前我還...
    沈念sama閱讀 47,831評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像坷衍,于是被迫代替她去往敵國和親寝优。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,678評論 2 354

推薦閱讀更多精彩內(nèi)容