0. 前言
為什么需要學(xué)習(xí)并發(fā)編程?
Tomcat醉锅、Netty等框架源碼,需要并發(fā)編程基礎(chǔ)才能看懂;
并發(fā)也是Java程序員的必經(jīng)之路
本篇文章的學(xué)習(xí)內(nèi)容有:
- 20+并發(fā)工具:線程池曙搬,各種鎖,原子類鸽嫂,并發(fā)容器
- 兩種并發(fā)策略:ThreadLocal和final
- 兩大底層原理:CAS原理與AQS框架
- 控制并發(fā)流程:Semaphore
- 實(shí)戰(zhàn)高性能緩存
1. 總覽并發(fā)工具
并發(fā)工具類根據(jù)功能可分為三大類:
- 并發(fā)安全:
- 從底層原理分類:互斥同步(鎖)纵装、非互斥同步(atomic)、無同步方案(final)
- 從使用角度分類:限制共享變量据某,避免共享變量橡娄,成熟并發(fā)工具
- 管理線程:線程池
- 線程協(xié)作:三大并發(fā)工具類等
更加詳細(xì)的分類參考思維導(dǎo)圖的建立并發(fā)知識(shí)框架分支
2. 線程池-線程治理最大法寶
線程池是并發(fā)工具中用來管理線程的工具。
碼農(nóng)翻身的這篇小白入門線程池文章對(duì)線程池有一個(gè)大致的概念和介紹癣籽,推薦閱讀挽唉。
2.1 什么是線程池?
為什么要使用線程池筷狼?
使用 new Thread 的方式創(chuàng)建線程執(zhí)行任務(wù)瓶籽,存在兩個(gè)問題:
- 反復(fù)創(chuàng)建線程開銷大,線程創(chuàng)建需要開辟虛擬機(jī)棧埂材、本地方法棧塑顺、程序計(jì)數(shù)器等線程私有的內(nèi)存空間,線程銷毀時(shí)需要回收這些資源俏险,頻繁創(chuàng)建銷毀線程會(huì)浪費(fèi)大量系統(tǒng)資源
- 過多的線程會(huì)占用大量CPU和內(nèi)存严拒,大量線程回收會(huì)帶來GC壓力扬绪,甚至?xí)霈F(xiàn) OOM 異常,CPU頻繁的進(jìn)行上下文切換也會(huì)降低系統(tǒng)性能
線程池主要解決兩個(gè)問題:
- 提升響應(yīng)速度裤唠。不需要反復(fù)創(chuàng)建和回收線程挤牛,消除了創(chuàng)建線程的延遲和銷毀線程的時(shí)間
- 合理利用CPU和內(nèi)存資源,提供更好的性能巧骚。靈活調(diào)整線程數(shù)量赊颠,不會(huì)線程太多導(dǎo)致OOM,也不會(huì)線程太少浪費(fèi)CPU資源劈彪。如果不使用線程池竣蹦,每次需要執(zhí)行異步任務(wù)時(shí)直接 new 一個(gè)線程來運(yùn)行,而線程的創(chuàng)建和銷毀都是需要開銷的沧奴。線程池里面的線程是可復(fù)用的痘括,不需要每次執(zhí)行任務(wù)時(shí)都重新創(chuàng)建和銷毀線程,節(jié)省了計(jì)算機(jī)資源
- 統(tǒng)一管理線程滔吠,比如停止線程池中的3000個(gè)線程纲菌,比挨個(gè)停止線程方便很多;可以動(dòng)態(tài)新增線程疮绷,限制線程個(gè)數(shù)翰舌。每個(gè)線程池也都保留了一些基本的統(tǒng)計(jì)數(shù)據(jù),比如當(dāng)前線程池完成的任務(wù)數(shù)目等冬骚。
線程池的適用場(chǎng)景:
- 服務(wù)器接收到大量請(qǐng)求時(shí)椅贱,適用線程池可以大大減少線程的創(chuàng)建和銷毀,提高服務(wù)器的性能只冻,實(shí)際中Tomcat也是這么做的
- 實(shí)際開發(fā)中庇麦,如果需要?jiǎng)?chuàng)建5個(gè)以上的線程,那么就可以使用線程池來管理喜德。
面試題1:線程不能重復(fù) start山橄,為什么線程池線程可以復(fù)用恨狈?
線程重復(fù) start薄货,會(huì)報(bào) IllegalThreadStateException 異常礁鲁,因?yàn)榫€程聲明周期就是從 start 到 terminated胶台,沒有辦法從 terminated 恢復(fù)到 start污尉,詳細(xì)答案參考面試題4與2.8 runWorker源碼解析章節(jié)
2.2 線程池詳解與 6 大屬性
JDK 中 ThreadPoolExecutor 類的構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
線程池創(chuàng)建主要有6大參數(shù)
參數(shù)名 | 類型 | 含義 |
---|---|---|
corePoolSize | int | 核心線程數(shù)數(shù) |
maxPoolSize | int | 最大線程數(shù)數(shù) |
keepAliveTime | long | 線程保持存活時(shí)間 |
workQueue | BlockingQueue | 任務(wù)存儲(chǔ)隊(duì)列 |
threadFactory | ThreadFactory | 線程工廠得哆,線程池需要新線程時(shí)使用ThreadFactory創(chuàng)建 |
handler | RejectedExecutionHandler | 線程池?zé)o法接受提交的任務(wù)時(shí)的拒絕策略 |
corePoolSize
常駐核心線程數(shù)沥曹,如果等于0髓帽,則任務(wù)執(zhí)行完畢奄薇,會(huì)銷毀線程池所有線程驳阎;如果大于0,任務(wù)執(zhí)行完畢核心線程不會(huì)銷毀。這個(gè)值的設(shè)置非常關(guān)鍵呵晚,過大會(huì)浪費(fèi)資源蜘腌,過小灰導(dǎo)致線程被頻繁創(chuàng)建和銷毀。maximumPoolSize
最大線程數(shù)饵隙,表示線程池能夠容納同時(shí)執(zhí)行的最大線程數(shù)撮珠,必須大于等于1。如果待執(zhí)行的線程數(shù)大于此值金矛,則緩存在任務(wù)隊(duì)列 workQueue 中芯急。如果 maxPoolSize 等于 corePoolSize,即是固定大小線程池workQueue
任務(wù)隊(duì)列驶俊,當(dāng)線程數(shù)大于等于 corePoolSize娶耍,則新任務(wù)保存到 BlockingQueue 阻塞隊(duì)列。阻塞隊(duì)列是線程安全的饼酿,使用兩個(gè) ReentrantLock 鎖來保證從出隊(duì)和入隊(duì)的原子性榕酒,是一個(gè)生產(chǎn)消費(fèi)模型隊(duì)列
2.2.1 線程池添加線程規(guī)則
任務(wù)提交到線程池,添加線程規(guī)則如下:
可記為:一cool二queue三max最后 reject故俐。
- 如果線程數(shù)小于 corePoolSize想鹰,即使其他線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來執(zhí)行新任務(wù)药版。
- 如果線程數(shù)等于(或大于)corePoolSize 但小于 maxPoolSize辑舷,則將任務(wù)放入隊(duì)列
- 如果隊(duì)列已滿,且線程數(shù)小于 maxPoolSize槽片,則創(chuàng)建一個(gè)新線程來執(zhí)行任務(wù)
- 如果隊(duì)列已滿惩妇,且線程數(shù)等于 maxPoolSize,則使用拒絕策略拒絕該任務(wù)
比喻: 線程池中的線程就像燒烤店的餐桌筐乳,店里面的餐桌數(shù) corePoolSize,坐滿后讓顧客在排隊(duì)區(qū) workQueue 排隊(duì)等待乔妈;如果顧客少店內(nèi)坐不滿蝙云,店內(nèi)餐桌也不會(huì)收起來;
如果排隊(duì)區(qū)也滿了路召,那只能冒著被城管罰款的風(fēng)險(xiǎn)在店外面廣場(chǎng)加桌子勃刨,廣場(chǎng)面積有限,最大餐桌數(shù)為 maxPoolSize股淡;如果廣場(chǎng)的顧客吃完了身隐,那抓緊把桌子收起來;
如果排隊(duì)區(qū)和廣場(chǎng)都坐滿了唯灵,那來了新顧客只能拒絕服務(wù) rejected 了贾铝。
源碼分析: 這里參考了《碼出高效p243》,execute方法的作用是提交任務(wù)command
到線程池執(zhí)行,用戶提交任務(wù)到線程池的模型圖如下所示
/*
* ThreadPoolExecutor 源碼
* ctl.get()獲取表示線程池狀態(tài)和線程個(gè)數(shù)的int值垢揩,
* workerCountOf獲取線程池中的線程數(shù)玖绿,isRunning判斷線程池是否為Running,是Running才接受新任務(wù)
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 返回包含線程池狀態(tài)和線程數(shù)的 int 值叁巨,這個(gè)值的詳解見線程池狀態(tài)章節(jié)
int c = ctl.get();
// 1. 線程數(shù)小于常駐核心線程數(shù)斑匪,使用addWorker創(chuàng)建新線程,傳入任務(wù)command
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建線程成功則return返回
if (addWorker(command, true))
return;
// 如果創(chuàng)建失敗锋勺,防止外部已經(jīng)在線程池中加入新任務(wù)蚀瘸,重新獲取c
c = ctl.get();
}
// 2. 如果線程數(shù)大于等于coresize,且線程池處于Running狀態(tài)庶橱,則將任務(wù)添加到隊(duì)列
// 如果線程池調(diào)用了shutdown方法贮勃,則isRunning返回false,不會(huì)將任務(wù)添加到隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.2 添加成功后返回true悬包,再次檢查是否需要添加一個(gè)線程(因?yàn)槿腙?duì)過程中可能有線程被銷毀core=0 keeptime=0)衙猪,
// 因?yàn)榉荝unning狀態(tài)都不接受新任務(wù),如果isRunning返回false則從隊(duì)列移除任務(wù)command布近,并執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.3 如果當(dāng)前線程池為空垫释,則新創(chuàng)建一個(gè)線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果線程數(shù)大于等coresize,且任務(wù)隊(duì)列已滿撑瞧,則創(chuàng)建新線程棵譬,
// 參數(shù)false表示線程數(shù)要小于maxsize,創(chuàng)建成功返回true预伺,失敗則執(zhí)行拒絕策略
else if (!addWorker(command, false))
// 如果addWorker返回false订咸,即創(chuàng)建線程失敗,則使用拒絕策略
reject(command);
}
代碼1判斷如果線程池中線程個(gè)數(shù)小于 corePoolSize酬诀,則會(huì)向workers 里新增一個(gè)核心core線程執(zhí)行該任務(wù)脏嚷。
如果線程池中線程個(gè)數(shù)大于等于 corePoolSize 則執(zhí)行代碼2,如果當(dāng)前線程池是RUNNING狀態(tài)則添加任務(wù)到任務(wù)隊(duì)列瞒御,非RUNNING狀態(tài)是拒絕接受新任務(wù)的父叙。
如果向任務(wù)隊(duì)列添加成功,則代碼2.2對(duì)線程池狀態(tài)進(jìn)行二次校驗(yàn)肴裙,這是因?yàn)樘砑尤蝿?wù)到隊(duì)列后趾唱,執(zhí)行代碼2.2前有可能線程池的狀態(tài)已經(jīng)有變化了。這里二次校驗(yàn)如果線程池狀態(tài)不是RUNNING蜻懦,則把該任務(wù)從任務(wù)隊(duì)列移除甜癞,移除后政治性拒絕策略;如果二次校驗(yàn)通過宛乃,則執(zhí)行代碼2.3判斷線程池中線程個(gè)數(shù)悠咱,如果線程個(gè)數(shù)為0則新建一個(gè)線程蒸辆。
如果代碼2添加任務(wù)失敗,則說明任務(wù)隊(duì)列已滿乔煞,那么執(zhí)行代碼3吁朦,嘗試新創(chuàng)建線程來執(zhí)行該任務(wù),如果線程池線程個(gè)數(shù) ≥ maxxinumPoolSize 則執(zhí)行拒絕策略渡贾。
執(zhí)行拒絕策略的兩種情況:1. 線程池狀態(tài)為非RUNNING逗宜;2. 任務(wù)隊(duì)列已滿且線程個(gè)數(shù) ≥ maxxinumPoolSize
2.2.2 常見 3 種工作隊(duì)列
-
空隊(duì)列:SynchronousQueue,隊(duì)列長(zhǎng)度為0空骚,僅起到一個(gè)交接作用纺讲。任務(wù)保存到隊(duì)列后直接交給線程池,很容易創(chuàng)建新的線程囤屹,所以線程池需要設(shè)置大一點(diǎn)的 maxPoolSize熬甚。比如
Executors#newCachedThreadPool
創(chuàng)建線程池就使用的該隊(duì)列,并且設(shè)置 maxPoolSize 為 Integer.MAX_VALUE肋坚。 -
無界隊(duì)列:LinkedBlockingQueue乡括,鏈表結(jié)構(gòu),隊(duì)列長(zhǎng)度默認(rèn)為 Integer.MAX_VALUE智厌。由于隊(duì)列不會(huì)滿诲泌,所以線程數(shù)不會(huì)大于 corePoolSize,所以 maxPoolSize 相當(dāng)于沒有意義铣鹏》笊ǎ可以應(yīng)對(duì)流量突增,新任務(wù)都會(huì)被放到隊(duì)列中诚卸,缺點(diǎn)是如果處理速度小于任務(wù)提交速度葵第,則會(huì)造成 OOM 異常。比如
Executors#newFixedThreadPool
創(chuàng)建線程池使用的就是該隊(duì)列合溺,并且 maxPoolSize 等于 corePoolSize卒密。 - 有界隊(duì)列:ArrayBlockingQueue,需要手動(dòng)指定長(zhǎng)度的有界隊(duì)列棠赛,使用該隊(duì)列 maxPoolSize 有意義哮奇,當(dāng)隊(duì)列滿了之后,會(huì)創(chuàng)建新線程來執(zhí)行任務(wù)恭朗。
2.2.3 線程池 OOM
前面提到無界隊(duì)列 LinkedBlockingQueue,如果處理速度小于任務(wù)提交速度則會(huì)出現(xiàn) OOM依疼,JDK 中Executors#newFixedThreadPool
創(chuàng)建的線程池就使用的 LinkedBlockingQueue痰腮,使用不當(dāng)就會(huì)出現(xiàn)OOM,示例代碼中任務(wù)的提交速度遠(yuǎn)高于執(zhí)行速度律罢,并且使用JVM 參數(shù)調(diào)小堆內(nèi)存-Xmx8m -Xms8m
膀值,方便復(fù)現(xiàn)OOM棍丐。
面試題2:線程池線程數(shù)能大于maxPoolSize嗎?
// 待解答沧踏,看源碼貌似可以歌逢,但是與定義不符啊
面試題3:線程池沒有任務(wù)時(shí)會(huì)怎么樣?
會(huì)阻塞住翘狱,線程池沒有任務(wù)執(zhí)行時(shí)秘案,會(huì)從任務(wù)隊(duì)列 workQueue 取出任務(wù),如果 workQueue 為空潦匈,則會(huì)調(diào)用 wait() 方法進(jìn)入阻塞狀態(tài)阱高,直到有新任務(wù)進(jìn)來喚醒。實(shí)際中大多使用LinkedBlockingQueue 阻塞隊(duì)列茬缩,這里附上了簡(jiǎn)單的 ArrayBlockingQueue源碼
// ArrayBlockQueue#take源碼
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果阻塞隊(duì)列位空赤惊,則進(jìn)入阻塞狀態(tài),等待offer方法喚醒
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
查看 2.8.3 getTask源碼分析章節(jié)凰锡,當(dāng)沒有任務(wù)時(shí)未舟,核心線程會(huì)從任務(wù)隊(duì)列 take 任務(wù)進(jìn)入阻塞狀態(tài),非核心線程會(huì)從任務(wù)隊(duì)列 poll 任務(wù)掂为,若超時(shí)后沒有任務(wù)則線程被回收(執(zhí)行結(jié)束)裕膀。
-
keepAliveTime
線程空閑時(shí)間,單位為 TimeUnit菩掏。當(dāng)線程空閑時(shí)間達(dá)到 keepAliveTime 時(shí)為空閑線程魂角,空閑線程會(huì)被回收銷毀,直到只剩下 corePoolSize 個(gè)線程為止智绸,避免浪費(fèi)內(nèi)存和句柄資源野揪?。默認(rèn)情況下瞧栗,線程數(shù)大于 corePoolSize時(shí)斯稳,keepAliveTime 才會(huì)起作用。比如Executors#newCachedThreadPool
就設(shè)置 keepAliveTime 為60秒迹恐,當(dāng)超過線程空閑時(shí)間超過60s時(shí)挣惰,則會(huì)回收線程直到只剩下 corePoolSize 個(gè)線程為止。 -
threadFactory
線程工廠殴边。用來創(chuàng)建線程憎茂,默認(rèn)使用 Executors.defaultThreadFactory(),創(chuàng)建出來的線程都屬于同一個(gè)線程組锤岸,相同優(yōu)先級(jí)NORM_PRIORITY竖幔,都不是守護(hù)線程,線程名稱形如 pool-poolNumber-trhead-id是偷,自定義線程工廠可以設(shè)置更加友好易讀的線程名稱拳氢。
// Executors 內(nèi)部類 DefaultThreadFactory源碼募逞,參數(shù)r是用戶傳入的任務(wù)
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 設(shè)置為非守護(hù)線程
if (t.isDaemon())
t.setDaemon(false);
// 設(shè)置相同優(yōu)先級(jí)
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
上述的Executors 默認(rèn)的線程工廠過于簡(jiǎn)單,對(duì)用戶不夠友好馋评,線程名稱必須具有特定意義放接,如包含調(diào)用來源,業(yè)務(wù)含義等留特。清晰的線程名稱方便后期調(diào)試和分析纠脾,有助于快速定位死鎖,StackOverflow等問題磕秤。以下為簡(jiǎn)單的自定義線程工廠:
去Github查看代碼示例
2.2.4 常見 4 種拒絕策略
-
handler
線程池該屬性用于執(zhí)行拒絕策略乳乌,默認(rèn)使用 RejectedExecutionHandler。當(dāng)任務(wù)隊(duì)列 workQueue 已滿且線程數(shù)已經(jīng)達(dá)到了 maxPoolSize市咆,則通過該策略拒絕處理請(qǐng)求汉操,這是一種簡(jiǎn)單的限流保護(hù)。友好的拒絕策略應(yīng)該做到以下三點(diǎn):- 保存任務(wù)到數(shù)據(jù)庫(kù)進(jìn)行削峰填谷蒙兰,在空閑時(shí)再取出來執(zhí)行
- 轉(zhuǎn)向某個(gè)提示頁面
- 打印日志
在ThreadPoolExecutor
中提供了 4 個(gè)公開的靜態(tài)內(nèi)部類:
- AbortPolicy(默認(rèn)):丟棄任務(wù)并拋出
RejectedExecutionException
異常 - DiscardPolicy:丟棄任務(wù)磷瘤,但是不拋出異常,不推薦
- DiscardOldestPolicy:丟棄隊(duì)列中等待最久的任務(wù)搜变,然后把當(dāng)前任務(wù)加入隊(duì)列中
- CallerRunsPolicy:調(diào)用任務(wù)的
run()
方法繞過線程池直接執(zhí)行采缚。
一般使用默認(rèn)的拒絕策略即可,如果需要自定義拒絕策略挠他,可以參考代碼示例
2.3 常見5種線程池
Executor 接口定義線程池執(zhí)行任務(wù)的方法 execute()扳抽,
Executors 提供創(chuàng)建線程池的各種方式,類似于Collections
ExecutorService 接口定義了管理線程任務(wù)的方法 submit()殖侵、shutdown() 和 invokeAll()
AbstractExecutorService 對(duì) submit()和 invokeAll() 進(jìn)行實(shí)現(xiàn)贸呢。
Parameter | FixedThreadPool | SingleTExecutor | CachedTPool | ScheduledTPool |
---|---|---|---|---|
corePoolSize | n | 1 | 0 | n |
maxNumPoolSize | n | 1 | Integer.MAX_VALUE | Integer.MAX_VALUE |
keepAliveTime | 0 s | 0 s | 60 s | 60 s |
workQueue | LinkedBlockingQueue | LinkedBlockingQueue | SynchronousQueue | DelayedWorkQueue |
以下方法都有多種重載方法,可以設(shè)置線程數(shù)和自定義線程工廠拢军,前三個(gè)返回ThreadPoolExecutor 對(duì)象楞陷,第四個(gè)返回 ScheduledThreadPoolExecutor 對(duì)象。
-
Executors#newSingleThreadExecutor()
創(chuàng)建一個(gè)單線程的線程池茉唉,相當(dāng)于單線程串行執(zhí)行所有任務(wù)固蛾。 -
Executors#newFixedThreadPool(n)
創(chuàng)建指定固定線程數(shù)的線程池,core 等于 maxSize度陆,不存在空閑線程艾凯,所以keepAliveTime為 0 -
Executors#newCachedThreadPool()
高度可伸縮線程池,coreSize為0懂傀,任務(wù)隊(duì)列長(zhǎng)度為0趾诗,任務(wù)直接交給線程池創(chuàng)建線程執(zhí)行,線程空閑時(shí)間超過60s會(huì)被回收鸿竖。 -
Executors#newScheduledThreadPool()
適支持定時(shí)及周期性任務(wù)執(zhí)行沧竟,比 Timer 更安全,功能更強(qiáng)大缚忧。是ScheduledThreadPoolExecutor
的對(duì)象悟泵。
通過了解以上4種自動(dòng)創(chuàng)建線程池的方法,每種方法都有不恰當(dāng)之處闪水,maxNumPoolSize 設(shè)置過大糕非,workQueue 設(shè)置為無界隊(duì)列,在流量突增時(shí)都會(huì)引發(fā) OOM球榆,所以《阿里巴巴Java編程規(guī)范》中不允許使用 Executors朽肥,推薦使用 ThreadPoolExecutor 的方式創(chuàng)建線程池,這樣的處理方式能更加明確線程池的運(yùn)行規(guī)則持钉,規(guī)避資源耗盡的風(fēng)險(xiǎn)衡招。
-
Executors#newWorkStealingPool()
JDK8引入,創(chuàng)建持有足夠線程的線程池每强。從下面源碼中可以看出始腾,該線程池把 CPU 核心數(shù)量設(shè)置為默認(rèn)的并行度。通過線程池類圖可知空执,其他線程池本質(zhì)都是ThreadPoolExecutor
浪箭,而Executors#newWorkStealingPool()
創(chuàng)建的線程池是ForkJoinPool
對(duì)象
// Executors 創(chuàng)建線程池源碼
public static ExecutorService newWorkStealingPool() {
// 線程數(shù)為 CPU 核心數(shù)
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
這個(gè)線程池和前面的線程池有所不同,在能產(chǎn)生子任務(wù)的場(chǎng)景才適合使用該線程池辨绊。線程可以竊取(Stealing)其他線程的任務(wù)奶栖,提高并行度。
需要注意的是最好不要加鎖门坷,因?yàn)槿蝿?wù)并行執(zhí)行宣鄙,不加鎖才能發(fā)揮出并行的效果。不保證執(zhí)行順序拜鹤,使用場(chǎng)景有限框冀。例如遞歸情況,可以使用該線程池敏簿。
2.4 手動(dòng)創(chuàng)建線程池
上文中說道明也,手動(dòng)創(chuàng)建線程池優(yōu)于JDK自動(dòng)創(chuàng)建線程池,但是線程池中的線程數(shù)應(yīng)該設(shè)置為多少呢惯裕?
CPU 密集型:比如加密、計(jì)算 hash 等任務(wù)蜻势,任務(wù)屬于 CPU 密集型撑刺,最佳線程數(shù)為 CPU 核心數(shù)。
耗時(shí) IO 型:比如讀寫數(shù)據(jù)庫(kù)握玛、文件和網(wǎng)絡(luò)通信等够傍,CPU一般是不工作的甫菠,外設(shè)的速度遠(yuǎn)遠(yuǎn)慢于 CPU,最佳線程數(shù)可以設(shè)置為 cpu 核心數(shù)的很多倍冕屯。
線程數(shù) = cpu 核心數(shù) * (1+平均等待時(shí)間/平均工作時(shí)間)
手動(dòng)創(chuàng)建線程池寂诱,應(yīng)該注意以下 4 點(diǎn):
- 避免誤解隊(duì)列,防止 OOM
- 自定義線程工廠安聘,設(shè)置合適的線程名稱痰洒,方便后期調(diào)試
- 一般使用默認(rèn)的拒絕策略,也可以自定義拒絕策略
- 設(shè)置合適的 corePoolSize 和 maxPoolSize
代碼示例如下所示浴韭,去Github查看詳細(xì)代碼示例
public class UserThreadPool {
/**
* 執(zhí)行main方法會(huì)打印如下日志:
* UserThreadFactory's 第1機(jī)房-Worker-1 // 線程工程創(chuàng)建線程丘喻,打印線程名稱
* UserThreadFactory's 第1機(jī)房-Worker-2
* running_0 // task內(nèi)容,打印任務(wù)執(zhí)行次數(shù)
* running_1
* running_2
* running_3
* you task is rejected. count=158. java.util.concurrent.ThreadPoolExecutor@677327b6[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 4]
* // 打印線程池狀態(tài)念颈,線程池線程數(shù)量為2泉粉,達(dá)到了我們定義的maxPoolSize=2,任務(wù)隊(duì)列數(shù)量為2榴芳,達(dá)到了我們定義的workQueue容量搀继,完成的任務(wù)數(shù)completed tasks有4個(gè)。
* // 到最終日志翠语,任務(wù)的執(zhí)行次數(shù) running_和任務(wù)拒絕次數(shù)rejectCount相加等于總?cè)蝿?wù)數(shù)200
* // 有時(shí) queued tasks 不一定等于2叽躯,因?yàn)閳?zhí)行拒絕策略時(shí)隊(duì)列元素為2,打印時(shí)隊(duì)列元素可能已經(jīng)被取走執(zhí)行了肌括,復(fù)現(xiàn)時(shí)可以刪除 threadpool.demo.Task 類的 sleep方法点骑。
*
*/
public static void main(String[] args) {
// 1. 任務(wù)隊(duì)列,避免誤無解隊(duì)列谍夭,防止OOM異常
BlockingQueue workQueue = new LinkedBlockingQueue(2);
// 2. 線程工廠黑滴,定義合適的線程名稱
UserThreadFactory f1 = new UserThreadFactory("第1機(jī)房");
// 3. 拒絕策略
UserRejectHandler handler = new UserRejectHandler();
// 4.1 創(chuàng)建線程池,使用自定義線程工廠和拒絕策略
ThreadPoolExecutor threadPool1 = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, workQueue, f1, handler);
// 4.2 使用默認(rèn)的線程工廠和拒絕策略紧索,使用大多數(shù)情況袁辈,這里的拒絕策略時(shí)AbortPolicy,即丟棄任務(wù)并拋出異常
ThreadPoolExecutor threadPool2 = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, workQueue);
// 使用線程池珠漂,
Runnable task= new Task();
for (int i = 0; i < 200; i++) {
// 使用自定義默認(rèn)策略晚缩,拒絕任務(wù)并打印線程池狀態(tài)
// 任務(wù)的執(zhí)行次數(shù) running_和任務(wù)拒絕次數(shù)rejectCount之和應(yīng)為總?cè)蝿?wù)數(shù)200
threadPool1.execute(task);
//threadPool2.execute(task); // 使用默認(rèn)的拒絕策略,拒絕任務(wù)媳危,并拋出異常
}
}
}
// todo 此處應(yīng)該補(bǔ)充 Spring 中創(chuàng)建池的方法荞彼。
https://mp.weixin.qq.com/s/z3gjfk4l-s8aKD4cvY8CHA
https://mp.weixin.qq.com/s/05Ud0t7ECIYWMePhuOf6tg
// springboot中啟動(dòng)異步任務(wù)@Async ,定時(shí)任務(wù)@Scheduled
https://www.bilibili.com/video/av38657363?p=95
2.5 停止線程池
ThreadPoolExecutor
停止線程有 5 個(gè)相關(guān)方法:
-
shutdown()
:通知線程池停止待笑,線程池會(huì)等已經(jīng)添加的任務(wù)執(zhí)行結(jié)束后停止 -
isShutdown()
:查看線程池是否收到了 shutdown通知鸣皂,若收到,則不能添加新任務(wù) -
isTerminated()
:判斷線程池是否已經(jīng)停止,與shutdown不同寞缝,shutdown是判斷線程池是否開始停止癌压,即是否收到了通知 - `awaitTermination(long timeout)) :等待一段時(shí)間,檢測(cè)線程池是否已經(jīng)停止荆陆,該方法是一個(gè)死循環(huán)措拇,若線程池停止了,返回true慎宾,若線程池未停止,則一直檢測(cè)直至超時(shí)浅悉,返回false
-
shutdownNow()
:強(qiáng)制停止線程池趟据,使用 interrupt 方法通知正在執(zhí)行的線程,并返回任務(wù)隊(duì)列中的任務(wù)集合术健,用于記錄日志或保存到數(shù)據(jù)庫(kù)中汹碱。
去Github查看停止線程池 5 種相關(guān)方法的代碼示例。
這5個(gè)方法的源碼分析見《Java 并發(fā)編程之美 8.3.3》
2.6 鉤子方法與線程池監(jiān)控
2.6.1 鉤子Hook方法
鉤子方法:簡(jiǎn)單的理解就是一個(gè)流程荞估,在一個(gè)方法實(shí)現(xiàn)咳促,流程中存在需要自定義的部分,則抽象出一個(gè)鉤子方法勘伺,相同的部分則在流程中的實(shí)現(xiàn)即可跪腹。查看Java中的鉤子方法
在每個(gè)任務(wù)執(zhí)行前后都會(huì)調(diào)用鉤子方法,可以進(jìn)行日志記錄和統(tǒng)計(jì)等工作飞醉。
ThreadPoolExecutor 類中有三個(gè)個(gè)鉤子方法冲茸,默認(rèn)都是空實(shí)現(xiàn)。其中
beforeExecute
和afterExecute
缅帘,在每個(gè)任務(wù)執(zhí)行前后都會(huì)調(diào)用這兩個(gè)方法轴术,在1.8源碼分析章節(jié) runWorker 中可以看到這兩個(gè)方法在任務(wù)執(zhí)行前后被調(diào)用;terminated
是在線程池被回收前調(diào)用钦无。使用鉤子方法就是繼承 ThreadPoolExecutor 類并重寫這幾個(gè)方法逗栽,然后就可以實(shí)現(xiàn)相關(guān)功能。
在1.8源碼分析章節(jié)可以看到線程池執(zhí)行任務(wù) runWorker 方法前后都會(huì)調(diào)用兩個(gè)鉤子方法失暂。
可以利用鉤子方法實(shí)現(xiàn)線程池的暫停與恢復(fù)彼宠,具體代碼示例見Github。
2.6.2 線程池監(jiān)控
線程池具有非常多的屬性弟塞,比如前面提到的核心線程數(shù)兵志,最大線程數(shù)等,線程池對(duì)于各種屬性提供了訪問接口:
-
getCorePoolSize()
獲取核心線程數(shù) corePoolSize -
getMaximumPoolSize()
獲取最大線程數(shù) maximumPoolSize -
getKeepAliveTime()
獲取空閑線程存活時(shí)間 -
getQueue()
獲取任務(wù)隊(duì)列 -
getRejectedExecutionHandler()
獲取拒絕策略對(duì)象 -
getThreadFactory()
獲取線程工廠對(duì)象
前6個(gè)方法是獲取 ThreadPoolExecutor 的構(gòu)造參數(shù)宣肚,下面的方法是獲取線程池的一些其他屬性想罕,這些屬性我們會(huì)在1.8源碼分析章節(jié)經(jīng)常見到。
-
getPoolSize()
獲取線程池中線程個(gè)數(shù) -
getLargestPoolSize()
獲取線程池中線程數(shù)出現(xiàn)過的最大值,創(chuàng)建工作線程 addWorker 方法會(huì)更新該屬性 -
getActiveCount()
獲取工作線程數(shù)按价,即工作線程集 workers 的大小 -
getTaskCount()
獲取任務(wù)總數(shù)惭适,包括已完成、未完成和正在執(zhí)行的任務(wù) -
getCompletedTaskCount()
獲取已完成的任務(wù)數(shù)楼镐,在 runWorker 中線程 worker 任務(wù)執(zhí)行完成會(huì)更新該屬性癞志,getCompletedTaskCount就是將線程工作集 workers 中所有線程完成任務(wù)數(shù)相加返回
2.7 線程池的 5 種狀態(tài)
線程池一共有 5 種狀態(tài),狀態(tài)轉(zhuǎn)換如下圖所示:
- RUNNING:接受新任務(wù)框产,并且處理阻塞隊(duì)列里的任務(wù)
- SHUTDOWN:拒絕新任務(wù)凄杯,但是處理阻塞隊(duì)列里的任務(wù)
- STOP:拒絕新任務(wù),并且丟棄阻塞隊(duì)列里的任務(wù)
- TIDYING:整理狀態(tài)秉宿,所有任務(wù)都執(zhí)行完后(包括任務(wù)隊(duì)列中的任務(wù))戒突,當(dāng)前線程池工作線程為0,將要調(diào)用 terminated 方法
- TERMINATED:終止?fàn)顟B(tài)描睦,terminated 方法調(diào)用完后的狀態(tài)
線程池狀態(tài)轉(zhuǎn)換:
- RUNNING -> SHUTDOWN:用戶顯式調(diào)用 shutdown() 方法膊存,或者線程池對(duì)象被回收時(shí)隱式調(diào)用了 finalize() 中的shutdown() 方法。
- RUNNING / SHUTDOWN -> STOP:顯式調(diào)用 shutdownNow() 方法
- SHUTDOWN -> TIDYING:當(dāng)線程池和任務(wù)隊(duì)列都為空時(shí)
- STOP -> TIDYING:當(dāng)線程池為空時(shí)
- TIDYING -> TERMINATER:當(dāng) terminated() hook方法執(zhí)行完成時(shí)
源碼分析:
在 ThreadPoolExecutor 的屬性定義中頻繁使用位移運(yùn)算來表示線程池狀態(tài)忱叭,位移運(yùn)算是改變當(dāng)前值的一種高效手段隔崎,查看 ThreadPoolExecutor 源碼可知,線程池一共有 5 種狀態(tài):
// ThreadPoolExecutor 屬性韵丑,用來表示線程池狀態(tài)
// 線程池默認(rèn)為RUNNING狀態(tài)爵卒,線程個(gè)數(shù)為0,合并起來得到線程池的ctl值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// int值共有32位撵彻,右邊29位表示工作線程數(shù)技潘,左邊3位表示線程池狀態(tài)。3個(gè)二進(jìn)制位可以表示0-7
// Integer.SIZE為32千康,COUNT_BITS
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000-11111111111111111111111111111,類似于子網(wǎng)掩碼享幽,用于位運(yùn)算,使用見下方方法
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 用左邊3位二進(jìn)制拾弃,表示線程池狀態(tài)
// RUNNING狀態(tài)值桩,表示可以接受新任務(wù),并且處理隊(duì)列里的任務(wù)
// -1左移29位表示 111_00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN狀態(tài)豪椿,此狀態(tài)不再接受新任務(wù)奔坟,但可以繼續(xù)執(zhí)行隊(duì)列中的任務(wù)
// 0左移29位表示 000_00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP狀態(tài),此狀態(tài)全面拒絕任務(wù)搭盾,并中斷正在處理的任務(wù)
// 1左移29位表示 001_00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// TIDYING狀態(tài)咳秉,此狀態(tài)表示所有任務(wù)已經(jīng)被終止
// 2左移29位表示 010_00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED狀態(tài),此狀態(tài)表示所有任務(wù)已經(jīng)被終止
// 3左移29位表示 011_00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 返回線程池狀態(tài)鸯隅,c是線程池狀態(tài)和線程數(shù)的int值澜建,與~CAPACITY做與運(yùn)算向挖,得到左3位二進(jìn)制線程池狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 返回線程池線程數(shù),與CAPACITY做與運(yùn)算炕舵,得到右29位二進(jìn)制線程數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
// 把左邊三位與右邊29位或運(yùn)算何之,合并成一個(gè)值
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池的狀態(tài)用ctl
整型的左3位表示,五種線程池狀態(tài)的十進(jìn)制值大小依次為:
RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
這樣設(shè)計(jì)的好處是很容易通過比較值大小來判斷線程池狀態(tài)咽筋,例如線程池中經(jīng)常需要 isRunning 判斷
// 判斷線程池是否為Running狀態(tài)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
根據(jù)以上可知溶推,線程池調(diào)用 shutdown() 方法后會(huì)從RUNNING狀態(tài)轉(zhuǎn)換為SHUTDOWN狀態(tài),下面 ThreadPoolExecutor 源碼中展示了線程池狀態(tài) RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED 的轉(zhuǎn)換過程奸攻,參考《并發(fā)編程之美8.3.3》鳄哭。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 1. 設(shè)置線程池狀態(tài)為SHUTDOWN
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試終止線程池
tryTerminate();
}
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// ......
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 2. 設(shè)置當(dāng)前線程池狀態(tài)為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 調(diào)用鉤子方法
terminated();
} finally {
// 3. terminated方法調(diào)用完畢后静稻,設(shè)置線程池狀態(tài)為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
protected void terminated() { }
上面的源碼線程轉(zhuǎn)換過程缺少了STOP狀態(tài)惩阶,下面源碼展示了線程對(duì)象調(diào)用shutdownNow()
方法后蜂大,線程狀態(tài) RUNNING -> STOP -> TIDYING -> TERMINATED 的轉(zhuǎn)換過程。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 1. 設(shè)置線程池狀態(tài)為STOP
advanceRunState(STOP);
interruptWorkers();
// 將任務(wù)隊(duì)列元素移出疏橄,不執(zhí)行這些任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 2. 設(shè)置線程池狀態(tài)為TIDYING,調(diào)用terminated方法
// 3. 設(shè)置線程池狀態(tài)為TERMINATED
tryTerminate();
return tasks;
}
// 略就?疑問:SHUTDOWN -> TIDYING 要等待線程池和任務(wù)隊(duì)列為空才行捎迫,源碼中并沒有看到這一點(diǎn)。
// 補(bǔ)充:mainLock的作用分析
2.8 實(shí)現(xiàn)原理與源碼分析
ThreadPoolExecutor 源碼有 4 個(gè)重要組成部分:
- 線程池的各個(gè)狀態(tài)表牢,源碼分析見線程池狀態(tài)章節(jié)窄绒,
- 提交任務(wù)執(zhí)行是 execute() 方法,源碼分析見添加線程規(guī)則章節(jié)
- 創(chuàng)建并啟動(dòng)新線程是 addWorker()方法
- 使用線程池中線程執(zhí)行任務(wù)是 runWorker()方法
ThreadPoolExecutor 重要方法調(diào)用圖如下所示崔兴,詳細(xì)是嵌套調(diào)用彰导,向右是串行執(zhí)行:
上圖中 Worker 是 Runnable 接口實(shí)現(xiàn)類,啟動(dòng)線程需要 new Thread(Runnable worker).start()敲茄,而創(chuàng)建線程工作在 Worker 構(gòu)造方法中使用線程工廠創(chuàng)建位谋,創(chuàng)建的線程保存在 Worker 類的 final Thread thread 屬性中,啟動(dòng)線程也是調(diào)用的 run() 方法就是 Runnable worker 的run() 方法堰燎,調(diào)用了 ThreadPoolExecutor#runWorker方法掏父。這里比較繞,詳細(xì)在 addWorker 與 runWorker 源碼分析章節(jié)介紹秆剪。
2.8.1 創(chuàng)建工作線程 addWorker 源碼分析
execute() 方法中創(chuàng)建線程調(diào)用的是 addWorker()方法赊淑,即新增工作線程 worker,代碼如下:
/**
* 根據(jù)當(dāng)前線程池狀態(tài)仅讽,檢測(cè)是否可以添加新的工作線程陶缺,如果可以則創(chuàng)建新線程并執(zhí)行任務(wù)
* 如果一切正常則返回true,返回false有倆種情況:
* 1. 線程池非RUNNING狀態(tài)
* 2. 線程工廠創(chuàng)建新線程失敗
* @param firstTask 用戶需要執(zhí)行的任務(wù)Runnable洁灵,使用該任務(wù)來new 線程Thread饱岸,外部啟動(dòng)線程池時(shí)需要構(gòu)造的第一個(gè)線程,它是線程的母體
* @param core 新增線程時(shí)的判斷指標(biāo):
* true 判斷指標(biāo)為corePoolSize,RUNNING狀態(tài)線程池的線程個(gè)數(shù)小于corePoolSize則可以創(chuàng)建新線程
* false 判斷指標(biāo)為maximumPoolSize伶贰,RUNNING狀態(tài)線程池的線程數(shù)小于maximumPoolSize則可以創(chuàng)建新線程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 獲取線程池的狀態(tài)與線程個(gè)數(shù)組合值c蛛砰,詳細(xì)見線程狀態(tài)章節(jié)
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// 1. 如果為RUNNING狀態(tài),則無法進(jìn)入if語句黍衙,直接進(jìn)行下一步for
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 2. 循環(huán)CAS增加線程個(gè)數(shù)
for (;;) {
// 獲取線程池線程個(gè)數(shù)
int wc = workerCountOf(c);
// 2.1 線程數(shù)最大為2^29泥畅,否則將影響左3位的線程池狀態(tài)值
// 判斷線程個(gè)數(shù)是否大于等于核心數(shù)(最大數(shù))
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 2.2 CAS 增加線程數(shù),修改ctl值琅翻,成功后跳出循環(huán)到retry標(biāo)簽
if (compareAndIncrementWorkerCount(c))
break retry;
// 2.3 如果CAS增加線程數(shù)失敗位仁,則繼續(xù)執(zhí)行內(nèi)層for循環(huán),重新CAS
// 線程池狀態(tài)線程個(gè)數(shù)值是可變化的方椎,需要獲取最新值
c = ctl.get(); // Re-read ctl
// 查看線程池狀態(tài)是否變化聂抢,如果變化則跳出for循環(huán)到retry標(biāo)簽
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 3. 到這里說明CAS成功了,這里開始創(chuàng)建新的工作線程
boolean workerStarted = false; // 標(biāo)記線程是否啟動(dòng)成功
boolean workerAdded = false; // 標(biāo)記線程是否新增到workers成功
// Worker是工作線程棠众,實(shí)現(xiàn)了Runnable接口琳疏,是ThreadPoolExecutor的內(nèi)部類
Worker w = null;
try {
// 3.1 創(chuàng)建新線程,這行代碼已經(jīng)利用Worker構(gòu)造方法中的線程池工廠創(chuàng)建了新線程
// 并封裝成工作線程Worker對(duì)象闸拿,線程創(chuàng)建時(shí)參數(shù)Runnable為firstTask
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 3.2 添加獨(dú)占鎖空盼,為了實(shí)現(xiàn)workers同步,因?yàn)榭赡芏鄠€(gè)線程同時(shí)調(diào)用了同一個(gè)線程池的execute方法
mainLock.lock();
try {
// 3.3 獲取線程池狀態(tài)新荤,避免在獲取鎖前調(diào)用了shutdown方法
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果線程已啟動(dòng)調(diào)用過start方法揽趾,則拋出異常,等價(jià)于普通線程啟動(dòng)狀態(tài)校驗(yàn)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 3.4 添加新創(chuàng)建的工作線程 w 到工作線程集workers
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 3.5 添加成功后啟動(dòng)工作線程苛骨,調(diào)用線程start方法篱瞎,這個(gè)工作線程是線程池工廠創(chuàng)建的
if (workerAdded) {
// 啟動(dòng)線程,調(diào)用線程t的run方法痒芝,
// 線程t是new Thread(Runnable worker)得到的俐筋,所以會(huì)調(diào)用worker.run()方法
t.start();
// 線程啟動(dòng)成功
workerStarted = true;
}
}
} finally {
// 3.6 如果線程啟動(dòng)失敗,則把新增的線程從workers中remove掉严衬,然后線程個(gè)數(shù)ctl減1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代碼較長(zhǎng)校哎,主要分為兩個(gè)部分:1.雙重循環(huán)通過 CAS 操作增加線程數(shù);2.創(chuàng)建新工作線程瞳步,并線程線程安全的將新線程添加到 workers 中闷哆,并且啟動(dòng)新線程。
首先來分析第一段代碼单起,通過CAS增加線程數(shù)抱怔,代碼1中,下面三種情況會(huì)新增線程失敗并返回 false:
- 當(dāng)前線程池狀態(tài)為STOP嘀倒,TIDYING和TERMINATED
- 線程池狀態(tài)為SHUTDOWN并且已經(jīng)有了firstTask
- 當(dāng)前線程池狀態(tài)為SHUTDOWN并且任務(wù)隊(duì)列為空
內(nèi)層循環(huán)的作用是使用CAS操作設(shè)置線程個(gè)數(shù)屈留,代碼2.1判斷如果線程個(gè)數(shù)超限制則返回 false局冰,否則執(zhí)行代碼2.2CAS操作設(shè)置線程個(gè)數(shù),CAS成功則退出雙新歡灌危,CAS失敗則執(zhí)行代碼2.3看線程狀態(tài)是否變化康二,如果變了,則再次進(jìn)入外層循環(huán)重新獲取線程池狀態(tài)勇蝙,否則依舊在內(nèi)存循環(huán)繼續(xù)CAS嘗試沫勿。
compareAndIncrementWorkerCount() 方法是CAS添加線程個(gè)數(shù),修改表示線程池狀態(tài)和線程個(gè)數(shù)的 ctl 值味混,執(zhí)行失敗概率非常低产雹,類似自旋鎖原理。這里的處理邏輯是線程個(gè)數(shù)先加 1翁锡,如果后面創(chuàng)建線程失敗再減 1蔓挖,這是輕量處理并發(fā)創(chuàng)建線程的方式。如果先創(chuàng)建線程馆衔,成功再加 1瘟判,當(dāng)發(fā)現(xiàn)超出線程數(shù)限制后再銷毀線程,這種處理方式明顯比前者的代價(jià)要大角溃。
第二部分的代碼3說明使用CAS成功的增加了線程個(gè)數(shù)拷获,但是現(xiàn)在新線程還未創(chuàng)建,這里使用全局的獨(dú)占鎖把系新增的線程 worker 添加的工作線程集 workers 中开镣。
代碼3.1使用線程工廠創(chuàng)建了一個(gè)新的工作線程刀诬,代碼3.2獲取了獨(dú)占鎖咽扇,代碼3.3重新檢查線程池狀態(tài)邪财,這是為了避免在獲取鎖前其他線程調(diào)用了shutdown方法關(guān)閉了線程池。如果線程池被關(guān)閉质欲,則直接釋放鎖树埠;否則執(zhí)行代碼3.4添加新工作線程 w 到工作線程集 workers 中,然后釋放鎖嘶伟。代碼3.5判斷如果新增工作線程成功怎憋,則啟動(dòng)新線程。
代碼3.6判斷線程是否啟動(dòng)成功九昧,如果啟動(dòng)會(huì)失敗則從工作線程集 wokers 中移除新線程 w绊袋,并將線程數(shù)減 1。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 從工作線程集中移除新線程
workers.remove(w);
// 將線程個(gè)數(shù)減1铸鹰,修改ctl值癌别,對(duì)應(yīng)compareAndIncrementWorkerCount方法
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
用戶提交任務(wù)到線程池后,由工作線程 Worker 來執(zhí)行蹋笼,Worker 的構(gòu)造函數(shù)如下:
// 工作線程Worker實(shí)現(xiàn)了Runnable接口展姐,并把本對(duì)象作為參數(shù)傳入newThread()創(chuàng)建新線程躁垛,
// new Thread(Runnable r)中參數(shù)r就是用戶任務(wù)firstTask
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
Worker(Runnable firstTask) {
setState(-1); // 在調(diào)用runWorker前禁止中斷
this.firstTask = firstTask;
// 使用線程工廠創(chuàng)建一個(gè)新線程, 并把本對(duì)象作為參數(shù)傳入newThread()創(chuàng)建新線程
// 線程工廠中new Thread(Runnable worker)參數(shù)就是當(dāng)前worker對(duì)象
this.thread = getThreadFactory().newThread(this);
}
// 在addWorker中啟動(dòng)線程是Worker.thread.start(),
// 最終還是調(diào)用worker對(duì)象的run方法圾笨,即調(diào)用runWorker()方法
@Override
public void run() {
// 任務(wù)執(zhí)行邏輯與線程復(fù)用
runWorker(this);
}
由上面源碼可知教馆,addWorker 中使用線程工廠創(chuàng)建并啟動(dòng)新線程,最終執(zhí)行任務(wù)調(diào)用的是 Worker#runWorker 方法擂达。
2.8.2 實(shí)現(xiàn)線程復(fù)用 runWorker 源碼分析
上一小節(jié)我們知道 addWorker 創(chuàng)建了工作線程后土铺,調(diào)用 Worker.thread.start() 啟動(dòng)線程,執(zhí)行用戶傳入的任務(wù)則在 Worker#run 方法中谍婉,具體的執(zhí)行任務(wù)邏輯和線程復(fù)用執(zhí)行任務(wù)的實(shí)現(xiàn)在 runWorker 方法中舒憾,源碼如下所示:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 用戶提交的任務(wù)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// getTask從任務(wù)隊(duì)列取出任務(wù),執(zhí)行任務(wù)穗熬,直至任務(wù)隊(duì)列為空
// 當(dāng)前代碼都在工作線程的run方法中镀迂,循環(huán)調(diào)用任務(wù)的run方法相當(dāng)于在復(fù)用工作線程
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 1. 調(diào)用前置鉤子方法,在每個(gè)任務(wù)執(zhí)行前調(diào)用
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 2. 調(diào)用用戶提交任務(wù)的task的run()方法唤蔗,worker串行執(zhí)行探遵,相當(dāng)于worker線程一直在運(yùn)行用戶的任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 3. 調(diào)用后置鉤子方法,在每個(gè)任務(wù)執(zhí)行后調(diào)用
afterExecute(task, thrown);
}
} finally {
task = null;
// 4. 統(tǒng)計(jì)該worker線程完成任務(wù)的個(gè)數(shù)
w.completedTasks++;
w.unlock();
}
} // while
// 如果在執(zhí)行任務(wù)期間未拋出異常妓柜,則可以執(zhí)行這一行代碼
completedAbruptly = false;
} finally {
// 5. 執(zhí)行清理工作箱季,每個(gè)worker執(zhí)行多個(gè)任務(wù),但只進(jìn)行一次清理工作
// 統(tǒng)計(jì)所有worker線程完成的任務(wù)總數(shù)棍掐,對(duì)當(dāng)前worker進(jìn)行銷毀
processWorkerExit(w, completedAbruptly);
}
}
實(shí)現(xiàn)工作線程復(fù)用就是在while循環(huán)中不斷從隊(duì)列中獲取任務(wù)Runnable task藏雏,然后調(diào)用task.run() 來處理用戶任務(wù),即實(shí)現(xiàn)了工作線程復(fù)用作煌。
2.8.3 取出任務(wù)與回收超時(shí)線程 getTask
runWorker 在執(zhí)行任務(wù)時(shí)掘殴,需要不斷從任務(wù)隊(duì)列取出任務(wù),取出任務(wù)的邏輯則在 getTask() 方法中實(shí)現(xiàn)粟誓,getTask() 取出任務(wù)有 3 種情況:
- 任務(wù)隊(duì)列不為空奏寨,取出任務(wù)并返回
- 任務(wù)隊(duì)列為空,則需要判斷是否需要回收超時(shí)線程鹰服,若需要回收超時(shí)線程病瞳,則該工作線程調(diào)用
workQueue.poll(keepAliveTime)
,等待任務(wù)隊(duì)列 keepAliveTime悲酷,超時(shí)后返回null任務(wù)套菜。 - 任務(wù)隊(duì)列為空,若不需要回收超時(shí)線程设易,則該工作線程調(diào)用
workQueue.take()
進(jìn)入阻塞狀態(tài)逗柴,直至任務(wù)隊(duì)列有新任務(wù)出現(xiàn)。這也告訴了我們當(dāng)線程池沒有任務(wù)時(shí)亡嫌,線程池中 corePoolSize 數(shù)量的線程處于阻塞狀態(tài)嚎于,有了新任務(wù)后喚醒線程到RUNNABLE狀態(tài)掘而,繼續(xù)復(fù)用該線程。
彩蛋:阻塞隊(duì)列出隊(duì)操作 take 與 poll 的區(qū)別如何記憶于购?
poll 中有 O袍睡,就像一個(gè)表在計(jì)時(shí),超時(shí)后就返回null了肋僧,而 take 會(huì)一直等待
getTask() 的源碼分析如下:
private Runnable getTask() {
// 判斷是否對(duì)超時(shí)空閑線程進(jìn)行回收斑胜,當(dāng)任務(wù)隊(duì)列為空時(shí)設(shè)置timeOut為true
boolean timedOut = false; // Did the last poll() time out?
// 從隊(duì)列中取出任務(wù),若隊(duì)列為空嫌吠,則CAS修改ctl變量的工作線程個(gè)數(shù)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 工作線程數(shù)
int wc = workerCountOf(c);
// 工作線程數(shù)大于核心數(shù)止潘,則需要超時(shí)判斷
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 工作線程數(shù)必須大于1或工作隊(duì)列為空,此時(shí)才考慮回收線程
// 為什么這里不進(jìn)行線程超時(shí)判斷就減少工作線程辫诅?
// 超時(shí)判斷keepAliveTime在下面從隊(duì)列取出元素時(shí)進(jìn)行
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// ctl 工作線程數(shù)-1凭戴,具體的線程銷毀工作會(huì)在processWorkerExit中執(zhí)行
if (compareAndDecrementWorkerCount(c))
// 2. 需要回收超時(shí)線程,超時(shí)未取到任務(wù)返回null
return null;
continue;
}
try {
// 工作線程數(shù)若大于corePoolSize炕矮,需要回收超時(shí)線程么夫,三目表達(dá)式
// 2. poll 是從任務(wù)隊(duì)列中取出隊(duì)首元素,若隊(duì)列為空肤视,則等待keepAliveTime档痪,超時(shí)后返回null
// 3. take 是從任務(wù)隊(duì)列中取出隊(duì)首元素,若隊(duì)列為空邢滑,則一直阻塞腐螟,直至取出任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 1. 任務(wù)隊(duì)列不為空,返回任務(wù)
return r;
// 任務(wù)隊(duì)列為空困后,keepAliveTime時(shí)間內(nèi)也沒有取出元素乐纸,
// 則表示需要線程超時(shí)判斷置為true,回收空閑線程
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
// 工作線程w執(zhí)行完隊(duì)列中所有任務(wù)后操灿,執(zhí)行清理統(tǒng)計(jì)工作
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 工作線程執(zhí)行任務(wù)未正確結(jié)束锯仪,如發(fā)生了異常等泵督,需要將工作線程數(shù)-1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1.統(tǒng)計(jì)整個(gè)線程池完成的任務(wù)數(shù)
completedTaskCount += w.completedTasks;
// 2.移除工作線程w趾盐,線程數(shù)ctl值的修改在getTask任務(wù)中已經(jīng)完成
// 這里移除的線程一定是非核心線程,核心線程沒有任務(wù)會(huì)阻塞小腊,不會(huì)進(jìn)行到這一步
workers.remove(w);
} finally {
mainLock.unlock();
}
// 3. 設(shè)置線程池狀態(tài)為TIDYING救鲤,調(diào)用鉤子terminated方法
// 4. 設(shè)置線程池狀態(tài)為 TERMINATED
tryTerminate();
// 創(chuàng)建線程至核心數(shù)corePoolSize,存在corePoolSize=0秩冈,出現(xiàn)新任務(wù)的情況
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 該工作線程成功結(jié)束
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 最小線程數(shù)為0且任務(wù)隊(duì)列不為空本缠,說明不合適,需要將最小線程數(shù)設(shè)置為1
// 隊(duì)列為空則說明最小線程為1也可以入问,則不修改min
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 工作線程數(shù)大于min丹锹,則返回即可
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 到這一步稀颁,說明工作線程數(shù)小于min,需要新增工作線程
addWorker(null, false);
}
}
面試題4: 我們知道線程狀態(tài)從RUNNABLE到TERMINATED是不可逆的楣黍,比如一個(gè)線程只能調(diào)用一次start() 方法匾灶,線程生命周期到了TERMINATED就結(jié)束了,無法重新到達(dá)RUNNABLE狀態(tài)租漂。
那么線程池能夠復(fù)用線程是怎么實(shí)現(xiàn)的阶女?
通過 ThreadPoolExecutor#runWorker 源碼可知,一個(gè)工作線程啟動(dòng)后哩治,run 方法中循環(huán)調(diào)用用戶任務(wù) Runnable#run() 方法執(zhí)行任務(wù)秃踩,相當(dāng)于工作線程始終處于 RUNNABLE 狀態(tài),直至所有用戶任務(wù)執(zhí)行完畢业筏,線程才會(huì)退出循環(huán)憔杨,線程到達(dá)TERMINATED狀態(tài)。
面試題5: 前面提到線程池中任務(wù)執(zhí)行完畢后蒜胖,會(huì)回收空閑線程芍秆,保留 corePoolSize 個(gè)線程。如上一問題所說翠勉,任務(wù)執(zhí)行完畢后線程會(huì)到達(dá)TERMINATED狀態(tài)妖啥,那如何保留工作線程呢?
通過 ThreadPoolExecutor#getTask 源碼可知对碌,在工作線程執(zhí)行任務(wù)時(shí)荆虱,會(huì)從任務(wù)隊(duì)列取出任務(wù),當(dāng)任務(wù)隊(duì)列為空時(shí)朽们,會(huì)調(diào)用workQueue.take()
怀读,使線程進(jìn)入阻塞狀態(tài),當(dāng)有任務(wù)時(shí)骑脱,會(huì)線性線程到 Runnable 狀態(tài)繼續(xù)復(fù)用線程菜枷。
1.9 線程池設(shè)計(jì)模式
雖然在 Java 語言中創(chuàng)建線程看上去就像創(chuàng)建一個(gè)對(duì)象一樣簡(jiǎn)單,只需要 new Thread() 就可以了叁丧,但實(shí)際上創(chuàng)建線程遠(yuǎn)不是創(chuàng)建一個(gè)對(duì)象那么簡(jiǎn)單啤誊。創(chuàng)建對(duì)象,僅僅是在 JVM 的堆里分配一塊內(nèi)存而已拥娄;而創(chuàng)建一個(gè)線程蚊锹,卻需要調(diào)用操作系統(tǒng)內(nèi)核的 API,然后操作系統(tǒng)要為線程分配一系列的資源稚瘾,這個(gè)成本就很高了牡昆,所以線程是一個(gè)重量級(jí)的對(duì)象,應(yīng)該避免頻繁創(chuàng)建和銷毀摊欠。
線程池和一般意義上的池化資源是不同的丢烘。一般意義上的池化資源如數(shù)據(jù)庫(kù)連接池柱宦,都是下面這樣,當(dāng)你需要資源的時(shí)候就調(diào)用 acquire() 方法來申請(qǐng)資源播瞳,用完之后就調(diào)用 release() 釋放資源捷沸。
class XXXPool{
// 獲取池化資源
XXX acquire() {......}
// 釋放池化資源
void release(XXX x){......}
}
目前業(yè)界線程池的設(shè)計(jì),普遍采用的都是生產(chǎn)者 - 消費(fèi)者模式狐史。線程池的使用方是生產(chǎn)者痒给,線程池本身是消費(fèi)者。在下面的示例代碼中骏全,我們創(chuàng)建了一個(gè)非常簡(jiǎn)單的線程池 MyThreadPool苍柏,你可以通過它來理解線程池的工作原理。
//簡(jiǎn)化的線程池姜贡,僅用來說明工作原理
class MyThreadPool {
//利用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式
BlockingQueue<Runnable> workQueue;
//保存內(nèi)部工作線程
List<WorkerThread> threads = new ArrayList<>();
// 構(gòu)造方法
MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
this.workQueue = workQueue;
// 創(chuàng)建工作線程
for (int idx = 0; idx < poolSize; idx++) {
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任務(wù)
void execute(Runnable command) {
workQueue.put(command);
}
// 工作線程負(fù)責(zé)消費(fèi)任務(wù)试吁,并執(zhí)行任務(wù)
class WorkerThread extends Thread {
public void run() {
//循環(huán)取任務(wù)并執(zhí)行
while (true) {
Runnable task = workQueue.take();
task.run();
}
}
}
}
/** 下面是使用示例 **/
// 創(chuàng)建有界阻塞隊(duì)列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// 創(chuàng)建線程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任務(wù)
pool.execute(()->{System.out.println("hello");});
1.10 自己動(dòng)手實(shí)現(xiàn)線程池
線程池沒你想的那么簡(jiǎn)單 上 - crossoverJie
線程池沒你想的那么簡(jiǎn)單 下 - crossoverJie
推薦閱讀
Java并發(fā)編程之美 - 翟陸續(xù) 內(nèi)容和慕課網(wǎng)玩轉(zhuǎn)Java并發(fā)類似,可以配合閱讀楼咳,有豐富的源碼分析熄捍,實(shí)踐部分有10個(gè)小案例
Java并發(fā)編程實(shí)戰(zhàn) - 極客時(shí)間 內(nèi)容有深度,并發(fā)設(shè)計(jì)模式母怜,分析了 4 個(gè)并發(fā)應(yīng)用案例 Guava RateLimiter余耽,Netty,Disrupter 和 HiKariCP苹熏,還介紹了 4 種其他類型的并發(fā)模型 Actor碟贾,協(xié)程,CSP等
精通Java并發(fā)編程 - 哈維爾 非常多的案例轨域,幾乎每個(gè)知識(shí)點(diǎn)和章節(jié)都有案例袱耽,學(xué)習(xí)后能更熟悉Java并發(fā)的應(yīng)用
傳智播客8天并發(fā) 筆記有并發(fā)案例,CPU原理等筆記干发,非常深入朱巨,后面畫時(shí)間學(xué)習(xí)一下精