前言
線程池之前需要是[多線程知識(shí):http://www.reibang.com/p/1b2daac373d5]
什么是線程池
顧名思義,線程池就是有一個(gè)容器[底層數(shù)據(jù)結(jié)構(gòu)HashSet<Worker>
],容器用于存放多個(gè)線程趴腋。線程池中存在多個(gè)線程栋艳,如果需要執(zhí)行任務(wù)的話导俘,則從這個(gè)池子中取得一個(gè)線程對(duì)象用于執(zhí)行此任務(wù)巾遭。[只是一個(gè)大概的粗略的介紹平道,具體細(xì)節(jié)請(qǐng)接著往下看9┝丁>酱闸衫!]
怎么使用線程池[對(duì)于CPU/IO密集型]
- CPU密集型:對(duì)于計(jì)算密集型的任務(wù)較多的場(chǎng)景的話,由于任務(wù)會(huì)占用大量的CPU時(shí)間片蔚出,多創(chuàng)建線程也沒有空閑的CPU去處理虫腋,所以就可以適當(dāng)?shù)臏p少線程池中的線程個(gè)數(shù)(一般線程個(gè)數(shù)=cpu個(gè)數(shù)即可)骄酗。
- IO密集型:對(duì)于IO密集型的任務(wù)較多的場(chǎng)景的話,由于任務(wù)并不會(huì)占用大量的CPU時(shí)間片岔乔,相反會(huì)有更多的IO阻塞導(dǎo)致CPU空閑酥筝,這個(gè)時(shí)候可以適當(dāng)?shù)卦黾泳€程池中的個(gè)數(shù)(一般線程數(shù)=2*cpu個(gè)數(shù)即可),提高CPU利用率雏门。
為什么要使用線程池
- 由于線程池的創(chuàng)建和銷毀的過程會(huì)涉及到用戶態(tài)和內(nèi)核態(tài)的切換等一些消耗計(jì)算機(jī)資源的操作(此處針對(duì)的是內(nèi)核線程模型嘿歌,可見http://www.reibang.com/p/39d2a4c050f8),導(dǎo)致效率的降低茁影。線程池的作用就是利用一個(gè)數(shù)據(jù)結(jié)構(gòu)容器維系一些線程宙帝,用于執(zhí)行Application的Task,以此達(dá)到復(fù)用線程提高效率(提高并發(fā)募闲、降低RT)步脓。
- 有些可以實(shí)現(xiàn)與時(shí)間相關(guān)的任務(wù),如定時(shí)任務(wù)浩螺、周期性任務(wù)等靴患。
Java線程池Demo
public class ThreadPoolDemo {
public static void main(String[] args) throws Exception {
// 創(chuàng)建Cached線程池,運(yùn)行速度最快要出、線程數(shù)=理論上可以理解為無窮大
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 創(chuàng)建混合線程池鸳君,運(yùn)行速度中等、線程數(shù)=傳入的 nThreads
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
// 創(chuàng)建單個(gè)線程的線程池患蹂,運(yùn)行速度最慢或颊,但是可以實(shí)現(xiàn)順序執(zhí)行,線程數(shù)=1
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
Future<Double> future1 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 + e2, 1, 9));
Future<Double> future2 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 - e2, 1, 9));
Future<Double> future3 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 * e2, 1, 9));
Future<Double> future4 = cachedThreadPool.submit(() -> doTask((e1, e2) -> e1*1.0 / e2, 1, 9));
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
System.out.println(future4.get());
// 關(guān)閉資源
close(cachedThreadPool);
close(fixedThreadPool);
close(singleThreadExecutor);
close(scheduledThreadPool);
}
public static double doTask(BiFunction<Integer, Integer, Double> biFunction, int arg1, int arg2) {
return biFunction.apply(arg1, arg2);
}
public static void close(ExecutorService executorService) {
executorService.shutdown();
}
}
Executors
類是線程池的工具類(不推薦使用)传于,有一些創(chuàng)建線程池的方法囱挑,最主要的方法如下:
- Executors.newCachedThreadPool():此方法是創(chuàng)建cached的線程池,此線程池由于來一個(gè)需要執(zhí)行的Task沼溜,就會(huì)創(chuàng)建一個(gè)線程來執(zhí)行Task(線程池的最大數(shù)量:理論上無窮大),所以速度也是最快的弹惦。此線程適用大量非耗時(shí)的任務(wù),如果大量耗時(shí)的任務(wù)的話檐嚣,要么會(huì)導(dǎo)致CPU被打滿嗡贺,影響其他業(yè)務(wù)任務(wù)诫睬。
- Executors.newFixedThreadPool(10):此方法是創(chuàng)建混合[給定數(shù)量]的線程池摄凡。此線程也是使用場(chǎng)景比較廣泛的亲澡,CPU密集型和IO密集型都是可以床绪,但是需要根據(jù)不同的場(chǎng)景設(shè)定不同的nThread的值。
- Executors.newSingleThreadExecutor():此方法是創(chuàng)建一個(gè)單個(gè)線程的線程池梭伐,顧名思義练慕,這個(gè)線程池中只有一個(gè)線程铃将,所有的任務(wù)都是順序排隊(duì)處理的劲阎,所以速度也是最慢的悯仙,但是也因?yàn)檫@個(gè)特性導(dǎo)致可以任務(wù)的順序執(zhí)行锡垄,適用于一些特殊的場(chǎng)景路操。
- Executors.newScheduledThreadPool(10):此方法是創(chuàng)建一個(gè)可以可以讓Task在給定的延遲后運(yùn)行或定期執(zhí)行屯仗。適合于一些定時(shí)和周期性的場(chǎng)景魁袜。
前三種方法底層都是調(diào)用的同一個(gè)方法,如下
ThreadPoolExecutor
如果是使用Java原生的線程池,推薦使用ThreadPoolExecutor
先來看看類層級(jí)結(jié)構(gòu)
-
Executor
:ThreadPoolExecutor
線程池的頂級(jí)接口垮卓,此借口中只定義了一個(gè)提交執(zhí)行Task的方法粟按。
-
ExecutorService
:此接口是Executor
接口的擴(kuò)展,其中定義了很多的有關(guān)于管理線程池的方法庙曙,例如:shutdown()
捌朴、submit()
等砂蔽。
-
AbstractExecutorService
:此抽象類左驾,實(shí)現(xiàn)了ExecutorService
接口安岂,并對(duì)其中的submit()
和InvokeAny
的相關(guān)方法給出了一個(gè)默認(rèn)實(shí)現(xiàn)域那。
-
ThreadPoolExecutor
:此對(duì)象是使用頻率較高的線程池類,它對(duì)AbstractExecutorService
進(jìn)行進(jìn)一步的實(shí)現(xiàn),包括至關(guān)重要的execute
以及shutdown/shutdownNow
方法辫秧,除此之外盟戏,還定義了ctl
和一些數(shù)據(jù)結(jié)構(gòu)來維護(hù)線程池的運(yùn)行柿究,還提供了線程工廠對(duì)象創(chuàng)建線程蝇摸,還提供了基本的拒絕策略等貌夕。
構(gòu)造方法
構(gòu)造方法一
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:除非設(shè)置了
allowCoreThreadTimeOut
,即使它們處于空閑狀態(tài)也要保留在池中的線程數(shù)们童。 - maximumPoolSize:池中允許的最大線程數(shù)慧库。
- keepAliveTime:當(dāng)線程數(shù)大于corePoolSize數(shù)時(shí)完沪,這是多余的空閑線程將在終止之前等待新任務(wù)的最長時(shí)間
- unit:
keepAliveTime
參數(shù)的時(shí)間單位听皿。 - workQueue:在執(zhí)行任務(wù)之前用于保留任務(wù)的隊(duì)列尉姨。此隊(duì)列將僅保存由
execute
方法提交的Runnable
且多于corePoolSize
任務(wù)又厉,默認(rèn)為LinkedBlockingQueue
覆致,也可以自定義。 - threadFactory:執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠宣羊,默認(rèn)使用
DefaultThreadFactory
之宿,也可以自定義比被。 - handler:達(dá)到了線程界限和隊(duì)列容量而在執(zhí)行被阻止時(shí)使用的處理程序(拒絕策略),默認(rèn)
AbortPolicy
姐赡,也可以自定義项滑。
構(gòu)造方法二
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
構(gòu)造方法三
public ThreadPoolExecutor
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
構(gòu)造方法四
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
阻塞隊(duì)列
主要的阻塞隊(duì)列如下:
-
ArrayBlockingQueue
:底層數(shù)據(jù)結(jié)構(gòu)為數(shù)組的有界阻塞隊(duì)列枪狂,此隊(duì)列符合FIFO(先進(jìn)先出)原則州疾。 -
LinkedBlockingQueue
:底層數(shù)據(jù)結(jié)構(gòu)為鏈表的無界[理論上]阻塞隊(duì)列严蓖,此隊(duì)列符合FIFO(先進(jìn)先出)原則。 -
ConcurrentLinkedQueue
:底層數(shù)據(jù)結(jié)構(gòu)為鏈表的無界(理論上)線程安全的阻塞隊(duì)列毫深,此隊(duì)列符合FIFO(先進(jìn)先出)原則。 -
DelayQueue
:延遲隊(duì)列闸迷,其中的元素只能在其延遲到期后才能使用俘枫。 -
SynchronousQueue
:同步隊(duì)列巡球,每個(gè)插入操作必須等待另一個(gè)線程進(jìn)行相應(yīng)的刪除操作邓嘹,反之亦然。同步隊(duì)列沒有任何內(nèi)部容量起便,甚至沒有一個(gè)容量妙痹。您無法在同步隊(duì)列中peek
鼻疮,因?yàn)閮H當(dāng)您嘗試刪除它時(shí)耿芹,該元素才存在挪哄。您不能插入元素(使用任何方法)砸彬,除非另一個(gè)線程試圖將其刪除蛀蜜;您無法進(jìn)行迭代绽淘,因?yàn)闆]有要迭代的內(nèi)容壮池。隊(duì)列的head是第一個(gè)排隊(duì)的插入線程試圖添加到隊(duì)列中的元素杀怠;如果沒有這樣的排隊(duì)線程橙依,則沒有元素可用于刪除硕旗,并且poll()
將返回null
漆枚。出于其他Collection
方法(例如contains
)的目的墙基,SynchronousQueue
用作空集合残制。此隊(duì)列不允許null
元素颗祝。
拒絕策略
拒絕策略抽象接口RejectedExecutionHandler
吐葵,其所有的實(shí)現(xiàn)類都在ThreadPoolExecutor
類中温峭,當(dāng)然你也可以自己自己的拒絕策略凤藏。
-
AbortPolicy
:顧名思義栗菜,此拒絕策略則是當(dāng)提交執(zhí)行的Task超過線程池的``maximumPoolSize會(huì)拋出
RejectedExecutionException疙筹。如果不指定拒絕策略,那么對(duì)于
Executors`工具類暴备,此拒絕策略是默認(rèn)的拒絕策略涯捻。 -
CallerRunsPolicy
:此拒絕策略障癌,如果線程池沒有被shutdown混弥,那么就會(huì)在調(diào)用execute
方法的線程中去執(zhí)行該線程(如果是main線程執(zhí)行的execute
方法晾捏,那么就會(huì)使用主線程去執(zhí)行該Task,那么會(huì)導(dǎo)致main線程被阻塞直到該Task執(zhí)行結(jié)束)劳秋。 -
DiscardOldestPolicy
:此拒絕策略,靜默(不會(huì)拋出異常)丟棄被拒絕的任務(wù)补履。 -
DiscardPolicy
:此拒絕策略箫锤,它丟棄最舊的未處理請(qǐng)求阳准,然后重試execute
野蝇,如果線程池被關(guān)閉绕沈,在這種情況下,該任務(wù)將被丟棄戴而。
自定義拒絕策略使用Demo
public class ThreadPoolRejectedExecutionHandlerDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(
Runnable r, // 被拒絕的任務(wù)Task
ThreadPoolExecutor executor // 線程池對(duì)象
) {
// do something
}
});
// 執(zhí)行Task
executor.execute(() -> doTask());
// 關(guān)閉資源
close(executor);
}
public static void doTask() {
System.out.println("do Task");
}
public static void close(ExecutorService executorService) {
executorService.shutdown();
}
}
線程池詳解
ThreadPoolExecutor
類中有一個(gè)ctl
的原子整數(shù)的屬性,該屬性巧妙的表示了兩個(gè)屬性值(workerCount
操漠、runState
)浊伙,workerCount
指示線程的有效數(shù)量,runState
指示線程池的生命周期(是否運(yùn)行串结,關(guān)閉等)哑子。
-
runState
:ctl
高3位表示的是線程的一些生命狀態(tài)。
runState源碼注釋-
RUNNING
:運(yùn)行中肌割,此狀態(tài)可以接受新任務(wù)并處理排隊(duì)的任務(wù)卧蜓。 -
SHUTDOWN
:關(guān)閉中狀態(tài),此狀態(tài)不接受新任務(wù)把敞,而是處理排隊(duì)的任務(wù)弥奸。 -
STOP
:停止?fàn)顟B(tài),不接受新任務(wù)奋早,不處理排隊(duì)的任務(wù)以及中斷進(jìn)行中的任務(wù) -
TIDYING
:所有任務(wù)已終止其爵,workerCount為零摇幻,轉(zhuǎn)換為狀態(tài)TIDYING的線程將運(yùn)行terminated()
方法終結(jié)線程池。 -
TERMINATED
:terminated()
方法(默認(rèn)空實(shí)現(xiàn)帜矾,可以自己根據(jù)場(chǎng)景進(jìn)行自定義)運(yùn)行完畢死陆,代表線程池正在的停止领虹。
-
workerCount
:ctl
低29位表示的是線程池中的線程數(shù)量猾蒂,可以輕易的計(jì)算出一個(gè)線程池中最大可容納的線程數(shù)為2 ^ 29)-1(約5億個(gè))蚊逢。
為什么要使用一個(gè)ctl
的原子整數(shù)來表示這些值呢戳表?
- 原子整數(shù)通過CAS以較高效率保證線程安全价涝,畢竟線程池本身就要天然支持多線程環(huán)境泞遗。
- 使用一個(gè)屬性表示兩個(gè)屬性聊倔,無論是從空間消耗,還是維護(hù)成本來說都是比較理想的。
- 總所周知,計(jì)算機(jī)底層硬件都是通過位運(yùn)算實(shí)現(xiàn)一切的復(fù)雜運(yùn)算的,那么位運(yùn)算就天然比其他運(yùn)輸效率要高最铁。
線程池模型(生命周期)
-
RUNNING
:運(yùn)行中私爷,此狀態(tài)可以接受新任務(wù)并處理排隊(duì)的任務(wù)尸饺。 -
RUNNING
->SHUTDOWN
:執(zhí)行shutdown()方法,將會(huì)導(dǎo)致線程池狀態(tài)又RUNNING
->SHUTDOWN
的變遷愿吹,具體的shutdown()下面會(huì)有介紹。 -
RUNNING
->STOP
:執(zhí)行shutdownNow()方法,將會(huì)導(dǎo)致線程池狀態(tài)又RUNNING
->STOP
的變遷夸浅,具體的shutdown()下面會(huì)有介紹婉刀。 -
SHUTDOWN
/STOP
->TIDYING
:所有任務(wù)已終止堤尾,workerCount為零衔统,狀態(tài)轉(zhuǎn)換為TIDYING。 -
TIDYING
->TERMINATED
:狀態(tài)TIDYING的線程將運(yùn)行terminated()
方法十电,會(huì)導(dǎo)致狀態(tài)變遷為TERMINATED
。
shutdown VS shutdownNow
- shutdown:此方法是停止線程的方法抵蚊,不會(huì)接受新提交的任務(wù),并且會(huì)將當(dāng)前線程池中的工作線程的任務(wù)和阻塞隊(duì)列中的所有任務(wù)都執(zhí)行完。
- shutdownNow:此方法是停止線程的方法,不會(huì)接受新提交的任務(wù)怔昨,也不會(huì)執(zhí)行阻塞隊(duì)列中的所有任務(wù)涩惑,并且會(huì)嘗試停止當(dāng)前線程池中的工作線程的任務(wù)强戴。
線程池執(zhí)行流程
線程提交順序 VS 線程執(zhí)行順序
提交順序
- 應(yīng)用程序調(diào)用
execute
方法向線程池中提交Task執(zhí)行。 - 首先會(huì)向corePool中提交Task镀脂,如果corePool中有空閑的線程或者數(shù)量<
maximumPoolSize
薄翅,則選擇/創(chuàng)建一個(gè)線程執(zhí)行Task光羞。 - 否則铐炫,會(huì)將Task提交到Queue中胞皱,如果Queue隊(duì)列未滿,那么增將Task任務(wù)添加至Queue等待九妈。
- 否則反砌,會(huì)將Task提交到臨時(shí)Pool中(如果有的話,臨時(shí)Pool=
maximumPoolSize
-corePoolSize
)允蚣,如果臨時(shí)Pool有空閑或者數(shù)量<maximumPoolSize
,則選擇/創(chuàng)建一個(gè)線程執(zhí)行Task呆贿。 - 否則嚷兔,將會(huì)調(diào)用拒絕策略(默認(rèn)是拋出異常),當(dāng)然還有其他的實(shí)現(xiàn)或者自定義實(shí)現(xiàn)做入。
執(zhí)行順序
- 執(zhí)行順序與提交順序有一些差別
- 首先會(huì)執(zhí)行corePool和臨時(shí)Pool中的線程任務(wù)冒晰,執(zhí)行順序不固定(搶占式)。
- 等待Queue中的任務(wù)竟块,是處于等待狀態(tài)壶运。只有當(dāng)corePool和臨時(shí)Pool中線程有空閑的時(shí)候才會(huì)被執(zhí)行。
submit VS execute
上源碼
public Future<?> submit(Runnable task)
可見本質(zhì)上還是調(diào)用的
execute
方法浪秘,但是在執(zhí)行execute
方法之前對(duì)任務(wù)進(jìn)行了額外的操作蒋情,將Runnable
的Task通過newTaskFor
方法轉(zhuǎn)為RunnableFuture
Task,目的是為了能夠拿到執(zhí)行的結(jié)果耸携,此處返回結(jié)果默認(rèn)為null棵癣。
public <T> Future<T> submit(Runnable task, T result)
與
1
中類似,T result
是指定的任務(wù)的返回值
public <T> Future<T> submit(Callable<T> task)
與
1
中類似夺衍,不同的是狈谊,此次提交的Task是Callable<T>
類型的,也就是有返回值的沟沙。