netty中事件的傳播主要包含inBound
事件和outBound
事件
ChannelInboundHandler extends ChannelHandler
ChannelOutboundHandler extends ChannelHandler
首先我們看下ChannelInboundHandler
接口坛怪,主要包含以下方法进倍,基本都是用和連接事件
相關(guān)的
/**
* channel 注冊到NioEventLoop上的回調(diào)
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* channel 解除注冊到NioEventLoop上的回調(diào)
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* channel在激活之后的回調(diào)
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* channel失效之后的回調(diào)
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* channel在讀取數(shù)據(jù),或者接收 到鏈接之后的回調(diào),
* 對于服務(wù)端channel,這里的msg是一個(gè)鏈接
* 對于客戶端channel,這里的msg是一個(gè)ByteBuf
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* 數(shù)據(jù)讀取完成后的一個(gè)回調(diào)
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* 用戶可以自定義的一些事件
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* 異常事件的傳播
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
而InBoundHandler
有其對應(yīng)的對應(yīng)的實(shí)現(xiàn)類
class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
接下來基于ChannelInboundHandlerAdapter
講一下read事件
在pipeline
中的傳播流程,這是我們案例的服務(wù)端代碼
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
/**
添加3個(gè)InBoundHandler
*/
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
// Start the server.
ChannelFuture f = b.bind(8007).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
InBoundHandlerA
的實(shí)現(xiàn)如下:
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().pipeline().fireChannelRead("hello");
}
}
InBoundHandlerB
,InBoundHandlerC
的實(shí)現(xiàn)如下:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
啟動(dòng)server
,本地通過telnet
命令telnet localhost 8007
觸發(fā)后偷办,我們可以看到控制臺會輸出以下結(jié)果:
com.tyust.netty.inbound.InBoundHandlerA read msg:hello
com.tyust.netty.inbound.InBoundHandlerB read msg:hello
com.tyust.netty.inbound.InBoundHandlerC read msg:hello
17:21:48.214 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.
從控制臺輸出的日志可以得知,read
事件在pipeline
中的傳播是基于InBoundHandler
在pipeline
中的添加順序來的.接下來從InBoundHandlerA
的這行代碼ctx.channel().pipeline().fireChannelRead("hello")
入手,基于源碼分析一下這個(gè)中間的執(zhí)行流程.
代碼位置:io.netty.channel.DefaultChannelPipeline#fireChannelRead
可以看出這個(gè)事件是在
Head
節(jié)點(diǎn)開始傳播的。
基于前面的文章,我們都知道,實(shí)例中的代碼的pipeline
結(jié)構(gòu)是這樣的
而現(xiàn)在我們在IA這個(gè)節(jié)點(diǎn)觸發(fā)了一個(gè)read
事件澄港,流程也就是這樣的
根據(jù)代碼可知椒涯,最開始是在head
節(jié)點(diǎn)開始傳播的,從head
節(jié)點(diǎn)觸發(fā)之后回梧,我們繼續(xù)跟代碼废岂,看到代碼后會進(jìn)入HeadContext
進(jìn)行處理
代碼進(jìn)入這個(gè)位置:io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
這時(shí)候會開始去找尋下一個(gè)
InboundHandler
祖搓,我們看下找尋的邏輯,輪訓(xùn)pipeline
中的Handler
湖苞,碰到inbound
的就返回.inbound
和outbound
的標(biāo)示是在構(gòu)建context
的時(shí)候就定好了的
繼續(xù)跟代碼拯欧,這時(shí)候就得到了InBoundHandlerA
,繼續(xù)調(diào)用其invokeChannelRead
财骨,就進(jìn)入了我們的InBoundHandlerA#channelRead
方法
同樣的镐作,InBoundHandlerB
和InBoundHandlerB
的執(zhí)行也是一樣的思路.
最后,事件會傳播到我們的TailContext
節(jié)點(diǎn)
看下我們TailContext
中的read
邏輯,會打印出前面我們控制臺中顯示的那一段
Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.
的日志,最后msg
進(jìn)行回收,避免內(nèi)存泄漏.
好的隆箩,我們的InBound
事件的傳播就分析到這里该贾,接下來我們看outBound
事件。
看到ChannelOutboundHandler
接口的定義捌臊,可以看出基本都是跟IO讀寫
相關(guān)的事件
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
而OutBoundHandler
也有其對應(yīng)的對應(yīng)的實(shí)現(xiàn)類
class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler
接下來基于ChannelOutboundHandlerAdapter
講一下write事件
在pipeline
中的傳播流程杨蛋,基于之前的代碼,我們改下添加handler
的部分
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
InBoundHandlerA
的實(shí)現(xiàn)如下:
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
}
}
InBoundHandlerB
理澎,InBoundHandlerC
的代碼實(shí)現(xiàn)如下:
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
啟動(dòng)server
逞力,本地通過telnet
命令telnet localhost 8007
觸發(fā)后,我們可以看到控制臺會輸出以下結(jié)果:
com.tyust.netty.outbound.OutBoundHandlerC write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerB write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerA write msg: hello,world
從控制臺輸出的日志可以得知, write
事件在pipeline
中的傳播是基于outBoundHandler
在pipeline
中的添加順序逆向順序來的矾端。接下來從OutBoundHandlerA
的這行代碼ctx.channel().pipeline().fireChannelRead("hello")
入手掏击,基于源碼分析一下這個(gè)中間的執(zhí)行流程。
從代碼流程來看秩铆,可以看出事件傳播是從TailContext
開始傳播
接著砚亭,會去pipeline
中開始尋找下一個(gè)節(jié)點(diǎn)OutBoundHandlerC
接著代碼就會進(jìn)入OutBoundHandlerC#write
方法中,OutBoundHandlerC
中事件會繼續(xù)沿著pipeline
往下進(jìn)行傳播殴玛,最終會傳播到HeadContext
流程就是如圖所示:
最后我們看下在HeadContext
中對write事件
的處理捅膘,他會調(diào)用unsafe
的write
方法,unsafe#write
主要是將數(shù)據(jù)寫會到客戶端滚粟,這里對unsafe
不做過多的解析寻仗,后面我們會詳細(xì)講unsafe
。
ok凡壤,outBound事件
就分析到這里署尤,接下來我們分析異常的傳播
修改我們server端的代碼變成如下:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
其中InBoundHandlerB
的代碼如下,調(diào)用channelRead
方法的時(shí)候會拋出一個(gè)異常:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new RuntimeException(this.getClass().getName() + " happen error");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught exec ");
ctx.fireExceptionCaught(cause);
}
}
InBoundHandlerA
,InBoundHandlerC
亚侠,OutBoundHandlerA
曹体,OutBoundHandlerB
,OutBoundHandlerC
重些exceptionCaught
方法硝烂,代碼如下:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught ");
super.exceptionCaught(ctx, cause);
}
啟動(dòng)server
端代碼箕别,本地通過telnet
命令telnet localhost 8007
觸發(fā)后,隨便輸入字符,我們可以看到控制臺會輸出以下結(jié)果:
com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec
com.tyust.netty.exception.InBoundHandlerC exceptionCaught
com.tyust.netty.exception.OutBoundHandlerA exceptionCaught
com.tyust.netty.exception.OutBoundHandlerB exceptionCaught
com.tyust.netty.exception.OutBoundHandlerC exceptionCaught
21:09:05.814 [nioEventLoopGroup-3-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: com.tyust.netty.exception.InBoundHandlerB happen error
從日志顯示得出,異常是隨著handler
的添加順序進(jìn)行傳播串稀,接下來我們進(jìn)行斷點(diǎn)分析;在調(diào)用完InBoundHandlerB#channelRead
方法后,事件會往下一個(gè)節(jié)點(diǎn)進(jìn)行傳播除抛,但由于出現(xiàn)了異常,代碼會進(jìn)入這個(gè)位置io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
母截,緊接著到忽,他會去找pipeline
中下一個(gè)重寫了exceptionCaught
的方法
找到了InboundHandlerB
也就出現(xiàn)了我們控制臺中顯示的com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec
日志輸出;
接下來他會繼續(xù)找下一個(gè)重寫了exceptionCaught
的方法也就是InBoundHandlerC
微酬,以此類推绘趋,最后會執(zhí)行到TailContext
的exceptionCaught
方法
最后我們看下TailContext
的exceptionCaught
方法,它什么事情都沒做颗管,只是把日志進(jìn)行輸出,然后進(jìn)行一場回收
這樣其實(shí)很不友好,異常是反映我們系統(tǒng)是否出問題最重要的一個(gè)因素陷遮,我們需要將其捕獲進(jìn)行處理,因此常用的處理流程是調(diào)整我們的代碼添加一個(gè)異常的handler
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " 異常處理,e:" + cause);
}
}
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
ch.pipeline().addLast(new ExceptionHandler());
}
});
好了垦江,我們的事件及異常傳播到這里就結(jié)束了帽馋,留給大家兩個(gè)問題,大家可以沿著我們上面的分析去解決這兩個(gè)問題:
- 在
outbound事件
傳播中比吭,如果OutBoundHandlerA#handlerAdded
使用的case2中的代碼绽族,事件會是怎么樣在pipeline
中傳播的?
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
//case 2
ctx.executor().schedule(() -> ctx.write("hello,world"), 3, TimeUnit.SECONDS);
}
- 同樣的,在
inBound事件
傳播中衩藤,如果InBoundHandlerA#channelActive
方法中調(diào)用的是case2中的代碼吧慢,那事件是如何傳播的?
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.channel().pipeline().fireChannelRead("hello");
//case 2
ctx.fireChannelRead("hello");
}