Netty中雙方建立通信之后,對象數(shù)據(jù)會按照ByteBuf字節(jié)碼的方式進行傳輸娃善。
自定義一種通信協(xié)議焚虱,協(xié)議將傳輸數(shù)據(jù)定義了消息頭和消息正文订框。
管道中傳遞LuckMessage對象,LuckMessage中定義了消息頭LuckHeader和消息正文content晨另。消息頭header包括version潭千,contentLength,sessionId借尿。
消息定義:
// 消息的頭部
public class LuckHeader {
// 協(xié)議版本
private int version;
// 消息內(nèi)容長度
private int contentLength;
// 服務(wù)名稱
private String sessionId;
public LuckHeader(int version, int contentLength, String sessionId) {
this.version = version;
this.contentLength = contentLength;
this.sessionId = sessionId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
}
// 消息的主體
public class LuckMessage {
private LuckHeader luckHeader;
private String content;
public LuckMessage(LuckHeader luckHeader, String content) {
this.luckHeader = luckHeader;
this.content = content;
}
public LuckHeader getLuckHeader() {
return luckHeader;
}
public void setLuckHeader(LuckHeader luckHeader) {
this.luckHeader = luckHeader;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]",
luckHeader.getVersion(),
luckHeader.getContentLength(),
luckHeader.getSessionId(),
content);
}
}
服務(wù)端代碼:
public class LuckServer {
public static void main(String args[]) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 指定socket的一些屬性
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定是一個NIO連接通道
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new LuckServerInitializer());
// 綁定對應(yīng)的端口號,并啟動開始監(jiān)聽端口上的連接
Channel ch = serverBootstrap.bind(8899).sync().channel();
System.out.printf("luck協(xié)議啟動地址:127.0.0.1:%d/\n", 8899);
// 等待關(guān)閉,同步端口
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服務(wù)端初始化連接:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class LuckServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LuckEncoder());
pipeline.addLast(new LuckDecoder());
// 添加邏輯控制層
pipeline.addLast(new LuckServerHandler());
}
}
編碼Handler:
package com.zhihao.miao.test.day08;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf out) throws Exception {
// 將Message轉(zhuǎn)換成二進制數(shù)據(jù)
LuckHeader header = message.getLuckHeader();
// 這里寫入的順序就是協(xié)議的順序.
// 寫入Header信息
out.writeInt(header.getVersion());
out.writeInt(message.getContent().length());
out.writeBytes(header.getSessionId().getBytes());
// 寫入消息主體信息
out.writeBytes(message.getContent().getBytes());
}
}
解碼器Handler:
package com.zhihao.miao.test.day08;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class LuckDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 獲取協(xié)議的版本
int version = in.readInt();
// 獲取消息長度
int contentLength = in.readInt();
// 獲取SessionId
byte[] sessionByte = new byte[36];
in.readBytes(sessionByte);
String sessionId = new String(sessionByte);
// 組裝協(xié)議頭
LuckHeader header = new LuckHeader(version, contentLength, sessionId);
// 讀取消息內(nèi)容刨晴,這邊demo中不對
byte[] contentbys = new byte[in.readableBytes()];
in.readBytes(contentbys);
String content = new String(contentbys);
LuckMessage message = new LuckMessage(header, content);
out.add(message);
}
}
自定義服務(wù)端handler處理器:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class LuckServerHandler extends SimpleChannelInboundHandler<LuckMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LuckMessage msg) throws Exception {
// 簡單地打印出server接收到的消息
System.out.println(msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("service exception:"+cause.getMessage());
}
}
客戶端:
package com.zhihao.miao.test.day08;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.UUID;
public class LuckClient {
public static void main(String args[]) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new LuckServerInitializer());
// Start the connection attempt.
Channel ch = b.connect("127.0.0.1", 8899).sync().channel();
int version = 1;
String sessionId = UUID.randomUUID().toString();
String content = "I'm the luck protocol!";
LuckHeader header = new LuckHeader(version, content.length(), sessionId);
LuckMessage message = new LuckMessage(header, content);
ch.writeAndFlush(message);
ch.close();
} finally {
group.shutdownGracefully();
}
}
}
客戶端初始化連接:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class LuckClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 添加編解碼器, 由于ByteToMessageDecoder的子類無法使用@Sharable注解,
// 這里必須給每個Handler都添加一個獨立的Decoder.
pipeline.addLast(new LuckEncoder());
pipeline.addLast(new LuckDecoder());
pipeline.addLast(new LuckClientHandler());
}
}
客戶端自定義handler:
package com.zhihao.miao.test.day08;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class LuckClientHandler extends SimpleChannelInboundHandler<LuckMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LuckMessage message) throws Exception {
System.out.println(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception:"+cause.getMessage());
}
}
啟動服務(wù)器和客戶端,服務(wù)器端控制臺打勇贩:
七月 04, 2017 5:22:27 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x9df966cc, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0x99c6480a, L:/127.0.0.1:8899 - R:/127.0.0.1:55722]
七月 04, 2017 5:22:27 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x9df966cc, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
[version=1,contentLength=22,sessionId=c9345f67-99b6-46d2-97ff-eef853c9d569,content=I'm the luck protocol!]