它們都是某種線程池,可以控制線程創(chuàng)建,釋放,并通過某種策略嘗試復(fù)用線程去執(zhí)行任務(wù)的一個管理框架
在Java8中,按照線程池的創(chuàng)建方法來看
有五種線程池,創(chuàng)建方法如下
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
進(jìn)一步查看源碼發(fā)現(xiàn),這些方法最終都調(diào)用了ThreadPoolExecutor和ScheduledExecutorService的構(gòu)造函數(shù)
而ScheduledExecutorService繼承自ThreadPoolExecutor,因此最終所有線程池的構(gòu)造函數(shù)都調(diào)用了Java5后推出的ThreadPoolExecutor的如下構(gòu)造函數(shù)
Java默認(rèn)提供的線程池
Java中的線程池是運用場景最多的并發(fā)框架,幾乎所有需要異步或并發(fā)執(zhí)行任務(wù)的程序都可以使用線程池
在開發(fā)中,合理地使用線程池能夠帶來3個好處
- 降低資源消耗 通過重復(fù)利用已創(chuàng)建的線程,降低創(chuàng)建和銷毀線程造成的系統(tǒng)資源消耗
- 提高響應(yīng)速度 當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行
- 提高線程的可管理性 線程是稀缺資源,如果過多地創(chuàng)建,不僅會消耗系統(tǒng)資源蛾茉,還會降低系統(tǒng)的穩(wěn)定性臭埋,導(dǎo)致使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控蹂随。
我們只需要將待執(zhí)行的方法放入 run 方法中锹漱,將 Runnable 接口的實現(xiàn)類交給線程池的
execute 方法佩谷,作為他的一個參數(shù),比如:
Executor e=Executors.newSingleThreadExecutor();
e.execute(new Runnable(){ //匿名內(nèi)部類 public void run(){
//需要執(zhí)行的任務(wù)
}
});
1 線程池的實現(xiàn)原理
當(dāng)向線程池提交一個任務(wù)之后,線程池是如何處理這個任務(wù)的呢?
ThreadPoolExecutor執(zhí)行execute()分4種情況
- 若當(dāng)前運行的線程少于
corePoolSize
,則創(chuàng)建新線程來執(zhí)行任務(wù)(執(zhí)行這一步需要獲取全局鎖) - 若運行的線程多于或等于
corePoolSize
,則將任務(wù)加入BlockingQueue
- 若無法將任務(wù)加入
BlockingQueue
,則創(chuàng)建新的線程來處理任務(wù)(執(zhí)行這一步需要獲取全局鎖) - 若創(chuàng)建新線程將使當(dāng)前運行的線程超出
maximumPoolSize
,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()
采取上述思路,是為了在執(zhí)行execute()
時,盡可能避免獲取全局鎖
在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖
源碼分析
上面的流程分析讓我們很直觀地了解了線程池的工作原理,讓我們再通過源碼來看看是如何實現(xiàn)的,線程池執(zhí)行任務(wù)的方法如下
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1\. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2\. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3\. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
// 如果線程數(shù)小于基本線程數(shù)婿牍,則創(chuàng)建線程并執(zhí)行當(dāng)前任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如線程數(shù)大于等于基本線程數(shù)或線程創(chuàng)建失敗侈贷,則將當(dāng)前任務(wù)放到工作隊列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
// 拋出RejectedExecutionException異常
reject(command);
}
/** * 檢查是否可以根據(jù)當(dāng)前池狀態(tài)和給定的邊界(核心或最大) * 添加新工作線程牍汹。如果是這樣,工作線程數(shù)量會相應(yīng)調(diào)整铐维,如果可能的話,一個新的工作線程創(chuàng)建并啟動 * 將firstTask作為其運行的第一項任務(wù)。 * 如果池已停止此方法返回false * 如果線程工廠在被訪問時未能創(chuàng)建線程,也返回false * 如果線程創(chuàng)建失敗慎菲,或者是由于線程工廠返回null嫁蛇,或者由于異常(通常是在調(diào)用Thread.start()后的OOM)),我們干凈地回滾露该。 * * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** * Check if queue empty only if necessary. * * 如果線程池已關(guān)閉睬棚,并滿足以下條件之一,那么不創(chuàng)建新的 worker: * 1\. 線程池狀態(tài)大于 SHUTDOWN解幼,也就是 STOP, TIDYING, 或 TERMINATED * 2\. firstTask != null * 3\. workQueue.isEmpty() * 簡單分析下: * 狀態(tài)控制的問題抑党,當(dāng)線程池處于 SHUTDOWN ,不允許提交任務(wù)撵摆,但是已有任務(wù)繼續(xù)執(zhí)行 * 當(dāng)狀態(tài)大于 SHUTDOWN 底靠,不允許提交任務(wù),且中斷正在執(zhí)行任務(wù) * 多說一句:若線程池處于 SHUTDOWN特铝,但 firstTask 為 null暑中,且 workQueue 非空,是允許創(chuàng)建 worker 的 * */
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果成功鲫剿,那么就是所有創(chuàng)建線程前的條件校驗都滿足了鳄逾,準(zhǔn)備創(chuàng)建線程執(zhí)行任務(wù)
// 這里失敗的話,說明有其他線程也在嘗試往線程池中創(chuàng)建線程
if (compareAndIncrementWorkerCount(c))
break retry;
// 由于有并發(fā)灵莲,重新再讀取一下 ctl
c = ctl.get(); // Re-read ctl
// 正常如果是 CAS 失敗的話雕凹,進(jìn)到下一個里層的for循環(huán)就可以了
// 可如果是因為其他線程的操作,導(dǎo)致線程池的狀態(tài)發(fā)生了變更,如有其他線程關(guān)閉了這個線程池
// 那么需要回到外層的for循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/* * * 到這里枚抵,我們認(rèn)為在當(dāng)前這個時刻线欲,可以開始創(chuàng)建線程來執(zhí)行任務(wù) */
// worker 是否已經(jīng)啟動
boolean workerStarted = false;
// 是否已將這個 worker 添加到 workers 這個 HashSet 中
boolean workerAdded = false;
Worker w = null;
try {
// 把 firstTask 傳給 worker 的構(gòu)造方法
w = new Worker(firstTask);
// 取 worker 中的線程對象,Worker的構(gòu)造方法會調(diào)用 ThreadFactory 來創(chuàng)建一個新的線程
final Thread t = w.thread;
if (t != null) {
//先加鎖
final ReentrantLock mainLock = this.mainLock;
// 這個是整個類的全局鎖汽摹,持有這個鎖才能讓下面的操作“順理成章”询筏,
// 因為關(guān)閉一個線程池需要這個鎖,至少我持有鎖的期間竖慧,線程池不會被關(guān)閉
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 小于 SHUTTDOWN 即 RUNNING
// 如果等于 SHUTDOWN,不接受新的任務(wù)逆屡,但是會繼續(xù)執(zhí)行等待隊列中的任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 里面的 thread 不能是已啟動的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加到 workers 這個 HashSet 中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若添加成功
if (workerAdded) {
// 啟動線程
t.start();
workerStarted = true;
}
}
} finally {
// 若線程沒有啟動圾旨,做一些清理工作,若前面 workCount 加了 1魏蔗,將其減掉
if (! workerStarted)
addWorkerFailed(w);
}
// 返回線程是否啟動成功
return workerStarted;
}
看下 addWorkFailed
worker
中的線程 start
后砍的,其 run
方法會調(diào)用 runWorker
繼續(xù)往下看 runWorker
// worker 線程啟動后調(diào)用,while 循環(huán)(即自旋!)不斷從等待隊列獲取任務(wù)并執(zhí)行
// worker 初始化時,可指定 firstTask莺治,那么第一個任務(wù)也就可以不需要從隊列中獲取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 該線程的第一個任務(wù)(若有)
Runnable task = w.firstTask;
w.firstTask = null;
// 允許中斷
w.unlock();
boolean completedAbruptly = true;
try {
// 循環(huán)調(diào)用 getTask 獲取任務(wù)
while (task != null || (task = getTask()) != null) {
w.lock();
// 若線程池狀態(tài)大于等于 STOP廓鞠,那么意味著該線程也要中斷
/** * 若線程池STOP,請確保線程 已被中斷 * 如果沒有谣旁,請確保線程未被中斷 * 這需要在第二種情況下進(jìn)行重新檢查床佳,以便在關(guān)中斷時處理shutdownNow競爭 */
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 這是一個鉤子方法,留給需要的子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到這里終于可以執(zhí)行任務(wù)了
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 這里不允許拋出 Throwable榄审,所以轉(zhuǎn)換為 Error
thrown = x; throw new Error(x);
} finally {
// 也是一個鉤子方法砌们,將 task 和異常作為參數(shù),留給需要的子類實現(xiàn)
afterExecute(task, thrown);
}
} finally {
// 置空 task搁进,準(zhǔn)備 getTask 下一個任務(wù)
task = null;
// 累加完成的任務(wù)數(shù)
w.completedTasks++;
// 釋放掉 worker 的獨占鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 到這里浪感,需要執(zhí)行線程關(guān)閉
// 1\. 說明 getTask 返回 null,也就是說饼问,這個 worker 的使命結(jié)束了影兽,執(zhí)行關(guān)閉
// 2\. 任務(wù)執(zhí)行過程中發(fā)生了異常
// 第一種情況,已經(jīng)在代碼處理了將 workCount 減 1莱革,這個在 getTask 方法分析中說
// 第二種情況峻堰,workCount 沒有進(jìn)行處理,所以需要在 processWorkerExit 中處理
processWorkerExit(w, completedAbruptly);
}
}
看看 getTask()
// 此方法有三種可能
// 1\. 阻塞直到獲取到任務(wù)返回驮吱。默認(rèn) corePoolSize 之內(nèi)的線程是不會被回收的茧妒,它們會一直等待任務(wù)
// 2\. 超時退出。keepAliveTime 起作用的時候左冬,也就是如果這么多時間內(nèi)都沒有任務(wù)桐筏,那么應(yīng)該執(zhí)行關(guān)閉
// 3\. 如果發(fā)生了以下條件,須返回 null
// 池中有大于 maximumPoolSize 個 workers 存在(通過調(diào)用 setMaximumPoolSize 進(jìn)行設(shè)置)
// 線程池處于 SHUTDOWN拇砰,而且 workQueue 是空的梅忌,前面說了狰腌,這種不再接受新的任務(wù)
// 線程池處于 STOP,不僅不接受新的線程牧氮,連 workQueue 中的線程也不再執(zhí)行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 允許核心線程數(shù)內(nèi)的線程回收琼腔,或當(dāng)前線程數(shù)超過了核心線程數(shù),那么有可能發(fā)生超時關(guān)閉
// 這里 break踱葛,是為了不往下執(zhí)行后一個 if (compareAndDecrementWorkerCount(c))
// 兩個 if 一起看:如果當(dāng)前線程數(shù) wc > maximumPoolSize丹莲,或者超時,都返回 null
// 那這里的問題來了尸诽,wc > maximumPoolSize 的情況甥材,為什么要返回 null?
// 換句話說性含,返回 null 意味著關(guān)閉線程洲赵。
// 那是因為有可能開發(fā)者調(diào)用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調(diào)小了
// 如果此 worker 發(fā)生了中斷,采取的方案是重試
// 解釋下為什么會發(fā)生中斷商蕴,這個讀者要去看 setMaximumPoolSize 方法叠萍,
// 如果開發(fā)者將 maximumPoolSize 調(diào)小了,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量绪商,
// 那么意味著超出的部分線程要被關(guān)閉苛谷。重新進(jìn)入 for 循環(huán),自然會有部分線程會返回 null
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// CAS 操作部宿,減少工作線程數(shù)
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 發(fā)生了中斷抄腔,采取的方案是重試
// 解釋下為什么會發(fā)生中斷,這個讀者要去看 setMaximumPoolSize 方法理张,
// 如果開發(fā)者將 maximumPoolSize 調(diào)小了赫蛇,導(dǎo)致其小于當(dāng)前的 workers 數(shù)量,
// 那么意味著超出的部分線程要被關(guān)閉雾叭。重新進(jìn)入 for 循環(huán)悟耘,自然會有部分線程會返回 null
timedOut = false;
}
}
}
到這里,基本上也說完了整個流程织狐,回到 execute(Runnable command) 方法暂幼,看看各個分支幕随,我把代碼貼過來一下:
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1\. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2\. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3\. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
//表示 “線程池狀態(tài)” 和 “線程數(shù)” 的整數(shù)
int c = ctl.get();
// 如果當(dāng)前線程數(shù)少于核心線程數(shù)熬北,直接添加一個 worker 執(zhí)行任務(wù),
// 創(chuàng)建一個新的線程本冲,并把當(dāng)前任務(wù) command 作為這個線程的第一個任務(wù)(firstTask)
if (workerCountOf(c) < corePoolSize) {
// 添加任務(wù)成功厨埋,即結(jié)束
// 執(zhí)行的結(jié)果邪媳,會包裝到 FutureTask
// 返回 false 代表線程池不允許提交任務(wù)
if (addWorker(command, true))
return;
c = ctl.get();
}
// 到這說明,要么當(dāng)前線程數(shù)大于等于核心線程數(shù),要么剛剛 addWorker 失敗
// 如果線程池處于 RUNNING 雨效,把這個任務(wù)添加到任務(wù)隊列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
/* 若任務(wù)進(jìn)入 workQueue迅涮,我們是否需要開啟新的線程 * 線程數(shù)在 [0, corePoolSize) 是無條件開啟新線程的 * 若線程數(shù)已經(jīng)大于等于 corePoolSize,則將任務(wù)添加到隊列中徽龟,然后進(jìn)到這里 */
int recheck = ctl.get();
// 若線程池不處于 RUNNING 叮姑,則移除已經(jīng)入隊的這個任務(wù),并且執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 若線程池還是 RUNNING 据悔,且線程數(shù)為 0传透,則開啟新的線程
// 這塊代碼的真正意圖:擔(dān)心任務(wù)提交到隊列中了,但是線程都關(guān)閉了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 若 workQueue 滿极颓,到該分支
// 以 maximumPoolSize 為界創(chuàng)建新 worker旷祸,
// 若失敗,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize讼昆,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
工作線程:線程池創(chuàng)建線程時,會將線程封裝成工作線程Worker,Worker在執(zhí)行完任務(wù)后,還會循環(huán)獲取工作隊列里的任務(wù)來執(zhí)行.我們可以從Worker類的run()方法里看到這點
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//先加鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
線程池中的線程執(zhí)行任務(wù)分兩種情況
- 在execute()方法中創(chuàng)建一個線程時,會讓這個線程執(zhí)行當(dāng)前任務(wù)
- 這個線程執(zhí)行完上圖中 1 的任務(wù)后,會反復(fù)從BlockingQueue獲取任務(wù)來執(zhí)行
2 線程池的使用
2.1 線程池的創(chuàng)建
我們可以通過ThreadPoolExecutor來創(chuàng)建一個線程池
創(chuàng)建一個線程池時需要的參數(shù)
corePoolSize(核心線程數(shù)量)
線程池中應(yīng)該保持的主要線程的數(shù)量.即使線程處于空閑狀態(tài),除非設(shè)置了allowCoreThreadTimeOut
這個參數(shù),當(dāng)提交一個任務(wù)到線程池時,若線程數(shù)量<corePoolSize,線程池會創(chuàng)建一個新線程放入works(一個HashSet)中執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也還是會創(chuàng)建新線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時就不再創(chuàng)建,會嘗試放入等待隊列workQueue(一個BlockingQueue),如果調(diào)用了線程池的prestartAllCoreThreads()
,線程池會提前創(chuàng)建并啟動所有核心線程workQueue
存儲待執(zhí)行任務(wù)的阻塞隊列骚烧,這些任務(wù)必須是Runnable
的對象(如果是Callable對象浸赫,會在submit內(nèi)部轉(zhuǎn)換為Runnable對象)-
runnableTaskQueue(任務(wù)隊列):用于保存等待執(zhí)行的任務(wù)的阻塞隊列.可以選擇以下幾個阻塞隊列.
- LinkedBlockingQueue:一個基于鏈表結(jié)構(gòu)的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue.靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個隊列
- SynchronousQueue:一個不存儲元素的阻塞隊列.每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個隊列
maximumPoolSize(線程池最大線程數(shù))
線程池允許創(chuàng)建的最大線程數(shù)
若隊列滿,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程放入works中執(zhí)行任務(wù),CashedThreadPool的關(guān)鍵,固定線程數(shù)的線程池?zé)o效
若使用了無界任務(wù)隊列,這個參數(shù)就沒什么效果ThreadFactory:用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個創(chuàng)建出來的線程設(shè)置更有意義的名字.使用開源框架guava提供ThreadFactoryBuilder可以快速給線程池里的線程設(shè)置有意義的名字,代碼如下
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler(飽和策略):當(dāng)隊列和線程池都滿,說明線程池處于飽和,必須采取一種策略處理提交的新任務(wù).策略默認(rèn)AbortPolicy,表無法處理新任務(wù)時拋出異常.在JDK 1.5中Java線程池框架提供了以下4種策略
- AbortPolicy:丟棄任務(wù),拋出 RejectedExecutionException
- CallerRunsPolicy:只用調(diào)用者所在線程來運行任務(wù),有反饋機制赃绊,使任務(wù)提交的速度變慢)既峡。
- DiscardOldestPolicy
若沒有發(fā)生shutdown,嘗試丟棄隊列里最近的一個任務(wù),并執(zhí)行當(dāng)前任務(wù), 丟棄任務(wù)緩存隊列中最老的任務(wù),并且嘗試重新提交新的任務(wù) - DiscardPolicy:不處理,丟棄掉, 拒絕執(zhí)行碧查,不拋異常
當(dāng)然,也可以根據(jù)應(yīng)用場景需要來實現(xiàn)RejectedExecutionHandler接口自定義策略.如記錄日志或持久化存儲不能處理的任務(wù)
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */
final void reject(Runnable command) {
// 執(zhí)行拒絕策略
handler.rejectedExecution(command, this);
}
handler
構(gòu)造線程池時候就傳的參數(shù)运敢,RejectedExecutionHandler
的實例
RejectedExecutionHandler
在 ThreadPoolExecutor
中有四個實現(xiàn)類可供我們直接使用,當(dāng)然忠售,也可以實現(xiàn)自己的策略传惠,一般也沒必要。
//只要線程池沒有被關(guān)閉稻扬,由提交任務(wù)的線程自己來執(zhí)行這個任務(wù)
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
// 不管怎樣卦方,直接拋出 RejectedExecutionException 異常
// 默認(rèn)的策略,如果我們構(gòu)造線程池的時候不傳相應(yīng)的 handler 泰佳,則指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
/** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 不做任何處理盼砍,直接忽略掉這個任務(wù)
public static class DiscardPolicy implements RejectedExecutionHandler {
/** * Creates a {@code DiscardPolicy}. */
public DiscardPolicy() { }
/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 若線程池未被關(guān)閉
// 把隊列隊頭的任務(wù)(也就是等待了最長時間的)直接扔掉,然后提交這個任務(wù)到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
/** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
keepAliveTime(線程活動保持時間)
線程沒有任務(wù)執(zhí)行時最多保持多久時間終止
線程池的工作線程空閑后逝她,保持存活的時間浇坐。
所以,如果任務(wù)很多黔宛,并且每個任務(wù)執(zhí)行的時間比較短近刘,可以調(diào)大時間,提高線程的利用率TimeUnit(線程活動保持時間的單位):指示第三個參數(shù)的時間單位;可選的單位有天(DAYS)跌宛、小時(HOURS)酗宋、分鐘(MINUTES)、毫秒(MILLISECONDS)疆拘、微秒(MICROSECONDS蜕猫,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)
再來看這五種線程池
單線程池:newSingleThreadExecutor()方法創(chuàng)建哎迄,五個參數(shù)分別是ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())回右。含義是池中保持一個線程,最多也只有一個線程漱挚,也就是說這個線程池是順序執(zhí)行任務(wù)的翔烁,多余的任務(wù)就在隊列中排隊。
-
固定線程池:newFixedThreadPool(nThreads)方法創(chuàng)建
池中保持nThreads個線程旨涝,最多也只有nThreads個線程蹬屹,多余的任務(wù)也在隊列中排隊。
線程數(shù)固定且線程不超時
- 緩存線程池:newCachedThreadPool()創(chuàng)建白华,五個參數(shù)分別是ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())慨默。
含義是池中不保持固定數(shù)量的線程,隨需創(chuàng)建弧腥,最多可以創(chuàng)建Integer.MAX_VALUE個線程(說一句厦取,這個數(shù)量已經(jīng)大大超過目前任何操作系統(tǒng)允許的線程數(shù)了),空閑的線程最多保持60秒管搪,多余的任務(wù)在SynchronousQueue(所有阻塞虾攻、并發(fā)隊列在后續(xù)文章中具體介紹)中等待。
為什么單線程池和固定線程池使用的任務(wù)阻塞隊列是LinkedBlockingQueue()更鲁,而緩存線程池使用的是SynchronousQueue()呢霎箍?
因為單線程池和固定線程池中,線程數(shù)量是有限的澡为,因此提交的任務(wù)需要在LinkedBlockingQueue隊列中等待空余的線程朋沮;而緩存線程池中,線程數(shù)量幾乎無限(上限為Integer.MAX_VALUE)缀壤,因此提交的任務(wù)只需要在SynchronousQueue隊列中同步移交給空余線程即可樊拓。
- 單線程調(diào)度線程池:newSingleThreadScheduledExecutor()創(chuàng)建,五個參數(shù)分別是 (1, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())塘慕。含義是池中保持1個線程筋夏,多余的任務(wù)在DelayedWorkQueue中等待。
- 固定調(diào)度線程池:newScheduledThreadPool(n)創(chuàng)建图呢,五個參數(shù)分別是 (n, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())条篷。含義是池中保持n個線程骗随,多余的任務(wù)在DelayedWorkQueue中等待。
有一項技術(shù)可以緩解執(zhí)行時間較長任務(wù)造成的影響赴叹,即限定任務(wù)等待資源的時間鸿染,而不要無限的等待
先看第一個例子,測試單線程池乞巧、固定線程池和緩存線程池(注意增加和取消注釋):
public class ThreadPoolExam {
public static void main(String[] args) {
//first test for singleThreadPool
ExecutorService pool = Executors.newSingleThreadExecutor();
//second test for fixedThreadPool
// ExecutorService pool = Executors.newFixedThreadPool(2);
//third test for cachedThreadPool
// ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
pool.execute(new TaskInPool(i));
}
pool.shutdown();
}
}
class TaskInPool implements Runnable {
private final int id;
TaskInPool(int id) {
this.id = id;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("TaskInPool-["+id+"] is running phase-"+i);
TimeUnit.SECONDS.sleep(1);
}
System.out.println("TaskInPool-["+id+"] is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2.2 向線程池提交任務(wù)
可以使用兩個方法向線程池提交任務(wù)
2.2.1 execute()
用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功.通過以下代碼可知execute()方法輸入的任務(wù)是一個Runnable類的實例.
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
從運行結(jié)果可以看出涨椒,單線程池中的線程是順序執(zhí)行的。固定線程池(參數(shù)為2)中绽媒,永遠(yuǎn)最多只有兩個線程并發(fā)執(zhí)行蚕冬。緩存線程池中,所有線程都并發(fā)執(zhí)行是辕。
第二個例子囤热,測試單線程調(diào)度線程池和固定調(diào)度線程池。
public class ScheduledThreadPoolExam {
public static void main(String[] args) {
//first test for singleThreadScheduledPool
ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
//second test for scheduledThreadPool
// ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
for (int i = 0; i < 5; i++) {
scheduledPool.schedule(new TaskInScheduledPool(i), 0, TimeUnit.SECONDS);
}
scheduledPool.shutdown();
}
}
class TaskInScheduledPool implements Runnable {
private final int id;
TaskInScheduledPool(int id) {
this.id = id;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("TaskInScheduledPool-["+id+"] is running phase-"+i);
TimeUnit.SECONDS.sleep(1);
}
System.out.println("TaskInScheduledPool-["+id+"] is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
從運行結(jié)果可以看出获三,單線程調(diào)度線程池和單線程池類似旁蔼,而固定調(diào)度線程池和固定線程池類似。
總結(jié):
- 如果沒有特殊要求疙教,使用緩存線程池總是合適的牌芋;
- 如果只能運行一個線程,就使用單線程池松逊。
- 如果要運行調(diào)度任務(wù),則按需使用調(diào)度線程池或單線程調(diào)度線程池
- 如果有其他特殊要求肯夏,則可以直接使用ThreadPoolExecutor類的構(gòu)造函數(shù)來創(chuàng)建線程池经宏,并自己給定那五個參數(shù)。
2.2.2 submit()
用于提交需要返回值的任務(wù).線程池會返回一個future類型對象,通過此對象可以判斷任務(wù)是否執(zhí)行成功
并可通過get()獲取返回值,get()會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout驯击,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候可能任務(wù)沒有執(zhí)行完.
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 處理中斷異常
} catch (ExecutionException e) {
// 處理無法執(zhí)行任務(wù)異常
} finally {
// 關(guān)閉線程池
executor.shutdown();
}
2.3 關(guān)閉線程池
可通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池.
它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止.
但是它們存在一定的區(qū)別
- shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表
- shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài)烁兰,然后中斷所有沒有正在執(zhí)行任務(wù)的線程.
只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true.
當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true.
至于應(yīng)該調(diào)用哪一種方法,應(yīng)該由提交到線程池的任務(wù)的特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,若任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法.
2.4 合理配置
要想合理地配置線程池,就必須首先分析任務(wù)特性,可從以下幾個角度來分析
- 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)
- 任務(wù)的優(yōu)先級:高徊都、中和低
- 任務(wù)的執(zhí)行時間:長沪斟、中和短
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接暇矫。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理.
- CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置N(CPU)+1個線程的線程池
- 由于I/O密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*N(CPU)
- 混合型的任務(wù),如果可以拆分,將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量.如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進(jìn)行分解.
可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù).
優(yōu)先級不同的任務(wù)可以使用PriorityBlockingQueue處理.它可以讓優(yōu)先級高
的任務(wù)先執(zhí)行.
注意 如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行.
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時間越長,則CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU.
建議使用有界隊列 有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力主之,可以根據(jù)需要設(shè)大一點,比如幾千.
假如系統(tǒng)里后臺任務(wù)線程池的隊列和線程池全滿了,不斷拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞,任務(wù)積壓在線程池里.
如果我們設(shè)置成無界隊列,那么線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題.
2.5 線程池的監(jiān)控
如果在系統(tǒng)中大量使用線程池,則有必要對線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問題時,可以根據(jù)線程池的使用狀況快速定位問題.可通過線程池提供的參數(shù)進(jìn)行監(jiān)控,在監(jiān)控線程池的時候可以使用以下屬性:
- taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量
- completedTaskCount:線程池在運行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount李根。
- largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量.通過這個數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過.如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過.
- getPoolSize:線程池的線程數(shù)量.如果線程池不銷毀的話,線程池里的線程不會自動銷毀槽奕,所以這個大小只增不減.
- getActiveCount:獲取活動的線程數(shù).
通過擴展線程池進(jìn)行監(jiān)控.可以通過繼承線程池來自定義線程池,重寫線程池的
beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前房轿、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進(jìn)行監(jiān)控.例如,監(jiān)控任務(wù)的平均執(zhí)行時間粤攒、最大執(zhí)行時間和最小執(zhí)行時間等.
這幾個方法在線程池里是空方法.
protected void beforeExecute(Thread t, Runnable r) { }
2.6 線程池的狀態(tài)
1.當(dāng)線程池創(chuàng)建后所森,初始為 running 狀態(tài)
2.調(diào)用 shutdown 方法后,處 shutdown 狀態(tài)夯接,此時不再接受新的任務(wù)焕济,等待已有的任務(wù)執(zhí)行完畢
3.調(diào)用 shutdownnow 方法后,進(jìn)入 stop 狀態(tài)盔几,不再接受新的任務(wù)晴弃,并且會嘗試終止正在執(zhí)行的任務(wù)。
4.當(dāng)處于 shotdown 或 stop 狀態(tài)问欠,并且所有工作線程已經(jīng)銷毀肝匆,任務(wù)緩存隊列已清空,線程池被設(shè)為 terminated 狀態(tài)顺献。
總結(jié)
總結(jié)
java 線程池有哪些關(guān)鍵屬性旗国?
- corePoolSize 到 maximumPoolSize 之間的線程會被回收,當(dāng)然 corePoolSize 的線程也可以通過設(shè)置而得到回收(allowCoreThreadTimeOut(true))注整。
- workQueue 用于存放任務(wù)能曾,添加任務(wù)的時候,如果當(dāng)前線程數(shù)超過了 corePoolSize肿轨,那么往該隊列中插入任務(wù)寿冕,線程池中的線程會負(fù)責(zé)到隊列中拉取任務(wù)。
- keepAliveTime 用于設(shè)置空閑時間椒袍,如果線程數(shù)超出了 corePoolSize驼唱,并且有些線程的空閑時間超過了這個值,會執(zhí)行關(guān)閉這些線程的操作
- rejectedExecutionHandler 用于處理當(dāng)線程池不能執(zhí)行此任務(wù)時的情況驹暑,默認(rèn)有拋出 RejectedExecutionException 異常玫恳、忽略任務(wù)、使用提交任務(wù)的線程來執(zhí)行此任務(wù)和將隊列中等待最久的任務(wù)刪除优俘,然后提交此任務(wù)這四種策略京办,默認(rèn)為拋出異常。
線程池中的線程創(chuàng)建時機帆焕?
- 如果當(dāng)前線程數(shù)少于 corePoolSize惭婿,那么提交任務(wù)的時候創(chuàng)建一個新的線程,并由這個線程執(zhí)行這個任務(wù)叶雹;
- 如果當(dāng)前線程數(shù)已經(jīng)達(dá)到 corePoolSize财饥,那么將提交的任務(wù)添加到隊列中,等待線程池中的線程去隊列中取任務(wù)折晦;
- 如果隊列已滿佑力,那么創(chuàng)建新的線程來執(zhí)行任務(wù),需要保證池中的線程數(shù)不會超過 maximumPoolSize筋遭,如果此時線程數(shù)超過了 maximumPoolSize打颤,那么執(zhí)行拒絕策略暴拄。
任務(wù)執(zhí)行過程中發(fā)生異常怎么處理?
如果某個任務(wù)執(zhí)行出現(xiàn)異常编饺,那么執(zhí)行任務(wù)的線程會被關(guān)閉乖篷,而不是繼續(xù)接收其他任務(wù)。然后會啟動一個新的線程來代替它透且。
什么時候會執(zhí)行拒絕策略撕蔼?
- workers 的數(shù)量達(dá)到了 corePoolSize,任務(wù)入隊成功秽誊,以此同時線程池被關(guān)閉了鲸沮,而且關(guān)閉線程池并沒有將這個任務(wù)出隊,那么執(zhí)行拒絕策略锅论。這里說的是非常邊界的問題讼溺,入隊和關(guān)閉線程池并發(fā)執(zhí)行,讀者仔細(xì)看看 execute 方法是怎么進(jìn)到第一個 reject(command) 里面的最易。
- workers 的數(shù)量大于等于 corePoolSize怒坯,準(zhǔn)備入隊,可是隊列滿了藻懒,任務(wù)入隊失敗剔猿,那么準(zhǔn)備開啟新的線程,可是線程數(shù)已經(jīng)達(dá)到 maximumPoolSize嬉荆,那么執(zhí)行拒絕策略归敬。
作者:湯圓叔
鏈接:http://www.reibang.com/p/c8d68f57d06d
來源:簡書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán)鄙早,非商業(yè)轉(zhuǎn)載請注明出處汪茧。