為什么要粘包拆包
為什么要粘包
首先你得了解一下TCP/IP協(xié)議切揭,在用戶數(shù)據(jù)量非常小的情況下,極端情況下锁摔,一個字節(jié)廓旬,該TCP數(shù)據(jù)包的有效載荷非常低,傳遞100字節(jié)的數(shù)據(jù)谐腰,需要100次TCP傳送嗤谚,100次ACK,在應用及時性要求不高的情況下怔蚌,將這100個有效數(shù)據(jù)拼接成一個數(shù)據(jù)包,那會縮短到一個TCP數(shù)據(jù)包旁赊,以及一個ack桦踊,有效載荷提高了,帶寬也節(jié)省了
非極端情況终畅,有可能兩個數(shù)據(jù)包拼接成一個數(shù)據(jù)包籍胯,也有可能一個半的數(shù)據(jù)包拼接成一個數(shù)據(jù)包,也有可能兩個半的數(shù)據(jù)包拼接成一個數(shù)據(jù)包
為什么要拆包
拆包和粘包是相對的离福,一端粘了包杖狼,另外一端就需要將粘過的包拆開,舉個栗子妖爷,發(fā)送端將三個數(shù)據(jù)包粘成兩個TCP數(shù)據(jù)包發(fā)送到接收端蝶涩,接收端就需要根據(jù)應用協(xié)議將兩個數(shù)據(jù)包重新組裝成三個數(shù)據(jù)包
還有一種情況就是用戶數(shù)據(jù)包超過了mss(最大報文長度),那么這個數(shù)據(jù)包在發(fā)送的時候必須拆分成幾個數(shù)據(jù)包,接收端收到之后需要將這些數(shù)據(jù)包粘合起來之后绿聘,再拆開
拆包的原理
在沒有netty的情況下嗽上,用戶如果自己需要拆包,基本原理就是不斷從TCP緩沖區(qū)中讀取數(shù)據(jù)熄攘,每次讀取完都需要判斷是否是一個完整的數(shù)據(jù)包
1.如果當前讀取的數(shù)據(jù)不足以拼接成一個完整的業(yè)務數(shù)據(jù)包兽愤,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取挪圾,直到得到一個完整的數(shù)據(jù)包
2.如果當前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個數(shù)據(jù)包浅萧,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個完整的業(yè)務數(shù)據(jù)包傳遞到業(yè)務邏輯哲思,多余的數(shù)據(jù)仍然保留洼畅,以便和下次讀到的數(shù)據(jù)嘗試拼接
netty中拆包的基類
netty 中的拆包也是如上這個原理,內(nèi)部會有一個累加器也殖,每次讀取到數(shù)據(jù)都會不斷累加土思,然后嘗試對累加到的數(shù)據(jù)進行拆包,拆成一個完整的業(yè)務數(shù)據(jù)包忆嗜,這個基類叫做 ByteToMessageDecoder
己儒,下面我們先詳細分析下這個類
累加器
ByteToMessageDecoder
中定義了兩個累加器
public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;
默認情況下,會使用 MERGE_CUMULATOR
private Cumulator cumulator = MERGE_CUMULATOR;
MERGE_CUMULATOR
的原理是每次都將讀取到的數(shù)據(jù)通過內(nèi)存拷貝的方式捆毫,拼接到一個大的字節(jié)容器中闪湾,這個字節(jié)容器在 ByteToMessageDecoder
中叫做 cumulation
ByteBuf cumulation;
下面我們看一下 MERGE_CUMULATOR
是如何將新讀取到的數(shù)據(jù)累加到字節(jié)容器里的
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
netty 中ByteBuf的抽象,使得累加非常簡單绩卤,通過一個簡單的api調(diào)用 buffer.writeBytes(in);
便將新數(shù)據(jù)累加到字節(jié)容器中途样,為了防止字節(jié)容器大小不夠,在累加之前還進行了擴容處理
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
擴容也是一個內(nèi)存拷貝操作濒憋,新增的大小即是新讀取數(shù)據(jù)的大小
拆包抽象
累加器原理清楚之后何暇,下面我們回到主流程,目光集中在 channelRead
方法凛驮,channelRead
方法是每次從TCP緩沖區(qū)讀到數(shù)據(jù)都會調(diào)用的方法裆站,觸發(fā)點在AbstractNioByteChannel
的read
方法中,里面有個while
循環(huán)不斷讀取黔夭,讀取到一次就觸發(fā)一次channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
方法體不長不短宏胯,可以分為以下幾個邏輯步驟
1.累加數(shù)據(jù)
2.將累加到的數(shù)據(jù)傳遞給業(yè)務進行業(yè)務拆包
3.清理字節(jié)容器
4.傳遞業(yè)務數(shù)據(jù)包給業(yè)務解碼器處理
1 累加數(shù)據(jù)
如果當前累加器沒有數(shù)據(jù),就直接跳過內(nèi)存拷貝本姥,直接將字節(jié)容器的指針指向新讀取的數(shù)據(jù)肩袍,否則,調(diào)用累加器累加數(shù)據(jù)至字節(jié)容器
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
2 將累加到的數(shù)據(jù)傳遞給業(yè)務進行拆包
到這一步婚惫,字節(jié)容器里的數(shù)據(jù)已是目前未拆包部分的所有的數(shù)據(jù)了
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
callDecode
將嘗試將字節(jié)容器的數(shù)據(jù)拆分成業(yè)務數(shù)據(jù)包塞到業(yè)務數(shù)據(jù)容器out
中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.isReadable()) {
// 記錄一下字節(jié)容器中有多少字節(jié)待拆
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
if (out.size() == 0) {
// 拆包器未讀取任何數(shù)據(jù)
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 拆包器已讀取部分數(shù)據(jù)氛赐,還需要繼續(xù)
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
}
我將原始代碼做了一些精簡魂爪,在解碼之前,先記錄一下字節(jié)容器中有多少字節(jié)待拆鹰祸,然后調(diào)用抽象函數(shù) decode
進行拆包
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
netty中對各種用戶協(xié)議的支持就體現(xiàn)在這個抽象函數(shù)中甫窟,傳進去的是當前讀取到的未被消費的所有的數(shù)據(jù),以及業(yè)務協(xié)議包容器蛙婴,所有的拆包器最終都實現(xiàn)了該抽象方法
業(yè)務拆包完成之后街图,如果發(fā)現(xiàn)并沒有拆到一個完整的數(shù)據(jù)包浇衬,這個時候又分兩種情況
1.一個是拆包器什么數(shù)據(jù)也沒讀取餐济,可能數(shù)據(jù)還不夠業(yè)務拆包器處理耘擂,直接break等待新的數(shù)據(jù)
2.拆包器已讀取部分數(shù)據(jù),說明解碼器仍然在工作醉冤,繼續(xù)解碼
業(yè)務拆包完成之后,如果發(fā)現(xiàn)已經(jīng)解到了數(shù)據(jù)包,但是靖苇,發(fā)現(xiàn)并沒有讀取任何數(shù)據(jù),這個時候就會拋出一個Runtime異常 DecoderException
绰上,告訴你百揭,你什么數(shù)據(jù)都沒讀取请毛,卻解析出一個業(yè)務數(shù)據(jù)包鳍征,這是有問題的
3 清理字節(jié)容器
業(yè)務拆包完成之后,只是從字節(jié)容器中取走了數(shù)據(jù)铛嘱,但是這部分空間對于字節(jié)容器來說依然保留著橄杨,而字節(jié)容器每次累加字節(jié)數(shù)據(jù)的時候都是將字節(jié)數(shù)據(jù)追加到尾部,如果不對字節(jié)容器做清理乾忱,那么時間一長就會OOM
正常情況下讥珍,其實每次讀取完數(shù)據(jù),netty都會在下面這個方法中將字節(jié)容器清理窄瘟,只不過衷佃,當發(fā)送端發(fā)送數(shù)據(jù)過快,channelReadComplete
可能會很久才被調(diào)用一次
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
這里順帶插一句蹄葱,如果一次數(shù)據(jù)讀取完畢之后(可能接收端一邊收氏义,發(fā)送端一邊發(fā),這里的讀取完畢指的是接收端在某個時間不再接受到數(shù)據(jù)為止)图云,發(fā)現(xiàn)仍然沒有拆到一個完整的用戶數(shù)據(jù)包惯悠,即使該channel的設置為非自動讀取,也會觸發(fā)一次讀取操作 ctx.read()
竣况,該操作會重新向selector注冊op_read事件克婶,以便于下一次能讀到數(shù)據(jù)之后拼接成一個完整的數(shù)據(jù)包
所以為了防止發(fā)送端發(fā)送數(shù)據(jù)過快,netty會在每次讀取到一次數(shù)據(jù)丹泉,業(yè)務拆包之后對字節(jié)字節(jié)容器做清理情萤,清理部分的代碼如下
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
如果字節(jié)容器當前已無數(shù)據(jù)可讀取,直接銷毀字節(jié)容器摹恨,并且標注一下當前字節(jié)容器一次數(shù)據(jù)也沒讀取
如果連續(xù)16次(discardAfterReads
的默認值)筋岛,字節(jié)容器中仍然有未被業(yè)務拆包器讀取的數(shù)據(jù),那就做一次壓縮晒哄,有效數(shù)據(jù)段整體移到容器首部
discardSomeReadBytes之前睁宰,字節(jié)累加器中的數(shù)據(jù)分布
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes之后,字節(jié)容器中的數(shù)據(jù)分布
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
這樣字節(jié)容器又可以承載更多的數(shù)據(jù)了
4 傳遞業(yè)務數(shù)據(jù)包給業(yè)務解碼器處理
以上三個步驟完成之后寝凌,就可以將拆成的包丟到業(yè)務解碼器處理了柒傻,代碼如下
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
期間用一個成員變量 decodeWasNull
來標識本次讀取數(shù)據(jù)是否拆到一個業(yè)務數(shù)據(jù)包,然后調(diào)用 fireChannelRead
將拆到的業(yè)務數(shù)據(jù)包都傳遞到后續(xù)的handler
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
這樣较木,就可以把一個個完整的業(yè)務數(shù)據(jù)包傳遞到后續(xù)的業(yè)務解碼器進行解碼诅愚,隨后處理業(yè)務邏輯
行拆包器
下面,以一個具體的例子來看看業(yè)netty自帶的拆包器是如何來拆包的
這個類叫做 LineBasedFrameDecoder
,基于行分隔符的拆包器违孝,TA可以同時處理 \n
以及\r\n
兩種類型的行分隔符,核心方法都在繼承的 decode
方法中
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
netty 中自帶的拆包器都是如上這種模板泳赋,其實可以加一層雌桑,把這這層模板抽取出來的,不知道為什么netty沒有這么做祖今,我們接著跟進去校坑,代碼比較長,我們還是分模塊來剖析
1 找到換行符位置
final int eol = findEndOfLine(buffer);
private static int findEndOfLine(final ByteBuf buffer) {
int i = buffer.forEachByte(ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
return i;
}
ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');
for循環(huán)遍歷千诬,找到第一個 \n
的位置,如果\n
前面的字符為\r
耍目,那就返回\r
的位置
2 非discarding模式的處理
接下來,netty會判斷徐绑,當前拆包是否屬于丟棄模式邪驮,用一個成員變量來標識
private boolean discarding;
第一次拆包不在discarding模式( 后面的分支會講何為非discarding模式),于是進入以下環(huán)節(jié)
2.1 非discarding模式下找到行分隔符的處理
// 1.計算分隔符和包長度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 丟棄異常數(shù)據(jù)
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 取包的時候是否包括分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
1.首先傲茄,新建一個幀毅访,計算一下當前包的長度和分隔符的長度(因為有兩種分隔符)
2.然后判斷一下需要拆包的長度是否大于該拆包器允許的最大長度(maxLength
),這個參數(shù)在構(gòu)造函數(shù)中被傳遞進來盘榨,如超出允許的最大長度喻粹,就將這段數(shù)據(jù)拋棄,返回null
3.最后草巡,將一個完整的數(shù)據(jù)包取出守呜,如果構(gòu)造本解包器的時候指定 stripDelimiter
為false,即解析出來的包包含分隔符山憨,默認為不包含分隔符
2.2 非discarding模式下未找到分隔符的處理
沒有找到對應的行分隔符查乒,說明字節(jié)容器沒有足夠的數(shù)據(jù)拼接成一個完整的業(yè)務數(shù)據(jù)包,進入如下流程處理
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
首先取得當前字節(jié)容器的可讀字節(jié)個數(shù)萍歉,接著侣颂,判斷一下是否已經(jīng)超過可允許的最大長度,如果沒有超過枪孩,直接返回null憔晒,字節(jié)容器中的數(shù)據(jù)沒有任何改變,否則蔑舞,就需要進入丟棄模式
使用一個成員變量 discardedBytes
來表示已經(jīng)丟棄了多少數(shù)據(jù)拒担,然后將字節(jié)容器的讀指針移到寫指針,意味著丟棄這一部分數(shù)據(jù)攻询,設置成員變量discarding
為true表示當前處于丟棄模式从撼。如果設置了failFast
,那么直接拋出異常钧栖,默認情況下failFast
為false低零,即安靜得丟棄數(shù)據(jù)
3 discarding模式
如果解包的時候處在discarding模式婆翔,也會有兩種情況發(fā)生
3.1 discarding模式下找到行分隔符
在discarding模式下,如果找到分隔符掏婶,那可以將分隔符之前的都丟棄掉
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
計算出分隔符的長度之后啃奴,直接把分隔符之前的數(shù)據(jù)全部丟棄,當然丟棄的字符也包括分隔符雄妥,經(jīng)過這么一次丟棄最蕾,后面就有可能是正常的數(shù)據(jù)包,下一次解包的時候就會進入正常的解包流程
3.2discarding模式下未找到行分隔符
這種情況比較簡單老厌,因為當前還在丟棄模式瘟则,沒有找到行分隔符意味著當前一個完整的數(shù)據(jù)包還沒丟棄完,當前讀取的數(shù)據(jù)是丟棄的一部分枝秤,所以直接丟棄
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
特定分隔符拆包
這個類叫做 DelimiterBasedFrameDecoder
醋拧,可以傳遞給TA一個分隔符列表,數(shù)據(jù)包會按照分隔符列表進行拆分宿百,讀者可以完全根據(jù)行拆包器的思路去分析這個DelimiterBasedFrameDecoder
趁仙,這里不在贅述,有問題可以留言
總結(jié)
netty中的拆包過程其實是和你自己去拆包過程一樣垦页,只不過TA將拆包過程中邏輯比較獨立的部分抽象出來變成幾個不同層次的類雀费,方便各種協(xié)議的擴展,我們平時在寫代碼過程中痊焊,也必須培養(yǎng)這種抽象能力盏袄,這樣你的coding水平才會不斷提高,完薄啥。
如果你覺得看的不過癮辕羽,想系統(tǒng)學習Netty原理,那么你一定不要錯過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html