線程的概念
- 線程:線程是調度CPU資源的最小單位酣溃。
-
線程的幾種狀態(tài):
1逸月、NEW:新建栓撞,線程被創(chuàng)建后,就進入了新建狀態(tài)碗硬。
2瓤湘、RUNNABLE:就緒,被其它對象調用了start()方法恩尾,處于就緒狀態(tài)的線程弛说,隨時可能被CPU調度執(zhí)行。
3翰意、BLOCKED:阻塞木人,線程因為某種原因放棄CPU使用權信柿,暫時停止運行
4、WAITING:等待醒第。
5渔嚷、TIMED_WAITING:超時等待
6、TERMINATED:終結:線程執(zhí)行完了或者因異常退出了run()方法稠曼,該線程結束生命周期形病。
狀態(tài)切換如下圖所示:
image.png
線程池的概念
- 線程池就是線程的一個緩存
線程池的優(yōu)勢
- 降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗霞幅;
- 提高響應速度漠吻。當任務到達時,任務可以不需要等到線程創(chuàng)建就能立即執(zhí)行司恳;
- 提高線程的可管理性途乃。線程是稀缺資源,如果無限制地創(chuàng)建抵赢,不僅會消耗系統(tǒng)資源欺劳,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配铅鲤、調優(yōu)和監(jiān)控划提。
線程的核心參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心線程數(shù)。提交一個任務時邢享,線程池創(chuàng)建一個新線程執(zhí)行任務鹏往,直到當前線程數(shù)等于corePoolSize;如果當前線程數(shù)為corePoolSize骇塘,繼續(xù)提交的任務被保存到阻塞隊列中伊履,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法款违,線程池會提前創(chuàng)建并啟動所有核心線程唐瀑。
- maximumPoolSize:線程池中允許的最大線程數(shù)。如果當前阻塞隊列滿了插爹,且繼續(xù)提交任務哄辣,則創(chuàng)建新的線程執(zhí)行任務,直到當前線程數(shù)等于maximumPoolSize赠尾;
- keepAliveTime:線程池維護線程所允許的空閑時間力穗。當線程池中的線程數(shù)量大于corePoolSize的時候,如果沒有新的任務提交气嫁,核心線程外的線程不會立即銷毀当窗,而是會等待,直到等待的時間超過了keepAliveTime寸宵;
- unit:keepAliveTime的時間單位崖面。
- workQueue:阻塞隊列元咙。用來保存等待被執(zhí)行的任務的阻塞隊列,且任務必須實現(xiàn)Runable接口嘶朱。
1蛾坯、ArrayBlockingQueue:基于數(shù)組的有界隊列。
2疏遏、LinkedBlockingQueue:基于鏈表的無界隊列脉课。
3、SynchronousQueue:一個不存儲元素的阻塞隊列财异,每個插入操作必須等到另一個線程調用移除操作倘零,否則插入操作一直處于阻塞狀態(tài)。
4戳寸、PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列呈驶。 - threadFactory:線程工廠。用來創(chuàng)建新線程
- handler:拒絕策略疫鹊。
1袖瞻、AbortPolicy:直接拋出異常,默認的拒絕策略拆吆。
2聋迎、CallerRunsPolicy:用調用者所在的線程來執(zhí)行任務。
3枣耀、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務霉晕,并執(zhí)行當前任務。
4捞奕、DiscardPolicy:直接丟棄當前任務牺堰。
線程池的執(zhí)行流程
1、提交任務到線程池颅围,如果正在運行的線程數(shù)量小于corePoolSize伟葫,則創(chuàng)建新的核心線程執(zhí)行任務。
2院促、如果正在運行的線程數(shù)量大于或等于corePoolSize筏养,則將該任務放入隊列。
3一疯、如果隊列已滿并且正在運行的線程數(shù)量小于maximumPoolSize,則創(chuàng)建新的非核心線程執(zhí)行這個任務夺姑。
4墩邀、如果隊列已滿并且正在運行的線程數(shù)量大于或等于maximumPoolSize,線程池執(zhí)行拒絕策略盏浙。
5眉睹、當一個線程執(zhí)行完一個任務之后荔茬,會從隊列中取出下一個任務執(zhí)行。
6竹海、當一個線程的空閑時間超過keepAliveTime時慕蔚,如果線程池正在運行的線程數(shù)量大于corePoolSize,這個線程將被停掉
線程池的源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl記錄runState和workerCount
int c = ctl.get();
/**
* workerCountOf方法取出低29位的值斋配,表示當前活動的線程數(shù)孔飒;
* 如果當前活動的線程數(shù)小于corePoolSize,則新建一個線程放入線程池中艰争,
* 并把任務添加到該線程中
*/
if (workerCountOf(c) < corePoolSize) {
/**
* addWorker方法中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù)
* corePoolSize來判斷還是maximumPoolSize來判斷坏瞄;
*/
if (addWorker(command, true))
return;
// 如果添加失敗,則重新獲取ctl值
c = ctl.get();
}
/**
* 如果當前線程池是運行狀態(tài)并且任務添加到隊列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取ctl值
int recheck = ctl.get();
/**
* 再次判斷線程池的運行狀態(tài)甩卓,如果不是運行狀態(tài)鸠匀,由于之前已經(jīng)把command
* 添加到workQueue中了,這時需要移除該command
*/
if (! isRunning(recheck) && remove(command))
// 執(zhí)行過后通過handler使用拒絕策略對該任務進行處理
reject(command);
// 獲取線程池中的有效線程數(shù)逾柿,如果數(shù)量是0缀棍,則執(zhí)行addWorker方法
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 如果執(zhí)行到這里,有兩種情況:
* 1. 線程池已經(jīng)不是RUNNING狀態(tài)机错;
* 2. 線程池是RUNNING狀態(tài)爬范,但workerCount >= corePoolSize并且workQueue已滿。
* 這時毡熏,再次調用addWorker方法坦敌,但第二個參數(shù)傳入為false,將線程池的有限線程數(shù)量的上限設置為maximumPoolSize痢法;
*如果失敗則拒絕該任務
*/
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池的狀態(tài)
int rs = runStateOf(c);
/**
* 如果rs >= SHUTDOWN狱窘,表示此時不再接收新的任務;
* rs == SHUTDOWN财搁,這時表示關閉狀態(tài)蘸炸,不再接受新提交的任務,
* 可以繼續(xù)處理阻塞隊列中已保存的任務
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取線程數(shù)
int wc = workerCountOf(c);
/**
* 如果wc超過CAPACITY尖奔,也就是ctl的低29位的最大值(二進制是29個1)搭儒,返回false;
* core為true提茁,根據(jù)corePoolSize比較淹禾,即大于或等于核心線程數(shù),返回false茴扁;
* core為false铃岔,根據(jù)maximumPoolSize比較,即大于或等于最大線程數(shù)峭火,返回false毁习;
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增加workerCount智嚷,如果成功,跳出第一個循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗纺且,重新獲取ctl的值
c = ctl.get(); // Re-read ctl
/**
* 如果當前的運行狀態(tài)不等于rs盏道,說明狀態(tài)已被改變,
* 返回第一個for循環(huán)繼續(xù)執(zhí)行
*/
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據(jù)firstTask來創(chuàng)建Worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/**
* rs < SHUTDOWN表示是RUNNING狀態(tài)载碌;
* 因為在SHUTDOWN時不會在添加新的任務猜嘱,但還是會執(zhí)行workQueue中的任務
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個HashSet
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 線程啟動
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
因為Worker本身繼承了Runnable接口,所以一個Worker對象在啟動的時候會調用Worker類中的run方法
public void run() {
runWorker(this);
}
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
// 獲取第一個任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 是否因為異常退出循環(huán)
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果線程池正在停止恐仑,確保線程是中斷的
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
// timeOut變量的值表示上次從阻塞隊列中取任務時是否超時
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 如果線程池狀態(tài)rs >= SHUTDOWN泉坐,也就是非RUNNING狀態(tài),
* rs >= STOP裳仆,線程池是否正在stop腕让,阻塞隊列是否為空
* 滿足上面的條件,workerCount減一
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* allowCoreThreadTimeOut默認為false歧斟,即核心線程不允許進行超時纯丸;
* wc > corePoolSize,表示當前線程池中的線程數(shù)量大于核心線程數(shù)量
* 對于超過核心線程數(shù)量的這些線程静袖,需要進行超時控制
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 根據(jù)timed來判斷觉鼻,如果為true,則通過阻塞隊列的poll方法進行超時控制队橙,
* 如果在keepAliveTime時間內沒有獲取到任務坠陈,則返回null;
*
* 否則通過take方法捐康,如果這時隊列為空仇矾,則take方法會阻塞直到隊列不為空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
/**
* 如果completedAbruptly值為true解总,則說明線程執(zhí)行時出現(xiàn)了異常贮匕,需要將workerCount減1;
* 如果線程執(zhí)行時沒有出現(xiàn)異常花枫,說明在getTask()方法中已經(jīng)已經(jīng)對workerCount進行了減1操作刻盐,這里就不必再減了。
*/
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 統(tǒng)計完成的任務數(shù)
completedTaskCount += w.completedTasks;
// 從workers中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據(jù)線程池狀態(tài)進行判斷是否結束線程池
tryTerminate();
int c = ctl.get();
// 當線程池是RUNNING或SHUTDOWN狀態(tài)時劳翰,如果worker是異常結束敦锌,那么會直接addWorker;
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
/**
* 如果allowCoreThreadTimeOut=true佳簸,并且等待隊列有任務乙墙,至少保留一個worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize伶丐。
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}