抓到Netty一個(gè)Bug昧绣,順帶來(lái)透徹地聊一下Netty是如何高效接收網(wǎng)絡(luò)連接的

本系列Netty源碼解析文章基于 4.1.56.Final版本

對(duì)于一個(gè)高性能網(wǎng)絡(luò)通訊框架來(lái)說(shuō)馋吗,最最重要也是最核心的工作就是如何高效的接收客戶(hù)端連接镊折,這就好比我們開(kāi)了一個(gè)飯店黔衡,那么迎接客人就是飯店最重要的工作,我們要先把客人迎接進(jìn)來(lái)腌乡,不能讓客人一看人多就走掉,只要客人進(jìn)來(lái)了夜牡,哪怕菜做的慢一點(diǎn)也沒(méi)關(guān)系与纽。

本文筆者就來(lái)為大家介紹下netty這塊最核心的內(nèi)容,看看netty是如何高效的接收客戶(hù)端連接的塘装。

下圖為筆者在一個(gè)月黑風(fēng)高天空顯得那么深邃遙遠(yuǎn)的夜晚急迂,閑來(lái)無(wú)事,于是捧起Netty關(guān)于如何接收連接這部分源碼細(xì)細(xì)品讀的時(shí)候蹦肴,意外的發(fā)現(xiàn)了一個(gè)影響Netty接收連接吞吐的一個(gè)Bug僚碎。

issue討論.png

于是筆者就在Github提了一個(gè)Issue#11708,闡述了下這個(gè)Bug產(chǎn)生的原因以及導(dǎo)致的結(jié)果并和Netty的作者一起討論了下修復(fù)措施阴幌。如上圖所示勺阐。

Issue#11708:https://github.com/netty/netty/issues/11708

這里先不詳細(xì)解釋這個(gè)Issue,也不建議大家現(xiàn)在就打開(kāi)這個(gè)Issue查看矛双,筆者會(huì)在本文的介紹中隨著源碼深入的解讀慢慢的為大家一層一層地?fù)荛_(kāi)迷霧渊抽。

之所以在文章的開(kāi)頭把這個(gè)拎出來(lái),筆者是想讓大家?guī)е鴳岩梢楹觯瑢徱暲撩疲蕾p,崇敬栈幸,敬畏的態(tài)度來(lái)一起品讀世界頂級(jí)程序員編寫(xiě)的代碼愤估。由衷的感謝他們?cè)谶@一領(lǐng)域做出的貢獻(xiàn)。

好了速址,問(wèn)題拋出來(lái)后玩焰,我們就帶著這個(gè)疑問(wèn)來(lái)開(kāi)始本文的內(nèi)容吧~~~

文章概要.png

前文回顧

按照老規(guī)矩,再開(kāi)始本文的內(nèi)容之前壳繁,我們先來(lái)回顧下前邊幾篇文章的概要內(nèi)容幫助大家梳理一個(gè)框架全貌出來(lái)震捣。

筆者這里再次想和讀者朋友們強(qiáng)調(diào)的是本文可以獨(dú)立觀看,并不依賴(lài)前邊系列文章的內(nèi)容闹炉,只是大家如果對(duì)相關(guān)細(xì)節(jié)部分感興趣的話蒿赢,可以在閱讀完本文之后在去回看相關(guān)文章。

在前邊的系列文章中渣触,筆者為大家介紹了驅(qū)動(dòng)Netty整個(gè)框架運(yùn)轉(zhuǎn)的核心引擎Reactor的創(chuàng)建羡棵,啟動(dòng),運(yùn)行的全流程嗅钻。從現(xiàn)在開(kāi)始Netty的整個(gè)核心框架就開(kāi)始運(yùn)轉(zhuǎn)起來(lái)開(kāi)始工作了皂冰,本文要介紹的主要內(nèi)容就是Netty在啟動(dòng)之后要做的第一件事件:監(jiān)聽(tīng)端口地址店展,高效接收客戶(hù)端連接。

《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》一文中秃流,我們是從整個(gè)網(wǎng)絡(luò)框架的基石IO模型的角度整體闡述了下Netty的IO線程模型赂蕴。

而Netty中的Reactor正是IO線程在Netty中的模型定義。Reactor在Netty中是以Group的形式出現(xiàn)的舶胀,分為:

  • 主Reactor線程組也就是我們?cè)趩?dòng)代碼中配置的EventLoopGroup bossGroup,main reactor group中的reactor主要負(fù)責(zé)監(jiān)聽(tīng)客戶(hù)端連接事件概说,高效的處理客戶(hù)端連接。也是本文我們要介紹的重點(diǎn)嚣伐。

  • 從Reactor線程組也就是我們?cè)趩?dòng)代碼中配置的EventLoopGroup workerGroup糖赔,sub reactor group中的reactor主要負(fù)責(zé)處理客戶(hù)端連接上的IO事件,以及異步任務(wù)的執(zhí)行轩端。

最后我們得出Netty的整個(gè)IO模型如下:

netty中的reactor.png

本文我們討論的重點(diǎn)就是MainReactorGroup的核心工作上圖中所示的步驟1放典,步驟2,步驟3基茵。

在從整體上介紹完Netty的IO模型之后奋构,我們又在《Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》中完整的介紹了Netty框架的骨架主從Reactor組的搭建過(guò)程,闡述了Reactor是如何被創(chuàng)建出來(lái)的拱层,并介紹了它的核心組件如下圖所示:

image.png
  • thread即為Reactor中的IO線程声怔,主要負(fù)責(zé)監(jiān)聽(tīng)I(yíng)O事件,處理IO任務(wù)舱呻,執(zhí)行異步任務(wù)醋火。

  • selector則是JDK NIO對(duì)操作系統(tǒng)底層IO多路復(fù)用技術(shù)實(shí)現(xiàn)的封裝。用于監(jiān)聽(tīng)I(yíng)O就緒事件箱吕。

  • taskQueue用于保存Reactor需要執(zhí)行的異步任務(wù)芥驳,這些異步任務(wù)可以由用戶(hù)在業(yè)務(wù)線程中向Reactor提交,也可以是Netty框架提交的一些自身核心的任務(wù)茬高。

  • scheduledTaskQueue則是保存Reactor中執(zhí)行的定時(shí)任務(wù)兆旬。代替了原有的時(shí)間輪來(lái)執(zhí)行延時(shí)任務(wù)。

  • tailQueue保存了在Reactor需要執(zhí)行的一些尾部收尾任務(wù)怎栽,在普通任務(wù)執(zhí)行完后 Reactor線程會(huì)執(zhí)行尾部任務(wù)丽猬,比如對(duì)Netty 的運(yùn)行狀態(tài)做一些統(tǒng)計(jì)數(shù)據(jù),例如任務(wù)循環(huán)的耗時(shí)熏瞄、占用物理內(nèi)存的大小等等

在骨架搭建完畢之后脚祟,我們隨后又在在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》》一文中介紹了本文的主角服務(wù)端NioServerSocketChannel的創(chuàng)建,初始化强饮,綁定端口地址由桌,向main reactor注冊(cè)監(jiān)聽(tīng)OP_ACCEPT事件的完整過(guò)程

Reactor啟動(dòng)后的結(jié)構(gòu).png

main reactor如何處理OP_ACCEPT事件將會(huì)是本文的主要內(nèi)容。

自此Netty框架的main reactor group已經(jīng)啟動(dòng)完畢行您,開(kāi)始準(zhǔn)備監(jiān)聽(tīng)OP_accept事件铭乾,當(dāng)客戶(hù)端連接上來(lái)之后,OP_ACCEPT事件活躍娃循,main reactor開(kāi)始處理OP_ACCEPT事件接收客戶(hù)端連接了炕檩。

而netty中的IO事件分為:OP_ACCEPT事件,OP_READ事件捌斧,OP_WRITE事件和OP_CONNECT事件捧书,netty對(duì)于IO事件的監(jiān)聽(tīng)和處理統(tǒng)一封裝在Reactor模型中,這四個(gè)IO事件的處理過(guò)程也是我們后續(xù)文章中要單獨(dú)拿出來(lái)介紹的骤星,本文我們聚焦OP_ACCEPT事件的處理。

而為了讓大家能夠?qū)O事件的處理有一個(gè)完整性的認(rèn)識(shí)爆哑,筆者寫(xiě)了《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》這篇文章洞难,在文章中詳細(xì)介紹了Reactor線程的整體運(yùn)行框架。

Reactor線程運(yùn)行時(shí)結(jié)構(gòu).png

Reactor線程會(huì)在一個(gè)死循環(huán)中996不停的運(yùn)轉(zhuǎn)揭朝,在循環(huán)中會(huì)不斷的輪詢(xún)監(jiān)聽(tīng)Selector上的IO事件队贱,當(dāng)IO事件活躍后,Reactor從Selector上被喚醒轉(zhuǎn)去執(zhí)行IO就緒事件的處理潭袱,在這個(gè)過(guò)程中我們引出了上述四種IO事件的處理入口函數(shù)柱嫌。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //獲取Channel的底層操作類(lèi)Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            ......如果SelectionKey已經(jīng)失效則關(guān)閉對(duì)應(yīng)的Channel......
        }

        try {
            //獲取IO就緒事件
            int readyOps = k.readyOps();
            //處理Connect事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                //移除對(duì)Connect事件的監(jiān)聽(tīng),否則Selector會(huì)一直通知
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //觸發(fā)channelActive事件處理Connect事件
                unsafe.finishConnect();
            }

            //處理Write事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

             //處理Read事件或者Accept事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

本文筆者將會(huì)為大家重點(diǎn)介紹OP_ACCEPT事件的處理入口函數(shù)unsafe.read()的整個(gè)源碼實(shí)現(xiàn)屯换。

當(dāng)客戶(hù)端連接完成三次握手之后编丘,main reactor中的selector產(chǎn)生OP_ACCEPT事件活躍,main reactor隨即被喚醒彤悔,來(lái)到了OP_ACCEPT事件的處理入口函數(shù)開(kāi)始接收客戶(hù)端連接嘉抓。

1. Main Reactor處理OP_ACCEPT事件

OP_ACCEPT事件活躍.png

當(dāng)Main Reactor輪詢(xún)到NioServerSocketChannel上的OP_ACCEPT事件就緒時(shí),Main Reactor線程就會(huì)從JDK Selector上的阻塞輪詢(xún)APIselector.select(timeoutMillis)調(diào)用中返回晕窑。轉(zhuǎn)而去處理NioServerSocketChannel上的OP_ACCEPT事件抑片。

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ..............省略.................

        try {
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
               ..............處理OP_CONNECT事件.................
            }


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              ..............處理OP_WRITE事件.................
            }


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //本文重點(diǎn)處理OP_ACCEPT事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

}
  • 處理IO就緒事件的入口函數(shù)processSelectedKey中的參數(shù)AbstractNioChannel ch正是Netty服務(wù)端NioServerSocketChannel。因?yàn)榇藭r(shí)的執(zhí)行線程為main reactor線程杨赤,而main reactor上注冊(cè)的正是netty服務(wù)端NioServerSocketChannel負(fù)責(zé)監(jiān)聽(tīng)端口地址敞斋,接收客戶(hù)端連接。

  • 通過(guò)ch.unsafe()獲取到的NioUnsafe操作類(lèi)正是NioServerSocketChannel中對(duì)底層JDK NIO ServerSocketChannel的Unsafe底層操作類(lèi)疾牲。

Unsafe接口是Netty對(duì)Channel底層操作行為的封裝植捎,比如NioServerSocketChannel的底層Unsafe操作類(lèi)干的事情就是綁定端口地址處理OP_ACCEPT事件阳柔。

這里我們看到鸥跟,Netty將OP_ACCEPT事件處理的入口函數(shù)封裝在NioServerSocketChannel里的底層操作類(lèi)Unsafe的read方法中。

image.png

而NioServerSocketChannel中的Unsafe操作類(lèi)實(shí)現(xiàn)類(lèi)型為NioMessageUnsafe定義在上圖繼承結(jié)構(gòu)中的AbstractNioMessageChannel父類(lèi)中

下面我們到NioMessageUnsafe#read方法中來(lái)看下Netty對(duì)OP_ACCPET事件的具體處理過(guò)程:

2. 接收客戶(hù)端連接核心流程框架總覽

我們還是按照老規(guī)矩医咨,先從整體上把整個(gè)OP_ACCEPT事件的邏輯處理框架提取出來(lái)步清,讓大家先總體俯視下流程全貌,然后在針對(duì)每個(gè)核心點(diǎn)位進(jìn)行各個(gè)擊破叼屠。

接收客戶(hù)端連接.png

main reactor線程是在一個(gè)do...while{...}循環(huán)read loop中不斷的調(diào)用JDK NIO serverSocketChannel.accept()方法來(lái)接收完成三次握手的客戶(hù)端連接NioSocketChannel的弹澎,并將接收到的客戶(hù)端連接NioSocketChannel臨時(shí)保存在List<Object> readBuf集合中,后續(xù)會(huì)服務(wù)端NioServerSocketChannel的pipeline中通過(guò)ChannelRead事件來(lái)傳遞很泊,最終會(huì)在ServerBootstrapAcceptor這個(gè)ChannelHandler中被處理初始化角虫,并將其注冊(cè)到Sub Reator Group中。

這里的read loop循環(huán)會(huì)被限定只能讀取16次委造,當(dāng)main reactor從NioServerSocketChannel中讀取客戶(hù)端連接NioSocketChannel的次數(shù)達(dá)到16次之后戳鹅,無(wú)論此時(shí)是否還有客戶(hù)端連接都不能在繼續(xù)讀取了。

因?yàn)槲覀冊(cè)?a target="_blank">《一文聊透Netty核心引擎Reactor的運(yùn)轉(zhuǎn)架構(gòu)》一文中提到昏兆,netty對(duì)reactor線程壓榨的比較狠枫虏,要干的事情很多,除了要監(jiān)聽(tīng)輪詢(xún)IO就緒事件爬虱,處理IO就緒事件隶债,還需要執(zhí)行用戶(hù)和netty框架本省提交的異步任務(wù)和定時(shí)任務(wù)。

所以這里的main reactor線程不能在read loop中無(wú)限制的執(zhí)行下去跑筝,因?yàn)檫€需要分配時(shí)間去執(zhí)行異步任務(wù)死讹,不能因?yàn)闊o(wú)限制的接收客戶(hù)端連接而耽誤了異步任務(wù)的執(zhí)行。所以這里將read loop的循環(huán)次數(shù)限定為16次曲梗。

如果main reactor線程在read loop中讀取客戶(hù)端連接NioSocketChannel的次數(shù)已經(jīng)滿(mǎn)了16次赞警,即使此時(shí)還有客戶(hù)端連接未接收,那么main reactor線程也不會(huì)再去接收了虏两,而是轉(zhuǎn)去執(zhí)行異步任務(wù)仅颇,當(dāng)異步任務(wù)執(zhí)行完畢后,還會(huì)在回來(lái)執(zhí)行剩余接收連接的任務(wù)碘举。

Reactor線程運(yùn)行時(shí)結(jié)構(gòu).png

main reactor線程退出read loop循環(huán)的條件有兩個(gè):

  1. 在限定的16次讀取中忘瓦,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了。退出循環(huán)引颈。

  2. 從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次耕皮,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)。

以上就是Netty在接收客戶(hù)端連接時(shí)的整體核心邏輯蝙场,下面筆者將這部分邏輯的核心源碼實(shí)現(xiàn)框架提取出來(lái)凌停,方便大家根據(jù)上述核心邏輯與源碼中的處理模塊對(duì)應(yīng)起來(lái),還是那句話售滤,這里只需要總體把握核心處理流程罚拟,不需要讀懂每一行代碼台诗,筆者會(huì)在文章的后邊分模塊來(lái)各個(gè)擊破它們。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //存放連接建立后赐俗,創(chuàng)建的客戶(hù)端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            //必須在Main Reactor線程中執(zhí)行
            assert eventLoop().inEventLoop();
            //注意下面的config和pipeline都是服務(wù)端ServerSocketChannel中的
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //創(chuàng)建接收數(shù)據(jù)Buffer分配器(用于分配容量大小合適的byteBuffer用來(lái)容納接收數(shù)據(jù))
            //在接收連接的場(chǎng)景中拉队,這里的allocHandle只是用于控制read loop的循環(huán)讀取創(chuàng)建連接的次數(shù)。
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
                        int localRead = doReadMessages(readBuf);

                        //已無(wú)新的連接可接收則退出read loop
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());//判斷是否已經(jīng)讀滿(mǎn)16次
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在NioServerSocketChannel對(duì)應(yīng)的pipeline中傳播ChannelRead事件
                    //初始化客戶(hù)端SocketChannel阻逮,并將其綁定到Sub Reactor線程組中的一個(gè)Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //清除本次accept 創(chuàng)建的客戶(hù)端SocketChannel集合
                readBuf.clear();
                allocHandle.readComplete();
                //觸發(fā)readComplete事件傳播
                pipeline.fireChannelReadComplete();
                ....................省略............
            } finally {
                ....................省略............
            }
        }
    }
  }
}

這里首先要通過(guò)斷言assert eventLoop().inEventLoop()確保處理接收客戶(hù)端連接的線程必須為Main Reactor 線程粱快。

而main reactor中主要注冊(cè)的是服務(wù)端NioServerSocketChannel,主要負(fù)責(zé)處理OP_ACCEPT事件叔扼,所以當(dāng)前main reactor線程是在NioServerSocketChannel中執(zhí)行接收連接的工作事哭。

所以這里我們通過(guò)config()獲取到的是NioServerSocketChannel的屬性配置類(lèi)NioServerSocketChannelConfig,它是在Reactor的啟動(dòng)階段被創(chuàng)建出來(lái)的。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父類(lèi)AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監(jiān)聽(tīng)的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中設(shè)置用于Channel接收數(shù)據(jù)用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

同理這里通過(guò)pipeline()獲取到的也是NioServerSocketChannel中的pipeline瓜富。它會(huì)在NioServerSocketChannel向main reactor注冊(cè)成功之后被初始化鳍咱。

ServerChannelPipeline完整結(jié)構(gòu).png

前邊提到main reactor線程會(huì)被限定只能在read loop中向NioServerSocketChannel讀取16次客戶(hù)端連接,所以在開(kāi)始read loop之前与柑,我們需要?jiǎng)?chuàng)建一個(gè)能夠保存記錄讀取次數(shù)的對(duì)象谤辜,在每次read loop循環(huán)之后,可以根據(jù)這個(gè)對(duì)象來(lái)判斷是否結(jié)束read loop仅胞。

這個(gè)對(duì)象就是這里的 RecvByteBufAllocator.Handle allocHandle專(zhuān)門(mén)用于統(tǒng)計(jì)read loop中接收客戶(hù)端連接的次數(shù),以及判斷是否該結(jié)束read loop轉(zhuǎn)去執(zhí)行異步任務(wù)剑辫。

當(dāng)這一切準(zhǔn)備就緒之后干旧,main reactor線程就開(kāi)始在do{....}while(...)循環(huán)中接收客戶(hù)端連接了。

在 read loop中通過(guò)調(diào)用doReadMessages函數(shù)接收完成三次握手的客戶(hù)端連接妹蔽,底層會(huì)調(diào)用到JDK NIO ServerSocketChannel的accept方法椎眯,從內(nèi)核全連接隊(duì)列中取出客戶(hù)端連接。

返回值localRead表示接收到了多少客戶(hù)端連接胳岂,客戶(hù)端連接通過(guò)accept方法只會(huì)一個(gè)一個(gè)的接收编整,所以這里的localRead正常情況下都會(huì)返回1,當(dāng)localRead <= 0時(shí)意味著已經(jīng)沒(méi)有新的客戶(hù)端連接可以接收了乳丰,本次main reactor接收客戶(hù)端的任務(wù)到這里就結(jié)束了掌测,跳出read loop。開(kāi)始新的一輪IO事件的監(jiān)聽(tīng)處理产园。

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

隨后會(huì)將接收到的客戶(hù)端連接占時(shí)存放到List<Object> readBuf集合中汞斧。

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //存放連接建立后,創(chuàng)建的客戶(hù)端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();
}

調(diào)用allocHandle.incMessagesRead統(tǒng)計(jì)本次事件循環(huán)中接收到的客戶(hù)端連接個(gè)數(shù)什燕,最后在read loop末尾通過(guò)allocHandle.continueReading判斷是否達(dá)到了限定的16次粘勒。從而決定main reactor線程是繼續(xù)接收客戶(hù)端連接還是轉(zhuǎn)去執(zhí)行異步任務(wù)。

main reactor線程退出read loop的兩個(gè)條件:

  1. 在限定的16次讀取中屎即,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了庙睡。退出循環(huán)事富。

  2. 從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)乘陪。

當(dāng)滿(mǎn)足以上兩個(gè)退出條件時(shí)统台,main reactor線程就會(huì)退出read loop,由于在read loop中接收到的客戶(hù)端連接全部暫存在List<Object> readBuf集合中,隨后開(kāi)始遍歷readBuf暂刘,在NioServerSocketChannel的pipeline中傳播ChannelRead事件饺谬。

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //NioServerSocketChannel對(duì)應(yīng)的pipeline中傳播read事件
                    //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
                    //初始化客戶(hù)端SocketChannel,并將其綁定到Sub Reactor線程組中的一個(gè)Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }

最終pipeline中的ChannelHandler(ServerBootstrapAcceptor)會(huì)響應(yīng)ChannelRead事件谣拣,并在相應(yīng)回調(diào)函數(shù)中初始化客戶(hù)端NioSocketChannel募寨,并將其注冊(cè)到Sub Reactor Group中。此后客戶(hù)端NioSocketChannel綁定到的sub reactor就開(kāi)始監(jiān)聽(tīng)處理客戶(hù)端連接上的讀寫(xiě)事件了森缠。

Netty整個(gè)接收客戶(hù)端的邏輯過(guò)程如下圖步驟1拔鹰,2,3所示贵涵。

netty中的reactor.png

以上內(nèi)容就是筆者提取出來(lái)的整體流程框架列肢,下面我們來(lái)將其中涉及到的重要核心模塊拆開(kāi),一個(gè)一個(gè)詳細(xì)解讀下宾茂。

3. RecvByteBufAllocator簡(jiǎn)介

Reactor在處理對(duì)應(yīng)Channel上的IO數(shù)據(jù)時(shí)瓷马,都會(huì)采用一個(gè)ByteBuffer來(lái)接收Channel上的IO數(shù)據(jù)。而本小節(jié)要介紹的RecvByteBufAllocator正是用來(lái)分配ByteBuffer的一個(gè)分配器跨晴。

還記得這個(gè)RecvByteBufAllocator在哪里被創(chuàng)建的嗎欧聘??

《聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)》一文中端盆,在介紹NioServerSocketChannel的創(chuàng)建過(guò)程中提到怀骤,對(duì)應(yīng)Channel的配置類(lèi)NioServerSocketChannelConfig也會(huì)隨著NioServerSocketChannel的創(chuàng)建而創(chuàng)建。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在創(chuàng)建NioServerSocketChannelConfig的過(guò)程中會(huì)創(chuàng)建RecvByteBufAllocator焕妙。

   public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

這里我們看到NioServerSocketChannel中的RecvByteBufAllocator實(shí)際類(lèi)型為AdaptiveRecvByteBufAllocator蒋伦,顧名思義,這個(gè)類(lèi)型的RecvByteBufAllocator可以根據(jù)Channel上每次到來(lái)的IO數(shù)據(jù)大小來(lái)自適應(yīng)動(dòng)態(tài)調(diào)整ByteBuffer的容量焚鹊。

對(duì)于服務(wù)端NioServerSocketChannel來(lái)說(shuō)痕届,它上邊的IO數(shù)據(jù)就是客戶(hù)端的連接,它的長(zhǎng)度和類(lèi)型都是固定的末患,所以在接收客戶(hù)端連接的時(shí)候并不需要這樣的一個(gè)ByteBuffer來(lái)接收爷抓,我們會(huì)將接收到的客戶(hù)端連接存放在List<Object> readBuf集合中

對(duì)于客戶(hù)端NioSocketChannel來(lái)說(shuō),它上邊的IO數(shù)據(jù)時(shí)客戶(hù)端發(fā)送來(lái)的網(wǎng)絡(luò)數(shù)據(jù)阻塑,長(zhǎng)度是不定的蓝撇,所以才會(huì)需要這樣一個(gè)可以根據(jù)每次IO數(shù)據(jù)的大小來(lái)自適應(yīng)動(dòng)態(tài)調(diào)整容量的ByteBuffer來(lái)接收。

那么看起來(lái)這個(gè)RecvByteBufAllocator和本文的主題不是很關(guān)聯(lián)陈莽,因?yàn)樵诮邮者B接的過(guò)程中并不會(huì)怎么用到它渤昌,這個(gè)類(lèi)筆者還會(huì)在后面的文章中詳細(xì)介紹虽抄,之所以這里把它拎出來(lái)單獨(dú)介紹是因?yàn)樗捅疚拈_(kāi)頭提到的Bug有關(guān)系,這個(gè)Bug就是由這個(gè)類(lèi)引起的独柑。

3.1 RecvByteBufAllocator.Handle的獲取

在本文中迈窟,我們是通過(guò)NioServerSocketChannel中的unsafe底層操作類(lèi)來(lái)獲取RecvByteBufAllocator.Handle的

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }
}

我們看到最終會(huì)在NioServerSocketChannel的配置類(lèi)NioServerSocketChannelConfig中獲取到AdaptiveRecvByteBufAllocator

public class DefaultChannelConfig implements ChannelConfig {
    //用于Channel接收數(shù)據(jù)用的buffer分配器  類(lèi)型為AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;
}

AdaptiveRecvByteBufAllocator中會(huì)創(chuàng)建自適應(yīng)動(dòng)態(tài)調(diào)整容量的ByteBuffer分配器。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {
                  .................省略................
    }
}

這里的newHandle方法返回的具體類(lèi)型為MaxMessageHandle忌栅,這個(gè)MaxMessageHandle里邊保存了每次從Channel中讀取IO數(shù)據(jù)的容量指標(biāo)车酣,方便下次讀取時(shí)分配合適大小的buffer

每次在使用allocHandle前需要調(diào)用allocHandle.reset(config);重置里邊的統(tǒng)計(jì)指標(biāo)索绪。

    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        //每次事件輪詢(xún)時(shí)湖员,最多讀取16次
        private int maxMessagePerRead;
        //本次事件輪詢(xún)總共讀取的message數(shù),這里指的是接收連接的數(shù)量
        private int totalMessages;
        //本次事件輪詢(xún)總共讀取的字節(jié)數(shù)
        private int totalBytesRead;

       @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            //默認(rèn)每次最多讀取16次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }
    }
  • maxMessagePerRead:用于控制每次read loop里最大可以循環(huán)讀取的次數(shù),默認(rèn)為16次瑞驱,可在啟動(dòng)配置類(lèi)ServerBootstrap中通過(guò)ChannelOption.MAX_MESSAGES_PER_READ選項(xiàng)設(shè)置娘摔。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定義次數(shù))
  • totalMessages:用于統(tǒng)計(jì)read loop中總共接收的連接個(gè)數(shù),每次read loop循環(huán)后會(huì)調(diào)用allocHandle.incMessagesRead增加記錄接收到的連接個(gè)數(shù)唤反。
        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }
  • totalBytesRead:用于統(tǒng)計(jì)在read loop中總共接收到客戶(hù)端連接上的數(shù)據(jù)大小凳寺,這個(gè)字段主要用于sub reactor在接收客戶(hù)端NioSocketChannel上的網(wǎng)絡(luò)數(shù)據(jù)用的,本文我們介紹的是main reactor接收客戶(hù)端連接彤侍,所以這里并不會(huì)用到這個(gè)字段肠缨。這個(gè)字段會(huì)在sub reactor每次讀取完NioSocketChannel上的網(wǎng)絡(luò)數(shù)據(jù)時(shí)增加記錄。
        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

MaxMessageHandler中還有一個(gè)非常重要的方法就是在每次read loop末尾會(huì)調(diào)用allocHandle.continueReading()方法來(lái)判斷讀取連接次數(shù)是否已滿(mǎn)16次盏阶,來(lái)決定main reactor線程是否退出循環(huán)晒奕。

                  do {
                        //底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
image.png

紅框中圈出來(lái)的兩個(gè)判斷條件和本文主題無(wú)關(guān),我們這里不需要關(guān)注般哼,筆者會(huì)在后面的文章詳細(xì)介紹吴汪。

  • totalMessages < maxMessagePerRead:在本文的接收客戶(hù)端連接場(chǎng)景中惠窄,這個(gè)條件用于判斷main reactor線程在read loop中的讀取次數(shù)是否超過(guò)了16次蒸眠。如果超過(guò)16次就會(huì)返回false,main reactor線程退出循環(huán)杆融。

  • totalBytesRead > 0:用于判斷當(dāng)客戶(hù)端NioSocketChannel上的OP_READ事件活躍時(shí)楞卡,sub reactor線程在read loop中是否讀取到了網(wǎng)絡(luò)數(shù)據(jù)。

以上內(nèi)容就是RecvByteBufAllocator.Handle在接收客戶(hù)端連接場(chǎng)景下的作用脾歇,大家這里仔細(xì)看下這個(gè)allocHandle.continueReading()方法退出循環(huán)的判斷條件蒋腮,再結(jié)合整個(gè)do{....}while(...)接收連接循環(huán)體,感受下是否哪里有些不對(duì)勁藕各?Bug即將出現(xiàn)~~~

image.png

4. 啊哈3卮荨!Bug ! !

image.png

netty不論是在本文中處理接收客戶(hù)端連接的場(chǎng)景還是在處理接收客戶(hù)端連接上的網(wǎng)絡(luò)數(shù)據(jù)場(chǎng)景都會(huì)在一個(gè)do{....}while(...)循環(huán)read loop中不斷的處理激况。

同時(shí)也都會(huì)利用在上一小節(jié)中介紹的RecvByteBufAllocator.Handle來(lái)記錄每次read loop接收到的連接個(gè)數(shù)和從連接上讀取到的網(wǎng)絡(luò)數(shù)據(jù)大小作彤。

從而在read loop的末尾都會(huì)通過(guò)allocHandle.continueReading()方法判斷是否應(yīng)該退出read loop循環(huán)結(jié)束連接的接收流程或者是結(jié)束連接上數(shù)據(jù)的讀取流程膘魄。

無(wú)論是用于接收客戶(hù)端連接的main reactor也好還是用于接收客戶(hù)端連接上的網(wǎng)絡(luò)數(shù)據(jù)的sub reactor也好,它們的運(yùn)行框架都是一樣的竭讳,只不過(guò)是具體分工不同创葡。

所以netty這里想用統(tǒng)一的RecvByteBufAllocator.Handle來(lái)處理以上兩種場(chǎng)景。

RecvByteBufAllocator.Handle中的totalBytesRead字段主要記錄sub reactor線程在處理客戶(hù)端NioSocketChannel中OP_READ事件活躍時(shí)绢慢,總共在read loop中讀取到的網(wǎng)絡(luò)數(shù)據(jù)灿渴,而這里是main reactor線程在接收客戶(hù)端連接所以這個(gè)字段并不會(huì)被設(shè)置。totalBytesRead字段的值在本文中永遠(yuǎn)會(huì)是0胰舆。

所以無(wú)論同時(shí)有多少個(gè)客戶(hù)端并發(fā)連接到服務(wù)端上骚露,在接收連接的這個(gè)read loop中永遠(yuǎn)只會(huì)接受一個(gè)連接就會(huì)退出循環(huán),因?yàn)?code>allocHandle.continueReading()方法中的判斷條件totalBytesRead > 0永遠(yuǎn)會(huì)返回false思瘟。

                  do {
                        //底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //統(tǒng)計(jì)在當(dāng)前事件循環(huán)中已經(jīng)讀取到得Message數(shù)量(創(chuàng)建連接的個(gè)數(shù))
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

而netty的本意是在這個(gè)read loop循環(huán)中盡可能多的去接收客戶(hù)端的并發(fā)連接荸百,同時(shí)又不影響main reactor線程執(zhí)行異步任務(wù)。但是由于這個(gè)Bug滨攻,main reactor在這個(gè)循環(huán)中只執(zhí)行一次就結(jié)束了够话。這也一定程度上就影響了netty的吞吐

讓我們想象下這樣的一個(gè)場(chǎng)景光绕,當(dāng)有16個(gè)客戶(hù)端同時(shí)并發(fā)連接到了服務(wù)端女嘲,這時(shí)NioServerSocketChannel上的OP_ACCEPT事件活躍,main reactor從Selector上被喚醒诞帐,隨后執(zhí)行OP_ACCEPT事件的處理欣尼。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try { 
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:                  
                          ............省略.........
                    case SelectStrategy.BUSY_WAIT:

                          ............省略.........
                    case SelectStrategy.SELECT:
                            ............監(jiān)聽(tīng)輪詢(xún)IO事件.........
                    default:
                    }
                } catch (IOException e) {
                    ............省略.........
                }

                ............處理IO就緒事件.........
                ............執(zhí)行異步任務(wù).........
    }
}

但是由于這個(gè)Bug的存在,main reactor在接收客戶(hù)端連接的這個(gè)read loop中只接收了一個(gè)客戶(hù)端連接就匆匆返回了停蕉。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {
                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

然后根據(jù)下圖中這個(gè)Reactor的運(yùn)行結(jié)構(gòu)去執(zhí)行異步任務(wù)愕鼓,隨后繞一大圈又會(huì)回到NioEventLoop#run方法中重新發(fā)起一輪OP_ACCEPT事件輪詢(xún)。

Reactor線程運(yùn)行時(shí)結(jié)構(gòu).png

由于現(xiàn)在還有15個(gè)客戶(hù)端并發(fā)連接沒(méi)有被接收慧起,所以此時(shí)Main Reactor線程并不會(huì)在selector.select()上阻塞菇晃,最終繞一圈又會(huì)回到NioMessageUnsafe#read方法的do{.....}while()循環(huán)。在接收一個(gè)連接之后又退出循環(huán)蚓挤。

本來(lái)我們可以在一次read loop中把這16個(gè)并發(fā)的客戶(hù)端連接全部接收完畢的磺送,因?yàn)檫@個(gè)Bug,main reactor需要不斷的發(fā)起OP_ACCEPT事件的輪詢(xún)灿意,繞了很大一個(gè)圈子估灿。同時(shí)也增加了許多不必要的selector.select()系統(tǒng)調(diào)用開(kāi)銷(xiāo)

issue討論.png

這時(shí)大家在看這個(gè)Issue#11708中的討論是不是就清晰很多了~~

Issue#11708:https://github.com/netty/netty/issues/11708

4.1 Bug的修復(fù)

筆者在寫(xiě)這篇文章的時(shí)候,Netty最新版本是4.1.68.final缤剧,這個(gè)Bug在4.1.69.final中被修復(fù)馅袁。

image.png

由于該Bug產(chǎn)生的原因正是因?yàn)榉?wù)端NioServerSocketChannel(用于監(jiān)聽(tīng)端口地址和接收客戶(hù)端連接)和 客戶(hù)端NioSocketChannel(用于通信)中的Config配置類(lèi)混用了同一個(gè)ByteBuffer分配器AdaptiveRecvByteBufAllocator而導(dǎo)致的。

所以在新版本修復(fù)中專(zhuān)門(mén)為服務(wù)端ServerSocketChannel中的Config配置類(lèi)引入了一個(gè)新的ByteBuffer分配器ServerChannelRecvByteBufAllocator荒辕,專(zhuān)門(mén)用于服務(wù)端ServerSocketChannel接收客戶(hù)端連接的場(chǎng)景汗销。

image.png
image.png

ServerChannelRecvByteBufAllocator的父類(lèi)DefaultMaxMessagesRecvByteBufAllocator中引入了一個(gè)新的字段ignoreBytesRead芒粹,用于表示是否忽略網(wǎng)絡(luò)字節(jié)的讀取,在創(chuàng)建服務(wù)端Channel配置類(lèi)NioServerSocketChannelConfig的時(shí)候大溜,這個(gè)字段會(huì)被賦值為true化漆。

image.png

當(dāng)main reactor線程在read loop循環(huán)中接收客戶(hù)端連接的時(shí)候。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {

                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

在read loop循環(huán)的末尾就會(huì)采用從ServerChannelRecvByteBufAllocator中創(chuàng)建的MaxMessageHandle#continueReading方法來(lái)判斷讀取連接次數(shù)是否超過(guò)了16次钦奋。由于這里的ignoreBytesRead == true這回我們就會(huì)忽略totalBytesRead == 0的情況座云,從而使得接收連接的read loop得以繼續(xù)地執(zhí)行下去。在一個(gè)read loop中一次性把16個(gè)連接全部接收完畢付材。

image.png

以上就是對(duì)這個(gè)Bug產(chǎn)生的原因朦拖,以及發(fā)現(xiàn)的過(guò)程,最后修復(fù)的方案一個(gè)全面的介紹厌衔,因此筆者也出現(xiàn)在了netty 4.1.69.final版本發(fā)布公告里的thank-list中璧帝。哈哈,真是令人開(kāi)心的一件事情~~~

image.png

通過(guò)以上對(duì)netty接收客戶(hù)端連接的全流程分析和對(duì)這個(gè)Bug來(lái)龍去脈以及修復(fù)方案的介紹富寿,大家現(xiàn)在一定已經(jīng)理解了整個(gè)接收連接的流程框架睬隶。

接下來(lái)筆者就把這個(gè)流程中涉及到的一些核心模塊在單獨(dú)拎出來(lái)從細(xì)節(jié)入手,為大家各個(gè)擊破~~~

5. doReadMessages接收客戶(hù)端連接

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

}
  • 通過(guò)javaChannel()獲取封裝在Netty服務(wù)端NioServerSocketChannel中的JDK 原生 ServerSocketChannel页徐。
    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
  • 通過(guò)JDK NIO 原生ServerSocketChannelaccept方法獲取JDK NIO 原生客戶(hù)端連接SocketChannel苏潜。
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

這一步就是我們?cè)?a target="_blank">《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》介紹到的調(diào)用監(jiān)聽(tīng)Socketaccept方法,內(nèi)核會(huì)基于監(jiān)聽(tīng)Socket創(chuàng)建出來(lái)一個(gè)新的Socket專(zhuān)門(mén)用于與客戶(hù)端之間的網(wǎng)絡(luò)通信這個(gè)我們稱(chēng)之為客戶(hù)端連接Socket变勇。這里的ServerSocketChannel就類(lèi)似于監(jiān)聽(tīng)Socket恤左。SocketChannel就類(lèi)似于客戶(hù)端連接Socket

由于我們?cè)趧?chuàng)建NioServerSocketChannel的時(shí)候搀绣,會(huì)將JDK NIO 原生ServerSocketChannel設(shè)置為非阻塞飞袋,所以這里當(dāng)ServerSocketChannel上有客戶(hù)端連接時(shí)就會(huì)直接創(chuàng)建SocketChannel,如果此時(shí)并沒(méi)有客戶(hù)端連接時(shí)accept調(diào)用就會(huì)立刻返回null并不會(huì)阻塞链患。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //設(shè)置Channel為非阻塞 配合IO多路復(fù)用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
          ..........省略.............
        }
    }

5.1 創(chuàng)建客戶(hù)端NioSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
          .........省略.......
        }

        return 0;
    }

}

這里會(huì)根據(jù)ServerSocketChannelaccept方法獲取到JDK NIO 原生SocketChannel(用于底層真正與客戶(hù)端通信的Channel)巧鸭,來(lái)創(chuàng)建Netty中的NioSocketChannel

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

}

創(chuàng)建客戶(hù)端NioSocketChannel的過(guò)程其實(shí)和之前講的創(chuàng)建服務(wù)端NioServerSocketChannel大體流程是一樣的锣险,我們這里只對(duì)客戶(hù)端NioSocketChannel和服務(wù)端NioServerSocketChannel在創(chuàng)建過(guò)程中的不同之處做一個(gè)對(duì)比蹄皱。

具體細(xì)節(jié)部分大家可以在回看下《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》一文中關(guān)于NioServerSocketChannel的創(chuàng)建的詳細(xì)細(xì)節(jié)览闰。

5.3 對(duì)比NioSocketChannel與NioServerSocketChannel的不同

1:Channel的層次不同

在我們介紹Reactor的創(chuàng)建文章中芯肤,我們提到Netty中的Channel是具有層次的。由于客戶(hù)端NioSocketChannel是在main reactor接收連接時(shí)在服務(wù)端NioServerSocketChannel中被創(chuàng)建的压鉴,所以在創(chuàng)建客戶(hù)端NioSocketChannel的時(shí)候會(huì)通過(guò)構(gòu)造函數(shù)指定了parent屬性為NioServerSocketChanel崖咨。并將JDK NIO 原生SocketChannel封裝進(jìn)Netty的客戶(hù)端NioSocketChannel中。

而在Reactor啟動(dòng)過(guò)程中創(chuàng)建NioServerSocketChannel的時(shí)候parent屬性指定是null油吭。因?yàn)樗褪琼攲拥?code>Channel击蹲,負(fù)責(zé)創(chuàng)建客戶(hù)端NioSocketChannel署拟。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

2:向Reactor注冊(cè)的IO事件不同

客戶(hù)端NioSocketChannel向Sub Reactor注冊(cè)的是SelectionKey.OP_READ事件,而服務(wù)端NioServerSocketChannel向Main Reactor注冊(cè)的是SelectionKey.OP_ACCEPT事件歌豺。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

}

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

   public NioServerSocketChannel(ServerSocketChannel channel) {
        //父類(lèi)AbstractNioChannel中保存JDK NIO原生ServerSocketChannel以及要監(jiān)聽(tīng)的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中設(shè)置用于Channel接收數(shù)據(jù)用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

3: 功能屬性不同造成繼承結(jié)構(gòu)的不同

NioSocketChannel.png
NioServerSocketChannel.png

客戶(hù)端NioSocketChannel繼承的是AbstractNioByteChannel推穷,而服務(wù)端NioServerSocketChannel繼承的是AbstractNioMessageChannel
它們繼承的這兩個(gè)抽象類(lèi)一個(gè)前綴是Byte类咧,一個(gè)前綴是Message有什么區(qū)別嗎馒铃??

客戶(hù)端NioSocketChannel主要處理的是服務(wù)端與客戶(hù)端的通信痕惋,這里涉及到接收客戶(hù)端發(fā)送來(lái)的數(shù)據(jù)区宇,而Sub Reactor線程NioSocketChannel中讀取的正是網(wǎng)絡(luò)通信數(shù)據(jù)單位為Byte

服務(wù)端NioServerSocketChannel主要負(fù)責(zé)處理OP_ACCEPT事件值戳,創(chuàng)建用于通信的客戶(hù)端NioSocketChannel议谷。這時(shí)候客戶(hù)端與服務(wù)端還沒(méi)開(kāi)始通信,所以Main Reactor線程NioServerSocketChannel的讀取對(duì)象為Message堕虹。這里的Message指的就是底層的SocketChannel客戶(hù)端連接卧晓。


以上就是NioSocketChannelNioServerSocketChannel創(chuàng)建過(guò)程中的不同之處,后面的過(guò)程就一樣了赴捞。

  • 在AbstractNioChannel 類(lèi)中封裝JDK NIO 原生的SocketChannel禀崖,并將其底層的IO模型設(shè)置為非阻塞,保存需要監(jiān)聽(tīng)的IO事件OP_READ螟炫。
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //設(shè)置Channel為非阻塞 配合IO多路復(fù)用模型
            ch.configureBlocking(false);
        } catch (IOException e) {

        }
    }
  • 為客戶(hù)端NioSocketChannel創(chuàng)建全局唯一的channelId波附,創(chuàng)建客戶(hù)端NioSocketChannel的底層操作類(lèi)NioByteUnsafe,創(chuàng)建pipeline昼钻。
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局唯一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用于底層socket的讀寫(xiě)操作
        unsafe = newUnsafe();
        //為channel分配獨(dú)立的pipeline用于IO事件編排
        pipeline = newChannelPipeline();
    }
  • 在NioSocketChannelConfig的創(chuàng)建過(guò)程中掸屡,將NioSocketChannel的RecvByteBufAllocator類(lèi)型設(shè)置為AdaptiveRecvByteBufAllocator
    public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

在Bug修復(fù)后的版本中服務(wù)端NioServerSocketChannel的RecvByteBufAllocator類(lèi)型設(shè)置為ServerChannelRecvByteBufAllocator

最終我們得到的客戶(hù)端NioSocketChannel結(jié)構(gòu)如下:

NioSocketChannel.png

6. ChannelRead事件的響應(yīng)

接收客戶(hù)端連接.png

在前邊介紹接收連接的整體核心流程框架的時(shí)候然评,我們提到main reactor線程是在一個(gè)do{.....}while(...)循環(huán)read loop中不斷的調(diào)用ServerSocketChannel#accept方法來(lái)接收客戶(hù)端的連接仅财。

當(dāng)滿(mǎn)足退出read loop循環(huán)的條件有兩個(gè):

  1. 在限定的16次讀取中,已經(jīng)沒(méi)有新的客戶(hù)端連接要接收了碗淌。退出循環(huán)盏求。

  2. 從NioServerSocketChannel中讀取客戶(hù)端連接的次數(shù)達(dá)到了16次,無(wú)論此時(shí)是否還有客戶(hù)端連接都需要退出循環(huán)亿眠。

main reactor就會(huì)退出read loop循環(huán)碎罚,此時(shí)接收到的客戶(hù)端連接NioSocketChannel暫存與List<Object> readBuf集合中。


    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            try {
                try {
                    do {
                        ........省略.........
                        //底層調(diào)用NioServerSocketChannel->doReadMessages 創(chuàng)建客戶(hù)端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        ........省略.........
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                  ........省略.........
            } finally {
                  ........省略.........
            }
        }
    }

隨后main reactor線程會(huì)遍歷List<Object> readBuf集合中的NioSocketChannel纳像,并在NioServerSocketChannel的pipeline中傳播ChannelRead事件荆烈。

傳播ChannelRead事件.png

最終ChannelRead事件會(huì)傳播到ServerBootstrapAcceptor中,這里正是Netty處理客戶(hù)端連接的核心邏輯所在。

ServerBootstrapAcceptor主要的作用就是初始化客戶(hù)端NioSocketChannel憔购,并將客戶(hù)端NioSocketChannel注冊(cè)到Sub Reactor Group中宫峦,并監(jiān)聽(tīng)OP_READ事件

在ServerBootstrapAcceptor 中會(huì)初始化客戶(hù)端NioSocketChannel的這些屬性玫鸟。

比如:從Reactor組EventLoopGroup childGroup导绷,用于初始化NioSocketChannel中的pipeline用到的ChannelHandler childHandler,以及NioSocketChannel中的一些childOptionschildAttrs屎飘。

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            //向客戶(hù)端NioSocketChannel的pipeline中
            //添加在啟動(dòng)配置類(lèi)ServerBootstrap中配置的ChannelHandler
            child.pipeline().addLast(childHandler);

            //利用配置的屬性初始化客戶(hù)端NioSocketChannel
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                /**
                 * 1:在Sub Reactor線程組中選擇一個(gè)Reactor綁定
                 * 2:將客戶(hù)端SocketChannel注冊(cè)到綁定的Reactor上
                 * 3:SocketChannel注冊(cè)到sub reactor中的selector上诵次,并監(jiān)聽(tīng)OP_READ事件
                 * */
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
}

正是在這里,netty會(huì)將我們?cè)?a target="_blank">《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》的啟動(dòng)示例程序中在ServerBootstrap中配置的客戶(hù)端NioSocketChannel的所有屬性(child前綴配置)初始化到NioSocketChannel中枚碗。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //創(chuàng)建主從Reactor線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel類(lèi)型
             .option(ChannelOption.SO_BACKLOG, 100)//設(shè)置主Reactor中channel的option選項(xiàng)
             .handler(new LoggingHandler(LogLevel.INFO))//設(shè)置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//設(shè)置從Reactor中注冊(cè)channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 綁定端口啟動(dòng)服務(wù)逾一,開(kāi)始監(jiān)聽(tīng)accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

以上示例代碼中通過(guò)ServerBootstrap配置的NioSocketChannel相關(guān)屬性践惑,會(huì)在Netty啟動(dòng)并開(kāi)始初始化NioServerSocketChannel的時(shí)候?qū)?code>ServerBootstrapAcceptor的創(chuàng)建初始化工作封裝成異步任務(wù)屯援,然后在NioServerSocketChannel注冊(cè)到Main Reactor中成功后執(zhí)行铝噩。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    @Override
    void init(Channel channel) {
        ................省略................

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ................省略................
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

在經(jīng)過(guò)ServerBootstrapAccptor#chanelRead回調(diào)的處理之后畸悬,此時(shí)客戶(hù)端NioSocketChannel中pipeline的結(jié)構(gòu)為:

客戶(hù)端channel pipeline初始結(jié)構(gòu).png

隨后會(huì)將初始化好的客戶(hù)端NioSocketChannel向Sub Reactor Group中注冊(cè)碌冶,并監(jiān)聽(tīng)OP_READ事件羡疗。

如下圖中的步驟3所示:

netty中的reactor.png

7. 向SubReactorGroup中注冊(cè)NioSocketChannel

                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

客戶(hù)端NioSocketChannel向Sub Reactor Group注冊(cè)的流程完全和服務(wù)端NioServerSocketChannel向Main Reactor Group注冊(cè)流程一樣咆疗。

關(guān)于服務(wù)端NioServerSocketChannel的注冊(cè)流程忱屑,筆者已經(jīng)在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》一文中做出了詳細(xì)的介紹波丰,對(duì)相關(guān)細(xì)節(jié)感興趣的同學(xué)可以在回看下壳坪。

這里筆者在帶大家簡(jiǎn)要回顧下整個(gè)注冊(cè)過(guò)程并著重區(qū)別對(duì)比客戶(hù)端NioSocetChannel與服務(wù)端NioServerSocketChannel注冊(cè)過(guò)程中不同的地方。

7.1 從Sub Reactor Group中選取一個(gè)Sub Reactor進(jìn)行綁定

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

   @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

}

7.2 向綁定的Sub Reactor上注冊(cè)NioSocketChannel

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //注冊(cè)channel到綁定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe負(fù)責(zé)channel底層的各種操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

}
  • 當(dāng)時(shí)我們?cè)诮榻BNioServerSocketChannel的注冊(cè)過(guò)程時(shí)掰烟,這里的promise.channel()NioServerSocketChannel爽蝴。底層的unsafe操作類(lèi)為NioMessageUnsafe

  • 此時(shí)這里的promise.channel()NioSocketChannel纫骑。底層的unsafe操作類(lèi)為NioByteUnsafe蝎亚。

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ..............省略....................
            //此時(shí)這里的eventLoop為Sub Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 執(zhí)行channel注冊(cè)的操作必須是Reactor線程來(lái)完成
             *
             * 1: 如果當(dāng)前執(zhí)行線程是Reactor線程,則直接執(zhí)行register0進(jìn)行注冊(cè)
             * 2:如果當(dāng)前執(zhí)行線程是外部線程先馆,則需要將register0注冊(cè)操作 封裝程異步Task 由Reactor線程執(zhí)行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ..............省略....................
                }
            }
        }

注意此時(shí)傳遞進(jìn)來(lái)的EventLoop eventLoop為Sub Reactor发框。

但此時(shí)的執(zhí)行線程為Main Reactor線程,并不是Sub Reactor線程(此時(shí)還未啟動(dòng))煤墙。

所以這里的eventLoop.inEventLoop()返回的是false梅惯。

image.png

else分支中向綁定的Sub Reactor提交注冊(cè)NioSocketChannel的任務(wù)。

當(dāng)注冊(cè)任務(wù)提交后仿野,此時(shí)綁定的Sub Reactor線程啟動(dòng)铣减。

7.3 register0

我們又來(lái)到了Channel注冊(cè)的老地方register0方法。在《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》中我們花了大量的篇幅介紹了這個(gè)方法设预。這里我們只對(duì)比NioSocketChannelNioServerSocketChannel不同的地方徙歼。

 private void register0(ChannelPromise promise) {
            try {
                ................省略..................
                boolean firstRegistration = neverRegistered;
                //執(zhí)行真正的注冊(cè)操作
                doRegister();
                //修改注冊(cè)狀態(tài)
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                if (isActive()) {
                    if (firstRegistration) {
                        //觸發(fā)channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ................省略..................
            }
        }

這里 doRegister()方法將NioSocketChannel注冊(cè)到Sub Reactor中的Selector上犁河。

public abstract class AbstractNioChannel extends AbstractChannel {

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略...............
            }
        }
    }

}

這里是Netty客戶(hù)端NioSocketChannel與JDK NIO 原生 SocketChannel關(guān)聯(lián)的地方鳖枕。此時(shí)注冊(cè)的IO事件依然是0魄梯。目的也是只是為了獲取NioSocketChannel在Selector中的SelectionKey

同時(shí)通過(guò)SelectableChannel#register方法將Netty自定義的NioSocketChannel(這里的this指針)附著在SelectionKey的attechment屬性上宾符,完成Netty自定義Channel與JDK NIO Channel的關(guān)系綁定酿秸。這樣在每次對(duì)Selector進(jìn)行IO就緒事件輪詢(xún)時(shí),Netty 都可以從 JDK NIO Selector返回的SelectionKey中獲取到自定義的Channel對(duì)象(這里指的就是NioSocketChannel)魏烫。

channel與SelectionKey對(duì)應(yīng)關(guān)系.png

隨后調(diào)用pipeline.invokeHandlerAddedIfNeeded()回調(diào)客戶(hù)端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded方法辣苏,此時(shí)pipeline的結(jié)構(gòu)中只有一個(gè)ChannelInitializer。最終會(huì)在ChannelInitializer#handlerAdded回調(diào)方法中初始化客戶(hù)端NioSocketChannelpipeline哄褒。

客戶(hù)端channel pipeline初始結(jié)構(gòu).png
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作完成后稀蟋,需要將自身從pipeline中移除
                removeState(ctx);
            }
        }
    }

    protected abstract void initChannel(C ch) throws Exception;
}

關(guān)于對(duì)Channel中pipeline的詳細(xì)初始化過(guò)程,對(duì)細(xì)節(jié)部分感興趣的同學(xué)可以回看下《詳細(xì)圖解Netty Reactor啟動(dòng)全流程》

此時(shí)客戶(hù)端NioSocketChannel中的pipeline中的結(jié)構(gòu)就變?yōu)榱宋覀冏远x的樣子呐赡,在示例代碼中我們自定義的ChannelHandlerEchoServerHandler退客。

客戶(hù)端channel pipeline結(jié)構(gòu).png
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {

        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

當(dāng)客戶(hù)端NioSocketChannel中的pipeline初始化完畢后,netty就開(kāi)始調(diào)用safeSetSuccess(promise)方法回調(diào)regFuture中注冊(cè)的ChannelFutureListener链嘀,通知客戶(hù)端NioSocketChannel已經(jīng)成功注冊(cè)到Sub Reactor上了萌狂。

               childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

在服務(wù)端NioServerSocketChannel注冊(cè)的時(shí)候我們會(huì)在listener中向Main Reactor提交bind綁定端口地址任務(wù)。但是在NioSocketChannel注冊(cè)的時(shí)候怀泊,只會(huì)在listener中處理一下注冊(cè)失敗的情況茫藏。

當(dāng)Sub Reactor線程通知ChannelFutureListener注冊(cè)成功之后,隨后就會(huì)調(diào)用pipeline.fireChannelRegistered()在客戶(hù)端NioSocketChannel的pipeline中傳播ChannelRegistered事件霹琼。

傳播ChannelRegister事件.png

這里筆者重點(diǎn)要強(qiáng)調(diào)下务傲,在之前介紹NioServerSocketChannel注冊(cè)的時(shí)候,我們提到因?yàn)榇藭r(shí)NioServerSocketChannel并未綁定端口地址枣申,所以這時(shí)的NioServerSocketChannel并未激活树灶,這里的isActive()返回falseregister0方法直接返回糯而。

服務(wù)端NioServerSocketChannel判斷是否激活的標(biāo)準(zhǔn)為端口是否綁定成功天通。

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }
}

客戶(hù)端NioSocketChannel判斷是否激活的標(biāo)準(zhǔn)為是否處于Connected狀態(tài)。那么顯然這里肯定是處于connected狀態(tài)的熄驼。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel已經(jīng)處于connected狀態(tài)像寒,這里并不需要綁定端口,所以這里的isActive()返回true瓜贾。

           if (isActive()) {
                    /**
                     * 客戶(hù)端SocketChannel注冊(cè)成功后會(huì)走這里诺祸,在channelActive事件回調(diào)中注冊(cè)O(shè)P_READ事件
                     * */
                    if (firstRegistration) {
                        //觸發(fā)channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        .......省略..........
                    }
                }
            }

最后調(diào)用pipeline.fireChannelActive()在NioSocketChannel中的pipeline傳播ChannelActive事件,最終在pipeline的頭結(jié)點(diǎn)HeadContext中響應(yīng)并注冊(cè)OP_READ事件Sub Reactor中的Selector上祭芦。

傳播ChannelActive事件.png
public abstract class AbstractNioChannel extends AbstractChannel { {

    @Override
    protected void doBeginRead() throws Exception {
        ..............省略................

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_ACCEPT事件
         * 2:SocketChannel 初始化時(shí) readInterestOp設(shè)置的是OP_READ事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //注冊(cè)監(jiān)聽(tīng)OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

}

注意這里的readInterestOp為客戶(hù)端NioSocketChannel在初始化時(shí)設(shè)置的OP_READ事件筷笨。


到這里,Netty中的Main Reactor接收連接的整個(gè)流程,我們就介紹完了胃夏,此時(shí)Netty中主從Reactor組的結(jié)構(gòu)就變?yōu)椋?/p>

主從Reactor組完整結(jié)構(gòu).png

總結(jié)

本文我們介紹了NioServerSocketChannel處理客戶(hù)端連接事件的整個(gè)過(guò)程轴或。

  • 接收連接的整個(gè)處理框架。

  • 影響Netty接收連接吞吐的Bug產(chǎn)生的原因仰禀,以及修復(fù)的方案照雁。

  • 創(chuàng)建并初始化客戶(hù)端NioSocketChannel

  • 初始化NioSocketChannel中的pipeline答恶。

  • 客戶(hù)端NioSocketChannelSub Reactor注冊(cè)的過(guò)程

其中我們也對(duì)比了NioServerSocketChannelNioSocketChannel在創(chuàng)建初始化以及后面向Reactor注冊(cè)過(guò)程中的差異之處饺蚊。

當(dāng)客戶(hù)端NioSocketChannel接收完畢并向Sub Reactor注冊(cè)成功后,那么接下來(lái)Sub Reactor就開(kāi)始監(jiān)聽(tīng)注冊(cè)其上的所有客戶(hù)端NioSocketChannelOP_READ事件悬嗓,并等待客戶(hù)端向服務(wù)端發(fā)送網(wǎng)絡(luò)數(shù)據(jù)污呼。

后面Reactor的主角就該變?yōu)?code>Sub Reactor以及注冊(cè)在其上的客戶(hù)端NioSocketChannel了。

下篇文章包竹,我們將會(huì)討論Netty是如何接收網(wǎng)絡(luò)數(shù)據(jù)的~~~~ 我們下篇文章見(jiàn)~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末曙求,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子映企,更是在濱河造成了極大的恐慌悟狱,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件堰氓,死亡現(xiàn)場(chǎng)離奇詭異挤渐,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)双絮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)浴麻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人囤攀,你說(shuō)我怎么就攤上這事软免。” “怎么了焚挠?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵膏萧,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蝌衔,道長(zhǎng)榛泛,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任噩斟,我火速辦了婚禮曹锨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘剃允。我一直安慰自己沛简,他們只是感情好齐鲤,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著椒楣,像睡著了一般给郊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上撒顿,一...
    開(kāi)封第一講書(shū)人閱讀 51,754評(píng)論 1 307
  • 那天丑罪,我揣著相機(jī)與錄音荚板,去河邊找鬼凤壁。 笑死,一個(gè)胖子當(dāng)著我的面吹牛跪另,可吹牛的內(nèi)容都是我干的拧抖。 我是一名探鬼主播,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼免绿,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼唧席!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起嘲驾,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤淌哟,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后辽故,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體徒仓,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年誊垢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了掉弛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡喂走,死狀恐怖殃饿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情芋肠,我是刑警寧澤乎芳,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站帖池,受9級(jí)特大地震影響秒咐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜碘裕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一携取、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧帮孔,春花似錦雷滋、人聲如沸不撑。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)焕檬。三九已至,卻和暖如春澳泵,著一層夾襖步出監(jiān)牢的瞬間实愚,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工兔辅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留腊敲,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓维苔,卻偏偏與公主長(zhǎng)得像碰辅,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子介时,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容