背景
最近收到一道面試題:我們知道JDK的線程池在線程數(shù)達(dá)到corePoolSize之后惨好,先判斷隊(duì)列献联,再判斷maximumPoolSize带兜。如果想反過(guò)來(lái)谅摄,即先判斷maximumPoolSize再判斷隊(duì)列,怎么辦多律?
建議往下瀏覽之前先思考一下解決方案痴突,如果自己面對(duì)這道面試題,該如何作答狼荞?
方案一
由于線程池的行為是定義在JDK相關(guān)代碼中辽装,我們想改變其默認(rèn)行為,很自然的一種想法便是:繼承自JDK的線程池類java.util.concurrent.ThreadPoolExecutor
相味,然后改寫(xiě)其execute
方法拾积,將判斷隊(duì)列與maximumPoolSize的邏輯順序調(diào)整一下,以達(dá)到目的
原來(lái)的邏輯如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建新線程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 代碼運(yùn)行到此處丰涉,說(shuō)明線程池?cái)?shù)量達(dá)到了corePoolSize
if (isRunning(c) && workQueue.offer(command)) {
// 將任務(wù)成功入隊(duì)
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 代碼運(yùn)行到此處拓巧,說(shuō)明入隊(duì)失敗
else if (!addWorker(command, false))
// 創(chuàng)建新線程失敗則執(zhí)行拒絕策略
reject(command);
}
但是仔細(xì)閱讀代碼會(huì)發(fā)現(xiàn),execute中涉及到的一些關(guān)鍵方法如workerCountOf
一死、addWorker
等是私有的肛度,關(guān)鍵變量如ctl
、corePoolSize
也是私有的投慈,即無(wú)法通過(guò)簡(jiǎn)單繼承ThreadPoolExecutor改寫(xiě)其execute方法的核心邏輯達(dá)到目的承耿。
那考慮的一個(gè)變種是,定義一個(gè)MyThreadPoolExecutor伪煤,把ThreadPoolExecutor的代碼照搬過(guò)來(lái)加袋,只改寫(xiě)其中execute方法,改寫(xiě)后的邏輯如下:
public void execute(Runnable command) {
if (command == null)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 先判斷maximumPoolSize
if (workerCountOf(c) < maximumPoolSize) {
if (addWorker(command, false))
return;
c = ctl.get();
}
// 再判斷隊(duì)列
else if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (isRunning(c) && !workQueue.offer(command))
reject(command);
}
改寫(xiě)之后带族,發(fā)現(xiàn)reject方法也得重寫(xiě)锁荔,原因是RejectedExecutionHandler#rejectedExecution第二個(gè)入?yún)⑹荰hreadPoolExecutor,不能傳this
// java.util.concurrent.ThreadPoolExecutor#reject
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
// java.util.concurrent.RejectedExecutionHandler
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
這樣,連RejectedExecutionHandler也要改寫(xiě)一下
由于RejectedExecutionHandler的改造并非面試題核心邏輯阳堕,所以此處省略跋理,明白要表達(dá)的意思即可
但這樣做之后,與三方框架的兼容就很難了--->有不少三方框架入?yún)⑹切枰猅hreadPoolExecutor恬总,而不是自定義的MyThreadPoolExecutor前普,后續(xù)的使用會(huì)是個(gè)問(wèn)題
評(píng)價(jià):自定義MyThreadPoolExecutor需要代碼大篇幅的拷貝,麻煩不說(shuō)壹堰,兼容性還是個(gè)問(wèn)題拭卿,從實(shí)戰(zhàn)出發(fā)考慮,可行性很低
方案二
那有沒(méi)有什么方案能夠既省事贱纠,又能兼顧兼容性峻厚?
兩步走:
- 自定義Queue,改寫(xiě)offer邏輯
- 自定義線程池類谆焊,繼承自ThreadPoolExecutor惠桃,改寫(xiě)核心邏輯
自定義Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
// 自定義的線程池類,繼承自ThreadPoolExecutor
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}
// offer方法的含義是:將任務(wù)提交到隊(duì)列中辖试,返回值為true/false辜王,分別代表提交成功/提交失敗
@Override
public boolean offer(Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}
// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程罐孝,直接扔進(jìn)隊(duì)列里呐馆,讓線程去處理
return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false莲兢,暗含入隊(duì)失敗汹来,讓線程池去創(chuàng)建新的線程
return false;
}
// 重點(diǎn): 代碼運(yùn)行到此處,說(shuō)明當(dāng)前線程數(shù) >= 最大線程數(shù)怒见,需要真正的提交到隊(duì)列中
return super.offer(runnable);
}
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
自定義線程池類
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* 定義一個(gè)成員變量俗慈,用于記錄當(dāng)前線程池中已提交的任務(wù)數(shù)量
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// ThreadPoolExecutor的勾子方法,在task執(zhí)行完后需要將池中已提交的任務(wù)數(shù) - 1
submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
// 將池中已提交的任務(wù)數(shù) + 1
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
核心邏輯:當(dāng)提交任務(wù)給EagerThreadPoolExecutor遣耍,執(zhí)行submittedTaskCount.incrementAndGet();
將池中已提交的任務(wù)數(shù) + 1,然后就調(diào)用父類的execute方法
// 代碼運(yùn)行到此處炮车,說(shuō)明線程數(shù) >= corePoolSize舵变, 此時(shí)workQueue為自定義的TaskQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
核心邏輯:當(dāng)執(zhí)行workQueue.offer(command)
,走到自定義的TaskQueue#offer邏輯瘦穆,而offer方法的返回值決定著是否創(chuàng)建更多的線程:返回true纪隙,代表入隊(duì)成功,不創(chuàng)建線程扛或;返回false绵咱,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程
// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
// 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù)熙兔,意味著線程池中有空閑線程悲伶,直接扔進(jìn)隊(duì)列里艾恼,讓線程去處理
return super.offer(runnable);
}
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
// 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false麸锉,暗含入隊(duì)失敗钠绍,讓線程池去創(chuàng)建新的線程
return false;
}
// 重點(diǎn): 代碼運(yùn)行到此處,說(shuō)明當(dāng)前線程數(shù) >= 最大線程數(shù)花沉,需要真正的提交到隊(duì)列中
return super.offer(runnable);
核心邏輯:當(dāng)前線程數(shù)小于最大線程數(shù)就返回false柳爽,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程
因此碱屁,總結(jié)起來(lái)就是:自定義的EagerThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來(lái)決定是否創(chuàng)建更多的線程磷脯,達(dá)到先判斷maximumPoolSize再判斷隊(duì)列的目的
評(píng)價(jià):該方案不需要修改JDK線程池的核心邏輯,盡最大可能避免因更改核心流程考慮不周而引入的BUG娩脾。另一方面赵誓,擴(kuò)展Queue的手段,也是JDK提供的一個(gè)能夠讓用戶在不干涉核心流程的情況下晦雨,達(dá)到安全擴(kuò)展線程池能力的方式
題外話
有朋友或許會(huì)有疑問(wèn)架曹,這道面試題是面試官天馬行空想像出來(lái)的嗎?是否有實(shí)際的場(chǎng)景跟需要呢闹瞧?
可以從至少兩個(gè)開(kāi)源框架上找到答案
Dubbo 2.6.2及以上
其實(shí)上邊的方案二绑雄,代碼來(lái)自于Dubbo源碼,
相關(guān)git issue在此: Extension: Eager Thread Pool
Tomcat
Tomcat自定義的線程池類名與JDK的相同奥邮,都叫ThreadPoolExecutor万牺,只是包不同,且Tomcat的ThreadPoolExecutor繼承自JDK的ThreadPoolExecutor
Tomcat自定義的隊(duì)列也叫TaskQueue
Tomcat的ThreadPoolExecutor與TaskQueue核心邏輯洽腺、思想與方案二貼的代碼幾乎一致脚粟。實(shí)際上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鑒的Tomcat設(shè)計(jì)蘸朋,關(guān)于這一點(diǎn)Dubbo github issue上作者本人也有提及
JDK線程池與Tomcat線程池方案誰(shuí)最好核无?
筆者認(rèn)為,沒(méi)有哪種方案最好藕坯,技術(shù)沒(méi)有銀彈团南,只是在不同視角進(jìn)行的trade off,在某種場(chǎng)景下最好的方案在另一個(gè)場(chǎng)景中可能卻導(dǎo)致糟糕的后果炼彪⊥赂可以從另一個(gè)角度考慮:如果有一種放之四海皆準(zhǔn),從各個(gè)角度考慮都優(yōu)于其他技術(shù)的存在辐马,那么它的出現(xiàn)必將完全取代它的競(jìng)品拷橘。而從現(xiàn)實(shí)看,顯然, JDK線程線與Tomcat線程池都各有場(chǎng)景與發(fā)展冗疮,并沒(méi)有出現(xiàn)一方取代另一方的情況萄唇,因此不存在哪種方案最好的說(shuō)法
如果線上環(huán)境要使用線程池,哪一種更合適赌厅?
線程數(shù)與CPU核數(shù)穷绵、任務(wù)類型的關(guān)系就不細(xì)說(shuō)了。簡(jiǎn)單而言特愿,如果不能忍受延遲仲墨,期望應(yīng)用能盡快地為用戶提供服務(wù),那么Tomcat線程池可能更適合你揍障;相反目养,如果你能容忍一些延遲來(lái)?yè)Q取性能上的提升,那么JDK線程池可能會(huì)更合適一些
注
方案一的代碼乃筆者隨手而敲毒嫡,未經(jīng)過(guò)任何生產(chǎn)環(huán)境的檢驗(yàn)跟錘煉癌蚁,可能存在潛在的BUG,強(qiáng)烈不建議生產(chǎn)環(huán)境使用兜畸。如果確實(shí)有需要努释,請(qǐng)使用方案二,有知名框架背書(shū)咬摇,且實(shí)現(xiàn)更為安全與優(yōu)雅伐蒂,乃首先之姿
最后,感謝這位朋友的面試題肛鹏,也感謝孤獨(dú)煙(人稱煙哥)分享面試題讓大家參與討論逸邦,以及飛奔的普朗克(人稱何總)提供的思路,才有了本篇的內(nèi)容分享在扰,希望大家都能有所收獲