一絮识、粘包與拆包
1绿聘、發(fā)送時(shí)的粘包與拆包
TCP連接維護(hù)了一個(gè)發(fā)送緩存區(qū)。將要發(fā)送給對(duì)端的數(shù)據(jù)會(huì)由socket API寫入該發(fā)送緩存區(qū)次舌。
TCP每次發(fā)送的報(bào)文段大小有限制熄攘,MSS就是單個(gè)TCP數(shù)據(jù)包能夠承載的最大數(shù)據(jù)分段大小。
TCP為了考慮數(shù)據(jù)傳輸效率彼念,會(huì)采用如下發(fā)送策略:
- 發(fā)送緩存區(qū)中存放的數(shù)據(jù)達(dá)到MSS字節(jié)時(shí)鲜屏,就組裝一個(gè)TCP報(bào)文段發(fā)送出去;
- 由發(fā)送方的應(yīng)用進(jìn)程指明要求立即發(fā)送報(bào)文段国拇;
- 發(fā)送計(jì)時(shí)器到期洛史,這時(shí)把緩存區(qū)中已有的數(shù)據(jù)裝入報(bào)文段(但長度不能超過MSS)發(fā)送出去;
正是由于MSS和發(fā)送策略酱吝,會(huì)有以下情況:
- 用戶數(shù)據(jù)包超過了mss也殖,那么這個(gè)用戶數(shù)據(jù)包在發(fā)送的時(shí)候必須拆分成多個(gè)TCP數(shù)據(jù)包,即發(fā)生拆包务热。
- 用戶數(shù)據(jù)包有效載荷非常低忆嗜,TCP的發(fā)送策略會(huì)將多個(gè)用戶數(shù)據(jù)包合并為一個(gè)TCP數(shù)據(jù)包進(jìn)行發(fā)送,即發(fā)生粘包崎岂。
2捆毫、接收時(shí)的粘包與拆包
由于TCP在發(fā)送數(shù)據(jù)時(shí)會(huì)發(fā)生粘包/拆包。所以接收過程也需要進(jìn)行對(duì)應(yīng)的粘包/拆包冲甘,以便將接收到的TCP數(shù)據(jù)包重新組裝為發(fā)送端發(fā)來的原始用戶數(shù)據(jù)包绩卤,并進(jìn)行后續(xù)業(yè)務(wù)處理途样。
3、接收端進(jìn)行粘包/拆包的原理
基本原理就是不斷從TCP緩沖區(qū)中讀取數(shù)據(jù)濒憋,并將新讀取到的數(shù)據(jù)向后追加到 本地消息緩存 中何暇,然后進(jìn)行解碼處理:
- 如果當(dāng)前本地消息緩存中不足以拼接成一個(gè)業(yè)務(wù)數(shù)據(jù)包,那就保留數(shù)據(jù)凛驮,繼續(xù)從tcp緩沖區(qū)中讀取數(shù)據(jù)裆站;
- 如果當(dāng)前本地消息緩存中能夠拼接成一個(gè)業(yè)務(wù)數(shù)據(jù)包,那就將對(duì)應(yīng)數(shù)據(jù)解碼成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包并傳遞給業(yè)務(wù)邏輯處理黔夭,本地消息緩存中剩余的多余數(shù)據(jù)仍然保留宏胯,以便和下次讀到的數(shù)據(jù)嘗試拼接。
二本姥、netty中的拆包器
0肩袍、總述
netty 中的拆包過程原理同上,拆包器基類為ByteToMessageDecoder
扣草,其內(nèi)部有一個(gè) 累加器 了牛,將每次新讀取到的數(shù)據(jù)不斷累加到本地字節(jié)容器颜屠,然后嘗試對(duì)累加后的本地字節(jié)容器中的數(shù)據(jù)進(jìn)行拆包辰妙,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包。
1甫窟、累加器
netty通過累加器實(shí)現(xiàn)將每次新讀取到的數(shù)據(jù)不斷累積到本地字節(jié)容器的操作密浑。
ByteToMessageDecoder
中定義了兩個(gè)累加器:
public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;
默認(rèn)情況下使用簡單的MERGE_CUMULATOR
累加器,原理是每次都將讀取到的數(shù)據(jù)通過內(nèi)存拷貝的方式粗井,拼接到一個(gè)大的字節(jié)容器中尔破,這個(gè)大的字節(jié)容器即為ByteToMessageDecoder
中的cumulation。
private Cumulator cumulator = MERGE_CUMULATOR;
累加器的累加操作實(shí)現(xiàn)
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
- 若當(dāng)前cumulation空間不足容納新讀取到的數(shù)據(jù)浇衬,則進(jìn)行擴(kuò)容懒构;
ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable);
- 使用
ByteBuf.writeBytes(in);
將新數(shù)據(jù)累加到字節(jié)容器cumulation中;
2耘擂、數(shù)據(jù)讀取與粘包/拆包
1胆剧、 代碼入口
2、 累加新讀取的數(shù)據(jù)到本地自己容器中
3醉冤、 將本地字節(jié)容器中的數(shù)據(jù)傳遞給業(yè)務(wù)拆包器拆包
4秩霍、 清理字節(jié)容器
5、 傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理
1)代碼入口
a蚁阳、NioEventLoop線程在處理IO事件的代碼processSelectedKey(SelectionKey k, AbstractNioChannel ch)
中铃绒,對(duì)于OP_READ事件會(huì)調(diào)用相應(yīng)Channel的NioByteUnsafe.read()
進(jìn)行處理。
b螺捐、NioByteUnsafe.read()
會(huì)分配一個(gè)ByteBuf byteBuf
并將TCP接收緩存中的數(shù)據(jù)讀取到byteBuf中颠悬,最后觸發(fā)channelRead事件將byteBuf傳遞給pipeline中的ByteToMessageDecoder
回調(diào)處理矮燎。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
上述代碼分為如下四個(gè)步驟:
1、 累加新讀取的數(shù)據(jù)到本地自己容器中
2椿疗、 將本地字節(jié)容器中的數(shù)據(jù)傳遞給業(yè)務(wù)拆包器拆包
3漏峰、 清理字節(jié)容器
4、 傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理
2)累加新讀取的數(shù)據(jù)到本地自己容器中
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
- 若當(dāng)前累加器中沒有數(shù)據(jù)(cumulation==null)届榄,則直接跳過內(nèi)存拷貝浅乔,將字節(jié)容器cumulation的指針指向新讀取的數(shù)據(jù);
- 若當(dāng)前累加器中有數(shù)據(jù)铝条,調(diào)用累加器cumulation的```cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)``方法累加新讀取到的數(shù)據(jù)到本地字節(jié)容器靖苇。
3)將本地字節(jié)容器中的數(shù)據(jù)傳遞給業(yè)務(wù)拆包器拆包
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
1、 到這一步班缰,本地字節(jié)容器中的數(shù)據(jù)是目前未經(jīng)拆包的所有數(shù)據(jù)贤壁;
2、 callDecode
將嘗試將本地字節(jié)容器的數(shù)據(jù)拆分成業(yè)務(wù)數(shù)據(jù)包埠忘,并放入業(yè)務(wù)數(shù)據(jù)包容器CodecOutputList out
中脾拆;
3、 對(duì)于業(yè)務(wù)數(shù)據(jù)包容器out莹妒,遍歷其中的業(yè)務(wù)數(shù)據(jù)包名船,通過ctx.fireChannelRead(msg);
將每個(gè)業(yè)務(wù)數(shù)據(jù)包傳遞給后續(xù)處理器進(jìn)行業(yè)務(wù)處理
具體的拆包工作由ByteToMessageDecode
的抽象方法定義:
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
netty中對(duì)各種用戶協(xié)議的支持就體現(xiàn)在這個(gè)抽象方法中,所有的拆包器最終都實(shí)現(xiàn)了該抽象方法旨怠。
decode后渠驼,如果發(fā)現(xiàn)并沒有拆到一個(gè)完整的數(shù)據(jù)包
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
- 若拆包器什么數(shù)據(jù)也沒讀取
oldInputLength == in.readableBytes()
,可能數(shù)據(jù)還不夠業(yè)務(wù)拆包器處理鉴腻,直接break等待新的數(shù)據(jù)迷扇; - 若拆包器已讀取部分?jǐn)?shù)據(jù),說明解碼器仍然在工作爽哎,繼續(xù)循環(huán)解碼蜓席。
4)清理字節(jié)容器
NioByteUnsafe.read()
每次處理OP_READ事件讀取完數(shù)據(jù),都會(huì)觸發(fā)一次channelReadComplete事件课锌。
ByteToMessageDecoder.channelReadComplete(ChannelHandlerContext ctx)
方法中實(shí)現(xiàn)了對(duì)本地字節(jié)容器的清理邏輯:
discardSomeReadBytes();
另外厨内,為防止發(fā)送端發(fā)送數(shù)據(jù)過快ByteToMessageDecoder.channelRead
中在每次拆包過后都會(huì)做一次判斷,如果讀取到的數(shù)據(jù)量過多也會(huì)主動(dòng)執(zhí)行本地字節(jié)容器的清理邏輯:
if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
discardSomeReadBytes()之前产镐,本地字節(jié)容器中的數(shù)據(jù)分布:
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes()之后隘庄,本地字節(jié)容器中的數(shù)據(jù)分布:
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
**5)傳遞業(yè)務(wù)數(shù)據(jù)包給業(yè)務(wù)解碼器處理**
經(jīng)過上面幾個(gè)步驟完成之后,就可以將拆成的業(yè)務(wù)數(shù)據(jù)包交給后續(xù)業(yè)務(wù)處理器處理了:
fireChannelRead(ctx, out, size);
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
#三癣亚、拆包器的具體實(shí)現(xiàn)類
LineBasedFrameDecoder -> 根據(jù)換行符\n或\r\n進(jìn)行拆包
DelimiterBasedFrameDecoder -> 根據(jù)用戶定義的標(biāo)識(shí)符進(jìn)行拆包
LengthFieldBasedFrameDecoder -> 根據(jù)包頭長度進(jìn)行拆包丑掺,適用于私有協(xié)議解碼