本系列Netty源碼解析文章基于 4.1.56.Final版本
對(duì)于一個(gè)高性能網(wǎng)絡(luò)通訊框架來(lái)說(shuō)馋吗,最最重要也是最核心的工作就是如何高效的接收客戶(hù)端連接镊折,這就好比我們開(kāi)了一個(gè)飯店黔衡,那么迎接客人就是飯店最重要的工作,我們要先把客人迎接進(jìn)來(lái)腌乡,不能讓客人一看人多就走掉,只要客人進(jìn)來(lái)了夜牡,哪怕菜做的慢一點(diǎn)也沒(méi)關(guān)系与纽。
本文筆者就來(lái)為大家介紹下netty這塊最核心的內(nèi)容,看看netty是如何高效的接收客戶(hù)端連接的塘装。
下圖為筆者在一個(gè)月黑風(fēng)高天空顯得那么深邃遙遠(yuǎn)的夜晚急迂,閑來(lái)無(wú)事,于是捧起Netty關(guān)于如何接收連接這部分源碼細(xì)細(xì)品讀的時(shí)候蹦肴,意外的發(fā)現(xiàn)了一個(gè)影響Netty接收連接吞吐的一個(gè)Bug僚碎。
于是筆者就在Github提了一個(gè)Issue#11708,闡述了下這個(gè)Bug產(chǎn)生的原因以及導(dǎo)致的結(jié)果并和Netty的作者一起討論了下修復(fù)措施阴幌。如上圖所示勺阐。
Issue#11708:https://github.com/netty/netty/issues/11708
這里先不詳細(xì)解釋這個(gè)Issue,也不建議大家現(xiàn)在就打開(kāi)這個(gè)Issue查看矛双,筆者會(huì)在本文的介紹中隨著源碼深入的解讀慢慢的為大家一層一層地?fù)荛_(kāi)迷霧渊抽。
之所以在文章的開(kāi)頭把這個(gè)拎出來(lái),筆者是想讓大家?guī)е鴳岩梢楹觯瑢徱暲撩疲蕾p,崇敬栈幸,敬畏的態(tài)度來(lái)一起品讀世界頂級(jí)程序員編寫(xiě)的代碼愤估。由衷的感謝他們?cè)谶@一領(lǐng)域做出的貢獻(xiàn)。
好了速址,問(wèn)題拋出來(lái)后玩焰,我們就帶著這個(gè)疑問(wèn)來(lái)開(kāi)始本文的內(nèi)容吧~~~
前文回顧
按照老規(guī)矩,再開(kāi)始本文的內(nèi)容之前壳繁,我們先來(lái)回顧下前邊幾篇文章的概要內(nèi)容幫助大家梳理一個(gè)框架全貌出來(lái)震捣。
筆者這里再次想和讀者朋友們強(qiáng)調(diào)的是本文可以獨(dú)立觀看,并不依賴(lài)前邊系列文章的內(nèi)容闹炉,只是大家如果對(duì)相關(guān)細(xì)節(jié)部分感興趣的話蒿赢,可以在閱讀完本文之后在去回看相關(guān)文章。
在前邊的系列文章中渣触,筆者為大家介紹了驅(qū)動(dòng)Netty整個(gè)框架運(yùn)轉(zhuǎn)的核心引擎Reactor的創(chuàng)建羡棵,啟動(dòng),運(yùn)行的全流程嗅钻。從現(xiàn)在開(kāi)始Netty的整個(gè)核心框架就開(kāi)始運(yùn)轉(zhuǎn)起來(lái)開(kāi)始工作了皂冰,本文要介紹的主要內(nèi)容就是Netty在啟動(dòng)之后要做的第一件事件:監(jiān)聽(tīng)端口地址店展,高效接收客戶(hù)端連接。
在《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》一文中秃流,我們是從整個(gè)網(wǎng)絡(luò)框架的基石IO模型的角度整體闡述了下Netty的IO線程模型赂蕴。
而Netty中的Reactor正是IO線程在Netty中的模型定義。Reactor在Netty中是以Group的形式出現(xiàn)的舶胀,分為:
主Reactor線程組也就是我們?cè)趩?dòng)代碼中配置的
EventLoopGroup bossGroup
,main reactor group中的reactor主要負(fù)責(zé)監(jiān)聽(tīng)客戶(hù)端連接事件概说,高效的處理客戶(hù)端連接。也是本文我們要介紹的重點(diǎn)嚣伐。從Reactor線程組也就是我們?cè)趩?dòng)代碼中配置的
EventLoopGroup workerGroup
糖赔,sub reactor group中的reactor主要負(fù)責(zé)處理客戶(hù)端連接上的IO事件,以及異步任務(wù)的執(zhí)行轩端。
最后我們得出Netty的整個(gè)IO模型如下:
本文我們討論的重點(diǎn)就是MainReactorGroup的核心工作上圖中所示的步驟1放典,步驟2,步驟3基茵。
在從整體上介紹完Netty的IO模型之后奋构,我們又在《Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》中完整的介紹了Netty框架的骨架主從Reactor組的搭建過(guò)程,闡述了Reactor是如何被創(chuàng)建出來(lái)的拱层,并介紹了它的核心組件如下圖所示:
thread
即為Reactor中的IO線程声怔,主要負(fù)責(zé)監(jiān)聽(tīng)I(yíng)O事件,處理IO任務(wù)舱呻,執(zhí)行異步任務(wù)醋火。selector
則是JDK NIO對(duì)操作系統(tǒng)底層IO多路復(fù)用技術(shù)實(shí)現(xiàn)的封裝。用于監(jiān)聽(tīng)I(yíng)O就緒事件箱吕。taskQueue
用于保存Reactor需要執(zhí)行的異步任務(wù)芥驳,這些異步任務(wù)可以由用戶(hù)在業(yè)務(wù)線程中向Reactor提交,也可以是Netty框架提交的一些自身核心的任務(wù)茬高。scheduledTaskQueue
則是保存Reactor中執(zhí)行的定時(shí)任務(wù)兆旬。代替了原有的時(shí)間輪來(lái)執(zhí)行延時(shí)任務(wù)。tailQueue
保存了在Reactor需要執(zhí)行的一些尾部收尾任務(wù)怎栽,在普通任務(wù)執(zhí)行完后 Reactor線程會(huì)執(zhí)行尾部任務(wù)丽猬,比如對(duì)Netty 的運(yùn)行狀態(tài)做一些統(tǒng)計(jì)數(shù)據(jù),例如任務(wù)循環(huán)的耗時(shí)熏瞄、占用物理內(nèi)存的大小等等
在骨架搭建完畢之后脚祟,我們隨后又在在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》》一文中介紹了本文的主角服務(wù)端NioServerSocketChannel的創(chuàng)建,初始化强饮,綁定端口地址由桌,向main reactor注冊(cè)監(jiān)聽(tīng)OP_ACCEPT事件
的完整過(guò)程。
main reactor如何處理OP_ACCEPT事件將會(huì)是本文的主要內(nèi)容。
自此Netty框架的main reactor group已經(jīng)啟動(dòng)完畢行您,開(kāi)始準(zhǔn)備監(jiān)聽(tīng)OP_accept事件铭乾,當(dāng)客戶(hù)端連接上來(lái)之后,OP_ACCEPT事件活躍娃循,main reactor開(kāi)始處理OP_ACCEPT事件接收客戶(hù)端連接了炕檩。
而netty中的IO事件分為:OP_ACCEPT事件,OP_READ事件捌斧,OP_WRITE事件和OP_CONNECT事件捧书,netty對(duì)于IO事件的監(jiān)聽(tīng)和處理統(tǒng)一封裝在Reactor模型中,這四個(gè)IO事件的處理過(guò)程也是我們后續(xù)文章中要單獨(dú)拿出來(lái)介紹的骤星,本文我們聚焦OP_ACCEPT事件的處理。
而為了讓大家能夠?qū)O事件的處理有一個(gè)完整性的認(rèn)識(shí)爆哑,筆者寫(xiě)了《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》這篇文章洞难,在文章中詳細(xì)介紹了Reactor線程的整體運(yùn)行框架。
Reactor線程會(huì)在一個(gè)死循環(huán)中996不停的運(yùn)轉(zhuǎn)揭朝,在循環(huán)中會(huì)不斷的輪詢(xún)監(jiān)聽(tīng)Selector上的IO事件队贱,當(dāng)IO事件活躍后,Reactor從Selector上被喚醒轉(zhuǎn)去執(zhí)行IO就緒事件的處理潭袱,在這個(gè)過(guò)程中我們引出了上述四種IO事件的處理入口函數(shù)柱嫌。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//獲取Channel的底層操作類(lèi)Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
......如果SelectionKey已經(jīng)失效則關(guān)閉對(duì)應(yīng)的Channel......
}
try {
//獲取IO就緒事件
int readyOps = k.readyOps();
//處理Connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
//移除對(duì)Connect事件的監(jiān)聽(tīng),否則Selector會(huì)一直通知
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
//觸發(fā)channelActive事件處理Connect事件
unsafe.finishConnect();
}
//處理Write事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//處理Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
本文筆者將會(huì)為大家重點(diǎn)介紹OP_ACCEPT事件
的處理入口函數(shù)unsafe.read()
的整個(gè)源碼實(shí)現(xiàn)屯换。
當(dāng)客戶(hù)端連接完成三次握手之后编丘,main reactor中的selector產(chǎn)生OP_ACCEPT事件
活躍,main reactor隨即被喚醒彤悔,來(lái)到了OP_ACCEPT事件
的處理入口函數(shù)開(kāi)始接收客戶(hù)端連接嘉抓。
1. Main Reactor處理OP_ACCEPT事件
當(dāng)Main Reactor
輪詢(xún)到NioServerSocketChannel
上的OP_ACCEPT事件
就緒時(shí),Main Reactor線程就會(huì)從JDK Selector
上的阻塞輪詢(xún)APIselector.select(timeoutMillis)
調(diào)用中返回晕窑。轉(zhuǎn)而去處理NioServerSocketChannel
上的OP_ACCEPT事件
抑片。
public final class NioEventLoop extends SingleThreadEventLoop {
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
..............省略.................
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
..............處理OP_CONNECT事件.................
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
..............處理OP_WRITE事件.................
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//本文重點(diǎn)處理OP_ACCEPT事件
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
}
處理IO就緒事件的入口函數(shù)
processSelectedKey
中的參數(shù)AbstractNioChannel ch
正是Netty服務(wù)端NioServerSocketChannel
。因?yàn)榇藭r(shí)的執(zhí)行線程為main reactor線程杨赤,而main reactor上注冊(cè)的正是netty服務(wù)端NioServerSocketChannel負(fù)責(zé)監(jiān)聽(tīng)端口地址敞斋,接收客戶(hù)端連接。通過(guò)
ch.unsafe()
獲取到的NioUnsafe操作類(lèi)正是NioServerSocketChannel中對(duì)底層JDK NIO ServerSocketChannel的Unsafe底層操作類(lèi)疾牲。
Unsafe接口
是Netty對(duì)Channel底層操作行為的封裝植捎,比如NioServerSocketChannel的底層Unsafe操作類(lèi)干的事情就是綁定端口地址
,處理OP_ACCEPT事件
阳柔。
這里我們看到鸥跟,Netty將OP_ACCEPT事件
處理的入口函數(shù)封裝在NioServerSocketChannel
里的底層操作類(lèi)Unsafe的read
方法中。
而NioServerSocketChannel中的Unsafe操作類(lèi)實(shí)現(xiàn)類(lèi)型為NioMessageUnsafe
定義在上圖繼承結(jié)構(gòu)中的AbstractNioMessageChannel父類(lèi)中
。
下面我們到NioMessageUnsafe#read
方法中來(lái)看下Netty對(duì)OP_ACCPET事件
的具體處理過(guò)程:
2. 接收客戶(hù)端連接核心流程框架總覽
我們還是按照老規(guī)矩医咨,先從整體上把整個(gè)OP_ACCEPT事件的邏輯處理框架提取出來(lái)步清,讓大家先總體俯視下流程全貌,然后在針對(duì)每個(gè)核心點(diǎn)位進(jìn)行各個(gè)擊破叼屠。
main reactor線程是在一個(gè)do...while{...}
循環(huán)read loop中不斷的調(diào)用JDK NIO serverSocketChannel.accept()
方法來(lái)接收完成三次握手的客戶(hù)端連接NioSocketChannel
的弹澎,并將接收到的客戶(hù)端連接NioSocketChannel臨時(shí)保存在List<Object> readBuf
集合中,后續(xù)會(huì)服務(wù)端NioServerSocketChannel的pipeline中通過(guò)ChannelRead事件來(lái)傳遞很泊,最終會(huì)在ServerBootstrapAcceptor這個(gè)ChannelHandler中被處理初始化角虫,并將其注冊(cè)到Sub Reator Group中。
這里的read loop循環(huán)會(huì)被限定只能讀取16次委造,當(dāng)main reactor從NioServerSocketChannel中讀取客戶(hù)端連接NioSocketChannel的次數(shù)達(dá)到16次之后戳鹅,無(wú)論此時(shí)是否還有客戶(hù)端連接都不能在繼續(xù)讀取了。
因?yàn)槲覀冊(cè)?a target="_blank">《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》一文中提到昏兆,netty對(duì)reactor線程壓榨的比較狠枫虏,要干的事情很多,除了要監(jiān)聽(tīng)輪詢(xún)IO就緒事件爬虱,處理IO就緒事件隶债,還需要執(zhí)行用戶(hù)和netty框架本省提交的異步任務(wù)和定時(shí)任務(wù)。
所以這里的main reactor線程不能在read loop中無(wú)限制的執(zhí)行下去跑筝,因?yàn)檫€需要分配時(shí)間去執(zhí)行異步任務(wù)死讹,不能因?yàn)闊o(wú)限制的接收客戶(hù)端連接而耽誤了異步任務(wù)的執(zhí)行。所以這里將read loop的循環(huán)次數(shù)限定為16次曲梗。
如果main reactor線程在read loop中讀取客戶(hù)端連接NioSocketChannel的次數(shù)已經(jīng)滿(mǎn)了16次赞警,即使此時(shí)還有客戶(hù)端連接未接收,那么main reactor線程也不會(huì)再去接收了虏两,而是轉(zhuǎn)去執(zhí)行異步任務(wù)仅颇,當(dāng)異步任務(wù)執(zhí)行完畢后,還會(huì)在回來(lái)執(zhí)行剩余接收連接的任務(wù)碘举。
main reactor線程退出read loop循環(huán)的條件有兩個(gè):
在限定的16次讀取中忘瓦,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了。退出循環(huán)引颈。
從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次耕皮,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)。
以上就是Netty在接收客戶(hù)端連接時(shí)的整體核心邏輯蝙场,下面筆者將這部分邏輯的核心源碼實(shí)現(xiàn)框架提取出來(lái)凌停,方便大家根據(jù)上述核心邏輯與源碼中的處理模塊對(duì)應(yīng)起來(lái),還是那句話售滤,這里只需要總體把握核心處理流程罚拟,不需要讀懂每一行代碼台诗,筆者會(huì)在文章的后邊分模塊來(lái)各個(gè)擊破它們。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連接建立后赐俗,創(chuàng)建的客戶(hù)端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//必須在Main Reactor線程中執(zhí)行
assert eventLoop().inEventLoop();
//注意下面的config和pipeline都是服務(wù)端ServerSocketChannel中的
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//創(chuàng)建接收數(shù)據(jù)Buffer分配器(用于分配容量大小合適的byteBuffer用來(lái)容納接收數(shù)據(jù))
//在接收連接的場(chǎng)景中拉队,這里的allocHandle只是用于控制read loop的循環(huán)讀取創(chuàng)建連接的次數(shù)。
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
//底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
int localRead = doReadMessages(readBuf);
//已無(wú)新的連接可接收則退出read loop
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());//判斷是否已經(jīng)讀滿(mǎn)16次
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//在NioServerSocketChannel對(duì)應(yīng)的pipeline中傳播ChannelRead事件
//初始化客戶(hù)端SocketChannel阻逮,并將其綁定到Sub Reactor線程組中的一個(gè)Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
//清除本次accept 創(chuàng)建的客戶(hù)端SocketChannel集合
readBuf.clear();
allocHandle.readComplete();
//觸發(fā)readComplete事件傳播
pipeline.fireChannelReadComplete();
....................省略............
} finally {
....................省略............
}
}
}
}
}
這里首先要通過(guò)斷言assert eventLoop().inEventLoop()
確保處理接收客戶(hù)端連接的線程必須為Main Reactor 線程粱快。
而main reactor中主要注冊(cè)的是服務(wù)端NioServerSocketChannel,主要負(fù)責(zé)處理OP_ACCEPT事件
叔扼,所以當(dāng)前main reactor線程是在NioServerSocketChannel中執(zhí)行接收連接的工作事哭。
所以這里我們通過(guò)config()
獲取到的是NioServerSocketChannel的屬性配置類(lèi)NioServerSocketChannelConfig
,它是在Reactor的啟動(dòng)階段被創(chuàng)建出來(lái)的。
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類(lèi)AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監(jiān)聽(tīng)的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設(shè)置用于Channel接收數(shù)據(jù)用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
同理這里通過(guò)pipeline()
獲取到的也是NioServerSocketChannel中的pipeline
瓜富。它會(huì)在NioServerSocketChannel向main reactor注冊(cè)成功之后被初始化鳍咱。
前邊提到main reactor線程會(huì)被限定只能在read loop中向NioServerSocketChannel讀取16次客戶(hù)端連接,所以在開(kāi)始read loop之前与柑,我們需要?jiǎng)?chuàng)建一個(gè)能夠保存記錄讀取次數(shù)的對(duì)象谤辜,在每次read loop循環(huán)之后,可以根據(jù)這個(gè)對(duì)象來(lái)判斷是否結(jié)束read loop仅胞。
這個(gè)對(duì)象就是這里的 RecvByteBufAllocator.Handle allocHandle
專(zhuān)門(mén)用于統(tǒng)計(jì)read loop中接收客戶(hù)端連接的次數(shù),以及判斷是否該結(jié)束read loop轉(zhuǎn)去執(zhí)行異步任務(wù)剑辫。
當(dāng)這一切準(zhǔn)備就緒之后干旧,main reactor線程就開(kāi)始在do{....}while(...)
循環(huán)中接收客戶(hù)端連接了。
在 read loop中通過(guò)調(diào)用doReadMessages函數(shù)
接收完成三次握手的客戶(hù)端連接妹蔽,底層會(huì)調(diào)用到JDK NIO ServerSocketChannel的accept方法椎眯,從內(nèi)核全連接隊(duì)列中取出客戶(hù)端連接。
返回值localRead
表示接收到了多少客戶(hù)端連接胳岂,客戶(hù)端連接通過(guò)accept方法只會(huì)一個(gè)一個(gè)的接收编整,所以這里的localRead
正常情況下都會(huì)返回1
,當(dāng)localRead <= 0
時(shí)意味著已經(jīng)沒(méi)有新的客戶(hù)端連接可以接收了乳丰,本次main reactor接收客戶(hù)端的任務(wù)到這里就結(jié)束了掌测,跳出read loop。開(kāi)始新的一輪IO事件的監(jiān)聽(tīng)處理产园。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
隨后會(huì)將接收到的客戶(hù)端連接占時(shí)存放到List<Object> readBuf
集合中汞斧。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//存放連接建立后,創(chuàng)建的客戶(hù)端SocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
}
調(diào)用allocHandle.incMessagesRead
統(tǒng)計(jì)本次事件循環(huán)中接收到的客戶(hù)端連接個(gè)數(shù)什燕,最后在read loop末尾通過(guò)allocHandle.continueReading
判斷是否達(dá)到了限定的16次粘勒。從而決定main reactor線程是繼續(xù)接收客戶(hù)端連接還是轉(zhuǎn)去執(zhí)行異步任務(wù)。
main reactor線程退出read loop的兩個(gè)條件:
在限定的16次讀取中屎即,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了庙睡。退出循環(huán)事富。
從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)乘陪。
當(dāng)滿(mǎn)足以上兩個(gè)退出條件時(shí)统台,main reactor線程就會(huì)退出read loop,由于在read loop中接收到的客戶(hù)端連接全部暫存在List<Object> readBuf
集合中,隨后開(kāi)始遍歷readBuf暂刘,在NioServerSocketChannel的pipeline中傳播ChannelRead事件饺谬。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//NioServerSocketChannel對(duì)應(yīng)的pipeline中傳播read事件
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
//初始化客戶(hù)端SocketChannel,并將其綁定到Sub Reactor線程組中的一個(gè)Reactor上
pipeline.fireChannelRead(readBuf.get(i));
}
最終pipeline中的ChannelHandler(ServerBootstrapAcceptor)會(huì)響應(yīng)ChannelRead事件谣拣,并在相應(yīng)回調(diào)函數(shù)中初始化客戶(hù)端NioSocketChannel募寨,并將其注冊(cè)到Sub Reactor Group中。此后客戶(hù)端NioSocketChannel綁定到的sub reactor就開(kāi)始監(jiān)聽(tīng)處理客戶(hù)端連接上的讀寫(xiě)事件了森缠。
Netty整個(gè)接收客戶(hù)端的邏輯過(guò)程如下圖步驟1拔鹰,2,3所示贵涵。
以上內(nèi)容就是筆者提取出來(lái)的整體流程框架列肢,下面我們來(lái)將其中涉及到的重要核心模塊拆開(kāi),一個(gè)一個(gè)詳細(xì)解讀下宾茂。
3. RecvByteBufAllocator簡(jiǎn)介
Reactor在處理對(duì)應(yīng)Channel上的IO數(shù)據(jù)時(shí)瓷马,都會(huì)采用一個(gè)ByteBuffer
來(lái)接收Channel上的IO數(shù)據(jù)。而本小節(jié)要介紹的RecvByteBufAllocator正是用來(lái)分配ByteBuffer的一個(gè)分配器跨晴。
還記得這個(gè)RecvByteBufAllocator
在哪里被創(chuàng)建的嗎欧聘??
在《聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中端盆,在介紹NioServerSocketChannel
的創(chuàng)建過(guò)程中提到怀骤,對(duì)應(yīng)Channel的配置類(lèi)NioServerSocketChannelConfig也會(huì)隨著NioServerSocketChannel的創(chuàng)建而創(chuàng)建。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在創(chuàng)建NioServerSocketChannelConfig
的過(guò)程中會(huì)創(chuàng)建RecvByteBufAllocator
焕妙。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
這里我們看到NioServerSocketChannel中的RecvByteBufAllocator實(shí)際類(lèi)型為AdaptiveRecvByteBufAllocator
蒋伦,顧名思義,這個(gè)類(lèi)型的RecvByteBufAllocator可以根據(jù)Channel上每次到來(lái)的IO數(shù)據(jù)大小來(lái)自適應(yīng)動(dòng)態(tài)調(diào)整ByteBuffer的容量焚鹊。
對(duì)于服務(wù)端NioServerSocketChannel來(lái)說(shuō)痕届,它上邊的IO數(shù)據(jù)就是客戶(hù)端的連接,它的長(zhǎng)度和類(lèi)型都是固定的末患,所以在接收客戶(hù)端連接的時(shí)候并不需要這樣的一個(gè)ByteBuffer來(lái)接收爷抓,我們會(huì)將接收到的客戶(hù)端連接存放在List<Object> readBuf
集合中
對(duì)于客戶(hù)端NioSocketChannel來(lái)說(shuō),它上邊的IO數(shù)據(jù)時(shí)客戶(hù)端發(fā)送來(lái)的網(wǎng)絡(luò)數(shù)據(jù)阻塑,長(zhǎng)度是不定的蓝撇,所以才會(huì)需要這樣一個(gè)可以根據(jù)每次IO數(shù)據(jù)的大小來(lái)自適應(yīng)動(dòng)態(tài)調(diào)整容量的ByteBuffer來(lái)接收。
那么看起來(lái)這個(gè)RecvByteBufAllocator和本文的主題不是很關(guān)聯(lián)陈莽,因?yàn)樵诮邮者B接的過(guò)程中并不會(huì)怎么用到它渤昌,這個(gè)類(lèi)筆者還會(huì)在后面的文章中詳細(xì)介紹虽抄,之所以這里把它拎出來(lái)單獨(dú)介紹是因?yàn)樗捅疚拈_(kāi)頭提到的Bug有關(guān)系,這個(gè)Bug就是由這個(gè)類(lèi)引起的独柑。
3.1 RecvByteBufAllocator.Handle的獲取
在本文中迈窟,我們是通過(guò)NioServerSocketChannel中的unsafe底層操作類(lèi)來(lái)獲取RecvByteBufAllocator.Handle的
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) {
recvHandle = config().getRecvByteBufAllocator().newHandle();
}
return recvHandle;
}
}
我們看到最終會(huì)在NioServerSocketChannel的配置類(lèi)NioServerSocketChannelConfig中獲取到AdaptiveRecvByteBufAllocator
public class DefaultChannelConfig implements ChannelConfig {
//用于Channel接收數(shù)據(jù)用的buffer分配器 類(lèi)型為AdaptiveRecvByteBufAllocator
private volatile RecvByteBufAllocator rcvBufAllocator;
}
AdaptiveRecvByteBufAllocator
中會(huì)創(chuàng)建自適應(yīng)動(dòng)態(tài)調(diào)整容量的ByteBuffer分配器。
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
@Override
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
private final class HandleImpl extends MaxMessageHandle {
.................省略................
}
}
這里的newHandle
方法返回的具體類(lèi)型為MaxMessageHandle
忌栅,這個(gè)MaxMessageHandle
里邊保存了每次從Channel
中讀取IO數(shù)據(jù)
的容量指標(biāo)车酣,方便下次讀取時(shí)分配合適大小的buffer
。
每次在使用allocHandle
前需要調(diào)用allocHandle.reset(config);
重置里邊的統(tǒng)計(jì)指標(biāo)索绪。
public abstract class MaxMessageHandle implements ExtendedHandle {
private ChannelConfig config;
//每次事件輪詢(xún)時(shí)湖员,最多讀取16次
private int maxMessagePerRead;
//本次事件輪詢(xún)總共讀取的message數(shù),這里指的是接收連接的數(shù)量
private int totalMessages;
//本次事件輪詢(xún)總共讀取的字節(jié)數(shù)
private int totalBytesRead;
@Override
public void reset(ChannelConfig config) {
this.config = config;
//默認(rèn)每次最多讀取16次
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
}
-
maxMessagePerRead:用于控制每次read loop里最大可以循環(huán)讀取的次數(shù),默認(rèn)為16次瑞驱,可在啟動(dòng)配置類(lèi)
ServerBootstrap
中通過(guò)ChannelOption.MAX_MESSAGES_PER_READ
選項(xiàng)設(shè)置娘摔。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數(shù))
-
totalMessages:用于統(tǒng)計(jì)read loop中總共接收的連接個(gè)數(shù),每次read loop循環(huán)后會(huì)調(diào)用
allocHandle.incMessagesRead
增加記錄接收到的連接個(gè)數(shù)唤反。
@Override
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
- totalBytesRead:用于統(tǒng)計(jì)在read loop中總共接收到客戶(hù)端連接上的數(shù)據(jù)大小凳寺,這個(gè)字段主要用于sub reactor在接收客戶(hù)端NioSocketChannel上的網(wǎng)絡(luò)數(shù)據(jù)用的,本文我們介紹的是main reactor接收客戶(hù)端連接彤侍,所以這里并不會(huì)用到這個(gè)字段肠缨。這個(gè)字段會(huì)在sub reactor每次讀取完NioSocketChannel上的網(wǎng)絡(luò)數(shù)據(jù)時(shí)增加記錄。
@Override
public void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
MaxMessageHandler中還有一個(gè)非常重要的方法就是在每次read loop末尾會(huì)調(diào)用allocHandle.continueReading()
方法來(lái)判斷讀取連接次數(shù)是否已滿(mǎn)16次盏阶,來(lái)決定main reactor線程是否退出循環(huán)晒奕。
do {
//底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
紅框中圈出來(lái)的兩個(gè)判斷條件和本文主題無(wú)關(guān),我們這里不需要關(guān)注般哼,筆者會(huì)在后面的文章詳細(xì)介紹吴汪。
totalMessages < maxMessagePerRead
:在本文的接收客戶(hù)端連接場(chǎng)景中惠窄,這個(gè)條件用于判斷main reactor線程在read loop中的讀取次數(shù)是否超過(guò)了16次蒸眠。如果超過(guò)16次就會(huì)返回false,main reactor線程退出循環(huán)杆融。totalBytesRead > 0
:用于判斷當(dāng)客戶(hù)端NioSocketChannel上的OP_READ事件活躍時(shí)楞卡,sub reactor線程在read loop中是否讀取到了網(wǎng)絡(luò)數(shù)據(jù)。
以上內(nèi)容就是RecvByteBufAllocator.Handle在接收客戶(hù)端連接場(chǎng)景下的作用脾歇,大家這里仔細(xì)看下這個(gè)allocHandle.continueReading()
方法退出循環(huán)的判斷條件蒋腮,再結(jié)合整個(gè)do{....}while(...)
接收連接循環(huán)體,感受下是否哪里有些不對(duì)勁藕各?Bug即將出現(xiàn)~~~
4. 啊哈3卮荨!Bug ! !
netty不論是在本文中處理接收客戶(hù)端連接的場(chǎng)景還是在處理接收客戶(hù)端連接上的網(wǎng)絡(luò)數(shù)據(jù)場(chǎng)景都會(huì)在一個(gè)do{....}while(...)
循環(huán)read loop中不斷的處理激况。
同時(shí)也都會(huì)利用在上一小節(jié)中介紹的RecvByteBufAllocator.Handle
來(lái)記錄每次read loop接收到的連接個(gè)數(shù)和從連接上讀取到的網(wǎng)絡(luò)數(shù)據(jù)大小作彤。
從而在read loop的末尾都會(huì)通過(guò)allocHandle.continueReading()
方法判斷是否應(yīng)該退出read loop循環(huán)結(jié)束連接的接收流程或者是結(jié)束連接上數(shù)據(jù)的讀取流程膘魄。
無(wú)論是用于接收客戶(hù)端連接的main reactor也好還是用于接收客戶(hù)端連接上的網(wǎng)絡(luò)數(shù)據(jù)的sub reactor也好,它們的運(yùn)行框架都是一樣的竭讳,只不過(guò)是具體分工不同创葡。
所以netty這里想用統(tǒng)一的RecvByteBufAllocator.Handle
來(lái)處理以上兩種場(chǎng)景。
而RecvByteBufAllocator.Handle
中的totalBytesRead
字段主要記錄sub reactor線程在處理客戶(hù)端NioSocketChannel中OP_READ事件活躍時(shí)绢慢,總共在read loop中讀取到的網(wǎng)絡(luò)數(shù)據(jù)灿渴,而這里是main reactor線程在接收客戶(hù)端連接所以這個(gè)字段并不會(huì)被設(shè)置。totalBytesRead字段的值在本文中永遠(yuǎn)會(huì)是0
胰舆。
所以無(wú)論同時(shí)有多少個(gè)客戶(hù)端并發(fā)連接到服務(wù)端上骚露,在接收連接的這個(gè)read loop中永遠(yuǎn)只會(huì)接受一個(gè)連接就會(huì)退出循環(huán),因?yàn)?code>allocHandle.continueReading()方法中的判斷條件totalBytesRead > 0
永遠(yuǎn)會(huì)返回false
思瘟。
do {
//底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
而netty的本意是在這個(gè)read loop循環(huán)中盡可能多的去接收客戶(hù)端的并發(fā)連接荸百,同時(shí)又不影響main reactor線程執(zhí)行異步任務(wù)。但是由于這個(gè)Bug滨攻,main reactor在這個(gè)循環(huán)中只執(zhí)行一次就結(jié)束了够话。這也一定程度上就影響了netty的吞吐。
讓我們想象下這樣的一個(gè)場(chǎng)景光绕,當(dāng)有16個(gè)客戶(hù)端同時(shí)并發(fā)連接到了服務(wù)端女嘲,這時(shí)NioServerSocketChannel上的OP_ACCEPT事件
活躍,main reactor從Selector上被喚醒诞帐,隨后執(zhí)行OP_ACCEPT事件
的處理欣尼。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
............省略.........
case SelectStrategy.BUSY_WAIT:
............省略.........
case SelectStrategy.SELECT:
............監(jiān)聽(tīng)輪詢(xún)IO事件.........
default:
}
} catch (IOException e) {
............省略.........
}
............處理IO就緒事件.........
............執(zhí)行異步任務(wù).........
}
}
但是由于這個(gè)Bug的存在,main reactor在接收客戶(hù)端連接的這個(gè)read loop中只接收了一個(gè)客戶(hù)端連接就匆匆返回了停蕉。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
然后根據(jù)下圖中這個(gè)Reactor的運(yùn)行結(jié)構(gòu)去執(zhí)行異步任務(wù)愕鼓,隨后繞一大圈又會(huì)回到NioEventLoop#run
方法中重新發(fā)起一輪OP_ACCEPT事件輪詢(xún)。
由于現(xiàn)在還有15個(gè)客戶(hù)端并發(fā)連接沒(méi)有被接收慧起,所以此時(shí)Main Reactor線程并不會(huì)在selector.select()
上阻塞菇晃,最終繞一圈又會(huì)回到NioMessageUnsafe#read
方法的do{.....}while()
循環(huán)。在接收一個(gè)連接之后又退出循環(huán)蚓挤。
本來(lái)我們可以在一次read loop中把這16個(gè)并發(fā)的客戶(hù)端連接全部接收完畢的磺送,因?yàn)檫@個(gè)Bug,main reactor需要不斷的發(fā)起OP_ACCEPT事件的輪詢(xún)灿意,繞了很大一個(gè)圈子估灿。同時(shí)也增加了許多不必要的selector.select()系統(tǒng)調(diào)用開(kāi)銷(xiāo)
這時(shí)大家在看這個(gè)Issue#11708中的討論是不是就清晰很多了~~
Issue#11708:https://github.com/netty/netty/issues/11708
4.1 Bug的修復(fù)
筆者在寫(xiě)這篇文章的時(shí)候,Netty最新版本是4.1.68.final缤剧,這個(gè)Bug在4.1.69.final中被修復(fù)馅袁。
由于該Bug產(chǎn)生的原因正是因?yàn)榉?wù)端NioServerSocketChannel(用于監(jiān)聽(tīng)端口地址和接收客戶(hù)端連接)和 客戶(hù)端NioSocketChannel(用于通信)中的Config配置類(lèi)混用了同一個(gè)ByteBuffer分配器AdaptiveRecvByteBufAllocator
而導(dǎo)致的。
所以在新版本修復(fù)中專(zhuān)門(mén)為服務(wù)端ServerSocketChannel中的Config配置類(lèi)引入了一個(gè)新的ByteBuffer分配器ServerChannelRecvByteBufAllocator
荒辕,專(zhuān)門(mén)用于服務(wù)端ServerSocketChannel接收客戶(hù)端連接的場(chǎng)景汗销。
在ServerChannelRecvByteBufAllocator
的父類(lèi)DefaultMaxMessagesRecvByteBufAllocator
中引入了一個(gè)新的字段ignoreBytesRead
芒粹,用于表示是否忽略網(wǎng)絡(luò)字節(jié)的讀取,在創(chuàng)建服務(wù)端Channel配置類(lèi)NioServerSocketChannelConfig的時(shí)候大溜,這個(gè)字段會(huì)被賦值為true
化漆。
當(dāng)main reactor線程在read loop循環(huán)中接收客戶(hù)端連接的時(shí)候。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
do {
int localRead = doReadMessages(readBuf);
.........省略...........
} while (allocHandle.continueReading());
}
在read loop循環(huán)的末尾就會(huì)采用從ServerChannelRecvByteBufAllocator
中創(chuàng)建的MaxMessageHandle#continueReading
方法來(lái)判斷讀取連接次數(shù)是否超過(guò)了16次钦奋。由于這里的ignoreBytesRead == true
這回我們就會(huì)忽略totalBytesRead == 0
的情況座云,從而使得接收連接的read loop得以繼續(xù)地執(zhí)行下去。在一個(gè)read loop中一次性把16個(gè)連接全部接收完畢付材。
以上就是對(duì)這個(gè)Bug產(chǎn)生的原因朦拖,以及發(fā)現(xiàn)的過(guò)程,最后修復(fù)的方案一個(gè)全面的介紹厌衔,因此筆者也出現(xiàn)在了netty 4.1.69.final版本發(fā)布公告里的thank-list中璧帝。哈哈,真是令人開(kāi)心的一件事情~~~
通過(guò)以上對(duì)netty接收客戶(hù)端連接的全流程分析和對(duì)這個(gè)Bug來(lái)龍去脈以及修復(fù)方案的介紹富寿,大家現(xiàn)在一定已經(jīng)理解了整個(gè)接收連接的流程框架睬隶。
接下來(lái)筆者就把這個(gè)流程中涉及到的一些核心模塊在單獨(dú)拎出來(lái)從細(xì)節(jié)入手,為大家各個(gè)擊破~~~
5. doReadMessages接收客戶(hù)端連接
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
- 通過(guò)
javaChannel()
獲取封裝在Netty服務(wù)端NioServerSocketChannel
中的JDK 原生 ServerSocketChannel
页徐。
@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
- 通過(guò)
JDK NIO 原生
的ServerSocketChannel
的accept方法
獲取JDK NIO 原生
客戶(hù)端連接SocketChannel
苏潜。
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
這一步就是我們?cè)?a target="_blank">《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》介紹到的調(diào)用監(jiān)聽(tīng)Socket
的accept方法
,內(nèi)核會(huì)基于監(jiān)聽(tīng)Socket
創(chuàng)建出來(lái)一個(gè)新的Socket
專(zhuān)門(mén)用于與客戶(hù)端之間的網(wǎng)絡(luò)通信這個(gè)我們稱(chēng)之為客戶(hù)端連接Socket
变勇。這里的ServerSocketChannel
就類(lèi)似于監(jiān)聽(tīng)Socket
恤左。SocketChannel
就類(lèi)似于客戶(hù)端連接Socket
。
由于我們?cè)趧?chuàng)建NioServerSocketChannel
的時(shí)候搀绣,會(huì)將JDK NIO 原生
的ServerSocketChannel
設(shè)置為非阻塞
飞袋,所以這里當(dāng)ServerSocketChannel
上有客戶(hù)端連接時(shí)就會(huì)直接創(chuàng)建SocketChannel
,如果此時(shí)并沒(méi)有客戶(hù)端連接時(shí)accept調(diào)用
就會(huì)立刻返回null
并不會(huì)阻塞链患。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設(shè)置Channel為非阻塞 配合IO多路復(fù)用模型
ch.configureBlocking(false);
} catch (IOException e) {
..........省略.............
}
}
5.1 創(chuàng)建客戶(hù)端NioSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
.........省略.......
}
return 0;
}
}
這里會(huì)根據(jù)ServerSocketChannel
的accept
方法獲取到JDK NIO 原生
的SocketChannel
(用于底層真正與客戶(hù)端通信的Channel)巧鸭,來(lái)創(chuàng)建Netty中的NioSocketChannel
。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}
創(chuàng)建客戶(hù)端NioSocketChannel
的過(guò)程其實(shí)和之前講的創(chuàng)建服務(wù)端NioServerSocketChannel
大體流程是一樣的锣险,我們這里只對(duì)客戶(hù)端NioSocketChannel
和服務(wù)端NioServerSocketChannel
在創(chuàng)建過(guò)程中的不同之處做一個(gè)對(duì)比蹄皱。
具體細(xì)節(jié)部分大家可以在回看下《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》一文中關(guān)于
NioServerSocketChannel
的創(chuàng)建的詳細(xì)細(xì)節(jié)览闰。
5.3 對(duì)比NioSocketChannel與NioServerSocketChannel的不同
1:Channel的層次不同
在我們介紹Reactor的創(chuàng)建文章中芯肤,我們提到Netty中的Channel
是具有層次的。由于客戶(hù)端NioSocketChannel是在main reactor接收連接時(shí)在服務(wù)端NioServerSocketChannel中被創(chuàng)建的压鉴,所以在創(chuàng)建客戶(hù)端NioSocketChannel的時(shí)候會(huì)通過(guò)構(gòu)造函數(shù)指定了parent屬性為NioServerSocketChanel
崖咨。并將JDK NIO 原生
的SocketChannel
封裝進(jìn)Netty的客戶(hù)端NioSocketChannel
中。
而在Reactor啟動(dòng)過(guò)程中創(chuàng)建NioServerSocketChannel
的時(shí)候parent屬性
指定是null
油吭。因?yàn)樗褪琼攲拥?code>Channel击蹲,負(fù)責(zé)創(chuàng)建客戶(hù)端NioSocketChannel
署拟。
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
2:向Reactor注冊(cè)的IO事件不同
客戶(hù)端NioSocketChannel向Sub Reactor注冊(cè)的是SelectionKey.OP_READ事件
,而服務(wù)端NioServerSocketChannel向Main Reactor注冊(cè)的是SelectionKey.OP_ACCEPT事件
歌豺。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
public NioServerSocketChannel(ServerSocketChannel channel) {
//父類(lèi)AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監(jiān)聽(tīng)的事件OP_ACCEPT
super(null, channel, SelectionKey.OP_ACCEPT);
//DefaultChannelConfig中設(shè)置用于Channel接收數(shù)據(jù)用的buffer->AdaptiveRecvByteBufAllocator
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
}
3: 功能屬性不同造成繼承結(jié)構(gòu)的不同
客戶(hù)端NioSocketChannel
繼承的是AbstractNioByteChannel
推穷,而服務(wù)端NioServerSocketChannel
繼承的是AbstractNioMessageChannel
。
它們繼承的這兩個(gè)抽象類(lèi)一個(gè)前綴是Byte
类咧,一個(gè)前綴是Message
有什么區(qū)別嗎馒铃??
客戶(hù)端
NioSocketChannel
主要處理的是服務(wù)端與客戶(hù)端的通信痕惋,這里涉及到接收客戶(hù)端發(fā)送來(lái)的數(shù)據(jù)区宇,而Sub Reactor線程
從NioSocketChannel
中讀取的正是網(wǎng)絡(luò)通信數(shù)據(jù)單位為Byte
。
服務(wù)端
NioServerSocketChannel
主要負(fù)責(zé)處理OP_ACCEPT事件
值戳,創(chuàng)建用于通信的客戶(hù)端NioSocketChannel
议谷。這時(shí)候客戶(hù)端與服務(wù)端還沒(méi)開(kāi)始通信,所以Main Reactor線程
從NioServerSocketChannel
的讀取對(duì)象為Message
堕虹。這里的Message
指的就是底層的SocketChannel
客戶(hù)端連接卧晓。
以上就是NioSocketChannel
與NioServerSocketChannel
創(chuàng)建過(guò)程中的不同之處,后面的過(guò)程就一樣了赴捞。
- 在AbstractNioChannel 類(lèi)中封裝JDK NIO 原生的
SocketChannel
禀崖,并將其底層的IO模型設(shè)置為非阻塞
,保存需要監(jiān)聽(tīng)的IO事件OP_READ
螟炫。
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
//設(shè)置Channel為非阻塞 配合IO多路復(fù)用模型
ch.configureBlocking(false);
} catch (IOException e) {
}
}
- 為客戶(hù)端NioSocketChannel創(chuàng)建全局唯一的
channelId
波附,創(chuàng)建客戶(hù)端NioSocketChannel的底層操作類(lèi)NioByteUnsafe
,創(chuàng)建pipeline昼钻。
protected AbstractChannel(Channel parent) {
this.parent = parent;
//channel全局唯一ID machineId+processId+sequence+timestamp+random
id = newId();
//unsafe用于底層socket的讀寫(xiě)操作
unsafe = newUnsafe();
//為channel分配獨(dú)立的pipeline用于IO事件編排
pipeline = newChannelPipeline();
}
- 在NioSocketChannelConfig的創(chuàng)建過(guò)程中掸屡,將NioSocketChannel的RecvByteBufAllocator類(lèi)型設(shè)置為
AdaptiveRecvByteBufAllocator
。
public DefaultChannelConfig(Channel channel) {
this(channel, new AdaptiveRecvByteBufAllocator());
}
在Bug修復(fù)后的版本中服務(wù)端NioServerSocketChannel的RecvByteBufAllocator類(lèi)型設(shè)置為
ServerChannelRecvByteBufAllocator
最終我們得到的客戶(hù)端NioSocketChannel
結(jié)構(gòu)如下:
6. ChannelRead事件的響應(yīng)
在前邊介紹接收連接的整體核心流程框架的時(shí)候然评,我們提到main reactor線程是在一個(gè)do{.....}while(...)
循環(huán)read loop中不斷的調(diào)用ServerSocketChannel#accept
方法來(lái)接收客戶(hù)端的連接仅财。
當(dāng)滿(mǎn)足退出read loop循環(huán)的條件有兩個(gè):
在限定的16次讀取中,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了碗淌。退出循環(huán)盏求。
從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)亿眠。
main reactor就會(huì)退出read loop循環(huán)碎罚,此時(shí)接收到的客戶(hù)端連接NioSocketChannel暫存與List<Object> readBuf
集合中。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
try {
try {
do {
........省略.........
//底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
int localRead = doReadMessages(readBuf);
........省略.........
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
........省略.........
} finally {
........省略.........
}
}
}
隨后main reactor線程會(huì)遍歷List<Object> readBuf
集合中的NioSocketChannel纳像,并在NioServerSocketChannel的pipeline中傳播ChannelRead事件荆烈。
最終ChannelRead事件
會(huì)傳播到ServerBootstrapAcceptor
中,這里正是Netty處理客戶(hù)端連接的核心邏輯所在。
ServerBootstrapAcceptor
主要的作用就是初始化客戶(hù)端NioSocketChannel
憔购,并將客戶(hù)端NioSocketChannel注冊(cè)到Sub Reactor Group
中宫峦,并監(jiān)聽(tīng)OP_READ事件
。
在ServerBootstrapAcceptor 中會(huì)初始化客戶(hù)端NioSocketChannel的這些屬性玫鸟。
比如:從Reactor組EventLoopGroup childGroup
导绷,用于初始化NioSocketChannel
中的pipeline
用到的ChannelHandler childHandler
,以及NioSocketChannel
中的一些childOptions
和childAttrs
屎飘。
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//向客戶(hù)端NioSocketChannel的pipeline中
//添加在啟動(dòng)配置類(lèi)ServerBootstrap中配置的ChannelHandler
child.pipeline().addLast(childHandler);
//利用配置的屬性初始化客戶(hù)端NioSocketChannel
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
/**
* 1:在Sub Reactor線程組中選擇一個(gè)Reactor綁定
* 2:將客戶(hù)端SocketChannel注冊(cè)到綁定的Reactor上
* 3:SocketChannel注冊(cè)到sub reactor中的selector上诵次,并監(jiān)聽(tīng)OP_READ事件
* */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
正是在這里,netty會(huì)將我們?cè)?a target="_blank">《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》的啟動(dòng)示例程序中在ServerBootstrap中配置的客戶(hù)端NioSocketChannel的所有屬性(child前綴配置)初始化到NioSocketChannel中枚碗。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類(lèi)型
.option(ChannelOption.SO_BACKLOG, 100)//設(shè)置主Reactor中channel的option選項(xiàng)
.handler(new LoggingHandler(LogLevel.INFO))//設(shè)置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設(shè)置從Reactor中注冊(cè)channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定端口啟動(dòng)服務(wù)逾一,開(kāi)始監(jiān)聽(tīng)accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上示例代碼中通過(guò)ServerBootstrap配置的NioSocketChannel相關(guān)屬性践惑,會(huì)在Netty啟動(dòng)并開(kāi)始初始化NioServerSocketChannel
的時(shí)候?qū)?code>ServerBootstrapAcceptor的創(chuàng)建初始化工作封裝成異步任務(wù)
屯援,然后在NioServerSocketChannel
注冊(cè)到Main Reactor
中成功后執(zhí)行铝噩。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
@Override
void init(Channel channel) {
................省略................
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
................省略................
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
在經(jīng)過(guò)ServerBootstrapAccptor#chanelRead回調(diào)
的處理之后畸悬,此時(shí)客戶(hù)端NioSocketChannel中pipeline的結(jié)構(gòu)為:
隨后會(huì)將初始化好的客戶(hù)端NioSocketChannel向Sub Reactor Group中注冊(cè)碌冶,并監(jiān)聽(tīng)OP_READ事件
羡疗。
如下圖中的步驟3所示:
7. 向SubReactorGroup中注冊(cè)NioSocketChannel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
客戶(hù)端NioSocketChannel向Sub Reactor Group注冊(cè)的流程完全和服務(wù)端NioServerSocketChannel向Main Reactor Group注冊(cè)流程一樣咆疗。
關(guān)于服務(wù)端NioServerSocketChannel的注冊(cè)流程忱屑,筆者已經(jīng)在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》一文中做出了詳細(xì)的介紹波丰,對(duì)相關(guān)細(xì)節(jié)感興趣的同學(xué)可以在回看下壳坪。
這里筆者在帶大家簡(jiǎn)要回顧下整個(gè)注冊(cè)過(guò)程并著重區(qū)別對(duì)比客戶(hù)端NioSocetChannel與服務(wù)端NioServerSocketChannel注冊(cè)過(guò)程中不同的地方。
7.1 從Sub Reactor Group中選取一個(gè)Sub Reactor進(jìn)行綁定
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
return chooser.next();
}
}
7.2 向綁定的Sub Reactor上注冊(cè)NioSocketChannel
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
//注冊(cè)channel到綁定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
//unsafe負(fù)責(zé)channel底層的各種操作
promise.channel().unsafe().register(this, promise);
return promise;
}
}
當(dāng)時(shí)我們?cè)诮榻B
NioServerSocketChannel
的注冊(cè)過(guò)程時(shí)掰烟,這里的promise.channel()
為NioServerSocketChannel
爽蝴。底層的unsafe操作類(lèi)為NioMessageUnsafe
。此時(shí)這里的
promise.channel()
為NioSocketChannel
纫骑。底層的unsafe操作類(lèi)為NioByteUnsafe
蝎亚。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
..............省略....................
//此時(shí)這里的eventLoop為Sub Reactor
AbstractChannel.this.eventLoop = eventLoop;
/**
* 執(zhí)行channel注冊(cè)的操作必須是Reactor線程來(lái)完成
*
* 1: 如果當(dāng)前執(zhí)行線程是Reactor線程,則直接執(zhí)行register0進(jìn)行注冊(cè)
* 2:如果當(dāng)前執(zhí)行線程是外部線程先馆,則需要將register0注冊(cè)操作 封裝程異步Task 由Reactor線程執(zhí)行
* */
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
..............省略....................
}
}
}
注意此時(shí)傳遞進(jìn)來(lái)的EventLoop eventLoop為Sub Reactor发框。
但此時(shí)的執(zhí)行線程為Main Reactor線程
,并不是Sub Reactor線程(此時(shí)還未啟動(dòng))煤墙。
所以這里的eventLoop.inEventLoop()
返回的是false
梅惯。
在else分支
中向綁定的Sub Reactor提交注冊(cè)NioSocketChannel
的任務(wù)。
當(dāng)注冊(cè)任務(wù)提交后仿野,此時(shí)綁定的
Sub Reactor線程
啟動(dòng)铣减。
7.3 register0
我們又來(lái)到了Channel注冊(cè)的老地方register0方法
。在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》中我們花了大量的篇幅介紹了這個(gè)方法设预。這里我們只對(duì)比NioSocketChannel
與NioServerSocketChannel
不同的地方徙歼。
private void register0(ChannelPromise promise) {
try {
................省略..................
boolean firstRegistration = neverRegistered;
//執(zhí)行真正的注冊(cè)操作
doRegister();
//修改注冊(cè)狀態(tài)
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
if (isActive()) {
if (firstRegistration) {
//觸發(fā)channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
................省略..................
}
}
這里 doRegister()方法
將NioSocketChannel注冊(cè)到Sub Reactor中的Selector
上犁河。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...............省略...............
}
}
}
}
這里是Netty客戶(hù)端NioSocketChannel
與JDK NIO 原生 SocketChannel關(guān)聯(lián)的地方鳖枕。此時(shí)注冊(cè)的IO事件
依然是0
魄梯。目的也是只是為了獲取NioSocketChannel在Selector中的SelectionKey
。
同時(shí)通過(guò)SelectableChannel#register
方法將Netty自定義的NioSocketChannel(這里的this指針)附著在SelectionKey的attechment屬性上宾符,完成Netty自定義Channel與JDK NIO Channel的關(guān)系綁定酿秸。這樣在每次對(duì)Selector進(jìn)行IO就緒事件輪詢(xún)時(shí),Netty 都可以從 JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel對(duì)象(這里指的就是NioSocketChannel)魏烫。
隨后調(diào)用pipeline.invokeHandlerAddedIfNeeded()
回調(diào)客戶(hù)端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded方法
辣苏,此時(shí)pipeline
的結(jié)構(gòu)中只有一個(gè)ChannelInitializer
。最終會(huì)在ChannelInitializer#handlerAdded
回調(diào)方法中初始化客戶(hù)端NioSocketChannel
的pipeline
哄褒。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
if (initChannel(ctx)) {
//初始化工作完成后稀蟋,需要將自身從pipeline中移除
removeState(ctx);
}
}
}
protected abstract void initChannel(C ch) throws Exception;
}
關(guān)于對(duì)Channel中pipeline的詳細(xì)初始化過(guò)程,對(duì)細(xì)節(jié)部分感興趣的同學(xué)可以回看下《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》
此時(shí)客戶(hù)端NioSocketChannel中的pipeline中的結(jié)構(gòu)就變?yōu)榱宋覀冏远x的樣子呐赡,在示例代碼中我們自定義的ChannelHandler
為EchoServerHandler
退客。
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
當(dāng)客戶(hù)端NioSocketChannel中的pipeline初始化完畢后,netty就開(kāi)始調(diào)用safeSetSuccess(promise)方法
回調(diào)regFuture
中注冊(cè)的ChannelFutureListener
链嘀,通知客戶(hù)端NioSocketChannel已經(jīng)成功注冊(cè)到Sub Reactor上了萌狂。
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
在服務(wù)端NioServerSocketChannel注冊(cè)的時(shí)候我們會(huì)在listener中向Main Reactor提交
bind綁定端口地址任務(wù)
。但是在NioSocketChannel
注冊(cè)的時(shí)候怀泊,只會(huì)在listener
中處理一下注冊(cè)失敗的情況茫藏。
當(dāng)Sub Reactor線程通知ChannelFutureListener注冊(cè)成功之后,隨后就會(huì)調(diào)用pipeline.fireChannelRegistered()
在客戶(hù)端NioSocketChannel的pipeline中傳播ChannelRegistered事件
霹琼。
這里筆者重點(diǎn)要強(qiáng)調(diào)下务傲,在之前介紹NioServerSocketChannel注冊(cè)的時(shí)候,我們提到因?yàn)榇藭r(shí)NioServerSocketChannel并未綁定端口地址枣申,所以這時(shí)的NioServerSocketChannel并未激活树灶,這里的isActive()
返回false
。register0方法
直接返回糯而。
服務(wù)端NioServerSocketChannel判斷是否激活的標(biāo)準(zhǔn)為端口是否綁定成功天通。
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
public boolean isActive() {
return isOpen() && javaChannel().socket().isBound();
}
}
客戶(hù)端
NioSocketChannel
判斷是否激活的標(biāo)準(zhǔn)為是否處于Connected狀態(tài)
。那么顯然這里肯定是處于connected狀態(tài)
的熄驼。
@Override
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
NioSocketChannel
已經(jīng)處于connected狀態(tài)
像寒,這里并不需要綁定端口,所以這里的isActive()
返回true
瓜贾。
if (isActive()) {
/**
* 客戶(hù)端SocketChannel注冊(cè)成功后會(huì)走這里诺祸,在channelActive事件回調(diào)中注冊(cè)O(shè)P_READ事件
* */
if (firstRegistration) {
//觸發(fā)channelActive事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
.......省略..........
}
}
}
最后調(diào)用pipeline.fireChannelActive()
在NioSocketChannel中的pipeline傳播ChannelActive事件
,最終在pipeline
的頭結(jié)點(diǎn)HeadContext
中響應(yīng)并注冊(cè)OP_READ事件
到Sub Reactor
中的Selector
上祭芦。
public abstract class AbstractNioChannel extends AbstractChannel { {
@Override
protected void doBeginRead() throws Exception {
..............省略................
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_ACCEPT事件
* 2:SocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_READ事件
* */
if ((interestOps & readInterestOp) == 0) {
//注冊(cè)監(jiān)聽(tīng)OP_ACCEPT或者OP_READ事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
注意這里的
readInterestOp
為客戶(hù)端NioSocketChannel
在初始化時(shí)設(shè)置的OP_READ事件
筷笨。
到這里,Netty中的Main Reactor
接收連接的整個(gè)流程,我們就介紹完了胃夏,此時(shí)Netty中主從Reactor組的結(jié)構(gòu)就變?yōu)椋?/p>
總結(jié)
本文我們介紹了NioServerSocketChannel
處理客戶(hù)端連接事件的整個(gè)過(guò)程轴或。
接收連接的整個(gè)處理框架。
影響Netty接收連接吞吐的Bug產(chǎn)生的原因仰禀,以及修復(fù)的方案照雁。
創(chuàng)建并初始化客戶(hù)端
NioSocketChannel
。初始化
NioSocketChannel
中的pipeline
答恶。客戶(hù)端
NioSocketChannel
向Sub Reactor
注冊(cè)的過(guò)程
其中我們也對(duì)比了NioServerSocketChannel
與NioSocketChannel
在創(chuàng)建初始化以及后面向Reactor
注冊(cè)過(guò)程中的差異之處饺蚊。
當(dāng)客戶(hù)端NioSocketChannel
接收完畢并向Sub Reactor
注冊(cè)成功后,那么接下來(lái)Sub Reactor
就開(kāi)始監(jiān)聽(tīng)注冊(cè)其上的所有客戶(hù)端NioSocketChannel
的OP_READ事件
悬嗓,并等待客戶(hù)端向服務(wù)端發(fā)送網(wǎng)絡(luò)數(shù)據(jù)污呼。
后面Reactor
的主角就該變?yōu)?code>Sub Reactor以及注冊(cè)在其上的客戶(hù)端NioSocketChannel
了。
下篇文章包竹,我們將會(huì)討論Netty是如何接收網(wǎng)絡(luò)數(shù)據(jù)的~~~~ 我們下篇文章見(jiàn)~~