本文目錄
- 為什么要使用線程池抗俄?
- 線程池參數(shù)詳解
- 6種常見的線程池
- 為什么不能直接自動(dòng)創(chuàng)建線程
- 如果自定義合適的線程池?
- 如何正確關(guān)閉線程池?
- 線程池實(shí)現(xiàn)線程復(fù)用原理
為什么要使用線程池刨仑?
為什么要使用線程池占婉?
反復(fù)創(chuàng)建線程系統(tǒng)開銷比較大,而且每個(gè)線程的創(chuàng)建和銷毀都需要時(shí)間,如果任務(wù)比較簡(jiǎn)單,那么有可能導(dǎo)致線程的創(chuàng)建和銷毀占用的資源超過執(zhí)行任務(wù)所消耗的資源.
如果當(dāng)要執(zhí)行的任務(wù)比較多時(shí),每個(gè)線程負(fù)責(zé)一個(gè)任務(wù),那么需要?jiǎng)?chuàng)建很多線程去執(zhí)行任務(wù),過多的線程會(huì)占用過多的內(nèi)存資源等,還會(huì)帶來上下文切換,同時(shí)還會(huì)導(dǎo)致系統(tǒng)不穩(wěn)定.
線程池好處
線程池解決了線程生命周期的系統(tǒng)開銷問題,線程池中的線程可以反復(fù)使用,可以用少量的線程去執(zhí)行大量的任務(wù),減少了線程創(chuàng)建和銷毀的開銷,而且線程都是創(chuàng)建好的,來任務(wù)就可以執(zhí)行.
通過設(shè)置合適的線程池的線程數(shù),可以避免資源使用不當(dāng),線程池可以通過線程數(shù)和任務(wù)靈活的控制線程數(shù)量,任務(wù)多的時(shí)候可以繼續(xù)創(chuàng)建線程,任務(wù)少的時(shí)候只保留核心線程,這樣可以避免系統(tǒng)資源浪費(fèi)和線程過多導(dǎo)致內(nèi)存溢出.
線程池可以統(tǒng)一管理資源,通過線程書和任務(wù)隊(duì)列,可以統(tǒng)一開始和結(jié)束,并設(shè)置相關(guān)的拒絕策略.
線程池參數(shù)詳解
介紹線程池各個(gè)參數(shù)含義
corePoolSize:核心線程數(shù),常駐線程池的線程數(shù)量
maxPoolSize:線程池最大線程數(shù)量,當(dāng)任務(wù)特別多的時(shí)候,corePoolSize線程數(shù)量無法滿足需求的時(shí)候,就會(huì)繼續(xù)創(chuàng)建線程,最大不超過maxPoolSize.
KeepAliveTime+時(shí)間單位:空閑線程的存活時(shí)間
ThreadFactory:線程工廠,用來創(chuàng)建線程
WorkQueue:任務(wù)隊(duì)列,用來存放任務(wù)
Handler:處理被拒絕的策略
線程池處理任務(wù)流程圖:
如上圖所示,流程如下:
當(dāng)提交任務(wù)后,,線程池首先會(huì)檢查當(dāng)前線程數(shù),如果當(dāng)前線程數(shù)小于核心線程數(shù),則新建線程并執(zhí)行任務(wù).
隨著任務(wù)不斷增加,線程數(shù)達(dá)到了核心線程數(shù)的數(shù)量,此時(shí)任務(wù)依然在增加,那么新來的任務(wù)將會(huì)放到workQueue等待隊(duì)列中,等核心線程執(zhí)行完任務(wù)后重新從隊(duì)列中提取出等待被執(zhí)行的任務(wù)
如果已經(jīng)達(dá)到了核心線程數(shù),且任務(wù)隊(duì)列也滿了,則線程池就會(huì)繼續(xù)創(chuàng)建線程來執(zhí)行任務(wù),如果任務(wù)不斷提交,線程池會(huì)持續(xù)創(chuàng)建線程直到達(dá)到maximumPoolSize最大線程數(shù),當(dāng)達(dá)到了最大線程數(shù)后,任務(wù)仍不斷提交,那么此時(shí)就超過了線程池的最大處理能力,這個(gè)時(shí)候線程池就會(huì)拒絕處理這些任務(wù),處理策略就是handler.
corePoolSize和maximumPoolSize:
從上面的流程中可以看出,線程池初始化時(shí),默認(rèn)的線程數(shù)是0,當(dāng)有任務(wù)提交后,開始創(chuàng)建核心線程去執(zhí)行任務(wù),當(dāng)線程數(shù)達(dá)到核心線程數(shù)時(shí)且任務(wù)隊(duì)列滿了后,開始創(chuàng)建非核心線程執(zhí)行任務(wù),最大可以達(dá)到maximumPoolSize,如果這是任務(wù)不提交了,線程開始空閑,那么默認(rèn)情況下大于corePoolSize的線程在超過設(shè)置的KeepAliveTime時(shí)間后會(huì)被合理的收回,所以默認(rèn)情況下,線程池中的線程數(shù)量處于corePoolSize和maximumPoolSize之間.
KeepAliveTime+時(shí)間單位:
默認(rèn)情況下,當(dāng)線程池中的數(shù)量多于核心線程數(shù)時(shí),而此時(shí)有沒有任務(wù)可做,那么線程池就會(huì)檢測(cè)線程的KeepAliveTime,如果超過了規(guī)定的時(shí)間,則無事可做的線程就會(huì)被銷毀,以便減少內(nèi)存的占用和資源消耗,如果后期任務(wù)又多了起來,則線程池根據(jù)規(guī)則重新創(chuàng)建線程,通過這個(gè)可伸縮的功能,可以實(shí)現(xiàn)對(duì)資源的合理使用,我們可以通過setKeepAliveTime設(shè)置keepAliveTime時(shí)間,還可以通過設(shè)置allowCoreThreadTimeOut參數(shù),這個(gè)參數(shù)默認(rèn)是false,如果設(shè)置成ture,則會(huì)給核心線程數(shù)設(shè)置超時(shí)等待時(shí)間, 如果超過時(shí)間了核心線程就會(huì)銷毀.
ThreadFactory:
ThreadFactory是一個(gè)線程工廠,負(fù)責(zé)生產(chǎn)線程去執(zhí)行任務(wù),默認(rèn)的線程工廠,創(chuàng)建的線程會(huì)在同一個(gè)線程組,并且擁有一樣的優(yōu)先級(jí),且都不是守護(hù)線程,我們也可自定義線程工廠,以便給線程自定義名字.
workQueue:
阻塞隊(duì)列,用來存放任務(wù),我們主要分析一下5種阻塞隊(duì)列:
ArrayBlockingQueue是基于數(shù)組的有界阻塞隊(duì)列,按照FIFO排序,新來的隊(duì)列會(huì)放到隊(duì)列尾部,有界的數(shù)組可以防止資源被耗盡問題,當(dāng)線程達(dá)到了核心線程數(shù),再來任務(wù)的時(shí)候就放到隊(duì)列的尾部,當(dāng)隊(duì)列滿了的時(shí)候,則繼續(xù)創(chuàng)建非核心線程,如果線程數(shù)量達(dá)到了maxPoolSize,則會(huì)執(zhí)行拒絕策略.
LinkedBlockingQueue是基于鏈表的無界阻塞隊(duì)列(最大容量是Integer.MAX),按照FIFO排序,當(dāng)線程池中線程數(shù)量達(dá)到核心線程數(shù)時(shí),繼續(xù)來了新任務(wù)會(huì)一直存放到隊(duì)列中,而不會(huì)創(chuàng)建新線程.因此使用此隊(duì)列時(shí),maxPoolSize是不起做的
SynchronousQueue是一個(gè)不緩存任務(wù)的阻塞隊(duì)列,當(dāng)來了新任務(wù)的時(shí)候,不會(huì)緩存到隊(duì)列中,而是直接被線程執(zhí)行該任務(wù),如果沒有核心線程可用就創(chuàng)建新線程去執(zhí)行任務(wù),達(dá)到了maxPoolSize時(shí),就執(zhí)行拒絕策略.
PriorityBlockingQueue是一個(gè)具有優(yōu)先級(jí)的無界阻塞隊(duì)列,優(yōu)先級(jí)通過參數(shù)Comparator實(shí)現(xiàn)
DelayedWorkQueu隊(duì)列的特點(diǎn)是內(nèi)部的任務(wù)并不是按照放入的時(shí)間排序,而是會(huì)按照延遲的時(shí)間長(zhǎng)短對(duì)任務(wù)進(jìn)行排序,內(nèi)部采用的是“堆”數(shù)據(jù)結(jié)構(gòu).而且它也是一個(gè)無界隊(duì)列.
handler:
拒絕策略是當(dāng)線程池中任務(wù)達(dá)到了隊(duì)列最大容量,且線程數(shù)量也達(dá)到了最大maxPoolSize的時(shí)候,如果繼續(xù)有新任務(wù)來了,則執(zhí)行這個(gè)拒絕策略來處理新來的任務(wù),jdk提供4種拒絕策略,它們都實(shí)現(xiàn)了RejectedExecutionHandler接口:
CallRunsPolicy:該策略下,在調(diào)用者線程中直接執(zhí)行被拒絕任務(wù)的run方法,就是誰提交的任務(wù),誰負(fù)責(zé)執(zhí)行任務(wù),這樣任務(wù)不會(huì)丟失,而且執(zhí)行任務(wù)比較費(fèi)時(shí),那么提交任務(wù)的線程也會(huì)被占用,就可以減緩任務(wù)提交速度.
AbortPolicy:該策略下,直接拋棄任務(wù),并拋RejectedExecutionException異常.
DiscardPolicy:該策略下,直接拋棄任務(wù).
DiscardOldestPolicy:該策略下,拋棄最早進(jìn)入隊(duì)列中的那個(gè)任務(wù),然后嘗試把這次拒絕的任務(wù)放入隊(duì)列.
除此之外虱疏,我們還可以通過實(shí)現(xiàn) RejectedExecutionHandler 接口來實(shí)現(xiàn)自己的拒絕策略,在接口中我們需要實(shí)現(xiàn)rejectedExecution方法,在rejectedExecution方法中,執(zhí)行例如暫存任務(wù)、重新執(zhí)行等自定義拒絕策略.
六種常見的線程池
FixedThreadPool
這個(gè)線程池的核心線程數(shù)和最大線程數(shù)是一樣的,所以可以看作是固定線程數(shù)的線程池,特點(diǎn)是當(dāng)線程達(dá)到核心線程數(shù)后,如果任務(wù)隊(duì)列滿了,也不會(huì)創(chuàng)建額外的非核心線程去執(zhí)行任務(wù),而是執(zhí)行拒絕策略.
CachedThreadPool
這個(gè)線程池叫做緩存線程池,特點(diǎn)是線程數(shù)幾乎是可以無限增加的(最大值是Integer.MAX_VALUE,基本不會(huì)達(dá)到),當(dāng)線程閑置時(shí)還可以進(jìn)行回收,而且它采用的存儲(chǔ)任務(wù)的隊(duì)列是SynchronousQueue隊(duì)列,隊(duì)列容量是0,實(shí)際不存儲(chǔ)任務(wù),只負(fù)責(zé)對(duì)任務(wù)的中轉(zhuǎn)和傳遞,所以來一個(gè)任務(wù)線程池就看是否有空閑的線程,有的話就用空閑的線程去執(zhí)行任務(wù),否則就創(chuàng)建一個(gè)線程去執(zhí)行,效率比較高.
ScheduledThreadPool
通過這個(gè)線程池的名字可以看出,它支持定時(shí)或者周期性執(zhí)行任務(wù),實(shí)現(xiàn)這種功能的方法主要有三種:
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new Task(), 10, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
第一種是schedule,通過延遲指定時(shí)間后執(zhí)行一次任務(wù),代碼中設(shè)置的是10秒,所以10秒后執(zhí)行一次任務(wù)就結(jié)束.
第二種是scheduleAtFixedRate,通過名稱我們可以看出,第二種是以固定頻率去執(zhí)行任務(wù),它的第二個(gè)參數(shù)initialDelay表示第一次延遲時(shí)間,第三個(gè)參數(shù)period表示周期,總體按照上面的代碼意思就是,第一次延遲10秒后執(zhí)行任務(wù),然后,每次延遲10秒執(zhí)行一次任務(wù).
第三種方法是scheduleWithFixeddelay這種與第二種方法類似,也是周期執(zhí)行任務(wù),不同的是對(duì)周期的定義,之前的scheduleAtFixedRate是以任務(wù)的開始時(shí)間為起點(diǎn)開始計(jì)時(shí),時(shí)間到了就開始執(zhí)行第二次任務(wù),而不管任務(wù)需要多久執(zhí)行,而scheduleWithFixeddelay是以任務(wù)結(jié)束的時(shí)間作為下一次循環(huán)開始的時(shí)間起點(diǎn).
SingleThreadExecutor
第四種線程池中只有一個(gè)線程去執(zhí)行任務(wù),如果執(zhí)行任務(wù)過程中發(fā)生了異常,則線程池會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行后續(xù)任務(wù),這個(gè)線程因?yàn)橹挥幸粋€(gè)線程,所以可以保證任務(wù)執(zhí)行的有序性.
SingleThreadScheduleExecutor
這個(gè)線程池它和ScheduledThreadPool很相似,只不過它的內(nèi)部也只有一個(gè)線程,他只是將核心線程數(shù)設(shè)置為了1,如果執(zhí)行期間發(fā)生異常,同樣會(huì)創(chuàng)建一個(gè)新線程去執(zhí)行任務(wù).
ForkJoinPool
最后一種線程池是ForkJoinPool,這個(gè)線程池是來支持將一個(gè)任務(wù)拆分成多個(gè)“小任務(wù)”并行計(jì)算昙读,這個(gè)線程池是在jdk1.7之后加入的,它主要用于實(shí)現(xiàn)“分而治之”的算法擎椰,特別是分治之后遞歸調(diào)用的函數(shù),這里只是對(duì)ForkJoinPool做了一個(gè)簡(jiǎn)單的介紹,我們先來介紹一下ForkJoinPool和之前的線程池主要的兩個(gè)特點(diǎn)施流。
第一點(diǎn)是fork和join:
我們現(xiàn)來看看fork和join的含義响疚,fork就是將任務(wù)分解成多個(gè)子任務(wù),多個(gè)子任務(wù)互相獨(dú)立瞪醋,不受影響忿晕,執(zhí)行的時(shí)候可以利用 CPU 的多核優(yōu)勢(shì),并行計(jì)算银受,計(jì)算完成后各個(gè)子任務(wù)在調(diào)用join方法進(jìn)行結(jié)果匯總践盼,第一步是拆分也就是 Fork,第二步是匯總也就是 Join宾巍,我們通過下圖來理解:
我們通過舉例斐波那契數(shù)列來展示這個(gè)線程池的使用咕幻。
1.首先我們創(chuàng)建任務(wù)類FibonacciTask繼承RecursiveTask類,重寫compute方法顶霞。其中的ForkJoinTask代表一個(gè)可以并行肄程、合并的任務(wù),F(xiàn)orkJoinTask是一個(gè)抽象類选浑,它還有兩個(gè)抽象子類:RecusiveAction和RecusiveTask蓝厌。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)古徒,
2.我們?cè)赾ompute方法中實(shí)現(xiàn)斐波那契數(shù)列計(jì)算并獲取返回值拓提。
3.在main方法中創(chuàng)建ForkJoinPool,并調(diào)用線程池的submit(ForkJoinTask<T>task)方法隧膘,通過獲取返回的task.get()方法獲取計(jì)算的返回值代态。
任務(wù)類:FibonacciTask
/**
* 這里我們的定義任務(wù)類繼承RecursiveTask寺惫,需要重寫一個(gè)compute方法,或者任務(wù)執(zhí)行的返回值
* RecursiveAction和RecursiveTask是ForkJoinTask的兩個(gè)抽象子類蹦疑,
* 其中的ForkJoinTask西雀,代表一個(gè)可以并行、合并的任務(wù)其中RecursiveAction
* 表示沒有返回值的任務(wù)必尼,RecursiveTask是有返回值的任務(wù)
*/
public class FibonacciTask extends RecursiveTask<Integer> {
private int i;
FibonacciTask(int i){
this.i=i;
}
@Override
protected Integer compute() {
if(i<=1){
return i;
}
FibonacciTask f1=new FibonacciTask(i-1);
//用 fork() 方法分裂任務(wù)并分別執(zhí)行
f1.fork();
FibonacciTask f2=new FibonacciTask(i-2);
f2.fork();
//使用 join() 方法把結(jié)果匯總
return f1.join()+f2.join();
}
}
main方法:
public static void main(String[] args) {
ForkJoinPool forkJoinPool=new ForkJoinPool();
for(int i=0;i<10;i++){
ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(i));
try {
System.out.println(task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
計(jì)算結(jié)果如下:
第二點(diǎn)是內(nèi)部結(jié)構(gòu)不同:
之前的線程池所有的線程共用一個(gè)隊(duì)列蒋搜,但 ForkJoinPool 線程池中每個(gè)線程都有自己獨(dú)立的任務(wù)隊(duì)列,這個(gè)隊(duì)列是雙端隊(duì)列,如圖下所示:
ForkJoinPool 線程池內(nèi)部除了有一個(gè)共用的任務(wù)隊(duì)列之外判莉,每個(gè)線程還有一個(gè)對(duì)應(yīng)的雙端隊(duì)列 deque,這時(shí)一旦線程中的任務(wù)被 Fork 分裂了育谬,分裂出來的子任務(wù)放入線程自己的 deque 里券盅,而不是放入公共的任務(wù)隊(duì)列中(公共任務(wù)隊(duì)列采用數(shù)組存放),如果此時(shí)有三個(gè)子任務(wù)放入線程 t1 的 deque 隊(duì)列中膛檀,對(duì)于線程 t1 而言獲取任務(wù)的成本就降低了锰镀,可以直接在自己的任務(wù)隊(duì)列中獲取而不必去公共隊(duì)列中爭(zhēng)搶也不會(huì)發(fā)生阻塞(除了后面會(huì)講到的 steal 情況外),減少了線程間的競(jìng)爭(zhēng)和切換咖刃,是非常高效的泳炉。
我們?cè)倏紤]一種情況,此時(shí)線程有多個(gè)嚎杨,而線程 t1 的任務(wù)特別繁重花鹅,分裂了數(shù)十個(gè)子任務(wù),但是 t0 此時(shí)卻無事可做枫浙,它自己的 deque 隊(duì)列為空刨肃,這時(shí)為了提高效率,t0 就會(huì)想辦法幫助 t1 執(zhí)行任務(wù)箩帚,這就是“work-stealing”的含義真友。
雙端隊(duì)列 deque 中,線程 t1 獲取任務(wù)的邏輯是后進(jìn)先出紧帕,也就是LIFO(Last In Frist Out)盔然,而線程 t0 在“steal”偷線程 t1 的 deque 中的任務(wù)的邏輯是先進(jìn)先出,也就是FIFO(Fast In Frist Out)是嗜,如圖所示愈案,圖中很好的描述了兩個(gè)線程使用雙端隊(duì)列分別獲取任務(wù)的情景。你可以看到叠纷,使用 “work-stealing” 算法和雙端隊(duì)列很好地平衡了各線程的負(fù)載刻帚。
最后,我們用一張全景圖來描述 ForkJoinPool 線程池的內(nèi)部結(jié)構(gòu)涩嚣,你可以看到 ForkJoinPool 線程池和其他線程池很多地方都是一樣的崇众,但重點(diǎn)區(qū)別在于它每個(gè)線程都有一個(gè)自己的雙端隊(duì)列來存儲(chǔ)分裂出來的子任務(wù)掂僵。ForkJoinPool 非常適合用于遞歸的場(chǎng)景,例如樹的遍歷顷歌、最優(yōu)路徑搜索等場(chǎng)景锰蓬。
為什么不能直接自動(dòng)創(chuàng)建線程池
首先自動(dòng)創(chuàng)建線程池通過直接調(diào)用Executors.newCachedThreadPool()方法直接創(chuàng)建線程池.但是開發(fā)中我們不能直接使用創(chuàng)建的線程池,原因如下:
FixedThreadPool
通過下面FiexdThreadPool內(nèi)部代碼可以看出,FixedThreadPool內(nèi)部調(diào)用的是ThreadPoolExecutor的構(gòu)造函數(shù),構(gòu)造函數(shù)中是的的阻塞隊(duì)列是LinkedBlockingQueue,那么這就帶來了問題,當(dāng)任務(wù)處理速度比較慢的時(shí)候,雖然新增任務(wù)越來越多,隊(duì)列中堆積的任務(wù)就越來越多,最終會(huì)占用大量?jī)?nèi)存,并發(fā)生OOM,就會(huì)嚴(yán)重影響到程序運(yùn)行.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
SingleThreadExecutor
通過看下面SingleThreadExecutor的內(nèi)部代碼可以發(fā)現(xiàn),newSingleThreadExecutor和newFixedThreadPool的原理是一樣的,只不過是核心線程數(shù)和最大線程數(shù)都設(shè)置成了1,但是任務(wù)隊(duì)列還是無界的LinkedBlockingQueue,所以也會(huì)導(dǎo)致任務(wù)堆積,發(fā)生OOM問題.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
CachedThreadPool
繼續(xù)看下圖CachedThreadPool內(nèi)部代碼,從代碼中可以看出,CachedThreadPool使用的任務(wù)阻塞隊(duì)列是SynchronousQueue,SynchronousQueue隊(duì)列我們前面介紹過,并不存儲(chǔ)任務(wù),只是對(duì)任務(wù)進(jìn)行直接轉(zhuǎn)發(fā),這個(gè)隊(duì)列不會(huì)引發(fā)OOM問題,但是我們?cè)诳醋畲缶€程數(shù)設(shè)置成了Integer.MAX_VALUE,所以CachedThreadPool線程池并不線程的數(shù)量,那么任務(wù)特別多的時(shí)候,就會(huì)創(chuàng)建非常多的線程,進(jìn)而導(dǎo)致系統(tǒng)內(nèi)存不足.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
ScheduledThreadPool和SingleThreadScheduledExector
ScheduledThreadPool和SingleThreadScheduledExector差不多,只不過是后者線程池中只有一個(gè)線程眯漩,ScheduledThreadPool的源碼如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
我們?cè)谶M(jìn)入ScheduledThreadPoolExecutor構(gòu)造方法中去,從下圖可以看出,它采用的任務(wù)隊(duì)列是DelayWorkQueue,上面我們說過這個(gè)隊(duì)列一個(gè)延遲隊(duì)列同時(shí)也是一個(gè)無界隊(duì)列,所以它和LinkedBlockingQueue一樣,如果任務(wù)過多就可能OOM,代碼如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
SingleThreadExecutor
第四種線程池中只有一個(gè)線程去執(zhí)行任務(wù),如果執(zhí)行任務(wù)過程中發(fā)生了異常,則線程池會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行后續(xù)任務(wù),這個(gè)線程因?yàn)橹挥幸粋€(gè)線程,所以可以保證任務(wù)執(zhí)行的有序性.
如何自定義合適的線程池
這個(gè)也是面試中會(huì)被問到的問題,如果自定義合適的線程池呢?首先我們要調(diào)整線程池中的線程數(shù)量以便充分并合理的使用CPU和內(nèi)存資源,從而最大限度的提高性能.
CPU密集型任務(wù)
如果任務(wù)是一些列比較消耗CPU資源的任務(wù),比如加密芹扭、解密、壓縮赦抖、計(jì)算等,那么最佳線程數(shù)是CPU核心數(shù)的1~2倍,過多很導(dǎo)致占用大量CPU資源,這時(shí)每個(gè)CPU的核心工作基本都是滿負(fù)荷,設(shè)置過多的線程會(huì)造成不必要的上下文切換,而導(dǎo)致性能下降,而且在同一臺(tái)機(jī)器上,我們還要考慮到其他會(huì)占用較多CPU資源的程序運(yùn)行,然后做整體平衡.
耗時(shí)IO任務(wù)
例如數(shù)據(jù)庫舱卡、文件的讀寫,網(wǎng)絡(luò)通信等任務(wù),這種任務(wù)的特點(diǎn)是不會(huì)消耗很多CPU資源,但是IO操作很費(fèi)時(shí).這個(gè)時(shí)候可以設(shè)置最大線程數(shù)一般會(huì)大于CPU核心線程數(shù)很多倍,因?yàn)镮O速度相比于CPU速度比較慢,我們?cè)O(shè)置較少的線程數(shù),就會(huì)浪費(fèi)CPU資源,如果設(shè)置更多的線程數(shù),那么一部分線程正在等待IO的時(shí)候,他們此時(shí)不需要CPU計(jì)算,就能有更多線程去執(zhí)行IO操作,任務(wù)隊(duì)列中的等待任務(wù)就會(huì)減少,更合理的利用了資源.
java并發(fā)編程實(shí)戰(zhàn)中有推薦:線程數(shù) = CPU核心數(shù) *(1+平均等待時(shí)間/平均工作時(shí)間),我們可以通過這個(gè)式子計(jì)算出一個(gè)合理的線程數(shù)量,同時(shí)也可以根據(jù)進(jìn)行壓測(cè)队萤、監(jiān)控jvm的線程情況等方式,確定線程數(shù),更合理的利用資源.
總結(jié)以上特點(diǎn)可以得出以下幾點(diǎn):
線程的平均工作時(shí)間所占比例越高,就需要越少線程.
線程的平均等待時(shí)間所占比例越高,就需要越多的線程
針對(duì)不同的程序,進(jìn)行對(duì)應(yīng)的實(shí)際測(cè)試就可以獲得更合適的選擇.
如何正確關(guān)閉線程池
首先有5種在ThreadPoolExecutor中涉及的關(guān)閉線程的方法轮锥,我們挨個(gè)來分析。
void shutdown()
它可以安全的關(guān)閉一個(gè)線程池,調(diào)用shutdown()方法后,線程池不會(huì)立刻關(guān)閉,而是等執(zhí)行完正在執(zhí)行的任務(wù)和隊(duì)列中等待的任務(wù)后才徹底關(guān)閉,而且調(diào)用shutdown()方法后,如果還有新的任務(wù)繼續(xù)到來,那么線程池會(huì)根據(jù)拒絕策略直接拒絕后面來的新任務(wù).
boolean isShutdown()
這個(gè)方法可以返回ture或者false來判斷是否已經(jīng)開始了關(guān)閉工作,也就是是否執(zhí)行了shutdown或者shutdownNow方法,調(diào)用isShutdown()方法后如果返回true,并不代表線程池已經(jīng)徹底關(guān)閉了,僅僅代表開始了關(guān)閉流程,仍然可能有線程正在執(zhí)行任務(wù),隊(duì)列里也可能有任務(wù)等待被執(zhí)行.
boolean isTerminated()
這個(gè)方法可以檢測(cè)是否真正關(guān)閉了,不僅代表線程池是否已經(jīng)關(guān)閉,同時(shí)也代表線程池中的所有任務(wù)是否已經(jīng)都執(zhí)行完畢,比如已經(jīng)調(diào)用了shutdown()方法,但是有一個(gè)線程正在執(zhí)行任務(wù),則此時(shí)調(diào)用isShutdown方法返回true,而調(diào)用isTerminated方法便返回false,,因?yàn)榫€程池中還有任務(wù)再執(zhí)行,線程池沒有真正關(guān)閉,直到所有線程都執(zhí)行完畢,任務(wù)都執(zhí)行完畢,再調(diào)用isTermainted就返回ture.
boolean awaitTermination(long timeout,TimeUnit unit)要尔,throws IntereuptedException
awaitTermination并不是用來關(guān)閉線程池的,而是用來判斷線程池狀態(tài)的,參數(shù)需要傳入一個(gè)時(shí)間,如果我們?cè)O(shè)置10秒鐘,那么會(huì)有以下幾種情況:
等待期間,線程池已經(jīng)關(guān)閉且所有提交的任務(wù)都執(zhí)行完畢,那么方法就返回ture,相當(dāng)于線程池真正關(guān)閉了.
等待時(shí)間超時(shí)后,第一種情況未發(fā)生,那么方法返回false.
等待時(shí)間中,執(zhí)行任務(wù)的線程被中斷了,方法會(huì)拋出InterruptedException異常.
所以綜上可以看出,調(diào)用 awaitTermination 方法后當(dāng)前線程會(huì)嘗試等待一段指定的時(shí)間舍杜,如果在等待時(shí)間內(nèi),線程池已關(guān)閉并且內(nèi)部的任務(wù)都執(zhí)行完畢了赵辕,也就是說線程池真正“終結(jié)”了既绩,那么方法就返回 true,否則超時(shí)返回 fasle,我們則可以根據(jù) awaitTermination() 返回的布爾值來判斷下一步應(yīng)該執(zhí)行的操作还惠。
List<Runnable> shutdownNow()
調(diào)用shutdownNow()方法后,首先會(huì)給所有線程池中的線程發(fā)送interrupt中斷信號(hào),嘗試中斷這些任務(wù)的執(zhí)行,然后就任務(wù)隊(duì)列中在等待被執(zhí)行的任務(wù)轉(zhuǎn)移到一個(gè)List中并返回,我們可以再根據(jù)List做一些操作,shutdownNow() 的源碼如下所示:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//轉(zhuǎn)換線程運(yùn)行狀態(tài)
advanceRunState(STOP);
//讓每一個(gè)已經(jīng)啟動(dòng)的線程都中斷,如果被中斷的線程對(duì)于中斷信號(hào)不理不睬
//那么依然有可能導(dǎo)致任務(wù)不會(huì)停止
interruptWorkers();
//將隊(duì)列中任務(wù)放入tasks集合中,并返回.
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
線程池復(fù)用原理
線程池復(fù)用原理
線程池可以把線程和任務(wù)進(jìn)行解耦饲握,線程歸線程,任務(wù)歸任務(wù)吸重,擺脫了之前通過 Thread 創(chuàng)建線程時(shí)的一個(gè)線程必須對(duì)應(yīng)一個(gè)任務(wù)的限制互拾。在線程池中,同一個(gè)線程可以從 BlockingQueue 中不斷提取新任務(wù)來執(zhí)行嚎幸,其核心原理在于線程池對(duì) Thread 進(jìn)行了封裝颜矿,并不是每次執(zhí)行任務(wù)都會(huì)調(diào)用 Thread.start() 來創(chuàng)建新線程,而是讓每個(gè)線程去執(zhí)行一個(gè)“循環(huán)任務(wù)”嫉晶,在這個(gè)“循環(huán)任務(wù)”中骑疆,不停地檢查是否還有任務(wù)等待被執(zhí)行,如果有則直接去執(zhí)行這個(gè)任務(wù)替废,也就是調(diào)用任務(wù)的 run 方法箍铭,把 run 方法當(dāng)作和普通方法一樣的地位去調(diào)用,相當(dāng)于把每個(gè)任務(wù)的 run() 方法串聯(lián)了起來椎镣,所以線程數(shù)量并不增加诈火。其中execute代碼如下:
public void execute(Runnable command) {
//如果傳入的Runnable的空,就拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* 當(dāng)前線程數(shù)是否小于核心線程數(shù)状答,如果小于核心線程數(shù)就調(diào)用 addWorker()
* 方法增加一個(gè) Worker冷守,這里的 Worker 就可以理解為一個(gè)線程
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我們先來分析上面部分代碼刀崖,我們先分析下面這一段代碼:
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
我們主要看addWorK(comond,tue)方法,addWorker 方法的主要作用是在線程池中創(chuàng)建一個(gè)線程并執(zhí)行第一個(gè)參數(shù)傳入的任務(wù)拍摇,它的第二個(gè)參數(shù)是個(gè)布爾值亮钦,如果布爾值傳入 true 代表增加線程時(shí)判斷當(dāng)前線程是否少于 corePoolSize,小于則增加新線程充活,大于等于則不增加蜂莉;同理,如果傳入 false 代表增加線程時(shí)判斷當(dāng)前線程是否少于 maxPoolSize混卵,小于則增加新線程映穗,大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進(jìn)行是否新增線程的判斷淮菠。addWorker() 方法如果返回 true 代表添加成功男公,如果返回 false 代表添加失敗。
接下來我們看下面這部分代碼
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
如果代碼執(zhí)行到這里合陵,說明當(dāng)前線程數(shù)大于或等于核心線程數(shù)或者 addWorker 失敗了,那么就需要通過 if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態(tài)是否為 Running澄阳,如果線程池狀態(tài)是 Running 就把任務(wù)放入任務(wù)隊(duì)列中拥知,也就是 workQueue.offer(command)。如果線程池已經(jīng)不處于 Running 狀態(tài)碎赢,說明線程池被關(guān)閉低剔,那么就移除剛剛添加到任務(wù)隊(duì)列中的任務(wù),并執(zhí)行拒絕策略肮塞。
接下來我們上面這部分代碼的else分支邏輯:
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
能進(jìn)入這個(gè) else 說明前面判斷到線程池狀態(tài)為 Running襟齿,那么當(dāng)任務(wù)被添加進(jìn)來之后就需要防止沒有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了),所以此時(shí)如果檢查當(dāng)前線程數(shù)為 0枕赵,也就是 workerCountOf**(recheck) == 0猜欺,那就執(zhí)行 addWorker() 方法新建線程。
接著我們?cè)倏醋詈笠徊糠执a:
else if (!addWorker(command, false))
reject(command);
執(zhí)行到這里拷窜,說明線程池不是 Running 狀態(tài)或線程數(shù)大于或等于核心線程數(shù)并且任務(wù)隊(duì)列已經(jīng)滿了开皿,所以此時(shí)需要添加新線程,直到線程數(shù)達(dá)到“最大線程數(shù)”篮昧,所以此時(shí)就會(huì)再次調(diào)用 addWorker 方法并將第二個(gè)參數(shù)傳入 false赋荆,傳入 false 代表增加線程時(shí)判斷當(dāng)前線程數(shù)是否少于 maxPoolSize,小于則增加新線程懊昨,大于等于則不增加窄潭,也就是以 maxPoolSize 為上限創(chuàng)建新的 worker;addWorker 方法如果返回 true 代表添加成功酵颁,如果返回 false 代表任務(wù)添加失敗嫉你,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maxPoolSize月帝,然后執(zhí)行拒絕策略 reject 方法。如果執(zhí)行到這里線程池的狀態(tài)不是 Running均抽,那么 addWorker 會(huì)失敗并返回 false嫁赏,所以也會(huì)執(zhí)行拒絕策略 reject 方法。
所以看到這里我們需要著重分析addWorker()方法油挥,這里的 Worker 可以理解為是對(duì) Thread 的包裝潦蝇,Worker 內(nèi)部有一個(gè) Thread 對(duì)象,它正是最終真正執(zhí)行任務(wù)的線程深寥,所以一個(gè) Worker 就對(duì)應(yīng)線程池中的一個(gè)線程攘乒,addWorker 就代表增加線程。我們看部分addWorker內(nèi)的方法:
boolean workerStarted = false;
boolean workerAdded = false;
//worker是內(nèi)部類實(shí)現(xiàn)了接口Runnable惋鹅,封裝了Thread
Worker w = null;
try {
//獲取隊(duì)列第一個(gè)任務(wù)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//集合则酝,包含池中的所有工作線程。只有當(dāng)持有主鎖時(shí)才能訪問闰集。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//調(diào)用線程的start方法沽讹。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
通過上圖中的注釋我們可以看出,addWork方法實(shí)際上是調(diào)用自己封裝的線程的start方法來啟動(dòng)線程武鲁,我們繼續(xù)看worker內(nèi)部類的run方法是如何實(shí)現(xiàn)的:
public void run() {
runWorker(this);
}
?
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取第一個(gè)要執(zhí)行的任務(wù)爽雄,先進(jìn)先出
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//實(shí)現(xiàn)線程復(fù)用的邏輯主要在一個(gè)不停循環(huán)的 while 循環(huán)體中
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//直接調(diào)用task的run方法,而不是新建線程
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
從上圖中我們可以看到內(nèi)部類worker的run()方法實(shí)際上是調(diào)用runWorker(this)方法沐鼠,實(shí)現(xiàn)線程復(fù)用的邏輯主要是在一個(gè)不同的循環(huán)體while中進(jìn)行挚瘟,所以在runWorker(this)方法中主要做了兩件事:
通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中獲取待執(zhí)行的任務(wù)。
直接調(diào)用 task 的 run 方法來執(zhí)行具體的任務(wù)(而不是新建線程饲梭,調(diào)用線程的start()方法)乘盖。
好了,本篇文章主要分析了線程池的基本概念和核心原理,也是作者對(duì)線程池學(xué)習(xí)的各方面的總結(jié)憔涉,基本上看完本篇文章能應(yīng)對(duì)很多線程池的相關(guān)面試以及日常開發(fā)需求订框,如果有什么不足或者錯(cuò)誤的地方希望讀者能給出建議!