Java8——異步編程
異步編程
所謂異步其實(shí)就是實(shí)現(xiàn)一個(gè)無需等待被調(diào)用函數(shù)的返回值而讓操作繼續(xù)運(yùn)行的方法
創(chuàng)建任務(wù)并執(zhí)行任務(wù)
無參創(chuàng)建
CompletableFuture<String> noArgsFuture = new CompletableFuture<>();
傳入相應(yīng)任務(wù),無返回值
runAsync
方法可以在后臺執(zhí)行異步計(jì)算凭峡,但是此時(shí)并沒有返回值摧冀。持有一個(gè)Runnable
對象。
CompletableFuture noReturn = CompletableFuture.runAsync(()->{
//執(zhí)行邏輯,無返回值
});
傳入相應(yīng)任務(wù),有返回值
此時(shí)我們看到返回的是CompletableFuture<T>
此處的T
就是你想要的返回值的類型惭适。其中的Supplier<T>
是一個(gè)簡單的函數(shù)式接口癞志。
CompletableFuture<String> hasReturn = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "hasReturn";
}
});
此時(shí)可以使用lambda
表達(dá)式使上面的邏輯更加清晰
CompletableFuture<String> hasReturnLambda = CompletableFuture.supplyAsync(TestFuture::get);
private static String get() {
return "hasReturnLambda";
}
獲取返回值
異步任務(wù)也是有返回值的凄杯,當(dāng)我們想要用到異步任務(wù)的返回值時(shí),我們可以調(diào)用CompletableFuture
的get()
阻塞屯碴,直到有異步任務(wù)執(zhí)行完有返回值才往下執(zhí)行膊存。
我們將上面的get()
方法改造一下,使其停頓十秒時(shí)間今艺。
private static String get() {
System.out.println("Begin Invoke getFuntureHasReturnLambda");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
System.out.println("End Invoke getFuntureHasReturnLambda");
return "hasReturnLambda";
}
然后進(jìn)行調(diào)用
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda();
System.out.println("Main Method Is Invoking");
funtureHasReturnLambda.get();
System.out.println("Main Method End");
}
可以看到輸出如下虚缎,只有調(diào)用get()
方法的時(shí)候才會阻塞當(dāng)前線程实牡。
Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambda
End Invoke getFuntureHasReturnLambda
Main Method End
自定義返回值
除了等待異步任務(wù)返回值以外创坞,我們也可以在任意時(shí)候調(diào)用complete()
方法來自定義返回值值桩。
CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda();
System.out.println("Main Method Is Invoking");
new Thread(()->{
System.out.println("Thread Is Invoking ");
try {
Thread.sleep(1000);
funtureHasReturnLambda.complete("custome value");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread End ");
}).run();
String value = funtureHasReturnLambda.get();
System.out.println("Main Method End value is "+ value);
我們可以發(fā)現(xiàn)輸出是新起線程的輸出值奔坟,當(dāng)然這是因?yàn)槲覀兊漠惒椒椒ㄔO(shè)置了等待10秒咳秉,如果此時(shí)異步方法等待1秒,新起的線程等待10秒向挖,那么輸出的值就是異步方法中的值了何之。
Main Method Is Invoking
Begin Invoke getFuntureHasReturnLambda
Thread Is Invoking
Thread End
Main Method End value is custome value
按順序執(zhí)行異步任務(wù)
如果有一個(gè)異步任務(wù)的完成需要依賴前一個(gè)異步任務(wù)的完成溶推,那么該如何寫呢蒜危?是調(diào)用get()
方法獲得返回值以后然后再執(zhí)行嗎辐赞?這樣寫有些麻煩,CompletableFuture
為我們提供了方法來完成我們想要順序執(zhí)行一些異步任務(wù)的需求新思。thenApply
表牢、thenAccept
、thenRun
這三個(gè)方法彰导。這三個(gè)方法的區(qū)別就是。
方法名 | 是否可獲得前一個(gè)任務(wù)的返回值 | 是否有返回值 |
---|---|---|
thenApply |
能獲得 | 有 |
thenAccept |
能獲得 | 無 |
thenRun |
不可獲得 | 無 |
所以一般來說thenAccept
山析、thenRun
這兩個(gè)方法在調(diào)用鏈的最末端使用笋轨。接下來我們用真實(shí)的例子感受一下。
//thenApply 可獲取到前一個(gè)任務(wù)的返回值,也有返回值
CompletableFuture<String> seqFutureOne = CompletableFuture.supplyAsync(()-> "seqFutureOne");
CompletableFuture<String> seqFutureTwo = seqFutureOne.thenApply(name -> name + " seqFutureTwo");
System.out.println(seqFutureTwo.get());
//thenAccept 可獲取到前一個(gè)任務(wù)的返回值,但是無返回值
CompletableFuture<Void> thenAccept = seqFutureOne
.thenAccept(name -> System.out.println(name + "thenAccept"));
System.out.println("-------------");
System.out.println(thenAccept.get());
//thenRun 獲取不到前一個(gè)任務(wù)的返回值,也無返回值
System.out.println("-------------");
CompletableFuture<Void> thenRun = seqFutureOne.thenRun(() -> {
System.out.println("thenRun");
});
System.out.println(thenRun.get());
返回的信息如下
seqFutureOne seqFutureTwo
seqFutureOnethenAccept
-------------
null
-------------
thenRun
null
thenApply和thenApplyAsync的區(qū)別
我們可以發(fā)現(xiàn)這三個(gè)方法都帶有一個(gè)后綴為Async
的方法赊淑,例如thenApplyAsync
爵政。那么帶Async
的方法和不帶此后綴的方法有什么不同呢?我們就以thenApply
和thenApplyAsync
兩個(gè)方法進(jìn)行對比陶缺,其他的和這個(gè)一樣的钾挟。
這兩個(gè)方法區(qū)別就在于誰去執(zhí)行這個(gè)任務(wù),如果使用thenApplyAsync
饱岸,那么執(zhí)行的線程是從ForkJoinPool.commonPool()
中獲取不同的線程進(jìn)行執(zhí)行掺出,如果使用thenApply
,如果supplyAsync
方法執(zhí)行速度特別快苫费,那么thenApply
任務(wù)就是主線程進(jìn)行執(zhí)行百框,如果執(zhí)行特別慢的話就是和supplyAsync
執(zhí)行線程一樣位仁。接下來我們通過例子來看一下钧嘶,使用sleep
方法來反應(yīng)supplyAsync
執(zhí)行速度的快慢有决。
//thenApply和thenApplyAsync的區(qū)別
System.out.println("-------------");
CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApply = supplyAsyncWithSleep
.thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep
.thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApply.get());
System.out.println(thenApplyAsync.get());
System.out.println("-------------No Sleep");
CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(()->{
return "supplyAsyncNoSleep Thread Id : " + Thread.currentThread();
});
CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep
.thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread());
CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep
.thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread());
System.out.println("Main Thread Id: "+ Thread.currentThread());
System.out.println(thenApplyNoSleep.get());
System.out.println(thenApplyAsyncNoSleep.get());
我們可以看到輸出為
-------------
Main Thread Id: Thread[main,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]
-------------No Sleep
Main Thread Id: Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main]
supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]
可以看到supplyAsync
方法執(zhí)行速度慢的話thenApply
方法執(zhí)行線程和supplyAsync
執(zhí)行線程相同,如果supplyAsync
方法執(zhí)行速度快的話苟呐,那么thenApply
方法執(zhí)行線程和Main
方法執(zhí)行線程相同。
組合CompletableFuture
將兩個(gè)CompletableFuture
組合到一起有兩個(gè)方法
-
thenCompose()
:當(dāng)?shù)谝粋€(gè)任務(wù)完成時(shí)才會執(zhí)行第二個(gè)操作 -
thenCombine()
:兩個(gè)異步任務(wù)全部完成時(shí)才會執(zhí)行某些操作
thenCompose() 用法
我們定義兩個(gè)異步任務(wù)笆呆,假設(shè)第二個(gè)定時(shí)任務(wù)需要用到第一個(gè)定時(shí)任務(wù)的返回值。
public static CompletableFuture<String> getTastOne(){
return CompletableFuture.supplyAsync(()-> "topOne");
}
public static CompletableFuture<String> getTastTwo(String s){
return CompletableFuture.supplyAsync(()-> s + " topTwo");
}
我們利用thenCompose()
方法進(jìn)行編寫
CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(s -> getTastTwo(s));
System.out.println(thenComposeComplet.get());
輸出就是
topOne topTwo
如果還記得前面的thenApply()
方法的話劣坊,應(yīng)該會想這個(gè)利用thenApply()
方法也是能夠?qū)崿F(xiàn)類似的功能的。
//thenApply
CompletableFuture<CompletableFuture<String>> thenApply = getTastOne()
.thenApply(s -> getTastTwo(s));
System.out.println(thenApply.get().get());
但是我們發(fā)現(xiàn)返回值是嵌套返回的一個(gè)類型康二,而想要獲得最終的返回值需要調(diào)用兩次get()
thenCombine() 用法
例如我們此時(shí)需要計(jì)算兩個(gè)異步方法返回值的和。求和這個(gè)操作是必須是兩個(gè)異步方法得出來值的情況下才能進(jìn)行計(jì)算诫惭,因此我們可以用thenCombine()
方法進(jìn)行計(jì)算夕土。
CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192);
CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196);
CompletableFuture<Integer> thenComposeCount = thenComposeOne
.thenCombine(thenComposeTwo, (s, y) -> s + y);
System.out.println(thenComposeCount.get());
此時(shí)thenComposeOne
和thenComposeTwo
都完成時(shí)才會調(diào)用傳給thenCombine
方法的回調(diào)函數(shù)。
組合多個(gè)CompletableFuture
在上面我們用thenCompose()
和thenCombine()
兩個(gè)方法將兩個(gè)CompletableFuture
組裝起來篮撑,如果我們想要將任意數(shù)量的CompletableFuture
組合起來呢?可以使用下面兩個(gè)方法進(jìn)行組合。
-
allOf()
:等待所有CompletableFuture
完后以后才會運(yùn)行回調(diào)函數(shù) -
anyOf()
:只要其中一個(gè)CompletableFuture
完成质欲,那么就會執(zhí)行回調(diào)函數(shù)怎憋。注意此時(shí)其他的任務(wù)也就不執(zhí)行了毕匀。
接下來演示一下兩個(gè)方法的用法
//allOf()
CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3);
CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4);
CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5);
CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6);
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(one, two, three, four, five, six);
voidCompletableFuture.thenApply(v->{
return Stream.of(one,two,three,four, five, six)
.map(CompletableFuture::join)
.collect(Collectors.toList());
}).thenAccept(System.out::println);
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
System.out.println("1");
});
我們定義了6個(gè)CompletableFuture
等待所有的CompletableFuture
等待所有任務(wù)完成以后然后將其值輸出。
anyOf()
的用法
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (Exception e) {
}
System.out.println("voidCompletableFuture1");
});
CompletableFuture<Void> voidCompletableFutur2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (Exception e) {
}
System.out.println("voidCompletableFutur2");
});
CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (Exception e) {
}
System.out.println("voidCompletableFuture3");
});
CompletableFuture<Object> objectCompletableFuture = CompletableFuture
.anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3);
objectCompletableFuture.get();
這里我們定義了3個(gè)CompletableFuture
進(jìn)行一些耗時(shí)的任務(wù),此時(shí)第一個(gè)CompletableFuture
會率先完成。打印結(jié)果如下土铺。
voidCompletableFuture1
異常處理
我們了解了CompletableFuture
如何異步執(zhí)行究恤,如何組合不同的CompletableFuture
,如何順序執(zhí)行CompletableFuture
窟赏。那么接下來還有一個(gè)重要的一步涯穷,就是在執(zhí)行異步任務(wù)時(shí)發(fā)生異常的話該怎么辦。我們先寫個(gè)例子赚瘦。
CompletableFuture.supplyAsync(()->{
//發(fā)生異常
int i = 10/0;
return "Success";
}).thenRun(()-> System.out.println("thenRun"))
.thenAccept(v -> System.out.println("thenAccept"));
CompletableFuture.runAsync(()-> System.out.println("CompletableFuture.runAsync"));
執(zhí)行結(jié)果為,我們發(fā)現(xiàn)只要執(zhí)行鏈中有一個(gè)發(fā)生了異常揽咕,那么接下來的鏈條也就不執(zhí)行了,但是主流程下的其他CompletableFuture
還是會運(yùn)行的逗柴。
CompletableFuture.runAsync
exceptionally()
我們可以使用exceptionally
進(jìn)行異常的處理
//處理異常
CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> {
//發(fā)生異常
int i = 10 / 0;
return "Success";
}).exceptionally(e -> {
System.out.println(e);
return "Exception has Handl";
});
System.out.println(exceptionally.get());
打印如下,可以發(fā)現(xiàn)其接收值是異常信息袍睡,也能夠返回自定義返回值控淡。
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception has Handl
handle()
調(diào)用handle()
方法也能夠捕捉到異常并且自定義返回值,他和exceptionally()
方法不同一點(diǎn)是handle()
方法無論發(fā)沒發(fā)生異常都會被調(diào)用涧狮。例子如下
System.out.println("-------有異常-------");
CompletableFuture.supplyAsync(()->{
//發(fā)生異常
int i = 10/0;
return "Success";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});
System.out.println("-------無異常-------");
CompletableFuture.supplyAsync(()->{
return "Sucess";
}).handle((response,e)->{
System.out.println("Exception:" + e);
System.out.println("Response:" + response);
return response;
});
打印如下,我們可以看到在沒有發(fā)生異常的時(shí)候handle()
方法也被調(diào)用了
-------有異常-------
Exception:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Response:null
-------無異常-------
Exception:null
Response:Sucess