使用LengthFieldBasedFrameDecoder解碼器及自定義

最近需要做一個長連接的設(shè)備管理厌漂,使用netty可以方便的做到,還可以配置心跳及解碼器

自定義長度解碼器

LengthFieldBasedFrameDecoder解碼器自定義長度解決TCP粘包黏包問題。所以又稱為: 自定義長度解碼器

TCP粘包和黏包現(xiàn)象

  1. TCP粘包是指發(fā)送方發(fā)送的若干個數(shù)據(jù)包到接收方時粘成一個包。從接收緩沖區(qū)來看炭分,后一個包數(shù)據(jù)的頭緊接著前一個數(shù)據(jù)的尾蒋院。

  2. 當(dāng)TCP連接建立后沉颂,Client發(fā)送多個報文給Server,TCP協(xié)議保證數(shù)據(jù)可靠性悦污,但無法保證Client發(fā)了n個包铸屉,服務(wù)端也按照n個包接收。Client端發(fā)送n個數(shù)據(jù)包切端,Server端可能收到n-1或n+1個包彻坛。

為什么出現(xiàn)粘包現(xiàn)象?

  1. 發(fā)送方原因: TCP默認(rèn)會使用Nagle算法踏枣。而Nagle算法主要做兩件事:1)只有上一個分組得到確認(rèn)昌屉,才會發(fā)送下一個分組;2)收集多個小分組茵瀑,在一個確認(rèn)到來時一起發(fā)送间驮。所以,正是Nagle算法造成了發(fā)送方有可能造成粘包現(xiàn)象马昨。

  2. 接收方原因: TCP接收方采用緩存方式讀取數(shù)據(jù)包竞帽,一次性讀取多個緩存中的數(shù)據(jù)包。自然出現(xiàn)前一個數(shù)據(jù)包的尾和后一個收據(jù)包的頭粘到一起鸿捧。

如何解決粘包現(xiàn)象

就是要選擇相應(yīng)的解碼器

  • 添加特殊符號屹篓,接收方通過這個特殊符號將接收到的數(shù)據(jù)包拆分開 - DelimiterBasedFrameDecoder特殊分隔符解碼器
  • 每次發(fā)送固定長度的數(shù)據(jù)包 - FixedLengthFrameDecoder定長編碼器
  • 在消息頭中定義長度字段,來標(biāo)識消息的總長度 - LengthFieldBasedFrameDecoder自定義長度解碼器

LengthFieldBasedFrameDecoder參數(shù)

自定義長度解碼器匙奴,所以構(gòu)造函數(shù)中6個參數(shù)堆巧,基本都圍繞那個定義長度域,進行的描述。

  1. maxFrameLength - 發(fā)送的數(shù)據(jù)幀最大長度
  2. lengthFieldOffset - 定義長度域位于發(fā)送的字節(jié)數(shù)組中的下標(biāo)谍肤。換句話說:發(fā)送的字節(jié)數(shù)組中下標(biāo)為${lengthFieldOffset}的地方是長度域的開始地方
  3. lengthFieldLength - 用于描述定義的長度域的長度啦租。換句話說:發(fā)送字節(jié)數(shù)組bytes時, 字節(jié)數(shù)組bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域?qū)?yīng)于的定義長度域部分
  4. lengthAdjustment - 滿足公式: 發(fā)送的字節(jié)數(shù)組bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
  5. initialBytesToStrip - 接收到的發(fā)送數(shù)據(jù)包,去除前initialBytesToStrip位
  6. failFast - true: 讀取到長度域超過maxFrameLength荒揣,就拋出一個 TooLongFrameException刷钢。false: 只有真正讀取完長度域的值表示的字節(jié)之后,才會拋出 TooLongFrameException乳附,默認(rèn)情況下設(shè)置為true内地,建議不要修改,否則可能會造成內(nèi)存溢出
  7. ByteOrder - 數(shù)據(jù)存儲采用大端模式或小端模式

舉例解釋參數(shù)如何寫

客戶端多次發(fā)送"HELLO, WORLD"字符串給服務(wù)端赋除。"HELLO, WORLD"共12字節(jié)(12B)阱缓。長度域中的內(nèi)容是16進制的值,如下:

  1. 0x000c -----> 12

  2. 0x000e -----> 14

場景1

數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"


1.png

解釋:

如上圖举农,長度域的值為12B(0x000c)荆针。希望解碼后保持一樣,根據(jù)上面的公式,參數(shù)應(yīng)該為:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = 0 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(12)

  4. initialBytesToStrip = 0 - 解碼過程中颁糟,沒有丟棄任何數(shù)據(jù)

場景2

數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"


2.png

解釋:

上圖中航背,解碼后,希望丟棄長度域2B字段棱貌,所以玖媚,只要initialBytesToStrip = 2即可。其他與場景1相同

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = 0 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(12)

  4. initialBytesToStrip = 2 解碼過程中婚脱,丟棄2個字節(jié)的數(shù)據(jù)

場景3

數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"今魔。與場景1不同的是:場景3中長度域的值為14(0x000E)


3.png

解釋:

如上圖,長度域的值為14(0x000E)障贸。希望解碼后保持一樣错森,根據(jù)上面的公式,參數(shù)應(yīng)該為:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 2

  3. lengthAdjustment = -2 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(14)

  4. initialBytesToStrip = 0 - 解碼過程中篮洁,沒有丟棄任何數(shù)據(jù)

場景4

場景4在長度域前添加2個字節(jié)的Header涩维。長度域的值(0x00000C) = 12≡ǎ總數(shù)據(jù)包長度: 17=Header(2B) + 長度域(3B) + "HELLO, WORLD"


4.png

解釋

如上圖瓦阐。編碼解碼后,長度保持一致锋叨,所以initialBytesToStrip = 0垄分。參數(shù)應(yīng)該為:

  1. lengthFieldOffset = 2

  2. lengthFieldLength = 3

  3. lengthAdjustment = 0 = 數(shù)據(jù)包長度(17) - lengthFieldOffset(2) - lengthFieldLength(3) - 長度域的值(12)

  4. initialBytesToStrip = 0 - 解碼過程中,沒有丟棄任何數(shù)據(jù)

場景5

與場景4不同的地方是: Header與長度域的位置換了娃磺。總數(shù)據(jù)包長度: 17=長度域(3B) + Header(2B) + "HELLO, WORLD"


5.png

解釋

如上圖叫倍。編碼解碼后偷卧,長度保持一致豺瘤,所以initialBytesToStrip = 0。參數(shù)應(yīng)該為:

  1. lengthFieldOffset = 0

  2. lengthFieldLength = 3

  3. lengthAdjustment = 2 = 數(shù)據(jù)包長度(17) - lengthFieldOffset(0) - lengthFieldLength(3) - 長度域的值(12)

  4. initialBytesToStrip = 0 - 解碼過程中听诸,沒有丟棄任何數(shù)據(jù)

場景6

如下圖坐求,"HELLO, WORLD"域前有多個字段∩卫妫總數(shù)據(jù)長度: 16 = HEADER1(1) + 長度域(2) + HEADER2(1) + "HELLO, WORLD"


6.png
  1. lengthFieldOffset = 1

  2. lengthFieldLength = 2

  3. lengthAdjustment = 1 = 數(shù)據(jù)包長度(16) - lengthFieldOffset(1) - lengthFieldLength(2) - 長度域的值(12)

  4. initialBytesToStrip = 0 - 解碼過程中桥嗤,沒有丟棄任何數(shù)據(jù)

自定義協(xié)議

很多時候并不能按照以上參數(shù)的方式去解析數(shù)據(jù),所以需要自定義協(xié)議仔蝌,LengthFieldBasedFrameDecoder解碼器自定義協(xié)議.通常,協(xié)議的格式如下:

協(xié)議格式.png

通常來說,使用ByteToMessageDocoder這個編碼器,我們要分別解析出Header,length,body這幾個字段.而使用LengthFieldBasedFrameDecoder,我們就可以直接接收想要的一部分,相當(dāng)于在原來的基礎(chǔ)上包上了一層,有了這層之后,我們可以控制我們每次只要讀想讀的字段,這對于自定義協(xié)議來說十分方便.

  1. MyProtocolDecoder的定義
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
    private static final int HEADER_SIZE = 6;
    /**
     *
     * @param maxFrameLength  幀的最大長度
     * @param lengthFieldOffset length字段偏移的地址
     * @param lengthFieldLength length字段所占的字節(jié)長
     * @param lengthAdjustment 修改幀數(shù)據(jù)長度字段中定義的值泛领,可以為負(fù)數(shù) 因為有時候我們習(xí)慣把頭部記入長度,若為負(fù)數(shù),則說明要推后多少個字段
     * @param initialBytesToStrip 解析時候跳過多少個長度
     * @param failFast 為true,當(dāng)frame長度超過maxFrameLength時立即報TooLongFrameException異常敛惊,為false渊鞋,讀取完整個幀再報異
     */
    public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在這里調(diào)用父類的方法,實現(xiàn)指得到想要的部分,我在這里全部都要,也可以只要body部分
        in = (ByteBuf) super.decode(ctx,in);  

        if(in == null){
            return null;
        }
        if(in.readableBytes()<HEADER_SIZE){
            throw new Exception("字節(jié)數(shù)不足");
        }
        //讀取type字段
        byte type = in.readByte();
        //讀取flag字段
        byte flag = in.readByte();
        //讀取length字段
        int length = in.readInt();
        
        if(in.readableBytes()!=length){
            throw new Exception("標(biāo)記的長度不符合實際長度");
        }
        //讀取body
        byte []bytes = new byte[in.readableBytes()];
        in.readBytes(bytes);
        return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
    }
}

在上述的代碼中,調(diào)用父類的方法瞧挤,實現(xiàn)截取到自己想要的字段锡宋,如可以判斷數(shù)據(jù)必須以xx開頭。

  1. 協(xié)議實體的定義
public class MyProtocolBean {
    //類型  系統(tǒng)編號 0xA 表示A系統(tǒng)特恬,0xB 表示B系統(tǒng)
    private byte type;
    //信息標(biāo)志  0xA 表示心跳包    0xC 表示超時包  0xC 業(yè)務(wù)信息包
    private byte flag;
    //內(nèi)容長度
    private int length;
    //內(nèi)容
    private String content;

    public MyProtocolBean(byte flag, byte type, int length, String content) {
        this.flag = flag;
        this.type = type;
        this.length = length;
        this.content = content;
    }
}

3.服務(wù)端的實現(xiàn)

public class Server {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大長度
    private static final int LENGTH_FIELD_LENGTH = 4;  //長度字段所占的字節(jié)數(shù)
    private static final int LENGTH_FIELD_OFFSET = 2;  //長度偏移
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                            ch.pipeline().addLast(new ServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 綁定端口执俩,開始接收進來的連接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port );
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new Server(port).start();
    }
}
  1. 服務(wù)端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyProtocolBean myProtocolBean = (MyProtocolBean)msg;  //直接轉(zhuǎn)化成協(xié)議消息實體
        System.out.println(myProtocolBean.getContent());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}

服務(wù)端Handler沒什么特別的地方,只是輸出接收到的消息

  1. 客戶端
public class Client {
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolEncoder());
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}
  1. 客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
        ctx.writeAndFlush(myProtocolBean);
    }
}

客戶端Handler實現(xiàn)發(fā)送消息.

  1. 客戶端編碼器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {

    @Override
    protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
        if(msg == null){
            throw new Exception("msg is null");
        }
        out.writeByte(msg.getType());
        out.writeByte(msg.getFlag());
        out.writeInt(msg.getLength());
        out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
    }
}

編碼的時候,只需要按照定義的順序依次寫入到ByteBuf中.

小結(jié)
若是上面的參數(shù)直接可以滿足要求,可以直接使用參數(shù)癌刽,若不可以則通過自定義的方式去實現(xiàn)奠滑,更加靈活。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末妒穴,一起剝皮案震驚了整個濱河市宋税,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌讼油,老刑警劉巖杰赛,帶你破解...
    沈念sama閱讀 221,888評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異矮台,居然都是意外死亡乏屯,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評論 3 399
  • 文/潘曉璐 我一進店門瘦赫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來辰晕,“玉大人,你說我怎么就攤上這事确虱『眩” “怎么了?”我有些...
    開封第一講書人閱讀 168,386評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長窘问。 經(jīng)常有香客問我辆童,道長,這世上最難降的妖魔是什么惠赫? 我笑而不...
    開封第一講書人閱讀 59,726評論 1 297
  • 正文 為了忘掉前任把鉴,我火速辦了婚禮,結(jié)果婚禮上儿咱,老公的妹妹穿的比我還像新娘庭砍。我一直安慰自己,他們只是感情好混埠,可當(dāng)我...
    茶點故事閱讀 68,729評論 6 397
  • 文/花漫 我一把揭開白布怠缸。 她就那樣靜靜地躺著,像睡著了一般岔冀。 火紅的嫁衣襯著肌膚如雪凯旭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,337評論 1 310
  • 那天使套,我揣著相機與錄音罐呼,去河邊找鬼。 笑死侦高,一個胖子當(dāng)著我的面吹牛嫉柴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播奉呛,決...
    沈念sama閱讀 40,902評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼计螺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了瞧壮?” 一聲冷哼從身側(cè)響起登馒,我...
    開封第一講書人閱讀 39,807評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎咆槽,沒想到半個月后陈轿,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,349評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡秦忿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,439評論 3 340
  • 正文 我和宋清朗相戀三年麦射,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灯谣。...
    茶點故事閱讀 40,567評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡潜秋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出胎许,到底是詐尸還是另有隱情峻呛,我是刑警寧澤罗售,帶...
    沈念sama閱讀 36,242評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站杀饵,受9級特大地震影響莽囤,放射性物質(zhì)發(fā)生泄漏谬擦。R本人自食惡果不足惜切距,卻給世界環(huán)境...
    茶點故事閱讀 41,933評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望惨远。 院中可真熱鬧谜悟,春花似錦、人聲如沸北秽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,420評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贺氓。三九已至蔚叨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間辙培,已是汗流浹背蔑水。 一陣腳步聲響...
    開封第一講書人閱讀 33,531評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留扬蕊,地道東北人搀别。 一個月前我還...
    沈念sama閱讀 48,995評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像尾抑,于是被迫代替她去往敵國和親歇父。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,585評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 簡介 用簡單的話來定義tcpdump再愈,就是:dump the traffic on a network榜苫,根據(jù)使用者...
    JasonShi6306421閱讀 1,245評論 0 1
  • 簡介 用簡單的話來定義tcpdump,就是:dump the traffic on a network翎冲,根據(jù)使用者...
    保川閱讀 5,961評論 1 13
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,111評論 1 32
  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 11,005評論 6 13
  • 最全的iOS面試題及答案 iOS面試小貼士 ———————————————回答好下面的足夠了-----------...
    zweic閱讀 2,704評論 0 73