線程池
為什么要用線程池?
線程池提供了一種限制和管理資源(包括執(zhí)行一個任務(wù))鸥跟。 每個線程池還維護(hù)一些基本統(tǒng)計信息轨功,例如已完成任務(wù)的數(shù)量曼月。
這里借用《Java并發(fā)編程的藝術(shù)》提到的來說一下使用線程池的好處:
- 降低資源消耗。 通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗衙猪。
- 提高響應(yīng)速度馍乙。 當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要的等到線程創(chuàng)建就能立即執(zhí)行垫释。
- 提高線程的可管理性丝格。 線程是稀缺資源,如果無限制的創(chuàng)建棵譬,不僅會消耗系統(tǒng)資源显蝌,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配订咸,調(diào)優(yōu)和監(jiān)控曼尊。
創(chuàng)建的線程池的方式
(1) 使用 Executors 創(chuàng)建
我們上面剛剛提到了 Java 提供的幾種線程池,通過 Executors 工具類我們可以很輕松的創(chuàng)建我們上面說的幾種線程池算谈。但是實(shí)際上我們一般都不是直接使用Java提供好的線程池涩禀,另外在《阿里巴巴Java開發(fā)手冊》中強(qiáng)制線程池不允許使用 Executors 去創(chuàng)建,而是通過 ThreadPoolExecutor 構(gòu)造函數(shù) 的方式然眼,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則艾船,規(guī)避資源耗盡的風(fēng)險。
Executors 返回線程池對象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允許請求的隊列長度為 Integer.MAX_VALUE,可能堆積大量的請求,從而導(dǎo)致OOM屿岂。
CachedThreadPool 和 ScheduledThreadPool : 允許創(chuàng)建的線程數(shù)量為 Integer.MAX_VALUE 践宴,可能會創(chuàng)建大量線程,從而導(dǎo)致OOM爷怀。
(2) ThreadPoolExecutor的構(gòu)造函數(shù)創(chuàng)建
我們可以自己直接調(diào)用 ThreadPoolExecutor 的構(gòu)造函數(shù)來自己創(chuàng)建線程池阻肩。在創(chuàng)建的同時,給 BlockQueue 指定容量就可以了运授。示例如下:
private static ExecutorService executor = new ThreadPoolExecutor(13, 13,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(13));
這種情況下烤惊,一旦提交的線程數(shù)超過當(dāng)前可用線程數(shù)時,就會拋出java.util.concurrent.RejectedExecutionException吁朦,這是因?yàn)楫?dāng)前線程池使用的隊列是有邊界隊列柒室,隊列已經(jīng)滿了便無法繼續(xù)處理新的請求。但是異常(Exception)總比發(fā)生錯誤(Error)要好逗宜。
線程池常用API以及ThreadPoolExecutor構(gòu)造參數(shù):
ThreadPool參數(shù)
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
//創(chuàng)建一個核心數(shù)為1雄右,最大線程數(shù)為2,超時時間為30s,等待隊列最長為1纺讲,拒絕策略為直接拒絕(拋出異常)的線程池
-
corePoolSize:
線程池核心線程數(shù)量擂仍,核心線程不會被回收,即使沒有任務(wù)執(zhí)行熬甚,也會保持空閑狀態(tài)逢渔。如果線程池中的線程少于此數(shù)目,則在執(zhí)行任務(wù)時創(chuàng)建则涯。
-
maxmumPoolSize
池允許最大的線程數(shù)复局,當(dāng)線程數(shù)量達(dá)到corePoolSize,且workQueue隊列塞滿任務(wù)了之后粟判,繼續(xù)創(chuàng)建線程亿昏。
-
KeepAliveTime
超過corePoolSize之后的“臨時線程”的存活時間。
-
unit
keepAliveTime的單位档礁。 (TimeUnit.SECONDS)
-
workQueue
當(dāng)前線程數(shù)超過corePoolSize時角钩,新的任務(wù)會處在等待狀態(tài),并存在workQueue中呻澜,BlockingQueue是一個先進(jìn)先出的阻塞式隊列實(shí)現(xiàn)递礼,底層實(shí)現(xiàn)會涉及Java并發(fā)的AQS機(jī)制.
-
ThreadFactory
創(chuàng)建線程的工廠類,通常我們會自頂一個threadFactory設(shè)置線程的名稱羹幸,這樣我們就可以知道線程是由哪個工廠類創(chuàng)建的脊髓,可以快速定位。
-
handler
線程池執(zhí)行拒絕策略栅受,當(dāng)線數(shù)量達(dá)到maximumPoolSize大小将硝,并且workQueue也已經(jīng)塞滿了任務(wù)的情況下恭朗,線程池會調(diào)用handler拒絕策略來處理請求。
系統(tǒng)默認(rèn)的拒絕策略有以下幾種:
- AbortPolicy:為線程池默認(rèn)的拒絕策略依疼,該策略直接拋異常處理痰腮。
- DiscardPolicy:直接拋棄不處理。
- DiscardOldestPolicy:丟棄隊列中最老的任務(wù)律罢。
- CallerRunsPolicy:將任務(wù)分配給當(dāng)前執(zhí)行execute方法線程來處理膀值。
我們還可以自定義拒絕策略,只需要實(shí)現(xiàn)RejectedExecutionHandler接口即可误辑,友好的拒絕策略實(shí)現(xiàn)有如下:
- 將數(shù)據(jù)保存到數(shù)據(jù)沧踏,待系統(tǒng)空閑時再進(jìn)行處理
- 將數(shù)據(jù)用日志進(jìn)行記錄,后由人工處理
各種線程池的適用場景介紹
- FixedThreadPool: 適用于為了滿足資源管理需求稀余,而需要限制當(dāng)前線程數(shù)量的應(yīng)用場景悦冀。它適用于負(fù)載比較重的服務(wù)器趋翻;
- SingleThreadExecutor: 適用于需要保證順序地執(zhí)行各個任務(wù)并且在任意時間點(diǎn)睛琳,不會有多個線程是活動的應(yīng)用場景。
- CachedThreadPool: 適用于執(zhí)行很多的短期異步任務(wù)的小程序踏烙,或者是負(fù)載較輕的服務(wù)器师骗;
- ScheduledThreadPoolExecutor: 適用于需要多個后臺執(zhí)行周期任務(wù),同時為了滿足資源管理需求而需要限制后臺線程的數(shù)量的應(yīng)用場景讨惩,
- SingleThreadScheduledExecutor: 適用于需要單個后臺線程執(zhí)行周期任務(wù)辟癌,同時保證順序地執(zhí)行各個任務(wù)的應(yīng)用場景。
優(yōu)雅地關(guān)閉線程池
-
shutdown():
- 調(diào)用之后不允許繼續(xù)往線程池內(nèi)繼續(xù)添加線程;
- 線程池的狀態(tài)變?yōu)?code>SHUTDOWN狀態(tài);
- 所有在調(diào)用
shutdown()
方法之前提交到ExecutorSrvice
的任務(wù)都會執(zhí)行; - 一旦所有線程結(jié)束執(zhí)行當(dāng)前任務(wù)荐捻,
ExecutorService
才會真正關(guān)閉黍少。
-
shutdownNow():
- 該方法返回尚未執(zhí)行的 task 的 List;
- 線程池的狀態(tài)變?yōu)?code>STOP狀態(tài);
- 阻止所有正在等待啟動的任務(wù),并且停止當(dāng)前正在執(zhí)行的任務(wù);
- 線程池不再接受新的任務(wù),但是仍然會將任務(wù)隊列中已有的任務(wù)執(zhí)行完畢处面。
-
awaitTermination:
- 設(shè)置定時任務(wù)厂置,代碼內(nèi)的意思為 2s 后檢測線程池內(nèi)的線程是否均執(zhí)行完畢(就像老師告訴學(xué)生,“最后給你 2s 鐘時間把作業(yè)寫完”)魂角,若沒有執(zhí)行完畢昵济,則調(diào)用
shutdownNow()
方法。
- 設(shè)置定時任務(wù)厂置,代碼內(nèi)的意思為 2s 后檢測線程池內(nèi)的線程是否均執(zhí)行完畢(就像老師告訴學(xué)生,“最后給你 2s 鐘時間把作業(yè)寫完”)魂角,若沒有執(zhí)行完畢昵济,則調(diào)用
-
常用的關(guān)閉的線程池的方法:
shutdown方法
awaitTermination方法
shutdownNow方法(發(fā)生異骋熬荆或者是Timeout的時候)
-
代碼示例:
private static final Random random = new Random(System.currentTimeMillis()); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy()); IntStream.range(0, 20).forEach(i -> executorService.execute(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(10)); System.out.println(Thread.currentThread().getName() + " [" + i + "] finish done"); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); //標(biāo)記線程池中的線程访忿,當(dāng)所有的的線程都執(zhí)行完畢后,線程池關(guān)閉 if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { //如果超時還有線程沒有結(jié)束則強(qiáng)制關(guān)閉正在工作的線程斯稳,并且關(guān)閉線程池 executorService.shutdownNow(); } System.out.println("===============has Shutdowned========== "); }
線程池拒絕策略--RejectedExecutionHandler
-
AbortPolicy :
- 當(dāng)線程池滿了的時候海铆,直接拒接當(dāng)前提交任務(wù)當(dāng)任務(wù)添加到線程池中被拒絕時,它將拋出 RejectedExecutionException異常 挣惰。
-
**CallerRunsPolicy **:
- 當(dāng)任務(wù)添加到線程池中被拒絕時卧斟,會在調(diào)用該execute方法中進(jìn)行該任務(wù)的執(zhí)行系草,如main線程。
-
DiscardOldestPolicy:
- 當(dāng)任務(wù)添加到線程池中被拒絕時唆涝,線程池會放棄等待隊列中最久的未處理任務(wù)找都,然后將被拒絕的任務(wù)添加到等待隊列中。
DiscardPolicy (不建議使用):
當(dāng)任務(wù)添加到線程池中被拒絕時廊酣,線程池將丟棄被拒絕的任務(wù)能耻。
-
Demo:
public class RejectedExecutionExample { public static void main(String[] args) throws InterruptedException { // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.AbortPolicy());//直接拒絕,拋出 RejectedExecutionException異常 // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.CallerRunsPolicy());// 當(dāng)任務(wù)添加到線程池中被拒絕時,會在線程池當(dāng)前正在運(yùn)行的Thread線程池中處理被拒絕的任務(wù)亡驰。 ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.DiscardOldestPolicy());// 當(dāng)任務(wù)添加到線程池中被拒絕時晓猛,線程池會放棄等待隊列中最舊的未處理任務(wù),然后將被拒絕的任務(wù)添加到等待隊列中凡辱。 // ExecutorService threadPool = RejectedExecutionHandlerTest(new ThreadPoolExecutor.DiscardPolicy());//直接拒絕 IntStream.range(0, 4).forEach(i -> threadPool.execute(() -> { System.out.println("Thread--> " + i + " is working"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread--> " + i + " work done"); })); TimeUnit.SECONDS.sleep(2); threadPool.execute(() -> System.out.println("Power By: " + Thread.currentThread().getName())); threadPool.shutdown(); if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { threadPool.shutdownNow(); } } public static ExecutorService RejectedExecutionHandlerTest(RejectedExecutionHandler handler) { return new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>(2), Executors.defaultThreadFactory(), handler); } }