我們知道, 一個(gè) Netty 程序啟動(dòng)時(shí), 至少要指定一個(gè) EventLoopGroup(如果使用到的是 NIO, 那么通常是 NioEventLoopGroup), 那么這個(gè) NioEventLoopGroup 在 Netty 中到底扮演著什么角色呢?
Netty 是 Reactor 模型的一個(gè)實(shí)現(xiàn), 那么首先從 Reactor 的線(xiàn)程模型開(kāi)始吧.
關(guān)于 Reactor 的線(xiàn)程模型
- 單線(xiàn)程模型
- 多線(xiàn)程模型
-
主從多線(xiàn)程模型
先來(lái)看一下單線(xiàn)程模型:
所謂單線(xiàn)程, 即 acceptor 處理和 handler 處理都在一個(gè)線(xiàn)程中處理. 這個(gè)模型的壞處顯而易見(jiàn): 當(dāng)其中某個(gè) handler 阻塞時(shí), 會(huì)導(dǎo)致其他所有的 client 的 handler 都得不到執(zhí)行蝇更, 并且更嚴(yán)重的是, handler 的阻塞也會(huì)導(dǎo)致整個(gè)服務(wù)不能接收新的 client 請(qǐng)求(因?yàn)?acceptor 也被阻塞了). 因?yàn)橛羞@么多的缺陷, 因此單線(xiàn)程Reactor 模型用的比較少.
那么什么是 多線(xiàn)程模型 呢? Reactor 的多線(xiàn)程模型與單線(xiàn)程模型的區(qū)別就是 acceptor 是一個(gè)單獨(dú)的線(xiàn)程處理, 并且有一組特定的 NIO 線(xiàn)程來(lái)負(fù)責(zé)各個(gè)客戶(hù)端連接的 IO 操作. Reactor 多線(xiàn)程模型如下:
Reactor 多線(xiàn)程模型 有如下特點(diǎn):
- 有專(zhuān)門(mén)一個(gè)線(xiàn)程, 即 Acceptor 線(xiàn)程用于監(jiān)聽(tīng)客戶(hù)端的TCP連接請(qǐng)求.
- 客戶(hù)端連接的 IO 操作都是由一個(gè)特定的 NIO 線(xiàn)程池負(fù)責(zé). 每個(gè)客戶(hù)端連接都與一個(gè)特定的 NIO 線(xiàn)程綁定, 因此在這個(gè)客戶(hù)端連接中的所有 IO 操作都是在同一個(gè)線(xiàn)程中完成的.
- 客戶(hù)端連接有很多, 但是 NIO 線(xiàn)程數(shù)是比較少的, 因此一個(gè) NIO 線(xiàn)程可以同時(shí)綁定到多個(gè)客戶(hù)端連接中.
接下來(lái)我們?cè)賮?lái)看一下 Reactor 的主從多線(xiàn)程模型.
一般情況下, Reactor 的多線(xiàn)程模式已經(jīng)可以很好的工作了, 但是我們考慮一下如下情況: 如果我們的服務(wù)器需要同時(shí)處理大量的客戶(hù)端連接請(qǐng)求或我們需要在客戶(hù)端連接時(shí), 進(jìn)行一些權(quán)限的檢查, 那么單線(xiàn)程的 Acceptor 很有可能就處理不過(guò)來(lái), 造成了大量的客戶(hù)端不能連接到服務(wù)器.Reactor 的主從多線(xiàn)程模型就是在這樣的情況下提出來(lái)的, 它的特點(diǎn)是: 服務(wù)器端接收客戶(hù)端的連接請(qǐng)求不再是一個(gè)線(xiàn)程, 而是由一個(gè)獨(dú)立的線(xiàn)程池組成. 它的線(xiàn)程模型如下:
可以看到, Reactor 的主從多線(xiàn)程模型和 Reactor 多線(xiàn)程模型很類(lèi)似, 只不過(guò) Reactor 的主從多線(xiàn)程模型的 acceptor 使用了線(xiàn)程池來(lái)處理大量的客戶(hù)端請(qǐng)求.
NioEventLoopGroup 與 Reactor 線(xiàn)程模型的對(duì)應(yīng)
我們介紹了三種 Reactor 的線(xiàn)程模型, 那么它們和 NioEventLoopGroup 又有什么關(guān)系呢? 其實(shí), 不同的設(shè)置 NioEventLoopGroup 的方式就對(duì)應(yīng)了不同的 Reactor 的線(xiàn)程模型.
單線(xiàn)程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
.channel(NioServerSocketChannel.class)
...
注意, 我們實(shí)例化了一個(gè) NioEventLoopGroup, 構(gòu)造器參數(shù)是1, 表示 NioEventLoopGroup 的線(xiàn)程池大小是1. 然后接著我們調(diào)用 b.group(bossGroup) 設(shè)置了服務(wù)器端的 EventLoopGroup. 有些朋友可能會(huì)有疑惑: 我記得在啟動(dòng)服務(wù)器端的 Netty 程序時(shí), 是需要設(shè)置 bossGroup 和 workerGroup 的, 為什么這里就只有一個(gè) bossGroup?
其實(shí)很簡(jiǎn)單, ServerBootstrap 重寫(xiě)了 group 方法:
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
因此當(dāng)傳入一個(gè) group 時(shí), 那么 bossGroup 和 workerGroup 就是同一個(gè) NioEventLoopGroup 了.
這時(shí)候呢, 因?yàn)?bossGroup 和 workerGroup 就是同一個(gè) NioEventLoopGroup, 并且這個(gè) NioEventLoopGroup 只有一個(gè)線(xiàn)程, 這樣就會(huì)導(dǎo)致 Netty 中的 acceptor 和后續(xù)的所有客戶(hù)端連接的 IO 操作都是在一個(gè)線(xiàn)程中處理的斥滤。那么對(duì)應(yīng)到 Reactor 的線(xiàn)程模型中, 我們這樣設(shè)置 NioEventLoopGroup 時(shí), 就相當(dāng)于 Reactor 單線(xiàn)程模型职员。
多線(xiàn)程模型
同理, 再來(lái)看一下下面的例子:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
...
bossGroup 中只有一個(gè)線(xiàn)程, 而 workerGroup 中的線(xiàn)程是 CPU 核心數(shù)乘以2, 因此對(duì)應(yīng)的到 Reactor 線(xiàn)程模型中, 我們知道, 這樣設(shè)置的 NioEventLoopGroup 其實(shí)就是 Reactor 多線(xiàn)程模型.
主從多線(xiàn)程模型
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
...
其實(shí)在Netty 的服務(wù)器端的 acceptor 階段, 沒(méi)有使用到多線(xiàn)程, 因此上面的 主從多線(xiàn)程模型在 Netty 的服務(wù)器端是不存在的.
服務(wù)器端的 ServerSocketChannel 只綁定到了 bossGroup 中的一個(gè)線(xiàn)程, 因此在調(diào)用 Java NIO 的 Selector.select 處理客戶(hù)端的連接請(qǐng)求時(shí), 實(shí)際上是在一個(gè)線(xiàn)程中的, 所以對(duì)只有一個(gè)服務(wù)的應(yīng)用來(lái)說(shuō), bossGroup 設(shè)置多個(gè)線(xiàn)程是沒(méi)有什么作用的, 反而還會(huì)造成資源浪費(fèi).
那么Netty 中的 bossGroup 為什么使用線(xiàn)程池的原因大家眾所紛紜牲平,在stackoverflow有人說(shuō):
the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps
NioEventLoopGroup
我們?cè)賮?lái)簡(jiǎn)單回顧下NioEventLoopGroup 的初始化過(guò)程,
- EventLoopGroup(其實(shí)是MultithreadEventExecutorGroup) 內(nèi)部維護(hù)一個(gè)類(lèi)型為 EventExecutor children 數(shù)組, 其大小是 nThreads, 這樣就構(gòu)成了一個(gè)線(xiàn)程池
- 如果我們?cè)趯?shí)例化 NioEventLoopGroup 時(shí), 如果指定線(xiàn)程池大小, 則 nThreads 就是指定的值, 反之是處理器核心數(shù) * 2
- MultithreadEventExecutorGroup 中會(huì)調(diào)用 newChild 抽象方法來(lái)初始化 children 數(shù)組
- 抽象方法 newChild 是在 NioEventLoopGroup 中實(shí)現(xiàn)的, 它返回一個(gè) NioEventLoop 實(shí)例.
- NioEventLoop 屬性: SelectorProvider provider 屬性: NioEventLoopGroup 構(gòu)造器中通過(guò) SelectorProvider.provider() 獲取一個(gè) SelectorProvider;Selector selector 屬性: NioEventLoop 構(gòu)造器中通過(guò)調(diào)用通過(guò) selector = provider.openSelector() 獲取一個(gè) selector 對(duì)象.
NioEventLoop
NioEventLoop 繼承于 SingleThreadEventLoop, 而 SingleThreadEventLoop 又繼承于 SingleThreadEventExecutor. SingleThreadEventExecutor 是 Netty 中對(duì)本地線(xiàn)程的抽象, 它內(nèi)部有一個(gè) Thread thread 屬性, 存儲(chǔ)了一個(gè)本地 Java 線(xiàn)程. 因此我們可以認(rèn)為, 一個(gè) NioEventLoop 其實(shí)和一個(gè)特定的線(xiàn)程綁定, 并且在其生命周期內(nèi), 綁定的線(xiàn)程都不會(huì)再改變.
NioEventLoop 類(lèi)層次結(jié)構(gòu)
NioEventLoop 的類(lèi)層次結(jié)構(gòu)圖還是比較復(fù)雜的, 不過(guò)我們只需要關(guān)注幾個(gè)重要的點(diǎn)即可. 首先 NioEventLoop 的繼承鏈如下:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
在 AbstractScheduledEventExecutor 中, Netty 實(shí)現(xiàn)了 NioEventLoop 的 schedule 功能, 即我們可以通過(guò)調(diào)用一個(gè) NioEventLoop 實(shí)例的 schedule 方法來(lái)運(yùn)行一些定時(shí)任務(wù). 而在 SingleThreadEventLoop 中, 又實(shí)現(xiàn)了任務(wù)隊(duì)列的功能, 通過(guò)它, 我們可以調(diào)用一個(gè) NioEventLoop 實(shí)例的 execute 方法來(lái)向任務(wù)隊(duì)列中添加一個(gè) task, 并由 NioEventLoop 進(jìn)行調(diào)度執(zhí)行
通常來(lái)說(shuō), NioEventLoop 肩負(fù)著兩種任務(wù), 第一個(gè)是作為 IO 線(xiàn)程, 執(zhí)行與 Channel 相關(guān)的 IO 操作, 包括 調(diào)用 select 等待就緒的 IO 事件、讀寫(xiě)數(shù)據(jù)與數(shù)據(jù)的處理等; 而第二個(gè)任務(wù)是作為任務(wù)隊(duì)列, 執(zhí)行 taskQueue 中的任務(wù), 例如用戶(hù)調(diào)用 eventLoop.schedule 提交的定時(shí)任務(wù)也是這個(gè)線(xiàn)程執(zhí)行的.
從上圖可以看到, SingleThreadEventExecutor 有一個(gè)名為 thread 的 Thread 類(lèi)型字段, 這個(gè)字段就代表了與 SingleThreadEventExecutor 關(guān)聯(lián)的本地線(xiàn)程.
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略清理代碼
...
}
}
});
threadProperties = new DefaultThreadProperties(thread);
taskQueue = newTaskQueue();
}
在 SingleThreadEventExecutor 構(gòu)造器中, 通過(guò) threadFactory.newThread 創(chuàng)建了一個(gè)新的 Java 線(xiàn)程. 在這個(gè)線(xiàn)程中所做的事情主要就是調(diào)用 SingleThreadEventExecutor.this.run() 方法, 而因?yàn)?NioEventLoop 實(shí)現(xiàn)了這個(gè)方法, 因此根據(jù)多態(tài)性, 其實(shí)調(diào)用的是 NioEventLoop.run() 方法.
EventLoop 與 Channel 的關(guān)聯(lián)
Netty 中, 每個(gè) Channel 都有且僅有一個(gè) EventLoop 與之關(guān)聯(lián), 它們的關(guān)聯(lián)過(guò)程如下:
從上圖中我們可以看到, 當(dāng)調(diào)用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的關(guān)聯(lián). register 實(shí)現(xiàn)如下:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 刪除條件檢查.
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
在 AbstractChannel#AbstractUnsafe.register 中, 會(huì)將一個(gè) EventLoop 賦值給 AbstractChannel 內(nèi)部的 eventLoop 字段, 到這里就完成了 EventLoop 與 Channel 的關(guān)聯(lián)過(guò)程.
EventLoop 的啟動(dòng)
我們已經(jīng)知道,NioEventLoop 本身就是一個(gè) SingleThreadEventExecutor, 因此 NioEventLoop 的啟動(dòng), 其實(shí)就是 NioEventLoop 所綁定的本地 Java 線(xiàn)程的啟動(dòng). 依照這個(gè)思想, 我們只要找到在哪里調(diào)用了 SingleThreadEventExecutor 的 thread 字段的 start() 方法就可以知道是在哪里啟動(dòng)的這個(gè)線(xiàn)程了.
從代碼中搜索, thread.start() 被封裝到 SingleThreadEventExecutor.startThread() 方法中了:
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
thread.start();
}
}
}
STATE_UPDATER 是 SingleThreadEventExecutor 內(nèi)部維護(hù)的一個(gè)屬性, 它的作用是標(biāo)識(shí)當(dāng)前的 thread 的狀態(tài). 在初始的時(shí)候, STATE_UPDATER == ST_NOT_STARTED, 因此第一次調(diào)用 startThread() 方法時(shí), 就會(huì)進(jìn)入到 if 語(yǔ)句內(nèi), 進(jìn)而調(diào)用到 thread.start().
而這個(gè)關(guān)鍵的 startThread() 方法又是在哪里調(diào)用的呢? 經(jīng)過(guò)方法調(diào)用關(guān)系搜索, 我們發(fā)現(xiàn), startThread 是在 SingleThreadEventExecutor.execute 方法中調(diào)用的:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread(); // 調(diào)用 startThread 方法, 啟動(dòng)EventLoop 線(xiàn)程.
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
既然如此, 那現(xiàn)在我們的工作就變?yōu)榱藢ふ?在哪里第一次調(diào)用了 SingleThreadEventExecutor.execute() 方法.
我們?cè)?EventLoop 與 Channel 的關(guān)聯(lián) 這一小節(jié)時(shí), 有提到到在注冊(cè) channel 的過(guò)程中, 會(huì)在 AbstractChannel#AbstractUnsafe.register 中調(diào)用 eventLoop.execute 方法, 在 EventLoop 中進(jìn)行 Channel 注冊(cè)代碼的執(zhí)行, AbstractChannel#AbstractUnsafe.register 部分代碼如下:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
很顯然, 一路從 Bootstrap.connect 方法跟蹤到 AbstractChannel#AbstractUnsafe.register 方法, 整個(gè)代碼都是在主線(xiàn)程中運(yùn)行的, 因此上面的 eventLoop.inEventLoop() 就為 false, 于是進(jìn)入到 else 分支, 在這個(gè)分支中調(diào)用了 eventLoop.execute. eventLoop 是一個(gè) NioEventLoop 的實(shí)例, 而 NioEventLoop 沒(méi)有實(shí)現(xiàn) execute 方法, 因此調(diào)用的是 SingleThreadEventExecutor.execute:
@Override
public void execute(Runnable task) {
...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
我們已經(jīng)分析過(guò)了, inEventLoop == false, 因此執(zhí)行到 else 分支, 在這里就調(diào)用了 startThread() 方法來(lái)啟動(dòng) SingleThreadEventExecutor 內(nèi)部關(guān)聯(lián)的 Java 本地線(xiàn)程了. 總結(jié)一句話(huà), 當(dāng) EventLoop.execute 第一次被調(diào)用時(shí), 就會(huì)觸發(fā) startThread() 的調(diào)用, 進(jìn)而導(dǎo)致了 EventLoop 所對(duì)應(yīng)的 Java 線(xiàn)程的啟動(dòng).