4.5 Netty優(yōu)雅退出機制
你也許已經(jīng)習(xí)慣了使用下面的代碼身坐,使一個線程池退出:
bossGroup.shutdownGracefully();
那么它是如何工作的呢万牺?由于bossGroup是一個線程池懒构,線程池的關(guān)閉要求其中的每一個線程關(guān)閉。而線程的實現(xiàn)是在SingleThreadEventExecutor類侨赡,所以我們將再次回到這個類,首先看其中的shutdownGracefully()方法粱侣,其中的參數(shù)quietPeriod為靜默時間羊壹,timeout為截止時間,此外還有一個相關(guān)參數(shù)gracefulShutdownStartTime即優(yōu)雅關(guān)閉開始時間甜害,代碼如下:
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
if (isShuttingDown()) {
return terminationFuture(); // 正在關(guān)閉阻止其他線程
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture(); // 正在關(guān)閉阻止其他線程
}
int newState;
wakeup = true;
oldState = STATE_UPDATER.get(this);
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default: // 一個線程已修改好線程狀態(tài)舶掖,此時這個線程才執(zhí)行16行代碼
newState = oldState;
wakeup = false; // 已經(jīng)有線程喚醒,所以不用再喚醒
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break; // 保證只有一個線程將oldState修改為newState
}
// 隱含STATE_UPDATER已被修改尔店,則在下一次循環(huán)返回
}
// 在default情況下會更新這兩個值
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (oldState == ST_NOT_STARTED) {
thread.start();
}
if (wakeup) {
wakeup(inEventLoop);
}
return terminationFuture();
}
這段代碼真是為多線程同時調(diào)用關(guān)閉的情況操碎了心眨攘,我們抓住其中的關(guān)鍵點:該方法只是將線程狀態(tài)修改為ST_SHUTTING_DOWN并不執(zhí)行具體的關(guān)閉操作(類似的shutdown方法將線程狀態(tài)修改為ST_SHUTDOWN)。for()循環(huán)是為了保證修改state的線程(原生線程或者外部線程)有且只有一個嚣州。如果你還沒有理解這句話鲫售,請查閱compareAndSet()方法的說明然后再看一遍郁油。39-44行代碼之所以這樣處理萍虽,是因為子類的實現(xiàn)中run()方法是一個EventLoop即一個循環(huán)。40行代碼啟動線程可以完整走一遍正常流程并且可以處理添加到隊列中的任務(wù)以及IO事件滋恬。43行喚醒阻塞在阻塞點上的線程匀哄,使其從阻塞狀態(tài)退出秦效。要從一個EventLoop循環(huán)中退出,有什么好方法嗎涎嚼?可能你會想到這樣處理:設(shè)置一個標記阱州,每次循環(huán)都檢測這個標記,如果標記為真就退出法梯。Netty正是使用這種方法苔货,NioEventLoop的run()方法的循環(huán)部分有這樣一段代碼:
if (isShuttingDown()) { // 檢測線程狀態(tài)
closeAll(); // 關(guān)閉注冊的channel
if (confirmShutdown()) {
break;
}
}
查詢線程狀態(tài)的方法有三個,實現(xiàn)簡單立哑,一并列出:
public boolean isShuttingDown() {
return STATE_UPDATER.get(this) >= ST_SHUTTING_DOWN;
}
public boolean isShutdown() {
return STATE_UPDATER.get(this) >= ST_SHUTDOWN;
}
public boolean isTerminated() {
return STATE_UPDATER.get(this) == ST_TERMINATED;
}
需要注意的是調(diào)用shutdownGracefully()方法后線程狀態(tài)為ST_SHUTTING_DOWN夜惭,調(diào)用shutdown()方法后線程狀態(tài)為ST_SHUTDOWN。isShuttingDown()可以一并判斷這兩種調(diào)用方法铛绰。closeAll()方法關(guān)閉注冊到NioEventLoop的所有Channel诈茧,代碼不再列出。confirmShutdown()方法在SingleThreadEventExecutor類捂掰,確定是否可以關(guān)閉或者說是否可以從EventLoop循環(huán)中跳出敢会。代碼如下:
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false; // 沒有調(diào)用shutdown相關(guān)的方法直接返回
}
if (!inEventLoop()) { // 必須是原生線程
throw new IllegalStateException("must be invoked from an event loop");
}
cancelScheduledTasks(); // 取消調(diào)度任務(wù)
if (gracefulShutdownStartTime == 0) { // 優(yōu)雅關(guān)閉開始時間镊叁,這也是一個標記
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 執(zhí)行完普通任務(wù)或者沒有普通任務(wù)時執(zhí)行完shutdownHook任務(wù)
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
return true; // 調(diào)用shutdown()方法直接退出
}
if (gracefulShutdownQuietPeriod == 0) {
return true; // 優(yōu)雅關(guān)閉靜默時間為0也直接退出
}
wakeup(true); // 優(yōu)雅關(guān)閉但有未執(zhí)行任務(wù),喚醒線程執(zhí)行
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
// shutdown()方法調(diào)用直接返回走触,優(yōu)雅關(guān)閉截止時間到也返回
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
// 在靜默期間每100ms喚醒線程執(zhí)行期間提交的任務(wù)
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// 靜默時間內(nèi)沒有任務(wù)提交晦譬,可以優(yōu)雅關(guān)閉,此時若用戶又提交任務(wù)則不會被執(zhí)行
return true;
}
我們總結(jié)一下互广,調(diào)用shutdown()方法從循環(huán)跳出的條件有:
(1).執(zhí)行完普通任務(wù)
(2).沒有普通任務(wù)敛腌,執(zhí)行完shutdownHook任務(wù)
(3).既沒有普通任務(wù)也沒有shutdownHook任務(wù)
調(diào)用shutdownGracefully()方法從循環(huán)跳出的條件有:
(1).執(zhí)行完普通任務(wù)且靜默時間為0
(2).沒有普通任務(wù),執(zhí)行完shutdownHook任務(wù)且靜默時間為0
(3).靜默期間沒有任務(wù)提交
(4).優(yōu)雅關(guān)閉截止時間已到
注意上面所列的條件之間是或的關(guān)系惫皱,也就是說滿足任意一條就會從EventLoop循環(huán)中跳出像樊。我們可以將靜默時間看為一段觀察期,在此期間如果沒有任務(wù)執(zhí)行旅敷,說明可以跳出循環(huán)生棍;如果此期間有任務(wù)執(zhí)行,執(zhí)行完后立即進入下一個觀察期繼續(xù)觀察媳谁;如果連續(xù)多個觀察期一直有任務(wù)執(zhí)行涂滴,那么截止時間到則跳出循環(huán)。我們看一下shutdownGracefully()的默認參數(shù):
public Future<?> shutdownGracefully() {
return shutdownGracefully(2, 15, TimeUnit.SECONDS);
}
可知晴音,Netty默認的shutdownGracefully()機制為:在2秒的靜默時間內(nèi)如果沒有任務(wù)柔纵,則關(guān)閉;否則15秒截止時間到達時關(guān)閉锤躁。換句話說搁料,在15秒時間段內(nèi),如果有超過2秒的時間段沒有任務(wù)則關(guān)閉系羞。至此郭计,我們明白了從EvnetLoop循環(huán)中跳出的機制,最后椒振,我們抵達終點站:線程結(jié)束機制昭伸。這一部分的代碼實現(xiàn)在線程工廠的生成方法中:
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // 模板方法,EventLoop實現(xiàn)
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
// 用戶調(diào)用了關(guān)閉的方法或者拋出異常
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break; // 拋出異常也將狀態(tài)置為ST_SHUTTING_DOWN
}
}
if (success && gracefulShutdownStartTime == 0) {
// time=0杠人,說明confirmShutdown()方法沒有調(diào)用勋乾,記錄日志
}
try {
for (;;) {
// 拋出異常時宋下,將普通任務(wù)和shutdownHook任務(wù)執(zhí)行完畢
// 正常關(guān)閉時嗡善,結(jié)合前述的循環(huán)跳出條件
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// 線程狀態(tài)設(shè)置為ST_TERMINATED,線程終止
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
// 關(guān)閉時学歧,任務(wù)隊列中添加了任務(wù)罩引,記錄日志
}
terminationFuture.setSuccess(null); // 異步結(jié)果設(shè)置為成功
}
}
}
}
});
20-22行代碼說明子類在實現(xiàn)模板方法run()時,須調(diào)用confirmShutdown()方法枝笨,不調(diào)用的話會有錯誤日志袁铐。25-31行的for()循環(huán)主要是對異常情況的處理揭蜒,但同時也兼顧了正常調(diào)用關(guān)閉方法的情況√藿埃可以將拋出異常的情況視為靜默時間為0的shutdownGracefully()方法屉更,這樣便于理解循環(huán)跳出條件。34行代碼cleanup()的默認實現(xiàn)什么也不做洒缀,NioEventLoop覆蓋了基類瑰谜,實現(xiàn)關(guān)閉NioEventLoop持有的selector:
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
}
關(guān)于Netty優(yōu)雅關(guān)閉的機制,還有最后一點細節(jié)树绩,那就是runShutdownHooks()方法:
private boolean runShutdownHooks() {
boolean ran = false;
while (!shutdownHooks.isEmpty()) {
// 使用copy是因為shutdwonHook任務(wù)中可以添加或刪除shutdwonHook任務(wù)
List<Runnable> copy = new ArrayList<Runnable>(shutdownHooks);
shutdownHooks.clear();
for (Runnable task: copy) {
try {
task.run();
} catch (Throwable t) {
logger.warn("Shutdown hook raised an exception.", t);
} finally {
ran = true;
}
}
}
if (ran) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
return ran;
}
此外萨脑,還有threadLock.release()方法,如果你還記得字段定義饺饭,threadLock是一個初始值為0的信號量渤早。一個初值為0的信號量,當(dāng)線程請求鎖時只會阻塞瘫俊,這有什么用呢鹊杖?awaitTermination()方法揭曉答案,用來使其他線程阻塞等待原生線程關(guān)閉 :
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
// 由于tryAcquire()永遠不會成功扛芽,所以必定阻塞timeout時間
if (threadLock.tryAcquire(timeout, unit)) {
threadLock.release();
}
return isTerminated();
}