要想了解Netty叽唱,我們還是需要看Netty的基礎(chǔ)使用
這里使用的netty
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.17.Final</version>
</dependency>
1.客戶端
public class NettyClient {
/*IP地址*/
private static final String HOST = "127.0.0.1";
/*端口號(hào)*/
private static final int PORT1 = 9091;
public static void main(String[] args) throws Exception {
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
byte[] bab5BBS = BinaryTransferUtils.hex2Bytes("BAB5BB");
Bootstrap b = new Bootstrap();//客戶端
ByteBuf buf = Unpooled.copiedBuffer(bab5BBS);
b.group(workGroup)
.channel(NioSocketChannel.class)//客戶端 -->NioSocketChannel
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {//handler
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline()
.addLast("delimiter",new DelimiterBasedFrameDecoder( 100000000,false,buf)) //解碼分隔符
.addLast("decoder", new ByteArrayDecoder()) //解碼器
.addLast("encoder", new StringEncoder()) //編碼器
.addLast(new ClientHandler()); //客戶端處理器伤疙,具體處理細(xì)節(jié)
}
});
//創(chuàng)建異步連接 可添加多個(gè)端口
ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
cf1.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
}
//處理器
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
private static final String id= "123";
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String send = "{'type':'add','appid':'" + id+ "'}";
byte[] data = send.getBytes();
ByteBuf firstMessage = Unpooled.buffer();
firstMessage.writeBytes(data);
ctx.writeAndFlush(firstMessage);
System.out.println("客戶端發(fā)送消息:" + send);
}
@Override
public void channelRead0(ChannelHandlerContext ctx,Object msg) throws Exception {
System.out.println("接收到客戶端 發(fā)送消息:"+msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
2.服務(wù)端
public class NettyApplication {
private static final Logger logger = LoggerFactory.getLogger(WmledApplication.class);
private static final int PORT = 9091;
public static void main(String[] args) {
EventLoopGroup boosGroup = new NioEventLoopGroup(); //處理連接
EventLoopGroup workerGroup = new NioEventLoopGroup(100); //處理網(wǎng)絡(luò)IO
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//option對(duì)應(yīng)nio中 ServerSocketChannel設(shè)置的參數(shù)
//childOption對(duì)應(yīng)nio中SocketChannel設(shè)置的參數(shù)
.option(ChannelOption.SO_BACKLOG, 2048) //連接數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ServerInitializer()) //服務(wù)端初始化信息
// 服務(wù)器綁定端口監(jiān)聽(tīng)
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
logger.info("----netty服務(wù)已經(jīng)啟動(dòng),端口:" + PORT + "----------");
// 監(jiān)聽(tīng)服務(wù)器關(guān)閉監(jiān)聽(tīng)
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("--- netty服務(wù)異常 ---", e);
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerInitializer 服務(wù)器初始化類
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void initChannel(SocketChannel arg0){
//ChannelPipeline 可以理解為消息傳送通道 通道一旦建立 持續(xù)存在
ChannelPipeline channelPipeline = arg0.pipeline();
//為通道添加功能
ByteBuf buf = Unpooled.copiedBuffer("}".getBytes());//自定義拆包字符,用“}”做拆包
channelPipeline.addLast("delimiter",
new DelimiterBasedFrameDecoder(1024,false,buf));
//字符串解碼 編碼
channelPipeline.addLast("decoder", new StringDecoder());
channelPipeline.addLast("encoder",new ByteArrayEncoder());
//添加自主邏輯,這里我是用spring管理的這個(gè)serverhandler
channelPipeline.addLast(SpringUtil.getBean(ServerHandler.class));
}
}
IO事件處理之ServerHandler處理器
public class ServerHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class);
@Autowired
private MessageHandler messageHandler;
@Override
protected void channelRead0(ChannelHandlerContext arg0, String json) {
messageHandler.handle(arg0,json); //通過(guò)自定義的handler來(lái)處理數(shù)據(jù)
}
/**
* channel被激活時(shí)調(diào)用
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
logger.info("檢測(cè)到上線 待加入 【{}】", ctx.channel().remoteAddress());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("【{}】機(jī)器下線 " +ctx.channel().remoteAddress());
ctx.close();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
logger.info(cause.getMessage() + "---------" + ctx.toString());
}
上面客戶端和服務(wù)端分別啟動(dòng)起來(lái)就可以相互通信了。
3.拆包粘包問(wèn)題
熟悉tcp就知道我們的業(yè)務(wù)數(shù)據(jù)不一定會(huì)按照我們想象的作為某一個(gè)整體發(fā)送,一個(gè)完整的包可能會(huì)被拆分為幾個(gè)包發(fā)送动猬,接受的時(shí)候可能就會(huì)出現(xiàn)獲取部分包的情況,所以就會(huì)出現(xiàn)半包表箭,粘包的情況赁咙。
Netty提供多種處理拆包和粘包的的解碼器:
- 消息固定長(zhǎng)度,消息達(dá)到某一個(gè)長(zhǎng)度就表示讀取到了一個(gè)完整的長(zhǎng)度
-- netty提供方案:FixedLengthFrameDecoder - 將回車換行符("\n","\t\n")作為消息結(jié)束符免钻,比如ftp協(xié)議
-- netty提供方案:LineBasedFrameDecoder - 某種特殊的分隔符作為結(jié)束標(biāo)志
-- netty提供方案:DelimiterBasedFrameDecoder - 通過(guò)在消息頭定義長(zhǎng)度字段表示消息的總長(zhǎng)度
其他一些編碼解碼:
比如StringDecoder,StringEncoder,ByteArrayEncoder等彼水,在通道處理過(guò)程中會(huì)自動(dòng)做編碼處理。
4.序列化
序列化:是把對(duì)象的狀態(tài)信息轉(zhuǎn)化為可存儲(chǔ)或傳輸?shù)男问竭^(guò)程伯襟,也就是把對(duì)象轉(zhuǎn)化為字節(jié)序列的過(guò)程稱為對(duì)象的序列化
反序列化:是序列化的逆向過(guò)程猿涨,把字節(jié)數(shù)組反序列化為對(duì)象,把字節(jié)序
列恢復(fù)為對(duì)象的過(guò)程成為對(duì)象的反序列化
目前存在多種序列化的方式姆怪,比如:
- Java自帶序列化:netty提供ObjectEncoder和ObjectDecoder對(duì)java做序列化處理
- xml
- json
- hession序列化框架
- Protobuf序列化
netty提供:ProtobufDecoder叛赚,ProtobufEncoder,ProtobufVarint32LengthFieldPrepender(某種字節(jié)處理稽揭,便于分割每一條俺附,后面這個(gè)處理這個(gè)半包),ProtobufVarint32FrameDecoder(半包處理)溪掀,
-........
5.相關(guān)協(xié)議開(kāi)發(fā)
Netty支持Http事镣,Websocket,UDP揪胃,自定義協(xié)議開(kāi)發(fā)璃哟,具體怎么使用可以從網(wǎng)絡(luò)上獲取
6.Netty行業(yè)應(yīng)用
Netty基于高性能的異步通信氛琢,常常會(huì)用在 rpc通信,大數(shù)據(jù)随闪,游戲端等場(chǎng)景中阳似;我們常見(jiàn)的比如Dubbo,RocketMQ铐伴,大數(shù)據(jù)中Avro等都有使用Netty做
節(jié)點(diǎn)點(diǎn)的通信撮奏。