Netty入門教程9——自定義解碼器處理半包消息

概述

在Netty系列之Netty編解碼框架中有各種解碼器, 推薦組合:

  • LengthFieldBasedFrameDecoder
  • ByteToMessageDecoder

這兩個解碼器來處理業(yè)務(wù)消息闲询。但是有時候為了靈活性,會直接選擇繼承
ByteToMessageDecoder
來處理業(yè)務(wù)消息,但是直接繼承ByteToMessageDecoder,則需要自己處理半包問題木羹。

在閱讀本文內(nèi)容之前阳欲,你至少需要了解以下兩個知識點

1、netty的ByteBuf類的基本api用法
2谷羞、什么是TCP半包

雖然JAVA NIO中也有個ByteBuffer類,但是在Netty程序中,基本都是直接用Netty的ByteBuf類,它包裝了更多好用的接口,降低了使用緩沖區(qū)類的難度。

自定義消息協(xié)議

目前自定義的消息協(xié)議用的最多的是在消息中頭四個字節(jié)保存消息的長度,格式大概如下

image.png

len : 表示消息的長度,通常用4個字節(jié)保存
head : 消息頭部
body : 消息內(nèi)容

無論每次請求的業(yè)務(wù)數(shù)據(jù)多大,都是使用上面的消息格式來表示的。

注意

在實際的項目中,消息格式可能會增加一些標志,例如,開始標記,結(jié)束標志,消息序列號,消息的協(xié)議類型(json或者二進制等),這里為了描述的方便,就不講附加的這些消息標志了诀艰。

自定義解碼器處理半包數(shù)據(jù)

如上描述,直接繼承ByteToMessageDecoder類,同時覆蓋其decode方法,完整實現(xiàn)代碼如下

服務(wù)端代碼

package nettyinaction.encode.lengthfield.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new SocketServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;


public class SocketServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SelfDefineEncodeHandler());
        pipeline.addLast(new BusinessServerHandler());
    }
}







package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class SelfDefineEncodeHandler extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf bufferIn, List<Object> out) throws Exception {
        if (bufferIn.readableBytes() < 4) {
            return;
        }

        int beginIndex = bufferIn.readerIndex();
        int length = bufferIn.readInt();

        if (bufferIn.readableBytes() < length) {
            bufferIn.readerIndex(beginIndex);
            return;
        }

        bufferIn.readerIndex(beginIndex + 4 + length);

        ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);

        otherByteBufRef.retain();

        out.add(otherByteBufRef);
    }
}





package nettyinaction.encode.lengthfield.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BusinessServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf)msg;
        int length = buf.readInt();
        assert length == (8);

        byte[] head = new byte[4];
        buf.readBytes(head);
        String headString = new String(head);
        assert  "head".equals(headString);

        byte[] body = new byte[4];
        buf.readBytes(body);
        String bodyString = new String(body);
        assert  "body".equals(bodyString);
    }
}

客戶端代碼

package nettyinaction.encode.lengthfield.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SocketClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .handler(new SocketClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
            channelFuture.channel().closeFuture().sync();
        }
        finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class SocketClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SocketClientHandler());
    }
}





package nettyinaction.encode.lengthfield.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
        ByteBuf buffer = allocator.buffer(20);
        buffer.writeInt(8);
        buffer.writeBytes("head".getBytes());
        buffer.writeBytes("body".getBytes());

        ctx.writeAndFlush(buffer);
    }
}

客戶端一旦啟動,會發(fā)送一條長度為8的消息到服務(wù)端,服務(wù)端首先使用SelfDefineEncodeHandler類對消息進行解碼,處理半包問題雳锋。如果消息是有效的完整的消息,當SelfDefineEncodeHandler處理完消息后,會把消息轉(zhuǎn)發(fā)給BusinessServerHandler處理,BusinessServerHandler只是簡單的做個驗證,判斷消息內(nèi)容是否符合預期黄绩。

運行上面的代碼,代碼如預期那樣,可以正確的讀取到消息并解析消息。

這個例子中,最為核心的類就是SelfDefineEncodeHandler了玷过。里面用了很多的技巧,要理解里面的每行代碼,需要分兩種情況來分析,分別是拆包和粘包

下面分別以拆包和粘包做兩個小試驗,來驗證SelfDefineEncodeHandler是否能正常的處理半包問題爽丹。

拆包試驗

先調(diào)整一下SocketClientHandler類中的channelActive方法中的代碼,將body擴大幾十倍,逼迫TCP發(fā)幾次請求到達服務(wù)端,看看服務(wù)端的SelfDefineEncodeHandler能否正常處理筑煮。

UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
ByteBuf buffer = allocator.buffer(20);
buffer.writeInt(1604);
buffer.writeBytes("head".getBytes());
String longMsgBody = "";
for (int i = 0; i < 400; i++) {
    longMsgBody = longMsgBody + "body";
}
buffer.writeBytes(longMsgBody.getBytes());

 ctx.writeAndFlush(buffer);

使用一個for循環(huán),將消息body的長度設(shè)置為1600,加上長度為4的head,總共消息長度為1604。

然后調(diào)整一下服務(wù)端類SelfDefineEncodeHandler類的代碼,加上三行代碼粤蝎。
第一行代碼是加入一個類變量count真仲,統(tǒng)計一下decode方法的調(diào)用次數(shù)

private static int count = 0;

接著在decode方法中加入三行代碼

System.out.println("decode call count="+ ++count);
System.out.println("bufferIn.readableBytes()="+bufferIn.readableBytes());
System.out.println("beginIndex="+beginIndex);

打印出count和bufferIn.readableBytes()的大小以及beginIndex

最后在BusinessServerHandler類加入

private static int count = 0;

成員變量以及在channelRead方法中加入

System.out.println("BusinessServerHandler call count="+ ++count);

運行代碼,打印結(jié)果如下

decode call count=1
bufferIn.readableBytes()=1024
beginIndex=0

decode call count=2
bufferIn.readableBytes()=1608
beginIndex=0

BusinessServerHandler call count=1

這個結(jié)果說明了,雖然客戶端只是發(fā)送了一條消息,但是其實TCP底層是分兩個包發(fā)送給服務(wù)端,第一次發(fā)送了1024個字節(jié),后面的一次請求,才把消息剩下的內(nèi)容發(fā)送給服務(wù)端初澎。

雖然decode方法被調(diào)用了兩次,但是第一次讀取到的信息不完整,因此ByteToMessageDecoder會靜靜的等待另外一個包的到來,第二次讀取完整消息后,才把消息轉(zhuǎn)發(fā)給BusinessServerHandler類,從打印的結(jié)果看,
BusinessServerHandler類的channelRead方法只被調(diào)用了一次秸应。

到此我們知道SelfDefineEncodeHandler類的decode方法是可以應付拆包問題的,那到底是如何做到的呢?現(xiàn)在我們回頭仔細看看decode方法中的代碼谤狡。

第一部分代碼

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字節(jié)還不到4個字節(jié),也即是連消息長度字段中的內(nèi)容都不完整的,直接return灸眼。

第二部分代碼

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if (bufferIn.readableBytes() < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

對于拆包這種場景,由于還未讀取到完整的消息,bufferIn.readableBytes() 會小于length,并重置bufferIn的readerIndex為0,然后退出,ByteToMessageDecoder會乖乖的等待下個包的到來。

由于第一次調(diào)用中readerIndex被重置為0,那么decode方法被調(diào)用第二次的時候,beginIndex還是為0的墓懂。

第三部分代碼

bufferIn.readerIndex(beginIndex + 4 + length);

將readerIndex設(shè)置為最大焰宣。首先代碼能執(zhí)行到這里,針對拆包這種場景而言,已經(jīng)是讀取到一條有效完整的消息了。這個時候需要通知ByteToMessageDecoder類,bufferIn中的數(shù)據(jù)已經(jīng)讀取完畢了,不要再調(diào)用decode方法了捕仔。ByteToMessageDecoder類的底層會根據(jù)bufferIn.isReadable()方法來判斷是否讀取完畢匕积。只有將readerIndex設(shè)置為最大,bufferIn.isReadable()方法才會返回false。

第四部分代碼

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

當decode方法執(zhí)行完后,會釋放bufferIn這個緩沖區(qū),如果將執(zhí)行完釋放操作的bufferIn傳遞給下個處理器的話,一旦下個處理器調(diào)用bufferIn的讀或者寫的方法時,會立刻報出IllegalReferenceCountException異常的榜跌。

因此slice操作后,必須加上一個retain操作,讓bufferIn的引用計數(shù)器加1,這樣ByteToMessageDecoder會刀下留人,先不釋放bufferIn闪唆。

粘包試驗

首先將SocketClientHandler類中的channelActive方法的實現(xiàn)改為

for (int i = 0; i < 20; i++) {
    UnpooledByteBufAllocator allocator = new UnpooledByteBufAllocator(false);
    ByteBuf buffer = allocator.buffer(20);
    buffer.writeInt(8);
    buffer.writeBytes("head".getBytes());
    buffer.writeBytes("body".getBytes());

    ctx.writeAndFlush(buffer);
}

客戶端發(fā)起20個請求到服務(wù)器端。

接著注釋掉SocketServerInitializer類中的

pipeline.addLast(new SelfDefineEncodeHandler());

代碼,使請求不走SelfDefineEncodeHandler解碼器钓葫。

運行代碼,執(zhí)行結(jié)果如下

BusinessServerHandler call count=1

說明客戶端發(fā)送了粘包,服務(wù)端只接收到一次請求∏睦伲現(xiàn)在把代碼調(diào)整回來,走SelfDefineEncodeHandler解碼器,運行代碼,執(zhí)行效果如下

decode call count=1
bufferIn.readableBytes()=240
beginIndex=0
BusinessServerHandler call count=1

decode call count=2
bufferIn.readableBytes()=228
beginIndex=12
BusinessServerHandler call count=2

decode call count=3
bufferIn.readableBytes()=216
beginIndex=24
BusinessServerHandler call count=3

decode call count=4
bufferIn.readableBytes()=204
beginIndex=36
BusinessServerHandler call count=4

decode call count=5
bufferIn.readableBytes()=192
beginIndex=48
BusinessServerHandler call count=5

decode call count=6
bufferIn.readableBytes()=180
beginIndex=60
BusinessServerHandler call count=6

decode call count=7
bufferIn.readableBytes()=168
beginIndex=72
BusinessServerHandler call count=7

decode call count=8
bufferIn.readableBytes()=156
beginIndex=84
BusinessServerHandler call count=8

decode call count=9
bufferIn.readableBytes()=144
beginIndex=96
BusinessServerHandler call count=9

decode call count=10
bufferIn.readableBytes()=132
beginIndex=108
BusinessServerHandler call count=10

decode call count=11
bufferIn.readableBytes()=120
beginIndex=120
BusinessServerHandler call count=11

decode call count=12
bufferIn.readableBytes()=108
beginIndex=132
BusinessServerHandler call count=12

decode call count=13
bufferIn.readableBytes()=96
beginIndex=144
BusinessServerHandler call count=13

decode call count=14
bufferIn.readableBytes()=84
beginIndex=156
BusinessServerHandler call count=14

decode call count=15
bufferIn.readableBytes()=72
beginIndex=168
BusinessServerHandler call count=15

decode call count=16
bufferIn.readableBytes()=60
beginIndex=180
BusinessServerHandler call count=16

decode call count=17
bufferIn.readableBytes()=48
beginIndex=192
BusinessServerHandler call count=17

decode call count=18
bufferIn.readableBytes()=36
beginIndex=204
BusinessServerHandler call count=18

decode call count=19
bufferIn.readableBytes()=24
beginIndex=216
BusinessServerHandler call count=19

decode call count=20
bufferIn.readableBytes()=12
beginIndex=228
BusinessServerHandler call count=20

結(jié)果符合預期,客戶端發(fā)送20次,服務(wù)端BusinessServerHandler類的channelRead執(zhí)行了20次。SelfDefineEncodeHandler類是如何做到這一點的呢础浮?還是得回頭仔細看看decode方法帆调。

第一部分代碼

if (bufferIn.readableBytes() < 4) {
            return;
}

如果接收到的字節(jié)還不到4個字節(jié),也即是連消息長度字段中的內(nèi)容都不完整的,直接return。

第二部分代碼

 int beginIndex = bufferIn.readerIndex();
 int length = bufferIn.readInt();

 if (bufferIn.readableBytes() < length) {
      bufferIn.readerIndex(beginIndex);
      return;
 }

由于客戶端發(fā)送了粘包,decode方法將會接收到一條聚合了多條業(yè)務(wù)消息的大消息豆同,因此bufferIn.readableBytes()肯定大于length, bufferIn的readerIndex不會被重置番刊。只是decode方法每被執(zhí)行一次,beginIndex將會遞增12,也即是(length+4)。

第三部分代碼

bufferIn.readerIndex(beginIndex + 4 + length);

對于粘包這種場景,這行代碼就不是表示將readerIndex升到最高,而是將readerIndex后移(length+4)位影锈,讓beginIndex遞增(length+4)芹务。

第四部分代碼

ByteBuf otherByteBufRef = bufferIn.slice(beginIndex, 4 + length);
otherByteBufRef.retain();
out.add(otherByteBufRef);

slice操作,目的是從大消息中截取出一條有效的業(yè)務(wù)消息。

參考的文章
Netty權(quán)威指南里沒有說到的Decoder編寫細節(jié)
Netty系列之Netty編解碼框架分析
Netty之有效規(guī)避內(nèi)存泄漏
Netty高性能編程備忘錄(下)

https://blog.csdn.net/linsongbin1/java/article/details/77915686

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鸭廷,一起剝皮案震驚了整個濱河市枣抱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌辆床,老刑警劉巖沃但,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異佛吓,居然都是意外死亡宵晚,警方通過查閱死者的電腦和手機垂攘,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來淤刃,“玉大人晒他,你說我怎么就攤上這事∫菁郑” “怎么了陨仅?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長铝侵。 經(jīng)常有香客問我灼伤,道長,這世上最難降的妖魔是什么咪鲜? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任狐赡,我火速辦了婚禮,結(jié)果婚禮上疟丙,老公的妹妹穿的比我還像新娘颖侄。我一直安慰自己,他們只是感情好享郊,可當我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布览祖。 她就那樣靜靜地躺著,像睡著了一般炊琉。 火紅的嫁衣襯著肌膚如雪展蒂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天苔咪,我揣著相機與錄音锰悼,去河邊找鬼。 笑死悼泌,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的夹界。 我是一名探鬼主播馆里,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼可柿!你這毒婦竟也來了鸠踪?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤复斥,失蹤者是張志新(化名)和其女友劉穎营密,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體目锭,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡评汰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年纷捞,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片被去。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡主儡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出惨缆,到底是詐尸還是另有隱情糜值,我是刑警寧澤,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布坯墨,位于F島的核電站寂汇,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏捣染。R本人自食惡果不足惜骄瓣,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望液斜。 院中可真熱鬧累贤,春花似錦、人聲如沸少漆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽示损。三九已至渗磅,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間检访,已是汗流浹背始鱼。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留脆贵,地道東北人医清。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像卖氨,于是被迫代替她去往敵國和親会烙。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,107評論 2 356