#Netty入門——拆粘包與編解碼(三)

Netty入門——拆粘包與編解碼(三)

回顧一下上一篇文章中MyClientInitializer類中的代碼裸燎。

        //(1)加入拆包器
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));  
        //(2)加入粘包器
        pipeline.addLast(new LengthFieldPrepender(4));             
        //字符串解碼 (3)
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        //字符串編碼 (4)
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

上面代碼中用到了Netty的拆包器、粘包器、編解碼器管行。本文會對Netty 是如何拆包進行分析本冲。

TCP的粘包與拆包

在TCP/IP協(xié)議中准脂,數(shù)據(jù)是以二進制流的方式傳播的,數(shù)據(jù)包映射到數(shù)據(jù)鏈路層檬洞、IP 層和 TCP 層分別叫Frame狸膏、Packet、Segment添怔,我們這邊不死磕如何翻譯湾戳,下面都統(tǒng)一用英文表示。

粘包

粘包是TCP在傳輸過程中广料,為了提高有效負載砾脑,把多個數(shù)據(jù)包合并成一個數(shù)據(jù)包發(fā)送的現(xiàn)象。如何理解艾杏?比如10字節(jié)的數(shù)據(jù)韧衣,每次發(fā)送1個字節(jié),需要10次TCP傳輸,10 ACK 確認畅铭。如果合并成一個數(shù)據(jù)包一起發(fā)送氏淑,可以提高有效負載,節(jié)省帶寬(前提是對數(shù)據(jù)實時性要求不高的場景)顶瞒。
但是粘包會引發(fā)語義級別的message識別問題夸政。比如下面這張圖:


image.png

ABC+DEF+GHI分3個message, 也就是3個Frame 發(fā)送出去,接收端收到4個Frame榴徐,不在是原來的3個message 對應的3 個Frame守问。這就是TCP的粘包與半包現(xiàn)象。AB坑资、H耗帕、I的情況是半包,CDEFG的情況是粘包袱贮。雖然順序是和原來一樣仿便,但是分組不再是原來的3個分組,這個時候就需要語義上message識別攒巍,即拆包嗽仪。

拆包

發(fā)送端把4個數(shù)據(jù)包粘成2個就需要接收端把這2個數(shù)據(jù)包拆分成4個。按照如下步驟進行拆包:


image.png

1柒莉、讀取數(shù)據(jù)闻坚,根據(jù)協(xié)議判斷是否可以構成一個完整的包
2、如果能夠構成一個完整的數(shù)據(jù)包兢孝,那么和之前接收到的數(shù)據(jù)一起拼接成一個完整的數(shù)據(jù)包給業(yè)務邏輯層窿凤,多余的數(shù)據(jù)等待下一次的拼接。
3跨蟹、如果不能雳殊,那么繼續(xù)從緩存中讀取數(shù)據(jù)。

那么如何判斷是否是一個完整的包窗轩?
有兩種方式:
方式 1:分隔符夯秃。為人熟知的SMTP、POP3品姓、IMAP寝并、Telnet等等。下圖顯示的是使用“\r\n”分隔符的處理過程腹备。


image.png

圖中的數(shù)字說明:1衬潦、字節(jié)流。2植酥、第一幀镀岛。3弦牡、第二幀
方式 2:固定長度。大家最熟悉的HTTP協(xié)議就是這種方式:Header+Content漂羊。

  • Header : 協(xié)議頭部驾锰,放置一些Meta信息。
  • Content : 應用之間交互的信息主體走越。
    在HTTP header中 通過Content-Length告知message有多長椭豫,應用層才能識別到這條message。比如下圖的HTTP1.1協(xié)議旨指。


    image.png

Netty拆包流程

首先在Netty的拆包流程中有兩個重要的變量cumulationcumulator赏酥。cumulation 是Netty中自定義的ByteBuf,與Java原生的ByteBuf還不一樣谆构,這個我們之后再講裸扶,我們就直接理解成一個字節(jié)容器,cumulator 是一個累加器搬素。

    ByteBuf cumulation;
    private Cumulator cumulator = MERGE_CUMULATOR;

累加器的代碼如下呵晨。簡單講就是通過調(diào)用API buffer.writeBytes(in); 把in數(shù)據(jù)通過內(nèi)存拷貝的方式合并到cumulation中,在合并前判斷是否要對cumulation 進行擴容熬尺。

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            final ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

理解了這兩個變量后我們在看ByteToMessageDecoder 中的channelRead方法摸屠,該方法是每次從緩沖區(qū)讀到數(shù)據(jù)時自動調(diào)用。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
               //1粱哼、合并數(shù)據(jù)到字節(jié)容器中
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //2餐塘、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3、清理字節(jié)容器
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
                //4皂吮、把拆包后的數(shù)據(jù)交給后面的Handler解碼
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

主要步驟已經(jīng)在代碼中注釋。下面我們來具體分析一下税手。
1蜂筹、合并數(shù)據(jù)到字節(jié)容器中。先判斷字節(jié)容器cumulation中是否有數(shù)據(jù)芦倒,沒有就直接賦值艺挪,有的話,有的話就調(diào)用累加器進行累加兵扬。
2麻裳、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中。
我們來看一下callDecode

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
               ............
               //1器钟、讀取字節(jié)累加容器中的可讀字節(jié)數(shù)
                int oldInputLength = in.readableBytes();
                //2津坑、交給業(yè)務拆包器進行拆包
               decode(ctx, in, out);
              .............
                 //3、還沒有解出數(shù)據(jù)包
                if (outSize == out.size()) {
                   //4傲霸、如果字節(jié)累加容器中的可讀字節(jié)數(shù)沒變跳出循環(huán)接著讀數(shù)據(jù)
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                    //5疆瑰、還沒有解出數(shù)據(jù)包眉反,繼續(xù)解包
                        continue;
                    }
                }
                //6、解出來了穆役,但是累加容器實際沒有讀到數(shù)據(jù)拋異常
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
       ............
    }

3寸五、清理字節(jié)容器。如果累加容器不為空并且沒有可讀數(shù)據(jù)耿币,那么直接釋放掉梳杏。否則如果還有未讀數(shù)據(jù),并且次數(shù)大于16淹接,discardAfterReads的默認值為16十性。調(diào)用discardSomeReadBytes,discardSomeReadBytes源碼如下表示當讀索引超過容量的一半時蹈集,進行數(shù)據(jù)前移烁试。

    @Override
    public ByteBuf discardSomeReadBytes() {
     ......
        if (readerIndex >= capacity() >>> 1) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        }
     ......
        return this;
    }

丟棄前


image.png

丟棄后


image.png

4、把拆包后的數(shù)據(jù)交給后面的Handler解碼拢肆。通過fireChannelRead實現(xiàn)减响,同時會設置變量decodeWasNull,用來標識是否解出數(shù)據(jù)包郭怪。該變量用在 channelReadComplete 函數(shù)中支示,該函數(shù)的源碼如下。

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;
        discardSomeReadBytes();
        if (decodeWasNull) {
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }

decodeWasNull 為true鄙才,表示沒有解出數(shù)據(jù)包颂鸿,這個時候如果channel 設置成了非自動讀取,調(diào)用ctx.read()攒庵。這個方法有什么作用呢晨仑?
AbstractChannelHandlerContext類中找到了read 方法胖喳。read()函數(shù)會調(diào)用invokeRead(),這個方法的源代碼如下:

    private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }

最終傳播到HeadContext的read()方法,最后調(diào)用unsafe.beginRead()設置關心底層read事件艘款,從而實現(xiàn)激活后自動讀取數(shù)據(jù)煌妈。

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

unsafe是Netty內(nèi)部實現(xiàn)底層IO細節(jié)的類坤塞,beginRead()方法設定底層Selector關心read事件焕济,如果read事件就緒,則會調(diào)用unsafe.read()方法讀取數(shù)據(jù)场梆,然后調(diào)用channelPipe.fireChannelRead()方法通知用戶已讀取到數(shù)據(jù)墅冷,可進行業(yè)務處理。

自定義協(xié)議的拆包

在上一篇文章中采用的是通用拆包器LengthFieldBasedFrameDecoder或油,基本上所有的基于長度的二進制協(xié)議都可以用他進行拆包寞忿。關于LengthFieldBasedFrameDecoder如何使用,我這里依然推薦閃電俠同學的這篇文章装哆,LengthFieldBasedFrameDecoder罐脊,這里不在贅述定嗓。
那么如何對自定義的協(xié)議進行拆包?
我曾在簡書上看到過關于這方面的文章萍桌,比如這篇文章宵溅,但是這篇文章中的例子卻沒有考慮半包問題,所以我這邊以另外一個例子作為講解內(nèi)容上炎。
我在學習RPC開源框架的時候恃逻,偶然發(fā)現(xiàn)了張旭大神一個人寫的RPC框架——Navi-pbrpc,這個框架基于Netty網(wǎng)絡通信和Protobuf的序列化藕施,非常適合學習RPC的同學入門(推薦學習完Navi-pbrpc源碼寇损,再去看Dubbo的源碼會更好)。在這個框架中裳食,應用層他自定義了一個協(xié)議矛市,該協(xié)議基于header+body方式,header內(nèi)含的body length屬性來表明二進制數(shù)據(jù)長度诲祸,body采用經(jīng)過protobuf壓縮后的二進制數(shù)據(jù)浊吏。

NsHead是Navi-pbrpc 內(nèi)部的header。NsHead + protobuf序列化body包結構示意如下


image.png

NsHead結構如下:


image.png

NsHead的固定長度36個字節(jié)救氯,Header各字段中可以看到body-length字段找田,用來標識消息體長度。
了解了協(xié)議結構之后着憨,下面我們看一下Navi-pbrpc是如何進行拆包的墩衙。
拆包的源碼如下:

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 解決半包問題,此時Nshead還沒有接收全甲抖,channel中留存的字節(jié)流不做處理
        if (in.readableBytes() < NsHead.NSHEAD_LEN) {
            return;
        }

        in.markReaderIndex();

        byte[] bytes = new byte[NsHead.NSHEAD_LEN];
        in.readBytes(bytes, 0, NsHead.NSHEAD_LEN);

        NsHead nsHead = new NsHead();
        nsHead.wrap(bytes);

        // 解決半包問題漆改,此時body還沒有接收全,channel中留存的字節(jié)流不做處理准谚,重置readerIndex
        if (in.readableBytes() < (int) nsHead.getBodyLen()) {
            in.resetReaderIndex();
            return;
        }

        // 此時接受到了足夠的一個包籽懦,開始處理
        in.markReaderIndex();

        byte[] totalBytes = new byte[(int) nsHead.getBodyLen()];
        in.readBytes(totalBytes, 0, (int) nsHead.getBodyLen());

        PbrpcMsg decoded = PbrpcMsg.of(nsHead).setData(totalBytes);
        ContextHolder.putContext("_logid", nsHead.getLogId()); // TODO

        if (decoded != null) {
            out.add(decoded);
        }
    }

1、可以看到的是先從ByteBuf 中讀取可讀的字節(jié)數(shù)氛魁,如果沒有達到NsHead的大小,就不做處理厅篓,如果達到了就讀取36個字節(jié)到字節(jié)數(shù)組中秀存,并且封裝成一個nsHead,里面包含了上圖NsHead結構里的所有字段羽氮。
2或链、拿到NsHead中body-length,,可以知道body的長度档押,接著讀取ByteBuf中可讀字段大小澳盐,如果小于body長度就返回祈纯,否則讀取ByteBuf到字節(jié)數(shù)組中,解決半包問題叼耙。
3腕窥、最后利用Protobuf客戶端SDK反序列化方法拿到消息體。
這里涉及到Netty的ByteBuf筛婉,留在以后的文章中再講簇爆。

總結

本文主要通過源碼分析介紹了Netty的拆包流程,主要分為了四個步驟:
1爽撒、合并數(shù)據(jù)到字節(jié)容器中入蛆。
2、把字節(jié)容器中的數(shù)據(jù)拆包并添加到業(yè)務數(shù)據(jù)容器out中
3硕勿、清理字節(jié)容器
4哨毁、把拆包后的數(shù)據(jù)交給后面的Handler解碼
并且通過一個例子介紹了如何自定義協(xié)議拆包。其實不管是netty還是自己去拆包源武,流程無非是文章開頭介紹的流程扼褪,萬變不離其宗吧。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末软能,一起剝皮案震驚了整個濱河市迎捺,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌查排,老刑警劉巖凳枝,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異跋核,居然都是意外死亡岖瑰,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門砂代,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蹋订,“玉大人,你說我怎么就攤上這事刻伊÷督洌” “怎么了?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵捶箱,是天一觀的道長智什。 經(jīng)常有香客問我,道長丁屎,這世上最難降的妖魔是什么荠锭? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮晨川,結果婚禮上证九,老公的妹妹穿的比我還像新娘删豺。我一直安慰自己,他們只是感情好愧怜,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布呀页。 她就那樣靜靜地躺著,像睡著了一般叫搁。 火紅的嫁衣襯著肌膚如雪赔桌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天渴逻,我揣著相機與錄音疾党,去河邊找鬼。 笑死惨奕,一個胖子當著我的面吹牛雪位,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播梨撞,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼雹洗,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了卧波?” 一聲冷哼從身側響起时肿,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎港粱,沒想到半個月后螃成,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡查坪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年寸宏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片偿曙。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡氮凝,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出望忆,到底是詐尸還是另有隱情罩阵,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布启摄,位于F島的核電站永脓,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏鞋仍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一搅吁、第九天 我趴在偏房一處隱蔽的房頂上張望威创。 院中可真熱鬧落午,春花似錦、人聲如沸肚豺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吸申。三九已至梗劫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間截碴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工日丹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留走哺,地道東北人。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓哲虾,卻偏偏與公主長得像丙躏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子束凑,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 5,846評論 0 13
  • 一晒旅、粘包與拆包 1、發(fā)送時的粘包與拆包 TCP連接維護了一個發(fā)送緩存區(qū)汪诉。將要發(fā)送給對端的數(shù)據(jù)會由socket AP...
    益文的圈閱讀 4,261評論 6 14
  • 為什么要粘包拆包 為什么要粘包 首先你得了解一下TCP/IP協(xié)議废恋,在用戶數(shù)據(jù)量非常小的情況下,極端情況下摩瞎,一個字節(jié)...
    簡書閃電俠閱讀 20,621評論 23 77
  • 1. 有些人無需在意拴签。 本來不需要為這樣的人多費口舌和心力,不過也算是一種經(jīng)歷 來英國兩年終于清晰的感受到了可能是...
    水仙與惡魔Slog閱讀 231評論 0 0
  • 這是一個冷笑話岸梨,內(nèi)容是—— 今天,你向首頁投稿了嗎稠氮? 上周三上午十一點主編在群里發(fā)了個好消息:今天首頁投稿完全開放...
    Jk不二子閱讀 1,874評論 75 71