【死磕Netty】-----服務(wù)端啟動(dòng)過(guò)程分析

原文出處http://cmsblogs.com/ 『chenssy』
轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處颖榜,謝謝!

上篇博客(【死磕Netty】----Netty的核心組件及其設(shè)計(jì)),了解了 Netty 的核心組件及其設(shè)計(jì),但是這些都是零散的,不成體系姥闭。那么 Netty 是如何利用這些組件構(gòu)建成一個(gè)高性能的異步通信框架。通過(guò)這篇博客可以初步了解越走。

下面先來(lái)一段 Netty 服務(wù)端的代碼:

public class NettyServer {

    public void bind(int port){
        // 創(chuàng)建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();        //創(chuàng)建BOSS線程組 用于服務(wù)端接受客戶端的連接
        EventLoopGroup workerGroup = new NioEventLoopGroup();      //創(chuàng)建WORK線程組 用于進(jìn)行SocketChannel的網(wǎng)絡(luò)讀寫(xiě)

        try {
            // 創(chuàng)建ServerBootStrap實(shí)例
            // ServerBootstrap 用于啟動(dòng)NIO服務(wù)端的輔助啟動(dòng)類棚品,目的是降低服務(wù)端的開(kāi)發(fā)復(fù)雜度
            ServerBootstrap b = new ServerBootstrap();
            // 綁定Reactor線程池
            b.group(bossGroup, workerGroup)
                    // 設(shè)置并綁定服務(wù)端Channel
                    // 指定所使用的NIO傳輸?shù)腃hannel
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingServerHandler())
                    .childHandler(new ChannelInitializer(){

                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            //do something
                        }
                    });

            // 綁定端口,同步等待成功
            ChannelFuture future = b.bind(port).sync();
            // 等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 優(yōu)雅地關(guān)閉
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class LoggingServerHandler extends ChannelInboundHandlerAdapter{
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("loggin-handlerAdded");
        }
    }

    public static void main(String[] args){
            new NettyServer().bind(8899);
    }
}

上面代碼為 Netty 服務(wù)器端的完整代碼廊敌,在整個(gè)服務(wù)端代碼中會(huì)涉及如下幾個(gè)核心類铜跑。

ServerBootstrap

ServerBootstrap 為 Netty 服務(wù)端的啟動(dòng)輔助類,它提供了一系列的方法用于設(shè)置服務(wù)端啟動(dòng)相關(guān)的參數(shù)骡澈。

Channel

Channel 為 Netty 網(wǎng)絡(luò)操作抽象類锅纺,它定義了一組功能,其提供的 API 大大降低了直接使用 Socket 類的復(fù)雜性肋殴。當(dāng)然它也不僅僅只是包括了網(wǎng)絡(luò) IO 操作的基本功能囤锉,還包括一些與 Netty 框架相關(guān)的功能坦弟,包括獲取該 Channel 的 EventLoop 等等。

EventLoopGroup

EventLoopGroup 為 Netty 的 Reactor 線程池官地,它實(shí)際上就是 EventLoop 的容器减拭,而 EventLoop 為 Netty 的核心抽象類,它的主要職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel区丑。

ChannelHandler

ChannelHandler 作為 Netty 的主要組件,它主要負(fù)責(zé) I/O 事件或者 I/O 操作進(jìn)行攔截和處理修陡,它可以選擇性地?cái)r截和處理自己感覺(jué)興趣的事件沧侥,也可以透?jìng)骱徒K止事件的傳遞。

ChannelPipeline

ChannelPipeline 是 ChannelHandler 鏈的容器魄鸦,它負(fù)責(zé) ChannelHandler 的管理和事件攔截與調(diào)度宴杀。每當(dāng)新建一個(gè) Channel 都會(huì)分配一個(gè)新的 ChannelPepeline,同時(shí)這種關(guān)聯(lián)是永久性的拾因。

以上是簡(jiǎn)要介紹旺罢,詳細(xì)介紹請(qǐng)參考(【死磕Netty】-----Netty的核心組件及其設(shè)計(jì))

服務(wù)端創(chuàng)建流程

Netty 服務(wù)端創(chuàng)建的時(shí)序圖,如下(摘自《Netty權(quán)威指南(第二版)》)

Netty 服務(wù)端創(chuàng)建的時(shí)序圖

主要步驟為:

  1. 創(chuàng)建 ServerBootstrap 實(shí)例
  2. 設(shè)置并綁定 Reactor 線程池
  3. 設(shè)置并綁定服務(wù)端 Channel
  4. 創(chuàng)建并初始化 ChannelPipeline
  5. 添加并設(shè)置 ChannelHandler
  6. 綁定并啟動(dòng)監(jiān)聽(tīng)端口

服務(wù)端源碼分析

1绢记、創(chuàng)建兩個(gè)EventLoopGroup

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup 為 BOSS 線程組扁达,用于服務(wù)端接受客戶端的連接, workerGroup 為 worker 線程組,用于進(jìn)行 SocketChannel 的網(wǎng)絡(luò)讀寫(xiě)蠢熄。當(dāng)然也可以創(chuàng)建一個(gè)并共享跪解。

2、創(chuàng)建ServerBootstrap實(shí)例

ServerBootstrap b = new ServerBootstrap();

ServerBootStrap為Netty服務(wù)端的啟動(dòng)引導(dǎo)類签孔,用于幫助用戶快速配置叉讥、啟動(dòng)服務(wù)端服務(wù)。提供的方法如下:

方法名稱 方法描述
group 設(shè)置 ServerBootstrap 要用的 EventLoopGroup
channel 設(shè)置將要被實(shí)例化的 ServerChannel 類
option 實(shí)例化的 ServerChannel 的配置項(xiàng)
childHandler 設(shè)置并添加 ChannelHandler
bind 綁定 ServerChannel

ServerBootStrap底層采用裝飾者模式饥追。

關(guān)于 ServerBootStrap 我們后續(xù)做詳細(xì)分析图仓。

3、設(shè)置并綁定Reactor線程池

調(diào)用 group() 方法但绕,為 ServerBootstrap 實(shí)例設(shè)置并綁定 Reactor 線程池救崔。

b.group(bossGroup, workerGroup)

EventLoopGroup 為 Netty 線程池,它實(shí)際上就是 EventLoop 的數(shù)組容器捏顺。EventLoop 的職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel帚豪,Selector 的輪詢操作由綁定的 EventLoop 線程 run 方法驅(qū)動(dòng),在一個(gè)循環(huán)體內(nèi)循環(huán)執(zhí)行草丧。通俗點(diǎn)講就是一個(gè)死循環(huán)狸臣,不斷的檢測(cè) I/O 事件、處理 I/O 事件昌执。

這里設(shè)置了兩個(gè)group烛亦,這個(gè)其實(shí)有點(diǎn)兒像我們工作一樣诈泼。需要兩類型的工人,一個(gè)老板(bossGroup),一個(gè)工人(workerGroup)煤禽,老板負(fù)責(zé)從外面接活铐达,工人則負(fù)責(zé)死命干活(尼瑪,和我上家公司一模一樣)檬果。所以這里 bossGroup 的作用就是不斷地接收新的連接瓮孙,接收之后就丟給 workerGroup 來(lái)處理,workerGroup 負(fù)責(zé)干活就行(負(fù)責(zé)客戶端連接的 IO 操作)选脊。

源碼如下:

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);        // 綁定boosGroup
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;    // 綁定workerGroup
        return this;
    }

其中父 EventLoopGroup 傳遞到父類的構(gòu)造函數(shù)中:

    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        }
        if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        return (B) this;
    }

4杭抠、設(shè)置并綁定服務(wù)端Channel
綁定線程池后,則需要設(shè)置 channel 類型恳啥,服務(wù)端用的是 NioServerSocketChannel 偏灿。

.channel(NioServerSocketChannel.class)

調(diào)用 ServerBootstrap.channel 方法用于設(shè)置服務(wù)端使用的 Channel,傳遞一個(gè) NioServerSocketChannel Class對(duì)象钝的,Netty通過(guò)工廠類翁垂,利用反射創(chuàng)建NioServerSocketChannel 對(duì)象,如下:

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

channelFactory() 用于設(shè)置 Channel 工廠的:

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return (B) this;
    }

這里傳遞的是 ReflectiveChannelFactory硝桩,其源代碼如下:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }
    //需要?jiǎng)?chuàng)建 channel 的時(shí)候沿猜,該方法將被調(diào)用
    @Override
    public T newChannel() {
        try {
            // 反射創(chuàng)建對(duì)應(yīng) channel
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

確定服務(wù)端的 Channel(NioServerSocketChannel)后,調(diào)用 option()方法設(shè)置 Channel 參數(shù)碗脊,作為服務(wù)端邢疙,主要是設(shè)置TCP的backlog參數(shù),如下:

.option(ChannelOption.SO_BACKLOG, 1024)

option()源碼如下:

    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }
    
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

五望薄、添加并設(shè)置ChannelHandler

設(shè)置完 Channel 參數(shù)后疟游,用戶可以為啟動(dòng)輔助類和其父類分別指定 Handler。

 .handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
    //省略代碼
})

這兩個(gè) Handler 不一樣痕支,前者(handler())設(shè)置的 Handler 是服務(wù)端 NioServerSocketChannel的颁虐,后者(childHandler())設(shè)置的 Handler 是屬于每一個(gè)新建的 NioSocketChannel 的。跟蹤源代碼會(huì)發(fā)現(xiàn)兩種所處的類不一樣卧须,handler 位于 AbstractBootstrap 中另绩,childHandler 位于 ServerBootstrap 中,如下:

    // AbstractBootstrap
    public B handler(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
        return (B) this;
    }
    
    // ServerBootstrap
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的花嘶,所有連接該監(jiān)聽(tīng)端口的客戶端都會(huì)執(zhí)行它笋籽,父類 AbstractBootstrap 中的 Handler 是一個(gè)工廠類,它為每一個(gè)新接入的客戶端都創(chuàng)建一個(gè)新的 Handler椭员。如下圖(《Netty權(quán)威指南(第二版)》):

[圖片上傳失敗...(image-bcab2c-1512393901237)]

六车海、綁定端口,啟動(dòng)服務(wù)

服務(wù)端最后一步隘击,綁定端口并啟動(dòng)服務(wù)侍芝,如下:

ChannelFuture future = b.bind(port).sync();

調(diào)用 ServerBootstrap 的 bind() 方法進(jìn)行端口綁定:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }
    
    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }    

首先調(diào)用 validate() 方法進(jìn)行參數(shù)校驗(yàn)研铆,然后調(diào)用 doBind() 方法:

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // 初始化并注冊(cè)一個(gè)Channel
        final ChannelFuture regFuture = initAndRegister();

        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        // 注冊(cè)成功
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // 調(diào)用doBind0綁定
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

該方法涉及內(nèi)容較多,我們分解來(lái)看州叠,如下:

  1. 首先通過(guò) initAndRegister() 得到一個(gè) ChannelFuture 對(duì)象 regFuture棵红;
  2. 根據(jù)得到的 regFuture 對(duì)象判斷該對(duì)象是否拋出異常 (regFuture.cause()),如果是咧栗,直接返回逆甜;
  3. 根據(jù) regFuture.isDone()判斷 initAndRegister()是否執(zhí)行完畢,如果執(zhí)行完成致板,則調(diào)用 doBind0交煞;
  4. initAndRegister() 沒(méi)有執(zhí)行完畢,則向 regFuture 對(duì)象添加一個(gè) ChannelFutureListener 監(jiān)聽(tīng)可岂,當(dāng) initAndRegister() 執(zhí)行完畢后會(huì)調(diào)用 operationComplete(),在 operationComplete() 中依然會(huì)判斷 ChannelFuture 是否拋出異常翰灾,如果沒(méi)有則調(diào)用 doBind0進(jìn)行綁定缕粹。

按照上面的步驟我們一步一步來(lái)剖析 doBind() 方法。

initAndRegister()

執(zhí)行 initAndRegister() 會(huì)得到一個(gè) ChannelFuture 對(duì)象 regFuture纸淮,代碼如下:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 新建一個(gè)Channel
            channel = channelFactory.newChannel();
            // 初始化Channel
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                channel.unsafe().closeForcibly();
            }
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // /向EventLoopGroup中注冊(cè)一個(gè)channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

首先調(diào)用 newChannel() 新建一個(gè)Channel平斩,這里是NioServerSocketChannel,還記前面 4咽块、設(shè)置并綁定服務(wù)端Channel(.channel(NioServerSocketChannel.class)中 設(shè)置的Channel工廠類么绘面?在這里派上用處了。在上面提到了通過(guò)反射的機(jī)制我們可以得到一個(gè) NioServerSocketChannel 類的實(shí)例侈沪。那么 NioServerSocketChannel 到底是一個(gè)什么東西呢揭璃?如下圖:

這里寫(xiě)圖片描述

上圖是 NioServerSocketChannel 的繼承體系結(jié)構(gòu)圖, NioServerSocketChannel 在構(gòu)造函數(shù)中會(huì)依靠父類來(lái)完成一項(xiàng)一項(xiàng)的初始化工作亭罪。先看 NioServerSocketChannel 構(gòu)造函數(shù)瘦馍。

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

newSocket() 方法較為簡(jiǎn)單,它是利用 SelectorProvider.openServerSocketChannel()应役,產(chǎn)生一個(gè) ServerSocketChannel 對(duì)象情组。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

該構(gòu)造函數(shù)首先是調(diào)用父類的構(gòu)造方法,然后設(shè)置 config屬性箩祥。父類構(gòu)造方法如下:

    // AbstractNioMessageChannel
    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
    
    // AbstractNioChannel
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    
    // AbstractChannel
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

通過(guò) super() 院崇,一層一層往上,直到 AbstractChannel袍祖。我們從最上層解析底瓣。

  • AbstractChannel 設(shè)置了 unsafe (unsafe = newUnsafe())和 pipeline(pipeline = newChannelPipeline());
  • AbstractNioChannel 將當(dāng)前 ServerSocketChannel 設(shè)置成了非阻塞(ch.configureBlocking(false);)蕉陋,同時(shí)設(shè)置SelectionKey.OP_ACCEPT事件(this.readInterestOp = readInterestOp; readInterestOp 值由 NioServerSocketChannel 中傳遞)键耕;
  • NioServerSocketChannel 設(shè)置 config屬性(config = new NioServerSocketChannelConfig(this, javaChannel().socket()))。

所以 channel = channelFactory.newChannel() 通過(guò)反射機(jī)制產(chǎn)生了 NioServerSocketChannel 類實(shí)例柑营。同時(shí)該實(shí)例設(shè)置了NioMessageUnsafe屈雄、DefaultChannelPipeline、非阻塞官套、SelectionKey.OP_ACCEPT事件 和 NioServerSocketChannelConfig 屬性酒奶。

看完了 channelFactory.newChannel();,我們?cè)倏?init()奶赔。

    void init(Channel channel) throws Exception {
         // 設(shè)置配置的option參數(shù)
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        // 獲取綁定的pipeline
        ChannelPipeline p = channel.pipeline();
        
        // 準(zhǔn)備child用到的4個(gè)part
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        // 為NioServerSocketChannel的pipeline添加一個(gè)初始化Handler,
        // 當(dāng)NioServerSocketChannel在EventLoop注冊(cè)成功時(shí)惋嚎,該handler的init方法將被調(diào)用
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                //如果用戶配置過(guò)Handler
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 為NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor處理器
                        // 該Handler主要用來(lái)將新創(chuàng)建的NioSocketChannel注冊(cè)到EventLoopGroup中
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

其實(shí)整個(gè)過(guò)程可以分為三個(gè)步驟:

  1. 設(shè)置 Channel 的 option 和 attr;
  2. 獲取綁定的 pipeline站刑,然后為 NioServerSocketChanne l綁定的 pipeline 添加 Handler另伍;
  3. 將用于服務(wù)端注冊(cè)的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中。ServerBootstrapAcceptor 為一個(gè)接入器绞旅,專門接受新請(qǐng)求摆尝,把新的請(qǐng)求扔給某個(gè)事件循環(huán)器。

至此初始化部分已經(jīng)結(jié)束因悲,我們?cè)倏醋?cè)部分堕汞,

        // /向EventLoopGroup中注冊(cè)一個(gè)channel
        ChannelFuture regFuture = config().group().register(channel);

注冊(cè)方法的調(diào)用位于 initAndRegister() 方法中。注意這里的 group() 返回的是前面的 boss NioEvenLoopGroup晃琳,它繼承 MultithreadEventLoopGroup讯检,調(diào)用的 register(),也是 MultithreadEventLoopGroup 中的卫旱。如下:

    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

調(diào)用 next() 方法從 EventLoopGroup 中獲取下一個(gè) EventLoop人灼,調(diào)用 register() 方法注冊(cè):

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

將Channel和EventLoop封裝成一個(gè)DefaultChannelPromise對(duì)象,然后調(diào)用register()方法顾翼。DefaultChannelPromis為ChannelPromise的默認(rèn)實(shí)現(xiàn)挡毅,而ChannelPromisee繼承Future,具備異步執(zhí)行結(jié)構(gòu)暴构,綁定Channel跪呈,所以又具備了監(jiān)聽(tīng)的能力,故而ChannelPromis是Netty異步執(zhí)行的核心接口取逾。

    public ChannelFuture register(ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

首先獲取 channel 的 unsafe 對(duì)象耗绿,該 unsafe 對(duì)象就是在之前設(shè)置過(guò)得。然后調(diào)用 register() 方法砾隅,如下:

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;
                
            // 必須要保證注冊(cè)是由該EventLoop發(fā)起的
            if (eventLoop.inEventLoop()) {
                register0(promise);        // 注冊(cè)
            } else {
                // 如果不是單獨(dú)封裝成一個(gè)task異步執(zhí)行
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

過(guò)程如下:

  1. 首先通過(guò)isRegistered() 判斷該 Channel 是否已經(jīng)注冊(cè)到 EventLoop 中误阻;
  2. 通過(guò) eventLoop.inEventLoop() 來(lái)判斷當(dāng)前線程是否為該 EventLoop 自身發(fā)起的,如果是,則調(diào)用 register0() 直接注冊(cè)究反;
  3. 如果不是寻定,說(shuō)明該 EventLoop 中的線程此時(shí)沒(méi)有執(zhí)行權(quán),則需要新建一個(gè)線程精耐,單獨(dú)封裝一個(gè) Task狼速,而該 Task 的主要任務(wù)則是執(zhí)行 register0()

無(wú)論當(dāng)前 EventLoop 的線程是否擁有執(zhí)行權(quán)卦停,最終都會(huì)要執(zhí)行 register0()向胡,如下:

        private void register0(ChannelPromise promise) {
            try {
                // 確保 Channel 處于 open
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                    
                // 真正的注冊(cè)動(dòng)作
                doRegister();
                
                neverRegistered = false;
                registered = true;        
            
                pipeline.invokeHandlerAddedIfNeeded();    
                safeSetSuccess(promise);        //設(shè)置注冊(cè)結(jié)果為成功
                
                pipeline.fireChannelRegistered();
            
                if (isActive()) { 
                    //如果是首次注冊(cè),發(fā)起 pipeline 的 fireChannelActive
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

如果 Channel 處于 open 狀態(tài),則調(diào)用 doRegister() 方法完成注冊(cè)惊完,然后將注冊(cè)結(jié)果設(shè)置為成功僵芹。最后判斷如果是首次注冊(cè)且處于激活狀態(tài),則發(fā)起 pipeline 的 fireChannelActive()小槐。

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 注冊(cè)到NIOEventLoop的Selector上
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

這里注冊(cè)時(shí) ops 設(shè)置的是 0拇派,也就是說(shuō) ServerSocketChannel 僅僅只是表示了注冊(cè)成功,還不能監(jiān)聽(tīng)任何網(wǎng)絡(luò)操作凿跳,這樣做的目的是(摘自《Netty權(quán)威指南(第二版)》):

  1. 注冊(cè)方式是多態(tài)的件豌,它既可以被 NIOServerSocketChannel 用來(lái)監(jiān)聽(tīng)客戶端的連接接入,也可以注冊(cè) SocketChannel 用來(lái)監(jiān)聽(tīng)網(wǎng)絡(luò)讀或者寫(xiě)操作拄显。
  2. 通過(guò) SelectionKey.interestOps(int ops) 方法可以方便地修改監(jiān)聽(tīng)操作位苟径。所以案站,此處注冊(cè)需要獲取 SelectionKey 并給 AbstractNIOChannel 的成員變量 selectionKey 賦值躬审。

由于這里 ops 設(shè)置為 0,所以還不能監(jiān)聽(tīng)讀寫(xiě)事件蟆盐。調(diào)用 doRegister()后承边,然后調(diào)用pipeline.invokeHandlerAddedIfNeeded();,這個(gè)時(shí)候控制臺(tái)會(huì)出現(xiàn) loggin-handlerAdded石挂,內(nèi)部如何調(diào)用博助,我們?cè)谄饰?pipeline 時(shí)再做詳細(xì)分析。然后將注冊(cè)結(jié)果設(shè)置為成功(safeSetSuccess(promise))痹愚。調(diào)用 pipeline.fireChannelRegistered(); 這個(gè)時(shí)候控制臺(tái)會(huì)打印 loggin-channelRegistered富岳。這里簡(jiǎn)單分析下該方法。

    public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

pipeline 維護(hù)著 handle 鏈表拯腮,事件會(huì)在 NioServerSocketChannel 的 pipeline 中傳播窖式。最終都會(huì)調(diào)用 next.invokeChannelRegistered(),如下:

    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRegistered();
        }
    }

invokeChannelRegistered() 會(huì)調(diào)用我們?cè)谇懊嬖O(shè)置的 handler (還記得簽名的 handler(new LoggingServerHandler() )么)的 channelRegistered()动壤,這個(gè)時(shí)候控制臺(tái)應(yīng)該會(huì)打印 loggin-channelRegistered萝喘。

到這里initAndRegister() (final ChannelFuture regFuture = initAndRegister();)就分析完畢了,該方法主要做如下三件事:

  1. 通過(guò)反射產(chǎn)生了一個(gè) NioServerSocketChannle 對(duì)象;
  2. 調(diào)用 init(channel)完成初始化工作阁簸;
  3. 將NioServerSocketChannel進(jìn)行了注冊(cè)爬早。

initAndRegister()篇幅較長(zhǎng),分析完畢了启妹,我們?cè)俜祷氐?code>doBind(final SocketAddress localAddress)筛严。在 doBind(final SocketAddress localAddress) 中如果 initAndRegister()執(zhí)行完成,則 regFuture.isDone() 則為 true翅溺,執(zhí)行 doBind0()脑漫。如果沒(méi)有執(zhí)行完成,則會(huì)注冊(cè)一個(gè)監(jiān)聽(tīng) ChannelFutureListener咙崎,當(dāng) initAndRegister() 完成后优幸,會(huì)調(diào)用該監(jiān)聽(tīng)的 operationComplete()方法,最終目的還是執(zhí)行 doBind0()褪猛。故而我們下面分析 doBind0()到底做了些什么网杆。源碼如下:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

doBind0() 較為簡(jiǎn)單,首先new 一個(gè)線程 task伊滋,然后將該任務(wù)提交到 NioEventLoop 中進(jìn)行處理碳却,我們先看 execute()

  public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

調(diào)用 inEventLoop() 判斷當(dāng)前線程是否為該 NioEventLoop 所關(guān)聯(lián)的線程笑旺,如果是昼浦,則調(diào)用 addTask() 將任務(wù) task 添加到隊(duì)列中,如果不是筒主,則先啟動(dòng)線程关噪,在調(diào)用 addTask() 將任務(wù) task 添加到隊(duì)列中。addTask() 如下:

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

offerTask()添加到隊(duì)列中:

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

task 添加到任務(wù)隊(duì)列 taskQueue成功后乌妙,執(zhí)行任務(wù)會(huì)調(diào)用如下方法:

 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

channel 首先調(diào)用 bind() 完成 channel 與端口的綁定使兔,如下:

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }

tail 在 DefaultChannelPipeline 中定義:final AbstractChannelHandlerContext tail; 有 tail 就會(huì)有 head ,在 DefaultChannelPipeline 中維護(hù)這一個(gè) AbstractChannelHandlerContext 節(jié)點(diǎn)的雙向鏈表藤韵,該鏈表是實(shí)現(xiàn) Pipeline 機(jī)制的關(guān)鍵虐沥,更多詳情會(huì)在 ChannelPipeline 中做詳細(xì)說(shuō)明。bind() 最終會(huì)調(diào)用 DefaultChannelPipeline 的 bind() 方法泽艘。如下:

    public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        if (!validatePromise(promise, false)) {
            // cancelled
            return promise;
        }

        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }

首先對(duì) localAddress 欲险、 promise 進(jìn)行校驗(yàn),符合規(guī)范則調(diào)用 findContextOutbound() 匹涮,該方法用于在 pipeline 中獲取 AbstractChannelHandlerContext 雙向鏈表中的一個(gè)節(jié)點(diǎn)天试,如下:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

從該方法可以看出,所獲取的節(jié)點(diǎn)是從 tail 開(kāi)始遍歷焕盟,獲取第一個(gè)節(jié)點(diǎn)屬性 outbound 為 true 的節(jié)點(diǎn)秋秤。其實(shí)該節(jié)點(diǎn)是 AbstractChannelHandlerContext 雙向鏈表的 head 節(jié)點(diǎn)宏粤。獲取該節(jié)點(diǎn)后,調(diào)用 invokeBind()灼卢,如下:

    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            bind(localAddress, promise);
        }
    }

handler() 返回的是 HeadContext 對(duì)象绍哎,然后調(diào)用其bind(),如下:

        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

unsafe 定義在 HeadContext 中鞋真,在構(gòu)造函數(shù)中初始化(unsafe = pipeline.channel().unsafe();)崇堰,調(diào)用 bind() 如下:

        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {

                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 最核心方法
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

內(nèi)部調(diào)用 doBind() ,該方法為綁定中最核心的方法涩咖,位于 NioServerSocketChannel 中海诲,如下:

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

javaChannel()返回的是 NioServerSocketChannel 實(shí)例初始化時(shí)所產(chǎn)生的 Java NIO ServerSocketChannel 實(shí)例(ServerSocketChannelImple實(shí)例),然后調(diào)用其 bind()檩互,如下:

    public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
        Object var3 = this.lock;
        synchronized(this.lock) {
            if(!this.isOpen()) {
                throw new ClosedChannelException();
            } else if(this.isBound()) {
                throw new AlreadyBoundException();
            } else {
                InetSocketAddress var4 = var1 == null?new InetSocketAddress(0):Net.checkAddress(var1);
                SecurityManager var5 = System.getSecurityManager();
                if(var5 != null) {
                    var5.checkListen(var4.getPort());
                }

                NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
                Net.bind(this.fd, var4.getAddress(), var4.getPort());
                Net.listen(this.fd, var2 < 1?50:var2);
                Object var6 = this.stateLock;
                synchronized(this.stateLock) {
                    this.localAddress = Net.localAddress(this.fd);
                }

                return this;
            }
        }
    }

該方法屬于 Java NIO 層次的特幔,該方法涉及到服務(wù)端端口的綁定,端口的監(jiān)聽(tīng)闸昨,這些內(nèi)容在后續(xù)的 Channel 時(shí)做詳細(xì)介紹蚯斯。

到這里就真正完成了服務(wù)端端口的綁定。

這篇博客比較長(zhǎng)饵较,大體上從源碼層次稍微解讀了 Netty 服務(wù)端的啟動(dòng)過(guò)程拍嵌,當(dāng)中涉及到 Netty 的各個(gè)核心組件,只能籠統(tǒng)來(lái)描述服務(wù)端的啟動(dòng)過(guò)程循诉,具體的細(xì)節(jié)部分還需要后續(xù)做詳細(xì)分析横辆,而且其中有多個(gè)點(diǎn)還是懵懵懂懂,相信在后面對(duì) Netty 的分析過(guò)程會(huì)一一解答茄猫。

謝謝閱讀狈蚤,祝好!!!

參考資料


歡迎掃一掃我的公眾號(hào)關(guān)注 — 及時(shí)得到博客訂閱哦!

個(gè)人微信公眾號(hào)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末募疮,一起剝皮案震驚了整個(gè)濱河市炫惩,隨后出現(xiàn)的幾起案子僻弹,更是在濱河造成了極大的恐慌阿浓,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蹋绽,死亡現(xiàn)場(chǎng)離奇詭異芭毙,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)卸耘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門退敦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人蚣抗,你說(shuō)我怎么就攤上這事侈百。” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,543評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵钝域,是天一觀的道長(zhǎng)讽坏。 經(jīng)常有香客問(wèn)我,道長(zhǎng)例证,這世上最難降的妖魔是什么路呜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,221評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮织咧,結(jié)果婚禮上胀葱,老公的妹妹穿的比我還像新娘。我一直安慰自己笙蒙,他們只是感情好抵屿,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,224評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著捅位,像睡著了一般晌该。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上绿渣,一...
    開(kāi)封第一講書(shū)人閱讀 49,007評(píng)論 1 284
  • 那天朝群,我揣著相機(jī)與錄音,去河邊找鬼中符。 笑死姜胖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的淀散。 我是一名探鬼主播右莱,決...
    沈念sama閱讀 38,313評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼档插!你這毒婦竟也來(lái)了慢蜓?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 36,956評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤郭膛,失蹤者是張志新(化名)和其女友劉穎晨抡,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體则剃,經(jīng)...
    沈念sama閱讀 43,441評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡耘柱,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,925評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了棍现。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片调煎。...
    茶點(diǎn)故事閱讀 38,018評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情占锯,我是刑警寧澤审磁,帶...
    沈念sama閱讀 33,685評(píng)論 4 322
  • 正文 年R本政府宣布玷室,位于F島的核電站药版,受9級(jí)特大地震影響爷肝,放射性物質(zhì)發(fā)生泄漏笛质。R本人自食惡果不足惜西土,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,234評(píng)論 3 307
  • 文/蒙蒙 一讶舰、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧需了,春花似錦跳昼、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,240評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至墓造,卻和暖如春堪伍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背觅闽。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,464評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工帝雇, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蛉拙。 一個(gè)月前我還...
    沈念sama閱讀 45,467評(píng)論 2 352
  • 正文 我出身青樓尸闸,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親孕锄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子吮廉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,762評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容