線(xiàn)程池
使用線(xiàn)程池的目的
先說(shuō)一下我們?yōu)槭裁匆褂镁€(xiàn)程池禽绪?
- 線(xiàn)程是稀缺資源蓖救,不能頻繁的創(chuàng)建洪规。而且創(chuàng)建和銷(xiāo)毀線(xiàn)程也是比較占用系統(tǒng)開(kāi)銷(xiāo)的。
- 為了做到解耦循捺,線(xiàn)程的創(chuàng)建與執(zhí)行任務(wù)分開(kāi)斩例,方便對(duì)線(xiàn)程進(jìn)行維護(hù)。
- 為了復(fù)用从橘,前面也說(shuō)了創(chuàng)建和銷(xiāo)毀線(xiàn)程比較耗系統(tǒng)開(kāi)銷(xiāo)念赶,那么創(chuàng)建出來(lái)線(xiàn)程放到一個(gè)池子里,可以給其他任務(wù)進(jìn)行復(fù)用恰力。
線(xiàn)程池是如何一步一步創(chuàng)建的
第一版
正常的我們?cè)趧?chuàng)建一個(gè)線(xiàn)程去執(zhí)行任務(wù)的時(shí)候是這樣的:
new Thread(r).start();
但是這是最基本的方式叉谜,我們的項(xiàng)目中有可能很多地方都需要?jiǎng)?chuàng)建一個(gè)新的線(xiàn)程。這個(gè)使用為了減少重復(fù)代碼踩萎,我們會(huì)把這段創(chuàng)建線(xiàn)程的代碼放的一個(gè)工具類(lèi)里面正罢,然后對(duì)外提供工具方法,使用的時(shí)候直接調(diào)用此方法即可驻民。
第二版
/**
* 先定義接口(任務(wù)執(zhí)行器)
*/
public interface Executor {
/**
* 執(zhí)行任務(wù)
* @param runnable 線(xiàn)程任務(wù)
*/
void execute(Runnable runnable);
}
/**
* 實(shí)現(xiàn):直接創(chuàng)建線(xiàn)程。
*/
class ExecutorImpl implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
這種方式實(shí)現(xiàn)了創(chuàng)建線(xiàn)程的代碼的復(fù)用履怯,但是并沒(méi)有實(shí)現(xiàn)線(xiàn)程資源的復(fù)用回还,當(dāng)有1000個(gè)地方需要線(xiàn)程的時(shí)候,會(huì)創(chuàng)建1000個(gè)線(xiàn)程叹洲。
第三版
為了實(shí)現(xiàn)資源也復(fù)用柠硕,增加一個(gè)阻塞隊(duì)列,當(dāng)來(lái)了創(chuàng)建線(xiàn)程的任務(wù)的時(shí)候运提,先放到隊(duì)列里蝗柔,然后再用一個(gè)線(xiàn)程(Worker),來(lái)處理任務(wù)民泵。這樣就完成了線(xiàn)程資源的復(fù)用了癣丧,全程只有一個(gè)線(xiàn)程在來(lái)回的復(fù)用,一直在處理隊(duì)列中的任務(wù)栈妆。
通過(guò)上面的方式胁编,實(shí)現(xiàn)了線(xiàn)程資源的復(fù)用,并且也起到提交任務(wù)和處理任務(wù)之間的解耦鳞尔。但是只有一個(gè)線(xiàn)程處理任務(wù)嬉橙,會(huì)有瓶頸的,所以具體需要多少線(xiàn)程來(lái)處理任務(wù)最好是根據(jù)具體的業(yè)務(wù)場(chǎng)景來(lái)確定寥假,這樣我們把這個(gè)值市框,設(shè)置成一個(gè)參數(shù),當(dāng)創(chuàng)建線(xiàn)程池的時(shí)候傳入糕韧,就叫corePoolSize
吧枫振。
而且任務(wù)隊(duì)列最好也要有容量喻圃,但也應(yīng)該是根據(jù)業(yè)務(wù)場(chǎng)景來(lái)配置容量,而且任務(wù)隊(duì)列還可以定制一些規(guī)則蒋得,例如:按照一定的規(guī)則出隊(duì)级及。所以我們把任務(wù)隊(duì)列也配置成參數(shù),在創(chuàng)建線(xiàn)程池的時(shí)候傳入额衙。參數(shù)名稱(chēng)就叫:workQueue
吧饮焦。
當(dāng)隊(duì)列中任務(wù)滿(mǎn)了之后,任務(wù)就會(huì)被拋棄窍侧,但是如果是重要業(yè)務(wù)任務(wù)县踢,還不能拋棄,所以伟件,當(dāng)隊(duì)列中任務(wù)滿(mǎn)了之后硼啤,在線(xiàn)程池沒(méi)有資源處理任務(wù)的時(shí)候,拒絕策略斧账,我們也根據(jù)業(yè)務(wù)場(chǎng)景來(lái)確定谴返,這樣也在創(chuàng)建的時(shí)候傳入一種拒絕策略,參數(shù)名稱(chēng)就叫:rejectedExecutionHandler
咧织。
繼續(xù)優(yōu)化
雖然多了上面的三個(gè)參數(shù)后效果優(yōu)化了不少嗓袱,但是還可以繼續(xù)優(yōu)化:
- 并不用上來(lái)就創(chuàng)建
corePoolSize
數(shù)量的線(xiàn)程,我們可以增加了一個(gè)變量workCount
习绢,來(lái)記錄已經(jīng)創(chuàng)建出來(lái)了工作線(xiàn)程渠抹,這樣在初始化的時(shí)候只有workCount<corePoolSize
的時(shí)候,我們才創(chuàng)建線(xiàn)程來(lái)執(zhí)行任務(wù)闪萄,當(dāng)workCount>CorePoolSize
的時(shí)候梧却,再來(lái)了任務(wù),就去進(jìn)隊(duì)列败去。 - 在增加拒絕策略的時(shí)候放航,我定義一個(gè)接口:
RejectedExecutionHandler
,然后使用者可以自己去實(shí)現(xiàn)這個(gè)接口圆裕,來(lái)完成自己的拒絕策略三椿。 - 增加一個(gè)線(xiàn)程工廠(chǎng)的入?yún)ⅲ?code>ThreadFactory,這樣保證每次創(chuàng)建線(xiàn)程的時(shí)候不用手動(dòng)去創(chuàng)建線(xiàn)程了葫辐,而是通過(guò)
ThreadFactory
來(lái)獲取線(xiàn)程搜锰,并且也可以增加一些線(xiàn)程的標(biāo)識(shí)。
第四版
雖然說(shuō)第三版的線(xiàn)程池已經(jīng)可以應(yīng)對(duì)日常工作中的情況了耿战,但是還是不夠有彈性蛋叼,所謂的彈性就是指,在任務(wù)提交頻繁時(shí)應(yīng)該處理能力提高,任務(wù)提交不頻繁時(shí)處理能力應(yīng)該降低狈涮。
上面這版線(xiàn)程池就不夠彈性狐胎。
如果某個(gè)時(shí)間段,任務(wù)提交量劇增歌馍,這個(gè)時(shí)候握巢,corePoolSize
和隊(duì)列都滿(mǎn)了,再來(lái)提交任務(wù)就只能走拒絕策略了松却。
你或許會(huì)想到暴浦,那我可以增大corePoolSize
的值,這樣就會(huì)創(chuàng)建出來(lái)更多的線(xiàn)程來(lái)處理任務(wù)晓锻,但是這個(gè)任務(wù)提交量劇增歌焦,只是某個(gè)時(shí)間段,過(guò)了這個(gè)時(shí)間段之后砚哆,創(chuàng)建出來(lái)這么多的線(xiàn)程独撇,可以大部分都會(huì)是空閑的狀態(tài)。這樣也是浪費(fèi)資源了躁锁。
這樣就導(dǎo)致了一個(gè)兩難的情況纷铣,corePoolSize
的值設(shè)置太大了也不好,設(shè)置太小了也不好战转。
這個(gè)時(shí)候搜立,為讓線(xiàn)程池做到彈性伸縮,我們可以為他再添加一個(gè)參數(shù):maximumPoolSize
匣吊,這個(gè)參數(shù)代表的意思是最大線(xiàn)程數(shù)。
當(dāng)corePoolSize
和workQueue
都滿(mǎn)了的時(shí)候寸潦,新提交的任務(wù)仍然可以創(chuàng)建新線(xiàn)程來(lái)進(jìn)行處理色鸳,這些超過(guò)corePoolSize
創(chuàng)建出來(lái)的線(xiàn)程,被稱(chēng)為非核心線(xiàn)程见转。當(dāng)corePoolSize
與非核心線(xiàn)程數(shù)量的和等于maximumPoolSize
再執(zhí)行拒絕策略命雀。
通過(guò)這樣的方式,corePoolSize
斩箫,負(fù)責(zé)平時(shí)情況的線(xiàn)程使用量吏砂,maximumPoolSize
負(fù)責(zé)提交任務(wù)高峰時(shí)的,臨時(shí)擴(kuò)充容量乘客。
但是目前這樣的方式只是考慮到了提交任務(wù)量高峰時(shí)期的擴(kuò)充狐血,但這個(gè)高峰期只是暫時(shí)的,過(guò)了這個(gè)高峰期易核,非核心線(xiàn)程一直放著也是浪費(fèi)資源匈织,所以我們?cè)僭O(shè)定一個(gè)非核心線(xiàn)程的空閑活躍時(shí)間的參數(shù):keepAliveTime
,這樣當(dāng)非核心線(xiàn)程數(shù),空閑時(shí)間超過(guò)這個(gè)值就銷(xiāo)毀線(xiàn)程缀匕,釋放資源纳决。
這一版的線(xiàn)程池,做到了在提交任務(wù)高峰時(shí)可臨時(shí)擴(kuò)容乡小,低谷時(shí)又可及時(shí)回收非核心線(xiàn)程阔加,從而節(jié)省資源。真正的做到了收放自如满钟。
通過(guò)上面幾版線(xiàn)程池的改進(jìn)胜榔,最終改進(jìn)成了和Java中的線(xiàn)程池原理基本相似了。這樣也能更透徹的理解創(chuàng)建線(xiàn)程池時(shí)要傳入的這幾個(gè)關(guān)鍵參數(shù)的意義了零远。
下面說(shuō)幾個(gè)線(xiàn)程池常見(jiàn)的考察點(diǎn)
Java中的線(xiàn)程池的阻塞隊(duì)列都有哪幾種
ArrayBlockingQueue
: 有界隊(duì)列苗分,按照阻塞的先后順序訪(fǎng)問(wèn)隊(duì)列,默認(rèn)情況下不保證線(xiàn)程公平的訪(fǎng)問(wèn)隊(duì)列~如果要保證公平性牵辣,會(huì)降低一定的吞吐量摔癣。底層是靠ReentrantLock
來(lái)實(shí)現(xiàn)的,每一個(gè)方法中纬向,都是靠ReentrantLock
加鎖來(lái)完成阻塞择浊。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
-
LinkedBlockingQueue
:基于鏈表的阻塞隊(duì)列,按照先進(jìn)先出的順序排列逾条,在不設(shè)置隊(duì)列長(zhǎng)度的時(shí)候默認(rèn)Integer.MAX_VALUE琢岩。所以認(rèn)為當(dāng)不設(shè)置隊(duì)列長(zhǎng)度時(shí),LinkedBlockingQueue
為無(wú)界隊(duì)列师脂。當(dāng)指定了隊(duì)列長(zhǎng)度后變?yōu)橛薪珀?duì)列担孔,通常LinkedBlockingQueue
的吞吐量要高于ArrayBlockingQueue
; -
SynchronousQueue
:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作吃警,否則插入操作一直處于阻塞狀態(tài)糕篇。在不允許任務(wù)在隊(duì)列中等待的時(shí)候可以使用此隊(duì)列。 -
DelayQueue
:延遲獲取元素隊(duì)列酌心,按照指定時(shí)間后獲取拌消,為無(wú)界阻塞隊(duì)列。 -
PriorityBlockingQueue
:優(yōu)先級(jí)排序隊(duì)列安券,按照一定的優(yōu)先級(jí)對(duì)任務(wù)進(jìn)行排序墩崩,默認(rèn)是小頂堆。 -
LinkedBlockingDeque
:基于鏈表的雙端阻塞隊(duì)列侯勉。
Java提供了哪幾個(gè)默認(rèn)的線(xiàn)程池鹦筹,為什么實(shí)際開(kāi)發(fā)中不建議直接使用?
-
Executors.newCachedThreadPool
();:阻塞隊(duì)列采用的SynchronousQueue
址貌,所以是不存儲(chǔ)等待任務(wù)的盛龄,并且最大線(xiàn)程數(shù)的值是Integer.MAX_VALUE。所以當(dāng)任務(wù)提交量高峰時(shí),相當(dāng)于無(wú)限制的創(chuàng)建線(xiàn)程余舶。并且空閑時(shí)間是60秒啊鸭,QPS高峰期最終會(huì)將服務(wù)器資源耗盡,所以真正實(shí)際應(yīng)用中不建議使用匿值。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Executors.newFixedThreadPool(int nThreads);:可重用固定線(xiàn)程數(shù)的線(xiàn)程池赠制,源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心線(xiàn)程數(shù)和最大線(xiàn)程數(shù)相等的線(xiàn)程池,并且阻塞任務(wù)隊(duì)列還是一個(gè)無(wú)界隊(duì)列挟憔,這樣钟些,當(dāng)處理任務(wù)的線(xiàn)程數(shù)量達(dá)到核心線(xiàn)程數(shù)時(shí),再提交的任務(wù)都會(huì)進(jìn)行到阻塞隊(duì)列里绊谭,但是阻塞隊(duì)列是無(wú)界的政恍,這樣就提交任務(wù)高峰期有可能會(huì)造成任務(wù)一直堆積在隊(duì)列里,超出內(nèi)存容量最終導(dǎo)致內(nèi)存溢出达传。
-
Executors.newScheduledThreadPool(int corePoolSize)
;:一個(gè)定長(zhǎng)線(xiàn)程池篙耗,支持定時(shí)及周期性任務(wù)執(zhí)行,這個(gè)線(xiàn)程池的最大線(xiàn)程數(shù)也是Integer.MAX_VALUE宪赶,可以理解為會(huì)無(wú)限創(chuàng)建線(xiàn)程宗弯。存在將資源耗盡的風(fēng)險(xiǎn),所以一般場(chǎng)景下不建議使用搂妻。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
-
Executors.newSingleThreadExecutor();
這種線(xiàn)程池蒙保,會(huì)創(chuàng)建一個(gè)線(xiàn)程數(shù)固定是1的線(xiàn)程池,并且任務(wù)隊(duì)列是無(wú)界的LinkedBlockingQueue欲主,存在任務(wù)隊(duì)列無(wú)限添加造成OOM的風(fēng)險(xiǎn)邓厕。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
Executors.newWorkStealingPool();
:一個(gè)具有搶占式操作的線(xiàn)程池。
參數(shù)中傳入的是一個(gè)線(xiàn)程并發(fā)的數(shù)量扁瓢,這里和之前就有很明顯的區(qū)別详恼,前面4種線(xiàn)程池都有核心線(xiàn)程數(shù)、最大線(xiàn)程數(shù)等等涤妒,而這就使用了一個(gè)并發(fā)線(xiàn)程數(shù)解決問(wèn)題单雾。這個(gè)線(xiàn)程池不會(huì)保證任務(wù)的順序執(zhí)行赚哗,也就是 WorkStealing 的意思她紫,搶占式的工作,哪個(gè)線(xiàn)程搶到任務(wù)就執(zhí)行屿储。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
Java中的線(xiàn)程池提供了哪幾種拒絕策略
-
AbortPolicy
:該策略默認(rèn)是飽和策略贿讹。當(dāng)不能在處理提交的任務(wù)時(shí),直接拋出RejectedExecutionException够掠,使用者可以自行捕獲此異常民褂。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
-
CallerRunsPolicy
:該策略是在線(xiàn)程池處理不了任務(wù)時(shí),交給提交任務(wù)的主線(xiàn)程去處理任務(wù),主線(xiàn)程在處理任務(wù)的時(shí)候赊堪,不能在提交任務(wù)了面殖,這樣線(xiàn)程池就可以有時(shí)間去處理堆積的任務(wù)了。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
-
DiscardOldestPolicy
:該策略是哭廉,拋棄最老的任務(wù)脊僚,然后再?lài)L試提交任務(wù),若阻塞隊(duì)列使用PriorityBlockingQueue優(yōu)先級(jí)隊(duì)列遵绰,將會(huì)導(dǎo)致優(yōu)先級(jí)最高的任務(wù)被拋棄辽幌,所以在阻塞隊(duì)列為PriorityBlockingQueue時(shí),不建議使用此策略椿访。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
-
DiscardPolicy
:這是一個(gè)比較任性的策略乌企,當(dāng)線(xiàn)程池處理不了任務(wù)時(shí),直接拋棄成玫,再來(lái)了新任務(wù)也直接拋棄加酵。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
-
RejectHandler
:
直接拋拒絕異常。
public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) {
throw new RejectedExecutionException();
}
Java中線(xiàn)程池核心線(xiàn)程數(shù)與最大線(xiàn)程數(shù)該如何配置
可以根據(jù)提交的任務(wù)不同梁剔,將線(xiàn)程池分開(kāi)虽画。
- 處理CPU密集型任務(wù),線(xiàn)程數(shù)量應(yīng)該較少荣病,可為
N(CPU核數(shù))+1
或N(CPU核數(shù)) * 2
码撰,因?yàn)榇藭r(shí)線(xiàn)程一定調(diào)度到某個(gè)CPU執(zhí)行,若任務(wù)本身是CPU綁定的任務(wù)个盆,那么過(guò)多的線(xiàn)程只會(huì)增加線(xiàn)程切換的開(kāi)銷(xiāo)脖岛,而不能提升吞吐量,但可能需要較長(zhǎng)隊(duì)列做緩沖颊亮。 - I/O密集型任務(wù)柴梆,執(zhí)行較慢、數(shù)量不大的IO任務(wù)终惑,要考慮更多線(xiàn)程數(shù)绍在,而無(wú)需太大隊(duì)列。相比計(jì)算型任務(wù)雹有,需多一些線(xiàn)程偿渡,要結(jié)合具體的 I/O 阻塞時(shí)長(zhǎng)考慮。
但是實(shí)際情況下霸奕,有些任務(wù)是既耗CPU資源溜宽,又占用I/O資源的。所以這個(gè)時(shí)候可以采用類(lèi)似美團(tuán)技術(shù)提出方案质帅,實(shí)時(shí)的監(jiān)控線(xiàn)程池狀態(tài)信息适揉,然后對(duì)線(xiàn)程池的數(shù)據(jù)進(jìn)行調(diào)整留攒。
在監(jiān)控線(xiàn)程池的時(shí)候可以使用如下幾個(gè)線(xiàn)程池屬性:
-
getTaskCount()
:線(xiàn)程池需要執(zhí)行的任務(wù)數(shù)量。 -
completedTaskCount
:線(xiàn)程池在運(yùn)行過(guò)程中已完成的任務(wù)數(shù)量嫉嘀,小于或等于taskCount炼邀。 -
largestPoolSize
:線(xiàn)程池里曾經(jīng)創(chuàng)建過(guò)的最大線(xiàn)程數(shù)量。通過(guò)這個(gè)數(shù)據(jù)可以知道線(xiàn)程池是否曾經(jīng)滿(mǎn)過(guò)剪侮,如該數(shù)值等于線(xiàn)程池的最大線(xiàn)程數(shù)量汤善,則表示線(xiàn)程池曾經(jīng)滿(mǎn)過(guò)。 -
getPoolSize()
:線(xiàn)程池的線(xiàn)程數(shù)量票彪,如果線(xiàn)程池不銷(xiāo)毀的話(huà)红淡,線(xiàn)程池里的線(xiàn)程不會(huì)自動(dòng)銷(xiāo)毀,所以這個(gè)大小只增不減降铸。 -
getActiveCount()
:獲取活動(dòng)的線(xiàn)程數(shù)在旱。