本文章講解的內容是Java線程池源碼分析罢缸。
本文章分析的相關的源碼基于Java Development Kit(JDK) 13师逸。
概述
線程是操作系統(tǒng)的內核資源静盅,是CPU調度的最小單位,所有的應用程序都運行在線程上辣吃,它是我們實現(xiàn)并發(fā)和異步的基礎。在Java的API中芬探,Thread是實現(xiàn)線程的基礎類神得,每創(chuàng)建一個Thread對象,操作系統(tǒng)內核就會啟動一個線程偷仿,在Thread的源碼中哩簿,它所有的關鍵方法都是本地方法(Native Method),內部實現(xiàn)是大量的JNI的調用酝静,因為線程的實現(xiàn)必須由操作系統(tǒng)提供直接支持节榜。在Linux操作系統(tǒng)中,每一個Java thread對應一個native thread别智,它們是一一對應的宗苍。在Android中,創(chuàng)建線程的過程中會調用Linux API中的pthread_create函數(shù)亿遂。
線程的調用會存在以下問題:
- 線程不是輕量級資源浓若,大量創(chuàng)建線程會消耗系統(tǒng)大量資源渺杉,傳統(tǒng)的阻塞調用會導致系統(tǒng)存在大量的因為阻塞而不能運行的線程,這是非常浪費系統(tǒng)資源挪钓。
- 線程運行狀態(tài)和阻塞狀態(tài)的切換會存在相當大的開銷是越,一直以來都是優(yōu)化點,例如:Java虛擬機在運行時會對鎖進行優(yōu)化碌上,就像自旋鎖倚评、鎖粗化和鎖消除等。
線程池(Thread Pool)是一種基于池化思想管理線程的工具馏予,使用線程池有如下好處:
- 降低資源消耗:通過池化技術重復利用已創(chuàng)建的線程天梧,降低線程的創(chuàng)建和銷毀帶來的損耗。
- 提高響應速度:任務到達時霞丧,無需等待線程創(chuàng)建就可以立即執(zhí)行呢岗。
- 提高線程的可管理性:線程是稀缺資源,如果無限制創(chuàng)建蛹尝,不僅會消耗系統(tǒng)資源后豫,還會因為線程的不合理分布導致資源調度失衡,降低系統(tǒng)的穩(wěn)定性突那,使用線程池可以進行統(tǒng)一的分配挫酿、調優(yōu)和監(jiān)控。
- 提供更多更強大的功能:線程池具備拓展性愕难,允許開發(fā)人員向其中增加更多的功能早龟。
結構
ThreadPoolExecutor的UML類圖,如下圖所示:
ThreadPoolExecutor類繼承AbstractExecutorService抽象類猫缭,AbstractExecutorService抽象類實現(xiàn)ExecutorService接口葱弟,ExecutorService接口繼承Executor接口。
Executor
接口Executor可以執(zhí)行提交的Runnable對象的任務饵骨,它的思想是可以將任務提交和每個任務運行機制(例如:線程的使用和線程的調度)解耦翘悉,我們無需關注線程是怎樣創(chuàng)建的,也無需關注線程是怎樣被調度執(zhí)行任務的居触,只需要提供Runnable對象妖混。源碼如下所示:
// Executor.java
package java.util.concurrent;
public interface Executor {
// 在之后的某個時段執(zhí)行給定的任務,該任務可以在新線程中執(zhí)行轮洋,也可以在線程池中的線程中執(zhí)行制市,也可以在調用線程中執(zhí)行,這個是由Executor的實現(xiàn)決定的
void execute(Runnable command);
}
ExecutorService
接口ExeutorService可以提供如下能力:
- 擴充Executor的能力:提供了為一個或者多個異步任務生成Future的方法弊予。
- 提供管理線程的能力:提供了終止線程池運行的方法祥楣。
源碼如下所示:
// ExecutorService.java
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
public interface ExecutorService extends Executor {
// 啟動有序關閉,在這個關閉過程中,會繼續(xù)執(zhí)行先前提交的任務误褪,但是不接受新任務责鳍,如果已經(jīng)關閉,調用這個方法沒有額外影響
void shutdown();
// 嘗試停止所有正在執(zhí)行的任務兽间,停止等待任務的處理历葛,并且返回等待執(zhí)行的任務列表
List<Runnable> shutdownNow();
// 如果Excutor已經(jīng)關閉,就返回true嘀略,否則返回false
boolean isShutdown();
// 如果關閉后所有任務都已經(jīng)完成恤溶,就返回true,要注意的是帜羊,除非先調用shutdown()方法或者shutdownNow()方法咒程,否則這個方法永遠不會返回true
boolean isTerminated();
// 阻塞,直到所有任務在關閉請求讼育,或者發(fā)生超時帐姻,或者當前線程被中斷(要注意的是,以最先發(fā)生的為準)后完成執(zhí)行執(zhí)行
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一個有返回值的任務以執(zhí)行窥淆,并且返回一個表示該任務掛起結果的Future卖宠,F(xiàn)uture的get方法將在任務成功完成時返回任務的結果
<T> Future<T> submit(Callable<T> task);
// 提交一個可運行的任務以執(zhí)行,并且返回一個表示該任務掛起結果的Future忧饭,F(xiàn)uture的get方法將在任務成功完成時返回任務的結果
<T> Future<T> submit(Runnable task, T result);
// 提交一個可運行的任務以執(zhí)行,并且返回一個表示該任務掛起結果的Future筷畦,F(xiàn)uture的get方法將在任務完成后返回null
Future<?> submit(Runnable task);
// 執(zhí)行給定的任務词裤,當所有任務完成時,返回包含其狀態(tài)和結果的Future列表鳖宾,F(xiàn)uture列表中的每個元素中的isDone()方法是返回true吼砂,要注意的是,一個已完成的任務可以正常終止鼎文,也可以通過拋出異常終止渔肩,如果在執(zhí)行此操作時修改了給定的集合,則此方法的結果將未知
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 執(zhí)行給定的任務拇惋,當所有任務完成時周偎,返回包含其狀態(tài)和結果的Future列表,F(xiàn)uture列表中的每個元素中的isDone()方法是返回true撑帖,在返回時蓉坎,未完成的任務將被取消,要注意的是胡嘿,一個已完成的任務可以正常終止蛉艾,也可以通過拋出異常終止,如果在執(zhí)行此操作時修改了給定的集合,則此方法的結果將未知
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 執(zhí)行給定的任務勿侯,如果有的話拓瞪,就返回一個已成功完成的任務的結果(例如:沒有拋出異常),在正持觯或者異常返回時吴藻,未完成的任務將被取消,要注意的是弓柱,如果在執(zhí)行此操作時修改了給定的集合沟堡,則此方法的結果將未知
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 執(zhí)行給定的任務,如果有任務在給定超時結束之前完成矢空,就返回已成功完成的任務的結果(例如:沒有拋出異常)航罗,在正常或者異常返回時屁药,未完成的任務將被取消粥血,如果在執(zhí)行此操作時修改了給定的集合,則此方法的結果將未知
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService
抽象類AbstractExecutorService提供了接口ExecutorService執(zhí)行方法的默認實現(xiàn)酿箭。這個抽象類使用由newTaskFor方法返回的RunnableFuture來實現(xiàn)submit方法复亏、invokeAny方法和invokeAll方法。
線程池分為兩個部分:任務管理和線程管理缭嫡,它使用了生產(chǎn)者消費者模型缔御,任務管理充當生產(chǎn)者的角色,線程管理充當消費者的角色妇蛀。
任務管理
當任務提交后耕突,線程池會執(zhí)行以下事情:
- 申請線程執(zhí)行任務。
- 將任務放到緩沖隊列中评架,等待執(zhí)行眷茁。
- 拒絕執(zhí)行任務。
線程管理
線程池根據(jù)任務進行線程分配纵诞,當線程執(zhí)行完當前任務后會繼續(xù)執(zhí)行下個任務上祈,如果線程獲取不到任務就會被回收。
運行狀態(tài)
線程池的運行狀態(tài)使用變量ctl控制浙芙,源碼如下所示:
// ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 運行狀態(tài)(runState)會存儲在高階位中
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 包裝和拆裝ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
COUNT_BITS的值是Integer.SIZE(值是32)減3登刺,也就是29,可以看到運行狀態(tài)(runState)都是基于COUNT_BITS做左移運算茁裙,也就是運行狀態(tài)會存儲在高3位中塘砸。
變量ctl使用兩個值維護,分別是運行狀態(tài)(runState)和工作線程數(shù)量(workerCount)晤锥,高3位保存運行狀態(tài)掉蔬,低29位保存工作線程數(shù)量廊宪,這樣做的好處是可以避免執(zhí)行相關邏輯的時候,如果出現(xiàn)不一致的情況下女轿,不需要因為維護兩者的一致箭启,而占用鎖資源◎燃#可以發(fā)現(xiàn)這里采用位運算傅寡,根據(jù)之前閱讀源碼的經(jīng)驗,采用位運算會比基本運算效率要高北救。
線程池的五種狀態(tài):
- RUNNING:接受新任務荐操,并且處理阻塞隊列中的任務。
- SHUTDOWN:不接受新任務珍策,但是會去處理阻塞隊列中的任務托启。
- STOP:不接受新任務,不處理阻塞隊列中的任務攘宙,也會中斷正在進行中的任務屯耸。
- TIDYING:所有任務都已經(jīng)終止,線程數(shù)量是零蹭劈,轉換到這個狀態(tài)的線程將調用terminated()鉤子方法(hook method)疗绣。
- TERMINATED:調用terminated()方法完成后就會進入這個狀態(tài)。
線程池的狀態(tài)先是RUNNING狀態(tài)铺韧,然后分成兩種情況:
- 如果調用shutdown()方法多矮,就會進入SHUTDOWN狀態(tài),然后進入TIDYING狀態(tài)祟蚀,阻塞隊列為空工窍,線程池中的工作線程數(shù)量為零,最后調用terminated()方法后進入TERMINATED狀態(tài)前酿。
- 如果調用shutdownNow()方法,就會進入STOP狀態(tài)鹏溯,然后進入TIDYING狀態(tài)罢维,線程池中的工作線程數(shù)量為零,但是阻塞隊列不一定為空丙挽,最后調用terminated()方法后進入TERMINATED狀態(tài)肺孵。
任務調度
任務調度是線程池的核心機制,相關的邏輯在execute(Runnable command)方法中颜阐,源碼如下所示:
// ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
// 如果變量command為空平窘,就拋出NullPointerException異常
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果workerCount(工作線程數(shù)量)小于corePoolSize(核心池大小)凳怨,就創(chuàng)建并且啟動一個線程執(zhí)行新提交的任務
if (addWorker(command, true))
// 如果添加任務成功瑰艘,就返回
return;
// 如果添加任務失敗是鬼,就再次獲取運行狀態(tài)和工作線程數(shù)量
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// 如果當前線程池的運行狀態(tài)是RUNNING狀態(tài),并且添加任務成功紫新,執(zhí)行以下邏輯
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 如果當前線程池的運行狀態(tài)不是RUNNING狀態(tài)均蜜,并且移除任務成功,就調用reject(Runnable command)方法芒率,拒絕任務
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果沒有工作線程囤耳,就調用addWorker(Runnable firstTask, boolean core)方法,第一個參數(shù)firstTask傳入null偶芍,表示在線程池中創(chuàng)建一個線程充择,但是不啟動它;第二參數(shù)core傳入false匪蟀,表示將線程池的最大線程數(shù)量設為maximumPoolSize的值
addWorker(null, false);
}
else if (!addWorker(command, false))
// 如果添加任務失敗椎麦,就調用reject(Runnable command),拒絕任務
reject(command);
}
一些重要的成員變量萄窜,源碼如下所示:
// ThreadPoolExecutor.java
// 用于保存任務并且將其傳遞給工作線程的隊列铃剔,可以使用使用isEmpty()方法判斷隊列是否為空,例如:判斷是否決定從SHUTDOWN狀態(tài)進入TIDYING狀態(tài)
private final BlockingQueue<Runnable> workQueue;
// 對變量和工作線程加上可重入的獨占鎖查刻。雖然可以使用其他可以處理并發(fā)問題的集合键兜,但是使用ReentrantLock會更加可取,其中一個原因是這種方法序列化了interruptIdleWorkers穗泵,從而可以避免不必要的中斷普气,特別是在shutdown的時候,否則佃延,已經(jīng)退出的線程將并發(fā)地中斷那些尚未中斷的線程现诀,它還對largestPoolSize等相關的變量加鎖,shutdown()方法和shutdownNow()持有這個鎖履肃,以確保工作線程是穩(wěn)定的仔沿,能正確處理中斷狀態(tài)
private final ReentrantLock mainLock = new ReentrantLock();
// 包含線程池中所有工作線程的集合(僅在持有mainLock鎖時訪問)
private final HashSet<Worker> workers = new HashSet<>();
// 等待條件,以支持等待終止
private final Condition termination = mainLock.newCondition();
// 最大池大谐咂濉(僅在持有mainLock鎖時訪問)
private int largestPoolSize;
// 已完成任務的計數(shù)器封锉,僅在工作線程終止時更新(僅在持有mainLock鎖時訪問)
private long completedTaskCount;
// 創(chuàng)建線程的工廠
private volatile ThreadFactory threadFactory;
// 處理拒絕策略
private volatile RejectedExecutionHandler handler;
// 空閑線程等待工作的超時時間(以納秒為單位),當workerCount(工作線程的數(shù)量)大于corePoolSize或者allowCoreThreadTimeOut時膘螟,線程使用這個超時時間
private volatile long keepAliveTime;
// 如果返回false(默認)成福,核心線程即使在空閑時也保持活動;如果返回true荆残,核心線程使用keepAliveTime來超時等待工作
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小奴艾,它是保持活動的工作線程的最小數(shù)量(并且不允許超時等)
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
// 默認的拒絕策略是AbortPolicy
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// 調用shutdown()方法和shutdownNow()方法需要的權限
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
執(zhí)行過程如下所示:
- 檢查線程池的運行狀態(tài),保證是在RUNNING狀態(tài)下執(zhí)行任務内斯,如果不是這個狀態(tài)蕴潦,就直接拒絕像啼。
- 如果workerCount(工作線程數(shù)量)小于corePoolSize(核心池大小)品擎,就創(chuàng)建并且啟動一個線程執(zhí)行新提交的任務埋合。
- 如果workerCount大于等于corePoolSize,并且如果阻塞隊列還沒滿萄传,就將新提交的任務放到阻塞隊列中甚颂。
- 如果workerCount大于等于corePoolSize,并且阻塞隊列已經(jīng)滿了秀菱,并且workerCount小于maximumPoolSize振诬,就創(chuàng)建并且啟動一個線程執(zhí)行新提交的任務。
- 如果workerCount大于等于corePoolSize衍菱,并且阻塞隊列已經(jīng)滿了赶么,并且workerCount大于等于maximumPoolSize,就根據(jù)拒絕策略處理這個任務脊串,默認的處理方式是拋出RejectedExecutionException異常辫呻。
部分過程是在addWorker(Runnable firstTask, boolean core)方法中,后面會詳細講解琼锋。
任務緩沖
任務緩沖是線程池能夠管理任務的核心機制放闺,它是通過一個阻塞隊列(BlockingQueue)實現(xiàn)的,阻塞隊列緩存任務缕坎,工作線程從阻塞隊列中獲取任務怖侦,它符合生產(chǎn)者消費者模型,生產(chǎn)者是添加元素的線程谜叹,消費者是獲取元素的線程匾寝。
阻塞隊列(BlockingQueue)的數(shù)據(jù)結構是隊列,它支持兩個附加操作荷腊,分別是:
- 在阻塞隊列為空的時候艳悔,獲取元素的線程會等待該隊列變?yōu)榉强铡?/strong>
- 在阻塞隊列已經(jīng)滿的時候,添加元素的線程會等待該隊列變?yōu)榭捎谩?/strong>
源碼如下所示:
// BlockingQueue.java
public interface BlockingQueue<E> extends Queue<E> {
// 在不超過隊列容量的前提下女仰,將指定的元素添加到隊列中很钓,成功就返回true,如果隊列已經(jīng)滿了董栽,就拋出IllegalStateException異常,當使用有容量限制的隊列時企孩,最好還是調用offer(E e)方法
boolean add(E e);
// 在不超過容量的前提下锭碳,將指定的元素添加到隊列中,成功就返回true勿璃,如果隊列已經(jīng)滿了擒抛,就返回false推汽,當使用有容量限制的隊列時,調用這個方法會比add(E e)方法好
boolean offer(E e);
// 將指定的元素添加到隊列中歧沪,如果沒有足夠的空間歹撒,就等待
void put(E e) throws InterruptedException;
// 將指定的元素添加到隊列中,如果沒有足夠的空間诊胞,就等待到指定的等待時間
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 檢索并且刪除隊列的頭元素暖夭,如果沒有足夠的空間,就等待
E take() throws InterruptedException;
// 檢索并且刪除隊列的頭元素撵孤,如果沒有足夠的空間迈着,就等待到指定的等待時間
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回隊列在理想情況下(在沒有內存或者資源約束的情況下)可以不阻塞地接受額外元素的數(shù)量,如果沒有限制邪码,就返回Integer.MAX_VALUE的值
int remainingCapacity();
// 如果隊列中包含一個或者多個指定的元素(通過equals方法判斷是否是同一個元素)裕菠,就刪除這些元素,并且返回true闭专,否則返回false
boolean remove(Object o);
// 如果隊列中包含一個或者多個指定的元素(通過equals方法判斷是否是同一個元素)奴潘,就返回true,否則返回false
boolean contains(Object o);
// 從隊列中刪除指定的集合c中的元素影钉,如果指定的集合c是該隊列或者該隊列的某個元素的某些屬性阻止將其添加到指定的集合中画髓,就拋出IllegalArgumentException異常,如果在執(zhí)行此操作時修改了給定的集合斧拍,則此方法的結果將未知
int drainTo(Collection<? super E> c);
// 從隊列中刪除指定的集合c中的數(shù)量是maxElements的元素雀扶,如果指定的集合c是該隊列或者該隊列的某個元素的某些屬性阻止將其添加到指定的集合中,就拋出IllegalArgumentException異常肆汹,如果在執(zhí)行此操作時修改了給定的集合愚墓,則此方法的結果將未知
int drainTo(Collection<? super E> c, int maxElements);
}
看下接口BlockingQueue的實現(xiàn)類,可得知阻塞隊列有以下幾種類型:
- ArrayBlockingQueue:使用數(shù)組實現(xiàn)的有容量限制的阻塞隊列昂勉,按照先進先出(FIFO浪册,F(xiàn)irst-In-First-Out)的原則對元素進行排序,支持公平鎖和非公平鎖岗照。
- DelayQueue:實現(xiàn)延遲獲取的沒有容量限制的阻塞隊列村象,可以指定延遲時間,只有達到指定的延遲時間攒至,才能獲取該隊列中的元素厚者。
- LinkedBlockingDeque:使用雙向鏈表實現(xiàn)的阻塞隊列,隊頭和隊尾都可以添加或者刪除元素迫吐,在并發(fā)環(huán)境下库菲,可以將鎖的競爭最多降到一半。
- LinkedBlockingQueue:使用鏈表實現(xiàn)的有容量限制的阻塞隊列志膀,按照先進先出(FIFO熙宇,F(xiàn)irst-In-First-Out)的原則對元素進行排序鳖擒,默認長度是Integer.MAX_VALUE,所以默認創(chuàng)建該隊列可能有容量危險烫止,該隊列通常比基于數(shù)組實現(xiàn)的隊列具有更高的吞吐量蒋荚,但是在并發(fā)環(huán)境下,性能會比較差馆蠕。
- LinkedTransferQueue:使用鏈表實現(xiàn)的沒有容量限制的阻塞隊列期升,按照先進先出(FIFO,F(xiàn)irst-In-First-Out)的原則對元素進行排序荆几,和其他隊列對比吓妆,多出了兩個方法,分別是:transfer(E e)方法和tryTransfer(E e, long timeout, TimeUnit unit)方法吨铸。
- PriorityBlockingQueue:支持線程優(yōu)先級排序的沒有容量限制的阻塞隊列行拢,默認是按自然順序排序,也就是從優(yōu)先級低到高排序诞吱,也可以實現(xiàn)compareTo()方法來指定元素的排序規(guī)則舟奠,但是不能保證同優(yōu)先級元素的順序。
- SynchronousQueue:不存儲元素的阻塞隊列房维,支持公平鎖和非公平鎖沼瘫,每一個插入操作都必須要等待另一個線程的移除操作,每一個移除操作都必須要等待另一個線程的插入操作咙俩,其中耿戚,Executors.newCachedThreadPool()使用了這個隊列。
任務申請
任務申請相關的邏輯在getTask()方法中阿趁,源碼如下所示:
// ThreadPoolExecutor.java
private Runnable getTask() {
boolean timedOut = false;
// 循環(huán)執(zhí)行
for (;;) {
// 獲取線程池的運行狀態(tài)和工作線程數(shù)量
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// 如果線程池的運行狀態(tài)至少是SHUTDOWN狀態(tài)膜蛔,并且至少是STOP狀態(tài),或者阻塞隊列為空脖阵,就執(zhí)行以下邏輯
// 調用decrementWorkerCount()方法皂股,減少成員變量ctl的workerCount的值
decrementWorkerCount();
// 返回null
return null;
}
// 獲取工作線程數(shù)量
int wc = workerCountOf(c);
// 獲取線程是否是回收狀態(tài)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 如果線程數(shù)量大于maximumPoolSize的值,也就是線程數(shù)量過多命黔,就返回null
return null;
continue;
}
try {
// 如果線程是可回收的呜呐,就調用poll(long timeout, TimeUnit unit)方法,否則調用take()方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
執(zhí)行過程如下所示:
- 獲取線程池的運行狀態(tài)和工作線程數(shù)量悍募。
- 如果線程池已經(jīng)停止執(zhí)行蘑辑,就返回null,否則執(zhí)行步驟3坠宴。
- 如果線程池數(shù)量過多以躯,就返回null,否則執(zhí)行執(zhí)行步驟4。
- 如果該線程是可回收的忧设,就調用poll(long timeout, TimeUnit unit)方法,檢索并且刪除隊列的頭元素颠通,如果沒有足夠的空間址晕,就等待到指定的等待時間;如果該線程是不可回收的顿锰,就調用take()方法谨垃,檢索并且刪除隊列的頭元素,如果沒有足夠的空間硼控,就等待馒索。
任務拒絕
任務拒絕是線程池的保護機制跺涤,相關的邏輯在RejectedExecutionHandler中,源碼如下所示:
// RejectedExecutionHandler.java
public interface RejectedExecutionHandler {
// 當線程池不能接受任務時,會調用這個方法
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
看下接口RejectedExecutionHandler的實現(xiàn)類爽哎,可得知拒絕策略有以下幾種類型:
- ThreadPoolExecutor.AbortPolicy:線程池的默認拒絕策略,也就是ThreadPoolExecutor和ScheduledThreadPoolExecutor的默認拒絕策略爬早,丟棄任務氯夷,并且拋出RejectedExecutionException異常,建議使用這個策略撼短,因為可以方便通過異常發(fā)現(xiàn)再膳。
- ThreadPoolExecutor.CallerRunsPolicy:由提交任務的線程處理該任務,這種情況下曲横,需要等到所有任務執(zhí)行完畢喂柒,這種策略適合大量計算的任務類型。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務禾嫉,然后重新提交被拒絕的任務灾杰,這種策略適合經(jīng)常需要丟棄舊的任務類型。
- ThreadPoolExecutor.DiscardPolicy:丟棄任務夭织,但是不拋出異常吭露,使用這種策略會導致我們無法發(fā)現(xiàn)異常。
工作線程管理
Worker類是線程池的工作線程的類尊惰,它是ThreadPoolExecutor類的一個被關鍵字final修飾的內部類讲竿,繼承了AbstractQueuedSynchronizer類,并且實現(xiàn)了Runnable接口弄屡,源碼如下所示:
// ThreadPoolExecutor.java
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 序列化版本號
private static final long serialVersionUID = 6138294804551838833L;
// 正在運行的工作線程题禀,如果創(chuàng)建失敗的,就為空
final Thread thread;
// 要運行的初始任務膀捷,可能為空
Runnable firstTask;
// 線程任務計數(shù)器
volatile long completedTasks;
// 從ThreadFactory中創(chuàng)建的第一個線程和任務
Worker(Runnable firstTask) {
// 在運行工作線程之前禁止中斷
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 調用runWorker(Worker w)方法
runWorker(this);
}
// 以下是加鎖的方法迈嘹,值是0表示未解鎖狀態(tài),值是1表示解鎖狀態(tài)
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
根據(jù)前面的源碼可得知,線程池使用HashSet存儲所有的工作線程秀仲,也就是存儲所有的Worker對象融痛,這樣就可以方便地通過添加或者刪除元素來控制線程池中的線程。
Worker類通過繼承AbstractQueuedSynchronizer類實現(xiàn)獨占鎖神僵,獨占鎖的意思是每次只有一個線程持有鎖雁刷,沒有使用ReentrantLock的原因是因為它是可重入鎖,線程池不能允許工作線程能夠多次獲取鎖保礼,所以使用AbstractQueuedSynchronizer沛励。
AbstractQueuedSynchronizer是依賴先進先出(FIFO,F(xiàn)irst-In-First-Out)的隊列的阻塞鎖和相關同步器(信號量炮障、事件等)實現(xiàn)的目派,里面維護著一個int的state。
添加工作線程
看下addWorker(Runnable firstTask, boolean core)方法胁赢,源碼如下所示:
// ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 獲取線程池的運行狀態(tài)和工作線程數(shù)量
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
// 如果線程池已經(jīng)停止企蹭,就添加工作線程失敗
return false;
// 循環(huán)執(zhí)行
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 獲取工作線程數(shù)量,如果工作線程數(shù)量大于等于核心池大小或者最大池大小徘键,就添加工作線程失敗
return false;
// 增加workerCount(原子操作)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加失敗练对,就再次獲取ctl
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
// 如果中途有其他線程修改線程池的運行狀態(tài),使其變成SHUTDOWN狀態(tài)吹害,就跳到retry標簽處螟凭,回到最外層的循環(huán)
continue retry;
}
}
// Worker對象對應的線程是否已經(jīng)啟動
boolean workerStarted = false;
// Worker對象是否添加到HashSet成功
boolean workerAdded = false;
Worker w = null;
try {
// 使用firstTask創(chuàng)建Worker對象
w = new Worker(firstTask);
// 根據(jù)Worker對象創(chuàng)建一個線程
final Thread t = w.thread;
if (t != null) {
// 如果線程不為空,就創(chuàng)建ReentrantLock對象
final ReentrantLock mainLock = this.mainLock;
// 對mainLock加鎖
mainLock.lock();
try {
// 保持鎖定狀態(tài)時再次檢查它呀,在ThreadFactory失敗或者在獲得鎖之前關閉退出
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 如果線程池處于小于STOP狀態(tài)螺男,也就是RUNNING狀態(tài),就執(zhí)行以下邏輯
if (t.getState() != Thread.State.NEW)
// 如果線程不是創(chuàng)建狀態(tài)纵穿,就拋出IllegalThreadStateException異常
throw new IllegalThreadStateException();
// 將Worker對象添加到HashSet中
workers.add(w);
// Worker對象成功添加到HashSet
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
// 如果線程數(shù)量大于最大池大小下隧,就將該值賦值給最大池大小
largestPoolSize = s;
}
} finally {
// 對mainLock解鎖
mainLock.unlock();
}
if (workerAdded) {
// 如果Worker對象成功添加到HashSet中,就啟動線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果線程沒有啟動谓媒,就調用addWorkerFailed(Worker w)方法淆院,執(zhí)行回滾工作線程的創(chuàng)建,也就是做一些清理工作
addWorkerFailed(w);
}
// 返回線程是否啟動成功
return workerStarted;
}
執(zhí)行流程如下所示:
- 檢查線程池是否已經(jīng)停止句惯,如果是土辩,就添加線程失敗,否則執(zhí)行步驟2抢野。
- 檢查線程池是否正在停止拷淘,如果是,就執(zhí)行步驟3指孤,否則添加線程失敗启涯。
- 檢查線程是否用于執(zhí)行剩余任務贬堵,如果是,就執(zhí)行步驟4结洼,否則添加線程失敗黎做。
- 獲取工作線程數(shù)量,檢查線程池的運行狀態(tài)是否發(fā)生改變补君,如果有改變引几,執(zhí)行步驟1,否則執(zhí)行步驟5挽铁。
- 檢查工作線程數(shù)量是否大于核心池大小或者最大池大小(取決于布爾型形式參數(shù)core敞掘,core是true叽掘,就使用核心池大小玖雁;core是false更扁,就使用最大池大小)赫冬,如果是浓镜,就添加線程失敗,否則執(zhí)行步驟6劲厌。
- 把Worker對象存儲到HashSet中膛薛,如果成功添加,就啟動線程补鼻,否則執(zhí)行步驟4哄啄。
執(zhí)行工作線程
// ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 獲取第一個任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 調用getTask獲取任務
while (task != null || (task = getTask()) != null) {
// 對Worker對象加鎖
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果線程大于等于STOP狀態(tài),并且線程還不是中斷狀態(tài)风范,就中斷線程
wt.interrupt();
try {
// 調用beforeExecute(Thread t, Runnable r)方法咨跌,這個方法可以給子類去實現(xiàn)
beforeExecute(wt, task);
try {
// 執(zhí)行任務
task.run();
// 調用afterExecute(Runnable r, Throwable t)方法寇漫,這個方法是可以給子類去實現(xiàn)
afterExecute(task, null);
} catch (Throwable ex) {
// 如果有異常,就調用afterExecute(Runnable r, Throwable t)方法猪腕,這個方法是可以給子類去實現(xiàn)
afterExecute(task, ex);
throw ex;
}
} finally {
// task設為null,準備下一個任務
task = null;
// 完成任務數(shù)量的值自增
w.completedTasks++;
// 對Worker對象解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 調用processWorkerExit(Worker w, boolean completedAbruptly)方法陋葡,回收工作線程
processWorkerExit(w, completedAbruptly);
}
}
執(zhí)行流程如下所示:
- 如果firstTask不為空,執(zhí)行firstTask,調用getTask()方法獲取任務執(zhí)行角塑。
- 如果firstTask為空,調用getTask()方法獲取任務執(zhí)行侥猩。
回收工作線程
// ThreadPoolExecutor.java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
// 如果completedAbruptly是true的話,證明workerCount的值還沒有被減少
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// 對mainLock加鎖
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除HashSet中的對應的Worker對象
workers.remove(w);
} finally {
// 對mainLock解鎖
mainLock.unlock();
}
// 調用tryTerminate()方法,這個方法會調用terminated()方法,轉換成TERMINATED狀態(tài)
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
執(zhí)行流程如下所示:
- 檢查任務執(zhí)行情況溢陪,例如:檢查workCount的值是否被減少。
- 移除HashSet中對應的Worker對象宝当。
- 線程池的運行狀態(tài)變?yōu)門ERMINATED狀態(tài)订晌。
線程池的種類
在Executors類中封裝了五種線程池:
newFixedThreadPool
源碼如下所示:
// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
該線程池線程數(shù)量固定虏辫,核心池大小(coolPoolSize)和最大池大行獠Α(maximumPoolSize)都一樣砌庄,并且keepAliveTime是0L,使用的阻塞隊列是LinkedBlockingQueue奕枢,它使用鏈表實現(xiàn)的有容量限制的阻塞隊列娄昆,按照先進先出(FIFO,F(xiàn)irst-In-First-Out)的原則對元素進行排序缝彬,默認長度是Integer.MAX_VALUE萌焰,所以默認創(chuàng)建該隊列可能有容量危險,該隊列通常比基于數(shù)組實現(xiàn)的隊列具有更高的吞吐量谷浅,但是在并發(fā)環(huán)境下扒俯,性能會比較差奶卓。
適用于在已知并發(fā)壓力下,執(zhí)行耗時長的任務的場景陵珍。
newWorkStealingPool
源碼如下所示:
// Executors.java
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
該線程池可以擁有多個隊列寝杖,以便減少連接數(shù),默認線程數(shù)量是當前計算機可用的CPU數(shù)量互纯。
適用于耗時長瑟幕、需要并發(fā)執(zhí)行任務的場景。
newSingleThreadExecutor
源碼如下所示:
// Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
該線程池線程數(shù)量只有一個留潦,核心池大兄豁铩(coolPoolSize)和最大池大小(maximumPoolSize)都1兔院,并且keepAliveTime是0L殖卑,使用的阻塞隊列是LinkedBlockingQueue,它使用鏈表實現(xiàn)的有容量限制的阻塞隊列坊萝,按照先進先出(FIFO孵稽,F(xiàn)irst-In-First-Out)的原則對元素進行排序,默認長度是Integer.MAX_VALUE十偶,所以默認創(chuàng)建該隊列可能有容量危險菩鲜,該隊列通常比基于數(shù)組實現(xiàn)的隊列具有更高的吞吐量,但是在并發(fā)環(huán)境下惦积,性能會比較差接校。
適用于需要保證任務執(zhí)行順序的場景。
newCachedThreadPool
源碼如下所示:
// Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
該線程池可以緩存線程狮崩,它會根據(jù)需要創(chuàng)建新線程蛛勉,但是會在可用的時候重用之前創(chuàng)建的線程,核心池大心啦瘛(coolPoolSize)是0诽凌,最大池大小(maximumPoolSize)是Integer.MAX_VALUE爱只,并且keepAliveTime是60L皿淋,使用的阻塞隊列是SynchronousQueue,它不存儲元素的阻塞隊列恬试,支持公平鎖和非公平鎖窝趣,每一個插入操作都必須要等待另一個線程的移除操作,每一個移除操作都必須要等待另一個線程的插入操作训柴。
適用于執(zhí)行耗時比較短的任務的場景哑舒。
newScheduledThreadPool
源碼如下所示:
// Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
該線程池可以安排任務在給定的延遲后運行或者定期運行,使用的阻塞隊列是DelayedWorkQueue幻馁,它是一個支持延遲的阻塞隊列洗鸵。
適用于執(zhí)行周期性任務的場景越锈。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:譚嘉俊
我的簡書:譚嘉俊
我的CSDN:譚嘉俊