轉(zhuǎn):https://blog.csdn.net/lz710117239/article/details/77414726
Reactor單線程模型
1.作為NIO服務(wù)端刹衫,接收客戶端的TCP連接
2.作為NIO客戶端诀紊,向服務(wù)端發(fā)起TCP連接
3.讀取通信對(duì)端的請(qǐng)求或者應(yīng)答消
4.向通信對(duì)端發(fā)送消息請(qǐng)求或者應(yīng)答消息
Reactor單線程模型如下圖所示:
由于Reactor模式使用的是異步非阻塞I/O,所有的I/O操作都不會(huì)導(dǎo)致阻塞斥难,理論上一個(gè)線程可以獨(dú)立處理所有I/O相關(guān)的操作。從架構(gòu)層面上看,一個(gè)NIO線程確實(shí)可以完成其承擔(dān)的職責(zé)苞也。例如螺男,通過Acceptor類接收客戶端的TCP連接請(qǐng)求信息棒厘,當(dāng)鏈路建立成功之后,通過Dispatch將對(duì)應(yīng)的ByteBuffer派發(fā)到指定的Handler上下隧,進(jìn)行消息解碼奢人。用戶線程消息編碼后通過NIO線程將消息發(fā)送給客戶端。
在一些小容量的應(yīng)用場景下淆院,可以使用單線程模型何乎。但是這對(duì)于高負(fù)載,大并發(fā)的應(yīng)用場景卻不合適,主要原因如下土辩。
1.一個(gè)NIO線程同時(shí)處理成百上千的鏈路支救,性能上無法支撐,即便NIO線程的CPU負(fù)荷達(dá)到100%拷淘,也無法滿足海量消息的編碼各墨,解碼,讀取和發(fā)送启涯。
2.當(dāng)NIO線程負(fù)載過重之后欲主,處理速度將變慢,這會(huì)導(dǎo)致大量客戶端連接超時(shí)逝嚎,超時(shí)之后往往會(huì)進(jìn)行重發(fā)扁瓢,這更加NIO線程的負(fù)載,最終會(huì)導(dǎo)致大量消息擠壓和處理超時(shí)补君,成為系統(tǒng)的性能瓶頸引几。
3.可靠性問題:一旦NIO線程意外跑飛,或者進(jìn)入死循環(huán)挽铁,會(huì)導(dǎo)致整個(gè)系統(tǒng)通信模塊不可用伟桅,不能接收和處理外部消息,造成節(jié)點(diǎn)故障叽掘。
Reactor多線程模型
Reactor多線程模型與單線程模型最大的卻別就是有一組NIO線程來處理I/O操作楣铁,它的原理如圖所示:
Reactor多線程模型的特點(diǎn)如下所示:
1.有專門一個(gè)NIO線程——Acceptor線程用于監(jiān)聽服務(wù)端,接收客戶端的TCP連接請(qǐng)求更扁。
2.網(wǎng)絡(luò)I/O操作——讀,寫等由一個(gè)NIO線程負(fù)責(zé)盖腕,線程池可以采用標(biāo)準(zhǔn)的JDK線程池實(shí)現(xiàn)赫冬,它包含一個(gè)任務(wù)隊(duì)列和N個(gè)可用的線程,由這些NIO線程負(fù)責(zé)消息的讀取溃列,解碼劲厌,編碼和發(fā)送。
3.一個(gè)NIO線程可以同時(shí)處理N條鏈路听隐,但是一個(gè)鏈路只對(duì)應(yīng)一個(gè)NIO線程补鼻,防止發(fā)生并發(fā)操作問題。
在大多數(shù)場景下雅任,Reactor多線程模型可以滿足性能需求风范。但是,在個(gè)別特殊場景中沪么,一個(gè)NIO線程負(fù)責(zé)監(jiān)聽和處理所有的客戶端鏈接可能會(huì)存在性能問題乌企。例如并發(fā)百萬客戶端連接,或者服務(wù)端需要對(duì)客戶端握手進(jìn)行安全認(rèn)證成玫,但是認(rèn)證本身非常損耗性能。在這類場景下拳喻,單獨(dú)一個(gè)Acceptor線程可能會(huì)存在性能不足的問題哭当,為了解決性能問題,產(chǎn)生了第三種Reactor線程模型——主從Reactor多線程模型冗澈。
主從Reactor多線程模型
主從Reactor線程模型的特點(diǎn)是:服務(wù)端用于接收客戶端鏈接的不在是一個(gè)單獨(dú)的NIO線程钦勘,而是一個(gè)獨(dú)立的NIO線程池。Acceptor接收到客戶端TCP鏈接請(qǐng)求并處理完成后(可能包含接入認(rèn)證等)亚亲,將新創(chuàng)建的SocketChannel注冊(cè)到I/O線程池(sub reacotr線程池)的某個(gè)I/O線程上彻采,由它負(fù)責(zé)SocketChannel的讀寫和編碼工作。Acceptor線程池僅僅用于客戶端登錄捌归,握手肛响,和安全認(rèn)證,一旦鏈路建立成功惜索,就將鏈路注冊(cè)到后端subReactor線程池的I/O線程上特笋,有I/O線程負(fù)責(zé)后續(xù)的I/O操作。
利用主從NIO線程模型巾兆,可以解決一個(gè)服務(wù)端監(jiān)聽線程無法有效處理所有客戶端連接的性能不足問題猎物。因此,在Netty的官方Demo中角塑,推薦使用該線程模型蔫磨。
Netty的線程模型
Netty的線程模型并不是一成不變的,它實(shí)際取決于用戶的啟動(dòng)參數(shù)配置圃伶。通過設(shè)置不同的啟動(dòng)參數(shù)堤如,Netty可以同時(shí)支持Reactor單線程模型蒲列,多線程模型和主從Reactor多線程模型。如下圖:
通過服務(wù)端啟動(dòng)代碼來了解它的線程模型:
//配置服務(wù)端的NIO線程組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//綁定端口煤惩,同步等待成功
ChannelFuture f = b.bind(port).sync();
//等待服務(wù)端監(jiān)聽端口關(guān)閉
f.channel().closeFuture().sync();
}finally {
//優(yōu)雅退出嫉嘀,釋放線程池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
服務(wù)端啟動(dòng)的時(shí)候,創(chuàng)建了兩個(gè)NioEventLoopGroup魄揉,它們實(shí)際是兩個(gè)獨(dú)立的Reactor線程池剪侮。一個(gè)用于接收客戶端的TCP連接,另一個(gè)用于處理I/O相關(guān)的讀寫操作洛退,或者執(zhí)行系統(tǒng)Task瓣俯,定時(shí)任務(wù)Task等。
Netty用于接收客戶端請(qǐng)求的線程池職責(zé)如下兵怯。
(1)接收客戶端TCP連接欸彩匕,初始化Channel參數(shù);
(2)將鏈路狀態(tài)變更事件通知給ChannelPipeline媒区。
Netty處理I/O操作的Reactor線程池職責(zé)如下驼仪。
(1)異步讀取通信對(duì)端的數(shù)據(jù)報(bào),發(fā)送讀事件到ChannelPipeline袜漩;
(2)異步發(fā)送消息到通信對(duì)端绪爸,調(diào)用ChannelPipeline的消息發(fā)送接口;
(3)執(zhí)行系統(tǒng)調(diào)用Task宙攻;
(4)執(zhí)行定時(shí)任務(wù)Task奠货,例如鏈路空閑狀態(tài)檢測定時(shí)任務(wù)。
通過調(diào)整線程池的線程個(gè)數(shù)座掘,是否共享線程池等方式递惋,Netty的Reactor線程模型可以在單線程,多線程和主從多線程間切換溢陪,這種靈活的配置方式可以最大程度地滿足不同用戶的個(gè)性化定制萍虽。
為了盡可能地提升性能,Netty在很多地方進(jìn)行了無鎖化的設(shè)計(jì)形真,例如在I/O線程內(nèi)部進(jìn)行穿行操作贩挣,避免多線程競爭導(dǎo)致的性能下降問題。表面上看没酣,串行化似乎CPU利用率不高王财,并發(fā)程度不夠。但是裕便,通過調(diào)整NIO線程池的線程參數(shù)绒净,可以同hi啟動(dòng)多個(gè)串行化的線程并行運(yùn)行,這種局部無鎖化的串行線程設(shè)計(jì)相比一個(gè)隊(duì)列——多個(gè)工作線程的模型性能更好偿衰。
它的設(shè)計(jì)原理如下如所示:
Netty的NioEventLoop讀取到消息之后挂疆,直接調(diào)用ChannelPipeline的fireChannelRead(Object msg)改览。只要用戶不主動(dòng)切換線程,一直都是由NioEventLoop調(diào)用用戶的handler缤言,期間不進(jìn)行線程切換宝当。這種串行化處理方式避免了多線程操作導(dǎo)致的鎖的競爭,從性能角度看是最優(yōu)的胆萧。
NioEventLoop源碼分析
NioEventLoop設(shè)計(jì)原理
Netty的NioEventLoop并不是要給純粹的I/O線程庆揩,它除了負(fù)責(zé)I/O的讀寫之外,還兼顧處理以下兩類任務(wù)跌穗。
系統(tǒng)Task:通過調(diào)用NioeventLoop的execute(Runnable task)方法實(shí)現(xiàn)订晌,Netty有很多系統(tǒng)Task,創(chuàng)建它們的主要原因是:當(dāng)I/O線程和用戶線程同時(shí)操作網(wǎng)絡(luò)資源時(shí)蚌吸,為了防止并發(fā)操作導(dǎo)致的鎖競爭锈拨,將用戶線程的操作封裝成Task放入消息隊(duì)列中,由I/O線程負(fù)責(zé)執(zhí)行羹唠,這樣就實(shí)現(xiàn)了局部無鎖化奕枢。
定時(shí)任務(wù):調(diào)用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)方法實(shí)現(xiàn)。
正是因?yàn)镹ioEventLoop具備多種職責(zé)佩微,所以它的實(shí)現(xiàn)比較特殊缝彬,它并不是簡單的Runnable,我們來看下它們的繼承關(guān)系喊衫。如下:
它實(shí)現(xiàn)了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口杆怕,正是因?yàn)檫@種設(shè)計(jì)族购,導(dǎo)致NioEventLoop和其父類功能實(shí)現(xiàn)非常復(fù)雜。下面我們重點(diǎn)分析下它的源碼實(shí)現(xiàn):
NioEventLoop
作為NIO框架的Reactor線程陵珍,NioEventLoop需要處理網(wǎng)絡(luò)I/O讀寫事件寝杖,因此它必須聚合一個(gè)多路復(fù)用器對(duì)象,看下它的selector定義:
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
直接調(diào)用Selector.open()就能創(chuàng)建并打開一個(gè)新的Selector互纯。
然后通過反射對(duì)selectedKeys進(jìn)行優(yōu)化:
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
// Ensure the current selector implementation is what we can instrument.
if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
return selector;
}
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
上面代碼瑟幕,如果開啟了selectedKeys優(yōu)化功能,通過反射的方法從Selector實(shí)例中獲取selectedKeys和publicSelectedKeys將上述兩個(gè)成員變量設(shè)置為可寫留潦,通過反射的方式使用Netty構(gòu)造的selectedKeys包裝類selectedKeySet將原JDK的selectedKeys替換掉只盹。默認(rèn)為開啟優(yōu)化功能。
下面看下run()方法的實(shí)現(xiàn):
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
Selector的selectNow()方法會(huì)立即觸發(fā)Selector的選擇操作兔院,如果有準(zhǔn)備就緒的Channel殖卑,則返回就緒的Channel集合。選擇完成后在此判斷用戶是否調(diào)用了Selector的wakeup方法坊萝,如果調(diào)用孵稽,執(zhí)行selector.wakeup()操作许起。下面我們返回到run()方法,繼續(xù)分析代碼菩鲜。如果消息隊(duì)列中沒有消息需要處理园细,則執(zhí)行select()方法,有Selector多路復(fù)用器輪詢接校,看是否有準(zhǔn)備就緒的channel猛频。它的實(shí)現(xiàn)如下:
取當(dāng)前系統(tǒng)的納秒時(shí)間,調(diào)用delayNanos()方法計(jì)算獲得NioEventLoop中定時(shí)任務(wù)的觸發(fā)時(shí)間馅笙。
private void select() throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
// Selected something,
// waken up by user, or
// the task queue has a pending task.
break;
}
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding selector.",
selectCnt);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
計(jì)算下一個(gè)將要觸發(fā)的定時(shí)任務(wù)的剩余超時(shí)時(shí)間伦乔,將它轉(zhuǎn)換為毫秒,為超時(shí)時(shí)間增加0.5毫秒的調(diào)整值董习。對(duì)剩余的超時(shí)時(shí)間進(jìn)行判斷烈和,如果需要立即執(zhí)行或者已經(jīng)超時(shí),則調(diào)用selector.selectNow()進(jìn)行輪詢操作皿淋,將selectCnt設(shè)置為1招刹,并退出當(dāng)前循環(huán)。
將定時(shí)任務(wù)剩余的超時(shí)時(shí)間作為參數(shù)進(jìn)行select操作窝趣,沒完成一次select操作疯暑,對(duì)select計(jì)數(shù)器selectCnt加1.
Select操作完成之后,需要對(duì)結(jié)果進(jìn)行判斷哑舒,如果存在下列任意一種情況妇拯,則退出當(dāng)前循環(huán)。
1.有Channel處于就緒狀態(tài)洗鸵,selectedKeys不為0越锈,說明有讀寫事件需要處理。
2.oldWakenUp為true膘滨;
3.系統(tǒng)或者用戶調(diào)用了wakeup操作甘凭,喚醒當(dāng)前的多路復(fù)用器;
4.消息隊(duì)列中有新的任務(wù)需要處理火邓。
如果本次Selector的輪詢結(jié)果為空丹弱,也沒有wakeup操作或者是新的消息需要處理,則說明是個(gè)空輪詢铲咨,有可能觸發(fā)了JDK的epoll bug躲胳,它會(huì)導(dǎo)致Selector的空輪詢,使I/O線程一致處于100%狀態(tài)纤勒。這個(gè)問題一直到JDK1.8才得到解決泛鸟。
解決方式就是上面代碼最下面的那層if語句。
(1)對(duì)Selector的select周期進(jìn)行統(tǒng)計(jì)踊东;
(2)每完成一次select操作進(jìn)行一次計(jì)數(shù)北滥;
(3)當(dāng)select的操作達(dá)到一定次數(shù)后刚操,rebuildSelector()重新輪詢。
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, a);
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
通過銷毀舊的再芋、有問題的多路復(fù)用器菊霜,使用新建的Selector就可以解決空輪詢Selector導(dǎo)致的I/O線程CPU占用100%的問題。
如果輪詢到了處于就緒狀態(tài)的SocketChannel济赎,則需要處理網(wǎng)絡(luò)I/O事件鉴逞,相關(guān)代碼如下:
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
由于默認(rèn)開啟了selectedKeys的優(yōu)化功能,所以會(huì)進(jìn)入processSelectedKeysOptimized分支執(zhí)行司训。進(jìn)入該方法构捡,如果有需要處理的channel則進(jìn)入processSelectedKey方法中,處理I/O事件壳猜,其代碼如下:
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
首先從NioServerSocketChannel或者NioSocketChannel中獲取其內(nèi)部類Unsafe勾徽,判斷當(dāng)前選擇鍵是否可用,如果不可用统扳,調(diào)用Unsafe的close()方法喘帚,釋放連接資源。
如果選擇鍵可用咒钟,則繼續(xù)對(duì)網(wǎng)絡(luò)操作位進(jìn)行判斷吹由,代碼如下:
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
如果是讀或者連接操作,則調(diào)用Unsafe的read方法朱嘴。此處Unsafe的實(shí)現(xiàn)是個(gè)多態(tài)倾鲫,對(duì)于NioServerSocketChannel,它的讀操作就是接收客戶端的TCP連接萍嬉,相關(guān)代碼如下:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
對(duì)于NioSocketChannel乌昔,它的讀操作就是從SocketChannel中讀取ByteBuffer,相關(guān)代碼如下:
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
如果網(wǎng)絡(luò)操作位為寫帚湘,則說明有半包消息尚未發(fā)送完成玫荣,需要繼續(xù)調(diào)用flush方法進(jìn)行發(fā)送甚淡,相關(guān)代碼如下:
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
如果網(wǎng)絡(luò)操作位為連接狀態(tài)大诸,則需要對(duì)連接結(jié)果進(jìn)行判讀,如下:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
需要注意的是贯卦,在進(jìn)行finishConnect之前资柔,需要將網(wǎng)絡(luò)操作位進(jìn)行修改,注銷掉SelectionKey.OP_CONNECT撵割。
處理完I/O事件后贿堰,NioEventLoop需要執(zhí)行非I/O操作的系統(tǒng)Task和定時(shí)任務(wù),代碼如下:
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
由于NioEventLoop同時(shí)處理I/O事件和非I/O事件啡彬,Netty提供了兩者的比例羹与。
Task的執(zhí)行時(shí)間根據(jù)本次I/O的執(zhí)行時(shí)間得到故硅,方法如下
fetchFromDelayedQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
首先從定時(shí)任務(wù)消息隊(duì)列中彈出消息技能型處理,如果消息隊(duì)列為空纵搁,則退出循環(huán)吃衅。根據(jù)當(dāng)前的時(shí)間戳進(jìn)行判斷,如果該定時(shí)任務(wù)已經(jīng)或者正處于超時(shí)狀態(tài)腾誉,則將其加入到執(zhí)行TaskQueue中徘层,同時(shí)從延時(shí)隊(duì)列中刪除。定時(shí)任務(wù)如果沒有超時(shí)利职,說明本循環(huán)不需要處理趣效,直接退出即可,如下:
private void fetchFromDelayedQueue() {
long nanoTime = 0L;
for (;;) {
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
if (delayedTask == null) {
break;
}
if (nanoTime == 0L) {
nanoTime = ScheduledFutureTask.nanoTime();
}
if (delayedTask.deadlineNanos() <= nanoTime) {
delayedTaskQueue.remove();
taskQueue.add(delayedTask);
} else {
break;
}
}
}
執(zhí)行Task Queue中原有的任務(wù)和從延時(shí)隊(duì)列中復(fù)制的已經(jīng)超時(shí)或者正處于超時(shí)狀態(tài)的定時(shí)任務(wù)猪贪,
由于獲取系統(tǒng)納秒時(shí)間是個(gè)耗時(shí)的操作跷敬,每次循環(huán)都獲取當(dāng)前系統(tǒng)納秒時(shí)間進(jìn)行超時(shí)判斷會(huì)降低性能。為了提升性能哮伟,每執(zhí)行60次循環(huán)判斷一次干花,如果當(dāng)前系統(tǒng)已經(jīng)到了分配給非I/O操作的超時(shí)時(shí)間,則退出循環(huán)楞黄。這是為了防止由于I/O操作過多導(dǎo)致I/O操作被長時(shí)間阻塞池凄。
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
最后判斷系統(tǒng)是否進(jìn)入優(yōu)雅停機(jī)狀態(tài),如果處于關(guān)閉狀態(tài)鬼廓,則需要調(diào)用closeAll方法釋放資源肿仑,并讓NioEventLoop線程退出循環(huán),結(jié)束運(yùn)行碎税。關(guān)閉方法就在NioEventLoop的runAllTasks之后尤慰,進(jìn)入其中,如下:
private void closeAll() {
selectAgain();
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
for (AbstractNioChannel ch: channels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
遍歷所有的channel雷蹂,調(diào)用它的Unsafe.close()方法關(guān)閉所有鏈路伟端,釋放線程池,ChannelPipeline和ChannelHandler等資源匪煌。