- Netty 應用實例:群聊系統(tǒng)互例。
- 要求:①實現(xiàn)多人群聊粗井;②服務器端:可以監(jiān)測用戶上線枝哄,離線,并實現(xiàn)消息轉發(fā)功能键科;③客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有用戶闻丑,同時可以接受其它用戶發(fā)送的消息(有服務器轉發(fā)得到)。
- GroupChatServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //監(jiān)聽端口
public GroupChatServer(int port) {
this.port = port;
}
//編寫run方法勋颖,處理客戶端的請求
public void run() throws Exception {
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//獲取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解碼器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入編碼器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的業(yè)務處理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服務器啟動...");
ChannelFuture channelFuture = b.bind(port).sync();
//監(jiān)聽關閉
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
- GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一個hashMap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定義一個channel 組嗦嗡,管理所有的channel
//GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器(單例)
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示連接一旦建立,第一個被執(zhí)行
//將當前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//將該客戶加入聊天的信息推送給其它在線的客戶端
//該方法會將 channelGroup 中所有的channel 都遍歷一次饭玲,并發(fā)送消息侥祭,所以我們不需要自己遍歷
channelGroup.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 加入聊天:" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
//斷開連接,將xx客戶離開信息推送給當前在線的客戶
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 離開了\n");
System.out.println("channelGroup size:" + channelGroup.size());
}
//表示channel 處于活動狀態(tài),提示 xxx 上線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上線了~");
}
//表示channel 處于不活動狀態(tài),提示 xxx離線了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 離線了~");
}
//讀取數(shù)據(jù)
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//獲取到當前channel
Channel channel = ctx.channel();
//這時我們遍歷channelGroup, 根據(jù)不同的情況殊者,回送不同的消息
channelGroup.forEach(ch -> {
//若不是當前的channel鲸沮,則轉發(fā)消息
if (channel != ch) {
ch.writeAndFlush("[客戶端]:" + channel.remoteAddress() + " 發(fā)送了消息:" + msg + "\n");
} else {//回顯自己發(fā)送的消息給自己
ch.writeAndFlush("[自己]發(fā)送了消息:" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉通道
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//屬性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相關handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定義的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通過channel 發(fā)送到服務器端
channel.writeAndFlush(msg + "\r\n");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
- GroupChatClientHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
- Netty 應用實例:心跳機制檢測往扔。
- 要求:當服務器超過 3 秒沒有讀時,就提示讀空閑;當服務器超過 5 秒沒有寫操作時,就提示寫空閑琼牧,實現(xiàn)當服務器超過 7 秒沒有讀或者寫操作時恢筝,就提示讀寫空閑。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); //增加一個日志處理器
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一個netty 提供 IdleStateHandler
/*
1巨坊、IdleStateHandler 是netty 提供的處理空閑狀態(tài)的處理器
2撬槽、long readerIdleTime:表示多長時間沒有讀,就會發(fā)送一個心跳檢測包檢測是否連接
3抱究、long writerIdleTime:表示多長時間沒有寫恢氯,就會發(fā)送一個心跳檢測包檢測是否連接
4、long allIdleTime:表示多長時間沒有讀寫鼓寺,就會發(fā)送一個心跳檢測包檢測是否連接
5、文檔說明:triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.
6勋磕、當 IdleStateEvent 觸發(fā)后妈候,就會傳遞給管道 的下一個handler去處理
通過調(diào)用(觸發(fā))下一個handler 的 userEventTriggered,在該方法中去處理 IdleStateEvent(讀空閑挂滓,寫空閑苦银,讀寫空閑)
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
//加入一個對空閑檢測進一步處理的handler(自定義)
pipeline.addLast(new MyServerHandler());
}
});
//啟動服務器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
//將 evt 向下轉型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "讀空閑";
break;
case WRITER_IDLE:
eventType = "寫空閑";
break;
case ALL_IDLE:
eventType = "讀寫空閑";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
System.out.println("服務器做相應處理...");
//若發(fā)生空閑,則關閉通道
//ctx.channel().close();
}
}
}
- Netty 應用實例:Netty 通過 WebSocket 編程實現(xiàn)服務器和客戶端長連接赶站。
- 要求:實現(xiàn)基于 WebSocket 的長連接的全雙工地交互幔虏,通過改變 Http 協(xié)議多次請求的約束,實現(xiàn)了長連接贝椿,服務器可以發(fā)送消息給瀏覽器想括。客戶端瀏覽器和服務器端會相互感知烙博,比如服務器關閉了瑟蜈,瀏覽器會感知,同樣瀏覽器關閉了渣窜,服務器會感知铺根。
- hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判斷當前瀏覽器是否支持websocket
if (window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello");
//相當于channelReado,ev收到服務器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
};
//相當于連接開啟(感知到連接開啟)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "連接開啟了.."
};
//相當于連接關閉(感知到連接關閉)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "連接關閉了.."
}
} else {
alert("當前瀏覽器不支持websocket...")
}
//發(fā)送消息到服務器
function send(message) {
if (!window.socket) { //先判斷socket是否創(chuàng)建好
return;
}
if (socket.readyState == WebSocket.OPEN) {
//通過socket 發(fā)送消息
socket.send(message)
} else {
alert("連接沒有開啟...");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="發(fā)生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空內(nèi)容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因為基于http協(xié)議乔宿,所以要使用http的編碼和解碼器
pipeline.addLast(new HttpServerCodec());
//數(shù)據(jù)以塊方式寫位迂,需要添加ChunkedWriteHandler處理器
pipeline.addLast(new ChunkedWriteHandler());
/*
1、http數(shù)據(jù)在傳輸過程中是分段详瑞,HttpObjectAggregator掂林,就是可以將多個段聚合
2、這就是為什么當瀏覽器發(fā)送大量數(shù)據(jù)時蛤虐,就會發(fā)出多次http請求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
1党饮、對應websocket,它的數(shù)據(jù)是以幀(frame) 形式傳遞
2驳庭、WebSocketFrame 有六個實現(xiàn)子類
3刑顺、瀏覽器請求 ws://localhost:7000/xxx:表示請求的uri
4氯窍、WebSocketServerProtocolHandler 核心功能是將http協(xié)議升級為ws協(xié)議(通過一個狀態(tài)碼 101來切換),保持長連接狀態(tài)
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
//自定義的handler蹲堂,處理業(yè)務邏輯
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
//啟動服務器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- MyTextWebSocketFrameHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//這里 TextWebSocketFrame 類型狼讨,表示一個文本幀(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服務器收到消息:" + msg.text());
//回復消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器時間:" + LocalDateTime.now() + " " + msg.text()));
}
//當web客戶端連接后,handlerAdded 方法被觸發(fā)執(zhí)行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值柒竞,LongText 是唯一的政供,ShortText 不是唯一
System.out.println("handlerAdded 被調(diào)用:" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被調(diào)用:" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被調(diào)用:" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("異常發(fā)生 " + cause.getMessage());
ctx.close(); //關閉連接
}
}
- 編寫網(wǎng)絡應用程序時,因為數(shù)據(jù)在網(wǎng)絡中傳輸?shù)亩际嵌M制字節(jié)碼數(shù)據(jù)朽基,所以在發(fā)送數(shù)據(jù)時就需要編碼布隔,接收數(shù)據(jù)時就需要解碼。
- codec(編解碼器)的組成部分有兩個:
decoder
(解碼器)和 encoder
(編碼器)稼虎。encoder 負責把業(yè)務數(shù)據(jù)轉換成字節(jié)碼數(shù)據(jù)衅檀,decoder 負責把字節(jié)碼數(shù)據(jù)轉換成業(yè)務數(shù)據(jù)。
- Netty 提供的編碼器:
StringEncoder
:對字符串數(shù)據(jù)進行編碼霎俩;ObjectEncoder
:對Java對象進行編碼哀军。
- Netty 提供的解碼器:
StringDecoder
:對字符串數(shù)據(jù)進行解碼;ObjectDecoder
:對 Java 對象進行解碼打却。
- Netty 本身自帶的
ObjectDecoder
和ObjectEncoder
可以用來實現(xiàn) POJO 對象或各種業(yè)務對象的編碼和解碼杉适,底層使用的仍是Java序列化技術,而Java序列化技術本身效率就不高柳击,存在如下問題:①無法跨語言猿推;②序列化后的體積太大,是二進制編碼的5倍多腻暮;③序列化性能太低彤守。
-
Protobuf(Google Protocol Buffers)
是一種輕便高效的結構化數(shù)據(jù)存儲格式,可以用于結構化數(shù)據(jù)串行化(序列化)哭靖。它很適合做數(shù)據(jù)存儲或 RPC 數(shù)據(jù)交換格式(遠程過程調(diào)用 remote procedure call)具垫。目前很多公司使用的數(shù)據(jù)交換技術:http + json tcp + protobuf。
- Protobuf 是以
message
的方式來管理數(shù)據(jù)的试幽,并且支持跨平臺筝蚕、跨語言。使用 protobuf 編譯器能自動生成代碼铺坞,Protobuf 是將類的定義使用.proto
文件進行描述起宽。
- Protobuf 入門案例:要求:客戶端可以隨機發(fā)送
StudentPoJo / WorkerPoJo
對象到服務器,服務端能接StudentPoJo / WorkerPoJo
對象(需要判斷是哪種類型)济榨,并顯示信息(通過 Protobuf 解碼)坯沪。
- 導入protobuf依賴:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.13.0</version>
</dependency>
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package = "com.zzw.netty.codec2"; //指定生成到哪個包下
option java_outer_classname = "MyDataInfo"; // 外部類名, 文件名
//protobuf 可以使用 message 管理其他的message
message MyMessage {
//定義一個枚舉類型
enum DataType {
StudentType = 0; //在proto3中要求enum的編號從0開始
WorkerType = 1;
}
//用data_type 來標識傳的是哪一個枚舉類型
DataType data_type = 1;
//表示每次枚舉類型最多只能出現(xiàn)其中的一個,節(jié)省空間
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student類的屬性
string name = 2;
}
message Worker {
string name = 1;
int32 age = 2;
}
- 下載好protoc編譯器擒滑,配置好環(huán)境變量腐晾,然后編譯
Student.proto
文件并在當前路徑下存放生成的文件MyDataInfo.java
:protoc --java_out=. Student.proto
叉弦。
- NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建BossGroup 和 WorkerGroup
//說明
//1、創(chuàng)建兩個線程組 bossGroup 和 workerGroup
//2藻糖、bossGroup 只是處理連接請求淹冰,真正的客戶端業(yè)務處理是會交給 workerGroup 完成
//3、兩個都是無限循環(huán)
//4巨柒、bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數(shù):默認值為 cpu核數(shù) * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個
try {
//創(chuàng)建服務器端的啟動對象樱拴,配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈式編程來進行設置
bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作為服務器的通道實現(xiàn)
.option(ChannelOption.SO_BACKLOG, 128) //設置線程隊列等待連接的個數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態(tài)
//.handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { //創(chuàng)建一個通道初始化對象(匿名對象)
//給 pipeline 設置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtobufDecoder
//指定對哪種對象進行解碼
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler()); //給 workerGroup 的 EventLoop 對應的管道設置處理器
}
});
System.out.println(".....服務器 is ready...");
//綁定一個端口并且同步處理,生成了一個 ChannelFuture 對象
//啟動服務器(并綁定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//給 cf 注冊監(jiān)聽器洋满,監(jiān)控我們關心的事件
//綁定端口是異步操作晶乔,當綁定操作處理完,將會調(diào)用相應的監(jiān)聽器處理邏輯
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監(jiān)聽端口 6668 成功");
} else {
System.out.println("監(jiān)聽端口 6668 失敗");
}
}
});
//對關閉通道進行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 說明:自定義一個Handler 需要繼承 netty 規(guī)定好的某個HandlerAdapter(規(guī)范)
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//讀取數(shù)據(jù)實際(這里我們可以讀取客戶端發(fā)送的消息)
/**
* 1芦岂、ChannelHandlerContext ctx:上下文對象瘪弓,含有管道 pipeline,通道channel禽最,地址
* 2、Object msg:客戶端發(fā)送的數(shù)據(jù)袱饭,默認是 Object
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MyDataInfo.MyMessage msg) throws Exception {
//根據(jù)dataType來顯示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("學生id:" + student.getId() + "川无,學生姓名:" + student.getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的年齡:" + worker.getAge() + ",工人的姓名:" + worker.getName());
} else {
System.out.println("傳輸?shù)念愋筒徽_虑乖!");
}
}
//數(shù)據(jù)讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush:write + flush
//將數(shù)據(jù)寫入到緩存懦趋,并刷新
//一般需要對發(fā)送的數(shù)據(jù)進行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//發(fā)生異常時疹味,需要關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創(chuàng)建客戶端啟動對象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數(shù)
bootstrap.group(group) //設置線程組
.channel(NioSocketChannel.class) // 設置客戶端通道的實現(xiàn)類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入編碼器 ProtobufEncoder()
pipeline.addLast("encoder", new ProtobufEncoder());
//加入自定義的處理器
pipeline.addLast(new NettyClientHandler());
}
});
System.out.println("...客戶端 is ok...");
//啟動客戶端去連接服務器端
//關于ChannelFuture 涉及到netty的異步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//給關閉通道進行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//當通道就緒就會觸發(fā)該方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//隨機地發(fā)送 Student 或者 Worker 對象
int num = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == num) {
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 盧俊義").build()).build();
} else {
//發(fā)送一個Worker 對象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
//當通道有讀取事件時仅叫,會觸發(fā)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服務器回復的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務器的地址:" + ctx.channel().remoteAddress());
}
//發(fā)生異常時,需要關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- Netty 的主要組件有 Channel糙捺、EventLoop诫咱、ChannelFuture、ChannelHandler洪灯、ChannelPipe 等坎缭。
- ChannelHandler 充當了處理入站和出站數(shù)據(jù)的應用程序邏輯的容器。例如签钩,實現(xiàn) ChannelInboundHandler 接口(或 ChannelInboundHandlerAdapter)掏呼,就可以接收入站事件和數(shù)據(jù),這些數(shù)據(jù)會被業(yè)務邏輯處理铅檩。當要給客戶端發(fā)送響應時憎夷,也可以從 ChannelInboundHandler 沖刷數(shù)據(jù)。業(yè)務邏輯通常寫在一個或者多個 ChannelInboundHandler 中昧旨。ChannelOutboundHandler 原理一樣拾给,只不過它是用來處理出站數(shù)據(jù)的祥得。
- ChannelPipeline 提供了 ChannelHandler 鏈的容器。以客戶端應用程序為例鸣戴,若事件的運動方向是從客戶端到服務端啃沪,則稱這些事件為出站的,即客戶端發(fā)送給服務端的數(shù)據(jù)會通過 pipeline 中的一系列 ChannelOutboundHandler窄锅,并被這些 Handler 處理创千,反之則稱為入站的。
- 當 Netty 發(fā)送或者接受一個消息時入偷,就將會發(fā)生一次數(shù)據(jù)轉換追驴。入站消息會被解碼:從字節(jié)轉換為另一種格式(比如 java 對象);若是出站消息疏之,則會被編碼成字節(jié)殿雪。
- Netty 提供一系列實用的編解碼器,它們都實現(xiàn)了 ChannelInboundHadnler 或ChannelOutboundHandler 接口锋爪。在這些類中丙曙,channelRead 方法已經(jīng)被重寫了。以入站為例其骄,對于每個從入站 Channel 讀取的消息亏镰,這個方法會被調(diào)用。隨后拯爽,它將調(diào)用由解碼器所提供的 decode() 方法進行解碼索抓,并將已經(jīng)解碼的字節(jié)轉發(fā)給 ChannelPipeline 中的下一個 ChannelInboundHandler。
- 由于不可能知道遠程節(jié)點是否會一次性發(fā)送一個完整的信息毯炮,tcp 有可能出現(xiàn)粘包拆包的問題逼肯,這個類會對入站數(shù)據(jù)進行緩沖,直到它準備好被處理為止桃煎。
關于 ByteToMessageDecoder 實例分析
- Netty 應用實例:使用自定義的編碼器和解碼器來說明 Netty 的 handler 鏈調(diào)用機制篮幢,要求客戶端發(fā)送 long類型的數(shù)據(jù)給服務器,同時服務端也發(fā)送 long 類型的數(shù)據(jù)給客戶端备禀。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
//自定義初始化類
serverBootstrap.childHandler(new MyServerInitializer());
//啟動服務器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//注意編碼解碼操作必須在handler操作前
//入站的handler進行解碼
pipeline.addLast(new MyByteToLongDecoder());
//出站的handler進行編碼
pipeline.addLast(new MyLongToByteEncoder());
//自定義一個handler來處理業(yè)務邏輯
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("從客戶端:" + ctx.channel().remoteAddress() + " 讀到的數(shù)據(jù)為:" + msg);
//給客戶端發(fā)送一個long類型的數(shù)據(jù)
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* decode 會被調(diào)用多次洲拇,直到確定沒有新的元素被添加到 list 集合為止 或者 ByteBuf 沒有更多可讀的字節(jié)為止
* 若 list out不為空,則會將 list 的內(nèi)容傳遞給下一個 channelinboundhandler 處理曲尸,該處理器的方法也會被調(diào)用多次
* @param ctx 上下文對象
* @param in 入站的 ByteBuf
* @param out 將解碼后的數(shù)據(jù)傳給下一個 handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder decode 被調(diào)用");
if (in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創(chuàng)建客戶端啟動對象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數(shù)
bootstrap.group(group) //設置線程組
.channel(NioSocketChannel.class) // 設置客戶端通道的實現(xiàn)類(反射)
.handler(new MyClientInitializer());
System.out.println("...客戶端 is ok...");
//啟動客戶端去連接服務器端
//關于ChannelFuture 涉及到netty的異步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
//給關閉通道進行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//注意:順序不能相反
//加入一個出站的handler赋续,對數(shù)據(jù)進行編碼
pipeline.addLast(new MyLongToByteEncoder());
//加入一個入站的handler,對數(shù)據(jù)進行解碼
pipeline.addLast(new MyByteToLongDecoder());
//加入一個自定義的handler來處理業(yè)務
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被調(diào)用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服務器的ip:" + ctx.channel().remoteAddress());
System.out.println("收到服務器的消息:" + msg);
}
//發(fā)送數(shù)據(jù)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 發(fā)送數(shù)據(jù)");
//1另患、"abcdabcdabcdabcd" 是 16個字節(jié)
//2纽乱、該處理器的前一個handler 是 MyLongToByteEncoder
//3、MyLongToByteEncoder 的父類:MessageToByteEncoder
//4昆箕、父類 MessageToByteEncoder
/*
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try { //判斷當前msg 是否為處理的類型鸦列,若是則處理租冠,否則跳過自定義的 encode 過程
if (this.acceptOutboundMessage(msg)) {
I cast = msg;
buf = this.allocateBuffer(ctx, msg, this.preferDirect);
try {
this.encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(msg);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException var17) {
throw var17;
} catch (Throwable var18) {
throw new EncoderException(var18);
} finally {
if (buf != null) {
buf.release();
}
}
}
5、因此薯嗤,我們編寫 Encoder 是要注意傳入的數(shù)據(jù)類型和處理的數(shù)據(jù)類型一致
*/
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
ctx.writeAndFlush(1234567L);
}
}
- 總結:①不論解碼器 handler 還是編碼器 handler 即接收的消息類型必須與待處理的消息類型必須一致顽爹,否則該 handler 不會被執(zhí)行;②在解碼器進行數(shù)據(jù)解碼時骆姐,需要判斷緩存區(qū)(ByteBuf)的數(shù)據(jù)是否足夠镜粤,否則接收到的結果和期望結果可能不一致。
-
ReplayingDecoder
解碼器(public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
)擴展了ByteToMessageDecoder
類玻褪,使用了這個類肉渴,我們就不必調(diào)用readableBytes()
方法。其中带射,參數(shù) S 指定了用戶狀態(tài)管理的類型同规,Void
值代表不需要狀態(tài)管理。
- 應用實例:使用
ReplayingDecoder
編寫解碼器對上一個案例進行簡化窟社。
- MyByteToLongDecoder2.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被調(diào)用");
//在 ReplayingDecoder 不需要判斷數(shù)據(jù)是否足夠讀取券勺,內(nèi)部會進行處理判斷
out.add(in.readLong());
}
}
- 雖然使用
ReplayingDecoder
很方便,但它也有一些局限性:
- 并不是所有的 ByteBuf 操作都被支持灿里,若調(diào)用了一個不被支持的方法朱灿,則會拋出一個
UnsupportedOperationException
。
-
ReplayingDecoder
在某些情況下可能稍慢于ByteToMessageDecoder
钠四,例如網(wǎng)絡緩慢并且消息格式復雜時,消息會被拆成了多個碎片跪楞,速度變慢缀去。
- 其它的編解碼器:
-
LineBasedFrameDecoder
:這個類在 Netty 內(nèi)部也有使用,它使用行尾控制字符(\n
或者\r\n
)作為分隔符來解析數(shù)據(jù)甸祭。
-
DelimiterBasedFrameDecoder
:使用自定義的特殊字符作為消息的分隔符缕碎。
-
HttpObjectDecoder
:一個 HTTP 數(shù)據(jù)的解碼器。
-
LengthFieldBasedFrameDecoder
:通過指定長度來標識整包消息池户,這樣就可以自動地處理黏包和半包消息咏雌。
- TCP 是面向連接的,面向流的校焦,提供高可靠性的服務赊抖。收發(fā)兩端(客戶端和服務器端)都要有一一成對的 socket。因此寨典,發(fā)送端為了將多個發(fā)給接收端的包更有效地發(fā)給對方氛雪,使用了優(yōu)化方法(
Nagle
算法),將多次間隔較小且數(shù)據(jù)量小的數(shù)據(jù)耸成,合并成一個大的數(shù)據(jù)塊报亩,然后進行封包浴鸿。這樣做雖然提高了效率,但是接收端難于分辨出完整的數(shù)據(jù)包了弦追,因為面向流的通信是無消息保護邊界的岳链。
- 由于 TCP 無消息保護邊界,需要在接收端處理消息邊界問題劲件,也就是我們所說的粘包掸哑、拆包問題。
- 對上圖的說明:假設客戶端分別發(fā)送了兩個數(shù)據(jù)包 D1 和 D2 給服務端举户,由于服務端一次讀取到字節(jié)數(shù)是不確定的,故可能存在以下四種情況:
- 服務端分兩次讀取到了兩個獨立的數(shù)據(jù)包遍烦,分別是 D1 和 D2俭嘁,沒有粘包和拆包。
- 服務端一次接受到了兩個數(shù)據(jù)包服猪,D1 和 D2 粘合在一起供填,稱之為
TCP 粘包
。
- 服務端分兩次讀取到了數(shù)據(jù)包罢猪,第一次讀取到了完整的 D1 包和 D2 包的部分內(nèi)容近她,第二次讀取到了 D2 包的剩余內(nèi)容,稱之為
TCP 拆包
膳帕。
- 服務端分兩次讀取到了數(shù)據(jù)包粘捎,第一次讀取到了 D1 包的部分內(nèi)容 D1_1,第二次讀取到了 D1 包的剩余部分內(nèi)容 D1_2 和完整的 D2 包危彩,也稱之為
TCP 拆包
攒磨。
- Netty應用實例:TCP 粘包和拆包現(xiàn)象實例。
- MyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//將buffer轉成字符串
String message = new String(buffer, StandardCharsets.UTF_8);
System.out.println("服務器接收到數(shù)據(jù):" + message);
System.out.println("服務器接收到消息量為:" + (++this.count));
//服務器回送數(shù)據(jù)給客戶端汤徽,回送一個隨機id
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", StandardCharsets.UTF_8);
ctx.writeAndFlush(responseByteBuf);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客戶端發(fā)送10條數(shù)據(jù) hello娩缰,server + 編號
for (int i = 0; i < 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server + " + i, StandardCharsets.UTF_8);
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, StandardCharsets.UTF_8);
System.out.println("客戶端接收到消息:" + message);
System.out.println("客戶端接收消息數(shù)量為:" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- TCP 粘包和拆包解決方案:使用自定義協(xié)議包+編解碼器來解決谒府,關鍵就是要解決服務器端每次讀取數(shù)據(jù)長度的問題拼坎,一旦解決了,就不會出現(xiàn)服務器多讀或少讀數(shù)據(jù)的問題完疫,從而避免的 TCP 粘包泰鸡、拆包。
- Netty 應用實例:解決TCP 粘包趋惨、拆包問題鸟顺。
- 要求:客戶端一共發(fā)送 5 個
Message
對象,每發(fā)送一個Message
對象,服務器端就接收一個 Message讯嫂,將其解碼并回復一個 Message 對象給客戶端蹦锋。
//協(xié)議包
public class MessageProtocol {
private int len; //關鍵
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被調(diào)用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被調(diào)用");
//需要將得到二進制字節(jié)碼轉成 MessageProtocol 數(shù)據(jù)包(對象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封裝成 MessageProtocol 對象,放入 out欧芽, 傳遞下一個handler業(yè)務處理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder());//解碼器
pipeline.addLast(new MyMessageEncoder());//編碼器
pipeline.addLast(new MyServerHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
//處理業(yè)務的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到數(shù)據(jù)莉掂,并處理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println();
System.out.println("服務器接收到信息如下:");
System.out.println("長度=" + len);
System.out.println("內(nèi)容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("服務器接收到消息包數(shù)量=" + (++this.count));
//回復消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes(StandardCharsets.UTF_8).length;
byte[] responseContent2 = responseContent.getBytes(StandardCharsets.UTF_8);
//構建一個協(xié)議包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定義一個初始化類
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder()); //加入編碼器
pipeline.addLast(new MyMessageDecoder()); //加入解碼器
pipeline.addLast(new MyClientHandler());
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客戶端發(fā)送10條數(shù)據(jù) "今天天氣冷,吃火鍋" 編號
for (int i = 0; i < 5; i++) {
String mes = "今天天氣冷千扔,吃火鍋憎妙!";
byte[] content = mes.getBytes(StandardCharsets.UTF_8);
int length = content.length;
//創(chuàng)建協(xié)議包對象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("客戶端接收到消息如下:");
System.out.println("長度=" + len);
System.out.println("內(nèi)容=" + new String(content, StandardCharsets.UTF_8));
System.out.println("客戶端接收消息數(shù)量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("異常消息:" + cause.getMessage());
ctx.close();
}
}