線程池中有一定數(shù)量的工作線程寞蚌,工作線程會循環(huán)從任務隊列中獲取任務峻村,并執(zhí)行這個任務麸折。那么怎么去停止這些工作線程呢?
這里就涉及到線程池兩個重要概念:工作線程數(shù)量和線程池狀態(tài)粘昨。
一.線程池狀態(tài)和工作線程數(shù)量
這本來是兩個不同的概念垢啼,但是在ThreadPoolExecutor中我們使用一個變量ctl來存儲這兩個值窜锯,這樣我們只需要維護這一個變量的并發(fā)問題,提高運行效率芭析。
/**
* 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
* int類型是32位锚扎,它的高3位,表示線程池的狀態(tài)馁启,低29位表示W(wǎng)orker的數(shù)量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位驾孔,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示線程池中創(chuàng)建Worker工作線程數(shù)量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
怎么使用一個變量ctl存儲兩個值呢惯疙?
就是利用int變量的高3位來儲存線程池狀態(tài)翠勉,用int變量的低29位來儲存工作線程數(shù)量。
這樣就有兩個需要注意的地方:
- 工作線程數(shù)量最大值不能超過int類型29位的值CAPACITY 即0b0001.....1(29位1)
- 因為線程池狀態(tài)都是高3位儲存的霉颠,所以工作線程數(shù)量不會影響狀態(tài)值大小關系对碌。
1.1 線程池狀態(tài)
// 高3位值是111
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位值是001
private static final int STOP = 1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED = 3 << COUNT_BITS;
線程池狀態(tài)分析:
- RUNNING狀態(tài):線程池剛創(chuàng)建時的狀態(tài)。向任務隊列中添加任務蒿偎,并執(zhí)行任務隊列中的任務朽们。因為高3位值是111,即處于RUNNING狀態(tài)下的ctl值都是負數(shù)诉位。
- SHUTDOWN狀態(tài): 調(diào)用shutdown方法华坦,會將線程池設置成這個狀態(tài)。不能向任務隊列中添加任務不从,但是可以執(zhí)行任務隊列中已添加的任務惜姐。并且處于SHUTDOWN狀態(tài)下正在運行任務的工作線程不能中斷的,就是保證任務能夠執(zhí)行完成椿息。
- STOP狀態(tài): 調(diào)用shutdownNow方法歹袁,會將線程池設置成這個狀態(tài)。不能向任務隊列中添加任務寝优,也不能再執(zhí)行任務隊列中已添加的任務条舔。
- TIDYING狀態(tài): 調(diào)用tryTerminate方法,可能會將線程池設置成這個狀態(tài)乏矾。這個只是中斷過度狀態(tài)孟抗,表示線程池即將變成TERMINATED狀態(tài)。
- TERMINATED狀態(tài): 調(diào)用tryTerminate方法钻心,可能會將線程池設置成這個狀態(tài)凄硼。表示線程池已經(jīng)完全終止,即任務隊列為空捷沸,工作線程數(shù)量也是0.
線程池為什么要定義這么多狀態(tài)呢摊沉?按道理說線程池只應該有運行和終止這兩種狀態(tài)啊。
主要是因為終止線程池時痒给,要考慮正在執(zhí)行的任務和已經(jīng)添加到任務隊列中待執(zhí)行的任務該如何處理说墨,否則的話骏全,這些任務可能就會被丟失。
線程池提供了兩個方式處理:
- shutdown方法: 它會將線程池狀態(tài)變成SHUTDOWN 狀態(tài)尼斧。禁止向添加新的任務姜贡,但是會讓任務隊列中的任務繼續(xù)執(zhí)行,最后釋放所有的工作線程棺棵,讓線程池狀態(tài)變成TERMINATED狀態(tài)鲁豪。
- shutdownNow方法: 它會將線程池狀態(tài)變成STOP 狀態(tài)。禁止向添加新的任務律秃,也不會執(zhí)行任務隊列中的任務爬橡,但是會返回這個任務集合,釋放所有的工作線程棒动,讓線程池狀態(tài)變成TERMINATED狀態(tài)糙申。
1.2 操作ctl的方法
1.2.1 獲取線程池的狀態(tài)
/**
* 獲取線程池的狀態(tài)。因為線程池的狀態(tài)是使用高3位儲存船惨,所以屏蔽低29位就行了柜裸。
* 所以就c與~CAPACITY(0b1110..0)進行&操作,屏蔽低29位的值了粱锐。
* 注意:這里是屏蔽低29位的值疙挺,而不是右移29位。
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
1.2.2 獲取工作線程數(shù)量
/**
* 獲取線程池中Worker工作線程的數(shù)量怜浅,
* 因為只使用低29位保存Worker的數(shù)量铐然,只要屏蔽高3位的值就行了
* 所以就c與CAPACITY(0b0001...1)進行&操作,屏蔽高3位的值了恶座。
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
1.2.3 合并ctl的值
/**
* 得到ctl的值搀暑。
* 接受兩個參數(shù)rs和wc。rs表示線程池的狀態(tài)跨琳,wc表示W(wǎng)orker工作線程的數(shù)量自点。
* 對于rs來說我們只需要高3位的值,對于wc來說我們需要低29位的值脉让。
* 所以我們將rs | wc就可以得到ctl的值了桂敛。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
1.2.4 其他方法
// 因為RUNNING狀態(tài)高三位是111,所以狀態(tài)值rs與工作線程數(shù)量ws相與的結(jié)果值c一定是個負數(shù)溅潜,
// 而其他狀態(tài)值都是大于等于0的數(shù)术唬,所以c是負數(shù),那么表示當前線程處于運行狀態(tài)伟恶。
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS函數(shù)將ctl值自增
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS函數(shù)將ctl值自減
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS函數(shù)加循環(huán)方法這種樂觀鎖的方式碴开,解決并發(fā)問題毅该。
* 保證使ctl值減一
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
二. 重要成員變量
// 記錄線程池中Worker工作線程數(shù)量和線程池的狀態(tài)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 任務線程的阻塞隊列博秫,因為是阻塞隊列潦牛,所以它是并發(fā)安全的
private final BlockingQueue<Runnable> workQueue;
// 獨占鎖,用來保證操作成員變量的并發(fā)安全問題
private final ReentrantLock mainLock = new ReentrantLock();
// 等待線程池完全終止的條件Condition挡育,
private final Condition termination = mainLock.newCondition();
//----------------- 需要mainLock來保證并發(fā)安全-------------------------//
// 線程池中工作線程集合巴碗。Worker中持有線程thread變量
private final HashSet<Worker> workers = new HashSet<Worker>();
// 線程池中曾擁有過的最大工作線程個數(shù)
private int largestPoolSize;
// 線程池完成過任務的總個數(shù)
private long completedTaskCount;
//----------------- 需要mainLock來保證并發(fā)安全-------------------------//
// 創(chuàng)建線程的工廠類
private volatile ThreadFactory threadFactory;
// 當任務被拒絕時,用來處理這個被拒絕的任務
private volatile RejectedExecutionHandler handler;
// 工作線程空閑的超時時間keepAliveTime
private volatile long keepAliveTime;
// 是否允許核心池線程超時釋放
private volatile boolean allowCoreThreadTimeOut;
// 線程池核心池線程個數(shù)
private volatile int corePoolSize;
// 線程池最大的線程個數(shù)
private volatile int maximumPoolSize;
成員變量的含義已經(jīng)標注了:
- mainLock:使用mainLock來保證會發(fā)生變化成員變量的并發(fā)安全問題即寒。會發(fā)生的成員變量有5個:ctl橡淆、workQueue、workers母赵、largestPoolSize和completedTaskCount逸爵。但是其中ctl和workQueue的類型本身就是多線程安全的,所以不用mainLock鎖保護凹嘲。
- termination:等待線程池完全終止的條件师倔,如果線程池沒有完全終止,調(diào)用它的awaitNanos方法周蹭,讓線程等待趋艘。當線程池完全終止后,調(diào)用它的signalAll方法凶朗,喚醒所有等待termination條件的線程瓷胧。
- workers:記錄所有的工作線程Worker
- workQueue:記錄所有待執(zhí)行的任務。使用阻塞隊列BlockingQueue棚愤,可以在隊列為空時搓萧,線程等待,隊列有值時宛畦,喚醒等待的線程矛绘。
- largestPoolSize:線程池中曾擁有過的最大工作線程個數(shù)
- completedTaskCount:線程池完成過任務的總個數(shù)
- threadFactory:創(chuàng)建線程的工廠類
- handler:當任務被拒絕時,用來處理這個被拒絕的任務
- keepAliveTime:工作線程允許空閑的超時時間刃永,一般都是針對超過核心池數(shù)量的工作線程货矮。
- allowCoreThreadTimeOut: 是否允許核心池的工作線程超時釋放。
- corePoolSize:線程池核心池線程個數(shù)斯够。
- maximumPoolSize: 線程池最大的線程個數(shù)囚玫。
這里注意一下兩個概念核心池個數(shù)和最大線程池個數(shù):
- 核心池個數(shù)就是線程池能夠維持的常用工作線程個數(shù),當工作線程沒有執(zhí)行任務空閑時读规,它不會被銷毀抓督,而是在等待。但是如果設置allowCoreThreadTimeOut為true束亏,那么核心池工作線程也是會被銷毀铃在。
- 最大線程池個數(shù)就是線程池允許開啟的最大工作線程個數(shù)。最大線程池的意義就是當核心池的工作線程不夠用,且任務隊列也已經(jīng)滿了定铜,不能添加新的任務了阳液,那么就要開啟新的工作線程來執(zhí)行任務。
三. 執(zhí)行任務execute方法
在線程池中如何執(zhí)行一個任務command揣炕,要分三種情況:
- 線程池中工作線程的數(shù)量沒有達到核心池個數(shù)帘皿,那么線程池就應該開啟新的工作線程來執(zhí)行任務。
- 線程池中工作線程的數(shù)量達到核心池個數(shù)畸陡,那么就應該將任務添加到任務隊列中鹰溜,等待著工作線程去任務隊列中獲取任務并執(zhí)行。
- 如果任務添加到任務隊列失敗丁恭,那么就要開啟新的工作線程來執(zhí)行任務曹动。
public void execute(Runnable command) {
// 如果command為null,拋出異常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 分為三個步驟:
* 1. 如果運行的工作線程數(shù)量少于核心池數(shù)量corePoolSize牲览,
* 那么就調(diào)用addWorker方法開啟一個新的工作線程仁期,運行任務command。
* 2. 如果開啟新的工作線程失敗竭恬,就將任務添加到任務隊列中跛蛋。
* 3. 添加到任務隊列失敗,
* 那么仍然addWorker方法在最大池中開啟一個新的工作線程痊硕,運行任務command赊级。
*/
int c = ctl.get();
// 運行的工作線程數(shù)量少于核心池數(shù)量corePoolSize
if (workerCountOf(c) < corePoolSize) {
/**
* 開啟一個新的工作線程,運行任務command岔绸。
* 返回true理逊,表示開啟工作線程成功,直接return盒揉。
* 返回false晋被,表示沒有開啟新線程。那么任務command就沒有運行刚盈,所以要執(zhí)行下面代碼羡洛。
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
// 線程池處于運行狀態(tài),
// 且任務添加到任務阻塞隊列workQueue中成功藕漱,即workQueue隊列有剩余空間欲侮。
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查線程池狀態(tài)和工作線程數(shù)量
int recheck = ctl.get();
/**
* 如果線程池不在運行狀態(tài),那么就調(diào)用remove方法移除workQueue隊列這個任務command肋联,
* 如果移除成功威蕉,那么調(diào)用reject(command)方法,進行拒絕任務的處理橄仍。
* 如果移除失敗韧涨,那么這個任務還是會被執(zhí)行牍戚,那么就不用調(diào)用reject(command)方法
*/
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作線程數(shù)量為0,但是workQueue隊列中我們添加過任務虑粥,
// 那么必須調(diào)用addWorker方法如孝,開啟一個新的工作線程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 調(diào)用addWorker方法舀奶,開啟一個新的工作線程暑竟,運行任務command斋射。
// 如果還是失敗育勺,那么這個任務command就不會不可能執(zhí)行了,
// 那么調(diào)用reject(command)方法拒絕這個任務
else if (!addWorker(command, false))
reject(command);
}
方法流程上面已經(jīng)有標注罗岖,注意有以下幾點:
- addWorker(Runnable firstTask, boolean core):表示開啟一個新的工作線程執(zhí)行任務firstTask涧至。core是用來判斷核心池還是最大池。返回false桑包,表示開啟新線程失敗南蓬,即任務firstTask沒有機會執(zhí)行。
- isRunning(c)線程池處于RUNNING狀態(tài),只有處于RUNNING狀態(tài)下哑了,才能將任務添加到任務隊列赘方。
- reject(command) 當任務command不能在線程池中執(zhí)行時,就會調(diào)用這個方法弱左,告訴調(diào)用值窄陡,線程池拒絕執(zhí)行這個任務。
四. 添加工作線程addWorker方法
就是利用任務task創(chuàng)建一個新的工作線程Work拆火,然后將它添加到工作線程集合workers中跳夭。但是需要注意多線程并發(fā)問題。
private boolean addWorker(Runnable firstTask, boolean core) {
// 利用死循環(huán)和CAS函數(shù)们镜,實現(xiàn)樂觀鎖币叹,來實現(xiàn)多線程改變ctl值的并發(fā)問題
// 因為ctl值代表兩個東西,工作線程數(shù)量和線程池狀態(tài)模狭。
// 這里就用了兩個for循環(huán)颈抚,一個是線程池狀態(tài)的for循環(huán),一個是工作線程數(shù)量的for循環(huán)
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池運行狀態(tài)rs嚼鹉,
int rs = runStateOf(c);
// 首先判斷線程池狀態(tài)和任務隊列狀態(tài)邪意,
// 來判斷能否創(chuàng)建新的工作線程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 線程池中工作線程數(shù)量wc
int wc = workerCountOf(c);
// 當線程池工作線程數(shù)量wc大于線程上限CAPACITY,
// 或者用戶規(guī)定核心池數(shù)量corePoolSize或用戶規(guī)定最大線程池數(shù)量maximumPoolSize
// 表示不能創(chuàng)建工作線程了反砌,所以返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS函數(shù)雾鬼,使工作線程數(shù)量wc加一
if (compareAndIncrementWorkerCount(c))
// 跳出retry循環(huán)
break retry;
// 來到這里表示CAS函數(shù)失敗,那么就要循環(huán)重新判斷
// 但是c還代表線程狀態(tài)宴树,如果線程狀態(tài)改變策菜,那么就必須跳轉(zhuǎn)到retry循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作線程是否開始,即調(diào)用了線程的start方法
boolean workerStarted = false;
// 工作線程是否添加到工作線程隊列workers中
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個Worker對象
w = new Worker(firstTask);
// 得到Worker所擁有的線程thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 并發(fā)鎖
mainLock.lock();
try {
// 獲取線程池運行狀態(tài)rs
int rs = runStateOf(ctl.get());
// 當線程池是運行狀態(tài),或者是SHUTDOWN狀態(tài)但firstTask為null又憨,
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程t已經(jīng)被開啟翠霍,就拋出異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將w添加到工作線程集合workers中
workers.add(w);
// 獲取工作線程集合workers的個數(shù)
int s = workers.size();
// 記錄線程池歷史最大的工作線程個數(shù)
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果已經(jīng)添加到工作線程隊列中,那么開啟線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果開啟工作線程失敗蠢莺,那么這個任務也就沒有執(zhí)行
// 因此移除這個任務w(如果隊列中有)寒匙,減少工作線程數(shù)量,因為這個數(shù)量在之前已經(jīng)增加了
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
添加一個新的工作線程躏将,就涉及到兩個成員變量的改變锄弱,一個是工作線程數(shù)量ctl,一個是工作線程集合workers祸憋。而ctl的類型是AtomicInteger会宪,所以它可以使用樂觀鎖解決并發(fā)問題,workers就只能使用mainLock互斥鎖來保證并發(fā)安全問題蚯窥。
4.1 更改工作線程數(shù)量ctl
因為ctl儲存了兩個值掸鹅,工作線程數(shù)量和線程池狀態(tài)。所以使用了兩個for循環(huán)來監(jiān)控多線程對這兩個值的更改拦赠。
用線程池狀態(tài)來判斷是否允許添加新的工作線程:
// 是對addWorker中線程狀態(tài)if判斷的拆分
// 當線程池不是處于運行狀態(tài)
if (rs >= SHUTDOWN) {
/**
* 線程池狀態(tài)不是SHUTDOWN巍沙,或者firstTask不為null,或者任務隊列為空,
* 都直接返回false荷鼠,表示開啟新工作線程失敗句携。
* 只有當線程池狀態(tài)是SHUTDOWN,firstTask為null颊咬,任務隊列不為空時务甥,
* 需要創(chuàng)建新的工作線程。
* 從execute(Runnable command)方法中分析喳篇,firstTask參數(shù)為空只有一種情況敞临,
* 此時線程池中工作線程數(shù)量是0,而任務隊列不為空麸澜,
* 那么就要開啟一個新工作線程去執(zhí)行任務隊列中的任務涡贱,否則這些任務會被丟失欢峰。
*/
if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()) {
return false;
}
}
由此可以得出腊凶,只有兩種情形允許添加新的工作線程:
- 線程池處于RUNNING狀態(tài)
- 線程池雖然處于SHUTDOWN狀態(tài)狼渊,但是線程池工作線程個數(shù)是0(即這里的firstTask != null),且任務隊列workQueue不為空,那么就要開啟一個新工作線程去執(zhí)行任務隊列中的任務馁害。
然后使用for循環(huán)和CAS函數(shù)方式窄俏,來給工作線程數(shù)量加一。注意此時工作線程還沒有創(chuàng)建碘菜,并添加到線程集合workers中凹蜈,所以如果線程添加失敗限寞,那么還要將工作線程數(shù)量減一。
4.2 添加工作線程集合workers
創(chuàng)建一個工作線程Worker仰坦,將它添加到線程集合workers中履植,然后開啟這個工作線程,使用mainLock獨占鎖保證成員變量workers的并發(fā)安全問題悄晃。
五. 內(nèi)部類Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 該Worker所擁有的工作線程 */
final Thread thread;
/** Worker擁有的第一個任務玫霎,初始化的時候賦值 */
Runnable firstTask;
/** 該工作線程Worker完成任務的數(shù)量 */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 將state設置為-1,禁止發(fā)起中斷請求妈橄,
// 直到調(diào)用過runWorker方法庶近,即線程已經(jīng)運行時。
setState(-1);
// 第一個任務
this.firstTask = firstTask;
// 創(chuàng)建一個thread線程對象眷细,它的run方法就是本W(wǎng)orker的run方法
// 這個thread就是Worker真正執(zhí)行任務的工作線程
this.thread = getThreadFactory().newThread(this);
}
/** 復寫的是Runnable中的run方法拦盹,所以當工作線程開啟運行后鹃祖,會調(diào)用這個方法溪椎。 */
public void run() {
runWorker(this);
}
// 當前獨占鎖是否空閑
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 嘗試獲取獨占鎖
protected boolean tryAcquire(int unused) {
// 如果通過CAS函數(shù),可以將state值從0改變成1恬口,那么表示獲取獨占鎖成功校读。
// 否則獨占鎖被別的線程獲取了。
if (compareAndSetState(0, 1)) {
// 設置擁有獨占鎖的線程是當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 釋放獨占鎖
protected boolean tryRelease(int unused) {
// 設置擁有獨占鎖的線程為null
setExclusiveOwnerThread(null);
// 設置獲取獨占鎖的次數(shù)是0祖能,表示鎖是空閑狀態(tài)
setState(0);
return true;
}
// 獲取獨占鎖歉秫,如果鎖被別的獲取,就一直等待养铸。
public void lock() { acquire(1); }
// 嘗試獲取獨占鎖雁芙,如果鎖被別的獲取,就直接返回false钞螟,表示獲取失敗兔甘。
public boolean tryLock() { return tryAcquire(1); }
// 釋放獨占鎖
public void unlock() { release(1); }
// 當前獨占鎖是否空閑
public boolean isLocked() { return isHeldExclusively(); }
// 如果Worker的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求鳞滨。
void interruptIfStarted() {
Thread t;
// getState() >= 0表示thread已經(jīng)開啟
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker實現(xiàn)了Runnable接口洞焙,那么就可以通過Worker對象創(chuàng)建一個新線程thread,這個thread就是Worker的工作線程拯啦,而任務都在run方法中執(zhí)行澡匪。
Worker還繼承自AbstractQueuedSynchronizer類。我們知道可以通過AQS類實現(xiàn)獨占鎖和共享鎖褒链,而Worker中實現(xiàn)了tryAcquire和tryRelease方法唁情,說明Worker對象也是個獨占鎖對象。我們可以考慮一下Worker這個獨占鎖的作用是什么甫匹?在后面會介紹到甸鸟。
六. 工作線程運行任務runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 將w的state狀態(tài)設置成0夯巷,這樣就允許對w的thread線程進行中斷請求了。
w.unlock();
// completedAbruptly表示線程突然終結(jié)
boolean completedAbruptly = true;
try {
// 通過getTask從任務隊列中獲取任務task執(zhí)行哀墓,這個方法是個阻塞方法趁餐。
while (task != null || (task = getTask()) != null) {
// 獲取w獨占鎖,保證當本工作線程運行任務時篮绰,
// 不能對該線程進行中斷請求后雷。
w.lock();
/**
* 如果線程池大于STOP狀態(tài),且Worker工作線程中斷標志位是false吠各,
* 那么就調(diào)用wt的interrupt方法發(fā)起中斷請求臀突。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// Worker工作線程發(fā)起中斷請求
wt.interrupt();
try {
// 鉤子方法,提供給子類贾漏。在執(zhí)行任務之前調(diào)用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 調(diào)用run方法候学,執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法,提供給子類纵散。在執(zhí)行任務完成后調(diào)用
afterExecute(task, thrown);
}
} finally {
// 將task設置為null梳码,進行下一次循環(huán)
task = null;
// 將work完成的任務數(shù)completedTasks加一
w.completedTasks++;
// 釋放w獨占鎖
w.unlock();
}
}
// completedAbruptly = false表示線程正常完成終結(jié)
completedAbruptly = false;
} finally {
// 進行一個工作線程完結(jié)后的后續(xù)操作
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法是在每個工作線程的run方法中調(diào)用,通過getTask()方法從任務隊列中獲取任務task執(zhí)行伍掀,這個方法可以阻塞當前工作線程掰茶,如果getTask()方法返回null,那么工作線程就會運行結(jié)束蜜笤,釋放線程濒蒋。
雖然runWorker方法運行在每個工作線程中,但是對于一個Worker來說把兔,只會有它的工作線程能夠運行runWorker方法沪伙,而且改變的也是這個Worker的成員變量,且這些成員變量也只能在runWorker方法改變县好,那么它沒有多線程并發(fā)問題啊围橡,那么為什么在這里加鎖呢?
這是因為Worker中有一個變量是可以被其他線程改變的聘惦,就是它的工作線程thread的中斷請求某饰,所以Worker獨占鎖的作用就是控制別的線程對它的工作線程thread中斷請求的。
最后調(diào)用processWorkerExit方法善绎,進行一個工作線程完結(jié)后的后續(xù)操作黔漂。
七. 獲取任務getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)rs
int rs = runStateOf(c);
// 如果有需要檢查任務隊列workQueue是否為空
// 即rs >= STOP或者rs == SHUTDOWN且workQueue為空,那么返回null禀酱,停止工作線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 將工作線程數(shù)量減一
decrementWorkerCount();
return null;
}
// 獲取工作線程數(shù)量wc
int wc = workerCountOf(c);
/**
* 如果allowCoreThreadTimeOut為true或者wc > corePoolSize時炬守,
* 就要減少工作線程數(shù)量了。
* 當工作線程在keepAliveTime時間內(nèi)剂跟,沒有獲取到可執(zhí)行的任務减途,
* 那么該工作線程就要被銷毀酣藻。
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 工作線程數(shù)量減一,返回null鳍置,銷毀工作線程辽剧。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 從任務隊列workQueue中獲取了任務r,會阻塞當前線程税产。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果r不為null怕轿,返回這個任務r
if (r != null)
return r;
// r是null,表示獲取任務超時
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
從阻塞任務隊列workQueue中獲取任務返回辟拷,因為是阻塞任務隊列撞羽,所以可以阻塞當前線程。如果返回null衫冻,那么會完結(jié)調(diào)用getTask方法的那個工作線程诀紊。那么getTask方法在什么情況下返回null呢?
- 線程池的狀態(tài)大于等于STOP隅俘,或者線程狀態(tài)是SHUTDOWN且當前任務隊列為空邻奠,那么返回null,停止工作線程考赛。
- 獲取任務時間超時惕澎,那么也會返回null莉测,停止工作線程颜骤。因為線程池一般只維護一定數(shù)量的工作線程,如果超過這個數(shù)量捣卤,那么超過數(shù)量的工作線程忍抽,在空閑一定時間后,應該被釋放董朝。
八. 終止線程池的方法
8.1 shutdown和shutdownNow方法
/**
* 終止線程池鸠项。不能在添加新任務了,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行子姜。
* 且對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查是否擁有Shutdown的權(quán)限
checkShutdownAccess();
// 將線程池狀態(tài)變成SHUTDOWN狀態(tài)
advanceRunState(SHUTDOWN);
// 對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求
interruptIdleWorkers();
// 鉤子方法祟绊,提供給子類實現(xiàn)。表示線程池已經(jīng)shutdown了
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試去終結(jié)線程池
tryTerminate();
}
/**
* 終止線程池哥捕。不能在添加新任務了牧抽,也不會執(zhí)行已經(jīng)添加到任務隊列的任務,只是將這些任務返回遥赚。
* 且對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 檢查是否擁有Shutdown的權(quán)限
checkShutdownAccess();
// 將線程池狀態(tài)變成STOP狀態(tài)
advanceRunState(STOP);
// 對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
interruptWorkers();
// 返回阻塞隊列workQueue中未執(zhí)行任務的集合
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試去終結(jié)線程池
tryTerminate();
return tasks;
}
shutdown和shutdownNow區(qū)別:
- shutdown方法將線程池設置成SHUTDOWN狀態(tài)扬舒,shutdownNow將線程池設置成STOP狀態(tài)。
- shutdown方法調(diào)用之后不能在添加新任務了凫佛,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行讲坎。shutdownNow方法調(diào)用之后不能在添加新任務了孕惜,也不會執(zhí)行已經(jīng)添加到任務隊列的任務,只是將這些任務返回晨炕。
- shutdown方法會對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求衫画,shutdownNow方法會對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務。
8.2 advanceRunState方法
private void advanceRunState(int targetState) {
// 采用樂觀鎖的方法瓮栗,來并發(fā)更改線程池狀態(tài)碧磅。
for (;;) {
int c = ctl.get();
// 如果runStateAtLeast方法返回true,表示當前線程池狀態(tài)已經(jīng)是目標狀態(tài)targetState
// 采用CAS函數(shù)嘗試更改線程池狀態(tài)遵馆,如果失敗就循環(huán)繼續(xù)鲸郊。
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
這個方法來改變線程池狀態(tài),使用樂觀鎖的方式保證并發(fā)安全货邓。
8.3 中斷空閑狀態(tài)下的工作線程
/**
* 對所有不是正在執(zhí)行任務的工作線程都發(fā)起中斷請求秆撮。
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍歷工作線程Worker集合
for (Worker w : workers) {
Thread t = w.thread;
// 如果工作線程中斷標志位是false,
// 且能夠獲取鎖换况,即當前工作線程沒有運行任務
if (!t.isInterrupted() && w.tryLock()) {
try {
// 發(fā)起中斷請求职辨。
// 因為獲取了鎖,所以在進入中斷請求時戈二,worker工作線程不會執(zhí)行任務
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// 釋放鎖
w.unlock();
}
}
// 是否只進行一個工作線程的中斷請求舒裤。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
遍歷工作線程Worker集合,如果工作線程出于空閑狀態(tài)觉吭,且沒有被中斷腾供,那么就發(fā)起中斷請求。通過獨占鎖Worker知道鲜滩,當前工作線程是否在執(zhí)行任務伴鳖。
8.4 對所有已開啟的工作線程發(fā)起中斷請求
/**
* 對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 如果w的工作線程thread已經(jīng)開啟,那么發(fā)起中斷請求徙硅。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
遍歷工作線程Worker集合榜聂,調(diào)用Worker的interruptIfStarted方法,如果工作線程已開啟嗓蘑,那么就會發(fā)起中斷须肆。
8.5 嘗試完結(jié)線程池的方法
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 如果線程池是RUNNING狀態(tài),
* 或者線程池是TIDYING狀態(tài)(是因為已經(jīng)有別的線程在終止線程池了)
* 或者線程池是SHUTDOWN狀態(tài)且任務隊列不為空桩皿,
* 線程池不能被terminate終止豌汇,直接return返回
*
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 線程池中工作線程數(shù)量不是0,線程池不能被terminate終止业簿,所以要return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 鉤子方法瘤礁,提供給子類實現(xiàn)。表示線程池已經(jīng)終止梅尤。
terminated();
} finally {
// 設置線程池狀態(tài)是TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
線程池在什么情況下算是完全停止了呢柜思?有三個條件:
- 線程池不是RUNNING狀態(tài)岩调。
- 線程池中工作線程數(shù)量是0。
- 線程池中任務隊列為空赡盘。
所以在看看tryTerminate()中号枕,前面兩個if判斷條件,就可以理解了陨享。
8.6 等待線程池完結(jié)的方法
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果是TERMINATED已終止狀態(tài)葱淳,那么就返回true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 如果已經(jīng)超時就返回false
if (nanos <= 0)
return false;
// 讓當前線程等待。并設置超時時間nanos
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
如果線程池不是TERMINATED狀態(tài)抛姑,就讓當前線程在termination條件上等待赞厕,直到線程池變成TERMINATED狀態(tài),或者等待時間超時才會被喚醒定硝。
8.7 工作線程退出的方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果工作線程突然被終結(jié)皿桑,那么工作線程的數(shù)量就沒有減一。
if (completedAbruptly)
// 將工作線程數(shù)量減一蔬啡。
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將工作線程的任務完成數(shù)添加到線程池完成任務總數(shù)中
completedTaskCount += w.completedTasks;
// 從工作線程集合中移除本工作線程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 因為有一個工作線程已經(jīng)完成被釋放诲侮,那么就去嘗試終結(jié)線程池。
tryTerminate();
int c = ctl.get();
// 如果線程池狀態(tài)小于STOP,
// 就要判斷終結(jié)了這個工作線程之后箱蟆,線程池中工作線程數(shù)量是否滿足需求沟绪。
if (runStateLessThan(c, STOP)) {
// 如果工作線程正常終結(jié),
// 那么要看線程池中工作線程數(shù)量是否滿足需求空猜。
if (!completedAbruptly) {
// 不允許核心池線程釋放绽慈,那么最小值是corePoolSize,否則就可以為0
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 但是如果任務隊列中還有任務抄肖,那么工作線程數(shù)量最少為1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果工作線程數(shù)量小于min值久信,就要創(chuàng)建新的工作線程了。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 開啟一個新的工作線程
addWorker(null, false);
}
}
工作線程被釋放漓摩,有兩種情況,一種是運行完成正常結(jié)束入客,一種是發(fā)生異常意外終止管毙。
當工作線程被釋放時,需要將它從工作線程集合workers中移除桌硫,將該工作線程任務完成數(shù)添加到線程池完成任務總數(shù)中夭咬。調(diào)用tryTerminate方法嘗試終結(jié)線程池。
另外因為有一個工作線程被釋放铆隘,那么就要考慮線程池中當前工作線程數(shù)量是否符合要求卓舵,要不要添加新的工作線程。
九. 創(chuàng)建線程池的方法膀钠。
上面分析完線程池的功能方法后掏湾,再來說說怎樣創(chuàng)建一個線程池裹虫。
9.1 構(gòu)造函數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 判斷設置的核心池數(shù)量corePoolSize、最大池數(shù)量maximumPoolSize融击、
// 與線程空閑存活時間keepAliveTime的值筑公,是否符合要求
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 賦值
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor類一共有四個構(gòu)造函數(shù),前面三個構(gòu)造函數(shù)都是調(diào)用后面那個構(gòu)造函數(shù)來實現(xiàn)的尊浪。參數(shù)意義:
- corePoolSize: 線程池核心池線程個數(shù)匣屡。
- maximumPoolSize: 線程池允許最大的線程個數(shù)。
- keepAliveTime: 線程空閑時拇涤,允許存活的時間捣作。
- unit:輔助變量,用來將keepAliveTime參數(shù)鹅士,轉(zhuǎn)成對應納秒值虾宇。
- workQueue:儲存所有待執(zhí)行任務的阻塞隊列
- threadFactory:用來創(chuàng)建線程的工廠類
- handler:通過它來通知調(diào)用值,線程池拒絕了任務如绸。
注:有沒有注意到嘱朽,沒有傳遞allowCoreThreadTimeOut這個參數(shù),那么怎么設置這個成語變量呢怔接?通過allowCoreThreadTimeOut(boolean value)方法來設置搪泳。
一般我們不用自己來new ThreadPoolExecutor對象,而是通過Executors這個工具類來創(chuàng)建ThreadPoolExecutor實例扼脐。
9.2 創(chuàng)建固定數(shù)量的線程池
// 創(chuàng)建固定數(shù)量的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 創(chuàng)建固定數(shù)量的線程池
public static ExecutorService newFixedThreadPool(int nThreads,
ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
根據(jù)我們前面講解岸军,要想線程池維持固定數(shù)量的工作線程,那么工作線程就不能被釋放瓦侮,就要做到兩點:
- allowCoreThreadTimeOut為false艰赞,這個是默認的。keepAliveTime設置為0肚吏,這樣當調(diào)用allowCoreThreadTimeOut(boolean value)方法修改allowCoreThreadTimeOut值時方妖,會拋出異常,不允許修改罚攀。
- 核心池數(shù)量和最大池數(shù)量一樣党觅,防止添加新的工作線程池。任務隊列容量要足夠大斋泄,防止任務添加到任務隊列中失敗杯瞻,不能執(zhí)行。
9.3 創(chuàng)建單個線程的線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(
ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
與固定數(shù)量的線程池相比:
- 將固定數(shù)量nThreads變成了1
- 使用了FinalizableDelegatedExecutorService這個代理類炫掐,主要作用就是當對象被銷毀時魁莉,會調(diào)用shutdown方法,停止線程池。
9.4 創(chuàng)建緩存線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
什么叫做緩存線程池旗唁,當有任務執(zhí)行時畦浓,會創(chuàng)建工作線程來執(zhí)行任務,當任務執(zhí)行完畢后逆皮,工作線程會等待一段時間宅粥,如果還是沒有任務需要執(zhí)行,那么就會釋放工作線程电谣。
十. ThreadPoolExecutor 重要參數(shù)方法
10.1 getActiveCount
正在執(zhí)行任務線程數(shù)
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
// isLocked() 表示這個 Worker 正在執(zhí)行任務
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
這個方法獲取正在執(zhí)行任務的線程數(shù)量秽梅。w.isLocked()
表示正在執(zhí)行任務的 Worker
。
10.2 getCompletedTaskCount
返回完成任務大致數(shù)量
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和剿牺。
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
返回已完成執(zhí)行的任務的大致總數(shù)企垦。由于任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,因此返回值只是一個近似值晒来。
completedTaskCount
表示已完成Worker
的completedTasks
數(shù)量之和钞诡。在Worker
退出方法processWorkerExit()
中進行增加操作。
10.3 getTaskCount
返回已計劃執(zhí)行的任務的大致總數(shù)
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount 表示已完成 Worker 的completedTasks數(shù)量之和湃崩。
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
// w.isLocked() 表示一個任務正在執(zhí)行
if (w.isLocked())
++n;
}
// 再加上待執(zhí)行的任務數(shù)量
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
返回已計劃執(zhí)行的任務的大致總數(shù)荧降。由于任務和線程的狀態(tài)可能在計算過程中動態(tài)變化,因此返回值只是一個近似值攒读。
10.4 getCorePoolSize
方法
public int getCorePoolSize() {
return corePoolSize;
}
返回線程池核心線程數(shù)量朵诫。
10.5 getKeepAliveTime
方法
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
返回線程保持活動時間。一是當線程超過核心線程數(shù)時薄扁,超過的線程超過 keepAliveTime
時間沒有執(zhí)行任務剪返,就會關閉;二是 allowCoreThreadTimeOut
值為 true
邓梅,那么核心線程空閑事件超過 keepAliveTime
也會關閉脱盲。
10.6 getLargestPoolSize
方法
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
獲取曾經(jīng)出現(xiàn)的最大線程數(shù)。
10.7 getMaximumPoolSize
方法
public int getMaximumPoolSize() {
return maximumPoolSize;
}
獲取最大線程數(shù)日缨。
10.8 getPoolSize
方法
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
返回池中的當前線程數(shù)钱反。
10.9 getQueue
方法
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
返回待執(zhí)行任務隊列。
十一 總結(jié)
線程池有兩個概念核心池與最大池殿遂。
- 核心池:線程池應該維持的工作線程數(shù)量诈铛,如果線程池中工作線程數(shù)量小于核心池數(shù)量,就會創(chuàng)建新的工作線程添加到線程池中墨礁。
- 最大池: 線程池中臨時存在的工作線程,當任務隊列不能添加新任務時耳峦,就會創(chuàng)建新的工作線程添加到線程池中恩静。執(zhí)行完任務后,超過一定時間沒有接受到新任務,這個臨時工作線程就會被釋放驶乾。
兩者的區(qū)別:
- 線程釋放:最大池中的線程當超過一定時間沒有接受到新任務邑飒,就會被釋放,而核心池中的線程级乐,一般不釋放疙咸,只有設置allowCoreThreadTimeOut為true,且超過一定時間沒有接受到新任務风科,也會被釋放撒轮。
- 創(chuàng)建時機:線程池中工作線程數(shù)量小于核心池數(shù)量,就會創(chuàng)建核心池線程贼穆。但是對于最大池來說题山,只有任務隊列已滿,不能添加新任務時故痊,才會創(chuàng)建新線程顶瞳,放入最大池中。
注:一般稱小于等于corePoolSize數(shù)量的工作線程池是核心池中的線程愕秫,大于corePoolSize數(shù)量的工作線程池就是最大池中的線程慨菱。
11.1 線程池執(zhí)行任務流程
通過execute方法執(zhí)行新任務command,分為三個步驟:
- 線程池中工作線程數(shù)量小于核心池數(shù)量戴甩,那么就開啟新的工作線程來執(zhí)行任務符喝。
- 線程池中工作線程數(shù)量達到核心池數(shù)量,那么就將新任務添加到任務隊列中等恐。
- 如果新任務添加到任務隊列失敗洲劣,那么就開啟新的工作線程來執(zhí)行任務(這個線程就在最大池中了)。
在每個工作線程课蔬,會通過循環(huán)囱稽,調(diào)用getTask方法,不斷地從任務隊列中獲取任務來執(zhí)行二跋。如果任務隊列中沒有任務战惊,那么getTask方法會阻塞當前工作線程。
但是工作線程被喚醒后扎即,getTask方法返回null吞获,那么就會跳出循環(huán),該工作線程運行結(jié)束谚鄙,準備釋放各拷。
11.2 終止線程池
線程池不可能立即就終止,因為涉及到線程池正在執(zhí)行任務的線程和任務隊列中等待執(zhí)行的任務該如何處理問題闷营,有兩個方式:
- shutdown方法:不能再向線程池中添加新任務了烤黍,但是已經(jīng)添加到任務隊列的任務還是會執(zhí)行知市,也不會對正在執(zhí)行任務的線程發(fā)起中斷請求。等待任務隊列任務執(zhí)行完成速蕊,釋放線程池中所有線程嫂丙,線程池進入完全終止狀態(tài)。
- shutdownNow方法:不能再向線程池中添加新任務了规哲,也不會執(zhí)行已經(jīng)添加到任務隊列的任務跟啤,但是會返回未執(zhí)行的任務集合。而且對所有工作線程都發(fā)起中斷請求, 不管這個工作線程是否正在執(zhí)行任務唉锌。等待線程池中所有線程釋放隅肥,線程池進入完全終止狀態(tài)。
兩者的區(qū)別:
兩者都不能再向線程池中添加新任務了糊秆。shutdown方法還是會將已添加的任務都執(zhí)行完畢武福,而shutdownNow方法不會再執(zhí)行任何新任務了。
注:對于正在執(zhí)行的任務是可能執(zhí)行完成的痘番,因為中斷請求只能中斷處于WAITING與TIMED_WAITING狀態(tài)的線程捉片,對于處于其他狀態(tài)的線程不起作用。
十二. 重要示例
12.1 正常運行線程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
}
}
運行結(jié)果:
--線程1開始運行 任務1
--線程2開始運行 任務2
--線程3開始運行 任務3
=======線程1結(jié)束 任務1
--線程1開始運行 任務4
=======線程2結(jié)束 任務2
--線程2開始運行 任務5
=======線程3結(jié)束 任務3
--線程3開始運行 任務6
=======線程1結(jié)束 任務4
=======線程2結(jié)束 任務5
=======線程3結(jié)束 任務6
這里使用的是固定數(shù)量的線程池汞舱,所以只有三個線程來執(zhí)行任務伍纫,未執(zhí)行到的任務只能等待。
如果換成單個線程的線程池昂芜,那么只有一個線程在執(zhí)行任務莹规。
而緩存線程池呢?你就會發(fā)現(xiàn)居然有六個線程在執(zhí)行任務泌神,就是有多少任務創(chuàng)建多少個線程良漱。
運行完任務后,你會發(fā)現(xiàn)程序沒有結(jié)束欢际,那是因為線程池沒有被終止母市。
12.2 終止線程池
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
// 還是會執(zhí)行完已經(jīng)添加的任務
service.shutdown();
}
}
運行結(jié)果:
--線程1開始運行 任務1
--線程3開始運行 任務3
--線程2開始運行 任務2
=======線程1結(jié)束 任務1
--線程1開始運行 任務4
=======線程2結(jié)束 任務2
--線程2開始運行 任務5
=======線程3結(jié)束 任務3
--線程3開始運行 任務6
=======線程1結(jié)束 任務4
=======線程2結(jié)束 任務5
=======線程3結(jié)束 任務6
Process finished with exit code 0
使用shutdown方法艇棕,還是會執(zhí)行完已經(jīng)添加的任務此改。最后程序退出。
package com.zhang._22._5;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class Run implements Runnable {
private int index;
public Run(int index) {
this.index = index+1;
}
@Override
public void run() {
System.out.println("--"+Thread.currentThread().getName()+"開始運行 任務"+index);
try {
int waitTime = 100 + index * 10;
Thread.sleep(waitTime);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+" 發(fā)生中斷異常 exception=="+e.getMessage());
}
System.out.println("======="+Thread.currentThread().getName()+"結(jié)束 任務"+index);
}
}
class MyThreadFactory implements ThreadFactory {
private int sequenceNumber = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "線程"+(++sequenceNumber));
}
}
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory threadFactory = new MyThreadFactory();
// 固定數(shù)量的線程池
ExecutorService service = Executors.newFixedThreadPool(3, threadFactory);
// // 單個線程的線程池
// ExecutorService service = Executors.newSingleThreadExecutor(threadFactory);
//
// // 緩存線程池
// ExecutorService service = Executors.newCachedThreadPool(threadFactory);
for (int i = 0; i < 6; i++) {
service.execute(new Run(i));
}
service.shutdownNow();
}
}
運行結(jié)果:
--線程1開始運行 任務1
--線程2開始運行 任務2
--線程3開始運行 任務3
線程2 發(fā)生中斷異常 exception==sleep interrupted
線程1 發(fā)生中斷異常 exception==sleep interrupted
=======線程1結(jié)束 任務1
=======線程2結(jié)束 任務2
線程3 發(fā)生中斷異常 exception==sleep interrupted
=======線程3結(jié)束 任務3
Process finished with exit code 0
使用shutdownNow方法示启,在任務隊列中等待的任務是不會執(zhí)行的浑槽,而且立即發(fā)起線程中斷請求蒋失。