一、執(zhí)行任務
public void execute(Runnable command) {
// 如果任務為空柬焕,直接拋異常
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 獲取線程池運行狀態(tài)
int c = ctl.get();
// 如果運行的線程數(shù)小于核心數(shù)沃饶,添加worker
if (workerCountOf(c) < corePoolSize) {
// 添加worker,并將core設為true石抡,表示是核心線程
if (addWorker(command, true))
return;
// 如果添加失敗重新獲取線程池運行狀態(tài)
c = ctl.get();
}
// 如果線程池在運行且隊列未滿
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果線程池不在運行且刪除任務成功檐嚣,執(zhí)行拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果線程池工作線程為0,添加空的任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到這里啰扛,說明核心線程數(shù)用完且任務隊列已滿嚎京,那么啟用非核心線程數(shù),如果失敗隐解,執(zhí)行拒絕策略
else if (!addWorker(command, false))
reject(command);
}
- 首先用核心線程執(zhí)行任務鞍帝,如果核心線程已滿,將任務添加到任務隊列煞茫;如果隊列也滿了帕涌,那么用非核心線程執(zhí)行任務
- addWorker(Runnable firstTask, boolean core)第一個參數(shù)是執(zhí)行的任務,第二個參數(shù)如果為true续徽,表示用的是核心線程蚓曼,false表示用的是非核心線程
- 成員變量ctl是AtomicInteger類型,用來表示線程運行狀態(tài)和線程數(shù)钦扭,高3位表示運行狀態(tài)纫版,低29位表示運行線程數(shù)
private boolean addWorker(Runnable firstTask, boolean core) {
//retry用來判斷是否可以添加任務,并更新線程數(shù)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池已經(jīng)關閉且沒有任務土全,直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取工作線程數(shù)
int wc = workerCountOf(c);
// 如果工作線程數(shù)大于等于最大線程數(shù)捎琐,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 更新線程數(shù)会涎,如果成功裹匙,跳出retry
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到這,說明更新線程數(shù)失敗了末秃,重新獲取線程池狀態(tài)
c = ctl.get(); // Re-read ctl
// 如果線程池狀態(tài)變化了概页,從retry重新執(zhí)行;如果線程池狀態(tài)沒有變化练慕,繼續(xù)for循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建Worker對象
w = new Worker(firstTask);
// 獲取線程
final Thread t = w.thread;
// 如果線程不為空惰匙,啟動線程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 再次獲取狀態(tài)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 因為線程還沒啟動,所以這里線程是alive铃将,說明是不正常的
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 將新創(chuàng)建的worker加入到workers集合
workers.add(w);
int s = workers.size();
// 更新largestPoolSize值
if (s > largestPoolSize)
largestPoolSize = s;
// workerAdded標記為true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果任務添加完成项鬼,啟動線程且將workerStarted標記為true
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果啟動線程失敗,從workers刪除新創(chuàng)建的任務劲阎,且執(zhí)行tryTerminate
if (! workerStarted)
addWorkerFailed(w);
}
// 最后返回線程是否啟動成功
return workerStarted;
}
- 首先去更新工作的線程數(shù)
- 創(chuàng)建Worker對象绘盟,此時會創(chuàng)建Thread類型的成員變量thread,Worker對象會傳入到該線程,因為Worker對象實現(xiàn)了Runnable方法龄毡,所以啟動線程thread時吠卷,會執(zhí)行Worker的run方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
二、執(zhí)行任務
1. 發(fā)起任務執(zhí)行
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 用來表示是否正常執(zhí)行任務沦零,true表示被打斷了祭隔,false表示未被打斷
boolean completedAbruptly = true;
try {
// 如果worker對象有傳入了任務或者任務隊列有任務
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果線程池狀態(tài)是停止以上的級別
// 或者
// 線程已經(jīng)被中斷且狀態(tài)是停止以上的級別且當前線程還不是打斷狀態(tài)
// 那么中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 方法執(zhí)行前,空方法路操,留給子類實現(xiàn)
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 執(zhí)行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 執(zhí)行完后疾渴,空方法,留給子類實現(xiàn)
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 被中斷標記設為false
completedAbruptly = false;
} finally {
// 任務完成后的操作
processWorkerExit(w, completedAbruptly);
}
}
- 首先執(zhí)行傳入worker對象里的任務寻拂,如果為空程奠,則從任務隊列里獲取任務
- 判斷是否需要打斷線程
- 執(zhí)行任務
- 任務完成相關的操作
2. 獲取任務
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// 獲取線程池狀態(tài)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果線程池狀態(tài)關閉且任務隊列為空
// 或者
// 線程池狀態(tài)是停止
// 那么將線程數(shù)減1,返回空任務
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 獲取線程數(shù)
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果allowCoreThreadTimeOut為真祭钉,表示核心線程也有超時時間瞄沙,一般默認為false
// 或者
// 工作線程數(shù)超過核心線程數(shù)
// 那么將timed設為真,設為真的目的是為了沒任務的時候慌核,減少工作線程的數(shù)量
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果工作線程大于最大線程數(shù)或者超時了
// 且
// 工作線程數(shù)大于1或者任務隊列不為空
// 那么工作線程減1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果有超時設置距境,那么帶有超時時間獲取任務,否則就阻塞獲取任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果任務不為空垮卓,返回任務
if (r != null)
return r;
// 如果任務為空垫桂,將超時設置為true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 這部分代碼能說明線程復用,超時未獲取到任務減少線程的原理
- 如果設置了核心線程有超時時間或者線程數(shù)超過了核心線程數(shù)粟按,那么采用帶超時的方式獲取任務诬滩,如果沒有獲取到任務,那么線程數(shù)會減1灭将;如果不采用帶超時的方式獲取任務疼鸟,那么一直等待,知道從任務隊列里獲取了任務
3. 退出任務執(zhí)行
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從任務集合中刪除任務
workers.remove(w);
} finally {
mainLock.unlock();
}
// 嘗試終止任務
tryTerminate();
int c = ctl.get();
// 如果線程池狀態(tài)小于STOP庙曙,說明還需要工作線程
if (runStateLessThan(c, STOP)) {
// 如果任務執(zhí)行被中斷了空镜,保證至少還有1個工作線程在執(zhí)行
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 相當于線程還在工作
addWorker(null, false);
}
}
- 從任務集合中刪除當前任務
- 嘗試終止線程池
- 如果線程池狀態(tài)是小于STOP,保證有工作線程在工作
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// isRunning(c) 表示在運行捌朴,不能停止
// runStateAtLeast(c, TIDYING)表示已經(jīng)停止了吴攒,沒必要停止
// (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())表示關閉但是有任務,不能停止
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作線程數(shù)不等于0砂蔽,停止1個空閑的工作線程洼怔,通過tryLock判斷是否空閑
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 走到這,說明工作線程是0了左驾,將狀態(tài)改為TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 終止線程池镣隶,空方法泽台,留給子類實現(xiàn)
terminated();
} finally {
// 最后將狀態(tài)改為TERMINATED,通知等待線程池終止的線程
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 如果線程池狀態(tài)不在運行矾缓,且工作線程數(shù)為0怀酷,那么最終將線程池狀態(tài)改為TERMINATED
三、拒絕策略
1. 直接拋異常(默認策略)
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2. 調(diào)用者的線程執(zhí)行任務
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3. 丟掉最早的任務
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
4. 空的策略
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
四嗜闻、線程工廠
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
// 設置線程組
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 設置線程前綴名
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
// 創(chuàng)建線程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
五蜕依、常見四種線程池
1.newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 可自定義線程數(shù)和線程工廠,核心線程數(shù)與最大線程數(shù)相等琉雳,這樣的話線程一旦創(chuàng)建样眠,就會一直運行
2.newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
- 只可以自定義線程工廠,核心線程數(shù)為0翠肘,最大線程數(shù)為Integer最大值
- 相當于沒有限制工作線程數(shù)檐束,任務量大的時候,會影響機器性能
- 空任務的時候束倍,會有1個線程在運行
3.newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
- 只創(chuàng)建一個工作線程被丧,可自定義線程工廠,可以定時執(zhí)行任務
4.newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
- 可定義線程工廠和核心工作線程數(shù)绪妹,最大工作線程數(shù)為Integer.MAX_VALUE
- 可定時執(zhí)行任務