ThreadPoolExecutor的線程調(diào)度及其中的問題

問題現(xiàn)象

在我們的系統(tǒng)中,使用了這樣的配置來開啟異步操作:

<task:annotation-driven executor="executor"
    scheduler="scheduler" />
<task:executor id="executor" pool-size="16-128"
    keep-alive="60" rejection-policy="CALLER_RUNS" 
    queue-capacity="1000" />

客戶端開啟異步代碼如下:

@Async()
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
    // 標(biāo)記1
    // 調(diào)用REST服務(wù)镶骗;監(jiān)控調(diào)用時(shí)間伴找。
  }
  
// 獲取Future后的處理如下
try {
     keplerOverdue = summay4Overdue.get(5, TimeUnit.SECONDS);
     // 后續(xù)處理
} catch (Exception e) {
     // 標(biāo)記2
     // 異常報(bào)警 
} 

然而在這種配置下嗅绰,客戶端在標(biāo)記1處監(jiān)控到的調(diào)用時(shí)間普遍在4s以內(nèi)(平均時(shí)間不到1s抡秆,個(gè)別峰值請(qǐng)求會(huì)突破5s镐侯,全天超過5s的請(qǐng)求不到10個(gè)侦讨。然而,在標(biāo)記2處捕獲到的超時(shí)異常卻非常多苟翻,一天接近700+韵卤。問題出在哪兒?

原因分析

上述問題相關(guān)代碼的調(diào)用時(shí)序如下圖所示袜瞬。

image

其中怜俐,rest client 與rest server間的交互時(shí)間可以明確監(jiān)控到身堡,用時(shí)超過5s的非常少邓尤。但是,get方法卻經(jīng)常拋出超時(shí)異常贴谎。經(jīng)過初步分析汞扎,問題出現(xiàn)在ThreadPoolTaskExecutor的任務(wù)調(diào)度過程中。

任務(wù)調(diào)度邏輯

使用<task:executor>注解得到的bean是ThreadPoolTaskExecutor的實(shí)例擅这。這個(gè)類本身并不做調(diào)度澈魄,而是將調(diào)度工作委托給了ThreadPoolExecutor。后者的任務(wù)調(diào)度代碼如下:


/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

通過其中的注釋仲翎,我們可以知道它的核心調(diào)度邏輯如下(省略了一些檢查等方法):

  1. 如果正在運(yùn)行的線程數(shù)量小于corePoolSize(最小線程數(shù))痹扇,則嘗試啟動(dòng)一個(gè)新線程,并將當(dāng)入?yún)ommand作為該線程的第一個(gè)task溯香。否則進(jìn)入步驟二鲫构。

  2. 如果沒有按步驟1執(zhí)行,那么嘗試把入?yún)ommand放入workQueue中玫坛。如果能成功入隊(duì)结笨,做后續(xù)處理;否則湿镀,進(jìn)入步驟三炕吸。

  3. 如果沒有按步驟2執(zhí)行,那么將嘗試創(chuàng)建一個(gè)新線程勉痴,然后做后續(xù)處理赫模。

簡(jiǎn)單的說,當(dāng)向ThreadPoolExecutor提交一個(gè)任務(wù)時(shí)蒸矛,它會(huì)優(yōu)先交給線程池中的現(xiàn)有線程瀑罗;如果暫時(shí)沒有可用的線程扫外,那么它會(huì)將任務(wù)放到隊(duì)列中;一般只有在隊(duì)列滿了的時(shí)候(導(dǎo)致無法成功入隊(duì))廓脆,才會(huì)創(chuàng)建新線程來處理隊(duì)列中的任務(wù)筛谚。順帶一說,任務(wù)入隊(duì)后停忿,在某些條件下也會(huì)創(chuàng)建新線程驾讲。但新線程不會(huì)立即執(zhí)行當(dāng)前任務(wù),而是從隊(duì)列中獲取一個(gè)任務(wù)并開始執(zhí)行席赂。參見:http://www.infoq.com/cn/articles/java-threadPool

匯總分析

綜上所述吮铭,我們可以確定以下信息:

  • 根據(jù)系統(tǒng)的配置,ThreadPoolExecutor中的corePoolSize = 16颅停。

  • 在異步調(diào)度過程中谓晌,線程池?cái)?shù)量沒有增長(zhǎng)(最多是16個(gè))。

    這一點(diǎn)是通過日志中的線程名稱確認(rèn)的癞揉。日志中纸肉,異步線程的id從executor-1、executor-2一直到executor-16喊熟,但17及以上的都沒有出現(xiàn)過柏肪。

  • 當(dāng)并發(fā)數(shù)超過16時(shí),ThreadPoolExecutor會(huì)按照步驟二進(jìn)行任務(wù)調(diào)度芥牌,即把任務(wù)放入隊(duì)列中烦味,但沒有及時(shí)創(chuàng)建新線程來執(zhí)行這個(gè)任務(wù)。

    這一點(diǎn)是推測(cè)壁拉。在后面的測(cè)試中會(huì)驗(yàn)證這一點(diǎn)谬俄。

  • 隊(duì)列中的任務(wù)出現(xiàn)積壓、時(shí)間累積弃理,導(dǎo)致某一個(gè)任務(wù)超時(shí)后溃论,后續(xù)大量任務(wù)都超時(shí)。但是超時(shí)并沒有阻止任務(wù)執(zhí)行案铺;任務(wù)仍然會(huì)繼續(xù)通過rest client調(diào)用rest server蔬芥,并被監(jiān)控代碼記錄下時(shí)間。

    任務(wù)在隊(duì)列中積壓控汉、累積笔诵,是引發(fā)一天數(shù)百次異常、報(bào)警的原因姑子。而監(jiān)控代碼并未監(jiān)控到任務(wù)調(diào)度的時(shí)間乎婿,因此都沒有出現(xiàn)超時(shí)。

image

模擬重現(xiàn)

模擬當(dāng)線程池中工作線程數(shù)達(dá)到CorePoolSize街佑、且任務(wù)數(shù)未達(dá)到queue-capacity時(shí)的情形谢翎。線程池配置如下捍靠,其中corePoolSize配置為2,queue-capacity配置為1000.


<!-- 調(diào)用rest接口時(shí)森逮,使用此異步執(zhí)行器榨婆。避免占用全局的線程池 -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
    keep-alive="60" rejection-policy="CALLER_RUNS"
     queue-capacity="1000" />

測(cè)試代碼如下:


@Test
public void test_multi_thread() {
    System.out.println("start");
    for (int i = 0; i < 10; i++) {
        new Thread(
            () -> {
                long start = System.currentTimeMillis();
                System.out.println("committed.");
                Future<Result4Calculate> result = 
                  BizOverdueCalculateServiceTest.
                  this.bizOverdueCalculateService.
                  calculateByLendId(1231);
                System.out.println("to get. cost:"
                    + (System.currentTimeMillis() - start));
                start = System.currentTimeMillis();
                try {
                    result.get(5, TimeUnit.SECONDS);
                } catch (InterruptedException | ExecutionException
                        | TimeoutException e) {
                    e.printStackTrace();
                }
                System.out.println("getted. cost:"
                    + (System.currentTimeMillis() - start));
            }, "thread_" + i).start();
    }
    System.out.println("all started");
    try {
        Thread.sleep(10001);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

其中bizOverdueCalculateService.calculateByLendId(1231)中的代碼如下:


/**
 * 按進(jìn)件id試算。 * <p>
 * 結(jié)清日期默認(rèn)為系統(tǒng)當(dāng)前日期
 *
 * @param lendrequestId
 * @return
 */
@Async("keplerRestExecutor")
public Future<Result4Calculate> calculateByLendId(int lendrequestId) {
    long start = System.currentTimeMillis();
    Future<Result4Calculate> f = 
      this.calculateByLendId(lendrequestId,
        new Date());
    System.out.println(Thread.currentThread().getName() + ", active count:"
        + this.keplerRestExecutor.getActiveCount() + ",  queue size :"
        + this.keplerRestExecutor.getThreadPoolExecutor().getQueue().size()
        + " rest cost: " + (System.currentTimeMillis() - start));
    return f;
}

根據(jù)上述分析褒侧,預(yù)計(jì):

  • 測(cè)試類并發(fā)的發(fā)起10個(gè)rest調(diào)用任務(wù)后良风,只有兩個(gè)任務(wù)會(huì)被線程池中的工作線程立即執(zhí)行,其它八個(gè)任務(wù)都進(jìn)入隊(duì)列闷供。

  • 線程池中始終只有兩個(gè)工作線程烟央。

  • 隊(duì)列中每個(gè)任務(wù)的執(zhí)行時(shí)間都不超時(shí),但執(zhí)行過若干個(gè)任務(wù)后歪脏,后續(xù)任務(wù)全部超時(shí)疑俭。

實(shí)際輸出如下:

  • 全部提交后,只有兩個(gè)線程在執(zhí)行婿失,其它8個(gè)任務(wù)全部在隊(duì)列中:active count:2, queue size :8钞艇。

  • 線程池中始終只有keplerRestExecutor-1、keplerRestExecutor-2兩個(gè)線程移怯。active count也始終為2香璃。

  • 任務(wù)的實(shí)際執(zhí)行時(shí)間(rest cost)都在1s上下这难。但從第9(每次測(cè)試舟误,這個(gè)數(shù)字會(huì)略有不同)個(gè)任務(wù)開始,result.get(5, TimeUnit.SECONDS)方法出現(xiàn)超時(shí)姻乓。

測(cè)試輸出如下:

start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:37
to get. cost:33
to get. cost:33
to get. cost:37
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:33
to get. cost:37
to get. cost:35
keplerRestExecutor-1, active count:2, queue size :8 rest cost: 1437
getted. cost:1444
keplerRestExecutor-2, active count:2, queue size :7 rest cost: 1437
getted. cost:1444
keplerRestExecutor-1, active count:2, queue size :6 rest cost: 1155
getted. cost:2599
keplerRestExecutor-2, active count:2, queue size :5 rest cost: 1155
getted. cost:2600
keplerRestExecutor-1, active count:2, queue size :4 rest cost: 1140
getted. cost:3739
keplerRestExecutor-2, active count:2, queue size :3 rest cost: 1140
getted. cost:3740
keplerRestExecutor-1, active count:2, queue size :2 rest cost: 1176
getted. cost:4915
keplerRestExecutor-2, active count:2, queue size :1 rest cost: 1176
getted. cost:4916
java.util.concurrent.TimeoutException
getted. cost:5001
getted. cost:5001
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest.lambda$0(BizOverdueCalculateServiceTest.java:99)
at cn.youcredit.thread.common.service.portfolio.BizOverdueCalculateServiceTest$$Lambda$21/648122621.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
keplerRestExecutor-1, active count:2, queue size :0 rest cost: 1175
keplerRestExecutor-2, active count:1, queue size :0 rest cost: 1175

rejection-policy的幾種配置

rejection-policy是指當(dāng)線程池中的工作線程和任務(wù)隊(duì)列都達(dá)到飽和狀態(tài)時(shí)嵌溢、或者線程池已經(jīng)關(guān)閉的情況下,線程池處理新的任務(wù)的規(guī)則蹋岩。

配置值 基本含義 備注
ABORT 直接中斷任務(wù)的提交動(dòng)作 在主線程中拋出任務(wù)被拒絕異常赖草。
CALLER_RUNS 由主線程來執(zhí)行新的任務(wù) 異步操作會(huì)轉(zhuǎn)換為同步操作
DISCARD 直接丟棄當(dāng)前任務(wù) 在對(duì)當(dāng)前任務(wù)進(jìn)行g(shù)et時(shí),會(huì)出現(xiàn)超時(shí)異常剪个。
DISCARD_OLDEST 丟棄任務(wù)隊(duì)列中第一個(gè)任務(wù) 在關(guān)閉隊(duì)列的情況下秧骑,會(huì)陷入“嘗試丟棄隊(duì)首任務(wù)-嘗試入隊(duì)-嘗試丟棄-嘗試入隊(duì)” 的死循環(huán)中。

解決方案

針對(duì)線程數(shù)和隊(duì)列大小扣囊,考慮方案有三:

提高初始線程數(shù)乎折。

提高步并發(fā)的初始線程數(shù)(如將16-128調(diào)整為32-128)。以此減少新任務(wù)進(jìn)入隊(duì)列的幾率侵歇。但是這個(gè)方案只是降低隊(duì)列積壓的風(fēng)險(xiǎn)骂澄,并不解決問題。

關(guān)閉隊(duì)列惕虑。

將隊(duì)列大小調(diào)整為0坟冲,以此保證每一個(gè)新任務(wù)都有一個(gè)新線程來執(zhí)行磨镶。這個(gè)方案的問題在于,并發(fā)壓力大時(shí)健提,可能導(dǎo)致線程不夠用琳猫。此時(shí)的異步調(diào)用會(huì)根據(jù)rejection-policy="CALLER_RUNS"的配置而變?yōu)橥秸{(diào)用。

更換線程池私痹。

使用優(yōu)先創(chuàng)建新線程(而非優(yōu)先入隊(duì)列)的線程池沸移。改動(dòng)最大的方案。目前考慮:底層系統(tǒng)并發(fā)壓力并不大侄榴;根據(jù)ELK的統(tǒng)計(jì)雹锣,最高并發(fā)大約也就30+rps●希可以考慮在指定專用ThreadPoolTaskExecutor的前提下蕊爵,關(guān)閉隊(duì)列。

此外桦山,rejection-policy的配置攒射,考慮方案有二:

設(shè)定為CALLER_RUNS。

這種方式可以保證任務(wù)得到執(zhí)行恒水;但有可能會(huì)阻塞主線程会放。并且阻塞時(shí)間視REST調(diào)用時(shí)間而定。

設(shè)定為DISCARD钉凌。

這種方式實(shí)際上也會(huì)阻塞主線程咧最,但是最長(zhǎng)會(huì)阻塞5s。目前考慮:試算服務(wù)試運(yùn)行期間設(shè)定為DISCARD御雕,以免主線程阻塞時(shí)間過長(zhǎng)矢沿。逾期試算服務(wù)完成性能優(yōu)化、并且服務(wù)穩(wěn)定之后酸纲,設(shè)定為CALLER_RUNS捣鲸,以確保試算任務(wù)得到執(zhí)行。

關(guān)閉隊(duì)列方案測(cè)試代碼

復(fù)用模擬重現(xiàn)中的測(cè)試代碼闽坡,修改線程池配置如下

<!-- 調(diào)用kepler的rest接口時(shí)栽惶,使用此異步執(zhí)行器。避免占用全局的線程池 -->
<task:executor id="keplerRestExecutor" pool-size="2-128"
    keep-alive="60" rejection-policy="CALLER_RUNS"
    queue-capacity="0" />
<task:scheduler id="scheduler" pool-size="32" />

再次執(zhí)行上測(cè)試代碼疾嗅。預(yù)計(jì):

  • 任務(wù)一經(jīng)提交外厂,就會(huì)創(chuàng)建10個(gè)工作線程來分別執(zhí)行姓言。

  • 隊(duì)列大小始終為0.

  • 不會(huì)出現(xiàn)超時(shí)浙值。

  • 可能會(huì)出現(xiàn)后續(xù)任務(wù)中,active count 小于10的情況肯适。

關(guān)閉隊(duì)列后的日志輸出

start
committed.
committed.
committed.
committed.
committed.
all started
committed.
committed.
committed.
committed.
committed.
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:3
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:7
to get. cost:4
to get. cost:4
keplerRestExecutor-7, active count:10, queue size :0 rest cost: 2177
getted. cost:2182
keplerRestExecutor-4, active count:9, queue size :0 rest cost: 2182
getted. cost:2187
keplerRestExecutor-9, active count:8, queue size :0 rest cost: 2185
getted. cost:2190
keplerRestExecutor-1, active count:7, queue size :0 rest cost: 2190
getted. cost:2196
keplerRestExecutor-3, active count:6, queue size :0 rest cost: 2191
getted. cost:2196
keplerRestExecutor-2, active count:5, queue size :0 rest cost: 2191
keplerRestExecutor-5, active count:4, queue size :0 rest cost: 2191
getted. cost:2196
getted. cost:2196
keplerRestExecutor-10, active count:3, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-8, active count:2, queue size :0 rest cost: 2192
getted. cost:2197
keplerRestExecutor-6, active count:1, queue size :0 rest cost: 2193
getted. cost:2198
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末次泽,一起剝皮案震驚了整個(gè)濱河市穿仪,隨后出現(xiàn)的幾起案子席爽,更是在濱河造成了極大的恐慌,老刑警劉巖啊片,帶你破解...
    沈念sama閱讀 216,692評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件只锻,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡紫谷,警方通過查閱死者的電腦和手機(jī)齐饮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來笤昨,“玉大人祖驱,你說我怎么就攤上這事÷髦希” “怎么了捺僻?”我有些...
    開封第一講書人閱讀 162,995評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)崇裁。 經(jīng)常有香客問我匕坯,道長(zhǎng),這世上最難降的妖魔是什么拔稳? 我笑而不...
    開封第一講書人閱讀 58,223評(píng)論 1 292
  • 正文 為了忘掉前任葛峻,我火速辦了婚禮,結(jié)果婚禮上巴比,老公的妹妹穿的比我還像新娘术奖。我一直安慰自己,他們只是感情好匿辩,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,245評(píng)論 6 388
  • 文/花漫 我一把揭開白布腰耙。 她就那樣靜靜地躺著,像睡著了一般铲球。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上晰赞,一...
    開封第一講書人閱讀 51,208評(píng)論 1 299
  • 那天稼病,我揣著相機(jī)與錄音,去河邊找鬼掖鱼。 笑死然走,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的戏挡。 我是一名探鬼主播芍瑞,決...
    沈念sama閱讀 40,091評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼褐墅!你這毒婦竟也來了拆檬?” 一聲冷哼從身側(cè)響起洪己,我...
    開封第一講書人閱讀 38,929評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎竟贯,沒想到半個(gè)月后答捕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,346評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡屑那,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,570評(píng)論 2 333
  • 正文 我和宋清朗相戀三年拱镐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片持际。...
    茶點(diǎn)故事閱讀 39,739評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡沃琅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蜘欲,到底是詐尸還是另有隱情阵难,我是刑警寧澤,帶...
    沈念sama閱讀 35,437評(píng)論 5 344
  • 正文 年R本政府宣布芒填,位于F島的核電站呜叫,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏殿衰。R本人自食惡果不足惜朱庆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,037評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望闷祥。 院中可真熱鬧娱颊,春花似錦、人聲如沸凯砍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)悟衩。三九已至剧罩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間座泳,已是汗流浹背惠昔。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留挑势,地道東北人镇防。 一個(gè)月前我還...
    沈念sama閱讀 47,760評(píng)論 2 369
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像潮饱,于是被迫代替她去往敵國(guó)和親来氧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,647評(píng)論 2 354