線程池ExecutorService的參數(shù)
線程創(chuàng)建的時(shí)機(jī)
接下來伪窖,我們來具體看下這兩個(gè)參數(shù)所代表的含義逸寓,以及線程池中創(chuàng)建線程的時(shí)機(jī)。如上圖所示覆山,當(dāng)提交任務(wù)后竹伸,線程池首先會檢查當(dāng)前線程數(shù),如果此時(shí)線程數(shù)小于核心線程數(shù)簇宽,比如最開始線程數(shù)量為 0勋篓,則新建線程并執(zhí)行任務(wù),隨著任務(wù)的不斷增加魏割,線程數(shù)會逐漸增加并達(dá)到核心線程數(shù)譬嚣,此時(shí)如果仍有任務(wù)被不斷提交,就會被放入 workQueue 任務(wù)隊(duì)列中钞它,等待核心線程執(zhí)行完當(dāng)前任務(wù)后重新從 workQueue 中提取正在等待被執(zhí)行的任務(wù)拜银。
此時(shí),假設(shè)我們的任務(wù)特別的多遭垛,已經(jīng)達(dá)到了 workQueue 的容量上限尼桶,這時(shí)線程池就會啟動后備力量,也就是 maximumPoolSize 最大線程數(shù)锯仪,線程池會在 corePoolSize 核心線程數(shù)的基礎(chǔ)上繼續(xù)創(chuàng)建線程來執(zhí)行任務(wù)泵督,假設(shè)任務(wù)被不斷提交,線程池會持續(xù)創(chuàng)建線程直到線程數(shù)達(dá)到 maximumPoolSize 最大線程數(shù)庶喜,如果依然有任務(wù)被提交小腊,這就超過了線程池的最大處理能力,這個(gè)時(shí)候線程池就會拒絕這些任務(wù)久窟,我們可以看到實(shí)際上任務(wù)進(jìn)來之后秩冈,線程池會逐一判斷 corePoolSize、workQueue瘸羡、maximumPoolSize漩仙,如果依然不能滿足需求,則會拒絕任務(wù)犹赖。
corePoolSize 與 maximumPoolSize
通過上面的流程圖队他,我們了解了 corePoolSize 和 maximumPoolSize 的具體含義,corePoolSize 指的是核心線程數(shù)峻村,線程池初始化時(shí)線程數(shù)默認(rèn)為 0麸折,當(dāng)有新的任務(wù)提交后,會創(chuàng)建新線程執(zhí)行任務(wù)粘昨,如果不做特殊設(shè)置垢啼,此后線程數(shù)通常不會再小于 corePoolSize 窜锯,因?yàn)樗鼈兪呛诵木€程,即便未來可能沒有可執(zhí)行的任務(wù)也不會被銷毀芭析。隨著任務(wù)量的增加锚扎,在任務(wù)隊(duì)列滿了之后,線程池會進(jìn)一步創(chuàng)建新線程馁启,最多可以達(dá)到 maximumPoolSize 來應(yīng)對任務(wù)多的場景驾孔,如果未來線程有空閑,大于 corePoolSize 的線程會被合理回收惯疙。所以正常情況下翠勉,線程池中的線程數(shù)量會處在 corePoolSize 與 maximumPoolSize 的閉區(qū)間內(nèi)。
比如線程池的 corePoolSize 為 5霉颠,maximumPoolSize 為 10对碌,任務(wù)隊(duì)列容量為 100,隨著任務(wù)被提交蒿偎,我們的線程數(shù)量會從 0 慢慢增長到 5朽们,然后就不再增長了,新的任務(wù)會被放入隊(duì)列中酥郭,直到隊(duì)列被塞滿华坦,然后在 corePoolSize 的基礎(chǔ)上繼續(xù)創(chuàng)建新線程來執(zhí)行隊(duì)列中的任務(wù)愿吹,線程會逐漸增加到 maximumPoolSize不从, 然后線程數(shù)不再增加,如果此時(shí)仍有任務(wù)被不斷提交犁跪,線程池就會拒絕任務(wù)椿息。隨著隊(duì)列中任務(wù)被執(zhí)行完,被創(chuàng)建的 10 個(gè)線程現(xiàn)在無事可做了坷衍,這時(shí)線程池會根據(jù) keepAliveTime 參數(shù)來銷毀線程寝优,已達(dá)到減少內(nèi)存占用的目的。
線程池的幾個(gè)特點(diǎn)
線程池希望保持較少的線程數(shù)枫耳,并且只有在負(fù)載變得很大時(shí)才增加線程乏矾。
線程池只有在任務(wù)隊(duì)列填滿時(shí)才創(chuàng)建多于 corePoolSize 的線程,如果使用的是無界隊(duì)列(例如 LinkedBlockingQueue)迁杨,那么由于隊(duì)列不會滿钻心,所以線程數(shù)不會超過 corePoolSize。
通過設(shè)置 corePoolSize 和 maximumPoolSize 為相同的值铅协,就可以創(chuàng)建固定大小的線程池捷沸。
通過設(shè)置 maximumPoolSize 為很高的值,例如 Integer.MAX_VALUE狐史,就可以允許線程池創(chuàng)建任意多的線程痒给。
keepAliveTime+時(shí)間單位
第三個(gè)參數(shù)是 keepAliveTime + 時(shí)間單位说墨,當(dāng)線程池中線程數(shù)量多于核心線程數(shù)時(shí),而此時(shí)又沒有任務(wù)可做苍柏,線程池就會檢測線程的 keepAliveTime尼斧,如果超過規(guī)定的時(shí)間,無事可做的線程就會被銷毀试吁,以便減少內(nèi)存的占用和資源消耗突颊。如果后期任務(wù)又多了起來,線程池也會根據(jù)規(guī)則重新創(chuàng)建線程潘悼,所以這是一個(gè)可伸縮的過程律秃,比較靈活,我們也可以用 setKeepAliveTime 方法動態(tài)改變 keepAliveTime 的參數(shù)值治唤。
ThreadFactory
第四個(gè)參數(shù)是 ThreadFactory棒动,ThreadFactory 實(shí)際上是一個(gè)線程工廠,它的作用是生產(chǎn)線程以便執(zhí)行任務(wù)宾添。我們可以選擇使用默認(rèn)的線程工廠船惨,創(chuàng)建的線程都會在同一個(gè)線程組,并擁有一樣的優(yōu)先級缕陕,且都不是守護(hù)線程粱锐,我們也可以選擇自己定制線程工廠,以方便給線程自定義命名扛邑,不同的線程池內(nèi)的線程通常會根據(jù)具體業(yè)務(wù)來定制不同的線程名怜浅。
workQueue 和 Handler
最后兩個(gè)參數(shù)是 workQueue 和 Handler,它們分別對應(yīng)阻塞隊(duì)列和任務(wù)拒絕策略蔬崩。
新建線程池時(shí)可以指定它的任務(wù)拒絕策略恶座。
ExecutorService executorService =
new ThreadPoolExecutor(5, 10, 200l,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy());
線程池會在以下兩種情況下會拒絕新提交的任務(wù)。
第一種情況是當(dāng)我們調(diào)用 shutdown 等方法關(guān)閉線程池后沥阳,即便此時(shí)可能線程池內(nèi)部依然有沒執(zhí)行完的任務(wù)正在執(zhí)行跨琳,但是由于線程池已經(jīng)關(guān)閉,此時(shí)如果再向線程池內(nèi)提交任務(wù)桐罕,就會遭到拒絕脉让。
第二種情況是線程池沒有能力繼續(xù)處理新提交的任務(wù),也就是工作已經(jīng)非常飽和的時(shí)候功炮。
我們具體講一下第二種情況溅潜,也就是由于工作飽和導(dǎo)致的拒絕。比如新建一個(gè)線程池死宣,使用容量上限為 10 的 ArrayBlockingQueue 作為任務(wù)隊(duì)列伟恶,并且指定線程池的核心線程數(shù)為 5,最大線程數(shù)為 10毅该,假設(shè)此時(shí)有 20 個(gè)耗時(shí)任務(wù)被提交博秫,在這種情況下潦牛,線程池會首先創(chuàng)建核心數(shù)量的線程,也就是5個(gè)線程來執(zhí)行任務(wù)挡育,然后往隊(duì)列里去放任務(wù)巴碗,隊(duì)列的 10 個(gè)容量被放滿了之后,會繼續(xù)創(chuàng)建新線程即寒,直到達(dá)到最大線程數(shù) 10橡淆。此時(shí)線程池中一共有 20 個(gè)任務(wù),其中 10 個(gè)任務(wù)正在被 10 個(gè)線程執(zhí)行母赵,還有 10 個(gè)任務(wù)在任務(wù)隊(duì)列中等待逸爵,而且由于線程池的最大線程數(shù)量就是 10,所以已經(jīng)不能再增加更多的線程來幫忙處理任務(wù)了凹嘲,這就意味著此時(shí)線程池工作飽和师倔,這個(gè)時(shí)候再提交新任務(wù)時(shí)就會被拒絕。
我們結(jié)合圖示來分析上述情況周蹭,首先看右側(cè)上方的隊(duì)列部分趋艘,你可以看到目前隊(duì)列已經(jīng)滿了,而圖中隊(duì)列下方的每個(gè)線程都在工作凶朗,且線程數(shù)已經(jīng)達(dá)到最大值 10瓷胧,如果此時(shí)再有新的任務(wù)提交,線程池由于沒有能力繼續(xù)處理新提交的任務(wù)棚愤,所以就會拒絕搓萧。
我們了解了線程池拒絕任務(wù)的時(shí)機(jī),那么我們?nèi)绾握_地選擇拒絕策略呢遇八?Java 在 ThreadPoolExecutor 類中為我們提供了 4 種默認(rèn)的拒絕策略來應(yīng)對不同的場景矛绘,都實(shí)現(xiàn)了 RejectedExecutionHandler 接口,如圖所示:
拒絕策略
AbortPolicy
這種拒絕策略在拒絕任務(wù)時(shí)刃永,會直接拋出一個(gè)類型為 RejectedExecutionException 的 RuntimeException,讓你感知到任務(wù)被拒絕了羊精,于是你便可以根據(jù)業(yè)務(wù)邏輯選擇重試或者放棄提交等策略斯够。
DiscardPolicy,
這種拒絕策略正如它的名字所描述的一樣喧锦,當(dāng)新任務(wù)被提交后直接被丟棄掉读规,也不會給你任何的通知,相對而言存在一定的風(fēng)險(xiǎn)燃少,因?yàn)槲覀兲峤坏臅r(shí)候根本不知道這個(gè)任務(wù)會被丟棄束亏,可能造成數(shù)據(jù)丟失。
DiscardOldestPolicy
如果線程池沒被關(guān)閉且沒有能力執(zhí)行阵具,則會丟棄任務(wù)隊(duì)列中的頭結(jié)點(diǎn)碍遍,通常是存活時(shí)間最長的任務(wù)定铜,這種策略與第二種不同之處在于它丟棄的不是最新提交的,而是隊(duì)列中存活時(shí)間最長的怕敬,這樣就可以騰出空間給新提交的任務(wù)揣炕,但同理它也存在一定的數(shù)據(jù)丟失風(fēng)險(xiǎn)。
CallerRunsPolicy
相對而言它就比較完善了东跪,當(dāng)有新任務(wù)提交后畸陡,如果線程池沒被關(guān)閉且沒有能力執(zhí)行,則把這個(gè)任務(wù)交于提交任務(wù)的線程執(zhí)行虽填,也就是誰提交任務(wù)丁恭,誰就負(fù)責(zé)執(zhí)行任務(wù)。這樣做主要有兩點(diǎn)好處:
1.新提交的任務(wù)不會被丟棄斋日,這樣也就不會造成業(yè)務(wù)損失涩惑。
2.由于誰提交任務(wù)誰就要負(fù)責(zé)執(zhí)行任務(wù),這樣提交任務(wù)的線程就得負(fù)責(zé)執(zhí)行任務(wù)桑驱,而執(zhí)行任務(wù)又是比較耗時(shí)的竭恬,在這段期間,提交任務(wù)的線程被占用熬的,也就不會再提交新的任務(wù)痊硕,減緩了任務(wù)提交的速度,相當(dāng)于是一個(gè)負(fù)反饋押框。在此期間岔绸,線程池中的線程也可以充分利用這段時(shí)間來執(zhí)行掉一部分任務(wù),騰出一定的空間橡伞,相當(dāng)于是給了線程池一定的緩沖期盒揉。
測試代碼如下:
package com.example.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author liujy
* @description 自定義容量的線程池
* @since 2020-12-30 14:29
*/
public class CustomizedThreadPool {
public static void main(String[] args) {
ExecutorService executorService =
new ThreadPoolExecutor(5, 10, 200l,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
// new ThreadPoolExecutor.AbortPolicy());
// new ThreadPoolExecutor.DiscardPolicy());
// new ThreadPoolExecutor.DiscardOldestPolicy());
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 100; i++) {
executorService.execute(new Task());
}
}
static class Task implements Runnable {
@Override
public void run() {
System.out.println("my name is " + Thread.currentThread().getName());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}