Netty源碼分析——客戶端接入accept過程

基于Netty源代碼版本:netty-all-4.1.33.Final

netty中的reactor線程

netty中最核心的東西莫過于兩種類型的reactor線程,可以看作netty中兩種類型的發(fā)動機,驅動著netty整個框架的運轉

一種類型的reactor線程是boos線程組蚯斯,專門用來接受新的連接搔驼,然后封裝成channel對象扔給worker線程組签赃;還有一種類型的reactor線程是worker線程組痪欲,專門用來處理連接的讀寫咬腋。


不管是boos線程還是worker線程,所做的事情均分為以下三個步驟

  • 輪詢出注冊在selector上面的IO事件(select)
  • 處理這些IO事件(process selected keys)
  • 執(zhí)行異步task

對于boos線程來說向瓷,第一步輪詢出來的基本都是 accept 事件肠套,表示有新的連接,而worker線程輪詢出來的基本都是read/write事件猖任,表示網(wǎng)絡的讀寫事件

新連接的建立

簡單來說你稚,新連接的建立可以分為三個步驟:

  • 1、檢測到有新的連接
  • 2朱躺、將新的連接注冊到worker線程組
  • 3刁赖、注冊新連接的讀事件

檢測到有新連接進入

我們已經(jīng)知道,當服務端綁啟動之后长搀,服務端的channel已經(jīng)注冊到boos reactor線程中宇弛,reactor不斷檢測有新的事件,直到檢測出有accept事件發(fā)生
NioEventLoop.java

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    //檢查該SelectionKey是否有效源请,如果無效枪芒,則關閉channel
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞谁尸,這樣可能會出現(xiàn)cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        // 如果準備好了WRITE則將緩沖區(qū)中的數(shù)據(jù)發(fā)送出去舅踪,如果緩沖區(qū)中數(shù)據(jù)都發(fā)送完成,則清除之前關注的OP_WRITE標記
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 如果準備好READ或ACCEPT則觸發(fā)unsafe.read() ,檢查是否為0良蛮,如上面的源碼英文注釋所說:解決JDK可能會產生死循環(huán)的一個bug抽碌。
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況

  • 1)OP_ACCEPT背镇,接受客戶端連接
  • 2)OP_READ, 可讀事件, 即 Channel 中收到了新數(shù)據(jù)可供上層讀取咬展。
  • 3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數(shù)據(jù)。
  • 4)OP_CONNECT, 連接建立事件, 即 TCP 連接已經(jīng)建立, Channel 處于 active 狀態(tài)瞒斩。

主要來看下當boss線程 selector檢測到OP_ACCEPT事件時破婆,內部干了些什么。

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 如果準備好READ或ACCEPT則觸發(fā)unsafe.read() ,檢查是否為0胸囱,如上面的源碼英文注釋所說:解決JDK可能會產生死循環(huán)的一個bug祷舀。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

boos reactor線程已經(jīng)輪詢到 SelectionKey.OP_ACCEPT 事件,說明有新的連接進入烹笔,此時將調用channel的 unsafe來進行實際的操作,此時的channel為 NioServerSocketChannel裳扯,則unsafe為NioServerSocketChannel的屬性NioMessageUnsafe

那么,我們進入到它的read方法谤职,進入新連接處理的第二步
注冊到reactor線程
NioMessageUnsafe.java

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

調用 doReadMessages 方法不斷地讀取消息饰豺,用 readBuf 作為容器,這里允蜈,其實可以猜到讀取的是一個個連接冤吨,然后調用 pipeline.fireChannelRead()蒿柳,將每條新連接經(jīng)過一層服務端channel的洗禮,之后清理容器漩蟆,觸發(fā) pipeline.fireChannelReadComplete()
下面我們具體看下這兩個方法:

  • 1垒探、doReadMessages(List)
  • 2、pipeline.fireChannelRead(NioSocketChannel)

doReadMessages()

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

@Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}

public final class SocketUtils {
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
}

我們終于窺探到netty調用jdk底層nio的邊界serverSocketChannel.accept();怠李,由于netty中reactor線程第一步就掃描到有accept事件發(fā)生圾叼,因此,這里的accept方法是立即返回的捺癞,返回jdk底層nio創(chuàng)建的一條channel
ServerSocketChannel有阻塞和非阻塞兩種模式:

  • a夷蚊、阻塞模式:ServerSocketChannel.accept() 方法監(jiān)聽新進來的連接,當 accept()方法返回的時候,它返回一個包含新進來的連接的 SocketChannel翘簇。阻塞模式下, accept()方法會一直阻塞到有新連接到達撬码。
  • b、非阻塞模式:accept() 方法會立刻返回版保,如果還沒有新進來的連接,返回的將是null。 因此夫否,需要檢查返回的SocketChannel是否是null.

在NioServerSocketChannel的構造函數(shù)分析中彻犁,我們知道,其通過ch.configureBlocking(false);語句設置當前的ServerSocketChannel為非阻塞的凰慈。

在NioServerSocketChannel的構造函數(shù)分析中汞幢,我們知道,在其父類AbstractNioChannel的構造函數(shù)中通過ch.configureBlocking(false);語句設置當前的ServerSocketChannel為非阻塞的微谓。

netty將jdk的 SocketChannel 封裝成自定義的 NioSocketChannel森篷,加入到list里面,這樣外層就可以遍歷該list豺型,做后續(xù)處理

從上一篇文章中锡宋,我們已經(jīng)知道服務端的創(chuàng)建過程中會創(chuàng)建netty中一系列的核心組件叉钥,包括pipeline,unsafe等等,那么,接受一條新連接的時候是否也會創(chuàng)建這一系列的組件呢爹袁?

帶著這個疑問,我們跟進去
NioSocketChannel.java

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

我們重點分析 super(parent, socket)其屏,NioSocketChannel的父類為 AbstractNioByteChannel
AbstractNioByteChannel.java

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

這里证鸥,我們看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ,一般在原生的jdk nio編程中娶眷,也會注冊這樣一個事件似嗤,表示對channel的讀感興趣

我們繼續(xù)往上,追蹤到AbstractNioByteChannel的父類 AbstractNioChannel, 這里届宠,我相信讀了上一篇文章你對于這部分代碼肯定是有印象的

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

在創(chuàng)建服務端channel的時候烁落,最終也會進入到這個方法壳咕,super(parent), 便是在AbstractChannel中創(chuàng)建一系列和該channel綁定的組件,如下

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

而這里的 readInterestOp 表示該channel關心的事件是 SelectionKey.OP_READ顽馋,后續(xù)會將該事件注冊到selector谓厘,之后設置該通道為非阻塞模式,在channel中創(chuàng)建 unsafe 和一條 pipeline

pipeline.fireChannelRead(NioSocketChannel)

對于 pipeline我們前面已經(jīng)了解過寸谜,在netty的各種類型的channel中竟稳,都會包含一個pipeline,字面意思是管道熊痴,我們可以理解為一條流水線工藝他爸,流水線工藝有起點,有結束果善,中間還有各種各樣的流水線關卡诊笤,一件物品,在流水線起點開始處理巾陕,經(jīng)過各個流水線關卡的加工讨跟,最終到流水線結束

對應到netty里面,流水線的開始就是HeadContxt鄙煤,流水線的結束就是TailConext晾匠,HeadContxt中調用Unsafe做具體的操作,TailConext中用于向用戶拋出pipeline中未處理異常以及對未處理消息的警告

通過前面的文章中梯刚,我們已經(jīng)知道在服務端的channel初始化時凉馆,在pipeline中,已經(jīng)自動添加了一個pipeline處理器 ServerBootstrapAcceptor, 并已經(jīng)將用戶代碼中設置的一系列的參數(shù)傳入了構造函數(shù)亡资,接下來澜共,我們就來看下ServerBootstrapAcceptor
ServerBootstrapAcceptor.java

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        // Task which is scheduled to re-enable auto-read.
        // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
        // not be able to load the class because of the file limit it already reached.
        //
        // See https://github.com/netty/netty/issues/1328
        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

前面的 pipeline.fireChannelRead(NioSocketChannel); 最終通過head->unsafe->ServerBootstrapAcceptor的調用鏈,調用到這里的 ServerBootstrapAcceptor 的channelRead方法锥腻,而 channelRead 一上來就把這里的msg強制轉換為 Channel

然后嗦董,拿到該channel,也就是我們之前new出來的 NioSocketChannel中對應的pipeline旷太,將用戶代碼中的 childHandler展懈,添加到pipeline,這里的 childHandler 在用戶代碼中的體現(xiàn)為

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         pipeline.addLast(new EchoServerHandler());
     }
 });

其實對應的是 ChannelInitializer供璧,到了這里存崖,NioSocketChannel中pipeline對應的處理器為 head->ChannelInitializer->tail,牢記睡毒,后面會再次提到来惧!

接著,設置 NioSocketChannel 對應的 attr和option演顾,然后進入到 childGroup.register(child)供搀,這里的childGroup就是我們在啟動代碼中new出來的NioEventLoopGroup

我們進入到NioEventLoopGroup的register方法隅居,代理到其父類MultithreadEventLoopGroup
MultithreadEventLoopGroup.java

@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

這里又扯出來一個 next()方法,我們跟進去

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

回到其父類
MultithreadEventExecutorGroup.java

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

@Override
public EventExecutor next() {
    return chooser.next();
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    chooser = chooserFactory.newChooser(children);
}

這里的chooser對應的類為 EventExecutorChooser葛虐,字面意思為事件執(zhí)行器選擇器胎源,放到我們這里的上下文中的作用就是從worker reactor線程組中選擇一個reactor線程

@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         */
        EventExecutor next();
    }
}

chooser的實現(xiàn)有兩種

@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

默認情況下,chooser通過 DefaultEventExecutorChooserFactory被創(chuàng)建屿脐,在創(chuàng)建reactor線程選擇器的時候涕蚤,會判斷reactor線程的個數(shù),如果是2的冪的诵,就創(chuàng)建PowerOfTowEventExecutorChooser万栅,否則,創(chuàng)建GenericEventExecutorChooser

兩種類型的選擇器在選擇reactor線程的時候西疤,都是通過Round-Robin的方式選擇reactor線程烦粒,唯一不同的是,PowerOfTowEventExecutorChooser是通過與運算代赁,而GenericEventExecutorChooser是通過取余運算扰她,與運算的效率要高于求余運算

選擇完一個reactor線程,即 NioEventLoop 之后管跺,我們回到注冊的地方

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop.java

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

其實义黎,這里已經(jīng)和服務端啟動的過程一樣了,可以參考我前面的文章

AbstractChannel$AbstractUnsafe

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

和服務端啟動過程一樣豁跑,先是調用 doRegister();做真正的注冊過程,如下
AbstractNioChannel

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

將該條channel綁定到一個selector上去泻云,一個selector被一個reactor線程使用艇拍,后續(xù)該channel的事件輪詢,以及事件處理宠纯,異步task執(zhí)行都是由此reactor線程來負責

綁定完reactor線程之后卸夕,調用 pipeline.invokeHandlerAddedIfNeeded()

前面我們說到,到目前為止NioSocketChannel 的pipeline中有三個處理器婆瓜,head->ChannelInitializer->tail快集,最終會調用到 ChannelInitializer 的 handlerAdded 方法

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx);
        }
    }
}

handlerAdded方法調用 initChannel 方法之后,調用remove(ctx);將自身刪除

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}

而這里的 initChannel 方法又是神馬玩意廉白?讓我們回到用戶方法个初,比如下面這段用戶代碼
用戶代碼

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline pipeline = ch.pipeline();
         pipeline.addLast(new LoggingHandler(LogLevel.INFO));
         pipeline.addLast(new EchoServerHandler());
     }
 });

原來最終跑到我們自己的代碼里去了啊猴蹂!完了之后院溺,NioSocketChannel綁定的pipeline的處理器就包括 head->LoggingHandler->EchoServerHandler->tail

注冊讀事件

接下來,我們還剩下這些代碼沒有分析完
AbstractChannel$AbstractUnsafe

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

pipeline.fireChannelRegistered();磅轻,其實沒有干啥有意義的事情珍逸,最終無非是再調用一下業(yè)務pipeline中每個處理器的 ChannelHandlerAdded方法處理下回調

isActive()在連接已經(jīng)建立的情況下返回true逐虚,所以進入方法塊,進入到 pipeline.fireChannelActive();在這里我詳細步驟先省略谆膳,直接進入到關鍵環(huán)節(jié)

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {    
    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}

這里其實就是將 SelectionKey.OP_READ事件注冊到selector中去叭爱,表示這條通道已經(jīng)可以開始處理read事件了

至此,netty中關于新連接的處理已經(jīng)向你展示完了漱病,我們做下總結

  • 1买雾、boos reactor線程輪詢到有新的連接進入
  • 2、通過封裝jdk底層的channel創(chuàng)建 NioSocketChannel以及一系列的netty核心組件
  • 3缨称、將該條連接通過chooser凝果,選擇一條worker reactor線程綁定上去
  • 4、注冊讀事件睦尽,開始新連接的讀寫

參考:
https://www.cnblogs.com/java-chen-hao/p/11477358.html

https://www.cnblogs.com/zhangboyu/p/7452611.html

https://www.cnblogs.com/cr1719/p/6360201.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末器净,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子当凡,更是在濱河造成了極大的恐慌山害,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沿量,死亡現(xiàn)場離奇詭異浪慌,居然都是意外死亡,警方通過查閱死者的電腦和手機朴则,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門权纤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人乌妒,你說我怎么就攤上這事汹想。” “怎么了撤蚊?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵古掏,是天一觀的道長。 經(jīng)常有香客問我侦啸,道長槽唾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任光涂,我火速辦了婚禮庞萍,結果婚禮上,老公的妹妹穿的比我還像新娘顶捷。我一直安慰自己挂绰,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著葵蒂,像睡著了一般交播。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上践付,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天秦士,我揣著相機與錄音,去河邊找鬼永高。 笑死隧土,一個胖子當著我的面吹牛,可吹牛的內容都是我干的命爬。 我是一名探鬼主播曹傀,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼饲宛!你這毒婦竟也來了皆愉?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤艇抠,失蹤者是張志新(化名)和其女友劉穎幕庐,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體家淤,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡异剥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了絮重。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片冤寿。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖青伤,靈堂內的尸體忽然破棺而出疚沐,到底是詐尸還是另有隱情,我是刑警寧澤潮模,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站痴施,受9級特大地震影響擎厢,放射性物質發(fā)生泄漏。R本人自食惡果不足惜辣吃,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一动遭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧神得,春花似錦厘惦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽酝静。三九已至,卻和暖如春羡玛,著一層夾襖步出監(jiān)牢的瞬間别智,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工稼稿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留薄榛,地道東北人。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓让歼,卻偏偏與公主長得像敞恋,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子谋右,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內容