1.簡(jiǎn)介
如果并發(fā)的線(xiàn)程數(shù)量很多,并且每個(gè)線(xiàn)程都是執(zhí)行一個(gè)時(shí)間很短的任務(wù)就結(jié)束了笋除,這樣頻繁創(chuàng)建線(xiàn)程就會(huì)大大降低系統(tǒng)的效率听隐,因?yàn)轭l繁創(chuàng)建線(xiàn)程和銷(xiāo)毀線(xiàn)程需要時(shí)間织咧。那么有沒(méi)有一種辦法使得線(xiàn)程可以復(fù)用幸乒,就是執(zhí)行完一個(gè)任務(wù)懦底,并不被銷(xiāo)毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)罕扎,在Java中可以通過(guò)線(xiàn)程池來(lái)達(dá)到這樣的效果基茵。
2.Java中的ThreadPoolExecutor類(lèi)
線(xiàn)程池相關(guān)類(lèi):ThreadPoolExecutor、AbstractExecutorService壳影、ExecutorService、Executor弥臼、Executors(線(xiàn)程池工廠類(lèi))
幾個(gè)類(lèi)的關(guān)系如下:
(1)ThreadPoolExecutor extends AbstractExecutorService
(2)AbstractExecutorService implements ExecutorService
(3)public interface ExecutorService extends Executor
(4)Executors(線(xiàn)程池工廠類(lèi))宴咧,Exectors工廠類(lèi)提供了線(xiàn)程池的初始化接口,可以初始化四種不同的線(xiàn)程池。
Executor是一個(gè)接口径缅,它是Executor框架的基礎(chǔ)掺栅,它將任務(wù)的提交和任務(wù)的執(zhí)行分離開(kāi)來(lái)烙肺。Android中的線(xiàn)程池來(lái)源于Java中的Executor、Executor是一個(gè)接口氧卧,真正的線(xiàn)程池實(shí)現(xiàn)是在ThreadPoolExecutor桃笙,ThreadPoolExecutor提供一系列參數(shù)來(lái)配置線(xiàn)程池,通過(guò)不同的參數(shù)可以創(chuàng)建不同的線(xiàn)程池沙绝,線(xiàn)程池主要分為4類(lèi)搏明,這四類(lèi)線(xiàn)程池可以通過(guò)Executors所提供的工廠方法來(lái)的得到。線(xiàn)程池都是直接或者間接通過(guò)配置ThreadPoolExecutor來(lái)實(shí)現(xiàn)的闪檬。從線(xiàn)程池的上層API來(lái)看星著,再多種的線(xiàn)程池,無(wú)非是參數(shù)的不同粗悯,讓它們呈現(xiàn)出了不同的特性虚循。那么這些特性到底依賴(lài)什么樣的原理實(shí)現(xiàn),就更值得去深究样傍。ThreadPoolExecutor實(shí)現(xiàn)了線(xiàn)程池所需的最小功能集横缔,已能hold住很多場(chǎng)景。
java.uitl.concurrent.ThreadPoolExecutor類(lèi)是線(xiàn)程池中最核心的一個(gè)類(lèi)衫哥,因此如果要透徹地了解Java中的線(xiàn)程池茎刚,必須先了解這個(gè)類(lèi)。線(xiàn)程池都是直接或者間接通過(guò)配置ThreadPoolExecutor來(lái)實(shí)現(xiàn)的炕檩,并且ThreadPoolExecutor有四個(gè)構(gòu)造函數(shù)斗蒋,通過(guò)觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作笛质。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
ThreadPoolExecutor共七個(gè)參數(shù):核心線(xiàn)程數(shù)泉沾,最大線(xiàn)程數(shù),keepAliveTime妇押,以及keepAliveTime時(shí)間單位跷究,阻塞隊(duì)列、線(xiàn)程工廠敲霍、拒絕策略俊马。
我們先看下四種不同的線(xiàn)程池相關(guān)參數(shù):
FixThreadPool
//適用于負(fù)載比較重的服務(wù)器。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
線(xiàn)程數(shù):固定數(shù)量的線(xiàn)程肩杈,核心線(xiàn)程和最大線(xiàn)程數(shù)一樣
阻塞隊(duì)列:LinkedBlockingQueue基于鏈表結(jié)構(gòu)的阻塞隊(duì)列柴我,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene扩然;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
線(xiàn)程工廠ThreadFactory:默認(rèn)的線(xiàn)程工廠艘儒;
拒絕策略:ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常
CachedThreadPool:大小無(wú)界的線(xiàn)程池,適用于執(zhí)行很多短期異步任務(wù)的小程序,或者負(fù)載較輕的服務(wù)器界睁。阻塞隊(duì)列:SynchronousQueue一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列觉增,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài)翻斟,吞吐量通常要高于LinkedBlockingQuene逾礁;線(xiàn)程工廠和拒絕策略與FixThreadPool相同。
SingleThreadExecutor:適用于需要保證任務(wù)順序執(zhí)行的各個(gè)任務(wù)访惜。核心線(xiàn)程和最大線(xiàn)程都是1嘹履,阻塞隊(duì)列LinkedBlockingQueue:LinkedBlockingQueue基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,阻塞隊(duì)列大小為Integer.MAX_VALUE。線(xiàn)程工廠和拒絕策略也和FixThreadPool一樣疾牲。
ScheduledThreadPoolExecutor:適用于需要多個(gè)后臺(tái)線(xiàn)程執(zhí)行周期任務(wù)植捎。阻塞隊(duì)列:DelayedWorkQueue,默認(rèn)大小是16阳柔。線(xiàn)程工廠和拒絕和FixThreadPool一樣焰枢。
構(gòu)造函數(shù)中各參數(shù)含義:
corePoolSize:線(xiàn)程池中的核心線(xiàn)程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí)舌剂,線(xiàn)程池創(chuàng)建一個(gè)新線(xiàn)程執(zhí)行任務(wù)济锄,直到當(dāng)前線(xiàn)程數(shù)等于corePoolSize;如果當(dāng)前線(xiàn)程數(shù)為corePoolSize霍转,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中荐绝,等待被執(zhí)行;如果執(zhí)行了線(xiàn)程池的prestartAllCoreThreads()方法避消,線(xiàn)程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線(xiàn)程低滩。
maximumPoolSize:線(xiàn)程池中允許的最大線(xiàn)程數(shù)。如果當(dāng)前阻塞隊(duì)列滿(mǎn)了岩喷,且繼續(xù)提交任務(wù)恕沫,則創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù),前提是當(dāng)前線(xiàn)程數(shù)小于maximumPoolSize纱意;
keepAliveTime:線(xiàn)程空閑時(shí)的存活時(shí)間婶溯,即當(dāng)線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí),繼續(xù)存活的時(shí)間偷霉;默認(rèn)情況下迄委,該參數(shù)只在線(xiàn)程數(shù)大于corePoolSize時(shí)才有用;
unit:keepAliveTime的單位类少;
阻塞隊(duì)列workQueue
四種線(xiàn)程池用到了三種阻塞隊(duì)列:LinkedBlockingQueue叙身、SynchronousQueue、DelayedWorkQueue硫狞。阻塞隊(duì)列用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)信轿。四種線(xiàn)程池用到的線(xiàn)程工廠和拒絕策略都一樣赞警,都是默認(rèn)的。
JDK提供的阻塞隊(duì)列有:
(1)ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列虏两,按FIFO排序任務(wù);基于數(shù)組的先進(jìn)先出隊(duì)列世剖,此隊(duì)列創(chuàng)建時(shí)必須指定大卸ò铡;
(2)LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列旁瘫,按FIFO排序任務(wù)祖凫,吞吐量通常要高于ArrayBlockingQuene;基于鏈表的先進(jìn)先出隊(duì)列酬凳,如果創(chuàng)建時(shí)沒(méi)有指定此隊(duì)列大小惠况,則默認(rèn)為Integer.MAX_VALUE;
(3)SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列宁仔,每個(gè)插入操作必須等到另一個(gè)線(xiàn)程調(diào)用移除操作稠屠,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene翎苫;這個(gè)隊(duì)列比較特殊权埠,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線(xiàn)程來(lái)執(zhí)行新來(lái)的任務(wù)煎谍。
(4)PriorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列攘蔽;線(xiàn)程池的排隊(duì)策略與BlockingQueue有關(guān)。 其中ArrayBlockQueue和LinkedBlockingQueue都可以指定初始容量呐粘。
線(xiàn)程工廠threadFactory:可以使用Executors中默認(rèn)線(xiàn)程工廠满俗,也可以通過(guò)自定義的線(xiàn)程工廠可以給每個(gè)新建的線(xiàn)程設(shè)置一個(gè)具有識(shí)別度的線(xiàn)程名。
線(xiàn)程池拒絕策略handler:線(xiàn)程池的飽和策略作岖,當(dāng)阻塞隊(duì)列滿(mǎn)了唆垃,且沒(méi)有空閑的工作線(xiàn)程,如果繼續(xù)提交任務(wù)鳍咱,必須采取一種策略處理該任務(wù)降盹,線(xiàn)程池提供了4種策略:
(1)AbortPolicy:直接拋出異常,默認(rèn)策略谤辜;
(2)CallerRunsPolicy:用調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行任務(wù)蓄坏;
(3)DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)丑念;
(4)DiscardPolicy:直接丟棄任務(wù)涡戳;
當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略脯倚,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)渔彰。
綜合以上嵌屎,線(xiàn)程池阻塞隊(duì)列、線(xiàn)程工廠恍涂、線(xiàn)程拒絕策略都可以自定義宝惰,根據(jù)自己的需求進(jìn)行自定義。線(xiàn)程池各個(gè)參數(shù)都可以進(jìn)行自定義再沧。ThreadFactory和拒絕策略都很容易進(jìn)行自定義尼夺。
如何存儲(chǔ)線(xiàn)程池狀態(tài)以及線(xiàn)程數(shù)量(5種狀態(tài)和線(xiàn)程個(gè)數(shù))?
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 值為29 Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位全為0炒瘸,低29位全為1淤堵,因此線(xiàn)程數(shù)量的表示范圍為 0 ~ 2^29
//1左移29位然后再減1,這樣后29全為1顷扩,所以線(xiàn)程池中線(xiàn)程最大個(gè)數(shù)為2的29次數(shù)拐邪。
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl 獲取線(xiàn)程池狀態(tài),參數(shù)C為ctl最新?tīng)顟B(tài)值。 ~CAPACITY 高三位為111隘截,低29為全為0.這樣就可以得到線(xiàn)程池狀態(tài)扎阶。
private static int runStateOf(int c) { return c & ~CAPACITY; }
//獲取工作線(xiàn)程數(shù)量, 參數(shù)C為ctl最新?tīng)顟B(tài)值 CAPACITY 高三位為000,低29全為1技俐,按位與就可以得到當(dāng)前線(xiàn)程個(gè)數(shù)乘陪。
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
}
ctl用來(lái)控制線(xiàn)程池的狀態(tài),并用來(lái)表示線(xiàn)程池線(xiàn)程數(shù)量雕擂。ctl類(lèi)型為AtomicInteger啡邑,那用一個(gè)基礎(chǔ)如何表示以上五種狀態(tài)以及線(xiàn)程池工作線(xiàn)程數(shù)量呢?int型變量占用4字節(jié)井赌,共32位谤逼,因此采用位表示,可以解決上述問(wèn)題仇穗。5種狀態(tài)使用5種數(shù)值進(jìn)行表示流部,需要占用3位,余下的29位就可以用來(lái)表示線(xiàn)程數(shù)纹坐。因此枝冀,高三位表示進(jìn)程狀態(tài),低29位為線(xiàn)程數(shù)量耘子,ctl是原子操作類(lèi)果漾,進(jìn)行自增自減時(shí)都是線(xiàn)程安全的,原子操作類(lèi)是基于CAS實(shí)現(xiàn)的谷誓。采用了int分位表示線(xiàn)程池狀態(tài)和線(xiàn)程數(shù)量绒障,如何獲取線(xiàn)程狀態(tài)與數(shù)量見(jiàn)代碼注釋。
在Java讀寫(xiě)鎖中捍歪,讀寫(xiě)鎖狀態(tài)也是使用一個(gè)int來(lái)標(biāo)識(shí)讀寫(xiě)鎖不同狀態(tài)户辱,高16位中存儲(chǔ)讀鎖狀態(tài)鸵钝,低16位存儲(chǔ)寫(xiě)鎖狀態(tài)。
如果調(diào)用了shutdown()方法庐镐,則線(xiàn)程池處于SHUTDOWN狀態(tài)恩商,此時(shí)線(xiàn)程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢必逆;如果調(diào)用了shutdownNow()方法痕届,則線(xiàn)程池處于STOP狀態(tài),此時(shí)線(xiàn)程池不能接受新的任務(wù)末患,并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);當(dāng)線(xiàn)程池處于SHUTDOWN或STOP狀態(tài)锤窑,并且所有工作線(xiàn)程已經(jīng)銷(xiāo)毀璧针,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線(xiàn)程池被設(shè)置為T(mén)ERMINATED狀態(tài)渊啰。
3.任務(wù)提交與任務(wù)執(zhí)行
線(xiàn)程池框架提供了兩種方式提交任務(wù)探橱,根據(jù)不同的業(yè)務(wù)需求選擇不同的方式。通過(guò)Executor.execute()方法提交的任務(wù)绘证,必須實(shí)現(xiàn)Runnable接口隧膏,該方式提交的任務(wù)不能獲取返回值,因此無(wú)法判斷任務(wù)是否執(zhí)行成功嚷那。通過(guò)ExecutorService.submit()方法提交的任務(wù)胞枕,可以獲取任務(wù)執(zhí)行完的返回值。用于提交需要返回值的任務(wù)魏宽,線(xiàn)程池會(huì)返回一個(gè)future類(lèi)型的對(duì)象腐泻,通過(guò)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,F(xiàn)uture接口和實(shí)現(xiàn)Future接口的FutureTask類(lèi)队询,代表異步計(jì)算結(jié)果派桩。任務(wù)執(zhí)行:我們以Executor.execute()方法提交的任務(wù)為例,對(duì)任務(wù)執(zhí)行進(jìn)行介紹蚌斩,該部分是線(xiàn)程池的核心铆惑,代碼如下,相關(guān)內(nèi)容在代碼中已添加注釋送膳。
public void execute(Runnable command) {
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//workerCountOf方法根據(jù)ctl的低29位员魏,得到線(xiàn)程池的當(dāng)前線(xiàn)程數(shù),如果線(xiàn)程數(shù)小于corePoolSize肠缨,則執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程(核心線(xiàn)程)執(zhí)行任務(wù)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果線(xiàn)程池處于RUNNING狀態(tài)逆趋,且把提交的任務(wù)成功放入阻塞隊(duì)列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次檢查線(xiàn)程池的狀態(tài),如果線(xiàn)程池沒(méi)有RUNNING晒奕,且成功從阻塞隊(duì)列中刪除任務(wù)闻书,則執(zhí)行reject方法處理任務(wù)名斟;
if (! isRunning(recheck) && remove(command))
reject(command);
//當(dāng)前線(xiàn)程個(gè)數(shù)是0, 則執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程(非核心線(xiàn)程)執(zhí)行任務(wù)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//執(zhí)行addWorker方法創(chuàng)建新的線(xiàn)程執(zhí)行任務(wù)魄眉,如果addWoker執(zhí)行失敗砰盐,則執(zhí)行reject方法處理任務(wù)
//如果任務(wù)隊(duì)列已滿(mǎn),并且核心線(xiàn)程都已經(jīng)啟動(dòng)坑律,則創(chuàng)建非核心線(xiàn)程去執(zhí)行任務(wù)岩梳,如果創(chuàng)建非核心線(xiàn)程失敗,則拒絕任務(wù)
else if (!addWorker(command, false))
reject(command);
}
//addWorker實(shí)現(xiàn)
//從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線(xiàn)程并執(zhí)行任務(wù)
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//線(xiàn)程池的工作線(xiàn)程通過(guò)Woker類(lèi)實(shí)現(xiàn)晃择,當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法冀值;
//Worker實(shí)現(xiàn)了Runnable接口,可以將自身作為一個(gè)任務(wù)在工作線(xiàn)程中執(zhí)行宫屠;
w = new Worker(firstTask);
//w.thread在Worker構(gòu)造函數(shù)中使用ThreadFactory創(chuàng)建線(xiàn)程列疗,this.thread = getThreadFactory().newThread(this);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//執(zhí)行start方法啟動(dòng)線(xiàn)程thread時(shí),本質(zhì)是執(zhí)行了Worker的run方法啟動(dòng)線(xiàn)程浪蹂,然后調(diào)用runWorker()方法抵栈。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//runWorker()方法
//runWorker方法是線(xiàn)程池的核心
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//firstTask執(zhí)行完成之后,通過(guò)getTask方法從阻塞隊(duì)列中獲取等待的任務(wù)坤次,如果隊(duì)列中沒(méi)有任務(wù)古劲,getTask方法會(huì)被阻塞并掛起,不會(huì)占用cpu資源缰猴;
//線(xiàn)程復(fù)用就是在線(xiàn)程執(zhí)行完之后产艾,不斷的從任務(wù)隊(duì)列中取新的任務(wù),直到線(xiàn)程池中的全部任務(wù)執(zhí)行完滑绒,以此來(lái)完成線(xiàn)程復(fù)用
//然后當(dāng)線(xiàn)程沒(méi)有任務(wù)執(zhí)行時(shí),超過(guò)存活時(shí)間之后胰舆,線(xiàn)程終止。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//在執(zhí)行任務(wù)的前后蹬挤,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute和afterExecute方法缚窿;
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行通過(guò)Executor.execute()方法提交的任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場(chǎng)景自定義beforeExecute和afterExecute方法焰扳;
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 線(xiàn)程池已沒(méi)有任務(wù)了倦零,工作線(xiàn)程達(dá)到了可退出的狀態(tài),Worker退出
processWorkerExit(w, completedAbruptly);
}
}
//getTask()方法
private Runnable getTask() {
try {
//workQueue.take:如果阻塞隊(duì)列為空吨悍,當(dāng)前線(xiàn)程會(huì)被掛起等待扫茅;當(dāng)隊(duì)列中有任務(wù)加入時(shí),線(xiàn)程被喚醒育瓜,take方法返回任務(wù)葫隙,并執(zhí)行;
//workQueue.poll:如果在keepAliveTime時(shí)間內(nèi)躏仇,阻塞隊(duì)列還是沒(méi)有任務(wù)恋脚,則返回null腺办;
//所以,線(xiàn)程池中實(shí)現(xiàn)的線(xiàn)程可以一直執(zhí)行由用戶(hù)提交的任務(wù)糟描。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4.線(xiàn)程池原理
以上我們已經(jīng)對(duì)線(xiàn)程池中參數(shù)和執(zhí)行流程進(jìn)行了講解怀喉,下面通過(guò)示例來(lái)總結(jié)一下線(xiàn)程池工作原理。
假設(shè):corePoolSize=5船响,maxPoolSize=10躬拢,blockQueueSize=10,依次提交6個(gè)比較耗時(shí)的任務(wù)见间,線(xiàn)程池是如何執(zhí)行的聊闯?
答:依次提交6個(gè)耗時(shí)任務(wù),可以理解為正在執(zhí)行的線(xiàn)程都沒(méi)執(zhí)行完米诉,前面提交的5個(gè)任務(wù)馅袁,肯定有coresize=5的5個(gè)線(xiàn)程全部執(zhí)行,第6個(gè)任務(wù)是繼續(xù)創(chuàng)建線(xiàn)程呢荒辕,還是放在隊(duì)列里?第6個(gè)任務(wù)會(huì)放在隊(duì)列里够颠,等待前面5個(gè)任務(wù)有某一個(gè)執(zhí)行完之后勇皇,再執(zhí)行丑孩,maxsize線(xiàn)程只有阻塞隊(duì)列滿(mǎn)之后才會(huì)繼續(xù)創(chuàng)建線(xiàn)程,然后將任務(wù)隊(duì)列中的所有任務(wù)執(zhí)行完之后李皇,非核心線(xiàn)程的消息時(shí)間達(dá)到才會(huì)關(guān)閉線(xiàn)程,但是五個(gè)核心線(xiàn)程是不會(huì)被回收掉的宙枷。
核心點(diǎn):提交任務(wù)時(shí)掉房,如果當(dāng)前線(xiàn)程個(gè)數(shù)小于核心線(xiàn)程數(shù),那么提交新任務(wù)就會(huì)創(chuàng)建新的線(xiàn)程慰丛,如果阻塞隊(duì)列還沒(méi)有滿(mǎn)就不會(huì)創(chuàng)建非核心線(xiàn)程卓囚,非核心線(xiàn)程的創(chuàng)建條件是任務(wù)隊(duì)列必須滿(mǎn),并且有任務(wù)诅病。
5.線(xiàn)程池相關(guān)問(wèn)題
(1)非核心線(xiàn)程延遲死亡哪亿,如何實(shí)現(xiàn)?
通過(guò)阻塞隊(duì)列poll()贤笆,讓線(xiàn)程阻塞等待一段時(shí)間蝇棉,如果沒(méi)有取到任務(wù),則線(xiàn)程死亡芥永。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
(2)核心線(xiàn)程為什么不死篡殷?
線(xiàn)程池里的線(xiàn)程從阻塞隊(duì)列里拿任務(wù),如果存在非核心線(xiàn)程埋涧,假設(shè)阻塞隊(duì)列里沒(méi)有任務(wù)板辽,那么非核心線(xiàn)程也要在等到keepAliveTime時(shí)間后才會(huì)釋放奇瘦。如果當(dāng)前僅有核心線(xiàn)程存在,如果允許釋放核心線(xiàn)程的話(huà)戳气,也就和非核線(xiàn)程的處理方式一樣链患,反之,則通過(guò)take()一直阻塞直到拿到任務(wù)瓶您,這也就是線(xiàn)程池里的核心線(xiàn)程為什么不死的原因麻捻。
(3)如何釋放核心線(xiàn)程?
將allowCoreThreadTimeOut設(shè)置為true呀袱。
(4)核心線(xiàn)程和非核心線(xiàn)程區(qū)別贸毕?
并沒(méi)有發(fā)現(xiàn)有明顯的標(biāo)志來(lái)標(biāo)志核心線(xiàn)程與非核心線(xiàn)程,而是以線(xiàn)程數(shù)來(lái)表達(dá)線(xiàn)程身份夜赵。0 ~ corePoolSize 表示線(xiàn)程池里只有核心線(xiàn)程明棍,corePoolSize ~ maximumPoolSize 表示線(xiàn)程池里核心線(xiàn)程滿(mǎn),存在非核心線(xiàn)程寇僧。線(xiàn)程池實(shí)際并不區(qū)分核心線(xiàn)程與非核心線(xiàn)程摊腋,是根據(jù)當(dāng)前的總體并發(fā)狀態(tài)來(lái)決定怎樣處理線(xiàn)程任務(wù)。實(shí)際上并不存在核心和非核心線(xiàn)程嘁傀,大家都是線(xiàn)程兴蒸,超過(guò)核心線(xiàn)程需要銷(xiāo)毀線(xiàn)程時(shí),當(dāng)前再獲取任務(wù)的線(xiàn)程先銷(xiāo)毀就行了细办。
(5)非核心線(xiàn)程能成為核心線(xiàn)程嗎橙凳?
線(xiàn)程池不區(qū)分核心線(xiàn)程于非核心線(xiàn)程,只是根據(jù)當(dāng)前線(xiàn)程池容量狀態(tài)做不同的處理來(lái)進(jìn)行調(diào)整笑撞,因此看起來(lái)像是有核心線(xiàn)程于非核心線(xiàn)程岛啸,實(shí)際上是滿(mǎn)足線(xiàn)程池期望達(dá)到的并發(fā)狀態(tài)。
(6)Runnable在線(xiàn)程池里如何執(zhí)行?
線(xiàn)程執(zhí)行Worker茴肥,Worker不斷從阻塞隊(duì)列里獲取任務(wù)來(lái)執(zhí)行坚踩。如果任務(wù)加入線(xiàn)程池失敗,則在拒絕策略里瓤狐,還有處理機(jī)會(huì)堕虹。
(7)如何釋放線(xiàn)程?
在線(xiàn)程沒(méi)有拿到任務(wù)后芬首,退出線(xiàn)程赴捞,通過(guò)processWorkerExit()可以證實(shí)上述所言。釋放工作線(xiàn)程也并沒(méi)有區(qū)分核心與非核心郁稍,也是隨機(jī)進(jìn)行的赦政。所謂隨機(jī),就是在前面所說(shuō)的區(qū)間范圍內(nèi),根據(jù)釋放策略恢着,哪個(gè)線(xiàn)程先達(dá)到獲取不到任務(wù)的狀態(tài)桐愉,就釋放哪個(gè)線(xiàn)程。在線(xiàn)程復(fù)用進(jìn)行死循環(huán)執(zhí)行任務(wù)的時(shí)候掰派,如線(xiàn)程通過(guò)take或者poll拿不到任務(wù)从诲,即線(xiàn)程池已經(jīng)沒(méi)有任務(wù)了,在線(xiàn)程達(dá)到最大存活時(shí)間時(shí)就會(huì)銷(xiāo)毀線(xiàn)程靡羡。
(8)線(xiàn)程池如何線(xiàn)程復(fù)用系洛?
線(xiàn)程池復(fù)用是一個(gè)非常巧妙的設(shè)計(jì)方式,假如我們來(lái)設(shè)計(jì)線(xiàn)程池略步,可能會(huì)有一個(gè)任務(wù)分派線(xiàn)程描扯,當(dāng)發(fā)現(xiàn)有線(xiàn)程空閑時(shí),就從任務(wù)緩存隊(duì)列中取一個(gè)任務(wù)交給空閑線(xiàn)程執(zhí)行趟薄。但是在這里绽诚,并沒(méi)有采用這樣的方式,因?yàn)檫@樣會(huì)要額外地對(duì)任務(wù)分派線(xiàn)程進(jìn)行管理杭煎,無(wú)形地會(huì)增加難度和復(fù)雜度恩够,這里直接讓執(zhí)行完任務(wù)的線(xiàn)程去任務(wù)緩存隊(duì)列里面取任務(wù)來(lái)執(zhí)行。
//線(xiàn)程執(zhí)行完任務(wù)之后羡铲,就會(huì)主動(dòng)去任務(wù)緩存隊(duì)列中取任務(wù)去執(zhí)行蜂桶,直到任務(wù)執(zhí)行完畢
while (task != null || (task = getTask()) != null) {
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
}
線(xiàn)程執(zhí)行完任務(wù)之后,會(huì)反復(fù)的從阻塞隊(duì)列中獲取任務(wù)來(lái)執(zhí)行犀勒,以此來(lái)實(shí)現(xiàn)線(xiàn)程復(fù)用。當(dāng)非核心線(xiàn)程超多最大存活時(shí)間之后就會(huì)銷(xiāo)毀非核心線(xiàn)程妥曲。
(9)線(xiàn)程數(shù)如何做選擇?
cpu密集型任務(wù)核心線(xiàn)程數(shù)選擇小一點(diǎn)贾费,io密集型任務(wù)選擇核心線(xiàn)程數(shù)大一點(diǎn),以及大部分是線(xiàn)程等待狀態(tài)檐盟。
6.AsyncTask源碼中的線(xiàn)程池
AsyncTask是一種輕量級(jí)的異步任務(wù)類(lèi)褂萧,可以在線(xiàn)程池中執(zhí)行耗時(shí)任務(wù),然后把執(zhí)行的進(jìn)度和最終結(jié)果傳遞給主線(xiàn)程并更新UI葵萎;
AsyncTask詳細(xì)內(nèi)容請(qǐng)參考:AsyncTask源碼解析导犹,AsyncTask使用的線(xiàn)程池如下代碼所示:
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work 核心線(xiàn)程個(gè)數(shù)是Math.min(CPU_COUNT - 1, 4)
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
}
通過(guò)以上代碼可以發(fā)現(xiàn)核心線(xiàn)程個(gè)數(shù)為Math.min(CPU_COUNT - 1, 4),最大線(xiàn)程數(shù)CPU_COUNT * 2 + 1羡忘,提交到AsyncTask的耗時(shí)任務(wù)最終在THREAD_POOL_EXECUTOR中執(zhí)行谎痢,并且所有提交到AsyncTask的耗時(shí)任務(wù)都是串行執(zhí)行的。只需要一個(gè)線(xiàn)程就可以完成AsyncTask功能卷雕,所以AsyncTask中的線(xiàn)程池中只會(huì)有一個(gè)線(xiàn)程节猿。
那么AsyncTask完成可以使用SingleThreadExecutor線(xiàn)程池,但是為什么AsyncTask不采用SingleThreadExecutor,猜測(cè)有以下兩個(gè)原因:
第一:AsyncTask之前的版本中滨嘱,提交到AsyncTask的任務(wù)都是并行執(zhí)行峰鄙,并不是像現(xiàn)在這樣串行執(zhí)行,所以不能使用SingleThreadExecutor線(xiàn)程池太雨。
第二:可以直接將多個(gè)任務(wù)直接拋到AsyncTask.ThreadPoolExecutor線(xiàn)程池中執(zhí)行吟榴,這樣可以繞過(guò)AsyncTask串行排隊(duì)策略,從而實(shí)現(xiàn)并發(fā)執(zhí)行囊扳。
如果大家有其他的一些看法吩翻,希望可以一起討論一下。
參考資料
深入分析java線(xiàn)程池的實(shí)現(xiàn)原理
關(guān)于Java面試宪拥,你應(yīng)該準(zhǔn)備這些知識(shí)點(diǎn)
Java并發(fā)編程:線(xiàn)程池的使用
你了解線(xiàn)程池嗎
《Java并發(fā)編程的藝術(shù)》