Netty中NioEventLoop源碼分析

版本

本次源碼分析基于Netty的版本為4.1

源碼分析

NioEventLoop可以視為java中的一個線程,只不過NioEventLoop處理的事件烁兰,以及內(nèi)部的處理邏輯會有所不同。先看一下類的繼承關系:


可以看到NioEventLoop實現(xiàn)了很多接口徊都,特別是EventLoop和ScheduledExecutorService沪斟,所以NioEventLoop不僅能實現(xiàn)普通的task,還能實現(xiàn)定時task暇矫。

Selector

Netty的實現(xiàn)是基于Java原生的NIO的主之,其對原生的NIO做了很多優(yōu)化决瞳,避免了某些bug错忱,也提升了很多性能浪慌。但是底層對于網(wǎng)絡IO事件的監(jiān)聽和處理也是離不開多路復用器Selector的遂唧。在NioEventLoop的構(gòu)造方法中進行了Selector的初始化:

final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;

關鍵還是openSelector()方法泻仙,這里我刪除了一些分支代碼愈涩,剩下的做了注釋笔刹,其中常量 DISABLE_KEY_SET_OPTIMIZATION 的定義如下尖坤,可以手工配置是否開啟優(yōu)化囱持,默認是開啟優(yōu)化的夯接,具體優(yōu)化做了什么事可以查看下面的openSelector()分析盔几。

private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

netty在創(chuàng)建selector的時候就嘗試了優(yōu)化上鞠,具體優(yōu)化其實是將底層的數(shù)據(jù)結(jié)構(gòu)從HashSet改為了數(shù)組,可以從SelectedSelectionKeySet和SelectorImpl的源碼看到這一點,這里就不列了。

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // 根據(jù)底層的IO模型來創(chuàng)建一個selector驼唱,這里的selector就是java中NIO的selector
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    // 如果未開啟優(yōu)化則直接就返回了,SelectorTuple可以視為一個持有selector引用的句柄
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 通過反射創(chuàng)建一個selector的具體實例
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    // netty自己包裝的一個selectKey的集合類
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                    // java 9 以上版本會用Unsafe類直接底層替換SelectionKeySet
                }
                // 利用反射將原生selector中的兩個屬性替換為netty自己的包裝類
                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            }
        }
    });

    selectedKeys = selectedKeySet;
    return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

run()方法

前面也說了,NioEventLoop其實其實可以類比java中的線程,是一個任務執(zhí)行單元钥星,所以run()方法是其中的關鍵,接下來就來分析一下run()方法乖篷,源碼如下秽誊。

@Override
protected void run() {
    for (;;) {
        try {
            try {
                // 如果任務隊列非空,那么執(zhí)行selectNowSupplier代表的方法最易,也就是selectNow(),否則返回SelectStrategy.SELECT
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                // NioEventLoop不支持嬉荆,用于EpollEventLoop,理論上不會走到這里
                case SelectStrategy.BUSY_WAIT:
                // 任務隊列為空的時候限番,會執(zhí)行本邏輯
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    // 源碼中有注釋為什么這里要調(diào)用如下邏輯别威,感興趣可以查看源碼粥庄,因為原因描述太長惜互,這里就省略了
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // 出現(xiàn)IOException則新建selector,將原有的所有channel重新注冊到新的selector,然后關閉老的selector
                rebuildSelector0();
                // 異常處理
                handleLoopException(e);
                continue;
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            // 控制IO處理時間的一個變量呼股,默認是50(代表50%)
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    // 處理IO事件
                    processSelectedKeys();
                } finally {
                    // 運行非IO任務,就算ioRatio設置了100,非IO任務還是要執(zhí)行的
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // 處理IO事件
                    processSelectedKeys();
                } finally {
                    // 根據(jù)設置的時間占比運行非IO任務
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

整個run()方法被包裹在一個for循環(huán)中妙啃,唯一能夠結(jié)束循環(huán)的條件是狀態(tài)state為SHUTDOWN或者TERMINATED,NioEventLoop繼承了SingleThreadEventExecutor俊戳,isShuttingDown()和confirmShutdown()都是SingleThreadEventExecutor中的方法揖赴。

可以看到,除去異常處理和一些分支流程抑胎,整個run()方法不是特別負責燥滑,重點在與select()和selectNow()方法,run()方法流程如下圖所示:


接下來看一下兩個關鍵方法select()和selectNow()

  1. selectNow()

selectNow會立即出發(fā)selector的選擇操作阿逃,如果有準備就緒的channel铭拧,就會返回相應的int值(代表了不同的selectKey的集合),否則返回0恃锉。之后如果發(fā)現(xiàn)用戶手動調(diào)用了selector的wakeup()方法搀菩,會執(zhí)行selector.wakeup()操作。

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}
  1. select()

同樣去掉了一些無關主流程的代碼破托,netty在select()方法中的處理邏輯跟java線程池有相似的地方肪跋,沒有任務的時候都是阻塞的,阻塞時間以最近的任務距離當前時間為準土砂,如果一旦有就緒的channel州既,則立即進行退出循環(huán)進行處理。這里netty還解決了epoll的空輪詢bug萝映,如果觸發(fā)了空輪詢判斷會重建selector吴叶。

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        // 計算定時任務隊列中最早的任務距離現(xiàn)在的時間,沒有任務默認1秒
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

        for (;;) {
            // 如果最早的任務開始時間距離當前時間不足0.5毫秒或者已超時序臂,執(zhí)行selectNow()方法
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }

            // 隊列中有任務蚌卤,并且selector從false設置為true成功則執(zhí)行selectNow()方法
            // 源碼描述了原因,簡單來說,往NioEventLoop中提交任務的時候如果selector未wakeup會調(diào)用selector.wakeup()
            // 但如果提交task的時候selector已經(jīng)wakeup造寝,則不會調(diào)用
            // 任務可能被掛起知道selector超時,所以這里做了檢測
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }

            // 為select方法設置超時吭练,防止定時任務餓死
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;

            // 退出循環(huán)的條件
            // 1. 存在就緒的channel
            // 2. 老的wakeup狀態(tài)是true
            // 3. 進入select方法后用戶調(diào)用了wakeup()方法
            // 4. 有新的定時任務需要處理
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            if (Thread.interrupted()) {
                // 線程中斷處理
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }

            long time = System.nanoTime();
            // 這里有一個處理epoll空輪詢bug的邏輯
            // 超過了timeoutMillis時間不認為是空輪詢
            // 當select輪詢超過設定的次數(shù)上限時視為觸發(fā)空輪詢bug诫龙,重建selector
            if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                // timeoutMillis elapsed without anything selected.
                selectCnt = 1;
            } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The code exists in an extra method to ensure the method is not too big to inline as this
                // branch is not very likely to get hit very frequently.
                selector = selectRebuildSelector(selectCnt);
                selectCnt = 1;
                break;
            }

            currentTimeNanos = time;
        }
    } catch (CancelledKeyException e) {
        // 略
    }

現(xiàn)在真的有channel就緒了,NioEventLoop會怎么處理呢鲫咽?回到run()方法签赃,有一段根據(jù)設定的時間比例處理IO事件和用戶任務的邏輯,分別對應兩個方法processSelectedKeys和runAllTasks

  1. processSelectedKeys()

從源碼可以看到分尸,processSelectedKeysOptimized和processSelectedKeysPlain的大部分處理邏輯是相同的锦聊,區(qū)別就在于對selectedKey的迭代邏輯,記得一開始說過如果開啟了優(yōu)化箩绍,netty對selectedKey的底層集合進行了優(yōu)化孔庭,將HashSet改為了數(shù)組,HashSet底層用HashMap實現(xiàn)材蛛,迭代的效率是沒有數(shù)組高的圆到。

private void processSelectedKeys() {
    // 看文章最開頭是否啟用優(yōu)化的設置,如果啟用了會走這里
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // 方便GC回收
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        // 根據(jù)類型不同執(zhí)行不同的處理邏輯
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 一般不會走這個分支卑吭,除非用戶主動注冊NioTask到selector芽淡,netty單元測試里有案例
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        // 如果為true,則重置之后的所有selectKey豆赏,并調(diào)用selectNow()方法
        // 因為run()方法執(zhí)行本方法前已經(jīng)置為false挣菲,所以不會進這里
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

// 處理邏輯基本與processSelectedKeysOptimized相同
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

既然內(nèi)部邏輯類似,重點看一下processSelectedKeysOptimized()方法掷邦,NioTask分支一般不會走白胀,感興趣可以看一下netty的單元測試。重點看一下AbstractNioChannel分支耙饰,如果attachment是AbstractNioChannel類型纹笼,說明它是NioServerSocketChannel或者NioSocketChannel,需要進行IO讀寫相關的操作苟跪。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    // SelectionKey無效的處理
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // channel沒有關聯(lián)的eventLoop直接返回
            return;
        }
        // channel關聯(lián)的eventLoop不是本eventLoop廷痘,直接返回,不應關閉channel
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // 關閉channel
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // 對于NioSocketChannel件已,連接需要先finishConnect才能繼續(xù)讀寫
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // 下面3行的操作只是將OP_CONNECT從感興趣選項中移除笋额,防止重復觸發(fā)
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // 說明有半包消息未發(fā)送完成,調(diào)用flush發(fā)送即可
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // unsafe是多態(tài)篷扩,對于NioServerSocketChannel兄猩,read就是接受客戶端TCP連接
        // 對于NioSocketChannel,就是從channel中讀取ByteBuffer
        // 同時檢測readyOps == 0 是解決JDK的循環(huán)bug
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
  1. runAllTasks()
    這個是執(zhí)行用戶任務也就是非IO處理的方法,分為不指定時間和指定時間的兩個重載方法枢冤。最大的不同就是帶時間的方法是有執(zhí)行時間限制的鸠姨,防止用戶任務長時間阻塞IO事件。
protected boolean runAllTasks() {
    assert inEventLoop();
    boolean fetchedAll;
    boolean ranAtLeastOne = false;

    do {
        // 取一定時間段內(nèi)的定時任務到普通任務隊列里
        fetchedAll = fetchFromScheduledTaskQueue();
        // 運行任務隊列里的任務
        if (runAllTasksFrom(taskQueue)) {
            ranAtLeastOne = true;
        }
    } while (!fetchedAll); // 取完所有定時任務為止

    if (ranAtLeastOne) {
        // 記錄上次執(zhí)行完任務的時間
        lastExecutionTime = ScheduledFutureTask.nanoTime();
    }
    afterRunningAllTasks();
    return ranAtLeastOne;
}

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    // 這是用戶任務指定的截止時間
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        safeExecute(task);

        runTasks ++;

        // nanoTime()是耗時的操作淹真,所以這里每執(zhí)行64個任務才檢測一次是否超過時間
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        // 執(zhí)行任務
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

總結(jié)

從NioEventLoop的源碼可以看到讶迁,netty在很多地方做了優(yōu)化,還避免了很多JDK自帶的bug

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末核蘸,一起剝皮案震驚了整個濱河市巍糯,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌客扎,老刑警劉巖祟峦,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異徙鱼,居然都是意外死亡宅楞,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門疆偿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來咱筛,“玉大人,你說我怎么就攤上這事杆故⊙嘎幔” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵处铛,是天一觀的道長饲趋。 經(jīng)常有香客問我,道長撤蟆,這世上最難降的妖魔是什么奕塑? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮家肯,結(jié)果婚禮上龄砰,老公的妹妹穿的比我還像新娘。我一直安慰自己讨衣,他們只是感情好换棚,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著反镇,像睡著了一般固蚤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上歹茶,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天夕玩,我揣著相機與錄音你弦,去河邊找鬼。 笑死燎孟,一個胖子當著我的面吹牛禽作,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播揩页,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼领迈,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了碍沐?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤衷蜓,失蹤者是張志新(化名)和其女友劉穎累提,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體磁浇,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡斋陪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了置吓。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片无虚。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖衍锚,靈堂內(nèi)的尸體忽然破棺而出友题,到底是詐尸還是另有隱情,我是刑警寧澤戴质,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布度宦,位于F島的核電站,受9級特大地震影響告匠,放射性物質(zhì)發(fā)生泄漏戈抄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一后专、第九天 我趴在偏房一處隱蔽的房頂上張望划鸽。 院中可真熱鬧,春花似錦戚哎、人聲如沸裸诽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽崭捍。三九已至,卻和暖如春啰脚,著一層夾襖步出監(jiān)牢的瞬間殷蛇,已是汗流浹背实夹。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留粒梦,地道東北人亮航。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像匀们,于是被迫代替她去往敵國和親缴淋。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容