上一節(jié)我們研究了NioEventLoop
的執(zhí)行過(guò)程悼院,但是select(wakenUp.getAndSet(false));
和processSelectedKeys();
沒(méi)有分析硼身。
這一節(jié)研究select()
,改方法干的事情其實(shí)是輪詢檢查IO事件啊送。有三個(gè)階段
- 第一個(gè)階段据途,計(jì)算超時(shí)時(shí)間并判斷是否超時(shí),超時(shí)則進(jìn)行一次非阻塞式的select郎任,中斷輪詢檢測(cè);沒(méi)有超時(shí)备籽,同樣也會(huì)判斷
taskQueue和tailQueue
中是否有有任務(wù)舶治,有任務(wù)也進(jìn)行同樣的操作 - 第二個(gè)階段,未超時(shí)车猬,且任務(wù)隊(duì)列為空霉猛,進(jìn)行阻塞式的select,直到檢測(cè)到一個(gè)已注冊(cè)的IO事件發(fā)生,阻塞時(shí)間默認(rèn)為1秒鐘珠闰。
- 第三個(gè)階段惜浅,該階段主要的作用就是防止空輪詢bug,導(dǎo)致cpu暫用率過(guò)高導(dǎo)致宕機(jī)的情況伏嗜。
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//當(dāng)前時(shí)間 + 截止時(shí)間 (NioEventLoop底層維持了一個(gè)定時(shí)任務(wù)隊(duì)列坛悉,按照任務(wù)的截止時(shí)間正序排序)
//delayNanos(currentTimeNanos) : 計(jì)算當(dāng)前定時(shí)任務(wù)隊(duì)列第一個(gè)任務(wù)的截止時(shí)間
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//輪詢
for (;;) {
//計(jì)算超時(shí)時(shí)間
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//判斷是否超時(shí)
if (timeoutMillis <= 0) {
//超時(shí)伐厌,判斷有沒(méi)有select
if (selectCnt == 0) {
//沒(méi)有select,進(jìn)行非阻塞的select,異步檢測(cè)IO事件裸影。不阻塞
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
//未超時(shí)
//判斷當(dāng)前任務(wù)隊(duì)列(taskQueue)和異步任務(wù)隊(duì)列(tailQueue)是否有任務(wù)
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
//有任務(wù)挣轨,進(jìn)行一次非阻塞式的select
selector.selectNow();
selectCnt = 1;
break;
}
//第二個(gè)階段
//未超時(shí),且任務(wù)隊(duì)列為空的話轩猩,進(jìn)行阻塞式的select,直到檢測(cè)到一個(gè)已注冊(cè)的IO事件發(fā)生卷扮,timoutMillis是阻塞最大時(shí)間,默認(rèn)1000,
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
//第三個(gè)階段
//防止jdk空輪詢的bug
long time = System.nanoTime();
//判斷執(zhí)行了一次阻塞式select后均践,當(dāng)前時(shí)間和開(kāi)始時(shí)間是否大于超時(shí)時(shí)間画饥。(大于是很正常的,小于的話浊猾,說(shuō)明沒(méi)有執(zhí)行發(fā)生了空輪詢)
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
// SELECTOR_AUTO_REBUILD_THRESHOLD的值為512
//selectCut >= 512 也就是空輪詢測(cè)試大于等于512次的話抖甘,空輪詢處理發(fā)生,調(diào)用改方法避免下一次發(fā)生
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
- 重點(diǎn)說(shuō)一下防止空輪詢bug葫慎,在第三階段衔彻,代碼
time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos
time:當(dāng)前時(shí)間
timeoutMillis:阻塞式select超時(shí)時(shí)間
currentTimeNanos:本方法開(kāi)始時(shí)間
因此滿足該條件就是,經(jīng)過(guò)了阻塞式的select檢測(cè)IO事件偷办,但是并沒(méi)有任何事件發(fā)生艰额,盡力了。這時(shí)候select的計(jì)數(shù)加一椒涯,累計(jì)到閾值SELECTOR_AUTO_REBUILD_THRESHOLD
的時(shí)候則滿足了空輪詢的條件柄沮,該閾值在本類靜態(tài)代碼塊中初始化為512
,經(jīng)過(guò)512次什么事情都沒(méi)有做,表表示滿足空輪詢的條件废岂。
//selectCut >= 512 也就是空輪詢測(cè)試大于等于512次的話祖搓,空輪詢處理發(fā)生,調(diào)用改方法避免下一次發(fā)生
selector = selectRebuildSelector(selectCnt);
private Selector selectRebuildSelector(int selectCnt) throws IOException {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
//充值Selector
rebuildSelector();
Selector selector = this.selector;
//非阻塞select 檢測(cè)IO事件
// Select again to populate selectedKeys.
selector.selectNow();
return selector;
}
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
通過(guò)分析源碼湖苞,發(fā)現(xiàn)空輪詢bug的解決策略就是重置selector
拯欧,重新打開(kāi)一個(gè)selector
,依照舊的selector
重新注冊(cè)所有的IO事件到原有通道上财骨,生成新的selectionKey
镐作,并刷新該值,同時(shí)刷新的還有和selector
相關(guān)的屬性值,最后把舊的給關(guān)閉掉隆箩。這樣不會(huì)無(wú)限死循環(huán)该贾,僅僅會(huì)替換掉原有的屬性值而已,做了一些動(dòng)作捌臊。
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
//重新創(chuàng)建一個(gè)selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
//輪詢舊操作的所有的key和attachment
//attachment 其實(shí)就是netty包裝了的channel
for (SelectionKey key: oldSelector.keys()) {
//獲取channel
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
//注冊(cè)key注冊(cè)的事件
int interestOps = key.interestOps();
//取消之前key的事件
key.cancel();
//將事件和channel注冊(cè)到新創(chuàng)建的selector上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
//替換新的selectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
//重置新的selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
//關(guān)閉舊的selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}