Netty 半包,粘包處理

基于TCP協(xié)議處理網(wǎng)絡(luò)數(shù)據(jù)經(jīng)常面對半包和粘包問題东亦,那么什么是半包問題杏节,什么是粘包問題呢?應(yīng)用層消息在被發(fā)送到網(wǎng)絡(luò)之前會經(jīng)過TCP/IP協(xié)議棧的包裝典阵,每一層協(xié)議都有自己的長度限制奋渔,如果用戶發(fā)送的消息大于ip層的長度限制那么這個消息就會被分割成多個ip數(shù)包發(fā)送出去。發(fā)送端發(fā)送的消息會被先放到發(fā)送緩存中壮啊,這樣接受端接受到的二進制數(shù)據(jù)可能包含多個消息嫉鲸,上面的情況就會導(dǎo)致以下兩種情況的出現(xiàn)

1)半包問題:接收端接受到的數(shù)據(jù)只是客戶端發(fā)送的消息的一部分。如果我們發(fā)送的消息比較大歹啼,那么這條消息就有可能會被分割成不同的ip數(shù)據(jù)報玄渗,那么接受端一次接受到的數(shù)據(jù)可能只是原始數(shù)據(jù)的一部分

2)粘包問題:接受端接受到的數(shù)據(jù)包含多條消息。發(fā)送端一次發(fā)送了多條數(shù)據(jù)狸眼,接受端無法從讀取到的數(shù)據(jù)中去解析出發(fā)送端發(fā)送的消息

針對半包和粘包的問題藤树,我們有以下一些通用的解決方案
1)使用消息分割符來標識出每個消息在二進制數(shù)據(jù)流中的截止位置
2)在每個消息的頭部標識出消息的長度
3)發(fā)送消息的長度是固定值

上面這些解決方案對于數(shù)據(jù)接收端來說都需要一個字節(jié)累積器,讀取到的字節(jié)數(shù)據(jù)會在字節(jié)累積器累積直到能根據(jù)字節(jié)累積器中累積的字節(jié)解析出一條消息拓萌,從字節(jié)累積器解析出這條消息后也榄,把這條消息對應(yīng)的二進制數(shù)據(jù)從字節(jié)累積器中清除然后繼續(xù)解析下一條消息。
在netty中接受端從二進制流中取得消息的過程叫做解碼司志,實現(xiàn)解碼功能的類叫做解碼器,上面三種半包/粘包的解決方案netty都提供了相應(yīng)的編解碼器去實現(xiàn)降宅,我們分析第二種方案在netty中對應(yīng)的解碼器實現(xiàn)骂远。

LengthFieldBasedFrameDecoder

有類消息在消息中用固定長度的字節(jié)來存儲消息體的長度,對于這類消息netty定義了LengthFieldBasedFrameDecoder來解碼它們

ByteToMessageDecoder

二進制數(shù)據(jù)流解碼成具體消息的基類腰根,它是LengthFieldBasedFrameDecoder的父類激才,同時ByteToMessageDecoder繼承了ChannelInboundHandlerAdapter
當netty從網(wǎng)絡(luò)IO中讀取到一份數(shù)據(jù)的時候會觸發(fā)ByteToMessageDecoder.channelRead 方法

ByteToMessageDecoder.channelRead

我們分析下ByteToMessageDecoder.channelRead的方法

   //msg 就是通過網(wǎng)絡(luò)IO讀取到的數(shù)據(jù)
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //ByteToMessageDecoder 處理的數(shù)據(jù)類型是ByteBuf
        if (msg instanceof ByteBuf) {
         //CodecOutputList是用來存放將來解析出來的消息
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                //cumulation上面有說到的字節(jié)累積器,它是一個ByteBuf
                //cumulator是字節(jié)累積算法實現(xiàn)類额嘿,在ByteToMessageDecoder中它的實現(xiàn)是MERGE_CUMULATOR
                //cumulator.cumulate把讀取到的msg寫入到cumulation中
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
              //callDecode 就是把字節(jié)累積器交給具體的解碼器實現(xiàn)去解碼
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
       
                if (cumulation != null && !cumulation.isReadable()) {
                    //如果cumulation中的數(shù)據(jù)讀完了瘸恼,那么把字節(jié)累積器給釋放了
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    //如果連續(xù)讀取的次數(shù)大于等于discardAfterReads(默認是16),那么執(zhí)行discardSomeReadBytes
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
                //解析到的消息數(shù)量
                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled();
               //使用解析到的消息觸發(fā)channelRead事件
                fireChannelRead(ctx, out, size);
               //回收CodecOutputList對象
                out.recycle();
            }
        } else {
           //如果msg不是ByteBuf類型册养,把數(shù)據(jù)交給pipeline鏈上的下一個handler處理
            ctx.fireChannelRead(msg);
        }
    }

我分析下讀取到的ByteBuf中的字節(jié)是如何被添加到字節(jié)累積器cumulation中的

        //cumulation是一個ByteBuf东帅,alloc用來分配它所需要的空間的,
        //in 本次讀取到的ByteBuf球拦,讀取到的數(shù)據(jù)需要添加到cumulation中
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            if (!cumulation.isReadable() && in.isContiguous()) {
                // If cumulation is empty and input buffer is contiguous, use it directly
              //如果cumulation是初始默認的EMPTY_BUFFER靠闭,那么直接返回in作為cumulation帐我,同時把EMPTY_BUFFER釋放
                cumulation.release();
                return in;
            }
            try {
                //獲取本次網(wǎng)絡(luò)IO讀取到消息的大小
                final int required = in.readableBytes();
                if (required > cumulation.maxWritableBytes() ||
                        (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                        cumulation.isReadOnly()) {
                    // Expand cumulation (by replacing it) under the following conditions:
                    // - cumulation cannot be resized to accommodate the additional data
                    // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                    //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                   //如果cumulation中的可用空間小于required的大小,那么需要給cumulation擴容
                   //根據(jù)cumulation目前存儲的數(shù)據(jù)和新來數(shù)據(jù)的總和去向alloc申請一個新的ByteBuf愧膀,
                  //cumulation中的數(shù)據(jù)會被搬移到新申請的ByteBuf中拦键,然后把老的cumulation釋放掉,新申請的ByteBuf會被賦值給cumulation
                    return expandCumulation(alloc, cumulation, in);
                }
             //把新讀取到的數(shù)據(jù)寫入到cumulation中
                cumulation.writeBytes(in, in.readerIndex(), required);
                in.readerIndex(in.writerIndex());
                return cumulation;
            } finally {
                // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
                // for whatever release (for example because of OutOfMemoryError)
                //釋放in
                in.release();
            }
        }
    }

上面介紹了最新讀取到的ByteBuf是如何被添加到字節(jié)累積器中的檩淋,那么如何從字節(jié)累積器中解碼出消息呢芬为?
callDecode是解碼的入口方法

callDecode
// in 就是字節(jié)累積器,out用來存放解析出來的消息
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            //一直不斷的從字節(jié)累積器中解碼消息蟀悦,直到字節(jié)累積器中沒有字節(jié)可讀了
            while (in.isReadable()) {
                int outSize = out.size();
                
                if (outSize > 0) {
                    //如果解碼出了消息那么觸發(fā)channelRead事件
                    fireChannelRead(ctx, out, outSize);
                    //觸發(fā)完ChannelRead事件后媚朦,清空消息容器
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                   //如果本解碼器對應(yīng)的context從pipeline上刪除了,那么解碼停止
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
               //獲取字節(jié)累積器中已經(jīng)存放的數(shù)據(jù)大小
                int oldInputLength = in.readableBytes();
              //解碼器解碼方法decode在這個地方中調(diào)用
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                       //字節(jié)累積器存儲的字節(jié)在上次解碼后沒有任何字節(jié)被解碼那么退出解碼過程
                        break;
                    } else {
                        continue;
                    }
                }
             
                if (oldInputLength == in.readableBytes()) {
                    //代碼執(zhí)行到這里說明從字節(jié)累積器中解析出了消息熬芜,但是字節(jié)累積器中沒有任何數(shù)據(jù)別解碼所以報錯
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

繼續(xù)看調(diào)用鏈上的decodeRemovalReentryProtection方法

 final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
       //初始化解碼狀態(tài)為STATE_CALLING_CHILD_DECODE表示正在解碼
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            //調(diào)用解碼實現(xiàn)方法decode
            decode(ctx, in, out);
        } finally {
           //如果在解碼進行中的時候莲镣,解碼handler對應(yīng)的context在pipeline上被刪除了,那么decodeState在handlerRemoved方法中被設(shè)置為STATE_HANDLER_REMOVED_PENDING
          //那么在這種情況下removePending就會是true
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
               //如果removePending為true涎拉,那么觸發(fā)ChannelRead事件
                fireChannelRead(ctx, out, out.size());
                out.clear();
                handlerRemoved(ctx);
            }
        }
    }

進入解碼的核心方法decode瑞侮,我們以LengthFieldBasedFrameDecoder.decode來分析

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
       //通過decode方法解析出消息
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            //如果消息不是null,添加到消息容器中
            out.add(decoded);
        }
    }


//解碼具體實現(xiàn)方法
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
       //對于LengthFieldBasedFrameDecoder來說我們會設(shè)置單個消息的最大長度
       //如果從標識消息體長度的字段中解析出的真實長度值大于設(shè)置的最大長度鼓拧,那么需要丟棄這個消息
        if (discardingTooLongFrame) {
            discardingTooLongFrame(in);
        }
        //如果字節(jié)累積器中存儲的字節(jié)長度小于設(shè)置的消息體大小標識字段占用的字節(jié)長度半火,那么直接返回null
        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }
        //我們在LengthFieldBasedFrameDecoder還可以設(shè)置一個lengthFieldOffset參數(shù)用來表示消息大小標識字段在整個消息中的偏移量
        //算出消息大小標識字段在字節(jié)累積器中的開始位置
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
          //lengthFieldLength 表示消息大小標識字段占用多少字節(jié)
          //getUnadjustedFrameLength獲取消息體的長度
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
     
        if (frameLength < 0) {
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }
  
      //lengthFieldEndOffset = lengthFieldOffset+lengthFieldLength
      //lengthAdjustment是調(diào)整這個消息長度的一個值,不同的消息轉(zhuǎn)換成字節(jié)數(shù)組的時候有不同的策略季俩,
      //有的會把頭部轉(zhuǎn)化成字節(jié)數(shù)據(jù)發(fā)送給客戶端钮糖,有一些則不會,但是消息長度字段中有的會把頭部信息的長度算在總長度中酌住,有些又不會
     //那么這個時候就需要開發(fā)者根據(jù)實際的情況通過設(shè)置lengthAdjustment來調(diào)和這種情況
     //根據(jù)lengthAdjustment和lengthFieldEndOffset計算出frameLength的大小
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }
       //如果frameLength大于maxFrameLength店归,需要丟棄這個消息
        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        //如果字節(jié)累加器中的數(shù)據(jù)小于frameLengthInt那么這次不做消息解碼,因為這個時候字節(jié)累積器中存儲的數(shù)據(jù)不足以解碼出一條消息
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }
        //initialBytesToStrip是開發(fā)者設(shè)置的在讀取一個消息之前酪我,先跳過字節(jié)累加器中的前initialBytesToStrip個字節(jié)消痛,這個也是和消息轉(zhuǎn)換成字節(jié)的不同方式有關(guān)
        if (initialBytesToStrip > frameLengthInt) {
            failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);

        // extract frame
        int readerIndex = in.readerIndex();
       //計算出消息體真正的長度
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
       //從字節(jié)累積器中解碼出一個消息
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        //更新字節(jié)累積器下次讀取數(shù)據(jù)的開始位置
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

上面就是LengthFieldBasedFrameDecoder解碼的過程。
當LengthFieldBasedFrameDecoder讀取的消息長度大于設(shè)置的maxLength都哭,LengthFieldBasedFrameDecoder會把這個消息丟棄秩伞,我解析下這個過程的細節(jié)

exceededFrameLength

 private void exceededFrameLength(ByteBuf in, long frameLength) {
        //discard等于需要丟棄的數(shù)據(jù)量的減去現(xiàn)在字節(jié)累積器中存的數(shù)據(jù)量。
        long discard = frameLength - in.readableBytes();
        //tooLongFrameLength設(shè)置為frameLength欺矫,等下解析discardingTooLongFrame方法會用到這個參數(shù)
        tooLongFrameLength = frameLength;

        if (discard < 0) {
            //如果discard <0表示字節(jié)累積器中包含了所有需要丟棄的長度為frameLength的數(shù)據(jù)纱新,直接通過ByteBuf.skip去丟棄這些數(shù)據(jù)
            // buffer contains more bytes then the frameLength so we can discard all now
            in.skipBytes((int) frameLength);
        } else {
            // Enter the discard mode and discard everything received so far.
         //若discard大于0表示字節(jié)累積器中存的數(shù)據(jù)只是丟棄數(shù)據(jù)的一部分
         //還有一部分需要丟棄的數(shù)據(jù)可能還在網(wǎng)絡(luò)上傳輸,字節(jié)累積器還沒收集到 
   
           //設(shè)置discardingTooLongFrame為true穆趴,那么下次執(zhí)行decode方法的時候就會根據(jù)這個屬性判斷是不是需要繼續(xù)從字節(jié)累積器中丟棄數(shù)據(jù)
            discardingTooLongFrame = true;
         //bytesToDiscard記錄下次decode執(zhí)行的時候需要繼續(xù)從字節(jié)累積器中丟棄多少字節(jié)數(shù)據(jù)
            bytesToDiscard = discard;
           //字節(jié)累積器把自己目前存的數(shù)據(jù)全部丟棄
            in.skipBytes(in.readableBytes());
        }
        failIfNecessary(true);
    }

decode方法第一行代碼就是判斷discardingTooLongFrame==true脸爱,通過上面exceededFrameLength方法的分析我們知道在開始執(zhí)行解碼前我們可能需要從字節(jié)累積器中先丟棄一些數(shù)據(jù)

discardingTooLongFrame
 private void discardingTooLongFrame(ByteBuf in) {
        long bytesToDiscard = this.bytesToDiscard;
        //獲得本次從字節(jié)累積器中需要丟棄的字節(jié)數(shù)
        int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
       //繼續(xù)丟棄localBytesToDiscard個字節(jié)的數(shù)據(jù)
        in.skipBytes(localBytesToDiscard);
        bytesToDiscard -= localBytesToDiscard;
         //更新bytesToDiscard
        this.bytesToDiscard = bytesToDiscard;
        failIfNecessary(false);
    }

繼續(xù)分析failIfNecessary方法

private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {

        if (bytesToDiscard == 0) {
            //bytesToDiscard等于0表示需要丟棄的數(shù)據(jù)都已經(jīng)被丟棄了
            // Reset to the initial state and tell the handlers that
            // the frame was too large.
            long tooLongFrameLength = this.tooLongFrameLength;
           //重置tooLongFrameLength和discardingTooLongFrame狀態(tài)
            this.tooLongFrameLength = 0;
            discardingTooLongFrame = false;
            if (!failFast || firstDetectionOfTooLongFrame) {
                fail(tooLongFrameLength);
            }
        } else {
            // Keep discarding and notify handlers if necessary.
            if (failFast && firstDetectionOfTooLongFrame) {
                //若滿足條件拋出異常,會觸發(fā)handler的exceptionCaught方法
                fail(tooLongFrameLength);
            }
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末未妹,一起剝皮案震驚了整個濱河市阅羹,隨后出現(xiàn)的幾起案子勺疼,更是在濱河造成了極大的恐慌,老刑警劉巖捏鱼,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件执庐,死亡現(xiàn)場離奇詭異,居然都是意外死亡导梆,警方通過查閱死者的電腦和手機轨淌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來看尼,“玉大人递鹉,你說我怎么就攤上這事〔卣叮” “怎么了躏结?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長狰域。 經(jīng)常有香客問我媳拴,道長,這世上最難降的妖魔是什么兆览? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任屈溉,我火速辦了婚禮,結(jié)果婚禮上抬探,老公的妹妹穿的比我還像新娘子巾。我一直安慰自己,他們只是感情好小压,可當我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布线梗。 她就那樣靜靜地躺著,像睡著了一般怠益。 火紅的嫁衣襯著肌膚如雪仪搔。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天溉痢,我揣著相機與錄音,去河邊找鬼憋他。 笑死孩饼,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的竹挡。 我是一名探鬼主播镀娶,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼揪罕!你這毒婦竟也來了梯码?” 一聲冷哼從身側(cè)響起宝泵,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎轩娶,沒想到半個月后儿奶,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡鳄抒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棒口。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡保屯,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出贤重,到底是詐尸還是另有隱情茬祷,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布并蝗,位于F島的核電站祭犯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏借卧。R本人自食惡果不足惜盹憎,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望铐刘。 院中可真熱鬧陪每,春花似錦、人聲如沸镰吵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽疤祭。三九已至盼产,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間勺馆,已是汗流浹背戏售。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留草穆,地道東北人灌灾。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像悲柱,于是被迫代替她去往敵國和親锋喜。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,933評論 2 355