Java如何讓線程池滿后再放隊(duì)列

背景

最近收到一道面試題:我們知道JDK的線程池在線程數(shù)達(dá)到corePoolSize之后惨好,先判斷隊(duì)列献联,再判斷maximumPoolSize带兜。如果想反過(guò)來(lái)谅摄,即先判斷maximumPoolSize再判斷隊(duì)列,怎么辦多律?

建議往下瀏覽之前先思考一下解決方案痴突,如果自己面對(duì)這道面試題,該如何作答狼荞?


方案一

由于線程池的行為是定義在JDK相關(guān)代碼中辽装,我們想改變其默認(rèn)行為,很自然的一種想法便是:繼承自JDK的線程池類java.util.concurrent.ThreadPoolExecutor相味,然后改寫(xiě)其execute方法拾积,將判斷隊(duì)列與maximumPoolSize的邏輯順序調(diào)整一下,以達(dá)到目的

原來(lái)的邏輯如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 創(chuàng)建新線程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 代碼運(yùn)行到此處丰涉,說(shuō)明線程池?cái)?shù)量達(dá)到了corePoolSize
    if (isRunning(c) && workQueue.offer(command)) {
        // 將任務(wù)成功入隊(duì)
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 代碼運(yùn)行到此處拓巧,說(shuō)明入隊(duì)失敗
    else if (!addWorker(command, false))
        // 創(chuàng)建新線程失敗則執(zhí)行拒絕策略
        reject(command);
}

但是仔細(xì)閱讀代碼會(huì)發(fā)現(xiàn),execute中涉及到的一些關(guān)鍵方法如workerCountOf一死、addWorker等是私有的肛度,關(guān)鍵變量如ctlcorePoolSize也是私有的投慈,即無(wú)法通過(guò)簡(jiǎn)單繼承ThreadPoolExecutor改寫(xiě)其execute方法的核心邏輯達(dá)到目的承耿。

那考慮的一個(gè)變種是,定義一個(gè)MyThreadPoolExecutor伪煤,把ThreadPoolExecutor的代碼照搬過(guò)來(lái)加袋,只改寫(xiě)其中execute方法,改寫(xiě)后的邏輯如下:

public void execute(Runnable command) {
    if (command == null)
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 先判斷maximumPoolSize
    if (workerCountOf(c) < maximumPoolSize) {
        if (addWorker(command, false))
            return;
        c = ctl.get();
    }
    // 再判斷隊(duì)列
    else if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (isRunning(c) && !workQueue.offer(command))
        reject(command);
}

改寫(xiě)之后带族,發(fā)現(xiàn)reject方法也得重寫(xiě)锁荔,原因是RejectedExecutionHandler#rejectedExecution第二個(gè)入?yún)⑹荰hreadPoolExecutor,不能傳this

// java.util.concurrent.ThreadPoolExecutor#reject

final void reject(Runnable command) {
     handler.rejectedExecution(command, this);
}
// java.util.concurrent.RejectedExecutionHandler

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

這樣,連RejectedExecutionHandler也要改寫(xiě)一下

由于RejectedExecutionHandler的改造并非面試題核心邏輯阳堕,所以此處省略跋理,明白要表達(dá)的意思即可

但這樣做之后,與三方框架的兼容就很難了--->有不少三方框架入?yún)⑹切枰猅hreadPoolExecutor恬总,而不是自定義的MyThreadPoolExecutor前普,后續(xù)的使用會(huì)是個(gè)問(wèn)題

評(píng)價(jià):自定義MyThreadPoolExecutor需要代碼大篇幅的拷貝,麻煩不說(shuō)壹堰,兼容性還是個(gè)問(wèn)題拭卿,從實(shí)戰(zhàn)出發(fā)考慮,可行性很低

方案二

那有沒(méi)有什么方案能夠既省事贱纠,又能兼顧兼容性峻厚?

兩步走:

  1. 自定義Queue,改寫(xiě)offer邏輯
  2. 自定義線程池類谆焊,繼承自ThreadPoolExecutor惠桃,改寫(xiě)核心邏輯
自定義Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;
    
    // 自定義的線程池類,繼承自ThreadPoolExecutor
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    // offer方法的含義是:將任務(wù)提交到隊(duì)列中辖试,返回值為true/false辜王,分別代表提交成功/提交失敗
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        // 線程池的當(dāng)前線程數(shù)
        int currentPoolThreadSize = executor.getPoolSize();
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            // 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù),意味著線程池中有空閑線程罐孝,直接扔進(jìn)隊(duì)列里呐馆,讓線程去處理
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            // 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false莲兢,暗含入隊(duì)失敗汹来,讓線程池去創(chuàng)建新的線程
            return false;
        }

        // 重點(diǎn): 代碼運(yùn)行到此處,說(shuō)明當(dāng)前線程數(shù) >= 最大線程數(shù)怒见,需要真正的提交到隊(duì)列中
        return super.offer(runnable);
    }

    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
自定義線程池類
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 定義一個(gè)成員變量俗慈,用于記錄當(dāng)前線程池中已提交的任務(wù)數(shù)量
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor的勾子方法,在task執(zhí)行完后需要將池中已提交的任務(wù)數(shù) - 1
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 將池中已提交的任務(wù)數(shù) + 1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

核心邏輯:當(dāng)提交任務(wù)給EagerThreadPoolExecutor遣耍,執(zhí)行submittedTaskCount.incrementAndGet();將池中已提交的任務(wù)數(shù) + 1,然后就調(diào)用父類的execute方法

// 代碼運(yùn)行到此處炮车,說(shuō)明線程數(shù) >= corePoolSize舵变, 此時(shí)workQueue為自定義的TaskQueue
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

核心邏輯:當(dāng)執(zhí)行workQueue.offer(command),走到自定義的TaskQueue#offer邏輯瘦穆,而offer方法的返回值決定著是否創(chuàng)建更多的線程:返回true纪隙,代表入隊(duì)成功,不創(chuàng)建線程扛或;返回false绵咱,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程

// 線程池的當(dāng)前線程數(shù)
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
    // 已提交的任務(wù)數(shù)量小于當(dāng)前線程數(shù)熙兔,意味著線程池中有空閑線程悲伶,直接扔進(jìn)隊(duì)列里艾恼,讓線程去處理
    return super.offer(runnable);
}

if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
    // 重點(diǎn): 當(dāng)前線程數(shù)小于 最大線程數(shù) ,返回false麸锉,暗含入隊(duì)失敗钠绍,讓線程池去創(chuàng)建新的線程
    return false;
}

// 重點(diǎn): 代碼運(yùn)行到此處,說(shuō)明當(dāng)前線程數(shù) >= 最大線程數(shù)花沉,需要真正的提交到隊(duì)列中
return super.offer(runnable);

核心邏輯:當(dāng)前線程數(shù)小于最大線程數(shù)就返回false柳爽,代表入隊(duì)失敗,需要?jiǎng)?chuàng)建線程

因此碱屁,總結(jié)起來(lái)就是:自定義的EagerThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來(lái)決定是否創(chuàng)建更多的線程磷脯,達(dá)到先判斷maximumPoolSize再判斷隊(duì)列的目的

評(píng)價(jià):該方案不需要修改JDK線程池的核心邏輯,盡最大可能避免因更改核心流程考慮不周而引入的BUG娩脾。另一方面赵誓,擴(kuò)展Queue的手段,也是JDK提供的一個(gè)能夠讓用戶在不干涉核心流程的情況下晦雨,達(dá)到安全擴(kuò)展線程池能力的方式

題外話

有朋友或許會(huì)有疑問(wèn)架曹,這道面試題是面試官天馬行空想像出來(lái)的嗎?是否有實(shí)際的場(chǎng)景跟需要呢闹瞧?

可以從至少兩個(gè)開(kāi)源框架上找到答案

Dubbo 2.6.2及以上

其實(shí)上邊的方案二绑雄,代碼來(lái)自于Dubbo源碼,
相關(guān)git issue在此: Extension: Eager Thread Pool

Tomcat

Tomcat自定義的線程池類名與JDK的相同奥邮,都叫ThreadPoolExecutor万牺,只是包不同,且Tomcat的ThreadPoolExecutor繼承自JDK的ThreadPoolExecutor

Tomcat自定義的隊(duì)列也叫TaskQueue

Tomcat的ThreadPoolExecutor與TaskQueue核心邏輯洽腺、思想與方案二貼的代碼幾乎一致脚粟。實(shí)際上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鑒的Tomcat設(shè)計(jì)蘸朋,關(guān)于這一點(diǎn)Dubbo github issue上作者本人也有提及

JDK線程池與Tomcat線程池方案誰(shuí)最好核无?

筆者認(rèn)為,沒(méi)有哪種方案最好藕坯,技術(shù)沒(méi)有銀彈团南,只是在不同視角進(jìn)行的trade off,在某種場(chǎng)景下最好的方案在另一個(gè)場(chǎng)景中可能卻導(dǎo)致糟糕的后果炼彪⊥赂可以從另一個(gè)角度考慮:如果有一種放之四海皆準(zhǔn),從各個(gè)角度考慮都優(yōu)于其他技術(shù)的存在辐马,那么它的出現(xiàn)必將完全取代它的競(jìng)品拷橘。而從現(xiàn)實(shí)看,顯然, JDK線程線與Tomcat線程池都各有場(chǎng)景與發(fā)展冗疮,并沒(méi)有出現(xiàn)一方取代另一方的情況萄唇,因此不存在哪種方案最好的說(shuō)法

如果線上環(huán)境要使用線程池,哪一種更合適赌厅?

線程數(shù)與CPU核數(shù)穷绵、任務(wù)類型的關(guān)系就不細(xì)說(shuō)了。簡(jiǎn)單而言特愿,如果不能忍受延遲仲墨,期望應(yīng)用能盡快地為用戶提供服務(wù),那么Tomcat線程池可能更適合你揍障;相反目养,如果你能容忍一些延遲來(lái)?yè)Q取性能上的提升,那么JDK線程池可能會(huì)更合適一些

方案一的代碼乃筆者隨手而敲毒嫡,未經(jīng)過(guò)任何生產(chǎn)環(huán)境的檢驗(yàn)跟錘煉癌蚁,可能存在潛在的BUG,強(qiáng)烈不建議生產(chǎn)環(huán)境使用兜畸。如果確實(shí)有需要努释,請(qǐng)使用方案二,有知名框架背書(shū)咬摇,且實(shí)現(xiàn)更為安全與優(yōu)雅伐蒂,乃首先之姿


最后,感謝這位朋友的面試題肛鹏,也感謝孤獨(dú)煙(人稱煙哥)分享面試題讓大家參與討論逸邦,以及飛奔的普朗克(人稱何總)提供的思路,才有了本篇的內(nèi)容分享在扰,希望大家都能有所收獲

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缕减,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子芒珠,更是在濱河造成了極大的恐慌桥狡,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件皱卓,死亡現(xiàn)場(chǎng)離奇詭異总放,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)好爬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)甥啄,“玉大人存炮,你說(shuō)我怎么就攤上這事。” “怎么了穆桂?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵宫盔,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我享完,道長(zhǎng)灼芭,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任般又,我火速辦了婚禮彼绷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘茴迁。我一直安慰自己寄悯,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布堕义。 她就那樣靜靜地躺著猜旬,像睡著了一般。 火紅的嫁衣襯著肌膚如雪倦卖。 梳的紋絲不亂的頭發(fā)上洒擦,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音怕膛,去河邊找鬼熟嫩。 笑死,一個(gè)胖子當(dāng)著我的面吹牛嘉竟,可吹牛的內(nèi)容都是我干的邦危。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼舍扰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼倦蚪!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起边苹,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤陵且,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后个束,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體慕购,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年茬底,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了沪悲。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡阱表,死狀恐怖殿如,靈堂內(nèi)的尸體忽然破棺而出贡珊,到底是詐尸還是另有隱情,我是刑警寧澤涉馁,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布门岔,位于F島的核電站,受9級(jí)特大地震影響烤送,放射性物質(zhì)發(fā)生泄漏寒随。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一帮坚、第九天 我趴在偏房一處隱蔽的房頂上張望妻往。 院中可真熱鬧,春花似錦叶沛、人聲如沸蒲讯。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)判帮。三九已至,卻和暖如春溉箕,著一層夾襖步出監(jiān)牢的瞬間晦墙,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工肴茄, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留晌畅,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓寡痰,卻偏偏與公主長(zhǎng)得像抗楔,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拦坠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 線程池不僅在項(xiàng)目中是非常常用的一項(xiàng)技術(shù)而且在面試中基本上也是必問(wèn)的知識(shí)點(diǎn)连躏,接下來(lái)跟著我一起來(lái)鞏固一下線程池的相關(guān)知...
    不學(xué)無(wú)數(shù)的程序員閱讀 3,166評(píng)論 0 21
  • 第6章介紹了任務(wù)執(zhí)行框架, 它不僅能簡(jiǎn)化任務(wù)與線程的生命周期管理贞滨, 而且還提供一 種簡(jiǎn)單靈活的方式將任務(wù)的提交與任...
    好好學(xué)習(xí)Sun閱讀 1,174評(píng)論 0 2
  • 昨晚手機(jī)被張小帥拿去查資料入热,我邊看書(shū)邊等他,打算等他用完之后就來(lái)簡(jiǎn)書(shū)更新晓铆,誰(shuí)知勺良,居然一不小心睡著了。 等張小帥把手...
    葛芳閱讀 416評(píng)論 6 7
  • 王仙娥 古稀老人王仙娥 十五年來(lái)素生活 八十萬(wàn)元救兒命 企盼母愛(ài)勝病魔 (古稀老人骄噪,為救兒子給兒子移腎用去八十萬(wàn)元...
    旖旎i閱讀 386評(píng)論 3 9
  • 說(shuō)起來(lái)尚困,鐘書(shū)閣是離我們最近的書(shū)店,卻是第一次去链蕊。 據(jù)說(shuō)重慶鐘書(shū)閣成了網(wǎng)紅打卡地尾组,春節(jié)很多人排一兩個(gè)小時(shí)才能進(jìn)去忙芒,不...
    不舍札記閱讀 441評(píng)論 0 0