前言
本文將會具體分析NioEventLoop中的thread,它的啟動時機,以及所履行的職責(zé)。還會分析一些netty的實現(xiàn)細(xì)節(jié)旗芬,比如解決NIO的bug和一些優(yōu)化等。
thread啟動
之前說到NioEventLoop是由一個thread處理I/O事件和提交的任務(wù)捆蜀。先看一下這個thread啟動的流程。
execute 簡化流程
private void execute(Runnable task, boolean immediate) {
//是當(dāng)前線程調(diào)用,直接加入隊列
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//啟動線程
startThread();
// ......
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
可以看出啟動thread是一個延遲加載的過程辆它,在執(zhí)行第一個任務(wù)的時候才會啟動thread誊薄。跟進(jìn)去看startThread()
private void startThread() {
//判斷線程狀態(tài)是否已啟動
if (state == ST_NOT_STARTED) {
//CAS設(shè)置線程狀態(tài)為已啟動
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//真正去啟動線程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
doStartThread
private void doStartThread() {
assert thread == null;
//調(diào)用傳入?yún)?shù)的executor的execute方法,
//executor會新建一個線程去執(zhí)行任務(wù)
executor.execute(new Runnable() {
@Override
public void run() {
//將執(zhí)行該任務(wù)的線程賦值給thread
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//執(zhí)行任務(wù)
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ......
}
}
前文分析了executor為ThreadPerTaskExecutor锰茉,執(zhí)行execute方法時候為新建一個線程去執(zhí)行任務(wù)呢蔫,NioEventLoop的thread就是在此時賦值。
thread的啟動流程簡化為飒筑,首先thread啟動是一個懶加載的過程片吊,在第一次執(zhí)行任務(wù)才會啟動。在啟動的過程中协屡,會有一個CAS的狀態(tài)判斷當(dāng)前線程是否已經(jīng)被啟動俏脊,如果thread沒有啟動,則通過傳入的executor對象去創(chuàng)建thread對象肤晓,并執(zhí)行SingleThreadEventExecutor.this.run()這個方法爷贫。
下面分析SingleThreadEventExecutor.this.run()這個方法,
/**
* Run the tasks in the {@link #taskQueue}
*/
protected abstract void run();
可以看見是一個抽象方法补憾,然后找到文本分析的NioEventLoop對于run的實現(xiàn)漫萄,這里做一個將代碼做一個簡化,只有主要流程
protected void run() {
int selectCnt = 0;
for (; ; ) {
//1盈匾、檢測IO事件
select();
try {
//2腾务、處理準(zhǔn)備就緒的IO事件
processSelectedKeys();
} finally {
// 3、執(zhí)行隊列里的任務(wù)
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
NioEventLoop的職責(zé)只有三個削饵,1岩瘦、檢測IO事件 ;2葵孤、處理準(zhǔn)備就緒的IO事件担钮;3、執(zhí)行隊列里的任務(wù)尤仍,用一個死循環(huán)去不斷執(zhí)行這三件事情箫津。如之前畫的圖所示:
接下來就著重分析這三個步驟。
select
select步驟的核心是調(diào)用通過NIO中的selector的select()方法宰啦,返回selector上所監(jiān)聽到IO事件苏遥。
case SelectStrategy.SELECT:
// 獲取當(dāng)前任務(wù)隊列的延遲時間
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//當(dāng)前任務(wù)隊列為空,監(jiān)聽IO事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
select方法
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
流程整體比較簡單赡模,如果時間參數(shù)deadlineNanos為NONE田炭,就調(diào)用selector.select()方法,這個方法會一直阻塞直到有IO事件返回漓柑。否則再判斷deadlineNanos是否小于等于0教硫,如果是調(diào)用selectNow()會立即返回當(dāng)前selector上準(zhǔn)備就緒的IO事件叨吮,否則調(diào)用selector.select(timeoutMillis)方法,會在指定時間內(nèi)返回瞬矩,不管是否有IO事件發(fā)生茶鉴。然后跟select()方法,找到實現(xiàn)類io.netty.channel.nio.SelectedSelectionKeySetSelector景用,
public int select() throws IOException {
selectionKeys.reset();
return delegate.select();
}
一共有兩步操作涵叮,第一步是將之前的selectionKeys清空,檢測到就緒的IO事件都會放入selectionKeys中伞插,這里表示新的一輪IO循環(huán)開始割粮,所以要將之前的清空(selectionKeys后續(xù)會在詳細(xì)介紹)。第二步是調(diào)用NIO中的Selector對象的select()媚污,將最后底層的IO實現(xiàn)委托給它舀瓢。
processSelectedKeys
processSelectedKeys這一步將會處理監(jiān)測到的IO事件,比如連接杠步、讀寫的IO操作氢伟。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
這里有個細(xì)節(jié),處理優(yōu)化過后的selectedKeys還是處理原生的selectedKeys幽歼。所謂優(yōu)化的selectedKeys就是將原生的selectedKeys的HashSet替換成數(shù)組實現(xiàn)朵锣,提高空間利用率和遍歷的效率,待會兒會詳細(xì)將到是怎么替換的selectedKeys甸私。
然后跟進(jìn)去看processSelectedKeysOptimized()的具體實現(xiàn):
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
整體流程就是在遍歷selectedKeys诚些,將綁定在SelectionKey上的Channel取下來,然后做對應(yīng)的IO操作皇型,最后再判斷是否需要重置selectedKeys诬烹。下面我會逐步分析里面的細(xì)節(jié)
第一: selectedKeys.keys[i] = null;
將SelectionKey取出之后把數(shù)組這個位置的地方置為null。為什么這么做弃鸦?https://github.com/netty/netty/issues/2363描述的很清楚绞吁,簡單來說就是我們并不會去清空selectedKeys數(shù)組,這就會導(dǎo)致在Channel關(guān)閉之后唬格,依然會保持SelectionKey的強引用家破。
如上圖所示,假如數(shù)組原有長度為2购岗,一次高峰期的IO事件導(dǎo)致數(shù)組擴容到8汰聋,之后新的IO事件的數(shù)量又達(dá)不到之前數(shù)組的位置,為導(dǎo)致上圖坐標(biāo)[6]喊积、[7]位置會長時間持有已經(jīng)關(guān)閉的Channel的引用烹困,所以這里將其置為null,有助于GC乾吻。
第二: processSelectedKey
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
首先是將SelectionKey綁定的屬性取下來髓梅,判斷是否是AbstractNioChannel的類型拟蜻。這里可以追蹤一下netty是什么時候?qū)bstractNioChannel設(shè)置進(jìn)去的。在AbstractNioChannel的doRegister方法
//最后一個參數(shù)就是att
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
其Channel注冊到底層jdk的組件中女淑,然后將AbstractNioChannel作為參數(shù)傳遞進(jìn)去瞭郑,后續(xù)輪詢出IO事件之后辜御,再將AbstractNioChannel取出做后續(xù)操作鸭你。
具體處理IO事件
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
這里貼一點核心流程,主要是判斷當(dāng)前Channel的操作類型擒权,是連接還是讀袱巨、寫
int readyOps = k.readyOps();
//連接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//寫事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//讀事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
這里面的內(nèi)部流程就不具體分析了,大致分為兩個部分bossGroup監(jiān)聽的連接事件碳抄,將接受到的Channel轉(zhuǎn)交給workGroup愉老,然后workGroup處理讀寫事件,然后將事件通過ChannelPipeline將事件傳播出去剖效。具體細(xì)節(jié)可以看AbstractNioMessageChannel和AbstractNioByteChannel的read()方法嫉入,后續(xù)可能會具體分析這里的代碼。
第三: needsToSelectAgain
最后一個步驟璧尸,重新設(shè)置selectedKeys
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
什么時候需要重新select?找到needsToSelectAgain被設(shè)置為true的地方咒林,只有唯一的一處cancel
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
然后看cancel被調(diào)用的地方doDeregister
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
由上面的兩部分代碼分析可以知道,channel的關(guān)閉是通過移除在selector上的注冊實現(xiàn)的爷光,同時會把cancelledKeys加一 垫竞。當(dāng)達(dá)到了閾值CLEANUP_INTERVAL(默認(rèn)256)后將cancelledKeys重置為0、needsToSelectAgain 為true蛀序。
當(dāng)needsToSelectAgain 為true之后欢瞪,有兩個步驟:
1.selectedKeys清空 -> selectedKeys.reset(i + 1);
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
- 再次填充selectedKeys ->selectAgain
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
至于為什么需要重新去填充selectedKeys,可能是需要保持selectedKeys里面的Channel都隨時保持的是活躍的徐裸。
processSelectedKeys到這就分析完了遣鼓,總共分為三步
- 遍歷selectedKeys
- 處理IO事件
- 是否需要重置selectedKeys
ranTasks
現(xiàn)在分析thread的最后一步工作ranTasks,執(zhí)行隊列里的任務(wù)重贺。
1. 任務(wù)類型
NioEventLoop里的任務(wù)類型分為兩部分骑祟,一個是由taskQueue(MpscUnboundedArrayQueue)存放普通的任務(wù),還有一個scheduledTaskQueue存放定時任務(wù)的隊列檬姥。之前分析過EventLoop繼承自ScheduledExecutorService曾我,所以也需要提供執(zhí)行定時任務(wù)的功能,而這里的定時任務(wù)是通過PriorityQueue來實現(xiàn)的健民。(定時任務(wù)的實現(xiàn)方式有很多抒巢,優(yōu)先隊列只是其中一種)ranTasks執(zhí)行的任務(wù)其實就是兩部分的內(nèi)容,一個是普通隊列中的任務(wù)和定時隊列中的任務(wù)秉犹。
2. ioRatio
在分析執(zhí)行細(xì)節(jié)之前蛉谜,在提一個很重要的參數(shù)ioRatio稚晚,代表設(shè)置事件循環(huán)中I/O所需時間的百分比,意思就是在一次循環(huán)中型诚,處理IO事件的時間與處理隊列任務(wù)所占時間做一個百分比的分配客燕,范圍是1到100,當(dāng)設(shè)置為100時狰贯,這個參數(shù)就失效了也搓,默認(rèn)參數(shù)為50。下面代碼就是對ioRatio的使用
//等于100的時候涵紊,參數(shù)失效傍妒,不再平衡IO事件所占時間的比例
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
//開始執(zhí)行IO事件的時間
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 獲得IO執(zhí)行總共耗時
final long ioTime = System.nanoTime() - ioStartTime;
//按照ioRatio計算出將花費多少時間執(zhí)行ranTasks
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
3. runAllTasks
protected boolean runAllTasks(long timeoutNanos) {
//將scheduledTaskQueue隊列中的任務(wù)轉(zhuǎn)移到taskQueue中
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
//任務(wù)為空結(jié)束
if (task == null) {
afterRunningAllTasks();
return false;
}
//計算本次執(zhí)行任務(wù)最遲的時間
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
//執(zhí)行任務(wù)
safeExecute(task);
runTasks ++;
//每執(zhí)行64個任務(wù)之后判斷時間是否超出,若超出結(jié)束循環(huán)
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//沒有任務(wù)結(jié)束循環(huán)
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
主要流程為
- scheduledTaskQueue隊列中的任務(wù)轉(zhuǎn)移到taskQueue中摸柄;
- 安全的執(zhí)行任務(wù)(其實就是將任務(wù)try catch颤练,以免任務(wù)執(zhí)行發(fā)生異常,影響其他任務(wù)執(zhí)行)驱负;
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
- 每執(zhí)行64個任務(wù)之后判斷執(zhí)行時間是否超出deadline嗦玖,這里采用64個任務(wù)為一個批次,沒有每次任務(wù)執(zhí)行去判斷跃脊,也是對性能的一個優(yōu)化宇挫;
- 執(zhí)行afterRunningAllTasks方法,其實就是執(zhí)行tailTasks隊列中的任務(wù)匾乓,然后記錄一下最后的執(zhí)行時間this.lastExecutionTime = lastExecutionTime;
一些細(xì)節(jié)
selectedKeySet
前面提到過netty將NIO中Selector的selectedKeys替換捞稿,這里分析一下為什么需要替換和么去替換的selectedKeys。
- 為什么替換
NIO原生的selectedKeys使用的是HashSet拼缝,而NioEventLoop將其替換成了SelectedSelectionKeySet
//SelectorImpl
protected Set<SelectionKey> selectedKeys = new HashSet();
//NioEventLoop
private SelectedSelectionKeySet selectedKeys;
SelectedSelectionKeySet構(gòu)造函數(shù)
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
SelectedSelectionKeySet使用的是數(shù)組存儲元素娱局,而HashSet是基于HashMap去存儲數(shù)據(jù),采用數(shù)組使得空間利用率和遍歷的效率有所提高咧七。
2.怎么替換
要在運行時替換掉類的屬性衰齐,很明顯是通過反射來做到的。
- 獲取sun.nio.ch.SelectorImpl Class對象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
- 創(chuàng)建selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- 設(shè)置屬性
//獲取屬性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
......
//將selectedKeySet設(shè)置到屬性中
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
NIO空輪詢bug
NIO有一個很出名的bug就是epoll空輪詢的bug继阻,這會導(dǎo)致CPU占有率到100%耻涛,java也并沒有修復(fù)這個bug,netty采用了一個很巧妙的方法來繞過這個bug瘟檩。
主要思想就是抹缕,通過檢測發(fā)生空輪詢的次數(shù),當(dāng)超過一定的閾值之后墨辛,netty將會重新創(chuàng)建一個selector卓研,并將之前selector上的channel轉(zhuǎn)移到新的selector上。通過重新創(chuàng)建selector的方式來解決NIO空輪詢的bug。
unexpectedSelectorWakeup
//空輪詢的次數(shù)超過閾值奏赘,默認(rèn)為512
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
//重新構(gòu)建selector
rebuildSelector();
return true;
}
跟進(jìn)去找到具體的實現(xiàn)方法rebuildSelector0
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
try {
//創(chuàng)建新的selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
//將舊的selector上的channel全部注冊到新的selector上
}
//賦值
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// 關(guān)閉舊的selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
總結(jié)
本文分析NioEventLoop中所對應(yīng)的唯一的thread寥闪,啟動是一個懶加載的過程,當(dāng)?shù)谝淮稳蝿?wù)執(zhí)行的時候才會初始化磨淌。后續(xù)thread開始循環(huán)處理三件事件
- 檢測IO事件 疲憋;
- 處理準(zhǔn)備就緒的IO事件;
- 執(zhí)行隊列里的任務(wù)
本文也對具體的代碼進(jìn)行了分析梁只,還有一些netty對NIO的優(yōu)化和bug處理缚柳,當(dāng)然netty的精妙之處遠(yuǎn)不止本文分析的這些,更多的還需要自己去探索和學(xué)習(xí)敛纲。