線程是程序執(zhí)行流的最小單元涡尘,合理的使用線程可以充分利用系統(tǒng)資源忍弛、提高吞吐率以及加快響應(yīng)時(shí)間。然而創(chuàng)建線程的消耗很大考抄,為了節(jié)約系統(tǒng)資源细疚,方便管理和監(jiān)控,我們通常會(huì)使用線程池川梅。java 1.5中引入了線程池框架executors疯兼,給我們提供了一個(gè)開箱即用的線程池實(shí)現(xiàn)然遏。
案例1
public class ExecutorCase {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executorService.execute(new Task());
}
}
private static class Task implements Runnable {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(300);
System.out.println(Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
運(yùn)行結(jié)果為:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
上面的用例生成了一個(gè)大小為3的線程池,并提交了若干任務(wù)吧彪。查看newFixedThreadPool可見其代碼如下啦鸣,可見實(shí)際上它創(chuàng)建了一個(gè)ThreadPoolExecutor
的實(shí)例。其實(shí)上来氧,Executors
的各種靜態(tài)方法中诫给,除了與ScheduledExecutorService
有關(guān)的都是以不同的參數(shù)創(chuàng)建了一個(gè)ThreadPoolExecutor
的實(shí)例。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
參數(shù)說明
ThreadPoolExecutor的構(gòu)建函數(shù)為:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
每個(gè)參數(shù)的說明如下:
corePoolSize
線程池中的核心線程數(shù)啦扬,當(dāng)提交一個(gè)任務(wù)時(shí)中狂,線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize扑毡;如果當(dāng)前線程數(shù)為corePoolSize胃榕,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行瞄摊。這些線程將一直存活勋又,除非設(shè)置了allowCoreThreadTimeOut
maximumPoolSize
線程池中的最大線程數(shù)。當(dāng)阻塞隊(duì)列已滿但客戶端還繼續(xù)向線程池提交任務(wù)時(shí)换帜,若corePoolSize小于maximumPoolSize楔壤,則會(huì)新建線程加入線程池中keepAliveTime
線程池中的線程的存活時(shí)間TimeUnit
keepAliveTime的時(shí)間單位workQueue
某種blockingQueue的實(shí)現(xiàn),用于在沒有空閑線程時(shí)暫時(shí)存儲(chǔ)需要處理的任務(wù)**threadFactory **
用于自定義線程的創(chuàng)建惯驼,比如給線程一些標(biāo)識(shí)度比較高的名字蹲嚣。**handler **
線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了祟牲,且沒有空閑的工作線程隙畜,如果繼續(xù)提交任務(wù),必須進(jìn)行某種處理说贝,默認(rèn)的處理方式是拋出RejectedExecutionException
異常议惰。
了解了上面這些參數(shù)的含義,對(duì)Executors中的各種預(yù)定義線程池的特性應(yīng)當(dāng)就會(huì)比較了解了乡恕。
ThreadPoolExecutor的內(nèi)部狀態(tài)
其內(nèi)部狀態(tài)如下言询,用ctl的低29位存儲(chǔ)線程數(shù)量,用高三位存儲(chǔ)線程的運(yùn)行狀態(tài)几颜,CAPACITY 頭三位為0倍试,其他位為1,可以方便的獲取狀態(tài)和線程數(shù)蛋哭。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
提交方式
有兩種提交方式
Executor.execute()
ExecutorService.submit()
前者簡單的提交一個(gè)線程县习,后者返回一個(gè)FutureTask,可以獲取提交的計(jì)算結(jié)果。區(qū)別留待后續(xù)討論躁愿。
execute代碼
翻譯一下代碼中的注釋:
- 如果線程數(shù)小于corePoolSize叛本,則向線程池中添加一個(gè)新線程。
- 否則將其加入等待執(zhí)行隊(duì)列中彤钟,如果加入成功来候,則需要再檢查一下是否需要添加新線程或是線程池狀態(tài)已經(jīng)發(fā)生了變化。
- 如果隊(duì)列已滿逸雹,則再嘗試添加新的線程营搅,不過調(diào)用參數(shù)core會(huì)發(fā)生改變,說明不是添加的“核心線程”
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
addWorker的實(shí)現(xiàn)
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
這部分代碼首先判斷線程池的狀態(tài)梆砸,如果線程池的狀態(tài)值大于或等SHUTDOWN转质,則不處理提交的任務(wù)(有一種特殊情況,就是狀態(tài)處于SHUTDOWN 但是已提交了一個(gè)任務(wù)或是任務(wù)隊(duì)列里還有待完成的隊(duì)列)帖世,直接返回休蟹;然后通過參數(shù)core判斷當(dāng)前需要?jiǎng)?chuàng)建的線程是否為核心線程,如果core為true日矫,且當(dāng)前線程數(shù)小于corePoolSize赂弓,則跳出循環(huán),開始創(chuàng)建新的線程:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker( firstTask );
final Thread t = w.thread;
if ( t != null )
{
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/*
* Recheck while holding lock.
* Back out on ThreadFactory failure or if
* shut down before lock acquired.
*/
int rs = runStateOf( ctl.get() );
if ( rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null) )
{
if ( t.isAlive() ) /* precheck that t is startable */
throw new IllegalThreadStateException();
workers.add( w );
int s = workers.size();
if ( s > largestPoolSize )
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if ( workerAdded )
{
t.start();
workerStarted = true;
}
}
} finally {
if ( !workerStarted )
addWorkerFailed( w );
}
return(workerStarted);
這里的核心邏輯就是生成一個(gè)新Worker
哪轿,將其加入到workers中盈魁,并且啟動(dòng)與該worker相關(guān)的線程。
Worker簡介
查看Worker類的源碼缔逛,我們可以發(fā)現(xiàn)其繼承了AbstractQueuedSynchronizer
备埃,可以方便的實(shí)現(xiàn)工作線程的中止操作姓惑;并實(shí)現(xiàn)了Runnable
褐奴,可以將自身作為一個(gè)任務(wù)在工作線程中執(zhí)行;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
在構(gòu)造方法中于毙,先將state設(shè)置為-1敦冬,并且使用線程工廠創(chuàng)建線程。
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
在run方法中唯沮,將所有任務(wù)全部委托給了runWorker脖旱。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
首先,調(diào)用unlock
將AQS標(biāo)志設(shè)置為0表示開始接收中斷介蛉,然后保證在線程池即將停止時(shí)確定線程被中斷萌庆,而沒有被停止時(shí)清除中斷信號(hào)。注意币旧,Thread.interrupted()會(huì)清除中斷標(biāo)志践险。而后,調(diào)用task的run方法,在完成了第一個(gè)任務(wù)后巍虫,會(huì)使用getTask方法從阻塞隊(duì)列中獲取等待的任務(wù)彭则,如果隊(duì)列中沒有任務(wù),getTask方法會(huì)被阻塞并掛起占遥,不會(huì)占用cpu資源:
getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
注意在getTask方法中俯抖,會(huì)根據(jù)allowCoreThreadTimeOut 以及線程數(shù)與corePoolSize的關(guān)系給timed賦值,循環(huán)的從workQueue中獲取Runnable瓦胎。注意當(dāng)線程數(shù)大于最大線程數(shù)或timed為true且獲取任務(wù)超時(shí)時(shí)芬萍,會(huì)直接返回null,從而使相關(guān)的Worker runTask的線程退出搔啊。
submit的實(shí)現(xiàn)
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看出先將Callable包裝成了FutureTask(實(shí)現(xiàn)了RunnableFuture)然后進(jìn)行execute担忧。其內(nèi)部狀態(tài)為:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
其get方法為:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
可以看到核心為調(diào)用awaitDone方法:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
這段代碼的邏輯如下:
- 如果主線程被中斷,則拋出中斷異常坯癣;
- 判斷FutureTask當(dāng)前的state瓶盛,如果大于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完成示罗,則直接返回惩猫;
- 如果當(dāng)前state等于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完蚜点,這時(shí)主線程只需通過yield方法讓出cpu資源轧房,等待state變成NORMAL;
- 通過WaitNode類封裝當(dāng)前線程
WaitNode() { thread = Thread.currentThread(); }
绍绘,為了提高效率通過UNSAFE的CAS將自己添加到waiters鏈表奶镶; - 最終通過LockSupport的park或parkNanos掛起線程;
FutureTask的run實(shí)現(xiàn)
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
這里首先查看狀態(tài)必須為NEW陪拘,并且通過CAS將當(dāng)前線程設(shè)置為Future的runner厂镇,然后調(diào)用Callable
的call方法,并將結(jié)果或異常存儲(chǔ)左刽,使用Unsafe修改state的狀態(tài)捺信,并調(diào)用finishCompletion
方法。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
這里讀取waiters隊(duì)列(注意每次獲取都使用了CAS)欠痴,獲取以后處理隊(duì)列中所有節(jié)點(diǎn)迄靠。還記得在awaitDone中我們調(diào)用LockSupport.parkNanos(this, nanos)
或LockSupport.parkNanos(this)
將線程掛起么?在finishCompletion
方法中我們會(huì)調(diào)用LockSupport.unpark(t)
喚醒在等待的線程喇辽。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor使用ScheduledFutureTask
作為自己的task掌挚,其run方法中在調(diào)用run后又重新設(shè)置了自己的狀態(tài),而后更新下次運(yùn)行時(shí)間菩咨,將自己重新加入workQueue中吠式。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
在schedule方法中,組裝了RunnableScheduledFuture舅世,而后調(diào)用delayedExecute。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
在delayedExecute
中奇徒,調(diào)用了ensurePrestart
雏亚,從而使用addWorker添加了工作進(jìn)程:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
而ScheduledThreadPoolExecutor的workQueue是一個(gè)delayQueue,可以在指定時(shí)間后才獲取到隊(duì)列中的元素摩钙,這樣就實(shí)現(xiàn)了定時(shí)反復(fù)運(yùn)行罢低。