執(zhí)行多線程并發(fā)任務的時候斋射,如果任務類型相同,一般會考慮使用線程池,一方面利用了并發(fā)的優(yōu)勢甥角,一方面避免創(chuàng)建大量線程得不償失。使用線程池執(zhí)行的任務一般是我們自己的代碼呼寸,或者第三方的代碼艳汽,有沒有想過,如果這些代碼拋出異常時对雪,線程池會怎么處理呢河狐?如果不處理又會有什么影響?
異常的影響
Java 理論與實踐: 嗨瑟捣,我的線程到哪里去了馋艺?這篇文章列舉了一個由于RuntimeException引發(fā)的線程泄漏問題:
考慮這樣一個假設的中間件服務器應用程序,它聚合來自各種輸入源的消息迈套,然后將它們提交到外部服務器應用程序捐祠,從外部應用程序接收響應并將響應路由回適當?shù)妮斎朐础τ诿總€輸入源桑李,都有一個以其自己的方式接受其輸入消息的插件(通過掃描文件目錄踱蛀、等待套接字連接、輪詢數(shù)據(jù)庫表等)贵白。插件可以由第三方編寫率拒,即使它們是在服務器 JVM 上運行的。這個應用程序擁有(至少)兩個內(nèi)部工作隊列 ― 從插件處接收的正在等待被發(fā)送到服務器的消息(“出站消息”隊列)禁荒,以及從服務器接收的正在等待被傳遞到適當插件的響應(“入站響應”隊列)猬膨。通過調(diào)用插件對象上的服務例程 incomingResponse() ,消息被路由到最初發(fā)出請求的插件呛伴。
從插件接收消息后寥掐,就被排列到出站消息隊列中。由一個或多個從隊列讀取消息的線程處理出站消息隊列中的消息磷蜀、記錄其來源并將它提交給遠程服務器應用程序(假定通過 Web 服務接口)召耘。遠程應用程序最終通過 Web 服務接口返回響應,然后我們的服務器將接收的響應排列到入站響應隊列中褐隆。一個或多個響應線程從入站響應隊列讀取消息并將其路由到適當?shù)牟寮鬯瑥亩瓿赏怠奥贸獭薄?br> 在這個應用程序中,有兩個消息隊列庶弃,分別用于出站請求和入站響應衫贬,不同的插件內(nèi)可能也有另外的隊列。我們還有幾種服務線程歇攻,一個從出站消息隊列讀取請求并將其提交給外部服務器固惯,一個從入站響應隊列讀取響應并將其路由到插件,在用于向套接字或其它外部請求源提供服務的插件中可能也有一些線程缴守。
如果這些線程中的一個(如響應分派線程)消失了葬毫,將會發(fā)生什么镇辉?因為插件仍能夠提交新消息,所以它們可能不會立即注意到某些方面出錯了贴捡。消息仍將通過各種輸入源到達忽肛,并通過我們的應用程序提交到外部服務。因為插件并不期待立即獲得其響應烂斋,因此它仍沒有意識到出了問題屹逛。最后,接收的響應將排滿隊列汛骂。如果它們存儲在內(nèi)存中罕模,那么最終將耗盡內(nèi)存。即使不耗盡內(nèi)存帘瞭,也會有人在某個時刻發(fā)現(xiàn)響應得不到傳遞 ― 但這可能需要一些時間手销,因為系統(tǒng)的其它方面仍能正常發(fā)揮作用。
當主要的任務處理方面由線程池而不是單個線程來處理時图张,對于偶然的線程泄漏的后果有一定程度的保護锋拖,因為一個執(zhí)行得很好的八線程的線程池,用七個線程完成其工作的效率可能仍可以接受祸轮。起初兽埃,可能沒有任何顯著的差異。但是适袜,系統(tǒng)性能最終將下降柄错,雖然這種下降的方式不易被察覺。
服務器應用程序中的線程泄漏問題在于不是總是容易從外部檢測它苦酱。因為大多數(shù)線程只處理服務器的部分工作負載售貌,或可能僅處理特定類型的后臺任務,所以當程序?qū)嶋H上遭遇嚴重故障時疫萤,在用戶看來它仍在正常工作颂跨。這一點,再加上引起線程泄漏的因素并不總是留下明顯痕跡扯饶,就會引起令人驚訝甚或使人迷惑的應用程序行為恒削。
我們在使用線程池處理并行任務時,在線程池的生命周期當中尾序,將通過某種抽象機制(Runnable)調(diào)用許多未知的代碼钓丰,這些代碼有可能是我們自己寫的,也有可能來自第三方每币。任何代碼都有可能拋出一個RuntimeException携丁,如果這些提交的Runnable拋出了RuntimeException,線程池可以捕獲他兰怠,線程池有可能會創(chuàng)建一個新的線程來代替這個因為拋出異常而結束的線程梦鉴,也有可能什么也不做(這要看線程池的策略)李茫。即使不會造成線程泄漏,我們也會丟失這個任務的執(zhí)行情況尚揣,無法感知任務執(zhí)行出現(xiàn)了異常涌矢。
所以掖举,有必要處理提交到線程池運行的代碼拋出的異常快骗。
如何處理異常
簡單了解線程池
上面是我畫的思維導圖
先介紹一下jdk中線程池的實現(xiàn):
Executor定義了一個通用的并發(fā)任務框架,即通過execute方法執(zhí)行一個任務塔次。
ExecutorService定義了并發(fā)框架(線程池)的生命周期方篮。
AbstractExecutorService、ThreadPoolExecutor励负、ScheduledThreadPoolExecutor實現(xiàn)了并發(fā)任務框架(線程池)藕溅。其中ScheduledThreadPoolExecutor支持定時及周期性任務的執(zhí)行。
Executors相當于一個線程池工廠類继榆,返回了不同執(zhí)行策略的線程池對象巾表。
我們一般使用Executors.new...方法來得到某種線程池:
newCachedThreadPool
創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要略吨,可靈活回收空閑線程集币,若無可回收,則新建線程翠忠。
newFixedThreadPool
創(chuàng)建一個定長線程池鞠苟,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待秽之。
newSingleThreadExecutor
創(chuàng)建一個單線程化的線程池当娱,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行考榨。
newScheduledThreadPool
創(chuàng)建一個定長線程池跨细,支持定時及周期性任務執(zhí)行。
其中河质,前三者返回ExecutorService實例扼鞋,他們的實現(xiàn)為ThreadPoolExecutor或其包裝類;newScheduledThreadPool返回的是ScheduledExecutorService實例愤诱,他的實現(xiàn)為ScheduledThreadPoolExecutor或其包裝類云头。
ExecutorService exec = Executors.newFixedThreadPool(8);
以上述代碼為例,得到ExecutorService實例后淫半,我們可以通過兩種方式提交任務(Runnable):
exec.execute(runnable)
exec.submit(runnable)
對于這兩種不同的任務提交方式溃槐,我們有不同的異常處理辦法。
exec.submit(runnable)
使用exec.submit(runnable)
這種方式提交任務時科吭,submit方法會將我們的Runnable包裝為一個RunnableFuture對象昏滴,這個對象實際上是FutureTask實例猴鲫,然后將這個FutureTask交給execute方法執(zhí)行。
Future用來管理任務的生命周期谣殊,將Future實例提交給異步線程執(zhí)行后拂共,可以調(diào)用Future.get方法獲取任務執(zhí)行的結果。我們知道Runnable執(zhí)行是沒有返回結果的姻几,那么這個結果是怎么來的宜狐?
可以看到,在FutureTask的構造方法中蛇捌,將Runnable包裝成了一個Callable類型的對象抚恒。
FutureTask的run方法中,調(diào)用了callable對象的call方法络拌,也就調(diào)用了我們傳入的Runnable對象的run方法俭驮。可以看到春贸,如果代碼(Runnable)拋出異常混萝,會被捕獲并且把這個異常保存下來。
可以看到萍恕,在調(diào)用get方法時逸嘀,會將保存的異常重新拋出。所以雄坪,我們在使用submit方法提交任務的時候厘熟,利用返回的Future對象,通過他的get方法可以得到任務運行中拋出的異常维哈,然后針對異常做一些處理绳姨。
由于我們在調(diào)用submit時并沒有給Runnable指定返回結果,所以在將Runnable包裝為Callable的時候阔挠,會傳入一個null飘庄,故get方法返回一個null.
當然,我們也可以直接傳入Callable類型的任務购撼,這樣就可以獲取任務執(zhí)行返回結果跪削,并且得到任務執(zhí)行拋出的異常。
這就是使用線程池時處理任務中拋出異常的第一種方法:使用ExecutorService.submit執(zhí)行任務迂求,利用返回的Future對象的get方法接收拋出的異常碾盐,然后進行處理
exec.execute(runnable)
利用Future.get得到任務拋出的異常的缺點在于,我們需要顯式的遍歷Future揩局,調(diào)用get方法獲取每個任務執(zhí)行拋出的異常毫玖,然后處理。
很多時候我們僅僅是使用exec.execute(runnable)
這種方法來提交我們的任務。這種情況下任務拋出的異常如何處理呢付枫?
在使用exec.execute(runnable)
提交任務的時候(submit其實也是調(diào)用execute方法執(zhí)行)烹玉,我們的任務最終會被一個Worker對象執(zhí)行。這個Worker內(nèi)部封裝了一個Thread對象阐滩,這個Thread就是線程池的工作者線程二打。工作者線程會調(diào)用runWorker方法來執(zhí)行我們提交的任務:(代碼比較長,就直接粘過來了)
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上面代碼的基本意思就是不停的從任務隊列中取出任務執(zhí)行掂榔,如果任務代碼(task.run)拋出異常继效,會被最內(nèi)層的try--catch
塊捕獲,然后重新拋出衅疙。注意到最里面的finally塊莲趣,在重新
拋出異常之前鸳慈,要先執(zhí)行afterExecute
方法饱溢,這個方法的默認實現(xiàn)為空,即什么也不做走芋。我們可以在這個方法上做點文章绩郎,這就是我們的第二種方法,
重寫ThreadPoolExecutor.afterExecute
方法翁逞,處理傳遞到afterExecute
方法中的異常:
class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method. If you would like to trap both kinds of failures in this method, you can further probe for such cases, as in this sample subclass that prints either the direct cause or the underlying exception if a task has been aborted:
上面是java doc給出的建議肋杖。可以看到挖函,代碼中還處理了task
是FutureTask
的情況状植。回想一下submit
方式提交任務的情況:
在submit方法中怨喘,我們傳入的Runnable/Callable(要執(zhí)行的任務)被封裝為FutureTask對象津畸,交給
execute
方法執(zhí)行-
經(jīng)過一系列操作,提交的FutureTask對象被Worker對象中的工作者線程所執(zhí)行必怜,也就是runWorker方法
此時的代碼運行情況:runWorker->submit方法封裝的FutureTask的run方法->我們提交的Runnable的run方法
此時從
我們提交的Runnable的run方法
中拋出了一個未檢測異常RunnableException肉拓,被FutureTask的run方法捕獲FutureTask的run方法捕獲異常后保存,不再重新拋出梳庆。同時意味著run方法執(zhí)行結束暖途。
runWorker方法沒有檢測到異常,
task.run
當作正常運行結束膏执。但是還是會執(zhí)行afterExecute方法驻售。
經(jīng)過這樣的梳理,上面的代碼為什么這么寫就一目了然了更米。
上面已經(jīng)提到了兩種解決任務代碼拋出未檢測異常的方案欺栗。接下來是第三種:
當一個線程因為未捕獲的異常而退出時,JVM會把這個事件報告給應用提供的UncaughtExceptionHandler
異常處理器,如果沒有提供任何的異常處理器纸巷,那么默認的行為就是將堆棧信息輸送到System.err镇草。
看一下上面的runWorker
方法,如果task.run
(任務代碼)拋出了異常瘤旨,異常會層層拋出梯啤,最終導致這個線程退出。此時這個拋出的異常就會傳遞到UncaughtExceptionHandler
實例當中存哲,由uncaughtException(Thread t,Throwable e)
這個方法處理因宇。
于是就有了第三種解決任務代碼拋出異常的方案:為工作者線程設置UncaughtExceptionHandler
,在uncaughtException
方法中處理異常
注意祟偷,這個方案不適用與使用submit
方式提交任務的情況察滑,原因上面也提到了,F(xiàn)utureTask的run方法捕獲異常后保存修肠,不再重新拋出贺辰,意味著runWorker
方法并不會捕獲到拋出的異常,線程也就不會退出嵌施,也不會執(zhí)行我們設置的UncaughtExceptionHandler
饲化。
如何為工作者線程設置UncaughtExceptionHandler
呢?ThreadPoolExecutor
的構造函數(shù)提供一個ThreadFactory
吗伤,可以在其中設置我們自定義的UncaughtExceptionHandler
吃靠,這里不再贅述。
至于第四中方案足淆,就很簡單了:在我們提供的Runnable的run方法中捕獲任務代碼可能拋出的所有異常巢块,包括未檢測異常。這種方法比較簡單巧号,也有他的局限性族奢,不夠靈活,我們的處理被局限在了線程代碼邊界之內(nèi)裂逐。
總結
通過上面的分析我們得到了四種解決任務代碼拋異常的方案:
在我們提供的Runnable的run方法中捕獲任務代碼可能拋出的所有異常歹鱼,包括未檢測異常
使用ExecutorService.submit執(zhí)行任務,利用返回的Future對象的get方法接收拋出的異常卜高,然后進行處理
重寫
ThreadPoolExecutor.afterExecute
方法弥姻,處理傳遞到afterExecute
方法中的異常為工作者線程設置
UncaughtExceptionHandler
,在uncaughtException
方法中處理異常
要注意的是掺涛,使用最后一種方案時庭敦,無法處理以submit
的方式提交的任務。