ExecutorService的invokeAll方法無法拋出異常
最近在線上發(fā)現(xiàn)一個(gè)多線程查詢有問題亡呵,始終查詢不到數(shù)據(jù),但是查看線上日志又未發(fā)現(xiàn)報(bào)錯(cuò)日志硫戈,經(jīng)過排查發(fā)現(xiàn)是由于invokeAll方法將拋出的異常進(jìn)行二次轉(zhuǎn)換成ExecutionException锰什,但是又沒有對(duì)該異常進(jìn)行處理導(dǎo)致
invokeAll方法介紹(不想看的可以直接跳過)
invokeAll方法接收一個(gè)Callable集合,并返回一個(gè)Future對(duì)象的列表掏愁,它可以安排多個(gè)任務(wù)在線程池中執(zhí)行歇由,然后等待這些任務(wù)都執(zhí)行完成。invokeAll方法會(huì)阻塞當(dāng)前線程果港,直到所有任務(wù)都完成沦泌,或者超時(shí)(如果提供),或者線程被中斷辛掠,此時(shí)它將拋出InterruptedException異常谢谦。它允許任務(wù)之間存在依賴關(guān)系,即一個(gè)任務(wù)可以依賴于其他任務(wù)的結(jié)果萝衩。它還允許客戶端獲取每個(gè)任務(wù)的執(zhí)行結(jié)果回挽,這是通過返回的Future對(duì)象列表來實(shí)現(xiàn)的
驗(yàn)證
public class InvokeAllMethodTest {
public static void main(String[] args) {
// 自定義線程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
// 創(chuàng)建要執(zhí)行的任務(wù)
List<Callable<Integer>> futures = Lists.newArrayList();
futures.add(() -> {
// 執(zhí)行的業(yè)務(wù)邏輯出現(xiàn)異常
if (true) {
throw new RuntimeException("發(fā)生業(yè)務(wù)邏輯異常了");
}
return 1;
});
try {
threadPoolExecutor.invokeAll(futures);
System.out.println("=============運(yùn)行結(jié)束===============");
} catch (InterruptedException e) {
System.out.println("=========發(fā)生中斷異常了============");
throw new RuntimeException(e);
}catch (Exception e){
System.out.println("=========發(fā)生其他異常了============");
throw new RuntimeException(e);
}
}
}
運(yùn)行結(jié)果截圖
可以看出代碼中已經(jīng)明確的拋出了指定異常,但是在調(diào)用invokeAll方法后卻并未將異常拋出
解釋
進(jìn)入AbstractExecutorService類的invokeAll方法
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 創(chuàng)建一個(gè)任務(wù)集合
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
// 任務(wù)是否完成的標(biāo)識(shí)
boolean done = false;
try {
for (Callable<T> t : tasks) {
// 把Callable線程封裝成了FutureTask
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
// 執(zhí)行任務(wù)
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
/* 獲取線程執(zhí)行的結(jié)果,
這里也能保證所有的線程都執(zhí)行完才會(huì)返回
這里的get方法也就是為什么異常被吞的真正的罪魁禍?zhǔn)仔梢辏瑢⑽覀儝伋龅漠惓^D(zhuǎn)成了ExecutionException 異常
前面講到f已經(jīng)是FutureTask類型了,所以直接找到他的get方法
*/
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
下面通過debug查看源碼的執(zhí)行過程:
解決方案
如果想要批量執(zhí)行任務(wù)千劈,這里推薦使用CompletableFuture ,這里不做詳細(xì)介紹,具體可以參考博客:CompletableFuture用法詳解
改造后的代碼:
public class InvokeAllMethodTest {
public static void main(String[] args) {
// 自定義線程池
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory(), new AbortPolicy());
// 創(chuàng)建要執(zhí)行的任務(wù)
List<CompletableFuture<Integer>> tasks = Lists.newArrayList();
tasks.add(CompletableFuture.supplyAsync(()->{
System.out.println("===========任務(wù)1正常執(zhí)行===========");
return 1;
}));
tasks.add(CompletableFuture.supplyAsync(()->{
// 執(zhí)行的業(yè)務(wù)邏輯出現(xiàn)異常
if (true) {
throw new RuntimeException("任務(wù)2發(fā)生業(yè)務(wù)邏輯異常了");
}
return 1;
}));
// 執(zhí)行牌捷,由于allOf方法的入?yún)⑹强勺儏?shù)墙牌,因此將參數(shù)通過toArray轉(zhuǎn)成數(shù)組,通過join執(zhí)行
CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
}
}
執(zhí)行結(jié)果: