Netty理論三:Netty線程模型

1燎字、Reactor模式:NIO網(wǎng)絡框架的典型模式

Reactor是網(wǎng)絡編程中的一種設計模式牺弄,reactor會解耦并發(fā)請求的服務并分發(fā)給對應的事件處理器來處理。目前广凸,許多流行的開源框架都用到了reactor模式典鸡,如:netty被廓、node.js、Cindy等萝玷,包括java的nio。

何為Reactor線程模型昆婿?

Reactor模式是事件驅動的球碉,有一個或多個并發(fā)輸入源,有一個Service Handler仓蛆,有多個Request Handlers睁冬;這個Service Handler會同步的將輸入的請求(Event)多路復用的分發(fā)給相應的Request Handler

image.png

Reactor模式的三種形式
1、Reactor 單線程模式:

image.png

這種實現(xiàn)方式看疙,和第一章java NIO中單線程NIO實現(xiàn)是一樣的豆拨,一個Reactor處理所有的事情。

image.png

2能庆、Reactor 多線程模式:
編解碼及業(yè)務處理使用線程池施禾,這樣的話,可以避免IO阻塞(IO阻塞的代價是非常大的)搁胆。

image.png
image.png

3弥搞、Reactors 主從模式:
把Reactor分為兩個,一個負責接收渠旁,一個負責讀寫攀例,業(yè)務處理可以用線程池,在服務端啟動時配置(也可以選擇不用線程池顾腊,這個看具體業(yè)務需求)

image.png
image.png

2粤铭、Netty中如何使用Reactor模式

  • 單線程Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
  • 多線程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
//Handler使用線程池進行處理
  • 主從Reactors 模式(官方推薦)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )

EventLoopGroup初始化是創(chuàng)建創(chuàng)建兩個NioEventLoopGroup類型的Reactor線程池bossGroup和workGroup分別用來處理客戶端的連接請求(bossGroup)和通道IO事件(workerGroup);
注:new NioEventLoopGroup()默認創(chuàng)建cpu核數(shù)*2的線程數(shù)

主從模式的好處有:
1杂靶、業(yè)務解耦:一個reactor用來處理客戶端連接梆惯,一個reactor用來處理業(yè)務
2、安全性:業(yè)務解耦以后伪煤,就可以在bossGroup中做一些SSL校驗加袋、ip黑名單、登錄之類的安全性校驗
3抱既、性能提升:只有通過安全性校驗的客戶端才能繼續(xù)進行業(yè)務處理职烧,這樣也能提升處理性能,否則大量的無效客戶端接入和正常的業(yè)務處理混雜在一起,影響業(yè)務處理性能蚀之。
使用demo:

serverBootstrap.group(boss,worker).handler(new RuleBasedIpFilter()).childHandler(new NettyServerInitializer(this.factoryCode));

3蝗敢、Netty EventLoop源碼解析

1、NioEventLoopGroup整體結構

image.png

EventExecutorGroup視圖

image.png

new NioEventLoopGroup源碼

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //EventExecutorGroup里面有一個EventExecutor數(shù)組足删,保存了多個EventExecutor;
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //初始化EventExecutor數(shù)組寿谴,數(shù)組是NioEventLoop,見下面
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        //EventExecutorChooser.next()定義選擇EventExecutor的策略失受;
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

NioEventLoopGroup.class

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

NioEventLoop.class

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        //創(chuàng)建selector
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
  • EventExecutorGroup里面有一個EventExecutor數(shù)組讶泰,保存了多個EventExecutor(NIOEventLoop);
  • EventExecutorGroup是不干什么事情的,當收到一個請后拂到,他就調(diào)用next()獲得一個它里面的EventExecutor痪署,再調(diào)用這個executor的方法;
  • EventExecutorChooserFactory.EventExecutorChooser.next()定義選擇EventExecutor的策略(有兩種兄旬,都是輪詢)狼犯;

2、NioEventLoopGroup創(chuàng)建分析

bossGroup

image.png

注:從圖中可以看出领铐,一個NioEventLoopGroup中包含多個NioEventLoop悯森,一個NioEventLoop中包含一個Selector,Selector監(jiān)聽NioServerSocketChannel绪撵,當NioServerSocketChannel上有客戶端channel連接后瓢姻,觸發(fā)Acceptor事件,在ServerBootstrapAcceptor handler中轉發(fā)給workGroup

workerGroup

image.png

當客戶端channel初次連接時莲兢,將其注冊到workGroup中的NioEventLoop上(通過EventExecuorChooser.next()獲取workGroup中的一個NioEventLoop)汹来,然后NioEventLoop中的Selector不斷輪詢其所管理的NioSocketChannel,如果其中有讀寫事件準備好改艇,則由DefaultChannelPipeline處理收班。

3、ServerBootstrap啟動流程分析

image.png

4谒兄、ServerBootstrap執(zhí)行流程分析

image.png
        // 配置服務端的NIO線程組
        // 主線程組, 用于接受客戶端的連接摔桦,但是不做任何具體業(yè)務處理,像老板一樣承疲,
        //負責接待客戶邻耕,不具體服務客戶
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 工作線程組, 老板線程組會把任務丟給他,讓手下線程組去做任務燕鸽,服務客戶
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             //欲加到NioServerSocketChannel Pipeline的handler
             .handler(new LoggingHandler(LogLevel.INFO))
              //欲加到NioSocketChannel(accept()返回的)Pipeline的handler
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                     ch.pipeline().addLast("decoder", new StringDecoder());
                     ch.pipeline().addLast("encoder", new StringEncoder());
                     ch.pipeline().addLast(new EchoServerHandler());
 
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 綁定端口兄世,開始接收進來的連接
            ChannelFuture f = b.bind(port).sync(); // (7)
image.png
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市啊研,隨后出現(xiàn)的幾起案子御滩,更是在濱河造成了極大的恐慌鸥拧,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件削解,死亡現(xiàn)場離奇詭異富弦,居然都是意外死亡,警方通過查閱死者的電腦和手機氛驮,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門腕柜,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人矫废,你說我怎么就攤上這事盏缤。” “怎么了磷脯?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵蛾找,是天一觀的道長。 經(jīng)常有香客問我赵誓,道長,這世上最難降的妖魔是什么柿赊? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮碰声,結果婚禮上诡蜓,老公的妹妹穿的比我還像新娘。我一直安慰自己胰挑,他們只是感情好蔓罚,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著瞻颂,像睡著了一般豺谈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贡这,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天茬末,我揣著相機與錄音,去河邊找鬼盖矫。 笑死丽惭,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的辈双。 我是一名探鬼主播责掏,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼湃望!你這毒婦竟也來了换衬?” 一聲冷哼從身側響起痰驱,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎冗疮,沒想到半個月后萄唇,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡术幔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年另萤,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诅挑。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡四敞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拔妥,到底是詐尸還是另有隱情忿危,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布没龙,位于F島的核電站铺厨,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏硬纤。R本人自食惡果不足惜解滓,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望筝家。 院中可真熱鬧洼裤,春花似錦、人聲如沸溪王。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽莹菱。三九已至移国,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間芒珠,已是汗流浹背桥狡。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留皱卓,地道東北人裹芝。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像娜汁,于是被迫代替她去往敵國和親嫂易。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容