注:(本例用submit實現會更簡單按傅,可以參見ThreadPoolExecutor execute 和 submit)
昨日上線了我的并發(fā)編程改造后的代碼,速率提升十分明顯服协,原本需要運行將近30小時的任務縮短到十分之一(我開啟了最大十個并發(fā))绍昂,不過,雖然達到了提升速率的目的偿荷,但是結合日志我還是發(fā)現了兩個潛在Bug窘游。上一篇傳送門:ThreadPoolExecutor + CountDownLatch 實際應用
首先,我在線程池中捕獲了異常跳纳,并且記錄日志忍饰。
try {
// 模擬耗時
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (InterruptedException e) {
// 實際應用在此記錄日志
log.error("ThreadPoolExecutor error", e);
}
接著我在日志中發(fā)現的確有報錯,這種情況下寺庄,我應該判斷任務運行結果為失敗艾蓝,然而數據庫中最終任務運行狀態(tài)是成功力崇。再一看代碼就明白了,因為我只判斷了線程池中子線程的運行狀態(tài)是否完成赢织,但是沒有判斷邏輯結果是不是對的亮靴,再加上線程內異常被吞了,最后任務的狀態(tài)肯定是成功的于置,哪怕所有線程都報錯茧吊,結果也能輸出,只不過是錯的八毯。解決方法如下:
1搓侄、創(chuàng)建內部類,作為狀態(tài)標識變量(還可以用其他引用類型變量代替)
static class Flag {
private boolean f;
public Flag() {
f = true;// 初始設為true
}
public boolean isF() {
return f;
}
public void setF(boolean f) {
this.f = f;
}
}
2话速、初始化Flag
final Flag flag = new Flag();
3休讳、在異常捕獲中設置為false
...
catch (Exception e) {
// 如果子線程報錯,狀態(tài)標識為false
flag.setF(false);
}
4尿孔、最后判斷狀態(tài)是否為true
if (flag.isF()) {
// 打印計數
System.out.println("結束:" + totalRows.get());
} else {
System.out.println("有子線程報錯,結果不準確");
}
另外筹麸,我還發(fā)現了一個潛在Bug活合。我的任務核心部分是在while 循環(huán)
內請求第三方api
下載數據,如果第三方返回結果為空物赶,手動break
退出循環(huán)白指,但是未在catch
中退出。這樣就有問題了酵紫,一旦循環(huán)體內拋出運行時異常告嘲,代碼未執(zhí)行到判斷是否break
時就被異常了,但是catch
未執(zhí)行退出奖地,將可能導致死循環(huán)橄唬。好比示例代碼中:
while (true) {
// 此處發(fā)送請求拉取數據,一旦報錯参歹,將跳過下面的break 判斷仰楚,而catch中未退出,將導致死循環(huán)
String result = download();
// 判斷是否break
if (result == null) {
break;
}
} catch (Exception e) {
e.printStackTrace();
// 未執(zhí)行退出
}
}
修改后
boolean loop = true;
while (loop) {
String result = download();
// 判斷是否break
if (result == null) {
loop = false;
}
} catch (Exception e) {
// 退出循環(huán)
loop = false;
}
}
完整示例(while循環(huán)內用次數模擬和真實業(yè)務邏輯還是有區(qū)別的)
package com.yzy.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Main1 {
static class Flag {
private boolean f;
public Flag() {
f = true;
}
public boolean isF() {
return f;
}
public void setF(boolean f) {
this.f = f;
}
}
public static void main(String[] args) throws InterruptedException {
// 線程安全的計數器
AtomicInteger totalRows = new AtomicInteger(0);
// 創(chuàng)建線程池犬庇,其中核心線程10僧界,也是我期望的最大并發(fā)數,最大線程數和隊列大小都為30臭挽,即我的總任務數
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
// 初始化CountDownLatch捂襟,大小為30
CountDownLatch countDownLatch = new CountDownLatch(30);
// 記錄狀態(tài)
final Flag flag = new Flag();
// 模擬遍歷參數集合
for (int i = 0; i < 30; i++) {
// 往線程池提交任務
executor.execute(new Runnable() {
@Override
public void run() {
int times = 0;
boolean loop = true;
// 模擬數據拉取過程可能需要分頁
while (loop) {
// 模擬每個任務需要分頁5次
if (times >= 5) {
break;
}
times++;
// 模擬計數
totalRows.incrementAndGet();
try {
// 模擬耗時
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (Exception e) {
// 如果子線程報錯,退出循環(huán)
loop = false;
// 如果子線程報錯欢峰,狀態(tài)標識為false
flag.setF(false);
}
}
// 子線程完成葬荷,countDownLatch執(zhí)行countDown
countDownLatch.countDown();
}
});
// 打印線程池運行狀態(tài)
System.out.println("線程池中線程數目:" + executor.getPoolSize() + "涨共,隊列中等待執(zhí)行的任務數目:" +
executor.getQueue().size() + ",已執(zhí)行結束的任務數目:" + executor.getCompletedTaskCount());
}
// 標記多線程關閉闯狱,但不會立馬關閉
executor.shutdown();
// 阻塞當前線程煞赢,知道所有子線程都執(zhí)行countDown方法才會繼續(xù)執(zhí)行
countDownLatch.await();
// 打印線程池運行狀態(tài)
System.out.println("線程池中線程數目:" + executor.getPoolSize() + ",隊列中等待執(zhí)行的任務數目:" +
executor.getQueue().size() + "哄孤,已執(zhí)行結束的任務數目:" + executor.getCompletedTaskCount());
if (flag.isF()) {
// 打印計數
System.out.println("結束:" + totalRows.get());
} else {
System.out.println("有子線程報錯照筑,結果不準確");
}
}
}