- 原生 NIO 存在的問題:
- NIO 的類庫和 API 繁雜相味,使用麻煩:需要熟練掌握 Selector拾积、ServerSocketChannel、SocketChannel丰涉、ByteBuffer等拓巧。
- 需要具備其他的額外技能:要熟悉 Java 多線程編程,因?yàn)?NIO 編程涉及到 Reactor 模式一死,你必須對多線程和網(wǎng)絡(luò)編程非常熟悉肛度,才能編寫出高質(zhì)量的 NIO 程序。
- 開發(fā)工作量和難度都非常大:例如客戶端面臨斷連重連投慈、網(wǎng)絡(luò)閃斷承耿、半包讀寫、失敗緩存伪煤、網(wǎng)絡(luò)擁塞和異常流的處理等等加袋。
- JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會導(dǎo)致 Selector 空輪詢抱既,最終導(dǎo)致 CPU100%职烧。直到 JDK1.7 版本該問題仍舊存在,沒有被根本解決防泵。
- Netty的優(yōu)點(diǎn):Netty 對 JDK 自帶的 NIO 的 API 進(jìn)行了封裝蚀之,解決了上述問題。
- 設(shè)計(jì)優(yōu)雅:適用于各種傳輸類型的統(tǒng)一 API 阻塞和非阻塞 Socket捷泞;基于靈活且可擴(kuò)展的事件模型足删,可以清晰地分離關(guān)注點(diǎn);高度可定制的線程模型-單線程锁右,一個或多個線程池失受。
- 使用方便:詳細(xì)記錄的 Javadoc,用戶指南和示例骡湖;沒有其他依賴項(xiàng)贱纠,JDK5(Netty3.x)或 6(Netty4.x)就足夠了峻厚。
- 高性能响蕴、吞吐量更高:延遲更低;減少資源消耗惠桃;最小化不必要的內(nèi)存復(fù)制浦夷。
- 安全:完整的 SSL/TLS 和 StartTLS 支持辖试。
- 社區(qū)活躍、不斷更新:社區(qū)活躍劈狐,版本迭代周期短罐孝,發(fā)現(xiàn)的 Bug 可以被及時修復(fù),同時更多的新功能會被加入肥缔。
- 目前存在的線程模型有:
傳統(tǒng)阻塞I/O服務(wù)模型
和Reactor模式
莲兢。
傳統(tǒng)阻塞I/O服務(wù)模型
- 傳統(tǒng)阻塞I/O服務(wù)模型的特點(diǎn):
- 采用阻塞 IO 的模式獲取輸入的數(shù)據(jù);
- 每個連接都需要獨(dú)立的線程完成數(shù)據(jù)的輸入续膳,業(yè)務(wù)處理改艇,數(shù)據(jù)返回。
- 上圖反應(yīng)的問題分析:
- 當(dāng)并發(fā)數(shù)很大坟岔,就會創(chuàng)建大量的線程谒兄,占用很大的系統(tǒng)資源;
- 連接創(chuàng)建后社付,若當(dāng)前線程暫時沒有數(shù)據(jù)可讀承疲,則該線程會阻塞在 read 操作,造成線程資源浪費(fèi)鸥咖。
- 針對傳統(tǒng)阻塞 I/O 服務(wù)模型的缺點(diǎn)燕鸽,給出以下解決方案:
-
基于 I/O 復(fù)用模型
:多個連接共用一個阻塞對象,應(yīng)用程序只需要在一個阻塞對象等待啼辣,無需阻塞等待所有連接绵咱。當(dāng)某個連接有新的數(shù)據(jù)可以處理時,操作系統(tǒng)通知應(yīng)用程序熙兔,線程從阻塞狀態(tài)返回悲伶,開始進(jìn)行業(yè)務(wù)處理。 -
基于線程池復(fù)用線程資源
:不必再為每個連接創(chuàng)建線程住涉,將連接完成后的業(yè)務(wù)處理任務(wù)分配給線程進(jìn)行處理麸锉,一個線程可以處理多個連接的業(yè)務(wù)。
-
- Reactor模式有3鐘叫法:①
反應(yīng)器模式
舆声;②分發(fā)者模式(Dispatcher)
花沉;③通知者模式(notifier)
。
Reactor模式
- 對上圖的說明:
-
Reactor模式
:通過一個或多個輸入同時傳遞給服務(wù)處理器的模式(基于事件驅(qū)動)媳握。 - 服務(wù)器端程序處理傳入的多個請求碱屁,并將它們同步分派到相應(yīng)的處理線程,因此 Reactor 模式也叫
Dispatcher模式
蛾找。 - Reactor 模式使用 IO 復(fù)用監(jiān)聽事件娩脾,收到事件后,分發(fā)給某個線程(進(jìn)程)打毛,這點(diǎn)就是網(wǎng)絡(luò)服務(wù)器高并發(fā)的處理關(guān)鍵柿赊。
-
- 根據(jù) Reactor 的數(shù)量和處理資源池線程的數(shù)量不同俩功,有 3 種典型的實(shí)現(xiàn):①
單Reactor單線程
;②單Reactor多線程
碰声;③主從Reactor多線程
诡蜓。Netty 主要基于主從Reactor多線程
模型做了一定的改進(jìn)锅移,其中主從 Reactor 多線程模型有多個 Reactor班巩。
單Reactor單線程
- 對上圖的說明:
-
select
是前面 I/O 復(fù)用模型介紹的標(biāo)準(zhǔn)網(wǎng)絡(luò)編程 API,可以實(shí)現(xiàn)應(yīng)用程序通過一個阻塞對象監(jiān)聽多路連接請求荠耽; - Reactor 對象通過 Select 監(jiān)控客戶端請求事件瞻颂,收到事件后
Dispatch
分發(fā)給處理進(jìn)程脚粟; - 若是
建立連接請求事件
,則由 Acceptor 通過 Accept 處理連接請求蘸朋,然后創(chuàng)建一個 Handler 對象處理連接完成后的后續(xù)業(yè)務(wù)處理核无; - 若不是建立連接事件,則 Reactor 會分發(fā)調(diào)用連接對應(yīng)的 Handler 來響應(yīng)
Handler藕坯,會完成 Read → 業(yè)務(wù)處理 → Send 的完整業(yè)務(wù)流程团南。 - 優(yōu)點(diǎn):模型簡單,沒有多線程炼彪、進(jìn)程通信吐根、競爭的問題,全部都在一個線程中完成辐马;
- 缺點(diǎn):性能問題拷橘,只有一個線程,
無法完全發(fā)揮多核CPU的性能
喜爷。Handler在處理某個連接上的業(yè)務(wù)時冗疮,整個進(jìn)程無法處理其他連接事件,很容易導(dǎo)致性能瓶頸檩帐∈踽#可靠性問題:線程意外終止,或者進(jìn)入死循環(huán)湃密,會導(dǎo)致整個系統(tǒng)通信模塊不可用诅挑,不能接收和處理外部消息,造成節(jié)點(diǎn)故障泛源。 - 使用場景:客戶端的數(shù)量有限拔妥,業(yè)務(wù)處理非常快速达箍,比如 Redis 在業(yè)務(wù)處理方面的時間復(fù)雜度為
的情況没龙。
-
單Reactor多線程
- 對上圖的說明:
- Reactor 對象通過 Select 監(jiān)控客戶端請求事件,收到事件后,通過 Dispatch 進(jìn)行分發(fā)兜畸;
- 若建立連接請求努释,則由 Acceptor 通過 accept 處理連接請求碘梢,然后創(chuàng)建一個 Handler 對象處理完成連接后的各種事件咬摇;
- 若不是連接請求,則由 Reactor 分發(fā)調(diào)用連接對應(yīng)的 handler 來處理煞躬;
- handler 只負(fù)責(zé)響應(yīng)事件肛鹏,不做具體的業(yè)務(wù)處理,通過 read 讀取數(shù)據(jù)后恩沛,會分發(fā)給后面的 worker 線程池的某個線程處理業(yè)務(wù)在扰;
- worker 線程池會分配獨(dú)立的線程完成真正的業(yè)務(wù),并將結(jié)果返回給 handler雷客;
- handler 收到響應(yīng)后芒珠,通過 send 將結(jié)果返回給 client。
- 優(yōu)點(diǎn):可以充分的利用多核 cpu 的處理能力搅裙;
- 缺點(diǎn):多線程數(shù)據(jù)共享和訪問比較復(fù)雜皱卓,Reactor 處理所有的事件的監(jiān)聽和響應(yīng),在單線程中運(yùn)行部逮,在高并發(fā)場景時運(yùn)行容易出現(xiàn)性能瓶頸娜汁。
主從Reactor多線程
- 對上圖的說明:
- Reactor 主線程 MainReactor 對象通過 select 監(jiān)聽連接事件,收到事件后兄朋,通過 Acceptor 處理連接事件掐禁;
- 當(dāng) Acceptor 處理連接事件后,MainReactor 將連接分配給 SubReactor颅和;
- Subreactor 將連接加入到連接隊(duì)列進(jìn)行監(jiān)聽傅事,并創(chuàng)建 handler 進(jìn)行各種事件處理,當(dāng)有新事件發(fā)生時峡扩,Subreactor 就會調(diào)用對應(yīng)的 handler 處理享完;
- handler 通過 read 讀取數(shù)據(jù),分發(fā)給后面的 worker 線程處理有额,worker 線程池分配獨(dú)立的 worker 線程進(jìn)行業(yè)務(wù)處理般又,并返回結(jié)果;
- handler 收到響應(yīng)的結(jié)果后巍佑,再通過 send 將結(jié)果返回給 client茴迁;
- Reactor 主線程可以對應(yīng)多個 Reactor 子線程,即 MainRecator 可以關(guān)聯(lián)多個 SubReactor萤衰。
- 優(yōu)點(diǎn):父線程與子線程的數(shù)據(jù)交互簡單職責(zé)明確堕义,父線程只需要接收新連接,子線程完成后續(xù)的業(yè)務(wù)處理。父線程與子線程的數(shù)據(jù)交互簡單倦卖,Reactor 主線程只需要把新連接傳給子線程洒擦,子線程無需返回?cái)?shù)據(jù)。
- 缺點(diǎn):編程復(fù)雜度較高怕膛。
- 應(yīng)用場景:這種模型在許多項(xiàng)目中廣泛使用熟嫩,包括 Nginx 主從 Reactor 多進(jìn)程模型,Memcached 主從多線程褐捻,Netty 主從多線程模型的支持掸茅。
- Reactor模式的優(yōu)點(diǎn):①響應(yīng)快,不必為單個同步時間所阻塞柠逞,雖然 Reactor 本身依然是同步的昧狮;②可以最大程度地避免復(fù)雜的多線程及同步問題、多線程/進(jìn)程的切換帶來的開銷板壮;③擴(kuò)展性好逗鸣,可以方便的通過增加 Reactor 實(shí)例個數(shù)來充分利用 CPU 資源;④復(fù)用性好绰精,Reactor 模型本身與具體事件處理邏輯無關(guān)撒璧,具有很高的復(fù)用性。
- 對上圖的說明:
- BossGroup 線程維護(hù) Selector茬底,只關(guān)注 Accecpt沪悲;
- 當(dāng)接收到 Accept 事件,獲取到對應(yīng)的 SocketChannel阱表,封裝成 NIOScoketChannel殿如,并注冊到 Worker 線程(事件循環(huán))進(jìn)行維護(hù);
- 當(dāng) Worker 線程監(jiān)聽到 Selector 中注冊的通道發(fā)生自己感興趣的事件后最爬,就進(jìn)行處理(由 handler來完成)涉馁。注意: handler 已經(jīng)加入到通道。
- 對上圖的說明:
- Netty 抽象出兩組線程池:BossGroup 專門負(fù)責(zé)接收客戶端的連接爱致,WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫烤送;
- BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup;
- NioEventLoopGroup 相當(dāng)于一個事件循環(huán)組糠悯,這個組中含有多個事件循環(huán)帮坚,每一個事件循環(huán)是 NioEventLoop;
- NioEventLoop 表示一個不斷循環(huán)的執(zhí)行處理任務(wù)的線程互艾,每個 NioEventLoop 都有一個 Selector试和,用于監(jiān)聽綁定在其上的 socket 的網(wǎng)絡(luò)通訊;
- NioEventLoopGroup 可以有多個線程纫普,即可以含有多個 NioEventLoop阅悍;
- 每個 BossNioEventLoop 循環(huán)執(zhí)行的步驟有 3 步:
- 輪詢 accept 事件;
- 處理 accept 事件,與 client 建立連接节视,生成 NioScocketChannel拳锚,并將其注冊到某個 worker NIOEventLoop 上的 Selector;
- 處理任務(wù)隊(duì)列的任務(wù)寻行,即 runAllTasks霍掺;
- 每個 Worker NIOEventLoop 循環(huán)執(zhí)行的步驟:
- 輪詢 read,write 事件寡痰;
- 處理 I/O 事件抗楔,即 read棋凳,write 事件拦坠,在對應(yīng) NioScocketChannel 處理;
- 處理任務(wù)隊(duì)列的任務(wù)剩岳,即 runAllTasks贞滨;
- 每個 Worker NIOEventLoop 處理業(yè)務(wù)時,會使用 pipeline(管道)拍棕,pipeline 中包含了 channel晓铆,即通過 pipeline 可以獲取到對應(yīng)通道,管道中維護(hù)了很多的處理器绰播。
- Netty入門案例:TCP通信骄噪。
- 導(dǎo)入maven依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.54.Final</version>
</dependency>
- NettyServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建BossGroup 和 WorkerGroup
//說明
//1、創(chuàng)建兩個線程組 bossGroup 和 workerGroup
//2蠢箩、bossGroup 只是處理連接請求链蕊,真正的客戶端業(yè)務(wù)處理是會交給 workerGroup 完成
//3、兩個都是無限循環(huán)
//4谬泌、bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數(shù):默認(rèn)值為 cpu核數(shù) * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個
try {
//創(chuàng)建服務(wù)器端的啟動對象滔韵,配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈?zhǔn)骄幊虂磉M(jìn)行設(shè)置
bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個線程組
.channel(NioServerSocketChannel.class) //使用 NioSocketChannel 作為服務(wù)器的通道實(shí)現(xiàn)
.option(ChannelOption.SO_BACKLOG, 128) //設(shè)置線程隊(duì)列等待連接的個數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true) //設(shè)置保持活動連接狀態(tài)
//.handler(null) // 該 handler對應(yīng) bossGroup , childHandler 對應(yīng) workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { //創(chuàng)建一個通道初始化對象(匿名對象)
//給 pipeline 設(shè)置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//可以使用一個集合管理 SocketChannel,在推送消息時掌实,可以將業(yè)務(wù)加入到各個channel 對應(yīng)的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue 中
System.out.println("客戶socketChannel hashcode=" + ch.hashCode());
ch.pipeline().addLast(new NettyServerHandler()); //給 workerGroup 的 EventLoop 對應(yīng)的管道設(shè)置處理器
}
});
System.out.println(".....服務(wù)器 is ready...");
//綁定一個端口并且同步處理陪蜻,生成了一個 ChannelFuture 對象
//啟動服務(wù)器(并綁定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//給 cf 注冊監(jiān)聽器,監(jiān)控我們關(guān)心的事件
//綁定端口是異步操作贱鼻,當(dāng)綁定操作處理完宴卖,將會調(diào)用相應(yīng)的監(jiān)聽器處理邏輯
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監(jiān)聽端口 6668 成功");
} else {
System.out.println("監(jiān)聽端口 6668 失敗");
}
}
});
//對關(guān)閉通道進(jìn)行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- NettyServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 說明:自定義一個Handler 需要繼承 netty 規(guī)定好的某個HandlerAdapter(規(guī)范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取數(shù)據(jù)實(shí)際(這里我們可以讀取客戶端發(fā)送的消息)
/**
* 1、ChannelHandlerContext ctx:上下文對象邻悬,含有管道 pipeline症昏,通道channel,地址
* 2拘悦、Object msg:客戶端發(fā)送的數(shù)據(jù)齿兔,默認(rèn)是 Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服務(wù)器讀取線程:" + Thread.currentThread().getName() + ",channel=" + ctx.channel());
System.out.println("server ctx:" + ctx);
System.out.println("看看channel 和 pipeline的關(guān)系");
Channel channel = ctx.channel();
//本質(zhì)是一個雙向鏈接, 出站入站
ctx.pipeline();
//將 msg 轉(zhuǎn)成一個 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer分苇。
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發(fā)送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + channel.remoteAddress());
}
//數(shù)據(jù)讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush:write + flush
//將數(shù)據(jù)寫入到緩存添诉,并刷新
//一般需要對發(fā)送的數(shù)據(jù)進(jìn)行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//發(fā)生異常時医寿,需要關(guān)閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
- NettyClient.java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創(chuàng)建客戶端啟動對象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設(shè)置相關(guān)參數(shù)
bootstrap.group(group) //設(shè)置線程組
.channel(NioSocketChannel.class) // 設(shè)置客戶端通道的實(shí)現(xiàn)類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自定義的處理器
}
});
System.out.println("...客戶端 is ok...");
//啟動客戶端去連接服務(wù)器端
//關(guān)于ChannelFuture 涉及到netty的異步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//給關(guān)閉通道進(jìn)行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
- NettyClientHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//當(dāng)通道就緒就會觸發(fā)該方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello栏赴,server: (>^ω^<)喵", CharsetUtil.UTF_8));
}
//當(dāng)通道有讀取事件時,會觸發(fā)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服務(wù)器回復(fù)的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務(wù)器的地址:" + ctx.channel().remoteAddress());
}
//發(fā)生異常時靖秩,需要關(guān)閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
- 任務(wù)隊(duì)列中的 Task 有 3 種典型使用場景:
- 用戶程序自定義的普通任務(wù)须眷;
- 用戶自定義定時任務(wù);
- 非當(dāng)前 Reactor 線程調(diào)用 Channel 的各種方法:例如在推送系統(tǒng)的業(yè)務(wù)線程里面沟突,根據(jù)用戶的標(biāo)識花颗,找到對應(yīng)的 Channel 引用,然后調(diào)用 Write 類方法向該用戶推送消息惠拭,就會進(jìn)入到這種場景扩劝,最終的 Write 會提交到任務(wù)隊(duì)列中后被異步消費(fèi)。
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 說明:自定義一個Handler 需要繼承 netty 規(guī)定好的某個HandlerAdapter(規(guī)范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取數(shù)據(jù)實(shí)際(這里我們可以讀取客戶端發(fā)送的消息)
/**
* 1职辅、ChannelHandlerContext ctx:上下文對象棒呛,含有管道 pipeline,通道channel域携,地址
* 2簇秒、Object msg:客戶端發(fā)送的數(shù)據(jù),默認(rèn)是 Object
* 假設(shè)這里有一個非常耗時長的業(yè)務(wù) -> 異步執(zhí)行 -> 提交該channel 對應(yīng)的 NIOEventLoop 的 taskQueue中
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服務(wù)器讀取線程:" + Thread.currentThread().getName() + "秀鞭,channel=" + ctx.channel());
System.out.println("server ctx:" + ctx);
System.out.println("看看channel 和 pipeline的關(guān)系");
Channel channel = ctx.channel();
//本質(zhì)是一個雙向鏈接, 出站入站
ctx.pipeline();
//將 msg 轉(zhuǎn)成一個 ByteBuf
//ByteBuf 是 Netty 提供的趋观,不是 NIO 的 ByteBuffer。
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發(fā)送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + channel.remoteAddress());
//解決方案1:用戶程序自定義的普通任務(wù)
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello气筋,客戶端~(>^ω^<)喵2", CharsetUtil.UTF_8));
System.out.println("channel code:" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello拆内,客戶端~(>^ω^<)喵3", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
});
//解決方案2 : 用戶自定義定時任務(wù) -> 該任務(wù)是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端~(>^ω^<)喵4", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on ...");
}
//數(shù)據(jù)讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush:write + flush
//將數(shù)據(jù)寫入到緩存宠默,并刷新
//一般需要對發(fā)送的數(shù)據(jù)進(jìn)行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello麸恍,客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//發(fā)生異常時,需要關(guān)閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
- 每個 NioEventLoop 中包含有一個 Selector搀矫,一個 taskQueue抹沪;每個 NioEventLoop 的 Selector 上可以注冊監(jiān)聽多個 NioChannel;每個 NioChannel 只會綁定在唯一的 NioEventLoop 上瓤球;每個 NioChannel 都綁定有一個自己的 ChannelPipeline融欧。
- 異步模型:當(dāng)一個異步過程調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果卦羡。實(shí)際處理這個調(diào)用的組件在完成后噪馏,通過狀態(tài)麦到、通知和回調(diào)來通知調(diào)用者。
- Netty 中的 I/O 操作是異步的欠肾,包括 Bind瓶颠、Write、Connect 等操作會簡單的返回一個 ChannelFuture刺桃。調(diào)用者并不能立刻獲得結(jié)果粹淋,而是通過
Future-Listener
機(jī)制,用戶可以方便地主動獲取或者通過通知機(jī)制獲得 IO 操作結(jié)果瑟慈。 - Netty 的異步模型是建立在 future 和 callback 的之上的桃移。callback 就是回調(diào)。重點(diǎn)說 Future葛碧,它的核心思想是:假設(shè)一個方法 fun借杰,計(jì)算過程可能非常耗時,等待 fun 返回顯然不合適吹埠。那么可以在調(diào)用 fun 的時候第步,立馬返回一個 Future疮装,后續(xù)可以通過 Future 去監(jiān)控方法 fun 的處理過程(即:Future-Listener 機(jī)制)缘琅。
- Future說明:表示異步的執(zhí)行結(jié)果,可以通過它提供的方法來檢測執(zhí)行是否完成廓推,比如檢索計(jì)算等等刷袍。
ChannelFuture
是一個接口:public interface ChannelFuture extends Future<Void>
,可以添加自己的監(jiān)聽器樊展,當(dāng)監(jiān)聽的事件發(fā)生時呻纹,就會通知到監(jiān)聽器。
工作原理示意圖
- Future-Listener機(jī)制:當(dāng) Future 對象剛剛創(chuàng)建時专缠,處于非完成狀態(tài)雷酪,調(diào)用者可以通過返回的 ChannelFuture 來獲取操作執(zhí)行的狀態(tài),注冊監(jiān)聽函數(shù)來執(zhí)行完成后的操作涝婉。常見的操作如下:
- 通過
isDone
方法來判斷當(dāng)前操作是否完成哥力; - 通過
isSuccess
方法來判斷已完成的當(dāng)前操作是否成功; - 通過
getCause
方法來獲取已完成的當(dāng)前操作失敗的原因墩弯; - 通過
isCancelled
方法來判斷已完成的當(dāng)前操作是否被取消吩跋; - 通過
addListener
方法來注冊監(jiān)聽器,當(dāng)操作已完成(isDone方法返回完成)渔工,將會通知指定的監(jiān)聽器锌钮;若 Future 對象已完成,則通知指定的監(jiān)聽器引矩。
- 通過
- Netty入門案例:HTTP服務(wù)梁丘。
- TestServer.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TestServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(23333).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- TestServerInitializer.java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向管道加入處理器
//獲取管道
ChannelPipeline pipeline = ch.pipeline();
//加入一個netty 提供的httpServerCodec codec =>[coder - decoder]
//HttpServerCodec 說明
//1侵浸、HttpServerCodec 是netty提供的處理http的編-解碼器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
//2、增加一個自定義的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
System.out.println("ok~~~~");
}
}
- TestHttpServerHandler.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import java.net.URI;
/**
* 1氛谜、SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的抽象子類
* 2通惫、HttpObject 客戶端和服務(wù)器端相互通訊的數(shù)據(jù)被封裝成 HttpObject
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
//讀取客戶端數(shù)據(jù)
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("對應(yīng)的channel=" + ctx.channel() + ",pipeline=" + ctx
.pipeline() + "混蔼,通過pipeline獲取channel" + ctx.pipeline().channel());
System.out.println("當(dāng)前ctx的handler=" + ctx.handler());
//判斷 msg 是不是 http request請求
if (msg instanceof HttpRequest) {
System.out.println("ctx 類型=" + ctx.getClass());
System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + "履腋,TestHttpServerHandler hash=" + this.hashCode());
System.out.println("msg 類型=" + msg.getClass());
System.out.println("客戶端地址" + ctx.channel().remoteAddress());
HttpRequest httpRequest = (HttpRequest) msg;
//獲取uri,過濾指定的資源
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("請求了 favicon.ico惭嚣,不做響應(yīng)");
return;
}
//回復(fù)信息給瀏覽器 [http協(xié)議]
ByteBuf content = Unpooled.copiedBuffer("hello遵湖,我是服務(wù)器", CharsetUtil.UTF_8);
//構(gòu)造一個http的響應(yīng),即 http response
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//將構(gòu)建好 response 并返回
ctx.writeAndFlush(response);
}
}
}
- Netty 中 Bootstrap 類是客戶端程序的啟動引導(dǎo)類晚吞,ServerBootstrap 是服務(wù)端啟動引導(dǎo)類延旧。常見的方法有:
-
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup)
:該方法用于服務(wù)器端,用來設(shè)置兩個 EventLoop槽地。 -
public B group(EventLoopGroup group)
:該方法用于客戶端迁沫,用來設(shè)置一個 EventLoop。 -
public B channel(Class<? extends C> channelClass)
:該方法用來設(shè)置一個服務(wù)器端的通道實(shí)現(xiàn)捌蚊。 -
public <T> B option(ChannelOption<T> option, T value)
:用來給 ServerChannel 添加配置集畅。 -
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)
:用來給接收到的通道添加配置。 -
public ServerBootstrap childHandler(ChannelHandler childHandler)
:該方法用來設(shè)置業(yè)務(wù)處理類(自定義的handler)缅糟。 -
public ChannelFuture bind(int inetPort)
:該方法用于服務(wù)器端挺智,用來設(shè)置占用的端口號。 -
public ChannelFuture connect(String inetHost, int inetPort)
:該方法用于客戶端窗宦,用來連接服務(wù)器端赦颇。
-
- Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理赴涵,但可以過一會等它執(zhí)行完成或者直接注冊一個監(jiān)聽(通過 Future 和 ChannelFutures來實(shí)現(xiàn))媒怯,當(dāng)操作執(zhí)行成功或失敗時會自動觸發(fā)注冊的監(jiān)聽事件。常見的方法有:
-
Channel channel()
:返回當(dāng)前正在進(jìn)行 IO 操作的通道髓窜。 -
ChannelFuture sync()
:等待異步操作執(zhí)行完畢扇苞。
-
-
Channel
是Netty 網(wǎng)絡(luò)通信的組件,能夠用于執(zhí)行網(wǎng)絡(luò) I/O 操作纱烘。 - 通過 Channel 可獲得當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)杨拐。
- 通過 Channel 可獲得網(wǎng)絡(luò)連接的配置參數(shù)(例如接收緩沖區(qū)大小)擂啥。
- Channel 提供異步的網(wǎng)絡(luò) I/O 操作(如建立連接哄陶,讀寫,綁定端口)哺壶,異步調(diào)用意味著任何 I/O 調(diào)用都將立即返回屋吨,并且不保證在調(diào)用結(jié)束時所請求的 I/O 操作已完成蜒谤。調(diào)用立即返回一個 ChannelFuture 實(shí)例,通過注冊監(jiān)聽器到 ChannelFuture 上至扰,可以 I/O 操作成功鳍徽、失敗或取消時回調(diào)通知調(diào)用方。Channel 支持關(guān)聯(lián) I/O 操作與對應(yīng)的處理程序敢课。不同協(xié)議阶祭、不同的阻塞類型的連接都有不同的 Channel 類型與之對應(yīng),常用的 Channel 類型:
-
NioSocketChannel
:異步的客戶端 TCP Socket 連接直秆。 -
NioServerSocketChannel
:異步的服務(wù)器端 TCP Socket 連接濒募。 -
NioDatagramChannel
:異步的 UDP 連接。 -
NioSctpChannel
:異步的客戶端 Sctp 連接圾结。 -
NioSctpServerChannel
:異步的 Sctp 服務(wù)器端連接瑰剃,這些通道涵蓋了 UDP 和 TCP 網(wǎng)絡(luò) IO 以及文件 IO。
-
- Netty 基于 Selector 對象實(shí)現(xiàn) I/O 多路復(fù)用筝野,通過 Selector 一個線程可以監(jiān)聽多個連接的 Channel 事件晌姚。
- 當(dāng)向一個 Selector 中注冊 Channel 后,Selector 內(nèi)部的機(jī)制就可以自動不斷地查詢(Select)這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀歇竟,可寫挥唠,網(wǎng)絡(luò)連接完成等),這樣程序就可以很簡單地使用一個線程高效地管理多個 Channel途蒋。
- ChannelHandler 是一個接口猛遍,用于處理 I/O 事件或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè)務(wù)處理鏈)中的下一個處理程序号坡。
- ChannelHandler 本身并沒有提供很多方法,因?yàn)檫@個接口有許多的方法需要實(shí)現(xiàn)梯醒,方便使用期間宽堆,可以繼承它的子類。
ChannelHandler 及其實(shí)現(xiàn)類
- 我們經(jīng)常需要自定義一個 Handler 類去繼承 ChannelInboundHandlerAdapter茸习,然后通過重寫相應(yīng)方法實(shí)現(xiàn)業(yè)務(wù)邏輯畜隶,接下來看看一般都需要重寫哪些方法:
- ChannelPipeline 是一個 Handler 的集合,它負(fù)責(zé)處理和攔截 inbound 或者 outbound 的事件和操作号胚,相當(dāng)于一個貫穿 Netty 的鏈籽慢。(也可以這樣理解:ChannelPipeline 是保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作)
- ChannelPipeline 實(shí)現(xiàn)了一種高級形式的攔截過濾器模式猫胁,使用戶可以完全控制事件的處理方式箱亿,以及 Channel 中各個的 ChannelHandler 如何相互交互。
- 在 Netty 中每個 Channel 都有且僅有一個 ChannelPipeline 與之對應(yīng)弃秆,它們的組成關(guān)系如下:
- ChannelPipeline 常用的相關(guān)方法:
ChannelPipeline addFirst(ChannelHandler... handlers)
:把一個業(yè)務(wù)處理類(handler)添加到鏈中的第一個位置届惋;ChannelPipeline addLast(ChannelHandler... handlers)
:把一個業(yè)務(wù)處理類(handler)添加到鏈中的最后一個位置髓帽。 - ChannelHandlerContext 中包含一個具體的事件處理器 ChannelHandler,同時 ChannelHandlerContext 中也綁定了對應(yīng)的 pipeline 和 Channel 的信息脑豹,方便對 ChannelHandler 進(jìn)行調(diào)用郑藏。常用的方法如下:
-
ChannelFuture close()
:關(guān)閉通道。 -
ChannelOutboundInvoker flush()
:刷新瘩欺。 -
ChannelFuture writeAndFlush(Object msg)
:將數(shù)據(jù)寫到ChannelPipeline 中當(dāng)前 ChannelHandler 的下一個 ChannelHandler 開始處理(出站)必盖。
-
- Netty 在創(chuàng)建 Channel 實(shí)例后,一般都需要設(shè)置
ChannelOption
參數(shù)俱饿。
ChannelOption 參數(shù)如下:-
ChannelOption.SO_BACKLOG
:對應(yīng) TCP/IP 協(xié)議 listen 函數(shù)中的 backlog 參數(shù)筑悴,用來初始化服務(wù)器可連接隊(duì)列大小。因?yàn)榉?wù)端處理客戶端連接請求是順序處理的稍途,所以同一時間只能處理一個客戶端連接阁吝,多個客戶端來的時候,服務(wù)端將不能處理的客戶端連接請求放在隊(duì)列中等待處理械拍,backlog 參數(shù)指定了隊(duì)列的大小突勇。 -
ChannelOption.SO_KEEPALIVE
:一直保持連接活動狀態(tài)。
-
-
EventLoopGroup
是一組 EventLoop 的抽象坷虑,Netty 為了更好地利用多核 CPU 資源甲馋,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護(hù)著一個 Selector 實(shí)例迄损。 -
EventLoopGroup
提供 next 接口定躏,可以從組里面按照一定規(guī)則獲取其中一個 EventLoop 來處理任務(wù)。在 Netty 服務(wù)器端編程中芹敌,一般都需要提供兩個 EventLoopGroup痊远,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。常用的方法如下:-
public NioEventLoopGroup()
:構(gòu)造方法氏捞。 -
public Future<?> shutdownGracefully()
:斷開連接碧聪,關(guān)閉線程。
-
- 通常一個服務(wù)端口即一個 ServerSocketChannel 對應(yīng)一個 Selector 和一個 EventLoop 線程液茎。BossEventLoop 負(fù)責(zé)接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來進(jìn)行 IO 處理逞姿,如下圖所示:
- Netty 提供一個專門用來操作緩沖區(qū)(即 Netty 的數(shù)據(jù)容器)的工具類,常用的方法:
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
捆等,類似于 NIO 中的 ByteBuffer 但有區(qū)別滞造。
- NettyByteBuf01.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class NettyByteBuf01 {
public static void main(String[] args) {
//創(chuàng)建一個ByteBuf
//1、創(chuàng)建 ByteBuf 對象栋烤,該對象包含一個數(shù)組谒养,是一個byte[10]
//2、在netty的buffer中班缎,不需要使用flip進(jìn)行反轉(zhuǎn)蝴光,其底層維護(hù)了 readerIndex 和 writerIndex
//3她渴、通過 readerIndex、writerIndex 和 capacity蔑祟,將buffer分成三個區(qū)域
//0 到 readerIndex:表示已經(jīng)讀取的區(qū)域
//readerIndex 到 writerIndex:表示可讀的區(qū)域
//writerIndex 到 capacity:表示可寫的區(qū)域
ByteBuf buffer = Unpooled.buffer(10);
for (int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("capacity=" + buffer.capacity());//10
/*for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.getByte(i));
}*/
for (int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}
System.out.println("執(zhí)行完畢");
}
}
- NettyByteBuf02.java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
public class NettyByteBuf02 {
public static void main(String[] args) {
//創(chuàng)建ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", StandardCharsets.UTF_8);
if (byteBuf.hasArray()) { // true
byte[] content = byteBuf.array();
//將 content 轉(zhuǎn)成字符串
System.out.println(new String(content, StandardCharsets.UTF_8));
System.out.println("byteBuf=" + byteBuf);
System.out.println(byteBuf.arrayOffset()); // 0
System.out.println(byteBuf.readerIndex()); // 0
System.out.println(byteBuf.writerIndex()); // 12
System.out.println(byteBuf.capacity()); // 64
//System.out.println(byteBuf.readByte()); //注意:若讀取一個趁耗,則下面可讀取的字節(jié)數(shù)將減1
System.out.println(byteBuf.getByte(0)); // 104
int len = byteBuf.readableBytes(); //返回可讀的字節(jié)數(shù) :12
System.out.println("len=" + len);
//使用for取出各個字節(jié),注意:不會改變r(jià)eaderIndex的值
for (int i = 0; i < len; i++) {
System.out.print((char) byteBuf.getByte(i));
}
System.out.println();
//按照某個范圍讀取
System.out.println(byteBuf.getCharSequence(0, 4, StandardCharsets.UTF_8));
System.out.println(byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8));
}
}
}