回顧NioEventLoop的run方法流程
上文說到NioEventLoop的run方法可以分為3個步驟:
- 輪詢channel中就緒的IO事件
- 處理輪詢出的IO事件
- 處理所有任務肉渴,也包括定時任務
其中步驟1已在上一節(jié)講述访惜,這里接著講述下面2個步驟
IO事件與非IO任務
首先看一下在步驟2和步驟3的主干代碼
final int ioRatio = this.ioRatio;
// 將所有任務執(zhí)行完
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// 記錄IO事件消耗的時間,然后按比例處理分配時間處理非IO任務
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// ioRatio默認50呐籽,(100-ioRatio)/ioRatio剛好等于1盏浇,做到平均分配
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
ioRadio是NioEventLoop的一個成員變量,用來控制分配花費在IO事件與非IO任務時間的比例。默認情況下芽狗,ioRadio是50绢掰,表示IO事件與非IO任務
將分配相同時間。而當ioRatio為100時童擎,該值失效滴劲,不再平衡兩種動作的時間分配比值。
了解了這一點顾复,上述兩種分支代碼就不難理解了班挖,我們直接進入processSelectedKeys,看看netty如何執(zhí)行IO事件
處理IO事件
先進入processSelectedKeys方法內部芯砸。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
可以看到這里又根據(jù)selectedKeys是否為空這個條件來確定是處理優(yōu)化過的keys還是普通keys萧芙。關于selectedKeys给梅,在NioEventLoop介紹這一節(jié)中,
我們介紹了NioEventLoop的創(chuàng)建双揪,在創(chuàng)建過程中动羽,默認會將SelectedKeys由Hashset替換為數(shù)組實現(xiàn),此處的selectedKeys正是替換過后的實現(xiàn)渔期。
我們繼續(xù)跟進到processSelectedKeysOptimized方法
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
方法內部用一個for循環(huán)處理selectedKeys运吓。key的attchment默認是在注冊時附加上去的NioServerSocketChannel和NioSocketChannel。
繼續(xù)跟進processSelectedKey(k, (AbstractNioChannel) a)方法疯趟。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop = ch.eventLoop();
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
}
netty首先對selectionKey的有效性做了一個判斷拘哨。當key無效時,關閉key所在的channel信峻。當key有效時倦青,委托NioUnsafe對象對key進行IO操作。
注意這里先進行OP_CONNECT,再執(zhí)行OP_WRITE,最后執(zhí)行OP_READ和OP_ACCEPT站欺。關于Unsafe的這些IO操作留待以后分析姨夹。
processSelectedKeysPlain方法流程類似,略過
處理非IO任務
由于IoRatio默認為50矾策,我們先進入runAllTasks(ioTime * (100 - ioRatio) / ioRatio)方法磷账。
protected boolean runAllTasks(long timeoutNanos) {
// 步驟1
fetchFromScheduledTaskQueue();
// 步驟2
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 步驟3
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 步驟4
safeExecute(task);
runTasks ++;
// 步驟5
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 步驟6
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
非IO任務的執(zhí)行可以分為6個步驟
- 從定時任務隊列聚合任務到普通任務隊列
- 從普通隊列中獲取任務
- 計算任務執(zhí)行的超時時間
- 安全執(zhí)行任務
- 任務執(zhí)行到一定次數(shù),計算是否超時
- 執(zhí)行完taskQueue普通隊列里的任務后贾虽,再去執(zhí)行tailTaskQueue里的任務逃糟。但目前暫時沒有看到tailTaskQueue使用的地方,也許是一個擴展點吧蓬豁,這里先略過绰咽。
我們一個一個步驟講解
聚合定時任務到普通任務隊列
首先看一下整體流程
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) {
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
首先先判斷定時任務隊列是否有任務,然后調用了一個AbstractScheduledEventExecutor.nanoTime(),該方法返回ScheduledFutureTask類從初始化
到當前時刻的差值地粪。也即將ScheduledFutureTask初始化的時刻當成零時刻取募。
獲取到零時刻到當前時刻的差值后,用一個for循環(huán)不斷去定時任務隊列里獲取終止時刻在當前時刻之后的任務(scheduledTask.deadlineNanos() - nanoTime<=0)
當獲取到定時任務后蟆技,將它添加到普通任務隊列taskQueue里玩敏。同時添加失敗后,還會再重新添加回定時任務隊列质礼,防止任務直接丟失旺聚。
說到定時任務隊列,也少不了一探其實現(xiàn)眶蕉。scheduledTaskQueue初始化代碼如下:
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
11);
}
return scheduledTaskQueue;
}
采用的是一個懶加載的方式砰粹,在調用scheduledTaskQueue()創(chuàng)建定時任務時才進行初始化。從名字可以看出造挽,它是一個優(yōu)先級隊列碱璃,初始化容量為11弄痹,
采用的Comparator是調用2個ScheduledFutureTask的compareTo方法,首先比較任務的終止時間厘贼,然后比較兩個任務的id界酒。代碼較簡單浮毯,就不列了降盹。
然后我們看下調度方法schedule
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task.setId(nextTaskId++));
} else {
executeScheduledRunnable(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task.setId(nextTaskId++));
}
}, true, task.deadlineNanos());
}
return task;
}
可以發(fā)現(xiàn),netty將"添加定時任務"也當做一個任務缀台,放入任務隊列里岳掐。
從普通隊列中獲取任務
// NioEventLoop中定義的pollTask方法
protected Runnable pollTask() {
Runnable task = super.pollTask();
if (needsToSelectAgain) {
selectAgain();
}
return task;
}
// super.pollTask調用了此方法凭疮,定義在SingleThreadEventExecutor中
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) {
Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {
return task;
}
}
}
這里依然是通過輪詢從任務隊列里取出任務,并且忽略WAKEUP_TASK這個標記性任務串述。
計算任務執(zhí)行的超時時間
在當前時間上执解,加上IO事件執(zhí)行的時間,作為非IO任務執(zhí)行的超時時間
安全執(zhí)行
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
捕獲所有異常纲酗,使得定時任務報錯時不退出
計算是否超時
由于nanoTime()是一個相對耗時的操作衰腌,netty默認執(zhí)行了64次非IO任務后,才計算是否超時觅赊。若執(zhí)行了超過64個任務沒或者任務隊列已經沒有任務右蕊,
就打斷循環(huán),并將當前時間更新為lastExecutionTime吮螺。
總結
到了這里饶囚,我們已經介紹完了大部分NioEventLoop的內容,限于筆者水平和文章篇幅鸠补,nioEventLoop所使用的任務隊列MpscQueue和ScheduleFutureTask
內部執(zhí)行原理不再進一步深究萝风。但這也已經足夠對NioEventLoop塑造一個比較整體性的認識了。