通過SSL/TLS 保護(hù)Netty 應(yīng)用程序
Netty 通過一個(gè)名為SslHandler 的ChannelHandler實(shí)現(xiàn)利用javax.net.ssl下的SSLContext和SSLEngine的api贾陷,其中SslHandler 在內(nèi)部使用SSLEngine 來完成實(shí)際的工作婿屹。
通過SslHandler 進(jìn)行解密和加密的數(shù)據(jù)流
public class SslChannelInitializer extends ChannelInitializer<Channel>{
private final SslContext context; // 傳入要使用的SslContext
private final boolean startTls; // 如果設(shè)置為true儡羔,第一個(gè)寫入的消息將不會(huì)被加密(客戶端應(yīng)該設(shè)置為true)
public SslChannelInitializer(SslContext context,boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
// 對于每個(gè)SslHandler 實(shí)例,都使用Channel 的ByteBuf-Allocator 從SslContext 獲取一個(gè)新的SSLEngine
SSLEngine engine = context.newEngine(ch.alloc());
// 將SslHandler 作為第一個(gè)ChannelHandler 添加到ChannelPipeline 中
ch.pipeline().addFirst("ssl",new SslHandler(engine, startTls));
}
}
SslHandler 的方法
構(gòu)建基于Netty 的HTTP/HTTPS 應(yīng)用程序
- HTTP 解碼器、編碼器和編解碼器
HTTP 請求的組成部分
HTTP 響應(yīng)的組成部分
添加HTTP 支持例子,這樣你就知道有多簡單了:
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) { // 如果是客戶端
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
聚合HTTP 消息
由于HTTP 的請求和響應(yīng)可能由許多部分組成,因此你需要聚合它們以形成完整的消息。為了消除這項(xiàng)繁瑣的任務(wù)焰雕,Netty 提供了一個(gè)聚合器,它可以將多個(gè)消息部分合并為FullHttpRequest 或者FullHttpResponse 消息芳杏。
/**
* 自動(dòng)聚合HTTP 的消息片段
*/
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));
}
}
-
HTTP 壓縮
Netty 為壓縮和解壓縮提供了ChannelHandler 實(shí)現(xiàn)矩屁,它們同時(shí)支持gzip 和deflate 編碼。
/**
* 自動(dòng)壓縮HTTP 消息
*/
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpCompressionInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
// 如果是客戶端爵赵,則添加HttpContentDecompressor 以處理來自服務(wù)器的壓縮內(nèi)容.
pipeline.addLast("decompressor",new HttpContentDecompressor());
} else {
pipeline.addLast("codec", new HttpServerCodec());
// 如果是服務(wù)器吝秕,則添加HttpContentCompressor來壓縮數(shù)據(jù)(如果客戶端支持它)
pipeline.addLast("compressor",new HttpContentCompressor());
}
}
}
- 使用HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean isClient;
public HttpsCodecInitializer(SslContext context, boolean isClient) {
this.context = context;
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine));
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
}
}
-
WebSocket
WebSocket為網(wǎng)頁和遠(yuǎn)程服務(wù)器之間的雙向通信提供了一種替代HTTP輪詢的方案。
WebSocket 協(xié)議
要想向你的應(yīng)用程序中添加對于WebSocket 的支持空幻,你需要將適當(dāng)?shù)目蛻舳嘶蛘叻?wù)器WebSocket ChannelHandler 添加到ChannelPipeline 中烁峭。這個(gè)類將處理由WebSocket 定義的稱為幀的特殊消息類型。如表所示秕铛,WebSocketFrame 可以被歸類為數(shù)據(jù)幀或者控制幀约郁。
WebSocketFrame 類型
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536), // 為握手提供聚合的HttpRequest
new WebSocketServerProtocolHandler("/websocket"), // 如果被請求的端點(diǎn)是"/websocket",則處理該升級握手
new TextFrameHandler(), // TextFrameHandler 處理TextWebSocketFrame
new BinaryFrameHandler(),
new ContinuationFrameHandler());
}
public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
要想為WebSocket 添加安全性但两,只需要將SslHandler 作為第一個(gè)ChannelHandler 添加到ChannelPipeline 中鬓梅。
空閑的連接和超時(shí)
用于空閑連接以及超時(shí)的ChannelHandler
以下代碼展示了當(dāng)使用通常的發(fā)送心跳消息到遠(yuǎn)程節(jié)點(diǎn)的方法時(shí),如果在60 秒之內(nèi)沒有接收或者發(fā)送任何的數(shù)據(jù)谨湘,我們將如何得到通知绽快;如果沒有響應(yīng),則連接會(huì)被關(guān)閉.
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 將在被觸發(fā)時(shí)發(fā)送一個(gè)IdleStateEvent 事件
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE =Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { // 發(fā)送心跳消息悲关,并在發(fā)送失敗時(shí)關(guān)閉該連接
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else { // 不是IdleStateEvent事件谎僻,所以將它傳遞給下一個(gè)ChannelInboundHandler
super.userEventTriggered(ctx, evt);
}
}
}
}
解碼基于分隔符的協(xié)議和基于長度的協(xié)議
-
基于分隔符的協(xié)議
基于分隔符的(delimited)消息協(xié)議使用定義的字符來標(biāo)記的消息或者消息段(通常被稱為幀)的開頭或者結(jié)尾娄柳。由RFC文檔正式定義的許多協(xié)議(如SMTP寓辱、POP3、IMAP以及Telnet)都是這樣的.
用于處理基于分隔符的協(xié)議和基于長度的協(xié)議的解碼器
由行尾符分隔的幀
處理由行尾符分隔的幀的例子:
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
// Do something with the data extracted from the frame
}
}
}
作為示例赤拒,我們將使用下面的協(xié)議規(guī)范:
1.傳入數(shù)據(jù)流是一系列的幀秫筏,每個(gè)幀都由換行符(\n)分隔肃弟;
2.每個(gè)幀都由一系列的元素組成免姿,每個(gè)元素都由單個(gè)空格字符分隔乐设;
3.一個(gè)幀的內(nèi)容代表一個(gè)命令瘩蚪,定義為一個(gè)命令名稱后跟著數(shù)目可變的參數(shù)抖所。
我們用于這個(gè)協(xié)議的自定義解碼器將定義以下類:
1.Cmd—將幀(命令)的內(nèi)容存儲在ByteBuf 中金吗,一個(gè)ByteBuf 用于名稱罗珍,另一個(gè)用于參數(shù)糊秆;
2.CmdDecoder—從被重寫了的decode()方法中獲取一行字符串始衅,并從它的內(nèi)容構(gòu)建一個(gè)Cmd 的實(shí)例冷蚂;
3.CmdHandler —從CmdDecoder 獲取解碼的Cmd 對象缭保,并對它進(jìn)行一些處理;
4.CmdHandlerInitializer —為了簡便起見蝙茶,我們將會(huì)把前面的這些類定義為專門的ChannelInitializer 的嵌套類艺骂,其將會(huì)把這些ChannelInboundHandler 安裝到ChannelPipeline 中。
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
final byte SPACE = (byte)' ';
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CmdDecoder(64 * 1024)); // 添加CmdDecoder 以提取Cmd 對象隆夯,并將它轉(zhuǎn)發(fā)給下一個(gè)ChannelInboundHandler
pipeline.addLast(new CmdHandler()); // 添加CmdHandler 以接收和處理Cmd 對象
}
public static final class Cmd {
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf name() {
return name;
}
public ByteBuf args() {
return args;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
if (frame == null) {
return null;
}
// 查找第一個(gè)空格字符的索引钳恕。前面是命令名稱,接著是參數(shù)
int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(), SPACE);
// 使用包含有命令名稱和參數(shù)的切片創(chuàng)建新的Cmd 對象
return new Cmd(frame.slice(frame.readerIndex(), index),frame.slice(index + 1, frame.writerIndex()));
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
// Do something with the command(獲取Cmd對象進(jìn)一步操作)
}
}
}
- 基于長度的協(xié)議
用于基于長度的協(xié)議的解碼器
解碼長度為8 字節(jié)的幀
你將經(jīng)常會(huì)遇到被編碼到消息頭部的幀大小不是固定值的協(xié)議蹄衷。為了處理這種變長幀忧额,你可以使用LengthFieldBasedFrameDecoder,它將從頭部字段確定幀長愧口,然后從數(shù)據(jù)流中提取指定的字節(jié)數(shù)宙址。
將變長幀大小編碼進(jìn)頭部的消息
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
// Do something with the frame
}
}
}
-
寫大型數(shù)據(jù)
通過支持零拷貝的文件傳輸?shù)腃hannel 來發(fā)送的文件區(qū)域.
FileInputStream in = new FileInputStream(file);
// 以該文件的完整長度創(chuàng)建一個(gè)新的DefaultFileRegion
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// 發(fā)送該DefaultFileRegion,并注冊一個(gè)ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause(); // 處理失敗
// Do something
}
}
});
這個(gè)示例只適用于文件內(nèi)容的直接傳輸调卑,不包括應(yīng)用程序?qū)?shù)據(jù)的任何處理抡砂。在需要將數(shù)據(jù)從文件系統(tǒng)復(fù)制到用戶內(nèi)存中時(shí),可以使用ChunkedWriteHandler恬涧,它支持異步寫大型數(shù)據(jù)流注益,而又不會(huì)導(dǎo)致大量的內(nèi)存消耗。
ChunkedInput 的實(shí)現(xiàn)
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WriteStreamHandler()); // 一旦連接建立溯捆,WriteStreamHandler就開始寫文件數(shù)據(jù)
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
序列化數(shù)據(jù)
- JDK 序列化
JDK 序列化編解碼器
-
使用JBoss Marshalling 進(jìn)行序列化
比JDK序列化最多快3 倍丑搔,而且也更加緊湊。
JBoss Marshalling 編解碼器
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
pipeline.addLast(new MarshallingEncoder(marshallerProvider));
pipeline.addLast(new ObjectHandler()); // 添加ObjectHandler提揍,以處理普通的實(shí)現(xiàn)了Serializable 接口的POJO
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext,Serializable serializable) throws Exception {
// Do something
}
}
}
-
通過Protocol Buffers 序列化
Protocol Buffers 以一種緊湊而高效的方式對結(jié)構(gòu)化的數(shù)據(jù)進(jìn)行編碼以及解碼啤月。它具有許多的編程語言綁定,使得它很適合跨語言的項(xiàng)目劳跃。(由Google公司開發(fā)的、現(xiàn)在已經(jīng)開源的數(shù)據(jù)交換格式刨仑。)
Protobuf 編解碼器
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufEncoder()); ①
pipeline.addLast(new ProtobufDecoder(lite));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Do something with the object
}
}
}