Callable/Future 使用及原理分析,F(xiàn)uture .get()為啥能等待呢力麸?

線程池的執(zhí)行任務(wù)有兩種方法可款,一種是 submit、一種是 execute末盔;這兩個(gè)方法是有區(qū)別的筑舅,那么基于這個(gè)區(qū)別我們?cè)賮?lái)看看。

execute 和 submit 區(qū)別

  1. execute 只可以接收一個(gè) Runnable 的參數(shù)

  2. execute 如果出現(xiàn)異常會(huì)拋出

  3. execute 沒(méi)有返回值

  4. submit 可以接收 Runable 和 Callable 這兩種類型的參數(shù)陨舱,

  5. 對(duì)于 submit 方法翠拣,如果傳入一個(gè) Callable,可以得到一個(gè) Future 的返回值

  6. submit 方法調(diào)用不會(huì)拋異常游盲,除非調(diào)用 Future.get
    這里误墓,我們重點(diǎn)了解一下 Callable/Future,可能很多同學(xué)知道他是一個(gè)帶返回值的線程益缎,但是具體的實(shí)現(xiàn)可能不清楚谜慌。

Callable/Future 案例演示

Callable/Future 和 Thread 之類的線程構(gòu)建最大的區(qū)別在于,能夠很方便的獲取線程執(zhí)行完以后的結(jié)果莺奔。首先來(lái)看一個(gè)簡(jiǎn)單的例子

public class CallableDemo implements Callable<String> {
    public String call() throws Exception {
        Thread.sleep(3000);//阻塞案例演示
        return "hello world";
    }

    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask futureTask = new FutureTask(callableDemo);
        new Thread(futureTask).start();
        System.out.println(futureTask.get());
    }
}

想一想我們?yōu)槭裁葱枰褂没卣{(diào)呢欣范?那是因?yàn)榻Y(jié)果值是由另一線程計(jì)算的,當(dāng)前線程是不知道結(jié)果值什么時(shí)候計(jì)算完成令哟,所以它傳遞一個(gè)回調(diào)接口給計(jì)算線程恼琼,當(dāng)計(jì)算完成時(shí),調(diào)用這個(gè)回調(diào)接口屏富,回傳結(jié)果值晴竞。

這個(gè)在很多地方有用到,比如 Dubbo 的異步調(diào)用狠半,比如消息中間件的異步通信等等… 利用 FutureTask噩死、 Callable、 Thread 對(duì)耗時(shí)任務(wù)(如查詢數(shù)據(jù)庫(kù))做預(yù)處理神年,在需要計(jì)算結(jié)果之前就啟動(dòng)計(jì)算已维。

所以我們來(lái)看一下 Future/Callable 是如何實(shí)現(xiàn)的

Callable/Future 原理分析

在剛剛實(shí)現(xiàn)的 demo 中,我們用到了兩個(gè) api已日,分別是 Callable 和 FutureTask垛耳。Callable 是一個(gè)函數(shù)式接口,里面就只有一個(gè) call 方法。子類可以重寫(xiě)這個(gè)方法艾扮,并且這個(gè)方法會(huì)有一個(gè)返回值

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

FutureTask

FutureTask 的類關(guān)系圖如下,它實(shí)現(xiàn) RunnableFuture 接口占婉,那么這個(gè) RunnableFuture 接口的作用是什么呢泡嘴。

在講解 FutureTask 之前,先看看 Callable, Future, FutureTask 它們之間的關(guān)系圖逆济,如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
  void run();
}

RunnableFuture 是一個(gè)接口酌予,它繼承了 Runnable 和 Future 這兩個(gè)接口, Runnable 太熟悉了奖慌, 那么 Future 是什么呢抛虫?

Future 表示一個(gè)任務(wù)的生命周期,并提供了相應(yīng)的方法來(lái)判斷是否已經(jīng)完成或取消简僧,以及獲取任務(wù)的結(jié)果和取消任務(wù)等建椰。

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);
  // 當(dāng)前的 Future 是否被取消,返回 true 表示已取消
  boolean isCancelled();
  // 當(dāng)前 Future 是否已結(jié)束岛马。包括運(yùn)行完成棉姐、拋出異常以及取消,都表示當(dāng)前 Future 已結(jié)束
  boolean isDone();
  // 獲取 Future 的結(jié)果值啦逆。如果當(dāng)前 Future 還沒(méi)有結(jié)束伞矩,那么當(dāng)前線程就等待,
  // 直到 Future 運(yùn)行結(jié)束夏志,那么會(huì)喚醒等待結(jié)果值的線程的乃坤。
  V get() throws InterruptedException, ExecutionException;
  // 獲取 Future 的結(jié)果值。與 get()相比較多了允許設(shè)置超時(shí)時(shí)間
  V get(long timeout, TimeUnit unit)
  throws InterruptedException, ExecutionException, TimeoutException;
}

分析到這里我們其實(shí)有一些初步的頭緒了沟蔑, FutureTask 是 Runnable 和 Future 的結(jié)合湿诊,如果我們把 Runnable 比作是生產(chǎn)者, Future 比作是消費(fèi)者溉贿,那么 FutureTask 是被這兩者共享的枫吧,生產(chǎn)者運(yùn)行 run 方法計(jì)算結(jié)果,消費(fèi)者通過(guò) get 方法獲取結(jié)果宇色。

作為生產(chǎn)者消費(fèi)者模式九杂,有一個(gè)很重要的機(jī)制,就是如果生產(chǎn)者數(shù)據(jù)還沒(méi)準(zhǔn)備的時(shí)候宣蠕,消費(fèi)者會(huì)被阻塞例隆。當(dāng)生產(chǎn)者數(shù)據(jù)準(zhǔn)備好了以后會(huì)喚醒消費(fèi)者繼續(xù)執(zhí)行。

這個(gè)有點(diǎn)像我們上次可分析的阻塞隊(duì)列抢蚀,那么在 FutureTask 里面是基于什么方式實(shí)現(xiàn)的呢镀层?

state 的含義

表示 FutureTask 當(dāng)前的狀態(tài),分為七種狀態(tài)

private static final int NEW = 0; // NEW 新建狀態(tài),表示這個(gè) FutureTask還沒(méi)有開(kāi)始運(yùn)行
// COMPLETING 完成狀態(tài)唱逢, 表示 FutureTask 任務(wù)已經(jīng)計(jì)算完畢了
// 但是還有一些后續(xù)操作吴侦,例如喚醒等待線程操作,還沒(méi)有完成坞古。
private static final int COMPLETING = 1;
// FutureTask 任務(wù)完結(jié)备韧,正常完成,沒(méi)有發(fā)生異常
private static final int NORMAL = 2;
// FutureTask 任務(wù)完結(jié)痪枫,因?yàn)榘l(fā)生異常织堂。
private static final int EXCEPTIONAL = 3;
// FutureTask 任務(wù)完結(jié),因?yàn)槿∠蝿?wù)
private static final int CANCELLED = 4;
// FutureTask 任務(wù)完結(jié)奶陈,也是取消任務(wù)易阳,不過(guò)發(fā)起了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTING = 5;
// FutureTask 任務(wù)完結(jié),也是取消任務(wù)吃粒,已經(jīng)完成了中斷運(yùn)行任務(wù)線程的中斷請(qǐng)求
private static final int INTERRUPTED = 6;

run 方法

   public void run() {
        // 如果狀態(tài) state 不是 NEW潦俺,或者設(shè)置 runner 值失敗
        // 表示有別的線程在此之前調(diào)用 run 方法,并成功設(shè)置了 runner 值
        // 保證了只有一個(gè)線程可以運(yùn)行 try 代碼塊中的代碼声搁。
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                        null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            //只有c不為null且狀態(tài)state為NEW的情況
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //調(diào)用callable的call方法黑竞,并獲得返回結(jié)果
                    result = c.call();
                    //運(yùn)行成功
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //設(shè)置結(jié)果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

其實(shí) run 方法作用非常簡(jiǎn)單,就是調(diào)用 callable 的 call 方法返回結(jié)果值 result疏旨,根據(jù)是否發(fā)生異常很魂,調(diào)用 set(result)或 setException(ex)方法表示 FutureTask 任務(wù)完結(jié)。

不過(guò)因?yàn)?FutureTask 任務(wù)都是在多線程環(huán)境中使用檐涝,所以要注意并發(fā)沖突問(wèn)題遏匆。注意在 run方法中,我們沒(méi)有使用 synchronized 代碼塊或者 Lock 來(lái)解決并發(fā)問(wèn)題谁榜,而是使用了 CAS 這個(gè)樂(lè)觀鎖來(lái)實(shí)現(xiàn)并發(fā)安全幅聘,保證只有一個(gè)線程能運(yùn)行 FutureTask 任務(wù)。

get 方法

get 方法就是阻塞獲取線程執(zhí)行結(jié)果窃植,這里主要做了兩個(gè)事情

  1. 判斷當(dāng)前的狀態(tài)帝蒿,如果狀態(tài)小于等于 COMPLETING,表示 FutureTask 任務(wù)還沒(méi)有完結(jié)巷怜,所以調(diào)用 awaitDone 方法葛超,讓當(dāng)前線程等待。
  2. report 返回結(jié)果值或者拋出異常
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

awaitDone

如果當(dāng)前的結(jié)果還沒(méi)有被執(zhí)行完延塑,把當(dāng)前線程線程和插入到等待隊(duì)列

    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        FutureTask.WaitNode q = null;
        boolean queued = false;// 節(jié)點(diǎn)是否已添加
        for (;;) {
            // 如果當(dāng)前線程中斷標(biāo)志位是 true绣张,
            // 那么從列表中移除節(jié)點(diǎn) q,并拋出 InterruptedException 異常
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 當(dāng)狀態(tài)大于 COMPLETING 時(shí)关带,表示FutureTask任務(wù)已結(jié)束
            if (s > COMPLETING) {
                if (q != null)
                    // 將節(jié)點(diǎn) q 線程設(shè)置為 null侥涵,因?yàn)榫€程沒(méi)有阻塞等待
                    q.thread = null;
                return s;
            }
            // 表示還有一些后序操作沒(méi)有完成,那么當(dāng)前線程讓出執(zhí)行權(quán)
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
                //表示狀態(tài)是 NEW,那么就需要將當(dāng)前線程阻塞等待芜飘。
                // 就是將它插入等待線程鏈表中
            else if (q == null)
                q = new FutureTask.WaitNode();
            // 使用 CAS 函數(shù)將新節(jié)點(diǎn)添加到鏈表中务豺,如果添加失敗,那么queued 為 false嗦明,
            // 下次循環(huán)時(shí)冲呢,會(huì)繼續(xù)添加,知道成功招狸。
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            // timed 為 true 表示需要設(shè)置超時(shí)
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 讓當(dāng)前線程等待 nanos 時(shí)間
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

被阻塞的線程,會(huì)等到 run 方法執(zhí)行結(jié)束之后被喚醒

report

report 方法就是根據(jù)傳入的狀態(tài)值 s邻薯,來(lái)決定是拋出異常裙戏,還是返回結(jié)果值。 這個(gè)兩種情況都表示 FutureTask 完結(jié)了

private V report(int s) throws ExecutionException {
    Object x = outcome;//表示 call 的返回值
    if (s == NORMAL) // 表示正常完結(jié)狀態(tài)厕诡,所以返回結(jié)果值
        return (V)x;
    // 大于或等于 CANCELLED累榜,都表示手動(dòng)取消 FutureTask 任務(wù),
    // 所以拋出 CancellationException 異常
  if (s >= CANCELLED)
    throw new CancellationException();
   // 否則就是運(yùn)行過(guò)程中灵嫌,發(fā)生了異常壹罚,這里就拋出這個(gè)異常
   throw new ExecutionException((Throwable)x);
}

線程池對(duì)于 Future/Callable 的執(zhí)行

我們現(xiàn)在再來(lái)看線程池里面的 submit 方法,就會(huì)很清楚了寿羞。

public class CallableDemo implements Callable<String> {
    public String call() throws Exception {
        Thread.sleep(3000);//阻塞案例演示
        return "hello world";
    }

    public static void main(String[] args) throws ExecutionException,
            InterruptedException {
        ExecutorService es= Executors.newFixedThreadPool(1);
        CallableDemo callableDemo = new CallableDemo();
        FutureTask futureTask = new FutureTask(callableDemo);

        Future future=es.submit(callableDemo);
        System.out.println(futureTask.get());
    }
}

AbstractExecutorService.submit

調(diào)用抽象類中的 submit 方法猖凛,這里其實(shí)相對(duì)于 execute 方法來(lái)說(shuō),只多做了一步操作绪穆,就是封裝了一個(gè) RunnableFuture

   public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

newTaskFor

更簡(jiǎn)單

 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

然后調(diào)用 execute 方法辨泳,這里面的邏輯前面分析過(guò)了,會(huì)通過(guò) worker 線程來(lái)調(diào)用過(guò) ftask 的run 方法玖院。而這個(gè) ftask 其實(shí)就是 FutureTask 里面最終實(shí)現(xiàn)的邏輯菠红。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市难菌,隨后出現(xiàn)的幾起案子试溯,更是在濱河造成了極大的恐慌,老刑警劉巖郊酒,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遇绞,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡猎塞,警方通過(guò)查閱死者的電腦和手機(jī)试读,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)荠耽,“玉大人钩骇,你說(shuō)我怎么就攤上這事。” “怎么了倘屹?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵银亲,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我纽匙,道長(zhǎng)务蝠,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任烛缔,我火速辦了婚禮馏段,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘践瓷。我一直安慰自己院喜,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布晕翠。 她就那樣靜靜地躺著喷舀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪淋肾。 梳的紋絲不亂的頭發(fā)上硫麻,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音樊卓,去河邊找鬼拿愧。 笑死,一個(gè)胖子當(dāng)著我的面吹牛碌尔,可吹牛的內(nèi)容都是我干的赶掖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼七扰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼奢赂!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起颈走,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤膳灶,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后立由,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體轧钓,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年锐膜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了毕箍。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡道盏,死狀恐怖而柑,靈堂內(nèi)的尸體忽然破棺而出文捶,到底是詐尸還是另有隱情,我是刑警寧澤媒咳,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布粹排,位于F島的核電站,受9級(jí)特大地震影響涩澡,放射性物質(zhì)發(fā)生泄漏顽耳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一妙同、第九天 我趴在偏房一處隱蔽的房頂上張望射富。 院中可真熱鬧,春花似錦粥帚、人聲如沸辉浦。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至掂恕,卻和暖如春拖陆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背懊亡。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工依啰, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人店枣。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓速警,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親鸯两。 傳聞我的和親對(duì)象是個(gè)殘疾皇子闷旧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354