【java并發(fā)編程】全面理解Future模式的原理和使用(Future、RunnableFuture杖小、FutureTask肆汹、Callable的理解及源碼分析)

一愚墓、前言

????通常,java中創(chuàng)建多線程的兩種方式:

  • 直接繼承Thread;
  • 實現(xiàn)Runnable接口昂勉。

????考慮到一些邏輯需要一定的先后順序浪册,如果直接用這兩種方式都會有共同的缺點:

  • 通常為阻塞式(通過join等待一個線程結(jié)束,但這樣就失去了多線程的意義)岗照,或者通過wait村象、notify、notifyAll并結(jié)合狀態(tài)變量等來進行并發(fā)設計攒至,設計起來相當復雜;
  • 線程執(zhí)行完成后難以獲取線程執(zhí)行結(jié)果(需要通過共享變量厚者、線程間通信等方式來獲取, 比較復雜)

????由此,我們想到了多線程開發(fā)中常見的Future模式迫吐。開發(fā)中經(jīng)常有一些操作可能比較耗時库菲,但又不想阻塞式的等待,這時可以先執(zhí)行一些其它操作志膀,等其它操作完成后再去獲取耗時操作的結(jié)果熙宇,這就是Future模式的描述。對應于生活中例子比比皆是:比如溉浙,打開電飯煲燒米飯后繼續(xù)炒菜烫止,等炒菜完了去看下米飯有沒有煲熟,過程中無需死等電飯煲把飯煲熟戳稽,只有在炒完菜后這個時間點馆蠕,我們才嘗試去看電飯煲煲飯的結(jié)果,這就是Future模式的一個生活原型惊奇。
????java從1.5開始荆几,在并發(fā)包中提供了Future模式的設計,我們這要結(jié)合Callable赊时、Future/FutureTask就能很容易的使用Future模式吨铸。

二、Future模式的一個簡單示例

???? 我們來看一個簡單示例:

    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<Integer> future = executor.submit(new Callable<Integer>(){

            @Override
            public Integer call()
                throws Exception
            {
                int total = 0;
                for(int i = 5001; i<=10000; i++){
                    total += i;
                }
                return total;
            }
            
        });
        
        System.out.print("Submit future task now...");
        executor.shutdown();
        
        int total = 0;
        for(int i = 1; i<=5000; i++){
            total += i;
        }
        
        total += future.get();
        
        System.out.print("1+2+...+10000 = " + total);
    }

示例中計算了1~10000且步長為1的等比數(shù)列之和祖秒,將數(shù)列拆均分成兩部分分別求和诞吱,最后進行累計。Future模式通常需要配合ExecutorService和Callable一起使用竭缝,代碼中采用ExecutorService的submit方法提交Callable線程房维,在主線程任務完成后獲取Callable線程的結(jié)果。

三抬纸、源碼分析

  1. ????
    我們先直接看下Future類型的源碼:
public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();
  
    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

????
Future接口的5個方法含義如下:

  • cancel(boolean mayInterruptIfRunning) : 取消任務, 取消成功返回true咙俩;入?yún)ayInterruptIfRunning表示是否允許取消正在執(zhí)行中的任務。
  • isCancelled() : 返回是否取消成功
  • isDone() : 返回任務是否已經(jīng)完成
  • get() : 返回執(zhí)行結(jié)果,如果任務沒有完成會阻塞到任務完成再返回
  • get(long timeout, TimeUnit unit) 獲取執(zhí)行結(jié)果并設置超時時間阿趁,如果超時返回null
  1. ????
    Future模式通常需要配合ExecutorService和Callable一起使用膜蛔,通過ExecutorService的submit方法提交Callable線程。我們知道脖阵,execute()方法在Executor接口中定義皂股,而submit()方法在ExecutorService接口中定義,ExecutorService接口繼承Executor接口:
    public interface Executor {
        void execute(Runnable command);
    }
    
    public interface ExecutorService extends Executor {
        ...
        <T> Future<T> submit(Callable<T> task);
            
        <T> Future<T> submit(Runnable task, T result);
            
        Future<?> submit(Runnable task);
        ...
    }
  1. ????
    ExecutorService只是一個接口命黔,我們以上一節(jié)的newCachedThreadPool為例呜呐,看下它的源碼:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  1. ????
    上面結(jié)果返回的是一個ThreadPoolExecutor,它是ExecutorService的一個子類悍募,看ThreadPoolExecutor源碼可以發(fā)現(xiàn)蘑辑,ThreadPoolExecutor沒有實現(xiàn)submit方法,它的submit方法由其直接父類AbstractExecutorService實現(xiàn):
    ...
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    ...
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    ...    
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    ...
  1. ????
    在上面三個submit方法中坠宴,無論是Callable接口還是Runnable接口洋魂,均是轉(zhuǎn)化成了RunnableFuture實例,看下RunnableFuture的實現(xiàn):
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
  1. ????
    RunnableFuture接口同時繼承了Runnable接口和Future接口啄踊。再看下上面講Callable或Runnable轉(zhuǎn)化成RunnableFuture實例的實現(xiàn):
    ...
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    ...
  1. ????
    通過兩個newTaskFor方法分別將Callable和Runnable實例轉(zhuǎn)化成FutureTask實例忧设,F(xiàn)utureTask是RunnableFuture的實現(xiàn)刁标,上述源碼中涉及FutureTask的兩種構(gòu)造函數(shù):
    ...
    private Callable<V> callable;
    private volatile int state;
    private static final int NEW          = 0;
    ...
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    ...
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    ...
  1. ????
    對于Callable實例颠通,直接將入?yún)allable對象賦值給this.callable屬性,并設置this.state屬性為NEW; 而對于Funnable實例膀懈,需要通過Executors類的callable(runnable, result)方法轉(zhuǎn)化成Callable實例:
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
  1. ????
    Executors類的callable(runnable, result)方法實際生成了一個RunnableAdapter對象顿锰,看下其源碼:
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

????
顯而易見,RunnableAdapter類實現(xiàn)了Callable接口的call()方法启搂,內(nèi)部調(diào)用了Runnable實例的run()方法硼控,并返回預先傳過來的result值。

  1. ????
    回過頭來看下胳赌,F(xiàn)utureTask類實現(xiàn)了RunnableFuture接口牢撼,進而實現(xiàn)了Runnable接口和Future接口的統(tǒng)一,那么它是如何實現(xiàn)Runnable接口的run()方法的呢疑苫?看下其源碼:
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    ...
    
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    ...
    
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

????
可以看到FutureTask的run()方法內(nèi)部調(diào)用了Runnable實例的call()方法熏版,并且如果運行成功,將call()方法的返回值賦值給outcome捍掺,否則將異常賦值給outcome撼短。
????
這樣也就容易理解ExecutorService的submit方法實現(xiàn)中是如何調(diào)用execute(Runnable command)方法的了,它將Runnable或者Callable實例統(tǒng)一轉(zhuǎn)換成了RunnableFuture實例挺勿,由于RunnableFuture繼承了Runnable接口曲横,所以線程池可以通過execute(Runnable command)方法來進行處理。

  1. ????回過來看下FutureTask類的get()方法實現(xiàn):
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    ...
    
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    
    ...
    
        private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
  • 如果狀態(tài)state為任務執(zhí)行中不瓶,則阻塞等待禾嫉,否則通過report(s)返回結(jié)果灾杰。返回結(jié)果時:如果狀態(tài)正常,則直接返回outcome夭织;如果取消或者中斷吭露,則返回CancellationException異常;如果執(zhí)行異常尊惰,則返回ExecutionException讲竿。
  • 上述源碼可以看出,get()方法通過awaitDone方法進行阻塞等待弄屡,awaitDone方法實現(xiàn)上采用LockSupport.park()進行線程阻塞题禀,在FutureTasl的run()方法執(zhí)行完成或異常發(fā)生,會執(zhí)行set(V v)方法或setException(Throwable t)方法膀捷,兩者的實現(xiàn)中都會調(diào)用finishCompletion()方法迈嘹,并在finishCompletion()方法中采用LockSupport.unpark方法進行了線程喚醒:
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

四、總結(jié)

????通過上述舉例和源碼分析我們理解了java中Future模式的原理和使用全庸,F(xiàn)uture模式對于一些耗時操作(比如網(wǎng)絡請求等)的性能提升還是比較有用的秀仲,實際開發(fā)中可以靈活運用。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末壶笼,一起剝皮案震驚了整個濱河市神僵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌覆劈,老刑警劉巖保礼,帶你破解...
    沈念sama閱讀 206,126評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異责语,居然都是意外死亡炮障,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評論 2 382
  • 文/潘曉璐 我一進店門坤候,熙熙樓的掌柜王于貴愁眉苦臉地迎上來胁赢,“玉大人,你說我怎么就攤上這事白筹≈悄” “怎么了?”我有些...
    開封第一講書人閱讀 152,445評論 0 341
  • 文/不壞的土叔 我叫張陵遍蟋,是天一觀的道長吹害。 經(jīng)常有香客問我,道長虚青,這世上最難降的妖魔是什么它呀? 我笑而不...
    開封第一講書人閱讀 55,185評論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上纵穿,老公的妹妹穿的比我還像新娘下隧。我一直安慰自己,他們只是感情好谓媒,可當我...
    茶點故事閱讀 64,178評論 5 371
  • 文/花漫 我一把揭開白布淆院。 她就那樣靜靜地躺著,像睡著了一般句惯。 火紅的嫁衣襯著肌膚如雪土辩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評論 1 284
  • 那天抢野,我揣著相機與錄音拷淘,去河邊找鬼。 笑死指孤,一個胖子當著我的面吹牛启涯,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播恃轩,決...
    沈念sama閱讀 38,276評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼结洼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了叉跛?” 一聲冷哼從身側(cè)響起松忍,我...
    開封第一講書人閱讀 36,927評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎昧互,沒想到半個月后挽铁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體伟桅,經(jīng)...
    沈念sama閱讀 43,400評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡敞掘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,883評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了楣铁。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片玖雁。...
    茶點故事閱讀 37,997評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖盖腕,靈堂內(nèi)的尸體忽然破棺而出赫冬,到底是詐尸還是另有隱情,我是刑警寧澤溃列,帶...
    沈念sama閱讀 33,646評論 4 322
  • 正文 年R本政府宣布劲厌,位于F島的核電站,受9級特大地震影響听隐,放射性物質(zhì)發(fā)生泄漏补鼻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,213評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望风范。 院中可真熱鬧咨跌,春花似錦、人聲如沸硼婿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽寇漫。三九已至刊殉,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間州胳,已是汗流浹背冗澈。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評論 1 260
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留陋葡,地道東北人亚亲。 一個月前我還...
    沈念sama閱讀 45,423評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像腐缤,于是被迫代替她去往敵國和親捌归。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,722評論 2 345

推薦閱讀更多精彩內(nèi)容