源碼不會(huì)騙你的=︱取3送埂!
一七问、背景
JAVA通過(guò)多線程的方式實(shí)現(xiàn)并發(fā)颈墅,為了方便線程池的管理蜡镶,JAVA采用線程池的方式對(duì)線線程的整個(gè)生命周期進(jìn)行管理。當(dāng)然恤筛,對(duì)簡(jiǎn)單的并發(fā)自已也可以對(duì)Thread進(jìn)行人工管理官还,但并不是此文的重點(diǎn),而且不建議方式毒坛。本文的重點(diǎn)是研究ThreadPoolExecutor管理線程池的策略望伦,讓大家對(duì)ThreadPoolExecutor的工作原理和過(guò)程有一個(gè)透徹的理解。
二煎殷、幾個(gè)關(guān)系
我們通常采用如下方法創(chuàng)建一個(gè)程池:
public class TestThread {
@Test
public void testCallable() throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<Integer>> results = new ArrayList<>();
int i = 10;
while (i-- > 0) {
results.add(exec.submit(new GetRand()));
}
for (Future<Integer> n : results) {
System.out.println(n.get());
}
}
}
上便通過(guò)一個(gè)工場(chǎng)類Executors創(chuàng)建了一個(gè)工作類屯伞,工場(chǎng)類返回一個(gè)ExecutorService 對(duì)象。Executors可以返回多種類型的線程池豪直,原諒我簡(jiǎn)單啰嗦一下這幾種線程池
newCachedThreadPool() | 緩存型池子劣摇,先查看池中有沒(méi)有以前建立的線程,如果有顶伞,就 reuse.如果沒(méi)有饵撑,就建一個(gè)新的線程加入池中;緩存型池子通常用于執(zhí)行一些生存期很短的異步型任務(wù)因此在一些面向連接的daemon型SERVER中用得不多剑梳。但對(duì)于生存期短的異步任務(wù),它是Executor的首選滑潘。能reuse的線程垢乙,必須是timeout IDLE內(nèi)的池中線程,缺省 timeout是60s,超過(guò)這個(gè)IDLE時(shí)長(zhǎng)语卤,線程實(shí)例將被終止及移出池追逮。注意,放入CachedThreadPool的線程不必?fù)?dān)心其結(jié)束粹舵,超過(guò)TIMEOUT不活動(dòng)钮孵,其會(huì)自動(dòng)被終止。 |
newFixedThreadPool(int) | newFixedThreadPool與cacheThreadPool差不多眼滤,也是能reuse就用巴席,但不能隨時(shí)建新的線程;-其獨(dú)特之處:任意時(shí)間點(diǎn),最多只能有固定數(shù)目的活動(dòng)線程存在诅需,此時(shí)如果有新的線程要建立漾唉,只能放在另外的隊(duì)列中等待,直到當(dāng)前的線程中某個(gè)線程終止直接被移出池子;-和cacheThreadPool不同堰塌,F(xiàn)ixedThreadPool沒(méi)有IDLE機(jī)制(可能也有赵刑,但既然文檔沒(méi)提,肯定非常長(zhǎng)场刑,類似依賴上層的TCP或UDP IDLE機(jī)制之類的)般此,所以FixedThreadPool多數(shù)針對(duì)一些很穩(wěn)定很固定的正規(guī)并發(fā)線程,多用于服務(wù)器;-從方法的源代碼看牵现,cache池和fixed 池調(diào)用的是同一個(gè)底層 池铐懊,只不過(guò)參數(shù)不同:fixed池線程數(shù)固定,并且是0秒IDLE(無(wú)IDLE) cache池線程數(shù)支持0-Integer.MAX_VALUE(顯然完全沒(méi)考慮主機(jī)的資源承受能力)瞎疼,60秒IDLE |
newScheduledThreadPool(int) | 這個(gè)池子里的線程可以按schedule依次delay執(zhí)行居扒,或周期執(zhí)行 |
SingleThreadExecutor() | -單例線程,任意時(shí)間池中只能有一個(gè)線程;-用的是和cache池和fixed池相同的底層池丑慎,但線程數(shù)目是1-1,0秒IDLE(無(wú)IDLE) |
以上幾種線程池都都反回了ExecutorService對(duì)象喜喂,也就是實(shí)際是靠ExecutorService來(lái)管理線程的整個(gè)生命周期。進(jìn)一步地竿裂,我們知道ExecutorService是一個(gè)接口玉吁,沒(méi)有具體實(shí)現(xiàn),最后的具體實(shí)現(xiàn)應(yīng)該由ThreadPoolExecutor實(shí)現(xiàn)的(當(dāng)然不包括周期線程池)腻异。別問(wèn)為什么进副,請(qǐng)自覺(jué)補(bǔ)充接口化編程;我們來(lái)看一下幾個(gè)類的關(guān)系,這里有兩條路線:
Executor 定義了一個(gè)execute接口影斑,ExecutorService繼承了Executor给赞,并定義了管理線程生命周期的接口;
- (1)AbstractExecutorService 實(shí)現(xiàn)了ExecutorService;ThreadPoolExecutor繼承了AbstractExecutorService;這條線是我們關(guān)注的重點(diǎn).
- (2)ScheduleExecutorService 繼承了ExecutorService矫户,并增加周期調(diào)度的接口片迅;ScheduledThreadPoolExecutor 實(shí)現(xiàn)了ScheduleExecutorService,用來(lái)管理周期線程池皆辽;(本文不介紹)
繼承關(guān)系如下圖所示:
我們?cè)賮?lái)看一下Executors工場(chǎng)類產(chǎn)生的線程池的方式如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其最終的實(shí)現(xiàn)方式都是通過(guò)ThreadPoolExecutor進(jìn)行實(shí)現(xiàn)的柑蛇;通過(guò)上面的介紹,我們知道線程生命周期的管理驱闷,在本質(zhì)上是由ThreadPoolExecutor來(lái)實(shí)現(xiàn)的耻台,因此只需要透徹理解ThreadPoolExecutor的實(shí)現(xiàn)原理即可了解其如何管理線程。
三空另、ThreadPoolExecutor
3.1 構(gòu)造參數(shù)
先來(lái)看一下ThreadPoolExecutor的構(gòu)造函數(shù):
ublic ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
//默認(rèn)ThreadFactory,默認(rèn)handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 默認(rèn)handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
可以看到ThreadPoolExecutor有四個(gè)構(gòu)造函數(shù)盆耽,只有參數(shù)有所不同,其它三個(gè)構(gòu)造函數(shù)的具體實(shí)現(xiàn)都是由第一個(gè)構(gòu)造函數(shù)來(lái)完成的扼菠。那我們只需要來(lái)研究一下第一個(gè)構(gòu)造函數(shù)就可了U髯帧!=吭ァ!
先來(lái)看一下每個(gè)參數(shù)的含義(先了解一下大概意思):
先看一下接口注釋
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
看懂了畅厢,就沒(méi)必要看下面的中文解釋了哈:
corePoolSize:核心線程池大蟹肓 ;啥意思呢框杜?就是線程池應(yīng)該維護(hù)的最小線程的數(shù)量浦楣, 線程池?cái)?shù)量小于該值,則來(lái)一個(gè)新線程時(shí)咪辱,就會(huì)創(chuàng)建一個(gè)新線程振劳,無(wú)論線程池中有無(wú)線程空閑.
maximumPoolSize: 最大線程池大小油狂;它表示線程池中最大創(chuàng)建線程池的數(shù)量.
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止历恐。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)专筷,keepAliveTime才會(huì)起作用弱贼,直到線程池中的線程數(shù)不大于. corePoolSize,
即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí)磷蛹,如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime吮旅,則會(huì)終止,直到線程池中的線程數(shù)不超過(guò)corePoolSize味咳。但是如果調(diào)用了. allowCoreThreadTimeOut(boolean)方法庇勃,
在線程池中的線程數(shù)不大于corePoolSize時(shí)檬嘀,keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0责嚷;
unit:和keepAliveTime相當(dāng)于同一個(gè)參數(shù)鸳兽,有以下幾個(gè)單位:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
BlockingQueue<Runnable> workQueue:阻塞的任務(wù)隊(duì)例;用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù)再层,這個(gè)參數(shù)的選擇也很重要贸铜,會(huì)對(duì)線程池的運(yùn)行過(guò)程產(chǎn)生重大影響,一般來(lái)說(shuō)聂受,這里的阻塞隊(duì)列有以下幾種選擇:
ArrayBlockingQueue;//內(nèi)部維護(hù)一個(gè)數(shù)組蒿秦,F(xiàn)IFO策略
LinkedBlockingQueue;//隊(duì)列使用FIFO策略,內(nèi)部維護(hù)了一個(gè)單向鏈表蛋济,默認(rèn)最大容量是Integer.MAX_VALUE棍鳖,動(dòng)態(tài)生成節(jié)點(diǎn)
所以,線程池最多只有corePoolSize個(gè)thread被創(chuàng)建碗旅,其他都會(huì)在queue中被阻塞
適用場(chǎng)景渡处,確保每個(gè)請(qǐng)求都能被執(zhí)行,不被拒絕
SynchronousQueue;//相當(dāng)于隊(duì)列長(zhǎng)度為0祟辟,因此只要達(dá)到 maximumPoolSize就會(huì)拒絕新提交的任務(wù)
threadFactory:線程工廠医瘫,用來(lái)創(chuàng)建線程.
handler:當(dāng)阻塞隊(duì)列和線程池都滿了后,拒絕任務(wù)的策略旧困,有以下幾種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常醇份。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常吼具。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù)僚纷,然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
3.2 執(zhí)行過(guò)程
我們以execute為例來(lái)看一個(gè)任務(wù)的執(zhí)行過(guò)程(如下流程圖):
我們用源碼解釋一下具體的實(shí)現(xiàn)過(guò)程:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
再來(lái)翻譯一下:
任務(wù)會(huì)通過(guò)創(chuàng)建一個(gè)新線程或者用線程池中的空閑線程來(lái)執(zhí)行;
如果該任務(wù)由于executor被關(guān)閉或者隊(duì)列已滿的原因被拒絕執(zhí)行拗盒,剛會(huì)交給RejectedExecutionHandler來(lái)處理.
前面是函數(shù)注釋哈怖竭,再看函數(shù)內(nèi)的注釋:
任務(wù)執(zhí)行要經(jīng)過(guò)以下三個(gè)步驟(三個(gè)if):
- 1.如果當(dāng)前線程池中的線程數(shù)量小于corePoolSize,則通過(guò)addWorker新創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),addWorker會(huì)檢查workerCount和runState陡蝇,如果創(chuàng)建線程失敗則返回false(addWorker稍后解釋)
- 2.如果runState = RUNNING痊臭,且成功加入隊(duì)列當(dāng)中,還需要進(jìn)行雙因素驗(yàn)證登夫,如果線程池關(guān)閉趣兄,則移除隊(duì)列中的線程,并reject;如果當(dāng)前線程池?zé)o線程悼嫉,則新創(chuàng)建一個(gè)線程艇潭;
- 3.如果無(wú)法將任務(wù)加入隊(duì)例,則嘗試新建線程;如果失敗蹋凝,則reject
解釋一下線程池的runState(來(lái)自參考文獻(xiàn)的拷貝).
在ThreadPoolExecutor中定義了一個(gè)volatile變量鲁纠,另外定義了幾個(gè)static final變量表示線程池的各個(gè)狀態(tài):
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示當(dāng)前線程池的狀態(tài),它是一個(gè)volatile變量用來(lái)保證線程之間的可見(jiàn)性鳍寂;
下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值改含。
當(dāng)創(chuàng)建線程池后,初始時(shí)迄汛,線程池處于RUNNING狀態(tài)捍壤;
如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài)鞍爱,此時(shí)線程池不能夠接受新的任務(wù)鹃觉,它會(huì)等待所有任務(wù)執(zhí)行完畢;
如果調(diào)用了shutdownNow()方法睹逃,則線程池處于STOP狀態(tài)盗扇,此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù)沉填;
當(dāng)線程池處于SHUTDOWN或STOP狀態(tài)疗隶,并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后翼闹,線程池被設(shè)置為TERMINATED狀態(tài)斑鼻。
從上述過(guò)程中知道有三個(gè)關(guān)鍵操作:addWorker(添加任務(wù)),workQueue.offer(將任務(wù)添加至隊(duì)列)猎荠,reject(拒絕任務(wù))坚弱;下面我們來(lái)看看這三個(gè)方法的具體執(zhí)行過(guò)程:
addWorker(添加任務(wù))
同樣先讀源碼,execute代碼中法牲,每個(gè)條件都執(zhí)行了addworker,但參數(shù)都有不同琼掠,大家注意拒垃。
addWorker源碼:
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
下面按照不同條件來(lái)說(shuō)明addWorker都做了什么,直接在源碼中注釋了哈:
current_thread_num < corePoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
if (workerCountOf(c) < corePoolSize) { // 當(dāng)前線程數(shù) < corePoolSize
if (addWorker(command, true))
return;
c = ctl.get();
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 參數(shù) firstTask != null, core = true
... // 驗(yàn)證是否滿足可新增線程的條件瓷蛙,曰:滿足^_^
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 通過(guò)ThreadFactory創(chuàng)建一個(gè)線程悼瓮,并且線程用于執(zhí)行firstTask
final Thread t = w.thread;
if (t != null) {
... ...
try {
... ...
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
... ...
workers.add(w);
workerAdded = true;
}
} finally {
... ...
}
if (workerAdded) { // 上面檢查是否確實(shí)添加線程成功,曰:成功
t.start(); // 線程啟動(dòng)艰猬,調(diào)用worker.run
workerStarted = true;
}
}
} finally {
... ...
}
... ...
}
/*************** worker.run ***************/
public void run() { runWorker(this);}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // firstTask不為null
w.firstTask = null;
... ...
try {
while (task != null || (task = getTask()) != null) { // 一進(jìn)來(lái)就滿足横堡,就執(zhí)行當(dāng)前這個(gè)task
w.lock();
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
... ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
maximumPoolSize > current_thread_num >= corePoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
if (isRunning(c) && workQueue.offer(command)) { // 放入阻塞隊(duì)列 - workQueue.offer(command)
int recheck = ctl.get();
// 成功加入阻塞隊(duì)列后,仍需要進(jìn)行double-check冠桃,以防 線程終止了或者線程池在進(jìn)入這個(gè)方法的時(shí)候已經(jīng)shutdown了
if (! isRunning(recheck) && remove(command)) // 如果double check失敗命贴,remove用來(lái)回滾 workQueue.offer 的操作,執(zhí)行 workQueue.remove(task)
reject(command); // 拒絕當(dāng)前的任務(wù)
else if (workerCountOf(recheck) == 0) // 如果當(dāng)前沒(méi)有線程就創(chuàng)建一個(gè)
addWorker(null, false); // 注意參數(shù)是 (null, false)
} else if (!addWorker(command, false)) // 如果不能放入阻塞隊(duì)列,那么久創(chuàng)建一個(gè)thread執(zhí)行當(dāng)前任務(wù)
reject(command);
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 參數(shù) firstTask = null, core = false
... ...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
... ...
try {
... ...
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
... ...
workers.add(w);
workerAdded = true;
}
} finally {
... ...
}
if (workerAdded) {
t.start(); // 線程啟動(dòng)胸蛛,調(diào)用worker.run
workerStarted = true;
}
}
} finally {
... ...
}
return workerStarted;
}
/*************** worker.run ***************/
public void run() { runWorker(this);}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // firstTask == null
w.firstTask = null;
... ...
try {
while (task != null || (task = getTask()) != null) { // 1. 第一次進(jìn)入污茵,task=null,執(zhí)行g(shù)etTask葬项;2. 獲取到非null的task之后泞当,執(zhí)行task
... ...
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
... ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/********* getTask ************/
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
... ...
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否設(shè)置了超時(shí)時(shí)間或者線程數(shù)已經(jīng)達(dá)到corePoolSize
... ...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果設(shè)置了超時(shí),通過(guò) workQueue.poll 取出還有效的任務(wù)
workQueue.take(); // 如果沒(méi)有設(shè)置超時(shí)民珍,通過(guò) workQueue.take取出任務(wù)
... ...
} catch (InterruptedException retry) {
... ...
}
}
}
這一個(gè)過(guò)程最難理解襟士,即maximumPoolSize > current_thread_num >= corePoolSize時(shí),會(huì)往隊(duì)列中尾部添加一個(gè)任務(wù)嚷量,并從頭部中取出一個(gè)任務(wù)來(lái)運(yùn)行.
current_thread_num >= maximumPoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
else if (!addWorker(command, false)) // addWorker = false陋桂,執(zhí)行reject
reject(command);
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) {
for (;;) {
... ...
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 超過(guò)maximumPoolSize之后,返回false; 上面 addWorker=false
return false;
}
}
... ...
}
這個(gè)就是直接reject了
workQueue
workQueue決定如何添加隊(duì)列津肛,如何取隊(duì)列章喉,和并發(fā)關(guān)系并不大,而且是獨(dú)立的一塊身坐,所以這里就不再詳情介紹秸脱,給出三篇文檔:
SynchronousQueue.
LinkedBlockingQueue.
ArrayBlockingQueue.
reject(拒絕任務(wù))
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
按照四個(gè)不同的reject策略詳述,RejectedExecutionHandler部蛇,默認(rèn)使用 AbortPolicy.
class | comment |
---|---|
DiscardPolicy | 什么也不做摊唇,丟棄當(dāng)前的task |
DiscardOldestPolicy | executor如果被關(guān)閉,什么也不做涯鲁;如果沒(méi)被關(guān)閉巷查,丟棄隊(duì)列中最老的task【queue.poll】,重新執(zhí)行execute(task)【放入隊(duì)列】 |
CallerRunsPolicy | 如果當(dāng)前executor沒(méi)有被關(guān)閉抹腿,那么使用當(dāng)前執(zhí)行execute方法的線程執(zhí)行task岛请;關(guān)閉了的話,就什么也不做 |
AbortPolicy | 拋出一個(gè) RejectedExecutionException |
你會(huì)根據(jù)自己的業(yè)務(wù)場(chǎng)景創(chuàng)建線程池了嗎警绩?
參考文獻(xiàn):
https://gold.xitu.io/entry/587601a7b123db4a2ed68485/view.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html