Executor的主要作用是解耦任務(wù)提交和任務(wù)執(zhí)行(包括如何使用線程捧书,如何調(diào)度)
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
Executor本身并不表示使用線程
ExecutorService提供了關(guān)閉機(jī)制以及提交任務(wù)返回Future對(duì)象用于追蹤任務(wù)執(zhí)行進(jìn)度或取消任務(wù)伴嗡。
先分析一下AbstractExecutorService實(shí)現(xiàn)中用到的Future的實(shí)現(xiàn)類FutureTask的實(shí)現(xiàn)機(jī)制
如果當(dāng)前的任務(wù)是Runnable,通過RunnableAdapter轉(zhuǎn)為Callable
FutureTask自身實(shí)現(xiàn)了Runnable栏尚,包裝內(nèi)部的Callable或Runnable
Future#get實(shí)現(xiàn)機(jī)制:如果任務(wù)還沒開始,調(diào)用線程加入任務(wù)的等待隊(duì)列,等待任務(wù)完成或取消時(shí)被喚醒,否則等待任務(wù)到達(dá)最終狀態(tài)辕近,正常執(zhí)行返回結(jié)果或異常時(shí)拋出ExecutionException異常
Future#cancle實(shí)現(xiàn)機(jī)制:如果任務(wù)還沒開始,狀態(tài)改為INTERRUPTING或CANCELLED亏推,如果支持中斷,打斷當(dāng)前線程(參考run方法年堆,先設(shè)置runner線程,再修改狀態(tài)盏浇,所以可能當(dāng)前狀態(tài)是NEW变丧,但是runner已經(jīng)設(shè)置了),然后喚醒所有之前等待的線程绢掰。
AbstractExecutorService的實(shí)現(xiàn)機(jī)制:任務(wù)的具體執(zhí)行都委托給從Executor繼承的execute方法痒蓬,主要實(shí)現(xiàn)了submit和invokeAll,invokeAny方法滴劲。
任務(wù)的執(zhí)行都委托給Executor攻晒,所有提交的任務(wù)都用QueueingFuture包裝,任務(wù)執(zhí)行完加入內(nèi)部的BlockingQueue班挖。
invokeAny:先提交一個(gè)任務(wù)鲁捏,然后循環(huán)檢查ExecutorCompletionService的阻塞隊(duì)列是否有已完成的任務(wù),有就返回萧芙,沒有就再提交一個(gè)新任務(wù)给梅,直到任務(wù)都提交完假丧,然后阻塞。第一個(gè)任務(wù)完成后动羽,cancel所有可以cancel的任務(wù)包帚。
AbstractExecutorService有兩個(gè)具體的子類:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又繼承了ThreadPoolExecutor
ThreadPoolExecutor:
workQueue表示任務(wù)隊(duì)列运吓,workers表示當(dāng)前執(zhí)行任務(wù)的線程集合渴邦。
Worker繼承了AbstractQueuedSynchronizer,自身就是一個(gè)簡(jiǎn)單的互斥鎖拘哨,實(shí)現(xiàn)了Runnable谋梭,Worker在構(gòu)造時(shí)內(nèi)部會(huì)利用ThreadFactory產(chǎn)生一個(gè)線程,線程啟動(dòng)時(shí)宅静,執(zhí)行Worker自身的run方法章蚣。
Worker執(zhí)行過程中,會(huì)通過getTask獲取任務(wù)姨夹,每次執(zhí)行任務(wù)之前都會(huì)獲取worker自身的互斥鎖
getTask通過返回null(線程池stop纤垂,或shutdown之后任務(wù)隊(duì)列為空,或者動(dòng)態(tài)調(diào)整參數(shù)之后線程太多磷账,或者獲取任務(wù)超時(shí)(說明任務(wù)太少了峭沦,不需要那么多線程)),控制Worker結(jié)束循環(huán)
Worker循環(huán)結(jié)束有兩種原因:執(zhí)行的任務(wù)拋出異常逃糟,getTask返回null吼鱼。
如果是后者,再次檢查以確保目前的線程數(shù)不低于最低要求绰咽,線程數(shù)不夠時(shí)添加worker線程菇肃。因異常而結(jié)束任務(wù)循環(huán)也會(huì)添加新的worker線程。
添加worker失敗的原因有三:線程池stop取募;shutdown之后任務(wù)隊(duì)列為空琐谤;當(dāng)前線程數(shù)超過最大線程數(shù)。worker添加成功之后玩敏,啟動(dòng)內(nèi)部的線程斗忌,開始循環(huán)處理任務(wù)。
關(guān)鍵點(diǎn)在于旺聚,核心線程全部啟動(dòng)之后织阳,任務(wù)會(huì)先加入任務(wù)隊(duì)列,只有任務(wù)隊(duì)列是有界隊(duì)列砰粹,且隊(duì)列滿了才會(huì)啟動(dòng)非核心線程_蠖恪!!
shutdown之后惊窖,修改狀態(tài)為SHUTDOWN刽宪,然后打斷所有idle線程,所謂idle界酒,就是可以獲取worker的互斥鎖圣拄,說明worker當(dāng)前在等待任務(wù)而不是執(zhí)行任務(wù),參考runWorker方法毁欣。如果當(dāng)前所有worker正巧都在等待任務(wù)庇谆,所有worker都會(huì)被打斷(processWorkerExit方法會(huì)在worker退出循環(huán)時(shí)調(diào)用,根據(jù)情況再添加worker)凭疮。tryTerminate中會(huì)先檢查如果當(dāng)前狀態(tài)是SHUTDOWN但是任務(wù)隊(duì)列不為空饭耳,不能進(jìn)入terminal狀態(tài),如果當(dāng)前是shutdown且任務(wù)隊(duì)列為空且線程數(shù)為空执解,修改狀態(tài)為過渡狀態(tài)TIDYING寞肖,然后修改為最終狀態(tài)TERMINATED。
打斷所有已經(jīng)啟動(dòng)的worker衰腌,返回所有還未執(zhí)行的任務(wù)新蟆。
shutdown之后線程池并不一定關(guān)閉!S胰铩琼稻!所以正確的做法是shutdown之后調(diào)用awaitTermination等待所有任務(wù)執(zhí)行完后所有線程被打斷。
ThreadPoolExecutor可控制參數(shù):
corePoolSize:核心線程數(shù)饶囚,worker數(shù)量小于corePoolSize時(shí)每次提交任務(wù)都啟動(dòng)一個(gè)core線程帕翻,可以使用set方法在運(yùn)行時(shí)調(diào)整。
maximumPoolSize:最大線程數(shù)萝风,包括core和非core線程嘀掸,從上面的源碼分析可以直到只有任務(wù)隊(duì)列為有界隊(duì)列時(shí)才會(huì)啟動(dòng)非core線程。
workQueue:任務(wù)隊(duì)列规惰,只有任務(wù)隊(duì)列為有界隊(duì)列時(shí)才會(huì)啟動(dòng)非core線程横殴。
keepAliveTime:worker在指定時(shí)間內(nèi)獲取不到任務(wù),說明此時(shí)人浮于事卿拴,需要裁員,getTask會(huì)返回null梨与,結(jié)束獲取任務(wù)超時(shí)的worker堕花。
threadFactory:定義如何產(chǎn)生線程追他,默認(rèn)直接new Thread雕擂。
handler:提交任務(wù)時(shí)任務(wù)隊(duì)列滿了或線程池shutdown之后的行為,默認(rèn)拋出RejectedExecutionException異常员串,可選策略包括忽略(DiscardPolicy),在提交任務(wù)的線程中執(zhí)行(CallerRunsPolicy)壕曼,移除任務(wù)隊(duì)列里最前面的任務(wù)(DiscardOldestPolicy)苏研。
keepAliveTime:如果通過set設(shè)置了值,如果一個(gè)worker超過指定時(shí)間未獲得任務(wù)就會(huì)timeout而結(jié)束循環(huán)腮郊,如果當(dāng)前線程數(shù)超過了corePoolSize摹蘑,不會(huì)再添加新的worker,默認(rèn)不支持timeout轧飞。
allowCoreThreadTimeOut:默認(rèn)線程數(shù)小于corePoolSize衅鹿,timeout之后就會(huì)添加新的worker,如果設(shè)置了allowCoreThreadTimeOut过咬,只有當(dāng)前線程為0時(shí)才會(huì)添加新的worker大渤。
下面分析一下ThreadPoolExecutor的子類ScheduledThreadPoolExecutor的實(shí)現(xiàn)機(jī)制:
從構(gòu)造上看,主要是任務(wù)隊(duì)列使用了DelayedWorkQueue掸绞,DelayedWorkQueue是一個(gè)簡(jiǎn)單的基于二叉堆實(shí)現(xiàn)的優(yōu)先級(jí)阻塞無界隊(duì)列泵三,所有任務(wù)按觸發(fā)時(shí)刻排序,keepAliveTime為0衔掸,不支持worker超時(shí)烫幕。從上文的分析可知,使用無界隊(duì)列時(shí)是不會(huì)啟動(dòng)非core線程的具篇,maximumPoolSize設(shè)置成了Integer.MAX_VALUE而不是corePoolSize纬霞,避免運(yùn)行時(shí)修改corePoolSize時(shí)還要修改maximumPoolSize。
所有提交的任務(wù)都會(huì)用ScheduledFutureTask包裝
任務(wù)先按觸發(fā)時(shí)刻排序驱显,同時(shí)觸發(fā)的任務(wù)按提交順序排序
如果是重復(fù)任務(wù)诗芜,任務(wù)執(zhí)行完,計(jì)算下次觸發(fā)時(shí)刻埃疫,重新加入任務(wù)隊(duì)列伏恐。此處有一個(gè)細(xì)節(jié):就算是fixed-rate的任務(wù),也是上次執(zhí)行完之后才會(huì)再次加入任務(wù)隊(duì)列栓霜。
shutdown之后不允許提交新任務(wù)翠桦,如果是之前提交的延遲任務(wù)還沒到時(shí)間或者是周期性任務(wù),根據(jù)參數(shù)決定是否還能繼續(xù)執(zhí)行胳蛮,默認(rèn)運(yùn)行繼續(xù)等待執(zhí)行延遲任務(wù)销凑,不允許執(zhí)行周期任務(wù)。
ForkJoinPool:日后補(bǔ)充=龃丁6酚住!
下面來分析一下Executors里的靜態(tài)方法構(gòu)造的都是什么線程:
無界隊(duì)列抚垄,不支持timeout蜕窿,固定線程數(shù)谋逻。
newSingleThreadExecutor = newFixedThreadPool(1)
使用特殊的隊(duì)列SynchronousQueue,相當(dāng)于容量為1的阻塞隊(duì)列桐经,只有這樣毁兆,如果已經(jīng)有任務(wù)在等待執(zhí)行了,再次提交任務(wù)時(shí)才會(huì)啟動(dòng)非core線程阴挣。