更多 Java 并發(fā)編程方面的文章,請參見文集《Java 并發(fā)編程》
所謂異步調(diào)用其實(shí)就是實(shí)現(xiàn)一個(gè)可無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法。在 Java 語言中,簡單的講就是另啟一個(gè)線程來完成調(diào)用中的部分計(jì)算闯割,使調(diào)用繼續(xù)運(yùn)行或返回,而不需要等待計(jì)算結(jié)果竿拆。但調(diào)用者仍需要取線程的計(jì)算結(jié)果宙拉。
關(guān)于 Java Future,請首先參見
JDK5 新增了 Future 接口丙笋,用于描述一個(gè)異步計(jì)算的結(jié)果谢澈。雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對于結(jié)果的獲取卻是很不方便御板,只能通過阻塞或者輪詢的方式得到任務(wù)的結(jié)果锥忿。 例如:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
// 在 Java8 中,推薦使用 Lambda 來替代匿名 Callable 實(shí)現(xiàn)類
Future<Integer> f = es.submit(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 123;
});
// 當(dāng)前 main 線程阻塞怠肋,直至 future 得到值
System.out.println(f.get());
es.shutdown();
}
阻塞的方式顯然和我們的異步編程的初衷相違背敬鬓,輪詢的方式又會(huì)耗費(fèi)無謂的 CPU 資源,而且也不能及時(shí)地得到計(jì)算結(jié)果笙各,為什么不能用觀察者設(shè)計(jì)模式呢钉答?即當(dāng)計(jì)算結(jié)果完成及時(shí)通知監(jiān)聽者。(例如通過回調(diào)的方式)
關(guān)于 Future
接口杈抢,還有如下一段描述:
The Future interface was added in Java 5 to serve as a result of an asynchronous computation, but it did not have any methods to combine these computations or handle possible errors.
不能很好地組合多個(gè)異步任務(wù)数尿,也不能處理可能的異常。
CompletableFuture
Java 8 中, 新增加了一個(gè)包含 50 個(gè)方法左右的類 CompletableFuture
惶楼,它提供了非常強(qiáng)大的 Future
的擴(kuò)展功能砌创,可以幫助我們簡化異步編程的復(fù)雜性虏缸,并且提供了函數(shù)式編程的能力,可以通過回調(diào)的方式處理計(jì)算結(jié)果嫩实,也提供了轉(zhuǎn)換和組合 CompletableFuture
的方法。
對于阻塞或者輪詢方式窥岩,依然可以通過 CompletableFuture
類的 CompletionStage
和 Future
接口方式支持甲献。
CompletableFuture
類聲明了 CompletionStage
接口,CompletionStage
接口實(shí)際上提供了同步或異步運(yùn)行計(jì)算的舞臺颂翼,所以我們可以通過實(shí)現(xiàn)多個(gè) CompletionStage
命令晃洒,并且將這些命令串聯(lián)在一起的方式實(shí)現(xiàn)多個(gè)命令之間的觸發(fā)。
我們可以通過 CompletableFuture.supplyAsync(this::sendMsg);
這么一行代碼創(chuàng)建一個(gè)簡單的異步計(jì)算朦乏。在這行代碼中球及,supplyAsync
支持異步地執(zhí)行我們指定的方法,這個(gè)例子中的異步執(zhí)行方法是 sendMsg
呻疹。當(dāng)然吃引,我們也可以使用 Executor
執(zhí)行異步程序,默認(rèn)是 ForkJoinPool.commonPool()
刽锤。
我們也可以在異步計(jì)算結(jié)束之后指定回調(diào)函數(shù)镊尺,例如 CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);
這行代碼中的 thenAccept
被用于增加回調(diào)函數(shù),在我們的示例中 notify
就成了異步計(jì)算的消費(fèi)者并思,它會(huì)處理計(jì)算結(jié)果庐氮。
CompletionStage<T> 接口
A stage of a possibly asynchronous computation, that performs an action or computes a value when another CompletionStage completes.
A stage completes upon termination of its computation, but this may in turn trigger other dependent stages.
一個(gè)可能執(zhí)行的異步計(jì)算的某個(gè)階段,在另一個(gè)CompletionStage完成時(shí)執(zhí)行一個(gè)操作或計(jì)算一個(gè)值宋彼。
一個(gè)階段完成后弄砍,其計(jì)算結(jié)束。但是输涕,該計(jì)算階段可能會(huì)觸發(fā)下一個(gè)計(jì)算階段音婶。
最簡單的例子
CompletableFuture
實(shí)際上也實(shí)現(xiàn)了 Future
接口:
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
所以我們也可以利用 CompletableFuture
來實(shí)現(xiàn)基本的 Future
功能,例如:
public static void main(String[] args) throws Exception {
CompletableFuture future = new CompletableFuture();
// 在 Java8 中占贫,推薦使用 Lambda 來替代匿名 Runnable 實(shí)現(xiàn)類
new Thread(
() -> {
try {
// 模擬一段耗時(shí)的操作
Thread.sleep(2000);
future.complete("I have completed");
} catch (Exception e) {
}
}
).start();
System.out.println(future.get());
}
此時(shí)此刻主線程 future.get()
將得到字符串的結(jié)果 I have completed
桃熄,同時(shí)完成回調(diào)以后將會(huì)立即生效。注意 complete()
方法只能調(diào)用一次型奥,后續(xù)調(diào)用將被忽略瞳收。
注意:get()
方法可能會(huì)拋出異常 InterruptedException
和 ExecutionException
。
如果我們已經(jīng)知道了異步任務(wù)的結(jié)果厢汹,我們也可以直接創(chuàng)建一個(gè)已完成的 future
螟深,如下:
public static void main(String[] args) throws Exception {
// Returns a new CompletableFuture that is already completed with the given value.
CompletableFuture future = CompletableFuture.completedFuture("I have completed");
System.out.println(future.get());
}
如果在異步執(zhí)行過程中,我們覺得執(zhí)行會(huì)超時(shí)或者會(huì)出現(xiàn)問題烫葬,我們也可以通過 cancle()
方法取消界弧,此時(shí)調(diào)用 get()
方法時(shí)會(huì)產(chǎn)生異常 java.util.concurrent.CancellationException
凡蜻,代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture future = new CompletableFuture();
// 在 Java8 中,推薦使用 Lambda 來替代匿名 Runnable 實(shí)現(xiàn)類
new Thread(
() -> {
try {
// 模擬一段耗時(shí)的操作
Thread.sleep(2000);
future.cancel(false);
} catch (Exception e) {
}
}
).start();
System.out.println(future.get());
}
使用工廠方法創(chuàng)建 CompletableFuture
在上述的代碼中垢箕,我們手動(dòng)地創(chuàng)建 CompletableFuture
划栓,并且手動(dòng)的創(chuàng)建一個(gè)線程(或者利用線程池)來啟動(dòng)異步任務(wù),這樣似乎有些復(fù)雜条获。
其實(shí)我們可以利用 CompletableFuture
的工廠方法忠荞,傳入 Supplier
或者 Runnable
的實(shí)現(xiàn)類,直接得到一個(gè) CompletableFuture
的實(shí)例:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
第一個(gè)和第三個(gè)方法帅掘,沒有 Executor
參數(shù)委煤,將會(huì)使用 ForkJoinPool.commonPool()
(全局的,在 JDK8 中介紹的通用池)修档,這適用于 CompletableFuture
類中的大多數(shù)的方法碧绞。
-
Runnable
接口方法public abstract void run();
沒有返回值 -
Supplier
接口方法T get();
有返回值。如果你需要處理異步操作并返回結(jié)果吱窝,使用前兩種Supplier<U>
方法
一個(gè)小的 Tips:
Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions thanks to the new Java 8 feature. 使用 Lambda 表達(dá)式來傳入
Supplier
或者Runnable
的實(shí)現(xiàn)類讥邻。
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
System.out.println(future.get());
}
轉(zhuǎn)換和作用于異步任務(wù)的結(jié)果 (thenApply)
我們可以疊加功能癣诱,把多個(gè) future
組合在一起等
-
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
- 該方法的作用是在該計(jì)算階段正常完成后计维,將該計(jì)算階段的結(jié)果作為參數(shù)傳遞給參數(shù)
fn
值的函數(shù)Function
,并會(huì)返回一個(gè)新的CompletionStage
- 該方法的作用是在該計(jì)算階段正常完成后计维,將該計(jì)算階段的結(jié)果作為參數(shù)傳遞給參數(shù)
-
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
- 該方法和上面的方法
thenApply
功能類似撕予,不同的是對該計(jì)算階段的結(jié)果進(jìn)行計(jì)算的函數(shù)fn
的執(zhí)行時(shí)異步的鲫惶。
- 該方法和上面的方法
-
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- 該方法和上面的方法
thenApplyAsync
功能類似,不同的是對該計(jì)算階段的結(jié)果進(jìn)行計(jì)算的函數(shù)fn
的執(zhí)行時(shí)異步的实抡, 并且是在調(diào)用者提供的線程池中執(zhí)行的欠母。
- 該方法和上面的方法
Function
接口方法 R apply(T t);
包含一個(gè)參數(shù)和一個(gè)返回值
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
CompletableFuture<String> upperfuture = future.thenApply(String::toUpperCase);
System.out.println(upperfuture.get());
}
運(yùn)行完成的異步任務(wù)的結(jié)果 (thenAccept/thenRun)
在 future
的管道里有兩種典型的“最終”階段方法吆寨。他們在你使用 future
的值的時(shí)候做好準(zhǔn)備赏淌,當(dāng)
thenAccept()
提供最終的值時(shí),thenRun
執(zhí)行 Runnable
啄清。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
Consumer
接口方法 void accept(T t);
包含一個(gè)參數(shù)六水,但是沒有返回值
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
// 在 Java8 中,推薦使用 Lambda 來替代匿名 Supplier 實(shí)現(xiàn)類
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "I have completed";
});
future.thenAccept(s -> {
System.out.println(s);
});
// Waits if necessary for this future to complete, and then returns its result.
future.get();
}
結(jié)合兩個(gè) CompletableFuture
The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
這就是 CompletableFuture 最大的優(yōu)勢辣卒。
-
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
- 傳入前一個(gè)
CompletableFuture
的返回值掷贾,返回另外一個(gè)CompletableFuture
實(shí)例
- 傳入前一個(gè)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "Hello ";
}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return s + "World";
}));
System.out.println(future.get()); // Hello World
}
上述功能也可以通過 thenCombine()
方法實(shí)現(xiàn),傳入一個(gè) BiFunction
接口的實(shí)例(以 Lambda 形式) 例如:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
return "Hello ";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "World";
}), (s1, s2) -> s1 + s2);
System.out.println(future.get());
}
并行執(zhí)行多個(gè)異步任務(wù)
有時(shí)候我們可能需要等待所有的異步任務(wù)都執(zhí)行完畢荣茫,然后組合他們的結(jié)果想帅。我們可以使用 allOf()
方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2);
// 這個(gè)方法不會(huì)合并結(jié)果,可以看到他的返回值是 Void 類型
combinedFuture.get();
// 我們需要手動(dòng)來處理每一個(gè)并行異步任務(wù)的結(jié)果
String combined = Stream.of(future1, future2)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
System.out.println(combined); // Hello World
}
有時(shí)候我們可能不需要等待所有的異步任務(wù)都執(zhí)行完畢啡莉,只要任何一個(gè)任務(wù)完成就返回結(jié)果港准。我們可以使用 anyOf()
方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
return "World";
}
);
CompletableFuture<Object> combinedFuture
= CompletableFuture.anyOf(future1, future2);
System.out.println(combinedFuture.get()); // Hello
}
異常的處理
我們可以在 handle()
方法里處理異常:
-
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
- 第一個(gè)參數(shù)為
CompletableFuture
返回的結(jié)果 - 第二個(gè)參數(shù)為拋出的異常
- 第一個(gè)參數(shù)為
一個(gè)示例代碼如下:
public static void main(String[] args) throws Exception {
String name = null;
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");
System.out.println(future.get()); // Hello, Stranger!
}
參考:
Java 8:CompletableFuture終極指南- ImportNew
通過實(shí)例理解JDK8 的CompletableFuture - IBM
Guide To CompletableFuture
java8中CompletableFuture解析