Java8 CompletableFuture 異步任務(wù)

更多 Java 并發(fā)編程方面的文章,請參見文集《Java 并發(fā)編程》


所謂異步調(diào)用其實(shí)就是實(shí)現(xiàn)一個(gè)可無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。在 Java 語言中,簡單的講就是另啟一個(gè)線程來完成調(diào)用中的部分計(jì)算闯割,使調(diào)用繼續(xù)運(yùn)行或返回,而不需要等待計(jì)算結(jié)果竿拆。但調(diào)用者仍需要取線程的計(jì)算結(jié)果宙拉。

關(guān)于 Java Future,請首先參見

JDK5 新增了 Future 接口丙笋,用于描述一個(gè)異步計(jì)算的結(jié)果谢澈。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便御板,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果锥忿。 例如:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();

    // 在 Java8 中,推薦使用 Lambda 來替代匿名 Callable 實(shí)現(xiàn)類
    Future<Integer> f = es.submit(() -> {
        System.out.println(Thread.currentThread().getName());

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return 123;
    });

    // 當(dāng)前 main 線程阻塞怠肋,直至 future 得到值
    System.out.println(f.get());

    es.shutdown();
}

阻塞的方式顯然和我們的異步編程的初衷相違背敬鬓,輪詢的方式又會(huì)耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果笙各,為什么不能用觀察者設(shè)計(jì)模式呢钉答?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽者。(例如通過回調(diào)的方式)

關(guān)于 Future 接口杈抢,還有如下一段描述:

The Future interface was added in Java 5 to serve as a result of an asynchronous computation, but it did not have any methods to combine these computations or handle possible errors.
不能很好地組合多個(gè)異步任務(wù)数尿,也不能處理可能的異常。

CompletableFuture

Java 8 中, 新增加了一個(gè)包含 50 個(gè)方法左右的類 CompletableFuture惶楼,它提供了非常強(qiáng)大的 Future 的擴(kuò)展功能砌创,可以幫助我們簡化異步編程的復(fù)雜性虏缸,并且提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果嫩实,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法。

對于阻塞或者輪詢方式窥岩,依然可以通過 CompletableFuture 類的 CompletionStageFuture 接口方式支持甲献。

CompletableFuture 類聲明了 CompletionStage 接口,CompletionStage 接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺颂翼,所以我們可以通過實(shí)現(xiàn)多個(gè) CompletionStage 命令晃洒,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。

我們可以通過 CompletableFuture.supplyAsync(this::sendMsg); 這么一行代碼創(chuàng)建一個(gè)簡單的異步計(jì)算朦乏。在這行代碼中球及,supplyAsync 支持異步地執(zhí)行我們指定的方法,這個(gè)例子中的異步執(zhí)行方法是 sendMsg呻疹。當(dāng)然吃引,我們也可以使用 Executor 執(zhí)行異步程序,默認(rèn)是 ForkJoinPool.commonPool()刽锤。

我們也可以在異步計(jì)算結(jié)束之后指定回調(diào)函數(shù)镊尺,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify); 這行代碼中的 thenAccept 被用于增加回調(diào)函數(shù),在我們的示例中 notify 就成了異步計(jì)算的消費(fèi)者并思,它會(huì)處理計(jì)算結(jié)果庐氮。

CompletionStage<T> 接口

A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes.
A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.
一個(gè)可能執(zhí)行的異步計(jì)算的某個(gè)階段,在另一個(gè)CompletionStage完成時(shí)執(zhí)行一個(gè)操作或計(jì)算一個(gè)值宋彼。
一個(gè)階段完成后弄砍,其計(jì)算結(jié)束。但是输涕,該計(jì)算階段可能會(huì)觸發(fā)下一個(gè)計(jì)算階段音婶。

最簡單的例子

CompletableFuture 實(shí)際上也實(shí)現(xiàn)了 Future 接口:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

所以我們也可以利用 CompletableFuture 來實(shí)現(xiàn)基本的 Future 功能,例如:

public static void main(String[] args) throws Exception {
    CompletableFuture future = new CompletableFuture();

    // 在 Java8 中占贫,推薦使用 Lambda 來替代匿名 Runnable 實(shí)現(xiàn)類
    new Thread(
            () -> {
                try {
                    // 模擬一段耗時(shí)的操作
                    Thread.sleep(2000);
                    
                    future.complete("I have completed");
                } catch (Exception e) {
                }
            }
    ).start();

    System.out.println(future.get());
}

此時(shí)此刻主線程 future.get() 將得到字符串的結(jié)果 I have completed桃熄,同時(shí)完成回調(diào)以后將會(huì)立即生效。注意 complete() 方法只能調(diào)用一次型奥,后續(xù)調(diào)用將被忽略瞳收。

注意:get() 方法可能會(huì)拋出異常 InterruptedExceptionExecutionException

如果我們已經(jīng)知道了異步任務(wù)的結(jié)果厢汹,我們也可以直接創(chuàng)建一個(gè)已完成的 future螟深,如下:

public static void main(String[] args) throws Exception {
    // Returns a new CompletableFuture that is already completed with the given value.
    CompletableFuture future = CompletableFuture.completedFuture("I have completed");

    System.out.println(future.get());
}

如果在異步執(zhí)行過程中,我們覺得執(zhí)行會(huì)超時(shí)或者會(huì)出現(xiàn)問題烫葬,我們也可以通過 cancle() 方法取消界弧,此時(shí)調(diào)用 get() 方法時(shí)會(huì)產(chǎn)生異常 java.util.concurrent.CancellationException凡蜻,代碼如下:

public static void main(String[] args) throws Exception {
    CompletableFuture future = new CompletableFuture();

    // 在 Java8 中,推薦使用 Lambda 來替代匿名 Runnable 實(shí)現(xiàn)類
    new Thread(
            () -> {
                try {
                    // 模擬一段耗時(shí)的操作
                    Thread.sleep(2000);

                    future.cancel(false);
                } catch (Exception e) {
                }
            }
    ).start();

    System.out.println(future.get());
}

使用工廠方法創(chuàng)建 CompletableFuture

在上述的代碼中垢箕,我們手動(dòng)地創(chuàng)建 CompletableFuture划栓,并且手動(dòng)的創(chuàng)建一個(gè)線程(或者利用線程池)來啟動(dòng)異步任務(wù),這樣似乎有些復(fù)雜条获。

其實(shí)我們可以利用 CompletableFuture 的工廠方法忠荞,傳入 Supplier 或者 Runnable 的實(shí)現(xiàn)類,直接得到一個(gè) CompletableFuture 的實(shí)例:

  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • public static CompletableFuture<Void> runAsync(Runnable runnable)
  • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

第一個(gè)和第三個(gè)方法帅掘,沒有 Executor 參數(shù)委煤,將會(huì)使用 ForkJoinPool.commonPool() (全局的,在 JDK8 中介紹的通用池)修档,這適用于 CompletableFuture 類中的大多數(shù)的方法碧绞。

  • Runnable 接口方法 public abstract void run(); 沒有返回值
  • Supplier 接口方法 T get(); 有返回值。如果你需要處理異步操作并返回結(jié)果吱窝,使用前兩種 Supplier<U> 方法

一個(gè)小的 Tips:

Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature. 使用 Lambda 表達(dá)式來傳入 Supplier 或者 Runnable 的實(shí)現(xiàn)類讥邻。

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {

    // 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
        }

        return "I have completed";
    });

    System.out.println(future.get());
}

轉(zhuǎn)換和作用于異步任務(wù)的結(jié)果 (thenApply)

我們可以疊加功能癣诱,把多個(gè) future 組合在一起等

  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    • 該方法的作用是在該計(jì)算階段正常完成后计维,將該計(jì)算階段的結(jié)果作為參數(shù)傳遞給參數(shù) fn 值的函數(shù)Function,并會(huì)返回一個(gè)新的 CompletionStage
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    • 該方法和上面的方法 thenApply 功能類似撕予,不同的是對該計(jì)算階段的結(jié)果進(jìn)行計(jì)算的函數(shù) fn 的執(zhí)行時(shí)異步的鲫惶。
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    • 該方法和上面的方法 thenApplyAsync 功能類似,不同的是對該計(jì)算階段的結(jié)果進(jìn)行計(jì)算的函數(shù) fn 的執(zhí)行時(shí)異步的实抡, 并且是在調(diào)用者提供的線程池中執(zhí)行的欠母。

Function 接口方法 R apply(T t); 包含一個(gè)參數(shù)和一個(gè)返回值

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    // 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
        }

        return "I have completed";
    });

    CompletableFuture<String> upperfuture = future.thenApply(String::toUpperCase);

    System.out.println(upperfuture.get());
}

運(yùn)行完成的異步任務(wù)的結(jié)果 (thenAccept/thenRun)

future 的管道里有兩種典型的“最終”階段方法吆寨。他們在你使用 future 的值的時(shí)候做好準(zhǔn)備赏淌,當(dāng)
thenAccept() 提供最終的值時(shí),thenRun 執(zhí)行 Runnable啄清。

  • public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  • public CompletableFuture<Void> thenRun(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

Consumer 接口方法 void accept(T t); 包含一個(gè)參數(shù)六水,但是沒有返回值

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    // 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
        }

        return "I have completed";
    });

    future.thenAccept(s -> {
        System.out.println(s);
    });

    // Waits if necessary for this future to complete, and then returns its result.
    future.get();
}

結(jié)合兩個(gè) CompletableFuture

The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
這就是 CompletableFuture 最大的優(yōu)勢辣卒。

  • public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
    • 傳入前一個(gè) CompletableFuture 的返回值掷贾,返回另外一個(gè) CompletableFuture 實(shí)例
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
  • public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future
            = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
        }

        return "Hello ";
    }).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
        }

        return s + "World";
    }));

    System.out.println(future.get()); // Hello World
}

上述功能也可以通過 thenCombine() 方法實(shí)現(xiàn),傳入一個(gè) BiFunction 接口的實(shí)例(以 Lambda 形式) 例如:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future
            = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
        }

        return "Hello ";
    }).thenCombine(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
        }

        return "World";
    }), (s1, s2) -> s1 + s2);

    System.out.println(future.get());
}

并行執(zhí)行多個(gè)異步任務(wù)

有時(shí)候我們可能需要等待所有的異步任務(wù)都執(zhí)行完畢荣茫,然后組合他們的結(jié)果想帅。我們可以使用 allOf() 方法:

  • public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future1
            = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2
            = CompletableFuture.supplyAsync(() -> "World");


    CompletableFuture<Void> combinedFuture
            = CompletableFuture.allOf(future1, future2);

    // 這個(gè)方法不會(huì)合并結(jié)果,可以看到他的返回值是 Void 類型
    combinedFuture.get();

    // 我們需要手動(dòng)來處理每一個(gè)并行異步任務(wù)的結(jié)果
    String combined = Stream.of(future1, future2)
            .map(CompletableFuture::join)
            .collect(Collectors.joining(" "));

    System.out.println(combined); // Hello World
}

有時(shí)候我們可能不需要等待所有的異步任務(wù)都執(zhí)行完畢啡莉,只要任何一個(gè)任務(wù)完成就返回結(jié)果港准。我們可以使用 anyOf() 方法:

  • public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future1
            = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future2
            = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                }

                return "World";
            }
    );


    CompletableFuture<Object> combinedFuture
            = CompletableFuture.anyOf(future1, future2);

    System.out.println(combinedFuture.get()); // Hello
}

異常的處理

我們可以在 handle() 方法里處理異常:

  • public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
    • 第一個(gè)參數(shù)為 CompletableFuture 返回的結(jié)果
    • 第二個(gè)參數(shù)為拋出的異常

一個(gè)示例代碼如下:

public static void main(String[] args) throws Exception {
    String name = null;

    CompletableFuture<String> future
            = CompletableFuture.supplyAsync(() -> {
        if (name == null) {
            throw new RuntimeException("Computation error!");
        }
        
        return "Hello, " + name;
    }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

    System.out.println(future.get()); // Hello, Stranger!
}

參考:
Java 8:CompletableFuture終極指南- ImportNew
通過實(shí)例理解JDK8 的CompletableFuture - IBM
Guide To CompletableFuture
java8中CompletableFuture解析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末旨剥,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子浅缸,更是在濱河造成了極大的恐慌轨帜,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疗杉,死亡現(xiàn)場離奇詭異阵谚,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)烟具,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奠蹬,“玉大人朝聋,你說我怎么就攤上這事《谠辏” “怎么了冀痕?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長狸演。 經(jīng)常有香客問我言蛇,道長,這世上最難降的妖魔是什么宵距? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任腊尚,我火速辦了婚禮,結(jié)果婚禮上满哪,老公的妹妹穿的比我還像新娘婿斥。我一直安慰自己,他們只是感情好哨鸭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布民宿。 她就那樣靜靜地躺著,像睡著了一般像鸡。 火紅的嫁衣襯著肌膚如雪活鹰。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天只估,我揣著相機(jī)與錄音志群,去河邊找鬼。 笑死仅乓,一個(gè)胖子當(dāng)著我的面吹牛赖舟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播夸楣,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼宾抓,長吁一口氣:“原來是場噩夢啊……” “哼子漩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起石洗,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤幢泼,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后讲衫,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體缕棵,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年涉兽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了招驴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡枷畏,死狀恐怖别厘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拥诡,我是刑警寧澤触趴,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站渴肉,受9級特大地震影響冗懦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜仇祭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一披蕉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧前塔,春花似錦嚣艇、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至寂屏,卻和暖如春贰谣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背迁霎。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工吱抚, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人考廉。 一個(gè)月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓秘豹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親昌粤。 傳聞我的和親對象是個(gè)殘疾皇子既绕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容