粘包和拆包是TCP網(wǎng)絡(luò)編程中不可避免的镐依,無論是服務(wù)端還是客戶端眉厨,當(dāng)我們讀取或者發(fā)送消息的時(shí)候滑黔,都需要考慮TCP底層的粘包/拆包機(jī)制。
拆包與粘包同時(shí)發(fā)生在數(shù)據(jù)的發(fā)送方與接收方兩方宵距。
產(chǎn)生原因
發(fā)送方通過網(wǎng)絡(luò)每發(fā)送一批二進(jìn)制數(shù)據(jù)包猜极,那么這次所發(fā)送的數(shù)據(jù)包就稱為一幀,即
Frame消玄。在進(jìn)行基于 TCP 的網(wǎng)絡(luò)傳輸時(shí)跟伏,TCP 協(xié)議會(huì)將用戶真正要發(fā)送的數(shù)據(jù)根據(jù)當(dāng)前緩存
的實(shí)際情況對其進(jìn)行拆分或重組,變?yōu)橛糜诰W(wǎng)絡(luò)傳輸?shù)?Frame翩瓜。在 Netty 中就是將 ByteBuf
中的數(shù)據(jù)拆分或重組為二進(jìn)制的 Frame受扳。而接收方則需要將接收到的 Frame 中的數(shù)據(jù)進(jìn)行重
組或拆分,重新恢復(fù)為發(fā)送方發(fā)送時(shí)的 ByteBuf 數(shù)據(jù)兔跌。
具體場景描述:
- 發(fā)送方發(fā)送的 ByteBuf 較大勘高,在傳輸之前會(huì)被 TCP 底層拆分為多個(gè) Frame 進(jìn)行發(fā)送,這
個(gè)過程稱為發(fā)送拆包坟桅;接收方在接收到需要將這些 Frame 進(jìn)行合并华望,這個(gè)合并的過程稱
為接收方粘包。
- 發(fā)送方發(fā)送的 ByteBuf 較小仅乓,無法形成一個(gè) Frame赖舟,此時(shí) TCP 底層會(huì)將很多的這樣的小
的 ByteBuf 合并為一個(gè) Frame 進(jìn)行傳輸,這個(gè)合并的過程稱為發(fā)送方的粘包夸楣;接收方在
接收到這個(gè) Frame 后需要進(jìn)行拆包宾抓,拆分出多個(gè)原來的小的 ByteBuf,這個(gè)拆分的過程
稱為接收方拆包豫喧。
- 當(dāng)一個(gè) Frame 無法放入整數(shù)倍個(gè) ByteBuf 時(shí)石洗,最后一個(gè) ByteBuf 會(huì)會(huì)發(fā)生拆包。這個(gè)
ByteBuf 中的一部分入入到了一個(gè) Frame 中紧显,另一部分被放入到了另一個(gè) Frame 中讲衫。這
個(gè)過程就是發(fā)送方拆包。但對于將這些 ByteBuf 放入到一個(gè) Frame 的過程孵班,就是發(fā)送方
粘包涉兽;當(dāng)接收方在接收到兩個(gè) Frame 后,對于第一個(gè) Frame 的最后部分重父,與第二個(gè) Frame
的最前部分會(huì)進(jìn)行合并花椭,這個(gè)合并的過程就是接收方粘包。但在將 Frame 中的各個(gè)
ByteBuf 拆分出來的過程房午,就是接收方拆包。
具體情況如下圖所示:
解決方案
固定長度
對于使用固定長度的粘包和拆包場景丹允,可以使用:
FixedLengthFrameDecoder:每次讀取固定長度的消息郭厌,如果當(dāng)前讀取到的消息不足指定長度袋倔,那么就會(huì)等待下一個(gè)消息到達(dá)后進(jìn)行補(bǔ)足。其使用也比較簡單折柠,只需要在構(gòu)造函數(shù)中指定每個(gè)消息的長度即可宾娜。
bootstrap.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//接收套接字緩沖區(qū)大小
.option(ChannelOption.SO_RCVBUF, 1024 * 1024)
//發(fā)送套接字緩沖區(qū)大小
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 這里將FixedLengthFrameDecoder添加到pipeline中,指定長度為100
pipeline.addLast(new FixedLengthFrameDecoder(100));
// StringEncoder:字符串編碼器扇售,將String編碼為將要發(fā)送到Channel中的ByteBuf
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
// StringDecoder:字符串解碼器前塔,將Channel中的ByteBuf數(shù)據(jù)解碼為String
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
//綁定處理器(可綁定多個(gè))
pipeline.addLast(new ServerHandler()); //處理業(yè)務(wù)
}
});
行拆包
LineBasedFrameDecoder:每個(gè)應(yīng)用層數(shù)據(jù)包,都以換行符作為分隔符承冰,進(jìn)行分割拆分华弓,LineBasedFrameDecoder依次遍歷ByteBuf中的可讀字節(jié),判斷是否有"\n"或者"\r\n"困乒,如果有就以此位置為結(jié)束位置寂屏,從可讀索引到結(jié)束位置區(qū)間的字節(jié)就組成了一行,它是以換行符為結(jié)束標(biāo)志的解碼器娜搂,支持?jǐn)y帶結(jié)束符或者不攜帶結(jié)束符兩種解碼方式迁霎,同時(shí)支持配置單行的最大長度,如果連續(xù)讀到最大長度后仍然沒有發(fā)現(xiàn)換行符百宇,就會(huì)拋出異常考廉,同時(shí)忽略掉之前讀到的異常碼流。這個(gè)使用也比較簡單:
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 這里將FixedLengthFrameDecoder添加到pipeline中携御,指定長度為100
//pipeline.addLast(new FixedLengthFrameDecoder(100));
//這里將LineBasedFrameDecoder添加到pipeline中芝此,設(shè)置最大長度為1024
pipeline.addLast(new LineBasedFrameDecoder(1024));
// StringEncoder:字符串編碼器,將String編碼為將要發(fā)送到Channel中的ByteBuf
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
// StringDecoder:字符串解碼器因痛,將Channel中的ByteBuf數(shù)據(jù)解碼為String
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
//綁定處理器(可綁定多個(gè))
pipeline.addLast(new ServerHandler()); //處理業(yè)務(wù)
}
});
指定分隔符
對于通過分隔符進(jìn)行粘包和拆包問題的處理婚苹,Netty提供了
DelimiterBasedFrameDecoder:通過用戶指定的分隔符對數(shù)據(jù)進(jìn)行粘包和拆包處理,用法如下:
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 這里將FixedLengthFrameDecoder添加到pipeline中鸵膏,指定長度為100
// pipeline.addLast(new FixedLengthFrameDecoder(100));
//這里將LineBasedFrameDecoder添加到pipeline中膊升,設(shè)置最大長度為1024
// pipeline.addLast(new LineBasedFrameDecoder(1024));
//被按照$_$進(jìn)行分隔,這里1024指的是分隔的最大長度谭企,即當(dāng)讀取到1024個(gè)字節(jié)的數(shù)據(jù)之后廓译,
// 若還是未讀取到分隔符,則舍棄當(dāng)前數(shù)據(jù)段债查,因?yàn)槠浜苡锌赡苁怯捎诖a流紊亂造成的
ByteBuf delimiter = copiedBuffer(Constants.MESSAGE_DELIMITER.getBytes(Charset.forName("UTF-8")));
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
// StringEncoder:字符串編碼器非区,將String編碼為將要發(fā)送到Channel中的ByteBuf
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
// StringDecoder:字符串解碼器,將Channel中的ByteBuf數(shù)據(jù)解碼為String
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
//綁定處理器(可綁定多個(gè))
pipeline.addLast(new ServerHandler()); //處理業(yè)務(wù)
}
});
基于數(shù)據(jù)包長度的拆包
LengthFieldBasedFrameDecoder:將應(yīng)用層數(shù)據(jù)包的長度盹廷,作為接收端應(yīng)用層數(shù)據(jù)包的拆分依據(jù)征绸。按照應(yīng)用層數(shù)據(jù)包的大小,拆包。這個(gè)拆包器管怠,有一個(gè)要求淆衷,就是應(yīng)用層協(xié)議中包含數(shù)據(jù)包的長度,應(yīng)用如下:
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 這里將LengthFieldBasedFrameDecoder添加到pipeline的首位渤弛,因?yàn)槠湫枰獙邮盏降臄?shù)據(jù)
// 進(jìn)行長度字段解碼祝拯,這里也會(huì)對數(shù)據(jù)進(jìn)行粘包和拆包處理
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// LengthFieldPrepender是一個(gè)編碼器,主要是在響應(yīng)字節(jié)數(shù)據(jù)前面添加字節(jié)長度字段
pipeline.addLast(new LengthFieldPrepender(2));
// StringEncoder:字符串編碼器她肯,將String編碼為將要發(fā)送到Channel中的ByteBuf
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
// StringDecoder:字符串解碼器佳头,將Channel中的ByteBuf數(shù)據(jù)解碼為String
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
//綁定處理器(可綁定多個(gè))
pipeline.addLast(new ServerHandler()); //處理業(yè)務(wù)
}
});
自定義粘包拆包器
可以通過實(shí)現(xiàn)MessageToByteEncoder
和ByteToMessageDecoder
來實(shí)現(xiàn)自定義粘包和拆包處理的目的。
MessageToByteEncoder:作用是將響應(yīng)數(shù)據(jù)編碼為一個(gè)ByteBuf對象
ByteToMessageDecoder:將接收到的ByteBuf數(shù)據(jù)轉(zhuǎn)換為某個(gè)對象數(shù)據(jù)
最后我們也可以自定義編碼器MessageToMessageEncoder和自定義解碼器MessageToMessageDecoder晴氨,來實(shí)現(xiàn)消息內(nèi)容的轉(zhuǎn)換康嘉,比如序列化成某個(gè)對象,處理器里面我們就可以不用再去轉(zhuǎn)換對象瑞筐,具體實(shí)現(xiàn)如下:
自定義解碼器
/**
* @Description: 自定義解碼器
* @author: dy
*/
public class CustomDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
System.out.println("====1111111111===="+msg.toString(Charset.forName("UTF-8")));
out.add(JSON.parseObject(msg.toString(Charset.forName("UTF-8")), Message.class));
}
}
使用只需要在構(gòu)造器里面加入我們自定義的編碼器就可以了:
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 這里將LengthFieldBasedFrameDecoder添加到pipeline的首位凄鼻,因?yàn)槠湫枰獙邮盏降臄?shù)據(jù)
// 進(jìn)行長度字段解碼,這里也會(huì)對數(shù)據(jù)進(jìn)行粘包和拆包處理
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// LengthFieldPrepender是一個(gè)編碼器聚假,主要是在響應(yīng)字節(jié)數(shù)據(jù)前面添加字節(jié)長度字段
pipeline.addLast(new LengthFieldPrepender(2));
// StringEncoder:字符串編碼器块蚌,將message對象編碼為將要發(fā)送到Channel中的ByteBuf
pipeline.addLast(new CustomDecoder());
// StringDecoder:字符串解碼器,將Channel中的ByteBuf數(shù)據(jù)解碼為String
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
//綁定處理器(可綁定多個(gè))
pipeline.addLast(new ServerHandler()); //處理業(yè)務(wù)
}
});