高性能IO框架Netty:手把手教你解決“粘包/半包”問題

前言:demo演示

首先昂利,我們來看個demo


image.png

1、EchoServer

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        EchoServer echoServer = new EchoServer(9999);
        System.out.println("服務器即將啟動");
        echoServer.start();
        System.out.println("服務器關閉");
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服務端啟動必須*/
            b.group(group)/*將線程組傳入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO進行網(wǎng)絡傳輸*/
                    .localAddress(new InetSocketAddress(port))/*指定服務器監(jiān)聽端口*/
                    /*服務端每接收到一個連接請求,就會新啟一個socket通信粉臊,也就是channel,
                    所以下面這段代碼的作用就是為這個子channel增加handle*/
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(serverHandler);/*添加到該子channel的pipeline的尾部*/
                        }
                    });
            ChannelFuture f = b.bind().sync();/*異步綁定到服務器驶兜,sync()會阻塞直到完成*/
            System.out.println("服務器啟動完成扼仲,等待客戶端的連接和數(shù)據(jù).....");
            f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/
        } finally {
            group.shutdownGracefully().sync();/*優(yōu)雅關閉線程組*/
        }
    }

}

2远寸、EchoServerHandler

@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 服務端讀取到網(wǎng)絡數(shù)據(jù)后的處理*/
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept[" + request
                + "] and the counter is:" + counter.incrementAndGet());
        String resp = "Hello," + request + ". Welcome to Netty World!"
                + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));

    }

    /*** 發(fā)生異常后的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

使用netty實現(xiàn)了個服務端,當接收到客戶端的消息是屠凶,打印出來請求的內(nèi)容驰后,并統(tǒng)計接收請求的次數(shù)。

3矗愧、EchoClient

public class EchoClient {

    private final int port;
    private final String host;

    public EchoClient(int port, String host) {
        this.port = port;
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            final Bootstrap b = new Bootstrap();
            /*客戶端啟動必須*/
            b.group(group)/*將線程組傳入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO進行網(wǎng)絡傳輸*/
                    .remoteAddress(new InetSocketAddress(host, port))/*配置要連接服務器的ip地址和端口*/
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient(9999, "127.0.0.1").start();
    }
}

4灶芝、EchoClientHandler

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客戶端讀取到網(wǎng)絡數(shù)據(jù)后的處理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
    }

    /*** 客戶端被通知channel活躍后,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "test1,test2,test3,test4"
                + System.getProperty("line.separator");
        for (int i = 0; i < 100; i++) {
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 發(fā)生異常后的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

使用netty實現(xiàn)了個客戶端唉韭,鏈接建立完成之后向服務端發(fā)送消息夜涕。循環(huán)100次。并且打印服務端返回的消息属愤。并統(tǒng)計返回次數(shù)女器。

執(zhí)行結(jié)果

服務端輸出

image

客戶端打印

image

結(jié)果發(fā)現(xiàn),我們客戶單發(fā)送了100次數(shù)據(jù)春塌,但實際上只接收了30次晓避。而且每次消息發(fā)送的是test1,test2,test3,test4,test5簇捍,但實際接受的卻有很多相鏈接起來的只壳。這是為什么呢?為什么不是100次test1,test2,test3,test4,test5呢暑塑?這就是TCP傳輸?shù)恼嘲?半包問題吼句。

一、什么是TCP粘包半包事格?

image

假設客戶端分別發(fā)送了兩個數(shù)據(jù)包D1和D2給服務端惕艳,由于服務端一次讀取到的字節(jié)數(shù)是不確定的,故可能存在以下4種情況驹愚。

  • 服務端分兩次讀取到了兩個獨立的數(shù)據(jù)包远搪,分別是D1和D2,沒有粘包和拆包逢捺;
  • 服務端一次接收到了兩個數(shù)據(jù)包谁鳍,D1和D2粘合在一起,被稱為TCP粘包劫瞳;
  • 服務端分兩次讀取到了兩個數(shù)據(jù)包倘潜,第一次讀取到了完整的D1包和D2包的部分內(nèi)容,第二次讀取到了D2包的剩余內(nèi)容志于,這被稱為TCP拆包涮因;
  • 服務端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了D1包的部分內(nèi)容D1_1伺绽,第二次讀取到了D1包的剩余內(nèi)容D1_2和D2包的整包养泡。

如果此時服務端TCP接收滑窗非常小嗜湃,而數(shù)據(jù)包D1和D2比較大,很有可能會發(fā)生第五種可能澜掩,即服務端分多次才能將D1和D2包接收完全净蚤,期間發(fā)生多次拆包。

二输硝、TCP粘包/半包發(fā)生的原因

由于TCP協(xié)議本身的機制(面向連接的可靠地協(xié)議-三次握手機制)客戶端與服務器會維持一個連接(Channel)今瀑,數(shù)據(jù)在連接不斷開的情況下,可以持續(xù)不斷地將多個數(shù)據(jù)包發(fā)往服務器点把,但是如果發(fā)送的網(wǎng)絡數(shù)據(jù)包太小橘荠,那么他本身會啟用Nagle算法(可配置是否啟用)對較小的數(shù)據(jù)包進行合并(基于此,TCP的網(wǎng)絡延遲要UDP的高些)然后再發(fā)送(超時或者包大小足夠)郎逃。那么這樣的話哥童,服務器在接收到消息(數(shù)據(jù)流)的時候就無法區(qū)分哪些數(shù)據(jù)包是客戶端自己分開發(fā)送的,這樣產(chǎn)生了粘包褒翰;服務器在接收到數(shù)據(jù)庫后贮懈,放到緩沖區(qū)中,如果消息沒有被及時從緩存區(qū)取走优训,下次在取數(shù)據(jù)的時候可能就會出現(xiàn)一次取出多個數(shù)據(jù)包的情況朵你,造成粘包現(xiàn)象

UDP:本身作為無連接的不可靠的傳輸協(xié)議(適合頻繁發(fā)送較小的數(shù)據(jù)包),他不會對數(shù)據(jù)包進行合并發(fā)送(也就沒有Nagle算法之說了)揣非,他直接是一端發(fā)送什么數(shù)據(jù)抡医,直接就發(fā)出去了,既然他不會對數(shù)據(jù)合并早敬,每一個數(shù)據(jù)包都是完整的(數(shù)據(jù)+UDP頭+IP頭等等發(fā)一次數(shù)據(jù)封裝一次)也就沒有粘包一說了忌傻。

分包產(chǎn)生的原因就簡單的多:可能是IP分片傳輸導致的,也可能是傳輸過程中丟失部分包導致出現(xiàn)的半包搞监,還有可能就是一個包可能被分成了兩次傳輸水孩,在取數(shù)據(jù)的時候,先取到了一部分(還可能與接收的緩沖區(qū)大小有關系)琐驴,總之就是一個數(shù)據(jù)包被分成了多次接收俘种。

更具體的原因有三個,分別如下棍矛。

  • 應用程序?qū)懭霐?shù)據(jù)的字節(jié)大小大于套接字發(fā)送緩沖區(qū)的大小
  • 進行MSS大小的TCP分段安疗。MSS是最大報文段長度的縮寫。MSS是TCP報文段中的數(shù)據(jù)字段的最大長度够委。數(shù)據(jù)字段加上TCP首部才等于整個的TCP報文段荐类。所以MSS并不是TCP報文段的最大長度,而是:MSS=TCP報文段長度-TCP首部長
  • 以太網(wǎng)的payload大于MTU進行IP分片茁帽。MTU指:一種通信協(xié)議的某一層上面所能通過的最大數(shù)據(jù)包大小玉罐。如果IP層有一個數(shù)據(jù)包要傳屈嗤,而且數(shù)據(jù)的長度比鏈路層的MTU大,那么IP層就會進行分片吊输,把數(shù)據(jù)包分成托干片饶号,讓每一片都不超過MTU。注意季蚂,IP分片可以發(fā)生在原始發(fā)送端主機上茫船,也可以發(fā)生在中間路由器上。

三扭屁、解決粘包半包問題

由于底層的TCP無法理解上層的業(yè)務數(shù)據(jù)算谈,所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的,這個問題只能通過上層的應用協(xié)議棧設計來解決料滥,根據(jù)業(yè)界的主流協(xié)議的解決方案然眼,可以歸納如下。

1葵腹、在包尾增加分割符

在包尾增加分割符高每,比如回車換行符進行分割,例如FTP協(xié)議践宴;

demo如下:

LineBaseEchoServer

public class LineBaseEchoServer {

    public static final int PORT = 9998;

    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服務器即將啟動");
        lineBaseEchoServer.start();
    }

    public void start() throws InterruptedException {
        final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服務端啟動必須*/
            b.group(group)/*將線程組傳入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO進行網(wǎng)絡傳輸*/
                    .localAddress(new InetSocketAddress(PORT))/*指定服務器監(jiān)聽端口*/
                    /*服務端每接收到一個連接請求鲸匿,就會新啟一個socket通信,也就是channel浴井,
                    所以下面這段代碼的作用就是為這個子channel增加handle*/
                    .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/*異步綁定到服務器晒骇,sync()會阻塞直到完成*/
            System.out.println("服務器啟動完成,等待客戶端的連接和數(shù)據(jù).....");
            f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/
        } finally {
            group.shutdownGracefully().sync();/*優(yōu)雅關閉線程組*/
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //添加換行解碼器
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }

}

LineBaseEchoServer

public class LineBaseEchoServer {

    public static final int PORT = 9998;

    public static void main(String[] args) throws InterruptedException {
        LineBaseEchoServer lineBaseEchoServer = new LineBaseEchoServer();
        System.out.println("服務器即將啟動");
        lineBaseEchoServer.start();
    }

    public void start() throws InterruptedException {
        final LineBaseServerHandler serverHandler = new LineBaseServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            ServerBootstrap b = new ServerBootstrap();/*服務端啟動必須*/
            b.group(group)/*將線程組傳入*/
                    .channel(NioServerSocketChannel.class)/*指定使用NIO進行網(wǎng)絡傳輸*/
                    .localAddress(new InetSocketAddress(PORT))/*指定服務器監(jiān)聽端口*/
                    /*服務端每接收到一個連接請求磺浙,就會新啟一個socket通信,也就是channel徒坡,
                    所以下面這段代碼的作用就是為這個子channel增加handle*/
                    .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/*異步綁定到服務器撕氧,sync()會阻塞直到完成*/
            System.out.println("服務器啟動完成,等待客戶端的連接和數(shù)據(jù).....");
            f.channel().closeFuture().sync();/*阻塞直到服務器的channel關閉*/
        } finally {
            group.shutdownGracefully().sync();/*優(yōu)雅關閉線程組*/
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //添加換行解碼器
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseServerHandler());
        }
    }

}

LineBaseEchoClient

public class LineBaseEchoClient {

    private final String host;

    public LineBaseEchoClient(String host) {
        this.host = host;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();/*線程組*/
        try {
            final Bootstrap b = new Bootstrap();
            b.group(group)/*將線程組傳入*/
                    .channel(NioSocketChannel.class)/*指定使用NIO進行網(wǎng)絡傳輸*/
                    .remoteAddress(new InetSocketAddress(host, LineBaseEchoServer.PORT))/*配置要連接服務器的ip地址和端口*/
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已連接到服務器.....");
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //回車符做了分割
            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
            ch.pipeline().addLast(new LineBaseClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new LineBaseEchoClient("127.0.0.1").start();
    }
}

LineBaseClientHandler

 public class LineBaseClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    private AtomicInteger counter = new AtomicInteger(0);

    /*** 客戶端讀取到網(wǎng)絡數(shù)據(jù)后的處理*/
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client Accept[" + msg.toString(CharsetUtil.UTF_8)
                + "] and the counter is:" + counter.incrementAndGet());
        ctx.close();
    }

    /*** 客戶端被通知channel活躍后喇完,做事*/
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "test1,test2,test3,test4,test5"
                + System.getProperty("line.separator");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(500);
            System.out.println(System.currentTimeMillis() + ":即將發(fā)送數(shù)據(jù):"
                    + request);
            msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 發(fā)生異常后的處理*/
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

執(zhí)行效果

image

2伦泥、消息定長

例如每個報文的大小為固定長度200字節(jié),如果不夠锦溪,空位補空格不脯;

服務端只需將服務端的ChannelInitializerImp 解碼器new LineBasedFrameDecoder(1024)替換為new FixedLengthFrameDecoder( FixedLengthEchoClient.REQUEST.length())即可。

 private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            //添加定長報文長度解碼器刻诊,長度問請求的長度
            ch.pipeline().addLast(
                    new FixedLengthFrameDecoder(
                            FixedLengthEchoClient.REQUEST.length()));
            ch.pipeline().addLast(new FixedLengthServerHandler());
        }
    }

3衰倦、將消息分為消息頭和消息體

消息頭中包含表示消息總長度(或者消息體長度)的字段蜂厅,通常設計思路為消息頭的第一個字段使用int32來表示消息的總長度。類似與第二條顷帖,只是我們按照頭部的content-length長度進行定長解碼。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末涣旨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌峦剔,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件角钩,死亡現(xiàn)場離奇詭異吝沫,居然都是意外死亡,警方通過查閱死者的電腦和手機递礼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門野舶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人宰衙,你說我怎么就攤上這事平道。” “怎么了供炼?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵一屋,是天一觀的道長。 經(jīng)常有香客問我袋哼,道長冀墨,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任涛贯,我火速辦了婚禮诽嘉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弟翘。我一直安慰自己虫腋,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布稀余。 她就那樣靜靜地躺著悦冀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪睛琳。 梳的紋絲不亂的頭發(fā)上盒蟆,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天,我揣著相機與錄音师骗,去河邊找鬼历等。 笑死,一個胖子當著我的面吹牛辟癌,可吹牛的內(nèi)容都是我干的寒屯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼愿待,長吁一口氣:“原來是場噩夢啊……” “哼浩螺!你這毒婦竟也來了靴患?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤要出,失蹤者是張志新(化名)和其女友劉穎鸳君,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體患蹂,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡或颊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了传于。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片囱挑。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖沼溜,靈堂內(nèi)的尸體忽然破棺而出平挑,到底是詐尸還是另有隱情,我是刑警寧澤系草,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布通熄,位于F島的核電站,受9級特大地震影響找都,放射性物質(zhì)發(fā)生泄漏唇辨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一能耻、第九天 我趴在偏房一處隱蔽的房頂上張望赏枚。 院中可真熱鬧,春花似錦晓猛、人聲如沸饿幅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽诫睬。三九已至,卻和暖如春帕涌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背续徽。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工蚓曼, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人钦扭。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓纫版,卻偏偏與公主長得像,于是被迫代替她去往敵國和親客情。 傳聞我的和親對象是個殘疾皇子其弊,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容