Netty學(xué)習(xí)筆記三-TCP粘包拆包

案例重現(xiàn)

首先我們通過(guò)具體的case重現(xiàn)一下TCP粘包的過(guò)程
我們模擬下故障場(chǎng)景隘谣,客戶端循環(huán)一百次調(diào)用服務(wù)端傳輸報(bào)文,服務(wù)端接收?qǐng)?bào)文并打印接收?qǐng)?bào)文和計(jì)數(shù)竖哩,同時(shí)根據(jù)報(bào)文回應(yīng)客戶端
服務(wù)端代碼

public class TimeServerHandler extends ChannelHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception{
        ByteBuf byteBuf = (ByteBuf)msg;
        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);
        System.out.println("received msg length:"+req.length);
        String body = new String(req,"UTF-8").substring(0,req.length-System.getProperty("line.separator").length());
        System.out.println("the time server receive order:"+body + ";the counter is:" + ++count);
        String currentTIme = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
            System.currentTimeMillis()
        ).toString():"BAD ORDER";
        currentTIme = currentTIme + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTIme.getBytes());
        ctx.writeAndFlush(resp);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

public class TimeServer {
    public void bind(int port) throws Exception {
        //創(chuàng)建兩個(gè)線程組 一個(gè)用于服務(wù)端接收客戶端的連接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //一個(gè)用于網(wǎng)絡(luò)讀寫(xiě)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childHandler(new ChildChannelHander());
            ChannelFuture future = b.bind(port).sync();
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHander extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
           // ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
          //  ch.pipeline().addLast(new StringDecoder());
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static  void main(String[] args) {
        int port = 8080;
        if (args != null && args.length >0) {
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e) {

            }
        }
        try {
            new TimeServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客戶端代碼

public class TimeClientHandler extends ChannelHandlerAdapter {
    private int count;

    private byte[] req;

    public TimeClientHandler() {
         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i< 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         ByteBuf byteBuf = (ByteBuf)msg;
        byte[] req = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(req);
        String body = new String(req,"UTF-8");
        System.out.println("Now is : "+body + "the counter is :"+ ++count);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Unexpected exception from downstream :"+cause.getMessage());
        ctx.close();
    }
}
public class TimeClient {
    public void connect(int port, String host) throws Exception {
        //創(chuàng)建讀寫(xiě)io線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        //socketChannel.pipeline().addLast(new StringDecoder());
                        socketChannel.pipeline().addLast(new TimeClientHandler());

                    }
                });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {

            }
        }
        try {
            new TimeClient().connect(port, "127.0.0.1");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

程序運(yùn)行結(jié)果
服務(wù)端


image.png

image.png

客戶端


image.png

按照設(shè)計(jì)初衷椅贱,客戶端應(yīng)該發(fā)送了100次數(shù)據(jù)給服務(wù)端,服務(wù)端每接收一次數(shù)據(jù)就回應(yīng)一次客戶端策严,那么客戶端應(yīng)該收到一百次消息,但是實(shí)際上客戶端就收到2次消息饿敲,服務(wù)端也只收到兩次消息妻导。說(shuō)明服務(wù)端和客戶端都發(fā)生了粘包現(xiàn)象。

產(chǎn)生粘包拆包的原因

TCP協(xié)議是基于流的協(xié)議诀蓉,是沒(méi)有邊界的一串?dāng)?shù)據(jù)栗竖,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的拆分,上述例子中默認(rèn)TCP緩存區(qū)的大小是1024個(gè)字節(jié)渠啤,服務(wù)端第一次收到的數(shù)據(jù)大小正好是1024個(gè)字節(jié),也就是說(shuō)多個(gè)小的報(bào)文可能封裝出一個(gè)大的數(shù)據(jù)進(jìn)行傳送添吗,而一個(gè)大的報(bào)文可能會(huì)被拆分成多個(gè)小包進(jìn)行傳送沥曹。

解決辦法

由于TCP是底層通訊協(xié)議,它不關(guān)心上層業(yè)務(wù),無(wú)法保證數(shù)據(jù)包不會(huì)拆包或者粘包妓美,那么這個(gè)問(wèn)題只能通過(guò)上層協(xié)議來(lái)解決僵腺,通常的解決辦法有以下幾點(diǎn):
1、消息定長(zhǎng)壶栋,例如每個(gè)報(bào)文都是500個(gè)字節(jié)辰如,如果報(bào)文不夠500個(gè)字節(jié),那么就填充
2贵试、報(bào)文尾部增加特殊分隔符
3琉兜、消息分為消息頭和消息體,消息頭定義消息的長(zhǎng)度毙玻,消息體還是真實(shí)傳送的報(bào)文(類型UDP協(xié)議)

Netty解決粘包拆包問(wèn)題

Netty提供了多種編碼解碼器處理上述問(wèn)題豌蟋,其中可以使用LineBasedFrameDecoder解決粘包問(wèn)題
服務(wù)端代碼只要上述TimeServer加一個(gè)LineBasedFrameDecoder的ChannelHandler

 private class ChildChannelHander extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new TimeServerHandler());
        }
    }

客戶端代碼只要上述TimeClient中加一個(gè)LineBasedFrameDecoder的ChannelHandler

public void connect(int port, String host) throws Exception {
        //創(chuàng)建讀寫(xiě)io線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                        //socketChannel.pipeline().addLast(new StringDecoder());
                        socketChannel.pipeline().addLast(new TimeClientHandler());

                    }
                });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

代碼運(yùn)行結(jié)果:


[圖片上傳中...(image.png-bdb177-1553093851212-0)]

image.png

客戶端運(yùn)行結(jié)果


image.png

由此可見(jiàn),增加LineBasedFrameDecoder之后解決了粘包問(wèn)題
LineBasedFrameDecoder的工作原理是遍歷緩沖區(qū)的可讀字節(jié)桑滩,判斷是否是“\n”或者"\r\n"梧疲,如果有,那么就以該位置作為結(jié)束位置运准,從緩沖區(qū)可讀區(qū)域到結(jié)束位置作為一個(gè)完整報(bào)文幌氮,他是以標(biāo)識(shí)符作為解碼器。

LineBasedFrameDecoder 源碼分析:

image.png

我們先看下LineBasedFrameDecoder的類繼承圖胁澳,繼承了ByteToMessageDecoder方法该互,ByteToMessageDecoder是將ByteBuf字節(jié)解碼成其他消息類型的抽象類,它有一個(gè)關(guān)鍵的方法:

callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

作用就是從Channel讀到的字節(jié)數(shù)據(jù)轉(zhuǎn)換成對(duì)應(yīng)的具體消息類型List<Object>輸出,而具體怎么解碼是由繼承它的子類實(shí)現(xiàn)

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

LineBasedFrameDecoder繼承ByteToMessageDecoder并實(shí)現(xiàn)了具體的decode方法听哭。
當(dāng)LineBasedFrameDecoder加入到ChannelPipeLine管道后慢洋,當(dāng)Channel緩沖區(qū)中有數(shù)據(jù)時(shí)會(huì)調(diào)用channelRead方法,ByteToMessageDecoder的channelRead源碼:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //1、判斷是否是字節(jié)數(shù)據(jù)
        if (msg instanceof ByteBuf) {
            //2陆盘、初始化輸出數(shù)據(jù)
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
              //3普筹、判斷cumulation是否為空,初始化后cumulation為空,first為true
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);
                    data.release();
                }
              //調(diào)用解析程序隘马,將字節(jié)流轉(zhuǎn)換傳out集合對(duì)象
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //將緩沖區(qū)回收防止內(nèi)存溢出
                if (cumulation != null && !cumulation.isReadable()) {
                    cumulation.release();
                    cumulation = null;
                }
                //獲得返回?cái)?shù)據(jù)結(jié)果的大小
                int size = out.size();
                decodeWasNull = size == 0;
                //依次調(diào)用ChannelPipeLine的下一個(gè)ChannelRead方法太防,并將編碼后的結(jié)果傳遞下去
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            //1、判斷Bytebuf是否還有數(shù)據(jù)可讀
            while (in.isReadable()) {
                int outSize = out.size();
                int oldInputLength = in.readableBytes();
                //2酸员、調(diào)用實(shí)際的解碼方法蜒车,如果能讀到一行數(shù)據(jù),會(huì)將數(shù)據(jù)添加到out中
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }
              //3幔嗦、如果步驟2沒(méi)有添加成功酿愧,代碼無(wú)數(shù)據(jù)可讀,跳出循環(huán)
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
                //4邀泉、如果還是無(wú)數(shù)據(jù)可讀嬉挡,直接拋異常
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

LineBasedFrameDecoder的Decode方法如下:

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //調(diào)用實(shí)際解碼函數(shù)钝鸽,返回結(jié)果
        Object decoded = decode(ctx, in);
        if (decoded != null) {
        //添加到返回結(jié)果集List中
            out.add(decoded);
        }
    }
 protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        //1、找到一行數(shù)據(jù)的結(jié)束標(biāo)識(shí)的位置
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                final ByteBuf frame;
                //2庞钢、結(jié)束位置-上次已經(jīng)讀取的位置=當(dāng)前一行數(shù)據(jù)的長(zhǎng)度
                final int length = eol - buffer.readerIndex();
                //3拔恰、下次讀取需要跳過(guò)結(jié)束字符  所以將結(jié)束字符的長(zhǎng)度記錄下
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                //4、如果當(dāng)前一行數(shù)據(jù)長(zhǎng)度已經(jīng)大于最大定義的可解碼數(shù)據(jù)的長(zhǎng)度基括,代表已經(jīng)解析結(jié)束
                if (length > maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                if (stripDelimiter) {
                  //5颜懊、讀取當(dāng)前一行數(shù)據(jù)到臨時(shí)緩存區(qū)
                    frame = buffer.readBytes(length);
                  //6、下次讀取需要跳過(guò)結(jié)束字符 
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readBytes(length + delimLength);
                }
               //返回本次讀取的一行數(shù)據(jù)
                return frame;
            } else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                discardedBytes = buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
            }
            return null;
        }
    }
 private static int findEndOfLine(final ByteBuf buffer) {
        //獲得當(dāng)前ByteBuf寫(xiě)的位置
        final int n = buffer.writerIndex();
        //從ByteBuf可讀位置開(kāi)始循環(huán)风皿,一直遍歷到ByteBuf寫(xiě)的位置河爹,如果有\(zhòng)n 或者\(yùn)r\n,則意味找到一行數(shù)據(jù)結(jié)束的位置 并返回結(jié)束位置
        for (int i = buffer.readerIndex(); i < n; i ++) {
            final byte b = buffer.getByte(i);
            if (b == '\n') {
                return i;
            } else if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
                return i;  // \r\n
            }
        }
        return -1;  // Not found.
    }

由上源碼分析可知,LineBasedFrameDecoder通過(guò)'\r'或者'\r\n'作為分割界限作為一個(gè)完整的報(bào)文輸出揪阶,如果需要其他定制的分隔符作為界限則可以繼承ByteToMessageDecoder 重寫(xiě)decode方法實(shí)現(xiàn)昌抠。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市鲁僚,隨后出現(xiàn)的幾起案子炊苫,更是在濱河造成了極大的恐慌,老刑警劉巖冰沙,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件侨艾,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡拓挥,警方通過(guò)查閱死者的電腦和手機(jī)唠梨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)侥啤,“玉大人当叭,你說(shuō)我怎么就攤上這事「蔷模” “怎么了蚁鳖?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)赁炎。 經(jīng)常有香客問(wèn)我醉箕,道長(zhǎng),這世上最難降的妖魔是什么徙垫? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任讥裤,我火速辦了婚禮,結(jié)果婚禮上姻报,老公的妹妹穿的比我還像新娘己英。我一直安慰自己,他們只是感情好吴旋,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布剧辐。 她就那樣靜靜地躺著寒亥,像睡著了一般邮府。 火紅的嫁衣襯著肌膚如雪荧关。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天褂傀,我揣著相機(jī)與錄音忍啤,去河邊找鬼。 笑死仙辟,一個(gè)胖子當(dāng)著我的面吹牛同波,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播叠国,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼未檩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了粟焊?” 一聲冷哼從身側(cè)響起冤狡,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎项棠,沒(méi)想到半個(gè)月后悲雳,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡香追,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年合瓢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片透典。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡晴楔,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出峭咒,到底是詐尸還是另有隱情税弃,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布讹语,位于F島的核電站钙皮,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏顽决。R本人自食惡果不足惜短条,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望才菠。 院中可真熱鬧茸时,春花似錦、人聲如沸赋访。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至渠牲,卻和暖如春旋炒,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背签杈。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工瘫镇, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人答姥。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓铣除,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親鹦付。 傳聞我的和親對(duì)象是個(gè)殘疾皇子尚粘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容