問題
記錄一下生產(chǎn)環(huán)境出現(xiàn)的問題禀梳。。利耍。
幾天生產(chǎn)環(huán)境有同事反映分頁查詢一直在轉(zhuǎn)圈查不出來數(shù)據(jù)蚌本,跟我反饋,我也是很積極的去看有什么問題隘梨,我以為就是比較常見的問題吧程癌,當(dāng)我看的時(shí)候覺得很奇怪。
有一個(gè)分頁的接口其實(shí)有很多的日志需要打印轴猎,為什么只打印了一點(diǎn)日志就沒有后續(xù)了嵌莉,然后前臺(tái)頁面一直在轉(zhuǎn)圈圈等待數(shù)據(jù)的返回,怎么滴捻脖?是不喜歡下面的代碼不想執(zhí)行么锐峭,很顯然不是中鼠。
應(yīng)該正確打印的日志:
而實(shí)際生產(chǎn)上面打印到下面這條日志結(jié)束了。
查詢字典信息沿癞,返回?cái)?shù)據(jù): ······
那是什么情況呢援雇?
首先我們說明一下出現(xiàn)問題的場景,emm其實(shí)就是一個(gè)分頁查詢椎扬。但是呢惫搏,分頁的數(shù)據(jù)需要查詢一些其他的數(shù)據(jù),組裝以后返回給前端頁面蚕涤。
簡單點(diǎn)就是這樣子:
- 前端發(fā)起請求查詢數(shù)據(jù)
- 后端根據(jù)查詢到的數(shù)據(jù)筐赔,組裝后請求三方接口(三個(gè)接口)
- 三方接口返回?cái)?shù)據(jù)給后端服務(wù)
- 后端服務(wù)請求完成后返回給頁面
我們現(xiàn)在將后端服務(wù)詳細(xì)的執(zhí)行過程表述下
- 查詢到分頁數(shù)據(jù)后,循環(huán)每條數(shù)據(jù)钻趋,使用多線程進(jìn)行查詢?nèi)浇涌冢ǘ嗑€程交給線程池執(zhí)行)
- 每個(gè)數(shù)據(jù)的線程在查詢數(shù)據(jù)時(shí)有分了三個(gè)線程去查詢數(shù)據(jù)(同樣交給多線程)川陆,數(shù)據(jù)的線程等待查詢的線程相應(yīng)結(jié)果才能往下執(zhí)行
- 查詢返回的結(jié)果組裝后返回
正文
下面看下代碼時(shí)怎么寫的。蛮位。较沪。
public PageUtils queryPage(Map<String, Object> params) {
//查詢分頁數(shù)據(jù)
Page page = new Page((Integer) params.get(Constant.PAGE),
(Integer) params.get(Constant.SIZE));
IPage<FlowCardInfoDto> iPage = this.baseMapper.getPage(page, params);
List<FlowCardInfoDto> records = iPage.getRecords();
if (records.isEmpty()) {
return new PageUtils(iPage);
}
//CountDownLatch 等待所有結(jié)果返回才進(jìn)行下一步 size=10
CountDownLatch latch = new CountDownLatch(records.size());
//查詢卡商對(duì)象關(guān)系---這里打印:查詢字典信息失仁,返回?cái)?shù)據(jù): ······
Map<String, Object> dealBeanMap = dictUtil.getDealBeanMap("dealer_type");
List<FlowCardInfoDto> collect = records.stream().peek(item -> {
//查詢實(shí)時(shí)流量
String dealerBeanName = (String) dealBeanMap.get(item.getDealerNo());
if (StringUtils.isNotBlank(dealerBeanName)) {
Runnable runnable = () -> {
//有此卡商
try {
//*****省略部分代碼*****
//這里根據(jù)分頁的數(shù)據(jù): iccId查詢?nèi)浇涌跀?shù)據(jù)
trafficMap = strategy.trafficQueryDoPost(trafficMap);
//*****省略部分代碼*****
} finally {
//CountDownLatch 執(zhí)行完遞減
latch.countDown();
}
};
//這里將 runnable 交給 flowCardThreadPoolExecutor 線程池
flowCardThreadPoolExecutor.execute(runnable);
} else {
log.info("卡商<{}>處理接口未配置J!萄焦!", dealerBeanName);
latch.countDown();
}
}).collect(Collectors.toList());
try {
latch.await();
log.info("====================================");
} catch (Exception e) {
log.error("卡商查詢主線程等待異常", e);
}
iPage.setRecords(collect);
return new PageUtils(iPage);
}
/**
* 根據(jù) iccId查詢?nèi)綌?shù)據(jù)
* @return Map
*/
protected Map<String, Object> trafficQueryDoPost(Map<String, Object> param) {
log.info("開始查詢數(shù)據(jù)");
Map<String, Object> cardInfoMap = new HashMap<>();
cardInfoMap.put("cardNumber", param.get("iccId"));
CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
//查詢數(shù)據(jù)一交給線程池 flowCardThreadPoolExecutor控轿,http請求
}, flowCardThreadPoolExecutor).whenComplete((object, throwable) -> cardInfoMap.put("cardInfo", object));
CompletableFuture<Object> future2 = CompletableFuture.supplyAsync(() -> {
//查詢數(shù)據(jù)二交給線程池 flowCardThreadPoolExecutor,http請求
}, flowCardThreadPoolExecutor).whenComplete((object, throwable) -> cardInfoMap.put("dailyFlow", object));
CompletableFuture<Object> future3 = CompletableFuture.supplyAsync(() -> {
//查詢數(shù)據(jù)三交給線程池 flowCardThreadPoolExecutor拂封,http請求
}, flowCardThreadPoolExecutor).whenComplete((object, throwable) -> cardInfoMap.put("monthlyFlow", object));
//等待所有結(jié)果的
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
//阻塞茬射,直到所有任務(wù)結(jié)束。
log.info("**:等待所有結(jié)果返回");
all.join();
log.info("**:所有結(jié)果全部返回");
return cardInfoMap;
}
/**
* 線程池配置
* @return Map
*/
@Bean("flowCardThreadPoolExecutor")
public ThreadPoolTaskExecutor flowCardThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix(threadNamePrefix);
executor.setRejectedExecutionHandler((r, executor1) -> {
if (!executor1.isShutdown()) {
try {
executor1.getQueue().put(r);
} catch (InterruptedException e) {
log.error("interruptedException:{}", e.toString());
}
}
});
// 初始化
executor.initialize();
return executor;
}
上面的代碼展示了分頁查詢的大體邏輯冒签,每次分頁的數(shù)據(jù)條數(shù)默認(rèn)十條帶有 iccId
的數(shù)據(jù)在抛,每條 iccId
都會(huì)開辟一個(gè)線程來查流量,而這個(gè)線程交給線程池萧恕,然后每個(gè)線程查詢流量時(shí)刚梭,開辟三個(gè)線程去查詢流量,同樣交給了同一個(gè)線程池票唆,流量返回后組裝完成朴读,一條 iccId
卡才算執(zhí)行完成進(jìn)行 countDown()
。
問題就出在了線程池上面走趋,我們可以想一下衅金,有關(guān)線程池的線程沒有日志時(shí)怎么回事,沒有執(zhí)行嗎?是的氮唯,它就是沒有執(zhí)行酥宴。
分頁查詢的十條數(shù)據(jù)開辟的線程池交給了線程池,瞬間就占用了僅有的十個(gè)核心線程您觉,而這十個(gè)核心線程每個(gè)都必須等待自己開辟的三個(gè)核心線程都有結(jié)果后才能釋放資源,但是這三個(gè)線程都在隊(duì)列里面無法執(zhí)行(隊(duì)列未滿時(shí)授滓,只有核心線程在工作琳水,此時(shí)沒有普通線程數(shù)),造成了查詢流量的三十個(gè)線程(三個(gè)查詢流量的線程 * 十條 iccId
)壓根就沒有機(jī)會(huì)執(zhí)行般堆,而核心線程又在等待它們的結(jié)果 all.join();
一直等待 在孝。
造成的結(jié)果就是系統(tǒng)服務(wù)中凡是涉及到交給線程池執(zhí)行的操作都不能正常執(zhí)行。
改進(jìn)
順序執(zhí)行:將查詢流量的三個(gè)三方接口順序執(zhí)行淮摔,不依靠多線程和線程池私沮。缺點(diǎn)就是時(shí)間長,效率低和橙,頁面使用的人可能要罵人仔燕,服務(wù)間調(diào)用可能會(huì)超時(shí)(既然能順序執(zhí)行你猜我為什么要用多線程?)
線程隔離:另起一個(gè)線程配置魔招,將分頁數(shù)據(jù)的線程依舊交給原來的線程池
flowCardThreadPoolExecutor
晰搀,將查詢流量的三條線程交給另外一個(gè)線程池配置,使得兩個(gè)線程互不影響办斑,查詢流量的線程始終有機(jī)會(huì)執(zhí)行外恕,就不會(huì)造成flowCardThreadPoolExecutor
線程池阻塞。
保險(xiǎn)起見乡翅,將查詢流量的多線程操作進(jìn)行如下改動(dòng)鳞疲,設(shè)置取值的最大等待時(shí)間,超過時(shí)間拋棄此次請求蠕蚜,保證數(shù)據(jù)的線程不受影響尚洽。
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
//阻塞,直到所有任務(wù)結(jié)束波势。
log.info("**:等待所有結(jié)果返回");
try {
all.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("**:等待結(jié)果返回異常:", e);
}