CompletionStage接口
CompletionStage代表異步計算過程中的某一個階段,一個階段完成以后可能會觸發(fā)另外一個階段婴谱。一個階段的計算執(zhí)行可以是一個Function,Consumer或者Runnable。一個階段的執(zhí)行可能是被單個階段的完成觸發(fā)罪治,也可能是由多個階段一起觸發(fā)
CompletableFuture
JDK5新增了Future接口娜遵,用于描述一個異步計算的結(jié)果蜕衡。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便设拟,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果慨仿。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源纳胧,而且也不能及時地得到計算結(jié)果镰吆。
在Java8中,CompletableFuture提供了非常強大的Future的擴展功能跑慕,可以幫助我們簡化異步編程的復雜性万皿,并且提供了函數(shù)式編程的能力摧找,可以通過回調(diào)的方式處理計算結(jié)果,也提供了轉(zhuǎn)換和組合 CompletableFuture 的方法牢硅。
它實現(xiàn)了Future和CompletionStage接口
CompletableFuture實現(xiàn)了Future接口的如下策略:
- CompletableFuture無法直接控制完成蹬耘,所以cancel操作被視為是另一種異常完成形式。方法isCompletedExceptionally可以用來確定一個CompletableFuture是否以任何異常的方式完成减余。
- 以一個CompletionException為例综苔,方法get()和get(long,TimeUnit)拋出一個ExecutionException,對應(yīng)CompletionException位岔。為了在大多數(shù)上下文中簡化用法如筛,這個類還定義了方法join()和getNow,而不是直接在這些情況中直接拋出CompletionException抒抬。
CompletableFuture實現(xiàn)了CompletionStage接口的如下策略:
- 為了完成當前的CompletableFuture接口或者其他完成方法的回調(diào)函數(shù)的線程杨刨,提供了非異步的完成操作。
- 沒有顯式入?yún)xecutor的所有async方法都使用ForkJoinPool.commonPool()為了簡化監(jiān)視擦剑、調(diào)試和跟蹤妖胀,所有生成的異步任務(wù)都是標記接口AsynchronousCompletionTask的實例。
- 所有的CompletionStage方法都是獨立于其他共有方法實現(xiàn)的抓于,因此一個方法的行為不會受到子類中其他方法的覆蓋做粤。
CompletableFuture 提供了四個靜態(tài)方法來創(chuàng)建一個異步操作。
方法 |
---|
CompletableFuture<Void> runAsync(Runnable runnable) |
CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) |
supplyAsync(Supplier<U> supplier) |
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) |
static void runAsync() throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.runAsync(()->{
String value = "this is runAsync";
System.out.println(value);
});
Object o = future.get();
System.out.println(o);
}
static void supplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
String value = "this is supplyAsync";
System.out.println(value);
return value;
});
String o = future.get();
System.out.println(o);
}
其中supplyAsync用于有返回值的任務(wù)捉撮,runAsync則用于沒有返回值的任務(wù)怕品。Executor參數(shù)可以手動指定線程池,否則默認ForkJoinPool.commonPool()系統(tǒng)級公共線程池
計算結(jié)果完成時的回調(diào)方法
當CompletableFuture的計算結(jié)果完成巾遭,或者拋出異常的時候肉康,可以執(zhí)行特定的Action。
方法 |
---|
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) |
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) |
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) |
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn) |
static void whenComplete(){
CompletableFuture.supplyAsync(()->{
String value = "this is supplyAsync";
System.out.println(value);
return value;
}).whenComplete((s, throwable) -> {
System.out.println("this is accept");
System.out.println(s);
System.out.println(throwable);
});
}
static void exceptionally() throws ExecutionException, InterruptedException {
CompletableFuture future = CompletableFuture.supplyAsync(()->{
String value = "this is supplyAsync";
System.out.println(Integer.valueOf(value));
return value;
}).exceptionally(throwable -> {
System.out.println(throwable);
return "執(zhí)行報錯";
});
Object o = future.get();
System.out.println(o);
}
whenComplete 和 whenCompleteAsync 的區(qū)別:
whenComplete:是執(zhí)行當前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)灼舍。
whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個任務(wù)繼續(xù)提交給線程池來進行執(zhí)行吼和。
thenApply 方法
當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化骑素。
方法 |
---|
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) |
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) |
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) |
handle 方法
handle 是執(zhí)行任務(wù)完成時對結(jié)果的處理炫乓。handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務(wù)完成后再執(zhí)行献丑,還可以處理異常的任務(wù)末捣。thenApply 只可以執(zhí)行正常的任務(wù),任務(wù)出現(xiàn)異常則不執(zhí)行 thenApply 方法创橄。
方法 |
---|
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); |
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); |
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor); |
thenAccept 消費處理結(jié)果
接收任務(wù)的處理結(jié)果箩做,并消費處理,無返回結(jié)果妥畏。
方法 |
---|
public CompletionStage<Void> thenAccept(Consumer<? super T> action); |
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); |
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); |
thenRun 方法
跟 thenAccept 方法不一樣的是邦邦,不關(guān)心任務(wù)的處理結(jié)果安吁。只要上面的任務(wù)執(zhí)行完成,就開始執(zhí)行 thenAccept 燃辖。
方法 |
---|
public CompletionStage<Void> thenRun(Runnable action) |
public CompletionStage<Void> thenRunAsync(Runnable action) |
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor) |
thenCombine 合并任務(wù)
thenCombine 會把 兩個 CompletionStage 的任務(wù)都執(zhí)行完成后鬼店,把兩個任務(wù)的結(jié)果一塊交給 thenCombine 來處理。
方法 |
---|
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); |
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); |
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); |
thenAcceptBoth
當兩個CompletionStage都執(zhí)行完成后黔龟,把結(jié)果一塊交給thenAcceptBoth來進行消耗
方法 |
---|
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) |
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) |
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) |
applyToEither 方法
兩個CompletionStage薪韩,誰執(zhí)行返回的結(jié)果快,我就用那個CompletionStage的結(jié)果進行下一步的轉(zhuǎn)化操作捌锭。
方法 |
---|
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn) |
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn) |
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor) |
acceptEither 方法
兩個CompletionStage,誰執(zhí)行返回的結(jié)果快罗捎,我就用那個CompletionStage的結(jié)果進行下一步的消耗操作观谦。
方法 |
---|
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); |
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); |
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor); |
runAfterEither 方法
兩個CompletionStage,任何一個完成了都會執(zhí)行下一步的操作(Runnable)
方法 |
---|
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); |
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); |
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor); |
runAfterBoth
兩個CompletionStage桨菜,都完成了計算才會執(zhí)行下一步的操作(Runnable)
方法 |
---|
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); |
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); |
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); |
thenCompose
thenCompose 方法允許你對兩個 CompletionStage 進行流水線操作豁状,第一個操作完成時,將其結(jié)果作為參數(shù)傳遞給第二個操作倒得。
方法 |
---|
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); |
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ; |
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ; |