Future & CompleteFuture 實踐總結(jié)

1 Future介紹

1.1 Future的主要功能

JDK5新增了Future接口换况,用于描述一個異步計算的結(jié)果。

Future就是對于具體的Runnable或者Callable任務的執(zhí)行結(jié)果進行取消厅各、查詢是否完成镜撩、獲取結(jié)果等操作。必要時可以通過get方法獲取執(zhí)行結(jié)果讯检,該方法會阻塞直到任務返回結(jié)果琐鲁。

Future類位于java.util.concurrent包下,它是一個接口:

public interface Future<V> { 
 /** 
  * 方法用來取消任務人灼,如果取消任務成功則返回true围段,如果取消任務失敗則返回false。 * 
  * @param mayInterruptIfRunning 表示是否允許取消正在執(zhí)行卻沒有執(zhí)行完畢的任務投放,如果設置true奈泪,則表示可以取消正在執(zhí)行過程中的任務。 
  * @return 如果任務已經(jīng)完成灸芳,則無論mayInterruptIfRunning為true還是false涝桅,此方法肯定返回false,即如果取消已經(jīng)完成的任務會返回false烙样; 
  * 如果任務正在執(zhí)行冯遂,若mayInterruptIfRunning設置為true,則返回true谒获,若mayInterruptIfRunning設置為false蛤肌,則返回false壁却; 
  * 如果任務還沒有執(zhí)行,則無論mayInterruptIfRunning為true還是false裸准,肯定返回true展东。 
  */ 
  boolean cancel(boolean mayInterruptIfRunning); 

   /** 
    * 方法表示任務是否被取消成功 
    * @return 如果在任務正常完成前被取消成功,則返回 true 
    */ 
   boolean isCancelled(); 

   /** 
    * 方法表示任務是否已經(jīng)完成 
    * @return 若任務完成炒俱,則返回true 
    */ 
   boolean isDone(); 

   /** 
    * 方法用來獲取執(zhí)行結(jié)果盐肃,這個方法會產(chǎn)生阻塞,會一直等到任務執(zhí)行完畢才返回 
    * @return 任務執(zhí)行的結(jié)果值 
    * @throws InterruptedException 線程被中斷異常
    * @throws ExecutionException 任務執(zhí)行異常权悟,如果任務被取消砸王,還會拋出CancellationException
    */ 
   V get() throws InterruptedException, ExecutionException; 

   /** 
    * 用來獲取執(zhí)行結(jié)果,如果在指定時間內(nèi)僵芹,還沒獲取到結(jié)果处硬,就直接返回null(并不是拋出異常,需要注意)拇派。 
    * @param timeout 超時時間 
    * @param unit 超時單位 
    * @return 
    * @throws InterruptedException 
    * @throws ExecutionException 
    * @throws TimeoutException 如果計算超時荷辕,將拋出TimeoutException(待確認)
    */ 
   V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 
}

從上面方法的注釋可以看出,F(xiàn)utrue提供了三種功能:

1)判斷任務是否完成件豌;

2)能夠中斷任務疮方;

3)能夠獲取任務執(zhí)行結(jié)果。(最為常用的)

1.2 Future的局限性

從本質(zhì)上說茧彤,F(xiàn)uture表示一個異步計算的結(jié)果骡显。它提供了isDone()來檢測計算是否已經(jīng)完成,并且在計算結(jié)束后曾掂,可以通過get()方法來獲取計算結(jié)果惫谤。在異步計算中,F(xiàn)uture確實是個非常優(yōu)秀的接口珠洗。但是溜歪,它的本身也確實存在著許多限制:

  • 并發(fā)執(zhí)行多任務:Future只提供了get()方法來獲取結(jié)果,并且是阻塞的许蓖。所以蝴猪,除了等待你別無他法;當 for 循環(huán)批量獲取 Future 的結(jié)果時容易 block膊爪,因此get 方法調(diào)用時應使用 timeout 限制自阱。
  • 無法對多個任務進行鏈式調(diào)用:如果你希望在計算任務完成后執(zhí)行特定動作,比如發(fā)郵件米酬,但Future卻沒有提供這樣的能力沛豌;
  • 無法組合多個任務:如果你運行了10個任務,并期望在它們?nèi)繄?zhí)行結(jié)束后執(zhí)行特定動作赃额,那么在Future中這是無能為力的加派;
  • 沒有異常處理:Future接口中沒有關于異常處理的方法阁簸;

2 CompletableFuture介紹

雖然 Future 以及相關使用方法提供了異步執(zhí)行任務的能力,但是對于結(jié)果的獲取卻是很不方便哼丈,只能通過阻塞或者輪詢的方式得到任務的結(jié)果。如果遇到前面的task執(zhí)行較慢時需要阻塞等待前面的task執(zhí)行完后面task才能取得結(jié)果筛严。

阻塞的方式顯然和我們的異步編程的初衷相違背醉旦,輪詢的方式又會耗費無謂的 CPU 資源,而且也不能及時地得到計算結(jié)果桨啃。而CompletableFuture的主要功能就是一邊生成任務,一邊獲取任務的返回值车胡。讓兩件事分開執(zhí)行,任務之間不會互相阻塞,可以實現(xiàn)先執(zhí)行完的先取結(jié)果照瘾,不再依賴任務順序了匈棘。

2.1 CompletableFuture原理

內(nèi)部通過阻塞隊列+FutureTask,實現(xiàn)了任務先完成可優(yōu)先獲取到析命,即結(jié)果按照完成先后順序排序主卫,內(nèi)部有一個先進先出的阻塞隊列,用于保存已經(jīng)執(zhí)行完成的Future鹃愤,通過調(diào)用它的take方法或poll方法可以獲取到一個已經(jīng)執(zhí)行完成的Future簇搅,進而通過調(diào)用Future接口實現(xiàn)類的get方法獲取最終的結(jié)果。

2.2 應用場景

當需要批量提交異步任務的時候建議使用CompletableFuture软吐。CompletableFuture將線程池Executor和阻塞隊列BlockingQueue的功能融合在了一起瘩将,能夠讓批量異步任務的管理更簡單。

CompletableFuture能夠讓異步任務的執(zhí)行結(jié)果有序化凹耙。先執(zhí)行完的先進入阻塞隊列姿现,利用這個特性,你可以輕松實現(xiàn)后續(xù)處理的有序性肖抱,避免無謂的等待备典。

線程池隔離。CompletionService支持自己創(chuàng)建線程池虐沥,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險熊经。

2.3 CompletableFuture使用詳解

簡單的任務,用Future獲取結(jié)果還好欲险,但我們并行提交的多個異步任務镐依,往往并不是獨立的,很多時候業(yè)務邏輯處理存在串行[依賴]天试、并行槐壳、聚合的關系。如果要我們手動用 Fueture 實現(xiàn)喜每,是非常麻煩的务唐。

CompletableFuture是Future接口的擴展和增強雳攘。CompletableFuture實現(xiàn)了Future接口,并在此基礎上進行了豐富地擴展枫笛,完美地彌補了Future上述的種種問題吨灭。

更為重要的是,CompletableFuture實現(xiàn)了對任務的編排能力刑巧。借助這項能力喧兄,我們可以輕松地組織不同任務的運行順序、規(guī)則以及方式啊楚。從某種程度上說,這項能力是它的核心能力恭理。而在以往拯辙,雖然通過CountDownLatch等工具類也可以實現(xiàn)任務的編排,但需要復雜的邏輯處理颜价,不僅耗費精力且難以維護涯保。

3 CompletableFuture應用梳理

按照使用功能分類

按應用場景分類
  • 很多方法上,可以指定線程池拍嵌,而沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼遭赂。如果指定線程池,則使用指定的線程池運行横辆。

  • 默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池撇他,這個線程池默認創(chuàng)建的線程數(shù)是 CPU 的核數(shù)(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數(shù))。

  • 如果所有 CompletableFuture 共享一個線程池狈蚤,那么一旦有任務執(zhí)行一些很慢的 I/O 操作困肩,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓脆侮,進而影響整個系統(tǒng)的性能锌畸。所以,強烈建議要根據(jù)不同的業(yè)務類型創(chuàng)建不同的線程池靖避,以避免互相干擾潭枣。

  • 等我們使用的時候,會注意到CompletableFuture的方法命名規(guī)則:

    • xxx():表示該方法將繼續(xù)在已有的線程中執(zhí)行幻捏;
  • xxxAsync():表示可能會使用其它的線程去執(zhí)行(如果使用相同的線程池盆犁,也可能會被同一個線程選中執(zhí)行)。

4 使用案例

4.1 基礎使用案例

串行執(zhí)行:

定義兩個CompletableFuture篡九,第一個CompletableFuture根據(jù)證券名稱查詢證券代碼谐岁,第二個CompletableFuture根據(jù)證券代碼查詢證券價格,這兩個CompletableFuture實現(xiàn)串行操作如下:

CompletableFuture.supplyAsync():創(chuàng)建一個包含返回值的異步任務;
thenApplyAsync():獲取前一個線程的結(jié)果進行轉(zhuǎn)換伊佃,有返回值窜司;
thenAccept():獲取前一個線程的結(jié)果進行消費蹋凝,無返回值赊颠。


public class Demo {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 第一個任務:創(chuàng)建一個包含返回值的CompletableFuture
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油");
        });
        // cfQuery成功后繼續(xù)執(zhí)行下一個任務:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印結(jié)果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主線程不要結(jié)束棒仍,否則CompletableFuture默認使用的線程池會立刻關閉:
        countDownLatch.await();
    }

    public static void main2(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> queryCode("中國石油"))
                .thenApplyAsync((code) -> fetchPrice(code))
                .thenAccept((result) -> System.out.println("price: " + result));
        countDownLatch.await();
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        String code = "601857";
        System.out.println("查詢證券編碼,name:" + name + ",code:" + code);
        return code;
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        Double price = 5 + Math.random() * 20;
        System.out.println("根據(jù)證券編碼查詢價格,code:" + code + ";price:" + price);
        return price;
    }
}

并行執(zhí)行:

除了串行執(zhí)行外尉姨,多個CompletableFuture還可以并行執(zhí)行。例如词顾,我們考慮這樣的場景:

同時從新浪和網(wǎng)易查詢證券代碼剃执,只要任意一個返回結(jié)果懦底,就進行下一步查詢價格漠秋,查詢價格也同時從新浪和網(wǎng)易查詢,只要任意一個返回結(jié)果抵屿,就完成操作:

CompletableFuture.supplyAsync():創(chuàng)建一個包含返回值的異步任務庆锦;
CompletableFuture.anyOf(cf1,cf2,cf3).join():多個異步線程任一執(zhí)行完即返回,有返回值Object轧葛;
thenAccept():獲取前一個線程的結(jié)果進行消費搂抒,無返回值。

public class Demo2 {
    private static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中國石油", "https://money.163.com/code/");
        });

        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });

        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最終結(jié)果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主線程不要立刻結(jié)束尿扯,否則CompletableFuture默認使用的線程池會立刻關閉:
        COUNT_DOWN_LATCH.await();
    }

    public static void main2(String[] args) throws Exception {
        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> queryCode("中國石油", "https://finance.sina.com.cn/code/"));
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> queryCode("中國石油", "https://money.163.com/code/"));

        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 兩個CompletableFuture執(zhí)行異步查詢:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://finance.sina.com.cn/price/"));
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> fetchPrice((String) code, "https://money.163.com/price/"));

        // 用anyOf合并為一個新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最終結(jié)果:
        cfFetch.thenAccept((result) -> System.out.println("price: " + result));
        // 主線程不要立刻結(jié)束求晶,否則CompletableFuture默認使用的線程池會立刻關閉:
        COUNT_DOWN_LATCH.await();
    }

    static String queryCode(String name, String url) {
        System.out.println(Thread.currentThread().getName() + " query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String code = "601857";
        System.out.println(Thread.currentThread().getName() + " 查詢證券編碼,name:" + name + ",code:" + code);
        return code;
    }

    static Double fetchPrice(String code, String url) {
        System.out.println(Thread.currentThread().getName() + " query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Double price = 5 + Math.random() * 20;
        System.out.println(Thread.currentThread().getName() + " 根據(jù)證券編碼查詢價格,code:" + code + ";price:" + price);
        return price;
    }
}

上述邏輯實現(xiàn)的異步查詢規(guī)則實際上是:

image

4.2 實現(xiàn)最優(yōu)的“燒水泡茶”程序


public class Demo3 {

    public static void main(String[] args) {

        //任務1:洗水壺 -> 燒開水
        CompletableFuture<String> f11 = CompletableFuture.supplyAsync(() -> {
            System.out.println("T1:洗水壺...開始");
            sleep(1000);
            return "T1:洗水壺...完成";
        });
        CompletableFuture<String> f12 = f11.thenApply((f11Result) -> {
            System.out.println(f11Result);
            System.out.println("T1:燒開水...開始");
            sleep(3000);
            return "T1:燒開水...完成";
        });

        //任務2:洗茶壺->洗茶杯->拿茶葉
        CompletableFuture<Void> f21 = CompletableFuture.runAsync(() -> {
            System.out.println("==============T2:洗茶壺...開始");
            sleep(1000);
            System.out.println("==============T2:洗茶壺...完成");
        });
        CompletableFuture<Void> f22 = f21.thenRun(() -> {
            System.out.println("==============T2:洗茶杯...開始");
            sleep(2000);
            System.out.println("==============T2:洗茶杯...完成");
        });
        CompletableFuture<String> f23 = f22.thenApply(result -> {
            System.out.println("==============T2:拿茶葉...開始");
            sleep(1000);
            System.out.println("==============T2:拿茶葉...完成");
            return "龍井";
        });
        //任務3:任務1和任務2完成后執(zhí)行:泡茶
        CompletableFuture<String> f3 = f12.thenCombine(f23, (f1Result, f2Result) -> {
            System.out.println(f1Result);
            System.out.println("************T2:拿到茶葉:result" + f2Result);
            System.out.println("************T3:泡茶...,什么茶:" + f2Result);
            return "上茶:" + f2Result;
        });
        //等待任務3執(zhí)行結(jié)果
        System.out.println(f3.join());
    }

    static void sleep(int t) {
        try {
            Thread.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

參考文檔:

CompletableFuture (Java Platform SE 8 )

Future & CompleteFuture 剖析實戰(zhàn)

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市衷笋,隨后出現(xiàn)的幾起案子芳杏,更是在濱河造成了極大的恐慌,老刑警劉巖辟宗,帶你破解...
    沈念sama閱讀 211,561評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件爵赵,死亡現(xiàn)場離奇詭異,居然都是意外死亡泊脐,警方通過查閱死者的電腦和手機空幻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來容客,“玉大人秕铛,你說我怎么就攤上這事∷跆簦” “怎么了但两?”我有些...
    開封第一講書人閱讀 157,162評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長调煎。 經(jīng)常有香客問我镜遣,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,470評論 1 283
  • 正文 為了忘掉前任悲关,我火速辦了婚禮谎僻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘寓辱。我一直安慰自己艘绍,他們只是感情好,可當我...
    茶點故事閱讀 65,550評論 6 385
  • 文/花漫 我一把揭開白布秫筏。 她就那樣靜靜地躺著诱鞠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪这敬。 梳的紋絲不亂的頭發(fā)上航夺,一...
    開封第一講書人閱讀 49,806評論 1 290
  • 那天,我揣著相機與錄音崔涂,去河邊找鬼阳掐。 笑死,一個胖子當著我的面吹牛冷蚂,可吹牛的內(nèi)容都是我干的缭保。 我是一名探鬼主播,決...
    沈念sama閱讀 38,951評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼蝙茶,長吁一口氣:“原來是場噩夢啊……” “哼艺骂!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起隆夯,我...
    開封第一講書人閱讀 37,712評論 0 266
  • 序言:老撾萬榮一對情侶失蹤钳恕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后蹄衷,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苞尝,經(jīng)...
    沈念sama閱讀 44,166評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,510評論 2 327
  • 正文 我和宋清朗相戀三年宦芦,在試婚紗的時候發(fā)現(xiàn)自己被綠了宙址。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,643評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡调卑,死狀恐怖抡砂,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情恬涧,我是刑警寧澤注益,帶...
    沈念sama閱讀 34,306評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站溯捆,受9級特大地震影響丑搔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,930評論 3 313
  • 文/蒙蒙 一啤月、第九天 我趴在偏房一處隱蔽的房頂上張望煮仇。 院中可真熱鬧,春花似錦谎仲、人聲如沸浙垫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夹姥。三九已至,卻和暖如春辙诞,著一層夾襖步出監(jiān)牢的瞬間辙售,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評論 1 266
  • 我被黑心中介騙來泰國打工飞涂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留圾亏,地道東北人。 一個月前我還...
    沈念sama閱讀 46,351評論 2 360
  • 正文 我出身青樓封拧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親夭问。 傳聞我的和親對象是個殘疾皇子泽西,可洞房花燭夜當晚...
    茶點故事閱讀 43,509評論 2 348

推薦閱讀更多精彩內(nèi)容