Netty源碼分析之NioEventLoop

上一章節(jié)中败晴,我們分析了Netty服務(wù)的啟動過程滩租,本章節(jié)分析Netty的NioEventLoop是如工作的彪笼。

NioEventLoop中維護(hù)了一個線程睬愤,線程啟動時會調(diào)用NioEventLoop的run方法句伶,執(zhí)行I/O任務(wù)和非I/O任務(wù):

I/O任務(wù)
即selectionKey中ready的事件劲蜻,如accept、connect考余、read先嬉、write等,由processSelectedKeys方法觸發(fā)楚堤。

非IO任務(wù)
添加到taskQueue中的任務(wù)疫蔓,如register0、bind0等任務(wù)身冬,由runAllTasks方法觸發(fā)衅胀。

兩種任務(wù)的執(zhí)行時間比由變量ioRatio控制,默認(rèn)為50吏恭,則表示允許非IO任務(wù)執(zhí)行的時間與IO任務(wù)的執(zhí)行時間相等拗小。

NioEventLoop.run 方法實現(xiàn)

protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}

hasTasks()方法判斷當(dāng)前taskQueue是否有元素。
1樱哼、 如果taskQueue中有元素哀九,執(zhí)行 selectNow() 方法剿配,最終執(zhí)行selector.selectNow(),該方法會立即返回阅束。

void selectNow() throws IOException {
    try {
        selector.selectNow();
    } finally {
        // restore wakup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

2呼胚、 如果taskQueue沒有元素,執(zhí)行 select(oldWakenUp) 方法息裸,代碼如下:

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                // - Selected something,
                // - waken up by user, or
                // - the task queue has a pending task.
                // - a scheduled task is ready for processing
                break;
            }
            if (Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);

                rebuildSelector();
                selector = this.selector;

                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
            if (logger.isDebugEnabled()) {
                logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
            }
        }
    } catch (CancelledKeyException e) {
        if (logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
        }
        // Harmless exception - log anyway
    }
}

這個方法解決了Nio中臭名昭著的bug:selector的select方法導(dǎo)致cpu100%蝇更。
1、delayNanos(currentTimeNanos):計算延遲任務(wù)隊列中第一個任務(wù)的到期執(zhí)行時間(即最晚還能延遲多長時間執(zhí)行)呼盆,默認(rèn)返回1s年扩。每個SingleThreadEventExecutor都持有一個延遲執(zhí)行任務(wù)的優(yōu)先隊列PriorityQueue,啟動線程時访圃,往隊列中加入一個任務(wù)厨幻。

protected long delayNanos(long currentTimeNanos) {  
    ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();  
    if (delayedTask == null) {  
        return SCHEDULE_PURGE_INTERVAL;  
    }  
    return delayedTask.delayNanos(currentTimeNanos);  
}  
  
//ScheduledFutureTask  
public long delayNanos(long currentTimeNanos) {  
    return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));  
}  
public long deadlineNanos() {  
    return deadlineNanos;  
}  

2、如果延遲任務(wù)隊列中第一個任務(wù)的最晚還能延遲執(zhí)行的時間小于500000納秒腿时,且selectCnt == 0(selectCnt 用來記錄selector.select方法的執(zhí)行次數(shù)和標(biāo)識是否執(zhí)行過selector.selectNow())况脆,則執(zhí)行selector.selectNow()方法并立即返回。
3批糟、否則執(zhí)行selector.select(timeoutMillis)格了,這個方法已經(jīng)在深入淺出NIO Socket分析過。
4徽鼎、如果已經(jīng)存在ready的selectionKey盛末,或者selector被喚醒,或者taskQueue不為空纬傲,或則scheduledTaskQueue不為空满败,則退出循環(huán)。
5叹括、如果 selectCnt 沒達(dá)到閾值SELECTOR_AUTO_REBUILD_THRESHOLD(默認(rèn)512)算墨,則繼續(xù)進(jìn)行for循環(huán)。其中 currentTimeNanos 在select操作之后會重新賦值當(dāng)前時間净嘀,如果selector.select(timeoutMillis)行為真的阻塞了timeoutMillis,第二次的timeoutMillis肯定等于0侠讯,此時selectCnt 為1挖藏,所以會直接退出for循環(huán)膜眠。
6架谎、如果觸發(fā)了epool cpu100%的bug,會發(fā)生什么会涎?
selector.select(timeoutMillis)操作會立即返回末秃,不會阻塞timeoutMillis,導(dǎo)致 currentTimeNanos 幾乎不變零截,這種情況下,會反復(fù)執(zhí)行selector.select(timeoutMillis)弧哎,變量selectCnt 會逐漸變大蠢终,當(dāng)selectCnt 達(dá)到閾值程奠,則執(zhí)行rebuildSelector方法瞄沙,進(jìn)行selector重建泛粹,解決cpu占用100%的bug伪货。

public void rebuildSelector() {  
        if (!inEventLoop()) {  
            execute(new Runnable() {  
                @Override  
                public void run() {  
                    rebuildSelector();  
                }  
            });  
            return;  
        }  
        final Selector oldSelector = selector;  
        final Selector newSelector;  
        if (oldSelector == null) {  
            return;  
        }  
        try {  
            newSelector = openSelector();  
        } catch (Exception e) {  
            logger.warn("Failed to create a new Selector.", e);  
            return;  
        }  
        // Register all channels to the new Selector.  
        int nChannels = 0;  
        for (;;) {  
            try {  
                for (SelectionKey key: oldSelector.keys()) {  
                    Object a = key.attachment();  
                    try {  
                        if (key.channel().keyFor(newSelector) != null) {  
                            continue;  
                        }  
                        int interestOps = key.interestOps();  
                        key.cancel();  
                        key.channel().register(newSelector, interestOps, a);  
                        nChannels ++;  
                    } catch (Exception e) {  
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);  
                        if (a instanceof AbstractNioChannel) {  
                            AbstractNioChannel ch = (AbstractNioChannel) a;  
                            ch.unsafe().close(ch.unsafe().voidPromise());  
                        } else {  
                            @SuppressWarnings("unchecked")  
                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;  
                            invokeChannelUnregistered(task, key, e);  
                        }  
                    }  
                }  
            } catch (ConcurrentModificationException e) {  
                // Probably due to concurrent modification of the key set.  
                continue;  
            }  
  
            break;  
        }    
        selector = newSelector;  
        try {  
            // time to close the old selector as everything else is registered to the new one  
            oldSelector.close();  
        } catch (Throwable t) {  
            if (logger.isWarnEnabled()) {  
                logger.warn("Failed to close the old Selector.", t);  
            }  
        }    
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");  
    }  

rebuildSelector過程:
1愚臀、通過方法openSelector創(chuàng)建一個新的selector馋袜。
2、將old selector的selectionKey執(zhí)行cancel矾缓。
3蜕依、將old selector的channel重新注冊到新的selector中。

對selector進(jìn)行rebuild后咐吼,需要重新執(zhí)行方法selectNow厢塘,檢查是否有已ready的selectionKey抓半。

方法selectNow()或select(oldWakenUp)返回后糕簿,執(zhí)行方法processSelectedKeys和runAllTasks病附。
1贮竟、processSelectedKeys 用來處理有事件發(fā)生的selectkey昔逗,這里對優(yōu)化過的方法processSelectedKeysOptimized進(jìn)行分析:

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                i++;
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}

在優(yōu)化過的方法中误堡,有事件發(fā)生的selectkey存放在數(shù)組selectedKeys中傻谁,通過遍歷selectedKeys瘩蚪,處理每一個selectkey闻察,具體處理過程拱礁,會在后續(xù)進(jìn)行分析琢锋。

2、runAllTasks 處理非I/O任務(wù)呢灶。
如果 ioRatio 不為100時吴超,方法runAllTasks的執(zhí)行時間只能為ioTime * (100 - ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的執(zhí)行時間鸯乃。

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }
        runTasks ++;
        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

方法fetchFromScheduledTaskQueue把scheduledTaskQueue中已經(jīng)超過延遲執(zhí)行時間的任務(wù)移到taskQueue中等待被執(zhí)行鲸阻。

private void fetchFromScheduledTaskQueue() {
    if (hasScheduledTasks()) {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {
                break;
            }
            taskQueue.add(scheduledTask);
        }
    }
}

依次從taskQueue任務(wù)task執(zhí)行,每執(zhí)行64個任務(wù)缨睡,進(jìn)行耗時檢查鸟悴,如果已執(zhí)行時間超過預(yù)先設(shè)定的執(zhí)行時間,則停止執(zhí)行非IO任務(wù)奖年,避免非IO任務(wù)太多细诸,影響IO任務(wù)的執(zhí)行。

END陋守。
我是占小狼震贵。
在魔都艱苦奮斗,白天是上班族水评,晚上是知識服務(wù)工作者猩系。
如果讀完覺得有收獲的話,記得關(guān)注和點贊哦中燥。
非要打賞的話寇甸,我也是不會拒絕的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末疗涉,一起剝皮案震驚了整個濱河市拿霉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌咱扣,老刑警劉巖友浸,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異偏窝,居然都是意外死亡,警方通過查閱死者的電腦和手機武学,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門祭往,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人火窒,你說我怎么就攤上這事硼补。” “怎么了熏矿?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵已骇,是天一觀的道長离钝。 經(jīng)常有香客問我,道長褪储,這世上最難降的妖魔是什么卵渴? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮鲤竹,結(jié)果婚禮上浪读,老公的妹妹穿的比我還像新娘。我一直安慰自己辛藻,他們只是感情好碘橘,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著吱肌,像睡著了一般痘拆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上氮墨,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天纺蛆,我揣著相機與錄音,去河邊找鬼勇边。 笑死犹撒,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的粒褒。 我是一名探鬼主播识颊,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼奕坟!你這毒婦竟也來了祥款?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤月杉,失蹤者是張志新(化名)和其女友劉穎刃跛,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體苛萎,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡桨昙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了腌歉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛙酪。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖翘盖,靈堂內(nèi)的尸體忽然破棺而出桂塞,到底是詐尸還是另有隱情,我是刑警寧澤馍驯,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布阁危,位于F島的核電站玛痊,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏狂打。R本人自食惡果不足惜擂煞,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望菱父。 院中可真熱鬧颈娜,春花似錦、人聲如沸浙宜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粟瞬。三九已至同仆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間裙品,已是汗流浹背俗批。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留市怎,地道東北人岁忘。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像区匠,于是被迫代替她去往敵國和親干像。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容