首先簡(jiǎn)單介紹下ThreadPoolExecutor的原理由驹,如圖:
每當(dāng)有新任務(wù)需要提交到線程池執(zhí)行的時(shí)候,大概的過(guò)程如下:
- step1.調(diào)用ThreadPoolExecutor的execute提交線程逝嚎,首先檢查CorePool,如果CorePool內(nèi)的線程小于CorePoolSize详恼,新創(chuàng)建線程執(zhí)行任務(wù)补君。
- step2.如果當(dāng)前CorePool內(nèi)的線程大于等于CorePoolSize,那么將任務(wù)加入到BlockingQueue昧互。
- step3.如果不能加入BlockingQueue挽铁,在小于MaxPoolSize的情況下創(chuàng)建線程執(zhí)行任務(wù)伟桅。
- step4.如果線程數(shù)大于等于MaxPoolSize,那么執(zhí)行拒絕策略叽掘。
其中1,2,4步驟的機(jī)制原來(lái)也就知道楣铁,只是對(duì)于step3原先關(guān)注的不多,這次踩到了够掠。本意是想讓超出MaxPoolSize的時(shí)候民褂,任務(wù)能執(zhí)行在父線程里執(zhí)行,所以想當(dāng)然的將BlockingQueue的大小設(shè)置為1了疯潭。結(jié)果出現(xiàn)的現(xiàn)象是赊堪,在并發(fā)很低的情況下,出現(xiàn)了任務(wù)被線程池拒絕的情況竖哩。調(diào)試了一下午哭廉,才發(fā)現(xiàn)卡在了step3,源代碼如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// step 3 workQueue.offer(command))
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
關(guān)鍵點(diǎn)就在workQueue.offer(command))這句:如果當(dāng)前線程數(shù)已經(jīng)大于coreSize了相叁,則會(huì)先嘗試往對(duì)列里放遵绰。如果隊(duì)列已經(jīng)滿了,則直接看能不能再建線程增淹,如果還不能則走拒絕策略椿访。這個(gè)時(shí)候,其實(shí)coreSize的線程可能都是空閑的虑润,他們是在等待workQueue里有新的任務(wù)出現(xiàn)成玫。而新的任務(wù)(特別是一次性批量提交大于1個(gè)的時(shí)候),因?yàn)锽lockingQueue size是1拳喻,所以只有1個(gè)能提交成功哭当,其他的都被拒絕了。
想想也是啊冗澈,這就是一個(gè)生產(chǎn)者消費(fèi)者模式钦勘,線程被創(chuàng)建出來(lái)之后,和后續(xù)提交的任務(wù)之間唯一媒介就是workQueueQ乔住彻采!
更詳細(xì)的關(guān)于ThreadPoolExecutor的機(jī)制可以參考:
http://www.reibang.com/p/ade771d2c9c0