前提概要
在java8以前熬甫,我們使用java的多線程編程,一般是通過(guò)Runnable中的run方法來(lái)完成,這種方式唉锌,有個(gè)很明顯的缺點(diǎn),就是诗舰,沒(méi)有返回值警儒。這時(shí)候,大家可能會(huì)去嘗試使用Callable中的call方法眶根,然后用Future返回結(jié)果蜀铲,如下:
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> stringFuture = executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "async thread";
}
});
Thread.sleep(1000);
System.out.println("main thread");
System.out.println(stringFuture.get());
}
通過(guò)觀察控制臺(tái),我們發(fā)現(xiàn)先打印 main thread ,一秒后打印 async thread,似乎能滿足我們的需求属百。但仔細(xì)想我們發(fā)現(xiàn)一個(gè)問(wèn)題记劝,當(dāng)調(diào)用future的get()方法時(shí),當(dāng)前主線程是堵塞的族扰,這好像并不是我們想看到的厌丑。
另一種獲取返回結(jié)果的方式是先輪詢,可以調(diào)用isDone,等完成再獲取渔呵,但這也不能讓我們滿意.
很多個(gè)異步線程執(zhí)行時(shí)間可能不一致,我的主線程業(yè)務(wù)不能一直等著,這時(shí)候我可能會(huì)想要只等最快的線程執(zhí)行完或者最重要的那個(gè)任務(wù)執(zhí)行完,亦或者我只等1秒鐘,至于沒(méi)返回結(jié)果的線程我就用默認(rèn)值代替.
我兩個(gè)異步任務(wù)之間執(zhí)行獨(dú)立,但是第二個(gè)依賴第一個(gè)的執(zhí)行結(jié)果.
java8的CompletableFuture,就在這混亂且不完美的多線程江湖中閃亮登場(chǎng)了.CompletableFuture讓Future的功能和使用場(chǎng)景得到極大的完善和擴(kuò)展,提供了函數(shù)式編程能力,使代碼更加美觀優(yōu)雅,而且可以通過(guò)回調(diào)的方式計(jì)算處理結(jié)果,對(duì)異常處理也有了更好的處理手段.
CompletableFuture源碼中有四個(gè)靜態(tài)方法用來(lái)執(zhí)行異步任務(wù):
創(chuàng)建任務(wù)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
如果有多線程的基礎(chǔ)知識(shí)怒竿,我們很容易看出,run開(kāi)頭的兩個(gè)方法厘肮,用于執(zhí)行沒(méi)有返回值的任務(wù)愧口,因?yàn)樗娜雲(yún)⑹荝unnable對(duì)象。
而supply開(kāi)頭的方法顯然是執(zhí)行有返回值的任務(wù)了类茂,至于方法的入?yún)⑺J簦绻麤](méi)有傳入Executor對(duì)象將會(huì)使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼.在實(shí)際使用中,一般我們使用自己創(chuàng)建的線程池對(duì)象來(lái)作為參數(shù)傳入使用托嚣,這樣速度會(huì)快些.
執(zhí)行異步任務(wù)的方式也很簡(jiǎn)單,只需要使用上述方法就可以了:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//....執(zhí)行任務(wù)
return "hello";
}, executor)
接下來(lái)看一下獲取執(zhí)行結(jié)果的幾個(gè)方法。
V get();
V get(long timeout,Timeout unit);
T getNow(T defaultValue);
T join();
上面兩個(gè)方法是Future中的實(shí)現(xiàn)方式厚骗,get()會(huì)堵塞當(dāng)前的線程示启,這就造成了一個(gè)問(wèn)題,如果執(zhí)行線程遲遲沒(méi)有返回?cái)?shù)據(jù),get()會(huì)一直等待下去,因此,第二個(gè)get()方法可以設(shè)置等待的時(shí)間.
getNow()方法比較有意思,表示當(dāng)有了返回結(jié)果時(shí)會(huì)返回結(jié)果领舰,如果異步線程拋了異常會(huì)返回自己設(shè)置的默認(rèn)值.
接下來(lái)以一些場(chǎng)景的實(shí)例來(lái)介紹一下CompletableFuture中其他一些常用的方法
thenAccept()
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
- 功能:當(dāng)前任務(wù)正常完成以后執(zhí)行夫嗓,當(dāng)前任務(wù)的執(zhí)行結(jié)果可以作為下一任務(wù)的輸入?yún)?shù),無(wú)返回值.
- 場(chǎng)景:執(zhí)行任務(wù)A,同時(shí)異步執(zhí)行任務(wù)B冲秽,待任務(wù)B正常返回后舍咖,B的返回值執(zhí)行任務(wù)C,任務(wù)C無(wú)返回值
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務(wù)A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任務(wù)B");
CompletableFuture<String> futureC = futureB.thenApply(b -> {
System.out.println("執(zhí)行任務(wù)C.");
System.out.println("參數(shù):" + b);//參數(shù):任務(wù)B
return "a";
});
thenRun(..)
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
- 功能:對(duì)不關(guān)心上一步的計(jì)算結(jié)果锉桑,執(zhí)行下一個(gè)操作
- 場(chǎng)景:執(zhí)行任務(wù)A,任務(wù)A執(zhí)行完以后,執(zhí)行任務(wù)B,任務(wù)B不接受任務(wù)A的返回值(不管A有沒(méi)有返回值),也無(wú)返回值
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任務(wù)A");
futureA.thenRun(() -> System.out.println("執(zhí)行任務(wù)B"));
thenApply(..)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- 功能:當(dāng)前任務(wù)正常完成以后執(zhí)行排霉,當(dāng)前任務(wù)的執(zhí)行的結(jié)果會(huì)作為下一任務(wù)的輸入?yún)?shù),有返回值
- 場(chǎng)景:多個(gè)任務(wù)串聯(lián)執(zhí)行,下一個(gè)任務(wù)的執(zhí)行依賴上一個(gè)任務(wù)的結(jié)果,每個(gè)任務(wù)都有輸入和輸出
異步執(zhí)行任務(wù)A,當(dāng)任務(wù)A完成時(shí)使用A的返回結(jié)果resultA作為入?yún)⑦M(jìn)行任務(wù)B的處理民轴,可實(shí)現(xiàn)任意多個(gè)任務(wù)的串聯(lián)執(zhí)行
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenApply(s->s + " world");
CompletableFuture<String> future3 = futureB.thenApply(String::toUpperCase);
System.out.println(future3.join());
上面的代碼攻柠,我們當(dāng)然可以先調(diào)用future.join()先得到任務(wù)A的返回值,然后再拿返回值做入?yún)⑷?zhí)行任務(wù)B,而thenApply的存在就在于幫我簡(jiǎn)化了這一步,我們不必因?yàn)榈却粋€(gè)計(jì)算完成而一直阻塞著調(diào)用線程扫责,而是告訴CompletableFuture你啥時(shí)候執(zhí)行完就啥時(shí)候進(jìn)行下一步. 就把多個(gè)任務(wù)串聯(lián)起來(lái)了.
thenCombine(..) thenAcceptBoth(..) runAfterBoth(..)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
- 功能:結(jié)合兩個(gè)CompletionStage的結(jié)果给赞,進(jìn)行轉(zhuǎn)化后返回
- 場(chǎng)景:需要根據(jù)商品id查詢商品的當(dāng)前價(jià)格,分兩步,查詢商品的原始價(jià)格和折扣,這兩個(gè)查詢相互獨(dú)立,當(dāng)都查出來(lái)的時(shí)候用原始價(jià)格乘折扣,算出當(dāng)前價(jià)格. 使用方法:thenCombine(..)
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("最終價(jià)格為:" + futureResult.join()); //最終價(jià)格為:80.0
- thenCombine(..)是結(jié)合兩個(gè)任務(wù)的返回值進(jìn)行轉(zhuǎn)化后再返回,那如果不需要返回呢,那就需要
- thenAcceptBoth(..),同理,如果連兩個(gè)任務(wù)的返回值也不關(guān)心呢,那就需要runAfterBoth了,如果理解了上面三個(gè)方法,thenApply,thenAccept,thenRun,這里就不需要單獨(dú)再提這兩個(gè)方法了,只在這里提一下.
thenCompose(..)
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
功能:這個(gè)方法接收的輸入是當(dāng)前的CompletableFuture的計(jì)算值,返回結(jié)果將是一個(gè)新的CompletableFuture
這個(gè)方法和thenApply非常像,都是接受上一個(gè)任務(wù)的結(jié)果作為入?yún)?執(zhí)行自己的操作,然后返回.那具體有什么區(qū)別呢?
- thenApply():它的功能相當(dāng)于將CompletableFuture<T>轉(zhuǎn)換成CompletableFuture<U>,改變的是同一個(gè)CompletableFuture中的泛型類型
- thenCompose():用來(lái)連接兩個(gè)CompletableFuture轿衔,返回值是一個(gè)新的CompletableFuture
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> futureB = futureA.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "world"));
CompletableFuture<String> future3 = futureB.thenCompose(s -> CompletableFuture.supplyAsync(s::toUpperCase));
System.out.println(future3.join());
applyToEither(..) acceptEither(..) runAfterEither(..)
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
功能:執(zhí)行兩個(gè)CompletionStage的結(jié)果,那個(gè)先執(zhí)行完了,就是用哪個(gè)的返回值進(jìn)行下一步操作
場(chǎng)景:假設(shè)查詢商品a,有兩種方式,A和B,但是A和B的執(zhí)行速度不一樣,我們希望哪個(gè)先返回就用那個(gè)的返回值.
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "通過(guò)方式A獲取商品a";
});
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "通過(guò)方式B獲取商品a";
});
CompletableFuture<String> futureC = futureA.applyToEither(futureB, product -> "結(jié)果:" + product);
System.out.println(futureC.join()); //結(jié)果:通過(guò)方式A獲取商品a
同樣的道理,applyToEither的兄弟方法還有acceptEither(),runAfterEither(),我想不需要我解釋你也知道該怎么用了.
exceptionally(..)
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
- 功能:當(dāng)運(yùn)行出現(xiàn)異常時(shí),調(diào)用該方法可進(jìn)行一些補(bǔ)償操作,如設(shè)置默認(rèn)值.
- 場(chǎng)景:異步執(zhí)行任務(wù)A獲取結(jié)果,如果任務(wù)A執(zhí)行過(guò)程中拋出異常,則使用默認(rèn)值100返回.
CompletableFuture<String> futureA = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + (100 / 0))
.thenApply(s -> "futureA result:" + s)
.exceptionally(e -> {
System.out.println(e.getMessage()); //java.lang.ArithmeticException: / by zero
return "futureA result: 100";
});
CompletableFuture<String> futureB = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + 50)
.thenApply(s -> "futureB result:" + s)
.exceptionally(e -> "futureB result: 100");
System.out.println(futureA.join());//futureA result: 100
System.out.println(futureB.join());//futureB result:執(zhí)行結(jié)果:50
上面代碼展示了正常流程和出現(xiàn)異常的情況,可以理解成catch,根據(jù)返回值可以體會(huì)下.
whenComplete(..)
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
功能:當(dāng)CompletableFuture的計(jì)算結(jié)果完成微驶,或者拋出異常的時(shí)候浪谴,都可以進(jìn)入whenComplete方法執(zhí)行,舉個(gè)栗子
CompletableFuture<String> futureA = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + (100 / 0))
.thenApply(s -> "apply result:" + s)
.whenComplete((s, e) -> {
if (s != null) {
System.out.println(s);//未執(zhí)行
}
if (e == null) {
System.out.println(s);//未執(zhí)行
} else {
System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
}
})
.exceptionally(e -> {
System.out.println("ex"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
return "futureA result: 100"; });
System.out.println(futureA.join());//futureA result: 100
根據(jù)控制臺(tái),我們可以看出執(zhí)行流程是這樣,supplyAsync->whenComplete->exceptionally,可以看出并沒(méi)有進(jìn)入thenApply執(zhí)行,原因也顯而易見(jiàn),在supplyAsync中出現(xiàn)了異常,thenApply只有當(dāng)正常返回時(shí)才會(huì)去執(zhí)行.而whenComplete不管是否正常執(zhí)行,還要注意一點(diǎn),whenComplete是沒(méi)有返回值的.
上面代碼我們使用了函數(shù)式的編程風(fēng)格并且先調(diào)用whenComplete再調(diào)用exceptionally,如果我們先調(diào)用exceptionally,再調(diào)用whenComplete會(huì)發(fā)生什么呢,我們看一下:
復(fù)制代碼
CompletableFuture<String> futureA = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + (100 / 0))
.thenApply(s -> "apply result:" + s)
.exceptionally(e -> {
System.out.println("ex:"+e.getMessage()); //ex:java.lang.ArithmeticException: / by zero
return "futureA result: 100";
})
.whenComplete((s, e) -> {
if (e == null) {
System.out.println(s);//futureA result: 100
} else {
System.out.println(e.getMessage());//未執(zhí)行
}
})
;
System.out.println(futureA.join());//futureA result: 100
代碼先執(zhí)行了exceptionally后執(zhí)行whenComplete,可以發(fā)現(xiàn),由于在exceptionally中對(duì)異常進(jìn)行了處理,并返回了默認(rèn)值,whenComplete中接收到的結(jié)果是一個(gè)正常的結(jié)果,被exceptionally美化過(guò)的結(jié)果,這一點(diǎn)需要留意一下.
handle(..)
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
功能:當(dāng)CompletableFuture的計(jì)算結(jié)果完成,或者拋出異常的時(shí)候祈搜,可以通過(guò)handle方法對(duì)結(jié)果進(jìn)行處理
CompletableFuture<String> futureA = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + (100 / 0))
.thenApply(s -> "apply result:" + s)
.exceptionally(e -> {
System.out.println("ex:" + e.getMessage()); //java.lang.ArithmeticException: / by zero
return "futureA result: 100";
})
.handle((s, e) -> {
if (e == null) {
System.out.println(s);//futureA result: 100
} else {
System.out.println(e.getMessage());//未執(zhí)行
}
return "handle result:" + (s == null ? "500" : s);
});
System.out.println(futureA.join());//handle result:futureA result: 100
通過(guò)控制臺(tái),我們可以看出,最后打印的是handle result:futureA result: 100,執(zhí)行exceptionally后對(duì)異常進(jìn)行了"美化",返回了默認(rèn)值,那么handle得到的就是一個(gè)正常的返回,我們?cè)僭囅?先調(diào)用handle再調(diào)用exceptionally的情況.
CompletableFuture<String> futureA = CompletableFuture.
supplyAsync(() -> "執(zhí)行結(jié)果:" + (100 / 0))
.thenApply(s -> "apply result:" + s)
.handle((s, e) -> {
if (e == null) {
System.out.println(s);//未執(zhí)行
} else {
System.out.println(e.getMessage());//java.lang.ArithmeticException: / by zero
}
return "handle result:" + (s == null ? "500" : s);
})
.exceptionally(e -> {
System.out.println("ex:" + e.getMessage()); //未執(zhí)行
return "futureA result: 100";
});
System.out.println(futureA.join());//handle result:500
根據(jù)控制臺(tái)輸出,可以看到先執(zhí)行handle,打印了異常信息,并對(duì)接過(guò)設(shè)置了默認(rèn)值500,exceptionally并沒(méi)有執(zhí)行,因?yàn)樗玫降氖莌andle返回給它的值,由此我們大概推測(cè)handle和whenComplete的區(qū)別
- 都是對(duì)結(jié)果進(jìn)行處理,handle有返回值,whenComplete沒(méi)有返回值
- 由于1的存在,使得handle多了一個(gè)特性,可在handle里實(shí)現(xiàn)exceptionally的功能
allOf(..) anyOf(..)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- allOf:當(dāng)所有的CompletableFuture都執(zhí)行完后執(zhí)行計(jì)算
- anyOf:最快的那個(gè)CompletableFuture執(zhí)行完之后執(zhí)行計(jì)算
場(chǎng)景二:查詢一個(gè)商品詳情,需要分別去查商品信息,賣(mài)家信息,庫(kù)存信息,訂單信息等,這些查詢相互獨(dú)立,在不同的服務(wù)上,假設(shè)每個(gè)查詢都需要一到兩秒鐘,要求總體查詢時(shí)間小于2秒.
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(4);
long start = System.currentTimeMillis();
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + RandomUtils.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品詳情";
},executorService);
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + RandomUtils.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "賣(mài)家信息";
},executorService);
CompletableFuture<String> futureC = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + RandomUtils.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "庫(kù)存信息";
},executorService);
CompletableFuture<String> futureD = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + RandomUtils.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "訂單信息";
},executorService);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(futureA, futureB, futureC, futureD);
allFuture.join();
System.out.println(futureA.join() + futureB.join() + futureC.join() + futureD.join());
System.out.println("總耗時(shí):" + (System.currentTimeMillis() - start));
}