netty4用最簡單的協(xié)議解決一個半包問題

有時候簡化實現(xiàn)別人的代碼唆迁,有助于你更好的理解代碼逢享,不要一味地讀源代碼。

問題來源

客戶端往服務器發(fā)送小文件

解決思路

1秀又、使用netty(廢話)
2单寂、只是用ByteBuf
3、自定義一種協(xié)議吐辙,用最小的網(wǎng)絡代價完成數(shù)據(jù)傳送

實現(xiàn)

其實netty有很多的定義好的協(xié)議來解決各種各樣的問題宣决,這篇文章來自《netty權威指南》作者李林峰,詳細介紹了netty的編解碼框架昏苏,以及一些常用的編解碼協(xié)議尊沸。

在解決這個問題的時候威沫,我遇到的一個主要問題就是我在客戶端發(fā)送一個數(shù)據(jù)包,這個數(shù)據(jù)包的大小可以很大洼专,但是如果只用簡單的channelRead去讀取數(shù)據(jù)的話得到的數(shù)據(jù)并不是完整的棒掠。具體原因參考netty用戶指南中的tcp stream-based傳輸?shù)膯栴}。

我先做了一個簡單的協(xié)議設計:
packet = |文件名長度|文件名|文件字節(jié)長度|文件字節(jié)流|

于是就有了客戶端發(fā)送的簡單代碼

            String name = "diagram.png";
            FileInputStream fileInputStream = new FileInputStream(new File("src/main/resources/diagram.png"));
            byte[] bytes = new byte[fileInputStream.available()];
            fileInputStream.read(bytes);

            ByteBuf byteBuf = Unpooled.buffer();

            byteBuf.writeInt("diagram.png".getBytes().length);
            byteBuf.writeBytes("diagram.png".getBytes());

            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
            channelFuture.channel().writeAndFlush(byteBuf);

這樣發(fā)送沒有問題屁商,因為byteBuf是動態(tài)擴展的烟很。但是接受的時候就有問題了。如果我們接受比較小的蜡镶,比如一個int雾袱,我們可以直接這樣寫

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            if(byteBuf.readableBytes() > 4)
            {
                int result = byteBuf.readInt();
            }
        }
}

但是當長度很大的時候,我們就需要解決讀半包的問題了官还。直到讀到完整的數(shù)據(jù)才進行處理芹橡。但是每次收到的數(shù)據(jù)怎么去判斷是不是和上一個數(shù)據(jù)是連續(xù)的,如何在沒有收集到完整數(shù)據(jù)時不處理數(shù)據(jù)而繼續(xù)接受呢望伦?這是我一直困擾的問題林说。因為我把每一個byteBuf當成一個message來想了,其實不是的屯伞,ByteBuf中有兩個指針readIndex和writeIndex腿箩,readIndex永遠小于writeIndex。大概如下圖所示


ByteBuf示例圖

在netty的設計中ByteBuf是可以被重用的劣摇,所以可能針對這一個ChannelRead一直讀取的是同一個ByteBuf度秘。這其中readrIndex之前的是已經讀取過的,就是已經被調用readXXX()之后的數(shù)據(jù)饵撑,可以重新去讀取剑梳,readerIndex和writerIndex之前的是當前的readableBytes,writerIndex到capacity的是writeableBytes滑潘,當writerIndex超過capacity時就會擴展垢乙。同時為了重用這部分空間,當調用discardBytes時语卤,會把readerIndex和writerIndex拷貝到開頭追逮,這樣前面廢棄的部分就被重用了,也一定程度場避免了擴容粹舵,節(jié)省了空間钮孵。

那如何針對上面的輸入寫B(tài)yteBuf的解碼呢?
先看看netty自帶的解碼器怎么解決這個問題眼滤,其中LengthFieldBasedFrameDecoder就是用來解決這一類的問題的巴席。在李林峰的文章中有詳細介紹,這里就不贅述了诅需。
我在之前代碼的基礎上添加了兩行代碼漾唉。

//在服務器的pipeline中添加的這個解碼器荧库,然后用4個字節(jié)表示整個包的長度,并且廢棄掉這四個字節(jié)赵刑。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));

//在發(fā)送的byteBuf頭部添加真?zhèn)€包的長度
byteBuf.writeInt(4+ name.getBytes().length +4+ bytes.length);

然后我再在ChannelRead中處理剩下的數(shù)據(jù)
packet = |文件名長度|文件名|文件字節(jié)長度|文件字節(jié)流|

       if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            int nameSize = byteBuf.readInt();
            String name = new String(byteBuf.readBytes(nameSize).array(), "UTF-8");
            int fileSize = byteBuf.readInt();
            FileOutputStream fileOutputStream = new FileOutputStream(new File(name));
            fileOutputStream.write(byteBuf.readBytes(fileSize).array());
            System.out.println(name + " " + fileSize);
        }

問題解決分衫,但是自己如何實現(xiàn)這個解碼器呢?先看看netty怎么實現(xiàn)的般此。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;

            failIfNecessary(false);
        }

        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }

        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }

        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);

        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

好長蚪战。。里面對于不合理的協(xié)議做了很多假設铐懊,并使不合理的輸入快速失敗屎勘。但是讓我一個初學者寫還是寫不出來。所以我假設協(xié)議就是我設計的那樣居扒,簡化這部分代碼,便于理解丑慎。
變量給一個固定值

    private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    private int maxFrameLength = 1024*10;
    private int lengthFieldLength = 4;
    private int initialBytesToStrip = 0;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    private boolean failFast = true;

然后寫decode函數(shù)喜喂,就這么簡單。竿裂。

 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        int frameLength = (int) in.getUnsignedInt(0);//獲取頭部
        if(in.readableBytes() < frameLength)//當ByteBuf沒有達到長度時玉吁,return null
        {
            return null;
        }
        in.skipBytes(4);//舍棄頭部
        int index =  in.readerIndex();
        ByteBuf frame = in.slice(index, frameLength).retain();//取出自己定義的packet包返回給ChannelRead

        in.readerIndex(frameLength);//這一步一定要有,不然其實bytebuf的readerIndex沒有變腻异,netty會一直從這里開始讀取进副,將readerIndex移動就相當于把前面的數(shù)據(jù)處理過了廢棄掉了。
        return  frame;
    }

所以其實我們只要不處理bytebuf的數(shù)據(jù)知道可以讀的數(shù)據(jù)達到我們需要的長度在處理就可以了悔常。當然包的順序不會出錯是由底層tcp保證的影斑,不用關心。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末机打,一起剝皮案震驚了整個濱河市矫户,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌残邀,老刑警劉巖皆辽,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異芥挣,居然都是意外死亡驱闷,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門空免,熙熙樓的掌柜王于貴愁眉苦臉地迎上來空另,“玉大人,你說我怎么就攤上這事蹋砚”曰唬” “怎么了征字?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長娇豫。 經常有香客問我匙姜,道長,這世上最難降的妖魔是什么冯痢? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任氮昧,我火速辦了婚禮,結果婚禮上浦楣,老公的妹妹穿的比我還像新娘袖肥。我一直安慰自己,他們只是感情好振劳,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布椎组。 她就那樣靜靜地躺著,像睡著了一般历恐。 火紅的嫁衣襯著肌膚如雪寸癌。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天弱贼,我揣著相機與錄音蒸苇,去河邊找鬼。 笑死吮旅,一個胖子當著我的面吹牛溪烤,可吹牛的內容都是我干的。 我是一名探鬼主播庇勃,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼檬嘀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了责嚷?” 一聲冷哼從身側響起枪眉,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎再层,沒想到半個月后贸铜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡聂受,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年蒿秦,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛋济。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡棍鳖,死狀恐怖,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情渡处,我是刑警寧澤镜悉,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站医瘫,受9級特大地震影響侣肄,放射性物質發(fā)生泄漏。R本人自食惡果不足惜醇份,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一稼锅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧僚纷,春花似錦矩距、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至痊臭,卻和暖如春哮肚,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背趣兄。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留悼嫉,地道東北人艇潭。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像戏蔑,于是被迫代替她去往敵國和親蹋凝。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 綜述 netty通...
    jiangmo閱讀 5,842評論 0 13
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理总棵,服務發(fā)現(xiàn)鳍寂,斷路器,智...
    卡卡羅2017閱讀 134,601評論 18 139
  • 國家電網(wǎng)公司企業(yè)標準(Q/GDW)- 面向對象的用電信息數(shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 10,869評論 6 13
  • 前言 問題 現(xiàn)如今我們使用通用的應用程序或者類庫來實現(xiàn)系統(tǒng)之間地互相訪問情龄。例如迄汛,我們經常使用一個HTTP客戶端來從...
    Kohler閱讀 767評論 0 2
  • 文/鄉(xiāng)土依舊 都知道冬吃蘿卜對健康有益鞍爱,所以當?shù)厝藗兞晳T種植晚秋季蘿卜,可趕在大地封凍之前收獲专酗。 一來供自家食用睹逃,...
    小小有夢閱讀 231評論 0 2