如何“快準狠”的使用線程池呢禁熏,JDK給出了一個工具類:CompletableFuture來簡化開發(fā)。
1. 快速入手
1.1 執(zhí)行單個任務
任務無非兩種邑彪,一種沒有返回值:Runnable瞧毙,一種有返回值:Callable。而CompletableFuture可以理解為一個簡化操作的工具類寄症,其提供的API目的就是為了更加便利的進行流式操作宙彪。
- 提交任務的方法
方法 | 參數 | 作用 |
---|---|---|
supplyAsync | Supplier<U>,executor | 帶有返回值的任務,自定義的線程池 |
runAsync | Runnable,executor | 沒有返回值的任務有巧,自定義的線程池 |
- 等待任務的方法
方法 | 作用 |
---|---|
join | 阻塞等待結果释漆,但不會拋出uncheck異常 |
get | 阻塞等待結果,但會拋出uncheck異常 |
join和get區(qū)別:
- get()方法會
throws InterruptedException, ExecutionException
(繼承了Exception類)篮迎; - join()方法包裝了異常男图,將異常封裝為
CompletionException
異常(繼承了RuntimeException類);
準備代碼:
@Slf4j
public class TestBf {
static ThreadPoolExecutor executor =
new ThreadPoolExecutor(10, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
static List<String> sources = Arrays.asList("tom", "liMing", "tony");
private static User createUser(String name) {
if (StringUtils.startsWith(name, "t")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
throw new RuntimeException("eee");
}
User user = new User();
user.setName(name);
return user;
}
@Data
public static class User {
private String name;
}
}
測試代碼:
/**
* 1. 子線執(zhí)行supplyAsync內部方法甜橱,父線程調用join/get方法等待子線程執(zhí)行結果逊笆。
* 2. 當不傳遞executor參數時,默認使用forkJoinPool線程池
*/
private static void testSupplyAsync() {
User user = CompletableFuture.supplyAsync(() -> {
log.info("執(zhí)行supplyAsyn方法{}");
return createUser("t");
}, executor).join();
}
/**
* 1. 子線執(zhí)行runAsync內部方法岂傲,父線程調用join/get方法等待子線程執(zhí)行結果难裆。
* 2. 當不傳遞executor參數時,默認使用forkJoinPool線程池
*/
private static void testRunAsync() {
CompletableFuture.runAsync(() -> {
log.info("執(zhí)行runAsync方法{}");
}, executor).join();
}
1.2 執(zhí)行多個任務
實際上,我們現實中需求一般要求并發(fā)批量處理一組數據乃戈,在最慢的子任務執(zhí)行完畢后褂痰,統(tǒng)一的返回結果。
方法 | 作用 |
---|---|
allOf | 聚合多個CompletableFuture對象偏化,全部執(zhí)行完畢后才會返回結果 |
anyOf | 聚合多個CompletableFuture對象脐恩,最快的任務執(zhí)行完畢后返回結果 |
join/get | 利用lambda特性。全部執(zhí)行完畢才會返回結果 |
1.2.1 allOf實戰(zhàn)
說下為什么allOf()的響應參數為CompletableFuture<Void>
對象侦讨,因為是將一批CompletableFuture對象(這里稱為futures)進行聚合(可能響應對象不同),所以allOf()無法使用一個公共的響應對象苟翻,只能使用void韵卤。
獲取結果時,依舊還得去遍歷futures拿到每個子任務的響應對象崇猫。
private static void testAllOf() throws InterruptedException, ExecutionException {
//提交一批任務
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//將這一批CompletableFuture對象沈条,使用allOf操作得到一個CompletableFuture對象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//等待所有結果執(zhí)行完畢
allFutures.join();
//拿到所有結果(注意此處邏輯是主線程去進行的map操作,可進行優(yōu)化)
List<User> list = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
log.info("執(zhí)行完畢:{}", list);
}
優(yōu)化版2:使用子線程鏈式的處理后續(xù)結果
private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {
//提交一批任務
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//將這一批CompletableFuture對象诅炉,使用allOf操作得到一個CompletableFuture對象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//allFutures(即所有futures)執(zhí)行完畢后蜡歹。執(zhí)行thenApply()內部的邏輯,實現轉化涕烧。因為此處依舊是交由子線程處理的月而,所以返回的依舊是CompletableFuture<List<User>>對象
CompletableFuture<List<User>> listCompletableFuture =
allFutures.thenApply(s -> futures.stream().map(r -> {
User user = r.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("此處是子線程打印:{}", user);
return user;
}).collect(Collectors.toList()));
log.info("執(zhí)行完畢:{}", listCompletableFuture.get());
}
1.2.2 anyOf實戰(zhàn)
獲取執(zhí)行最快的子任務议纯,某些場景下畢竟有用父款,響應對象是CompletableFuture<Object>
,還是一樣的原因瞻凤,因為聚合的子任務的響應對象可以是不同的憨攒。
獲取結果時,需要強制阀参。
private static void testAnyOf() throws InterruptedException, ExecutionException {
//提交一批任務
List<CompletableFuture<User>> userFutures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
//使用anyOf聚合
CompletableFuture<Object> anyFuture =
CompletableFuture.anyOf(userFutures.toArray(new CompletableFuture[] {}));
log.info("anyOf操作...");
//獲取執(zhí)行最快的任務
User user = (User) anyFuture.get();
log.info("最終輸出{}", user);
}
1.2.3 join/get操作
private static void testBatchJoin() {
//當createUser拋出異常時肝集,該方法也會拋出異常
List<User> users = sources.stream()
.map(s -> CompletableFuture.supplyAsync(() -> createUser(s))) //提交任務
.map(CompletableFuture::join) //等待結果
.collect(Collectors.toList()); //轉化列表
log.info("打印最終參數:{}", users);
}
這個方法的也是可以并發(fā)處理多個任務,但是不如allOf()的就是map(CompletableFuture::join)
的操作實際上是主線程執(zhí)行的蛛壳。
2. 鏈式操作
2.1 子線程鏈式處理
CompletableFuture提供了豐富的鏈式操作邏輯杏瞻。
方法 | 入參 | 作用 |
---|---|---|
thenApply | Function<? super T,? extends U> | 將傳入的對象轉換為另一個對象,響應對象依舊是CompletableFuture |
thenCompose | Function<? super T, ? extends CompletionStage<U>> | 連接兩個CompletableFuture對象炕吸,響應對象依舊是CompletableFuture |
thenAccept | Consumer<? super T> action | 消費傳入的對象伐憾,響應對象void |
thenRun | Runnable | 執(zhí)行下一個任務 |
- thenXxx:即拿到上層流返回的結果(或者上層流執(zhí)行完畢后),然后在將任務(theXxx內部邏輯)交由子線程處理赫模。
- XxxAsync:任務(theXxx內部邏輯)交由新的子線程處理树肃。
thenApply和thenCompose的區(qū)別
thenApply()轉換的是泛型中的類型,是同一個CompletableFuture瀑罗,相當于將CompletableFuture<T> 轉換成CompletableFuture<U>
thenCompose()用來組合兩個CompletableFuture胸嘴,是生成一個新的CompletableFuture雏掠。
thenApply的案例:
private static void testAllOfAndThenApply() throws InterruptedException, ExecutionException {
//提交一批任務
List<CompletableFuture<User>> futures =
sources.stream().map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor))
.collect(Collectors.toList());
log.info("begin allOf");
//將這一批CompletableFuture對象,使用allOf操作得到一個CompletableFuture對象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
//allFutures(即所有futures)執(zhí)行完畢后劣像。執(zhí)行thenApply()內部的邏輯乡话,實現轉化。因為此處依舊是交由子線程處理的耳奕,所以依舊是Com
CompletableFuture<List<User>> listCompletableFuture =
allFutures.thenApply(s -> futures.stream().map(r -> {
User user = r.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("run thenApply:{}", user);
return user;
}).collect(Collectors.toList()));
log.info("執(zhí)行完畢:{}", listCompletableFuture.get());
}
結果:可以看到thenApply
使用的是同一個CompletableFuture
绑青,thenApply中的邏輯也是使用子線程進行處理。
17:34:50.827 [main] INFO com.tellme.obj.TestBf - begin allOf
17:34:50.831 [main] INFO com.tellme.obj.TestBf - after allOf
17:34:51.848 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
17:34:51.848 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
17:34:52.848 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tom)
17:34:53.852 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=liMing)
17:34:54.857 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run thenApply:TestBf.User(name=tony)
17:34:54.857 [main] INFO com.tellme.obj.TestBf - 執(zhí)行完畢:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]
(等效)thenCompose的案例:
private static void testAllOfAndThenCompose() throws InterruptedException, ExecutionException {
//提交一批任務
List<CompletableFuture<User>> futures =
sources.stream()
.map(s -> CompletableFuture.supplyAsync(() -> createUser(s), executor)) //此時的線程為executor
.collect(Collectors.toList());
log.info("begin allOf");
//將這一批CompletableFuture對象屋群,使用allOf操作得到一個CompletableFuture對象
CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}));
log.info("after allOf");
CompletableFuture<List<User>> listCompletableFuture = allFutures
.thenCompose(s -> CompletableFuture.supplyAsync(
() -> futures.stream().map(future -> {
log.info("print thenCompose request:{}", s);
User user = future.join();
if (user.getName().startsWith("t")) {
sleepWithNoException(1000);
}
log.info("print thenCompose response:{}", user); //此時的線程為ForkJoinPool
return user;
}).collect(Collectors.toList())));
log.info("執(zhí)行完畢:{}", listCompletableFuture.get());
}
結果:可以看到thenCompose
是將兩個CompletableFuture
組合起來闸婴,然后返回結果,使用的子線程也并不是一個芍躏。
20:16:00.602 [main] INFO com.tellme.obj.TestBf - begin allOf
20:16:00.606 [main] INFO com.tellme.obj.TestBf - after allOf
20:16:01.636 [pool-1-thread-3] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tony)
20:16:01.627 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=tom)
20:16:02.628 [pool-1-thread-2] INFO com.tellme.obj.TestBf - run createUser,TestBf.User(name=liMing)
20:16:02.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tom)
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:03.636 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=liMing)
20:16:03.637 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose request:null
20:16:04.642 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - print thenCompose response:TestBf.User(name=tony)
20:16:04.642 [main] INFO com.tellme.obj.TestBf - 執(zhí)行完畢:[TestBf.User(name=tom), TestBf.User(name=liMing), TestBf.User(name=tony)]
2.2 結果合并
方法 | 作用 |
---|---|
thenCombine | 兩個CompletableFuture結合邪乍,有返回結果 |
thenAcceptBoth | 兩個CompletableFuture結合,無返回結果 |
/**
* 無返回值的結合
*/
private static void testThenAcceptBoth() {
CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
userCompletableFuture1.thenAcceptBoth(userCompletableFuture2, (r1, r2) -> {
log.info("打印數據{},{}", r1, r2);
}).join();
}
/**
* 有返回值的結合
*/
private static void testTheCombine() {
CompletableFuture<User> userCompletableFuture1 = CompletableFuture.supplyAsync(() -> createUser("1"));
CompletableFuture<User> userCompletableFuture2 = CompletableFuture.supplyAsync(() -> createUser("2"));
//兩個結果結合
CompletableFuture<String> resFuture =
userCompletableFuture1.thenCombine(userCompletableFuture2, (r1, r2) -> r1.getName() + r2.getName());
log.info("兩個結果的結合:{}" + resFuture.join());
}
2.3 結果(異常)回調
方法 | 參數 | 作用 |
---|---|---|
exceptionally | Function<Throwable, ? extends T> | 當子線程拋出異常時对竣,將回調該方法庇楞,完成降級,該方法有返回值 |
whenComplete | BiConsumer<? super T, ? super Throwable> action | 當子線程執(zhí)行完后/拋出異常否纬,將調用該方法吕晌,該方法無返回值 |
handle | BiFunction<? super T, Throwable, ? extends U> | 當子線程執(zhí)行完后/拋出異常,將調用該方法,該方法有返回值 |
exceptionally使用場景:
- 并發(fā)處理多個任務時,若一個任務拋出異常時粮彤,不希望去終止所有的任務時,可以使用該方法柏靶。
private static void testExceptionally() {
List<User> users = sources.stream()
.map(s -> CompletableFuture
.supplyAsync(() -> createUser(s), executor) //執(zhí)行此方法,當此方法拋出異常溃论,可能影響導致并發(fā)執(zhí)行失敗
.exceptionally(ex -> new User())) //當出現異常時屎蜓,將回調exceptionally方法。完成降級
.map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(users);
}
- thenXxx鏈式調用:防止雪崩效應(即前面流程出現異常钥勋,導致theXx內部邏輯不執(zhí)行)
/**
* 防止雪崩效應炬转。
*/
private static void testExceptionally() {
String res = CompletableFuture.supplyAsync(() -> {
log.info("a1 請求");
return "a1";
})
.thenApply(s -> {
int i = 1 / 0;
log.info("a2 請求參數{}", s);
return "a2";
})
.exceptionally(ex -> {
return "a2 error"; //此處是降級邏輯
}).thenApply(s -> {
log.info("a3 請求參數:{}", s); //此時s的值為a2 error
return "a3";
})
.join();
log.info("最終結果:{}", res);
}
結果:
21:14:50.631 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a1 請求
21:14:50.633 [ForkJoinPool.commonPool-worker-19] INFO com.tellme.obj.TestBf - a3 請求參數:a2 error
21:14:50.634 [main] INFO com.tellme.obj.TestBf - 最終結果:a3
whenComplete使用場景:
無論是否出現異常,肯定會回調whenComplete方法算灸,但是出現異常后扼劈,main方法會被中斷。
whenComplete參數是BiConsumer<? super T, ? super Throwable>
當觸發(fā)方法后菲驴,并不會返回降級結果荐吵。
private static void testWhenComplete() {
String res = CompletableFuture.supplyAsync(() -> {
log.info("run supplyAsync ");
int i = 1 / 0; //出現異常
return createUser("1");
}, executor).thenApply(u -> {
log.info("run thenApply ");
return u.getName();
}).whenComplete((s, ex) -> {
log.info("run whenComplete {}", s);
}).join();
log.info("main {}:", res);
}
結果:
21:19:12.180 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync
21:19:12.184 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run whenComplete null
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArithmeticException: / by zero
at com.tellme.obj.TestBf.lambda$testWhenComplete$5(TestBf.java:106)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
handle使用場景:
方法執(zhí)行完畢,或者出現異常時,都會調用這個方法先煎,都結果進行最終的后處理贼涩。
private static void testHandle() {
String join = CompletableFuture.supplyAsync(() -> {
log.info("run supplyAsync ");
return createUser("1");
}, executor).thenApply(u -> {
log.info("run thenApply ");
return u.getName();
}).handle((s, ex) -> s + "123") // 方法執(zhí)行完畢,或者出現異常時薯蝎,都會調用這個方法遥倦,都結果進行最終的后處理。
.join();
System.out.println(join);
}
結果:
21:22:33.607 [pool-1-thread-1] INFO com.tellme.obj.TestBf - run supplyAsync
null123
好文閱讀
Java8的CompletableFuture進階之道
Java8 CompletableFuture(6) thenCompose和thenCombine的區(qū)別
歷史文章
多線程——線程池ThreadPoolExecutor
SpringBoot2.x整合線程池(ThreadPoolTaskExecutor)