Netty源碼筆記之ServerBootstrap

前一章節(jié)我們分析了客戶端啟動(Bootstrap)的流程靶擦,接下來我們就分析下服務(wù)端的一個啟動流程吧坷襟。代碼來自Netty官方example的echo示例心墅。

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

和客戶端代碼相比大致是相同的枷遂,只有些許配置部分不同漆枚。

  • 配置EventLoopGroup(NioEventLoopGroup);
  • 配置Channel(NioServerSocketChannel)膀曾;
  • 配置Handler

配置EventLoopGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}

在服務(wù)端這里我們配置了2個EventLoopGroup县爬,之所有要配置2個是因?yàn)椋琋etty將其分別處理不同的任務(wù)添谊,1個用來處理客戶端的連接财喳,一個處理客戶端的IO任務(wù),各司其職斩狱,才能更加高效的完成網(wǎng)絡(luò)任務(wù)耳高,如下圖任務(wù)所示。

Netty線程模型

這里的BossGroup配置調(diào)用了父類AbstractBootstrap的構(gòu)造方法所踊,如下所示:

public B group(EventLoopGroup group) {
    ObjectUtil.checkNotNull(group, "group");
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return self();
}

也就是我們配置的BossGroup最終是放到了ServerBootstrap的group字段中泌枪,而WorkerGroup放到了ServerBootstrap的childGroup中。

配置Channel

繼續(xù)往下走便到了配置Channel的類型了秕岛,因?yàn)槭欠?wù)端所以我們這里配置的是一個NioServerSocketChannel.class:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

這里我們實(shí)例化了一個ReflectiveChannelFactory類碌燕,通過名字可以知道這是一個Channel反射工廠類,看下其實(shí)現(xiàn):

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        this.constructor = clazz.getConstructor();
    }

    @Override
    public T newChannel() {
        return constructor.newInstance();
    }
}

通過源碼可以看出继薛,該類的作用是根據(jù)傳遞進(jìn)來的Channel類型獲取對應(yīng)的默認(rèn)構(gòu)造方法修壕,最后通過newChannel方法實(shí)例化Channel對象。

實(shí)例化ReflectiveChannelFactory對象后遏考,通過channelFactory方法設(shè)置對應(yīng)的ChannelFactory對象

public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

配置handler

public B handler(ChannelHandler handler) {
    this.handler = ObjectUtil.checkNotNull(handler, "handler");
    return self();
}

這里我們配置了一個LoggingHandler并且設(shè)置日志級別為LogLevel.INFO慈鸠。

配置ChildHandler

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
    return this;
}

這里使用了Netty提供的一個特殊ChannelHandler抽象類ChannelInitializer,這里我們先不表诈皿,后面的內(nèi)容會講到該內(nèi)容林束。

綁定

當(dāng)所有的參數(shù)設(shè)置好之后,就到了我們綁定的階段了稽亏,先來看下代碼實(shí)現(xiàn)吧:

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    //校驗(yàn)group和ChannelFactory是否為空,如果為空則會拋出IllegalStateException異常
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //初始化并注冊Channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        //綁定本地端口
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

這里我們先看下initAndRegister方法,這個方法我們在Bootstrap中也有講到過缕题,這里繼續(xù)來看下:

final ChannelFuture initAndRegister() {
    //...省略部分代碼
    Channel channel = channelFactory.newChannel();
    init(channel);
    return regFuture;
}

該方法中使用到了ChannelFactory的newChannel方法截歉,在之前我們提到過ChannelFactory是用于將給定的Channel類型類,然后通過反射構(gòu)造方法進(jìn)行實(shí)例化對象烟零,所以我們這里實(shí)例化的對象為NioServerSocketChannel瘪松。接下來看下該類的一個結(jié)構(gòu)圖咸作。

NioServerSocketChanne類圖

再來看下NioServerSocketChannel的默認(rèn)構(gòu)造方法里做了哪些處理吧。

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {

    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
         return provider.openServerSocketChannel();
    }

    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

在NioServerSocketChannel默認(rèn)構(gòu)造函數(shù)中宵睦,調(diào)用了newSocket方法记罚,通過該方法開啟了一個服務(wù)端的ServerSocketChannel,最終調(diào)用父類AbstractNioChannel的構(gòu)造方法:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

在NioServerSocketChannel中我們配置感興趣的事件為SelectionKey.OP_ACCEPT壳嚎,代表只監(jiān)聽連接事件桐智,父類AbstractNioChannel這里設(shè)置了Channel事件,可以看到這里還調(diào)用了父類方法將parent參數(shù)傳遞進(jìn)去烟馅,來看下具體實(shí)現(xiàn)说庭。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    //實(shí)例化一個DefaultChannelId
    id = newId();
    //實(shí)例化對象為NioMessageUnsafe
    unsafe = newUnsafe();
    //實(shí)例化DefaultChannelPipeline(AbstractHandlerContext為head、tail的雙向鏈表Handler節(jié)點(diǎn))
    pipeline = newChannelPipeline();
}

再回到我們的initAndRegister方法郑趁,繼續(xù)往下執(zhí)行就到了init方法了刊驴,因?yàn)樵诟割怉bstractBootstrap中該方法是一個抽象方法,所以這個方法的實(shí)現(xiàn)是交給ServerBootstrap來實(shí)現(xiàn)的寡润,如下所示:

void init(Channel channel) {
    //設(shè)置Channel的一些網(wǎng)絡(luò)配置選項(xiàng)
    setChannelOptions(channel, newOptionsArray(), logger);
    //設(shè)置Channel的一些配置屬性
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
    }
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

init方法向ChannelPipeline添加了一個ChannelInitializer抽象類Handler捆憎,該Handler的initChannel方法會在該Handler添加后調(diào)用,看下其實(shí)現(xiàn)梭纹。

public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }
    return this;
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //檢查是否重復(fù)添加
        checkMultiplicity(handler);
        //實(shí)例化一個DefaultChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);
        //將其添加到雙向鏈表中
        addLast0(newCtx);

        //如果registered是false意味著Channel還沒有注冊到EventLoop.
        //在這種情況下躲惰,我們將上下文添加到管道中,并添加一個任務(wù)栗柒,該任務(wù)將在注冊通道后調(diào)用ChannelHandler.handlerAdded(...)
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

這里我們通過斷點(diǎn)調(diào)試發(fā)現(xiàn)registered這個變量是false礁扮,這就意味著這時(shí)候Channel還沒有注冊到EventLoop上的,所以我們來看下callHandlerCallbackLater方法實(shí)現(xiàn):

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

可以看到這里使用了一個待處理的PendingHandlerAddedTask類對象來處理瞬沦,看下其實(shí)現(xiàn):

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            //...省略try-catch代碼
            executor.execute(this);
        }
    }
}

這個任務(wù)采用異步待處理任務(wù)來執(zhí)行任務(wù)太伊,最終的任務(wù)是通過調(diào)用callHandlerAdded0方法來實(shí)現(xiàn)的,看下其實(shí)現(xiàn):

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    //...
    ctx.callHandlerAdded();
}

這里繼續(xù)深入AbstractChannelHandlerContext#callHandlerAdded方法:

final void callHandlerAdded() throws Exception {
    // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
    // any pipeline events ctx.handler() will miss them because the state will not allow it.
    if (setAddComplete()) {
        handler().handlerAdded(this);
    }
}

到這里我們就清晰了逛钻,這里調(diào)用的并是ChannelInitializer#handlerAdded方法了:

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            //移除該ChannelHandlerContext也就是對應(yīng)的ChannelInitializer抽象類
            removeState(ctx);
        }
    }
}

這里判斷了NioServerSocketChannel是否已經(jīng)注冊了僚焦,然后調(diào)用initChannel方法,如下:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

在initChannel方法中曙痘,首先獲取ChannelPipeline芳悲,這里handler()獲取的便是我們之前的配置Handler,如果有配置Handler便將其添加到ChannelPipeline中边坤,這里的handler()我們配置的是LoggingHandler名扛。最后通過綁定在該Channel上的EventLoop線程執(zhí)行一個異步任務(wù),將ServerBootstrapAcceptor添加到ChannelPipeline中茧痒,來看下其實(shí)現(xiàn):

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    //配置的WorkerGroup
    private final EventLoopGroup childGroup;
    //配置的childHandler
    private final ChannelHandler childHandler;
    //配置的childOptions
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    //配置的childAttrs
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //這里的msg便是接收的客戶端連接對象Channel(NioSocketChannel)
        final Channel child = (Channel) msg;
        //將配置的childHandler添加到NioSocketChannel的ChannelPipeline中
        child.pipeline().addLast(childHandler);
        //設(shè)置ChannelOption
        setChannelOptions(child, childOptions, logger);
        //設(shè)置Channel屬性
        setAttributes(child, childAttrs);

        //執(zhí)行EventLoop與Channel的綁定工作
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    }
}

有好奇的小伙伴可能會有疑問channelRead方法是在什么時(shí)候調(diào)用的肮韧,我也比較好奇,不過這里先不表。

再回到我們的initAndRegister方法弄企,繼續(xù)往下執(zhí)行:

//...
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
    if (channel.isRegistered()) {
        channel.close();
    } else {
        channel.unsafe().closeForcibly();
    }
}
//...

這里主要關(guān)注register(channel)方法超燃,這里group()方法也就是我們之前配置的NioEventLoopGroup(BossGroup),跳該方法是由父類MultithreadEventLoopGroup實(shí)現(xiàn)的拘领,代碼如下:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

通過next()方法我們返回的是NioEventLoop對象意乓,該方法也是由父類SingleThreadEventLoop來實(shí)現(xiàn)的,如下:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

這里又有熟悉的Unsafe對象约素,其實(shí)現(xiàn)為NioMessageUnsafe届良,該方法也是由父類AbstractUnsafe來實(shí)現(xiàn)的,如下:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

這里我們跟進(jìn)register0方法即可:

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

這里繼續(xù)跟進(jìn)doRegister方法业汰,該方法由抽象類AbstractNioChannel完成:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //將該channel注冊到selector多路復(fù)用器
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在完成initAndRegister方法后伙窃,繼續(xù)往下執(zhí)行來到了doBind0方法,如下:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

這里是通過綁定在Channel上的EventLoop調(diào)度一個異步任務(wù)執(zhí)行channel.bind方法样漆,這里的Channel是NioServerSocketChannel为障,我們跟進(jìn)該方法:

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

這里調(diào)用的是ChannelPipeline的bind方法,繼續(xù)跟進(jìn):

 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

這里使用的是tail(AbstractHandlerContext)節(jié)點(diǎn)bind方法放祟,繼續(xù)進(jìn)去:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        return promise;
    }
    //我們最終找到的是HeadContext
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        //調(diào)用HeadContext的bind方法
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

AbstractHandlerContext#invokeBind:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

因?yàn)槲覀冎繦andler是HeadContext鳍怨,所以我們直接定位到HeadContext#bind方法即可:

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

看到該方法內(nèi)部是調(diào)用了NioMessageUnsafe#bind方法,這個在前文中有提及到跪妥,可以翻閱前文查看鞋喇。這里我們直接跳到NioMessageUnsafe的父類AbstractUnsafe#bind方法:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

這里我們其它暫時(shí)忽略,暫時(shí)只看doBind方法即可眉撵,該方法在父類中是個抽象方法侦香,具體的實(shí)現(xiàn)是由其子類NioServerSocketChannel實(shí)現(xiàn)的:

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

這里又回到了熟悉的Java底層API了,并且這里還針對JVM版本做了不同的處理纽疟,該方法將給定的地址端口進(jìn)行綁定操作罐韩。

到此為止,關(guān)于ServerBootstrap的整個啟動流程就完成了污朽,接下來我們分析下客戶端連接上服務(wù)端是如何處理的散吵。

首先我們定位到NioEventLoop#run方法:

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

這里我們暫時(shí)先看processSelectedKeys()方法:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

跟進(jìn)processSelectedKeysOptimized方法:

private void processSelectedKeysOptimized() {
    //遍歷所有的key
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;
        //因?yàn)閍ttachment我們是存放著對應(yīng)的channel,所以這里從attachment中獲取channel
        final Object a = k.attachment();
        
        if (a instanceof AbstractNioChannel) {
            //處理對應(yīng)的channel
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

這里我們直接看processSelectedKey方法:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

接下來我們重點(diǎn)放在unsafe.read方法上,由上面可知該Unsafe的實(shí)現(xiàn)為NioMessageUnsafe:

public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    //處理socket
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            //遍歷消息然后通過ChannelPipeline觸發(fā)channelRead
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);
                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

在doReadMessages方法中蟆肆,主要處理接收Socket的連接矾睦,如果SocketChannel不為空,則添加到buf中即可炎功。

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    //...省略部分代碼
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
}

最終的會存儲在List<Object>的readBuf中枚冗,我們客戶端連接上來后會在該列表中保存一個NioSocketChannel對象∩咚穑可以看到如果SocketChannel不為空的話官紫,則會實(shí)例化一個NioSocketChannel對象肛宋,我們來看看這個對象州藕。

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

跟進(jìn)父類AbstractNioByteChannel:

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

繼續(xù)跟進(jìn)父類(AbstractNioChannel)束世,這里需要注意下這里SelectionKey.OP_READ表示監(jiān)聽讀事件:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

居然還有父類(AbstractChannel),繼續(xù)跟進(jìn)去看看:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

原來和之前一樣床玻,老三件了毁涉。

最后通過ChannelPipeline觸發(fā)ChannelRead事件,還記得我們之前在ServerBootstrap中注冊了一個ChannelInitializer中為客戶端Channel注冊了一個ServerBootstrapAcceptor處理器嗎锈死,而在這里就派上用場了贫堰。通過ChannelPipeline的fireChannelRead方法,最終也會調(diào)用到ServerBootstrapAcceptor對象的channelRead方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //這里的msg為NioSocketChannel
    final Channel child = (Channel) msg;
    //配置對應(yīng)的childHandler
    child.pipeline().addLast(childHandler);
    //配置對應(yīng)的childOptions
    setChannelOptions(child, childOptions, logger);
    //配置對應(yīng)的Attributes
    setAttributes(child, childAttrs);

    try {
        //childGroup為之前配置的WorkerGroup,調(diào)用register方法將NioSocketChannel與EventLoop進(jìn)行綁定
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

到這里為止待牵,Netty服務(wù)端啟動以及獲取客戶端連接的整個流程就已經(jīng)清晰了其屏。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市缨该,隨后出現(xiàn)的幾起案子偎行,更是在濱河造成了極大的恐慌,老刑警劉巖贰拿,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蛤袒,死亡現(xiàn)場離奇詭異,居然都是意外死亡膨更,警方通過查閱死者的電腦和手機(jī)妙真,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來荚守,“玉大人珍德,你說我怎么就攤上這事〈Q” “怎么了锈候?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長缩功。 經(jīng)常有香客問我晴及,道長,這世上最難降的妖魔是什么嫡锌? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任虑稼,我火速辦了婚禮,結(jié)果婚禮上势木,老公的妹妹穿的比我還像新娘蛛倦。我一直安慰自己癌幕,他們只是感情好后专,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著圃郊,像睡著了一般。 火紅的嫁衣襯著肌膚如雪且改。 梳的紋絲不亂的頭發(fā)上验烧,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機(jī)與錄音又跛,去河邊找鬼碍拆。 笑死,一個胖子當(dāng)著我的面吹牛慨蓝,可吹牛的內(nèi)容都是我干的感混。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼礼烈,長吁一口氣:“原來是場噩夢啊……” “哼弧满!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起此熬,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤庭呜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后摹迷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疟赊,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年峡碉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了近哟。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡鲫寄,死狀恐怖吉执,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情地来,我是刑警寧澤戳玫,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站未斑,受9級特大地震影響咕宿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蜡秽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一府阀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧芽突,春花似錦试浙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽钠糊。三九已至,卻和暖如春壹哺,著一層夾襖步出監(jiān)牢的瞬間抄伍,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工斗躏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留逝慧,地道東北人。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓啄糙,卻偏偏與公主長得像,于是被迫代替她去往敵國和親云稚。 傳聞我的和親對象是個殘疾皇子隧饼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344