問題現(xiàn)象
在我們的系統(tǒng)中,使用了這樣的配置來開啟異步操作:
<task:annotation-driven executor="executor"
scheduler="scheduler" />
<task:executor id="executor" pool-size="16-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="1000" />
客戶端開啟異步代碼如下:
@Async()
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
// 標(biāo)記1
// 調(diào)用REST服務(wù)镶骗;監(jiān)控調(diào)用時(shí)間伴找。
}
// 獲取Future后的處理如下
try {
keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);
// 后續(xù)處理
} catch (Exception e) {
// 標(biāo)記2
// 異常報(bào)警
}
然而在這種配置下嗅绰,客戶端在標(biāo)記1處監(jiān)控到的調(diào)用時(shí)間普遍在4s以內(nèi)(平均時(shí)間不到1s抡秆,個(gè)別峰值請(qǐng)求會(huì)突破5s镐侯,全天超過5s的請(qǐng)求不到10個(gè)侦讨。然而,在標(biāo)記2處捕獲到的超時(shí)異常卻非常多苟翻,一天接近700+韵卤。問題出在哪兒?
原因分析
上述問題相關(guān)代碼的調(diào)用時(shí)序如下圖所示袜瞬。
其中怜俐,rest client 與rest server間的交互時(shí)間可以明確監(jiān)控到身堡,用時(shí)超過5s的非常少邓尤。但是,get方法卻經(jīng)常拋出超時(shí)異常贴谎。經(jīng)過初步分析汞扎,問題出現(xiàn)在ThreadPoolTaskExecutor的任務(wù)調(diào)度過程中。
任務(wù)調(diào)度邏輯
使用<task:executor>注解得到的bean是ThreadPoolTaskExecutor的實(shí)例擅这。這個(gè)類本身并不做調(diào)度澈魄,而是將調(diào)度工作委托給了ThreadPoolExecutor。后者的任務(wù)調(diào)度代碼如下:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
通過其中的注釋仲翎,我們可以知道它的核心調(diào)度邏輯如下(省略了一些檢查等方法):
如果正在運(yùn)行的線程數(shù)量小于corePoolSize(最小線程數(shù))痹扇,則嘗試啟動(dòng)一個(gè)新線程,并將當(dāng)入?yún)ommand作為該線程的第一個(gè)task溯香。否則進(jìn)入步驟二鲫构。
如果沒有按步驟1執(zhí)行,那么嘗試把入?yún)ommand放入workQueue中玫坛。如果能成功入隊(duì)结笨,做后續(xù)處理;否則湿镀,進(jìn)入步驟三炕吸。
如果沒有按步驟2執(zhí)行,那么將嘗試創(chuàng)建一個(gè)新線程勉痴,然后做后續(xù)處理赫模。
簡(jiǎn)單的說,當(dāng)向ThreadPoolExecutor提交一個(gè)任務(wù)時(shí)蒸矛,它會(huì)優(yōu)先交給線程池中的現(xiàn)有線程瀑罗;如果暫時(shí)沒有可用的線程扫外,那么它會(huì)將任務(wù)放到隊(duì)列中;一般只有在隊(duì)列滿了的時(shí)候(導(dǎo)致無法成功入隊(duì))廓脆,才會(huì)創(chuàng)建新線程來處理隊(duì)列中的任務(wù)筛谚。順帶一說,任務(wù)入隊(duì)后停忿,在某些條件下也會(huì)創(chuàng)建新線程驾讲。但新線程不會(huì)立即執(zhí)行當(dāng)前任務(wù),而是從隊(duì)列中獲取一個(gè)任務(wù)并開始執(zhí)行席赂。參見:http://www.infoq.com/cn/articles/java-threadPool
匯總分析
綜上所述吮铭,我們可以確定以下信息:
根據(jù)系統(tǒng)的配置,ThreadPoolExecutor中的corePoolSize = 16颅停。
-
在異步調(diào)度過程中谓晌,線程池?cái)?shù)量沒有增長(zhǎng)(最多是16個(gè))。
這一點(diǎn)是通過日志中的線程名稱確認(rèn)的癞揉。日志中纸肉,異步線程的id從executor-1、executor-2一直到executor-16喊熟,但17及以上的都沒有出現(xiàn)過柏肪。
-
當(dāng)并發(fā)數(shù)超過16時(shí),ThreadPoolExecutor會(huì)按照步驟二進(jìn)行任務(wù)調(diào)度芥牌,即把任務(wù)放入隊(duì)列中烦味,但沒有及時(shí)創(chuàng)建新線程來執(zhí)行這個(gè)任務(wù)。
這一點(diǎn)是推測(cè)壁拉。在后面的測(cè)試中會(huì)驗(yàn)證這一點(diǎn)谬俄。
-
隊(duì)列中的任務(wù)出現(xiàn)積壓、時(shí)間累積弃理,導(dǎo)致某一個(gè)任務(wù)超時(shí)后溃论,后續(xù)大量任務(wù)都超時(shí)。但是超時(shí)并沒有阻止任務(wù)執(zhí)行案铺;任務(wù)仍然會(huì)繼續(xù)通過rest client調(diào)用rest server蔬芥,并被監(jiān)控代碼記錄下時(shí)間。
任務(wù)在隊(duì)列中積壓控汉、累積笔诵,是引發(fā)一天數(shù)百次異常、報(bào)警的原因姑子。而監(jiān)控代碼并未監(jiān)控到任務(wù)調(diào)度的時(shí)間乎婿,因此都沒有出現(xiàn)超時(shí)。
模擬重現(xiàn)
模擬當(dāng)線程池中工作線程數(shù)達(dá)到CorePoolSize街佑、且任務(wù)數(shù)未達(dá)到queue-capacity時(shí)的情形谢翎。線程池配置如下捍靠,其中corePoolSize配置為2,queue-capacity配置為1000.
<!-- 調(diào)用rest接口時(shí)森逮,使用此異步執(zhí)行器榨婆。避免占用全局的線程池 -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="1000" />
測(cè)試代碼如下:
@Test
public void test_multi_thread() {
System.out.println("start");
for (int i = 0; i < 10; i++) {
new Thread(
() -> {
long start = System.currentTimeMillis();
System.out.println("committed.");
Future<Result4Calculate> result =
BizOverdueCalculateServiceTest.
this.bizOverdueCalculateService.
calculateByLendId(1231);
System.out.println("to get. cost:"
+ (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
try {
result.get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException
| TimeoutException e) {
e.printStackTrace();
}
System.out.println("getted. cost:"
+ (System.currentTimeMillis() - start));
}, "thread_" + i).start();
}
System.out.println("all started");
try {
Thread.sleep(10001);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
其中bizOverdueCalculateService.calculateByLendId(1231)中的代碼如下:
/**
* 按進(jìn)件id試算。 * <p>
* 結(jié)清日期默認(rèn)為系統(tǒng)當(dāng)前日期
*
* @param lendrequestId
* @return
*/
@Async("keplerRestExecutor")
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
long start = System.currentTimeMillis();
Future<Result4Calculate> f =
this.calculateByLendId(lendrequestId,
new Date());
System.out.println(Thread.currentThread().getName() + ", active count:"
+ this.keplerRestExecutor.getActiveCount() + ", queue size :"
+ this.keplerRestExecutor.getThreadPoolExecutor().getQueue().size()
+ " rest cost: " + (System.currentTimeMillis() - start));
return f;
}
根據(jù)上述分析褒侧,預(yù)計(jì):
測(cè)試類并發(fā)的發(fā)起10個(gè)rest調(diào)用任務(wù)后良风,只有兩個(gè)任務(wù)會(huì)被線程池中的工作線程立即執(zhí)行,其它八個(gè)任務(wù)都進(jìn)入隊(duì)列闷供。
線程池中始終只有兩個(gè)工作線程烟央。
隊(duì)列中每個(gè)任務(wù)的執(zhí)行時(shí)間都不超時(shí),但執(zhí)行過若干個(gè)任務(wù)后歪脏,后續(xù)任務(wù)全部超時(shí)疑俭。
實(shí)際輸出如下:
全部提交后,只有兩個(gè)線程在執(zhí)行婿失,其它8個(gè)任務(wù)全部在隊(duì)列中:active count:2, queue size :8钞艇。
線程池中始終只有keplerRestExecutor-1、keplerRestExecutor-2兩個(gè)線程移怯。active count也始終為2香璃。
任務(wù)的實(shí)際執(zhí)行時(shí)間(rest cost)都在1s上下这难。但從第9(每次測(cè)試舟误,這個(gè)數(shù)字會(huì)略有不同)個(gè)任務(wù)開始,result.get(5, TimeUnit.SECONDS)方法出現(xiàn)超時(shí)姻乓。
測(cè)試輸出如下:
start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:37
to get. cost:33
to get. cost:33
to get. cost:37
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:35
keplerRestExecutor-1, active count:2, queue size :8 rest cost: 1437
getted. cost:1444
keplerRestExecutor-2, active count:2, queue size :7 rest cost: 1437
getted. cost:1444
keplerRestExecutor-1, active count:2, queue size :6 rest cost: 1155
getted. cost:2599
keplerRestExecutor-2, active count:2, queue size :5 rest cost: 1155
getted. cost:2600
keplerRestExecutor-1, active count:2, queue size :4 rest cost: 1140
getted. cost:3739
keplerRestExecutor-2, active count:2, queue size :3 rest cost: 1140
getted. cost:3740
keplerRestExecutor-1, active count:2, queue size :2 rest cost: 1176
getted. cost:4915
keplerRestExecutor-2, active count:2, queue size :1 rest cost: 1176
getted. cost:4916
java.util.concurrent.TimeoutException
getted. cost:5001
getted. cost:5001
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
keplerRestExecutor-1, active count:2, queue size :0 rest cost: 1175
keplerRestExecutor-2, active count:1, queue size :0 rest cost: 1175
rejection-policy的幾種配置
rejection-policy是指當(dāng)線程池中的工作線程和任務(wù)隊(duì)列都達(dá)到飽和狀態(tài)時(shí)嵌溢、或者線程池已經(jīng)關(guān)閉的情況下,線程池處理新的任務(wù)的規(guī)則蹋岩。
配置值 | 基本含義 | 備注 |
---|---|---|
ABORT | 直接中斷任務(wù)的提交動(dòng)作 | 在主線程中拋出任務(wù)被拒絕異常赖草。 |
CALLER_RUNS | 由主線程來執(zhí)行新的任務(wù) | 異步操作會(huì)轉(zhuǎn)換為同步操作 |
DISCARD | 直接丟棄當(dāng)前任務(wù) | 在對(duì)當(dāng)前任務(wù)進(jìn)行g(shù)et時(shí),會(huì)出現(xiàn)超時(shí)異常剪个。 |
DISCARD_OLDEST | 丟棄任務(wù)隊(duì)列中第一個(gè)任務(wù) | 在關(guān)閉隊(duì)列的情況下秧骑,會(huì)陷入“嘗試丟棄隊(duì)首任務(wù)-嘗試入隊(duì)-嘗試丟棄-嘗試入隊(duì)” 的死循環(huán)中。 |
解決方案
針對(duì)線程數(shù)和隊(duì)列大小扣囊,考慮方案有三:
提高初始線程數(shù)乎折。
提高步并發(fā)的初始線程數(shù)(如將16-128調(diào)整為32-128)。以此減少新任務(wù)進(jìn)入隊(duì)列的幾率侵歇。但是這個(gè)方案只是降低隊(duì)列積壓的風(fēng)險(xiǎn)骂澄,并不解決問題。
關(guān)閉隊(duì)列惕虑。
將隊(duì)列大小調(diào)整為0坟冲,以此保證每一個(gè)新任務(wù)都有一個(gè)新線程來執(zhí)行磨镶。這個(gè)方案的問題在于,并發(fā)壓力大時(shí)健提,可能導(dǎo)致線程不夠用琳猫。此時(shí)的異步調(diào)用會(huì)根據(jù)rejection-policy="CALLER_RUNS"的配置而變?yōu)橥秸{(diào)用。
更換線程池私痹。
使用優(yōu)先創(chuàng)建新線程(而非優(yōu)先入隊(duì)列)的線程池沸移。改動(dòng)最大的方案。目前考慮:底層系統(tǒng)并發(fā)壓力并不大侄榴;根據(jù)ELK的統(tǒng)計(jì)雹锣,最高并發(fā)大約也就30+rps●希可以考慮在指定專用ThreadPoolTaskExecutor的前提下蕊爵,關(guān)閉隊(duì)列。
此外桦山,rejection-policy的配置攒射,考慮方案有二:
設(shè)定為CALLER_RUNS。
這種方式可以保證任務(wù)得到執(zhí)行恒水;但有可能會(huì)阻塞主線程会放。并且阻塞時(shí)間視REST調(diào)用時(shí)間而定。
設(shè)定為DISCARD钉凌。
這種方式實(shí)際上也會(huì)阻塞主線程咧最,但是最長(zhǎng)會(huì)阻塞5s。目前考慮:試算服務(wù)試運(yùn)行期間設(shè)定為DISCARD御雕,以免主線程阻塞時(shí)間過長(zhǎng)矢沿。逾期試算服務(wù)完成性能優(yōu)化、并且服務(wù)穩(wěn)定之后酸纲,設(shè)定為CALLER_RUNS捣鲸,以確保試算任務(wù)得到執(zhí)行。
關(guān)閉隊(duì)列方案測(cè)試代碼
復(fù)用模擬重現(xiàn)中的測(cè)試代碼闽坡,修改線程池配置如下
<!-- 調(diào)用kepler的rest接口時(shí)栽惶,使用此異步執(zhí)行器。避免占用全局的線程池 -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
keep-alive="60" rejection-policy="CALLER_RUNS"
queue-capacity="0" />
<task:scheduler id="scheduler" pool-size="32" />
再次執(zhí)行上測(cè)試代碼疾嗅。預(yù)計(jì):
任務(wù)一經(jīng)提交外厂,就會(huì)創(chuàng)建10個(gè)工作線程來分別執(zhí)行姓言。
隊(duì)列大小始終為0.
不會(huì)出現(xiàn)超時(shí)浙值。
可能會(huì)出現(xiàn)后續(xù)任務(wù)中,active count 小于10的情況肯适。
關(guān)閉隊(duì)列后的日志輸出
start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:4
to get. cost:4
keplerRestExecutor-7, active count:10, queue size :0 rest cost: 2177
getted. cost:2182
keplerRestExecutor-4, active count:9, queue size :0 rest cost: 2182
getted. cost:2187
keplerRestExecutor-9, active count:8, queue size :0 rest cost: 2185
getted. cost:2190
keplerRestExecutor-1, active count:7, queue size :0 rest cost: 2190
getted. cost:2196
keplerRestExecutor-3, active count:6, queue size :0 rest cost: 2191
getted. cost:2196
keplerRestExecutor-2, active count:5, queue size :0 rest cost: 2191
keplerRestExecutor-5, active count:4, queue size :0 rest cost: 2191
getted. cost:2196
getted. cost:2196
keplerRestExecutor-10, active count:3, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-8, active count:2, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-6, active count:1, queue size :0 rest cost: 2193
getted. cost:2198