一、前言
??如果我們平時接觸過多線程開發(fā)稀轨,那肯定對線程池不陌生。在我們原先的學習中岸军,我們了解到奋刽,如果我們需要創(chuàng)建多個線程來執(zhí)行程序,那么只需要簡單使用Thread艰赞,Runnable或者Callable就可以完成我們所需的功能佣谐。
但線程頻繁的創(chuàng)建與銷毀是需要系統(tǒng)開銷的,我們舉幾個例子來說下使用線程池的優(yōu)點:
- 例如方妖,創(chuàng)建線程消耗時間T1狭魂,執(zhí)行任務消耗時間T2,銷毀線程消耗時間T3党觅,那么如果T1+T3>T2趁蕊,那么是不是說開啟一個線程來執(zhí)行這個任務太不劃算了!這時候使用線程池仔役,線程池緩存線程,可用已有的閑置線程來執(zhí)行新任務是己,避免了T1+T3帶來的系統(tǒng)開銷又兵;
- 使用多線程會占用系統(tǒng)資源,如果同時執(zhí)行的線程過多卒废,就有可能導致系統(tǒng)資源不足而產(chǎn)生阻塞的情況沛厨,而使用線程池能有效的控制線程的最大并發(fā)數(shù),避免這種問題的產(chǎn)生摔认;
- 使用線程池還可以進行延遲執(zhí)行逆皮,定時執(zhí)行等操作;
本文測試使用的JDK版本為JDK 8.0参袱;
二电谣、Java中的線程池實現(xiàn)
??Java中的線程池實現(xiàn)是通過Executor
框架體系和工具類Executors
來實現(xiàn)的秽梅,其中最核心的類是ThreadPoolExecutor
這個類,我們將圍繞著這個類來進行學習剿牺。我們先來看一下Executor框架的繼承關系:
1. 接口簡單介紹
-
-
Executor企垦,線程池框架最基礎的任務執(zhí)行接口,Executor框架中幾乎所有類都直接或者間接實現(xiàn) Executor 接口晒来,該接口提供了一種將
任務提交
與任務執(zhí)行
分離開來的機制钞诡,該接口只有一個方法,用來執(zhí)行已經(jīng)提供的線程任務:
void execute(Runnable command);
-
Executor企垦,線程池框架最基礎的任務執(zhí)行接口,Executor框架中幾乎所有類都直接或者間接實現(xiàn) Executor 接口晒来,該接口提供了一種將
-
-
ExcutorService湃崩,繼承自
Executor
接口荧降,擴展了對任務各種操作的接口,該接口是我們最常用的線程池接口攒读,我們來看一下它的一些方法:
public interface ExecutorService extends Executor { /** * 啟動一次有順序的關閉朵诫,之前提交的任務正常執(zhí)行,新的任務不再執(zhí)行 */ void shutdown(); /** * 試圖停止所有正在執(zhí)行的任務整陌,暫停處理正在等待的任務拗窃,并返回等待執(zhí)行的任務列表 */ List<Runnable> shutdownNow(); /** * 如果執(zhí)行程序已經(jīng)停止,則返回false */ boolean isShutdown(); /** * 如果所有任務都在關閉后完成泌辫,返回true */ boolean isTerminated(); /** * 調(diào)用此方法随夸,在shutdown請求發(fā)起后,除非以下任何一種情況發(fā)生震放, 否則當前線程將一直阻塞宾毒,直到所有任務執(zhí)行完成: 1、所有任務執(zhí)行完成 2殿遂、發(fā)生超時诈铛; 3、當前線程被中斷 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一個有返回值的任務墨礁,多個重載方法幢竹,其中一個是指定返回值 */ <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); /** * 執(zhí)行給定的一組任務,返回持有任務執(zhí)行完成的結果和狀態(tài)的Future的list恩静, 對于每一個返回的結果焕毫,F(xiàn)uture.isDone = true 完成的任務可能正常結束或者拋出異常結束, 如果在任務執(zhí)行過程中參數(shù)Collection改變了驶乾,那么返回結果是不確定的邑飒。 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 和上面接口一致,不同的是 如果所有任務執(zhí)行完成或者超時级乐, 那么對于每一個返回的結果疙咸,F(xiàn)uture.isDone = true */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 執(zhí)行給定的一組任務,只要有一個執(zhí)行成功就返回結果风科, 不論正常返回還是異常結束撒轮,未執(zhí)行的任務都會被取消乞旦; */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 和上面接口一致,不同的是腔召,該接口在未超時情況下杆查,只要有一個執(zhí)行成功就返回結果; */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
-
ExcutorService湃崩,繼承自
- AbstractExecutorService臀蛛,ExecutorService接口的抽象類實現(xiàn)亲桦,提供ExecutorService執(zhí)行方法的默認實現(xiàn),該類實現(xiàn)了submit浊仆、invokeAny和invokeAll方法客峭,并且提供了newTaskFor方法返回一個RunnableFuture對象。
-
- ScheduledExecutorService抡柿,ExecutorService的另一個實現(xiàn)舔琅,用于延遲或定時執(zhí)行任務,提供了如下幾個方法:
/** * 創(chuàng)建并執(zhí)行一個在給定的延遲之后的ScheduledFuture */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 創(chuàng)建并執(zhí)行一個周期性動作洲劣,在給定的初始延遲之后執(zhí)行备蚓,后續(xù)按照周期執(zhí)行 * 也就是先延遲initialDelay后執(zhí)行,下次是initialDelay+period執(zhí)行囱稽, * 再下次是initialDelay+period * 2郊尝,initialDelay+period * 3依次執(zhí)行; * 如果遇到異常战惊,則會停止執(zhí)行流昏,否則該任務將僅通過取消或終止執(zhí)行器終止; */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 創(chuàng)建并執(zhí)行一個周期性動作吞获,在給定的初始延遲之后執(zhí)行况凉, * 在每一次執(zhí)行終止和下一次執(zhí)行開始之間都存在給定的延遲執(zhí)行; */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
2. ThreadPoolExecutor介紹
ThreadPoolExecutor是Java線程池中最核心的類了各拷,我們學習線程池很大概念上就是學習這個類的使用刁绒,我們來看一下它的構造方法,參數(shù)及相應的實現(xiàn)烤黍。
2.1 構造方法
ThreadPoolExecutor共有4個構造方法膛锭,但其實參數(shù)一共就7種類型,我們主要就來看一下這具體的7種類型:
// 5個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
// 6個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
// 6個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
// 7個參數(shù)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2.2 corePoolSize
該線程池中核心線程的最大值蚊荣。
- 線程池新建線程的時候,如果當前線程總數(shù)小于corePoolSize莫杈,則新建的是核心線程互例,如果超過corePoolSize,則新建的是非核心線程筝闹。核心線程默認情況下會一直存活在線程池中媳叨,即使這個核心線程啥也不干(閑置狀態(tài))腥光;
- 如果指定
ThreadPoolExecutor
的allowCoreThreadTimeOut
這個屬性為true,那么核心線程如果不干活(閑置狀態(tài))的話糊秆,超過一定時間(時長下面參數(shù)決定)武福,就會被銷毀掉;
2.3 maximumPoolSize
該線程池中線程總數(shù)最大值痘番。
線程總數(shù) = 核心線程數(shù) + 非核心線程數(shù)捉片。
2.4 keepAliveTime
該線程池中非核心線程閑置的超時時長。
一個非核心線程汞舱,如果不干活(閑置狀態(tài))的時長超過這個參數(shù)所設定的時長伍纫,就會被銷毀掉,而如果設置
allowCoreThreadTimeOut = true
昂芜,則會作用于核心線程莹规;
2.5 TimeUnit
keepAliveTime的單位,TimeUnit是一個枚舉類型泌神,其實前面已經(jīng)仔細學習過這個類良漱,這里不多說了:Java線程-Lock學習(五).
2.6 BlockingQueue workQueue
??該線程池中的任務隊列,用于維護等待執(zhí)行的Runnable對象欢际。當所有的核心線程都在執(zhí)行時母市,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了幼苛,則新建非核心線程執(zhí)行任務窒篱。該任務隊列決定了線程池的排隊策略。常用的workQueue類型有:
- SynchronousQueue舶沿,這個隊列接收到任務的時候墙杯,會直接提交給線程處理,而不保留它括荡,如果所有線程都在工作怎么辦高镐?那就新建一個線程來處理這個任務!所以為了保證不出現(xiàn)
線程數(shù)達到了maximumPoolSize而不能新建線程
的錯誤畸冲,使用這個類型隊列的時候嫉髓,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大邑闲;- LinkedBlockingQueue算行,無界隊列,這個隊列接收到任務的時候苫耸,如果當前線程數(shù)小于核心線程數(shù)州邢,則新建線程(核心線程)處理任務;如果當前線程數(shù)等于核心線程數(shù)褪子,則進入隊列等待量淌。由于這個隊列沒有最大值限制骗村,即所有超過核心線程數(shù)的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效呀枢,因為總線程數(shù)永遠不會超過corePoolSize胚股;
- ArrayBlockingQueue,有界隊列裙秋,可以限定隊列的長度琅拌,接收到任務的時候,如果沒有達到corePoolSize的值残吩,則新建線程(核心線程)執(zhí)行任務财忽,如果達到了,則入隊等候泣侮,如果隊列已滿即彪,則新建線程(非核心線程)執(zhí)行任務,又如果總線程數(shù)到了maximumPoolSize活尊,并且隊列也滿了隶校,則添加的新的線程將發(fā)生異常:RejectedExecutionException 表示被拒絕策略拒絕了,也就是說線程超出了線程池的總容量蛹锰;
- DelayQueue深胳,隊列內(nèi)元素必須實現(xiàn)Delayed接口,這就意味著你傳進去的任務必須先實現(xiàn)Delayed接口铜犬。這個隊列接收到任務時舞终,首先先入隊,只有達到了指定的延時時間癣猾,才會執(zhí)行任務敛劝;
2.7 ThreadFactory threadFactory
??線程工廠,用來創(chuàng)建線程纷宇,通過newThread()方法提供創(chuàng)建線程的功能夸盟。通過源碼我們知道,通過newThread()方法創(chuàng)建的線程都是非守護線程像捶,并且線程優(yōu)先級都是Thread.NORM_PRIORITY
上陕。可以通過Executors.defaultThreadFactory()
來創(chuàng)建默認的線程工廠拓春。
可以通過線程工廠給每個創(chuàng)建出來的線程設置更有意義的名字释簿,Debug和定位問題時非常又幫助。
2.8 RejectedExecutionHandler handler
用于拋出異常的硼莽,比如說上面的隊列發(fā)生了異常辕万,可以通過指定該參數(shù)來對異常進行拋出處理,有4種可選類型:
- CallerRunsPolicy,不在新線程中執(zhí)行任務渐尿,而是強制由調(diào)用者所在的線程來執(zhí)行任務;
- AbortPolicy矾瑰,默認處理砖茸,直接拋出異常,然后不再執(zhí)行相應的任務殴穴;
- DiscardPolicy凉夯,不執(zhí)行任務,也不拋出異常采幌,也就是忽略這個任務劲够;
- DiscardOldestPolicy,將隊列中最前面的那個任務丟棄休傍,然后執(zhí)行新任務征绎;
3. ThreadPoolExecutor實現(xiàn)及注意事項
3.1 ThreadPoolExecutor的執(zhí)行策略
上面介紹參數(shù)的時候其實已經(jīng)說到了ThreadPoolExecutor執(zhí)行的策略,這里再總結一下磨取,當一個任務被添加進線程池時:
- 線程數(shù)量未達到corePoolSize人柿,則新建一個線程(核心線程)執(zhí)行任務;
- 線程數(shù)量達到了corePools忙厌,則將任務移入隊列等待凫岖;
- 隊列已滿,新建線程(非核心線程)執(zhí)行任務逢净;
- 隊列已滿哥放,總線程數(shù)又達到了maximumPoolSize,就會由上面配置的異常處理RejectedExecutionHandler來拋出異常爹土。
3.2 ThreadPoolExecutor任務提交
??前面說了這么多甥雕,一直在介紹ThreadPoolExecutor的各個參數(shù),現(xiàn)在我們new了一個線程池后着饥,如何執(zhí)行線程任務呢犀农?其實有兩種方式,Executor.execute()
宰掉、ExecutorService.submit()
呵哨,submit
方法和execute
方法不同的是它能夠返回任務執(zhí)行的結果Future,來看一個簡單的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i =0 ;i < 4; i++) {
threadPoolExecutor.execute(() -> System.out.println("threadName:" + Thread.currentThread().getName()));
}
Future<Integer> future = threadPoolExecutor.submit(() ->
System.out.println("threadName:" + Thread.currentThread().getName()), 1);
System.out.println(future.get());
threadPoolExecutor.shutdown();
}
4. Executors工具類
??Executors工具類是Executor框架的一個工具幫助類轨奄,提供了4種創(chuàng)建線程池的方式孟害,這4種方式都是直接或間接通過ThreadPoolExecutor
來實現(xiàn)的,一般情況下我們可以通過該工具類來創(chuàng)建線程池挪拟,如果該工具類的幾個方法滿足不了的情況下挨务,我們可以自定義實現(xiàn)。
4.1 CachedThreadPool方法
創(chuàng)建可緩存的線程池,看源碼就知道了:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建線程簡單例子:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
該方法創(chuàng)建的線程池線程數(shù)量無限制谎柄,有空閑線程則復用空閑線程丁侄,無空閑線程則創(chuàng)建新的線程;
4.2 FixedThreadPool方法
創(chuàng)建固定數(shù)量的線程池朝巫,來看下源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建線程簡單例子:
ExecutorService executorService= Executors.newFixedThreadPool(10);
ExecutorService executorService= Executors.newFixedThreadPool(10, Executors.defaultThreadFactory());
該方法創(chuàng)建的線程池中的最大線程數(shù)固定鸿摇,超出的線程會進入隊列等待;
這里再簡單說下無界隊列LinkedBlockingQueue的問題劈猿,無界隊列的隊列大小無限制拙吉,使用無界隊列做為阻塞隊列時要尤其當心,因為newFixedThreadPool
采用就是 LinkedBlockingQueue揪荣,當任務耗時較長或者QPS很高時可能會導致大量新任務在隊列中堆積筷黔,有可能會導致cpu和內(nèi)存飆升服務器掛掉。
這里可參考一個線上問題:一次Java線程池誤用引發(fā)的血案和總結
4.3 ScheduledThreadPool方法
創(chuàng)建支持延遲和定時的固定數(shù)量的線程池仗颈,看下源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
可以看到佛舱,該方法是通過ScheduledThreadPoolExecutor
的構造方法來實現(xiàn)的,但底層仍然是通過ThreadPoolExecutor
的構造方法來實現(xiàn)的揽乱,并且任務隊列是DelayQueue
名眉,簡單例子:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, Executors.defaultThreadFactory());
4.4 SingleThreadExecutor方法
創(chuàng)建一個只有單個線程的線程池,看下源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
簡單例子:
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
通過該方法創(chuàng)建的線程池有且只有一個工作線程執(zhí)行任務凰棉,任務會按照隊列的順序來執(zhí)行损拢。
4.5 newSingleThreadScheduledExecutor方法
該方法結合了ScheduledThreadPool方法和SingleThreadExecutor方法,就不多說了撒犀。
三福压、總結
其實,JDK7之后還引入了一種采用分治思想的fork/join
框架或舞,該類框架的接口也繼承了ExecutorService荆姆,所以說也算是一種特殊的線程池,下一章再專門來學習該框架映凳。
- 在創(chuàng)建了線程池后胆筒,默認情況下,線程池中并沒有任何線程诈豌,而是等待有任務到來才創(chuàng)建線程去執(zhí)行任務仆救,除非調(diào)用了prestartCoreThread或prestartallcorethread方法;這兩個方法是用于預創(chuàng)建線程矫渔,也就是即使沒有任務來的時候就預先創(chuàng)建corePoolSize或1個線程彤蔽;
1. ExecutorService關閉方式shutdown和shutdownNow區(qū)別
針對shutdown方法而言:
調(diào)用該方法后不允許繼續(xù)往線程池內(nèi)添加線程,線程池的狀態(tài)變?yōu)镾HUTDOWN狀態(tài)庙洼,而所有在調(diào)用shutdown()方法之前提交到ExecutorSrvice的任務都會執(zhí)行顿痪,并且一旦所有線程執(zhí)行任務結束镊辕,ExecutorService才會真正關閉;
而shutdownNow方法則是:
調(diào)用該方法后蚁袭,會將線程池的狀態(tài)變?yōu)閟top狀態(tài)征懈,然后試圖停止當前正在執(zhí)行的任務,并返回在等待中沒有執(zhí)行的任務列表揩悄;
這里參考自:JAVA線程池shutdown和shutdownNow的區(qū)別受裹,而有關這兩個方法及awaitTermination方法的使用方面的注意事項,可以參考:[翻譯][Java]ExecutorService的正確關閉方法
2. 有關阿里巴巴開發(fā)手冊規(guī)范中的一項內(nèi)容
在代碼中使用Executors創(chuàng)建線程池的時候虏束,idea的阿里規(guī)范掃描插件會給出一項警告:
【強制】線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor的方式厦章,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則镇匀,規(guī)避資源耗盡的風險。
說明: Executors 返回的線程池對象的弊端如下:
- 1) FixedThreadPool 和 SingleThreadPool :
允許的請求隊列長度為 Integer.MAX_VALUE 袜啃,可能會堆積大量的請求汗侵,從而導致 OOM 。- 2) CachedThreadPool 和 ScheduledThreadPool : 允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE 群发,可能會創(chuàng)建大量的線程晰韵,從而導致 OOM 。
所以這里建議自定義實現(xiàn)線程池熟妓。
3. 參考文檔及文章
首先強烈推薦JDK-API文檔雪猪,另外,本文還參考了:
海子-Java并發(fā)編程:線程池的使用
線程池起愈,這一篇或許就夠了
并發(fā)編程網(wǎng)-聊聊并發(fā)(三)Java線程池的分析和使用