本系列Netty源碼解析文章基于 4.1.56.Final版本
在上篇文章《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》中我們花了大量的篇幅來(lái)從內(nèi)核角度詳細(xì)講述了五種IO模型
的演進(jìn)過(guò)程以及ReactorIO線程模型
的底層基石IO多路復(fù)用技術(shù)在內(nèi)核中的實(shí)現(xiàn)原理尔许。
最后我們引出了netty中使用的主從Reactor IO線程模型。
通過(guò)上篇文章的介紹踏拜,我們已經(jīng)清楚了在IO調(diào)用的過(guò)程中內(nèi)核幫我們搞了哪些事情愧口,那么俗話說(shuō)的好內(nèi)核領(lǐng)進(jìn)門(mén)凡蜻,修行在netty
良拼,netty在用戶空間又幫我們搞了哪些事情?
那么從本文開(kāi)始疮方,筆者將從源碼角度來(lái)帶大家看下上圖中的Reactor IO線程模型
在Netty中是如何實(shí)現(xiàn)的。
本文作為Reactor在Netty中實(shí)現(xiàn)系列文章中的開(kāi)篇文章,筆者先來(lái)為大家介紹Reactor的骨架是如何創(chuàng)建出來(lái)的蒜茴。
在上篇文章中我們提到Netty采用的是主從Reactor多線程
的模型星爪,但是它在實(shí)現(xiàn)上又與Doug Lea在Scalable IO in Java論文中提到的經(jīng)典主從Reactor多線程模型
有所差異。
Netty中的Reactor
是以Group
的形式出現(xiàn)的粉私,主從Reactor
在Netty中就是主從Reactor組
顽腾,每個(gè)Reactor Group
中會(huì)有多個(gè)Reactor
用來(lái)執(zhí)行具體的IO任務(wù)
。當(dāng)然在netty中Reactor
不只用來(lái)執(zhí)行IO任務(wù)
诺核,這個(gè)我們后面再說(shuō)抄肖。
-
Main Reactor Group
中的Reactor
數(shù)量取決于服務(wù)端要監(jiān)聽(tīng)的端口個(gè)數(shù),通常我們的服務(wù)端程序只會(huì)監(jiān)聽(tīng)一個(gè)端口猪瞬,所以Main Reactor Group
只會(huì)有一個(gè)Main Reactor
線程來(lái)處理最重要的事情:綁定端口地址
憎瘸,接收客戶端連接
,為客戶端創(chuàng)建對(duì)應(yīng)的SocketChannel
陈瘦,將客戶端SocketChannel分配給一個(gè)固定的Sub Reactor
幌甘。也就是上篇文章筆者為大家舉的例子,飯店最重要的工作就是先把客人迎接進(jìn)來(lái)痊项。“我家大門(mén)常打開(kāi)锅风,開(kāi)放懷抱等你,擁抱過(guò)就有了默契你會(huì)愛(ài)上這里......”
-
Sub Reactor Group
里有多個(gè)Reactor
線程皱埠,Reactor
線程的個(gè)數(shù)可以通過(guò)系統(tǒng)參數(shù)-D io.netty.eventLoopThreads
指定。默認(rèn)的Reactor
的個(gè)數(shù)為CPU核數(shù) * 2
咖驮。Sub Reactor
線程主要用來(lái)輪詢客戶端SocketChannel上的IO就緒事件
,處理IO就緒事件
托修,執(zhí)行異步任務(wù)
忘巧。Sub Reactor Group
做的事情就是上篇飯店例子中服務(wù)員的工作,客人進(jìn)來(lái)了要為客人分配座位睦刃,端茶送水砚嘴,做菜上菜。“不管遠(yuǎn)近都是客人涩拙,請(qǐng)不用客氣际长,相約好了在一起,我們歡迎您......”
一個(gè)
客戶端SocketChannel
只能分配給一個(gè)固定的Sub Reactor
兴泥。一個(gè)Sub Reactor
負(fù)責(zé)處理多個(gè)客戶端SocketChannel
工育,這樣可以將服務(wù)端承載的全量客戶端連接
分?jǐn)偟蕉鄠€(gè)Sub Reactor
中處理,同時(shí)也能保證客戶端SocketChannel上的IO處理的線程安全性
郁轻。
由于文章篇幅的關(guān)系翅娶,作為Reactor在netty中實(shí)現(xiàn)的第一篇我們主要來(lái)介紹主從Reactor Group
的創(chuàng)建流程文留,骨架脈絡(luò)先搭好。
下面我們來(lái)看一段Netty服務(wù)端代碼的編寫(xiě)模板竭沫,從代碼模板的流程中我們來(lái)解析下主從Reactor的創(chuàng)建流程以及在這個(gè)過(guò)程中所涉及到的Netty核心類燥翅。
Netty服務(wù)端代碼模板
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主從Reactor
.channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
.option(ChannelOption.SO_BACKLOG, 100)//設(shè)置主Reactor中channel的option選項(xiàng)
.handler(new LoggingHandler(LogLevel.INFO))//設(shè)置主Reactor中Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {//設(shè)置從Reactor中注冊(cè)channel的pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 綁定端口啟動(dòng)服務(wù),開(kāi)始監(jiān)聽(tīng)accept事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- 首先我們要?jiǎng)?chuàng)建Netty最核心的部分 ->
創(chuàng)建主從Reactor Group
蜕提,在Netty中EventLoopGroup
就是Reactor Group
的實(shí)現(xiàn)類森书。對(duì)應(yīng)的EventLoop
就是Reactor
的實(shí)現(xiàn)類。
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 創(chuàng)建用于
IO處理
的ChannelHandler
谎势,實(shí)現(xiàn)相應(yīng)IO事件
的回調(diào)函數(shù)凛膏,編寫(xiě)對(duì)應(yīng)的IO處理
邏輯。注意這里只是簡(jiǎn)單示例哈脏榆,詳細(xì)的IO事件處理猖毫,筆者會(huì)單獨(dú)開(kāi)一篇文章專門(mén)講述。
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................省略IO處理邏輯................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
-
創(chuàng)建
ServerBootstrap
Netty服務(wù)端啟動(dòng)類须喂,并在啟動(dòng)類中配置啟動(dòng)Netty服務(wù)端所需要的一些必備信息吁断。通過(guò)
serverBootstrap.group(bossGroup, workerGroup)
為Netty服務(wù)端配置主從Reactor Group
實(shí)例。通過(guò)
serverBootstrap.channel(NioServerSocketChannel.class)
配置Netty服務(wù)端的ServerSocketChannel
用于綁定端口地址
以及創(chuàng)建客戶端SocketChannel
坞生。Netty中的NioServerSocketChannel.class
就是對(duì)JDK NIO中ServerSocketChannel
的封裝仔役。而用于表示客戶端連接
的NioSocketChannel
是對(duì)JDK NIOSocketChannel
封裝。
在上篇文章介紹
Socket內(nèi)核結(jié)構(gòu)
小節(jié)中我們提到是己,在編寫(xiě)服務(wù)端網(wǎng)絡(luò)程序時(shí)又兵,我們首先要?jiǎng)?chuàng)建一個(gè)Socket
用于listen和bind
端口地址,我們把這個(gè)叫做監(jiān)聽(tīng)Socket
,這里對(duì)應(yīng)的就是NioServerSocketChannel.class
卒废。當(dāng)客戶端連接完成三次握手沛厨,系統(tǒng)調(diào)用accept
函數(shù)會(huì)基于監(jiān)聽(tīng)Socket
創(chuàng)建出來(lái)一個(gè)新的Socket
專門(mén)用于與客戶端之間的網(wǎng)絡(luò)通信我們稱為客戶端連接Socket
,這里對(duì)應(yīng)的就是NioSocketChannel.class
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
設(shè)置服務(wù)端ServerSocketChannel
中的SocketOption
。關(guān)于SocketOption
的選項(xiàng)我們后邊的文章再聊摔认,本文主要聚焦在NettyMain Reactor Group
的創(chuàng)建及工作流程俄烁。serverBootstrap.handler(....)
設(shè)置服務(wù)端NioServerSocketChannel
中對(duì)應(yīng)Pipieline
中的ChannelHandler
。
netty有兩種
Channel類型
:一種是服務(wù)端用于監(jiān)聽(tīng)綁定端口地址的NioServerSocketChannel
,一種是用于客戶端通信的NioSocketChannel
级野。每種Channel類型實(shí)例
都會(huì)對(duì)應(yīng)一個(gè)PipeLine
用于編排對(duì)應(yīng)channel實(shí)例
上的IO事件處理邏輯。PipeLine
中組織的就是ChannelHandler
用于編寫(xiě)特定的IO處理邏輯粹胯。注意
serverBootstrap.handler
設(shè)置的是服務(wù)端NioServerSocketChannel PipeLine
中的ChannelHandler
蓖柔。-
serverBootstrap.childHandler(ChannelHandler childHandler)
用于設(shè)置客戶端NioSocketChannel
中對(duì)應(yīng)Pipieline
中的ChannelHandler
。我們通常配置的編碼解碼器就是在這里风纠。
ServerBootstrap
啟動(dòng)類方法帶有child
前綴的均是設(shè)置客戶端NioSocketChannel
屬性的况鸣。ChannelInitializer
是用于當(dāng)SocketChannel
成功注冊(cè)到綁定的Reactor
上后,用于初始化該SocketChannel
的Pipeline
竹观。它的initChannel
方法會(huì)在注冊(cè)成功后執(zhí)行镐捧。這里只是捎帶提一下,讓大家有個(gè)初步印象,后面我會(huì)專門(mén)介紹策吠。 ChannelFuture f = serverBootstrap.bind(PORT).sync()
這一步會(huì)是下篇文章要重點(diǎn)分析的主題Main Reactor Group
的啟動(dòng)茄猫,綁定端口地址,開(kāi)始監(jiān)聽(tīng)客戶端連接事件(OP_ACCEPT
)列牺。本文我們只關(guān)注創(chuàng)建流程整陌。f.channel().closeFuture().sync()
等待服務(wù)端NioServerSocketChannel
關(guān)閉。Netty服務(wù)端到這里正式啟動(dòng)瞎领,并準(zhǔn)備好接受客戶端連接的準(zhǔn)備泌辫。shutdownGracefully
優(yōu)雅關(guān)閉主從Reactor線程組
里的所有Reactor線程
。
Netty對(duì)IO模型的支持
在上篇文章中我們介紹了五種IO模型
九默,Netty中支持BIO
,NIO
,AIO
以及多種操作系統(tǒng)下的IO多路復(fù)用技術(shù)
實(shí)現(xiàn)震放。
在Netty中切換這幾種IO模型
也是非常的方便,下面我們來(lái)看下Netty如何對(duì)這幾種IO模型進(jìn)行支持的驼修。
首先我們介紹下幾個(gè)與IO模型
相關(guān)的重要接口:
EventLoop
EventLoop
就是Netty中的Reactor
殿遂,可以說(shuō)它就是Netty的引擎,負(fù)責(zé)Channel上IO就緒事件的監(jiān)聽(tīng)
邪锌,IO就緒事件的處理
勉躺,異步任務(wù)的執(zhí)行
驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn)。
不同IO模型
下觅丰,EventLoop
有著不同的實(shí)現(xiàn)饵溅,我們只需要切換不同的實(shí)現(xiàn)類就可以完成對(duì)NettyIO模型
的切換。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型
下Netty會(huì)自動(dòng)
根據(jù)操作系統(tǒng)以及版本的不同選擇對(duì)應(yīng)的IO多路復(fù)用技術(shù)實(shí)現(xiàn)
妇萄。比如Linux 2.6版本以上用的是Epoll
蜕企,2.6版本以下用的是Poll
,Mac下采用的是Kqueue
冠句。
其中Linux kernel 在5.1版本引入的異步IO庫(kù)io_uring正在netty中孵化轻掩。
EventLoopGroup
Netty中的Reactor
是以Group
的形式出現(xiàn)的,EventLoopGroup
正是Reactor組
的接口定義懦底,負(fù)責(zé)管理Reactor
唇牧,Netty中的Channel
就是通過(guò)EventLoopGroup
注冊(cè)到具體的Reactor
上的。
Netty的IO線程模型是主從Reactor多線程模型
聚唐,主從Reactor線程組
在Netty源碼中對(duì)應(yīng)的其實(shí)就是兩個(gè)EventLoopGroup
實(shí)例丐重。
不同的IO模型
也有對(duì)應(yīng)的實(shí)現(xiàn):
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用于Netty服務(wù)端使用的ServerSocketChannel
,對(duì)應(yīng)于上篇文章提到的監(jiān)聽(tīng)Socket
杆查,負(fù)責(zé)綁定監(jiān)聽(tīng)端口地址扮惦,接收客戶端連接并創(chuàng)建用于與客戶端通信的SocketChannel
。
不同的IO模型
下的實(shí)現(xiàn):
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用于與客戶端通信的SocketChannel
亲桦,對(duì)應(yīng)于上篇文章提到的客戶端連接Socket
崖蜜,當(dāng)客戶端完成三次握手后浊仆,由系統(tǒng)調(diào)用accept
函數(shù)根據(jù)監(jiān)聽(tīng)Socket
創(chuàng)建。
不同的IO模型
下的實(shí)現(xiàn):
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
我們看到在不同IO模型
的實(shí)現(xiàn)中豫领,Netty這些圍繞IO模型
的核心類只是前綴的不同:
- BIO對(duì)應(yīng)的前綴為
Oio
表示old io
抡柿,現(xiàn)在已經(jīng)廢棄不推薦使用。 - NIO對(duì)應(yīng)的前綴為
Nio
氏堤,正是Netty推薦也是我們常用的非阻塞IO模型
沙绝。 - AIO對(duì)應(yīng)的前綴為
Aio
,由于Linux下的異步IO
機(jī)制實(shí)現(xiàn)的并不成熟鼠锈,性能提升表現(xiàn)上也不明顯闪檬,現(xiàn)已被刪除。
我們只需要將IO模型
的這些核心接口對(duì)應(yīng)的實(shí)現(xiàn)類前綴
改為對(duì)應(yīng)IO模型
的前綴购笆,就可以輕松在Netty中完成對(duì)IO模型
的切換粗悯。
多種NIO的實(shí)現(xiàn)
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
我們通常在使用NIO模型
的時(shí)候會(huì)使用Common列
下的這些IO模型
核心類,Common類
也會(huì)根據(jù)操作系統(tǒng)的不同自動(dòng)選擇JDK
在對(duì)應(yīng)平臺(tái)下的IO多路復(fù)用技術(shù)
的實(shí)現(xiàn)同欠。
而Netty自身也根據(jù)操作系統(tǒng)的不同提供了自己對(duì)IO多路復(fù)用技術(shù)
的實(shí)現(xiàn)样傍,比JDK
的實(shí)現(xiàn)性能更優(yōu)。比如:
-
JDK
的 NIO默認(rèn)
實(shí)現(xiàn)是水平觸發(fā)
铺遂,Netty 是邊緣觸發(fā)(默認(rèn))
和水平觸發(fā)可切換衫哥。。 - Netty 實(shí)現(xiàn)的垃圾回收更少襟锐、性能更好撤逢。
我們編寫(xiě)Netty服務(wù)端程序的時(shí)候也可以根據(jù)操作系統(tǒng)的不同,采用Netty自身的實(shí)現(xiàn)來(lái)進(jìn)一步優(yōu)化程序粮坞。做法也很簡(jiǎn)單蚊荣,直接將上圖中紅框里的實(shí)現(xiàn)類替換成Netty的自身實(shí)現(xiàn)類即可完成切換。
經(jīng)過(guò)以上對(duì)Netty服務(wù)端代碼編寫(xiě)模板以及IO模型
相關(guān)核心類的簡(jiǎn)單介紹莫杈,我們對(duì)Netty的創(chuàng)建流程有了一個(gè)簡(jiǎn)單粗略的總體認(rèn)識(shí)互例,下面我們來(lái)深入剖析下創(chuàng)建流程過(guò)程中的每一個(gè)步驟以及這個(gè)過(guò)程中涉及到的核心類實(shí)現(xiàn)。
以下源碼解析部分我們均采用Common列
下NIO
相關(guān)的實(shí)現(xiàn)進(jìn)行解析筝闹。
創(chuàng)建主從Reactor線程組
在Netty服務(wù)端程序編寫(xiě)模板的開(kāi)始媳叨,我們首先會(huì)創(chuàng)建兩個(gè)Reactor線程組:
一個(gè)是主Reactor線程組
bossGroup
用于監(jiān)聽(tīng)客戶端連接,創(chuàng)建客戶端連接NioSocketChannel
关顷,并將創(chuàng)建好的客戶端連接NioSocketChannel
注冊(cè)到從Reactor線程組中一個(gè)固定的Reactor
上肩杈。一個(gè)是從Reactor線程組
workerGroup
,workerGroup
中的Reactor
負(fù)責(zé)監(jiān)聽(tīng)綁定在其上的客戶端連接NioSocketChannel
上的IO就緒事件
解寝,并處理IO就緒事件
,執(zhí)行異步任務(wù)
艘儒。
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor線程組的實(shí)現(xiàn)類為NioEventLoopGroup
聋伦,在創(chuàng)建bossGroup
和workerGroup
的時(shí)候用到了NioEventLoopGroup
的兩個(gè)構(gòu)造函數(shù):
- 帶
nThreads
參數(shù)的構(gòu)造函數(shù)public NioEventLoopGroup(int nThreads)
夫偶。 - 不帶
nThreads
參數(shù)的默認(rèn)
構(gòu)造函數(shù)public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {
this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
......................省略...........................
}
nThreads
參數(shù)表示當(dāng)前要?jiǎng)?chuàng)建的Reactor線程組
內(nèi)包含多少個(gè)Reactor線程
。不指定nThreads
參數(shù)的話采用默認(rèn)的Reactor線程
個(gè)數(shù)觉增,用0
表示兵拢。
最終會(huì)調(diào)用到構(gòu)造函數(shù)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
下面簡(jiǎn)單介紹下構(gòu)造函數(shù)中這幾個(gè)參數(shù)的作用,后面我們?cè)谥v解本文主線的過(guò)程中還會(huì)提及這幾個(gè)參數(shù)逾礁,到時(shí)在詳細(xì)介紹说铃,這里只是讓大家有個(gè)初步印象,不必做過(guò)多的糾纏嘹履。
-
Executor executor:
負(fù)責(zé)啟動(dòng)Reactor線程
進(jìn)而Reactor才可以開(kāi)始工作腻扇。
Reactor線程組
NioEventLoopGroup
負(fù)責(zé)創(chuàng)建Reactor線程
,在創(chuàng)建的時(shí)候會(huì)將executor
傳入砾嫉。
RejectedExecutionHandler:
當(dāng)向Reactor
添加異步任務(wù)添加失敗時(shí)幼苛,采用的拒絕策略。Reactor的任務(wù)不只是監(jiān)聽(tīng)I(yíng)O活躍事件和IO任務(wù)的處理焕刮,還包括對(duì)異步任務(wù)的處理舶沿。這里大家只需有個(gè)這樣的概念,后面筆者會(huì)專門(mén)詳細(xì)介紹配并。SelectorProvider selectorProvider:
Reactor中的IO模型為IO多路復(fù)用模型
括荡,對(duì)應(yīng)于JDK NIO中的實(shí)現(xiàn)為java.nio.channels.Selector
(就是我們上篇文章中提到的select,poll,epoll
),每個(gè)Reator中都包含一個(gè)Selector
溉旋,用于輪詢
注冊(cè)在該Reactor上的所有Channel
上的IO事件
畸冲。SelectorProvider
就是用來(lái)創(chuàng)建Selector
的。SelectStrategyFactory selectStrategyFactory:
Reactor最重要的事情就是輪詢
注冊(cè)其上的Channel
上的IO就緒事件
低滩,這里的SelectStrategyFactory
用于指定輪詢策略
召夹,默認(rèn)為DefaultSelectStrategyFactory.INSTANCE
。
最終會(huì)將這些參數(shù)交給NioEventLoopGroup
的父類構(gòu)造器恕沫,下面我們來(lái)看下NioEventLoopGroup類
的繼承結(jié)構(gòu):
NioEventLoopGroup類
的繼承結(jié)構(gòu)乍一看比較復(fù)雜监憎,大家不要慌,筆者會(huì)隨著主線的深入慢慢地介紹這些父類接口婶溯,我們現(xiàn)在重點(diǎn)關(guān)注Mutithread
前綴的類鲸阔。
我們知道NioEventLoopGroup
是Netty中的Reactor線程組
的實(shí)現(xiàn),既然是線程組那么肯定是負(fù)責(zé)管理和創(chuàng)建多個(gè)Reactor線程的
迄委,所以Mutithread
前綴的類定義的行為自然是對(duì)Reactor線程組
內(nèi)多個(gè)Reactor線程
的創(chuàng)建和管理工作褐筛。
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
//默認(rèn)Reactor個(gè)數(shù)
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...................省略.....................
}
MultithreadEventLoopGroup類
主要的功能就是用來(lái)確定Reactor線程組
內(nèi)Reactor
的個(gè)數(shù)。
默認(rèn)的Reactor
的個(gè)數(shù)存放于字段DEFAULT_EVENT_LOOP_THREADS
中叙身。
從static {}
靜態(tài)代碼塊中我們可以看出默認(rèn)Reactor
的個(gè)數(shù)的獲取邏輯:
可以通過(guò)系統(tǒng)變量
-D io.netty.eventLoopThreads"
指定渔扎。如果不指定,那么默認(rèn)的就是
NettyRuntime.availableProcessors() * 2
當(dāng)nThread
參數(shù)設(shè)置為0
采用默認(rèn)設(shè)置時(shí)信轿,Reactor線程組
內(nèi)的Reactor
個(gè)數(shù)則設(shè)置為DEFAULT_EVENT_LOOP_THREADS
晃痴。
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup
這里就是本小節(jié)的核心残吩,主要用來(lái)定義創(chuàng)建和管理Reactor
的行為。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor線程組中的Reactor集合
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
//從Reactor group中選擇一個(gè)特定的Reactor的選擇策略 用于channel注冊(cè)綁定到一個(gè)固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................省略................................
}
首先介紹一個(gè)新的構(gòu)造器參數(shù)EventExecutorChooserFactory chooserFactory
倘核。當(dāng)客戶端連接完成三次握手后泣侮,Main Reactor
會(huì)創(chuàng)建客戶端連接NioSocketChannel
,并將其綁定到Sub Reactor Group
中的一個(gè)固定Reactor
紧唱,那么具體要綁定到哪個(gè)具體的Sub Reactor
上呢活尊?這個(gè)綁定策略就是由chooserFactory
來(lái)創(chuàng)建的。默認(rèn)為DefaultEventExecutorChooserFactory
漏益。
下面就是本小節(jié)的主題Reactor線程組
的創(chuàng)建過(guò)程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//用于創(chuàng)建Reactor線程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
//循環(huán)創(chuàng)建reaactor group中的Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//創(chuàng)建reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
................省略................
}
}
}
//創(chuàng)建channel到Reactor的綁定策略
chooser = chooserFactory.newChooser(children);
................省略................
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
1. 創(chuàng)建用于啟動(dòng)Reactor線程的executor
在Netty Reactor Group中的單個(gè)Reactor
的IO線程模型
為上篇文章提到的單Reactor單線程模型
蛹锰,一個(gè)Reactor線程
負(fù)責(zé)輪詢
注冊(cè)其上的所有Channel
中的IO就緒事件
宁仔,處理IO事件翎苫,執(zhí)行Netty中的異步任務(wù)等工作煎谍。正是這個(gè)Reactor線程
驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn),可謂是Netty的核心引擎转捕。
而這里的executor
就是負(fù)責(zé)啟動(dòng)Reactor線程
的痘儡,從創(chuàng)建源碼中我們可以看到executor
的類型為ThreadPerTaskExecutor
沉删。
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我們看到ThreadPerTaskExecutor
做的事情很簡(jiǎn)單醉途,從它的命名前綴ThreadPerTask
我們就可以猜出它的工作方式矾瑰,就是來(lái)一個(gè)任務(wù)就創(chuàng)建一個(gè)線程執(zhí)行。而創(chuàng)建的這個(gè)線程正是netty的核心引擎Reactor線程隘擎。
在Reactor線程
啟動(dòng)的時(shí)候殴穴,Netty會(huì)將Reactor線程
要做的事情封裝成Runnable
,丟給exexutor
啟動(dòng)。
而Reactor線程
的核心就是一個(gè)死循環(huán)
不停的輪詢
IO就緒事件推正,處理IO事件恍涂,執(zhí)行異步任務(wù)。一刻也不停歇植榕,堪稱996典范
。
這里向大家先賣(mài)個(gè)關(guān)子尼夺,"Reactor線程是何時(shí)啟動(dòng)的呢尊残??"
2. 創(chuàng)建Reactor
Reactor線程組NioEventLoopGroup
包含多個(gè)Reactor
寝衫,存放于private final EventExecutor[] children
數(shù)組中。
所以下面的事情就是創(chuàng)建nThread
個(gè)Reactor
汹胃,并存放于EventExecutor[] children
字段中,
我們來(lái)看下用于創(chuàng)建Reactor
的newChild(executor, args)
方法:
newChild
newChild
方法是MultithreadEventExecutorGroup
中的一個(gè)抽象方法宰掉,提供給具體子類實(shí)現(xiàn)。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
這里我們解析的是NioEventLoopGroup
挪拟,我們來(lái)看下newChild
在該類中的實(shí)現(xiàn):
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
前邊提到的眾多構(gòu)造器參數(shù)舞丛,這里會(huì)通過(guò)可變參數(shù)Object... args
傳入到Reactor類NioEventLoop
的構(gòu)造器中球切。
這里介紹下新的參數(shù)EventLoopTaskQueueFactory queueFactory
吨凑,前邊提到Netty中的Reactor
主要工作是輪詢
注冊(cè)其上的所有Channel
上的IO就緒事件
糙臼,處理IO就緒事件
。除了這些主要的工作外,Netty為了極致的壓榨Reactor
的性能凰棉,還會(huì)讓它做一些異步任務(wù)的執(zhí)行工作掏秩。既然要執(zhí)行異步任務(wù),那么Reactor
中就需要一個(gè)隊(duì)列
來(lái)保存任務(wù)魏宽。
這里的EventLoopTaskQueueFactory
就是用來(lái)創(chuàng)建這樣的一個(gè)隊(duì)列來(lái)保存Reactor
中待執(zhí)行的異步任務(wù)构诚。
可以把Reactor
理解成為一個(gè)單線程的線程池
,類似
于JDK
中的SingleThreadExecutor
叠聋,僅用一個(gè)線程來(lái)執(zhí)行輪詢IO就緒事件
,處理IO就緒事件
镇匀,執(zhí)行異步任務(wù)
群发。同時(shí)待執(zhí)行的異步任務(wù)保存在Reactor
里的taskQueue
中。
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
//用于創(chuàng)建JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector輪詢策略 決定什么時(shí)候輪詢,什么時(shí)候處理IO事件告材,什么時(shí)候執(zhí)行異步任務(wù)
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}
這里就正式開(kāi)始了Reactor
的創(chuàng)建過(guò)程疤剑,我們知道Reactor
的核心是采用的IO多路復(fù)用模型
來(lái)對(duì)客戶端連接上的IO事件
進(jìn)行監(jiān)聽(tīng)
,所以最重要的事情是創(chuàng)建Selector
(JDK NIO 中IO多路復(fù)用技術(shù)的實(shí)現(xiàn)
)。
可以把
Selector
理解為我們上篇文章介紹的Select,poll,epoll
管钳,它是JDK NIO
對(duì)操作系統(tǒng)內(nèi)核提供的這些IO多路復(fù)用技術(shù)
的封裝葫隙。
openSelector
openSelector
是NioEventLoop類
中用于創(chuàng)建IO多路復(fù)用
的Selector
焰手,并對(duì)創(chuàng)建出來(lái)的JDK NIO
原生的Selector
進(jìn)行性能優(yōu)化躬拢。
首先會(huì)通過(guò)SelectorProvider#openSelector
創(chuàng)建JDK NIO原生的Selector
。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//通過(guò)JDK NIO SelectorProvider創(chuàng)建Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
..................省略.............
}
SelectorProvider
會(huì)根據(jù)操作系統(tǒng)的不同選擇JDK在不同操作系統(tǒng)版本下的對(duì)應(yīng)Selector
的實(shí)現(xiàn)。Linux下會(huì)選擇Epoll
魏身,Mac下會(huì)選擇Kqueue
回季。
下面我們就來(lái)看下SelectorProvider
是如何做到自動(dòng)適配不同操作系統(tǒng)下IO多路復(fù)用
實(shí)現(xiàn)的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, SelectorProvider.provider());
}
SelectorProvider
是在前面介紹的NioEventLoopGroup類
構(gòu)造函數(shù)中通過(guò)調(diào)用SelectorProvider.provider()
被加載,并通過(guò)NioEventLoopGroup#newChild
方法中的可變長(zhǎng)參數(shù)Object... args
傳遞到NioEventLoop
中的private final SelectorProvider provider
字段中泄私。
SelectorProvider的加載過(guò)程:
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
從SelectorProvider
加載源碼中我們可以看出讨阻,SelectorProvider
的加載方式有三種板辽,優(yōu)先級(jí)如下:
- 通過(guò)系統(tǒng)變量
-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定義實(shí)現(xiàn)類全限定名
。通過(guò)應(yīng)用程序類加載器(Application Classloader)
加載。
private static boolean loadProviderFromProperty() {
String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
}
.................省略.............
}
- 通過(guò)
SPI
方式加載。在工程目錄META-INF/services
下定義名為java.nio.channels.spi.SelectorProvider
的SPI文件
轴踱,文件中第一個(gè)定義的SelectorProvider
實(shí)現(xiàn)類全限定名就會(huì)被加載沸版。
private static boolean loadProviderAsService() {
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {
if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {
if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}
- 如果以上兩種方式均未被定義橙凳,那么就采用
SelectorProvider
系統(tǒng)默認(rèn)實(shí)現(xiàn)sun.nio.ch.DefaultSelectorProvider
。筆者當(dāng)前使用的操作系統(tǒng)是MacOS
荡灾,從源碼中我們可以看到自動(dòng)適配了KQueue
實(shí)現(xiàn)嗓节。
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}
不同操作系統(tǒng)中JDK對(duì)于
DefaultSelectorProvider
會(huì)有所不同信姓,Linux內(nèi)核版本2.6以上對(duì)應(yīng)的Epoll
,Linux內(nèi)核版本2.6以下對(duì)應(yīng)的Poll
,MacOS對(duì)應(yīng)的是KQueue
俊性。
下面我們接著回到io.netty.channel.nio.NioEventLoop#openSelector
的主線上來(lái)绽诚。
Netty對(duì)JDK NIO 原生Selector的優(yōu)化
首先在NioEventLoop
中有一個(gè)Selector優(yōu)化開(kāi)關(guān)DISABLE_KEY_SET_OPTIMIZATION
,通過(guò)系統(tǒng)變量-D io.netty.noKeySetOptimization
指定,默認(rèn)是開(kāi)啟的儡毕,表示需要對(duì)JDK NIO原生Selector
進(jìn)行優(yōu)化疆股。
public final class NioEventLoop extends SingleThreadEventLoop {
//Selector優(yōu)化開(kāi)關(guān) 默認(rèn)開(kāi)啟 為了遍歷的效率 會(huì)對(duì)Selector中的SelectedKeys進(jìn)行數(shù)據(jù)結(jié)構(gòu)優(yōu)化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
如果優(yōu)化開(kāi)關(guān)DISABLE_KEY_SET_OPTIMIZATION
是關(guān)閉的,那么直接返回JDK NIO原生的Selector
。
private SelectorTuple openSelector() {
..........SelectorProvider創(chuàng)建JDK NIO 原生Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION) {
//JDK NIO原生Selector 卷雕,Selector優(yōu)化開(kāi)關(guān) 默認(rèn)開(kāi)啟需要對(duì)Selector進(jìn)行優(yōu)化
return new SelectorTuple(unwrappedSelector);
}
}
下面為Netty對(duì)JDK NIO原生的Selector
的優(yōu)化過(guò)程:
- 獲取
JDK NIO原生Selector
的抽象實(shí)現(xiàn)類sun.nio.ch.SelectorImpl
。JDK NIO原生Selector
的實(shí)現(xiàn)均繼承于該抽象類。用于判斷由SelectorProvider
創(chuàng)建出來(lái)的Selector
是否為JDK默認(rèn)實(shí)現(xiàn)
(SelectorProvider
第三種加載方式)囊扳。因?yàn)?code>SelectorProvider可以是自定義加載细移,所以它創(chuàng)建出來(lái)的Selector
并不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
JDK NIO Selector的抽象類sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO就緒的SelectionKey(里面包裹著channel)
protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
//注冊(cè)在該Selector上的所有SelectionKey(里面包裹著channel)
protected HashSet<SelectionKey> keys;
// Public views of the key sets
//用于向調(diào)用線程返回的keys,不可變
private Set<SelectionKey> publicKeys; // Immutable
//當(dāng)有IO就緒的SelectionKey時(shí)鸟廓,向調(diào)用線程返回擎浴。只可刪除其中元素契讲,不可增加
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
//不可變
publicKeys = Collections.unmodifiableSet(keys);
//只可刪除其中元素,不可增加
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
這里筆者來(lái)簡(jiǎn)單介紹下JDK NIO中的Selector
中這幾個(gè)字段的含義你虹,我們可以和上篇文章講到的epoll在內(nèi)核中的結(jié)構(gòu)做類比,方便大家后續(xù)的理解:
-
Set<SelectionKey> selectedKeys
類似于我們上篇文章講解Epoll
時(shí)提到的就緒隊(duì)列eventpoll->rdllist
,Selector
這里大家可以理解為Epoll
。Selector
會(huì)將自己監(jiān)聽(tīng)到的IO就緒
的Channel
放到selectedKeys
中谴供。
這里的
SelectionKey
暫且可以理解為Channel
在Selector
中的表示,類比上圖中epitem結(jié)構(gòu)
里的epoll_event
,封裝IO就緒Socket的信息。
其實(shí)SelectionKey
里包含的信息不止是Channel
還有很多IO相關(guān)的信息螃宙。后面我們?cè)谠敿?xì)介紹。
-
HashSet<SelectionKey> keys:
這里存放的是所有注冊(cè)到該Selector
上的Channel
堂湖。類比epoll中的紅黑樹(shù)結(jié)構(gòu)rb_root
SelectionKey
在Channel
注冊(cè)到Selector
中后生成斥季。
Set<SelectionKey> publicSelectedKeys
相當(dāng)于是selectedKeys
的視圖渊迁,用于向外部線程返回IO就緒
的SelectionKey
。這個(gè)集合在外部線程中只能做刪除操作不可增加元素
,并且不是線程安全的
耕漱。Set<SelectionKey> publicKeys
相當(dāng)于keys
的不可變視圖峡钓,用于向外部線程返回所有注冊(cè)在該Selector
上的SelectionKey
這里需要重點(diǎn)關(guān)注
抽象類sun.nio.ch.SelectorImpl
中的selectedKeys
和publicSelectedKeys
這兩個(gè)字段,注意它們的類型都是HashSet
辈赋,一會(huì)優(yōu)化的就是這里0颖琛Q挥尽;魑场!
- 判斷由
SelectorProvider
創(chuàng)建出來(lái)的Selector
是否是JDK NIO原生的Selector
實(shí)現(xiàn)献宫。因?yàn)镹etty優(yōu)化針對(duì)的是JDK NIO 原生Selector
。判斷標(biāo)準(zhǔn)為sun.nio.ch.SelectorImpl
類是否為SelectorProvider
創(chuàng)建出Selector
的父類。如果不是則直接返回。不在繼續(xù)下面的優(yōu)化過(guò)程顶考。
//判斷是否可以對(duì)Selector進(jìn)行優(yōu)化浮庐,這里主要針對(duì)JDK NIO原生Selector的實(shí)現(xiàn)類進(jìn)行優(yōu)化,因?yàn)镾electorProvider可以加載的是自定義Selector實(shí)現(xiàn)
//如果SelectorProvider創(chuàng)建的Selector不是JDK原生sun.nio.ch.SelectorImpl的實(shí)現(xiàn)類,那么無(wú)法進(jìn)行優(yōu)化,直接返回
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
通過(guò)前面對(duì)SelectorProvider
的介紹我們知道,這里通過(guò)provider.openSelector()
創(chuàng)建出來(lái)的Selector
實(shí)現(xiàn)類為KQueueSelectorImpl類
,它繼承實(shí)現(xiàn)了sun.nio.ch.SelectorImpl
盲憎,所以它是JDK NIO 原生的Selector
實(shí)現(xiàn)
class KQueueSelectorImpl extends SelectorImpl {
}
- 創(chuàng)建
SelectedSelectionKeySet
通過(guò)反射替換掉sun.nio.ch.SelectorImpl類
中selectedKeys
和publicSelectedKeys
的默認(rèn)HashSet
實(shí)現(xiàn)串远。
為什么要用SelectedSelectionKeySet
替換掉原來(lái)的HashSet
呢伸但?却妨?
因?yàn)檫@里涉及到對(duì)HashSet類型
的sun.nio.ch.SelectorImpl#selectedKeys
集合的兩種操作:
插入操作: 通過(guò)前邊對(duì)
sun.nio.ch.SelectorImpl類
中字段的介紹我們知道,在Selector
監(jiān)聽(tīng)到IO就緒
的SelectionKey
后,會(huì)將IO就緒
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
集合中,這時(shí)Reactor線程
會(huì)從java.nio.channels.Selector#select(long)
阻塞調(diào)用中返回(類似上篇文章提到的epoll_wait
)。遍歷操作:
Reactor線程
返回后,會(huì)從Selector
中獲取IO就緒
的SelectionKey
集合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor線程
遍歷selectedKeys
,獲取IO就緒
的SocketChannel
,并處理SocketChannel
上的IO事件
步藕。
我們都知道HashSet
底層數(shù)據(jù)結(jié)構(gòu)是一個(gè)哈希表
惦界,由于Hash沖突
這種情況的存在,所以導(dǎo)致對(duì)哈希表
進(jìn)行插入
和遍歷
操作的性能不如對(duì)數(shù)組
進(jìn)行插入
和遍歷
操作的性能好咙冗。
還有一個(gè)重要原因是沾歪,數(shù)組可以利用CPU緩存的優(yōu)勢(shì)來(lái)提高遍歷的效率狂窑。后面筆者會(huì)有一篇專門(mén)的文章來(lái)講述利用CPU緩存行如何為我們帶來(lái)性能優(yōu)勢(shì)烫沙。
所以Netty為了優(yōu)化對(duì)sun.nio.ch.SelectorImpl#selectedKeys
集合的插入,遍歷
性能妒茬,自己用數(shù)組
這種數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)了SelectedSelectionKeySet
夹孔,用它來(lái)替換原來(lái)的HashSet
實(shí)現(xiàn)佑菩。
SelectedSelectionKeySet
初始化
SelectionKey[] keys
數(shù)組大小為1024
,當(dāng)數(shù)組容量不夠時(shí)问顷,擴(kuò)容為原來(lái)的兩倍大小。通過(guò)數(shù)組尾部指針
size
瞎嬉,在向數(shù)組插入元素的時(shí)候可以直接定位到插入位置keys[size++]
毁靶。操作一步到位案铺,不用像哈希表
那樣還需要解決Hash沖突
。對(duì)數(shù)組的遍歷操作也是如絲般順滑拖吼,CPU直接可以在緩存行中遍歷讀取數(shù)組元素?zé)o需訪問(wèn)內(nèi)存。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍歷方式性能不知高到哪里去了旱易。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//采用數(shù)組替換到JDK中的HashSet,這樣add操作和遍歷操作效率更高沸移,不需要考慮hash沖突
SelectionKey[] keys;
//數(shù)組尾部指針
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
/**
* 數(shù)組的添加效率高于 HashSet 因?yàn)椴恍枰紤]hash沖突
* */
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
//時(shí)間復(fù)雜度O(1)
keys[size++] = o;
if (size == keys.length) {
//擴(kuò)容為原來(lái)的兩倍大小
increaseCapacity();
}
return true;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
/**
* 采用數(shù)組的遍歷效率 高于 HashSet
* */
@Override
public Iterator<SelectionKey> iterator() {
return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {
return idx < size;
}
@Override
public SelectionKey next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
}
看到這里不禁感嘆窗市,從各種小的細(xì)節(jié)可以看出Netty對(duì)性能的優(yōu)化簡(jiǎn)直淋漓盡致商玫,對(duì)性能的追求令人發(fā)指寇壳。細(xì)節(jié)真的是魔鬼。
- Netty通過(guò)反射的方式用
SelectedSelectionKeySet
替換掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
這兩個(gè)集合中原來(lái)HashSet
的實(shí)現(xiàn)阵难。
- 反射獲取
sun.nio.ch.SelectorImpl
類中selectedKeys
和publicSelectedKeys
。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
-
Java9
版本以上通過(guò)sun.misc.Unsafe
設(shè)置字段值的方式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
- 通過(guò)反射的方式用
SelectedSelectionKeySet
替換掉hashSet
實(shí)現(xiàn)的sun.nio.ch.SelectorImpl#selectedKeys伙菊,sun.nio.ch.SelectorImpl#publicSelectedKeys
产舞。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
//Java8反射替換字段
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- 將與
sun.nio.ch.SelectorImpl
類中selectedKeys
和publicSelectedKeys
關(guān)聯(lián)好的Netty優(yōu)化實(shí)現(xiàn)SelectedSelectionKeySet
,設(shè)置到io.netty.channel.nio.NioEventLoop#selectedKeys
字段中保存瓜喇。
//會(huì)通過(guò)反射替換selector對(duì)象中的selectedKeySet保存就緒的selectKey
//該字段為持有selector對(duì)象selectedKeys的引用典蝌,當(dāng)IO事件就緒時(shí)坡锡,直接從這里獲取
private SelectedSelectionKeySet selectedKeys;
后續(xù)
Reactor線程
就會(huì)直接從io.netty.channel.nio.NioEventLoop#selectedKeys
中獲取IO就緒
的SocketChannel
- 用
SelectorTuple
封裝unwrappedSelector
和wrappedSelector
返回給NioEventLoop
構(gòu)造函數(shù)冰木。到此Reactor
中的Selector
就創(chuàng)建完畢了踊沸。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
所謂的
unwrappedSelector
是指被Netty優(yōu)化過(guò)的JDK NIO原生Selector。所謂的
wrappedSelector
就是用SelectedSelectionKeySetSelector
裝飾類將unwrappedSelector
和與sun.nio.ch.SelectorImpl類
關(guān)聯(lián)好的Netty優(yōu)化實(shí)現(xiàn)SelectedSelectionKeySet
封裝裝飾起來(lái)社证。
wrappedSelector
會(huì)將所有對(duì)Selector
的操作全部代理給unwrappedSelector
逼龟,并在發(fā)起輪詢IO事件
的相關(guān)操作中,重置SelectedSelectionKeySet
清空上一次的輪詢結(jié)果追葡。
final class SelectedSelectionKeySetSelector extends Selector {
//Netty優(yōu)化后的 SelectedKey就緒集合
private final SelectedSelectionKeySet selectionKeys;
//優(yōu)化后的JDK NIO 原生Selector
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
//重置SelectedKeys集合
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}
到這里Reactor的核心Selector就創(chuàng)建好了腺律,下面我們來(lái)看下用于保存異步任務(wù)的隊(duì)列是如何創(chuàng)建出來(lái)的奕短。
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
//通過(guò)用SelectedSelectionKeySet裝飾后的unwrappedSelector
this.selector = selectorTuple.selector;
//Netty優(yōu)化過(guò)的JDK NIO遠(yuǎn)程Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
我們繼續(xù)回到創(chuàng)建Reactor
的主線上,到目前為止Reactor
的核心Selector
就創(chuàng)建好了匀钧,前邊我們提到Reactor
除了需要監(jiān)聽(tīng)I(yíng)O就緒事件
以及處理IO就緒事件
外篡诽,還需要執(zhí)行一些異步任務(wù),當(dāng)外部線程向Reactor
提交異步任務(wù)后榴捡,Reactor
就需要一個(gè)隊(duì)列來(lái)保存這些異步任務(wù)杈女,等待Reactor線程
執(zhí)行。
下面我們來(lái)看下Reactor
中任務(wù)隊(duì)列的創(chuàng)建過(guò)程:
//任務(wù)隊(duì)列大小吊圾,默認(rèn)是無(wú)界隊(duì)列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
在
NioEventLoop
的父類SingleThreadEventLoop
中提供了一個(gè)靜態(tài)變量DEFAULT_MAX_PENDING_TASKS
用來(lái)指定Reactor
任務(wù)隊(duì)列的大小达椰。可以通過(guò)系統(tǒng)變量-D io.netty.eventLoop.maxPendingTasks
進(jìn)行設(shè)置项乒,默認(rèn)為Integer.MAX_VALUE
啰劲,表示任務(wù)隊(duì)列默認(rèn)為無(wú)界隊(duì)列
。根據(jù)
DEFAULT_MAX_PENDING_TASKS
變量的設(shè)定檀何,來(lái)決定創(chuàng)建無(wú)界任務(wù)隊(duì)列還是有界任務(wù)隊(duì)列蝇裤。
//創(chuàng)建無(wú)界任務(wù)隊(duì)列
PlatformDependent.<Runnable>newMpscQueue()
//創(chuàng)建有界任務(wù)隊(duì)列
PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
return Mpsc.newMpscQueue(maxCapacity);
}
Reactor
內(nèi)的異步任務(wù)隊(duì)列的類型為MpscQueue
,它是由JCTools
提供的一個(gè)高性能無(wú)鎖隊(duì)列,從命名前綴Mpsc
可以看出频鉴,它適用于多生產(chǎn)者單消費(fèi)者
的場(chǎng)景栓辜,它支持多個(gè)生產(chǎn)者線程安全的訪問(wèn)隊(duì)列,同一時(shí)刻只允許一個(gè)消費(fèi)者線程讀取隊(duì)列中的元素垛孔。
我們知道Netty中的
Reactor
可以線程安全
的處理注冊(cè)其上的多個(gè)SocketChannel
上的IO數(shù)據(jù)
藕甩,保證Reactor線程安全
的核心原因正是因?yàn)檫@個(gè)MpscQueue
,它可以支持多個(gè)業(yè)務(wù)線程在處理完業(yè)務(wù)邏輯后周荐,線程安全的向MpscQueue
添加異步寫(xiě)任務(wù)
狭莱,然后由單個(gè)Reactor線程
來(lái)執(zhí)行這些寫(xiě)任務(wù)
。既然是單線程執(zhí)行概作,那肯定是線程安全的了腋妙。
Reactor對(duì)應(yīng)的NioEventLoop類型繼承結(jié)構(gòu)
NioEventLoop
的繼承結(jié)構(gòu)也是比較復(fù)雜,這里我們只關(guān)注在Reactor
創(chuàng)建過(guò)程中涉及的到兩個(gè)父類SingleThreadEventLoop
,SingleThreadEventExecutor
讯榕。
剩下的繼承體系骤素,我們?cè)诤筮呺S著Netty
源碼的深入在慢慢介紹。
前邊我們提到瘩扼,其實(shí)Reactor
我們可以看作是一個(gè)單線程的線程池谆甜,只有一個(gè)線程用來(lái)執(zhí)行IO就緒事件的監(jiān)聽(tīng)
垃僚,IO事件的處理
集绰,異步任務(wù)的執(zhí)行
。用MpscQueue
來(lái)存儲(chǔ)待執(zhí)行的異步任務(wù)谆棺。
命名前綴為SingleThread
的父類都是對(duì)Reactor
這些行為的分層定義栽燕。也是本小節(jié)要介紹的對(duì)象
SingleThreadEventLoop
Reactor
負(fù)責(zé)執(zhí)行的異步任務(wù)分為三類:
-
普通任務(wù):
這是Netty最主要執(zhí)行的異步任務(wù)罕袋,存放在普通任務(wù)隊(duì)列taskQueue
中。在NioEventLoop
構(gòu)造函數(shù)中創(chuàng)建碍岔。 -
定時(shí)任務(wù):
存放在優(yōu)先級(jí)隊(duì)列中浴讯。后續(xù)我們介紹。 -
尾部任務(wù):
存放于尾部任務(wù)隊(duì)列tailTasks
中蔼啦,尾部任務(wù)一般不常用榆纽,在普通任務(wù)執(zhí)行完后 Reactor線程會(huì)執(zhí)行尾部任務(wù)。使用場(chǎng)景:比如對(duì)Netty 的運(yùn)行狀態(tài)做一些統(tǒng)計(jì)數(shù)據(jù)捏肢,例如任務(wù)循環(huán)的耗時(shí)奈籽、占用物理內(nèi)存的大小等等都可以向尾部隊(duì)列添加一個(gè)收尾任務(wù)完成統(tǒng)計(jì)數(shù)據(jù)的實(shí)時(shí)更新。
SingleThreadEventLoop
負(fù)責(zé)對(duì)尾部任務(wù)隊(duì)列tailTasks
進(jìn)行管理鸵赫。并且提供Channel
向Reactor
注冊(cè)的行為衣屏。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
//任務(wù)隊(duì)列大小,默認(rèn)是無(wú)界隊(duì)列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
//尾部任務(wù)隊(duì)列
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
//尾部隊(duì)列 執(zhí)行一些統(tǒng)計(jì)任務(wù) 不常用
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
@Override
public ChannelFuture register(Channel channel) {
//注冊(cè)channel到綁定的Reactor上
return register(new DefaultChannelPromise(channel, this));
}
}
SingleThreadEventExecutor
SingleThreadEventExecutor
主要負(fù)責(zé)對(duì)普通任務(wù)隊(duì)列
的管理辩棒,以及異步任務(wù)的執(zhí)行
狼忱,Reactor線程的啟停
。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
//parent為Reactor所屬的NioEventLoopGroup Reactor線程組
super(parent);
//向Reactor添加任務(wù)時(shí)一睁,是否喚醒Selector停止輪詢IO就緒事件钻弄,馬上執(zhí)行異步任務(wù)
this.addTaskWakesUp = addTaskWakesUp;
//Reactor異步任務(wù)隊(duì)列的大小
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
//用于啟動(dòng)Reactor線程的executor -> ThreadPerTaskExecutor
this.executor = ThreadExecutorMap.apply(executor, this);
//普通任務(wù)隊(duì)列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
//任務(wù)隊(duì)列滿時(shí)的拒絕策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
到現(xiàn)在為止,一個(gè)完整的Reactor架構(gòu)
就被創(chuàng)建出來(lái)了者吁。
3. 創(chuàng)建Channel到Reactor的綁定策略
到這一步斧蜕,Reactor線程組NioEventLoopGroup
里邊的所有Reactor
就已經(jīng)全部創(chuàng)建完畢。
無(wú)論是Netty服務(wù)端NioServerSocketChannel
關(guān)注的OP_ACCEPT
事件也好砚偶,還是Netty客戶端NioSocketChannel
關(guān)注的OP_READ
和OP_WRITE
事件也好批销,都需要先注冊(cè)到Reactor
上,Reactor
才能監(jiān)聽(tīng)Channel
上關(guān)注的IO事件
實(shí)現(xiàn)IO多路復(fù)用
染坯。
NioEventLoopGroup
(Reactor線程組)里邊有眾多的Reactor
均芽,那么以上提到的這些Channel
究竟應(yīng)該注冊(cè)到哪個(gè)Reactor
上呢?這就需要一個(gè)綁定的策略來(lái)平均分配单鹿。
還記得我們前邊介紹MultithreadEventExecutorGroup類
的時(shí)候提到的構(gòu)造器參數(shù)EventExecutorChooserFactory
嗎掀宋?
這時(shí)候它就派上用場(chǎng)了,它主要用來(lái)創(chuàng)建Channel
到Reactor
的綁定策略仲锄。默認(rèn)為DefaultEventExecutorChooserFactory.INSTANCE
劲妙。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//從Reactor集合中選擇一個(gè)特定的Reactor的綁定策略 用于channel注冊(cè)綁定到一個(gè)固定的Reactor上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
}
下面我們來(lái)看下具體的綁定策略:
DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
...................省略.................
}
我們看到在newChooser
方法綁定策略有兩個(gè)分支,不同之處在于需要判斷Reactor線程組中的Reactor
個(gè)數(shù)是否為2的次冪
儒喊。
Netty中的綁定策略就是采用round-robin
輪詢的方式來(lái)挨個(gè)選擇Reactor
進(jìn)行綁定镣奋。
采用round-robin
的方式進(jìn)行負(fù)載均衡,我們一般會(huì)用round % reactor.length
取余的方式來(lái)挨個(gè)平均的定位到對(duì)應(yīng)的Reactor
上怀愧。
如果Reactor
的個(gè)數(shù)reactor.length
恰好是2的次冪
侨颈,那么就可以用位操作&
運(yùn)算round & reactor.length -1
來(lái)代替%
運(yùn)算round % reactor.length
余赢,因?yàn)槲贿\(yùn)算的性能更高。具體為什么&
運(yùn)算能夠代替%
運(yùn)算哈垢,筆者會(huì)在后面講述時(shí)間輪的時(shí)候?yàn)榇蠹以敿?xì)證明妻柒,這里大家只需記住這個(gè)公式,我們還是聚焦本文的主線耘分。
了解了優(yōu)化原理举塔,我們?cè)诳创a實(shí)現(xiàn)就很容易理解了。
利用%
運(yùn)算的方式Math.abs(idx.getAndIncrement() % executors.length)
來(lái)進(jìn)行綁定求泰。
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
利用&
運(yùn)算的方式idx.getAndIncrement() & executors.length - 1
來(lái)進(jìn)行綁定啤贩。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
又一次被Netty對(duì)性能的極致追求所折服~~~~
4. 向Reactor線程組中所有的Reactor注冊(cè)terminated回調(diào)函數(shù)
當(dāng)Reactor線程組NioEventLoopGroup
中所有的Reactor
已經(jīng)創(chuàng)建完畢,Channel
到Reactor
的綁定策略也創(chuàng)建完畢后拜秧,我們就來(lái)到了創(chuàng)建NioEventGroup
的最后一步痹屹。
俗話說(shuō)的好,有創(chuàng)建就有啟動(dòng)枉氮,有啟動(dòng)就有關(guān)閉志衍,這里會(huì)創(chuàng)建Reactor關(guān)閉
的回調(diào)函數(shù)terminationListener
,在Reactor
關(guān)閉時(shí)回調(diào)聊替。
terminationListener
回調(diào)的邏輯很簡(jiǎn)單:
通過(guò)
AtomicInteger terminatedChildren
變量記錄已經(jīng)關(guān)閉的Reactor
個(gè)數(shù)楼肪,用來(lái)判斷NioEventLoopGroup
中的Reactor
是否已經(jīng)全部關(guān)閉。如果所有
Reactor
均已關(guān)閉惹悄,設(shè)置NioEventLoopGroup
中的terminationFuture
為success
春叫。表示Reactor線程組
關(guān)閉成功。
//記錄關(guān)閉的Reactor個(gè)數(shù),當(dāng)Reactor全部關(guān)閉后,才可以認(rèn)為關(guān)閉成功
private final AtomicInteger terminatedChildren = new AtomicInteger();
//關(guān)閉future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
//當(dāng)所有Reactor關(guān)閉后 才認(rèn)為是關(guān)閉成功
terminationFuture.setSuccess(null);
}
}
};
//為所有Reactor添加terminationListener
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
我們?cè)诨氐轿恼麻_(kāi)頭Netty服務(wù)端代碼模板
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
//創(chuàng)建主從Reactor線程組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
...........省略............
}
}
現(xiàn)在Netty的主從Reactor線程組
就已經(jīng)創(chuàng)建完畢斥黑,此時(shí)Netty服務(wù)端的骨架已經(jīng)搭建完畢缎除,骨架如下:
總結(jié)
本文介紹了首先介紹了Netty對(duì)各種IO模型
的支持以及如何輕松切換各種IO模型
配喳。
還花了大量的篇幅介紹Netty服務(wù)端的核心引擎主從Reactor線程組
的創(chuàng)建過(guò)程。在這個(gè)過(guò)程中,我們還提到了Netty對(duì)各種細(xì)節(jié)進(jìn)行的優(yōu)化,展現(xiàn)了Netty對(duì)性能極致的追求晨横。
好了,Netty服務(wù)端的骨架已經(jīng)搭好箫柳,剩下的事情就該綁定端口地址然后接收連接了手形,我們下篇文章再見(jiàn)~~~