2019-05-24 pipeline 事件及異常的傳播

netty中事件的傳播主要包含inBound事件和outBound事件

 ChannelInboundHandler extends ChannelHandler 
 ChannelOutboundHandler extends ChannelHandler

首先我們看下ChannelInboundHandler接口坛怪,主要包含以下方法进倍,基本都是用和連接事件相關(guān)的

    /**
     * channel 注冊到NioEventLoop上的回調(diào)
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;
    /**
     * channel 解除注冊到NioEventLoop上的回調(diào)
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
    /**
     * channel在激活之后的回調(diào)
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;
    /**
     * channel失效之后的回調(diào)
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;
    /**
     * channel在讀取數(shù)據(jù),或者接收 到鏈接之后的回調(diào),
     * 對于服務(wù)端channel,這里的msg是一個(gè)鏈接
     * 對于客戶端channel,這里的msg是一個(gè)ByteBuf
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    /**
     * 數(shù)據(jù)讀取完成后的一個(gè)回調(diào)
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    /**
     * 用戶可以自定義的一些事件
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
    /**
     * 異常事件的傳播
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

InBoundHandler有其對應(yīng)的對應(yīng)的實(shí)現(xiàn)類

class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

接下來基于ChannelInboundHandlerAdapter講一下read事件pipeline中的傳播流程,這是我們案例的服務(wù)端代碼

public static void main(String[] args) throws Exception {
       // Configure the server.
       EventLoopGroup bossGroup = new NioEventLoopGroup(1);
       EventLoopGroup workerGroup = new NioEventLoopGroup();
       try {
           ServerBootstrap b = new ServerBootstrap();
           b.group(bossGroup, workerGroup)
                   .channel(NioServerSocketChannel.class)
                   .option(ChannelOption.SO_BACKLOG, 100)
                   .handler(new LoggingHandler(LogLevel.INFO))
                   .childHandler(new ChannelInitializer<SocketChannel>() {
                       @Override
                     /**
                     添加3個(gè)InBoundHandler
                     */
                       public void initChannel(SocketChannel ch) throws Exception {
                           ch.pipeline().addLast(new InBoundHandlerA());
                           ch.pipeline().addLast(new InBoundHandlerB());
                           ch.pipeline().addLast(new InBoundHandlerC());
                       }
                   });

           // Start the server.
           ChannelFuture f = b.bind(8007).sync();

           // Wait until the server socket is closed.
           f.channel().closeFuture().sync();
       } finally {
           // Shut down all event loops to terminate all threads.
           bossGroup.shutdownGracefully();
           workerGroup.shutdownGracefully();
       }
   }

InBoundHandlerA的實(shí)現(xiàn)如下:

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(this.getClass().getName() + " read msg:" + msg);
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {    
       ctx.channel().pipeline().fireChannelRead("hello");
    }
}

InBoundHandlerB,InBoundHandlerC的實(shí)現(xiàn)如下:

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(this.getClass().getName() + " read msg:" + msg);
        super.channelRead(ctx, msg);
    }
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(this.getClass().getName() + " read msg:" + msg);
        super.channelRead(ctx, msg);
    }
}

啟動(dòng)server,本地通過telnet命令telnet localhost 8007 觸發(fā)后偷办,我們可以看到控制臺會輸出以下結(jié)果:

com.tyust.netty.inbound.InBoundHandlerA read msg:hello
com.tyust.netty.inbound.InBoundHandlerB read msg:hello
com.tyust.netty.inbound.InBoundHandlerC read msg:hello
17:21:48.214 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.

從控制臺輸出的日志可以得知,read事件在pipeline中的傳播是基于InBoundHandlerpipeline中的添加順序來的.接下來從InBoundHandlerA的這行代碼ctx.channel().pipeline().fireChannelRead("hello")入手,基于源碼分析一下這個(gè)中間的執(zhí)行流程.

代碼位置:io.netty.channel.DefaultChannelPipeline#fireChannelRead

image.png

可以看出這個(gè)事件是在Head節(jié)點(diǎn)開始傳播的。

基于前面的文章,我們都知道,實(shí)例中的代碼的pipeline結(jié)構(gòu)是這樣的

image.png

而現(xiàn)在我們在IA這個(gè)節(jié)點(diǎn)觸發(fā)了一個(gè)read事件澄港,流程也就是這樣的

image.png

根據(jù)代碼可知椒涯,最開始是在head節(jié)點(diǎn)開始傳播的,從head節(jié)點(diǎn)觸發(fā)之后回梧,我們繼續(xù)跟代碼废岂,看到代碼后會進(jìn)入HeadContext進(jìn)行處理

image.png

image.png

代碼進(jìn)入這個(gè)位置:io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead

image.png

image.png

這時(shí)候會開始去找尋下一個(gè)InboundHandler祖搓,我們看下找尋的邏輯,輪訓(xùn)pipeline中的Handler湖苞,碰到inbound的就返回.
image.png

inboundoutbound的標(biāo)示是在構(gòu)建context的時(shí)候就定好了的

image.png

繼續(xù)跟代碼拯欧,這時(shí)候就得到了InBoundHandlerA,繼續(xù)調(diào)用其invokeChannelRead财骨,就進(jìn)入了我們的InBoundHandlerA#channelRead方法

image.png

image.png

同樣的镐作,InBoundHandlerBInBoundHandlerB的執(zhí)行也是一樣的思路.
最后,事件會傳播到我們的TailContext節(jié)點(diǎn)

image.png

看下我們TailContext中的read邏輯,會打印出前面我們控制臺中顯示的那一段
Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.的日志,最后msg進(jìn)行回收,避免內(nèi)存泄漏.

image.png

image.png

好的隆箩,我們的InBound事件的傳播就分析到這里该贾,接下來我們看outBound事件。


看到ChannelOutboundHandler接口的定義捌臊,可以看出基本都是跟IO讀寫相關(guān)的事件

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}

OutBoundHandler也有其對應(yīng)的對應(yīng)的實(shí)現(xiàn)類

class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler

接下來基于ChannelOutboundHandlerAdapter講一下write事件pipeline中的傳播流程杨蛋,基于之前的代碼,我們改下添加handler的部分

...
.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                        }
                    });

InBoundHandlerA的實(shí)現(xiàn)如下:

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println(this.getClass().getName() + " write msg: " + msg);
        super.write(ctx, msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
    }
}

InBoundHandlerB理澎,InBoundHandlerC的代碼實(shí)現(xiàn)如下:

public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println(this.getClass().getName() + " write msg: " + msg);
        super.write(ctx, msg, promise);
    }
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println(this.getClass().getName() + " write msg: " + msg);
        super.write(ctx, msg, promise);
    }
}

啟動(dòng)server逞力,本地通過telnet命令telnet localhost 8007 觸發(fā)后,我們可以看到控制臺會輸出以下結(jié)果:

com.tyust.netty.outbound.OutBoundHandlerC write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerB write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerA write msg: hello,world

從控制臺輸出的日志可以得知, write事件在pipeline中的傳播是基于outBoundHandlerpipeline中的添加順序逆向順序來的矾端。接下來從OutBoundHandlerA的這行代碼ctx.channel().pipeline().fireChannelRead("hello")入手掏击,基于源碼分析一下這個(gè)中間的執(zhí)行流程。

從代碼流程來看秩铆,可以看出事件傳播是從TailContext開始傳播

image.png

image.png

接著砚亭,會去pipeline中開始尋找下一個(gè)節(jié)點(diǎn)OutBoundHandlerC

image.png

image.png
image.png

接著代碼就會進(jìn)入OutBoundHandlerC#write方法中,OutBoundHandlerC中事件會繼續(xù)沿著pipeline往下進(jìn)行傳播殴玛,最終會傳播到HeadContext

image.png

流程就是如圖所示:


image.png

最后我們看下在HeadContext中對write事件的處理捅膘,他會調(diào)用unsafewrite方法,unsafe#write主要是將數(shù)據(jù)寫會到客戶端滚粟,這里對unsafe不做過多的解析寻仗,后面我們會詳細(xì)講unsafe

image.png

ok凡壤,outBound事件就分析到這里署尤,接下來我們分析異常的傳播


修改我們server端的代碼變成如下:

    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerB());
                            ch.pipeline().addLast(new InBoundHandlerC());
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                        }
                    });

其中InBoundHandlerB的代碼如下,調(diào)用channelRead方法的時(shí)候會拋出一個(gè)異常:

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new RuntimeException(this.getClass().getName() + " happen error");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(this.getClass().getName() + " exceptionCaught exec ");
        ctx.fireExceptionCaught(cause);
    }
}

InBoundHandlerAInBoundHandlerC亚侠,OutBoundHandlerA曹体,OutBoundHandlerBOutBoundHandlerC 重些exceptionCaught方法硝烂,代碼如下:

@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(this.getClass().getName() + " exceptionCaught ");
        super.exceptionCaught(ctx, cause);
    }

啟動(dòng)server端代碼箕别,本地通過telnet命令telnet localhost 8007 觸發(fā)后,隨便輸入字符,我們可以看到控制臺會輸出以下結(jié)果:

com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec 
com.tyust.netty.exception.InBoundHandlerC exceptionCaught 
com.tyust.netty.exception.OutBoundHandlerA exceptionCaught 
com.tyust.netty.exception.OutBoundHandlerB exceptionCaught 
com.tyust.netty.exception.OutBoundHandlerC exceptionCaught 
21:09:05.814 [nioEventLoopGroup-3-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: com.tyust.netty.exception.InBoundHandlerB happen error

從日志顯示得出,異常是隨著handler的添加順序進(jìn)行傳播串稀,接下來我們進(jìn)行斷點(diǎn)分析;在調(diào)用完InBoundHandlerB#channelRead方法后,事件會往下一個(gè)節(jié)點(diǎn)進(jìn)行傳播除抛,但由于出現(xiàn)了異常,代碼會進(jìn)入這個(gè)位置io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)母截,緊接著到忽,他會去找pipeline中下一個(gè)重寫了exceptionCaught的方法
找到了InboundHandlerB

image.png
image.png
image.png

也就出現(xiàn)了我們控制臺中顯示的com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec 日志輸出;

接下來他會繼續(xù)找下一個(gè)重寫了exceptionCaught的方法也就是InBoundHandlerC微酬,以此類推绘趋,最后會執(zhí)行到TailContextexceptionCaught方法

image.png

image.png

最后我們看下TailContextexceptionCaught方法,它什么事情都沒做颗管,只是把日志進(jìn)行輸出,然后進(jìn)行一場回收

image.png

image.png

這樣其實(shí)很不友好,異常是反映我們系統(tǒng)是否出問題最重要的一個(gè)因素陷遮,我們需要將其捕獲進(jìn)行處理,因此常用的處理流程是調(diào)整我們的代碼添加一個(gè)異常的handler

public class ExceptionHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(this.getClass().getName() + " 異常處理,e:" + cause);
    }
}
...
 .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new InBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerB());
                            ch.pipeline().addLast(new InBoundHandlerC());
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                            ch.pipeline().addLast(new ExceptionHandler());
                        }
                    });

好了垦江,我們的事件及異常傳播到這里就結(jié)束了帽馋,留給大家兩個(gè)問題,大家可以沿著我們上面的分析去解決這兩個(gè)問題:

  • outbound事件傳播中比吭,如果OutBoundHandlerA#handlerAdded使用的case2中的代碼绽族,事件會是怎么樣在pipeline中傳播的?
   @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //case 1
        ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);

        //case 2
        ctx.executor().schedule(() -> ctx.write("hello,world"), 3, TimeUnit.SECONDS);
    }
  • 同樣的,在 inBound事件傳播中衩藤,如果InBoundHandlerA#channelActive方法中調(diào)用的是case2中的代碼吧慢,那事件是如何傳播的?
@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //case  1
        ctx.channel().pipeline().fireChannelRead("hello");
        //case  2
        ctx.fireChannelRead("hello");
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市赏表,隨后出現(xiàn)的幾起案子检诗,更是在濱河造成了極大的恐慌,老刑警劉巖瓢剿,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件逢慌,死亡現(xiàn)場離奇詭異,居然都是意外死亡间狂,警方通過查閱死者的電腦和手機(jī)攻泼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鉴象,“玉大人忙菠,你說我怎么就攤上這事》谋祝” “怎么了只搁?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長俭尖。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么稽犁? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任焰望,我火速辦了婚禮,結(jié)果婚禮上已亥,老公的妹妹穿的比我還像新娘熊赖。我一直安慰自己,他們只是感情好虑椎,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布震鹉。 她就那樣靜靜地躺著,像睡著了一般捆姜。 火紅的嫁衣襯著肌膚如雪传趾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天泥技,我揣著相機(jī)與錄音浆兰,去河邊找鬼。 笑死珊豹,一個(gè)胖子當(dāng)著我的面吹牛簸呈,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播店茶,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼蜕便,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了贩幻?” 一聲冷哼從身側(cè)響起轿腺,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎段直,沒想到半個(gè)月后吃溅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸯檬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年决侈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片喧务。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡赖歌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出功茴,到底是詐尸還是另有隱情庐冯,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布坎穿,位于F島的核電站展父,受9級特大地震影響返劲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜栖茉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一篮绿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧吕漂,春花似錦亲配、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至苍鲜,卻和暖如春思灰,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背坡贺。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工官辈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人遍坟。 一個(gè)月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓拳亿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親愿伴。 傳聞我的和親對象是個(gè)殘疾皇子肺魁,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354