JAVA線程池原理與源碼分析

image

1、線程池常用接口介紹

1.1沪袭、Executor

public interface Executor {
void execute(Runnable command);
}

執(zhí)行提交的Runnable任務(wù)艇抠。其中的execute方法在將來的某個(gè)時(shí)候執(zhí)行給定的任務(wù),該任務(wù)可以在新線程讯榕、池化線程或調(diào)用線程中執(zhí)行,具體由Executor的實(shí)現(xiàn)者決定骤素。

1.2、ExecutorService

ExecutorService繼承自Executor愚屁,下面挑幾個(gè)方法介紹:

1.2.1济竹、shutdown()
void shutdown();

啟動(dòng)有序關(guān)閉線程池,在此過程中執(zhí)行先前提交的任務(wù)霎槐,但不接受任何新任務(wù)送浊。如果線程池已經(jīng)關(guān)閉,調(diào)用此方法不會(huì)產(chǎn)生額外的效果丘跌。此方法不等待以前提交的任務(wù)完成執(zhí)行,可以使用awaitTermination去實(shí)現(xiàn)袭景。

1.2.2、shutdownNow()
List<Runnable> shutdownNow();

嘗試停止所有正在積極執(zhí)行的任務(wù)闭树, 停止處理等待的任務(wù)耸棒,并返回等待執(zhí)行的任務(wù)列表。 此方法不等待以前提交的任務(wù)完成執(zhí)行,可以使用awaitTermination去實(shí)現(xiàn)报辱。除了盡最大努力停止處理積極執(zhí)行的任務(wù)外与殃,沒有任何保證。例如碍现,典型的實(shí)現(xiàn)是:通過Thread#interrupt取消任務(wù)執(zhí)行幅疼,但是任何未能響應(yīng)中斷的任務(wù)都可能永遠(yuǎn)不會(huì)終止。

1.2.3鸵赫、isShutdown()
boolean isShutdown();

返回線程池關(guān)閉狀態(tài)衣屏。

1.2.4、isTerminated()
boolean isTerminated();

如果關(guān)閉后所有任務(wù)都已完成辩棒,則返回 true狼忱。注意,除非首先調(diào)用了shutdown或shutdownNow一睁,否則isTerminated永遠(yuǎn)不會(huì)返回true钻弄。

1.2.5、awaitTermination(long timeout, TimeUnit unit)
boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

線程阻塞阻塞者吁,直到所有任務(wù)都在shutdown請(qǐng)求之后執(zhí)行完畢窘俺,或者超時(shí)發(fā)生,或者當(dāng)前線程被中斷(以先發(fā)生的情況為準(zhǔn))复凳。

1.2.6瘤泪、submit
 <T> Future<T> submit(Callable<T> task);

提交一個(gè)value-returning任務(wù)以執(zhí)行灶泵,并返回一個(gè)表示該任務(wù)未決結(jié)果的Future。 Future的 get方法將在成功完成任務(wù)后返回任務(wù)的結(jié)果对途。

1.3赦邻、ScheduledExecutorService

安排命令在給定的延遲之后運(yùn)行,或者定期執(zhí)行,繼承自ExecutorService接口由以下四個(gè)方法組成:

//在給定延遲之后啟動(dòng)任務(wù)实檀,返回ScheduledFuture
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個(gè)周期性操作惶洲,該操作在給定的初始延遲之后首次啟動(dòng),然后在給定的周期內(nèi)執(zhí)行;
//如果任務(wù)的任何執(zhí)行遇到異常膳犹,則禁止后續(xù)執(zhí)行恬吕。否則,任務(wù)只會(huì)通過執(zhí)行器的取消或終止而終止须床。
//如果此任務(wù)的任何執(zhí)行時(shí)間超過其周期铐料,則后續(xù)執(zhí)行可能會(huì)延遲開始,但不會(huì)并發(fā)執(zhí)行侨颈。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//創(chuàng)建并執(zhí)行一個(gè)周期性操作余赢,該操作在給定的初始延遲之后首次啟動(dòng),然后在一次執(zhí)行的終止和下一次執(zhí)行的開始之間使用給定的延遲哈垢。
//如果任務(wù)的任何執(zhí)行遇到異常妻柒,則禁止后續(xù)執(zhí)行。否則耘分,任務(wù)只會(huì)通過執(zhí)行器的取消或終止而終止举塔。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

1.4、ThreadFactory

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

按需創(chuàng)建新線程的對(duì)象求泰。

1.5央渣、Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

返回任務(wù)結(jié)果也可能拋出異常。

1.6渴频、Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

Future表示異步計(jì)算的結(jié)果芽丹。方法用于檢查計(jì)算是否完成,等待計(jì)算完成并檢索計(jì)算結(jié)果卜朗。只有當(dāng)計(jì)算完成時(shí)拔第,才可以使用方法get檢索結(jié)果,如果需要场钉,可以阻塞蚊俺,直到準(zhǔn)備好為止。取消由cancel方法執(zhí)行逛万。還提供了其他方法來確定任務(wù)是否正常完成或被取消泳猬。一旦計(jì)算完成,就不能取消計(jì)算。

1.7得封、Delayed

public interface Delayed extends Comparable<Delayed> {
    //在給定的時(shí)間單位中返回與此對(duì)象關(guān)聯(lián)的剩余延遲
    long getDelay(TimeUnit unit);
}

一種混合風(fēng)格的接口埋心,用于標(biāo)記在給定延遲之后應(yīng)該執(zhí)行的對(duì)象。

1.8呛每、ScheduledFuture

public interface ScheduledFuture<V> extends Delayed, Future<V> {}

2踩窖、線程池工作流程

線程池的主要工作流程.jpg

新任務(wù)進(jìn)來時(shí):

  1. 如果當(dāng)前運(yùn)行的線程少于corePoolSize坡氯,則創(chuàng)建新線程(核心線程)來執(zhí)行任務(wù)晨横。
  2. 如果運(yùn)行的線程等于或多于corePoolSize ,則將任務(wù)加入BlockingQueue。
  3. 如果BlockingQueue隊(duì)列已滿箫柳,則創(chuàng)建新的線程(非核心)來處理任務(wù)手形。
  4. 如果核心線程與非核心線程總數(shù)超出maximumPoolSize,任務(wù)將被拒絕悯恍,并調(diào)用RejectedExecutionHandler拒絕策略库糠。

3、ThreadPoolExecutor介紹

構(gòu)造方法:

public ThreadPoolExecutor(
int corePoolSize,int maximumPoolSize,
long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

參數(shù)說明:

  • corePoolSize
    除非設(shè)置了 allowCoreThreadTimeOut涮毫,否則要保留在線程池中的線程數(shù)(即使它們是空閑的)瞬欧。
  • maximumPoolSize
    線程池中允許的最大線程數(shù)。
  • keepAliveTime
    當(dāng)線程數(shù)大于corePoolSize時(shí)罢防,這是多余的空閑線程在終止新任務(wù)之前等待新任務(wù)的最長時(shí)間艘虎。
  • unit
    keepAliveTime參數(shù)的時(shí)間單位。
  • workQueue
    用于在任務(wù)執(zhí)行前保存任務(wù)的隊(duì)列咒吐。這個(gè)隊(duì)列只包含execute方法提交的Runnable任務(wù)野建。
  • threadFactory
    執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。
  • handler
    由于達(dá)到線程邊界和隊(duì)列容量而阻塞執(zhí)行時(shí)使用的處理程序恬叹。

3.1候生、BlockingQueue

  • SynchronousQueue
    不存儲(chǔ)元素的阻塞隊(duì)列,一個(gè)插入操作绽昼,必須等待移除操作結(jié)束唯鸭,每個(gè)任務(wù)一個(gè)線程。使用的時(shí)候maximumPoolSize一般指定成Integer.MAX_VALUE硅确。
  • LinkedBlockingQueue
    如果當(dāng)前線程數(shù)大于等于核心線程數(shù)目溉,則進(jìn)入隊(duì)列等待。由于這個(gè)隊(duì)列沒有最大值限制疏魏,即所有超過核心線程數(shù)的任務(wù)都將被添加到隊(duì)列中停做。
  • ArrayBlockingQueue
    可以限定隊(duì)列的長度,接收到任務(wù)的時(shí)候大莫,如果沒有達(dá)到corePoolSize的值蛉腌,則新建線程(核心線程)執(zhí)行任務(wù),如果達(dá)到了,則入隊(duì)等候烙丛,如果隊(duì)列已滿舅巷,則新建線程(非核心線程)執(zhí)行任務(wù),又如果總線程數(shù)到了maximumPoolSize河咽,并且隊(duì)列也滿了钠右,則執(zhí)行拒絕策略。
  • DelayQueue
    隊(duì)列內(nèi)元素必須實(shí)現(xiàn)Delayed接口忘蟹,這就意味著你傳進(jìn)去的任務(wù)必須先實(shí)現(xiàn)Delayed接口飒房。這個(gè)隊(duì)列接收到任務(wù)時(shí),首先先入隊(duì)媚值,只有達(dá)到了指定的延時(shí)時(shí)間狠毯,才會(huì)執(zhí)行任務(wù)。
  • priorityBlockingQuene
    具有優(yōu)先級(jí)的無界阻塞隊(duì)列褥芒。

3.2嚼松、RejectedExecutionHandler

有4個(gè)ThreeadPoolExecutor內(nèi)部類。

  • AbortPolicy
    直接拋出異常锰扶,默認(rèn)策略献酗。
  • CallerRunsPolicy
    用調(diào)用者所在的線程來執(zhí)行任務(wù)。
  • DiscardOldestPolicy
    丟棄阻塞隊(duì)列中靠最前的任務(wù)坷牛,并執(zhí)行當(dāng)前任務(wù)罕偎。
    4、DiscardPolicy
    直接丟棄任務(wù)漓帅。

最好自定義飽和策略锨亏,實(shí)現(xiàn)RejectedExecutionHandler接口,如:記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

3.3忙干、線程池大小設(shè)置

  • CPU密集型
    盡量使用較小的線程池器予,減少CUP上下文切換,一般設(shè)置為CPU核心數(shù)+1捐迫。
  • IO密集型
    可以適當(dāng)加大線程池?cái)?shù)量乾翔,IO多,所以在等待IO的時(shí)候施戴,充分利用CPU,一般設(shè)置為CPU核心數(shù)2倍反浓。
    但是對(duì)于一些特別耗時(shí)的IO操作,盲目的用線程池可能也不是很好赞哗,通過異步+單線程輪詢雷则,上層再配合上一個(gè)固定的線程池,效果可能更好肪笋,參考Reactor模型月劈。
  • 混合型
    視具體情況而定度迂。

3.4、任務(wù)提交

  • Callable
    通過submit函數(shù)提交猜揪,返回Future對(duì)象惭墓。
  • Runnable
    通過execute提交,沒有返回結(jié)果而姐。

3.5腊凶、關(guān)閉線程池

  • shutdown()
    僅停止阻塞隊(duì)列中等待的線程,那些正在執(zhí)行的線程就會(huì)讓他們執(zhí)行結(jié)束拴念。
  • shutdownNow()
    不僅會(huì)停止阻塞隊(duì)列中的線程钧萍,而且會(huì)停止正在執(zhí)行的線程。

4丈莺、線程池實(shí)現(xiàn)原理

4.1划煮、 線程池狀態(tài)

線程池的內(nèi)部狀態(tài)由AtomicInteger修飾的ctl表示,其高3位表示線程池的運(yùn)行狀態(tài)缔俄,低29位表示線程池中的線程數(shù)量。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

主池控制狀態(tài)ctl是一個(gè)原子整數(shù)器躏,包含兩個(gè)概念字段:

  • workerCount:指示有效線程數(shù)俐载。
  • runState:指示是否運(yùn)行、關(guān)閉等登失。

為了將這兩個(gè)字段打包成一個(gè)整型遏佣,所以將workerCount限制為(229)-1個(gè)線程,而不是(231)-1個(gè)線程揽浙。

workerCount是工作線程數(shù)量状婶。該值可能與實(shí)際活動(dòng)線程的數(shù)量存在暫時(shí)性差異,例如馅巷,當(dāng)ThreadFactory在被請(qǐng)求時(shí)無法創(chuàng)建線程膛虫,以及退出的線程在終止前仍在執(zhí)行bookkeeping時(shí)。 用戶可見的池大小報(bào)告為工作線程集的當(dāng)前大小钓猬。

runState提供了生命周期稍刀,具有以下值:

  • RUNNING:接受新任務(wù)并處理排隊(duì)的任務(wù)
  • SHUTDOWN:不接受新任務(wù),而是處理隊(duì)列的任務(wù)敞曹。
  • STOP:不接受新任務(wù)账月,不處理隊(duì)列的任務(wù),中斷正在進(jìn)行的任務(wù)澳迫。
  • TIDYING:所有任務(wù)都已終止局齿,workerCount為零,過渡到狀態(tài)TIDYING的線程將運(yùn)行terminated()鉤子方法橄登。
  • TERMINATED:terminated()方法執(zhí)行完畢抓歼。

為了允許有序比較担平,這些值之間的數(shù)值順序很重要。運(yùn)行狀態(tài)會(huì)隨著時(shí)間單調(diào)地增加锭部,但不需要達(dá)到每個(gè)狀態(tài)暂论。轉(zhuǎn)換:


線程池內(nèi)部狀態(tài)轉(zhuǎn)換圖.png
  • RUNNING -> SHUTDOWN
    在調(diào)用shutdown()時(shí),可以隱式地在finalize()中調(diào)用拌禾。
  • (RUNNING or SHUTDOWN) -> STOP
    調(diào)用shutdownNow()取胎。
  • SHUTDOWN -> TIDYING
    當(dāng)隊(duì)列和池都為空時(shí)。
  • STOP -> TIDYING
    當(dāng)池是空的時(shí)候湃窍。
  • TIDYING -> TERMINATED
    當(dāng)terminated()鉤子方法完成時(shí)闻蛀。

當(dāng)狀態(tài)達(dá)到TERMINATED時(shí),在awaitTermination()中等待的線程將返回您市。

下面看以下其他狀態(tài)信息:

//Integer.SIZE為32觉痛,COUNT_BITS為29
private static final int COUNT_BITS = Integer.SIZE - 3;
//2^29-1 最大線程數(shù)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 即高3位為111,該狀態(tài)的線程池會(huì)接收新任務(wù)茵休,并處理阻塞隊(duì)列中的任務(wù)薪棒;
* 111 0 0000 0000 0000 0000 0000 0000 0000
* -1 原碼:0000 ... 0001 反碼:1111 ... 1110 補(bǔ)碼:1111 ... 1111
* 左移操作:后面補(bǔ) 0
* 111 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 即高3位為000,該狀態(tài)的線程池不會(huì)接收新任務(wù)榕莺,但會(huì)處理阻塞隊(duì)列中的任務(wù)俐芯;
* 000 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 即高3位為001,該狀態(tài)的線程不會(huì)接收新任務(wù)钉鸯,也不會(huì)處理阻塞隊(duì)列中的任務(wù)吧史,而且會(huì)中斷正在* 運(yùn)行的任務(wù);
* 001 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 即高3位為010唠雕,所有任務(wù)都已終止贸营,workerCount為零,過渡到狀態(tài)TIDYING的線程將運(yùn)行terminated()鉤子方法岩睁;
* 010 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 即高3位為011钞脂,terminated()方法執(zhí)行完畢;
* 011 0 0000 0000 0000 0000 0000 0000 0000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
//根據(jù)ctl計(jì)算runState
private static int runStateOf(int c) {
//2^29   =  001 0 0000 0000 0000 0000 0000 0000 0000
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//~(2^29-1)=111 0 0000 0000 0000 0000 0000 0000 0000
//假設(shè)c為 STOP 001 0 0000 0000 0000 0000 0000 0000 0000
// 最終值:    001 0 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY;
}
//根據(jù)ctl計(jì)算 workerCount
private static int workerCountOf(int c) {
//2^29-1 =  000 1 1111 1111 1111 1111 1111 1111 1111
//假設(shè)c =   000 0 0000 0000 0000 0000 0000 0000 0001  1個(gè)線程
//最終值:  000 0 0000 0000 0000 0000 0000 0000 0001  1
    return c & CAPACITY;
}
// 根據(jù)runState和workerCount計(jì)算ctl
private static int ctlOf(int rs, int wc) {
//假設(shè) rs: STOP  001 0 0000 0000 0000 0000 0000 0000 0000
//假設(shè) wc:       000 0 0000 0000 0000 0000 0000 0000 0001  1個(gè)線程
//最終值:       001 0 0000 0000 0000 0000 0000 0000 0001
    return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
//RUNNING狀態(tài)為負(fù)數(shù)笙僚,肯定小于SHUTDOWN芳肌,返回線程池是否為運(yùn)行狀態(tài)
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
//試圖增加ctl的workerCount字段值。
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
//嘗試減少ctl的workerCount字段值肋层。
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
//遞減ctl的workerCount字段亿笤。這只在線程突然終止時(shí)調(diào)用(請(qǐng)參閱processWorkerExit)。在getTask中執(zhí)行其他遞減栋猖。
private void decrementWorkerCount() {
    do {
    } while (!compareAndDecrementWorkerCount(ctl.get()));
}

Doug Lea大神的設(shè)計(jì)啊净薛,感覺計(jì)算機(jī)的基礎(chǔ)真的是數(shù)學(xué)。

4.2蒲拉、 內(nèi)部類Worker

Worker繼承了AbstractQueuedSynchronizer肃拜,并且實(shí)現(xiàn)了Runnable接口痴腌。
維護(hù)了以下三個(gè)變量,其中completedTasks由volatile修飾燃领。

 //線程這個(gè)工作程序正在運(yùn)行士聪。如果工廠失敗,則為空猛蔽。
final Thread thread;
//要運(yùn)行的初始任務(wù)剥悟。可能是null曼库。
Runnable firstTask;
//線程任務(wù)計(jì)數(shù)器
volatile long completedTasks;

構(gòu)造方法:

//使用ThreadFactory中給定的第一個(gè)任務(wù)和線程創(chuàng)建区岗。
Worker(Runnable firstTask) {
    //禁止中斷,直到運(yùn)行工作程序
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

既然實(shí)現(xiàn)了Runnable接口毁枯,必然實(shí)現(xiàn)run方法:

//Delegates main run loop to outer runWorker
public void run() {
    //核心
    runWorker(this);
}

4.3慈缔、runWorker(Worker w)執(zhí)行任務(wù)

先看一眼執(zhí)行流程圖,再看源碼种玛,會(huì)更清晰一點(diǎn):


runWorker.png

首先來看runWorker(Worker w)源碼:

final void runWorker(Worker w) {
    //獲取當(dāng)前線程
    Thread wt = Thread.currentThread();
    //獲取第一個(gè)任務(wù)
    Runnable task = w.firstTask;
    //第一個(gè)任務(wù)位置置空
    w.firstTask = null;
    //因?yàn)閃orker實(shí)現(xiàn)了AQS藐鹤,此處是釋放鎖,new Worker()是state==-1蒂誉,此處是調(diào)用Worker類的 release(1)方法教藻,將state置為0。Worker中interruptIfStarted()中只有state>=0才允許調(diào)用中斷
    w.unlock();
    //是否突然完成右锨,如果是由于異常導(dǎo)致的進(jìn)入finally,那么completedAbruptly==true就是突然完成的
    boolean completedAbruptly = true;
    try {
        //先處理firstTask碌秸,之后依次處理其他任務(wù)
        while (task != null || (task = getTask()) != null) {
            //獲取鎖
            w.lock();
            //如果池停止绍移,確保線程被中斷;如果沒有,請(qǐng)確保線程沒有中斷讥电。這需要在第二種情況下重新檢查蹂窖,以處理清除中斷時(shí)的shutdownNow競爭
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
                //自定義實(shí)現(xiàn)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    //自定義實(shí)現(xiàn)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //任務(wù)完成數(shù)+1
                w.completedTasks++;
                //釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //Worker的結(jié)束后的處理工作
        processWorkerExit(w, completedAbruptly);
    }
}

下面再來看上述源碼中的getTask()與processWorkerExit(w, completedAbruptly)方法:

4.3.1、getTask()

根據(jù)當(dāng)前配置設(shè)置執(zhí)行阻塞或定時(shí)等待任務(wù)恩敌,或者如果該worker因?yàn)槿魏卧虮仨毻顺鏊膊猓瑒t返回null,在這種情況下workerCount將遞減。

返回空的情況:

  1. 大于 maximumPoolSize 個(gè) workers(由于調(diào)用setMaximumPoolSize)
  2. 線程池關(guān)閉
  3. 線程池關(guān)閉了并且隊(duì)列為空
  4. 這個(gè)worker超時(shí)等待任務(wù)纠炮,超時(shí)的worker在超時(shí)等待之前和之后都可能終止(即allowCoreThreadTimeOut || workerCount > corePoolSize)月趟,如果隊(duì)列不是空的,那么這個(gè)worker不是池中的最后一個(gè)線程恢口。
private Runnable getTask() {
    // Did the last poll() time out?
    boolean timedOut = false;
    for (; ; ) {
        //獲取線程池狀態(tài)
        int c = ctl.get();
        int rs = runStateOf(c);
        //僅在必要時(shí)檢查隊(duì)列是否為空孝宗。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //遞減ctl的workerCount字段
            decrementWorkerCount();
            return null;
        }
        //獲取workerCount數(shù)量
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //線程超時(shí)控制
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            //嘗試減少ctl的workerCount字段
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //如果有超時(shí)控制,則使用帶超時(shí)時(shí)間的poll耕肩,否則使用take,沒有任務(wù)的時(shí)候一直阻塞因妇,這兩個(gè)方法都會(huì)拋出InterruptedException
            Runnable r = timed ?workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS) :workQueue.take();
            //有任務(wù)就返回
            if (r != null)
                return r;
            //獲取任務(wù)超時(shí)问潭,肯定是走了poll邏輯
            timedOut = true;
        } catch (InterruptedException retry) {
            //被中斷
            timedOut = false;
        }
    }
}
4.3.1、processWorkerExit(Worker w, boolean completedAbruptly)

為垂死的worker進(jìn)行清理和bookkeeping婚被。僅從工作線程調(diào)用狡忙。除非completedAbruptly被設(shè)置,否則假定workerCount已經(jīng)被調(diào)整以考慮退出址芯。此方法從工作集中移除線程灾茁,如果線程池由于用戶任務(wù)異常而退出,或者運(yùn)行的工作池小于corePoolSize,或者隊(duì)列非空但沒有工作池孝情, 則可能終止線程池或替換工作池捐凭。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If abrupt, then workerCount wasn't adjusted
    // true:用戶線程運(yùn)行異常,需要扣減
    // false:getTask方法中扣減線程數(shù)量
    if (completedAbruptly)
        //遞減ctl的workerCount字段。
        decrementWorkerCount();
    //獲取主鎖逗余,鎖定
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //更新完成任務(wù)計(jì)數(shù)器
        completedTaskCount += w.completedTasks;
        //移除worker
        workers.remove(w);
    } finally {
        //解鎖
        mainLock.unlock();
    }
    // 有worker線程移除,可能是最后一個(gè)線程退出需要嘗試終止線程池
    tryTerminate();
    int c = ctl.get();
    // 如果線程為running或shutdown狀態(tài)季惩,即tryTerminate()沒有成功終止線程池录粱,則判斷是否有必要一個(gè)worker
    if (runStateLessThan(c, STOP)) {
        // 正常退出,計(jì)算min:需要維護(hù)的最小線程數(shù)量
        if (!completedAbruptly) {
            // allowCoreThreadTimeOut 默認(rèn)false:是否需要維持核心線程的數(shù)量
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果min ==0 或者workerQueue為空画拾,min = 1
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // 如果線程數(shù)量大于最少數(shù)量min,直接返回旗闽,不需要新增線程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 添加一個(gè)沒有firstTask的worker
        addWorker(null, false);
    }
}

4.4蜜另、任務(wù)提交

提交有兩種:

  • Executor#execute(Runnable command)
    Executor接口提供的方法举瑰,在將來的某個(gè)時(shí)候執(zhí)行給定的命令.該命令可以在新線程此迅、池化線程或調(diào)用線程中執(zhí)行,具體由Executor的實(shí)現(xiàn)者決定忍些。
  • ExecutorService#submit(Callable<T> task)
    提交一個(gè)value-returning任務(wù)以執(zhí)行坐昙,并返回一個(gè)表示該任務(wù)未決結(jié)果的Future炸客。Future的get方法將在成功完成任務(wù)后返回任務(wù)的結(jié)果痹仙。

4.5开仰、任務(wù)執(zhí)行

4.5.1众弓、 execute(Runnable command)

任務(wù)執(zhí)行流程圖:

execute.png

三步處理:

  1. 如果運(yùn)行的線程小于corePoolSize谓娃,則嘗試用給定的命令作為第一個(gè)任務(wù)啟動(dòng)一個(gè)新線程。對(duì)addWorker的調(diào)用原子性地檢查runState和workerCount,因此可以通過返回false來防止錯(cuò)誤警報(bào)锌订,因?yàn)殄e(cuò)誤警報(bào)會(huì)在不應(yīng)該添加線程的時(shí)候添加線程。
  2. 如果一個(gè)任務(wù)可以成功排隊(duì)劈猪,那么我們?nèi)匀恍枰俅螜z查是否應(yīng)該添加一個(gè)線程 (因?yàn)樽陨洗螜z查以來已有的線程已經(jīng)死亡)充边,或者池在進(jìn)入這個(gè)方法后關(guān)閉。因此,我們重新檢查狀態(tài)肘习,如果必要的話脖含,如果停止养葵,則回滾隊(duì)列;如果沒有,則啟動(dòng)一個(gè)新線程佃蚜。
  3. 如果無法對(duì)任務(wù)排隊(duì),則嘗試添加新線程着绊。 如果它失敗了,我們知道pool被關(guān)閉或飽和归露,所以拒絕任務(wù)。
public void execute(Runnable command) {
    //任務(wù)為空靶擦,拋出異常
    if (command == null)
        throw new NullPointerException();
   //獲取線程控制字段的值
   int c = ctl.get();
   //如果當(dāng)前工作線程數(shù)量少于corePoolSize(核心線程數(shù))
   if (workerCountOf(c) < corePoolSize) {
       //創(chuàng)建新的線程并執(zhí)行任務(wù)腮考,如果成功就返回
       if (addWorker(command, true))
            return;
       //上一步失敗玄捕,重新獲取ctl
       c = ctl.get();
    }
    //如果線城池正在運(yùn)行枚粘,且入隊(duì)成功
    if (isRunning(c) && workQueue.offer(command)) {
        //重新獲取ctl
        int recheck = ctl.get();
        //如果線程沒有運(yùn)行且刪除任務(wù)成功
        if (!isRunning(recheck) && remove(command))
            //拒絕任務(wù)
            reject(command);
        //如果當(dāng)前的工作線程數(shù)量為0福也,只要還有活動(dòng)的worker線程攀圈,就可以消費(fèi)workerQueue中的任務(wù)
        else if (workerCountOf(recheck) == 0)
            //第一個(gè)參數(shù)為null赘来,說明只為新建一個(gè)worker線程现喳,沒有指定firstTask
            addWorker(null, false);
    } else if (!addWorker(command, false))
    //如果線程池不是running狀態(tài) 或者 無法入隊(duì)列,嘗試開啟新線程,擴(kuò)容至maxPoolSize犬辰,如果addWork(command, false)失敗了嗦篱,拒絕當(dāng)前command
        reject(command);
}

下面詳細(xì)看一下上述代碼中出現(xiàn)的方法:addWorker(Runnable firstTask, boolean core)。

4.5.1.1幌缝、addWorker(Runnable firstTask, boolean core)
addWorker.jpg

檢查是否可以根據(jù)當(dāng)前池狀態(tài)和給定的界限(核心或最大值)添加新worker灸促,如果是這樣,worker計(jì)數(shù)將相應(yīng)地進(jìn)行調(diào)整,如果可能浴栽,將創(chuàng)建并啟動(dòng)一個(gè)新worker荒叼, 并將運(yùn)行firstTask作為其第一個(gè)任務(wù)。 如果池已停止或有資格關(guān)閉吃度,則此方法返回false甩挫。如果線程工廠在被請(qǐng)求時(shí)沒有創(chuàng)建線程,則返回false椿每。如果線程創(chuàng)建失敗伊者,要么是由于線程工廠返回null,要么是由于異常 (通常是Thread.start()中的OutOfMemoryError))间护,我們將回滾亦渗。

private boolean addWorker(Runnable firstTask, boolean core) {
    //好久沒見過這種寫法了
    retry:
    //線程池狀態(tài)與工作線程數(shù)量處理,worker數(shù)量+1
    for (; ; ) {
        //獲取當(dāng)前線程池狀態(tài)與線程數(shù)
        int c = ctl.get();
        //獲取當(dāng)前線程池狀態(tài)
        int rs = runStateOf(c);
        // 僅在必要時(shí)檢查隊(duì)列是否為空。如果池子處于SHUTDOWN汁尺,STOP法精,TIDYING,TERMINATED的時(shí)候 不處理提交的任務(wù),判斷線程池是否可以添加worker線程
        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;
        //線程池處于工作狀態(tài)
        for (; ; ) {
            //獲取工作線程數(shù)量
            int wc = workerCountOf(c);
            //如果線程數(shù)量超過最大值或者超過corePoolSize或者超過maximumPoolSize 拒絕執(zhí)行任務(wù)
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //試圖增加ctl的workerCount字段
            if (compareAndIncrementWorkerCount(c))
                //中斷外層循環(huán)
                break retry;
            // Re-read ctl
            c = ctl.get();
            //如果當(dāng)前線程池狀態(tài)已經(jīng)改變
            if (runStateOf(c) != rs)
                //繼續(xù)外層循環(huán)
                continue retry;
            //否則CAS因workerCount更改而失敗;重試內(nèi)循環(huán)
        }
    }
    //添加到worker線程集合痴突,并啟動(dòng)線程,工作線程狀態(tài)
    boolean workerStarted = false;
    boolean workerAdded = false;
    //繼承AQS并實(shí)現(xiàn)了Runnable接口
    Worker w = null;
    try {
        //將任務(wù)封裝
        w = new Worker(firstTask);
        //獲取當(dāng)前線程
        final Thread t = w.thread;
        if (t != null) {
            //獲取全局鎖
            final ReentrantLock mainLock = this.mainLock;
            //全局鎖定
            mainLock.lock();
            try {
                //持鎖時(shí)重新檢查搂蜓。退出ThreadFactory故障,或者在獲取鎖之前關(guān)閉辽装。
                int rs = runStateOf(ctl.get());
                //如果當(dāng)前線程池關(guān)閉了
                if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                   //測試該線程是否活動(dòng)帮碰。如果線程已經(jīng)啟動(dòng)并且還沒有死,那么它就是活的拾积。                                       
                   if (t.isAlive())
                        throw new IllegalThreadStateException();
                   //入工作線程池
                   workers.add(w);
                   int s = workers.size();
                   //跟蹤最大的池大小
                   if (s > largestPoolSize)
                        largestPoolSize = s;
                   //狀態(tài)
                   workerAdded = true;
                }
            } finally {
                //釋放鎖
                mainLock.unlock();
            }
            //如果工作線程加入成功殉挽,開始線程的執(zhí)行,并設(shè)置狀態(tài)
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //判斷工作線程是否啟動(dòng)成功
        if (!workerStarted)
            //回滾工作線程創(chuàng)建
            addWorkerFailed(w);
    }
    //返回工作線程狀態(tài)
    return workerStarted;
}

再分析回滾工作線程創(chuàng)建邏輯方法:addWorkerFailed(w)拓巧。
回滾工作線程創(chuàng)建,如果存在斯碌,則從worker中移除worker, 遞減ctl的workerCount字段。,重新檢查終止肛度,以防這個(gè)worker的存在導(dǎo)致終止傻唾。

private void addWorkerFailed(Worker w) {
    //獲取全局鎖
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //如果存在,則從worker中移除worker
        if (w != null)
            workers.remove(w);
        //遞減ctl的workerCount字段承耿。
        decrementWorkerCount();
        //重新檢查終止
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

其中的tryTerminate()方法:
如果是SHUTDOWN或者STOP 且池子為空策吠,轉(zhuǎn)為TERMINATED狀態(tài)。如果有條件終止瘩绒,但是workerCount不為零,則中斷空閑worker带族,以確保關(guān)機(jī)信號(hào)傳播锁荔。必須在任何可能使終止成為可能的操作之后調(diào)用此方法--在關(guān)機(jī)期間減少worker數(shù)量或從隊(duì)列中刪除任務(wù)。該方法是非私有的,允許從ScheduledThreadPoolExecutor訪問阳堕。

final void tryTerminate() {
    for (; ; ) {
        int c = ctl.get();
        //如果線程池處于運(yùn)行中跋理,或者阻塞隊(duì)列中仍有任務(wù),返回
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
           return;
        //還有工作線程
        if (workerCountOf(c) != 0) {
            //中斷空閑工作線程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //獲取全局鎖
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //設(shè)置ctl狀態(tài)TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //方法在執(zhí)行程序終止時(shí)調(diào)用恬总,默認(rèn)什么都不執(zhí)行
                    terminated();
                } finally {
                    //完成terminated()方法前普,狀態(tài)為TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    //喚醒所有等待條件的節(jié)點(diǎn)
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

//方法在執(zhí)行程序終止時(shí)調(diào)用,默認(rèn)什么都不執(zhí)行
protected void terminated() {}

4.5.1.2、 reject(Runnable command)拒絕策略

為給定的命令調(diào)用被拒絕的執(zhí)行處理程序壹堰。

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
tencent.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末拭卿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子贱纠,更是在濱河造成了極大的恐慌峻厚,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,386評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谆焊,死亡現(xiàn)場離奇詭異惠桃,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)辖试,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,142評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門辜王,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人罐孝,你說我怎么就攤上這事呐馆。” “怎么了肾档?”我有些...
    開封第一講書人閱讀 164,704評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵摹恰,是天一觀的道長。 經(jīng)常有香客問我怒见,道長俗慈,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,702評(píng)論 1 294
  • 正文 為了忘掉前任遣耍,我火速辦了婚禮闺阱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舵变。我一直安慰自己酣溃,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,716評(píng)論 6 392
  • 文/花漫 我一把揭開白布纪隙。 她就那樣靜靜地躺著赊豌,像睡著了一般。 火紅的嫁衣襯著肌膚如雪绵咱。 梳的紋絲不亂的頭發(fā)上碘饼,一...
    開封第一講書人閱讀 51,573評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼艾恼。 笑死住涉,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的钠绍。 我是一名探鬼主播舆声,決...
    沈念sama閱讀 40,314評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼柳爽!你這毒婦竟也來了媳握?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,230評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤泻拦,失蹤者是張志新(化名)和其女友劉穎毙芜,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體争拐,經(jīng)...
    沈念sama閱讀 45,680評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡腋粥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,873評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了架曹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片隘冲。...
    茶點(diǎn)故事閱讀 39,991評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖绑雄,靈堂內(nèi)的尸體忽然破棺而出展辞,到底是詐尸還是另有隱情,我是刑警寧澤万牺,帶...
    沈念sama閱讀 35,706評(píng)論 5 346
  • 正文 年R本政府宣布罗珍,位于F島的核電站,受9級(jí)特大地震影響脚粟,放射性物質(zhì)發(fā)生泄漏覆旱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,329評(píng)論 3 330
  • 文/蒙蒙 一核无、第九天 我趴在偏房一處隱蔽的房頂上張望扣唱。 院中可真熱鬧,春花似錦团南、人聲如沸噪沙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,910評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽正歼。三九已至,卻和暖如春拷橘,著一層夾襖步出監(jiān)牢的瞬間朋腋,已是汗流浹背齐疙。 一陣腳步聲響...
    開封第一講書人閱讀 33,038評(píng)論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留旭咽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,158評(píng)論 3 370
  • 正文 我出身青樓赌厅,卻偏偏與公主長得像穷绵,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子特愿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,941評(píng)論 2 355