Callable實(shí)現(xiàn)子線程獲取函數(shù)返回值

Callable接口

Java中的子線程通常是通過Thread或者Runnable的方式實(shí)現(xiàn)的伺帘,但是這種方式只能通過回調(diào),或者共享變量等方式來傳遞數(shù)據(jù)簸州,而Callable則是可以獲取返回結(jié)果的一種子線程實(shí)現(xiàn)方式谨敛。

Callable是一個(gè)接口尊蚁,源碼如下:

public interface Callable<V> {
    V call() throws Exception;
}

非常簡(jiǎn)單,只有一個(gè)方法吃嘿,和一個(gè)泛型V祠乃,所以我們創(chuàng)建Callable對(duì)象的時(shí)候,也只需要指定返回類型并實(shí)現(xiàn)call方法就可以了兑燥。

Future接口

看完了Callable接口亮瓷,會(huì)發(fā)現(xiàn)它非常簡(jiǎn)單,沒有辦法在子線程中直接通過它來獲取到返回結(jié)果的降瞳,這時(shí)候就需要Future發(fā)揮作用了嘱支。源碼如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • boolean cancel(boolean mayInterruptIfRunning) 在任務(wù)開始前取消,傳入值表示任務(wù)開始后是否允許打斷力崇,返回值表示是否取消成功(任務(wù)已經(jīng)開始不允許打斷斗塘,已經(jīng)運(yùn)行結(jié)束,已經(jīng)取消等等狀態(tài)會(huì)返回失斄裂ァ)
  • boolean isCancelled() 是否已經(jīng)被取消
  • boolean isDone() 是否完成任務(wù)
  • V get() 嘗試獲取返回結(jié)果馍盟,阻塞方法
  • V get(long timeout, TimeUnit unit) 同上,可以指定超時(shí)時(shí)間

可以看到茧吊,Future實(shí)際上可以理解為Callable的管理類贞岭。

在線程池中執(zhí)行任務(wù)時(shí),除了execute方法之外搓侄,還有一個(gè)submit方法:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

它返回的就是一個(gè)Future對(duì)象瞄桨,可以通過它來回去Callable任務(wù)的執(zhí)行結(jié)果:


Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("Sub Thread is calculating...");
                Thread.sleep(10000);
                return 10;
            }
        };
        
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
    try {
            System.out.println("Main Thread start waiting result... ");

            int res = future.get();
            System.out.println("Main Thread get result: " + res);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

//運(yùn)行結(jié)果:
Main Thread start waiting result... 
Sub Thread is calculating...
Main Thread get result: 10

FutureTask

如果不用Java提供的線程池,直接用Thread怎樣在子線程中運(yùn)行Callable呢讶踪? 這時(shí)候就要用到FutureTask類了芯侥。

FutureTask實(shí)現(xiàn)了FutureRunnable接口,這就意味著乳讥,它既可以放在Thread中去運(yùn)行柱查,又能夠?qū)θ蝿?wù)進(jìn)行管理,下面是源碼:


    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

可以看到云石,構(gòu)造函數(shù)中傳入了待運(yùn)行的任務(wù)Callable對(duì)象或者Runnable對(duì)象和指定的返回結(jié)果唉工,V result用來指定運(yùn)行完成后的返回值,如果不想用指定值汹忠,可以用Future<?> f = new FutureTask<Void>(runnable, null)來返回null淋硝。采用Callable構(gòu)造方法創(chuàng)建的FutureTask對(duì)象雹熬,執(zhí)行完畢返回的是實(shí)際運(yùn)算結(jié)果,而Runnable 構(gòu)造函數(shù)返回值是傳入的result谣膳。

task的狀態(tài)

FutureTask中持有的任務(wù)對(duì)象竿报,有以下幾種狀態(tài):

    private static final int NEW          = 0; //新建或運(yùn)行中
    private static final int COMPLETING   = 1;//任務(wù)運(yùn)行結(jié)束,正在處理一些后續(xù)操作
    private static final int NORMAL       = 2;//任務(wù)已經(jīng)完成参歹,COMPLETING的下一個(gè)狀態(tài)
    private static final int EXCEPTIONAL  = 3;//任務(wù)拋出異常仰楚,COMPLETING的下一個(gè)狀態(tài)
    private static final int CANCELLED    = 4;//任務(wù)被取消
    private static final int INTERRUPTING = 5;//收到打斷指令,還沒有執(zhí)行interrupt
    private static final int INTERRUPTED  = 6;//收到打斷指令犬庇,也執(zhí)行了interrupt

可能的狀態(tài)變化主要有以下幾種:

     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED

task的執(zhí)行過程

FutureTask實(shí)現(xiàn)了Runnable接口僧界,是可以直接放在Thread中執(zhí)行的,實(shí)際上運(yùn)行的就是它的run方法:

public void run() {
    //r如果當(dāng)前狀態(tài)不是NEW臭挽,說明任務(wù)已經(jīng)執(zhí)行完成了捂襟,直接返回
    //如果當(dāng)前狀態(tài)是NEW,嘗試用CAS方式將當(dāng)前線程賦值給RUNNER欢峰,賦值前RUNNER的值應(yīng)該是null葬荷,否則賦值失敗
    //賦值失敗表示已經(jīng)有線程執(zhí)行了run方法,直接返回
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //運(yùn)行中拋出異常
                    setException(ex);
                }
                //ran為true纽帖,說明正常運(yùn)行結(jié)束宠漩,得到了返回結(jié)果
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

執(zhí)行結(jié)果其實(shí)是比較簡(jiǎn)單的,通過RUNNER來記錄執(zhí)行任務(wù)的線程懊直,從而保證只有一個(gè)線程可以執(zhí)行該任務(wù)扒吁。運(yùn)行結(jié)束后有兩個(gè)出口:

  • setException(ex); 運(yùn)行中出錯(cuò),拋出異常
  • set(result); 任務(wù)執(zhí)行完畢室囊,獲取到返回值
    protected void setException(Throwable t) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //喚醒等待線程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

這兩個(gè)方法實(shí)際上是一樣的雕崩,都是將狀態(tài)賦值為COMPLETING,然后保存結(jié)果(運(yùn)行結(jié)果或錯(cuò)誤信息)融撞,再執(zhí)行finishCompletion方法盼铁,通知WAITERS里記錄的等待線程繼續(xù)執(zhí)行,并清空WAITERS尝偎。

獲取返回結(jié)果

獲取返回結(jié)果是通過get()方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
            //cas機(jī)制饶火,將新建節(jié)點(diǎn)q的next指向原來的節(jié)點(diǎn)workers,然后將workers更新為新建的節(jié)點(diǎn)致扯。workers(WAITERS)實(shí)際上就是持有了所有等待線程的一個(gè)鏈表
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

get()方法比較簡(jiǎn)單趁窃,先判斷當(dāng)前狀態(tài),如果狀態(tài) < COMPLETING急前,說明任務(wù)沒有執(zhí)行完畢,直接調(diào)用awaitDone方法瀑构。

awaitDone方法可以接受兩個(gè)參數(shù)裆针,用來指定是否設(shè)置超時(shí)時(shí)間刨摩。它內(nèi)部是一個(gè)無限for循環(huán)。下面是awaitDone方法的執(zhí)行步驟(忽略超時(shí)設(shè)置):

  1. 進(jìn)入awaitDone方法時(shí)世吨,state一定是小于COMPLETING的澡刹,第一次會(huì)走else if (q == null)分支,創(chuàng)建一個(gè)WaitNode()對(duì)象用來保存當(dāng)前線程

  2. 第二次循環(huán)q已經(jīng)不是null了耘婚,如果任務(wù)仍然沒有結(jié)束罢浇,會(huì)執(zhí)行else if (!queued)分支,queued表示創(chuàng)建的WaitNode()是否已經(jīng)添加到鏈表里沐祷,如果沒有嘗試添加嚷闭,直到添加成功為止。

  3. 等待線程添加成功以后進(jìn)入下一個(gè)循環(huán)赖临,此時(shí)如果任務(wù)仍然沒有結(jié)束胞锰,會(huì)走到else分支,掛起當(dāng)前線程(阻塞)

  4. 此處阻塞的是等待結(jié)果的線程兢榨,也就是調(diào)用FutureTaskget()方法的線程嗅榕,而不是執(zhí)行任務(wù)的線程。阻塞線程用的是LockSupport.park(this)方法吵聪,喚醒的方法是LockSupport.unpark()凌那,該方法在上邊的finishCompletion()中出現(xiàn)了,也就是說吟逝,任務(wù)執(zhí)行結(jié)束(運(yùn)行完帽蝶,拋出異常,被取消)時(shí)澎办,等待的線程才會(huì)被喚醒嘲碱,繼續(xù)下一次循環(huán)。

  5. 任務(wù)結(jié)束以后局蚀,如果state是COMPLETING狀態(tài)麦锯,說明一些清理任務(wù)還沒有執(zhí)行完,等待的線程會(huì)讓出cpu琅绅,讓其他線程優(yōu)先執(zhí)行

  6. 直到state 大于COMPLETING扶欣,說明FutureTask已經(jīng)完全結(jié)束了,此時(shí)會(huì)會(huì)執(zhí)行(s > COMPLETING)分支千扶,把節(jié)點(diǎn)置空料祠,并返回。

awaitDone返回以后澎羞,說明任務(wù)已經(jīng)執(zhí)行完成了髓绽,會(huì)進(jìn)入report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

可以看到,如果是正常結(jié)束妆绞,或者拋出異常結(jié)束顺呕,會(huì)返回結(jié)果枫攀,而如果是被取消,則會(huì)拋出異常株茶。

使用

Callable<Integer> call = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("正在計(jì)算結(jié)果...");
                Thread.sleep(3000);
                return 1;
            }
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();
        Integer result = task.get();
        System.out.println("結(jié)果為:" + result);

總結(jié)

  1. FutureTask可以視為一個(gè)管理Callable任務(wù)的工具類来涨,執(zhí)行Callable任務(wù)的是FutureTaskrun方法,所以启盛,可以通過 new Thread(futuretask)的方法來實(shí)現(xiàn)子線程執(zhí)行任務(wù)

  2. 獲取執(zhí)行結(jié)果是通過FutureTaskget方法蹦掐,調(diào)用該方法后,如果線程會(huì)被掛起僵闯,知道任務(wù)結(jié)束為止

  3. 獲取結(jié)果的線程數(shù)量沒有限定卧抗,可以是任意個(gè)線程

  4. 獲取結(jié)果的線程被掛起以后,可以通過取消棍厂,超時(shí)等方法在任務(wù)執(zhí)行完畢以前結(jié)束掛起狀態(tài)颗味。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市牺弹,隨后出現(xiàn)的幾起案子浦马,更是在濱河造成了極大的恐慌,老刑警劉巖张漂,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晶默,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡航攒,警方通過查閱死者的電腦和手機(jī)磺陡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來漠畜,“玉大人币他,你說我怎么就攤上這事°灸” “怎么了蝴悉?”我有些...
    開封第一講書人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)瘾敢。 經(jīng)常有香客問我拍冠,道長(zhǎng),這世上最難降的妖魔是什么簇抵? 我笑而不...
    開封第一講書人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任庆杜,我火速辦了婚禮,結(jié)果婚禮上碟摆,老公的妹妹穿的比我還像新娘晃财。我一直安慰自己,他們只是感情好典蜕,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開白布拓劝。 她就那樣靜靜地躺著雏逾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪郑临。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,837評(píng)論 1 290
  • 那天屑宠,我揣著相機(jī)與錄音厢洞,去河邊找鬼。 笑死典奉,一個(gè)胖子當(dāng)著我的面吹牛躺翻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播卫玖,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼公你,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了假瞬?” 一聲冷哼從身側(cè)響起陕靠,我...
    開封第一講書人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脱茉,沒想到半個(gè)月后剪芥,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡琴许,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年税肪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片榜田。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡益兄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出箭券,到底是詐尸還是另有隱情净捅,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布邦鲫,位于F島的核電站灸叼,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏庆捺。R本人自食惡果不足惜古今,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望滔以。 院中可真熱鬧捉腥,春花似錦、人聲如沸你画。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至拟逮,卻和暖如春撬统,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背敦迄。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工恋追, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人罚屋。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓苦囱,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親脾猛。 傳聞我的和親對(duì)象是個(gè)殘疾皇子撕彤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容