Netty源碼分析之服務(wù)端Accept過(guò)程詳解

作者: 一字馬胡
轉(zhuǎn)載標(biāo)志 【2017-11-03】

更新日志

日期 更新內(nèi)容 備注
2017-11-03 添加轉(zhuǎn)載標(biāo)志 持續(xù)更新

NI/O C/S通信過(guò)程

下面分別展示了NI/O模式下的客戶端/服務(wù)端編程模型:

NI/O服務(wù)端
NI/O客戶端

Netty是一種基于NI/O的網(wǎng)絡(luò)框架袋倔,網(wǎng)絡(luò)層面的操作只是對(duì)NI/O提供的API的封裝,所以生真,它的服務(wù)端流程和客戶端流程是和NI/O的一致的区拳,對(duì)于客戶端而言溃列,它要做的事情就是連接到服務(wù)端,然后發(fā)送/接收消息。對(duì)于服務(wù)端而言涕蜂,它要bind一個(gè)端口馒稍,然后等待客戶端的連接皿哨,接收客戶端的連接使用的是Accept操作,一個(gè)服務(wù)端需要為多個(gè)客戶端提供服務(wù)纽谒,而每次accept都會(huì)生成一個(gè)新的Channel证膨,Channel是一個(gè)服務(wù)端和客戶端之間數(shù)據(jù)傳輸?shù)耐ǖ溃梢韵蚱鋡rite數(shù)據(jù)鼓黔,或者從中read數(shù)據(jù)央勒。本文將分析Netty框架的Accept流程。

Netty的Accept流程

Netty是一個(gè)較為復(fù)雜的網(wǎng)絡(luò)框架请祖,想要理解它的設(shè)計(jì)需要首先了解NI/O的相關(guān)知識(shí)订歪,為了對(duì)Netty框架有一個(gè)大概的了解,你可以參考Netty線程模型及EventLoop詳解肆捕,該文章詳解解析了Netty中重要的事件循環(huán)處理流程,包含EventLoop的初始化和啟動(dòng)等相關(guān)內(nèi)容盖高。下面首先展示了Netty中的EventLoop的分配模型慎陵,Netty服務(wù)端會(huì)為每一個(gè)新建立的Channel分配一個(gè)EventLoop,并且這個(gè)EventLoop將服務(wù)于這個(gè)Channel得整個(gè)生命周期不會(huì)改變喻奥,而一個(gè)EventLoop可能會(huì)被分配給多個(gè)Channel席纽,也就是一個(gè)EventLoop可能會(huì)服務(wù)于多個(gè)Channel的讀寫事件,這對(duì)于想要使用ThreadLocal的場(chǎng)景需要認(rèn)真考慮撞蚕。

Netty NI/O模式下EventLoop分配模型

在文章Netty線程模型及EventLoop詳解中已經(jīng)分析了EventLoop的流程润梯,現(xiàn)在從事件循環(huán)的起點(diǎn)開(kāi)始看起,也就是NioEventLoop的run方法甥厦,本文關(guān)心的是Netty的Accept事件纺铭,當(dāng)在Channel上發(fā)生了事件之后,會(huì)執(zhí)行processSelectedKeysPlain方法刀疙,看一下這個(gè)方法:


 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

接著會(huì)執(zhí)行processSelectedKey這個(gè)方法舶赔,下面是它的細(xì)節(jié):


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

現(xiàn)在我們可以看到和NI/O一樣的類似OP_READ和OP_ACCEPT之類的東西了,OP_ACCEPT表示的是有Accept事件發(fā)生了谦秧,需要我們處理竟纳,但是發(fā)現(xiàn)好像OP_READ 事件和OP_ACCEPT事件的處理都是通過(guò)一個(gè)read方法進(jìn)行的,我們先來(lái)找到這個(gè)read方法:

-------------------------------------------------
AbstractNioMessageChannel.NioMessageUnsafe.read
-------------------------------------------------
        public void read() {
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

            } 
         }
    }


本文中的所有代碼都是已經(jīng)處理過(guò)的疚鲤,完整的代碼參考源代碼锥累,本文為了控制篇幅去除了一些不影響閱讀(影響邏輯)的代碼,上面的read方法中有一個(gè)關(guān)鍵的方法doReadMessages集歇,下面是它的實(shí)現(xiàn):


--------------------------------------------
NioServerSocketChannel. doReadMessages
--------------------------------------------

    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

這個(gè)方法就是處理accept類型的事件的桶略,為了更好的理解上面的代碼,下面展示一段在NI/O中服務(wù)端的代碼:


int port = 8676;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking (false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind (new InetSocketAddres(port));
Selector selector = Selector.open();
serverChannel.register (selector, SelectionKey.OP_ACCEPT);
while (true) {
    int n = selector.select();
    Iterator it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = (SelectionKey) it.next();
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking (false);
            channel.register (selector, SelectionKey.OP_READ);
        }
        if (key.isReadable( )) {
            processReadEvent(key);
        }
        
        it.remove( );
    }
} 

可以看到,服務(wù)端accept一次就會(huì)產(chǎn)生一個(gè)新的Channel删性,Netty也是亏娜,每次Accept都會(huì)new一個(gè)新的NioSocketChannel,當(dāng)然蹬挺,這個(gè)Channel需要分配一個(gè)EventLoop給他才能開(kāi)始事件循環(huán)维贺,但是Netty服務(wù)端的Accept事件到此應(yīng)該可以清楚流程了,下面分析這個(gè)新的Channel是怎么開(kāi)始事件循環(huán)的巴帮。繼續(xù)看AbstractNioMessageChannel.NioMessageUnsafe.read這個(gè)方法溯泣,其中有一個(gè)句話:


pipeline.fireChannelRead(readBuf.get(i));

現(xiàn)在來(lái)跟蹤一下這個(gè)方法的調(diào)用鏈:


 -> ChannelPipeline.fireChannelRead
 -> DefaultChannelPipeline.fireChannelRead
 -> AbstractChannelHandlerContext.invokeChannelRead
 -> ChannelInboundHandler.channelRead
 ->ServerBootstrapAcceptor.channelRead

上面列出的是主要的調(diào)用鏈路,只是為了分析Accept的過(guò)程榕茧,到ServerBootstrapAcceptor.channelRead這個(gè)方法就可以看到是怎么分配EventLoop給Channel的了垃沦,下面展示了ServerBootstrapAcceptor.channelRead這個(gè)方法的細(xì)節(jié):


        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);        【1】

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());   【2】
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {   【3】
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

  • 【1】首先將服務(wù)端的處理邏輯賦值給新建立的這個(gè)Channel得pipeline,使得這個(gè)新建立的Channel可以得到服務(wù)端提供的服務(wù)
  • 【2】屬性賦值
  • 【3】將這個(gè)新建立的Channel添加到EventLoopGroup中去用押,這里進(jìn)行EventLoop的分配

下面來(lái)仔細(xì)看一下【3】這個(gè)register方法:

========================================
MultithreadEventLoopGroup.register
========================================

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    
========================================
SingleThreadEventLoop.register
========================================

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    

========================================
AbstractUnsafe.register
========================================    
    
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

上面的流程展示了這個(gè)新的Channel是怎么獲得一個(gè)EventLoop的肢簿,而EventLoopGroup分配EventLoop的關(guān)鍵在于MultithreadEventLoopGroup.register這個(gè)方法中的next方法,而這部分的分析已經(jīng)在Netty線程模型及EventLoop詳解中做過(guò)了蜻拨,不再贅述池充。當(dāng)一個(gè)新的連接被服務(wù)端Accept之后,會(huì)創(chuàng)建一個(gè)新的Channel來(lái)維持服務(wù)端與客戶端之間的通信缎讼,而每個(gè)新建立的Channel都會(huì)被分配一個(gè)EventLoop來(lái)實(shí)現(xiàn)事件循環(huán)收夸,本文分析了Netty服務(wù)端Accept的詳細(xì)過(guò)程,至此血崭,對(duì)于Netty的EventLoop卧惜、EventLoopGroup以及EventLoop是如何被運(yùn)行起來(lái)的,以及服務(wù)端是如何Accept新的連接的這些問(wèn)題應(yīng)該都已經(jīng)有答案了夹纫。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末咽瓷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子捷凄,更是在濱河造成了極大的恐慌忱详,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跺涤,死亡現(xiàn)場(chǎng)離奇詭異匈睁,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)桶错,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門航唆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人院刁,你說(shuō)我怎么就攤上這事糯钙。” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 164,862評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵任岸,是天一觀的道長(zhǎng)再榄。 經(jīng)常有香客問(wèn)我,道長(zhǎng)享潜,這世上最難降的妖魔是什么困鸥? 我笑而不...
    開(kāi)封第一講書人閱讀 58,728評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮剑按,結(jié)果婚禮上疾就,老公的妹妹穿的比我還像新娘。我一直安慰自己艺蝴,他們只是感情好猬腰,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,743評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著猜敢,像睡著了一般姑荷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锣枝,一...
    開(kāi)封第一講書人閱讀 51,590評(píng)論 1 305
  • 那天厢拭,我揣著相機(jī)與錄音,去河邊找鬼撇叁。 笑死,一個(gè)胖子當(dāng)著我的面吹牛畦贸,可吹牛的內(nèi)容都是我干的陨闹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,330評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼薄坏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼趋厉!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起胶坠,我...
    開(kāi)封第一講書人閱讀 39,244評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤君账,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后沈善,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體乡数,經(jīng)...
    沈念sama閱讀 45,693評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,885評(píng)論 3 336
  • 正文 我和宋清朗相戀三年闻牡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了净赴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,001評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡罩润,死狀恐怖玖翅,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤金度,帶...
    沈念sama閱讀 35,723評(píng)論 5 346
  • 正文 年R本政府宣布应媚,位于F島的核電站,受9級(jí)特大地震影響猜极,放射性物質(zhì)發(fā)生泄漏中姜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,343評(píng)論 3 330
  • 文/蒙蒙 一魔吐、第九天 我趴在偏房一處隱蔽的房頂上張望扎筒。 院中可真熱鬧,春花似錦酬姆、人聲如沸嗜桌。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,919評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)骨宠。三九已至,卻和暖如春相满,著一層夾襖步出監(jiān)牢的瞬間层亿,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,042評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工立美, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留匿又,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,191評(píng)論 3 370
  • 正文 我出身青樓建蹄,卻偏偏與公主長(zhǎng)得像碌更,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子洞慎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,955評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容