java多線程CompletableFuture的使用

在JDK1.8中 提供了CompletableFuture類來進行異步編程坟岔,下面我們一起看看怎么實現

1.創(chuàng)建異步任務

package com.wwj.test.thread;


import java.util.Optional;
import java.util.concurrent.*;

public class CompletableFutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //創(chuàng)建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        //創(chuàng)建有返回值的異步線程
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            //具體的業(yè)務邏輯
            int a = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
            return a;
        }, executorService).handle((value,thr)-> {
            //使用handle進行處理也可以用其它方法
            //CompletableFuture.whenComplete():用于接收帶有返回值的CompletableFuture對象顶瞳,無法修改返回值。
            //CompletableFuture.exceptionally():用于處理異常赶促,只要異步線程中有拋出異常液肌,則進入該方法,修改返回值鸥滨。
            //CompletableFuture.handle():用于處理返回結果嗦哆,可以接收返回值和異常,可以對返回值進行修改婿滓。

            if (thr != null) {
                return -1;
            }
            System.out.println("后續(xù)線程處理");
            System.out.println(Thread.currentThread().getName());
            //handle對線程的后續(xù)處理
            Optional<Integer> value1 = Optional.ofNullable(value);
            Integer integer = value1.get();
            return integer;
        });

        //創(chuàng)建無返回值線程任務
        CompletableFuture.runAsync(()->{
            //具體的業(yè)務邏輯
            int a = 10 / 2;
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName());
            System.out.println(a);
        },executorService);

        System.out.println("main 線程工作");
        int result = integerCompletableFuture.get();
        System.out.println(result);
    }
}

2.多異步任務進行組合

2.1多異步任務串行

package com.wwj.test.thread;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //創(chuàng)建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        //創(chuàng)建有返回值的異步線程
        CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
            //具體的業(yè)務邏輯
            System.out.println(Thread.currentThread().getName());
            System.out.println("任務1執(zhí)行");
            return 0;
        });

        //在上一個異步線程完成后執(zhí)行下一個異步線程(異步線程的串行)
        CompletableFuture<Integer> integerCompletableFuture1 = integerCompletableFuture.thenApplyAsync((value -> {
            System.out.println(Thread.currentThread().getName());
            System.out.println("任務1的返回值" + value);
            return 2;
        }), executorService);

        System.out.println("main 線程工作");
        int result = integerCompletableFuture1.get();
        System.out.println(result);

        //注:
        // 使線程串行執(zhí)行老速,無入參,無返回值
        //public CompletableFuture<Void> thenRun(Runnable action);
        //public CompletableFuture<Void> thenRunAsync(Runnable action);
        //public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor);

        //// 使線程串行執(zhí)行凸主,有入參橘券,無返回值
        //public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
        //public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
        //public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

        //// 使線程串行執(zhí)行,有入參卿吐,有返回值
        //public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
        //public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
        //public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
    }
}

2.2兩個任務并行執(zhí)行完成后再執(zhí)行下一個任務

package com.wwj.test.thread;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest2 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //創(chuàng)建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        // 任務1
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務1線程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務1結束:");

            return i;
        }, executorService);

        // 任務2
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務2線程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務2結束");
            return 0;
        }, executorService);

        // 線程并行執(zhí)行完成旁舰,并且執(zhí)行新任務action,新任務無入參嗡官,無返回值
        //public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
        //public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);
        //public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);

        // 線程并行執(zhí)行完成箭窜,并且執(zhí)行新任務action,新任務有入參衍腥,無返回值
        //public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
        //public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
        //public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);

        // 線程并行執(zhí)行完成磺樱,并且執(zhí)行新任務action,新任務有入參婆咸,有返回值
        //public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
        //public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
        //public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);

        CompletableFuture<Integer> integerCompletableFuture = completableFuture1.thenCombineAsync(completableFuture2, (value1, value2) -> {
            System.out.println("接收前任務參數:" + value1 + ":" + value2);
            System.out.println("任務3線程:" + Thread.currentThread().getName());
            return 3;
        },executorService);

        System.out.println("main 線程工作");
        int result = integerCompletableFuture.get();
        System.out.println(result);
    }
}

2.3 兩個異步任務只要其中一個執(zhí)行完就執(zhí)行下一個異步任務

package com.wwj.test.thread;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //創(chuàng)建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        // 任務1
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務1線程:" + Thread.currentThread().getName());
            int i = 10 / 2;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務1結束:");

            return i;
        }, executorService);

        // 任務2
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("任務2線程:" + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務2結束");
            return 0;
        }, executorService);

        // 任務并行執(zhí)行竹捉,只要其中有一個執(zhí)行完,就開始執(zhí)行新任務action擅耽,新任務無入參活孩,無返回值
        //public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action);
        //public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
        //public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);

        // 任務并行執(zhí)行,只要其中有一個執(zhí)行完,就開始執(zhí)行新任務action憾儒,新任務有入參(入參類型為Object询兴,因為不確定是哪個任務先執(zhí)行完成),無返回值
        //public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
        //public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
        //public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor);

        // 任務并行執(zhí)行起趾,只要其中有一個執(zhí)行完诗舰,就開始執(zhí)行新任務action,新任務有入參(入參類型為Object训裆,因為不確定是哪個任務先執(zhí)行完成)眶根,有返回值
        //public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
        //public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
        //public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);

        CompletableFuture<Integer> integerCompletableFuture = completableFuture1.applyToEitherAsync(completableFuture2, (value) -> {
            System.out.println("接收前任務參數:" + value.toString());
            System.out.println("任務3線程:" + Thread.currentThread().getName());
            return 3;
        },executorService);

        System.out.println("main 線程工作");
        int result = integerCompletableFuture.get();
        System.out.println(result);
    }
}

2.4 多任務組合(超過兩個任務)

package com.wwj.test.thread;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureTest4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //創(chuàng)建線程池
        ExecutorService executorService = Executors.newFixedThreadPool(100);

        // 任務1
        CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> 1, executorService);
        // 任務2
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2, executorService);
        // 任務3
        CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> 3, executorService);

        //返回3個任務中最快執(zhí)行任務完成的結果
        CompletableFuture<Object> anyOf = completableFuture1.anyOf(completableFuture1, completableFuture2, completableFuture3);
        Object o1 = anyOf.get();
        Object o2 = anyOf.join();

        //等待3個任務全部執(zhí)行完畢,在逐一拿回返回結果
        //completableFuture1.allOf(completableFuture1, completableFuture2, completableFuture3);
        
        System.out.println("main 線程工作");
        System.out.println(o1+":"+o2);
    }
}
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末边琉,一起剝皮案震驚了整個濱河市属百,隨后出現的幾起案子,更是在濱河造成了極大的恐慌变姨,老刑警劉巖族扰,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異定欧,居然都是意外死亡渔呵,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門砍鸠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扩氢,“玉大人,你說我怎么就攤上這事爷辱÷疾颍” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵托嚣,是天一觀的道長巩检。 經常有香客問我,道長示启,這世上最難降的妖魔是什么兢哭? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮夫嗓,結果婚禮上迟螺,老公的妹妹穿的比我還像新娘。我一直安慰自己舍咖,他們只是感情好矩父,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著排霉,像睡著了一般窍株。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天球订,我揣著相機與錄音后裸,去河邊找鬼。 笑死冒滩,一個胖子當著我的面吹牛微驶,可吹牛的內容都是我干的。 我是一名探鬼主播开睡,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼因苹,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了篇恒?” 一聲冷哼從身側響起扶檐,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎胁艰,沒想到半個月后蘸秘,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡蝗茁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了寻咒。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片哮翘。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖毛秘,靈堂內的尸體忽然破棺而出饭寺,到底是詐尸還是另有隱情,我是刑警寧澤叫挟,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布艰匙,位于F島的核電站,受9級特大地震影響抹恳,放射性物質發(fā)生泄漏员凝。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一奋献、第九天 我趴在偏房一處隱蔽的房頂上張望健霹。 院中可真熱鬧,春花似錦瓶蚂、人聲如沸糖埋。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瞳别。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祟敛,已是汗流浹背疤坝。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留垒棋,地道東北人卒煞。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像叼架,于是被迫代替她去往敵國和親畔裕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內容