案例重現(xiàn)
首先我們通過(guò)具體的case重現(xiàn)一下TCP粘包的過(guò)程
我們模擬下故障場(chǎng)景隘谣,客戶端循環(huán)一百次調(diào)用服務(wù)端傳輸報(bào)文,服務(wù)端接收?qǐng)?bào)文并打印接收?qǐng)?bào)文和計(jì)數(shù)竖哩,同時(shí)根據(jù)報(bào)文回應(yīng)客戶端
服務(wù)端代碼
public class TimeServerHandler extends ChannelHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
ByteBuf byteBuf = (ByteBuf)msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
System.out.println("received msg length:"+req.length);
String body = new String(req,"UTF-8").substring(0,req.length-System.getProperty("line.separator").length());
System.out.println("the time server receive order:"+body + ";the counter is:" + ++count);
String currentTIme = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()
).toString():"BAD ORDER";
currentTIme = currentTIme + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTIme.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class TimeServer {
public void bind(int port) throws Exception {
//創(chuàng)建兩個(gè)線程組 一個(gè)用于服務(wù)端接收客戶端的連接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//一個(gè)用于網(wǎng)絡(luò)讀寫(xiě)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChildChannelHander());
ChannelFuture future = b.bind(port).sync();
future.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHander extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
// ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length >0) {
try {
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e) {
}
}
try {
new TimeServer().bind(port);
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端代碼
public class TimeClientHandler extends ChannelHandlerAdapter {
private int count;
private byte[] req;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i< 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] req = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(req);
String body = new String(req,"UTF-8");
System.out.println("Now is : "+body + "the counter is :"+ ++count);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("Unexpected exception from downstream :"+cause.getMessage());
ctx.close();
}
}
public class TimeClient {
public void connect(int port, String host) throws Exception {
//創(chuàng)建讀寫(xiě)io線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
try {
new TimeClient().connect(port, "127.0.0.1");
} catch (Exception e) {
e.printStackTrace();
}
}
}
程序運(yùn)行結(jié)果
服務(wù)端
客戶端
按照設(shè)計(jì)初衷椅贱,客戶端應(yīng)該發(fā)送了100次數(shù)據(jù)給服務(wù)端,服務(wù)端每接收一次數(shù)據(jù)就回應(yīng)一次客戶端策严,那么客戶端應(yīng)該收到一百次消息,但是實(shí)際上客戶端就收到2次消息饿敲,服務(wù)端也只收到兩次消息妻导。說(shuō)明服務(wù)端和客戶端都發(fā)生了粘包現(xiàn)象。
產(chǎn)生粘包拆包的原因
TCP協(xié)議是基于流的協(xié)議诀蓉,是沒(méi)有邊界的一串?dāng)?shù)據(jù)栗竖,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的拆分,上述例子中默認(rèn)TCP緩存區(qū)的大小是1024個(gè)字節(jié)渠啤,服務(wù)端第一次收到的數(shù)據(jù)大小正好是1024個(gè)字節(jié),也就是說(shuō)多個(gè)小的報(bào)文可能封裝出一個(gè)大的數(shù)據(jù)進(jìn)行傳送添吗,而一個(gè)大的報(bào)文可能會(huì)被拆分成多個(gè)小包進(jìn)行傳送沥曹。
解決辦法
由于TCP是底層通訊協(xié)議,它不關(guān)心上層業(yè)務(wù),無(wú)法保證數(shù)據(jù)包不會(huì)拆包或者粘包妓美,那么這個(gè)問(wèn)題只能通過(guò)上層協(xié)議來(lái)解決僵腺,通常的解決辦法有以下幾點(diǎn):
1、消息定長(zhǎng)壶栋,例如每個(gè)報(bào)文都是500個(gè)字節(jié)辰如,如果報(bào)文不夠500個(gè)字節(jié),那么就填充
2贵试、報(bào)文尾部增加特殊分隔符
3琉兜、消息分為消息頭和消息體,消息頭定義消息的長(zhǎng)度毙玻,消息體還是真實(shí)傳送的報(bào)文(類型UDP協(xié)議)
Netty解決粘包拆包問(wèn)題
Netty提供了多種編碼解碼器處理上述問(wèn)題豌蟋,其中可以使用LineBasedFrameDecoder解決粘包問(wèn)題
服務(wù)端代碼只要上述TimeServer加一個(gè)LineBasedFrameDecoder的ChannelHandler
private class ChildChannelHander extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new TimeServerHandler());
}
}
客戶端代碼只要上述TimeClient中加一個(gè)LineBasedFrameDecoder的ChannelHandler
public void connect(int port, String host) throws Exception {
//創(chuàng)建讀寫(xiě)io線程組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
//socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new TimeClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
代碼運(yùn)行結(jié)果:
客戶端運(yùn)行結(jié)果
由此可見(jiàn),增加LineBasedFrameDecoder之后解決了粘包問(wèn)題
LineBasedFrameDecoder的工作原理是遍歷緩沖區(qū)的可讀字節(jié)桑滩,判斷是否是“\n”或者"\r\n"梧疲,如果有,那么就以該位置作為結(jié)束位置运准,從緩沖區(qū)可讀區(qū)域到結(jié)束位置作為一個(gè)完整報(bào)文幌氮,他是以標(biāo)識(shí)符作為解碼器。
LineBasedFrameDecoder 源碼分析:
我們先看下LineBasedFrameDecoder的類繼承圖胁澳,繼承了ByteToMessageDecoder方法该互,ByteToMessageDecoder是將ByteBuf字節(jié)解碼成其他消息類型的抽象類,它有一個(gè)關(guān)鍵的方法:
callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
作用就是從Channel讀到的字節(jié)數(shù)據(jù)轉(zhuǎn)換成對(duì)應(yīng)的具體消息類型List<Object>輸出,而具體怎么解碼是由繼承它的子類實(shí)現(xiàn)
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
LineBasedFrameDecoder繼承ByteToMessageDecoder并實(shí)現(xiàn)了具體的decode方法听哭。
當(dāng)LineBasedFrameDecoder加入到ChannelPipeLine管道后慢洋,當(dāng)Channel緩沖區(qū)中有數(shù)據(jù)時(shí)會(huì)調(diào)用channelRead方法,ByteToMessageDecoder的channelRead源碼:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//1、判斷是否是字節(jié)數(shù)據(jù)
if (msg instanceof ByteBuf) {
//2陆盘、初始化輸出數(shù)據(jù)
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//3普筹、判斷cumulation是否為空,初始化后cumulation為空,first為true
first = cumulation == null;
if (first) {
cumulation = data;
} else {
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
expandCumulation(ctx, data.readableBytes());
}
cumulation.writeBytes(data);
data.release();
}
//調(diào)用解析程序隘马,將字節(jié)流轉(zhuǎn)換傳out集合對(duì)象
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
//將緩沖區(qū)回收防止內(nèi)存溢出
if (cumulation != null && !cumulation.isReadable()) {
cumulation.release();
cumulation = null;
}
//獲得返回?cái)?shù)據(jù)結(jié)果的大小
int size = out.size();
decodeWasNull = size == 0;
//依次調(diào)用ChannelPipeLine的下一個(gè)ChannelRead方法太防,并將編碼后的結(jié)果傳遞下去
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//1、判斷Bytebuf是否還有數(shù)據(jù)可讀
while (in.isReadable()) {
int outSize = out.size();
int oldInputLength = in.readableBytes();
//2酸员、調(diào)用實(shí)際的解碼方法蜒车,如果能讀到一行數(shù)據(jù),會(huì)將數(shù)據(jù)添加到out中
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
//3幔嗦、如果步驟2沒(méi)有添加成功酿愧,代碼無(wú)數(shù)據(jù)可讀,跳出循環(huán)
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
//4邀泉、如果還是無(wú)數(shù)據(jù)可讀嬉挡,直接拋異常
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
LineBasedFrameDecoder的Decode方法如下:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//調(diào)用實(shí)際解碼函數(shù)钝鸽,返回結(jié)果
Object decoded = decode(ctx, in);
if (decoded != null) {
//添加到返回結(jié)果集List中
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
//1、找到一行數(shù)據(jù)的結(jié)束標(biāo)識(shí)的位置
final int eol = findEndOfLine(buffer);
if (!discarding) {
if (eol >= 0) {
final ByteBuf frame;
//2庞钢、結(jié)束位置-上次已經(jīng)讀取的位置=當(dāng)前一行數(shù)據(jù)的長(zhǎng)度
final int length = eol - buffer.readerIndex();
//3拔恰、下次讀取需要跳過(guò)結(jié)束字符 所以將結(jié)束字符的長(zhǎng)度記錄下
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
//4、如果當(dāng)前一行數(shù)據(jù)長(zhǎng)度已經(jīng)大于最大定義的可解碼數(shù)據(jù)的長(zhǎng)度基括,代表已經(jīng)解析結(jié)束
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
if (stripDelimiter) {
//5颜懊、讀取當(dāng)前一行數(shù)據(jù)到臨時(shí)緩存區(qū)
frame = buffer.readBytes(length);
//6、下次讀取需要跳過(guò)結(jié)束字符
buffer.skipBytes(delimLength);
} else {
frame = buffer.readBytes(length + delimLength);
}
//返回本次讀取的一行數(shù)據(jù)
return frame;
} else {
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
discardedBytes = buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
private static int findEndOfLine(final ByteBuf buffer) {
//獲得當(dāng)前ByteBuf寫(xiě)的位置
final int n = buffer.writerIndex();
//從ByteBuf可讀位置開(kāi)始循環(huán)风皿,一直遍歷到ByteBuf寫(xiě)的位置河爹,如果有\(zhòng)n 或者\(yùn)r\n,則意味找到一行數(shù)據(jù)結(jié)束的位置 并返回結(jié)束位置
for (int i = buffer.readerIndex(); i < n; i ++) {
final byte b = buffer.getByte(i);
if (b == '\n') {
return i;
} else if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
return i; // \r\n
}
}
return -1; // Not found.
}
由上源碼分析可知,LineBasedFrameDecoder通過(guò)'\r'或者'\r\n'作為分割界限作為一個(gè)完整的報(bào)文輸出揪阶,如果需要其他定制的分隔符作為界限則可以繼承ByteToMessageDecoder 重寫(xiě)decode方法實(shí)現(xiàn)昌抠。