1. 編碼器解碼器的引入
通常情況下,當(dāng)我們得到ByteBuf情況下,我們?nèi)绻媒獯a得到我們想要的消息,通常情況下是如下這樣處理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
ByteBuf byteBuf = (ByteBuf)msg;
byte [] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String s = new String(bytes, Charset.forName("UTF-8"));
}
這樣的一個弊端是,每次InBoundHandler的channelRead讀取數(shù)據(jù)時候,都要讀到一個數(shù)組中,然后轉(zhuǎn)化成我們想要的數(shù)據(jù).這樣太過于麻煩了.如果我們直接將msg轉(zhuǎn)化成我們想要的類型,這就非常的快捷.這就是解碼器的目的.
2. DelimiterBasedFrameDecoder解碼器
DelimiterBasedFrameDecoder解碼器用來解碼以自定義特殊字符串結(jié)尾的消息,例如以"#"結(jié)尾的消息,每遇到一個以"#"字符串,則觸發(fā)channelRead()讀取數(shù)據(jù).
服務(wù)端代碼
public class TimeServer {
public void bind(int port)throws Exception{
/* 配置服務(wù)端的NIO線程組 */
// NioEventLoopGroup類 是個線程組耕餐,包含一組NIO線程脆粥,用于網(wǎng)絡(luò)事件的處理
// (實際上它就是Reactor線程組)瓦侮。
// 創(chuàng)建的2個線程組涵卵,1個是服務(wù)端接收客戶端的連接,另一個是進(jìn)行SocketChannel的
// 網(wǎng)絡(luò)讀寫
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup WorkerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 類击困,是啟動NIO服務(wù)器的輔助啟動類
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,WorkerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHandler());
// 綁定端口,同步等待成功
ChannelFuture f= b.bind(port).sync();
// 等待服務(wù)端監(jiān)聽端口關(guān)閉
f.channel().closeFuture().sync();
}finally {
// 釋放線程池資源
bossGroup.shutdownGracefully();
WorkerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel arg0)throws Exception{
// 消息用 _#_ 作為分隔符,加入到DelimiterBasedFrameDecoder中涎劈,第一個參數(shù)表示單個消息的最大長度,當(dāng)達(dá)到該
// 長度后仍然沒有查到分隔符沛励,就拋出TooLongFrameException異常责语,防止由于異常碼流缺失分隔符導(dǎo)致的內(nèi)存溢出
ByteBuf delimiter = Unpooled.copiedBuffer("_#_".getBytes());
arg0.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
arg0.pipeline().addLast(new StringDecoder());
arg0.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new TimeServer().bind(port);
}
}
- 以上定義了DelimiterBasedFrameDecoder,并且添加到Pipeline中.
- 同樣引入了StringDecoder編碼器,這個編碼器的作用是把讀到的內(nèi)容作為整個字符串,不分開.
- 以上過程中需要注意DelimiterBasedFrameDecoder,StringDecoder加入到 PipeLine中的順序.
先加入DelimiterBasedFrameDecoder,后加入StringDecoder.表明是以"#"作為結(jié)束符號,然后StringDecoder把這個結(jié)束的字符串作為讀取成功的.
服務(wù)端Handler
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter=0;
// 當(dāng)客戶端和服務(wù)端建立tcp成功之后,Netty的NIO線程會調(diào)用channelActive
// 發(fā)送查詢時間的指令給服務(wù)端目派。
// 調(diào)用ChannelHandlerContext的writeAndFlush方法,將請求消息發(fā)送給服務(wù)端
// 當(dāng)服務(wù)端應(yīng)答時胁赢,channelRead方法被調(diào)用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println(body+";the counter is :"+ (++counter));
ByteBuf resp = Unpooled.copiedBuffer(("hello"+"_#_").getBytes());
ctx.writeAndFlush(resp); //寫入操作
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
- 這里服務(wù)端傳給客戶端同樣采用以"#"作為分隔符號,然后讀取每個字符串.
客戶端代碼
public class TimeClient {
public void connect(String host,int port)throws Exception{
// 配置服務(wù)端的NIO線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
// Bootstrap 類企蹭,是啟動NIO服務(wù)器的輔助啟動類
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception{
ByteBuf delimiter = Unpooled.copiedBuffer("_#_".getBytes());
// 增加 DelimiterBasedFrameDecoder編碼器
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 發(fā)起異步連接操作
ChannelFuture f= b.connect(host,port).sync();
// 等待客服端鏈路關(guān)閉
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[]args)throws Exception{
int port = 8080;
if(args!=null && args.length>0){
try {
port = Integer.valueOf(args[0]);
}
catch (NumberFormatException ex){}
}
new TimeClient().connect("127.0.0.1",port);
}
}
- 客戶端的邏輯和服務(wù)端差不多
客戶端Handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private int counter;
private final String echo_req = "aaaa_#_aa_#_";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception{
String body = (String)msg;
System.out.println( body+";the countor is : "+ ++counter);
}
@Override
public void channelActive(ChannelHandlerContext ctx){
// 當(dāng)客戶端和服務(wù)端建立tcp成功之后,Netty的NIO線程會調(diào)用channelActive
// 發(fā)送查詢時間的指令給服務(wù)端智末。
// 調(diào)用ChannelHandlerContext的writeAndFlush方法谅摄,將請求消息發(fā)送給服務(wù)端
// 當(dāng)服務(wù)端應(yīng)答時,channelRead方法被調(diào)用
ctx.writeAndFlush(Unpooled.copiedBuffer(echo_req.getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
System.out.println("message from:"+cause.getMessage());
ctx.close();
}
}
客戶端的Handler與服務(wù)端的Handler差不多.
客戶端運(yùn)行結(jié)果如圖
客戶端運(yùn)行結(jié)果如圖
結(jié)果分析,客戶端首先發(fā)動"aaaa"字符串(以"#"分隔),然后服務(wù)端發(fā)動hello給客戶端.接著客戶端發(fā)送剩下的"aa"給服務(wù)端,服務(wù)端發(fā)動"hello"給客戶端.
- 編碼器,解碼器的編程示范
接下來實現(xiàn)的是一個把一個字節(jié)轉(zhuǎn)化int的編解碼器.
解碼器實現(xiàn)如下
public class ByteToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes()>=4){
int n = in.readInt();
System.out.println("decode meg is "+n);
out.add(n);
}
}
}
- 注意最后需要List中去
編碼器實現(xiàn)如下
public class IntegerToByteEncoder extends MessageToByteEncoder<Integer>{
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
System.out.println("encode message is msg"+msg);
out.writeInt(msg);
}
}
- 上面的編碼器實現(xiàn)的是把int轉(zhuǎn)化成字節(jié)碼.
總結(jié):
通過以上的方式,我們能夠很快定義一個編碼器和解碼器,來處理傳輸過程中的數(shù)據(jù)轉(zhuǎn)換.對于Tcp的粘包問題,將在下面講到.
上面的代碼github地址:https://github.com/maskwang520/nettyinaction
參考文章: