CompletableFuture特別是對微服務(wù)架構(gòu)而言漂佩,會有很大的作為脖含。舉一個具體的場景,電商的商品頁面可能會涉及到商品詳情服務(wù)投蝉、商品評論服務(wù)养葵、相關(guān)商品推薦服務(wù)等等。獲取商品的信息時(/productdetails?productid=xxx)瘩缆,需要調(diào)用多個服務(wù)來處理這一個請求并返回結(jié)果关拒。這里可能會涉及到并發(fā)編程,我們完全可以使用Java 8的CompletableFuture或者RxJava來實現(xiàn)庸娱。
使用demo
public List<String> findPriceExecutorsCompletableFuture(String product){
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100));
List<CompletableFuture<String>> priceFuture = shops.stream()
.map(shop -> CompletableFuture
.supplyAsync(()-> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)), executor))
.collect(Collectors.toList());
return priceFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
https://my.oschina.net/u/3703858/blog/1799785
建議如下
:
- 如果你進行的是計算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因為實現(xiàn)簡單,同時效率也可能是最高的
- 反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待).那么使用CompletableFuture是靈活性更好,你可以像前面討論的那樣,依據(jù)等待/計算,或者W/C的比率設(shè)定需要使用的線程數(shù)
Future Callable例子:
public void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
//創(chuàng)建Callable着绊,它代表了下載所有的圖片
final Callable<List<ImageData>> task = () ->
info.stream()
.map(ImageInfo::downloadImage)
.collect(Collectors.toList());
// 將下載任務(wù)提交到executor
Future<List<ImageData>> images = executor.submit(task);
// renderText(source);
try {
// 獲得所有下載的圖片(在所有圖片可用之前會一直阻塞)
final List<ImageData> imageDatas = images.get();
// 渲染圖片
imageDatas.forEach(this::renderImage);
} catch (InterruptedException e) {
// 重新維護線程的中斷狀態(tài)
Thread.currentThread().interrupt();
// 我們不需要結(jié)果,所以取消任務(wù)
images.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause()); }
}
CompletableFuture
CompletableFuture類實現(xiàn)了CompletionStage和Future接口熟尉。Future是Java 5添加的類归露,用來描述一個異步計算的結(jié)果,但是獲取一個結(jié)果時方法較少,要么通過輪詢isDone斤儿,確認(rèn)完成后剧包,調(diào)用get()獲取值,要么調(diào)用get()設(shè)置一個超時時間雇毫。但是這個get()方法會阻塞住調(diào)用線程玄捕,這種阻塞的方式顯然和我們的異步編程的初衷相違背。
為了解決這個問題棚放,JDK吸收了guava的設(shè)計思想枚粘,加入了Future的諸多擴展功能形成了CompletableFuture。
CompletionStage是一個接口飘蚯,從命名上看得知是一個完成的階段馍迄,它里面的方法也標(biāo)明是在某個運行階段得到了結(jié)果之后要做的事情。
1局骤、進行變換
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
首先說明一下已Async結(jié)尾的方法都是可以異步執(zhí)行的攀圈,如果指定了線程池,會在指定的線程池中執(zhí)行峦甩,如果沒有指定赘来,默認(rèn)會在ForkJoinPool.commonPool()中執(zhí)行,下文中將會有好多類似的凯傲,都不詳細(xì)解釋了犬辰。關(guān)鍵的入?yún)⒅挥幸粋€Function,它是函數(shù)式接口冰单,所以使用Lambda表示起來會更加優(yōu)雅幌缝。它的入?yún)⑹巧弦粋€階段計算后的結(jié)果,返回值是經(jīng)過轉(zhuǎn)化后結(jié)果诫欠。
例如:
@Test
public void thenApply() {
String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join();
System.out.println(result);
}
結(jié)果為:
hello world
2涵卵、進行消耗
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenAccept是針對結(jié)果進行消耗浴栽,因為他的入?yún)⑹荂onsumer,有入?yún)o返回值轿偎。
例如:
@Test
public void thenAccept(){
CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world"));
}
結(jié)果為:hello world
3典鸡、對上一步的計算結(jié)果不關(guān)心,執(zhí)行下一個操作
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenRun它的入?yún)⑹且粋€Runnable的實例贴硫,表示當(dāng)?shù)玫缴弦徊降慕Y(jié)果時的操作椿每。
例如:
@Test
public void thenRun(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenRun(() -> System.out.println("hello world"));
while (true){}
}
4、結(jié)合兩個CompletionStage的結(jié)果英遭,進行轉(zhuǎn)化后返回
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后亦渗,利用這兩個返回值挖诸,進行轉(zhuǎn)換后返回指定類型的值。
例如:
@Test
public void thenCombine() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> s1 + " " + s2).join();
System.out.println(result);
}
5法精、結(jié)合兩個CompletionStage的結(jié)果多律,進行消耗
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
它需要原來的處理返回值,并且other代表的CompletionStage也要返回值之后搂蜓,利用這兩個返回值狼荞,進行消耗。
例如:
@Test
public void thenAcceptBoth() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "world";
}), (s1, s2) -> System.out.println(s1 + " " + s2));
while (true){}
}
6帮碰、在兩個CompletionStage都運行完執(zhí)行
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
不關(guān)心這兩個CompletionStage的結(jié)果相味,只關(guān)心這兩個CompletionStage執(zhí)行完畢,之后在進行操作(Runnable)殉挽。
例如:
@Test
public void runAfterBoth(){
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterBothAsync(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true){}
}
7丰涉、兩個CompletionStage,誰計算的快斯碌,我就用那個CompletionStage的結(jié)果進行下一步的轉(zhuǎn)化操作
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
我們現(xiàn)實開發(fā)場景中一死,總會碰到有兩種渠道完成同一個事情,所以就可以調(diào)用這個方法傻唾,找一個最快的結(jié)果進行處理投慈。
例如:
@Test
public void applyToEither() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).applyToEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), s -> s).join();
System.out.println(result);
}
8、兩個CompletionStage冠骄,誰計算的快伪煤,我就用那個CompletionStage的結(jié)果進行下一步的消耗操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
例如:
@Test
public void acceptEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).acceptEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello world";
}), System.out::println);
while (true){}
}
9猴抹、兩個CompletionStage带族,任何一個完成了都會執(zhí)行下一步的操作(Runnable)
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例如:
@Test
public void runAfterEither() {
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).runAfterEither(CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s2";
}), () -> System.out.println("hello world"));
while (true) {
}
}
10、當(dāng)運行時出現(xiàn)了異常蟀给,可以通過exceptionally進行補償蝙砌。
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
例如:
@Test
public void exceptionally() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
11阳堕、當(dāng)運行完成時,對結(jié)果的記錄择克。這里的完成時有兩種情況恬总,一種是正常執(zhí)行,返回值肚邢。另外一種是遇到異常拋出造成程序的中斷壹堰。這里為什么要說成記錄,因為這幾個方法都會返回CompletableFuture骡湖,當(dāng)Action執(zhí)行完畢后它的結(jié)果返回原始的CompletableFuture的計算結(jié)果或者返回異常贱纠。所以不會對結(jié)果產(chǎn)生任何的作用。
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
例如:
@Test
public void whenComplete() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).whenComplete((s, t) -> {
System.out.println(s);
System.out.println(t.getMessage());
}).exceptionally(e -> {
System.out.println(e.getMessage());
return "hello world";
}).join();
System.out.println(result);
}
結(jié)果:
null
java.lang.RuntimeException: 測試一下異常情況
java.lang.RuntimeException: 測試一下異常情況
hello world
12响蕴、運行完成時谆焊,對結(jié)果的處理。這里的完成時有兩種情況浦夷,一種是正常執(zhí)行辖试,返回值。另外一種是遇到異常拋出造成程序的中斷劈狐。
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
例如:
出現(xiàn)異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//出現(xiàn)異常
if (1 == 1) {
throw new RuntimeException("測試一下異常情況");
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結(jié)果:hello world
未出現(xiàn)異常時
@Test
public void handle() {
String result = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "s1";
}).handle((s, t) -> {
if (t != null) {
return "hello world";
}
return s;
}).join();
System.out.println(result);
}
結(jié)果為:s1
上面就是CompletionStage接口中方法的使用實例罐孝,CompletableFuture同樣也同樣實現(xiàn)了Future,所以也同樣可以使用get進行阻塞獲取值肥缔,總的來說莲兢,CompletableFuture使用起來還是比較爽的,看起來也比較優(yōu)雅一點辫继。
處理自定義異常
1怒见、創(chuàng)建原子對象保存異常對象
final AtomicReference<BizException> foundException = new AtomicReference<>();
...
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
//todo 業(yè)務(wù)邏輯
} catch (BizException e) {
foundException.set(e);
}
}
return "OK";
});
...
if(foundException.get() != null){
throw foundException.get();
}
2、使用CompletionException
List<CompletableFuture<Object>> futures =
tasks.stream()
.map(task -> CompletableFuture.supplyAsync(() -> businessLogic(task)))
.collect(Collectors.toList());
try {
List<Object> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
} catch (CompletionException e) {
throw e.getCause() instanceof BusinessException?
new BadRequestException("at least one async task had an exception"): e;
}
摘自: http://www.reibang.com/p/6f3ee90ab7d3
https://leokongwq.github.io/2017/01/17/java8-CompletableFuture.html
https://www.jdon.com/idea/java/java-8-completablefuture-vs-parallel-stream.html