Netty學習之內置處理器以及編解碼器
前言
SSL/TLS
SSL/TLS是目前廣泛使用的加密,位于TCP之上费薄,其他的應用層協(xié)議之下饭宾,當應用層將數據交給SSL/TLS之后役电,數據會被進行加密彤叉,關于SSL/TLS更多的內容庶柿,可以參考:SSL/TLS協(xié)議運行機制的概述、OpenSSL 與 SSL 數字證書概念貼
在javax.net.ssl
中提供了原生的SSL/TLS支持秽浇,通過SSLContext
浮庐、SSLEngine
可以方便地進行數據的加密及解密。
在Netty中兼呵,為了方便開發(fā)者使用SSL/TLS兔辅,Netty提供了SSlHandler
(本質是一個ChannelHandler),只要為其配置一個SSLEngine即可進行加密數據傳輸击喂。
class SslEngineInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslEngineInitializer(SslContext context, boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
}
}
HTTP/HTTPS
一個HTTP請求或者相應可能由多個部分組成维苔,一個完整的Http請求由以下內容組成
- 一個
HttpRequest
,表示請求頭部 - 一個或者多個
HttpContent
表示Http的內容 - 一個
LastHttpContent
標志Http內容的結束
由于一個Http請求包含請求部分以及相應部分懂昂,而對于客戶端及服務端來說介时,這兩者是不相同的,客戶端發(fā)送請求凌彬,服務端接收請求沸柔,服務端發(fā)送響應,客戶端接收響應铲敛,所以褐澎,需要有不同的處理器來處理不同的內容
常用編解碼器
HttpRequestEncoder
HttpResponseEncoder
HttpRequestDecoder
HttpResponseDecoder
class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
// 解碼響應
pipeline.addLast("decoder", new HttpResponseEncoder());
// 編碼請求
pipeline.addLast("encoder", new HttpRequestEncoder());
}else {
// 解碼請求
pipeline.addLast("decoder", new HttpRequestDecoder());
// 編碼響應
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
當一個字節(jié)流被解碼成Http內容之后,就可以操作具體的HttpObject消息了伐蒋,但是由于一個完整的請求/響應可能會被拆分成幾個部分工三,所以,直接使用其實不是很合適先鱼,更好地方式是使用聚合器俭正。
class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpAggregatorInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
// 客戶端編解碼器
// 等同于上面的兩者
pipeline.addLast("codec", new HttpClientCodec());
}else {
// 服務端編解碼器
pipeline.addLast("codec", new HttpServerCodec());
}
// 聚合器,允許最大大小為 512 * 1024
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
}
}
聚合之后我們可以直接使用FullHttpRequest
焙畔、FullHttpResponse
消息來處理掸读,這兩個對象表示的是完整的請求/響應了。
當使用HTTP的時候宏多,如果內容大部分是文本數據儿惫,我們一般會使用壓縮技術,雖然會增加CPU開銷伸但,但是可以有效地節(jié)省網絡帶寬姥闪,Netty同樣提供了對應的handler并且提供gzip和deflate技術。
class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpCompressionInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("codec", new HttpClientCodec());
// 客戶端解壓
pipeline.addLast("decompressor", new HttpContentDecompressor());
}else {
pipeline.addLast("codec", new HttpServerCodec());
// 服務端加壓
pipeline.addLast("compressor", new HttpContentCompressor());
}
}
}
同時需要注意砌烁,如果是JDK6及以前的版本筐喳,需要引入JZlib
依賴催式。
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
如果我們需要使用HTTPS,只需要將SslHanlder配置在所有handler的最前面即可避归。
空閑檢測及超時
空閑檢測及超時荣月,也可以稱為心跳檢測,目的就是確保連接的另一端依舊在線梳毙,如果不在線哺窄,則斷開連接,節(jié)省資源账锹。
Netty中提供了幾個常用的handler
- IdleStateHandler萌业,如果連接空閑時間過長,則觸發(fā)一個
IdleStateEvent
奸柬,可以通過在ChannelInboundHandler覆蓋userEventTriggered()
來處理該事件 - ReadTimeoutHandler生年,當channel中一段時間沒有inbound數據的時候,拋出
readTimeoutException
并且關閉channel廓奕,可以通過exceptionCaught()
捕獲該異常抱婉。 - WriteTimeoutHandler,當channel中有一段時間沒有outbound數據時桌粉,拋出
writeTimeoutException
并且關閉channel蒸绩,可以通過exceptionCaught()
捕獲異常。
需要注意的是铃肯,IdleStateHandler
的作用是用于檢測channel在指定時間內是否有數據流通患亿,如果沒有的話,則觸發(fā)一個IdleStateEvent
押逼,該Event是用于通知本channel的步藕,而不是用于通知對方,所以宴胧,我們可以根據收到的Event來決定處理邏輯,比如
- 對于服務端:收到超過3個對應的事件表锻,表示超過3 * time時間內沒有交互恕齐,因此決定斷開連接。
- 對于客戶端:收到事件后瞬逊,發(fā)送一個心跳包(內容其實是隨意的显歧,主要是由數據流動),表明自己還活著(注意該事件同樣是給自己的确镊,不是給對方的士骤,所以都需要增加對應的邏輯)
下面舉一個具體的例子
服務端
public class Server {
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new IdleStateHandlerInitializer());
try {
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
/**
* 服務端空閑檢測
*/
class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new HeartbeatHandler());
}
/**
* 服務端的空閑處理邏輯
*/
private class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 如果超過,則斷開連接
if (event.state() == IdleState.ALL_IDLE) {
ctx.writeAndFlush(Unpooled.copiedBuffer("bybe".getBytes()));
ctx.close();
}
}else {
super.userEventTriggered(ctx, evt);
}
}
}
}
客戶端
class Client {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 如果這里改成70蕾域,則會斷開
pipeline.addLast(new IdleStateHandler(0, 0, 50, TimeUnit.SECONDS));
pipeline.addLast(new Heartbeat());
}
});
try {
ChannelFuture fu = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
fu.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
/**
* 客戶端空閑檢測
*/
private static class Heartbeat extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 發(fā)送心跳包
ctx.writeAndFlush(Unpooled.copiedBuffer("heartbeat".getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf data = (ByteBuf) msg;
System.out.println(data.toString(CharsetUtil.UTF_8));
}
}
}
基于分隔符及長度的協(xié)議處理
在某些協(xié)議中拷肌,是根據換行符或者指定長度來劃分的到旦,Netty中提供了基于這兩者的處理器
基于分隔符
Netty中主要的基于分隔符的處理器有以下兩個
- DelimeterBasedFrameDecoder,基于指定分隔符
- LineBasedFrameDecoder巨缘,基于換行符(DelimeterBasedFrameDecoder的特例)
基于長度
Netty中基于長度的處理器有以下兩個
- FixedLenghtFrameDecoder添忘,固定長度
- LengthFieldBasedFrameDecoder,通過構造器指定長度字段的偏移及所占字節(jié)數
發(fā)送大數據
為了高效地發(fā)送大量數據若锁,Netty中提供了FileRegion
接口(默認實現DefaultFileRegion
)搁骑,作為支持zero-copy
的傳輸器,用于在channel中發(fā)送文件
如果需要將數據從文件系統(tǒng)拷貝到用戶空間又固,可以使用ChunkedWriteHandler
仲器,它提供了低消耗內存異步將大數據流寫出。
序列化
Netty提供的JDK序列化相關的處理器
- CompatibleObjectDecoder仰冠,適用于非Netty的并且使用JDK的序列化器
- CompatibleObjectEncoder乏冀,同上
- ObjectDecoder,適用于在JDK序列化器之上使用自定義序列化
- ObjectEncoder沪停,同上
同時煤辨,Netty還提供了基于ProtoBuf的處理器,具體的可以參考文件即可木张,使用上基本差不多
總結
本小節(jié)我們主要學習了Netty所提供的幾個常用的handler众辨,包括了SSL/TLS相關的handler、HTTP相關的handler舷礼、空閑處理器(心跳)鹃彻、協(xié)議分割處理器以及序列化處理器等,有了這些常用的處理器妻献,可以不用處理具體協(xié)議的相關內容蛛株,從而可以更專注于邏輯方面的處理。