基于TCP協(xié)議處理網(wǎng)絡(luò)數(shù)據(jù)經(jīng)常面對半包和粘包問題东亦,那么什么是半包問題杏节,什么是粘包問題呢?應(yīng)用層消息在被發(fā)送到網(wǎng)絡(luò)之前會經(jīng)過TCP/IP協(xié)議棧的包裝典阵,每一層協(xié)議都有自己的長度限制奋渔,如果用戶發(fā)送的消息大于ip層的長度限制那么這個消息就會被分割成多個ip數(shù)包發(fā)送出去。發(fā)送端發(fā)送的消息會被先放到發(fā)送緩存中壮啊,這樣接受端接受到的二進制數(shù)據(jù)可能包含多個消息嫉鲸,上面的情況就會導(dǎo)致以下兩種情況的出現(xiàn)
1)半包問題:接收端接受到的數(shù)據(jù)只是客戶端發(fā)送的消息的一部分。如果我們發(fā)送的消息比較大歹啼,那么這條消息就有可能會被分割成不同的ip數(shù)據(jù)報玄渗,那么接受端一次接受到的數(shù)據(jù)可能只是原始數(shù)據(jù)的一部分
2)粘包問題:接受端接受到的數(shù)據(jù)包含多條消息。發(fā)送端一次發(fā)送了多條數(shù)據(jù)狸眼,接受端無法從讀取到的數(shù)據(jù)中去解析出發(fā)送端發(fā)送的消息
針對半包和粘包的問題藤树,我們有以下一些通用的解決方案
1)使用消息分割符來標識出每個消息在二進制數(shù)據(jù)流中的截止位置
2)在每個消息的頭部標識出消息的長度
3)發(fā)送消息的長度是固定值
上面這些解決方案對于數(shù)據(jù)接收端來說都需要一個字節(jié)累積器,讀取到的字節(jié)數(shù)據(jù)會在字節(jié)累積器累積直到能根據(jù)字節(jié)累積器中累積的字節(jié)解析出一條消息拓萌,從字節(jié)累積器解析出這條消息后也榄,把這條消息對應(yīng)的二進制數(shù)據(jù)從字節(jié)累積器中清除然后繼續(xù)解析下一條消息。
在netty中接受端從二進制流中取得消息的過程叫做解碼司志,實現(xiàn)解碼功能的類叫做解碼器,上面三種半包/粘包的解決方案netty都提供了相應(yīng)的編解碼器去實現(xiàn)降宅,我們分析第二種方案在netty中對應(yīng)的解碼器實現(xiàn)骂远。
LengthFieldBasedFrameDecoder
有類消息在消息中用固定長度的字節(jié)來存儲消息體的長度,對于這類消息netty定義了LengthFieldBasedFrameDecoder來解碼它們
ByteToMessageDecoder
二進制數(shù)據(jù)流解碼成具體消息的基類腰根,它是LengthFieldBasedFrameDecoder的父類激才,同時ByteToMessageDecoder繼承了ChannelInboundHandlerAdapter
當netty從網(wǎng)絡(luò)IO中讀取到一份數(shù)據(jù)的時候會觸發(fā)ByteToMessageDecoder.channelRead 方法
ByteToMessageDecoder.channelRead
我們分析下ByteToMessageDecoder.channelRead的方法
//msg 就是通過網(wǎng)絡(luò)IO讀取到的數(shù)據(jù)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//ByteToMessageDecoder 處理的數(shù)據(jù)類型是ByteBuf
if (msg instanceof ByteBuf) {
//CodecOutputList是用來存放將來解析出來的消息
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
//cumulation上面有說到的字節(jié)累積器,它是一個ByteBuf
//cumulator是字節(jié)累積算法實現(xiàn)類额嘿,在ByteToMessageDecoder中它的實現(xiàn)是MERGE_CUMULATOR
//cumulator.cumulate把讀取到的msg寫入到cumulation中
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
//callDecode 就是把字節(jié)累積器交給具體的解碼器實現(xiàn)去解碼
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
//如果cumulation中的數(shù)據(jù)讀完了瘸恼,那么把字節(jié)累積器給釋放了
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
//如果連續(xù)讀取的次數(shù)大于等于discardAfterReads(默認是16),那么執(zhí)行discardSomeReadBytes
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
//解析到的消息數(shù)量
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
//使用解析到的消息觸發(fā)channelRead事件
fireChannelRead(ctx, out, size);
//回收CodecOutputList對象
out.recycle();
}
} else {
//如果msg不是ByteBuf類型册养,把數(shù)據(jù)交給pipeline鏈上的下一個handler處理
ctx.fireChannelRead(msg);
}
}
我分析下讀取到的ByteBuf中的字節(jié)是如何被添加到字節(jié)累積器cumulation中的
//cumulation是一個ByteBuf东帅,alloc用來分配它所需要的空間的,
//in 本次讀取到的ByteBuf球拦,讀取到的數(shù)據(jù)需要添加到cumulation中
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
//如果cumulation是初始默認的EMPTY_BUFFER靠闭,那么直接返回in作為cumulation帐我,同時把EMPTY_BUFFER釋放
cumulation.release();
return in;
}
try {
//獲取本次網(wǎng)絡(luò)IO讀取到消息的大小
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
//如果cumulation中的可用空間小于required的大小,那么需要給cumulation擴容
//根據(jù)cumulation目前存儲的數(shù)據(jù)和新來數(shù)據(jù)的總和去向alloc申請一個新的ByteBuf愧膀,
//cumulation中的數(shù)據(jù)會被搬移到新申請的ByteBuf中拦键,然后把老的cumulation釋放掉,新申請的ByteBuf會被賦值給cumulation
return expandCumulation(alloc, cumulation, in);
}
//把新讀取到的數(shù)據(jù)寫入到cumulation中
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
//釋放in
in.release();
}
}
}
上面介紹了最新讀取到的ByteBuf是如何被添加到字節(jié)累積器中的檩淋,那么如何從字節(jié)累積器中解碼出消息呢芬为?
callDecode是解碼的入口方法
callDecode
// in 就是字節(jié)累積器,out用來存放解析出來的消息
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//一直不斷的從字節(jié)累積器中解碼消息蟀悦,直到字節(jié)累積器中沒有字節(jié)可讀了
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
//如果解碼出了消息那么觸發(fā)channelRead事件
fireChannelRead(ctx, out, outSize);
//觸發(fā)完ChannelRead事件后媚朦,清空消息容器
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
//如果本解碼器對應(yīng)的context從pipeline上刪除了,那么解碼停止
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//獲取字節(jié)累積器中已經(jīng)存放的數(shù)據(jù)大小
int oldInputLength = in.readableBytes();
//解碼器解碼方法decode在這個地方中調(diào)用
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
//字節(jié)累積器存儲的字節(jié)在上次解碼后沒有任何字節(jié)被解碼那么退出解碼過程
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
//代碼執(zhí)行到這里說明從字節(jié)累積器中解析出了消息熬芜,但是字節(jié)累積器中沒有任何數(shù)據(jù)別解碼所以報錯
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
繼續(xù)看調(diào)用鏈上的decodeRemovalReentryProtection方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
//初始化解碼狀態(tài)為STATE_CALLING_CHILD_DECODE表示正在解碼
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//調(diào)用解碼實現(xiàn)方法decode
decode(ctx, in, out);
} finally {
//如果在解碼進行中的時候莲镣,解碼handler對應(yīng)的context在pipeline上被刪除了,那么decodeState在handlerRemoved方法中被設(shè)置為STATE_HANDLER_REMOVED_PENDING
//那么在這種情況下removePending就會是true
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
//如果removePending為true涎拉,那么觸發(fā)ChannelRead事件
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
進入解碼的核心方法decode瑞侮,我們以LengthFieldBasedFrameDecoder.decode來分析
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//通過decode方法解析出消息
Object decoded = decode(ctx, in);
if (decoded != null) {
//如果消息不是null,添加到消息容器中
out.add(decoded);
}
}
//解碼具體實現(xiàn)方法
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//對于LengthFieldBasedFrameDecoder來說我們會設(shè)置單個消息的最大長度
//如果從標識消息體長度的字段中解析出的真實長度值大于設(shè)置的最大長度鼓拧,那么需要丟棄這個消息
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
//如果字節(jié)累積器中存儲的字節(jié)長度小于設(shè)置的消息體大小標識字段占用的字節(jié)長度半火,那么直接返回null
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
//我們在LengthFieldBasedFrameDecoder還可以設(shè)置一個lengthFieldOffset參數(shù)用來表示消息大小標識字段在整個消息中的偏移量
//算出消息大小標識字段在字節(jié)累積器中的開始位置
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//lengthFieldLength 表示消息大小標識字段占用多少字節(jié)
//getUnadjustedFrameLength獲取消息體的長度
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
//lengthFieldEndOffset = lengthFieldOffset+lengthFieldLength
//lengthAdjustment是調(diào)整這個消息長度的一個值,不同的消息轉(zhuǎn)換成字節(jié)數(shù)組的時候有不同的策略季俩,
//有的會把頭部轉(zhuǎn)化成字節(jié)數(shù)據(jù)發(fā)送給客戶端钮糖,有一些則不會,但是消息長度字段中有的會把頭部信息的長度算在總長度中酌住,有些又不會
//那么這個時候就需要開發(fā)者根據(jù)實際的情況通過設(shè)置lengthAdjustment來調(diào)和這種情況
//根據(jù)lengthAdjustment和lengthFieldEndOffset計算出frameLength的大小
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
//如果frameLength大于maxFrameLength店归,需要丟棄這個消息
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
//如果字節(jié)累加器中的數(shù)據(jù)小于frameLengthInt那么這次不做消息解碼,因為這個時候字節(jié)累積器中存儲的數(shù)據(jù)不足以解碼出一條消息
if (in.readableBytes() < frameLengthInt) {
return null;
}
//initialBytesToStrip是開發(fā)者設(shè)置的在讀取一個消息之前酪我,先跳過字節(jié)累加器中的前initialBytesToStrip個字節(jié)消痛,這個也是和消息轉(zhuǎn)換成字節(jié)的不同方式有關(guān)
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
//計算出消息體真正的長度
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//從字節(jié)累積器中解碼出一個消息
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
//更新字節(jié)累積器下次讀取數(shù)據(jù)的開始位置
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
上面就是LengthFieldBasedFrameDecoder解碼的過程。
當LengthFieldBasedFrameDecoder讀取的消息長度大于設(shè)置的maxLength都哭,LengthFieldBasedFrameDecoder會把這個消息丟棄秩伞,我解析下這個過程的細節(jié)
exceededFrameLength
private void exceededFrameLength(ByteBuf in, long frameLength) {
//discard等于需要丟棄的數(shù)據(jù)量的減去現(xiàn)在字節(jié)累積器中存的數(shù)據(jù)量。
long discard = frameLength - in.readableBytes();
//tooLongFrameLength設(shè)置為frameLength欺矫,等下解析discardingTooLongFrame方法會用到這個參數(shù)
tooLongFrameLength = frameLength;
if (discard < 0) {
//如果discard <0表示字節(jié)累積器中包含了所有需要丟棄的長度為frameLength的數(shù)據(jù)纱新,直接通過ByteBuf.skip去丟棄這些數(shù)據(jù)
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
//若discard大于0表示字節(jié)累積器中存的數(shù)據(jù)只是丟棄數(shù)據(jù)的一部分
//還有一部分需要丟棄的數(shù)據(jù)可能還在網(wǎng)絡(luò)上傳輸,字節(jié)累積器還沒收集到
//設(shè)置discardingTooLongFrame為true穆趴,那么下次執(zhí)行decode方法的時候就會根據(jù)這個屬性判斷是不是需要繼續(xù)從字節(jié)累積器中丟棄數(shù)據(jù)
discardingTooLongFrame = true;
//bytesToDiscard記錄下次decode執(zhí)行的時候需要繼續(xù)從字節(jié)累積器中丟棄多少字節(jié)數(shù)據(jù)
bytesToDiscard = discard;
//字節(jié)累積器把自己目前存的數(shù)據(jù)全部丟棄
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
}
decode方法第一行代碼就是判斷discardingTooLongFrame==true脸爱,通過上面exceededFrameLength方法的分析我們知道在開始執(zhí)行解碼前我們可能需要從字節(jié)累積器中先丟棄一些數(shù)據(jù)
discardingTooLongFrame
private void discardingTooLongFrame(ByteBuf in) {
long bytesToDiscard = this.bytesToDiscard;
//獲得本次從字節(jié)累積器中需要丟棄的字節(jié)數(shù)
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
//繼續(xù)丟棄localBytesToDiscard個字節(jié)的數(shù)據(jù)
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
//更新bytesToDiscard
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
繼續(xù)分析failIfNecessary方法
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) {
//bytesToDiscard等于0表示需要丟棄的數(shù)據(jù)都已經(jīng)被丟棄了
// Reset to the initial state and tell the handlers that
// the frame was too large.
long tooLongFrameLength = this.tooLongFrameLength;
//重置tooLongFrameLength和discardingTooLongFrame狀態(tài)
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
if (!failFast || firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) {
//若滿足條件拋出異常,會觸發(fā)handler的exceptionCaught方法
fail(tooLongFrameLength);
}
}
}