java 線程池采用的是 Thread Pool 線程池模式。
線程池設(shè)計模式主要解決在資源有限的情況下為每一個任務(wù)創(chuàng)建一個線程執(zhí)行消耗資源很不現(xiàn)實预茄。
線程池的設(shè)計思路
采用保存并管理一定數(shù)量的線程,用這些線程去執(zhí)行不斷產(chǎn)生的任務(wù)沛豌。
主要的類:
ThreadPool 負(fù)責(zé)接收和存儲任務(wù)以及線程生命周期的管理恭取。
WorkQueue 工作隊列盹牧,實現(xiàn)任務(wù)的緩存。
WorkerThread 負(fù)責(zé)任務(wù)執(zhí)行的工作線程沈撞。
所以說線程池一般用于單個任務(wù)處理時間短慷荔,但是任務(wù)量卻非常大的場景。
什么是線程缠俺?
線程和進(jìn)程都是對cpu工作時間段的描述
cpu在工作時會存在任務(wù)的切換显晶。進(jìn)程包括上下文切換贷岸。
線程是共享了進(jìn)程的上下文環(huán)境,的更為細(xì)小的CPU時間段吧碾。
進(jìn)程有獨(dú)立的地址空間凰盔,一個進(jìn)程崩潰后,在保護(hù)模式下不會對其它進(jìn)程產(chǎn)生影響倦春,而線程只是一個進(jìn)程中的不同執(zhí)行路徑户敬。線程有自己的堆棧和局部變量,但線程之間沒有單獨(dú)的地址空間睁本,一個線程死掉就等于整個進(jìn)程死掉尿庐,所以多進(jìn)程的程序要比多線程的程序健壯,但在進(jìn)程切換時呢堰,耗費(fèi)資源較大抄瑟,效率要差一些。
線程分為用戶級線程和內(nèi)核級線程枉疼,app自己管理是用戶級線程皮假,操作系統(tǒng)管理內(nèi)核級線程
從java線程到linux線程
在java程序中創(chuàng)建 線程Thread ,會調(diào)用OS操作系統(tǒng)的庫調(diào)度器陷入內(nèi)核空間,創(chuàng)建一個內(nèi)核級線程并維護(hù)在操作系統(tǒng)內(nèi)核線程表內(nèi)讓調(diào)度程序進(jìn)行調(diào)度骂维。
cpu會給每一個線程分配一個執(zhí)行時間惹资,而線程棧中有程序計數(shù)器,寄存器航闺,方法的棧幀褪测,cpu在進(jìn)行計算時計算的中間變量存儲在寄存器里。
在線程切換的過程中潦刃,需要將線程的中間計算結(jié)果侮措,存儲在從寄存器中寫回到內(nèi)存線程PCB中保存現(xiàn)場,清空原有的寄存器乖杠,線程2再進(jìn)行切換分扎。線程2執(zhí)行完再去喚醒線程1,需要將pcb中的中間結(jié)果寫入寄存器胧洒。
線程切換消耗時間主要是笆包?
- 用戶態(tài)切換內(nèi)核態(tài),
- 需要保存上一個線程中現(xiàn)場略荡。
線程池的創(chuàng)建參數(shù)
首先從ThreadPoolExecutor構(gòu)造方法講起,學(xué)習(xí)如何自定義ThreadFactory和RejectedExecutionHandler;
第1個參數(shù): corePoolSize 表示常駐核心線程數(shù)
如果大于0,即使本地任務(wù)執(zhí)行完畢,核心線程也不會被銷毀.
第2個參數(shù): maximumPoolSize 表示線程池能夠容納同時執(zhí)行的最大線程
第3個參數(shù): keepAliveTime 表示線程池中的線程空閑時間
當(dāng)空閑時間達(dá)到keepAliveTime時,線程會被銷毀,直到只剩下corePoolSize個線程
第5個參數(shù): workQueue 表示緩存隊列
當(dāng)請求的線程數(shù)大于maximumPoolSize時,線程進(jìn)入BlockingQueue
第7個參數(shù): handler 表示執(zhí)行拒絕策略的對象
當(dāng)超過第5個參數(shù)workQueue的任務(wù)緩存區(qū)上限的時候,就可以通過該策略處理請求,這是一種簡單的限流保護(hù).
友好的拒絕策略可以是如下三種:
(1 ) 保存到數(shù)據(jù)庫進(jìn)行削峰填谷;在空閑時再提取出來執(zhí)行
(2)轉(zhuǎn)向某個提示頁面
(3)打印日志
線程池工作原理
- 若當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(執(zhí)行這一步需要獲取全局鎖)
- 若運(yùn)行的線程多于或等于corePoolSize,則將任務(wù)加入BlockingQueue
- 若無法將任務(wù)加入BlockingQueue,同時線程數(shù)小于maximumPoolSize庵佣,則創(chuàng)建新的線程來處理任務(wù)(執(zhí)行這一步需要獲取全局鎖)(這里會導(dǎo)致這里的任務(wù)可能比阻塞隊列里的先執(zhí)行,這里沒加入阻塞隊列)
- 若創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()
線程池參數(shù)的合理設(shè)計
- corePoolSize
核心線程數(shù)是與每一個任務(wù)的執(zhí)行時間和每一秒產(chǎn)生的任務(wù)數(shù)量來確定汛兜。 每一個任務(wù)的平均執(zhí)行時間和80% 時間內(nèi)平均產(chǎn)生的任務(wù)數(shù)
- 任務(wù)隊列(workQueue)
隊列的長度 = 核心線程數(shù)/單個任務(wù)執(zhí)行時間 * 2巴粪,最大任務(wù)等待時間是2秒,10 個核心線程,單個任務(wù)0.1秒肛根,隊列長度200
- 最大任務(wù)數(shù)
最大線程數(shù) = (最大任務(wù)數(shù)-隊列長度)* 任務(wù)執(zhí)行時間 = (1000 - 200) * 0.1 = 80
簡單實現(xiàn)線程池
- Task 創(chuàng)建一個 Task 代表任務(wù)類辫塌,用于實現(xiàn)具體的任務(wù),需要實現(xiàn)Runnable接口派哲,傳入id 表示任務(wù)id
- Worker 類 繼承 Thread 類 臼氨,任務(wù)的執(zhí)行類,維護(hù)一個Runnable 的列表芭届,監(jiān)控列表储矩,當(dāng)列表不為空,取出執(zhí)行,構(gòu)造傳入List<Runnable>褂乍,并重寫run方法
- ThreadPool類持隧,線程池類,維護(hù)任務(wù)隊列逃片,
成員變量:1.任務(wù)隊列屡拨,2.當(dāng)前線程數(shù)量,3.核心線程數(shù)褥实,4.最大線程長度呀狼,5任務(wù)隊列長度等,
成員方法:1.提交任務(wù)损离,將任務(wù)加入list集合哥艇,需要判斷當(dāng)前是否超出任務(wù)總長度。2.執(zhí)行任務(wù)草冈,判斷當(dāng)前線程數(shù)量,決定創(chuàng)建核心線程數(shù)還是非核心線程數(shù) - 測試類:1. 創(chuàng)建線程池類對象瓮增,2怎棱。提交多個任務(wù)
創(chuàng)建一個最簡單的線程池
/**
* 任務(wù)類,實現(xiàn)runnable接口绷跑,實現(xiàn)具體任務(wù)
*/
public class Task implements Runnable {
private int id;
public Task(int id){
this.id = id;
}
@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + "當(dāng)前任務(wù): "+id+" 開始執(zhí)行");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + "當(dāng)前任務(wù): "+id+" 執(zhí)行完成");
}
}
public class Worker extends Thread {
private List<Runnable> taskList;
private String name;
public Worker(String name,List<Runnable> taskList){
this.name = name;
this.taskList = taskList;
}
@Override
public void run() {
// 判斷集合中是否有對象拳恋,有對象就執(zhí)行
while (taskList.size() > 0){
Runnable task = taskList.remove(0);
task.run();
}
}
}
public class ThreadPool {
// 創(chuàng)建一個線程安全的集合
private List<Runnable> taskList = Collections.synchronizedList(new LinkedList<>());
// 創(chuàng)建當(dāng)前線程數(shù),最大線程數(shù)砸捏,核心線程數(shù)
private int curNum;
private int corePoolSize;
private int maximumPoolSize;
private int workerSize;
public ThreadPool(int corePoolSize, int maximumPoolSize, int workerSize) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workerSize = workerSize;
}
// 創(chuàng)建任務(wù)的提交方法
public void submit(Task r){
if(curNum >= workerSize){
System.out.println( "任務(wù)隊列已滿谬运,拋出任務(wù)進(jìn)行終止");
}else {
taskList.add(r);
taskExec(r);
}
}
private void taskExec(Task r) {
if(curNum <= corePoolSize){
new Worker("核心線程" + r , taskList).start();
curNum++;
}else if(curNum < maximumPoolSize){
new Worker("非核心線程"+r,taskList).start();
curNum++;
}else{
System.out.println("任務(wù)" + r + "已經(jīng)緩存,但是超過");
}
}
}
線程池源碼解析
execute
線程的執(zhí)行方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt記錄著runState和workerCount
// clt 采用一個整形遍歷記錄前三位線程狀態(tài)垦藏,后29位當(dāng)前線程數(shù)梆暖,這樣可以避免使用總線鎖重量級鎖維護(hù)線程狀態(tài),可以直接使用AtomicInteger來進(jìn)行維護(hù)
int c = ctl.get();
//workerCountOf方法取出低29位的值掂骏,表示當(dāng)前活動的線程數(shù)
//然后拿線程數(shù)和 核心線程數(shù)做比較
if (workerCountOf(c) < corePoolSize) {
// 如果活動線程數(shù)<核心線程數(shù)
// 添加到
//addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷
if (addWorker(command, true))
// 如果成功則返回
return;
// 如果失敗則重新獲取 runState和 workerCount
c = ctl.get();
}
// 如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊列成功
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取 runState和 workerCount
int recheck = ctl.get();
// 如果不是運(yùn)行狀態(tài)并且
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//第一個參數(shù)為null轰驳,表示在線程池中創(chuàng)建一個線程,但不去啟動
// 第二個參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize
addWorker(null, false);
}
//再次調(diào)用addWorker方法级解,但第二個參數(shù)傳入為false冒黑,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize
else if (!addWorker(command, false))
//如果失敗則拒絕該任務(wù)
reject(command);
}
addworkers
將線程添加到工作隊列, 并啟動當(dāng)前的task進(jìn)行執(zhí)行勤哗,當(dāng)然查看源碼抡爹,重寫了run方法,如果當(dāng)前為null 會從隊列里面那一個運(yùn)行芒划。
加鎖的原因是我們在自己設(shè)計的簡單線程池中List<Runnable> 是線程安全的冬竟,而線程池源碼中,Runnbale使用 HashSet進(jìn)行存儲是線程不安全的腊状,所以需要加鎖诱咏。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取運(yùn)行狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果狀態(tài)值 >= SHUTDOWN (不接新任務(wù)&不處理隊列任務(wù))
// 并且 如果 !(rs為SHUTDOWN 且 firsTask為空 且 阻塞隊列不為空)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 返回false
return false;
for (;;) {
//獲取線程數(shù)wc
int wc = workerCountOf(c);
// 如果wc大與容量 || core如果為true表示根據(jù)corePoolSize來比較,否則為maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加workerCount(原子操作)
if (compareAndIncrementWorkerCount(c))
// 如果增加成功缴挖,則跳出
break retry;
// wc增加失敗袋狞,則再次獲取runState
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說明狀態(tài)已被改變映屋,返回重新執(zhí)行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據(jù)firstTask來創(chuàng)建Worker對象
w = new Worker(firstTask);
// 根據(jù)worker創(chuàng)建一個線程
final Thread t = w.thread;
if (t != null) {
// new一個鎖
final ReentrantLock mainLock = this.mainLock;
// 加鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 獲取runState
int rs = runStateOf(ctl.get());
// 如果rs小于SHUTDOWN(處于運(yùn)行)或者(rs=SHUTDOWN && firstTask == null)
// firstTask == null證明只新建線程而不執(zhí)行任務(wù)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果t活著就拋異常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 否則加入worker(HashSet)
//workers包含池中的所有工作線程苟鸯。僅在持有mainLock時訪問。
workers.add(w);
// 獲取工作線程數(shù)量
int s = workers.size();
//largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量
if (s > largestPoolSize)
// 如果 s比它還要大棚点,則將s賦值給它
largestPoolSize = s;
// worker的添加工作狀態(tài)改為true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果worker的添加工作完成
if (workerAdded) {
// 啟動線程
t.start();
// 修改線程啟動狀態(tài)
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
// 返回線啟動狀態(tài)
return workerStarted;
}
Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
可以看到它繼承了AQS并發(fā)框架還實現(xiàn)了Runnable早处。證明它還是一個線程任務(wù)類。那我們調(diào)用t.start()事實上就是調(diào)用了該類重寫的run方法瘫析。
Worker為什么不使用ReentrantLock來實現(xiàn)呢砌梆?
tryAcquire方法它是不允許重入的,而ReentrantLock是允許重入的贬循。對于線程來說咸包,如果線程正在執(zhí)行是不允許其它鎖重入進(jìn)來的。
線程只需要兩個狀態(tài)杖虾,一個是獨(dú)占鎖烂瘫,表明正在執(zhí)行任務(wù);一個是不加鎖奇适,表明是空閑狀態(tài)
public void run() { runWorker(this); }
Worker 重寫的Run 方法調(diào)用runWorker(this);
final void runWorker(Worker w) {
// 拿到當(dāng)前線程
Thread wt = Thread.currentThread();
// 拿到當(dāng)前任務(wù)
Runnable task = w.firstTask;
// 將Worker.firstTask置空 并且釋放鎖
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果task或者getTask不為空坟比,則一直循環(huán)
while (task != null || (task = getTask()) != null) {
// 加鎖
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// return ctl.get() >= stop
// 如果線程池狀態(tài)>=STOP 或者 (線程中斷且線程池狀態(tài)>=STOP)且當(dāng)前線程沒有中斷
// 其實就是保證兩點(diǎn):
// 1. 線程池沒有停止
// 2. 保證線程沒有中斷
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中斷當(dāng)前線程
wt.interrupt();
try {
// 空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行run方法(Runable對象)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 執(zhí)行完后, 將task置空嚷往, 完成任務(wù)++葛账, 釋放鎖
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 退出工作
processWorkerExit(w, completedAbruptly);
}
總結(jié)一下runWorker方法的執(zhí)行過程:
while循環(huán)中,不斷地通過getTask()方法從workerQueue中獲取任務(wù)
如果線程池正在停止皮仁,則中斷線程注竿。否則調(diào)用3.
調(diào)用task.run()執(zhí)行任務(wù)茄茁;
如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法巩割,銷毀線程workers.remove(w)
關(guān)閉線程池
可通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池.
它們的原理是遍歷線程池中的工作線程,然后逐個調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止.
但是它們存在一定的區(qū)別
shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表
shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài)裙顽,然后中斷所有沒有正在執(zhí)行任務(wù)的線程.
只要調(diào)用了這兩個關(guān)閉方法中的任意一個,isShutdown方法就會返回true.
當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時調(diào)用isTerminaed方法會返回true.
至于應(yīng)該調(diào)用哪一種方法,應(yīng)該由提交到線程池的任務(wù)的特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,若任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法.
線程的狀態(tài),線程數(shù)保存單個Integer值里
在ThreadPoolExecutor的屬性定義中頻繁地用位運(yùn)算來表示線程池狀態(tài);
位運(yùn)算是改變當(dāng)前值的一種高效手段.
下面從屬性定義開始
Integer 有32位;
最右邊29位表工作線程數(shù);
最左邊3位表示線程池狀態(tài),可表示從0至7的8個不同數(shù)值
線程池的狀態(tài)用高3位表示,其中包括了符號位.
好處: 一個狀態(tài)向另一個狀態(tài)切換到另一個狀態(tài) 不需要使用總線鎖來進(jìn)行保證狀態(tài)切換的安全性宣谈。
核心線程在沒有任務(wù)的時候會阻塞
為什么單線程池和固定線程池使用的任務(wù)阻塞隊列是LinkedBlockingQueue()愈犹,而緩存線程池使用的是SynchronousQueue()呢?
因為單線程池和固定線程池中闻丑,線程數(shù)量是有限的漩怎,因此提交的任務(wù)需要在LinkedBlockingQueue隊列中等待空余的線程;而緩存線程池中嗦嗡,線程數(shù)量幾乎無限(上限為Integer.MAX_VALUE)勋锤,因此提交的任務(wù)只需要在SynchronousQueue隊列中同步移交給空余線程即可。
線程池中一個線程oom其他線程還可以用嗎侥祭?
可以叁执,在線程池中一個線程報錯,或者oom線程池會將其進(jìn)行回收矮冬,避免造成其他線程出錯谈宛。
如果程序能正常處理這個異常情況,比如不再申請更多的內(nèi)存或其它資源胎署,或者放棄那個子任務(wù)或子線程吆录,系統(tǒng)OOM狀態(tài)是可以回到正常情況。
如果主線程拋異常退出了琼牧,子線程還能運(yùn)行么恢筝?
先來一個定義線程不像進(jìn)程,一個進(jìn)程中的線程之間是沒有父子之分的巨坊,都是平級關(guān)系撬槽。即線程都是一樣的, 退出了一個不會影響另外一個。
因此抱究,答案是如果主線程拋異常退出了恢氯,子線程還能運(yùn)行带斑。
其實發(fā)生OOM的線程一般情況下會死亡鼓寺,也就是會被終結(jié)掉,該線程持有的對象占用的heap都會被gc了勋磕,釋放內(nèi)存妈候。因為發(fā)生OOM之前要進(jìn)行g(shù)c,就算其他線程能夠正常工作挂滓,也會因為頻繁gc產(chǎn)生較大的影響苦银。