涉及的主要方法
void shutdown();
List<Runnable> shutdownNow();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
線程池狀態(tài)
/*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
*
* Threads waiting in awaitTermination() will return when the
* state reaches TERMINATED.
*
* Detecting the transition from SHUTDOWN to TIDYING is less
* straightforward than you'd like because the queue may become
* empty after non-empty and vice versa during SHUTDOWN state, but
* we can only terminate if, after seeing that it is empty, we see
* that workerCount is 0 (which sometimes entails a recheck -- see
* below).
*/
void shutdown()
注釋:
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();
該方法會(huì)停止ExecutorService
添加新的任務(wù), 但是老任務(wù)還是會(huì)繼續(xù)執(zhí)行.
This method does not wait for previously submitted tasks to * complete execution.
這句話指的是該方法會(huì)立即返回, 但不一定代表之前提交的任務(wù)已經(jīng)全部完成了. 如果需要一個(gè)阻塞的方法, 可以調(diào)用awaitTermination
方法.
該方法內(nèi)部實(shí)現(xiàn)是設(shè)置了狀態(tài), 并interrupt
了所有的空閑線程, 使其不再接受新的任務(wù).
List<Runnable> shutdownNow()
注釋:
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();
該方法嘗試停止所有正在執(zhí)行的任務(wù), 停止對(duì)正在等待執(zhí)行的任務(wù)的處理, 并且返回正在等待執(zhí)行的任務(wù).
同shutdown()
, 該方法也是立刻返回的, 不會(huì)等到所有任務(wù)終止以后才返回.
因?yàn)榻K止是通過interrupt
實(shí)現(xiàn)的, 所以如果那個(gè)任務(wù)沒有對(duì)interrupt
做出正確響應(yīng), 那么該方法將無法終止該任務(wù). 所以傳進(jìn)去的任務(wù)需要對(duì)interrup
做出合適的響應(yīng).
boolean awaitTermination(long timeout, TimeUnit unit)
注釋:
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
該方法是阻塞的, 阻塞到所有任務(wù)都完成(必須在shutdown
調(diào)用之后)或者超時(shí). 如果executor
在超時(shí)之前終止了, 那么返回true, 否則返回false.
注意, 如果不在awaitTermination
前調(diào)用shutdown
, 則即使在超時(shí)之前所有任務(wù)都已經(jīng)完成, awaitTermination
仍然會(huì)等待著, 而且最后一定返回false
, 因?yàn)闆]有shutdown
的調(diào)用不會(huì)使executor
的狀態(tài)變?yōu)?code>terminated.
例子:
public static void testAwaitTerminationWithoutShutdown(){
Runnable runnable = () -> {
System.out.println("I'm a very quick task");
};
executorService.submit(runnable);
try {
executorService.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
上面這段代碼將會(huì)阻塞3000
毫秒, 并且最終返回true
, 即使僅有的任務(wù)一瞬間就完成了, 因?yàn)闆]有對(duì)shutdown
的調(diào)用, 所以executorService
的狀態(tài)不可能會(huì)變成terminated
.
實(shí)例
shutdown
后再嘗試添加任務(wù):
public static void testShutdown(){
Runnable runnable = () -> {
try {
System.out.println("going to sleep for 1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
scheduledExecutorService.submit(runnable);
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.submit(runnable);
} catch (RejectedExecutionException e){
System.out.println("cannot add task after shutdown");
}
}
輸出(輸出順序不一定一致):
cannot add task after shutdown
going to sleep for 1s
從輸出可以到確實(shí)有RejectedExecutionException
被拋出了, 另外從這次輸出也可以看出shutdown
確實(shí)立馬就返回了.
shutdownNow()
關(guān)閉成功的例子:
public static void shutdownNowNormally() throws InterruptedException {
Runnable task = () -> {
try {
System.out.println(String.format("now is %s, I'm going to sleep for 10s", getCurrentTime()));
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println(String.format("someone asked me to terminate at: %s", getCurrentTime()));
}
};
scheduledExecutorService.submit(task);
Thread.sleep(1000);
scheduledExecutorService.shutdownNow();
}
輸出:
Now is 13:47:30, I'm going to sleep for 10s
someone asked me to terminate at: 13:47:31
shutdownNow()
不成功的例子:
因?yàn)?code>shutdownNow()最終是通過interrupt
來打斷工作線程, 如果任務(wù)沒有對(duì)interrupt
做出反應(yīng), 那么shutdownNow()
將無法正常terminate
.
public static void shutdownNowNotWorking(){
Runnable task = () ->{
while (true){
try {
System.out.println("I'm gonna sleep for 1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
// e.printStackTrace();
System.out.println(String.format("I'll ignore this InterruptedException. Now is : %s", getCurrentTime()));
}
}
};
scheduledExecutorService.submit(task);
scheduledExecutorService.shutdownNow();
}
輸出:
I'm gonna sleep for 1s
I'll ignore this InterruptedException. Now is : 13:53:12
I'm gonna sleep for 1s
I'm gonna sleep for 1s
I'm gonna sleep for 1s
I'm gonna sleep for 1s
void shutdown()
源碼
ThreadPoolExecutor
中的實(shí)現(xiàn):
設(shè)置狀態(tài)并interrupt
全部空閑的工作線程(即不讓其再繼續(xù)從任務(wù)隊(duì)列中獲取任務(wù)). 但是之前提交的任務(wù)還會(huì)被執(zhí)行.
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
interruptIdleWorkers
方法:
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
List<Runnable> shutdownNow()
源碼
ThreadPoolExecutor
中的源碼:
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
其中interruptWorkers
方法:
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
可以看到即使這個(gè)工作線程已經(jīng)拿到任務(wù)在執(zhí)行中, 也會(huì)被interrupt
, 這種情況需要我們的任務(wù)對(duì)interrupt
做出響應(yīng), 否則就會(huì)導(dǎo)致shutdownNow
也無法終止executorService
.
runWoker()
源碼
idle worker
指的就是正在執(zhí)行while (task != null || (task = getTask()) != null)
這個(gè)while
條件的worker
, 即還未成功取到task
的任務(wù).
而interruptIdleWorkers()
方法就是針對(duì)這個(gè)狀態(tài)的woker
, 如果getTask()
返回值是null
, 那么該woker
線程就會(huì)結(jié)束了. 從getTask()
源碼中可以看到, 如果shutdown
的時(shí)候, wokerQueue
(BlockingQueue
)的poll()
或者take()
方法能夠響應(yīng)interrupt()
, 從而導(dǎo)致getTask()
會(huì)繼續(xù)下一次循環(huán), 從而能夠檢查到shutdown
狀態(tài), 從而直接返回null
, 進(jìn)而使woker
退出. 所以shutdown
不會(huì)對(duì)已經(jīng)進(jìn)入while
body的woker
線程起作用.
而shutdown
僅僅調(diào)用了一次interruptIdleWorkers()
, 所以那些idle
的wokers
被直接結(jié)束了, 但是剩下的仍然在工作的workers
不會(huì)受到影響, 如果任務(wù)隊(duì)列中仍然有剩余的任務(wù), 那么這些woker
仍然能夠取出并且完成 (因?yàn)?code>shutdown()方法僅僅將狀態(tài)改成了SHUTDOWN
).
shutdownNow()
中設(shè)置狀態(tài)為STOP
, 并調(diào)用了interruptWorkers()
方法. 所以即使worker
已經(jīng)執(zhí)行到task.run()
, 如果我們傳進(jìn)去的任務(wù)的run
方法有對(duì)interrupt
做出合適響應(yīng), 那么依然可以被停止, 否則shutdownNow()
也無法終止. 另外結(jié)合getTask()
, 可以知道即使已經(jīng)緩存在任務(wù)隊(duì)列中的任務(wù)也不會(huì)被執(zhí)行了 (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())
).
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其中getTask()
方法:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
其中processWorkerExit
方法:
注意processWorkerExit
方法會(huì)調(diào)用tryTerminate()
方法. 所以每次有一個(gè)woker
結(jié)束的時(shí)候, 都會(huì)嘗試termiante
, 所以僅僅調(diào)用shutdown
也可以使得在全部任務(wù)完成以后terminate
.
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate()
源碼:
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
一個(gè)關(guān)鍵的地方在于interruptIdleWorkers(ONLY_ONE);
, 下面是關(guān)于這個(gè)參數(shù)的解釋:
/*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
*/
如果這個(gè)參數(shù)是true
的話, 那么一次最多interrupt
一個(gè)空閑的worker
. 因?yàn)槊恳粋€(gè)worker
在退出的時(shí)候都會(huì)調(diào)用processWorkerExit
方法, 而且processWorkerExit
方法中也會(huì)繼續(xù)調(diào)用tryTerminate()
方法, 所以注釋里面的propagate
就能解釋得通了. in case all threads are currently waiting
, 這里還不是很理解, 這里是說避免所有線程都在那時(shí)刻等待的情況, 但是這樣做的目的還是不很清楚.
總結(jié)
要讓ExecutorService
能夠被正常關(guān)閉, 需要任務(wù)本身對(duì)interrupted
這個(gè)狀態(tài)做出反應(yīng), 否則可能無法正常關(guān)閉ExecutorService
.