4.Netty執(zhí)行IO事件和非IO任務

回顧NioEventLoop的run方法流程

上文說到NioEventLoop的run方法可以分為3個步驟:

  1. 輪詢channel中就緒的IO事件
  2. 處理輪詢出的IO事件
  3. 處理所有任務肉渴,也包括定時任務

其中步驟1已在上一節(jié)講述访惜,這里接著講述下面2個步驟

IO事件與非IO任務

首先看一下在步驟2和步驟3的主干代碼

final int ioRatio = this.ioRatio;
// 將所有任務執(zhí)行完
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        runAllTasks();
    }
} else {
    // 記錄IO事件消耗的時間,然后按比例處理分配時間處理非IO任務
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        // Ensure we always run tasks.
        final long ioTime = System.nanoTime() - ioStartTime;
        // ioRatio默認50呐籽,(100-ioRatio)/ioRatio剛好等于1盏浇,做到平均分配
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

ioRadio是NioEventLoop的一個成員變量,用來控制分配花費在IO事件與非IO任務時間的比例。默認情況下芽狗,ioRadio是50绢掰,表示IO事件與非IO任務
將分配相同時間。而當ioRatio為100時童擎,該值失效滴劲,不再平衡兩種動作的時間分配比值。
了解了這一點顾复,上述兩種分支代碼就不難理解了班挖,我們直接進入processSelectedKeys,看看netty如何執(zhí)行IO事件

處理IO事件

先進入processSelectedKeys方法內部芯砸。

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

可以看到這里又根據(jù)selectedKeys是否為空這個條件來確定是處理優(yōu)化過的keys還是普通keys萧芙。關于selectedKeys给梅,在NioEventLoop介紹這一節(jié)中,
我們介紹了NioEventLoop的創(chuàng)建双揪,在創(chuàng)建過程中动羽,默認會將SelectedKeys由Hashset替換為數(shù)組實現(xiàn),此處的selectedKeys正是替換過后的實現(xiàn)渔期。
我們繼續(xù)跟進到processSelectedKeysOptimized方法

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}

方法內部用一個for循環(huán)處理selectedKeys运吓。key的attchment默認是在注冊時附加上去的NioServerSocketChannel和NioSocketChannel。
繼續(xù)跟進processSelectedKey(k, (AbstractNioChannel) a)方法疯趟。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop = ch.eventLoop();  
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        unsafe.close(unsafe.voidPromise());
        return;
    }

    int readyOps = k.readyOps();
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}

netty首先對selectionKey的有效性做了一個判斷拘哨。當key無效時,關閉key所在的channel信峻。當key有效時倦青,委托NioUnsafe對象對key進行IO操作。
注意這里先進行OP_CONNECT,再執(zhí)行OP_WRITE,最后執(zhí)行OP_READ和OP_ACCEPT站欺。關于Unsafe的這些IO操作留待以后分析姨夹。

processSelectedKeysPlain方法流程類似,略過

處理非IO任務

由于IoRatio默認為50矾策,我們先進入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法磷账。

protected boolean runAllTasks(long timeoutNanos) {
    // 步驟1
    fetchFromScheduledTaskQueue();
    // 步驟2
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    // 步驟3
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        // 步驟4
        safeExecute(task);
        runTasks ++;
        // 步驟5
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // 步驟6
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

非IO任務的執(zhí)行可以分為6個步驟

  1. 從定時任務隊列聚合任務到普通任務隊列
  2. 從普通隊列中獲取任務
  3. 計算任務執(zhí)行的超時時間
  4. 安全執(zhí)行任務
  5. 任務執(zhí)行到一定次數(shù),計算是否超時
  6. 執(zhí)行完taskQueue普通隊列里的任務后贾虽,再去執(zhí)行tailTaskQueue里的任務逃糟。但目前暫時沒有看到tailTaskQueue使用的地方,也許是一個擴展點吧蓬豁,這里先略過绰咽。

我們一個一個步驟講解

聚合定時任務到普通任務隊列

首先看一下整體流程

private boolean fetchFromScheduledTaskQueue() {
    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
        return true;
    }
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    for (;;) {
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        if (scheduledTask == null) {
            return true;
        }
        if (!taskQueue.offer(scheduledTask)) {
            scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
    }
}

首先先判斷定時任務隊列是否有任務,然后調用了一個AbstractScheduledEventExecutor.nanoTime(),該方法返回ScheduledFutureTask類從初始化
到當前時刻的差值地粪。也即將ScheduledFutureTask初始化的時刻當成零時刻取募。
獲取到零時刻到當前時刻的差值后,用一個for循環(huán)不斷去定時任務隊列里獲取終止時刻在當前時刻之后的任務(scheduledTask.deadlineNanos() - nanoTime<=0)
當獲取到定時任務后蟆技,將它添加到普通任務隊列taskQueue里玩敏。同時添加失敗后,還會再重新添加回定時任務隊列质礼,防止任務直接丟失旺聚。

說到定時任務隊列,也少不了一探其實現(xiàn)眶蕉。scheduledTaskQueue初始化代碼如下:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<>(
                SCHEDULED_FUTURE_TASK_COMPARATOR,
                11);
    }
    return scheduledTaskQueue;
}

采用的是一個懶加載的方式砰粹,在調用scheduledTaskQueue()創(chuàng)建定時任務時才進行初始化。從名字可以看出造挽,它是一個優(yōu)先級隊列碱璃,初始化容量為11弄痹,
采用的Comparator是調用2個ScheduledFutureTask的compareTo方法,首先比較任務的終止時間厘贼,然后比較兩個任務的id界酒。代碼較簡單浮毯,就不列了降盹。

然后我們看下調度方法schedule

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if (inEventLoop()) {
        scheduledTaskQueue().add(task.setId(nextTaskId++));
    } else {
        executeScheduledRunnable(new Runnable() {
            @Override
            public void run() {
                scheduledTaskQueue().add(task.setId(nextTaskId++));
            }
        }, true, task.deadlineNanos());
    }
    return task;
}

可以發(fā)現(xiàn),netty將"添加定時任務"也當做一個任務缀台,放入任務隊列里岳掐。

從普通隊列中獲取任務

// NioEventLoop中定義的pollTask方法
protected Runnable pollTask() {
    Runnable task = super.pollTask();
    if (needsToSelectAgain) {
        selectAgain();
    }
    return task;
}
// super.pollTask調用了此方法凭疮,定義在SingleThreadEventExecutor中
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
        for (;;) {
            Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {
                return task;
            }
        }
    }

這里依然是通過輪詢從任務隊列里取出任務,并且忽略WAKEUP_TASK這個標記性任務串述。

計算任務執(zhí)行的超時時間

在當前時間上执解,加上IO事件執(zhí)行的時間,作為非IO任務執(zhí)行的超時時間

安全執(zhí)行

protected static void safeExecute(Runnable task) {
    try {
        task.run();
    } catch (Throwable t) {
        logger.warn("A task raised an exception. Task: {}", task, t);
    }
}

捕獲所有異常纲酗,使得定時任務報錯時不退出

計算是否超時

由于nanoTime()是一個相對耗時的操作衰腌,netty默認執(zhí)行了64次非IO任務后,才計算是否超時觅赊。若執(zhí)行了超過64個任務沒或者任務隊列已經沒有任務右蕊,
就打斷循環(huán),并將當前時間更新為lastExecutionTime吮螺。

總結

到了這里饶囚,我們已經介紹完了大部分NioEventLoop的內容,限于筆者水平和文章篇幅鸠补,nioEventLoop所使用的任務隊列MpscQueue和ScheduleFutureTask
內部執(zhí)行原理不再進一步深究萝风。但這也已經足夠對NioEventLoop塑造一個比較整體性的認識了。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末紫岩,一起剝皮案震驚了整個濱河市规惰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌泉蝌,老刑警劉巖歇万,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異梨与,居然都是意外死亡堕花,警方通過查閱死者的電腦和手機文狱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門粥鞋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瞄崇,你說我怎么就攤上這事呻粹『韭” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵等浊,是天一觀的道長腮郊。 經常有香客問我,道長筹燕,這世上最難降的妖魔是什么轧飞? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮撒踪,結果婚禮上过咬,老公的妹妹穿的比我還像新娘。我一直安慰自己制妄,他們只是感情好掸绞,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著耕捞,像睡著了一般衔掸。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上俺抽,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天敞映,我揣著相機與錄音,去河邊找鬼凌埂。 笑死驱显,一個胖子當著我的面吹牛,可吹牛的內容都是我干的瞳抓。 我是一名探鬼主播埃疫,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼孩哑!你這毒婦竟也來了栓霜?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤横蜒,失蹤者是張志新(化名)和其女友劉穎胳蛮,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體丛晌,經...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡仅炊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了澎蛛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片抚垄。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內的尸體忽然破棺而出呆馁,到底是詐尸還是另有隱情桐经,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布浙滤,位于F島的核電站阴挣,受9級特大地震影響,放射性物質發(fā)生泄漏纺腊。R本人自食惡果不足惜畔咧,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望揖膜。 院中可真熱鬧盒卸,春花似錦、人聲如沸次氨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽煮寡。三九已至虹蓄,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間幸撕,已是汗流浹背薇组。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留坐儿,地道東北人律胀。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像貌矿,于是被迫代替她去往敵國和親炭菌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內容