概述
在Netty系列之Netty編解碼框架中有各種解碼器, 推薦組合:
- LengthFieldBasedFrameDecoder
- ByteToMessageDecoder
這兩個解碼器來處理業(yè)務(wù)消息闲询。但是有時候為了靈活性,會直接選擇繼承
ByteToMessageDecoder
來處理業(yè)務(wù)消息,但是直接繼承ByteToMessageDecoder,則需要自己處理半包問題木羹。
在閱讀本文內(nèi)容之前阳欲,你至少需要了解以下兩個知識點
1、netty的ByteBuf類的基本api用法
2谷羞、什么是TCP半包
雖然JAVA NIO中也有個ByteBuffer類,但是在Netty程序中,基本都是直接用Netty的ByteBuf類,它包裝了更多好用的接口,降低了使用緩沖區(qū)類的難度。
自定義消息協(xié)議
目前自定義的消息協(xié)議用的最多的是在消息中頭四個字節(jié)保存消息的長度,格式大概如下
len : 表示消息的長度,通常用4個字節(jié)保存
head : 消息頭部
body : 消息內(nèi)容
無論每次請求的業(yè)務(wù)數(shù)據(jù)多大,都是使用上面的消息格式來表示的。
注意
在實際的項目中,消息格式可能會增加一些標志,例如,開始標記,結(jié)束標志,消息序列號,消息的協(xié)議類型(json或者二進制等),這里為了描述的方便,就不講附加的這些消息標志了诀艰。
自定義解碼器處理半包數(shù)據(jù)
如上描述,直接繼承ByteToMessageDecoder類,同時覆蓋其decode方法,完整實現(xiàn)代碼如下
服務(wù)端代碼
package nettyinaction.encode.lengthfield.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class SocketServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new SocketServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}
finally {
parentGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}
package nettyinaction.encode.lengthfield.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SelfDefineEncodeHandler());
pipeline.addLast(new BusinessServerHandler());
}
}
package nettyinaction.encode.lengthfield.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
if (bufferIn.readableBytes() < 4) {
return;
}
int beginIndex = bufferIn.readerIndex();
int length = bufferIn.readInt();
if (bufferIn.readableBytes() < length) {
bufferIn.readerIndex(beginIndex);
return;
}
bufferIn.readerIndex(beginIndex + 4 + length);
ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);
}
}
package nettyinaction.encode.lengthfield.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class BusinessServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf)msg;
int length = buf.readInt();
assert length == (8);
byte[] head = new byte[4];
buf.readBytes(head);
String headString = new String(head);
assert "head".equals(headString);
byte[] body = new byte[4];
buf.readBytes(body);
String bodyString = new String(body);
assert "body".equals(bodyString);
}
}
客戶端代碼
package nettyinaction.encode.lengthfield.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class SocketClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new SocketClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().closeFuture().sync();
}
finally {
eventLoopGroup.shutdownGracefully();
}
}
}
package nettyinaction.encode.lengthfield.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SocketClientHandler());
}
}
package nettyinaction.encode.lengthfield.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class SocketClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(8);
buffer.writeBytes("head".getBytes());
buffer.writeBytes("body".getBytes());
ctx.writeAndFlush(buffer);
}
}
客戶端一旦啟動,會發(fā)送一條長度為8的消息到服務(wù)端,服務(wù)端首先使用SelfDefineEncodeHandler類對消息進行解碼,處理半包問題雳锋。如果消息是有效的完整的消息,當SelfDefineEncodeHandler處理完消息后,會把消息轉(zhuǎn)發(fā)給BusinessServerHandler處理,BusinessServerHandler只是簡單的做個驗證,判斷消息內(nèi)容是否符合預期黄绩。
運行上面的代碼,代碼如預期那樣,可以正確的讀取到消息并解析消息。
這個例子中,最為核心的類就是SelfDefineEncodeHandler了玷过。里面用了很多的技巧,要理解里面的每行代碼,需要分兩種情況來分析,分別是拆包和粘包
下面分別以拆包和粘包做兩個小試驗,來驗證SelfDefineEncodeHandler是否能正常的處理半包問題爽丹。
拆包試驗
先調(diào)整一下SocketClientHandler類中的channelActive方法中的代碼,將body擴大幾十倍,逼迫TCP發(fā)幾次請求到達服務(wù)端,看看服務(wù)端的SelfDefineEncodeHandler能否正常處理筑煮。
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(1604);
buffer.writeBytes("head".getBytes());
String longMsgBody = "";
for (int i = 0; i < 400; i++) {
longMsgBody = longMsgBody + "body";
}
buffer.writeBytes(longMsgBody.getBytes());
ctx.writeAndFlush(buffer);
使用一個for循環(huán),將消息body的長度設(shè)置為1600,加上長度為4的head,總共消息長度為1604。
然后調(diào)整一下服務(wù)端類SelfDefineEncodeHandler類的代碼,加上三行代碼粤蝎。
第一行代碼是加入一個類變量count真仲,統(tǒng)計一下decode方法的調(diào)用次數(shù)
private static int count = 0;
接著在decode方法中加入三行代碼
System.out.println("decode call count="+ ++count);
System.out.println("bufferIn.readableBytes()="+bufferIn.readableBytes());
System.out.println("beginIndex="+beginIndex);
打印出count和bufferIn.readableBytes()的大小以及beginIndex
最后在BusinessServerHandler類加入
private static int count = 0;
成員變量以及在channelRead方法中加入
System.out.println("BusinessServerHandler call count="+ ++count);
運行代碼,打印結(jié)果如下
decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0
decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0
BusinessServerHandler call count=1
這個結(jié)果說明了,雖然客戶端只是發(fā)送了一條消息,但是其實TCP底層是分兩個包發(fā)送給服務(wù)端,第一次發(fā)送了1024個字節(jié),后面的一次請求,才把消息剩下的內(nèi)容發(fā)送給服務(wù)端初澎。
雖然decode方法被調(diào)用了兩次,但是第一次讀取到的信息不完整,因此ByteToMessageDecoder會靜靜的等待另外一個包的到來,第二次讀取完整消息后,才把消息轉(zhuǎn)發(fā)給BusinessServerHandler類,從打印的結(jié)果看,
BusinessServerHandler類的channelRead方法只被調(diào)用了一次秸应。
到此我們知道SelfDefineEncodeHandler類的decode方法是可以應付拆包問題的,那到底是如何做到的呢?現(xiàn)在我們回頭仔細看看decode方法中的代碼谤狡。
第一部分代碼
if (bufferIn.readableBytes() < 4) {
return;
}
如果接收到的字節(jié)還不到4個字節(jié),也即是連消息長度字段中的內(nèi)容都不完整的,直接return灸眼。
第二部分代碼
int beginIndex = bufferIn.readerIndex();
int length = bufferIn.readInt();
if (bufferIn.readableBytes() < length) {
bufferIn.readerIndex(beginIndex);
return;
}
對于拆包這種場景,由于還未讀取到完整的消息,bufferIn.readableBytes() 會小于length,并重置bufferIn的readerIndex為0,然后退出,ByteToMessageDecoder會乖乖的等待下個包的到來。
由于第一次調(diào)用中readerIndex被重置為0,那么decode方法被調(diào)用第二次的時候,beginIndex還是為0的墓懂。
第三部分代碼
bufferIn.readerIndex(beginIndex + 4 + length);
將readerIndex設(shè)置為最大焰宣。首先代碼能執(zhí)行到這里,針對拆包這種場景而言,已經(jīng)是讀取到一條有效完整的消息了。這個時候需要通知ByteToMessageDecoder類,bufferIn中的數(shù)據(jù)已經(jīng)讀取完畢了,不要再調(diào)用decode方法了捕仔。ByteToMessageDecoder類的底層會根據(jù)bufferIn.isReadable()方法來判斷是否讀取完畢匕积。只有將readerIndex設(shè)置為最大,bufferIn.isReadable()方法才會返回false。
第四部分代碼
ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);
當decode方法執(zhí)行完后,會釋放bufferIn這個緩沖區(qū),如果將執(zhí)行完釋放操作的bufferIn傳遞給下個處理器的話,一旦下個處理器調(diào)用bufferIn的讀或者寫的方法時,會立刻報出IllegalReferenceCountException異常的榜跌。
因此slice操作后,必須加上一個retain操作,讓bufferIn的引用計數(shù)器加1,這樣ByteToMessageDecoder會刀下留人,先不釋放bufferIn闪唆。
粘包試驗
首先將SocketClientHandler類中的channelActive方法的實現(xiàn)改為
for (int i = 0; i < 20; i++) {
UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(8);
buffer.writeBytes("head".getBytes());
buffer.writeBytes("body".getBytes());
ctx.writeAndFlush(buffer);
}
客戶端發(fā)起20個請求到服務(wù)器端。
接著注釋掉SocketServerInitializer類中的
pipeline.addLast(new SelfDefineEncodeHandler());
代碼,使請求不走SelfDefineEncodeHandler解碼器钓葫。
運行代碼,執(zhí)行結(jié)果如下
BusinessServerHandler call count=1
說明客戶端發(fā)送了粘包,服務(wù)端只接收到一次請求∏睦伲現(xiàn)在把代碼調(diào)整回來,走SelfDefineEncodeHandler解碼器,運行代碼,執(zhí)行效果如下
decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1
decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2
decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3
decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4
decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5
decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6
decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7
decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8
decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9
decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10
decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11
decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12
decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13
decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14
decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15
decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16
decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17
decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18
decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19
decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20
結(jié)果符合預期,客戶端發(fā)送20次,服務(wù)端BusinessServerHandler類的channelRead執(zhí)行了20次。SelfDefineEncodeHandler類是如何做到這一點的呢础浮?還是得回頭仔細看看decode方法帆调。
第一部分代碼
if (bufferIn.readableBytes() < 4) {
return;
}
如果接收到的字節(jié)還不到4個字節(jié),也即是連消息長度字段中的內(nèi)容都不完整的,直接return。
第二部分代碼
int beginIndex = bufferIn.readerIndex();
int length = bufferIn.readInt();
if (bufferIn.readableBytes() < length) {
bufferIn.readerIndex(beginIndex);
return;
}
由于客戶端發(fā)送了粘包,decode方法將會接收到一條聚合了多條業(yè)務(wù)消息的大消息豆同,因此bufferIn.readableBytes()肯定大于length, bufferIn的readerIndex不會被重置番刊。只是decode方法每被執(zhí)行一次,beginIndex將會遞增12,也即是(length+4)。
第三部分代碼
bufferIn.readerIndex(beginIndex + 4 + length);
對于粘包這種場景,這行代碼就不是表示將readerIndex升到最高,而是將readerIndex后移(length+4)位影锈,讓beginIndex遞增(length+4)芹务。
第四部分代碼
ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);
slice操作,目的是從大消息中截取出一條有效的業(yè)務(wù)消息。
參考的文章
Netty權(quán)威指南里沒有說到的Decoder編寫細節(jié)
Netty系列之Netty編解碼框架分析
Netty之有效規(guī)避內(nèi)存泄漏
Netty高性能編程備忘錄(下)
https://blog.csdn.net/linsongbin1/java/article/details/77915686