為什么要使用線程池
減少線程實例的創(chuàng)建和銷毀,使線程能夠重用谴古,提高系統(tǒng)性能
可以根據(jù)系統(tǒng)的承受能力,調(diào)整線程池中工作線程的數(shù)量,防止線程消耗過多內(nèi)存導致服務器崩潰
線程池的實現(xiàn)原理
判斷線程池里的核心線程是否都在執(zhí)行任務经磅,如果不是(核心線程空閑或者還有核心線程沒有被創(chuàng)建)則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果核心線程都在執(zhí)行任務钮追,則進入下個流程预厌。
線程池判斷工作隊列是否已滿,如果工作隊列沒有滿元媚,則將新提交的任務存儲在這個工作隊列里轧叽。如果工作隊列滿了苗沧,則進入下個流程。
判斷線程池里的線程是否都處于工作狀態(tài)炭晒,如果沒有待逞,則創(chuàng)建一個新的工作線程來執(zhí)行任務。如果已經(jīng)滿了网严,則交給飽和策略來處理這個任務贬墩。
流程圖如下
JDK原生線程池的創(chuàng)建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
參數(shù)說明
corePoolSize:線程池核心線程數(shù)量
maximumPoolSize:線程池最大線程數(shù)量
keepAliverTime:當活躍線程數(shù)大于核心線程數(shù)時插掂,空閑的多余線程最大存活時間
unit:存活時間的單位
workQueue:存放任務的隊列
handler:超出線程范圍和隊列容量的任務的處理程序
線程池的源碼解讀
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
* 進行下面三步
*
* 1. 如果運行的線程小于corePoolSize,則嘗試使用用戶定義的Runnalbe對象創(chuàng)建一個新的線程
* 調(diào)用addWorker函數(shù)會原子性的檢查runState和workCount鳞芙,通過返回false來防止在不應
* 該添加線程時添加了線程
* 2. 如果一個任務能夠成功入隊列骚灸,在添加一個線城時仍需要進行雙重檢查(因為在前一次檢查后
* 該線程死亡了),或者當進入到此方法時驴一,線程池已經(jīng)shutdown了休雌,所以需要再次檢查狀態(tài),
* 若有必要肝断,當停止時還需要回滾入隊列操作杈曲,或者當線程池沒有線程時需要創(chuàng)建一個新線程
* 3. 如果無法入隊列,那么需要增加一個新線程胸懈,如果此操作失敗担扑,那么就意味著線程池已經(jīng)shut
* down或者已經(jīng)飽和了,所以拒絕任務
*/
// 獲取線程池控制狀態(tài)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //worker數(shù)量小于corePoolSize
if (addWorker(command, true))
return;
// 不成功則再次獲取線程池控制狀態(tài)
c = ctl.get();
}
// 線程池處于RUNNING狀態(tài)趣钱,將命令(用戶自定義的Runnable對象)添加進workQueue隊列
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查涌献,獲取線程池控制狀態(tài)
int recheck = ctl.get();
// 線程池不處于RUNNING狀態(tài),將命令從workQueue隊列中移除
if (! isRunning(recheck) && remove(command))
reject(command);// 拒絕執(zhí)行命令
else if (workerCountOf(recheck) == 0)// worker數(shù)量等于0
// 添加worker
addWorker(null, false);
}
// 添加worker失敗
else if (!addWorker(command, false))
reject(command);// 拒絕執(zhí)行命令
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) { // 外層無限循環(huán)
// 獲取線程池控制狀態(tài)
int c = ctl.get();
// 獲取狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && // 狀態(tài)大于等于SHUTDOWN首有,初始的ctl為RUNNING燕垃,小于SHUTDOWN
! (rs == SHUTDOWN && // 狀態(tài)為SHUTDOWN
firstTask == null && // 第一個任務為null
! workQueue.isEmpty())) // worker隊列不為空
// 返回
return false;
for (;;) {
// worker數(shù)量
int wc = workerCountOf(c);
if (wc >= CAPACITY || // worker數(shù)量大于等于最大容量
wc >= (core ? corePoolSize : maximumPoolSize)) // worker數(shù)量大于等于核心線程池大小或者最大線程池大小
return false;
if (compareAndIncrementWorkerCount(c)) // 比較并增加worker的數(shù)量
// 跳出外層循環(huán)
break retry;
// 獲取線程池控制狀態(tài)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 此次的狀態(tài)與上次獲取的狀態(tài)不相同
// 跳過剩余部分,繼續(xù)循環(huán)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker開始標識
boolean workerStarted = false;
// worker被添加標識
boolean workerAdded = false;
//
Worker w = null;
try {
// 初始化worker
w = new Worker(firstTask);
// 獲取worker對應的線程
final Thread t = w.thread;
if (t != null) { // 線程不為null
// 線程池鎖
final ReentrantLock mainLock = this.mainLock;
// 獲取鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 線程池的運行狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || // 小于SHUTDOWN
(rs == SHUTDOWN && firstTask == null)) { // 等于SHUTDOWN并且firstTask為null
if (t.isAlive()) // precheck that t is startable // 線程剛添加進來井联,還未啟動就存活
// 拋出線程狀態(tài)異常
throw new IllegalThreadStateException();
// 將worker添加到worker集合
workers.add(w);
// 獲取worker集合的大小
int s = workers.size();
if (s > largestPoolSize) // 隊列大小大于largestPoolSize
// 重新設置largestPoolSize
largestPoolSize = s;
// 設置worker已被添加標識
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
if (workerAdded) { // worker被添加
// 開始執(zhí)行worker的run方法
t.start();
// 設置worker已開始標識
workerStarted = true;
}
}
} finally {
if (! workerStarted) // worker沒有開始
// 添加worker失敗
addWorkerFailed(w);
}
return workerStarted;
}
說明:此函數(shù)可能會完成如下幾件任務
原子性的增加workerCount卜壕。
將用戶給定的任務封裝成為一個worker,并將此worker添加進workers集合中烙常。
啟動worker對應的線程轴捎,并啟動該線程,運行worker的run方法蚕脏。
回滾worker的創(chuàng)建動作侦副,即將worker從workers集合中刪除,并原子性的減少workerCount蝗锥。
我們通過一個程序來觀察線程池的工作原理:
public class ThreadPoolTest {
public static void main(String[] args) {
BlockingQueue<Runnable> blockingQueue = new LinkedBlockingDeque<>(5);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, blockingQueue);
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("線程池中活躍的線程數(shù): " + threadPoolExecutor.getPoolSize());
if (blockingQueue.size() > 0) {
System.out.println("----------------隊列中阻塞的線程數(shù)" + blockingQueue.size());
}
}
}
}
執(zhí)行結果如下
線程池中活躍的線程數(shù): 1
線程池中活躍的線程數(shù): 2
線程池中活躍的線程數(shù): 3
線程池中活躍的線程數(shù): 4
線程池中活躍的線程數(shù): 5
線程池中活躍的線程數(shù): 5
----------------隊列中阻塞的線程數(shù)1
線程池中活躍的線程數(shù): 5
----------------隊列中阻塞的線程數(shù)2
線程池中活躍的線程數(shù): 5
----------------隊列中阻塞的線程數(shù)3
線程池中活躍的線程數(shù): 5
----------------隊列中阻塞的線程數(shù)4
線程池中活躍的線程數(shù): 5
----------------隊列中阻塞的線程數(shù)5
線程池中活躍的線程數(shù): 6
----------------隊列中阻塞的線程數(shù)5
線程池中活躍的線程數(shù): 7
----------------隊列中阻塞的線程數(shù)5
線程池中活躍的線程數(shù): 8
----------------隊列中阻塞的線程數(shù)5
線程池中活躍的線程數(shù): 9
----------------隊列中阻塞的線程數(shù)5
線程池中活躍的線程數(shù): 10
----------------隊列中阻塞的線程數(shù)5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task thread.ThreadPoolTest$$Lambda$1/1313922862@27bc2616 rejected from java.util.concurrent.ThreadPoolExecutor@3941a79c[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at thread.ThreadPoolTest.main(ThreadPoolTest.java:18)
從結果可以觀察出:
創(chuàng)建的線程池具體配置為:核心線程數(shù)量為5個跃洛;全部線程數(shù)量為10個;工作隊列的長度為5终议。
我們通過queue.size()的方法來獲取工作隊列中的任務數(shù)。
-
運行原理:
剛開始都是在創(chuàng)建新的線程,達到核心線程數(shù)量5個后穴张,新的任務進來后不再創(chuàng)建新的線程细燎,而是將任務加入工作隊列,任務隊列到達上線5個后皂甘,新的任務又會創(chuàng)建新的普通線程玻驻,直到達到線程池最大的線程數(shù)量10個,后面的任務則根據(jù)配置的飽和策略來處理偿枕。我們這里沒有具體配置璧瞬,使用的是默認的配置AbortPolicy:直接拋出異常。
當然渐夸,為了達到我需要的效果嗤锉,上述線程處理的任務都是利用休眠導致線程沒有釋放!D顾瘟忱!
RejectedExecutionHandler:飽和策略
$ 隊列和線程池都滿了,說明線程池處于飽和狀態(tài)苫幢,那么必須對新提交的任務采用一種特殊的策略來進行處理访诱。這個策略默認配置是AbortPolicy,表示無法處理新的任務而拋出異常韩肝。JAVA提供了4中策略:
AbortPolicy:直接拋出異常
CallerRunsPolicy:只用調(diào)用所在的線程運行任務
DiscardOldestPolicy:丟棄隊列里最近的一個任務触菜,并執(zhí)行當前任務。
DiscardPolicy:不處理哀峻,丟棄掉涡相。