1、線程池常用接口介紹
1.1沪袭、Executor
public interface Executor {
void execute(Runnable command);
}
執(zhí)行提交的Runnable任務(wù)艇抠。其中的execute方法在將來的某個(gè)時(shí)候執(zhí)行給定的任務(wù),該任務(wù)可以在新線程讯榕、池化線程或調(diào)用線程中執(zhí)行,具體由Executor的實(shí)現(xiàn)者決定骤素。
1.2、ExecutorService
ExecutorService繼承自Executor愚屁,下面挑幾個(gè)方法介紹:
1.2.1济竹、shutdown()
void shutdown();
啟動(dòng)有序關(guān)閉線程池,在此過程中執(zhí)行先前提交的任務(wù)霎槐,但不接受任何新任務(wù)送浊。如果線程池已經(jīng)關(guān)閉,調(diào)用此方法不會(huì)產(chǎn)生額外的效果丘跌。此方法不等待以前提交的任務(wù)完成執(zhí)行,可以使用awaitTermination去實(shí)現(xiàn)袭景。
1.2.2、shutdownNow()
List<Runnable> shutdownNow();
嘗試停止所有正在積極執(zhí)行的任務(wù)闭树, 停止處理等待的任務(wù)耸棒,并返回等待執(zhí)行的任務(wù)列表。 此方法不等待以前提交的任務(wù)完成執(zhí)行,可以使用awaitTermination去實(shí)現(xiàn)报辱。除了盡最大努力停止處理積極執(zhí)行的任務(wù)外与殃,沒有任何保證。例如碍现,典型的實(shí)現(xiàn)是:通過Thread#interrupt取消任務(wù)執(zhí)行幅疼,但是任何未能響應(yīng)中斷的任務(wù)都可能永遠(yuǎn)不會(huì)終止。
1.2.3鸵赫、isShutdown()
boolean isShutdown();
返回線程池關(guān)閉狀態(tài)衣屏。
1.2.4、isTerminated()
boolean isTerminated();
如果關(guān)閉后所有任務(wù)都已完成辩棒,則返回 true狼忱。注意,除非首先調(diào)用了shutdown或shutdownNow一睁,否則isTerminated永遠(yuǎn)不會(huì)返回true钻弄。
1.2.5、awaitTermination(long timeout, TimeUnit unit)
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
線程阻塞阻塞者吁,直到所有任務(wù)都在shutdown請(qǐng)求之后執(zhí)行完畢窘俺,或者超時(shí)發(fā)生,或者當(dāng)前線程被中斷(以先發(fā)生的情況為準(zhǔn))复凳。
1.2.6瘤泪、submit
<T> Future<T> submit(Callable<T> task);
提交一個(gè)value-returning任務(wù)以執(zhí)行灶泵,并返回一個(gè)表示該任務(wù)未決結(jié)果的Future。 Future的 get方法將在成功完成任務(wù)后返回任務(wù)的結(jié)果对途。
1.3赦邻、ScheduledExecutorService
安排命令在給定的延遲之后運(yùn)行,或者定期執(zhí)行,繼承自ExecutorService接口由以下四個(gè)方法組成:
//在給定延遲之后啟動(dòng)任務(wù)实檀,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個(gè)周期性操作惶洲,該操作在給定的初始延遲之后首次啟動(dòng),然后在給定的周期內(nèi)執(zhí)行;
//如果任務(wù)的任何執(zhí)行遇到異常膳犹,則禁止后續(xù)執(zhí)行恬吕。否則,任務(wù)只會(huì)通過執(zhí)行器的取消或終止而終止须床。
//如果此任務(wù)的任何執(zhí)行時(shí)間超過其周期铐料,則后續(xù)執(zhí)行可能會(huì)延遲開始,但不會(huì)并發(fā)執(zhí)行侨颈。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//創(chuàng)建并執(zhí)行一個(gè)周期性操作余赢,該操作在給定的初始延遲之后首次啟動(dòng),然后在一次執(zhí)行的終止和下一次執(zhí)行的開始之間使用給定的延遲哈垢。
//如果任務(wù)的任何執(zhí)行遇到異常妻柒,則禁止后續(xù)執(zhí)行。否則耘分,任務(wù)只會(huì)通過執(zhí)行器的取消或終止而終止举塔。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
1.4、ThreadFactory
public interface ThreadFactory {
Thread newThread(Runnable r);
}
按需創(chuàng)建新線程的對(duì)象求泰。
1.5央渣、Callable
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
返回任務(wù)結(jié)果也可能拋出異常。
1.6渴频、Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
Future表示異步計(jì)算的結(jié)果芽丹。方法用于檢查計(jì)算是否完成,等待計(jì)算完成并檢索計(jì)算結(jié)果卜朗。只有當(dāng)計(jì)算完成時(shí)拔第,才可以使用方法get檢索結(jié)果,如果需要场钉,可以阻塞蚊俺,直到準(zhǔn)備好為止。取消由cancel方法執(zhí)行逛万。還提供了其他方法來確定任務(wù)是否正常完成或被取消泳猬。一旦計(jì)算完成,就不能取消計(jì)算。
1.7得封、Delayed
public interface Delayed extends Comparable<Delayed> {
//在給定的時(shí)間單位中返回與此對(duì)象關(guān)聯(lián)的剩余延遲
long getDelay(TimeUnit unit);
}
一種混合風(fēng)格的接口埋心,用于標(biāo)記在給定延遲之后應(yīng)該執(zhí)行的對(duì)象。
1.8呛每、ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {}
2踩窖、線程池工作流程
新任務(wù)進(jìn)來時(shí):
- 如果當(dāng)前運(yùn)行的線程少于corePoolSize坡氯,則創(chuàng)建新線程(核心線程)來執(zhí)行任務(wù)晨横。
- 如果運(yùn)行的線程等于或多于corePoolSize ,則將任務(wù)加入BlockingQueue。
- 如果BlockingQueue隊(duì)列已滿箫柳,則創(chuàng)建新的線程(非核心)來處理任務(wù)手形。
- 如果核心線程與非核心線程總數(shù)超出maximumPoolSize,任務(wù)將被拒絕悯恍,并調(diào)用RejectedExecutionHandler拒絕策略库糠。
3、ThreadPoolExecutor介紹
構(gòu)造方法:
public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
參數(shù)說明:
- corePoolSize
除非設(shè)置了 allowCoreThreadTimeOut涮毫,否則要保留在線程池中的線程數(shù)(即使它們是空閑的)瞬欧。 - maximumPoolSize
線程池中允許的最大線程數(shù)。 - keepAliveTime
當(dāng)線程數(shù)大于corePoolSize時(shí)罢防,這是多余的空閑線程在終止新任務(wù)之前等待新任務(wù)的最長時(shí)間艘虎。 - unit
keepAliveTime參數(shù)的時(shí)間單位。 - workQueue
用于在任務(wù)執(zhí)行前保存任務(wù)的隊(duì)列咒吐。這個(gè)隊(duì)列只包含execute方法提交的Runnable任務(wù)野建。 - threadFactory
執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。 - handler
由于達(dá)到線程邊界和隊(duì)列容量而阻塞執(zhí)行時(shí)使用的處理程序恬叹。
3.1候生、BlockingQueue
- SynchronousQueue
不存儲(chǔ)元素的阻塞隊(duì)列,一個(gè)插入操作绽昼,必須等待移除操作結(jié)束唯鸭,每個(gè)任務(wù)一個(gè)線程。使用的時(shí)候maximumPoolSize一般指定成Integer.MAX_VALUE硅确。 - LinkedBlockingQueue
如果當(dāng)前線程數(shù)大于等于核心線程數(shù)目溉,則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒有最大值限制疏魏,即所有超過核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中停做。 - ArrayBlockingQueue
可以限定隊(duì)列的長度,接收到任務(wù)的時(shí)候大莫,如果沒有達(dá)到corePoolSize的值蛉腌,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候烙丛,如果隊(duì)列已滿舅巷,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize河咽,并且隊(duì)列也滿了钠右,則執(zhí)行拒絕策略。 - DelayQueue
隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口忘蟹,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn)Delayed接口飒房。這個(gè)隊(duì)列接收到任務(wù)時(shí),首先先入隊(duì)媚值,只有達(dá)到了指定的延時(shí)時(shí)間狠毯,才會(huì)執(zhí)行任務(wù)。 - priorityBlockingQuene
具有優(yōu)先級(jí)的無界阻塞隊(duì)列褥芒。
3.2嚼松、RejectedExecutionHandler
有4個(gè)ThreeadPoolExecutor內(nèi)部類。
- AbortPolicy
直接拋出異常锰扶,默認(rèn)策略献酗。 - CallerRunsPolicy
用調(diào)用者所在的線程來執(zhí)行任務(wù)。 - DiscardOldestPolicy
丟棄阻塞隊(duì)列中靠最前的任務(wù)坷牛,并執(zhí)行當(dāng)前任務(wù)罕偎。
4、DiscardPolicy
直接丟棄任務(wù)漓帅。
最好自定義飽和策略锨亏,實(shí)現(xiàn)RejectedExecutionHandler接口,如:記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。
3.3忙干、線程池大小設(shè)置
- CPU密集型
盡量使用較小的線程池器予,減少CUP上下文切換,一般設(shè)置為CPU核心數(shù)+1捐迫。 - IO密集型
可以適當(dāng)加大線程池?cái)?shù)量乾翔,IO多,所以在等待IO的時(shí)候施戴,充分利用CPU,一般設(shè)置為CPU核心數(shù)2倍反浓。
但是對(duì)于一些特別耗時(shí)的IO操作,盲目的用線程池可能也不是很好赞哗,通過異步+單線程輪詢雷则,上層再配合上一個(gè)固定的線程池,效果可能更好肪笋,參考Reactor模型月劈。 - 混合型
視具體情況而定度迂。
3.4、任務(wù)提交
- Callable
通過submit函數(shù)提交猜揪,返回Future對(duì)象惭墓。 - Runnable
通過execute提交,沒有返回結(jié)果而姐。
3.5腊凶、關(guān)閉線程池
- shutdown()
僅停止阻塞隊(duì)列中等待的線程,那些正在執(zhí)行的線程就會(huì)讓他們執(zhí)行結(jié)束拴念。 - shutdownNow()
不僅會(huì)停止阻塞隊(duì)列中的線程钧萍,而且會(huì)停止正在執(zhí)行的線程。
4丈莺、線程池實(shí)現(xiàn)原理
4.1划煮、 線程池狀態(tài)
線程池的內(nèi)部狀態(tài)由AtomicInteger修飾的ctl表示,其高3位表示線程池的運(yùn)行狀態(tài)缔俄,低29位表示線程池中的線程數(shù)量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
主池控制狀態(tài)ctl是一個(gè)原子整數(shù)器躏,包含兩個(gè)概念字段:
- workerCount:指示有效線程數(shù)俐载。
- runState:指示是否運(yùn)行、關(guān)閉等登失。
為了將這兩個(gè)字段打包成一個(gè)整型遏佣,所以將workerCount限制為(229)-1個(gè)線程,而不是(231)-1個(gè)線程揽浙。
workerCount是工作線程數(shù)量状婶。該值可能與實(shí)際活動(dòng)線程的數(shù)量存在暫時(shí)性差異,例如馅巷,當(dāng)ThreadFactory在被請(qǐng)求時(shí)無法創(chuàng)建線程膛虫,以及退出的線程在終止前仍在執(zhí)行bookkeeping時(shí)。 用戶可見的池大小報(bào)告為工作線程集的當(dāng)前大小钓猬。
runState提供了生命周期稍刀,具有以下值:
- RUNNING:接受新任務(wù)并處理排隊(duì)的任務(wù)
- SHUTDOWN:不接受新任務(wù),而是處理隊(duì)列的任務(wù)敞曹。
- STOP:不接受新任務(wù)账月,不處理隊(duì)列的任務(wù),中斷正在進(jìn)行的任務(wù)澳迫。
- TIDYING:所有任務(wù)都已終止局齿,workerCount為零,過渡到狀態(tài)TIDYING的線程將運(yùn)行terminated()鉤子方法橄登。
- TERMINATED:terminated()方法執(zhí)行完畢抓歼。
為了允許有序比較担平,這些值之間的數(shù)值順序很重要。運(yùn)行狀態(tài)會(huì)隨著時(shí)間單調(diào)地增加锭部,但不需要達(dá)到每個(gè)狀態(tài)暂论。轉(zhuǎn)換:
- RUNNING -> SHUTDOWN
在調(diào)用shutdown()時(shí),可以隱式地在finalize()中調(diào)用拌禾。 - (RUNNING or SHUTDOWN) -> STOP
調(diào)用shutdownNow()取胎。 - SHUTDOWN -> TIDYING
當(dāng)隊(duì)列和池都為空時(shí)。 - STOP -> TIDYING
當(dāng)池是空的時(shí)候湃窍。 - TIDYING -> TERMINATED
當(dāng)terminated()鉤子方法完成時(shí)闻蛀。
當(dāng)狀態(tài)達(dá)到TERMINATED時(shí),在awaitTermination()中等待的線程將返回您市。
下面看以下其他狀態(tài)信息:
//Integer.SIZE為32觉痛,COUNT_BITS為29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大線程數(shù)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位為111,該狀態(tài)的線程池會(huì)接收新任務(wù)茵休,并處理阻塞隊(duì)列中的任務(wù)薪棒;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補(bǔ)碼:1111 ... 1111
* 左移操作:后面補(bǔ) 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位為000,該狀態(tài)的線程池不會(huì)接收新任務(wù)榕莺,但會(huì)處理阻塞隊(duì)列中的任務(wù)俐芯;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位為001,該狀態(tài)的線程不會(huì)接收新任務(wù)钉鸯,也不會(huì)處理阻塞隊(duì)列中的任務(wù)吧史,而且會(huì)中斷正在* 運(yùn)行的任務(wù);
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位為010唠雕,所有任務(wù)都已終止贸营,workerCount為零,過渡到狀態(tài)TIDYING的線程將運(yùn)行terminated()鉤子方法岩睁;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位為011钞脂,terminated()方法執(zhí)行完畢;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根據(jù)ctl計(jì)算runState
private static int runStateOf(int c) {
//2^29 = 001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假設(shè)c為 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最終值: 001 0 0000 0000 0000 0000 0000 0000 0000
return c & ~CAPACITY;
}
//根據(jù)ctl計(jì)算 workerCount
private static int workerCountOf(int c) {
//2^29-1 = 000 1 1111 1111 1111 1111 1111 1111 1111
//假設(shè)c = 000 0 0000 0000 0000 0000 0000 0000 0001 1個(gè)線程
//最終值: 000 0 0000 0000 0000 0000 0000 0000 0001 1
return c & CAPACITY;
}
// 根據(jù)runState和workerCount計(jì)算ctl
private static int ctlOf(int rs, int wc) {
//假設(shè) rs: STOP 001 0 0000 0000 0000 0000 0000 0000 0000
//假設(shè) wc: 000 0 0000 0000 0000 0000 0000 0000 0001 1個(gè)線程
//最終值: 001 0 0000 0000 0000 0000 0000 0000 0001
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//RUNNING狀態(tài)為負(fù)數(shù)笙僚,肯定小于SHUTDOWN芳肌,返回線程池是否為運(yùn)行狀態(tài)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//試圖增加ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//嘗試減少ctl的workerCount字段值肋层。
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//遞減ctl的workerCount字段亿笤。這只在線程突然終止時(shí)調(diào)用(請(qǐng)參閱processWorkerExit)。在getTask中執(zhí)行其他遞減栋猖。
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
Doug Lea大神的設(shè)計(jì)啊净薛,感覺計(jì)算機(jī)的基礎(chǔ)真的是數(shù)學(xué)。
4.2蒲拉、 內(nèi)部類Worker
Worker繼承了AbstractQueuedSynchronizer肃拜,并且實(shí)現(xiàn)了Runnable接口痴腌。
維護(hù)了以下三個(gè)變量,其中completedTasks由volatile修飾燃领。
//線程這個(gè)工作程序正在運(yùn)行士聪。如果工廠失敗,則為空猛蔽。
final Thread thread;
//要運(yùn)行的初始任務(wù)剥悟。可能是null曼库。
Runnable firstTask;
//線程任務(wù)計(jì)數(shù)器
volatile long completedTasks;
構(gòu)造方法:
//使用ThreadFactory中給定的第一個(gè)任務(wù)和線程創(chuàng)建区岗。
Worker(Runnable firstTask) {
//禁止中斷,直到運(yùn)行工作程序
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
既然實(shí)現(xiàn)了Runnable接口毁枯,必然實(shí)現(xiàn)run方法:
//Delegates main run loop to outer runWorker
public void run() {
//核心
runWorker(this);
}
4.3慈缔、runWorker(Worker w)執(zhí)行任務(wù)
先看一眼執(zhí)行流程圖,再看源碼种玛,會(huì)更清晰一點(diǎn):
首先來看runWorker(Worker w)源碼:
final void runWorker(Worker w) {
//獲取當(dāng)前線程
Thread wt = Thread.currentThread();
//獲取第一個(gè)任務(wù)
Runnable task = w.firstTask;
//第一個(gè)任務(wù)位置置空
w.firstTask = null;
//因?yàn)閃orker實(shí)現(xiàn)了AQS藐鹤,此處是釋放鎖,new Worker()是state==-1蒂誉,此處是調(diào)用Worker類的 release(1)方法教藻,將state置為0。Worker中interruptIfStarted()中只有state>=0才允許調(diào)用中斷
w.unlock();
//是否突然完成右锨,如果是由于異常導(dǎo)致的進(jìn)入finally,那么completedAbruptly==true就是突然完成的
boolean completedAbruptly = true;
try {
//先處理firstTask碌秸,之后依次處理其他任務(wù)
while (task != null || (task = getTask()) != null) {
//獲取鎖
w.lock();
//如果池停止绍移,確保線程被中斷;如果沒有,請(qǐng)確保線程沒有中斷讥电。這需要在第二種情況下重新檢查蹂窖,以處理清除中斷時(shí)的shutdownNow競爭
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
//自定義實(shí)現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//自定義實(shí)現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
//任務(wù)完成數(shù)+1
w.completedTasks++;
//釋放鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
//Worker的結(jié)束后的處理工作
processWorkerExit(w, completedAbruptly);
}
}
下面再來看上述源碼中的getTask()與processWorkerExit(w, completedAbruptly)方法:
4.3.1、getTask()
根據(jù)當(dāng)前配置設(shè)置執(zhí)行阻塞或定時(shí)等待任務(wù)恩敌,或者如果該worker因?yàn)槿魏卧虮仨毻顺鏊膊猓瑒t返回null,在這種情況下workerCount將遞減。
返回空的情況:
- 大于 maximumPoolSize 個(gè) workers(由于調(diào)用setMaximumPoolSize)
- 線程池關(guān)閉
- 線程池關(guān)閉了并且隊(duì)列為空
- 這個(gè)worker超時(shí)等待任務(wù)纠炮,超時(shí)的worker在超時(shí)等待之前和之后都可能終止(即allowCoreThreadTimeOut || workerCount > corePoolSize)月趟,如果隊(duì)列不是空的,那么這個(gè)worker不是池中的最后一個(gè)線程恢口。
private Runnable getTask() {
// Did the last poll() time out?
boolean timedOut = false;
for (; ; ) {
//獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
//僅在必要時(shí)檢查隊(duì)列是否為空孝宗。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//遞減ctl的workerCount字段
decrementWorkerCount();
return null;
}
//獲取workerCount數(shù)量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//線程超時(shí)控制
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
//嘗試減少ctl的workerCount字段
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果有超時(shí)控制,則使用帶超時(shí)時(shí)間的poll耕肩,否則使用take,沒有任務(wù)的時(shí)候一直阻塞因妇,這兩個(gè)方法都會(huì)拋出InterruptedException
Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
//有任務(wù)就返回
if (r != null)
return r;
//獲取任務(wù)超時(shí)问潭,肯定是走了poll邏輯
timedOut = true;
} catch (InterruptedException retry) {
//被中斷
timedOut = false;
}
}
}
4.3.1、processWorkerExit(Worker w, boolean completedAbruptly)
為垂死的worker進(jìn)行清理和bookkeeping婚被。僅從工作線程調(diào)用狡忙。除非completedAbruptly被設(shè)置,否則假定workerCount已經(jīng)被調(diào)整以考慮退出址芯。此方法從工作集中移除線程灾茁,如果線程池由于用戶任務(wù)異常而退出,或者運(yùn)行的工作池小于corePoolSize,或者隊(duì)列非空但沒有工作池孝情, 則可能終止線程池或替換工作池捐凭。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// If abrupt, then workerCount wasn't adjusted
// true:用戶線程運(yùn)行異常,需要扣減
// false:getTask方法中扣減線程數(shù)量
if (completedAbruptly)
//遞減ctl的workerCount字段。
decrementWorkerCount();
//獲取主鎖逗余,鎖定
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//更新完成任務(wù)計(jì)數(shù)器
completedTaskCount += w.completedTasks;
//移除worker
workers.remove(w);
} finally {
//解鎖
mainLock.unlock();
}
// 有worker線程移除,可能是最后一個(gè)線程退出需要嘗試終止線程池
tryTerminate();
int c = ctl.get();
// 如果線程為running或shutdown狀態(tài)季惩,即tryTerminate()沒有成功終止線程池录粱,則判斷是否有必要一個(gè)worker
if (runStateLessThan(c, STOP)) {
// 正常退出,計(jì)算min:需要維護(hù)的最小線程數(shù)量
if (!completedAbruptly) {
// allowCoreThreadTimeOut 默認(rèn)false:是否需要維持核心線程的數(shù)量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min ==0 或者workerQueue為空画拾,min = 1
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 如果線程數(shù)量大于最少數(shù)量min,直接返回旗闽,不需要新增線程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一個(gè)沒有firstTask的worker
addWorker(null, false);
}
}
4.4蜜另、任務(wù)提交
提交有兩種:
- Executor#execute(Runnable command)
Executor接口提供的方法举瑰,在將來的某個(gè)時(shí)候執(zhí)行給定的命令.該命令可以在新線程此迅、池化線程或調(diào)用線程中執(zhí)行,具體由Executor的實(shí)現(xiàn)者決定忍些。 - ExecutorService#submit(Callable<T> task)
提交一個(gè)value-returning任務(wù)以執(zhí)行坐昙,并返回一個(gè)表示該任務(wù)未決結(jié)果的Future炸客。Future的get方法將在成功完成任務(wù)后返回任務(wù)的結(jié)果痹仙。
4.5开仰、任務(wù)執(zhí)行
4.5.1众弓、 execute(Runnable command)
任務(wù)執(zhí)行流程圖:
三步處理:
- 如果運(yùn)行的線程小于corePoolSize谓娃,則嘗試用給定的命令作為第一個(gè)任務(wù)啟動(dòng)一個(gè)新線程。對(duì)addWorker的調(diào)用原子性地檢查runState和workerCount,因此可以通過返回false來防止錯(cuò)誤警報(bào)锌订,因?yàn)殄e(cuò)誤警報(bào)會(huì)在不應(yīng)該添加線程的時(shí)候添加線程。
- 如果一個(gè)任務(wù)可以成功排隊(duì)劈猪,那么我們?nèi)匀恍枰俅螜z查是否應(yīng)該添加一個(gè)線程 (因?yàn)樽陨洗螜z查以來已有的線程已經(jīng)死亡)充边,或者池在進(jìn)入這個(gè)方法后關(guān)閉。因此,我們重新檢查狀態(tài)肘习,如果必要的話脖含,如果停止养葵,則回滾隊(duì)列;如果沒有,則啟動(dòng)一個(gè)新線程佃蚜。
- 如果無法對(duì)任務(wù)排隊(duì),則嘗試添加新線程着绊。 如果它失敗了,我們知道pool被關(guān)閉或飽和归露,所以拒絕任務(wù)。
public void execute(Runnable command) {
//任務(wù)為空靶擦,拋出異常
if (command == null)
throw new NullPointerException();
//獲取線程控制字段的值
int c = ctl.get();
//如果當(dāng)前工作線程數(shù)量少于corePoolSize(核心線程數(shù))
if (workerCountOf(c) < corePoolSize) {
//創(chuàng)建新的線程并執(zhí)行任務(wù)腮考,如果成功就返回
if (addWorker(command, true))
return;
//上一步失敗玄捕,重新獲取ctl
c = ctl.get();
}
//如果線城池正在運(yùn)行枚粘,且入隊(duì)成功
if (isRunning(c) && workQueue.offer(command)) {
//重新獲取ctl
int recheck = ctl.get();
//如果線程沒有運(yùn)行且刪除任務(wù)成功
if (!isRunning(recheck) && remove(command))
//拒絕任務(wù)
reject(command);
//如果當(dāng)前的工作線程數(shù)量為0福也,只要還有活動(dòng)的worker線程攀圈,就可以消費(fèi)workerQueue中的任務(wù)
else if (workerCountOf(recheck) == 0)
//第一個(gè)參數(shù)為null赘来,說明只為新建一個(gè)worker線程现喳,沒有指定firstTask
addWorker(null, false);
} else if (!addWorker(command, false))
//如果線程池不是running狀態(tài) 或者 無法入隊(duì)列,嘗試開啟新線程,擴(kuò)容至maxPoolSize犬辰,如果addWork(command, false)失敗了嗦篱,拒絕當(dāng)前command
reject(command);
}
下面詳細(xì)看一下上述代碼中出現(xiàn)的方法:addWorker(Runnable firstTask, boolean core)。
4.5.1.1幌缝、addWorker(Runnable firstTask, boolean core)
檢查是否可以根據(jù)當(dāng)前池狀態(tài)和給定的界限(核心或最大值)添加新worker灸促,如果是這樣,worker計(jì)數(shù)將相應(yīng)地進(jìn)行調(diào)整,如果可能浴栽,將創(chuàng)建并啟動(dòng)一個(gè)新worker荒叼, 并將運(yùn)行firstTask作為其第一個(gè)任務(wù)。 如果池已停止或有資格關(guān)閉吃度,則此方法返回false甩挫。如果線程工廠在被請(qǐng)求時(shí)沒有創(chuàng)建線程,則返回false椿每。如果線程創(chuàng)建失敗伊者,要么是由于線程工廠返回null,要么是由于異常 (通常是Thread.start()中的OutOfMemoryError))间护,我們將回滾亦渗。
private boolean addWorker(Runnable firstTask, boolean core) {
//好久沒見過這種寫法了
retry:
//線程池狀態(tài)與工作線程數(shù)量處理,worker數(shù)量+1
for (; ; ) {
//獲取當(dāng)前線程池狀態(tài)與線程數(shù)
int c = ctl.get();
//獲取當(dāng)前線程池狀態(tài)
int rs = runStateOf(c);
// 僅在必要時(shí)檢查隊(duì)列是否為空。如果池子處于SHUTDOWN汁尺,STOP法精,TIDYING,TERMINATED的時(shí)候 不處理提交的任務(wù),判斷線程池是否可以添加worker線程
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
//線程池處于工作狀態(tài)
for (; ; ) {
//獲取工作線程數(shù)量
int wc = workerCountOf(c);
//如果線程數(shù)量超過最大值或者超過corePoolSize或者超過maximumPoolSize 拒絕執(zhí)行任務(wù)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//試圖增加ctl的workerCount字段
if (compareAndIncrementWorkerCount(c))
//中斷外層循環(huán)
break retry;
// Re-read ctl
c = ctl.get();
//如果當(dāng)前線程池狀態(tài)已經(jīng)改變
if (runStateOf(c) != rs)
//繼續(xù)外層循環(huán)
continue retry;
//否則CAS因workerCount更改而失敗;重試內(nèi)循環(huán)
}
}
//添加到worker線程集合痴突,并啟動(dòng)線程,工作線程狀態(tài)
boolean workerStarted = false;
boolean workerAdded = false;
//繼承AQS并實(shí)現(xiàn)了Runnable接口
Worker w = null;
try {
//將任務(wù)封裝
w = new Worker(firstTask);
//獲取當(dāng)前線程
final Thread t = w.thread;
if (t != null) {
//獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
//全局鎖定
mainLock.lock();
try {
//持鎖時(shí)重新檢查搂蜓。退出ThreadFactory故障,或者在獲取鎖之前關(guān)閉辽装。
int rs = runStateOf(ctl.get());
//如果當(dāng)前線程池關(guān)閉了
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
//測試該線程是否活動(dòng)帮碰。如果線程已經(jīng)啟動(dòng)并且還沒有死,那么它就是活的拾积。
if (t.isAlive())
throw new IllegalThreadStateException();
//入工作線程池
workers.add(w);
int s = workers.size();
//跟蹤最大的池大小
if (s > largestPoolSize)
largestPoolSize = s;
//狀態(tài)
workerAdded = true;
}
} finally {
//釋放鎖
mainLock.unlock();
}
//如果工作線程加入成功殉挽,開始線程的執(zhí)行,并設(shè)置狀態(tài)
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//判斷工作線程是否啟動(dòng)成功
if (!workerStarted)
//回滾工作線程創(chuàng)建
addWorkerFailed(w);
}
//返回工作線程狀態(tài)
return workerStarted;
}
再分析回滾工作線程創(chuàng)建邏輯方法:addWorkerFailed(w)拓巧。
回滾工作線程創(chuàng)建,如果存在斯碌,則從worker中移除worker, 遞減ctl的workerCount字段。,重新檢查終止肛度,以防這個(gè)worker的存在導(dǎo)致終止傻唾。
private void addWorkerFailed(Worker w) {
//獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果存在,則從worker中移除worker
if (w != null)
workers.remove(w);
//遞減ctl的workerCount字段承耿。
decrementWorkerCount();
//重新檢查終止
tryTerminate();
} finally {
mainLock.unlock();
}
}
其中的tryTerminate()方法:
如果是SHUTDOWN或者STOP 且池子為空策吠,轉(zhuǎn)為TERMINATED狀態(tài)。如果有條件終止瘩绒,但是workerCount不為零,則中斷空閑worker带族,以確保關(guān)機(jī)信號(hào)傳播锁荔。必須在任何可能使終止成為可能的操作之后調(diào)用此方法--在關(guān)機(jī)期間減少worker數(shù)量或從隊(duì)列中刪除任務(wù)。該方法是非私有的,允許從ScheduledThreadPoolExecutor訪問阳堕。
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
//如果線程池處于運(yùn)行中跋理,或者阻塞隊(duì)列中仍有任務(wù),返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
//還有工作線程
if (workerCountOf(c) != 0) {
//中斷空閑工作線程
interruptIdleWorkers(ONLY_ONE);
return;
}
//獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//設(shè)置ctl狀態(tài)TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//方法在執(zhí)行程序終止時(shí)調(diào)用恬总,默認(rèn)什么都不執(zhí)行
terminated();
} finally {
//完成terminated()方法前普,狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//喚醒所有等待條件的節(jié)點(diǎn)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
//方法在執(zhí)行程序終止時(shí)調(diào)用,默認(rèn)什么都不執(zhí)行
protected void terminated() {}
4.5.1.2、 reject(Runnable command)拒絕策略
為給定的命令調(diào)用被拒絕的執(zhí)行處理程序壹堰。
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}