最近需要做一個長連接的設(shè)備管理厌漂,使用netty可以方便的做到,還可以配置心跳及解碼器
自定義長度解碼器
LengthFieldBasedFrameDecoder解碼器自定義長度解決TCP粘包黏包問題。所以又稱為: 自定義長度解碼器
TCP粘包和黏包現(xiàn)象
TCP粘包是指發(fā)送方發(fā)送的若干個數(shù)據(jù)包到接收方時粘成一個包。從接收緩沖區(qū)來看炭分,后一個包數(shù)據(jù)的頭緊接著前一個數(shù)據(jù)的尾蒋院。
當(dāng)TCP連接建立后沉颂,Client發(fā)送多個報文給Server,TCP協(xié)議保證數(shù)據(jù)可靠性悦污,但無法保證Client發(fā)了n個包铸屉,服務(wù)端也按照n個包接收。Client端發(fā)送n個數(shù)據(jù)包切端,Server端可能收到n-1或n+1個包彻坛。
為什么出現(xiàn)粘包現(xiàn)象?
發(fā)送方原因: TCP默認(rèn)會使用Nagle算法踏枣。而Nagle算法主要做兩件事:1)只有上一個分組得到確認(rèn)昌屉,才會發(fā)送下一個分組;2)收集多個小分組茵瀑,在一個確認(rèn)到來時一起發(fā)送间驮。所以,正是Nagle算法造成了發(fā)送方有可能造成粘包現(xiàn)象马昨。
接收方原因: TCP接收方采用緩存方式讀取數(shù)據(jù)包竞帽,一次性讀取多個緩存中的數(shù)據(jù)包。自然出現(xiàn)前一個數(shù)據(jù)包的尾和后一個收據(jù)包的頭粘到一起鸿捧。
如何解決粘包現(xiàn)象
就是要選擇相應(yīng)的解碼器
- 添加特殊符號屹篓,接收方通過這個特殊符號將接收到的數(shù)據(jù)包拆分開 - DelimiterBasedFrameDecoder特殊分隔符解碼器
- 每次發(fā)送固定長度的數(shù)據(jù)包 - FixedLengthFrameDecoder定長編碼器
- 在消息頭中定義長度字段,來標(biāo)識消息的總長度 - LengthFieldBasedFrameDecoder自定義長度解碼器
LengthFieldBasedFrameDecoder參數(shù)
自定義長度解碼器匙奴,所以構(gòu)造函數(shù)中6個參數(shù)堆巧,基本都圍繞那個定義長度域,進行的描述。
- maxFrameLength - 發(fā)送的數(shù)據(jù)幀最大長度
- lengthFieldOffset - 定義長度域位于發(fā)送的字節(jié)數(shù)組中的下標(biāo)谍肤。換句話說:發(fā)送的字節(jié)數(shù)組中下標(biāo)為${lengthFieldOffset}的地方是長度域的開始地方
- lengthFieldLength - 用于描述定義的長度域的長度啦租。換句話說:發(fā)送字節(jié)數(shù)組bytes時, 字節(jié)數(shù)組bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength]域?qū)?yīng)于的定義長度域部分
- lengthAdjustment - 滿足公式: 發(fā)送的字節(jié)數(shù)組bytes.length - lengthFieldLength = bytes[lengthFieldOffset, lengthFieldOffset+lengthFieldLength] + lengthFieldOffset + lengthAdjustment
- initialBytesToStrip - 接收到的發(fā)送數(shù)據(jù)包,去除前initialBytesToStrip位
- failFast - true: 讀取到長度域超過maxFrameLength荒揣,就拋出一個 TooLongFrameException刷钢。false: 只有真正讀取完長度域的值表示的字節(jié)之后,才會拋出 TooLongFrameException乳附,默認(rèn)情況下設(shè)置為true内地,建議不要修改,否則可能會造成內(nèi)存溢出
- ByteOrder - 數(shù)據(jù)存儲采用大端模式或小端模式
舉例解釋參數(shù)如何寫
客戶端多次發(fā)送"HELLO, WORLD"字符串給服務(wù)端赋除。"HELLO, WORLD"共12字節(jié)(12B)阱缓。長度域中的內(nèi)容是16進制的值,如下:
0x000c -----> 12
0x000e -----> 14
場景1
數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"
解釋:
如上圖举农,長度域的值為12B(0x000c)荆针。希望解碼后保持一樣,根據(jù)上面的公式,參數(shù)應(yīng)該為:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(12)
initialBytesToStrip = 0 - 解碼過程中颁糟,沒有丟棄任何數(shù)據(jù)
場景2
數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"
解釋:
上圖中航背,解碼后,希望丟棄長度域2B字段棱貌,所以玖媚,只要initialBytesToStrip = 2即可。其他與場景1相同
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(12)
initialBytesToStrip = 2 解碼過程中婚脱,丟棄2個字節(jié)的數(shù)據(jù)
場景3
數(shù)據(jù)包大小: 14B = 長度域2B + "HELLO, WORLD"今魔。與場景1不同的是:場景3中長度域的值為14(0x000E)
解釋:
如上圖,長度域的值為14(0x000E)障贸。希望解碼后保持一樣错森,根據(jù)上面的公式,參數(shù)應(yīng)該為:
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 = 數(shù)據(jù)包長度(14) - lengthFieldOffset - lengthFieldLength - 長度域的值(14)
initialBytesToStrip = 0 - 解碼過程中篮洁,沒有丟棄任何數(shù)據(jù)
場景4
場景4在長度域前添加2個字節(jié)的Header涩维。長度域的值(0x00000C) = 12≡ǎ總數(shù)據(jù)包長度: 17=Header(2B) + 長度域(3B) + "HELLO, WORLD"
解釋
如上圖瓦阐。編碼解碼后,長度保持一致锋叨,所以initialBytesToStrip = 0垄分。參數(shù)應(yīng)該為:
lengthFieldOffset = 2
lengthFieldLength = 3
lengthAdjustment = 0 = 數(shù)據(jù)包長度(17) - lengthFieldOffset(2) - lengthFieldLength(3) - 長度域的值(12)
initialBytesToStrip = 0 - 解碼過程中,沒有丟棄任何數(shù)據(jù)
場景5
與場景4不同的地方是: Header與長度域的位置換了娃磺。總數(shù)據(jù)包長度: 17=長度域(3B) + Header(2B) + "HELLO, WORLD"
解釋
如上圖叫倍。編碼解碼后偷卧,長度保持一致豺瘤,所以initialBytesToStrip = 0。參數(shù)應(yīng)該為:
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 = 數(shù)據(jù)包長度(17) - lengthFieldOffset(0) - lengthFieldLength(3) - 長度域的值(12)
initialBytesToStrip = 0 - 解碼過程中听诸,沒有丟棄任何數(shù)據(jù)
場景6
如下圖坐求,"HELLO, WORLD"域前有多個字段∩卫妫總數(shù)據(jù)長度: 16 = HEADER1(1) + 長度域(2) + HEADER2(1) + "HELLO, WORLD"
lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = 1 = 數(shù)據(jù)包長度(16) - lengthFieldOffset(1) - lengthFieldLength(2) - 長度域的值(12)
initialBytesToStrip = 0 - 解碼過程中桥嗤,沒有丟棄任何數(shù)據(jù)
自定義協(xié)議
很多時候并不能按照以上參數(shù)的方式去解析數(shù)據(jù),所以需要自定義協(xié)議仔蝌,LengthFieldBasedFrameDecoder解碼器自定義協(xié)議.通常,協(xié)議的格式如下:
通常來說,使用
ByteToMessageDocoder
這個編碼器,我們要分別解析出Header,length,body這幾個字段.而使用LengthFieldBasedFrameDecoder
,我們就可以直接接收想要的一部分,相當(dāng)于在原來的基礎(chǔ)上包上了一層,有了這層之后,我們可以控制我們每次只要讀想讀的字段,這對于自定義協(xié)議來說十分方便.
- MyProtocolDecoder的定義
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
private static final int HEADER_SIZE = 6;
/**
*
* @param maxFrameLength 幀的最大長度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字節(jié)長
* @param lengthAdjustment 修改幀數(shù)據(jù)長度字段中定義的值泛领,可以為負(fù)數(shù) 因為有時候我們習(xí)慣把頭部記入長度,若為負(fù)數(shù),則說明要推后多少個字段
* @param initialBytesToStrip 解析時候跳過多少個長度
* @param failFast 為true,當(dāng)frame長度超過maxFrameLength時立即報TooLongFrameException異常敛惊,為false渊鞋,讀取完整個幀再報異
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在這里調(diào)用父類的方法,實現(xiàn)指得到想要的部分,我在這里全部都要,也可以只要body部分
in = (ByteBuf) super.decode(ctx,in);
if(in == null){
return null;
}
if(in.readableBytes()<HEADER_SIZE){
throw new Exception("字節(jié)數(shù)不足");
}
//讀取type字段
byte type = in.readByte();
//讀取flag字段
byte flag = in.readByte();
//讀取length字段
int length = in.readInt();
if(in.readableBytes()!=length){
throw new Exception("標(biāo)記的長度不符合實際長度");
}
//讀取body
byte []bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
}
}
在上述的代碼中,調(diào)用父類的方法瞧挤,實現(xiàn)截取到自己想要的字段锡宋,如可以判斷數(shù)據(jù)必須以xx開頭。
- 協(xié)議實體的定義
public class MyProtocolBean {
//類型 系統(tǒng)編號 0xA 表示A系統(tǒng)特恬,0xB 表示B系統(tǒng)
private byte type;
//信息標(biāo)志 0xA 表示心跳包 0xC 表示超時包 0xC 業(yè)務(wù)信息包
private byte flag;
//內(nèi)容長度
private int length;
//內(nèi)容
private String content;
public MyProtocolBean(byte flag, byte type, int length, String content) {
this.flag = flag;
this.type = type;
this.length = length;
this.content = content;
}
}
3.服務(wù)端的實現(xiàn)
public class Server {
private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大長度
private static final int LENGTH_FIELD_LENGTH = 4; //長度字段所占的字節(jié)數(shù)
private static final int LENGTH_FIELD_OFFSET = 2; //長度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private int port;
public Server(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口执俩,開始接收進來的連接
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
- 服務(wù)端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接轉(zhuǎn)化成協(xié)議消息實體
System.out.println(myProtocolBean.getContent());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
}
服務(wù)端Handler沒什么特別的地方,只是輸出接收到的消息
- 客戶端
public class Client {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
- 客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
ctx.writeAndFlush(myProtocolBean);
}
}
客戶端Handler實現(xiàn)發(fā)送消息.
- 客戶端編碼器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("msg is null");
}
out.writeByte(msg.getType());
out.writeByte(msg.getFlag());
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
}
}
編碼的時候,只需要按照定義的順序依次寫入到ByteBuf中.
小結(jié)
若是上面的參數(shù)直接可以滿足要求,可以直接使用參數(shù)癌刽,若不可以則通過自定義的方式去實現(xiàn)奠滑,更加靈活。