Netty實現(xiàn)自定義協(xié)議和源碼分析

原文博客:Doi技術團隊
鏈接地址:https://blog.doiduoyi.com
初心:記錄優(yōu)秀的Doi技術團隊學習經(jīng)歷

本篇 主要講的是自定義協(xié)議是如何實現(xiàn)的恕酸,以及自定義協(xié)議中會出現(xiàn)的問題和Netty是如何支持的紊馏。

分為4個部分
|-- 粘包 拆包 和解決方案
|-- 代碼實現(xiàn)
|-- ByteToMessageDecoder的源碼分析
|-- 過程流程圖

粘包

在傳輸數(shù)據(jù)太多的時候鹅龄,TCP是會將數(shù)據(jù)塊分包發(fā)送的殊鞭,也就是在網(wǎng)絡延遲的問題疗韵,本來一個完整數(shù)據(jù)塊分成兩個包权薯,在開始讀取數(shù)據(jù)的時候嗓化,只收到了一個數(shù)據(jù)包帽馋,這個時候搅方,怎么辦?如果先處理一個數(shù)據(jù)包绽族,這樣數(shù)據(jù)不完整姨涡。等待?那如何知道數(shù)據(jù)完整了呢吧慢?

拆包

TCP是以字節(jié)流流的方式來傳輸?shù)奶纹瑪?shù)據(jù)是存儲在緩沖區(qū)。雖然發(fā)送數(shù)據(jù)是以每個包發(fā)送的检诗,但如果網(wǎng)絡出現(xiàn)延遲匈仗,在第一個包的數(shù)據(jù)還存儲在緩沖區(qū)的時候,第二個包就發(fā)送過來了逢慌。此時第二個包的數(shù)據(jù)也存儲到緩沖區(qū)悠轩,這時候,就不知道第一個包在哪兒結束攻泼,第二個包的數(shù)據(jù)是從哪里開始讀取了哗蜈。這就是TCP粘包。

解決方案

方案一:
解決方案其實就是坠韩,如何去自定義定義這個協(xié)議包,去解決粘包的問題和數(shù)據(jù)包不全的問題炼列。
自定義協(xié)議包括如下:

  • 一個開始標志:比如定義一個Int類型只搁,4個字節(jié)的標志。那么在讀到這個開始標志的時候就判斷為是一個數(shù)據(jù)包的開始俭尖。
  • 數(shù)據(jù)的長度:也是Int4個字節(jié),表明這個數(shù)據(jù)塊的大小是多小個字節(jié)氢惋,這樣根據(jù)這個長度就可以知道數(shù)據(jù)包是否已經(jīng)接受完畢,如果還沒有稽犁,那么就等待焰望。
  • 數(shù)據(jù):真正的傳輸數(shù)據(jù)

代碼實現(xiàn)

協(xié)議包對象類

/**
 *
 * 自己定義的協(xié)議
 *  數(shù)據(jù)包格式
 * +——----——+——-----——+——----——+
 * |協(xié)議開始標志|  長度             |   數(shù)據(jù)       |
 * +——----——+——-----——+——----——+
 * 1.協(xié)議開始標志head_data,為int類型的數(shù)據(jù)已亥,16進制表示為0X76
 * 2.傳輸數(shù)據(jù)的長度contentLength熊赖,int類型
 * 3.要傳輸?shù)臄?shù)據(jù)
 *
 */
public class CustomDate {


    /**
     * 消息開頭的信息標志
     * 是一個常量 X077 
     */
    private  final int head_Date = Costom.HEAD_DATA.getVaule();


    /**
     * 消息的長度
     */
    private int contentLength;

    /**
     * 消息的內(nèi)容
     */
    private byte[] conctent;


    public CustomDate() {
        super();
    }

    public CustomDate(int contentLength, byte[] conctent) {
        this.contentLength = contentLength;
        this.conctent = conctent;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getConctent() {
        return conctent;
    }

    public void setConctent(byte[] conctent) {
        this.conctent = conctent;
    }

    public int getHead_Date() {
        return head_Date;
    }
}

解碼類

核心思想:

  • 1 在開始讀取數(shù)據(jù)的時候先判斷字節(jié)大小是否基本數(shù)據(jù)長度 (標志+數(shù)據(jù)長度)
  • 2 如果緩沖區(qū)數(shù)據(jù)太大,這種情況不正常虑椎,應該移動2048個字節(jié)震鹉,直接處理后面的字節(jié)俱笛。因為,可能是網(wǎng)絡延遲導致传趾,或者是惡意發(fā)送大量數(shù)據(jù)迎膜。
  • 3 開始讀取緩沖區(qū)了,對緩沖區(qū)的操作浆兰。首先標記一下閱讀標記點磕仅,然后開始尋找開始標記,如果不是開始標記簸呈,那么就跳過一個標記節(jié)點榕订。
  • 4 如果找到了開始標記,那么就繼續(xù)獲取長度蝶棋。如果長度大小大于緩沖區(qū)的可讀長度卸亮,那么就證明還有數(shù)據(jù)還沒到。就回滾到閱讀標記點玩裙。繼續(xù)等待數(shù)據(jù)兼贸。
  • 5 如果數(shù)據(jù)已經(jīng)到達了,那么就開始讀取數(shù)據(jù)區(qū)吃溅。

繼承 ByteToMessageDecoder 類溶诞。該類主要作用是將從網(wǎng)絡緩沖區(qū)讀取的字節(jié)轉換成有意義的消息對象的

/**
 *
 * 自己定義的協(xié)議
 *  數(shù)據(jù)包格式
 * +——----——+——-----——+——----——+
 * |協(xié)議開始標志|  長度             |   數(shù)據(jù)       |
 * +——----——+——-----——+——----——+
 * 1.協(xié)議開始標志head_data,為int類型的數(shù)據(jù)决侈,16進制表示為0X76
 * 2.傳輸數(shù)據(jù)的長度contentLength螺垢,int類型
 * 3.要傳輸?shù)臄?shù)據(jù),長度不應該超過2048,防止socket流的攻擊
 *
 */
public class CustomDecoder extends ByteToMessageDecoder {

    /**
     * 協(xié)議開始的標準head_data赖歌,int類型枉圃,占據(jù)4個字節(jié).
     * 表示數(shù)據(jù)的長度contentLength,int類型庐冯,占據(jù)4個字節(jié).
     */

    private final int BASE_LENGTH = 4 + 4;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        //1. 首先確認可讀長度大于基本長度
        if (buffer.readableBytes() > BASE_LENGTH) {

            //2.
            // 防止socket字節(jié)流攻擊
            // 防止孽亲,客戶端傳來的數(shù)據(jù)過大
            // 因為,太大的數(shù)據(jù)展父,是不合理的
            if (buffer.readableBytes() > 2048) {
                //將readerIndex移動
                buffer = buffer.skipBytes(buffer.readableBytes());
            }


            //3. 記錄閱讀開始
            int beginRead;
            while (true) {
                //獲取包頭開始的index;
                beginRead = buffer.readerIndex();
                // 標記包頭開始的index
                buffer.markReaderIndex();
                //如果讀到了數(shù)據(jù)包的協(xié)議開頭返劲,那么就結束循環(huán)
                if (buffer.readInt() == Costom.HEAD_DATA.getVaule()) {
                    break;
                }

                //沒讀到協(xié)議開頭,退回到標記
                buffer.resetReaderIndex();
                //跳過一個字節(jié)
                buffer.readByte();

                //如果可讀長度小于基本長度
                //
                if (buffer.readableBytes() < BASE_LENGTH) {
                    return;
                }
            }
            //獲取消息的長度
            int length = buffer.readInt();

            //判斷請求數(shù)據(jù)包是否到齊
            if (buffer.readableBytes() < length) {
                buffer.resetReaderIndex();
                return;
            }

            byte[] date = new byte[length];
            buffer.readBytes(date);
            CustomDate customDate = new CustomDate(length, date);
            out.add(customDate);

        }


    }
}

編碼類

  • 其實就是往緩沖區(qū)里面寫數(shù)據(jù)栖茉。
  • 繼承 MessageToByteEncoder
public class CustomEncoder extends MessageToByteEncoder<CustomDate> {

    @Override
    protected void encode(ChannelHandlerContext ctx, CustomDate msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getHead_Date());
        out.writeInt(msg.getContentLength());
        out.writeBytes(msg.getConctent());
    }
}

添加到管道

public class ServerHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CustomDecoder());
        pipeline.addLast(new CustomEncoder());
        pipeline.addLast(new ServerHandler());

    }
}

輸出協(xié)議數(shù)據(jù)

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof CustomDate) {
           CustomDate customDate= (CustomDate) msg;
            byte[] conctent = customDate.getConctent();
            System.out.println("獲取到的內(nèi)容"+new String(conctent));
            ReferenceCountUtil.release(msg);
        }
    }
}

過程分析

我們研究一下解碼的過程篮绿。
1 自定義的解碼類是繼承ByteToMessageDecoder類。先看下ByteToMessageDecoder類

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}

可以看到ByteToMessageDecoder 是繼承ChannelInboundHandlerAdapter吕漂,那也就是說亲配,數(shù)據(jù)處理應該是通過重寫channelRead()類了。
2 那就繼續(xù)看ByteToMessageDecoder 的channelRead() 方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // 獲取到緩沖區(qū)
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 2 開始解碼
                callDecode(ctx, cumulation, out);
            } finally {
                // 資源釋放代碼
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

重點是2 callDecode()方法。該方法是開始解碼弃榨。繼續(xù)往下看該方法

 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // 1 
            while (in.isReadable()) {
                int outSize = out.size();
                // 2
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }
                
                //3
                int oldInputLength = in.readableBytes();
                //4
                decodeRemovalReentryProtection(ctx, in, out);

                   
                if (ctx.isRemoved()) {
                    break;
                }
                
                //5
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException( );
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } 
    }

1 用while循環(huán)不斷處理緩沖區(qū)菩收,判斷條件是如果緩沖區(qū)還有可讀數(shù)據(jù),就繼續(xù)執(zhí)行鲸睛。
2 這個是非常好的設計娜饵,Out變量存儲的是解碼生成的對象。如果out里面已經(jīng)有對象官辈,那么就把該對象通過fireChannelRead()方法傳到下一個handler(也就是本程序中的輸出handler)箱舞。

出現(xiàn)這種情況是因為:粘包!H凇G绻伞!肺魁! 當處理完一個數(shù)據(jù)包的數(shù)據(jù)后电湘,緩沖區(qū)還有下一個數(shù)據(jù)包的數(shù)據(jù),所以先把處理完的數(shù)據(jù)包交給下一個handler處理后鹅经,再進行緩沖區(qū)的讀取寂呛。

3 做一個標記。記錄這次解碼對緩沖區(qū)數(shù)據(jù)有沒有被讀锐巍(也就是有沒有讀取數(shù)據(jù))贷痪。如果沒有,下面就會結束while循環(huán)蹦误。

為什么會要做這個標記呢劫拢?為什么要結束循環(huán)呢?
因為:緩沖區(qū)的數(shù)據(jù)沒有讀取强胰,也就是說數(shù)據(jù)還沒全部到齊舱沧,需要等待數(shù)據(jù)完整再處理。所以就需要結束while循環(huán)偶洋。等待下一次的處理熟吏。

4 decodeRemovalReentryProtection() 就是調用自己重寫的decode()方法了。

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            // 自己重寫的decode
            decode(ctx, in, out);
        } finally {
            //省略
        }
    }

5 這里就是判斷3 中的標記涡真,是否退出循環(huán)。

過程流程圖

看完代碼分析還是一頭霧水肾筐? 那就再看一下流程圖吧


image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末哆料,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子吗铐,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件兵睛,死亡現(xiàn)場離奇詭異,居然都是意外死亡奋渔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門壮啊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嫉鲸,“玉大人,你說我怎么就攤上這事歹啼⌒” “怎么了?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵狸眼,是天一觀的道長藤树。 經(jīng)常有香客問我,道長拓萌,這世上最難降的妖魔是什么岁钓? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮微王,結果婚禮上屡限,老公的妹妹穿的比我還像新娘。我一直安慰自己骂远,他們只是感情好囚霸,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著激才,像睡著了一般拓型。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瘸恼,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天劣挫,我揣著相機與錄音,去河邊找鬼东帅。 笑死压固,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的靠闭。 我是一名探鬼主播帐我,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼愧膀!你這毒婦竟也來了拦键?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤檩淋,失蹤者是張志新(化名)和其女友劉穎芬为,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡媚朦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年氧敢,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片询张。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡孙乖,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出瑞侮,到底是詐尸還是另有隱情的圆,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布半火,位于F島的核電站越妈,受9級特大地震影響,放射性物質發(fā)生泄漏钮糖。R本人自食惡果不足惜梅掠,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望店归。 院中可真熱鬧阎抒,春花似錦、人聲如沸消痛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽秩伞。三九已至逞带,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間纱新,已是汗流浹背展氓。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留脸爱,地道東北人遇汞。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像簿废,于是被迫代替她去往敵國和親空入。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容