Netty源碼深度解析-Pipeline(2) 以客戶端為例分析Pipeline工作原理

導(dǎo)讀

原創(chuàng)文章浆洗,轉(zhuǎn)載請注明出處狈蚤。

本文源碼地址:netty-source-code-analysis

本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼

在"Pipeline的構(gòu)造"這一節(jié)中我們已經(jīng)講過了,Pipeline中利用了責(zé)任鏈模式笋粟,而發(fā)揮責(zé)任鏈功能的數(shù)據(jù)結(jié)構(gòu)就是由多個HandlerContext構(gòu)成的的雙向鏈表特占,而每一個HandlerContext又對應(yīng)一個ChannelHandler(組合模式或者繼承)乃摹。由于每一個HandlerContext對應(yīng)一個ChannelHandler,所以本文行文中有時候會把HandlerContextChannelHandler等同對待赢乓,例如文章中

從當(dāng)前HandlerContexttail)開始向head方向遍歷尋找下一個ChannelOutboundHandler

實際上準確來說應(yīng)該是

從當(dāng)前HandlerContexttail)開始向head方向遍歷尋找下一個HandlerContext忧侧,并且該HandlerContext包含或者實現(xiàn)了ChannelOutboundHandler

為了行文不那么拗口牌芋,我們就把HandlerContextChannelHandler等同對待了蚓炬。

本文我們以客戶端建立連接、收發(fā)數(shù)據(jù)為例來學(xué)習(xí)一下Pipeline的工作原理躺屁。

1 ChannelHandler方法概覽

ChannelPipeline的構(gòu)造這篇文章中我們提到過ChannelHandler肯夏,Pipeline中雙向鏈表由HandlerContext組成,而每一個HandlerContext又包含一個ChannleHandler犀暑。我們看一下其中的主要方法驯击,這是所有ChannelHandler的公共接口,公共接口中主要有兩個方法handlerAddedhandlerRemoved耐亏,這是在ChannelHandler添加和刪除完成之后的回調(diào)方法徊都。上一篇文章中我們已經(jīng)分析過PipelineChannelHandler的添加和刪除了,這里不再贅述广辰。

public interface ChannelHandler {
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
     @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
}

我們重點來看ChannelHandler的兩個子接口暇矫,ChannelInboundHandlerChannelOutboundHandler主之。
首先來看ChannelInboundHandlerChannelInboundHandler類上的注釋如下

which adds callbacks for state changes. This allows the user
to hook in to state changes easily.

翻譯過來就是“Channel狀態(tài)改變時的回調(diào)方法”李根。其中的方法名多為Channel + 動詞過去分詞的形式槽奕。

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

再來看ChannelOutboundHandlerChannelOutboundHandler上的注釋如下

which will get notified for IO-outbound-operations.

翻譯過來就是“當(dāng)發(fā)生IO出站操作時的回調(diào)方法”朱巨,其中的方法名多為動詞原型的形式史翘。

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
    void read(ChannelHandlerContext ctx) throws Exception;
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
    void flush(ChannelHandlerContext ctx) throws Exception;
}

2 ChannelInboundInvoker和ChannelOutboundInvoker

和本文相關(guān)的組件中,ChannelPipelineChannelHandlerContext都實現(xiàn)了ChannelInboundInvokerChannelOutboundInvoker接口冀续。

ChannelInboundInvoker和ChannelOutboundInvoker UML類圖

ChannelInboundInvoker內(nèi)的方法的特點是大部分都是以fire開頭琼讽,并且是fire + 動詞的過去分詞的形式。我們知道過去分詞是表被動的意思洪唐,所以這里呢钻蹬,一般是Channel里發(fā)生了某些“被動事件”之后調(diào)用的。

public interface ChannelInboundInvoker {
    ChannelInboundInvoker fireChannelRegistered();
    ChannelInboundInvoker fireChannelUnregistered();
    ChannelInboundInvoker fireChannelActive();
    ChannelInboundInvoker fireChannelInactive();
    ChannelInboundInvoker fireExceptionCaught(Throwable cause);
    ChannelInboundInvoker fireUserEventTriggered(Object event);
    ChannelInboundInvoker fireChannelRead(Object msg);
    ChannelInboundInvoker fireChannelReadComplete();
    ChannelInboundInvoker fireChannelWritabilityChanged();
}

ChannelOutboundInvoker內(nèi)的方法的特點是除了后邊幾個以new開頭的方法之外凭需,都是以單個或者兩個動詞原型的形式作為方法名稱问欠,比如readwritewriteAndFlush粒蜈。這里直接用動詞原型就是表示主動的意思顺献,所以這里一般是向Channel發(fā)送主動命令使用的。

public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();

我們來仔細分析一下上文中提到的ChannelInboundHandler枯怖、ChannelInboundInvoker注整、ChannelOutboundHandlerChannelOutboundInvoker
我們看到ChannelInboundHandlerChannelInboundInvoker中的方法非常相似度硝,ChannelInboundHandler中的方法名多以Channel加動詞過去分詞的形式組成肿轨,而ChannelInboundInvoker中的方法就是ChannelInboundHandler中的方法加上fire構(gòu)成。

ChannelOutboundHandlerChannelOutboundInvoker中的方法名幾乎一模一樣蕊程,都是以動詞原形作為方法名椒袍。

我們來仔細琢磨一下這些方法的命名,Channel加動詞過去分詞表示該Channel發(fā)生了某種事件藻茂,而fireChannel加動詞過去分詞表示在該Channel上觸發(fā)這種事件驹暑。例如ChannelRead表示該Channel讀到了數(shù)據(jù)事件,fireChannelRead表示在該Channel上觸發(fā)ChannelRead事件辨赐。

直接用動詞原形作為方法名岗钩,表示命令該Channel進行某種操作,例如read表示命令該Channel進行數(shù)據(jù)的讀取肖油。為什么ChannelOutboundInvoker中的方法名沒有以fire開頭呢兼吓,因為ChannelOutbound傳播的不是事件,而是一種主動的操作森枪。

還有同學(xué)記得我在“Netty整體架構(gòu)”這篇文章中提到的“事件”和“命令”嗎视搏,在某些地方翻譯成“入站事件”和“出站事件”审孽,我個人認為翻譯成“事件”和“命令”更容易理解,因為我們感知到“事件”的發(fā)生是被動的浑娜,而發(fā)出“命令”是主動的佑力,都翻譯成“事件”不容易理解。

3 以客戶端為例分析Pipeline工作原理

本文的示例代碼在package com.zhongdaima.netty.analysis.pipeline中筋遭。
服務(wù)端示例代碼如下打颤,這個服務(wù)端的功能就是將所有客戶端發(fā)過來的數(shù)據(jù)原封不動地返回。

/**
 * 歡迎關(guān)注公眾號“種代碼“漓滔,獲取博主微信深入交流
 *
 * @author wangjianxin
 */
public class ServerBoot {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    //連接發(fā)過來的數(shù)據(jù)原樣返回
                                    if (msg instanceof ByteBuf) {
                                        ctx.write(msg);
                                        ctx.flush();
                                    }
                                }

                            });
                        }
                    });
            ChannelFuture f = b.bind(8000).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

客戶端的示例代碼如下编饺,在客戶端中我們添加了3個ChannelHandler,分別是A响驴,B透且,C

/**
 * 歡迎關(guān)注公眾號“種代碼“豁鲤,獲取博主微信深入交流
 *
 * @author wangjianxin
 */
public class ClientBoot {
    public static void main(String[] args) {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast("A", new AInBoundHandler());
                            ch.pipeline().addLast("B", new BOutBoundHandler());
                            ch.pipeline().addLast("C", new CDuplexHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8000);
            Channel channel = channelFuture.syncUninterruptibly().channel();
            channel.write(Unpooled.wrappedBuffer("Hello, 種代碼".getBytes()));
            channel.flush();
            channel.closeFuture().awaitUninterruptibly();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

AInboundHandler繼承自ChannelInboundHandlerAdapter秽誊,ChannelInboundHandlerAdapterNettyChannelInboundHandler的默認實現(xiàn),AInboundHandler是一個ChannelInboundHandler琳骡。

public class AInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ChannelActive in A");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            //這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
            ((ByteBuf) msg).getBytes(0, bytes);
            System.out.println("ChannelRead in A, msg=" + new String(bytes));
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("ExceptionCaught in A, cause=" + cause);
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("HandlerAdded in A");
        super.handlerAdded(ctx);
    }
}

BOutboundHandler繼承自ChannelOutboundHandlerAdapter锅论,ChannelOutboundHandlerAdapter是netty中``ChannelOutboundHandler的默認實現(xiàn),BOutboundHandler是一個ChannelOutboundHandler`楣号。

public class BOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("Connect in B");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Read in B");
        super.read(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof ByteBuf){
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            //這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
            ((ByteBuf) msg).getBytes(0, bytes);
            System.out.println("Write in B, msg=" + new String(bytes));
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("HandlerAdded in B");
        super.handlerAdded(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("ExceptionCaught in B, cause=" + cause);
        super.exceptionCaught(ctx, cause);
    }
}

CDuplexHandler繼承自ChannelDuplexHandler棍厌,ChannelDuplexHandler同時實現(xiàn)了ChannelInboundHandlerChannelOutboundHandler,所以CDuplexHandler既是一個ChannelInboundHandler又是一個ChannelOutboundHandler竖席。

public class CDuplexHandler extends ChannelDuplexHandler {
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        System.out.println("Connect in C");
        super.connect(ctx, remoteAddress, localAddress, promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Read in C");
        super.read(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof ByteBuf) {
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            //這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
            ((ByteBuf) msg).getBytes(0, bytes);
            System.out.println("Write in C, msg=" + new String(bytes));
        }
        super.write(ctx, msg, promise);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("ChannelActive in C");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            //這里調(diào)用getBytes不會導(dǎo)致readerIndex的移動
            ((ByteBuf) msg).getBytes(0, bytes);
            System.out.println("ChannelRead in C, msg=" + new String(bytes));
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("ExceptionCaught in C, cause=" + cause);
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("HandlerAdded in C");
        super.handlerAdded(ctx);
    }
}

在添加完這3個Handler之后,我們來看一下客戶端的Pipeline狀態(tài)敬肚,當(dāng)前Pipeline的狀態(tài)如下圖所示毕荐。除了內(nèi)置的HeadContextTailContext之外,還有我們手動添加的A艳馒、B憎亚、C3個Handler

當(dāng)前客戶端的Pipeline狀態(tài)

先啟動服務(wù)端弄慰,再啟動客戶端第美,我們可以在客戶端的控制臺中看到如下輸出。

Connect in C
Connect in B
ChannelActive in A
ChannelActive in C
Read in C
Read in B
Write in C, msg=Hello, 種代碼
Write in B, msg=Hello, 種代碼
ChannelRead in A, msg=Hello, 種代碼
ChannelRead in C, msg=Hello, 種代碼
Read in C
Read in B

4 ChannelPipeline的方向

我們在Pipeline的構(gòu)造中提到過Pipeline中有兩個特殊的ChannelHandlerContext陆爽,分別是HeadContextTailContext什往。既然有了HeadTail那必然是有方向了,那Pipeline的方向是什么樣的呢慌闭,我們看下面這張圖别威。

image.png

Channel直接接觸的方向是Head躯舔,另一端是Tail
在“Netty整體架構(gòu)”這篇文章中也提到過ChannelInboundHandlerChannelOutboundHandler省古,那InboundOutbound又是哪個方向呢粥庄。我們站在應(yīng)用的位置來看,即站在BizHandler的位置來看豺妓,從Channel讀取數(shù)據(jù)進入到我們的應(yīng)用即是Inbound惜互,而從我們的應(yīng)用向Channel寫數(shù)據(jù)即是Outbound

5 Pipeline的工作原理

我們在前面的文章中在牽涉到Pipeline的地方都一筆帶過了琳拭,今天咱們來詳細分析一下曾經(jīng)略過的內(nèi)容训堆。

5.1 connect的傳播

在我們調(diào)用bootstrap.connect("127.0.0.1", 8000)這行代碼進行連接時最終會調(diào)用到AbstractChannel#connect方法,該方法邏輯很簡單臀栈,直接調(diào)用了pipelineconnect方法蔫慧,咱們跟進去看。

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

Pipelineconnect方法同樣很簡單权薯,調(diào)用了tailconnect方法姑躲,TailContext繼承自AbstractChannelHandlerContext,并沒有覆蓋connect方法盟蚣,所以tail.connect調(diào)用到了AbstractChannelHandlerContext中的connect方法黍析。

public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

AbstractChannelHandlerContext中的connect方法,先是調(diào)用findContextOutbound屎开,找到下一個ChannelOutboundHandler阐枣,這個方法很簡單,就是從當(dāng)前HandlerContexttail)開始向head方向遍歷尋找下一個ChannelOutboundHandler奄抽,在我們的示例中蔼两,tail的下一個ChannelOutboundHandler就是我們所添加的CDuplexHandler,找到之后調(diào)用了next.invokeContext方法逞度。

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
}

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    //從當(dāng)前HandlerContext向`head`方向查找下一個包含`ChannelOutboundHandler`的`HandlerContext`
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

invokeConnect方法首先調(diào)用invokeHandler()方法判斷當(dāng)前Handler是否已經(jīng)添加完成(當(dāng)前HandlerhandlerAdded方法已經(jīng)被調(diào)用過)额划。如果返回true就調(diào)用當(dāng)前Handlerconnect方法,這就是我們的CDuplexHandlerconnect方法被調(diào)用到的地方档泽,也就是我們在控制臺看到的第一個輸出Connect in C俊戳。如果返回false就繼續(xù)調(diào)用HandlerContextconnect方法繼續(xù)查找下一個ChannelOutboundHanlder

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    //判斷當(dāng)前Handler是否已經(jīng)完成添加(即handlerAdded方法被調(diào)用過了)
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

回到咱們的CDuplexHandlerconnect方法馆匿,打印完Connect in C之后調(diào)用了super.connect方法抑胎,即ChannelDuplexHandler#connect方法。

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    System.out.println("Connect in C");
    super.connect(ctx, remoteAddress, localAddress, promise);
}

ChannelDuplexHandler#connect方法調(diào)用了它所在的HandlerContextconnect方法渐北,即AbstractChannelHandlerContext#connect方法阿逃,熟悉的身影又回來了,咱們剛剛分析過這個方法,即tail.connect盆昙,而TailContext未覆蓋這個方法羽历,tail.connect就是ChannelDuplexHandler#connect

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
                    SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

顯而易見下一個ChannelOutboundHander就是BOutboundHandler淡喜,所以我們看到控制臺的第二行輸出Connect in B秕磷。
最后一個ChannelOutboundHandlerHeadContext,我們到HeadContext來看一下它的connnect方法炼团。HeadContextconnect方法很簡單澎嚣,直接調(diào)用了unsafeconnect方法,而unsafe.connect方法我們在“客戶端的啟動過程”這篇文章中已經(jīng)分析過了瘟芝,不再贅述易桃。

@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

5.2 ChannelActive的傳播

我們在“客戶端的啟動過程”這篇文章中提到過AbstractNioUnsafe#finishConnect方法,當(dāng)NioEventLoop檢測到Channel上有OP_CONNECT事件發(fā)生時锌俱,會調(diào)用unsafe.finishConnect方法晤郑。

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

finishConnect方法在AbstratNioUnsafe中。在調(diào)用完doFinishConnect方法之后會調(diào)用fulfillConnectPromise方法贸宏,我們一起來看一下造寝。

@Override
public final void finishConnect() {
    try {
        boolean wasActive = isActive();
        doFinishConnect();
        fulfillConnectPromise(connectPromise, wasActive);
    } catch (Throwable t) {

    } finally {

    }
}

fulfillConnectPromise方法里有很多內(nèi)容,這里咱們只關(guān)注本次要講的重點吭练,pipeline().fireChannelActive()诫龙。

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {

    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }

}

我們看一下pipelinefireChannelActive方法,這里調(diào)用了invokeChannelActive方法鲫咽,傳遞的參數(shù)是HeadContext签赃。

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

invokeChannelActive方法調(diào)用了next.invokeChannelActive,這里的next就是HeadContext分尸。

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

invokeChannelActive方法調(diào)用了HandlerchannelActive方法锦聊,這里的Handler自然也是HeadContext

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelActive();
    }
}

HeadContext里的channelActive方法調(diào)用了ct.fireChannelActive()箩绍,顯然這里的ctx就是HeadContext本身孔庭。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

fireChannelActive方法在AbstractChannelHandlerContext中,它首先從當(dāng)前HandlerContext查找下一個ChannelInboundHandler伶选,然后調(diào)用invokeChannelActive方法,熟悉的身影又回來了尖昏,invokeChannelActive方法咱們剛剛也見過仰税。剛才調(diào)用invokeChannelActive方法是由Pipeline發(fā)起的,直接傳遞了head作為參數(shù)抽诉,而這里是由head發(fā)起的陨簇,參數(shù)是從head開始的下一個包含ChannelInboundHandler,顯然這個參數(shù)是我們所添加的AInboundHandler,所以我們在控制臺看到ChannelActive in A河绽。

@Override
public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}

繼續(xù)下去可以想見AInboundHandler被調(diào)用后也會查找下一個ChannelInboundHandler就是CDuplexHandler己单,所以我們在控制臺看到的下一條輸出就是ChannelActive in C

5.3 read的傳播

剛才咱們在看HeadContext里的channelActive方法時耙饰,其中并不僅僅只有ctx.fireChannelActive()調(diào)用纹笼,緊跟著的就是readIfIsAutoRead()的調(diào)用。咱們一起來看一下苟跪。這里判斷channel是否配置了自動讀取廷痘,默認情況下是true,接著調(diào)用了channel.read()件已。

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

channel.read方法在AbstractChannel中笋额,調(diào)用了pipeline.read方法。

@Override
public Channel read() {
    pipeline.read();
    return this;
}

pipelineread方法調(diào)用了tail.read篷扩,這里的tail就是指TailContext了兄猩。

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

tail沒有覆蓋read方法,這個方法的實現(xiàn)在AbstractChannelHandlerContext中鉴未,首先調(diào)用findContextOutbound方法查找下一個ChannelOutboundHandler枢冤。第一個查找到的就是CDuplexHandler,調(diào)用了它的invokeRead方法歼狼。

@Override
public ChannelHandlerContext read() {
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else {
        Runnable task = next.invokeReadTask;
        if (task == null) {
            next.invokeReadTask = task = new Runnable() {
                @Override
                public void run() {
                    next.invokeRead();
                }
            };
        }
        executor.execute(task);
    }

    return this;
}

invokeRead方法在AbstractChannelHandlerContext中掏导,invokeRead調(diào)用了它所包含的ChannelInboundHandlerchannelRead方法,即CDuplexHandlerchannelRead方法羽峰,在控制臺打印出Read in C趟咆。

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

CDuplexHandlerchannelRead方法在打印完成后調(diào)用了super.read(ctx),我們跟下去看一下梅屉。

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
    System.out.println("Read in C");
    super.read(ctx);
}

接著調(diào)用了ctx.read()值纱,即AbstractChannelHandlerContext#read方法,熟悉的方法又出現(xiàn)了坯汤,接著調(diào)用下去就到了下一個ChannelOutboundHandlerBOutboundHandler虐唠,在控制臺打印出Read in B

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
    ctx.read();
}

接著往下調(diào)用就到了HeadContext惰聂,HeadContextread方法調(diào)用了unsafe.beginRead

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

unsafebeginRead方法調(diào)用了doBeginRead疆偿。

@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

我們以AbstractNioChannel為例看一下doBeginRead方法。這里的doBeginRead方法把readInterestOp加入到興趣事件中搓幌,readInterestOp在構(gòu)造方法中被賦值杆故,即是OP_READ興趣事件。

    @Override
    protected void doBeginRead() throws Exception {
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

調(diào)用read方法溉愁,并不能真正從Channel中讀取數(shù)據(jù)处铛,只是把OP_READ興趣事件加入到selectionKey

5.4 write的傳播

在完成連接后,我們在客戶端代碼中調(diào)用了channel.write(Unpooled.wrappedBuffer("Hello, 種代碼".getBytes()));向服務(wù)端發(fā)送數(shù)據(jù)撤蟆。咱們跟進去看看奕塑,這里的channel.write方法在AbstractChannel中,直接調(diào)用了pipeline.write方法家肯。

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

pipelinewrite方法調(diào)用了tail.write方法龄砰,tail即是TailContext

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

咱們就不再一步步跟下去了息楔,從控制臺打印出的Write in C, msg=Hello, 種代碼Write in B, msg=Hello, 種代碼來推測寝贡,write命令從tail開始經(jīng)過了CDuplexHandlerBOutboundHandler最后到達HeadContext

5.5 ChannelRead的傳播

在服務(wù)端接收到數(shù)據(jù)之后值依,直接將數(shù)據(jù)原樣返回圃泡,當(dāng)Eventloop發(fā)現(xiàn)Channel中發(fā)生OP_READ事件時,調(diào)用unsafe.read()進行數(shù)據(jù)的讀取愿险。這部分代碼咱們在講EventLoop的時候已經(jīng)提到過颇蜡,在NioEventLoopprocessSelectedKey方法中。

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

咱們跟進去看看unsafe.read方法辆亏,方法實現(xiàn)在NioByteUnsafe中风秤,其中分配緩沖區(qū)讀取數(shù)據(jù)的過程咱們先略過,這篇文章咱們著重介紹Pipeline扮叨,直接看讀取數(shù)據(jù)之后調(diào)用了什么方法缤弦。在讀取數(shù)據(jù)之后(所分配的緩沖區(qū)填滿)調(diào)用了pipeline.fireChannelRead(byteBuf),而在讀完數(shù)據(jù)之后(tcp緩沖區(qū)數(shù)據(jù)被讀完)調(diào)用了pipeline.fireChannelReadComplete()彻磁。

 @Override
    public final void read() {

        try {
            do {
                //分配緩沖區(qū)
                byteBuf = allocHandle.allocate(allocator);
                //讀取數(shù)據(jù)
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                //觸發(fā)ChannelRead事件
                pipeline.fireChannelRead(byteBuf);
                
            } while (allocHandle.continueReading());
            //觸發(fā)ChannelReadComplete事件
            pipeline.fireChannelReadComplete();

        } catch (Throwable t) {

        } finally {

        }
    }
}

先看看``pipeline.fireChannelRead(byteBuf)方法碍沐,很簡單,這里調(diào)用了AbstractChannelHandlerContext.invokeChannelRead方法衷蜓,并傳入?yún)?shù)head累提,即是HeadContext`。

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

invokeChannelRead方法又調(diào)用了next.invokeChannelRead磁浇,這里的nextHeadContext斋陪。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

到這兒里咱們也不再一步步跟下去了,根據(jù)控制臺打印出的ChannelRead in A, msg=Hello, 種代碼ChannelRead in C, msg=Hello, 種代碼來推測置吓,ChannelRead事件從HeadContext開始經(jīng)過AInboundHandler无虚,和CDuplexHandler到達TailContext
最后在控制臺又打印出“Read in C”和“Read in B”是怎么回事呢衍锚,那是因為在pipeline.fireChannelReadComplete()又發(fā)出了read命令友题,咱們不再贅述。

6 總結(jié)

  • ChannelOutboundInvoker:用來向Channel發(fā)送“命令”的接口构拳。
  • ChannelInboundInvoker:用來觸發(fā)Channel上發(fā)生“事件”的接口咆爽。
  • ChannelOutboundHandler:“命令”在Pipeline上傳播時的回調(diào)方法。
  • ChannelInboundHandler: “事件”在Pipeline上傳播時的回調(diào)方法置森。
  • Pipeline中主動發(fā)出的命令斗埂,例如connectread凫海、write等由TailContext開始依次經(jīng)過所有的ChannelOutboundHandler呛凶,最后到達HeadContext
  • Pipeline中被動發(fā)生的事件行贪,例如ChannelActive漾稀、ChannelRead等由HeadContext開始依次經(jīng)過所有的ChannelInboundHandler,最后到達TailContext建瘫。

關(guān)于作者

王建新崭捍,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負責(zé)服務(wù)治理啰脚、RPC框架殷蛇、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等橄浓。愛技術(shù)粒梦、愛學(xué)習(xí),歡迎聯(lián)系交流荸实。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末匀们,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子准给,更是在濱河造成了極大的恐慌泄朴,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件圆存,死亡現(xiàn)場離奇詭異叼旋,居然都是意外死亡,警方通過查閱死者的電腦和手機沦辙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門夫植,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人油讯,你說我怎么就攤上這事详民。” “怎么了陌兑?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵沈跨,是天一觀的道長。 經(jīng)常有香客問我兔综,道長饿凛,這世上最難降的妖魔是什么狞玛? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮涧窒,結(jié)果婚禮上心肪,老公的妹妹穿的比我還像新娘。我一直安慰自己纠吴,他們只是感情好硬鞍,可當(dāng)我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著戴已,像睡著了一般固该。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上糖儡,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天伐坏,我揣著相機與錄音,去河邊找鬼握联。 笑死著淆,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的拴疤。 我是一名探鬼主播永部,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼呐矾!你這毒婦竟也來了苔埋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蜒犯,失蹤者是張志新(化名)和其女友劉穎组橄,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罚随,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡玉工,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了淘菩。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片遵班。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖潮改,靈堂內(nèi)的尸體忽然破棺而出狭郑,到底是詐尸還是另有隱情,我是刑警寧澤汇在,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布翰萨,位于F島的核電站,受9級特大地震影響糕殉,放射性物質(zhì)發(fā)生泄漏亩鬼。R本人自食惡果不足惜殖告,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望雳锋。 院中可真熱鬧丛肮,春花似錦、人聲如沸魄缚。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至虽风,卻和暖如春咆瘟,著一層夾襖步出監(jiān)牢的瞬間嚼隘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工袒餐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留飞蛹,地道東北人。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓灸眼,卻偏偏與公主長得像卧檐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子焰宣,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容