5. Netty解析:connect/bind方法背后

前言

?? 在之前的文章中春锋,我們已經(jīng)知道了netty中channel創(chuàng)建及注冊(cè):這個(gè)過程是connect方法(client端)或者bind方法(server端)所做的第一件事期奔,體現(xiàn)在initAndRegister方法中馁痴,在這之后還需要完成一些操作以實(shí)現(xiàn)connect肺孤。我們先從client端開始赠堵。

    private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();

        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }

客戶端connect

??initAndRegister會(huì)返回一個(gè)ChannelFuture對(duì)象粤铭,注冊(cè)邏輯會(huì)提交給對(duì)應(yīng)EventLoop來異步的執(zhí)行,而通過這個(gè)ChannelFuture實(shí)例我們就可以判斷異步任務(wù)的執(zhí)行狀態(tài)吗垮。由于是異步任務(wù)烁登,所以它是否已經(jīng)執(zhí)行完畢不得知饵沧,所以通過ChannelFuture判斷任務(wù)(注冊(cè)任務(wù))是否執(zhí)行完畢,如果沒有執(zhí)行完畢就為其添加一個(gè)監(jiān)聽回調(diào)羡儿,回調(diào)時(shí)機(jī)發(fā)生在任務(wù)結(jié)束掠归。當(dāng)任務(wù)完成后虏冻,開始執(zhí)行doConnect0方法厨相。并返回一個(gè)新的ChannelFuture實(shí)例鸥鹉,順便提一下通過這里的regFuture和promise绪撵,我們也可以看出netty中存在大量的異步處理方式祝蝠。

    private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, promise);
                    } else {
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

??通過代碼细溅,我們看到喇聊,通道的連接操作又是作為一個(gè)異步任務(wù)交于channel所注冊(cè)的EventLoop來執(zhí)行誓篱,前提條件是注冊(cè)任務(wù)必須已經(jīng)成功完成了窜骄。在客戶端邻遏,一般沒有執(zhí)行l(wèi)ocalAddress赎线,所以我們繼續(xù)跟蹤channel.connect(remoteAddress, promise)垂寥,發(fā)現(xiàn)矫废,channel的connect操作由pipeline來實(shí)現(xiàn),這次與之前不同的是台舱,它調(diào)用了connect操作竞惋,完成出站處理器在流水線上的執(zhí)行拆宛,與入站從頭開始不同浑厚,出站操作connect是從尾部開始的。與入站相似物蝙,會(huì)依次找到下一個(gè)出站處理器诬乞,回調(diào)其中的connect方法(這里大家可以調(diào)試看一下震嫉,不在贅述)责掏,最終pipeline的流程會(huì)到達(dá)頭結(jié)點(diǎn)

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return pipeline.connect(remoteAddress, promise);
    }


    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

↓頭結(jié)點(diǎn)負(fù)責(zé)完成客戶端連接的代碼↓

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

??在頭結(jié)點(diǎn)中,調(diào)用了一個(gè)unsafe實(shí)例的connect方法叫潦。重點(diǎn)關(guān)注doConnect方法矗蕊。

    @Override
    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            /*忽略*/

            boolean wasActive = isActive();
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                /*忽略*/
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }

    // NioSocketChannel類中

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }


    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }


??通過SocketUtils的connect方法卿操,我們可以看到害淤,底層借助NIO的SocketChannel進(jìn)行連接窥摄。而由于連接不會(huì)立即成功崭放,所以一般不會(huì)返回true,因此connected為false值骇,則會(huì)執(zhí)行下面這行代碼莹菱,注冊(cè)NIO連接事件

selectionKey().interestOps(SelectionKey.OP_CONNECT);

??由于配置了連接事件吱瘩,所以當(dāng)?shù)讓舆B接建立好之后道伟,后續(xù)的邏輯處理在哪里呢?還記得NioEventLoop里面的run方法吧使碾。代碼在這里再貼一下蜜徽。

    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

當(dāng)連接建立好后,會(huì)通過processSelectedKeys方法處理連接事件票摇。最終會(huì)執(zhí)行到這樣一段在之前見到過的代碼灰蛙。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
             
                return;
            }
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
           if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

當(dāng)接收到連接事件時(shí)會(huì)取消掉連接事件的注冊(cè)。隨后調(diào)用了unsafe.finishConnect()完成連接后的處理组力,finishConnect中調(diào)用了fulfillConnectPromise(connectPromise, wasActive)方法。

    private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        if (promise == null) {
            // Closed via cancellation and the promise has been notified already.
            return;
        }

        // 當(dāng)連接建立后脱柱,底層的socketChannl打開并建立好連接随闺,active返回為true
        boolean active = isActive();

        // 修改異步執(zhí)行狀態(tài)
        boolean promiseSet = promise.trySuccess();
        if (!wasActive && active) {
            // 流水線從頭逐個(gè)回調(diào)入站的channelActive方法散罕。
            pipeline().fireChannelActive();
        }

        // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
        if (!promiseSet) {
            close(voidPromise());
        }
    }

隨后缚甩,pipeline().fireChannelActive()就開始從流水線頭部回調(diào)channelActive方法。

   // 頭部節(jié)點(diǎn)HeadContext的channelActive方法派继。
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

頭部節(jié)點(diǎn)會(huì)首先讓流水線上的channelActive回調(diào)繼續(xù)下去(在Echo Server這個(gè)例子中绅络,EchoClientHandler的channelActive方法也會(huì)執(zhí)行),當(dāng)所有的channelActive回調(diào)完成后灭袁,調(diào)用readIfIsAutoRead方法從流水線尾部開始逐個(gè)回調(diào)read方法(這里省略了一些步驟软瞎,大家可以自行查看)芙代。最終read回調(diào)又會(huì)到達(dá)頭結(jié)點(diǎn)裹驰。

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public final void beginRead() {
        assertEventLoop();

        if (!isActive()) {
            return;
        }

        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }


    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        if (inputShutdown) {
            return;
        }

        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

在頭部節(jié)點(diǎn)調(diào)用了unsafe.beginRead()整葡,隨后又調(diào)用doBeginRead旬渠,可以發(fā)現(xiàn)腥寇,在doBeginRead中赢赊,注冊(cè)了readInterestOp事件。而readInterestOp所代表的的事件就是在生成channel時(shí)傳入的讀事件涩蜘。因此在這里是完成了讀事件的注冊(cè)嚼贡。

服務(wù)端bind

??分析了客戶端后误窖,服務(wù)端也就比較好去分析了。服務(wù)端在bind執(zhí)行后,會(huì)先去調(diào)用initAndRegister完成NioServerSocketChannel向父循環(huán)組中的時(shí)間循環(huán)的注冊(cè),但是再注冊(cè)的時(shí)候并沒有注冊(cè)有效的事件僻爽。注冊(cè)后依次經(jīng)歷下面幾個(gè)方法:doBind0 --> channel.bind --> pipeline.bind。pipeline的bind方法又會(huì)從尾部依次調(diào)用流水線上的出站處理器bind回調(diào)方法杰刽,一直延續(xù)到頭結(jié)點(diǎn)菠发。頭結(jié)點(diǎn)又調(diào)用unsafe.bind()王滤。在unsafe.bind()中,doBind借助serverSocketChannel.bind方法完成綁定滓鸠。綁定操作就此結(jié)束雁乡。隨后如同客戶端在借助SocketChannel完成connect后會(huì)發(fā)出pipeline.fireChannelActive()一樣,server端在綁定結(jié)束后也會(huì)進(jìn)行流水線上channelActive的回調(diào)糜俗□馍裕回調(diào)從頭結(jié)點(diǎn)開始,這就跟client端很相似悠抹。但不同之處在于珠月,客戶端的頭結(jié)點(diǎn)在fireChannelRead后的readIfIsAutoRead會(huì)將讀事件注冊(cè),而在server端楔敌,由于在創(chuàng)建NioServerSocketChannel時(shí)傳入的readInterestOp為accept事件啤挎,因此在通道激活active后,為NioServerSocketChannel中的ServerSocketChannel注冊(cè)了接受連接Accept事件卵凑。

總結(jié)

??我們綜合前面的文章以及本文庆聘,來總結(jié)一下connect和bind方法背后的邏輯。兩者首先都進(jìn)行了通道(NioSocketChannel或NioServerSocketChannel)的創(chuàng)建和注冊(cè)勺卢,注冊(cè)的過程只是把其中封裝的SocketChannel或者ServerSocketChannel注冊(cè)到對(duì)應(yīng)的NioEventLoop的selector中伙判,并沒有實(shí)際注冊(cè)什么有效事件。當(dāng)通道完成注冊(cè)后黑忱,添加到流水線上的handler的handlerAdded方法才會(huì)被回調(diào)(而通道注冊(cè)完成后宴抚,再向流水線添加handler時(shí),其handlerAdded方法會(huì)立即回調(diào))甫煞。隨后流水線調(diào)用fireChannelRegistered菇曲。當(dāng)具體通道的連接或者綁定操作完成后,流水線又會(huì)調(diào)用fireChannelActive方法危虱,表明通道已經(jīng)激活羊娃。通道激活并且channelActive回調(diào)都執(zhí)行完成后,客戶端注冊(cè)了讀事件而服務(wù)端注冊(cè)了accept事件埃跷。
??

*鏈接

1. Netty解析:第一個(gè)demo——Echo Server
2. Netty解析:NioEventLoopGroup事件循環(huán)組
3. Netty解析:NioSocketChannel蕊玷、NioServerSocketChannel的創(chuàng)建及注冊(cè)
4. Netty解析:Handler、Pipeline大動(dòng)脈及其在注冊(cè)過程中體現(xiàn)
5. Netty解析:connect/bind方法背后
6. Netty解析:服務(wù)端如何接受連接并后續(xù)處理讀寫事件

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末弥雹,一起剝皮案震驚了整個(gè)濱河市垃帅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌剪勿,老刑警劉巖贸诚,帶你破解...
    沈念sama閱讀 222,681評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡酱固,警方通過查閱死者的電腦和手機(jī)械念,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來运悲,“玉大人龄减,你說我怎么就攤上這事“嗝校” “怎么了希停?”我有些...
    開封第一講書人閱讀 169,421評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)署隘。 經(jīng)常有香客問我宠能,道長(zhǎng),這世上最難降的妖魔是什么磁餐? 我笑而不...
    開封第一講書人閱讀 60,114評(píng)論 1 300
  • 正文 為了忘掉前任违崇,我火速辦了婚禮,結(jié)果婚禮上诊霹,老公的妹妹穿的比我還像新娘亦歉。我一直安慰自己,他們只是感情好畅哑,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,116評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著水由,像睡著了一般荠呐。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上砂客,一...
    開封第一講書人閱讀 52,713評(píng)論 1 312
  • 那天泥张,我揣著相機(jī)與錄音,去河邊找鬼鞠值。 笑死媚创,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的彤恶。 我是一名探鬼主播钞钙,決...
    沈念sama閱讀 41,170評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼声离!你這毒婦竟也來了芒炼?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,116評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤术徊,失蹤者是張志新(化名)和其女友劉穎本刽,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,651評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡子寓,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,714評(píng)論 3 342
  • 正文 我和宋清朗相戀三年暗挑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片斜友。...
    茶點(diǎn)故事閱讀 40,865評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡炸裆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出蝙寨,到底是詐尸還是另有隱情晒衩,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布墙歪,位于F島的核電站听系,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏虹菲。R本人自食惡果不足惜靠胜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,211評(píng)論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望毕源。 院中可真熱鬧浪漠,春花似錦、人聲如沸霎褐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冻璃。三九已至响谓,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間省艳,已是汗流浹背娘纷。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留跋炕,地道東北人赖晶。 一個(gè)月前我還...
    沈念sama閱讀 49,299評(píng)論 3 379
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像辐烂,于是被迫代替她去往敵國(guó)和親遏插。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,870評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容