0. ThreadPoolExecutor簡介
-
ExecutorService
的一種實現(xiàn)類丢氢,提供線程池的管理方法
ThreadPoolExecutor類圖.png
ThreadPoolExecutor
繼承了AbstractExecutorService
抽象類傅联,主要提供了線程池生命周期的管理、任務(wù)提交的方法疚察。
提交任務(wù):execute
submit
方法
關(guān)閉線程池:shutdown
shutdownNow
方法
1. 主要屬性介紹
ctl
AtomicInteger ctl
線程池的狀態(tài)及容量控制蒸走,低29位表示容量,高3位表示狀態(tài)
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
corePoolSize
核心線程數(shù)稍浆,當(dāng)前運(yùn)行線程數(shù)小于此數(shù)目時直接創(chuàng)建核心線程载碌,大于此數(shù)目時會先將任務(wù)入隊列猜嘱,入隊列失敗才會再創(chuàng)建非核心線程衅枫,但保證總數(shù)目不大于maximumPoolSize
,失敗執(zhí)行reject
方法朗伶。
maximumPoolSize
線程池中最多線程數(shù)目弦撩。
keepAliveTime
線程存活時間,線程數(shù)目大于corePoolSize
或者allowCoreThreadTimeOut
為true
時论皆,如有線程在此時間內(nèi)沒有執(zhí)行任務(wù)則會結(jié)束線程益楼。
allowCoreThreadTimeOut
是否允許核心線程到時間后結(jié)束線程。
workQueue
BlockingQueue
對象点晴,存放待執(zhí)行任務(wù)感凤。
2. 主要方法介紹
構(gòu)造方法
構(gòu)造方法有4個,但是最終都是調(diào)用最后一個粒督,主要是設(shè)置一些屬性
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute
向線程池提交任務(wù)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果當(dāng)前運(yùn)行的線程數(shù)小于corePoolSize陪竿,嘗試啟動一個新線程
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 成功放入隊列,重復(fù)檢查是否需要創(chuàng)建一個新線程
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 不能入隊屠橄,嘗試添加一個新線程族跛,如果失敗,拒絕任務(wù)
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//添加一個核心線程
if (addWorker(command, true))
return;
c = ctl.get();
}
//達(dá)到corePoolSize锐墙,嘗試放入等待隊列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次檢查礁哄,若線程池關(guān)閉,移除任務(wù)溪北,拒絕任務(wù)
if (! isRunning(recheck) && remove(command))
reject(command);
//若當(dāng)前沒有線程桐绒,添加一個非核心線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//放入等待隊列失敗夺脾,嘗試添加非核心線程
else if (!addWorker(command, false))
//添加非核心線程失敗,拒絕任務(wù)
reject(command);
}
addWorker
添加一個工作線程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//檢查隊列是否為空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//檢查線程容量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//線程數(shù)+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一個worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將worker放入工作集中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//啟動剛才的線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker
在創(chuàng)建worker對象時茉继,線程參數(shù)是worker自身
this.thread = getThreadFactory().newThread(this);
所以啟動worker線程時執(zhí)行的是runWorker
方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
getTask
方法負(fù)責(zé)從workQueue
等待隊列中取出待執(zhí)行任務(wù)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//getTask表示某個worker的當(dāng)前任務(wù)完成劳翰,來取下一個任務(wù),如果線程池已經(jīng)關(guān)閉馒疹,則不繼續(xù)執(zhí)行佳簸,worker數(shù)目-1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//數(shù)量超出限制或者超時,worker數(shù)目-1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//限時用poll颖变,否則take阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//r為null生均,超時了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker
方法執(zhí)行上面取到的task
final void runWorker(Worker w) {
//獲取執(zhí)行線程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果線程池STOP了,但是wt沒中斷腥刹,中斷之
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執(zhí)行task
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//worker退出時執(zhí)行昵时,如果是異常中斷,可能會新建一個worker來代替
processWorkerExit(w, completedAbruptly);
}
}
reject
任務(wù)提交失敗時捏悬,拒絕任務(wù)
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
ThreadPoolExecutor
提供了4中拒絕策略症昏,分別是
-
CallerRunsPolicy
在調(diào)用線程中執(zhí)行 -
AbortPolicy
丟棄任務(wù),拋出RejectedExecutionException
-
DiscardPolicy
僅丟棄任務(wù) -
DiscardOldestPolicy
丟棄隊列中最早的任務(wù)垫卤,然后添加本任務(wù)
shutdown威彰、shutdownNow
關(guān)閉線程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查權(quán)限,確保調(diào)用者有關(guān)閉權(quán)限
checkShutdownAccess();
//將線程池狀態(tài)設(shè)置為shutdown
advanceRunState(SHUTDOWN);
//中斷空閑線程
interruptIdleWorkers();
//結(jié)束回調(diào)
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//狀態(tài)設(shè)置為stop
advanceRunState(STOP);
//中斷所有線程
interruptWorkers();
//返回未執(zhí)行的任務(wù)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
3. 線程池的使用
通過JUC包內(nèi)提供的工具類Executors
來創(chuàng)建一個線程池
- 線程數(shù)固定的線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 單線程的線程池穴肘,順序執(zhí)行任務(wù)
其中FinalizableDelegatedExecutorService
是ExecutorService
的另一個實現(xiàn)類歇盼,使用了代理模式,其行為全部代理給ThreadPoolExecutor
對象评抚。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 緩存線程池
沒有核心線程豹缀,隨用隨建,60s內(nèi)無任務(wù)則結(jié)束
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
4. 總結(jié)
- 通過JUC工具類
Executors
創(chuàng)建線程池 - 通過
execute
submit
向線程池提交任務(wù)- 提交
Callable
任務(wù)時慨代,submit
會返回Future
對象邢笙,可以通過此對象獲取結(jié)果 -
submit
也是通過execute
方法來提交任務(wù)
- 提交
- 提交任務(wù)時
- 如果當(dāng)前線程數(shù)小于核心線程數(shù),會創(chuàng)建一個核心線程侍匙,即使當(dāng)前有空閑線程
- 如果大于核心線程數(shù)氮惯,任務(wù)會入隊,入隊失敗的話丈积,會創(chuàng)建一個非核心線程來處理筐骇,如果創(chuàng)建失敗,則會拒絕任務(wù)
- 線程的結(jié)束
- 非核心線程在
keepAliveTime
時間內(nèi)未執(zhí)行任務(wù)則會結(jié)束 - 如果
allowCoreThreadTimeOut
為true
江滨,核心線程在keepAliveTime
時間內(nèi)未執(zhí)行任務(wù)也會結(jié)束
- 非核心線程在
- 拒絕任務(wù)策略
-
CallerRunsPolicy
在調(diào)用線程中執(zhí)行 -
AbortPolicy
丟棄任務(wù)铛纬,拋出RejectedExecutionException
-
DiscardPolicy
僅丟棄任務(wù) -
DiscardOldestPolicy
丟棄隊列中最早的任務(wù),然后添加本任務(wù)
-
- 線程池的結(jié)束
-
shutdown
關(guān)閉線程池唬滑,不再接受新任務(wù)告唆,但是會執(zhí)行完等待隊列的任務(wù) -
shutdownNow
關(guān)閉線程池棺弊,執(zhí)行完或中斷當(dāng)前運(yùn)行線程,返回等待隊列的任務(wù)列表
-
5. 參考
- ThreadPoolExecutor源碼build 1.8.0_121-b13版本
- 并發(fā)編程3:線程池的使用與執(zhí)行流程