聊聊Netty那些事兒之Reactor在Netty中的實(shí)現(xiàn)(創(chuàng)建篇)

本系列Netty源碼解析文章基于 4.1.56.Final版本

在上篇文章《聊聊Netty那些事兒之從內(nèi)核角度看IO模型》中我們花了大量的篇幅來(lái)從內(nèi)核角度詳細(xì)講述了五種IO模型的演進(jìn)過(guò)程以及ReactorIO線程模型的底層基石IO多路復(fù)用技術(shù)在內(nèi)核中的實(shí)現(xiàn)原理尔许。

最后我們引出了netty中使用的主從Reactor IO線程模型。

netty中的reactor.png

通過(guò)上篇文章的介紹踏拜,我們已經(jīng)清楚了在IO調(diào)用的過(guò)程中內(nèi)核幫我們搞了哪些事情愧口,那么俗話說(shuō)的好內(nèi)核領(lǐng)進(jìn)門(mén)凡蜻,修行在netty良拼,netty在用戶空間又幫我們搞了哪些事情?

那么從本文開(kāi)始疮方,筆者將從源碼角度來(lái)帶大家看下上圖中的Reactor IO線程模型在Netty中是如何實(shí)現(xiàn)的。

本文作為Reactor在Netty中實(shí)現(xiàn)系列文章中的開(kāi)篇文章,筆者先來(lái)為大家介紹Reactor的骨架是如何創(chuàng)建出來(lái)的蒜茴。

在上篇文章中我們提到Netty采用的是主從Reactor多線程的模型星爪,但是它在實(shí)現(xiàn)上又與Doug LeaScalable IO in Java論文中提到的經(jīng)典主從Reactor多線程模型有所差異。

經(jīng)典主從Reactor多線程模型

Netty中的Reactor是以Group的形式出現(xiàn)的粉私,主從Reactor在Netty中就是主從Reactor組顽腾,每個(gè)Reactor Group中會(huì)有多個(gè)Reactor用來(lái)執(zhí)行具體的IO任務(wù)。當(dāng)然在netty中Reactor不只用來(lái)執(zhí)行IO任務(wù)诺核,這個(gè)我們后面再說(shuō)抄肖。

  • Main Reactor Group中的Reactor數(shù)量取決于服務(wù)端要監(jiān)聽(tīng)的端口個(gè)數(shù),通常我們的服務(wù)端程序只會(huì)監(jiān)聽(tīng)一個(gè)端口猪瞬,所以Main Reactor Group只會(huì)有一個(gè)Main Reactor線程來(lái)處理最重要的事情:綁定端口地址憎瘸,接收客戶端連接為客戶端創(chuàng)建對(duì)應(yīng)的SocketChannel陈瘦,將客戶端SocketChannel分配給一個(gè)固定的Sub Reactor幌甘。也就是上篇文章筆者為大家舉的例子,飯店最重要的工作就是先把客人迎接進(jìn)來(lái)痊项。“我家大門(mén)常打開(kāi)锅风,開(kāi)放懷抱等你,擁抱過(guò)就有了默契你會(huì)愛(ài)上這里......”
我家大門(mén)常打開(kāi)鞍泉,開(kāi)放懷抱等你.png
  • Sub Reactor Group里有多個(gè)Reactor線程皱埠,Reactor線程的個(gè)數(shù)可以通過(guò)系統(tǒng)參數(shù)-D io.netty.eventLoopThreads指定。默認(rèn)的Reactor的個(gè)數(shù)為CPU核數(shù) * 2咖驮。Sub Reactor線程主要用來(lái)輪詢客戶端SocketChannel上的IO就緒事件處理IO就緒事件托修,執(zhí)行異步任務(wù)忘巧。 Sub Reactor Group做的事情就是上篇飯店例子中服務(wù)員的工作,客人進(jìn)來(lái)了要為客人分配座位睦刃,端茶送水砚嘴,做菜上菜。“不管遠(yuǎn)近都是客人涩拙,請(qǐng)不用客氣际长,相約好了在一起,我們歡迎您......”
我們歡迎您..png

一個(gè)客戶端SocketChannel只能分配給一個(gè)固定的Sub Reactor兴泥。一個(gè)Sub Reactor負(fù)責(zé)處理多個(gè)客戶端SocketChannel工育,這樣可以將服務(wù)端承載的全量客戶端連接分?jǐn)偟蕉鄠€(gè)Sub Reactor中處理,同時(shí)也能保證客戶端SocketChannel上的IO處理的線程安全性郁轻。

由于文章篇幅的關(guān)系翅娶,作為Reactor在netty中實(shí)現(xiàn)的第一篇我們主要來(lái)介紹主從Reactor Group的創(chuàng)建流程文留,骨架脈絡(luò)先搭好。

下面我們來(lái)看一段Netty服務(wù)端代碼的編寫(xiě)模板竭沫,從代碼模板的流程中我們來(lái)解析下主從Reactor的創(chuàng)建流程以及在這個(gè)過(guò)程中所涉及到的Netty核心類燥翅。

Netty服務(wù)端代碼模板

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //創(chuàng)建主從Reactor線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主從Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel類型
             .option(ChannelOption.SO_BACKLOG, 100)//設(shè)置主Reactor中channel的option選項(xiàng)
             .handler(new LoggingHandler(LogLevel.INFO))//設(shè)置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//設(shè)置從Reactor中注冊(cè)channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 綁定端口啟動(dòng)服務(wù),開(kāi)始監(jiān)聽(tīng)accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


  1. 首先我們要?jiǎng)?chuàng)建Netty最核心的部分 -> 創(chuàng)建主從Reactor Group蜕提,在Netty中EventLoopGroup就是Reactor Group的實(shí)現(xiàn)類森书。對(duì)應(yīng)的EventLoop就是Reactor的實(shí)現(xiàn)類。
  //創(chuàng)建主從Reactor線程組
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  1. 創(chuàng)建用于IO處理ChannelHandler谎势,實(shí)現(xiàn)相應(yīng)IO事件的回調(diào)函數(shù)凛膏,編寫(xiě)對(duì)應(yīng)的IO處理邏輯。注意這里只是簡(jiǎn)單示例哈脏榆,詳細(xì)的IO事件處理猖毫,筆者會(huì)單獨(dú)開(kāi)一篇文章專門(mén)講述。
final EchoServerHandler serverHandler = new EchoServerHandler();

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ................省略IO處理邏輯................
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 創(chuàng)建ServerBootstrapNetty服務(wù)端啟動(dòng)類须喂,并在啟動(dòng)類中配置啟動(dòng)Netty服務(wù)端所需要的一些必備信息吁断。

    • 通過(guò)serverBootstrap.group(bossGroup, workerGroup)為Netty服務(wù)端配置主從Reactor Group實(shí)例。

    • 通過(guò)serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服務(wù)端的ServerSocketChannel用于綁定端口地址以及創(chuàng)建客戶端SocketChannel坞生。Netty中的NioServerSocketChannel.class就是對(duì)JDK NIO中ServerSocketChannel的封裝仔役。而用于表示客戶端連接NioSocketChannel是對(duì)JDK NIO SocketChannel封裝。

    在上篇文章介紹Socket內(nèi)核結(jié)構(gòu)小節(jié)中我們提到是己,在編寫(xiě)服務(wù)端網(wǎng)絡(luò)程序時(shí)又兵,我們首先要?jiǎng)?chuàng)建一個(gè)Socket用于listen和bind端口地址,我們把這個(gè)叫做監(jiān)聽(tīng)Socket,這里對(duì)應(yīng)的就是NioServerSocketChannel.class卒废。當(dāng)客戶端連接完成三次握手沛厨,系統(tǒng)調(diào)用accept函數(shù)會(huì)基于監(jiān)聽(tīng)Socket創(chuàng)建出來(lái)一個(gè)新的Socket專門(mén)用于與客戶端之間的網(wǎng)絡(luò)通信我們稱為客戶端連接Socket,這里對(duì)應(yīng)的就是NioSocketChannel.class

    • serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)設(shè)置服務(wù)端ServerSocketChannel中的SocketOption。關(guān)于SocketOption的選項(xiàng)我們后邊的文章再聊摔认,本文主要聚焦在Netty Main Reactor Group的創(chuàng)建及工作流程俄烁。

    • serverBootstrap.handler(....)設(shè)置服務(wù)端NioServerSocketChannel中對(duì)應(yīng)Pipieline中的ChannelHandler

    netty有兩種Channel類型:一種是服務(wù)端用于監(jiān)聽(tīng)綁定端口地址的NioServerSocketChannel,一種是用于客戶端通信的NioSocketChannel级野。每種Channel類型實(shí)例都會(huì)對(duì)應(yīng)一個(gè)PipeLine用于編排對(duì)應(yīng)channel實(shí)例上的IO事件處理邏輯。PipeLine中組織的就是ChannelHandler用于編寫(xiě)特定的IO處理邏輯粹胯。

    注意serverBootstrap.handler設(shè)置的是服務(wù)端NioServerSocketChannel PipeLine中的ChannelHandler蓖柔。

    • serverBootstrap.childHandler(ChannelHandler childHandler)用于設(shè)置客戶端NioSocketChannel中對(duì)應(yīng)Pipieline中的ChannelHandler。我們通常配置的編碼解碼器就是在這里风纠。

    ServerBootstrap啟動(dòng)類方法帶有child前綴的均是設(shè)置客戶端NioSocketChannel屬性的况鸣。

    ChannelInitializer是用于當(dāng)SocketChannel成功注冊(cè)到綁定的Reactor上后,用于初始化該SocketChannelPipeline竹观。它的initChannel方法會(huì)在注冊(cè)成功后執(zhí)行镐捧。這里只是捎帶提一下,讓大家有個(gè)初步印象,后面我會(huì)專門(mén)介紹策吠。

  2. ChannelFuture f = serverBootstrap.bind(PORT).sync()這一步會(huì)是下篇文章要重點(diǎn)分析的主題Main Reactor Group的啟動(dòng)茄猫,綁定端口地址,開(kāi)始監(jiān)聽(tīng)客戶端連接事件(OP_ACCEPT)列牺。本文我們只關(guān)注創(chuàng)建流程整陌。

  3. f.channel().closeFuture().sync()等待服務(wù)端NioServerSocketChannel關(guān)閉。Netty服務(wù)端到這里正式啟動(dòng)瞎领,并準(zhǔn)備好接受客戶端連接的準(zhǔn)備泌辫。

  4. shutdownGracefully優(yōu)雅關(guān)閉主從Reactor線程組里的所有Reactor線程

Netty對(duì)IO模型的支持

在上篇文章中我們介紹了五種IO模型九默,Netty中支持BIO,NIO,AIO以及多種操作系統(tǒng)下的IO多路復(fù)用技術(shù)實(shí)現(xiàn)震放。

在Netty中切換這幾種IO模型也是非常的方便,下面我們來(lái)看下Netty如何對(duì)這幾種IO模型進(jìn)行支持的驼修。

首先我們介紹下幾個(gè)與IO模型相關(guān)的重要接口:

EventLoop

EventLoop就是Netty中的Reactor殿遂,可以說(shuō)它就是Netty的引擎,負(fù)責(zé)Channel上IO就緒事件的監(jiān)聽(tīng)邪锌,IO就緒事件的處理勉躺,異步任務(wù)的執(zhí)行驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn)。

不同IO模型下觅丰,EventLoop有著不同的實(shí)現(xiàn)饵溅,我們只需要切換不同的實(shí)現(xiàn)類就可以完成對(duì)NettyIO模型的切換。

BIO NIO AIO
ThreadPerChannelEventLoop NioEventLoop AioEventLoop

NIO模型下Netty會(huì)自動(dòng)根據(jù)操作系統(tǒng)以及版本的不同選擇對(duì)應(yīng)的IO多路復(fù)用技術(shù)實(shí)現(xiàn)妇萄。比如Linux 2.6版本以上用的是Epoll蜕企,2.6版本以下用的是Poll,Mac下采用的是Kqueue冠句。

其中Linux kernel 在5.1版本引入的異步IO庫(kù)io_uring正在netty中孵化轻掩。

EventLoopGroup

Netty中的Reactor是以Group的形式出現(xiàn)的,EventLoopGroup正是Reactor組的接口定義懦底,負(fù)責(zé)管理Reactor唇牧,Netty中的Channel就是通過(guò)EventLoopGroup注冊(cè)到具體的Reactor上的。

Netty的IO線程模型是主從Reactor多線程模型聚唐,主從Reactor線程組在Netty源碼中對(duì)應(yīng)的其實(shí)就是兩個(gè)EventLoopGroup實(shí)例丐重。

不同的IO模型也有對(duì)應(yīng)的實(shí)現(xiàn):

BIO NIO AIO
ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

ServerSocketChannel

用于Netty服務(wù)端使用的ServerSocketChannel,對(duì)應(yīng)于上篇文章提到的監(jiān)聽(tīng)Socket杆查,負(fù)責(zé)綁定監(jiān)聽(tīng)端口地址扮惦,接收客戶端連接并創(chuàng)建用于與客戶端通信的SocketChannel

不同的IO模型下的實(shí)現(xiàn):

BIO NIO AIO
OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

SocketChannel

用于與客戶端通信的SocketChannel亲桦,對(duì)應(yīng)于上篇文章提到的客戶端連接Socket崖蜜,當(dāng)客戶端完成三次握手后浊仆,由系統(tǒng)調(diào)用accept函數(shù)根據(jù)監(jiān)聽(tīng)Socket創(chuàng)建。

不同的IO模型下的實(shí)現(xiàn):

BIO NIO AIO
OioSocketChannel NioSocketChannel AioSocketChannel

我們看到在不同IO模型的實(shí)現(xiàn)中豫领,Netty這些圍繞IO模型的核心類只是前綴的不同:

  • BIO對(duì)應(yīng)的前綴為Oio表示old io抡柿,現(xiàn)在已經(jīng)廢棄不推薦使用。
  • NIO對(duì)應(yīng)的前綴為Nio氏堤,正是Netty推薦也是我們常用的非阻塞IO模型沙绝。
  • AIO對(duì)應(yīng)的前綴為Aio,由于Linux下的異步IO機(jī)制實(shí)現(xiàn)的并不成熟鼠锈,性能提升表現(xiàn)上也不明顯闪檬,現(xiàn)已被刪除。

我們只需要將IO模型的這些核心接口對(duì)應(yīng)的實(shí)現(xiàn)類前綴改為對(duì)應(yīng)IO模型的前綴购笆,就可以輕松在Netty中完成對(duì)IO模型的切換粗悯。

image.png

多種NIO的實(shí)現(xiàn)

Common Linux Mac
NioEventLoopGroup EpollEventLoopGroup KQueueEventLoopGroup
NioEventLoop EpollEventLoop KQueueEventLoop
NioServerSocketChannel EpollServerSocketChannel KQueueServerSocketChannel
NioSocketChannel EpollSocketChannel KQueueSocketChannel

我們通常在使用NIO模型的時(shí)候會(huì)使用Common列下的這些IO模型核心類,Common類也會(huì)根據(jù)操作系統(tǒng)的不同自動(dòng)選擇JDK在對(duì)應(yīng)平臺(tái)下的IO多路復(fù)用技術(shù)的實(shí)現(xiàn)同欠。

而Netty自身也根據(jù)操作系統(tǒng)的不同提供了自己對(duì)IO多路復(fù)用技術(shù)的實(shí)現(xiàn)样傍,比JDK的實(shí)現(xiàn)性能更優(yōu)。比如:

  • JDK的 NIO 默認(rèn)實(shí)現(xiàn)是水平觸發(fā)铺遂,Netty 是邊緣觸發(fā)(默認(rèn))和水平觸發(fā)可切換衫哥。。
  • Netty 實(shí)現(xiàn)的垃圾回收更少襟锐、性能更好撤逢。

我們編寫(xiě)Netty服務(wù)端程序的時(shí)候也可以根據(jù)操作系統(tǒng)的不同,采用Netty自身的實(shí)現(xiàn)來(lái)進(jìn)一步優(yōu)化程序粮坞。做法也很簡(jiǎn)單蚊荣,直接將上圖中紅框里的實(shí)現(xiàn)類替換成Netty的自身實(shí)現(xiàn)類即可完成切換。


經(jīng)過(guò)以上對(duì)Netty服務(wù)端代碼編寫(xiě)模板以及IO模型相關(guān)核心類的簡(jiǎn)單介紹莫杈,我們對(duì)Netty的創(chuàng)建流程有了一個(gè)簡(jiǎn)單粗略的總體認(rèn)識(shí)互例,下面我們來(lái)深入剖析下創(chuàng)建流程過(guò)程中的每一個(gè)步驟以及這個(gè)過(guò)程中涉及到的核心類實(shí)現(xiàn)。

以下源碼解析部分我們均采用Common列NIO相關(guān)的實(shí)現(xiàn)進(jìn)行解析筝闹。

創(chuàng)建主從Reactor線程組

在Netty服務(wù)端程序編寫(xiě)模板的開(kāi)始媳叨,我們首先會(huì)創(chuàng)建兩個(gè)Reactor線程組:

netty中的reactor.png
  • 一個(gè)是主Reactor線程組bossGroup用于監(jiān)聽(tīng)客戶端連接,創(chuàng)建客戶端連接NioSocketChannel关顷,并將創(chuàng)建好的客戶端連接NioSocketChannel注冊(cè)到從Reactor線程組中一個(gè)固定的Reactor上肩杈。

  • 一個(gè)是從Reactor線程組workerGroupworkerGroup中的Reactor負(fù)責(zé)監(jiān)聽(tīng)綁定在其上的客戶端連接NioSocketChannel上的IO就緒事件解寝,并處理IO就緒事件執(zhí)行異步任務(wù)艘儒。

  //創(chuàng)建主從Reactor線程組
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty中Reactor線程組的實(shí)現(xiàn)類為NioEventLoopGroup聋伦,在創(chuàng)建bossGroupworkerGroup的時(shí)候用到了NioEventLoopGroup的兩個(gè)構(gòu)造函數(shù):

  • nThreads參數(shù)的構(gòu)造函數(shù)public NioEventLoopGroup(int nThreads)夫偶。
  • 不帶nThreads參數(shù)的默認(rèn)構(gòu)造函數(shù)public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    ......................省略...........................
}

nThreads參數(shù)表示當(dāng)前要?jiǎng)?chuàng)建的Reactor線程組內(nèi)包含多少個(gè)Reactor線程。不指定nThreads參數(shù)的話采用默認(rèn)的Reactor線程個(gè)數(shù)觉增,用0表示兵拢。

最終會(huì)調(diào)用到構(gòu)造函數(shù)

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

下面簡(jiǎn)單介紹下構(gòu)造函數(shù)中這幾個(gè)參數(shù)的作用,后面我們?cè)谥v解本文主線的過(guò)程中還會(huì)提及這幾個(gè)參數(shù)逾礁,到時(shí)在詳細(xì)介紹说铃,這里只是讓大家有個(gè)初步印象,不必做過(guò)多的糾纏嘹履。

  • Executor executor:負(fù)責(zé)啟動(dòng)Reactor線程進(jìn)而Reactor才可以開(kāi)始工作腻扇。

Reactor線程組NioEventLoopGroup負(fù)責(zé)創(chuàng)建Reactor線程,在創(chuàng)建的時(shí)候會(huì)將executor傳入砾嫉。

  • RejectedExecutionHandler: 當(dāng)向Reactor添加異步任務(wù)添加失敗時(shí)幼苛,采用的拒絕策略。Reactor的任務(wù)不只是監(jiān)聽(tīng)I(yíng)O活躍事件和IO任務(wù)的處理焕刮,還包括對(duì)異步任務(wù)的處理舶沿。這里大家只需有個(gè)這樣的概念,后面筆者會(huì)專門(mén)詳細(xì)介紹配并。

  • SelectorProvider selectorProvider: Reactor中的IO模型為IO多路復(fù)用模型括荡,對(duì)應(yīng)于JDK NIO中的實(shí)現(xiàn)為java.nio.channels.Selector(就是我們上篇文章中提到的select,poll,epoll),每個(gè)Reator中都包含一個(gè)Selector溉旋,用于輪詢注冊(cè)在該Reactor上的所有Channel上的IO事件畸冲。SelectorProvider就是用來(lái)創(chuàng)建Selector的。

  • SelectStrategyFactory selectStrategyFactory: Reactor最重要的事情就是輪詢注冊(cè)其上的Channel上的IO就緒事件低滩,這里的SelectStrategyFactory用于指定輪詢策略召夹,默認(rèn)為DefaultSelectStrategyFactory.INSTANCE

最終會(huì)將這些參數(shù)交給NioEventLoopGroup的父類構(gòu)造器恕沫,下面我們來(lái)看下NioEventLoopGroup類的繼承結(jié)構(gòu):

image.png

NioEventLoopGroup類的繼承結(jié)構(gòu)乍一看比較復(fù)雜监憎,大家不要慌,筆者會(huì)隨著主線的深入慢慢地介紹這些父類接口婶溯,我們現(xiàn)在重點(diǎn)關(guān)注Mutithread前綴的類鲸阔。

我們知道NioEventLoopGroup是Netty中的Reactor線程組的實(shí)現(xiàn),既然是線程組那么肯定是負(fù)責(zé)管理和創(chuàng)建多個(gè)Reactor線程的迄委,所以Mutithread前綴的類定義的行為自然是對(duì)Reactor線程組內(nèi)多個(gè)Reactor線程的創(chuàng)建和管理工作褐筛。

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    //默認(rèn)Reactor個(gè)數(shù)
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    ...................省略.....................
}

MultithreadEventLoopGroup類主要的功能就是用來(lái)確定Reactor線程組內(nèi)Reactor的個(gè)數(shù)。

默認(rèn)的Reactor的個(gè)數(shù)存放于字段DEFAULT_EVENT_LOOP_THREADS中叙身。

static {}靜態(tài)代碼塊中我們可以看出默認(rèn)Reactor的個(gè)數(shù)的獲取邏輯:

  • 可以通過(guò)系統(tǒng)變量 -D io.netty.eventLoopThreads"指定渔扎。

  • 如果不指定,那么默認(rèn)的就是NettyRuntime.availableProcessors() * 2

當(dāng)nThread參數(shù)設(shè)置為0采用默認(rèn)設(shè)置時(shí)信轿,Reactor線程組內(nèi)的Reactor個(gè)數(shù)則設(shè)置為DEFAULT_EVENT_LOOP_THREADS晃痴。

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup這里就是本小節(jié)的核心残吩,主要用來(lái)定義創(chuàng)建和管理Reactor的行為。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    //Reactor線程組中的Reactor集合
    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    //從Reactor group中選擇一個(gè)特定的Reactor的選擇策略 用于channel注冊(cè)綁定到一個(gè)固定的Reactor上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    ............................省略................................
}

首先介紹一個(gè)新的構(gòu)造器參數(shù)EventExecutorChooserFactory chooserFactory倘核。當(dāng)客戶端連接完成三次握手后泣侮,Main Reactor會(huì)創(chuàng)建客戶端連接NioSocketChannel,并將其綁定到Sub Reactor Group中的一個(gè)固定Reactor紧唱,那么具體要綁定到哪個(gè)具體的Sub Reactor上呢活尊?這個(gè)綁定策略就是由chooserFactory來(lái)創(chuàng)建的。默認(rèn)為DefaultEventExecutorChooserFactory漏益。

下面就是本小節(jié)的主題Reactor線程組的創(chuàng)建過(guò)程:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            //用于創(chuàng)建Reactor線程
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];
        //循環(huán)創(chuàng)建reaactor group中的Reactor
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //創(chuàng)建reactor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                     ................省略................
                }
            }
        }
        //創(chuàng)建channel到Reactor的綁定策略
        chooser = chooserFactory.newChooser(children);

         ................省略................

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

1. 創(chuàng)建用于啟動(dòng)Reactor線程的executor

在Netty Reactor Group中的單個(gè)ReactorIO線程模型為上篇文章提到的單Reactor單線程模型蛹锰,一個(gè)Reactor線程負(fù)責(zé)輪詢注冊(cè)其上的所有Channel中的IO就緒事件宁仔,處理IO事件翎苫,執(zhí)行Netty中的異步任務(wù)等工作煎谍。正是這個(gè)Reactor線程驅(qū)動(dòng)著整個(gè)Netty的運(yùn)轉(zhuǎn),可謂是Netty的核心引擎转捕。

單Reactor單線程模型.png

而這里的executor就是負(fù)責(zé)啟動(dòng)Reactor線程的痘儡,從創(chuàng)建源碼中我們可以看到executor的類型為ThreadPerTaskExecutor沉删。

ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

我們看到ThreadPerTaskExecutor做的事情很簡(jiǎn)單醉途,從它的命名前綴ThreadPerTask我們就可以猜出它的工作方式矾瑰,就是來(lái)一個(gè)任務(wù)就創(chuàng)建一個(gè)線程執(zhí)行。而創(chuàng)建的這個(gè)線程正是netty的核心引擎Reactor線程隘擎。

Reactor線程啟動(dòng)的時(shí)候殴穴,Netty會(huì)將Reactor線程要做的事情封裝成Runnable,丟給exexutor啟動(dòng)。

Reactor線程的核心就是一個(gè)死循環(huán)不停的輪詢IO就緒事件推正,處理IO事件恍涂,執(zhí)行異步任務(wù)。一刻也不停歇植榕,堪稱996典范

這里向大家先賣(mài)個(gè)關(guān)子尼夺,"Reactor線程是何時(shí)啟動(dòng)的呢尊残??"

2. 創(chuàng)建Reactor

Reactor線程組NioEventLoopGroup包含多個(gè)Reactor寝衫,存放于private final EventExecutor[] children數(shù)組中。

所以下面的事情就是創(chuàng)建nThread個(gè)Reactor汹胃,并存放于EventExecutor[] children字段中,

我們來(lái)看下用于創(chuàng)建ReactornewChild(executor, args)方法:

newChild

newChild方法是MultithreadEventExecutorGroup中的一個(gè)抽象方法宰掉,提供給具體子類實(shí)現(xiàn)。

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

這里我們解析的是NioEventLoopGroup挪拟,我們來(lái)看下newChild在該類中的實(shí)現(xiàn):

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

前邊提到的眾多構(gòu)造器參數(shù)舞丛,這里會(huì)通過(guò)可變參數(shù)Object... args傳入到Reactor類NioEventLoop的構(gòu)造器中球切。

這里介紹下新的參數(shù)EventLoopTaskQueueFactory queueFactory吨凑,前邊提到Netty中的Reactor主要工作是輪詢注冊(cè)其上的所有Channel上的IO就緒事件糙臼,處理IO就緒事件。除了這些主要的工作外,Netty為了極致的壓榨Reactor的性能凰棉,還會(huì)讓它做一些異步任務(wù)的執(zhí)行工作掏秩。既然要執(zhí)行異步任務(wù),那么Reactor中就需要一個(gè)隊(duì)列來(lái)保存任務(wù)魏宽。

這里的EventLoopTaskQueueFactory就是用來(lái)創(chuàng)建這樣的一個(gè)隊(duì)列來(lái)保存Reactor中待執(zhí)行的異步任務(wù)构诚。

可以把Reactor理解成為一個(gè)單線程的線程池類似JDK中的SingleThreadExecutor叠聋,僅用一個(gè)線程來(lái)執(zhí)行輪詢IO就緒事件處理IO就緒事件镇匀,執(zhí)行異步任務(wù)群发。同時(shí)待執(zhí)行的異步任務(wù)保存在Reactor里的taskQueue中。

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {
    //用于創(chuàng)建JDK NIO Selector,ServerSocketChannel
    private final SelectorProvider provider;
    //Selector輪詢策略 決定什么時(shí)候輪詢,什么時(shí)候處理IO事件告材,什么時(shí)候執(zhí)行異步任務(wù)
    private final SelectStrategy selectStrategy;
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
    private Selector unwrappedSelector;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
}

這里就正式開(kāi)始了Reactor的創(chuàng)建過(guò)程疤剑,我們知道Reactor的核心是采用的IO多路復(fù)用模型來(lái)對(duì)客戶端連接上的IO事件進(jìn)行監(jiān)聽(tīng),所以最重要的事情是創(chuàng)建Selector(JDK NIO 中IO多路復(fù)用技術(shù)的實(shí)現(xiàn))。

可以把Selector理解為我們上篇文章介紹的Select,poll,epoll管钳,它是JDK NIO對(duì)操作系統(tǒng)內(nèi)核提供的這些IO多路復(fù)用技術(shù)的封裝葫隙。

openSelector

openSelectorNioEventLoop類中用于創(chuàng)建IO多路復(fù)用Selector焰手,并對(duì)創(chuàng)建出來(lái)的JDK NIO 原生的Selector進(jìn)行性能優(yōu)化躬拢。

首先會(huì)通過(guò)SelectorProvider#openSelector創(chuàng)建JDK NIO原生的Selector

 private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            //通過(guò)JDK NIO SelectorProvider創(chuàng)建Selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        ..................省略.............
}

SelectorProvider會(huì)根據(jù)操作系統(tǒng)的不同選擇JDK在不同操作系統(tǒng)版本下的對(duì)應(yīng)Selector的實(shí)現(xiàn)。Linux下會(huì)選擇Epoll魏身,Mac下會(huì)選擇Kqueue回季。

下面我們就來(lái)看下SelectorProvider是如何做到自動(dòng)適配不同操作系統(tǒng)下IO多路復(fù)用實(shí)現(xiàn)的

SelectorProvider

    public NioEventLoopGroup(ThreadFactory threadFactory) {
        this(0, threadFactory, SelectorProvider.provider());
    }

SelectorProvider是在前面介紹的NioEventLoopGroup類構(gòu)造函數(shù)中通過(guò)調(diào)用SelectorProvider.provider()被加載,并通過(guò)NioEventLoopGroup#newChild方法中的可變長(zhǎng)參數(shù)Object... args傳遞到NioEventLoop中的private final SelectorProvider provider字段中泄私。

SelectorProvider的加載過(guò)程:

public abstract class SelectorProvider {

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
}

SelectorProvider加載源碼中我們可以看出讨阻,SelectorProvider的加載方式有三種板辽,優(yōu)先級(jí)如下:

  1. 通過(guò)系統(tǒng)變量-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定義實(shí)現(xiàn)類全限定名。通過(guò)應(yīng)用程序類加載器(Application Classloader)加載。
    private static boolean loadProviderFromProperty() {
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } 
        .................省略.............
    }
  1. 通過(guò)SPI方式加載。在工程目錄META-INF/services下定義名為java.nio.channels.spi.SelectorProviderSPI文件轴踱,文件中第一個(gè)定義的SelectorProvider實(shí)現(xiàn)類全限定名就會(huì)被加載沸版。
    private static boolean loadProviderAsService() {

        ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
        Iterator<SelectorProvider> i = sl.iterator();
        for (;;) {
            try {
                if (!i.hasNext())
                    return false;
                provider = i.next();
                return true;
            } catch (ServiceConfigurationError sce) {
                if (sce.getCause() instanceof SecurityException) {
                    // Ignore the security exception, try the next provider
                    continue;
                }
                throw sce;
            }
        }
    }
  1. 如果以上兩種方式均未被定義橙凳,那么就采用SelectorProvider系統(tǒng)默認(rèn)實(shí)現(xiàn)sun.nio.ch.DefaultSelectorProvider。筆者當(dāng)前使用的操作系統(tǒng)是MacOS荡灾,從源碼中我們可以看到自動(dòng)適配了KQueue實(shí)現(xiàn)嗓节。
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

不同操作系統(tǒng)中JDK對(duì)于DefaultSelectorProvider會(huì)有所不同信姓,Linux內(nèi)核版本2.6以上對(duì)應(yīng)的Epoll,Linux內(nèi)核版本2.6以下對(duì)應(yīng)的Poll,MacOS對(duì)應(yīng)的是KQueue俊性。

下面我們接著回到io.netty.channel.nio.NioEventLoop#openSelector的主線上來(lái)绽诚。

Netty對(duì)JDK NIO 原生Selector的優(yōu)化

首先在NioEventLoop中有一個(gè)Selector優(yōu)化開(kāi)關(guān)DISABLE_KEY_SET_OPTIMIZATION,通過(guò)系統(tǒng)變量-D io.netty.noKeySetOptimization指定,默認(rèn)是開(kāi)啟的儡毕,表示需要對(duì)JDK NIO原生Selector進(jìn)行優(yōu)化疆股。

public final class NioEventLoop extends SingleThreadEventLoop {
   //Selector優(yōu)化開(kāi)關(guān) 默認(rèn)開(kāi)啟 為了遍歷的效率 會(huì)對(duì)Selector中的SelectedKeys進(jìn)行數(shù)據(jù)結(jié)構(gòu)優(yōu)化
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}

如果優(yōu)化開(kāi)關(guān)DISABLE_KEY_SET_OPTIMIZATION是關(guān)閉的,那么直接返回JDK NIO原生的Selector

private SelectorTuple openSelector() {
        ..........SelectorProvider創(chuàng)建JDK NIO  原生Selector..............

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            //JDK NIO原生Selector 卷雕,Selector優(yōu)化開(kāi)關(guān) 默認(rèn)開(kāi)啟需要對(duì)Selector進(jìn)行優(yōu)化
            return new SelectorTuple(unwrappedSelector);
        }
}

下面為Netty對(duì)JDK NIO原生的Selector的優(yōu)化過(guò)程:

  1. 獲取JDK NIO原生Selector的抽象實(shí)現(xiàn)類sun.nio.ch.SelectorImplJDK NIO原生Selector的實(shí)現(xiàn)均繼承于該抽象類。用于判斷由SelectorProvider創(chuàng)建出來(lái)的Selector是否為JDK默認(rèn)實(shí)現(xiàn)SelectorProvider第三種加載方式)囊扳。因?yàn)?code>SelectorProvider可以是自定義加載细移,所以它創(chuàng)建出來(lái)的Selector并不一定是JDK NIO 原生的。
       Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

JDK NIO Selector的抽象類sun.nio.ch.SelectorImpl

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // //IO就緒的SelectionKey(里面包裹著channel)
    protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    //注冊(cè)在該Selector上的所有SelectionKey(里面包裹著channel)
    protected HashSet<SelectionKey> keys;

    // Public views of the key sets
    //用于向調(diào)用線程返回的keys,不可變
    private Set<SelectionKey> publicKeys;             // Immutable
    //當(dāng)有IO就緒的SelectionKey時(shí)鸟廓,向調(diào)用線程返回擎浴。只可刪除其中元素契讲,不可增加
    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

    protected SelectorImpl(SelectorProvider sp) {
        super(sp);
        keys = new HashSet<SelectionKey>();
        selectedKeys = new HashSet<SelectionKey>();
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            //不可變
            publicKeys = Collections.unmodifiableSet(keys);
            //只可刪除其中元素,不可增加
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
}

這里筆者來(lái)簡(jiǎn)單介紹下JDK NIO中的Selector中這幾個(gè)字段的含義你虹,我們可以和上篇文章講到的epoll在內(nèi)核中的結(jié)構(gòu)做類比,方便大家后續(xù)的理解:

image.png
  • Set<SelectionKey> selectedKeys 類似于我們上篇文章講解Epoll時(shí)提到的就緒隊(duì)列eventpoll->rdllistSelector這里大家可以理解為EpollSelector會(huì)將自己監(jiān)聽(tīng)到的IO就緒Channel放到selectedKeys中谴供。

這里的SelectionKey暫且可以理解為ChannelSelector中的表示,類比上圖中epitem結(jié)構(gòu)里的epoll_event,封裝IO就緒Socket的信息。
其實(shí)SelectionKey里包含的信息不止是Channel還有很多IO相關(guān)的信息螃宙。后面我們?cè)谠敿?xì)介紹。

  • HashSet<SelectionKey> keys:這里存放的是所有注冊(cè)到該Selector上的Channel堂湖。類比epoll中的紅黑樹(shù)結(jié)構(gòu)rb_root

SelectionKeyChannel注冊(cè)到Selector中后生成斥季。

  • Set<SelectionKey> publicSelectedKeys 相當(dāng)于是selectedKeys的視圖渊迁,用于向外部線程返回IO就緒SelectionKey。這個(gè)集合在外部線程中只能做刪除操作不可增加元素,并且不是線程安全的耕漱。

  • Set<SelectionKey> publicKeys相當(dāng)于keys的不可變視圖峡钓,用于向外部線程返回所有注冊(cè)在該Selector上的SelectionKey

這里需要重點(diǎn)關(guān)注抽象類sun.nio.ch.SelectorImpl中的selectedKeyspublicSelectedKeys這兩個(gè)字段,注意它們的類型都是HashSet辈赋,一會(huì)優(yōu)化的就是這里0颖琛Q挥尽;魑场!

  1. 判斷由SelectorProvider創(chuàng)建出來(lái)的Selector是否是JDK NIO原生的Selector實(shí)現(xiàn)献宫。因?yàn)镹etty優(yōu)化針對(duì)的是JDK NIO 原生Selector。判斷標(biāo)準(zhǔn)為sun.nio.ch.SelectorImpl類是否為SelectorProvider創(chuàng)建出Selector的父類。如果不是則直接返回。不在繼續(xù)下面的優(yōu)化過(guò)程顶考。
        //判斷是否可以對(duì)Selector進(jìn)行優(yōu)化浮庐,這里主要針對(duì)JDK NIO原生Selector的實(shí)現(xiàn)類進(jìn)行優(yōu)化,因?yàn)镾electorProvider可以加載的是自定義Selector實(shí)現(xiàn)
        //如果SelectorProvider創(chuàng)建的Selector不是JDK原生sun.nio.ch.SelectorImpl的實(shí)現(xiàn)類,那么無(wú)法進(jìn)行優(yōu)化,直接返回
        if (!(maybeSelectorImplClass instanceof Class) ||
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

通過(guò)前面對(duì)SelectorProvider的介紹我們知道,這里通過(guò)provider.openSelector()創(chuàng)建出來(lái)的Selector實(shí)現(xiàn)類為KQueueSelectorImpl類,它繼承實(shí)現(xiàn)了sun.nio.ch.SelectorImpl盲憎,所以它是JDK NIO 原生的Selector實(shí)現(xiàn)

class KQueueSelectorImpl extends SelectorImpl {

}
  1. 創(chuàng)建SelectedSelectionKeySet通過(guò)反射替換掉sun.nio.ch.SelectorImpl類selectedKeyspublicSelectedKeys的默認(rèn)HashSet實(shí)現(xiàn)串远。

為什么要用SelectedSelectionKeySet替換掉原來(lái)的HashSet呢伸但?却妨?

因?yàn)檫@里涉及到對(duì)HashSet類型sun.nio.ch.SelectorImpl#selectedKeys集合的兩種操作:

  • 插入操作: 通過(guò)前邊對(duì)sun.nio.ch.SelectorImpl類中字段的介紹我們知道,在Selector監(jiān)聽(tīng)到IO就緒SelectionKey后,會(huì)將IO就緒SelectionKey插入sun.nio.ch.SelectorImpl#selectedKeys集合中,這時(shí)Reactor線程會(huì)從java.nio.channels.Selector#select(long)阻塞調(diào)用中返回(類似上篇文章提到的epoll_wait)。

  • 遍歷操作:Reactor線程返回后,會(huì)從Selector中獲取IO就緒SelectionKey集合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor線程遍歷selectedKeys,獲取IO就緒SocketChannel,并處理SocketChannel上的IO事件步藕。

我們都知道HashSet底層數(shù)據(jù)結(jié)構(gòu)是一個(gè)哈希表惦界,由于Hash沖突這種情況的存在,所以導(dǎo)致對(duì)哈希表進(jìn)行插入遍歷操作的性能不如對(duì)數(shù)組進(jìn)行插入遍歷操作的性能好咙冗。

還有一個(gè)重要原因是沾歪,數(shù)組可以利用CPU緩存的優(yōu)勢(shì)來(lái)提高遍歷的效率狂窑。后面筆者會(huì)有一篇專門(mén)的文章來(lái)講述利用CPU緩存行如何為我們帶來(lái)性能優(yōu)勢(shì)烫沙。

所以Netty為了優(yōu)化對(duì)sun.nio.ch.SelectorImpl#selectedKeys集合的插入,遍歷性能妒茬,自己用數(shù)組這種數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)了SelectedSelectionKeySet夹孔,用它來(lái)替換原來(lái)的HashSet實(shí)現(xiàn)佑菩。

SelectedSelectionKeySet

  • 初始化SelectionKey[] keys數(shù)組大小為1024,當(dāng)數(shù)組容量不夠時(shí)问顷,擴(kuò)容為原來(lái)的兩倍大小。

  • 通過(guò)數(shù)組尾部指針size瞎嬉,在向數(shù)組插入元素的時(shí)候可以直接定位到插入位置keys[size++]毁靶。操作一步到位案铺,不用像哈希表那樣還需要解決Hash沖突

  • 對(duì)數(shù)組的遍歷操作也是如絲般順滑拖吼,CPU直接可以在緩存行中遍歷讀取數(shù)組元素?zé)o需訪問(wèn)內(nèi)存。比HashSet的迭代器java.util.HashMap.KeyIterator 遍歷方式性能不知高到哪里去了旱易。

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    //采用數(shù)組替換到JDK中的HashSet,這樣add操作和遍歷操作效率更高沸移,不需要考慮hash沖突
    SelectionKey[] keys;
    //數(shù)組尾部指針
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    /**
     * 數(shù)組的添加效率高于 HashSet 因?yàn)椴恍枰紤]hash沖突
     * */
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        //時(shí)間復(fù)雜度O(1)
        keys[size++] = o;
        if (size == keys.length) {
            //擴(kuò)容為原來(lái)的兩倍大小
            increaseCapacity();
        }

        return true;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }

    /**
     * 采用數(shù)組的遍歷效率 高于 HashSet
     * */
    @Override
    public Iterator<SelectionKey> iterator() {
        return new Iterator<SelectionKey>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public SelectionKey next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

看到這里不禁感嘆窗市,從各種小的細(xì)節(jié)可以看出Netty對(duì)性能的優(yōu)化簡(jiǎn)直淋漓盡致商玫,對(duì)性能的追求令人發(fā)指寇壳。細(xì)節(jié)真的是魔鬼。

  1. Netty通過(guò)反射的方式用SelectedSelectionKeySet替換掉sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys這兩個(gè)集合中原來(lái)HashSet的實(shí)現(xiàn)阵难。
  • 反射獲取sun.nio.ch.SelectorImpl類中selectedKeyspublicSelectedKeys
  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  • Java9版本以上通過(guò)sun.misc.Unsafe設(shè)置字段值的方式
       if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {

                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        
                    }
  • 通過(guò)反射的方式用SelectedSelectionKeySet替換掉hashSet實(shí)現(xiàn)的sun.nio.ch.SelectorImpl#selectedKeys伙菊,sun.nio.ch.SelectorImpl#publicSelectedKeys产舞。
          Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          //Java8反射替換字段
          selectedKeysField.set(unwrappedSelector, selectedKeySet);
          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
  1. 將與sun.nio.ch.SelectorImpl類中selectedKeyspublicSelectedKeys關(guān)聯(lián)好的Netty優(yōu)化實(shí)現(xiàn)SelectedSelectionKeySet,設(shè)置到io.netty.channel.nio.NioEventLoop#selectedKeys字段中保存瓜喇。
   //會(huì)通過(guò)反射替換selector對(duì)象中的selectedKeySet保存就緒的selectKey
    //該字段為持有selector對(duì)象selectedKeys的引用典蝌,當(dāng)IO事件就緒時(shí)坡锡,直接從這里獲取
    private SelectedSelectionKeySet selectedKeys;

后續(xù)Reactor線程就會(huì)直接從io.netty.channel.nio.NioEventLoop#selectedKeys中獲取IO就緒SocketChannel

  1. SelectorTuple封裝unwrappedSelectorwrappedSelector返回給NioEventLoop構(gòu)造函數(shù)冰木。到此Reactor中的Selector就創(chuàng)建完畢了踊沸。
return new SelectorTuple(unwrappedSelector,
                      new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    private static final class SelectorTuple {
        final Selector unwrappedSelector;
        final Selector selector;

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }
    }
  • 所謂的unwrappedSelector是指被Netty優(yōu)化過(guò)的JDK NIO原生Selector。

  • 所謂的wrappedSelector就是用SelectedSelectionKeySetSelector裝飾類將unwrappedSelector和與sun.nio.ch.SelectorImpl類關(guān)聯(lián)好的Netty優(yōu)化實(shí)現(xiàn)SelectedSelectionKeySet封裝裝飾起來(lái)社证。

wrappedSelector會(huì)將所有對(duì)Selector的操作全部代理給unwrappedSelector逼龟,并在發(fā)起輪詢IO事件的相關(guān)操作中,重置SelectedSelectionKeySet清空上一次的輪詢結(jié)果追葡。

final class SelectedSelectionKeySetSelector extends Selector {
    //Netty優(yōu)化后的 SelectedKey就緒集合
    private final SelectedSelectionKeySet selectionKeys;
    //優(yōu)化后的JDK NIO 原生Selector
    private final Selector delegate;

    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
        this.delegate = delegate;
        this.selectionKeys = selectionKeys;
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public SelectorProvider provider() {
        return delegate.provider();
    }

    @Override
    public Set<SelectionKey> keys() {
        return delegate.keys();
    }

    @Override
    public Set<SelectionKey> selectedKeys() {
        return delegate.selectedKeys();
    }

    @Override
    public int selectNow() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.selectNow();
    }

    @Override
    public int select(long timeout) throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select(timeout);
    }

    @Override
    public int select() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select();
    }

    @Override
    public Selector wakeup() {
        return delegate.wakeup();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}

到這里Reactor的核心Selector就創(chuàng)建好了腺律,下面我們來(lái)看下用于保存異步任務(wù)的隊(duì)列是如何創(chuàng)建出來(lái)的奕短。

newTaskQueue

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        //通過(guò)用SelectedSelectionKeySet裝飾后的unwrappedSelector
        this.selector = selectorTuple.selector;
        //Netty優(yōu)化過(guò)的JDK NIO遠(yuǎn)程Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

我們繼續(xù)回到創(chuàng)建Reactor的主線上,到目前為止Reactor的核心Selector就創(chuàng)建好了匀钧,前邊我們提到Reactor除了需要監(jiān)聽(tīng)I(yíng)O就緒事件以及處理IO就緒事件外篡诽,還需要執(zhí)行一些異步任務(wù),當(dāng)外部線程向Reactor提交異步任務(wù)后榴捡,Reactor就需要一個(gè)隊(duì)列來(lái)保存這些異步任務(wù)杈女,等待Reactor線程執(zhí)行。

下面我們來(lái)看下Reactor中任務(wù)隊(duì)列的創(chuàng)建過(guò)程:

    //任務(wù)隊(duì)列大小吊圾,默認(rèn)是無(wú)界隊(duì)列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private static Queue<Runnable> newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }  
  • NioEventLoop的父類SingleThreadEventLoop中提供了一個(gè)靜態(tài)變量DEFAULT_MAX_PENDING_TASKS用來(lái)指定Reactor任務(wù)隊(duì)列的大小达椰。可以通過(guò)系統(tǒng)變量-D io.netty.eventLoop.maxPendingTasks進(jìn)行設(shè)置项乒,默認(rèn)為Integer.MAX_VALUE啰劲,表示任務(wù)隊(duì)列默認(rèn)為無(wú)界隊(duì)列

  • 根據(jù)DEFAULT_MAX_PENDING_TASKS變量的設(shè)定檀何,來(lái)決定創(chuàng)建無(wú)界任務(wù)隊(duì)列還是有界任務(wù)隊(duì)列蝇裤。

    //創(chuàng)建無(wú)界任務(wù)隊(duì)列
    PlatformDependent.<Runnable>newMpscQueue()
    //創(chuàng)建有界任務(wù)隊(duì)列
    PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)

    public static <T> Queue<T> newMpscQueue() {
        return Mpsc.newMpscQueue();
    }

    public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
        return Mpsc.newMpscQueue(maxCapacity);
    }

Reactor內(nèi)的異步任務(wù)隊(duì)列的類型為MpscQueue,它是由JCTools提供的一個(gè)高性能無(wú)鎖隊(duì)列,從命名前綴Mpsc可以看出频鉴,它適用于多生產(chǎn)者單消費(fèi)者的場(chǎng)景栓辜,它支持多個(gè)生產(chǎn)者線程安全的訪問(wèn)隊(duì)列,同一時(shí)刻只允許一個(gè)消費(fèi)者線程讀取隊(duì)列中的元素垛孔。

我們知道Netty中的Reactor可以線程安全的處理注冊(cè)其上的多個(gè)SocketChannel上的IO數(shù)據(jù)藕甩,保證Reactor線程安全的核心原因正是因?yàn)檫@個(gè)MpscQueue,它可以支持多個(gè)業(yè)務(wù)線程在處理完業(yè)務(wù)邏輯后周荐,線程安全的向MpscQueue添加異步寫(xiě)任務(wù)狭莱,然后由單個(gè)Reactor線程來(lái)執(zhí)行這些寫(xiě)任務(wù)。既然是單線程執(zhí)行概作,那肯定是線程安全的了腋妙。

Reactor對(duì)應(yīng)的NioEventLoop類型繼承結(jié)構(gòu)

image.png

NioEventLoop的繼承結(jié)構(gòu)也是比較復(fù)雜,這里我們只關(guān)注在Reactor創(chuàng)建過(guò)程中涉及的到兩個(gè)父類SingleThreadEventLoop,SingleThreadEventExecutor讯榕。

剩下的繼承體系骤素,我們?cè)诤筮呺S著Netty源碼的深入在慢慢介紹。

前邊我們提到瘩扼,其實(shí)Reactor我們可以看作是一個(gè)單線程的線程池谆甜,只有一個(gè)線程用來(lái)執(zhí)行IO就緒事件的監(jiān)聽(tīng)垃僚,IO事件的處理集绰,異步任務(wù)的執(zhí)行。用MpscQueue來(lái)存儲(chǔ)待執(zhí)行的異步任務(wù)谆棺。

命名前綴為SingleThread的父類都是對(duì)Reactor這些行為的分層定義栽燕。也是本小節(jié)要介紹的對(duì)象

SingleThreadEventLoop

Reactor負(fù)責(zé)執(zhí)行的異步任務(wù)分為三類:

  • 普通任務(wù):這是Netty最主要執(zhí)行的異步任務(wù)罕袋,存放在普通任務(wù)隊(duì)列taskQueue中。在NioEventLoop構(gòu)造函數(shù)中創(chuàng)建碍岔。
  • 定時(shí)任務(wù): 存放在優(yōu)先級(jí)隊(duì)列中浴讯。后續(xù)我們介紹。
  • 尾部任務(wù): 存放于尾部任務(wù)隊(duì)列tailTasks中蔼啦,尾部任務(wù)一般不常用榆纽,在普通任務(wù)執(zhí)行完后 Reactor線程會(huì)執(zhí)行尾部任務(wù)。使用場(chǎng)景:比如對(duì)Netty 的運(yùn)行狀態(tài)做一些統(tǒng)計(jì)數(shù)據(jù)捏肢,例如任務(wù)循環(huán)的耗時(shí)奈籽、占用物理內(nèi)存的大小等等都可以向尾部隊(duì)列添加一個(gè)收尾任務(wù)完成統(tǒng)計(jì)數(shù)據(jù)的實(shí)時(shí)更新。

SingleThreadEventLoop負(fù)責(zé)對(duì)尾部任務(wù)隊(duì)列tailTasks進(jìn)行管理鸵赫。并且提供ChannelReactor注冊(cè)的行為衣屏。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    //任務(wù)隊(duì)列大小,默認(rèn)是無(wú)界隊(duì)列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    
    //尾部任務(wù)隊(duì)列
    private final Queue<Runnable> tailTasks;

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        //尾部隊(duì)列 執(zhí)行一些統(tǒng)計(jì)任務(wù) 不常用
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

    @Override
    public ChannelFuture register(Channel channel) {
        //注冊(cè)channel到綁定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }
}

SingleThreadEventExecutor

SingleThreadEventExecutor主要負(fù)責(zé)對(duì)普通任務(wù)隊(duì)列的管理辩棒,以及異步任務(wù)的執(zhí)行狼忱,Reactor線程的啟停

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
        //parent為Reactor所屬的NioEventLoopGroup Reactor線程組
        super(parent);
        //向Reactor添加任務(wù)時(shí)一睁,是否喚醒Selector停止輪詢IO就緒事件钻弄,馬上執(zhí)行異步任務(wù)
        this.addTaskWakesUp = addTaskWakesUp;
        //Reactor異步任務(wù)隊(duì)列的大小
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        //用于啟動(dòng)Reactor線程的executor -> ThreadPerTaskExecutor
        this.executor = ThreadExecutorMap.apply(executor, this);
        //普通任務(wù)隊(duì)列
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        //任務(wù)隊(duì)列滿時(shí)的拒絕策略
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}

到現(xiàn)在為止,一個(gè)完整的Reactor架構(gòu)就被創(chuàng)建出來(lái)了者吁。

Reactor結(jié)構(gòu).png

3. 創(chuàng)建Channel到Reactor的綁定策略

到這一步斧蜕,Reactor線程組NioEventLoopGroup里邊的所有Reactor就已經(jīng)全部創(chuàng)建完畢。

無(wú)論是Netty服務(wù)端NioServerSocketChannel關(guān)注的OP_ACCEPT事件也好砚偶,還是Netty客戶端NioSocketChannel關(guān)注的OP_READOP_WRITE事件也好批销,都需要先注冊(cè)到Reactor上,Reactor才能監(jiān)聽(tīng)Channel上關(guān)注的IO事件實(shí)現(xiàn)IO多路復(fù)用染坯。

NioEventLoopGroup(Reactor線程組)里邊有眾多的Reactor均芽,那么以上提到的這些Channel究竟應(yīng)該注冊(cè)到哪個(gè)Reactor上呢?這就需要一個(gè)綁定的策略來(lái)平均分配单鹿。

還記得我們前邊介紹MultithreadEventExecutorGroup類的時(shí)候提到的構(gòu)造器參數(shù)EventExecutorChooserFactory嗎掀宋?

這時(shí)候它就派上用場(chǎng)了,它主要用來(lái)創(chuàng)建ChannelReactor的綁定策略仲锄。默認(rèn)為DefaultEventExecutorChooserFactory.INSTANCE劲妙。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
   //從Reactor集合中選擇一個(gè)特定的Reactor的綁定策略 用于channel注冊(cè)綁定到一個(gè)固定的Reactor上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    chooser = chooserFactory.newChooser(children);
}

下面我們來(lái)看下具體的綁定策略:

DefaultEventExecutorChooserFactory

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    ...................省略.................
}

我們看到在newChooser方法綁定策略有兩個(gè)分支,不同之處在于需要判斷Reactor線程組中的Reactor個(gè)數(shù)是否為2的次冪儒喊。

Netty中的綁定策略就是采用round-robin輪詢的方式來(lái)挨個(gè)選擇Reactor進(jìn)行綁定镣奋。

采用round-robin的方式進(jìn)行負(fù)載均衡,我們一般會(huì)用round % reactor.length取余的方式來(lái)挨個(gè)平均的定位到對(duì)應(yīng)的Reactor上怀愧。

如果Reactor的個(gè)數(shù)reactor.length恰好是2的次冪侨颈,那么就可以用位操作&運(yùn)算round & reactor.length -1來(lái)代替%運(yùn)算round % reactor.length余赢,因?yàn)槲贿\(yùn)算的性能更高。具體為什么&運(yùn)算能夠代替%運(yùn)算哈垢,筆者會(huì)在后面講述時(shí)間輪的時(shí)候?yàn)榇蠹以敿?xì)證明妻柒,這里大家只需記住這個(gè)公式,我們還是聚焦本文的主線耘分。

了解了優(yōu)化原理举塔,我們?cè)诳创a實(shí)現(xiàn)就很容易理解了。

利用%運(yùn)算的方式Math.abs(idx.getAndIncrement() % executors.length)來(lái)進(jìn)行綁定求泰。

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

利用&運(yùn)算的方式idx.getAndIncrement() & executors.length - 1來(lái)進(jìn)行綁定啤贩。

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

又一次被Netty對(duì)性能的極致追求所折服~~~~

4. 向Reactor線程組中所有的Reactor注冊(cè)terminated回調(diào)函數(shù)

當(dāng)Reactor線程組NioEventLoopGroup中所有的Reactor已經(jīng)創(chuàng)建完畢,ChannelReactor的綁定策略也創(chuàng)建完畢后拜秧,我們就來(lái)到了創(chuàng)建NioEventGroup的最后一步痹屹。

俗話說(shuō)的好,有創(chuàng)建就有啟動(dòng)枉氮,有啟動(dòng)就有關(guān)閉志衍,這里會(huì)創(chuàng)建Reactor關(guān)閉的回調(diào)函數(shù)terminationListener,在Reactor關(guān)閉時(shí)回調(diào)聊替。

terminationListener回調(diào)的邏輯很簡(jiǎn)單:

  • 通過(guò)AtomicInteger terminatedChildren變量記錄已經(jīng)關(guān)閉的Reactor個(gè)數(shù)楼肪,用來(lái)判斷NioEventLoopGroup中的Reactor是否已經(jīng)全部關(guān)閉。

  • 如果所有Reactor均已關(guān)閉惹悄,設(shè)置NioEventLoopGroup中的terminationFuturesuccess春叫。表示Reactor線程組關(guān)閉成功。

       //記錄關(guān)閉的Reactor個(gè)數(shù),當(dāng)Reactor全部關(guān)閉后,才可以認(rèn)為關(guān)閉成功
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        //關(guān)閉future
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //當(dāng)所有Reactor關(guān)閉后 才認(rèn)為是關(guān)閉成功
                    terminationFuture.setSuccess(null);
                }
            }
        };
        
        //為所有Reactor添加terminationListener
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

我們?cè)诨氐轿恼麻_(kāi)頭Netty服務(wù)端代碼模板

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //創(chuàng)建主從Reactor線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ...........省略............
    }
}

現(xiàn)在Netty的主從Reactor線程組就已經(jīng)創(chuàng)建完畢斥黑,此時(shí)Netty服務(wù)端的骨架已經(jīng)搭建完畢缎除,骨架如下:

主從Reactor線程組.png

總結(jié)

本文介紹了首先介紹了Netty對(duì)各種IO模型的支持以及如何輕松切換各種IO模型配喳。

還花了大量的篇幅介紹Netty服務(wù)端的核心引擎主從Reactor線程組的創(chuàng)建過(guò)程。在這個(gè)過(guò)程中,我們還提到了Netty對(duì)各種細(xì)節(jié)進(jìn)行的優(yōu)化,展現(xiàn)了Netty對(duì)性能極致的追求晨横。

好了,Netty服務(wù)端的骨架已經(jīng)搭好箫柳,剩下的事情就該綁定端口地址然后接收連接了手形,我們下篇文章再見(jiàn)~~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市悯恍,隨后出現(xiàn)的幾起案子库糠,更是在濱河造成了極大的恐慌,老刑警劉巖坪稽,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件曼玩,死亡現(xiàn)場(chǎng)離奇詭異鳞骤,居然都是意外死亡窒百,警方通過(guò)查閱死者的電腦和手機(jī)黍判,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)篙梢,“玉大人顷帖,你說(shuō)我怎么就攤上這事〔持停” “怎么了贬墩?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)妄呕。 經(jīng)常有香客問(wèn)我陶舞,道長(zhǎng),這世上最難降的妖魔是什么绪励? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任肿孵,我火速辦了婚禮,結(jié)果婚禮上疏魏,老公的妹妹穿的比我還像新娘停做。我一直安慰自己,他們只是感情好大莫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布蛉腌。 她就那樣靜靜地躺著,像睡著了一般只厘。 火紅的嫁衣襯著肌膚如雪烙丛。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,763評(píng)論 1 307
  • 那天羔味,我揣著相機(jī)與錄音蜀变,去河邊找鬼。 笑死介评,一個(gè)胖子當(dāng)著我的面吹牛库北,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播们陆,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼寒瓦,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了坪仇?” 一聲冷哼從身側(cè)響起杂腰,我...
    開(kāi)封第一講書(shū)人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎椅文,沒(méi)想到半個(gè)月后喂很,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體惜颇,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年少辣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了凌摄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡漓帅,死狀恐怖锨亏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情忙干,我是刑警寧澤器予,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站捐迫,受9級(jí)特大地震影響乾翔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜施戴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一反浓、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧暇韧,春花似錦勾习、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至涂乌,卻和暖如春艺栈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背湾盒。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工湿右, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人罚勾。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓毅人,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親尖殃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子丈莺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容