這是學(xué)習(xí) Java 8 中的 CompletableFuture 的寫的例子吃挑,總結(jié)并記錄一下割岛。
考慮這樣一種場景,有 4 個(gè)方法連接 DB 查詢數(shù)據(jù)籽慢,串行執(zhí)行所耗時(shí)長較長,現(xiàn)在使用 CompletableFuture 并發(fā)調(diào)用猫胁。
需要使用的方法有兩個(gè):supplyAsync()箱亿,thenApply()
supplyAsync() : 接收一個(gè)
supplier<T>
并且返回CompletableFuture<T>
,T
是通過調(diào)用 傳入的supplier取得的值的類型弃秆。
thenApply() : 獲得一個(gè) CompletableFuture<T> 的回調(diào)届惋,當(dāng) Future 完成時(shí)接收結(jié)果。
這段代碼模擬一個(gè)耗時(shí)操作
@Async
private CompletableFuture<Double> getHeight(){
// 打印日志
logger.warn(Thread.currentThread().getName() + "start height task!");
Double height = 0.0;
try {
TimeUnit.SECONDS.sleep(1);
height = 183.0;
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.warn(Thread.currentThread().getName() + "end height task!");
return CompletableFuture.completedFuture(height);
}
假設(shè)有3個(gè)和上方代碼類似的耗時(shí)操作菠赚,每個(gè)耗時(shí) 1s ,那順序執(zhí)行應(yīng)該耗時(shí) 4s,所以嘗試使用下方代碼讓其并發(fā)執(zhí)行
@Async
public CompletableFuture<Double> getBMI() throws ExecutionException, InterruptedException {
//開始時(shí)間
long start = System.currentTimeMillis();
logger.warn(Thread.currentThread().getName() + "run this task!");
CompletableFuture heightFuture = CompletableFuture.supplyAsync(() -> {
try {
return getHeight().get();
} catch (InterruptedException e) {
e.printStackTrace();
return 0.0;
} catch (ExecutionException e) {
e.printStackTrace();
return 0.0;
}
}, executor).thenApply(c -> {return c;});
CompletableFuture internalFuture = CompletableFuture.supplyAsync(() -> {
try {
return getIsInternal().get();
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (ExecutionException e) {
e.printStackTrace();
return false;
}
},executor).thenApply(c -> {return c;});
CompletableFuture weightFuture = CompletableFuture.supplyAsync(() -> {
try {
return getWeight().get();
} catch (InterruptedException e) {
e.printStackTrace();
return 0.0;
} catch (ExecutionException e) {
e.printStackTrace();
return 0.0;
}
},executor).thenApply(c -> {return c;});
CompletableFuture userFuture = CompletableFuture.supplyAsync(() -> {
try {
return getUser().get();
} catch (InterruptedException e) {
e.printStackTrace();
return null;
} catch (ExecutionException e) {
e.printStackTrace();
return null;
}
},executor).thenApply(c -> {return c;});
Double height = (Double) heightFuture.get();
Double weight = (Double) weightFuture.get();
User user = (User) userFuture.get();
logger.warn("height:" + height);
logger.warn("weight:" + weight);
logger.warn("user:" + user.getUserId());
logger.warn("isinternal:" + internalFuture.get().toString());
Double BMI = weight/((height/100)*(height/100));
// 打印結(jié)果以及運(yùn)行程序運(yùn)行花費(fèi)時(shí)間
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return CompletableFuture.completedFuture(BMI);
}
通過 supplyAsync() 運(yùn)行一個(gè)有返回值的異步任務(wù)脑豹,通過 thenApply() 在任務(wù)完成時(shí)獲取結(jié)果。
在上方的代碼中 supplyAsync() 方法的第二個(gè)參數(shù)是 Executor 類型衡查,我使用的是 ThreadPoolTaskExecutor 這個(gè)類型的線程池瘩欺,如果這里不傳,將會(huì)默認(rèn)使用 ForkJoinPool.commonPool() 這個(gè)線程池。
ThreadPoolTaskExecutor 有幾個(gè)核心參數(shù):
- CorePoolSize 核心線程數(shù)俱饿,表示可同時(shí)運(yùn)行的線程的最小數(shù)量
- MaxPoolSize 最大線程數(shù)歌粥,當(dāng)隊(duì)列滿時(shí)可同時(shí)運(yùn)行的最大線程數(shù)量
- QueueCapacity 隊(duì)列大小,當(dāng)核心線程數(shù)已經(jīng)達(dá)到最大值時(shí)拍埠,任務(wù)會(huì)被加入隊(duì)列失驶,當(dāng)隊(duì)列滿時(shí),會(huì)開啟非核心線程枣购,當(dāng)總線程數(shù)到達(dá)最大值且隊(duì)列已滿時(shí)嬉探,會(huì)按照飽和策略丟棄或用任務(wù)本身的線程繼續(xù)執(zhí)行。
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
@Bean
public Executor taskExecutor() {
// Spring 默認(rèn)配置是核心線程數(shù)大小為1棉圈,最大線程容量大小不受限制甲馋,隊(duì)列容量也不受限制。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數(shù)
executor.setCorePoolSize(CORE_POOL_SIZE);
// 最大線程數(shù)
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 隊(duì)列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
// 當(dāng)最大池已滿時(shí)迄损,此策略保證不會(huì)丟失任務(wù)請求定躏,但是可能會(huì)影響應(yīng)用程序整體性能。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
executor.initialize();
return executor;
}
}