前言
關(guān)于線程池
在Java/Android開發(fā)中装获,設(shè)計到并發(fā)的請求,那基本上是離不開線程池了厉颤。用線程池的好處:
- 1穴豫、減少線程頻繁創(chuàng)建、銷毀的開銷逼友;
- 2精肃、好控制并發(fā)量,降低OOM的可能帜乞,至于原因文中會說肋杖;
- 3、提高程序的響應(yīng)速度挖函,因為可以省去部分創(chuàng)建的過程状植;
要不要深度學(xué)習(xí)線程池
對于服務(wù)端的同學(xué)來說應(yīng)該會比較重視這一塊,因為需要做高并發(fā)怨喘;而移動端的同學(xué)可能比較容易忽略這一塊津畸。有些人覺得平時也用不到,移動端沒有那么大并發(fā)量必怜,或者說第三方框架中已經(jīng)完成了肉拓,比如
OkHtttp
;其實只能說有這種想法的同學(xué)還沒有遇到大一點的項目或者說沒有太多多線程優(yōu)化的經(jīng)驗梳庆。如果你真的遇到了這種項目瓶頸暖途,你連線程池的運(yùn)行原理都不知道,那又如何解決項目問題呢膏执?如果你要尋求一份中高級開發(fā)工程師的工作驻售,那線程池是基本是必問題目之一,而且還要有一定深度更米。
如何深度學(xué)習(xí)線程池
這也是我們今天的重點欺栗,本文將從下面幾點帶大家快速掌握線程池的要點:
- 1、從API使用到原碼解析,基于JDK1.8版本迟几;
- 2消请、從源碼閱讀(深入)中總結(jié)出(淺出)線程池工作原理;
- 3类腮、對應(yīng)用場景的分析以及異常處理
預(yù)覽
先對線程池的部分核心類/接口做個簡介臊泰,大家有個印象就好。
Executor接口
public interface Executor {
/**
* 就一個方法蚜枢,用來執(zhí)行線程任務(wù)的因宇,類似于Thread的start()方法
*/
void execute(Runnable command);
}
由于Executor是一個接口,所以execute
是由具體的實現(xiàn)類來完成的祟偷,調(diào)用這個方法察滑,可能會出現(xiàn)如下情況:
- 1.創(chuàng)建一個新線程并立即啟動;
- 2.復(fù)用線程池中空閑的線程來執(zhí)行任務(wù)修肠;
- 3.進(jìn)入一個阻塞隊列中排隊贺辰;
- 4.拋出異常/拒絕接收該任務(wù),這個要看具體的拒絕策略嵌施,默認(rèn)拋出異常饲化。
ExecutorService接口
繼承自Executor接口,我們常用的很多方法就是在這個接口中定義的。主要涉及到:提交任務(wù)
吗伤、關(guān)閉線程
吃靠、獲取結(jié)果
。
public interface ExecutorService extends Executor {
/**
* 關(guān)閉線程池足淆,新提交的任務(wù)會被拒絕巢块,但是已經(jīng)提交的任務(wù)會繼續(xù)執(zhí)行
*/
void shutdown();
/**
* 關(guān)閉線程池,新提交的任務(wù)會被拒絕巧号,并且嘗試關(guān)閉正在執(zhí)行的任務(wù)
*/
List<Runnable> shutdownNow();
/**
* 線程池是否已關(guān)閉
*/
boolean isShutdown();
/**
* 如果調(diào)用了shutdown或者shutdownNow之后族奢,所有的任務(wù)都結(jié)束了,那么返回true丹鸿,否則返回false
*/
boolean isTerminated();
/**
* 當(dāng)調(diào)用shutdown 或 shutdownNow之后越走,再調(diào)用這個方法,會
*等待所有的任務(wù)執(zhí)行完成靠欢,直到超時(超過timeout)或者說當(dāng)前的線程被中斷了
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一個Runnable 任務(wù)
*/
Future<?> submit(Runnable task);
/**
* 執(zhí)行所有任務(wù)廊敌,返回 Future 類型的一個 list
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
}
AbstractExecutorService
抽象類,實現(xiàn)了ExecutorService
接口门怪。主要封裝了通過submit方式提交任務(wù)的一些操作骡澈。
注意: 不需要獲取結(jié)果,可以用
execute
方法薪缆;需要獲取結(jié)果(FutureTask)用submit
方法秧廉。
由于篇幅有限,本文只針對execute
方式做講解拣帽,想了解submit
方式的同學(xué)可以參考深度解讀 java 線程池設(shè)計思想及源碼實現(xiàn)
Executors
這是大多數(shù)人最常用的一個類疼电,實質(zhì)上就是一個工具類〖跏茫可以快速的構(gòu)建一個線程池對象蔽豺,常見的操作有如下:
/**
* 創(chuàng)建一個固定大小的線程池,而且全是核心線程拧粪,
* 會一直存活修陡,除非特別設(shè)置了核心線程的超時時間
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* 創(chuàng)建了一個沒有大小限制的線程池,全是非核心線程可霎;如果線程
* 空閑的時間超過60s就會被移除
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* 這個線程池只有1個唯一的核心線程
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* 創(chuàng)建一個定長的線程池魄鸦,可以執(zhí)行周期性的任務(wù)
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
可以看出這幾種方式最后都是通過ThreadPoolExecutor
來實現(xiàn)的,所以下面就來研究一下今天的主角ThreadPoolExecutor
癣朗,等理解了這個類拾因,也就可以掌握線程池等工作原理,甚至可以根據(jù)自己的策略來自定義線程池旷余。
ThreadPoolExecutor(關(guān)鍵類)
繼承與抽象方法AbstractExecutorService
绢记,也就間接實現(xiàn)了ExecutorService
、Executor
等接口正卧。
從構(gòu)造方法談起
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:
核心線程數(shù)量蠢熄,所謂核心線程就是一直保留在線程池中,及時處于空閑狀態(tài)也不會銷毀等線程炉旷,除非手動調(diào)用allowCoreThreadTimeOut
才可以在超時銷毀签孔。 - maximumPoolSize:
線程池允許創(chuàng)建的最大線程數(shù)量。 - keepAliveTime:
線程池中除了有核心線程之外窘行,還有非核心線程骏啰,非核心線程處于空閑的時候會在一定時間范圍內(nèi)被關(guān)閉,而這個空閑的時間就是keepAliveTime抽高。 - unit:
keepAliveTime
的時間單位判耕,比如秒、分翘骂、時等 - workQueue:
保存待執(zhí)行任務(wù)的阻塞隊列壁熄。如果一個任務(wù)進(jìn)入線程池之后,如果核心線程滿了的話碳竟,就會先嘗試添加到隊列中草丧,當(dāng)然未必添加成功,而且隊列也有多種實現(xiàn)莹桅,具體的后面再說昌执,先簡單理解為排隊即可。 - threadFactory:
如果沒有設(shè)置的話,使用默認(rèn)的ThreadFactory來創(chuàng)建線程懂拾;當(dāng)然你也可以通過ThreadFactory自己創(chuàng)建線程煤禽,比如設(shè)置線程名稱,優(yōu)先級等 - handler:
當(dāng)達(dá)到線程池的最大容量時的拒絕策略岖赋。當(dāng)線程池飽和檬果,繼續(xù)提交任務(wù),需要一種策略來處理該任務(wù)唐断。線程池提供了4種策略:AbortPolicy
:直接拋出異常选脊,這是默認(rèn)策略;
CallerRunsPolicy
:用調(diào)用者所在的線程來執(zhí)行任務(wù)脸甘;
DiscardOldestPolicy
:丟棄阻塞隊列中靠最前的任務(wù)恳啥,并執(zhí)行當(dāng)前任務(wù);
DiscardPolicy
:直接丟棄任務(wù)丹诀;
一些重要屬性和方法
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
這里很關(guān)鍵角寸,一定要認(rèn)真看,后面分析任務(wù)執(zhí)行execute()
方法就需要用到這些基礎(chǔ)忿墅。
Integer.SIZE =32
,這代表了java中扁藕,int最大是32位,所以COUNT_BITS
等于29疚脐;CAPACITY
等于1*2^29 -1亿柑,用它來表示線程池的最大容量是足夠了的。
從線程池的生命周期來看棍弄,線程池有5種狀態(tài):
-
RUNNING
: 能接受新的任務(wù)望薄,也能處理隊列中的任務(wù); -
SHUTDOWN
:不接受新的任務(wù)呼畸,但是會處理隊列中的任務(wù)痕支; -
STOP
: 不接受新的任務(wù),也不處理隊列中的任務(wù),而且會中斷正在進(jìn)行的任務(wù)蛮原; -
TIDYING
: 所有的任務(wù)都完成了宵距,workCount等于 0桂对。線程池的狀態(tài)在轉(zhuǎn)換為 TIDYING 狀態(tài)時,會執(zhí)行鉤子方法 terminated() -
TERMINATED
: terminated() 方法結(jié)束后,線程池的狀態(tài)就會變成這個
關(guān)于狀態(tài)轉(zhuǎn)換
-
RUNNING -> SHUTDOWN
:
當(dāng)調(diào)用shutdown()方法后活鹰,會發(fā)生這個狀態(tài)轉(zhuǎn)換浩姥; -
(RUNNING or SHUTDOWN) -> STOP
:
當(dāng)調(diào)用 shutdownNow() 后戚嗅,會發(fā)生這個狀態(tài)轉(zhuǎn)換露乏; -
SHUTDOWN -> TIDYING
:
當(dāng)隊列和線程池都變成空的時候,會發(fā)生這個狀態(tài)轉(zhuǎn)換笛园; -
STOP -> TIDYING
:
當(dāng)線程池是空的時候隘击,會發(fā)生這個狀態(tài)轉(zhuǎn)換侍芝; -
TIDYING -> TERMINATED
:
當(dāng)terminated() 方法結(jié)束后,會發(fā)生這個狀態(tài)轉(zhuǎn)換埋同。
關(guān)于狀態(tài)轉(zhuǎn)換就講完了州叠,特別是前2個狀態(tài)轉(zhuǎn)換,更是常用莺禁。還有一個關(guān)鍵的屬性ctl
需要講一下留量,初學(xué)者可能不太好理解窄赋,需要一點計算機(jī)基礎(chǔ)哟冬。
首先ctl
是一個AtomicInteger類型的對象,它其實是對int的包裝忆绰,可以在多線程并發(fā)的情況下保證原子性浩峡,它傳入的參數(shù)就是它表示的值。這里是通過ctlOf()
方法來計算的错敢。
在計算之前先補(bǔ)充2個小知識點:
1翰灾、 <<:是移位運(yùn)算符,具體倆說是左移稚茅;右移用>>表示纸淮。左移的意思是將一個二進(jìn)制數(shù)向左邊移動1位,那么左移1位就等于這個數(shù)乘以2亚享,左移n位的話就是乘以2^n咽块;右移的話就是除以;
2欺税、 由于10進(jìn)制數(shù)有正負(fù)之分侈沪,所以轉(zhuǎn)換成二進(jìn)制數(shù)的時候,需要在最高位加上0/1來表示正負(fù)晚凿,正數(shù)用0表示亭罪;負(fù)數(shù)用1來表示。
3歼秽、原碼:加上符號為之后的二進(jìn)制數(shù)应役;反碼:正數(shù)的反碼是其本身,負(fù)數(shù)的反碼:符號位不變燥筷,其余各位取反扛吞;補(bǔ)碼:正數(shù)的補(bǔ)碼就是其本身;負(fù)數(shù)的補(bǔ)碼:即在反碼的基礎(chǔ)上+1荆责。
4滥比、針對二進(jìn)制數(shù)的&、|做院、~盲泛。與(&)運(yùn)算:2個都是1才為1濒持,否則為0;或(|)運(yùn)算:只要有1個為1就是1寺滚,否則為0柑营;非(~)運(yùn)算:取反,1變0村视,0變1官套。
首先RUNNING = -1 << COUNT_BITS;
其中-1的二進(jìn)制數(shù)1001,那轉(zhuǎn)換成補(bǔ)碼就是1111蚁孔,然后左移29位就變成1110 0000 0000 0000 0000 0000 0000 0000
奶赔,因為int最多32位,所以高位的1沒了杠氢;然后再和0做或運(yùn)算站刑,所以結(jié)果還是它本身,所以ctl初始值為1110 0000 0000 0000 0000 0000 0000 0000
鼻百,其中高3位存放線程狀態(tài)绞旅,后面29位存放線程數(shù)量。
還有幾個方法
runStateOf
:獲取運(yùn)行狀態(tài)温艇;workerCountOf
:取出ctl低29位因悲,來表示當(dāng)前線程數(shù);ctlOf
:獲取運(yùn)行狀態(tài)和活動線程數(shù)的值勺爱。
execute(關(guān)鍵方法)
這是線程池的關(guān)鍵方法晃琳,用來提交任務(wù)的。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//表示 “線程池狀態(tài)” 和 “線程數(shù)量” 的整數(shù)
int c = ctl.get();
/*
* 如果當(dāng)前活躍線程數(shù)小于核心線程數(shù)邻寿,就會添加一個worker來執(zhí)行任務(wù)蝎土;
* 具體來說,新建一個核心線程放入線程池中绣否,并把任務(wù)添加到該線程中誊涯。
*/
if (workerCountOf(c) < corePoolSize) {
/*
* addWorker()如果返回true表示添加成功,線程池會執(zhí)行這個任務(wù)蒜撮,那么本方法可以結(jié)束了暴构,返回 false 代表線程池不允許提交任務(wù),那么就會執(zhí)行后面的方法段磨。
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
//程序執(zhí)行到這里取逾,說明要么活躍線程數(shù)大于核心線程數(shù);要么addWorker()失敗
/*
* 如果當(dāng)前線程池是運(yùn)行狀態(tài)苹支,會把任務(wù)添加到隊列
*/
if (isRunning(c) && workQueue.offer(command)) {
/*
*這里的邏輯比較有意思砾隅,又重新檢查了線程狀態(tài)和數(shù)量;
*如果線程不處于 RUNNING 狀態(tài)债蜜,就會移除剛才添加到隊列中的任務(wù)晴埂;
*如果線程池還是 RUNNING 狀態(tài)究反,并且線程數(shù)為 0,那么開啟新的線程儒洛;
* addWorker(null, false)參數(shù)分析:
* 1. 第一個參數(shù)為null精耐,表示在線程池中創(chuàng)建一個線程,但不去啟動琅锻;
* 2. 第二個參數(shù)為false卦停,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPoolSize,
* 添加線程時根據(jù)maximumPoolSize來判斷恼蓬;
*/
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//程序執(zhí)行到這里惊完,說明要么線程狀態(tài)不是RUNNING;要么workQueue隊列已經(jīng)滿了
/*
* 這時滚秩,再調(diào)用addWorker方法去創(chuàng)建線程专执,
* 會把線程池的線程 數(shù)量的上限設(shè)置為maximum淮捆;
* 如果失敗郁油,說明當(dāng)前線程數(shù)已經(jīng)達(dá)到 maximumPoolSize,執(zhí)行拒絕策略
*/
else if (!addWorker(command, false))
reject(command);
}
為什么當(dāng)任務(wù)添加到隊列后攀痊,內(nèi)部還執(zhí)行了那么復(fù)雜的判斷桐腌?
因為擔(dān)心任務(wù)提交到隊列中了,但是線程池卻關(guān)閉了苟径。
當(dāng)執(zhí)行execute
方法提交一個任務(wù)的時候,如果線程池一直處于RUNNING狀態(tài)棘街,那流程如下:
- 1蟆盐、當(dāng)工作線程數(shù)量 < 核心線程數(shù)量,會嘗試創(chuàng)建一個核心線程并提交任務(wù)遭殉;
- 2石挂、當(dāng)工作線程數(shù)量 >= 核心線程數(shù)量,如果阻塞隊列沒有滿险污,則把任務(wù)添加到隊列中痹愚;如果隊列滿了,則嘗試啟動一個新的非核心線程來提交任務(wù)蛔糯;
- 3拯腮、當(dāng)工作線程數(shù)量 > maximumPoolSize,則根據(jù)拒絕策略來處理該任務(wù), 默認(rèn)的處理方式是直接拋異常蚁飒。
注意:
addWorker(null, false);
也是創(chuàng)建一個線程动壤,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue
中了淮逻,當(dāng)worker
在執(zhí)行的時候琼懊,會直接從workQueue
中獲取任務(wù)蜒灰。在workerCountOf(recheck) == 0
時執(zhí)行addWorker(null, false);
也是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。
關(guān)于Worker類和addWorker方法
addWorker()
是嘗試在線程池中創(chuàng)建一個線程并執(zhí)行任務(wù)肩碟,firstTask
表示作為新創(chuàng)建的線程的第一個任務(wù)强窖,core
參數(shù)為true的時候,會用核心線程數(shù)做創(chuàng)建線程的邊界削祈;如果為false翅溺,會用最大線程數(shù)maximumPoolSize
做為邊界。如果addWorker()
返回true髓抑,表示創(chuàng)建線程成功
private boolean addWorker(Runnable firstTask, boolean core) {}
業(yè)務(wù)場景(分析2種常用的線程池)
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
固定大小的線程池咙崎。最大線程數(shù)與核心線程數(shù)相等,keepAliveTime的設(shè)置無效吨拍,因為核心線程默認(rèn)不會銷毀褪猛,阻塞隊列為LinkedBlockingQueue
,它是無界隊列羹饰。這種線程池適合CPU密集型任務(wù).
關(guān)于CPU密集型任務(wù)和IO密集型任務(wù)可以參考這篇文章
線程池核心線程數(shù)多少最為合適(IO密集型和CPU密集型)伊滋?
工作流程:
- 提交任務(wù)
- 如果線程數(shù)小于核心線程數(shù),創(chuàng)建核心線程并執(zhí)行任務(wù)
- 如果線程數(shù)大于核心線程队秩,把任務(wù)添加到LinkedBlockingQueue阻塞隊列
- 如果線程執(zhí)行完任務(wù)笑旺,去阻塞隊列取任務(wù),繼續(xù)執(zhí)行馍资。
雖然線程數(shù)量是固定的筒主,但是由于使用了無界隊列LinkedBlockingQueue,如果線程的并發(fā)量比較大鸟蟹,任務(wù)的執(zhí)行時間比較長乌妙,那還是可能會OOM的。適用于CPU密集型的任務(wù)建钥,也就是那種長期的任務(wù)藤韵。
newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
核心線程數(shù)為 0,最大線程數(shù)為 Integer.MAX_VALUE锦针,所有線程空閑時間 為 60 秒荠察,任務(wù)隊列采用 SynchronousQueue。
用它去處理那種并發(fā)量很大的任務(wù)就不合適奈搜,由于空閑 60 秒的線程會被終止悉盆,長時間保持空閑的 CachedThreadPool 不會占用任何資源。適用于那種任務(wù)可以快速完成的任務(wù)馋吗。
總結(jié)
線程池的內(nèi)容其實是很多的焕盟,絕不是1,2篇文章就能講完的。本文也主要是針對提交任務(wù)之后線程池的工作原理以及線程狀態(tài)變化來做講解脚翘。核心要義如下:
1灼卢、Executors這個工具類下創(chuàng)建的幾種線程池的工作原理。
newFixedThreadPool来农、newCachedThreadPool等鞋真,需要注意每一個的優(yōu)缺點和使用場景。
2沃于、ThreadPoolExecutor這個類的構(gòu)造方法和一些關(guān)鍵成員屬性
Executors創(chuàng)建的多種線程池都是通過它的構(gòu)造方法來實現(xiàn)的涩咖,讀者需要熟悉它的參數(shù)的意義,這樣的話繁莹,就可以自定義滿足個性化需求的線程池檩互。在文中列舉出的一些成員屬性也很重要,后面對線程池的各種操作離不開它們咨演。
3闸昨、理解深刻線程池中的線程創(chuàng)建時機(jī)
主要是那個execute()
方法和addWorker()
方法,主要是根據(jù)線程池狀態(tài)薄风、當(dāng)前線程數(shù)饵较、核心線程數(shù)、隊列大小村刨、線程池最大線程數(shù)來結(jié)合來判斷告抄。
4撰茎、拒絕策略
添加任務(wù)到線程池嵌牺,不一定會被接受。主要看一下哪些情況會執(zhí)行
reject(command)
方法龄糊;還有幾種不同的拒絕策略逆粹,默認(rèn)是拋異常。
5炫惩、異常處理
如果某個任務(wù)執(zhí)行出現(xiàn)異常僻弹,那么執(zhí)行任務(wù)的線程會被關(guān)閉。
感謝以下作者
優(yōu)雅的使用Java線程池
深入理解 Java 線程池:ThreadPoolExecutor
面試必備:Java線程池解析
Java線程池中的核心線程是如何被重復(fù)利用的他嚷?
Java線程池中的核心線程是如何被重復(fù)利用的蹋绽?