Netty NioEventLoop源碼解讀

Netty NioEventLoop

Reactor 模型

Netty實(shí)現(xiàn)并擴(kuò)展了Reactor模型,為了更好的了解EventLoop屑彻,我們有必要先看一下Reactor模型的定義徊哑。

Reactor.png

在wiki對reactor pattern的定義中硼啤,指出了一下集中角色:

  • Resource:資源指的是提供系統(tǒng)輸入或者消費(fèi)系統(tǒng)輸出的資源艰猬。在Netty中它指的是SocketChannel横堡,它們應(yīng)支持select。
  • Demultiplexer:事件分離器負(fù)責(zé)對資源進(jìn)行輪尋等待冠桃,當(dāng)資源ready的時(shí)候命贴,分離器負(fù)責(zé)將數(shù)據(jù)發(fā)送給Dispatcher。
  • Dispatcher:處理Handler的注冊和反注冊食听。當(dāng)資源到達(dá)時(shí)負(fù)責(zé)把資源分發(fā)到相應(yīng)的Handler中套么。
  • Handler:負(fù)責(zé)處理數(shù)據(jù)。

在Netty中EventLoop兼負(fù)了Demultiplexer以及Dispatcher兩個(gè)角色碳蛋。下邊我們通過來看NioEventLoop的源碼學(xué)習(xí)學(xué)習(xí)并了解Netty中的EventLoop。

EventLoop源碼

NioEventLoop的核心方法是run()方法省咨,一旦Netty程序啟動(dòng)之后肃弟,這個(gè)就一直循環(huán)跑下去,不間斷的查詢IO和處理task。

protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
        }
        ///
}

關(guān)于selectionStrategy

首先我們來看第一個(gè)邏輯Select Strategy笤受。這段邏輯主要控制這次循環(huán)是執(zhí)行:跳過穷缤;select操作;還是fall through箩兽。判斷依據(jù)是這樣的:

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

如果當(dāng)前EventLoop中有未處理的task津肛,則執(zhí)行selectorNowSupplier。selectorNowSupplier調(diào)用了selectNow汗贫。selectNow調(diào)用的是Selector的selectNow這個(gè)非阻塞方法身坐。執(zhí)行完selectNow則跳出switch運(yùn)行下邊的processSelectedKeys邏輯。

為了高效的利用CPU落包,EventLoop中只要有未消費(fèi)的task則優(yōu)先消費(fèi)task部蛇。

Nio中Selector.select()是阻塞的,直到某個(gè)selection key可用select方法才會(huì)返回咐蝇。Selector.selectNow()則檢查自從上次select到現(xiàn)在有沒有可用的selection key涯鲁,然后立即返回。

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

select操作

select操作主要是檢查當(dāng)前的selection key有序,看哪些已a(bǔ)vailable抹腿。

上邊我們說到了Selector.select操作是阻塞的,那么如果我不想等了旭寿,可以中斷它嗎警绩?可以,Selector.wakeup可以喚醒正在阻塞的select()操作许师。但是如果當(dāng)前沒有select操作房蝉,執(zhí)行了wakeUp操作,那么下次執(zhí)行的select()或者selectNow()操作將被立即喚醒微渠。

但是Selector.wakeup是開銷比較大的操作,不能每次都直接調(diào)用wakeup搭幻,于是NioEventLoop中聲明了wakenUp(AtomicBoolean)字段,用于控制selector.wakeup()的調(diào)用逞盆。調(diào)用wakeup之前先wakenUp.compareAndSet(false, true)檀蹋,如果set成功才執(zhí)行Selector.wakeup()操作。

當(dāng)用戶提交新的任務(wù)時(shí)executor.execute(...)云芦,會(huì)觸發(fā)wakeup操作俯逾。

select(wakenUp.getAndSet(false));

if (wakenUp.get()) {
    selector.wakeup();
}

這段代碼有一段非常長的注釋,解釋了為什么這段邏輯這樣實(shí)現(xiàn)舅逸。并且給出了什么情況下會(huì)產(chǎn)生競態(tài)條件:

wakenUp.set(false)
selector.select(...)

wakenUp.set(false)執(zhí)行后桌肴,用戶出發(fā)了wakeup操作,然后執(zhí)行select操作琉历,這時(shí)select將立即返回坠七。直到下次循環(huán)把wakenUp重置為false水醋,期間所有的wakenUp.compareAndSet(false, true)都是執(zhí)行失敗的,因?yàn)楝F(xiàn)在wakenUp的值是true彪置。所以接下來的select()都不能被wakeup拄踪。

select 內(nèi)部邏輯

接下來我們看select是如何實(shí)現(xiàn)的:

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) { // 1
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 2
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // 3
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    // 重建Selector,舊的Selector中的Selection Key要拷貝到新的Selector中
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
        ///    
    }

selectCnt標(biāo)記select執(zhí)行的次數(shù)拳魁,用于檢測NIO的epoll bug惶桐。在這個(gè)方法尾部有一個(gè)判斷:

 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {}

判斷select記次是否超過了伐值,如果是的話有可能觸發(fā)了Nio epoll bug潘懊,執(zhí)行重建selector的邏輯:新建一個(gè)Selector姚糊,把原來老的selection key都復(fù)制過去卦尊。重建完成之后再執(zhí)行一次selectNow叛拷。

因?yàn)閟elect操作是阻塞的岂却,如果長時(shí)間沒有IO可用忿薇,就會(huì)造成NioEventLoop中的task積壓躏哩。因此每次執(zhí)行select操作都設(shè)定一個(gè)超時(shí):
1.查詢定時(shí)任務(wù)重最近要被執(zhí)行的task還有多長時(shí)間執(zhí)行.
2.這個(gè)時(shí)間加上0.5ms就是最大超時(shí)時(shí)間署浩。

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

整體來看一下這個(gè)for循環(huán):

  • 第1個(gè)if:如果timeoutMillis小于0,則立即執(zhí)行一次異步的selectNow扫尺,跳出循環(huán)消費(fèi)task。
  • 第2個(gè)if:如果當(dāng)前taskQueue中有task正驻,并且沒有被wakeup,則執(zhí)行一次異步的selectNow姑曙,跳出循環(huán)消費(fèi)task襟交。
  • 接下來執(zhí)行select,并記次伤靠。
  • 第3個(gè)if:如果有available keys 或者 被用戶喚醒 或者 任務(wù)隊(duì)列定時(shí)隊(duì)列有任務(wù)則中斷捣域。
  • 最后就是重建selector的過程宴合。

processSelectedKeys

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        runAllTasks();
    }
} else {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

NioEventLoop.run方法的后半段邏輯主要是processSelectedKeys(處理IO)和runTasks(消費(fèi)任務(wù))。這里有一個(gè)參數(shù)用于控制處理這兩種任務(wù)的時(shí)間配比:ioRatio卦洽。

先來看一下processSelectedKeys,它的邏輯由processSelectedKeysOptimized和processSelectedKeysPlain實(shí)現(xiàn)蜗字,調(diào)用那個(gè)函數(shù)取決于你是否開啟了DISABLE_KEYSET_OPTIMIZATION打肝。如果開啟了Selection 優(yōu)化選項(xiàng)挪捕,則在創(chuàng)建Selector的時(shí)候以反射的方式把SelectedSelectionKeySet selectedKeys設(shè)置到selector中争便。具體實(shí)現(xiàn)在openSelector中级零,代碼就不貼出來了滞乙。SelectedSelectionKeySet內(nèi)部是基于Array實(shí)現(xiàn)的,而Selector內(nèi)部selectedKeys是Set類型的斩启,遍歷效率Array效率更好一下。

我們來分析processSelectedKeysPlain方法:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        // 處理channel中的數(shù)據(jù)
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

SelectionKey上邊可以掛載Attachment发绢,一般情況下新的鏈接對象Channel會(huì)掛到attachment上。我們在遍歷selectedKeys時(shí)边酒,首先取出selection key上的attachment,key的類型可能是AbstractNioChannel和NioTask墩朦。根據(jù)不同的類型調(diào)用不同的處理函數(shù)翻擒。我們著重看處理channel的邏輯:

1.如果selection key是:SelectionKey.OP_CONNECT氓涣,那表明這是一個(gè)鏈接操作陋气。對于鏈接操作,我們需要把這個(gè)selection key從intrestOps中清除掉恩伺,否則下次select操作會(huì)直接返回。接下來調(diào)用finishConnect方法晶渠。

2.如果selection key是:SelectionKey.OP_WRITE。則執(zhí)行flush操作褒脯,把數(shù)據(jù)刷到客戶端。

3.如果是read操作則調(diào)用unsafe.read()番川。這個(gè)操作就不展開了脊框,等到接下來的文章践啄,專門分析read操作浇雹。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

整體來看NioEventLoop的實(shí)現(xiàn)也不復(fù)雜,主要就干了兩件事情:select IO以及消費(fèi)task屿讽。因?yàn)閟elect操作是阻塞的(盡管設(shè)置了超時(shí)時(shí)間)昭灵,每次執(zhí)行select時(shí)都會(huì)檢查是否有新的task,有則優(yōu)先執(zhí)行task伐谈。這么做也是做大限度的提高EventLoop的吞吐量,減少阻塞時(shí)間抠蚣。

除了這兩件事兒呢,NioEventLoop還解決了JDK中注明的EPoll bug嘶窄。到此NioEventLoop源碼分析完結(jié)奇昙。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市储耐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌什湘,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件得哆,死亡現(xiàn)場離奇詭異,居然都是意外死亡贩据,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進(jìn)店門饱亮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舍沙,“玉大人,你說我怎么就攤上這事拂铡〈腥蓿” “怎么了斗锭?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長拒迅。 經(jīng)常有香客問我,道長璧微,這世上最難降的妖魔是什么案狠? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任余黎,我火速辦了婚禮塘雳,結(jié)果婚禮上骤星,老公的妹妹穿的比我還像新娘跃巡。我一直安慰自己,他們只是感情好素邪,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著偷线,像睡著了一般。 火紅的嫁衣襯著肌膚如雪声邦。 梳的紋絲不亂的頭發(fā)上摆舟,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機(jī)與錄音恨诱,去河邊找鬼。 笑死胡野,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的硫豆。 我是一名探鬼主播笼呆,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼旨别,長吁一口氣:“原來是場噩夢啊……” “哼诗赌!你這毒婦竟也來了秸弛?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤叼屠,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后镜雨,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡荚坞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年菲盾,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懒鉴。...
    茶點(diǎn)故事閱讀 38,137評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖咆畏,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情旧找,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布钮蛛,位于F島的核電站剖膳,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏吱晒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望偷遗。 院中可真熱鬧,春花似錦氏豌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纪铺。三九已至碟渺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間止状,已是汗流浹背攒霹。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留催束,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓抠刺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親速妖。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容