Netty說自己是異步事件驅(qū)動的框架弛针,并沒有說網(wǎng)絡(luò)模型用的是異步模型裳朋,異步事件驅(qū)動框架體現(xiàn)在所有的I/O操作是異步的匪蝙,所有的IO調(diào)用會立即返回敛摘,并不保證調(diào)用成功與否门烂,但是調(diào)用會返回ChannelFuture,netty會通過ChannelFuture通知你調(diào)用是成功了還是失敗了亦或是取消了兄淫。
第 6 章 Netty 核心模塊組件
6.1 Bootstrap屯远、ServerBootstrap
- Bootstrap 意思是引導(dǎo),一個 Netty 應(yīng)用通常由一個 Bootstrap 開始捕虽,主要作用是配置整個 Netty 程序慨丐,串聯(lián) 各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導(dǎo)類泄私,ServerBootstrap 是服務(wù)端啟動引導(dǎo)類
- 常見的方法有
1.public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)房揭,該方法用于服務(wù)器端备闲, 用來設(shè)置兩個 EventLoop
2.public B group(EventLoopGroup group) ,該方法用于客戶端捅暴,用來設(shè)置一個 EventLoop
3.public B channel(Class<? extends C> channelClass)恬砂,該方法用來設(shè)置一個服務(wù)器端的通道實現(xiàn)
4.public <T> B option(ChannelOption<T> option, T value),用來給 ServerChannel 添加配置
5.public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)蓬痒,用來給接收到的通道添加配置
6.public ServerBootstrap childHandler(ChannelHandler childHandler)觉既,該方法用來設(shè)置業(yè)務(wù)處理類(自定義的 handler)Handler是對應(yīng)bossGroup,chileHandler對應(yīng)workGroup
7.public ChannelFuture bind(int inetPort) ,該方法用于服務(wù)器端乳幸,用來設(shè)置占用的端口號
8.public ChannelFuture connect(String inetHost, int inetPort) 瞪讼,該方法用于客戶端,用來連接服務(wù)器端
6.2 Future粹断、ChannelFuture
Netty 中所有的 IO 操作都是異步的符欠,不能立刻得知消息是否被正確處理。但是可以過一會等它執(zhí)行完成或 者直接注冊一個監(jiān)聽瓶埋,具體的實現(xiàn)就是通過 Future 和 ChannelFutures希柿,他們可以注冊一個監(jiān)聽,當(dāng)操作執(zhí)行成功 或失敗時監(jiān)聽會自動觸發(fā)注冊的監(jiān)聽事件
常見的方法有
Channel channel()养筒,返回當(dāng)前正在進行 IO 操作的通道
ChannelFuture sync()曾撤,等待異步操作執(zhí)行完畢
6.3 Channel
- Netty 網(wǎng)絡(luò)通信的組件,能夠用于執(zhí)行網(wǎng)絡(luò) I/O 操作晕粪。
- 通過 Channel 可獲得當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)
- 通過 Channel 可獲得 網(wǎng)絡(luò)連接的配置參數(shù) (例如接收緩沖區(qū)大屑废ぁ)
- Channel 提供異步的網(wǎng)絡(luò) I/O 操作(如建立連接,讀寫巫湘,綁定端口)装悲,異步調(diào)用意味著任何 I/O 調(diào)用都將立即返 回,并且不保證在調(diào)用結(jié)束時所請求的 I/O 操作已完成
- 調(diào)用立即返回一個 ChannelFuture 實例尚氛,通過注冊監(jiān)聽器到 ChannelFuture 上诀诊,可以 I/O 操作成功、失敗或取 消時回調(diào)通知調(diào)用方
- 支持關(guān)聯(lián) I/O 操作與對應(yīng)的處理程序
- 不同協(xié)議阅嘶、不同的阻塞類型的連接都有不同的 Channel 類型與之對應(yīng)属瓣,
常用的 Channel 類型:
NioSocketChannel,異步的客戶端 TCP Socket 連接讯柔。
NioServerSocketChannel抡蛙,異步的服務(wù)器端 TCP Socket 連接。
NioDatagramChannel磷杏,異步的 UDP 連接溜畅。
NioSctpChannel捏卓,異步的客戶端 Sctp 連接极祸。
NioSctpServerChannel慈格,異步的 Sctp 服務(wù)器端連接,這些通道涵蓋了 UDP 和 TCP 網(wǎng)絡(luò) IO 以及文件 IO遥金。
6.4 Selector
- Netty 基于 Selector 對象實現(xiàn) I/O 多路復(fù)用浴捆,通過 Selector 一個線程可以監(jiān)聽多個連接的 Channel 事件。
- 當(dāng)向一個 Selector 中注冊 Channel 后稿械,Selector 內(nèi)部的機制就可以自動不斷地查詢(Select) 這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀选泻,可寫,網(wǎng)絡(luò)連接完成等)美莫,這樣程序就可以很簡單地使用一個 線程高效地管理多個 Channel
6.5 ChannelHandler 及其實現(xiàn)類
- ChannelHandler 是一個接口页眯,處理 I/O 事件或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈) 中的下一個處理程序厢呵。
- ChannelHandler 本身并沒有提供很多方法窝撵,因為這個接口有許多的方法需要實現(xiàn),方便使用期間襟铭,可以繼承它 的子類
-
ChannelHandler 及其實現(xiàn)類一覽圖(后)
image.png -
我們經(jīng)常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter碌奉,然后通過重寫相應(yīng)方法實現(xiàn)業(yè)務(wù) 邏輯,我們接下來看看一般都需要重寫哪些方法
image.png
6.6 Pipeline 和 ChannelPipeline
ChannelPipeline 是一個重點:
- ChannelPipeline 是一個 Handler 的集合寒砖,它負責(zé)處理和攔截 inbound 或者 outbound 的事件和操作赐劣,相當(dāng)于 一個貫穿 Netty 的鏈。(也可以這樣理解:ChannelPipeline 是 保存 ChannelHandler 的 List哩都,用于處理或攔截 Channel 的入站事件和出站操作)
- ChannelPipeline 實現(xiàn)了一種高級形式的攔截過濾器模式魁兼,使用戶可以完全控制事件的處理方式,以及 Channel 中各個的 ChannelHandler 如何相互交互
-
在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應(yīng)漠嵌,它們的組成關(guān)系如下
image.png - 常用方法
ChannelPipeline addFirst(ChannelHandler... handlers)璃赡,把一個業(yè)務(wù)處理類(handler)添加到鏈中的第一個位置
ChannelPipeline addLast(ChannelHandler... handlers),把一個業(yè)務(wù)處理類(handler)添加到鏈中的最后一個位置
6.7 ChannelHandlerContext
- 保存 Channel 相關(guān)的所有上下文信息献雅,同時關(guān)聯(lián)一個 ChannelHandler 對象
- 即 ChannelHandlerContext 中 包 含 一 個 具 體 的 事 件 處 理 器 ChannelHandler 碉考, 同 時 ChannelHandlerContext 中也綁定了對應(yīng)的 pipeline 和 Channel 的信息,方便對 ChannelHandler 進行調(diào)用.
-
常用方法
IMG_2239(20201128-152407).JPG
6.8 ChannelOption
- Netty 在創(chuàng)建 Channel 實例后,一般都需要設(shè)置 ChannelOption 參數(shù)挺身。
-
ChannelOption 參數(shù)如下:
image.png
6.9 EventLoopGroup 和其實現(xiàn)類 NioEventLoopGroup
- EventLoopGroup 是一組 EventLoop 的抽象侯谁,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作章钾,每個 EventLoop 維護著一個 Selector 實例墙贱。
- EventLoopGroup 提供 next 接口,可以從組里面按照一定規(guī)則獲取其中一個 EventLoop 來處理任務(wù)贱傀。在 Netty 服 務(wù) 器 端 編 程 中 虽抄, 我 們 一 般 都 需 要 提 供 兩 個 EventLoopGroup , 例 如 : BossEventLoopGroup 和 WorkerEventLoopGroup碗短。
-
通常一個服務(wù)端口即一個 ServerSocketChannel 對應(yīng)一個 Selector 和一個 EventLoop 線程。BossEventLoop 負責(zé) 接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進行 IO 處理报腔,如下圖所示
image.png - 常用方法 public NioEventLoopGroup(),構(gòu)造方法
public Future<?> shutdownGracefully()剖淀,斷開連接纯蛾,關(guān)閉線程
6.10 Unpooled 類
- Netty 提供一個專門用來操作緩沖區(qū)(即 Netty 的數(shù)據(jù)容器)的工具類
-
常用方法如下所示
image.png -
舉例說明 Unpooled 獲取 Netty 的數(shù)據(jù)容器 ByteBuf 的基本使用 【案例演示】
image.png
案例 1
package com.atguigu.netty.buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class NettyByteBuf01 {
public static void main(String[] args) {
//創(chuàng)建一個ByteBuf
//說明
//1. 創(chuàng)建 對象,該對象包含一個數(shù)組arr , 是一個byte[10]
//2. 在netty 的buffer中纵隔,不需要使用flip 進行反轉(zhuǎn)
// 底層維護了 readerindex 和 writerIndex
//3. 通過 readerindex 和 writerIndex 和 capacity翻诉, 將buffer分成三個區(qū)域
// 0---readerindex 已經(jīng)讀取的區(qū)域
// readerindex---writerIndex , 可讀的區(qū)域
// writerIndex -- capacity, 可寫的區(qū)域
ByteBuf buffer = Unpooled.buffer(10);
for(int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("capacity=" + buffer.capacity());//10
//輸出
// for(int i = 0; i<buffer.capacity(); i++) {
// System.out.println(buffer.getByte(i));
// }
for(int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}
System.out.println("執(zhí)行完畢");
}
}
案例 2
package com.atguigu.netty.buf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
public class NettyByteBuf02 {
public static void main(String[] args) {
//創(chuàng)建ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
//使用相關(guān)的方法
if(byteBuf.hasArray()) { // true
byte[] content = byteBuf.array();
//將 content 轉(zhuǎn)成字符串
System.out.println(new String(content, Charset.forName("utf-8")));
System.out.println("byteBuf=" + byteBuf);
System.out.println(byteBuf.arrayOffset()); // 0
System.out.println(byteBuf.readerIndex()); // 0
System.out.println(byteBuf.writerIndex()); // 12
System.out.println(byteBuf.capacity()); // 36
//System.out.println(byteBuf.readByte()); //
System.out.println(byteBuf.getByte(0)); // 104
int len = byteBuf.readableBytes(); //可讀的字節(jié)數(shù) 12
System.out.println("len=" + len);
//使用for取出各個字節(jié)
for(int i = 0; i < len; i++) {
System.out.println((char) byteBuf.getByte(i));
}
//按照某個范圍讀取
System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
}
}
}
6.11 Netty 應(yīng)用實例-群聊系統(tǒng)
實例要求:
- 編寫一個 Netty 群聊系統(tǒng)捌刮,實現(xiàn)服務(wù)器端和客戶端之間的數(shù)據(jù)簡單通訊(非阻塞)
- 實現(xiàn)多人群聊
- 服務(wù)器端:可以監(jiān)測用戶上線碰煌,離線,并實現(xiàn)消息轉(zhuǎn)發(fā)功能
- 客戶端:通過 channel 可以無阻塞發(fā)送消息給其它所有用戶绅作,同時可以接受其它用戶發(fā)送的消息(有服務(wù)器轉(zhuǎn)發(fā) 得到)
-
目的:進一步理解 Netty 非阻塞網(wǎng)絡(luò)編程機制
image.png
GroupChatServer
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //監(jiān)聽端口
public GroupChatServer(int port) {
this.port = port;
}
//編寫run方法拄查,處理客戶端的請求
public void run() throws Exception{
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//獲取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解碼器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入編碼器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的業(yè)務(wù)處理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服務(wù)器啟動");
ChannelFuture channelFuture = b.bind(port).sync();
//監(jiān)聽關(guān)閉
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
GroupChatServerHandler
package com.atguigu.netty.groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一個hashmap 管理
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定義一個channle 組,管理所有的channel
//GlobalEventExecutor.INSTANCE) 是全局的事件執(zhí)行器棚蓄,是一個單例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示連接建立堕扶,一旦連接,第一個被執(zhí)行
//將當(dāng)前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//將該客戶加入聊天的信息推送給其它在線的客戶端
/*
該方法會將 channelGroup 中所有的channel 遍歷梭依,并發(fā)送 消息稍算,
我們不需要自己遍歷
*/
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
}
//斷開連接, 將xx客戶離開信息推送給當(dāng)前在線的客戶
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
//表示channel 處于活動狀態(tài), 提示 xx上線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上線了~");
}
//表示channel 處于不活動狀態(tài), 提示 xx離線了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 離線了~");
}
//讀取數(shù)據(jù)
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//獲取到當(dāng)前channel
Channel channel = ctx.channel();
//這時我們遍歷channelGroup, 根據(jù)不同的情況,回送不同的消息
channelGroup.forEach(ch -> {
if(channel != ch) { //不是當(dāng)前的channel,轉(zhuǎn)發(fā)消息
ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 發(fā)送了消息" + msg + "\n");
}else {//回顯自己發(fā)送的消息給自己
ch.writeAndFlush("[自己]發(fā)送了消息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關(guān)閉通道
ctx.close();
}
}
GroupChatClient
package com.atguigu.netty.groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//屬性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相關(guān)handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定義的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress()+ "--------");
//客戶端需要輸入信息役拴,創(chuàng)建一個掃描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通過channel 發(fā)送到服務(wù)器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
GroupChatClientHandler
package com.atguigu.netty.groupchat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
6.12 Netty 心跳檢測機制案例
實例要求:
- 編寫一個 Netty 心跳檢測機制案例, 當(dāng)服務(wù)器超過 3 秒沒有讀時糊探,就提示讀空閑
- 當(dāng)服務(wù)器超過 5 秒沒有寫操作時,就提示寫空閑
- 實現(xiàn)當(dāng)服務(wù)器超過 7 秒沒有讀或者寫操作時河闰,就提示讀寫空閑
MyServer
package com.atguigu.netty.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception{
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一個netty 提供 IdleStateHandler
/*
說明
1. IdleStateHandler 是netty 提供的處理空閑狀態(tài)的處理器
2. long readerIdleTime : 表示多長時間沒有讀, 就會發(fā)送一個心跳檢測包檢測是否連接
3. long writerIdleTime : 表示多長時間沒有寫, 就會發(fā)送一個心跳檢測包檢測是否連接
4. long allIdleTime : 表示多長時間沒有讀寫, 就會發(fā)送一個心跳檢測包檢測是否連接
5. 文檔說明
triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
* read, write, or both operation for a while.
* 6. 當(dāng) IdleStateEvent 觸發(fā)后 , 就會傳遞給管道 的下一個handler去處理
* 通過調(diào)用(觸發(fā))下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閑科平,寫空閑,讀寫空閑)
*/
pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
//加入一個對空閑檢測進一步處理的handler(自定義)
pipeline.addLast(new MyServerHandler());
}
});
//啟動服務(wù)器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyServerHandler
package com.atguigu.netty.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
//將 evt 向下轉(zhuǎn)型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "讀空閑";
break;
case WRITER_IDLE:
eventType = "寫空閑";
break;
case ALL_IDLE:
eventType = "讀寫空閑";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
System.out.println("服務(wù)器做相應(yīng)處理..");
//如果發(fā)生空閑姜性,我們關(guān)閉通道
// ctx.channel().close();
}
}
}
6.13 Netty 通過 WebSocket 編程實現(xiàn)服務(wù)器和客戶端長連接 實例要求:
- Http 協(xié)議是無狀態(tài)的, 瀏覽器和服務(wù)器間的請求響應(yīng)一次瞪慧,下一次會重新創(chuàng)建連接.
- 要求:實現(xiàn)基于 webSocket 的長連接的全雙工的交互
- 改變 Http 協(xié)議多次請求的約束,實現(xiàn)長連接了部念, 服務(wù)器可以發(fā)送消息給瀏覽器
- 客戶端瀏覽器和服務(wù)器端會相互感知弃酌,比如服務(wù)器關(guān)閉了,瀏覽器會感知儡炼,同樣瀏覽器關(guān)閉了妓湘,服務(wù)器會感知
-
運行界面
image.png
MyServer
package com.atguigu.netty.websocket;
import com.atguigu.netty.heartbeat.MyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception{
//創(chuàng)建兩個線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因為基于http協(xié)議,使用http的編碼和解碼器
pipeline.addLast(new HttpServerCodec());
//是以塊方式寫乌询,添加ChunkedWriteHandler處理器
//一般情況下在header部分會有一個欄目:content-length榜贴,用于指明body部分的數(shù)據(jù)大小,那么我們在讀取數(shù)據(jù)的時候只要讀取相應(yīng)大小的字節(jié)數(shù)據(jù)就表示body已經(jīng)讀完了妹田。唬党。鹃共。
//但是有的情況下如果沒有content-length,那么就需要那么就需要chunked來傳輸body的數(shù)據(jù)了初嘹。。沮趣。
pipeline.addLast(new ChunkedWriteHandler());
/*
說明
1. http數(shù)據(jù)在傳輸過程中是分段, HttpObjectAggregator 屯烦,就是可以將多個段聚合
2. 這就就是為什么,當(dāng)瀏覽器發(fā)送大量數(shù)據(jù)時房铭,就會發(fā)出多次http請求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
說明
1. 對應(yīng)websocket 驻龟,它的數(shù)據(jù)是以 幀(frame) 形式傳遞
2. 可以看到WebSocketFrame 下面有六個子類
3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri
4. WebSocketServerProtocolHandler 核心功能是將 http協(xié)議升級為 ws協(xié)議 , 保持長連接
5. 是通過一個 狀態(tài)碼 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
//自定義的handler ,處理業(yè)務(wù)邏輯
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
//啟動服務(wù)器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
MyTextWebSocketFrameHandler
package com.atguigu.netty.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
//這里 TextWebSocketFrame 類型缸匪,表示一個文本幀(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服務(wù)器收到消息 " + msg.text());
//回復(fù)消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服務(wù)器時間" + LocalDateTime.now() + " " + msg.text()));
}
//當(dāng)web客戶端連接后翁狐, 觸發(fā)方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被調(diào)用" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被調(diào)用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("異常發(fā)生 " + cause.getMessage());
ctx.close(); //關(guān)閉連接
}
}
hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判斷當(dāng)前瀏覽器是否支持websocket
if(window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello2");
//相當(dāng)于channelReado, ev 收到服務(wù)器端回送的消息
//當(dāng)發(fā)生onmessage事件時凌蔬,將接收到的數(shù)據(jù)放入id =“responseText”的元素中
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
//相當(dāng)于連接開啟(感知到連接開啟)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "連接開啟了.."
}
//相當(dāng)于連接關(guān)閉(感知到連接關(guān)閉)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "連接關(guān)閉了.."
}
} else {
alert("當(dāng)前瀏覽器不支持websocket")
}
//發(fā)送消息到服務(wù)器
function send(message) {
if(!window.socket) { //先判斷socket是否創(chuàng)建好
return;
}
if(socket.readyState == WebSocket.OPEN) {
//通過socket 發(fā)送消息
socket.send(message)
} else {
alert("連接沒有開啟");
}
}
</script>
<!--onsubmit:當(dāng)提交表單時執(zhí)行一段 JavaScript,則return false表示禁止表單提交.-->
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="發(fā)生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空內(nèi)容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>