版本
本次源碼分析基于Netty的版本為4.1
源碼分析
NioEventLoop可以視為java中的一個線程,只不過NioEventLoop處理的事件烁兰,以及內(nèi)部的處理邏輯會有所不同。先看一下類的繼承關系:
可以看到NioEventLoop實現(xiàn)了很多接口徊都,特別是EventLoop和ScheduledExecutorService沪斟,所以NioEventLoop不僅能實現(xiàn)普通的task,還能實現(xiàn)定時task暇矫。
Selector
Netty的實現(xiàn)是基于Java原生的NIO的主之,其對原生的NIO做了很多優(yōu)化决瞳,避免了某些bug错忱,也提升了很多性能浪慌。但是底層對于網(wǎng)絡IO事件的監(jiān)聽和處理也是離不開多路復用器Selector的遂唧。在NioEventLoop的構(gòu)造方法中進行了Selector的初始化:
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
關鍵還是openSelector()方法泻仙,這里我刪除了一些分支代碼愈涩,剩下的做了注釋笔刹,其中常量 DISABLE_KEY_SET_OPTIMIZATION 的定義如下尖坤,可以手工配置是否開啟優(yōu)化囱持,默認是開啟優(yōu)化的夯接,具體優(yōu)化做了什么事可以查看下面的openSelector()分析盔几。
private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
netty在創(chuàng)建selector的時候就嘗試了優(yōu)化上鞠,具體優(yōu)化其實是將底層的數(shù)據(jù)結(jié)構(gòu)從HashSet改為了數(shù)組,可以從SelectedSelectionKeySet和SelectorImpl的源碼看到這一點,這里就不列了。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 根據(jù)底層的IO模型來創(chuàng)建一個selector驼唱,這里的selector就是java中NIO的selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果未開啟優(yōu)化則直接就返回了,SelectorTuple可以視為一個持有selector引用的句柄
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通過反射創(chuàng)建一個selector的具體實例
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
// netty自己包裝的一個selectKey的集合類
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// java 9 以上版本會用Unsafe類直接底層替換SelectionKeySet
}
// 利用反射將原生selector中的兩個屬性替換為netty自己的包裝類
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
run()方法
前面也說了,NioEventLoop其實其實可以類比java中的線程,是一個任務執(zhí)行單元钥星,所以run()方法是其中的關鍵,接下來就來分析一下run()方法乖篷,源碼如下秽誊。
@Override
protected void run() {
for (;;) {
try {
try {
// 如果任務隊列非空,那么執(zhí)行selectNowSupplier代表的方法最易,也就是selectNow(),否則返回SelectStrategy.SELECT
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
// NioEventLoop不支持嬉荆,用于EpollEventLoop,理論上不會走到這里
case SelectStrategy.BUSY_WAIT:
// 任務隊列為空的時候限番,會執(zhí)行本邏輯
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 源碼中有注釋為什么這里要調(diào)用如下邏輯别威,感興趣可以查看源碼粥庄,因為原因描述太長惜互,這里就省略了
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// 出現(xiàn)IOException則新建selector,將原有的所有channel重新注冊到新的selector,然后關閉老的selector
rebuildSelector0();
// 異常處理
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 控制IO處理時間的一個變量呼股,默認是50(代表50%)
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 處理IO事件
processSelectedKeys();
} finally {
// 運行非IO任務,就算ioRatio設置了100,非IO任務還是要執(zhí)行的
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 處理IO事件
processSelectedKeys();
} finally {
// 根據(jù)設置的時間占比運行非IO任務
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
整個run()方法被包裹在一個for循環(huán)中妙啃,唯一能夠結(jié)束循環(huán)的條件是狀態(tài)state為SHUTDOWN或者TERMINATED,NioEventLoop繼承了SingleThreadEventExecutor俊戳,isShuttingDown()和confirmShutdown()都是SingleThreadEventExecutor中的方法揖赴。
可以看到,除去異常處理和一些分支流程抑胎,整個run()方法不是特別負責燥滑,重點在與select()和selectNow()方法,run()方法流程如下圖所示:
接下來看一下兩個關鍵方法select()和selectNow()
- selectNow()
selectNow會立即出發(fā)selector的選擇操作阿逃,如果有準備就緒的channel铭拧,就會返回相應的int值(代表了不同的selectKey的集合),否則返回0恃锉。之后如果發(fā)現(xiàn)用戶手動調(diào)用了selector的wakeup()方法搀菩,會執(zhí)行selector.wakeup()操作。
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
- select()
同樣去掉了一些無關主流程的代碼破托,netty在select()方法中的處理邏輯跟java線程池有相似的地方肪跋,沒有任務的時候都是阻塞的,阻塞時間以最近的任務距離當前時間為準土砂,如果一旦有就緒的channel州既,則立即進行退出循環(huán)進行處理。這里netty還解決了epoll的空輪詢bug萝映,如果觸發(fā)了空輪詢判斷會重建selector吴叶。
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 計算定時任務隊列中最早的任務距離現(xiàn)在的時間,沒有任務默認1秒
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 如果最早的任務開始時間距離當前時間不足0.5毫秒或者已超時序臂,執(zhí)行selectNow()方法
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 隊列中有任務蚌卤,并且selector從false設置為true成功則執(zhí)行selectNow()方法
// 源碼描述了原因,簡單來說,往NioEventLoop中提交任務的時候如果selector未wakeup會調(diào)用selector.wakeup()
// 但如果提交task的時候selector已經(jīng)wakeup造寝,則不會調(diào)用
// 任務可能被掛起知道selector超時,所以這里做了檢測
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 為select方法設置超時吭练,防止定時任務餓死
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 退出循環(huán)的條件
// 1. 存在就緒的channel
// 2. 老的wakeup狀態(tài)是true
// 3. 進入select方法后用戶調(diào)用了wakeup()方法
// 4. 有新的定時任務需要處理
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
// 線程中斷處理
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
// 這里有一個處理epoll空輪詢bug的邏輯
// 超過了timeoutMillis時間不認為是空輪詢
// 當select輪詢超過設定的次數(shù)上限時視為觸發(fā)空輪詢bug诫龙,重建selector
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
// 略
}
現(xiàn)在真的有channel就緒了,NioEventLoop會怎么處理呢鲫咽?回到run()方法签赃,有一段根據(jù)設定的時間比例處理IO事件和用戶任務的邏輯,分別對應兩個方法processSelectedKeys和runAllTasks
- processSelectedKeys()
從源碼可以看到分尸,processSelectedKeysOptimized和processSelectedKeysPlain的大部分處理邏輯是相同的锦聊,區(qū)別就在于對selectedKey的迭代邏輯,記得一開始說過如果開啟了優(yōu)化箩绍,netty對selectedKey的底層集合進行了優(yōu)化孔庭,將HashSet改為了數(shù)組,HashSet底層用HashMap實現(xiàn)材蛛,迭代的效率是沒有數(shù)組高的圆到。
private void processSelectedKeys() {
// 看文章最開頭是否啟用優(yōu)化的設置,如果啟用了會走這里
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// 方便GC回收
selectedKeys.keys[i] = null;
final Object a = k.attachment();
// 根據(jù)類型不同執(zhí)行不同的處理邏輯
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 一般不會走這個分支卑吭,除非用戶主動注冊NioTask到selector芽淡,netty單元測試里有案例
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 如果為true,則重置之后的所有selectKey豆赏,并調(diào)用selectNow()方法
// 因為run()方法執(zhí)行本方法前已經(jīng)置為false挣菲,所以不會進這里
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
// 處理邏輯基本與processSelectedKeysOptimized相同
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
既然內(nèi)部邏輯類似,重點看一下processSelectedKeysOptimized()方法掷邦,NioTask分支一般不會走白胀,感興趣可以看一下netty的單元測試。重點看一下AbstractNioChannel分支耙饰,如果attachment是AbstractNioChannel類型纹笼,說明它是NioServerSocketChannel或者NioSocketChannel,需要進行IO讀寫相關的操作苟跪。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// SelectionKey無效的處理
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// channel沒有關聯(lián)的eventLoop直接返回
return;
}
// channel關聯(lián)的eventLoop不是本eventLoop廷痘,直接返回,不應關閉channel
if (eventLoop != this || eventLoop == null) {
return;
}
// 關閉channel
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 對于NioSocketChannel件已,連接需要先finishConnect才能繼續(xù)讀寫
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 下面3行的操作只是將OP_CONNECT從感興趣選項中移除笋额,防止重復觸發(fā)
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 說明有半包消息未發(fā)送完成,調(diào)用flush發(fā)送即可
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// unsafe是多態(tài)篷扩,對于NioServerSocketChannel兄猩,read就是接受客戶端TCP連接
// 對于NioSocketChannel,就是從channel中讀取ByteBuffer
// 同時檢測readyOps == 0 是解決JDK的循環(huán)bug
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- runAllTasks()
這個是執(zhí)行用戶任務也就是非IO處理的方法,分為不指定時間和指定時間的兩個重載方法枢冤。最大的不同就是帶時間的方法是有執(zhí)行時間限制的鸠姨,防止用戶任務長時間阻塞IO事件。
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 取一定時間段內(nèi)的定時任務到普通任務隊列里
fetchedAll = fetchFromScheduledTaskQueue();
// 運行任務隊列里的任務
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // 取完所有定時任務為止
if (ranAtLeastOne) {
// 記錄上次執(zhí)行完任務的時間
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 這是用戶任務指定的截止時間
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// nanoTime()是耗時的操作淹真,所以這里每執(zhí)行64個任務才檢測一次是否超過時間
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// 執(zhí)行任務
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
總結(jié)
從NioEventLoop的源碼可以看到讶迁,netty在很多地方做了優(yōu)化,還避免了很多JDK自帶的bug