Netty服務(wù)器源碼分析

本文基于Netty 4

在討論Netty服務(wù)器啟動(dòng)之前俩檬,先回顧一下服務(wù)端使用Java nio selector的啟動(dòng)過程:

//1. 獲取服務(wù)端通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 綁定端口
ssChannel.bind(new InetSocketAddress(9898));
//3. 設(shè)置為非阻塞模式
ssChannel.configureBlocking(false);

Selector selector = Selector.open();
//4. 向監(jiān)聽器注冊(cè)accept事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

while (selector.select() > 0) {
    //5. 獲取監(jiān)聽器上所有的監(jiān)聽事件值
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        if (key.isAcceptable()) {            
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            SocketChannel socketChannel = server.accept();
            socketChannel.configureBlocking(false);
            //注冊(cè)read事件
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            readMsg(channel);
        } else if(...) {
            ...
        }
        it.remove();
    }
}

Netty nio模式的啟動(dòng)過程:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup);
    b.channel(NioServerSocketChannel.class);
    b.option(ChannelOption.SO_BACKLOG, 128);
    b.childOption(ChannelOption.SO_REUSEADDR, true);
    b.childOption(ChannelOption.TCP_NODELAY, true);
    b.childOption(ChannelOption.SO_KEEPALIVE, true);
    b.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("http-decoder", new HttpRequestDecoder());
            pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
            pipeline.addLast("encoder", new HttpResponseEncoder());
            pipeline.addLast(new MyInboundHandler());
        }
    });

    ChannelFuture f = b.bind(8080).sync();
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

其實(shí),無論是Java nio,還是Netty nio,總體都包含兩個(gè)方面:1. 線程模型则酝;2. IO模型;下面先分析Netty的線程模型闰集。

Netty線程啟動(dòng)

NioEventLoopGroup是Netty線程的核心沽讹,下面看一下這個(gè)類的初始化
NioEventLoopGroup # 構(gòu)造方法

public NioEventLoopGroup() {
      this(0);
}
//無參的構(gòu)造轉(zhuǎn)調(diào)到了這里
public NioEventLoopGroup(int nThreads) {
     //這里的Executor傳入了null
     this(nThreads, (Executor) null);
}

上面的構(gòu)造方法最終會(huì)調(diào)用父類的構(gòu)造方法
MultithreadEventLoopGroup # 構(gòu)造方法

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
     //這里若threads==0般卑,會(huì)初始化一個(gè)值;
    //該初始值的算法 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2))
     super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

最終走到這里 MultithreadEventExecutorGroup # 構(gòu)造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                       EventExecutorChooserFactory chooserFactory, Object... args) {
 if (nThreads <= 0) {
     throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
 }

 if (executor == null) {
     //之前executor設(shè)為了null爽雄,這里會(huì)賦一個(gè)值蝠检,這個(gè)executor的作用是創(chuàng)建新 Thread并運(yùn)行任務(wù);后面會(huì)發(fā)現(xiàn)它的作用
     executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
 }

 //這里會(huì)根據(jù)threads的值創(chuàng)建若干個(gè)EventExecutor
 children = new EventExecutor[nThreads];

 for (int i = 0; i < nThreads; i ++) {
     boolean success = false;
     try {
         //往里看一下children為何物
         children[i] = newChild(executor, args);
         success = true;
     } catch (Exception e) {
         // TODO: Think about if this is a good exception type
         throw new IllegalStateException("failed to create a child event loop", e);
     } finally {
         if (!success) {
              .........
         }
     }
 }
 //初始了chooser挚瘟,后面會(huì)使用該chooser選擇一個(gè)EventLoop
 chooser = chooserFactory.newChooser(children);

 final FutureListener<Object> terminationListener = new FutureListener<Object>() {
     @Override
     public void operationComplete(Future<Object> future) throws Exception {
         if (terminatedChildren.incrementAndGet() == children.length) {
             terminationFuture.setSuccess(null);
         }
     }
 };

 for (EventExecutor e: children) {
     e.terminationFuture().addListener(terminationListener);
 }

 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
 Collections.addAll(childrenSet, children);
 readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

NioEventLoopGroup # newChild()
上面的children[] 最終保存的是NioEventLoop

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
     return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop中最核心的方法就是execute()

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    //系統(tǒng)剛啟動(dòng)時(shí)叹谁,NioEventLoop中的Thread肯定為null,且state==ST_NOT_STARTED乘盖,那么肯定會(huì)走到else中
    if (inEventLoop) {
        addTask(task);
    } else {
        //這里會(huì)走到doStartThread()
        startThread();
        //startThread()方法如下
        //////////////////////////////////////////// 
           if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                //改變EventLoop的狀態(tài)并啟動(dòng)真正干活的線程焰檩,CAS操作保證了每個(gè)EventLoop只啟動(dòng)一個(gè)線程
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
         ////////////////////////////////////////////
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
private void doStartThread() {
    assert thread == null;
    //這個(gè)executor就是上文在初始化時(shí)創(chuàng)建的 ThreadPerTaskExecutor
    //ThreadPerTaskExecutor 會(huì)創(chuàng)建新的線程
    executor.execute(new Runnable() {
        @Override
        public void run() {
            //這個(gè)thread常用于inEventLoop()方法
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //run()為abstract方法,子類會(huì)實(shí)現(xiàn)
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ......
            }
        }
    });
}

NioEventLoop # run()
該方法會(huì)進(jìn)入死循環(huán)订框,不停地按預(yù)先分配的時(shí)間比例處理IO任務(wù)和其他的例如scheduled任務(wù)

protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //NIO多路復(fù)用選擇器
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        ......
    }
}

通過上面的介紹锅尘,我們知道了初始化NioEventLoopGroup時(shí)會(huì)創(chuàng)建nThreads個(gè)NioEventLoop,當(dāng)有新task到來時(shí)布蔗,會(huì)根據(jù)其所屬Chanenl選擇對(duì)應(yīng)的EventLoop執(zhí)行(execute())

NioEventLoopGroup

線程已經(jīng)先行啟動(dòng)藤违,等著數(shù)據(jù)的到來;接下來看一下IO相關(guān)的初始化纵揍,Netty的IO模型是多路復(fù)用顿乒;

IO的啟動(dòng)

ServerBootstrap是IO的核心類,下面就從這個(gè)類入手泽谨,看一下IO的啟動(dòng)過程

1. 參數(shù)的設(shè)置
ServerBootstrap b = new ServerBootstrap();
//上面啟動(dòng)的線程(池)設(shè)置在這里
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
//tcp 三次握手使用的參數(shù)
b.option(ChannelOption.SO_BACKLOG, 128);
2. 綁定端口

在繼續(xù)深入源碼之前璧榄,需要區(qū)分一下register的含義;Java NIO的register一般指把ServerSocketChannel注冊(cè)到Selector上吧雹;在Netty中骨杂,注冊(cè)的含義不僅僅包含這層含義,還包括把NioServerSocketChannel與EventLoop關(guān)聯(lián)在一起

bind(port)會(huì)進(jìn)入AbstractBootstrap的doBind()中
AbstractBootstrap # doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    // 注冊(cè)已經(jīng)在EventLoop中進(jìn)行雄卷,因此下列方法可能和注冊(cè)過程并發(fā)進(jìn)行
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // 注冊(cè)是通過線程池完成搓蚪,因此這里要判斷是否注冊(cè)完成
    // 注冊(cè)過程可能很快,這里已經(jīng)完成
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 綁定本地端口
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        // 沒有完成注冊(cè)的話丁鹉,就注冊(cè)一個(gè)監(jiān)聽器妒潭,注冊(cè)完成后進(jìn)行回調(diào)
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

AbstractBootstrap # initAndRegister()
這個(gè)方法開始使用前面已經(jīng)初始化的NioEventLoopGroup

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //設(shè)置參數(shù)時(shí),有 b.channel(NioServerSocketChannel.class)揣钦,
        //因此這里的channel實(shí)際就是 NioServerSocketChannel
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        ......
        return ......
    }
    //這里的group()實(shí)際返回的就是bossGroup雳灾,即上面創(chuàng)建的第一個(gè)NioEventLoopGroup,因此真正的注冊(cè)任務(wù)是由線程池來完成冯凹;
    //只有當(dāng)EventLoop與Channel關(guān)聯(lián)在一起谎亩,才能算注冊(cè)成功
    ChannelFuture regFuture = config().group().register(channel);    
    .......  
    return regFuture;
}

在真正注冊(cè)之前,會(huì)先實(shí)例化NioServerSocketChannel,接下來就看一下NioServerSocketChannel初始化過程匈庭;NioServerSocketChannel的繼承關(guān)系如下:

NioServerSocketChannel 繼承關(guān)系圖

channelFactory.newChannel()會(huì)調(diào)用NioServerSocketChannel的構(gòu)造方法:

public NioServerSocketChannel(ServerSocketChannel channel) {
        // NioServerSocketChannel 關(guān)心的事件是Accept夫凸,即新的客戶端連接
        super(null, channel, SelectionKey.OP_ACCEPT);
        //這里也會(huì)進(jìn)行內(nèi)存分配相關(guān)類的初始化
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

AbstractChannel

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //每個(gè)channel都有一個(gè)id
        id = newId(); 
        //真正與下層交互的東東,比如讀寫socket等操作
        unsafe = newUnsafe();
        //初始化為DefaultChannelPipeline
        pipeline = newChannelPipeline();
}

NioServerSocketChannel已初始化完畢嚎花,但還沒有與EventLoopGroup或EventLoop發(fā)生任何聯(lián)系

ServerBootstrap # init()

//上面已完成Channel的初始化,這里的channel就是 NioServerSocketChannel
void init(Channel channel) throws Exception {
    //一些attrs呀洲、options紊选、handler以及childAttrs、childOptions道逗、childHandler的設(shè)置
    .......
    //這里就是上面創(chuàng)建的DefaultChannelPipeline
    ChannelPipeline p = channel.pipeline();
    //之前創(chuàng)建的第二個(gè)NioEventLoopGroup(workerGroup)
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    //pipeline的鏈表最后加入了一個(gè)ChannelHandler
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

在addLast時(shí)兵罢,有一點(diǎn)要注意:如果還沒有完成注冊(cè),則把Handler保存在一個(gè)臨時(shí)變量中滓窍,等注冊(cè)完畢后再調(diào)用相應(yīng)方法卖词;如果已完成注冊(cè),則應(yīng)調(diào)用handlerAdded()等回調(diào)方法吏夯;
DefaultChannelPipeline # addLast()

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);
        // 加入到Handler鏈中
        addLast0(newCtx);
       
        // 由前面的分析可知此蜈,系統(tǒng)剛啟動(dòng)時(shí)init()會(huì)走到這一步,此時(shí)并沒有完成注冊(cè)噪生,因此會(huì)進(jìn)入if裆赵;
        if (!registered) {
            newCtx.setAddPending();        
            //在注冊(cè)完成后,會(huì)調(diào)用pendingHandlerCallbackHead的相關(guān)方法
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

callHandlerCallbackLater()

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        // 注冊(cè)完成后跺嗽,按照鏈的順序依次調(diào)用
        pending.next = task;
    }
}

addLast()完成后战授,Pipeline中Handler鏈的順序如下:


handler鏈

init()完成后,開始異步注冊(cè)
AbstractBootStrap # initAndRegister()

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
           .......
        }
        // 異步注冊(cè)
        // 使用chooser在EventLoopGroup中選擇一個(gè)EventLoop進(jìn)行注冊(cè)
        ChannelFuture regFuture = config().group().register(channel);
        ......
        return regFuture;
 }

SingleThreadEventLoop # register()

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

AbstractChannel # AbstractUnsafe # register()

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }
     // NioServerSocketChannel的eventLoop 賦了值
    AbstractChannel.this.eventLoop = eventLoop;
    // 系統(tǒng)剛啟動(dòng)時(shí)桨嫁,EventLoopGroup中所有的EventLoop都是未啟動(dòng)狀態(tài)植兰,EventLoop的thread屬性也為null
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            // 由上文可知,execute()方法用來提交任務(wù)璃吧,啟動(dòng)一個(gè)新的EventLoop
            // 在此楣导,channel已經(jīng)和EventLoop產(chǎn)生了聯(lián)系
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ........
        }
    }
}

AbstractChannel # AbstractUnsafe # register0()

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // JDK的注冊(cè):將ServerSocketChannel注冊(cè)到selector上;有一點(diǎn)要注意畜挨,這里注冊(cè)的時(shí)候爷辙,并沒有注冊(cè)感興趣的事件;
        // selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        doRegister();
        neverRegistered = false;
        registered = true;
        // 到這里注冊(cè)完成
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

到這里可以看出Netty中完成注冊(cè)的標(biāo)識(shí)必須滿足下面三個(gè)條件:

  1. NioServerSocketChannel 的eventLoop 字段賦上了值
  2. 這個(gè)EventLoop要啟動(dòng)(新建的EventLoop是未啟動(dòng)狀態(tài))
  3. JDK級(jí)別的ServerSocketChannel 注冊(cè)到selector上(不一定有事件)

在前面我們準(zhǔn)備了一些動(dòng)作必須等注冊(cè)完成后才能觸發(fā)朦促,這里再回顧一下是哪些動(dòng)作:
a. AbstractBootstrap # doBind() 里注冊(cè)了一個(gè)監(jiān)聽器膝晾,注冊(cè)完成后調(diào)用;
b. DefaultChannelPipeline # addLast() 里注冊(cè)了一個(gè)回調(diào)鏈务冕,也是等注冊(cè)完成后調(diào)用血当,實(shí)際就是調(diào)用handlerAdded();

注冊(cè)后繼續(xù)做了如下動(dòng)作:

pipeline.invokeHandlerAddedIfNeeded();
// 設(shè)置成功標(biāo)志,這里會(huì)調(diào)用listener臊旭;前面注冊(cè)的ChannelFutureListener就會(huì)在此調(diào)用落恼,進(jìn)行本地端口的綁定;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
//
if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}

pipeline.invokeHandlerAddedIfNeeded()會(huì)走到下面代碼

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        // This Channel itself was registered.
        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    //  這里就是 上文添加的Handler鏈离熏;ChannelInitializer就是其中之一
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        // 這里會(huì)調(diào)用到ChannelInitializer的initChannel()
        task.execute();
        task = task.next;
    }
}

ChannelInitializer # initChannel()

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            .......
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

上述操作會(huì)導(dǎo)致Pipeline中Handler鏈發(fā)生下面的變化:

handler鏈

safeSetSuccess(promise) 會(huì)走到如下代碼
DefaultPromise # trySuccess()

public boolean trySuccess(V result) {
     if (setSuccess0(result)) {
          //調(diào)用listener佳谦;上文注冊(cè)的ChannelFutureListener就會(huì)在此被調(diào)用
          notifyListeners(); 
          return true;
     }
     return false;
}

到此Netty服務(wù)器已經(jīng)啟動(dòng),等待客戶端連接滋戳,啟動(dòng)大致流程如下:


image.png

觸發(fā)channelActive()

服務(wù)器注冊(cè)完成后钻蔑,Pipeline中Handler鏈的結(jié)構(gòu)如下,此時(shí)調(diào)用pipeline.fireChannelActive()


Handler鏈
設(shè)置ACCEPT事件

DefaultChannelPipeline # fireChannelActive()

public final ChannelPipeline fireChannelActive() {
    // 這里最終會(huì)走到HeadContext中
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

HeadContext # channelActive()

public void channelActive(ChannelHandlerContext ctx) throws Exception {
     //這個(gè)會(huì)沿著Handler鏈的方向調(diào)用下一個(gè)handler的方法
     //在目前的Handler鏈中奸鸯,其實(shí)什么都沒做
     ctx.fireChannelActive();

     readIfIsAutoRead();
     /////////////////////////////////////////// readIfIsAutoRead的方法體
     private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                 //這個(gè)是可以配置關(guān)閉的咪笑,默認(rèn)都是打開;這個(gè)開關(guān)與限流有關(guān)
                // 這個(gè)方法最終走到AbstractNioChannel的doBeginRead()
                channel.read();
            }
     }
     ///////////////////////////////////////////
}

AbstractNioChannel # doBeginRead()

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // readInterestOp在channel初始化時(shí)設(shè)為了accept
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

由上文可知娄涩,注冊(cè)時(shí)沒有注冊(cè)感興趣的事件 (selectionKey = javaChannel().register(eventLoop().selector, 0, this)窗怒,當(dāng)時(shí)傳入的是0)。這里才完成感興趣事件的設(shè)置(至于為什么滯后設(shè)置蓄拣,我覺得是要把所有相關(guān)的類都初始化完成后才注冊(cè)扬虚,否則在這個(gè)過程中有客戶端訪問,可能會(huì)出錯(cuò))球恤。此時(shí)EventLoop的循環(huán)才真正可以檢查IO事件孔轴。

接收客戶端連接

當(dāng)客戶端連接到來時(shí),EventLoop的循環(huán)方法(run())會(huì)檢測(cè)到這一事件碎捺,進(jìn)入processSelectedKey()方法
NioEventLoop # processSelectedKey()

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    ......
    try {
        ......
        //處理 read or accept
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // 調(diào)用NioMessageUnsafe的read()
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

NioMessageUnsafe # read()

public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                ///////////////////////////////////////////// NioServerSocketChannel.doReadMessages()
                SocketChannel ch = javaChannel().accept();
                try {
                      if (ch != null) {
                            buf.add(new NioSocketChannel(this, ch));
                            return 1;
                      }
                } catch (Throwable t) {
                    ......
                }
                return 0;
                  
                /////////////////////////////////////////////
                ......
                
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        .......
    } finally {
        .......
    }
}

這里會(huì)調(diào)用accept()接受客戶端連接并保存在list中路鹰;接著觸發(fā)了pipeline.fireChannelRead(),channelRead事件就會(huì)從HeadContext傳播到ServerBootstrapAcceptor
ServerBootstrapAcceptor # channelRead()

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //msg 是前文accept()接收的NioSocketChannel
    final Channel child = (Channel) msg;
    // 添加ChannelInitializer
    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        //childGroup為workerGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

上面的方法會(huì)將accept的SocketChannel進(jìn)行注冊(cè)收厨,與ServerSocketChannel的注冊(cè)唯一不同的是此處會(huì)把Handler與另一個(gè)NioEventLoopGroup關(guān)聯(lián)晋柱;下面的代碼是我們對(duì)這個(gè)socket的初始設(shè)置。

ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("http-decoder", new HttpRequestDecoder());
        pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast(new MyInboundHandler());
    }
});

Tips: Netty中將與accept()返回的socket有關(guān)的參數(shù)稱之為childXXX诵叁,比如childOption()就是給該socket設(shè)置參數(shù)雁竞,childHandler()是給該socket設(shè)置處理器

channelRead()后蚂斤,系統(tǒng)中就形成了一條新的Handler鏈翘簇,其結(jié)構(gòu)如下限寞;之后與客戶端的讀寫交互都由這個(gè)Handler鏈處理

Child Handler

Netty工作流程圖

netty工作流程圖
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末魂贬,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子病毡,更是在濱河造成了極大的恐慌雷滋,老刑警劉巖平绩,帶你破解...
    沈念sama閱讀 206,602評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件恭垦,死亡現(xiàn)場(chǎng)離奇詭異快毛,居然都是意外死亡格嗅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門唠帝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來屯掖,“玉大人,你說我怎么就攤上這事襟衰√” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵瀑晒,是天一觀的道長(zhǎng)绍坝。 經(jīng)常有香客問我,道長(zhǎng)瑰妄,這世上最難降的妖魔是什么陷嘴? 我笑而不...
    開封第一講書人閱讀 55,306評(píng)論 1 279
  • 正文 為了忘掉前任映砖,我火速辦了婚禮间坐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘邑退。我一直安慰自己竹宋,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評(píng)論 5 373
  • 文/花漫 我一把揭開白布地技。 她就那樣靜靜地躺著蜈七,像睡著了一般。 火紅的嫁衣襯著肌膚如雪莫矗。 梳的紋絲不亂的頭發(fā)上飒硅,一...
    開封第一講書人閱讀 49,071評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音作谚,去河邊找鬼三娩。 笑死,一個(gè)胖子當(dāng)著我的面吹牛妹懒,可吹牛的內(nèi)容都是我干的雀监。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼眨唬,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼会前!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起匾竿,我...
    開封第一講書人閱讀 37,006評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤瓦宜,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后岭妖,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體歉提,經(jīng)...
    沈念sama閱讀 43,512評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡笛坦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了苔巨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片版扩。...
    茶點(diǎn)故事閱讀 38,094評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖侄泽,靈堂內(nèi)的尸體忽然破棺而出礁芦,到底是詐尸還是另有隱情,我是刑警寧澤悼尾,帶...
    沈念sama閱讀 33,732評(píng)論 4 323
  • 正文 年R本政府宣布柿扣,位于F島的核電站,受9級(jí)特大地震影響闺魏,放射性物質(zhì)發(fā)生泄漏未状。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評(píng)論 3 307
  • 文/蒙蒙 一析桥、第九天 我趴在偏房一處隱蔽的房頂上張望司草。 院中可真熱鬧,春花似錦泡仗、人聲如沸埋虹。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽搔课。三九已至,卻和暖如春截亦,著一層夾襖步出監(jiān)牢的瞬間爬泥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評(píng)論 1 262
  • 我被黑心中介騙來泰國打工崩瓤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留袍啡,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,536評(píng)論 2 354
  • 正文 我出身青樓谷遂,卻偏偏與公主長(zhǎng)得像葬馋,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子肾扰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容