什么是 CompletableFuture?
CompletableFuture 用于 Java 中的異步編程各拷。異步編程是一種編寫非阻塞代碼的方法刁绒,方法是在與主應(yīng)用程序線程不同的線程上運行任務(wù),并通知主線程其進度烤黍、完成或失敗知市。
這樣,您的主線程不會阻塞/等待任務(wù)的完成蚊荣,它可以并行執(zhí)行其他任務(wù)初狰。擁有這種并行性極大地提高了程序的性能。
我們首先看看CompletableFuture的類圖關(guān)系互例,CompletableFuture實現(xiàn)了Future和CompletionStage接口奢入,因此看來CompletableFuture具有Future和CompletionStage的特性
Future vs CompletableFuture
CompletableFuture 是Java 8 中引入的 Java Future API的擴展。
Future 用作對異步計算結(jié)果的引用。它提供了isDone()
一種檢查計算是否完成的get()
方法腥光,以及一種在計算完成時檢索計算結(jié)果的方法关顷。
Future API 是向 Java 異步編程邁出的一大步,但它缺乏一些重要且有用的特性
Future局限性
-
無法手動完成:
假設(shè)您編寫了一個接口來從遠程 API 獲取商品信息武福。由于此 API 調(diào)用非常耗時议双,因此您在單獨的線程中運行它并從您的函數(shù)中返回一個 Future。
現(xiàn)在捉片,讓我們假設(shè)平痰,如果遠程API服務(wù)關(guān)閉了,那么您希望根據(jù)商品的最后一次緩存價格手動完成Future伍纫。你能在Future上做到這一點嗎?不!
-
你不能在沒有阻塞的情況下對 Future 的結(jié)果執(zhí)行進一步的操作:
Future 不會通知你它的完成宗雇。它提供了一個阻塞
get()
方法,直到結(jié)果可用莹规。您無法將回調(diào)函數(shù)附加到 Future 并在 Future 的結(jié)果可用時自動調(diào)用它赔蒲。
-
多個Future不能鏈接在一起:
有時您需要執(zhí)行長時間運行的計算,當(dāng)計算完成后良漱,您需要將其結(jié)果發(fā)送到另一個長時間運行的計算舞虱,依此類推。
-
您不能將多個Future組合在一起:
假設(shè)您有 10 個不同的 Futures 想要并行運行母市,然后在它們?nèi)客瓿珊筮\行某個函數(shù)矾兜。你不能用 Future 做到這一點。
-
無異常處理:
Future API 沒有任何異常處理結(jié)構(gòu)窒篱。
CompletableFuture提供的靜態(tài)方法
// 無返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 無返回值 可以自定義線程池
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 有返回值 可以自定義線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
supply開頭:這種方法焕刮,可以返回異步線程執(zhí)行之后的結(jié)果。
run開頭:這種不會返回結(jié)果墙杯,就只是執(zhí)行線程任務(wù)配并。
如果你想異步運行一些后臺任務(wù)并且不想從任務(wù)中返回任何東西,那么你可以使用run開頭的
舉個例子:
CompletableFuture.runAsync(() -> System.out.println("執(zhí)行無返回結(jié)果的異步任務(wù)"));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行有返回值的異步任務(wù)");
return "hello";
});
我們可以通過get()或者join()方法來獲取返回的結(jié)果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行有返回值的異步任務(wù)");
return "hello";
});
future.get();
輸出:hello
那么這里我們總結(jié)一下
join&get&getNow的區(qū)別
join()與get() 區(qū)別在于join() 返回計算的結(jié)果或者拋出一個unchecked異常(CompletionException)高镐,
而get() 返回一個具體的異常(ExecutionException, InterruptedException).
getNow() 則有所區(qū)別溉旋,也就是返回當(dāng)前執(zhí)行好的結(jié)果 如果當(dāng)前為執(zhí)行完 則
參數(shù)valueIfAbsent的意思是當(dāng)計算結(jié)果不存在或者當(dāng)前時刻沒有完成任務(wù),給定一個確定的值嫉髓。
getNow()舉個例子:
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模擬業(yè)務(wù)時長
ThreadUtil.sleep(500000);
System.out.println("執(zhí)行有返回值的異步任務(wù)");
return "你好 2022";
});
System.out.println(future2.getNow("沒有完成任務(wù)"));
輸出:沒有完成任務(wù)
但是注意9劾啊!!該CompletableFuture.get()
方法是阻塞的算行。它一直等到 Future 完成并在完成后返回結(jié)果梧油。但是,這樣不相當(dāng)同步阻塞了嘛州邢?這不是我們想要的儡陨。我們想真正構(gòu)建我們的異步操作。
可以使用thenApply(),thenAccept(),thenRun()方法附加到CompletableFuture上
1. thenApply()方法
可以使用thenApply()
方法在 CompletableFuture 到達時對其進行處理和轉(zhuǎn)換。它將Function < T,R >作為參數(shù)骗村。Function < T,R >是一個簡單的函數(shù)式接口嫌褪,表示一個接受 T 類型參數(shù)并產(chǎn)生 R 類型結(jié)果的函數(shù)
什么意思呢,也就是拿到上一個異步線程返回的結(jié)構(gòu)進行后續(xù)處理
舉個例子
打個比方我想獲取訂單的詳情頁胚股,那么訂單詳情中間又有一個獲取商品信息的操作笼痛,那么我需要一直等待等到拿到商品信息之后才進行訂單詳情的操作嘛,這樣不就是同步并阻塞的嗎琅拌。那么我們可以獲商品信息之后再thenApply()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模擬業(yè)務(wù)時長
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "商品信息";
});
CompletableFuture<String> resultFuture = future.thenApply(name -> "訂單信息 " + name);
System.out.println(resultFuture.get());
輸出:訂單信息 商品信息 訂單編號信息
我們總結(jié)一下:
thenApply()是線程的后續(xù)操作缨伊,并且有這么一個特性,可以拿到上一次線程執(zhí)行的返回結(jié)果這本次thenApply()的參數(shù)一直傳遞下去财忽。 并且是有返回結(jié)果的倘核。
小提示:方法中帶Async后綴的都是異步操作例如:thenApplyAsync(),以下同理即彪。
2. thenAccept() 和 thenRun()方法
如果你不想從你的回調(diào)函數(shù)中返回任何東西,只想在 Future 完成后運行一些代碼活尊,那么你可以使用thenAccept()
andthenRun()
方法劲件。這些方法是消費者Consumer<? super T> action章蚣,通常用作回調(diào)鏈中的最后一個回調(diào)。
舉個例子:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "商品信息");
future.thenAccept(product -> System.out.println("我拿到了 " + product));
輸出:我拿到了 商品信息
但是thenRun()無法拿到上一次的結(jié)果,thenRun()意為然后執(zhí)行什么弄屡,里面接收一個Runnable參數(shù)
最后貼出這些接口:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
關(guān)于方法里面的參數(shù)Executor問題
runAsync方法和supplyAsync方法第二個參數(shù)需要指定 線程池Executor,如果不指定荔茬,會使用默認(rèn)的線程池ForkJoinPool霞揉,關(guān)于什么是ForkJoinPool,可以自行查閱癣猾,大致是:
ForkJoinPool就是用來解決這種問題的:將一個大任務(wù)拆分成多個小任務(wù)后敛劝,
使用fork可以將小任務(wù)分發(fā)給其他線程同時處理,使用join可以將多個線程處理的結(jié)果進行匯總纷宇;這實際上就是分治思想的并行版本夸盟。
這些線程都是Daemon線程,主線程結(jié)束Daemon線程不結(jié)束像捶,只有JVM關(guān)閉時上陕,生命周期終止
當(dāng)然我們也可以自己實現(xiàn)線程池
3.complete()
注意complete()其實也是個消費操作,但是與thenRun()不同的是拓春,里面可以可拋出的異常
// 區(qū)別就是不是異步處理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
// 使用異步處理
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
// 區(qū)別在于可以指定線程池
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
// 接收一個可拋出的異常释簿,且必須有返回值
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("執(zhí)行結(jié)束!");
return "hello";
});
future.whenComplete((t, action) -> System.out.println(t + " 執(zhí)行完成硼莽!"));
// 異常時執(zhí)行
future.exceptionally(t -> {
System.out.println("執(zhí)行失斒堋:" + t.getMessage());
return "異常";
}).join();
執(zhí)行結(jié)果:
null 執(zhí)行完成!
執(zhí)行失敗:java.lang.ArithmeticException: / by zero
Process finished with exit code 0
其實說白了就是拿到上個階段線程執(zhí)行的結(jié)果當(dāng)作下個階段續(xù)操作的參數(shù)處理
4.handle()
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
handle方法集和上面的complete方法集沒有區(qū)別渐尿,同樣有兩個參數(shù)一個返回結(jié)果和可拋出異常醉途,區(qū)別就在于返回值
CompletableFuture 組合
假如你想做一個詳情頁,里面有用戶信息砖茸,商品信息隘擎,你想把他們組合到一起
當(dāng)然我們可以使用上面說到的thenApply()操作,也可以使用thenCompose()
舉個例子:
使用thenApply()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "商品信息");
CompletableFuture<String> total = future.thenApply(product -> product + " 用戶信息");
System.out.println(total.join());
輸出:商品信息 用戶信息
使用thenCompose()
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> {
// 第一個任務(wù)拿到商品信息
return "商品信息";
}).thenCompose(product -> {
// 向第二個任務(wù)傳遞參數(shù)list(上一個任務(wù)的信息)
return CompletableFuture.supplyAsync(() -> product + " 用戶信息");
});
System.out.println(productFuture.get());
輸出:商品信息 用戶信息
有什么區(qū)別呢凉夯?
上述情況的最終結(jié)果是一個嵌套的 CompletableFuture货葬,什么是嵌套?也就是total是最終返回結(jié)果total嵌套著future
如果您希望最終結(jié)果是頂級 Future,用thenCompose()方法
多個 CompletableFuture 組合在一起
假設(shè)有3個接口 獲取用戶信息,獲取商品信息劲够,獲取會員信息,先每個接口1s,那個3個接口總耗時3s震桶。按順序執(zhí)行此操作,但這將花費大量時間征绎,
所以異步線程會大大減少耗時蹲姐,增加程序的性能
CompletableFuture.allOf()
CompletableFuture.allOf()里面包含了多個CompletableFuture操作,查閱源碼
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
接收一個CompletableFuture類型的可變參數(shù)人柿。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "用戶信息");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "商品信息");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "會員信息");
CompletableFuture.allOf(future1, future2, future3);
System.out.println(future1.join());
System.out.println(future2.join());
System.out.println(future3.join());
輸出:
用戶信息
商品信息
會員信息
我們可以再看
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模擬業(yè)務(wù)時長
ThreadUtil.sleep(2000);
System.out.println("future1當(dāng)前線程:" + Thread.currentThread().getId());
return "用戶信息";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("future2當(dāng)前線程:" + Thread.currentThread().getId());
return "商品信息";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("future3當(dāng)前線程:" + Thread.currentThread().getId());
return "會員信息";
});
假設(shè)用戶接口中間做了一些業(yè)務(wù)操作柴墩,消耗了時長,那么想想凫岖,不可能總是等吧江咳,當(dāng)然不是,這里CompletableFuture會開啟多個線程去執(zhí)行操作哥放。我們打印線程歼指。
輸出
future2當(dāng)前線程:12
future3當(dāng)前線程:14
future1當(dāng)前線程:13
用戶信息
商品信息
會員信息
可以看到線程id是不一樣的
CompletableFuture.anyOf()
顧名思義是任何的意思,也就是多個任務(wù)中哪個任務(wù)先返回我就返回結(jié)果甥雕。
有的時候可能每個接口執(zhí)行的時長是不同的踩身,那我們可能需要耗時最短的結(jié)果作為代替時
舉個例子:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模擬業(yè)務(wù)時長
ThreadUtil.sleep(3000);
return "用戶信息";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
// 模擬業(yè)務(wù)時長
ThreadUtil.sleep(2000);
return "商品信息";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "會員信息";
});
CompletableFuture<Object> future = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(future.join());
輸出結(jié)果:會員信息
CompletableFuture 異常處理
如果在執(zhí)行過程中害怕出錯,可以加上異常處理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int age = -1;
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "成年人";
} else {
return "小孩";
}
}).exceptionally(ex -> {
System.out.println("年齡錯誤 " + ex.getMessage());
return "Unknown!";
});
System.out.println(future.join());
輸出:
年齡錯誤 java.lang.IllegalArgumentException: Age can not be negative
Unknown!
當(dāng)然handle()方法也可以處理異常
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int age = -1;
if(age < 0) {
throw new IllegalArgumentException("Age can not be negative");
}
if(age > 18) {
return "成年人";
} else {
return "小孩";
}
}).handle((res, ex) -> {
System.out.println("年齡錯誤: " + ex.getMessage());
System.out.println("res: " + res);
return "Unknown!";
});
System.out.println(future.join());
輸出:
年齡錯誤: java.lang.IllegalArgumentException: Age can not be negative
res: null
Unknown!
如果發(fā)生異常犀农,則res
參數(shù)將為 null惰赋,否則,ex
參數(shù)將為 null呵哨。
以上還有很多不足之處赁濒,歡迎補充