6. ChannelHandler and ChannelPipeline
6.1 The ChannelHandler family
6.1.1 Channel的生命周期
ChannelUnregistered
已創(chuàng)建魁衙,但是還沒有被注冊(cè)到EventLoop上。
ChannelRegistered
已創(chuàng)建,并且已經(jīng)注冊(cè)到EventLoop。
ChannelActive
連接上遠(yuǎn)程主機(jī)。
ChannelActive
沒有連接到遠(yuǎn)程主機(jī)太雨。
Channel狀態(tài)的變化會(huì)觸發(fā)相應(yīng)的事件。
6.1.2 ChannelHandler的生命周期
handlerAdd
添加handler
handlerRemove
刪除handler
exceptionCaught
發(fā)生異常
ChannelHandler有兩個(gè)重要的子接口:ChannelInboundHandler和ChannelOutboundHandler。
6.1.3 ChannelInboundHandler接口
接受到數(shù)據(jù)或者Channel的狀態(tài)發(fā)生改變會(huì)調(diào)用ChannelInboundHandler中的方法芦圾。注意,當(dāng)ChannelInboundHandler中的channelRead()方法被overwrite俄认,需要對(duì)ByteBuf實(shí)例持有的資源進(jìn)行顯示釋放个少。
public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}
可以使用SimpleChannelInboundHandler,它會(huì)自動(dòng)釋放資源眯杏,無(wú)需人工干預(yù):
@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}
6.1.4 ChannelOutboundHandler接口
它一個(gè)比較強(qiáng)大的功能是延遲執(zhí)行夜焦。
CHANNELPROMISE VS. CHANNELFUTURE
CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可寫的岂贩,CHANNELPROMISE是可寫的(例如setSuccess(),setFailure()方法)
6.1.5 ChannelHandler adapters
6.1.6 資源管理
要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要釋放相應(yīng)的資源茫经,否則會(huì)產(chǎn)生內(nèi)存泄漏。netty使用引用計(jì)數(shù)法來(lái)管理內(nèi)存資源萎津⌒渡。可以使用netty提供的ResourceLeakDetector來(lái)發(fā)現(xiàn)潛在的內(nèi)存泄漏問題。
java -Dio.netty.leakDetectionLevel=ADVANCED
leakDetectionLevel可以為DISABLED锉屈、SIMPLE(默認(rèn))荤傲、ADVANCED和PARANOID。
6.2 ChannelPipeline接口
ChannelPipeline可以看成由ChannelHandler組成的鏈表部念,I/O事件會(huì)在ChannelPipeline上傳播弃酌。每個(gè)新Channel會(huì)綁定一個(gè)新ChannelPipeline,兩者是一對(duì)一關(guān)系儡炼。
事件傳播的時(shí)候妓湘,會(huì)判斷ChannelHandler的類型(implements Inbound還是OutBound的接口)和事件傳播的方向是否一致,不一致跳過乌询。
6.2.1 ChannelPipeline修改
ChannelPipeline中ChannelHandler可以動(dòng)態(tài)地被添加榜贴、刪除或者替換。
6.2.2 Firing events
會(huì)調(diào)用ChannelPipeline中下一個(gè)ChannelHandler里的方法。
代碼示例:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new MyHandler());
ch.pipeline().addLast(new MyHandler2());
}
});
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
class MyHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler1 , messageReceived invoked");
for(int i = 0;i < 10 ; i++) {
ctx.fireChannelInactive();//調(diào)用fireChannelInactive 10次
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler1 , channelInactive invoked");
}
}
class MyHandler2 extends SimpleChannelInboundHandler<String>{
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("in MyHandler2 ,messageReceived invoked");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("in MyHandler2 , channelInactive invoked");
}
}
輸出:
6.3 ChannelHandlerContext接口
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之間的聯(lián)系唬党,無(wú)論何時(shí)鹃共,添加一個(gè)ChannelHandler到ChannelPipeline就會(huì)創(chuàng)建一個(gè)ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互驶拱。
ChannelHandlerContext有很多方法霜浴,大部分方法在Channel和ChannelPipeline里都出現(xiàn)過,但是這里有一個(gè)非常大的區(qū)別蓝纲,調(diào)用Channel和ChannelPipeline里的方法阴孟,會(huì)在整個(gè)pipeline里傳播(從頭到尾),而ChannelHandlerContext里同名的方法税迷,是從當(dāng)前ChannelHandler開始傳播永丝。
6.3.1 Using ChannelHandlerContext
6.3.2 ChannelHandler和ChannelHandlerContext的高級(jí)用法。
- ChannelHandlerContext的pipeline()方法可以獲取ChannelPipeline的引用箭养,這樣我們可以通過這個(gè)引用操作ChannelHandler慕嚷,實(shí)現(xiàn)動(dòng)態(tài)協(xié)議。
- 可以把ChannelHandlerContext的引用緩存起來(lái)毕泌,在ChannelHandler方法外面用喝检,甚至在一個(gè)不同的線程里使用。下面提供了一個(gè)示例懈词。
- 可以將一個(gè)ChannelHandler實(shí)例可能會(huì)被添加到不同的ChannelPipeline里蛇耀,但是需要使用@Sharable注解,此外還需注意的是坎弯,這個(gè)Sharable的ChannelHandler需要是線程安全的纺涤。
為什么需要@Sharable的ChannelHandler,一個(gè)需求就是通過這個(gè)@Sharable來(lái)統(tǒng)計(jì)多個(gè)Channel的數(shù)據(jù)抠忘。
6.4 異常處理
6.4.1 Inbound異常處理
由于exception默認(rèn)會(huì)從觸發(fā)異常的ChannelHandler繼續(xù)向后流動(dòng)撩炊,所以圖中的這種處理邏輯,我們一般放在最后ChannelPipeline的末尾崎脉。這樣就可以確保拧咳,無(wú)論是哪個(gè)ChannelHandler觸發(fā)異常,都能夠被捕獲并處理囚灼。如果不對(duì)異常做捕獲處理操作骆膝,netty會(huì)打印異常未被捕獲的日志。
6.4.2 outbound異常處理
進(jìn)行outbound操作灶体,要想知道結(jié)果(正常完成還是發(fā)生異常)阅签,需要這樣做:
每個(gè)outbound操作都會(huì)返回一個(gè)ChannelFuture。添加到ChannelFuture上的監(jiān)聽器會(huì)收到成功或者錯(cuò)誤通知蝎抽。
ChannelOutboundHandler中的方法絕大多數(shù)都會(huì)ChannelPromise類型的參數(shù)政钟。ChannelPromise也可以添加監(jiān)聽來(lái)接受異步通知。ChannelPromise是可寫的,可以通過它的setSucess()方法或者setFailure(Throwable cause)立即發(fā)布通知养交。
如果ChannelOutboundHandler自己拋出異常精算,netty會(huì)通知添加到ChannelPromise上的監(jiān)聽器。
7. EventLoop and threading model
7.1 Threading model overview
JDK早期版本多線程編程的方式是create
新線程再start
碎连。JDK5推出了Executor API
灰羽,它的線程池技術(shù)通過緩存和重用大大提高了性能。
- 有任務(wù)(
Runnable實(shí)現(xiàn)
)的時(shí)候破花,從線程池里挑選出一個(gè)空閑線程谦趣,把任務(wù)submit
給它疲吸。 - 任務(wù)執(zhí)行完畢了座每,線程變成空閑,回到線程池摘悴,等待下一次挑選使用峭梳。
線程池不能解決上下文切換開銷的問題,上下文的開銷在heavy load下會(huì)很大蹂喻。
7.2 EventLoop接口
EventLoop
是一個(gè)用來(lái)處理事件的任務(wù)葱椭,基本思想如下圖所示:
EventLoop
接口的API分為兩類:concurrent和networking。
- concurrent
基于java.util.concurrent
包口四,提供thread executors - networking
io.netty.channel
繼承了EventLoop接口孵运,提供了和Channel事件交互的能力。
7.2.1 Netty 4中I/O事件的處理
7.3.1 JDK 任務(wù)調(diào)度API
JDK5之前蔓彩,任務(wù)調(diào)度只能用java.util.Timer
治笨,Timer就是一個(gè)后臺(tái)線程,有很多限制:
- 如果執(zhí)行多個(gè)定時(shí)任務(wù)赤嚼,一個(gè)任務(wù)發(fā)生異常沒有捕獲旷赖,整個(gè)Timer線程會(huì)掛掉(其他所有任務(wù)都會(huì)down掉)
- 假如某個(gè)任務(wù)的執(zhí)行時(shí)間過長(zhǎng),超過一些任務(wù)的間隔時(shí)間更卒,會(huì)導(dǎo)致這些任務(wù)執(zhí)行推遲等孵。
JDK后續(xù)推出了java.util.concurrent
,其中定義的ScheduleExecutorService
克服了這些缺陷蹂空。
ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
//to do
executor.shutdown();
盡管ScheduledExecutorSevice
挺好用的俯萌,但是在負(fù)載大的時(shí)候有較大的性能耗費(fèi),netty進(jìn)行了優(yōu)化上枕。
7.3.2 使用EventLoop進(jìn)行任務(wù)調(diào)度
ScheduledExecutorService
也有一些限制咐熙,例如會(huì)創(chuàng)建額外創(chuàng)建一些線程來(lái)管理線程池,這在任務(wù)調(diào)度非常激烈的情況下姿骏,會(huì)成為性能的瓶頸糖声。netty沒有直接使用ScheduledExecutorService
,使用了繼承于ScheduledExecutorService
,自己實(shí)現(xiàn)的EventLoop
蘸泻。
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().schedule(
new Runnable() {
@Override
public void run() {
System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
重復(fù)定時(shí)執(zhí)行:
Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
System.out.println("Run every 60 seconds");
}
}, 60, 60, TimeUnit.Seconds);
7.4 實(shí)現(xiàn)細(xì)節(jié)
7.4.1 線程管理
netty線程模型的優(yōu)越之處是在于它會(huì)確定當(dāng)前執(zhí)行線程的身份琉苇,再進(jìn)行相應(yīng)操作。如果當(dāng)前執(zhí)行線程被綁定到當(dāng)前的Channel
和EventLoop
悦施,會(huì)被直接執(zhí)行并扇,否則會(huì)被放到EventLoop
的隊(duì)列里,每個(gè)EventLoop
有自己?jiǎn)为?dú)的隊(duì)列抡诞。
Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.
7.4.2 EventLoop/Thread分配
EventLoopGroup
包含了EventLoops
和Channels
穷蛹,EventLoops
創(chuàng)建方式取決于使用哪種I/O.
異步I/O
異步I/O僅僅使用少量的EventLoops
,這些EventLoops
被很多的Channels
共享昼汗,這樣就可以用最少的線程接受很多的Channels
,而不是一個(gè)線程一個(gè)Channel
肴熏。
阻塞I/O
共同點(diǎn):每個(gè)Channel
的I/O事件只會(huì)被一個(gè)線程處理。
8. Bootstrapping
bootstrapping an application is the process of configuring it to run
8.1 Bootstrap classes
Namely, a server devotes a parent channel to accepting connections from clients and
creating child channels for conversing with them, whereas a client will most likely
require only a single, non-parent channel for all network interactions. (As we’ll see, this
applies also to connectionless transports such as UDP , because they don’t require a
channel for each connection.)
server需要一個(gè)parent channel來(lái)接受客戶端連接顷窒,需要?jiǎng)?chuàng)建多個(gè)child channels來(lái)應(yīng)答客戶端蛙吏。
client只需要一個(gè)單獨(dú)的channel,不需要parent channel鞋吉。
服務(wù)端處理使用ServerBootstrap
,客戶端使用Bootstrap
鸦做。
Why are the bootstrap classes Cloneable?
You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.
8.2 Bootstrapping clients and connectionless protocols
Bootstrap
主要用來(lái)給客戶端和使用面向無(wú)連接的應(yīng)用創(chuàng)建Channels
。
Bootstraping a client:
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channeRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.2.2 Channel和EventLoopGroup的兼容性
you can’t mix components having different
prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
shows an attempt to do just that.
Channel
和EventLoopGroup
的前綴要一樣谓着。否則會(huì)拋出IllegalStateException
8.3 Bootstraping servers
ServerBootstrap類
A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.
相比 Bootstrap
類泼诱,增加了childHandler()
,childAttr()
,childOption()
方法。ServerChannel
來(lái)創(chuàng)建許許多多的子Channel
赊锚,代表接受的連接治筒。ServerBootstrap
提供了這些方法來(lái)簡(jiǎn)化對(duì)子Channel
的配置。
NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
System.out.println("Received data");
}
} );
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bound attempt failed");
channelFuture.cause().printStackTrace();
}
}
} );
8.4 Bootstrapping clients from a Channel
Suppose your server is processing a client request that requires it to act as a client to
a third system. This can happen when an application, such as a proxy server, has to
integrate with an organization’s existing systems, such as web services or databases. In
such cases you’ll need to bootstrap a client Channel from a ServerChannel
作為服務(wù)端接受連接改抡,同時(shí)又作為客戶端矢炼,請(qǐng)求遠(yuǎn)程服務(wù)器(類似于proxy),最容易想到的辦法是再創(chuàng)建一個(gè)客戶端的Bootstrap
阿纤,但是這樣需要另外一個(gè)EventLoop
來(lái)處理客戶端角色的Channel
句灌,發(fā)生在服務(wù)端Channel
和客戶端Channel
之間數(shù)據(jù)交換引起的上文切換也會(huì)帶來(lái)額外的性能損耗。
最好的辦法是創(chuàng)建的客戶端Channel
和服務(wù)端Channel
欠拾,共享同一個(gè)EventLoop
:
ServerBootstrap bootstrap = new ServerBootstrap();
//Sets the EventLoopGroups that provide EventLoops for processing Channel events
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
ChannelFuture connectFuture;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//Creates a Bootstrap to connect to remote host
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("Received data");
}
});
//Uses the same EventLoop as the one assigned to the accepted channel
bootstrap.group(ctx.channel().eventLoop());
connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
if (connectFuture.isDone()) {
// do something with the data
//When the connection is complete performs some data operation (such as proxying)
}
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Server bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.5 Adding multiple ChannelHandlers during a bootstrap
在bootstrap
的時(shí)候胰锌,如何添加多個(gè)ChannelHandler
?
netty提供了ChannelInboundHandlerAdapter
的特殊子類ChannelInitializer
:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
ChannelInitializer
提供了initChannel()
可以輕松添加ChannelHandlers
到ChannelPipeline
。
protected abstract void initChannel(C ch) throws Exception;
一旦Channel
注冊(cè)到EventLoop
,我們實(shí)現(xiàn)的initChannel()
就會(huì)被調(diào)用藐窄。當(dāng)initChannel()
返回的時(shí)候资昧,ChannelInitializer
實(shí)例會(huì)把自己從ChannelPipeline
中刪除。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializerImpl());
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
對(duì)應(yīng)ChannelInitializerImpl
的實(shí)現(xiàn):
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
8.6 Using Netty ChannelOptions and attributes
不需要我們手工配置每個(gè)Channel
,netty提供了option()
方法來(lái)把ChannelOptions
應(yīng)用到bootstrap
荆忍,ChannelOptions
中的配置會(huì)自動(dòng)地應(yīng)用到所有Channel
Netty的Channel
和bootstrap
類格带,提供了AttributeMap
抽象集合和AttributeKey<T>
泛型類,用來(lái)insert和retrieve屬性值撤缴。使用這些工具,我們可以安全地把任意類型的數(shù)據(jù)和Channel
關(guān)聯(lián)起來(lái)叽唱。
Attribute
的一個(gè)使用場(chǎng)景是屈呕,服務(wù)端應(yīng)用需要追蹤用戶和Channels
的關(guān)系」淄ぃ可以把用戶的ID作為一個(gè)屬性存到Channel
里虎眨。這樣就可以實(shí)現(xiàn)根據(jù)ID來(lái)路由消息和Channel
不活躍自動(dòng)關(guān)閉等功能。
final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
Integer idValue = ctx.channel().attr(id).get();
// do something with the idValue
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
throws Exception {
System.out.println("Received data");
}
});
bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();
8.7 Bootstrapping DatagramChannels
之前的bootstrap
示例代碼都是基于TCP-based的SocketChannel
镶摘,bootstrap
也可以配置為無(wú)連接協(xié)議嗽桩。
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
// Do something with the packet
}
});
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Channel bound");
} else {
System.err.println("Bind attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
8.8 Shutdown
Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.
EventLoopGroup.shutdownGracefully()
,它的返回值是一個(gè)future
,這也是一個(gè)異步操作。
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();
9 Unit testing
Netty提供了embedded transport
來(lái)測(cè)試ChannelHandlers
,embedded transport
是EmbeddedChannel
(一種特殊的Channel
實(shí)現(xiàn)) 的特色功能凄敢,可以簡(jiǎn)單地實(shí)現(xiàn)在pipeline
中傳播事件碌冶。
我們可以寫入inbound
或者outbound
數(shù)據(jù)到EmbeddedChannel
,然后檢查是否有東西傳輸?shù)?code>ChannelPipeline的末尾贡未。我們還可以確定消息是否被編解碼种樱,是否有ChannelHandler
被觸發(fā)。
Inbound data
會(huì)被ChannelInboundHandlers
處理俊卤,代表著從遠(yuǎn)程主機(jī)讀取的數(shù)據(jù)。
outbound data
會(huì)被ChannelOutboundHandlers
處理害幅,代表將要發(fā)送到遠(yuǎn)程主機(jī)的數(shù)據(jù)消恍。
相關(guān)API:
圖9.1展示了數(shù)據(jù)在EmbededChannel
的流動(dòng)情況。我們可以:
使用
writeOutbound()
,寫入消息到Channel
,讓消息以outbound
方向在pipeline
中傳遞以现。后續(xù)狠怨,我們可以使用readOutbound()
讀取處理過后的數(shù)據(jù),判斷結(jié)果是否與預(yù)期一致邑遏。使用
writeInbound()
,寫入消息到Channel
,讓消息以inbound
方向在pipeline
中傳遞佣赖。后續(xù),我們可以使用readInbound()
讀取處理過后的數(shù)據(jù)记盒,判斷結(jié)果是否與預(yù)期一致憎蛤。
9.2 Testing ChannelHandlers with EmbeddedChannel
9.2.1 Testing inbound messages
圖9.2 展示了一個(gè)簡(jiǎn)單的ByteToMessageDecoder
實(shí)現(xiàn)。如果有足夠的數(shù)據(jù)纪吮,這個(gè)Decoder會(huì)產(chǎn)生固定大小的frame俩檬。如果沒有足夠的數(shù)據(jù),沒有達(dá)到這個(gè)固定的size值碾盟,它會(huì)等待接下來(lái)的數(shù)據(jù)棚辽,繼續(xù)判斷能否接著產(chǎn)生frame。
具體代碼實(shí)現(xiàn)如下:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
while (in.readableBytes() >= frameLength) {
ByteBuf buf = in.readBytes(frameLength);
out.add(buf);
}
}
}
那么如何進(jìn)行單元測(cè)試呢冰肴,測(cè)試代碼如下:
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
// write bytes
assertTrue(channel.writeInbound(input.retain()));
assertTrue(channel.finish());
// read messages
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
@Test
public void testFramesDecoded2() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
assertFalse(channel.writeInbound(input.readBytes(2)));
assertTrue(channel.writeInbound(input.readBytes(7)));
assertTrue(channel.finish());
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(3), read);
read.release();
assertNull(channel.readInbound());
buf.release();
}
}
9.2.2 Testing outbound messages
我們需要測(cè)試一個(gè)編碼器:AbsIntegerEncoder
,它是Netty的MessageToMessageEncode
的一個(gè)實(shí)現(xiàn)屈藐,功能是將整數(shù)取絕對(duì)值榔组。
我們的流程如下:
EmbeddedChannel
會(huì)將一個(gè)四字節(jié)負(fù)數(shù)按照outbound方向?qū)懭?code>Channel。編碼器會(huì)從到來(lái)的
ByteBuf
讀取每個(gè)負(fù)數(shù)联逻,調(diào)用Math.abs()
獲得絕對(duì)值瓷患。-
編碼器將絕對(duì)值寫入到
ChannelHandlerPipe
。
編碼器代碼實(shí)現(xiàn):
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
ByteBuf in, List<Object> out) throws Exception {
while (in.readableBytes() >= 4) {
int value = Math.abs(in.readInt());
out.add(value);
}
}
}
怎么測(cè)試遣妥?請(qǐng)看下文:
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
assertTrue(channel.writeOutbound(buf));
assertTrue(channel.finish());
// read bytes
for (int i = 1; i < 10; i++) {
assertEquals(i, channel.readOutbound());
}
assertNull(channel.readOutbound());
}
}
9.3 Testing exception handling
為了測(cè)試異常處理擅编,我們有如下的示例。
為防止資源耗盡箫踩,當(dāng)我們讀取到的數(shù)據(jù)多于某個(gè)數(shù)值爱态,我們會(huì)拋出一個(gè)TooLongFrameException
在圖9.4中,最大frame的大小為3字節(jié)境钟,當(dāng)一個(gè)frame的字節(jié)數(shù)大于3锦担,它會(huì)被忽略,并且會(huì)拋出
TooLongFrameException
慨削,其他的pipeline
里的其他ChannelHandlers
要么覆寫exceptionCaught()
進(jìn)行捕獲處理洞渔,要么會(huì)忽略這個(gè)異常。
解碼器代碼:
public class FrameChunkDecoder extends ByteToMessageDecoder {
private final int maxFrameSize;
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (readableBytes > maxFrameSize) {
// discard the bytes
in.clear();
throw new TooLongFrameException();
}
ByteBuf buf = in.readBytes(readableBytes);
out.add(buf);
}
}
如何測(cè)試缚态,請(qǐng)看:
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
assertTrue(channel.writeInbound(input.readBytes(2)));
try {
channel.writeInbound(input.readBytes(4));
Assert.fail();
} catch (TooLongFrameException e) {
// expected exception
}
assertTrue(channel.writeInbound(input.readBytes(3)));
assertTrue(channel.finish());
// Read frames
ByteBuf read = (ByteBuf) channel.readInbound();
assertEquals(buf.readSlice(2), read);
read.release();
read = (ByteBuf) channel.readInbound();
assertEquals(buf.skipBytes(4).readSlice(3), read);
read.release();
buf.release();
}
}
10.The codec framework
encoder
,將outbound
消息轉(zhuǎn)換成易于傳輸?shù)姆绞?大部分是字節(jié)流)磁椒。
decoder
,將inbound
網(wǎng)絡(luò)字節(jié)流轉(zhuǎn)回成應(yīng)用程序消息格式。
10.2 Decoders
兩種場(chǎng)景需要使用到Decoders
:
- 將字節(jié)流解碼成消息--
ByteToMessageDecoder
和ReplayingDecoder
- 將一種消息類型解碼成另一種類型--
MessageToMessageDecoder
10.2.1 ByteToMessageDecoder抽象類
功能: 將字節(jié)流解碼成消息或者另一種字節(jié)流玫芦。
使用示例ToIntegerDecoder
:
每次從ByteBuf
讀取四個(gè)字節(jié)浆熔,解碼成int
,添加到List
里。當(dāng)沒有更多的數(shù)據(jù)添加到List
,List
里的內(nèi)容會(huì)傳遞到下一個(gè)ChannelInboundHandler
桥帆。
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
}
編解碼框架里医增,消息處理完了,會(huì)自動(dòng)調(diào)用ReferenceCountUtil.release(message)
老虫,資源會(huì)自動(dòng)釋放叶骨。
Reference counting in codecs
As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call toReferenceCountUtil.release(message)
. If you need to keep a reference for later use you can callReferenceCountUtil.retain(message)
. This increments the reference count, preventing the message from being released.
10.2.2 ReplayingDecoder抽象類
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
繼承于ByteToMessageDecoder
,特點(diǎn)是我們不再需要調(diào)用readableBytes()
,省了判斷數(shù)據(jù)是否足夠的邏輯祈匙。
注意:
不是所有的
ByteBuf
的操作都被支持忽刽。如果不支持會(huì)拋出UnsupportedOperationException
異常。ReplayingDecoder
會(huì)比ByteToMessageDecoder
稍慢菊卷。
ToIntegerDecoder2
:
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.readInt());
}
}
更多的解碼工具可以在io.netty.handler.codec
下找到缔恳。
io.netty.handler.codec.LineBasedFrameDecoder
,通過換行符(\n
或者\r\n
)來(lái)解析消息洁闰。io.netty.handler.codec.http.HttpObjectDecoder
歉甚,解析HTTP數(shù)據(jù)。
10.2.3 MessageToMessageDecoder抽象類
消息格式互相轉(zhuǎn)換扑眉,如把一種類型的POJO轉(zhuǎn)換成另外一種纸泄。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
API差不多
示例:IntegerToStringDecoder
public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
@Override
public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
out.add(String.valueOf(msg));
}
}
一個(gè)更貼切詳細(xì)的例子是io.netty.handler.codec.http.HttpObjectAggregator
用 TooLongFrameException
防止資源耗盡:
public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
private static final int MAX_FRAME_SIZE = 1024;
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readable = in.readableBytes();
if (readable > MAX_FRAME_SIZE) {
in.skipBytes(readable);
throw new TooLongFrameException("Frame too big!");
}
// do something
}
}
10.3 Encoders
與解碼器類似赖钞,Encoders
分為兩種:
- 將消息編碼成字節(jié)流。
- 將一種消息編碼成另一種格式的消息聘裁。
10.3.1 MessageToByteEncoder抽象類
示例ShortToByteEncoder
:
public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
out.writeShort(msg);
}
}
更具體的應(yīng)用實(shí)踐可以參見io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder
10.4 編解碼抽象類
既能encode
,又能decode
雪营,二合一。
10.4.1 ByteToMessageCodec抽象類
Any request/response protocol could be a good candidate for using the
ByteToMessageCodec
. For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, saySmtpRequest
. On the receiving side, when a response is created, anSmtpResponse
will be produced, which will be encoded back to bytes for transmission.
10.4.2 MessageToMessageCodec抽象類
public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
@Override
protected void encode(ChannelHandlerContext ctx,
WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
switch (msg.getType()) {
case BINARY:
out.add(new BinaryWebSocketFrame(payload));
break;
case TEXT:
out.add(new TextWebSocketFrame(payload));
break;
case CLOSE:
out.add(new CloseWebSocketFrame(true, 0, payload));
break;
case CONTINUATION:
out.add(new ContinuationWebSocketFrame(payload));
break;
case PONG:
out.add(new PongWebSocketFrame(payload));
break;
case PING:
out.add(new PingWebSocketFrame(payload));
break;
default:
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
@Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
List<Object> out) throws Exception {
ByteBuf payload = msg.getData().duplicate().retain();
if (msg instanceof BinaryWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
payload));
} else if (msg instanceof CloseWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
payload));
} else if (msg instanceof PingWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
payload));
} else if (msg instanceof PongWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
payload));
} else if (msg instanceof TextWebSocketFrame) {
out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
payload));
} else if (msg instanceof ContinuationWebSocketFrame) {
out.add(new MyWebSocketFrame(
MyWebSocketFrame.FrameType.CONTINUATION, payload));
} else {
throw new IllegalStateException("Unsupported websocket msg " + msg);
}
}
public static final class MyWebSocketFrame {
private final FrameType type;
private final ByteBuf data;
public WebSocketFrame(FrameType type, ByteBuf data) {
this.type = type;
this.data = data;
}
public FrameType getType() {
return type;
}
public ByteBuf getData() {
return data;
}
public enum FrameType {BINARY,
CLOSE,
PING,
PONG,
TEXT,
CONTINUATION;
}
}
}
10.4.3 CombinedChannelDuplexHandler類
將編碼器解碼器放在一塊影響代碼的重用性衡便。CombinedChannelDuplexHandler
可以解決這個(gè)問題献起。我們可以使用它而不直接使用codec抽象類。
方法簽名:
public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
下面是一個(gè)使用范例:
解碼器例子ByteToCharDecoder
功能是一次讀取2個(gè)字節(jié)镣陕,解碼成char
寫到List
里
public class ByteToCharDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
while (in.readableBytes() >= 2) {
out.add(in.readChar());
}
}
}
編碼器例子CharToByteEncoder
public class CharToByteEncoder extends MessageToByteEncoder<Character> {
@Override
public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
throws Exception {
out.writeChar(msg);
}
}
是時(shí)候combine
了:
public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
public CombinedByteCharCodec() {
super(new ByteToCharDecoder(), new CharToByteEncoder());
}
}