1.概覽
ExecutorService是一個(gè)由JDK提供的框架担钮,它簡化了以異步模式運(yùn)行task的工作。通常來說三痰,ExecutorService會(huì)自動(dòng)提供一個(gè)線程池以及常用的API瑟啃。
2.實(shí)例化ExecutorService
?? 2.1 Executors類的工廠方法
??? 創(chuàng)建ExecutorService的最簡單的方法是使用Executors類的一個(gè)工廠方法先蒋。例如周叮,下面的一行代碼將會(huì)創(chuàng)建一個(gè)擁有10個(gè)線程的線程池辩撑。
? ?? ExecutorService executor = Executors.newFixedThreadPool(10);
? 還有其他幾個(gè)工廠方法用于創(chuàng)建預(yù)定義的ExecutorService,它們會(huì)滿足特定的使用場景仿耽。尋找最合適你的方法槐臀,可以咨詢Oracle的官方文檔。
2.2 直接創(chuàng)建一個(gè)ExecutorService
由于ExecutorService是一個(gè)接口氓仲,所以可以使用它的任一實(shí)現(xiàn)來創(chuàng)建一個(gè)實(shí)例。在java.util.concurrent包中有好幾個(gè)實(shí)現(xiàn)可供選擇,或者你可以創(chuàng)建自己的實(shí)現(xiàn)敬扛。例如晰洒,ThreadPoolExecutor類就有好幾個(gè)構(gòu)造方法可用于配置一個(gè)executor service以及它內(nèi)部的線程池。
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
你可能注意到了啥箭,上面的代碼和工廠方法newSingleThreadExecutor()的源代碼很像谍珊,在大多數(shù)情況下,詳細(xì)的手工配置都不是必需的急侥。
3.分配task給ExecutorService
ExecutorService可以運(yùn)行 Runnable 以及Callable任務(wù)砌滞。為了讓事情簡單一點(diǎn),我們將使用倆個(gè)最基本的任務(wù)坏怪。注意:這里我們使用拉姆達(dá)表達(dá)式來取代匿名內(nèi)部類贝润。
Runnable runnableTask = () -> {??
? ? try {????????
??????? TimeUnit.MILLISECONDS.sleep(300);??
? ? ? ? } catch (InterruptedException e) {??
??????e.printStackTrace();?
???}
};
?Callable callableTask = () -> {??
??TimeUnit.MILLISECONDS.sleep(300);????
?? return "Task's execution";
};
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
有好幾個(gè)方法可以用于把task分配給ExecutorService,包括: execute()、 submit()铝宵、 invokeAny()打掘、 invokeAll()。其中鹏秋,execute()方法是繼承自Executor接口尊蚁。execute()方法的返回值是void,使用它無法獲取任務(wù)的運(yùn)行結(jié)果或者檢查任務(wù)的狀態(tài)(例如,是正在運(yùn)行還是已經(jīng)被執(zhí)行了)侣夷。
executorService.execute(runnableTask);
submit()可以提交一個(gè)Callable或Runnable任務(wù)給ExecutorService横朋,并且返回一個(gè)Future類型的結(jié)果。
Future future =? executorService.submit(callableTask);
invokeAny()可以給ExecutorService分配一個(gè)任務(wù)集合百拓。導(dǎo)致他們中的每一個(gè)都被執(zhí)行琴锭,并且返回一個(gè)任務(wù)成功運(yùn)行的結(jié)果(如果存在一個(gè)成功運(yùn)行的話)。
String result = executorService.invokeAny(callableTasks);
invokeAll()也是可以給ExecutorService分配一個(gè)任務(wù)集合耐版。導(dǎo)致他們中的每一個(gè)都被執(zhí)行祠够,并且以Future集合的形式返回所有任務(wù)的運(yùn)行結(jié)果。
List> futures = executorService.invokeAll(callableTasks);
現(xiàn)在粪牲,在進(jìn)行下一步之前古瓤,我們有兩件事必須要討論一下: 關(guān)閉ExecutorService并且處理Future返回類型。
4.關(guān)閉一個(gè)ExecutorService
在通常情況下腺阳,當(dāng)沒有待處理的任務(wù)(task)時(shí)落君,ExecutorService將不會(huì)自動(dòng)銷毀。它會(huì)一直保持存活并且等待執(zhí)行新任務(wù)亭引。
在某些情況下绎速,這或許很有用;例如,有一個(gè)應(yīng)用需要處理一些不規(guī)則的任務(wù)或大量在編譯時(shí)不可知的任務(wù)焙蚓。在另一方面纹冤,即使一個(gè)app在執(zhí)行到最后洒宝,它也不會(huì)被停止,因?yàn)橐粋€(gè)處于等待中的ExecutorService 將導(dǎo)致JVM保持運(yùn)行萌京。
為了正確地關(guān)閉一個(gè)ExecutorService雁歌,我們有兩個(gè)API可供使用: shutdown() 以及shutdownNow()。
shutdown()方法不會(huì)導(dǎo)致ExecutorService立即銷毀知残。它將讓ExecutorService停止接收新任務(wù)靠瞎,并且在所有運(yùn)行中的線程完成他們當(dāng)前工作之后,關(guān)閉ExecutorService求妹。
executorService.shutdown();
而shutdownNow()方法會(huì)試圖立即銷毀ExecutorService,但是它并不保證在同一時(shí)刻下所有運(yùn)行中的線程都能被終止乏盐。該方法會(huì)返回一個(gè)任務(wù)集合,這些任務(wù)都是等待被ExecutorService處理的制恍。具體怎么處理這些任務(wù)則取決于開發(fā)者父能。
List notExecutedTasks = executorService.shutDownNow();
關(guān)閉ExecutorService的最好方式(同時(shí)也是Oracle所推薦的)是上面?zhèn)z種方法與awaitTermination()的組合使用。使用這種方法吧趣,ExecutorService將會(huì)先停止接收新任務(wù)法竞,然后等待一段時(shí)間好讓所有任務(wù)都被完成。如果時(shí)間過期了强挫,執(zhí)行將被理解終止:
executorService.shutdown();
try {
? ? if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
? ? ? ? executorService.shutdownNow();
? ? }
} catch (InterruptedException e) {
? ? executorService.shutdownNow();
}
5.Future接口
submit()和invokeAll()方法會(huì)返回一個(gè)對(duì)象或者一個(gè)Future類型的集合岔霸,這使得我們可以獲取任務(wù)的執(zhí)行結(jié)果或者檢查任務(wù)的狀態(tài)(是正在運(yùn)行還是已經(jīng)執(zhí)行完成)。
Future接口提供了一個(gè)特殊的額阻塞方法get()俯渤,get()方法會(huì)返回一個(gè)Callable任務(wù)實(shí)際的執(zhí)行結(jié)果呆细,或者null(如果是Runnable任務(wù)的話)。在任務(wù)仍處于運(yùn)行期間的時(shí)候八匠,調(diào)用get()方法將會(huì)導(dǎo)致阻塞絮爷,直到任務(wù)被恰當(dāng)?shù)貓?zhí)行并且結(jié)果是可用時(shí)。
Future future = executorService.submit(callableTask);
String result = null;
try {
? ? result = future.get();
} catch (InterruptedException | ExecutionException e) {
? ? e.printStackTrace();
}
如果get()方法導(dǎo)致長久的阻塞的話梨树,應(yīng)用的執(zhí)行性能就會(huì)下降坑夯。如果結(jié)果數(shù)據(jù)不重要的話,可以使用超時(shí)時(shí)間來避免這個(gè)問題:
String result = future.get(200, TimeUnit.MILLISECONDS);
如果執(zhí)行時(shí)間比指定時(shí)間長的話(此時(shí)抡四,即:200毫秒)柜蜈,就會(huì)拋出TimeoutException異常。isDone()方法可以用來檢查分配的任務(wù)是否已經(jīng)被處理了指巡。Future接口也提供了cancel()方法用于取消任務(wù)的執(zhí)行淑履,并且可以使用isCancelled()方法來檢查該任務(wù)是否已經(jīng)被取消。
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
6.ScheduledExecutorService接口
ScheduledExecutorService會(huì)在預(yù)定義的延遲之后藻雪,運(yùn)行任務(wù)秘噪。再次說明,實(shí)例化ScheduledExecutorService的最好方式是使用Executor類的工廠方法勉耀。在本章節(jié)中指煎,我們將使用單線程的ScheduledExecutorService:
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
在一個(gè)固定延遲之后蹋偏,調(diào)度一個(gè)單任務(wù)執(zhí)行,可以使用ScheduledExecutorService的scheduled()方法贯要。這里有兩個(gè)scheduled()方法供你運(yùn)行一個(gè)Runnable或Callable任務(wù):
Future resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
scheduleAtFixedRate()方法可以讓我們?cè)诠潭ǖ难舆t時(shí)間之后暖侨,周期性地執(zhí)行一個(gè)任務(wù)。上面的代碼會(huì)在執(zhí)行callableTask之前崇渗,延遲一秒。
下面的代碼塊則會(huì)在100毫秒的初始延遲之后京郑,執(zhí)行一個(gè)任務(wù)宅广,并且在那之后,它會(huì)每隔450毫秒執(zhí)行同一個(gè)任務(wù)些举。如果處理器在處理一個(gè)任務(wù)時(shí)需要的時(shí)間比scheduledAtFixedRate()方法的period參數(shù)更長的話跟狱,ScheduledExecutorService將會(huì)在開始下一個(gè)任務(wù)之前一直等待,直到當(dāng)前任務(wù)完成户魏。
Future resultFuture = service .scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);
如果必需在兩個(gè)任務(wù)迭代之間有一個(gè)固定長度的延遲的話驶臊,應(yīng)當(dāng)使用scheduleWithFixedDelay()。例如叼丑,下面的代碼將會(huì)保證在結(jié)束當(dāng)前運(yùn)行和開始另一個(gè)任務(wù)之間有150毫秒的停滯关翎。
service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);
根據(jù)scheduledAtFixedRate()以及scheduleWithFixedDelay()方法的約定,如果ExecutorService終止或者在任務(wù)執(zhí)行過程中有異常拋出的時(shí)鸠信,任務(wù)的周期執(zhí)行將會(huì)結(jié)束纵寝。
7.ExecutorService VS Fork/Join
在java7發(fā)布之后,許多開發(fā)者任務(wù)星立,應(yīng)該使用fork/join框架取代ExecutorService框架爽茴。但這并不總是合適的選擇。雖然fork/join可以帶來使用的簡潔性以及運(yùn)行的流暢性绰垂,但是它也讓降低了我們對(duì)并發(fā)執(zhí)行的控制室奏。ExecutorService讓開發(fā)者可以控制一定數(shù)量的線程,以及任務(wù)執(zhí)行的粒度劲装。
ExecutorService最適用于處理那些相互獨(dú)立的任務(wù)胧沫。例如,事務(wù) 或 符合“單任務(wù)單線程”的請(qǐng)求等酱畅。
相比之下琳袄,根據(jù)Oracle的官方文檔,fork/join在設(shè)計(jì)之初的目的就是加速那些可以被分解成小塊的工作纺酸。
8.總結(jié)
盡管ExecutorService比較簡單窖逗,但還是有很多坑需要我們注意。我們來總結(jié)一下:
? 1.保持一個(gè)未使用的ExecutorService存活:? 我們?cè)诒酒恼碌恼鹿?jié)4中講解了如何關(guān)閉一個(gè)ExecutorService餐蔬。
? 2.在使用固定長度的線程池時(shí)碎紊,錯(cuò)誤的線程池容量: 決定一個(gè)應(yīng)用到底需要多少線程才可以有效地執(zhí)行任務(wù)時(shí)非常重要的佑附,線程池太大會(huì)導(dǎo)致 不 必要的花銷,因?yàn)榇蠖鄶?shù)線程將處于等待模式waiting mode仗考。線程池太小的話音同,又會(huì)因?yàn)殛?duì)列中任務(wù)的長時(shí)間地等待而降低應(yīng)用的響應(yīng)性。
? 3.在一個(gè)task取消之后秃嗜,調(diào)用Future的get()方法:試圖獲取一個(gè)已經(jīng)取消的任務(wù)的執(zhí)行結(jié)果將會(huì)觸發(fā)一個(gè)CancellationException权均。
? 4.Future.get()方法的長期阻塞:超時(shí)時(shí)間應(yīng)該用于避免無謂的等待。
文章中的代碼可以在github上找到锅锨,地址: sourceCode