JDK 線程池使用過程中论悴,很多人都知道有一些關(guān)鍵參數(shù)需要配置,
? public ThreadPoolExecutor(int corePoolSize,
? int maximumPoolSize,
? long keepAliveTime,
? TimeUnit unit,
? BlockingQueue<Runnable> workQueue,
? RejectedExecutionHandler handler)
也大致知道線程池的大致原理墓律,但是不一定能解釋某些現(xiàn)象膀估。
有個(gè)系統(tǒng),設(shè)計(jì)大致是這樣的:服務(wù)A 發(fā)送消息到 MQ(具體來說是 kafka)耻讽,消費(fèi)端調(diào)用服務(wù) B 實(shí)際執(zhí)行消費(fèi)操作察纯。公司的中間件對(duì) kafka 做了一層封裝,能夠動(dòng)態(tài)配置一些參數(shù)针肥,動(dòng)態(tài)重建 consumer 應(yīng)用新的配置饼记,其中有一個(gè)參數(shù)就是并行度 parallelCount ,含義是:對(duì)于同一個(gè) partition 分配多少個(gè)線程并行處理消息慰枕。
系統(tǒng)設(shè)計(jì)之初具则,就考慮了使用這個(gè)配置來動(dòng)態(tài)調(diào)整整個(gè)系統(tǒng)的承載能力,因?yàn)榱髁繌椥员容^高具帮,少的時(shí)候一天沒有調(diào)用量博肋,多的時(shí)候可能需要在較短時(shí)間內(nèi)處理幾十萬到上百萬的消息,處理時(shí)間甚至可能需要根據(jù)下游系統(tǒng)性能調(diào)整蜂厅。所以這個(gè)參數(shù)的動(dòng)態(tài)調(diào)整至關(guān)重要匪凡。
但是實(shí)際上線之后,一次大批量調(diào)用葛峻,觀察到并行度調(diào)整似乎沒有達(dá)到預(yù)期效果锹雏,默認(rèn) parallelCount = 1,如果業(yè)務(wù)能保證不依賴消息順序术奖,則可以調(diào)整并行度提高吞吐量
有一天線上收到告警礁遵,消息積壓。于是趕緊調(diào)整并行度,
n=1,
n=2,
…
一切有序進(jìn)行中采记,消息處理速度整體不斷增加
n=8,
n=9,
n=10
…
n=16
但是觀察線上監(jiān)控佣耐,似乎處理能力不再增加了?這是怎么回事唧龄?
我記得下游系統(tǒng) actual service 配置了最大 64 線程兼砖,這還差很多呢,怎么就不線性增長(zhǎng)了既棺?
下游系統(tǒng)響應(yīng)變慢讽挟?
開始的時(shí)候猜想,是不是下游系統(tǒng)處理能力不夠了丸冕?請(qǐng)求的響應(yīng)速度變慢耽梅,所以請(qǐng)求堆積起來了?
由于下游系統(tǒng)是個(gè)外部的 HTTP 服務(wù)胖烛,所以無從得知眼姐,但是從歷史經(jīng)驗(yàn)來看诅迷,遠(yuǎn)遠(yuǎn)達(dá)不到這個(gè)系統(tǒng)的瓶頸,因?yàn)檫@個(gè)系統(tǒng)其實(shí)有很多的外部調(diào)用方众旗,我們的請(qǐng)求量不見得算很大罢杉。
而且從 actual service 的內(nèi)部打點(diǎn)來看,實(shí)際執(zhí)行 HTTP 調(diào)用的地方 TP99 并沒有變慢贡歧,和平時(shí)一樣滩租。
下游系統(tǒng)限流?
這是有可能的艘款,因?yàn)橄掠蜗到y(tǒng)這個(gè)HTTP服務(wù)本身有對(duì)各個(gè)接入放有限流持际,但是查了文檔,當(dāng)前調(diào)用量還遠(yuǎn)沒有達(dá)到限流閾值哗咆。
系統(tǒng)內(nèi)部分析
那么問題只會(huì)出現(xiàn)在系統(tǒng)內(nèi)部了
查看監(jiān)控
consumer 應(yīng)用內(nèi)部蜘欲,當(dāng)時(shí)的線程堆棧采樣可以看出來,kafka consumer 端線程數(shù)量確實(shí)有 16 個(gè)
這樣也就排除了 consumer 并行度調(diào)整不生效的問題晌柬。
actual service 內(nèi)部姥份,查看當(dāng)時(shí)的線程堆棧采樣,對(duì)應(yīng) consumer 并行度 16 的時(shí)候年碘,service 內(nèi)部用于處理任務(wù)的專用線程池澈歉,thread-count == 10
起初很奇怪,細(xì)想一下終于明白了屿衅。JDK 線程池確實(shí)是這個(gè)邏輯
簡(jiǎn)單來說埃难,就是 threadCount > coreSize ,先開始排隊(duì)涤久,隊(duì)列滿再擴(kuò)充線程池
//java.util.concurrent.ThreadPoolExecutor#execute
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
JDK 代碼注釋中有解釋涡尘,代碼短小精悍。
優(yōu)化方案
知道問題所在了响迂,那么怎么解決呢考抄?
初步想,有兩種方案蔗彤。
方案一
直接把 coreSize 設(shè)置到一個(gè)足夠大的值川梅,比如 64,或者干脆配置一個(gè) fixed size 的線程池
優(yōu)點(diǎn):簡(jiǎn)單直接然遏,能解決問題
缺點(diǎn):請(qǐng)求量低的時(shí)候大量線程閑置贫途,浪費(fèi)系統(tǒng)資源
方案二
這也是本篇的精髓所在了,改造 JDK 線程池待侵。
既然缺陷在于先排隊(duì)后擴(kuò)容潮饱,延遲了擴(kuò)容的時(shí)機(jī),那就改成先擴(kuò)容后排隊(duì)诫给,這樣就能確保在一定空間下處理能力線性增長(zhǎng)了香拉。
怎么做呢?分析上面的代碼中狂,第二個(gè) if 語句凫碌,isRunning(c) && workQueue.offer(command)
如果入隊(duì)成功了就不會(huì)創(chuàng)建線程,所以只要重載 Queue胃榕,判斷當(dāng)前 threadCount > coreSize && threadCount < maxCount 的時(shí)候返回 false盛险,就可以了,等到 threadCount > maxSize 的時(shí)候再實(shí)際執(zhí)行入隊(duì)操作勋又。
其實(shí)這就是 tomcat 線程池的做法苦掘,細(xì)節(jié)上需要注意:queue 需要感知到 threadPool 當(dāng)前的 count,需要做一些改造楔壤。
看源碼:tomcat 8.0.30 版本
//org.apache.tomcat.util.threads.TaskQueue#offer
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
創(chuàng)建的時(shí)候持有 Pool 的引用
// org.apache.catalina.core.StandardThreadExecutor#startInternal
@Override
protected void startInternal() throws LifecycleException {
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
setState(LifecycleState.STARTING);
}
剝離開 tomcat 的一些不相關(guān)的參數(shù)鹤啡,簡(jiǎn)單改造一下就可以用了。
感謝 tomcat 蹲嚣,隨便一看都是寶藏