1. 什么是線程池闲延?
通俗來講是就是裝有線程的池子整袁,和我們使用到的各種連接池的概念類似挽拂,那么線程池解決了什么問題呢,來看下官方的闡述說明:
Thread pools address two different problems: they usually provide improved performance when
executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead,
and they provide a means of bounding and managing the resources, including threads, consumed
when executing a collection of tasks. Each {@code ThreadPoolExecutor} also maintains some basic
statistics, such as the number of completed tasks.
翻譯如下人灼,線程池主要解決了兩個問題:
1. 通過減少調(diào)用開銷围段,提高大量異步任務(wù)執(zhí)行的性能。
2. 可以管理和限制線程數(shù)量投放、任務(wù)數(shù)量等資源奈泪。
首先線程池通過長期持有一組線程,避免了線程頻繁的創(chuàng)建和銷毀帶來的額外開銷灸芳。并且提供了豐富的工具方法來管理和限制線程池狀態(tài)段磨、資源等。
2. 初識線程池
2.1 簡單的上手例子
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2
, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
threadPool.submit(() -> {
// 執(zhí)行的任務(wù)內(nèi)容僅作為實例使用耗绿,沒有任何實際意義。
System.out.println("task has been executing");
});
}
程序輸出:task has been executing
第一步我們通過使用ThreadPoolExecutor
構(gòu)造方法定義一個擁有兩個線程的線程池砾隅,并且成功了執(zhí)行我們提交的任務(wù)误阻。
2.2 剖析構(gòu)造方法參數(shù)的含義
ThreadPoolExecutor
的構(gòu)造方法有以下四個:
既然是剖析,當然是關(guān)注最下面那個參數(shù)最多的構(gòu)造方法啦~晴埂,先來看看每個參數(shù)的意義以及它們作用究反。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
... 省略中間無關(guān)代碼
}
corePoolSize
線程池里持有的核心線程的數(shù)量,即最小線程數(shù)量儒洛。額精耐,除非設(shè)定了allowCoreThreadTimeOut
為true,那么在指定時間內(nèi)如果核心線程一直處于空閑狀態(tài)琅锻,則會被終止卦停。
maximumPoolSize
線程池最大的線程數(shù)量。如果新的任務(wù)被添加恼蓬,存放任務(wù)的隊列已經(jīng)滿了惊完,并且當前的線程數(shù)量小于maximumPoolSize
數(shù)量,線程池將會主動的創(chuàng)建一個新的線程处硬。
keepAliveTime TimeUnit
兩個組合使用的參數(shù)小槐,用來控制線程池里線程最大的空閑時間上限。如果超出指定時間空閑的線程將會被終止荷辕,以釋放資源凿跳。
ThreadFactory
用來統(tǒng)一創(chuàng)建線程的工廠類件豌,可以通過接口ThreadFactory
自定義實現(xiàn),看下默認使用的線程工廠類內(nèi)容控嗜。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
通過newThread
方法可以大至得知一下信息:指定了線程名茧彤,設(shè)定了優(yōu)先級、設(shè)定為非后臺線程躬审,如果想修改成自定義值巫员,可以參考使用com.google.common.util.concurrent.ThreadFactoryBuilder
類。
BlockingQueue
存放待執(zhí)行任務(wù)隊列啄踊,特性如下:
- 任務(wù)先進先出
- 阻塞毡鉴,具體體現(xiàn)在隊列為空時阻塞出隊操作;隊列滿的時候阻塞入隊操作博助。
下面的表格列舉了BlockingQueue
中不同方法在操作不能立刻完成時的行為险污。
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
ThreadPoolExecutor
在添加任務(wù)的時候使用的是offer()
方法,如果隊列滿了則會返回一個false富岳。取任務(wù)的時候使用到了take()蛔糯、poll()、poll(time, unit)
窖式,具體使用的地方請看下面執(zhí)行流程的分析蚁飒。
常用的具體實現(xiàn)類分別是:
1. ArrayBlockingQueue 有界隊列,可以通過限定任務(wù)堆積數(shù)量
2. LinkedBlockingQueue 無界隊列萝喘,任務(wù)可以無限堆積淮逻,對執(zhí)行時間不敏感的業(yè)務(wù)可以使用這個隊列。
3. SynchronousQueue 阻塞隊列阁簸,不能緩沖任務(wù)爬早。新任務(wù)進來的時候會一直阻塞到有空閑的線程去處理。
RejectedExecutionHandler
用來處理不能被線程池執(zhí)行任務(wù)启妹。首先看下接口定義:
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
注釋里說明了什么情況下任務(wù)不能被執(zhí)行:1.沒有多余的線程或者任務(wù)隊列滿了 2.線程池被關(guān)閉了
看下其子類實現(xiàn)的具體的拒絕策略有哪些:
-
CallerRunsPolicy
直接使用調(diào)用submit或者execute的當前線程來執(zhí)行任務(wù)
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1
, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 3; i++) {
threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName());
});
}
Thread.sleep(200000000);
}
輸出的結(jié)果:
main
pool-1-thread-1
pool-1-thread-1
這里定義了固定線程大小的線程池并且隊列中只存放一個任務(wù)筛严,所以for循環(huán)添加的第三個任務(wù)會被執(zhí)行CallerRunsPolicy
策略,進而會被當前調(diào)用submit()方法的線程執(zhí)行饶米,也就是主線程(main)桨啃。
-
AbortPolicy
核心代碼如下:
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
比較直白,直接拋出非檢查異常檬输。
-
DiscardPolicy
直接忽略任務(wù)优幸,什么也不干。 -
DiscardOldestPolicy
核心代碼如下:
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
因為緩存任務(wù)是使用的隊列(先進先出)褪猛,所以在執(zhí)行e.getQueue().poll();
的時候把存放時間最久的任務(wù)出隊舍棄网杆。下面看下驗證示例:
public class Demo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1
, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 5; i++) {
threadPool.submit(new NamedRunnable(String.valueOf(i)));
}
Thread.sleep(10000);
}
}
class NamedRunnable implements Runnable {
String name;
public NamedRunnable(String name) {
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.print(name + ", ");
}
}
程序輸出:
0, 4, 中間1、2、3任務(wù)被丟棄了
3.核心概念
3.1 線程池狀態(tài)
線程池的線程數(shù)量碳却、線程池狀態(tài)都是通過一個原子類AtomicInteger
來控制队秩,其中低28位用來表示線程的數(shù)量(5億左右),高3位用來表示線程的狀態(tài)(允許有8中狀態(tài)昼浦,實際使用了5種)馍资。
代碼如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
重點關(guān)注下高3位線程池狀態(tài)的表示:
1110 0000 0000 0000 0000 0000 0000 0000 RUNNING
0000 0000 0000 0000 0000 0000 0000 0000 SHUTDOWN
0010 0000 0000 0000 0000 0000 0000 0000 STOP
0100 0000 0000 0000 0000 0000 0000 0000 TIDYING
0110 0000 0000 0000 0000 0000 0000 0000 TERMINATED
線程池狀態(tài)相互轉(zhuǎn)換關(guān)系如下圖:
3.2 Woker
Worker
繼承自AQS實現(xiàn)了簡單的非重入鎖的功能,并實現(xiàn)了Runnable
接口关噪,實現(xiàn)了run()
方法鸟蟹,調(diào)用了runWorker()
方法(ps:runWorker這個方法后續(xù)會分析到)。每個worker對象都保存了當前工作線程使兔,和需要執(zhí)行的任務(wù)建钥。代碼如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
3.3 提交任務(wù)過程
任務(wù)提交的方法有:submit()
和execute()
兩種方法,submit()
代碼如下:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
這里submit()
其實還是調(diào)用execute()
虐沥,execute()
代碼如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 當前線程數(shù)量如果小于核心線程數(shù)量熊经,創(chuàng)建新的線程去執(zhí)行任務(wù),而不是將任務(wù)加入隊列欲险。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果線程池是運行狀態(tài)镐依,并且入隊成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次檢查線程池狀態(tài),如果不符合條件將任務(wù)刪除
if (! isRunning(recheck) && remove(command))
// rejectHandler處理任務(wù)天试。
reject(command);
// 如果線程數(shù)量為0槐壳,創(chuàng)建新的線程去執(zhí)行任務(wù)。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 創(chuàng)建線程執(zhí)行當前任務(wù)
else if (!addWorker(command, false))
// 失敗則執(zhí)行rejectHandler
reject(command);
}
這里的邏輯還是比較清晰的喜每,下面看下addWorker()
:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取線程池狀態(tài)
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
* rs > SHUTDOWN || firstTask != null || workQueue.isEmpty()
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 線程數(shù)量
int wc = workerCountOf(c);
/**
* 線程不能大于最大數(shù)量限制宏粤。如果core為true則限定線程數(shù)量為corePoolSize
* 大小,否則限定為maximumPoolSize大小
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 線程池線程數(shù)量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果線程池狀態(tài)變更則繼續(xù)loop
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 添加的線程啟動是否成功
boolean workerStarted = false;
// 線程是否成功添加
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
/**
* 線程池為RUNNING狀態(tài)時灼卢,或者調(diào)用了shutdown()方法之后空閑狀態(tài)的
* 線程被清理了,則會主動的添加一個新的工作線程來執(zhí)行之前的任務(wù)来农。
* 如果線程池為SHUTDOWN狀態(tài)時鞋真,新的任務(wù)不能被添加進來,以前的任務(wù)可
* 以繼續(xù)被執(zhí)行沃于。
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 檢測Worker中的線程是否已經(jīng)被啟動涩咖。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 記錄線程池曾經(jīng)最大的線程數(shù)量。
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動worker實例中的線程繁莹。
t.start();
workerStarted = true;
}
}
} finally {
// 如果添加失敗檩互,則執(zhí)行清理的工作:線程池線程數(shù)量減1、移除剛剛添加的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
到這里任務(wù)就已經(jīng)提交過程就分析完畢了咨演≌⒆颍總結(jié)下:
1.當線程池的線程數(shù)量小于核心線程數(shù)量時,直接創(chuàng)建線程數(shù)量執(zhí)行
2.當線程池的線程數(shù)量大于等于核顯線程數(shù)來時,先將任務(wù)入隊等待工作中的線程依次處理饵较。
3.當線程入隊失敗時說明隊列已經(jīng)滿了拍嵌,就再次創(chuàng)建新的線程去處理(這里線程數(shù)量受maximumPoolSize影響),如果超過了限制循诉,則會將任務(wù)丟給rejectHandler處理横辆。
3.4 工作線程啟動過程
分析完了提交任務(wù)的過程,下面關(guān)注下啟動線程過程茄猫。下面代碼是Worker被添加之后開始工作的入口狈蚤,看下相關(guān)代碼:
// 省略無關(guān)代碼
if (workerAdded) {
// 啟動worker實例中的線程。
t.start();
workerStarted = true;
}
// 省略無關(guān)代碼
在上面3.2Worker
中有提到其實現(xiàn)了Runnable
接口中的run()
方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
那么worker實例中的線程啟動的時候是如何調(diào)用該runWorker(this)
方法的呢划纽?這個要從創(chuàng)建Worker實例說起~脆侮!
Worker創(chuàng)建代碼如下:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
其中threadFactory的默認實現(xiàn)是Executors.DefaultThreadFactory類,代碼如下:
class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
重點關(guān)注newThread(Runnable r)的方法阿浓,在getThreadFactory().newThread(this)這里偷偷
的傳入了this參數(shù)他嚷,而Worker實現(xiàn)了Runnable接口的run()方法,所以當線程啟動的時候會執(zhí)行
runWorker(this)這個方法芭毙。額筋蓖,有那么一點點繞~,大家明白就好退敦。
開始真正的啟動過程的分析:
final void runWorker(Worker w) {
// 因為調(diào)用當前方法是剛剛啟動的新的工作線程粘咖,這里自然就是獲取到的當前工作線程。
Thread wt = Thread.currentThread();
// 獲取任務(wù)侈百。
Runnable task = w.firstTask;
w.firstTask = null;
// 更新worker狀態(tài)瓮下,允許響應(yīng)中斷。
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循環(huán)獲取隊列中的任務(wù)钝域,getTask()下面分析
while (task != null || (task = getTask()) != null) {
w.lock();
// 這一大段就說了一件事情讽坏,線程池被shutdown()的時候,將當前線程置為
// 中斷狀態(tài)
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 執(zhí)行鉤子函數(shù)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行鉤子函數(shù)
afterExecute(task, thrown);
}
} finally {
task = null;
// 記錄當前工作線程已經(jīng)完成的任務(wù)數(shù)量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 異常的時候執(zhí)行清理工作例证。
processWorkerExit(w, completedAbruptly);
}
}
至此啟動過程就完成了路呜,總結(jié)下:
1. 通過while循環(huán)不斷的獲取任務(wù),如果獲取不到任務(wù)就進入finally執(zhí)行processWorkerExit(w,false)
2. 如果能獲取到就執(zhí)行任務(wù)织咧,中間如果報錯就拋異常胀葱,進入finally執(zhí)行processWorkerExit(w,true)
注意1和2執(zhí)行processWorkerExit()
方法時傳入的參數(shù)是不同,具體的區(qū)別后面會分析到笙蒙。
3.5 獲取任務(wù)的過程
上面介紹了完了worker的啟動過程抵屿,但是啟動之后從任務(wù)隊列中獲取task過程是什么樣的呢?
private Runnable getTask() {
// 上次獲取任務(wù)是否超時
boolean timedOut = false; // Did the last poll() time out?
// 死循環(huán)
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 線程池狀態(tài)>=STOP()獲取任務(wù)隊列為空的時候捅位,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
/**
* 是否需要剔除工作線程轧葛。
* 如果設(shè)定了allowCoreThreadTimeOut為true則超時的核心線程也會被清理搂抒,
* 如果設(shè)置為false則只會清理超過corePoolSize數(shù)量之外的線程。
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* (如果線程數(shù)量大于maximumPoolSize的限制 || 上次獲取任務(wù)超時而且需要剔除多余的線程)
* && (線程數(shù)量大于1 或者 隊列為空)朝群,滿足以上條件則返回null
* /
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果需要剔除線程燕耿,則使用poll()限定時間內(nèi)沒有獲取到任務(wù),則返回空值
// 如果不需要剔除線程姜胖,則使用take()無限制等待誉帅,知道有任務(wù)返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果任務(wù)為空,則說明超時了右莱。
timedOut = true;
} catch (InterruptedException retry) {
// 線程如果被打斷則認為沒有超時蚜锨,繼續(xù)下次循環(huán)
timedOut = false;
}
}
}
上面各種判斷看著著實有點頭大,其實總結(jié)下就做了兩個方面的工作:返回可執(zhí)行的任務(wù)慢蜓,或者返回null亚再。返回null的意義在于退出上面3.4 runWorker()的循環(huán),進而執(zhí)行processWorkerExit(w, false)
晨抡,進行線程清理的工作氛悬。
那么在執(zhí)行getTask()
有哪些情況返回null呢?
1. 線程池狀態(tài)>=STOP或者隊列為空的時候耘柱,線程池能處于STOP狀態(tài)(上面有線程池狀態(tài)轉(zhuǎn)換的圖例)是因為調(diào)用了shutdownNow()的方法如捅,這時候不管任務(wù)隊列有沒有任務(wù)線程都會被清理。
2. 線程池數(shù)量超過maximumPoolSize的限制 && (線程數(shù)量大于1 或者 任務(wù)隊列為空)
在設(shè)定allowCoreThreadTimeOut為true的情況下:
3. 清理上次獲取任務(wù)超時的線程(核心線程也會被清理)
在設(shè)定allowCoreThreadTimeOut為false的情況下:
4. 清理上次獲取任務(wù)超時的線程(只會清理大于核心線程數(shù)量之外的線程)
3.6 線程池線程數(shù)量的控制
上接3.5獲取任務(wù)的過程调煎,這里看下清理線程的相關(guān)代碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是因為異常
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 將當前worker完成的任務(wù)數(shù)量納入總的數(shù)量
completedTaskCount += w.completedTasks;
// 移除worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試將線程池置為TERMINATED狀態(tài)
tryTerminate();
// 假如線程池狀態(tài)處于RUNNING镜遣、SHUTDOWN狀態(tài)則至少保證有一個線程繼續(xù)處理已有的任務(wù),
// 直至任務(wù)隊列為空
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
4. 寫在最后
希望博客的內(nèi)容能給廣大的Java道友提供一些的幫助和提升士袄。由于筆者水平有限悲关,如果內(nèi)容有誤,希望大家批評指出娄柳。