轉(zhuǎn)載自:https://blog.csdn.net/qq_34436819/article/details/103137347?fps=1&locationNum=2
前言
無論是在工作中,還是在書本中祥诽,我們都可以聽到或者看到關于線程在使用時的一些建議:不要在代碼中自己直接創(chuàng)建線程僧诚,而是通過線程池的方式來使用線程。使用線程池的理由大致可以總結(jié)為以下幾點:
- 降低資源消耗勋桶。線程是操作系統(tǒng)十分寶貴的資源基显,當多個人同時開發(fā)一個項目時蘸吓,在互不知情的情況下,都自己在代碼中創(chuàng)建了線程撩幽,這樣就會導致線程數(shù)過多库继,而且線程的創(chuàng)建和銷毀,在操作系統(tǒng)層面窜醉,需要由用戶態(tài)切換到內(nèi)核態(tài)宪萄,這是一個費時費力的過程。而使用線程池可以避免頻繁的創(chuàng)建線程和銷毀線程榨惰,線程池中線程可以重復使用拜英。
- 提高響應速度。當請求到達時琅催,由于線程池中的線程已經(jīng)創(chuàng)建好了居凶,使用線程池虫给,可以省去線程創(chuàng)建的這段時間。
- 提高線程的可管理性侠碧。線程是稀缺資源抹估,當創(chuàng)建過多的線程時,會造成系統(tǒng)性能的下降弄兜,而使用線程池药蜻,可以對線程進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控替饿。
線程池的使用十分簡單语泽,但是會用不代表用得好。在面試中视卢,基本不會問線程池應該怎么用踱卵,而是問線程池在使用不當時會造成哪些問題,實際上就是考察線程池的實現(xiàn)原理腾夯。因此搞明白線程池的實現(xiàn)原理是很有必要的一件事颊埃,不僅僅對面試會有幫助,也會讓我們在平時工作中避過好多坑蝶俱。
實現(xiàn)原理
在線程池中存在幾個概念:核心線程數(shù)班利、最大線程數(shù)、任務隊列榨呆。核心線程數(shù)指的是線程池的基本大新薇辍;最大線程數(shù)指的是积蜻,同一時刻線程池中線程的數(shù)量最大不能超過該值闯割;任務隊列是當任務較多時,線程池中線程的數(shù)量已經(jīng)達到了核心線程數(shù)竿拆,這時候就是用任務隊列來存儲我們提交的任務宙拉。
與其他池化技術不同的是,線程池是基于生產(chǎn)者-消費者模式來實現(xiàn)的丙笋,任務的提交方是生產(chǎn)者谢澈,線程池是消費者。當我們需要執(zhí)行某個任務時御板,只需要把任務扔到線程池中即可锥忿。線程池中執(zhí)行任務的流程如下圖如下:
- 先判斷線程池中線程的數(shù)量是否超過核心線程數(shù),如果沒有超過核心線程數(shù)怠肋,就創(chuàng)建新的線程去執(zhí)行任務敬鬓;如果超過了核心線程數(shù),就進入到下面流程。
- 判斷任務隊列是否已經(jīng)滿了钉答,如果沒有滿础芍,就將任務添加到任務隊列中;如果已經(jīng)滿了希痴,就進入到下面的流程者甲。
- 再判斷如果創(chuàng)建一個線程后春感,線程數(shù)是否會超過最大線程數(shù)砌创,如果不會超過最大線程數(shù),就創(chuàng)建一個新的線程來執(zhí)行任務鲫懒;如果會嫩实,則進入到下面的流程。
- 執(zhí)行拒絕策略窥岩。
在沒看線程池的具體實現(xiàn)之前甲献,我一直存在這樣的疑惑:為什么是先判斷任務隊列有沒有滿,再判斷有沒有超過最大線程數(shù)颂翼?正常邏輯不是應該先盡可能的創(chuàng)建線程晃洒,讓線程去處理任務嗎?當任務實在是太多了朦乏,線程處理不過來了球及,再將任務添加到任務隊列嗎?知道我看了線程池的具體代碼實現(xiàn)后呻疹,我才知道答案吃引。問題答案在文末。
ThreadPoolExecutor
在JUC包下刽锤,已經(jīng)提供了線程池的具體的實現(xiàn):ThreadPoolExecutor镊尺。ThreadPoolExecutor提供了很多的構造方法,其中最復雜的構造方法有7個參數(shù)并思。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
- corePoolSize:該參數(shù)表示的是線程池的核心線程數(shù)庐氮。當任務提交到線程池時,如果線程池的線程數(shù)量還沒有達到corePoolSize宋彼,那么就會新創(chuàng)建的一個線程來執(zhí)行任務弄砍,如果達到了,就將任務添加到任務隊列中宙暇。
- maximumPoolSize:該參數(shù)表示的是線程池中允許存在的最大線程數(shù)量输枯。當任務隊列滿了以后,再有新的任務進入到線程池時占贫,會判斷再新建一個線程是否會超過maximumPoolSize桃熄,如果會超過,則不創(chuàng)建線程,而是執(zhí)行拒絕策略瞳收。如果不會超過maximumPoolSize碉京,則會創(chuàng)建新的線程來執(zhí)行任務。
- keepAliveTime:當線程池中的線程數(shù)量大于corePoolSize時螟深,那么大于corePoolSize這部分的線程谐宙,如果沒有任務去處理,那么就表示它們是空閑的界弧,這個時候是不允許它們一直存在的凡蜻,而是允許它們最多空閑一段時間,這段時間就是keepAliveTime垢箕,時間的單位就是TimeUnit划栓。
- unit:空閑線程允許存活時間的單位,TimeUnit是一個枚舉值条获,它可以是納秒忠荞、微妙、毫秒帅掘、秒委煤、分、小時修档、天碧绞。
- workQueue:任務隊列,用來存放任務萍悴。該隊列的類型是阻塞隊列头遭,常用的阻塞隊列有ArrayBlockingQueue、LinkedBlockingQueue癣诱、SynchronousQueue计维、PriorityBlockingQueue等。
- ArrayBlockingQueue是一個基于數(shù)組實現(xiàn)的阻塞隊列撕予,元素按照先進先出(FIFO)的順序入隊鲫惶、出隊。因為底層實現(xiàn)是數(shù)組实抡,數(shù)組在初始化時必須指定大小欠母,因此ArrayBlockingQueue是有界隊列。
- LinkedBlockingQueue是一個基于鏈表實現(xiàn)的阻塞隊列吆寨,元素按照先進先出(FIFO)的順序入隊赏淌、出隊。因為頂層是鏈表啄清,鏈表是基于節(jié)點之間的指針指向來維持前后關系的六水,如果不指鏈表的大小,它默認的大小是Integer.MAX_VALUE,即2的32次方?1掷贾,這個數(shù)值太大了睛榄,因此通常稱LinkedBlockingQueue是一個無界隊列。當然如果在初始化的時候想帅,就指定鏈表大小场靴,那么它就是有界隊列了。
- SynchronousQueue是一個不存儲元素的阻塞隊列港准。每個插入操作必須得等到另一個線程調(diào)用了移除操作后旨剥,該線程才會返回,否則將一直阻塞叉趣。吞吐量通常要高于LinkedBlockingQueue泞边。
- PriorityBlockingQueue是一個將元素按照優(yōu)先級排序的阻塞的阻塞隊列该押,元素的優(yōu)先級越高疗杉,將會越先出隊列。這是一個無界隊列蚕礼。
- threadFactory:線程池工廠烟具,用來創(chuàng)建線程。通常在實際項目中奠蹬,為了便于后期排查問題朝聋,在創(chuàng)建線程時需要為線程賦予一定的名稱,通過線程池工廠囤躁,可以方便的為每一個創(chuàng)建的線程設置具有業(yè)務含義的名稱冀痕。
- handler:拒絕策略。當任務隊列已滿狸演,線程數(shù)量達到maximumPoolSize后言蛇,線程池就不會再接收新的任務了,這個時候就需要使用拒絕策略來決定最終是怎么處理這個任務宵距。默認情況下使用AbortPolicy腊尚,表示無法處理新任務,直接拋出異常满哪。在ThreadPoolExecutor類中定義了四個內(nèi)部類婿斥,分別表示四種拒絕策略。我們也可以通過實現(xiàn)RejectExecutionHandler接口來實現(xiàn)自定義的拒絕策略哨鸭。
AbortPocily:不再接收新任務民宿,直接拋出異常。
CallerRunsPolicy:提交任務的線程自己處理像鸡。
DiscardPolicy:不處理活鹰,直接丟棄。
DiscardOldestPolicy:丟棄任務隊列中排在最前面的任務,并執(zhí)行當前任務华望。(排在隊列最前面的任務并不一定是在隊列中待的時間最長的任務蕊蝗,因為有可能是按照優(yōu)先級排序的隊列)
在使用ThreadPoolExecutor時,可以使用execute(Runnable task)方法向線程池中提交一個任務赖舟,沒有返回值蓬戚。也可以使用submit()方法向線程池中添加任務,有返回值宾抓,返回值對象是Future子漩。submit()方法有三個重載的方法。
- submit(Runnable task)石洗,該方法雖然返回值對象是Future幢泼,但是使用Future.get()獲取結(jié)果是null。
- submit(Runnable task,T result)讲衫,方法的返回值對象是Future缕棵,通過Future.get()獲取具體的返回值時,結(jié)果與方法的第二個參數(shù)result相等涉兽。
- submit(Callable task)招驴,該方法的參數(shù)是一個Callable類型的對象,方法有返回值枷畏。
關于Future的原理有興趣的朋友可以自己先了解下别厘,后面一篇文章會專門介紹。
源碼分析
了解了ThreadPoolExecutor的基本用法拥诡,下面將結(jié)合源碼來分析下線程池的代碼實現(xiàn)触趴。從上面的線程池的原理中,我們可以發(fā)現(xiàn)渴肉,線程池的原理相對比較簡單冗懦,代碼實現(xiàn)起來應該不難,看源碼主要是為了學習他人寫的優(yōu)秀代碼宾娜,尤其是編程大師Doug Lea寫的代碼批狐。
對于一個線程池,除了上面介紹的幾個重要屬性以外前塔,我們還需要一個變量來表示線程池狀態(tài)嚣艇,線程池也需要有運行中、關閉中华弓、已關閉等狀態(tài)食零。如果要我們?nèi)崿F(xiàn)一個線程池,可能第一反應就是用一個單獨的變量來表示線程池的狀態(tài)寂屏,再用另一個變量來表示線程池中線程的數(shù)量贰谣。這樣的確可以娜搂,不過Doug Lea并不是這樣實現(xiàn)的,他將兩者用一個變量來表示(不得不感嘆下吱抚,大佬就是大佬百宇,想法果然和他人不一樣)。那么問題來了秘豹,如何用一個變量來表示兩個值携御?閱讀過讀寫鎖源碼的朋友可能就能立想到另外這一中方案(關于讀寫鎖的介紹可以閱讀這一篇文章: 讀寫鎖ReadWriteLock的實現(xiàn)原理)。在讀寫鎖的實現(xiàn)中將一個int型的數(shù)值既绕,按照高低位來拆分啄刹,高位表示一個數(shù),低位再表示另一個數(shù)凄贩,在線程池的實現(xiàn)中誓军,Doug Lea再次使用了按位拆分的技巧。
在線程池中疲扎,使用了一個原子類AtomicInteger的變量來表示線程池狀態(tài)和線程數(shù)量昵时,該變量在內(nèi)存中會占用4個字節(jié),也就是32bit评肆,其中高3位用來表示線程池的狀態(tài)债查,低29位用來表示線程的數(shù)量。線程池的狀態(tài)一共有5中狀態(tài)瓜挽,用3bit最多可以表示8種狀態(tài),因此采用高3位來表示線程池的狀態(tài)完全能滿足需求征绸。示意圖如下:
在看線程池的核心實現(xiàn)邏輯之前久橙,先簡單看下ThreadPoolExecutor中關于各種變量和方法的定義。因為在核心邏輯中管怠,經(jīng)常會用到它們淆衷,而且這些方法和變量大量用到了位運算,看起來不是特別直觀渤弛,所以提前熟悉它們的功能對于看核心邏輯有很大的幫助祝拯。相關源碼和注釋如下。
// 高2位表示線程池的狀態(tài)她肯,其他29位表示線程的數(shù)量佳头,即worker的數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 計數(shù)的位數(shù),29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大大小晴氨,2^29 -1康嘉,
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 線程池的狀態(tài),高三位表示線程的運行狀態(tài)籽前,
// RUNNING: 111
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN: 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP: 001
private static final int STOP = 1 << COUNT_BITS;
// TIFYING(整理): 010
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED: 011
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 計算線程池的狀態(tài)亭珍,計算結(jié)果的低29位全為0敷钾,因此最終結(jié)果就是線程池的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 工作線程的數(shù)量,計算結(jié)果的高3位全是0肄梨,因此最終結(jié)果就是工作線程的數(shù)量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據(jù)線程池的狀態(tài)和工作線程的數(shù)量阻荒,計算ctl,實際上就是將兩者合并成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
// 線程池的狀態(tài)是否小于s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 線程池的狀態(tài)大于等于s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判斷線程池是否處于運行狀態(tài)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
有了前面的基礎众羡,接下來分析下核心實現(xiàn)财松。再使用線程池時我們可以通過execute(Runnable task)來提交一個任務到線程池,因此我們從核心入口execute(Runnable task)方法開始分析纱控。execute()方法的源碼如下辆毡。在源碼中我添加了部分注釋,以供參考:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workerCountOf(c)方法時計算工作線程的數(shù)量甜害,
// 1. 工作線程數(shù)是否小于核心線程數(shù)舶掖,如果小于核心線程數(shù),就創(chuàng)建新的線程去執(zhí)行任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 任務執(zhí)行失敗后尔店,重新獲取ctl的值眨攘,供下面的邏輯計算
c = ctl.get();
}
// 2. 當工作線程數(shù)大于等于核心線程數(shù)時,線程池是運行狀態(tài)嚣州,且任務添加到隊列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池狀態(tài)不是RUNNING狀態(tài)鲫售,就執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池中線程的數(shù)量為0,就調(diào)用addWorker()方法創(chuàng)建新的worker線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 線程池非運行狀態(tài)该肴,或者任務入隊失敗情竹,就嘗試創(chuàng)建新的worker線程
else if (!addWorker(command, false))
// 4. 如果創(chuàng)建新的worker線程失敗,就執(zhí)行拒絕策略匀哄。
reject(command);
}
execute()方法的邏輯大致分為4部分秦效,分別對應線程池原理部分所提到的4個步驟。
- 先通過workerCountOf(c)方法計算當前線程池中線程的數(shù)量涎嚼,然后與初始化線程池時指定的核心線程數(shù)相比較阱州,如果小于核心線程數(shù)鲁森,就調(diào)用addWorker()方法奏赘,實際上就是創(chuàng)建新的線程來執(zhí)行任務衣厘。addWorker()方法的源碼后面分析首尼。
- 如果當前線程數(shù)大于等于核心線程數(shù)漫谷,那么就通過workQueue.off(command)方法煮仇,將任務添加到任務隊列中岖食。如果此時任務隊列還沒有滿醋安,那么就會添加成功刁憋,workQueue.off(command)就會返回true滥嘴,那么就會進入到if邏輯塊中,進行一些其他的判斷至耻。如果此時任務隊列已經(jīng)滿了若皱,workQueue.off(command)方法就會返回false镊叁,那么就會執(zhí)行后面的3和4。
- 當任務隊列滿了以后走触,就會再次調(diào)用addWorker()方法晦譬,在addworker()方法中會在創(chuàng)建新的線程之前,判斷線程數(shù)會不會超過最大線程數(shù)互广,如果會敛腌,addworker()就會返回false。如果不會惫皱,就會創(chuàng)建新的線程去執(zhí)行任務像樊。
- 如果addWorker()返回true,表示不能再創(chuàng)建新的線程了旅敷,那么此時就會執(zhí)行到4生棍,即調(diào)用reject()方法,執(zhí)行拒絕策略媳谁。reject()方法的邏輯比較簡單涂滴,就是調(diào)用了我們指定的handler的rejectedExecution()方法。其源碼如下:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
從上面的分析中晴音,可以看出柔纵,在好幾處邏輯中均調(diào)用了addWorker()方法,這說明該方法十分重要锤躁。事實也是如此搁料,該方法不僅重要,代碼實現(xiàn)還十分復雜进苍。該方法需要兩個參數(shù)加缘,第一個參數(shù)就是我們傳入的Runnable任務,第二參數(shù)是一個boolean值觉啊,傳入true表示的是當前線程池中的線程數(shù)還沒有達到核心線程數(shù),傳false表示當前線程數(shù)已經(jīng)大于等于核心線程數(shù)了沈贝。addWorker()方法的源碼很長杠人,這里我將其分為兩個部分,下面先看前面一部分的源碼宋下。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1. 當線程池的狀態(tài)大于SHUTDOWN時嗡善,返回false。因為線程池處于關閉狀態(tài)了学歧,就不能再接受任務了
// 2. 當線程池的狀態(tài)等于SHUTDOWN時罩引,firstTask不為空或者任務隊列為空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 1. 線程數(shù)大于等于理論上的最大數(shù)(2^29-1)枝笨,則直接返回false袁铐。(因為線程數(shù)不能再增加了)
// 2. 根據(jù)core來決定線程數(shù)應該和誰比較揭蜒。當線程數(shù)大于核心線程數(shù)或者最大線程數(shù)時,直接返回false剔桨。
// (因為當大于核心線程數(shù)時屉更,表示此時任務應該直接添加到隊列中(如果隊列滿了,可能入隊失敗)洒缀;當大于最大線程數(shù)時瑰谜,肯定不能再新創(chuàng)建線程了,不然設置最大線程數(shù)有毛用)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 線程數(shù)+1树绩,如果設置成功萨脑,就跳出外層循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 再次獲取線程池的狀態(tài),并將其與前面獲取到的值比較饺饭,
// 如果相同渤早,說明線程池的狀態(tài)沒有發(fā)生變化,繼續(xù)在內(nèi)循環(huán)中進行循環(huán)
// 如果不相同砰奕,說明在這期間蛛芥,線程池的狀態(tài)發(fā)生了變化,需要跳到外層循環(huán)军援,然后再重新進行循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 省略后半部分代碼
// ......
}
在這部分代碼中仅淑,我們可以看到一個很陌生的語法:retry...break retry...continue retry。這種寫法真的是太少見了胸哥,少見到我第一眼看到的時候涯竟,以為是我不小心碰到鍵盤,把源碼給改了空厌。這種語法有點類似于C語言里面的goto(已經(jīng)被遺棄)庐船,其作用就是在for循環(huán)之前定義一個retry,然后在循環(huán)中嘲更,使用break retry時筐钟,就會讓代碼跳出循環(huán),并不再進入循環(huán)赋朦;當使用continue retry時篓冲,表示跳出當前循環(huán),立馬進入到下一次循環(huán)宠哄。由于這里使用了兩層for循環(huán)壹将,因此為了方便在內(nèi)層循環(huán)中一下子跳出到外層循環(huán),就使用了retry這種語法毛嫉。需要說明的是诽俯,這里的retry并不是Java里面的關鍵字,而是隨機定義的一個字符串承粤,我們也可以寫成a,b,c等暴区,但是后面的break和continue后面的字符串需要和前面定義的這個字符串對應闯团。
我們可以看到,前半部分的核心邏輯就是使用了兩個無限for和一個CAS操作來設置線程池的線程數(shù)量颜启。如果線程池的線程數(shù)修改成功偷俭,就中斷循環(huán),進入后半部分代碼的邏輯缰盏,如果修改失敗涌萤,就利用for循環(huán)再一次進行修改,這樣的好處是口猜,既實現(xiàn)了線程安全负溪,也避免使用鎖,提高了效率济炎。在這一部分代碼中川抡,進行了很多判斷,這些判斷主要是校驗線程池的狀態(tài)以及線程數(shù)须尚,個人認為不是特別重要崖堤,我們主要抓住核心邏輯即可。
當成功修改線程數(shù)量以后耐床,就會執(zhí)行addWorker()方法的后半部分代碼密幔,其源碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
// 省略前半部分代碼
// ......
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個新的worker線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次獲取線程池的狀態(tài),因為在獲取鎖期間撩轰,線程池的狀態(tài)可能改變了
int rs = runStateOf(ctl.get());
// 如果線程池狀態(tài)時運行狀態(tài)或者是關閉狀態(tài)但是firstTask是空胯甩,就將worker線程添加到線程池
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 判斷worker線程是否已經(jīng)啟動了,如果已經(jīng)啟動堪嫂,就拋出異常
// 個人覺得這一步?jīng)]有任何意義偎箫,因為worker線程是剛new出來的,沒有在任何地方啟動
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 啟動線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果啟動失敗皆串,就將worker線程從線程池移除,并將線程數(shù)減1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在這一部分代碼中恶复,通過new Worker(firstTask)創(chuàng)建了一個Worker對象,Worker對象繼承了AQS寂玲,同時實現(xiàn)了Runnable接口拓哟,它是線程池中真正干活的人。我們提交到線程的任務伶授,最終都是封裝成Worker對象断序,然后由Worker對象來完成任務流纹。先簡單看下Woker的構造方法,其源碼如下违诗。在構造方法中漱凝,首先設置了同步變量state為-1,然后通過ThreadFactory創(chuàng)建了一個線程诸迟,注意在通過ThreadFactory創(chuàng)建線程時茸炒,將Worker自身也就是this,傳入了進去阵苇,也就是說最后創(chuàng)建出來的線程對象壁公,它里面的target屬性就是指向這個Worker對象。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 將this傳入進去绅项,最后就是使線程的target屬性等于當前的worker對象
this.thread = getThreadFactory().newThread(this);
}
}
當通過new Worker(firstTask)創(chuàng)建完worker對象后紊册,此時線程已經(jīng)被創(chuàng)建好了,在啟動線程之前快耿,先通過t.isAlive()判斷線程是已經(jīng)啟動囊陡,如果沒有啟動,才會調(diào)用線程的start()方法來啟動線程掀亥。這里有一點需要注意的是撞反,創(chuàng)建完worker對象后,調(diào)用了mainLock.lock()來保證線程安全铺浇,因為這一步workers.add(w)存在并發(fā)的可能痢畜,所以需要通過獲取鎖來保證線程安全。
當調(diào)用線程的start()方法之后鳍侣,如果線程獲取到CPU的執(zhí)行權丁稀,那么就會執(zhí)行線程的run()方法,在線程的run()方法中倚聚,會執(zhí)行線程中target屬性的run()方法线衫。這里線程的target屬性就是我們創(chuàng)建的worker對象,因此最終會執(zhí)行到Worker的run()方法惑折。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
public void run() {
runWorker(this);
}
}
在Worker類的run()中惨驶,直接調(diào)用了runWorker()方法粗卜。所以Worker執(zhí)行任務的核心邏輯就是在runWorker()方法中實現(xiàn)的。其源碼如下焕数。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果worker線程的firstTask不為空或者能從任務隊列中獲取到任務堡赔,就執(zhí)行
// 否則就會一直阻塞到getTask()方法處
while (task != null || (task = getTask()) != null) {
// 保證worker串行的執(zhí)行線程
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// beforeExecute()是空方法,由子類具體實現(xiàn)
// 該方法的目的是為了讓任務執(zhí)行前做一些其他操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// afterExecute()空方法雕拼,由子類具體實現(xiàn)
// 該方法的目的是為了讓任務執(zhí)行后做一些其他操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker()的源碼看起來也比較長啥寇,但核心邏輯就一行辑甜,即:task.run(),最終執(zhí)行了我們提交的Runnable任務胡诗。在runWorker()方法中通過一個while循環(huán)來讓Worker對象一直來執(zhí)行任務煌恢。當傳入的task對象不為空或者通過getTask()方法能從任務隊列中獲取到任務時瑰抵,worker就會一直執(zhí)行二汛。否則將在finally語句塊中調(diào)用processWorkerExit退出肴颊,讓線程中斷婿着,最終銷毀。
getTask()方法是一個阻塞的方法,當能從任務隊列中獲取到任務時氯葬,就會立即返回一個任務帚称。如果獲取不到任務闯睹,就會阻塞楼吃。它支持超時孩锡,當超過線程池初始化時指定的線程最大存活時間后躬窜,就會返回null荣挨,從而導致worker線程退出while循環(huán)默垄,最終線程銷毀厕倍。
到這兒線程池的execute()方法就分析完了讹弯。最后簡單分析下線程池的shutdown()方法和shutdownNow()方法组民。當調(diào)用shutdown()方法時臭胜,會令線程池的狀態(tài)為SHUTDOWN乱陡,然后中斷空閑的線程憨颠,對于已經(jīng)在執(zhí)行任務的線程并不會中斷爽彤。當調(diào)用shutdownNow()方法時适篙,會令線程池的狀態(tài)為STOP嚷节,然后在中斷所有的線程丹喻,包括正在執(zhí)行任務的線程碍论。
- shutdown()方法的源碼如下鳍悠。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 權限校驗
checkShutdownAccess();
// 將線程池的狀態(tài)設置為SHUTDOWN狀態(tài)
advanceRunState(SHUTDOWN);
// 中斷空閑的worker線程
interruptIdleWorkers();
// 空方法藏研,由子類具體去實現(xiàn)蠢挡,例如ScheduledThreadPoolExecutor
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試將線程池的狀態(tài)設置為TERMINATED
tryTerminate();
}
- shutdownNow()方法的源碼如下业踏。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 線程池狀態(tài)設置為STOP
advanceRunState(STOP);
// 中斷所有線程勤家,包括正在執(zhí)行任務的線程
interruptWorkers();
// 清除任務隊列中的所有任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
- 在實際工作當中伐脖,對于究竟應該使用哪一種方法去中斷線程池讼庇,應該結(jié)合具體的任務來決定,如果要求任務必須執(zhí)行完成认烁,那么就是用shutdown()方法。通常也建議使用shutdown()方法嘹承,更加優(yōu)雅叹卷。
總結(jié)
- 本文主要介紹了線程池的實現(xiàn)原理骤竹,其原理主要分為4個核心步驟蒙揣,先判斷線程數(shù)量是否超過核心線程數(shù)懒震,然后再判斷任務隊列是否已經(jīng)滿了个扰,再判斷線程數(shù)會不會超過設置的最大線程數(shù)葱色,最后執(zhí)行拒絕策略办龄。接著本文詳細介紹了線程池的幾個核心參數(shù)corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler以及它們的各自的意義土榴,然后結(jié)合源碼實現(xiàn)玷禽,詳細分析了任務的執(zhí)行過程。
- 無論是讀寫鎖的實現(xiàn)糯笙,還是線程池的實現(xiàn)给涕,Doug Lea都使用了將一個int型的變量按照高低位拆分的技巧够庙,這種思想很值得學習耘眨,不僅是因為設計巧妙境肾,還因為在計算機中位運算的執(zhí)行效率更高偶宫。
- 最后解釋下關于文章開頭提到的一點疑惑:為什么是先判斷任務隊列有沒有滿纯趋,再判斷線程數(shù)有沒有超過最大線程數(shù)结闸?而不是先判斷最大線程數(shù)桦锄,再判斷任務隊列是否已滿结耀?
- 答案和具體的源碼實現(xiàn)有關图甜。因為當需要創(chuàng)建線程的時候黑毅,都會調(diào)用addWorker()方法矿瘦,在addWorker()的后半部分的邏輯中缚去,會調(diào)用mainLock.lock()方法來獲取全局鎖枕荞,而獲取鎖就會造成一定的資源爭搶躏精。如果先判斷最大線程數(shù)玉控,再判斷任務隊列是否已滿,這樣就會造成線程池原理的4個步驟中碾篡,第1步判斷核心線程數(shù)時要獲取全局鎖开泽,第2步判斷最大線程數(shù)時魁瞪,又要獲取全局鎖导俘,這樣相比于先判斷任務隊列是否已滿,再判斷最大線程數(shù)辅髓,就可能會多出一次獲取全局鎖的過程。因此在設計線程池第焰,為了盡可能的避免因為獲取全局鎖而造成資源的爭搶挺举,所以會先判斷任務隊列是否已滿葵陵,再判斷最大線程數(shù)瞻佛。
- 另外一個疑惑就是:LinkedBlockingQueue的吞吐量比ArrayBlockingQueue的吞吐量要高脱篙。前者是基于鏈表實現(xiàn)的,后者是基于數(shù)組實現(xiàn)的伤柄,正常情況下绊困,不應該是數(shù)組的性能要高于鏈表嗎?
- 然后看了一下這兩個阻塞隊列的源碼才發(fā)現(xiàn)适刀,這是因為LinkedBlockingQueue的讀和寫操作使用了兩個鎖秤朗,takeLock和putLock,讀寫操作不會造成資源的爭搶笔喉。而ArrayBlockingQueue的讀和寫使用的是同一把鎖取视,讀寫操作存在鎖的競爭作谭。因此LinkedBlockingQueue的吞吐量高于ArrayBlockingQueue吼过。
推薦
管程:并發(fā)編程的基石
初識CAS的實現(xiàn)原理
Unsafe類的源碼解讀以及使用場景
隊列同步器(AQS)的設計原理
隊列同步器(AQS)源碼分析
可重入鎖(ReentrantLock)源碼分析
公平鎖與非公平鎖的對比
Condition源碼分析
讀寫鎖ReadWriteLock的實現(xiàn)原理
Semaphore的源碼分析以及使用場景
并發(fā)工具類CountDownLatch的源碼分析以及使用場景
并發(fā)工具類CyclicBarrier的源碼分析以及使用場景
wait()和notify()一定成對出現(xiàn)嗎斤葱?如何解釋Thread.join()