???? Netty是一個(gè)基于NIO的網(wǎng)絡(luò)框架,它極大的簡(jiǎn)化并優(yōu)化了TCP和UDP套接字服務(wù)器等網(wǎng)絡(luò)編程源譬,并且性能以及安全性等方面都具備優(yōu)勢(shì)集惋,并且支持多種協(xié)議 ,如 FTP踩娘,SMTP刮刑,HTTP 以及各種二進(jìn)制和基于文本的傳統(tǒng)協(xié)議。
下面先看一個(gè)簡(jiǎn)單的Netty案例:
- 服務(wù)端代碼
@Slf4j
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//創(chuàng)建BossGroup和WorkerGroup
//說(shuō)明:
//1. 創(chuàng)建兩個(gè)線程組bossGroup和workerGroup
//2. bossGroup只是處理鏈接請(qǐng)求养渴,真正的和客戶端業(yè)務(wù)處理雷绢,會(huì)交給workerGroup
//3. 兩個(gè)都是無(wú)限循環(huán)的
//4. bossGroup和workerGroup含有子線程的(NioEventLoop)的個(gè)數(shù),默認(rèn)實(shí)際cup核數(shù)*2
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
/**
* 創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象理卑,配置參數(shù)
*/
ServerBootstrap bootstrap = new ServerBootstrap();
/**
* 使用鏈?zhǔn)阶兂蓙?lái)進(jìn)行設(shè)置
*/
bootstrap.group(bossGroup, workerGroup) //設(shè)置兩個(gè)線程組
.channel(NioServerSocketChannel.class) //使用NioSocketChannel作為服務(wù)器的通道
.option(ChannelOption.SO_BACKLOG, 128) //設(shè)置線程隊(duì)列等待連接個(gè)數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true) //設(shè)置保持活動(dòng)連接狀態(tài)
.childHandler(new ChannelInitializer<SocketChannel>() {//設(shè)置workerGroup的eventLoop對(duì)應(yīng)的管道處理器
//給pipeline設(shè)置處理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可以使用集合管理SocketChannel翘紊,在推送消息時(shí),可以將業(yè)務(wù)加入到各個(gè)chanel對(duì)應(yīng)的NIOEventLoop的taskQueue或ScheduleTaskQueue
log.info("客戶端SocketChannel hashCode={}", socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
log.info("服務(wù)器 is ready...");
//涉及netty異步模型
ChannelFuture channelFuture = bootstrap.bind(6668).sync(); //綁定一個(gè)端口傻工,并且同步,啟動(dòng)服務(wù)器
//給channelFuture注冊(cè)監(jiān)聽(tīng)器,監(jiān)控我們關(guān)系的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()) {
log.info("監(jiān)聽(tīng)端口6668成功");
}else {
log.info("監(jiān)聽(tīng)端口失敗");
}
}
});
log.info("判斷是否是異步操作");
//對(duì)關(guān)閉通道進(jìn)行監(jiān)聽(tīng)
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 自定義handler
/**
* 自定義handler孵滞, 需要繼承netty規(guī)定好的某個(gè)handlerAdapter
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 讀取數(shù)據(jù)中捆,客戶端發(fā)送的消息
*
* @param ctx 上線文對(duì)象,含有管道pipeline坊饶,通道泄伪,地址
* @param msg 客戶端發(fā)送的數(shù)據(jù),默認(rèn)是Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//將msg轉(zhuǎn)換成byteBuf, netty提供的匿级,不是NIO的byteBuffer
ByteBuf buf = (ByteBuf) msg;
log.info("客戶端發(fā)送的消息是:{}", buf.toString(UTF_8));
log.info("客戶端地址:{}", ctx.channel().remoteAddress());
}
/**
* 讀取數(shù)據(jù)完畢
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//發(fā)送數(shù)蟋滴,需要進(jìn)行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客戶端", UTF_8));
}
/**
* 處理異常,一般需要關(guān)閉通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("發(fā)生異常痘绎,關(guān)閉通道");
ctx.close();
}
}
下面詳細(xì)介紹下Netty的核心組件津函,都是在上面案例使用過(guò)的。
Bootstrap孤页、 ServerBootstrap
BootStrap意思是引導(dǎo)尔苦,一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開(kāi)始,主要作用是配置整個(gè)netty程序,串聯(lián)各個(gè)組件允坚,Netty中的Bootstrap類是客戶端程序的啟動(dòng)引導(dǎo)類魂那,ServerBootstrap是服務(wù)端啟動(dòng)引導(dǎo)類
-
常見(jiàn)的方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) //用戶服務(wù)端設(shè)置兩個(gè)EventLoopGroup public B channel(Class<? extends C> channelClass) //設(shè)置一個(gè)服務(wù)器端的通道實(shí)現(xiàn) public <T> B option(ChannelOption<T> option, T value) //給ServerChannel添加配置 public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value)//用來(lái)給接收到的通道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler)//設(shè)置業(yè)務(wù)處理類(自定義的handler) childHandler 對(duì)應(yīng)workerGroup public ChannelFuture bind(int inetPort) //用戶服務(wù)定,設(shè)置綁定的端口號(hào) public B handler(ChannelHandler handler) //該handler對(duì)應(yīng)BossGroup生效稠项, public ChannelFuture connect(SocketAddress remoteAddress)//用戶客戶端涯雅,用來(lái)連接服務(wù)器
EventLoopGroup 和其實(shí)現(xiàn)類 NioEventLoopGroup
EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源展运,一般會(huì)有多個(gè) EventLoop 同時(shí)工作活逆,每個(gè) EventLoop 維護(hù)著一個(gè) Selector 實(shí)例。
EventLoopGroup 提供 next 接口乐疆,可以從組里面按照一定規(guī)則獲取其中一個(gè) EventLoop來(lái)處理任務(wù)划乖。在 Netty 服務(wù)器端編程中,我們一般都需要提供兩個(gè) EventLoopGroup挤土,例如:BossEventLoopGroup 和 WorkerEventLoopGroup琴庵。
-
通常一個(gè)服務(wù)端口即一個(gè) ServerSocketChannel對(duì)應(yīng)一個(gè)Selector 和一個(gè)EventLoop線程。BossEventLoop 負(fù)責(zé)接收客戶端的連接并將 SocketChannel 交給 WorkerEventLoopGroup 來(lái)進(jìn)行 IO 處理仰美,如下圖所示
image-20201129201931515.png
BossEventLoopGroup 通常是一個(gè)單線程的 EventLoop迷殿,EventLoop 維護(hù)著一個(gè)注冊(cè)了ServerSocketChannel 的 Selector 實(shí)例BossEventLoop 不斷輪詢 Selector 將連接事件分離出來(lái)
通常是 OP_ACCEPT 事件,然后將接收到的 SocketChannel 交給 WorkerEventLoopGroup
WorkerEventLoopGroup 會(huì)由 next 選擇其中一個(gè) EventLoop來(lái)將這個(gè) SocketChannel 注冊(cè)到其維護(hù)的 Selector 并對(duì)其后續(xù)的 IO 事件進(jìn)行處理
-
常用方法
public NioEventLoopGroup()咖杂,構(gòu)造方法
public Future<?> shutdownGracefully()庆寺,斷開(kāi)連接,關(guān)閉線程
Channel
Netty網(wǎng)絡(luò)通信的組件诉字,能夠用于執(zhí)行網(wǎng)絡(luò)I/O操作
通過(guò)Channel可以獲得當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)
通過(guò)Channel可以獲得網(wǎng)絡(luò)連接的配置參數(shù)(如接受緩沖區(qū)大信吵ⅰ)
Channel提供異步的網(wǎng)絡(luò)I/O操作(如建立連接connect(),讀寫 read()壤圃、write()陵霉,綁定端口 bing()),異步調(diào)用意味著任何I/O調(diào)用都將立即返回伍绳,并不保證在調(diào)用結(jié)束時(shí)踊挠,請(qǐng)求的I/O操作已完成
調(diào)用立即返回一個(gè)ChannelFuture實(shí)例,通過(guò)注冊(cè)監(jiān)聽(tīng)器到ChannelFuture上冲杀,可以I/O操作成功效床,失敗或取消時(shí)回調(diào)通知調(diào)用方
支持關(guān)聯(lián)I/O操作與對(duì)應(yīng)的處理程序
不同協(xié)議、不同的阻塞類型的連接逗游不同的Channel類型與之對(duì)應(yīng)权谁,常見(jiàn)的Channel類型:
NioSocketChannel //異步客戶端TCP Socket連接
NioServerSocketChannel //異步的服務(wù)端TCP Socket連接
NioDatagramChannel //異步的UDP連接
NioSctpChannel //異步的客戶端Sctp連接
NioSctpServerChannel //異步的Sctp的服務(wù)端連接剩檀,這些通道涵蓋了UDP和TCP網(wǎng)絡(luò)IO以及文件IO
Future、channelFuture
netty中所有的IO操作都是異步的旺芽,不能立刻得到消息是否被正確處理谨朝。但是可以過(guò)一會(huì)等到它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽(tīng)卤妒,具體的實(shí)現(xiàn)就是通過(guò)Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽(tīng)字币,當(dāng)操作執(zhí)行成功或者失敗時(shí)監(jiān)聽(tīng)會(huì)自動(dòng)觸發(fā)注冊(cè)的監(jiān)聽(tīng)事件则披。
-
常見(jiàn)方法有
public interface ChannelFuture extends Future<Void> { Channel channel();//返回當(dāng)前執(zhí)行IO操作的通道 ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1); ...... ChannelFuture sync() throws InterruptedException; //等待異步操作執(zhí)行完畢 }
ChanelHandler
- ChannelHandler是一個(gè)接口,處理I/O事件或攔截I/O操作洗出,并將其轉(zhuǎn)發(fā)到其ChannelPipeline(業(yè)務(wù)處理鏈)中的下一個(gè)處理程序
- ChannelHandler本身并沒(méi)有提供很多方法士复,因?yàn)檫@個(gè)接口有許多的方法需要實(shí)現(xiàn),方便使用期間翩活,可以繼承它的子類
-
ChannelHanler及其實(shí)現(xiàn)類一覽圖
image-20201129151459934.png
Pipeline和ChannelPipeline
ChannelPipeline是一個(gè)Handler集合阱洪,它負(fù)責(zé)處理和攔截inbound或者outbound的事件和操作,相當(dāng)于一個(gè)貫穿Netty的鏈(也可以這樣理解:ChannelPipeline是保存ChannelHandler的List菠镇,用于處理或攔截Channel的入站和出站事件)
ChannelPipeline實(shí)現(xiàn)了一種高級(jí)形式的攔截過(guò)濾器模式冗荸,使用戶可以完全控制事件的處理方式,以及Channel中各個(gè)ChannelHandler如何 互相交互利耍。
-
在Netty中每個(gè)Channel都有且僅有一個(gè)ChannelPipeline與之對(duì)應(yīng)蚌本,它們組成關(guān)系如下:
image-20201129172333074.png- 一個(gè) Channel 包含了一個(gè) ChannelPipeline,而 ChannelPipeline 中又維護(hù)了一個(gè)由 ChannelHandlerContext 組成的雙向鏈表隘梨,并且每個(gè) ChannelHandlerContext 中又關(guān)聯(lián)著一個(gè) ChannelHandler
- 入站事件和出站事件在一個(gè)雙向鏈表中程癌,入站事件會(huì)從鏈表 head 往后傳遞到最后一個(gè)入站的 handler,出站事件會(huì)從鏈表 tail 往前傳遞到最前一個(gè)出站的 handler轴猎,兩種類型的 handler 互不干擾
-
常用方法
ChannelPipeline addFirst(ChannelHandler... handlers)嵌莉,把一個(gè)業(yè)務(wù)處理類(handler)添加到鏈中的第一個(gè)位置 ChannelPipeline addLast(ChannelHandler... handlers),把一個(gè)業(yè)務(wù)處理類(handler)添加到鏈中的最后一個(gè)位置
ChannelHandlerContext
保存 Channel 相關(guān)的所有上下文信息捻脖,同時(shí)關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象
即ChannelHandlerContext 中 包 含 一 個(gè) 具 體 的 事 件 處 理 器 ChannelHandler 锐峭, 同 時(shí)ChannelHandlerContext 中也綁定了對(duì)應(yīng)的 pipeline 和 Channel 的信息,方便對(duì) ChannelHandler進(jìn)行調(diào)用.
-
常用方法
ChannelFuture close()可婶,關(guān)閉通道
ChannelOutboundInvoker flush()沿癞,刷新
ChannelFuture writeAndFlush(Object msg) , 將 數(shù) 據(jù) 寫 到 ChannelPipeline 中 當(dāng) 前
ChannelHandler 的下一個(gè) ChannelHandler 開(kāi)始處理(出站)