Netty源碼分析5--服務(wù)端消息處理流程

上篇文章中已經(jīng)介紹了pipline相關(guān)的內(nèi)容pipline,如果對這部分內(nèi)容比較熟悉的話,理解這部分內(nèi)容就很簡單了袭祟。為了容易說明遏插,還是把上一節(jié)的demo程序先放到這里捂贿。

    public void start() {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(port))
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new InBoundHandler1());
                        ch.pipeline().addLast(new InBoundHandler2());
                        ch.pipeline().addLast(new InBoundHandler3());
                    }
                });
       ....
    }

服務(wù)端能夠接收消息的前提是已經(jīng)和客戶端建立一個channel通道,想要了解這個channel怎么建立的可以參考這篇文章Netty源碼--accept連接胳嘲,這里不再贅述厂僧。
通道建立后,是在NioEventLoop類中監(jiān)聽這個channel的讀寫事件了牛,具體過程之前已經(jīng)在這篇文章Netty源碼--accept連接中分析颜屠,這里直接跳到processSelectedKey這個方法的實現(xiàn):

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
         
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

在這個方法中辰妙,當(dāng)監(jiān)聽到讀事件后,會調(diào)用unsafe的read方法甫窟,那么就看下這個unsafe的具體類型是啥密浑。
AbstractNioChannel.NioUnsafe unsafe = ch.unsafe()這句代碼返回了一個NioUnsafe對象,NioUnsafe 是一個接口粗井,具體實現(xiàn)類主要有兩個NioByteUnsafe和NioMessageUnsafe尔破。由于這里的unsafe是通過調(diào)用ch.unsafe生成的,ch具體類型是NioSocketChannel浇衬,通過追溯代碼這個unsafe是在NioSocketChannel的構(gòu)造函數(shù)中通過調(diào)用這個類的newUnsafe方法初始化的懒构。

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioSocketChannelUnsafe();
    }

    private final class NioSocketChannelUnsafe extends NioByteUnsafe {

從上面代碼可以看到,這個unsafe是一個NioByteUnsafe類型的耘擂,因此監(jiān)聽到讀事件后調(diào)用的unsafe.read()這個方法具體實現(xiàn)就是在NioByteUnsafe這個類中胆剧。

        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
              
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

這個方法首先調(diào)用doReadBytes這個方法讀取數(shù)據(jù)到ByteBuf中,然后調(diào)用 pipeline.fireChannelRead(byteBuf)將ByteBuf中數(shù)據(jù)發(fā)送到pipeline中保存的第一個handler中醉冤,看下具體調(diào)用過程秩霍。

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }

首先調(diào)用DedaultChannelPipline類中的fireChannelRead方法,在這個方法中調(diào)用了AbstractChannelHandlerContext這個類的invokeChannelRead方法冤灾,并將DedaultChannelPipline的指向鏈表首節(jié)點的head指針作為這個方法的參數(shù)傳遞進(jìn)去前域。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
          
            });
        }
    }

最終調(diào)用next.invokeChannelRead(m)方法,handler()返回的是HeadContext類韵吨,看下這個類中invokeChannelRead方法的實現(xiàn)匿垄。

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
          @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }

然后調(diào)用ctx.fireChannelRead(msg)這個方法,其實選擇channel的邏輯主要在這個方法實現(xiàn)归粉。

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

findContextInbound這個方法其實返回的就是DefaultChannelPipeline中鏈表中下一個需要處理的channelHandler椿疗,通過這個方法使消息能夠在多個channelHandler傳遞。選擇好下一個channelHandler所對應(yīng)的AbstractChannelHandlerContext類后糠悼,調(diào)用invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)方法届榄。

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
          
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

主要看這句 ((ChannelInboundHandler) handler()).channelRead(this, msg),handler()返回的是當(dāng)前AbstractChannelHandlerContext 對應(yīng)的channelHandler倔喂,這個channelHandler其實就是我們在demo程序中初始化時添加的InBoundHandler1铝条、InBoundHandler2、InBoundHandler3席噩。這三個類都繼承ChannelInboundHandlerAdapter班缰,實現(xiàn)了channelRead方法,這樣我們就可以在這個channelRead方法根據(jù)自己的協(xié)議以及業(yè)務(wù)特點悼枢,對數(shù)據(jù)做特定的處理埠忘,這也是netty作為一個網(wǎng)絡(luò)通信框架非常靈活的一點。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市莹妒,隨后出現(xiàn)的幾起案子名船,更是在濱河造成了極大的恐慌,老刑警劉巖旨怠,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渠驼,死亡現(xiàn)場離奇詭異,居然都是意外死亡运吓,警方通過查閱死者的電腦和手機(jī)渴邦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門疯趟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拘哨,“玉大人,你說我怎么就攤上這事信峻【肭啵” “怎么了?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵盹舞,是天一觀的道長产镐。 經(jīng)常有香客問我,道長踢步,這世上最難降的妖魔是什么癣亚? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮获印,結(jié)果婚禮上述雾,老公的妹妹穿的比我還像新娘。我一直安慰自己兼丰,他們只是感情好玻孟,可當(dāng)我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鳍征,像睡著了一般黍翎。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上艳丛,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天匣掸,我揣著相機(jī)與錄音,去河邊找鬼氮双。 笑死碰酝,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的眶蕉。 我是一名探鬼主播砰粹,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了碱璃?” 一聲冷哼從身側(cè)響起弄痹,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎嵌器,沒想到半個月后肛真,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡爽航,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年蚓让,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片讥珍。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡历极,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出衷佃,到底是詐尸還是另有隱情趟卸,我是刑警寧澤,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布氏义,位于F島的核電站锄列,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏惯悠。R本人自食惡果不足惜邻邮,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望克婶。 院中可真熱鬧筒严,春花似錦、人聲如沸鸠补。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽紫岩。三九已至规惰,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間泉蝌,已是汗流浹背歇万。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留勋陪,地道東北人贪磺。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像诅愚,于是被迫代替她去往敵國和親寒锚。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容