在之前的文章中,介紹了DelimiterBasedFrameDecoder
(基于特殊符號的編碼器)和FixedLengthFrameDecoder
定長編碼器)的用法.這篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協(xié)議.通常,協(xié)議的格式如下:
image.png
通常來說,使用
ByteToMessageDocoder
這個編碼器,我們要分別解析出Header,length,body這幾個字段.而使用LengthFieldBasedFrameDecoder
,我們就可以直接接收想要的一部分,相當于在原來的基礎上包上了一層,有了這層之后,我們可以控制我們每次只要讀想讀的字段,這對于自定義協(xié)議來說十分方便.
1. LengthFieldBasedFrameDecoder
的定義
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
private static final int HEADER_SIZE = 6;
/**
*
* @param maxFrameLength 幀的最大長度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字節(jié)長
* @param lengthAdjustment 修改幀數(shù)據(jù)長度字段中定義的值串结,可以為負數(shù) 因為有時候我們習慣把頭部記入長度,若為負數(shù),則說明要推后多少個字段
* @param initialBytesToStrip 解析時候跳過多少個長度
* @param failFast 為true驰贷,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false血巍,讀取完整個幀再報異
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在這里調(diào)用父類的方法,實現(xiàn)指得到想要的部分,我在這里全部都要,也可以只要body部分
in = (ByteBuf) super.decode(ctx,in);
if(in == null){
return null;
}
if(in.readableBytes()<HEADER_SIZE){
throw new Exception("字節(jié)數(shù)不足");
}
//讀取type字段
byte type = in.readByte();
//讀取flag字段
byte flag = in.readByte();
//讀取length字段
int length = in.readInt();
if(in.readableBytes()!=length){
throw new Exception("標記的長度不符合實際長度");
}
//讀取body
byte []bytes = new byte[in.readableBytes()];
in.readBytes(bytes);
return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8"));
}
}
- 在上述的代碼中,調(diào)用父類的方法,實現(xiàn)截取到自己想要的字段.
2. 協(xié)議實體的定義
public class MyProtocolBean {
//類型 系統(tǒng)編號 0xA 表示A系統(tǒng),0xB 表示B系統(tǒng)
private byte type;
//信息標志 0xA 表示心跳包 0xC 表示超時包 0xC 業(yè)務信息包
private byte flag;
//內(nèi)容長度
private int length;
//內(nèi)容
private String content;
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getFlag() {
return flag;
}
public void setFlag(byte flag) {
this.flag = flag;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public MyProtocolBean(byte flag, byte type, int length, String content) {
this.flag = flag;
this.type = type;
this.length = length;
this.content = content;
}
@Override
public String toString() {
return "MyProtocolBean{" +
"type=" + type +
", flag=" + flag +
", length=" + length +
", content='" + content + '\'' +
'}';
}
}
3.服務端的實現(xiàn)
public class Server {
private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大長度
private static final int LENGTH_FIELD_LENGTH = 4; //長度字段所占的字節(jié)數(shù)
private static final int LENGTH_FIELD_OFFSET = 2; //長度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private int port;
public Server(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口殃饿,開始接收進來的連接
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
- 關(guān)于
LengthFieldBasedFrameDecoder
的各個字段的頭特別意義,可以參考這篇文章(http://www.voidcn.com/article/p-gzqzzsyz-bqy.html),在這里,我們只需要掌握
`
maxFrameLength: 幀的最大長度
lengthFieldOffset length: 字段偏移的地址
lengthFieldLength length;字段所占的字節(jié)長
lengthAdjustment: 修改幀數(shù)據(jù)長度字段中定義的值茄唐,可以為負數(shù) 因為有時候我們習慣把頭部記入長度,若為負數(shù),則說明要推后多少個字段
initialBytesToStrip: 解析時候跳過多少個長度
failFast; 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常舀锨,為false岭洲,讀取完整個幀再報異
- 只需要把
MyProtocolDecoder
加入pipeline中就可以.但是得在Handler之前.
- 服務端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接轉(zhuǎn)化成協(xié)議消息實體
System.out.println(myProtocolBean.getContent());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
}
* 服務端Handler沒什么特別的地方,只是輸出接收到的消息
5. 客戶端和客戶端Handler
public class Client {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty");
ctx.writeAndFlush(myProtocolBean);
}
}
- 客戶端Handler實現(xiàn)發(fā)送消息.
客戶端編碼器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> {
@Override
protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception {
if(msg == null){
throw new Exception("msg is null");
}
out.writeByte(msg.getType());
out.writeByte(msg.getFlag());
out.writeInt(msg.getLength());
out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8")));
}
}
- 編碼的時候,只需要按照定義的順序依次寫入到ByteBuf中.
6. 總結(jié)
通過以上的消息,我們能夠在客戶端和服務端實現(xiàn)自定義協(xié)議的交互.其實,在以上的過程中,如果我們把
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
改成
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,6,false));
那么我們就可以直接跳過Header和length字段,從而解碼的時候,就只需要對body字段解碼就行.其他字段跳過就忽略了.這也就是調(diào)用父類的方法的原因(使最后拿到的ByteBuf只有body部分,而沒有其他部分).
參考文章
一起學Netty(九)之LengthFieldBasedFrameDecoder
Netty的LengthFieldBasedFrameDecoder使用