個(gè)人專題目錄
1. Netty概述與高性能架構(gòu)設(shè)計(jì)
1.1 原生NIO 存在的問題
NIO 的類庫(kù)和 API 繁雜,使用麻煩:需要熟練掌握 Selector癌别、ServerSocketChannel结榄、SocketChannel缩挑、ByteBuffer 等烂瘫。
需要具備其他的額外技能:要熟悉 Java 多線程編程,因?yàn)?NIO 編程涉及到 Reactor 模式宏胯,你必須對(duì)多線程和網(wǎng)絡(luò)編程非常熟悉,才能編寫出高質(zhì)量的 NIO 程序本姥。
開發(fā)工作量和難度都非常大:例如客戶端面臨斷連重連肩袍、網(wǎng)絡(luò)閃斷、半包讀寫婚惫、失敗緩存氛赐、網(wǎng)絡(luò)擁塞和異常流的處理等等魂爪。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會(huì)導(dǎo)致 Selector 空輪詢鹰祸,最終導(dǎo)致 CPU 100%甫窟。直到 JDK 1.7 版本該問題仍舊存在,沒有被根本解決蛙婴。
1.2 Netty 的優(yōu)點(diǎn)
Netty 對(duì) JDK 自帶的 NIO 的 API 進(jìn)行了封裝粗井,解決了上述問題。
設(shè)計(jì)優(yōu)雅:適用于各種傳輸類型的統(tǒng)一 API 阻塞和非阻塞 Socket街图;基于靈活且可擴(kuò)展的事件模型浇衬,可以清晰地分離關(guān)注點(diǎn);高度可定制的線程模型 - 單線程餐济,一個(gè)或多個(gè)線程池.
使用方便:詳細(xì)記錄的 Javadoc耘擂,用戶指南和示例;沒有其他依賴項(xiàng)絮姆,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了醉冤。
高性能、吞吐量更高:延遲更低篙悯;減少資源消耗蚁阳;最小化不必要的內(nèi)存復(fù)制。
安全:完整的 SSL/TLS 和 StartTLS 支持鸽照。
社區(qū)活躍螺捐、不斷更新:社區(qū)活躍,版本迭代周期短矮燎,發(fā)現(xiàn)的 Bug 可以被及時(shí)修復(fù)定血,同時(shí),更多的新功能會(huì)被加入
1.3 簡(jiǎn)介
Netty是由JBOSS提供的一個(gè)java開源框架诞外。Netty提供異步的澜沟、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能峡谊、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序倔喂。
也就是說,Netty 是一個(gè)基于NIO的客戶靖苇、服務(wù)器端編程框架席噩,使用Netty 可以確保你快速和簡(jiǎn)單的開發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶贤壁,服務(wù)端應(yīng)用悼枢。Netty相當(dāng)簡(jiǎn)化和流線化了網(wǎng)絡(luò)應(yīng)用的編程開發(fā)過程,例如脾拆,TCP和UDP的socket服務(wù)開發(fā)馒索。
“快速”和“簡(jiǎn)單”并不用產(chǎn)生維護(hù)性或性能上的問題莹妒。Netty 是一個(gè)吸收了多種協(xié)議的實(shí)現(xiàn)經(jīng)驗(yàn),這些協(xié)議包括FTP,SMTP,HTTP绰上,各種二進(jìn)制旨怠,文本協(xié)議,并經(jīng)過相當(dāng)精心設(shè)計(jì)的項(xiàng)目蜈块,最終鉴腻,Netty 成功的找到了一種方式,在保證易于開發(fā)的同時(shí)還保證了其應(yīng)用的性能百揭,穩(wěn)定性和伸縮性爽哎。
Netty從4.x版本開始,需要使用JDK1.6及以上版本提供基礎(chǔ)支撐器一。
在設(shè)計(jì)上:針對(duì)多種傳輸類型的統(tǒng)一接口 - 阻塞和非阻塞课锌;簡(jiǎn)單但更強(qiáng)大的線程模型;真正的無連接的數(shù)據(jù)報(bào)套接字支持祈秕;鏈接邏輯支持復(fù)用渺贤;
在性能上:比核心 Java API 更好的吞吐量,較低的延時(shí)请毛;資源消耗更少志鞍,這個(gè)得益于共享池和重用;減少內(nèi)存拷貝
在健壯性上:消除由于慢获印,快述雾,或重載連接產(chǎn)生的 OutOfMemoryError街州;消除經(jīng)常發(fā)現(xiàn)在 NIO 在高速網(wǎng)絡(luò)中的應(yīng)用中的不公平的讀/寫比
在安全上:完整的 SSL / TLS 和 StartTLS 的支持
且已得到大量商業(yè)應(yīng)用的真實(shí)驗(yàn)證,如:Hadoop項(xiàng)目的Avro(RPC框架)兼丰、Dubbo、Dubbox等RPC框架唆缴。
Netty的官網(wǎng)是:http://netty.io
有三方提供的中文翻譯Netty用戶手冊(cè)(官網(wǎng)提供源信息):http://ifeve.com/netty5-user-guide/
1.4 Netty架構(gòu)
Netty 采用了比較典型的三層網(wǎng)絡(luò)架構(gòu)進(jìn)行設(shè)計(jì)鳍征,邏輯架構(gòu)圖如下所示:
- 第一層,Reactor 通信調(diào)度層面徽,它由一系列輔助類完成艳丛,包括 Reactor 線程 NioEventLoop 以及其父類、NioSocketChannel/NioServerSocketChannel 以及其父 類趟紊、ByteBuffer 以及由其衍生出來的各種 Buffer氮双、Unsafe以及其衍生出的各種內(nèi)部類等。該層的主要職責(zé)就是監(jiān)聽網(wǎng)絡(luò)的讀寫和連接操作霎匈,負(fù)責(zé)將網(wǎng)絡(luò)層的數(shù)據(jù) 讀取到內(nèi)存緩沖區(qū)中戴差,然后觸發(fā)各種網(wǎng)絡(luò)事件,例如連接創(chuàng)建铛嘱、連接激活暖释、讀事 件袭厂、寫事件等等,將這些事件觸發(fā)到 PipeLine 中球匕,由 PipeLine 充當(dāng)?shù)穆氊?zé)鏈來 進(jìn)行后續(xù)的處理纹磺。
- 第二層,職責(zé)鏈 PipeLine,它負(fù)責(zé)事件在職責(zé)鏈中的有序傳播亮曹,同時(shí)負(fù)責(zé)動(dòng)態(tài)的 編排職責(zé)鏈橄杨,職責(zé)鏈可以選擇監(jiān)聽和處理自己關(guān)心的事件,它可以攔截處理和向 后/向前傳播事件乾忱,不同的應(yīng)用的 Handler 節(jié)點(diǎn)的功能也不同讥珍,通常情況下,往往 會(huì)開發(fā)編解碼 Hanlder 用于消息的編解碼窄瘟,它可以將外部的協(xié)議消息轉(zhuǎn)換成內(nèi)部 的 POJO 對(duì)象衷佃,這樣上層業(yè)務(wù)側(cè)只需要關(guān)心處理業(yè)務(wù)邏輯即可,不需要感知底層 的協(xié)議差異和線程模型差異蹄葱,實(shí)現(xiàn)了架構(gòu)層面的分層隔離氏义。
- 第三層,業(yè)務(wù)邏輯處理層图云」哂疲可以分為兩類:
- 純粹的業(yè)務(wù)邏輯 處理,例如訂單處理竣况。
- 應(yīng)用層協(xié)議管理克婶,例如HTTP協(xié)議、FTP協(xié)議等丹泉。
接下來汪拥,我從影響通信性能的三個(gè)方面(I/O模型柴墩、線程調(diào)度模型丐一、序列化方式)來談?wù)凬etty的架構(gòu)卓练。
1.5 線程模型
Netty中支持單線程模型,多線程模型晒哄,主從多線程模型睁宰。
在大多數(shù)場(chǎng)景下,并行多線程處理可以提升系統(tǒng)的并發(fā)性能寝凌。但是柒傻,如果對(duì)于共享資源的并發(fā)訪問處理不當(dāng),會(huì)帶來嚴(yán)重的鎖競(jìng)爭(zhēng)较木,這最終會(huì)導(dǎo)致性能的下降红符。為了盡可能的避免鎖競(jìng)爭(zhēng)帶來的性能損耗,可以通過串行化設(shè)計(jì),即消息的處理盡可能在同一個(gè)線程內(nèi)完成违孝,期間不進(jìn)行線程切換刹前,這樣就避免了多線程競(jìng)爭(zhēng)和同步鎖。
為了盡可能提升性能雌桑,Netty采用了串行無鎖化設(shè)計(jì)喇喉,在I/O線程內(nèi)部進(jìn)行串行操作,避免多線程競(jìng)爭(zhēng)導(dǎo)致的性能下降校坑。表面上看拣技,串行化設(shè)計(jì)似乎CPU利用率不高,并發(fā)程度不夠耍目。但是膏斤,通過調(diào)整NIO線程池的線程參數(shù),可以同時(shí)啟動(dòng)多個(gè)串行化的線程并行運(yùn)行邪驮,這種局部無鎖化的串行線程設(shè)計(jì)相比一個(gè)隊(duì)列-多個(gè)工作線程模型性能更優(yōu)莫辨。
Reactor單線程模型
在ServerBootstrap調(diào)用方法group的時(shí)候,傳遞的參數(shù)是同一個(gè)線程組毅访,且在構(gòu)造線程組的時(shí)候沮榜,構(gòu)造參數(shù)為1,這種開發(fā)方式喻粹,就是一個(gè)單線程模型蟆融。
個(gè)人機(jī)開發(fā)測(cè)試使用。不推薦守呜。
Reactor多線程模型
在ServerBootstrap調(diào)用方法group的時(shí)候型酥,傳遞的參數(shù)是兩個(gè)不同的線程組。負(fù)責(zé)監(jiān)聽的acceptor線程組查乒,線程數(shù)為1弥喉,也就是構(gòu)造參數(shù)為1。負(fù)責(zé)處理客戶端任務(wù)的線程組侣颂,線程數(shù)大于1档桃,也就是構(gòu)造參數(shù)大于1枪孩。這種開發(fā)方式憔晒,就是多線程模型。
長(zhǎng)連接蔑舞,且客戶端數(shù)量較少拒担,連接持續(xù)時(shí)間較長(zhǎng)情況下使用。如:企業(yè)內(nèi)部交流應(yīng)用攻询。
Reactor主從多線程模型
在ServerBootstrap調(diào)用方法group的時(shí)候从撼,傳遞的參數(shù)是兩個(gè)不同的線程組。負(fù)責(zé)監(jiān)聽的acceptor線程組,線程數(shù)大于1低零,也就是構(gòu)造參數(shù)大于1婆翔。負(fù)責(zé)處理客戶端任務(wù)的線程組,線程數(shù)大于1掏婶,也就是構(gòu)造參數(shù)大于1啃奴。這種開發(fā)方式,就是主從多線程模型雄妥。
長(zhǎng)連接最蕾,客戶端數(shù)量相對(duì)較多,連接持續(xù)時(shí)間比較長(zhǎng)的情況下使用老厌。如:對(duì)外提供服務(wù)的相冊(cè)服務(wù)器瘟则。
Netty模型
Netty 線程模式(Netty 主要基于主從Reactor 多線程模型做了一定的改進(jìn),其中主從Reactor 多線程模型有多個(gè)Reactor)
Netty抽象出兩組線程池 BossGroup 專門負(fù)責(zé)接收客戶端的連接, WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫
BossGroup 和 WorkerGroup 類型都是 NioEventLoopGroup
NioEventLoopGroup 相當(dāng)于一個(gè)事件循環(huán)組, 這個(gè)組中含有多個(gè)事件循環(huán) 枝秤,每一個(gè)事件循環(huán)是 NioEventLoop
NioEventLoop 表示一個(gè)不斷循環(huán)的執(zhí)行處理任務(wù)的線程醋拧, 每個(gè)NioEventLoop 都有一個(gè)selector , 用于監(jiān)聽綁定在其上的socket的網(wǎng)絡(luò)通訊
NioEventLoopGroup 可以有多個(gè)線程, 即可以含有多個(gè)NioEventLoop
-
每個(gè)Boss NioEventLoop 循環(huán)執(zhí)行的步驟有3步
1.輪詢accept 事件
2.處理accept 事件 , 與client建立連接 , 生成NioScocketChannel , 并將其注冊(cè)到某個(gè)worker NIOEventLoop 上的 selector
3.處理任務(wù)隊(duì)列的任務(wù) , 即 runAllTasks
-
每個(gè) Worker NIOEventLoop 循環(huán)執(zhí)行的步驟
1.輪詢r(jià)ead, write 事件
2.處理i/o事件淀弹, 即read , write 事件趁仙,在對(duì)應(yīng)NioScocketChannel 處理
3.處理任務(wù)隊(duì)列的任務(wù) , 即 runAllTasks
每個(gè)Worker NIOEventLoop處理業(yè)務(wù)時(shí)垦页,會(huì)使用pipeline(管道), pipeline 中包含了 channel , 即通過pipeline 可以獲取到對(duì)應(yīng)通道, 管道中維護(hù)了很多的 處理器
Netty快速入門實(shí)例-TCP服務(wù)
public class NettyServer {
public static void main(String[] args) throws Exception {
//創(chuàng)建BossGroup 和 WorkerGroup
//說明
//1. 創(chuàng)建兩個(gè)線程組 bossGroup 和 workerGroup
//2. bossGroup 只是處理連接請(qǐng)求 , 真正的和客戶端業(yè)務(wù)處理雀费,會(huì)交給 workerGroup完成
//3. 兩個(gè)都是無限循環(huán)
//4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個(gè)數(shù)
// 默認(rèn)實(shí)際 cpu核數(shù) * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//8
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創(chuàng)建服務(wù)器端的啟動(dòng)對(duì)象,配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈?zhǔn)骄幊虂磉M(jìn)行設(shè)置//設(shè)置兩個(gè)線程組
bootstrap.group(bossGroup, workerGroup)
//使用NioSocketChannel 作為服務(wù)器的通道實(shí)現(xiàn)
.channel(NioServerSocketChannel.class)
// 設(shè)置線程隊(duì)列得到連接個(gè)數(shù)
.option(ChannelOption.SO_BACKLOG, 128)
//設(shè)置保持活動(dòng)連接狀態(tài)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// .handler(null) // 該 handler對(duì)應(yīng) bossGroup , childHandler 對(duì)應(yīng) workerGroup
//創(chuàng)建一個(gè)通道初始化對(duì)象(匿名對(duì)象)
.childHandler(new ChannelInitializer<SocketChannel>() {
//給pipeline 設(shè)置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//可以使用一個(gè)集合管理 SocketChannel痊焊, 再推送消息時(shí)盏袄,可以將業(yè)務(wù)加入到各個(gè)channel 對(duì)應(yīng)的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
System.out.println("SocketChannel hashcode=" + ch.hashCode());
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 給我們的workerGroup 的 EventLoop 對(duì)應(yīng)的管道設(shè)置處理器
System.out.println(".....服務(wù)器 is ready...");
//綁定一個(gè)端口并且同步, 生成了一個(gè) ChannelFuture 對(duì)象
//啟動(dòng)服務(wù)器(并綁定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//給cf 注冊(cè)監(jiān)聽器,監(jiān)控我們關(guān)心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監(jiān)聽端口 6668 成功");
} else {
System.out.println("監(jiān)聽端口 6668 失敗");
}
}
});
//對(duì)關(guān)閉通道進(jìn)行監(jiān)聽
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 說明
* 1. 我們自定義一個(gè)Handler 需要繼續(xù)netty 規(guī)定好的某個(gè)HandlerAdapter(規(guī)范)
* 2. 這時(shí)我們自定義一個(gè)Handler , 才能稱為一個(gè)handler
*
* @author Administrator
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
/**
* 讀取數(shù)據(jù)實(shí)際(這里我們可以讀取客戶端發(fā)送的消息)
* 1. ChannelHandlerContext ctx:上下文對(duì)象, 含有 管道pipeline , 通道channel, 地址
* 2. Object msg: 就是客戶端發(fā)送的數(shù)據(jù) 默認(rèn)Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//比如這里我們有一個(gè)非常耗時(shí)長(zhǎng)的業(yè)務(wù)-> 異步執(zhí)行 -> 提交該channel 對(duì)應(yīng)的NIOEventLoop 的 taskQueue中,
//解決方案1 用戶程序自定義的普通任務(wù)
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
System.out.println("服務(wù)器發(fā)送線程2 " + Thread.currentThread().getName());
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵2", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
});
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
System.out.println("服務(wù)器發(fā)送線程3 " + Thread.currentThread().getName());
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵3", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
});
//解決方案2 : 用戶自定義定時(shí)任務(wù) -》 該任務(wù)是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
System.out.println("服務(wù)器發(fā)送線程4 " + Thread.currentThread().getName());
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵4", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("發(fā)生異常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
System.out.println("go on ...");
System.out.println("服務(wù)器讀取線程 " + Thread.currentThread().getName() + " channel =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的關(guān)系");
Channel channel = ctx.channel();
//本質(zhì)是一個(gè)雙向鏈接, 出站入站
ChannelPipeline pipeline = ctx.pipeline();
//將 msg 轉(zhuǎn)成一個(gè) ByteBuf
//ByteBuf 是 Netty 提供的薄啥,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發(fā)送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + channel.remoteAddress());
}
/**
* 數(shù)據(jù)讀取完畢
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//將數(shù)據(jù)寫入到緩存辕羽,并刷新
//一般講,我們對(duì)這個(gè)發(fā)送的數(shù)據(jù)進(jìn)行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
/**
* 處理異常, 一般是需要關(guān)閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個(gè)事件循環(huán)組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創(chuàng)建客戶端啟動(dòng)對(duì)象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設(shè)置相關(guān)參數(shù) 設(shè)置線程組
bootstrap.group(group)
// 設(shè)置客戶端通道的實(shí)現(xiàn)類(反射)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入自己的處理器
ch.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客戶端 ok..");
//啟動(dòng)客戶端去連接服務(wù)器端
//關(guān)于 ChannelFuture 要分析垄惧,涉及到netty的異步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//給關(guān)閉通道進(jìn)行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當(dāng)通道就緒就會(huì)觸發(fā)該方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}
/**
* 當(dāng)通道有讀取事件時(shí)刁愿,會(huì)觸發(fā)
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服務(wù)器回復(fù)的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務(wù)器的地址: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
任務(wù)隊(duì)列中的Task 有3 種典型使用場(chǎng)景
用戶程序自定義的普通任務(wù) [舉例說明]
用戶自定義定時(shí)任務(wù)
非當(dāng)前 Reactor 線程調(diào)用 Channel 的各種方法
例如在推送系統(tǒng)的業(yè)務(wù)線程里面,根據(jù)用戶的標(biāo)識(shí)到逊,找到對(duì)應(yīng)的 Channel 引用铣口,然后調(diào)用 Write 類方法向該用戶推送消息,就會(huì)進(jìn)入到這種場(chǎng)景觉壶。最終的Write 會(huì)提交到任務(wù)隊(duì)列中后被異步消費(fèi)
方案再說明
Netty 抽象出兩組線程池脑题,BossGroup 專門負(fù)責(zé)接收客戶端連接,WorkerGroup 專門負(fù)責(zé)網(wǎng)絡(luò)讀寫操作铜靶。
NioEventLoop 表示一個(gè)不斷循環(huán)執(zhí)行處理任務(wù)的線程叔遂,每個(gè) NioEventLoop 都有一個(gè) selector,用于監(jiān)聽綁定在其上的 socket 網(wǎng)絡(luò)通道。
-
NioEventLoop 內(nèi)部采用串行化設(shè)計(jì)已艰,從消息的讀取->解碼->處理->編碼->發(fā)送痊末,始終由 IO 線程 NioEventLoop 負(fù)責(zé)
- NioEventLoopGroup 下包含多個(gè) NioEventLoop
- 每個(gè) NioEventLoop 中包含有一個(gè) Selector,一個(gè) taskQueue
- 每個(gè) NioEventLoop 的 Selector 上可以注冊(cè)監(jiān)聽多個(gè) NioChannel
- 每個(gè) NioChannel 只會(huì)綁定在唯一的 NioEventLoop 上
- 每個(gè) NioChannel 都綁定有一個(gè)自己的 ChannelPipeline
異步模型
基本介紹
異步的概念和同步相對(duì)哩掺。當(dāng)一個(gè)異步過程調(diào)用發(fā)出后舌胶,調(diào)用者不能立刻得到結(jié)果。實(shí)際處理這個(gè)調(diào)用的組件在完成后疮丛,通過狀態(tài)幔嫂、通知和回調(diào)來通知調(diào)用者。
Netty 中的 I/O 操作是異步的誊薄,包括 Bind履恩、Write、Connect 等操作會(huì)簡(jiǎn)單的返回一個(gè) ChannelFuture呢蔫。
調(diào)用者并不能立刻獲得結(jié)果切心,而是通過 Future-Listener 機(jī)制,用戶可以方便的主動(dòng)獲取或者通過通知機(jī)制獲得 IO 操作結(jié)果
Netty 的異步模型是建立在 future 和 callback 的之上的片吊。callback 就是回調(diào)绽昏。重點(diǎn)說 Future,它的核心思想是:假設(shè)一個(gè)方法 fun俏脊,計(jì)算過程可能非常耗時(shí)全谤,等待 fun返回顯然不合適。那么可以在調(diào)用 fun 的時(shí)候爷贫,立馬返回一個(gè) Future认然,后續(xù)可以通過 Future去監(jiān)控方法 fun 的處理過程(即 : Future-Listener 機(jī)制)
Future 說明
表示異步的執(zhí)行結(jié)果, 可以通過它提供的方法來檢測(cè)執(zhí)行是否完成,比如檢索計(jì)算等等.
ChannelFuture 是一個(gè)接口 : public interface ChannelFuture extends Future<Void>
我們可以添加監(jiān)聽器漫萄,當(dāng)監(jiān)聽的事件發(fā)生時(shí)卷员,就會(huì)通知到監(jiān)聽器. 案例說明
工作原理說明
在使用 Netty 進(jìn)行編程時(shí),攔截操作和轉(zhuǎn)換出入站數(shù)據(jù)只需要您提供 callback 或利用future 即可腾务。這使得鏈?zhǔn)讲僮?/strong>簡(jiǎn)單毕骡、高效, 并有利于編寫可重用的、通用的代碼岩瘦。
Netty 框架的目標(biāo)就是讓你的業(yè)務(wù)邏輯從網(wǎng)絡(luò)基礎(chǔ)應(yīng)用編碼中分離出來未巫、解脫出來
Future-Listener機(jī)制
當(dāng) Future 對(duì)象剛剛創(chuàng)建時(shí),處于非完成狀態(tài)担钮,調(diào)用者可以通過返回的 ChannelFuture 來獲取操作執(zhí)行的狀態(tài)橱赠,注冊(cè)監(jiān)聽函數(shù)來執(zhí)行完成后的操作尤仍。
常見有如下操作
- 通過 isDone 方法來判斷當(dāng)前操作是否完成箫津;
- 通過 isSuccess 方法來判斷已完成的當(dāng)前操作是否成功;
- 通過 getCause 方法來獲取已完成的當(dāng)前操作失敗的原因;
- 通過 isCancelled 方法來判斷已完成的當(dāng)前操作是否被取消苏遥;
- 通過 addListener 方法來注冊(cè)監(jiān)聽器饼拍,當(dāng)操作已完成(isDone 方法返回完成),將會(huì)通知指定的監(jiān)聽器田炭;如果 Future 對(duì)象已完成师抄,則通知指定的監(jiān)聽器
- 舉例說明
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]綁定成功!");
} else{
System.err.println("端口["+ port + "]綁定失敗!");
}
});
小結(jié):相比傳統(tǒng)阻塞 I/O,執(zhí)行 I/O 操作后線程會(huì)被阻塞住, 直到操作完成教硫;異步處理的好處是不會(huì)造成線程阻塞叨吮,線程在 I/O 操作期間可以執(zhí)行別的程序,在高并發(fā)情形下會(huì)更穩(wěn)定和更高的吞吐量
快速入門實(shí)例-HTTP 服務(wù)
public class TestServer {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(6666).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 說明
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
* 2. HttpObject 客戶端和服務(wù)器端相互通訊的數(shù)據(jù)被封裝成 HttpObject
*
* @author Administrator
*/
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* channelRead0 讀取客戶端數(shù)據(jù)
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("對(duì)應(yīng)的channel=" + ctx.channel() + " pipeline=" + ctx
.pipeline() + " 通過pipeline獲取channel" + ctx.pipeline().channel());
System.out.println("當(dāng)前ctx的handler=" + ctx.handler());
//判斷 msg 是不是 httpRequest請(qǐng)求
if (msg instanceof HttpRequest) {
System.out.println("ctx 類型=" + ctx.getClass());
System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());
System.out.println("msg 類型=" + msg.getClass());
System.out.println("客戶端地址" + ctx.channel().remoteAddress());
//獲取到
HttpRequest httpRequest = (HttpRequest) msg;
//獲取uri, 過濾指定的資源
URI uri = new URI(httpRequest.uri());
if ("/favicon.ico".equals(uri.getPath())) {
System.out.println("請(qǐng)求了 favicon.ico, 不做響應(yīng)");
return;
}
//回復(fù)信息給瀏覽器 [http協(xié)議]
ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務(wù)器", CharsetUtil.UTF_8);
//構(gòu)造一個(gè)http的相應(yīng)瞬矩,即 HttpResponse
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
//將構(gòu)建好 response返回
ctx.writeAndFlush(response);
}
}
}
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向管道加入處理器
//得到管道
ChannelPipeline pipeline = ch.pipeline();
//加入一個(gè)netty 提供的httpServerCodec codec =>[coder - decoder]
//1. HttpServerCodec 是netty 提供的處理http的 編-解碼器
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
//2. 增加一個(gè)自定義的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
System.out.println("ok~~~~");
}
}
1.6 流量整形
流量整形(Traffic Shaping)是一種主動(dòng)調(diào)整流量輸出速率的措施茶鉴。Netty的流量整形有兩個(gè)作用:
- 防止由于上下游網(wǎng)元性能不均衡導(dǎo)致下游網(wǎng)元被壓垮,業(yè)務(wù)流程中斷景用;
- 防止由于通信模塊接收消息過快涵叮,后端業(yè)務(wù)線程處理不及時(shí)導(dǎo)致的“撐死”問題。
流量整形的原理示意圖如下:
流量整形(Traffic Shaping)是一種主動(dòng)調(diào)整流量輸出速率的措施伞插。一個(gè)典型應(yīng)用是基于下游網(wǎng)絡(luò)結(jié)點(diǎn)的TP指標(biāo)來控制本地流量的輸出割粮。流量整形與流量監(jiān)管的主要區(qū)別在于,流量整形對(duì)流量監(jiān)管中需要丟棄的報(bào)文進(jìn)行緩存——通常是將它們放入緩沖區(qū)或隊(duì)列內(nèi)媚污,也稱流量整形(Traffic Shaping舀瓢,簡(jiǎn)稱TS)。當(dāng)令牌桶有足夠的令牌時(shí)耗美,再均勻的向外發(fā)送這些被緩存的報(bào)文氢伟。流量整形與流量監(jiān)管的另一區(qū)別是,整形可能會(huì)增加延遲幽歼,而監(jiān)管幾乎不引入額外的延遲朵锣。
Netty支持兩種流量整形模式:
- 全局流量整形:全局流量整形的作用范圍是進(jìn)程級(jí)的,無論你創(chuàng)建了多少個(gè)Channel甸私,它的作用域針對(duì)所有的Channel诚些。用戶可以通過參數(shù)設(shè)置:報(bào)文的接收速率、報(bào)文的發(fā)送速率皇型、整形周期诬烹。[GlobalChannelTrafficShapingHandler]
- 鏈路級(jí)流量整形:?jiǎn)捂溌妨髁空闻c全局流量整形的最大區(qū)別就是它以單個(gè)鏈路為作用域,可以對(duì)不同的鏈路設(shè)置不同的整形策略弃鸦。[ChannelTrafficShapingHandler針對(duì)于每個(gè)channel]
1.7 優(yōu)雅停機(jī)
Netty的優(yōu)雅停機(jī)三部曲:
- 不再接收新消息
- 退出前的預(yù)處理操作
- 資源的釋放操作
Java的優(yōu)雅停機(jī)通常通過注冊(cè)JDK的ShutdownHook來實(shí)現(xiàn)绞吁,當(dāng)系統(tǒng)接收到退出指令后,首先標(biāo)記系統(tǒng)處于退出狀態(tài)唬格,不再接收新的消息家破,然后將積壓的消息處理完颜说,最后調(diào)用資源回收接口將資源銷毀,最后各線程退出執(zhí)行汰聋。
通常優(yōu)雅退出需要有超時(shí)控制機(jī)制门粪,例如30S,如果到達(dá)超時(shí)時(shí)間仍然沒有完成退出前的資源回收等操作烹困,則由停機(jī)腳本直接調(diào)用kill -9 pid玄妈,強(qiáng)制退出。
在實(shí)際項(xiàng)目中髓梅,Netty作為高性能的異步NIO通信框架拟蜻,往往用作基礎(chǔ)通信框架負(fù)責(zé)各種協(xié)議的接入、解析和調(diào)度等枯饿,例如在RPC和分布式服務(wù)框架中瞭郑,往往會(huì)使用Netty作為內(nèi)部私有協(xié)議的基礎(chǔ)通信框架。
當(dāng)應(yīng)用進(jìn)程優(yōu)雅退出時(shí)鸭你,作為通信框架的Netty也需要優(yōu)雅退出屈张,主要原因如下:
- 盡快的釋放NIO線程、句柄等資源袱巨;
- 如果使用flush做批量消息發(fā)送阁谆,需要將積攢在發(fā)送隊(duì)列中的待發(fā)送消息發(fā)送完成;
- 正在write或者read的消息愉老,需要繼續(xù)處理场绿;
- 設(shè)置在NioEventLoop線程調(diào)度器中的定時(shí)任務(wù),需要執(zhí)行或者清理
1.8 Netty架構(gòu)剖析之安全性
Netty面臨的安全挑戰(zhàn):
- 對(duì)第三方開放
- 作為應(yīng)用層協(xié)議的基礎(chǔ)通信框架
安全威脅場(chǎng)景分析:
- 對(duì)第三方開放的通信框架:如果使用Netty做RPC框架或者私有協(xié)議棧嫉入,RPC框架面向非授信的第三方開放焰盗,例如將內(nèi)部的一些能力通過服務(wù)對(duì)外開放出去,此時(shí)就需要進(jìn)行安全認(rèn)證咒林,如果開放的是公網(wǎng)IP熬拒,對(duì)于安全性要求非常高的一些服務(wù),例如在線支付垫竞、訂購(gòu)等澎粟,需要通過SSL/TLS進(jìn)行通信。
- 應(yīng)用層協(xié)議的安全性:作為高性能欢瞪、異步事件驅(qū)動(dòng)的NIO框架活烙,Netty非常適合構(gòu)建上層的應(yīng)用層協(xié)議。由于絕大多數(shù)應(yīng)用層協(xié)議都是公有的遣鼓,這意味著底層的Netty需要向上層提供通信層的安全傳輸功能啸盏。
SSL/TLS
Netty安全傳輸特性:
- 支持SSL V2和V3
- 支持TLS
- 支持SSL單向認(rèn)證、雙向認(rèn)證和第三方CA認(rèn)證骑祟。
Netty通過SslHandler提供了對(duì)SSL的支持回懦,它支持的SSL協(xié)議類型包括:SSL V2气笙、SSL V3和TLS。
- 單向認(rèn)證:?jiǎn)蜗蛘J(rèn)證粉怕,即客戶端只驗(yàn)證服務(wù)端的合法性健民,服務(wù)端不驗(yàn)證客戶端抒巢。
- 雙向認(rèn)證:與單向認(rèn)證不同的是服務(wù)端也需要對(duì)客戶端進(jìn)行安全認(rèn)證贫贝。這就意味著客戶端的自簽名證書也需要導(dǎo)入到服務(wù)端的數(shù)字證書倉(cāng)庫(kù)中。
- CA認(rèn)證:基于自簽名的SSL雙向認(rèn)證蛉谜,只要客戶端或者服務(wù)端修改了密鑰和證書稚晚,就需要重新進(jìn)行簽名和證書交換,這種調(diào)試和維護(hù)工作量是非常大的型诚。因此客燕,在實(shí)際的商用系統(tǒng)中往往會(huì)使用第三方CA證書頒發(fā)機(jī)構(gòu)進(jìn)行簽名和驗(yàn)證。我們的瀏覽器就保存了幾個(gè)常用的CA_ROOT狰贯。每次連接到網(wǎng)站時(shí)只要這個(gè)網(wǎng)站的證書是經(jīng)過這些CA_ROOT簽名過的也搓。就可以通過驗(yàn)證了。
可擴(kuò)展的安全特性
通過Netty的擴(kuò)展特性涵紊,可以自定義安全策略:
- IP地址黑名單機(jī)制
- 接入認(rèn)證
- 敏感信息加密或者過濾機(jī)制
IP地址黑名單是比較常用的弱安全保護(hù)策略傍妒,它的特點(diǎn)就是服務(wù)端在與客戶端通信的過程中,對(duì)客戶端的IP地址進(jìn)行校驗(yàn)摸柄,如果發(fā)現(xiàn)對(duì)方IP在黑名單列表中颤练,則拒絕與其通信,關(guān)閉鏈路驱负。
接入認(rèn)證策略非常多嗦玖,通常是較強(qiáng)的安全認(rèn)證策略,例如基于用戶名+密碼的認(rèn)證跃脊,認(rèn)證內(nèi)容往往采用加密的方式宇挫,例如Base64+AES等。
Netty架構(gòu)剖析之?dāng)U展性
通過Netty的擴(kuò)展特性酪术,可以自定義安全策略:
- 線程模型可擴(kuò)展
- 序列化方式可擴(kuò)展
- 上層協(xié)議椑谈澹可擴(kuò)展
- 提供大量的網(wǎng)絡(luò)事件切面,方便用戶功能擴(kuò)展
Netty的架構(gòu)可擴(kuò)展性設(shè)計(jì)理念如下:
- 判斷擴(kuò)展點(diǎn)拼缝,事先預(yù)留相關(guān)擴(kuò)展接口娱局,給用戶二次定制和擴(kuò)展使用;
- 主要功能點(diǎn)都基于接口編程咧七,方便用戶定制和擴(kuò)展衰齐。
1.9 數(shù)據(jù)安全性之滑動(dòng)窗口協(xié)議
我們假設(shè)一個(gè)場(chǎng)景,客戶端每次請(qǐng)求服務(wù)端必須得到服務(wù)端的一個(gè)響應(yīng)继阻,由于TCP的數(shù)據(jù)發(fā)送和數(shù)據(jù)接收是異步的耻涛,就存在必須存在一個(gè)等待響應(yīng)的過程废酷。該過程根據(jù)實(shí)現(xiàn)方式不同可以分為一下幾類(部分是錯(cuò)誤案例):
- 每次發(fā)送一個(gè)數(shù)據(jù)包,然后進(jìn)入休眠(sleep)或者阻塞(await)狀態(tài)抹缕,直到響應(yīng)回來或者超時(shí)澈蟆,整個(gè)調(diào)用鏈結(jié)束。此場(chǎng)景是典型的一問一答的場(chǎng)景卓研,效率極其低下趴俘;
- 讀寫分離,寫模塊只負(fù)責(zé)寫奏赘,讀模塊則負(fù)責(zé)接收響應(yīng)寥闪,然后做后續(xù)的處理。此種場(chǎng)景能盡可能的利用帶寬進(jìn)行讀寫磨淌。但是此場(chǎng)景不坐控速操作可能導(dǎo)致大量報(bào)文丟失或者重復(fù)發(fā)送疲憋。
- 實(shí)現(xiàn)類似于Windowed Protocol。此窗口是以上兩種方案的折中版梁只,即允許一定數(shù)量的批量發(fā)送缚柳,又能保證數(shù)據(jù)的完整性。