為什么使用線程池
在沒有線程池的場景技扼,我們使用多線程時:
- 線程池的創(chuàng)建和銷毀需要消耗額外的資源。
線程的創(chuàng)建需要開辟虛擬機棧,本地方法棧枕赵、程序計數(shù)器等線程私有的內(nèi)存空間。
在線程的銷毀時需要回收這些系統(tǒng)資源位隶。頻繁的創(chuàng)建和銷毀線程會浪費大量的系統(tǒng)資源拷窜,增加并發(fā)編程的風險 - 系統(tǒng)超過負載后沒有拒絕策略,進一步加重系統(tǒng)負載涧黄,可能引起系統(tǒng)運行卡頓甚至崩潰
引入線程池的好處:
- 線程資源復(fù)用篮昧;
- 提供線程管理功能,控制線程并發(fā)數(shù)笋妥,合理使用系統(tǒng)資源
運行線程數(shù)量達到核心線程數(shù)量后懊昨,會進入緩存隊列;
隊列滿后會啟動額外線程(數(shù)量不大于最大線程數(shù))加快執(zhí)行春宣;
線程數(shù)量達到最大和隊列已滿的情況酵颁,會執(zhí)行拒絕策略嫉你。 - 線程環(huán)境隔離。不同環(huán)境和優(yōu)先級躏惋,可以分別使用線程池來隔離線程環(huán)境幽污,保證優(yōu)先級高的服務(wù)正常運行。
使用線程池的風險
用線程池有同步錯誤和死鎖簿姨、資源不足和線程泄漏等風險距误。
- 死鎖
任何多線程應(yīng)用程序都有死鎖風險。
雖然任何多線程程序中都有死鎖的風險扁位,但線程池卻引入了另一種死鎖可能深寥,在那種情況下,所有池線程都在執(zhí)行已阻塞的等待隊列中另一任務(wù)的執(zhí)行結(jié)果的任務(wù)贤牛,但這一任務(wù)卻因為沒有未被占用的線程而不能運行惋鹅。當線程池被用來實現(xiàn)涉及許多交互對象的模擬,被模擬的對象可以相互發(fā)送查詢殉簸,這些查詢接下來作為排隊的任務(wù)執(zhí)行闰集,查詢對象又同步等待著響應(yīng)時,會發(fā)生這種情況般卑。 - 資源不足
線程池在恰當?shù)卣{(diào)整了線程池大小時武鲁,通常執(zhí)行得很好。線程消耗包括內(nèi)存和其它系統(tǒng)資源在內(nèi)的大量資源蝠检。
如果線程池太大沐鼠,那么被那些線程消耗的資源可能嚴重地影響系統(tǒng)性能。在線程之間進行切換將會浪費時間叹谁,而且使用超出比您實際需要的線程可能會引起資源匱乏問題饲梭,因為池線程正在消耗一些資源,而這些資源可能會被其它任務(wù)更有效地利用焰檩。除了線程自身所使用的資源以外憔涉,服務(wù)請求時所做的工作可能需要其它資源,例如 JDBC 連接析苫、套接字或文件兜叨。這些也都是有限資源,有太多的并發(fā)請求也可能引起失效衩侥,例如不能分配 JDBC 連接国旷。 - 線程泄漏
各種類型的線程池中一個嚴重的風險是線程泄漏,當從池中除去一個線程以執(zhí)行一項任務(wù)茫死,而在任務(wù)完成后該線程卻沒有返回池時跪但,會發(fā)生這種情況。發(fā)生線程泄漏的一種情形出現(xiàn)在任務(wù)拋出一個 RuntimeException 或一個 Error 時璧榄。如果池類沒有捕捉到它們特漩,那么線程只會退出而線程池的大小將會永久減少一個吧雹。當這種情況發(fā)生的次數(shù)足夠多時骨杂,線程池最終就為空涂身,而且系統(tǒng)將停止,因為沒有可用的線程來處理任務(wù)搓蚪。
有些任務(wù)可能會永遠等待某些資源或來自用戶的輸入蛤售,而這些資源又不能保證變得可用,用戶可能也已經(jīng)回家了妒潭,諸如此類的任務(wù)會永久停止悴能,而這些停止的任務(wù)也會引起和線程泄漏同樣的問題。如果某個線程被這樣一個任務(wù)永久地消耗著雳灾,那么它實際上就被從池除去了漠酿。對于這樣的任務(wù),應(yīng)該要么只給予它們自己的線程谎亩,要么只讓它們等待有限的時間炒嘲。 - 請求過載
請求過多可能壓垮服務(wù)器。在這種情形下匈庭,我們可能不想將每個到來的請求都排隊到我們的工作隊列夫凸,因為排在隊列中等待執(zhí)行的任務(wù)可能會消耗太多的系統(tǒng)資源并引起資源缺乏。在這種情形下決定如何做取決于您自己阱持;在某些情況下夭拌,您可以簡單地拋棄請求,依靠更高級別的協(xié)議稍后重試請求衷咽,您也可以用一個指出服務(wù)器暫時很忙的響應(yīng)來拒絕請求
有效使用線程池的準則
- 不要對那些同步等待其它任務(wù)結(jié)果的任務(wù)排隊鸽扁。這可能會導致上面所描述的那種形式的死鎖,在那種死鎖中镶骗,所有線程都被一些任務(wù)所占用献烦,這些任務(wù)依次等待排隊任務(wù)的結(jié)果,而這些任務(wù)又無法執(zhí)行卖词。
- 在為時間可能很長的操作使用合用的線程時要小心巩那。如果程序必須等待諸如 I/O 完成這樣的某個資源,那么請指定最長的等待時間此蜈,以及隨后是失效還是將任務(wù)重新排隊以便稍后執(zhí)行即横。這樣做保證了:通過將某個線程釋放給某個可能成功完成的任務(wù),從而將最終取得某些進展裆赵。
- 理解任務(wù)东囚。要有效地調(diào)整線程池大小,您需要理解正在排隊的任務(wù)以及它們正在做什么战授。它們是 CPU 限制的(CPU-bound)嗎页藻?它們是 I/O 限制的(I/O-bound)嗎桨嫁?您的答案將影響您如何調(diào)整應(yīng)用程序。如果您有不同的任務(wù)類份帐,這些類有著截然不同的特征璃吧,那么為不同任務(wù)類設(shè)置多個工作隊列可能會有意義,這樣可以相應(yīng)地調(diào)整每個池废境。
線程池的大小設(shè)置
調(diào)整線程池的大小基本上就是避免兩類錯誤:線程太少或線程太多畜挨。在運行于具有 N 個處理器機器上的計算限制的應(yīng)用程序中,在線程數(shù)目接近 N 時添加額外的線程可能會改善總處理能力噩凹,而在線程數(shù)目超過 N 時添加額外的線程將不起作用巴元。事實上,太多的線程甚至會降低性能驮宴,因為它會導致額外的環(huán)境切換開銷逮刨。
- 線程池的最佳大小取決于可用處理器的數(shù)目以及工作隊列中的任務(wù)的性質(zhì)。若在一個具有 N 個處理器的系統(tǒng)上只有一個工作隊列堵泽,其中全部是計算性質(zhì)的任務(wù)修己,在線程池具有 N 或 N+1 個線程時一般會獲得最大的 CPU 利用率。
- 對于那些可能需要等待 I/O 完成的任務(wù)(例如落恼,從套接字讀取 HTTP 請求的任務(wù))箩退,需要讓池的大小超過可用處理器的數(shù)目,因為并不是所有線程都一直在工作佳谦。通過使用概要分析戴涝,您可以估計某個典型請求的等待時間(WT)與服務(wù)時間(ST)之間的比例。如果我們將這一比例稱之為 WT/ST钻蔑,那么對于一個具有 N 個處理器的系統(tǒng)啥刻,需要設(shè)置大約 N*(1+WT/ST) 個線程來保持處理器得到充分利用。
處理器利用率不是調(diào)整線程池大小過程中的唯一考慮事項咪笑。隨著線程池的增長可帽,您可能會碰到調(diào)度程序、可用內(nèi)存方面的限制窗怒,或者其它系統(tǒng)資源方面的限制映跟,例如套接字、打開的文件句柄或數(shù)據(jù)庫連接等的數(shù)目扬虚。
常用的幾種線程池
- newCachedThreadPool
創(chuàng)建一個可緩存線程池努隙,如果線程池長度超過處理需要,可靈活回收空閑線程辜昵,若無可回收荸镊,則新建線程。
特點:
工作線程的創(chuàng)建數(shù)量幾乎沒有限制(數(shù)目限制為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
如果長時間沒有往線程池中提交任務(wù)躬存,即如果工作線程空閑了指定的時間(默認為1分鐘)张惹,則該工作線程將自動終止。終止后岭洲,如果你又提交了新的任務(wù)宛逗,則線程池重新創(chuàng)建一個工作線程。
在使用CachedThreadPool時钦椭,一定要注意控制任務(wù)的數(shù)量拧额,否則碑诉,由于大量線程同時運行彪腔,很有會造成系統(tǒng)癱瘓。
- 執(zhí)行任務(wù)時有空閑線程进栽,直接使用德挣,沒有則創(chuàng)建
- 任務(wù)執(zhí)行完成后,等待1分鐘后關(guān)閉回收
- newFixedThreadPool
創(chuàng)建一個指定工作線程數(shù)量的線程池快毛,無限制(Integer.MAX_VALUE)長度任務(wù)隊列鏈表格嗅,隊列滿后后進任務(wù)直接丟棄。
每當提交一個任務(wù)就創(chuàng)建一個工作線程唠帝,如果工作線程數(shù)量達到線程池初始的最大數(shù)屯掖,則將提交的任務(wù)存入到池隊列中。
FixedThreadPool是一個典型且優(yōu)秀的線程池襟衰,它具有線程池提高程序效率和節(jié)省創(chuàng)建線程時所耗的開銷的優(yōu)點贴铜。但是,在線程池空閑時瀑晒,即線程池中沒有可運行任務(wù)時绍坝,它不會釋放工作線程,還會占用一定的系統(tǒng)資源苔悦。
- 多線程時轩褐,線程最大保持在設(shè)定線程量
- 新任務(wù)到來時,如果線程數(shù)量未達到線程池初始的最大數(shù)玖详,則會創(chuàng)建一個線程
- 任務(wù)執(zhí)行完畢后把介,線程繼續(xù)保持,不會關(guān)閉
newSingleThreadExecutor
創(chuàng)建一個單線程化的Executor蟋座,即只創(chuàng)建唯一的工作者線程來執(zhí)行任務(wù)拗踢,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行蜈七。如果這個線程異常結(jié)束秒拔,會有另一個取代它,保證順序執(zhí)行。單工作線程最大的特點是可保證順序地執(zhí)行各個任務(wù)砂缩,并且在任意給定的時間不會有多個線程是活動的作谚。newScheduleThreadPool
創(chuàng)建一個定長的線程池,無限制長度任務(wù)隊列鏈表庵芭。支持定時的以及周期性的任務(wù)執(zhí)行妹懒,支持定時及周期性任務(wù)執(zhí)行。
- 直接執(zhí)行:
public void testExecute() throws InterruptedException {
scheduledThreadPoolExecutor.execute(new TestTask(scheduledThreadPoolExecutor, 1000));
}
- 延時執(zhí)行
//延遲5秒后執(zhí)行
public void delayExecuteTask() throws InterruptedException {
System.out.println("task start");
int delaySecond = 5;
scheduledThreadPoolExecutor.schedule(new TestTask(scheduledThreadPoolExecutor, 0), delaySecond, TimeUnit.SECONDS);
for (int i = delaySecond; i > 0; i--) {
System.out.println("last time:" + i);
Thread.sleep(1000);
}
}
- 循環(huán)周期執(zhí)行
//延時5秒后双吆,每3秒執(zhí)行一次
public void scheduleWithFixedDelay() throws InterruptedException {
scheduledThreadPoolExecutor.scheduleWithFixedDelay(new TestTask(scheduledThreadPoolExecutor, 0), 5000, 3000, TimeUnit.MILLISECONDS);
for (int i = 0; i < 10000; i++) {
System.out.println("time:" + i);
Thread.sleep(1000);
}
}
線程池的拒絕策略
- CallerRunsPolicy
線程調(diào)用運行該任務(wù)的 execute 本身眨唬。此策略提供簡單的反饋控制機制,能夠減緩新任務(wù)的提交速度好乐。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
這個策略顯然不想放棄執(zhí)行任務(wù)匾竿。但是由于池中已經(jīng)沒有任何資源了,那么就直接使用調(diào)用該execute的線程本身來執(zhí)行蔚万。 - AbortPolicy
處理程序遭到拒絕將拋出運行時 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}
這種策略直接拋出異常岭妖,丟棄任務(wù)。(jdk默認策略反璃,隊列滿并線程滿時直接拒絕添加新任務(wù)昵慌,并拋出異常,所以說有時候放棄也是一種勇氣淮蜈,為了保證后續(xù)任務(wù)的正常進行斋攀,丟棄一些也是可以接收的,記得做好記錄) - DiscardPolicy
不能執(zhí)行的任務(wù)將被刪除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {} - DiscardOldestPolicy
如果執(zhí)行程序尚未關(guān)閉梧田,則位于工作隊列頭部的任務(wù)將被刪除淳蔼,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
該策略就稍微復(fù)雜一些柿扣,在pool沒有關(guān)閉的前提下首先丟掉緩存在隊列中的最早的任務(wù)肖方,然后重新嘗試運行該任務(wù)。這個策略需要適當小心未状。
線程池的原理
線程池的核心類:ThreadPoolExecutor俯画,他實現(xiàn)了Executor接口,如下圖所示:
1司草、提交新的task任務(wù)時艰垂,線程池判斷核心線程池是否已滿遭贸,如果未滿則創(chuàng)建新的worker線程救鲤。否則進入2。
2帕识、判斷任務(wù)隊列是否已經(jīng)滿搔课,如果還沒滿將任務(wù)放入隊列胰柑。否則進入3。
3、判斷線程池的線程是否有空閑柬讨,如果沒有崩瓤,則重新創(chuàng)建worker線程。如果線程池已滿踩官,則執(zhí)行拒絕策略却桶。
4、如果worker線程處于空閑狀態(tài)蔗牡,且線程數(shù)大于核心線程數(shù)颖系,超時后則執(zhí)行線程超時銷毀
詳見下圖:
線程池部分核心代碼
- 執(zhí)行任務(wù)主代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//1、線程數(shù)小于核心線程數(shù)辩越,添加worker任務(wù)并執(zhí)行
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2嘁扼、線程數(shù)>=核心線程數(shù)時,添加任務(wù)到隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//3区匣、添加隊列失敗后偷拔,如果線程未達到最多線程數(shù)蒋院,則新增worker任務(wù)
else if (!addWorker(command, false))
//新增失敗亏钩,執(zhí)行拒絕策略
reject(command);
}
- 添加worker任務(wù)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判斷線程是否達到上限
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS添加線程數(shù)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新增worker任務(wù)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 線程復(fù)用的代碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task==null(非首次任務(wù)代碼),則嘗試從阻塞隊列拿任務(wù)
//沒有拿到任務(wù)欺旧,則銷毀線程
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 從隊列獲取任務(wù)代碼塊
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超時姑丑、線程數(shù)大于核心線程數(shù),且隊列為空時辞友,銷毀線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//獲取任務(wù)栅哀,超過keepAliveTime未獲取到,則繼續(xù)往下執(zhí)行
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}