Netty NioEventLoop
Reactor 模型
Netty實(shí)現(xiàn)并擴(kuò)展了Reactor模型,為了更好的了解EventLoop屑彻,我們有必要先看一下Reactor模型的定義徊哑。
在wiki對reactor pattern的定義中硼啤,指出了一下集中角色:
- Resource:資源指的是提供系統(tǒng)輸入或者消費(fèi)系統(tǒng)輸出的資源艰猬。在Netty中它指的是SocketChannel横堡,它們應(yīng)支持select。
- Demultiplexer:事件分離器負(fù)責(zé)對資源進(jìn)行輪尋等待冠桃,當(dāng)資源ready的時(shí)候命贴,分離器負(fù)責(zé)將數(shù)據(jù)發(fā)送給Dispatcher。
- Dispatcher:處理Handler的注冊和反注冊食听。當(dāng)資源到達(dá)時(shí)負(fù)責(zé)把資源分發(fā)到相應(yīng)的Handler中套么。
- Handler:負(fù)責(zé)處理數(shù)據(jù)。
在Netty中EventLoop兼負(fù)了Demultiplexer以及Dispatcher兩個(gè)角色碳蛋。下邊我們通過來看NioEventLoop的源碼學(xué)習(xí)學(xué)習(xí)并了解Netty中的EventLoop。
EventLoop源碼
NioEventLoop的核心方法是run()方法省咨,一旦Netty程序啟動(dòng)之后肃弟,這個(gè)就一直循環(huán)跑下去,不間斷的查詢IO和處理task。
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
}
///
}
關(guān)于selectionStrategy
首先我們來看第一個(gè)邏輯Select Strategy笤受。這段邏輯主要控制這次循環(huán)是執(zhí)行:跳過穷缤;select操作;還是fall through箩兽。判斷依據(jù)是這樣的:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果當(dāng)前EventLoop中有未處理的task津肛,則執(zhí)行selectorNowSupplier。selectorNowSupplier調(diào)用了selectNow汗贫。selectNow調(diào)用的是Selector的selectNow這個(gè)非阻塞方法身坐。執(zhí)行完selectNow則跳出switch運(yùn)行下邊的processSelectedKeys邏輯。
為了高效的利用CPU落包,EventLoop中只要有未消費(fèi)的task則優(yōu)先消費(fèi)task部蛇。
Nio中Selector.select()是阻塞的,直到某個(gè)selection key可用select方法才會(huì)返回咐蝇。Selector.selectNow()則檢查自從上次select到現(xiàn)在有沒有可用的selection key涯鲁,然后立即返回。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
select操作
select操作主要是檢查當(dāng)前的selection key有序,看哪些已a(bǔ)vailable抹腿。
上邊我們說到了Selector.select操作是阻塞的,那么如果我不想等了旭寿,可以中斷它嗎警绩?可以,Selector.wakeup可以喚醒正在阻塞的select()操作许师。但是如果當(dāng)前沒有select操作房蝉,執(zhí)行了wakeUp操作,那么下次執(zhí)行的select()或者selectNow()操作將被立即喚醒微渠。
但是Selector.wakeup是開銷比較大的操作,不能每次都直接調(diào)用wakeup搭幻,于是NioEventLoop中聲明了wakenUp(AtomicBoolean)字段,用于控制selector.wakeup()的調(diào)用逞盆。調(diào)用wakeup之前先wakenUp.compareAndSet(false, true)
檀蹋,如果set成功才執(zhí)行Selector.wakeup()操作。
當(dāng)用戶提交新的任務(wù)時(shí)executor.execute(...)云芦,會(huì)觸發(fā)wakeup操作俯逾。
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
這段代碼有一段非常長的注釋,解釋了為什么這段邏輯這樣實(shí)現(xiàn)舅逸。并且給出了什么情況下會(huì)產(chǎn)生競態(tài)條件:
wakenUp.set(false)
selector.select(...)
wakenUp.set(false)執(zhí)行后桌肴,用戶出發(fā)了wakeup操作,然后執(zhí)行select操作琉历,這時(shí)select將立即返回坠七。直到下次循環(huán)把wakenUp重置為false水醋,期間所有的wakenUp.compareAndSet(false, true)都是執(zhí)行失敗的,因?yàn)楝F(xiàn)在wakenUp的值是true彪置。所以接下來的select()都不能被wakeup拄踪。
select 內(nèi)部邏輯
接下來我們看select是如何實(shí)現(xiàn)的:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { // 1
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 2
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // 3
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 重建Selector,舊的Selector中的Selection Key要拷貝到新的Selector中
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
///
}
selectCnt標(biāo)記select執(zhí)行的次數(shù)拳魁,用于檢測NIO的epoll bug惶桐。在這個(gè)方法尾部有一個(gè)判斷:
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {}
判斷select記次是否超過了伐值,如果是的話有可能觸發(fā)了Nio epoll bug潘懊,執(zhí)行重建selector的邏輯:新建一個(gè)Selector姚糊,把原來老的selection key都復(fù)制過去卦尊。重建完成之后再執(zhí)行一次selectNow叛拷。
因?yàn)閟elect操作是阻塞的岂却,如果長時(shí)間沒有IO可用忿薇,就會(huì)造成NioEventLoop中的task積壓躏哩。因此每次執(zhí)行select操作都設(shè)定一個(gè)超時(shí):
1.查詢定時(shí)任務(wù)重最近要被執(zhí)行的task還有多長時(shí)間執(zhí)行.
2.這個(gè)時(shí)間加上0.5ms就是最大超時(shí)時(shí)間署浩。
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
整體來看一下這個(gè)for循環(huán):
- 第1個(gè)if:如果timeoutMillis小于0,則立即執(zhí)行一次異步的selectNow扫尺,跳出循環(huán)消費(fèi)task。
- 第2個(gè)if:如果當(dāng)前taskQueue中有task正驻,并且沒有被wakeup,則執(zhí)行一次異步的selectNow姑曙,跳出循環(huán)消費(fèi)task襟交。
- 接下來執(zhí)行select,并記次伤靠。
- 第3個(gè)if:如果有available keys 或者 被用戶喚醒 或者 任務(wù)隊(duì)列定時(shí)隊(duì)列有任務(wù)則中斷捣域。
- 最后就是重建selector的過程宴合。
processSelectedKeys
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
NioEventLoop.run方法的后半段邏輯主要是processSelectedKeys(處理IO)和runTasks(消費(fèi)任務(wù))。這里有一個(gè)參數(shù)用于控制處理這兩種任務(wù)的時(shí)間配比:ioRatio卦洽。
先來看一下processSelectedKeys,它的邏輯由processSelectedKeysOptimized和processSelectedKeysPlain實(shí)現(xiàn)蜗字,調(diào)用那個(gè)函數(shù)取決于你是否開啟了DISABLE_KEYSET_OPTIMIZATION
打肝。如果開啟了Selection 優(yōu)化選項(xiàng)挪捕,則在創(chuàng)建Selector的時(shí)候以反射的方式把SelectedSelectionKeySet selectedKeys設(shè)置到selector中争便。具體實(shí)現(xiàn)在openSelector中级零,代碼就不貼出來了滞乙。SelectedSelectionKeySet內(nèi)部是基于Array實(shí)現(xiàn)的,而Selector內(nèi)部selectedKeys是Set類型的斩启,遍歷效率Array效率更好一下。
我們來分析processSelectedKeysPlain方法:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
// 處理channel中的數(shù)據(jù)
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
SelectionKey上邊可以掛載Attachment发绢,一般情況下新的鏈接對象Channel會(huì)掛到attachment上。我們在遍歷selectedKeys時(shí)边酒,首先取出selection key上的attachment,key的類型可能是AbstractNioChannel和NioTask墩朦。根據(jù)不同的類型調(diào)用不同的處理函數(shù)翻擒。我們著重看處理channel的邏輯:
1.如果selection key是:SelectionKey.OP_CONNECT氓涣,那表明這是一個(gè)鏈接操作陋气。對于鏈接操作,我們需要把這個(gè)selection key從intrestOps中清除掉恩伺,否則下次select操作會(huì)直接返回。接下來調(diào)用finishConnect方法晶渠。
2.如果selection key是:SelectionKey.OP_WRITE。則執(zhí)行flush操作褒脯,把數(shù)據(jù)刷到客戶端。
3.如果是read操作則調(diào)用unsafe.read()番川。這個(gè)操作就不展開了脊框,等到接下來的文章践啄,專門分析read操作浇雹。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
整體來看NioEventLoop的實(shí)現(xiàn)也不復(fù)雜,主要就干了兩件事情:select IO以及消費(fèi)task屿讽。因?yàn)閟elect操作是阻塞的(盡管設(shè)置了超時(shí)時(shí)間)昭灵,每次執(zhí)行select時(shí)都會(huì)檢查是否有新的task,有則優(yōu)先執(zhí)行task伐谈。這么做也是做大限度的提高EventLoop的吞吐量,減少阻塞時(shí)間抠蚣。
除了這兩件事兒呢,NioEventLoop還解決了JDK中注明的EPoll bug嘶窄。到此NioEventLoop源碼分析完結(jié)奇昙。