CompletableFuture 使用詳解

1逼肯、 runAsync 和 supplyAsync方法

CompletableFuture 提供了四個靜態(tài)方法來創(chuàng)建一個異步操作。

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼。如果指定線程池卢未,則使用指定的線程池運行觅够。以下所有的方法都類同。

  • runAsync方法不支持返回值骡苞。
  • supplyAsync可以支持返回值垂蜗。

示例

//無返回值
public static void runAsync() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println("run end ...");
    });
    
    future.get();
}

//有返回值
public static void supplyAsync() throws Exception {         
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        System.out.println("run end ...");
        return System.currentTimeMillis();
    });

    long time = future.get();
    System.out.println("time = "+time);
}

2、計算結果完成時的回調方法

當CompletableFuture的計算結果完成解幽,或者拋出異常的時候贴见,可以執(zhí)行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的類型是BiConsumer<? super T,? super Throwable>它可以處理正常的計算結果躲株,或者異常情況片部。

whenComplete 和 whenCompleteAsync 的區(qū)別:
whenComplete:是執(zhí)行當前任務的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務。
whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個任務繼續(xù)提交給線程池來進行執(zhí)行徘溢。

示例

public static void whenComplete() throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
        if(new Random().nextInt()%2>=0) {
            int i = 12/0;
        }
        System.out.println("run end ...");
    });
    
    future.whenComplete(new BiConsumer<Void, Throwable>() {
        @Override
        public void accept(Void t, Throwable action) {
            System.out.println("執(zhí)行完成吞琐!");
        }
        
    });
    future.exceptionally(new Function<Throwable, Void>() {
        @Override
        public Void apply(Throwable t) {
            System.out.println("執(zhí)行失敗然爆!"+t.getMessage());
            return null;
        }
    });
    
    TimeUnit.SECONDS.sleep(2);
}

3站粟、 thenApply 方法

當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化曾雕。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

Function<? super T,? extends U>
T:上一個任務返回結果的類型
U:當前任務的返回值類型

示例

private static void thenApply() throws Exception {
    CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
        @Override
        public Long get() {
            long result = new Random().nextInt(100);
            System.out.println("result1="+result);
            return result;
        }
    }).thenApply(new Function<Long, Long>() {
        @Override
        public Long apply(Long t) {
            long result = t*5;
            System.out.println("result2="+result);
            return result;
        }
    });
    
    long result = future.get();
    System.out.println(result);
}

第二個任務依賴第一個任務的結果奴烙。

4、 handle 方法

handle 是執(zhí)行任務完成時對結果的處理剖张。
handle 方法和 thenApply 方法處理方式基本一樣切诀。不同的是 handle 是在任務完成后再執(zhí)行,還可以處理異常的任務搔弄。thenApply 只可以執(zhí)行正常的任務幅虑,任務出現(xiàn)異常則不執(zhí)行 thenApply 方法。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

示例

public static void handle() throws Exception{
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {

        @Override
        public Integer get() {
            int i= 10/0;
            return new Random().nextInt(10);
        }
    }).handle(new BiFunction<Integer, Throwable, Integer>() {
        @Override
        public Integer apply(Integer param, Throwable throwable) {
            int result = -1;
            if(throwable==null){
                result = param * 2;
            }else{
                System.out.println(throwable.getMessage());
            }
            return result;
        }
     });
    System.out.println(future.get());
}

從示例中可以看出顾犹,在 handle 中可以根據(jù)任務是否有異常來進行做相應的后續(xù)處理操作倒庵。而 thenApply 方法褒墨,如果上個任務出現(xiàn)錯誤,則不會執(zhí)行 thenApply 方法擎宝。

5郁妈、 thenAccept 消費處理結果

接收任務的處理結果,并消費處理绍申,無返回結果噩咪。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

示例

public static void thenAccept() throws Exception{
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return new Random().nextInt(10);
        }
    }).thenAccept(integer -> {
        System.out.println(integer);
    });
    future.get();
}

從示例代碼中可以看出,該方法只是消費執(zhí)行完成的任務极阅,并可以根據(jù)上面的任務返回的結果進行處理胃碾。并沒有后續(xù)的輸錯操作。

6涂屁、thenRun 方法

跟 thenAccept 方法不一樣的是书在,不關心任務的處理結果。只要上面的任務執(zhí)行完成拆又,就開始執(zhí)行 thenAccept 儒旬。

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

示例

public static void thenRun() throws Exception{
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            return new Random().nextInt(10);
        }
    }).thenRun(() -> {
        System.out.println("thenRun ...");
    });
    future.get();
}

該方法同 thenAccept 方法類似。不同的是上個任務處理完成后帖族,并不會把計算的結果傳給 thenRun 方法栈源。只是處理玩任務后,執(zhí)行 thenAccept 的后續(xù)操作竖般。

7甚垦、thenCombine 合并任務

thenCombine 會把 兩個 CompletionStage 的任務都執(zhí)行完成后,把兩個任務的結果一塊交給 thenCombine 來處理涣雕。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

示例

private static void thenCombine() throws Exception {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "hello";
        }
    });
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(new Supplier<String>() {
        @Override
        public String get() {
            return "hello";
        }
    });
    CompletableFuture<String> result = future1.thenCombine(future2, new BiFunction<String, String, String>() {
        @Override
        public String apply(String t, String u) {
            return t+" "+u;
        }
    });
    System.out.println(result.get());
}

8艰亮、thenAcceptBoth

當兩個CompletionStage都執(zhí)行完成后,把結果一塊交給thenAcceptBoth來進行消耗

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

示例

private static void thenAcceptBoth() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        }
    });
        
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        }
    });
    f1.thenAcceptBoth(f2, new BiConsumer<Integer, Integer>() {
        @Override
        public void accept(Integer t, Integer u) {
            System.out.println("f1="+t+";f2="+u+";");
        }
    });
}

9挣郭、applyToEither 方法

兩個CompletionStage迄埃,誰執(zhí)行返回的結果快,我就用那個CompletionStage的結果進行下一步的轉化操作兑障。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

示例

private static void applyToEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        }
    });
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        }
    });
    
    CompletableFuture<Integer> result = f1.applyToEither(f2, new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer t) {
            System.out.println(t);
            return t * 2;
        }
    });

    System.out.println(result.get());
}

10侄非、acceptEither 方法

兩個CompletionStage,誰執(zhí)行返回的結果快流译,我就用那個CompletionStage的結果進行下一步的消耗操作逞怨。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);

示例

private static void acceptEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        }
    });
        
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        }
    });
    f1.acceptEither(f2, new Consumer<Integer>() {
        @Override
        public void accept(Integer t) {
            System.out.println(t);
        }
    });
}

11、runAfterEither 方法

兩個CompletionStage福澡,任何一個完成了都會執(zhí)行下一步的操作(Runnable)

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例

private static void runAfterEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        }
    });
        
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        }
    });
    f1.runAfterEither(f2, new Runnable() {
        
        @Override
        public void run() {
            System.out.println("上面有一個已經(jīng)完成了叠赦。");
        }
    });
}

12、runAfterBoth

兩個CompletionStage革砸,都完成了計算才會執(zhí)行下一步的操作(Runnable)

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);

示例

private static void runAfterBoth() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1="+t);
            return t;
        }
    });
        
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
        @Override
        public Integer get() {
            int t = new Random().nextInt(3);
            try {
                TimeUnit.SECONDS.sleep(t);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2="+t);
            return t;
        }
    });
    f1.runAfterBoth(f2, new Runnable() {
        
        @Override
        public void run() {
            System.out.println("上面兩個任務都執(zhí)行完成了除秀。");
        }
    });
}

13窥翩、thenCompose 方法

thenCompose 方法允許你對兩個 CompletionStage 進行流水線操作,第一個操作完成時鳞仙,將其結果作為參數(shù)傳遞給第二個操作。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

示例

private static void thenCompose() throws Exception {
        CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new Supplier<Integer>() {
            @Override
            public Integer get() {
                int t = new Random().nextInt(3);
                System.out.println("t1="+t);
                return t;
            }
        }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
            @Override
            public CompletionStage<Integer> apply(Integer param) {
                return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        int t = param *2;
                        System.out.println("t2="+t);
                        return t;
                    }
                });
            }
            
        });
        System.out.println("thenCompose result : "+f.get());
    }
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末笔时,一起剝皮案震驚了整個濱河市棍好,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌允耿,老刑警劉巖借笙,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異较锡,居然都是意外死亡业稼,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門蚂蕴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來低散,“玉大人,你說我怎么就攤上這事骡楼∪酆牛” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵鸟整,是天一觀的道長引镊。 經(jīng)常有香客問我,道長篮条,這世上最難降的妖魔是什么弟头? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮涉茧,結果婚禮上赴恨,老公的妹妹穿的比我還像新娘。我一直安慰自己降瞳,他們只是感情好嘱支,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著挣饥,像睡著了一般除师。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扔枫,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天汛聚,我揣著相機與錄音,去河邊找鬼短荐。 笑死倚舀,一個胖子當著我的面吹牛叹哭,可吹牛的內容都是我干的。 我是一名探鬼主播痕貌,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼风罩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了舵稠?” 一聲冷哼從身側響起超升,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎哺徊,沒想到半個月后室琢,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡落追,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年盈滴,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轿钠。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡巢钓,死狀恐怖,靈堂內的尸體忽然破棺而出疗垛,到底是詐尸還是另有隱情竿报,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布继谚,位于F島的核電站烈菌,受9級特大地震影響,放射性物質發(fā)生泄漏花履。R本人自食惡果不足惜芽世,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望诡壁。 院中可真熱鬧济瓢,春花似錦、人聲如沸妹卿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夺克。三九已至箕宙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铺纽,已是汗流浹背柬帕。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人陷寝。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓锅很,卻偏偏與公主長得像,于是被迫代替她去往敵國和親凤跑。 傳聞我的和親對象是個殘疾皇子爆安,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內容