線程模型
概述
因為具有多核心或多個CPU 的計算機現(xiàn)在已經(jīng)司空見慣吁峻,大多數(shù)的現(xiàn)代應(yīng)用程序都利用了
復(fù)雜的多線程處理技術(shù)以有效地利用系統(tǒng)資源。相比之下射赛,在早期的Java 語言中倾芝,我們使用多
線程處理的主要方式無非是按需創(chuàng)建和啟動新的Thread
來執(zhí)行并發(fā)的任務(wù)單元——一種在高
負(fù)載下工作得很差的原始方式。Java 5 隨后引入了Executor
API购裙,其線程池通過緩存和重用
Thread
極大地提高了性能。
基本的線程池化模式可以描述為:
- 從池的空閑線程列表中選擇一個
Thread
鹃栽,并且指派它去運行一個已提交的任務(wù)(一個
Runnable
的實現(xiàn))躏率; - 當(dāng)任務(wù)完成時,將該Thread 返回給該列表民鼓,使其可被重用薇芝。
雖然池化和重用線程相對于簡單地為每個任務(wù)都創(chuàng)建和銷毀線程是一種進(jìn)步,但是它并不能
消除由上下文切換所帶來的開銷丰嘉,其將隨著線程數(shù)量的增加很快變得明顯夯到,并且在高負(fù)載下愈演
愈烈。此外饮亏,僅僅由于應(yīng)用程序的整體復(fù)雜性或者并發(fā)需求耍贾,在項目的生命周期內(nèi)也可能會出現(xiàn)
其他和線程相關(guān)的問題。
EventLoop
Netty 的EventLoop
是協(xié)同設(shè)計的一部分路幸,它采用了兩個基本的API:并發(fā)和網(wǎng)絡(luò)編程荐开。
首先,io.netty.util.concurrent
包構(gòu)建在JDK 的java.util.concurrent
包上简肴,用
來提供線程執(zhí)行器晃听。其次,io.netty.channel
包中的類砰识,為了與Channel
的事件進(jìn)行交互能扒,
擴展了這些接口/類。
在這個模型中仍翰,一個EventLoop
將由一個永遠(yuǎn)都不會改變的Thread
驅(qū)動赫粥,同時任務(wù)
(Runnable
或者Callable
)可以直接提交給EventLoop
實現(xiàn),以立即執(zhí)行或者調(diào)度執(zhí)行予借。
根據(jù)配置和可用核心的不同越平,可能會創(chuàng)建多個EventLoop
實例用以優(yōu)化資源的使用频蛔,并且單個
EventLoop
可能會被指派用于服務(wù)多個Channel
。
實現(xiàn)細(xì)節(jié)
Netty線程模型的卓越性能取決于對于當(dāng)前執(zhí)行的Thread
的身份的確定秦叛,也就是說晦溪,確定它是否是分配給當(dāng)前Channel
以及它的EventLoop
的那一個線程(EventLoop將負(fù)責(zé)處理一個Channel
的整個生命周期內(nèi)的所有事件)。
如果(當(dāng)前)調(diào)用線程正是支撐EventLoop
的線程挣跋,那么所提交的代碼塊將會被(直接)
執(zhí)行三圆。否則,EventLoop
將調(diào)度該任務(wù)以便稍后執(zhí)行避咆,并將它放入到內(nèi)部隊列中舟肉。當(dāng)EventLoop
下次處理它的事件時,它會執(zhí)行隊列中的那些任務(wù)/事件查库。這也就解釋了Thread
是如何
與Channel
直接交互而無需在ChannelHandler
中進(jìn)行額外同步的路媚。
注意,每個EventLoop
都有它自已的任務(wù)隊列樊销,獨立于任何其他的EventLoop
整慎。下圖
展示了EventLoop
用于調(diào)度任務(wù)的執(zhí)行邏輯。這是Netty 線程模型的關(guān)鍵組成部分围苫。
服務(wù)于Channel 的I/O 和事件的EventLoop 包含在EventLoopGroup 中裤园。根據(jù)不同的
傳輸實現(xiàn),EventLoop 的創(chuàng)建和分配方式也不同剂府。
1. 異步傳輸
異步傳輸實現(xiàn)只使用了少量的EventLoop(以及和它們相關(guān)聯(lián)的Thread)拧揽,而且在當(dāng)前的
線程模型中,它們可能會被多個Channel 所共享周循。這使得可以通過盡可能少量的Thread 來支
撐大量的Channel强法,而不是每個Channel 分配一個Thread万俗。
圖7-4 顯示了一個EventLoopGroup湾笛,它具有3 個固定大小的EventLoop(每個EventLoop
都由一個Thread 支撐)。在創(chuàng)建EventLoopGroup 時就直接分配了EventLoop(以及支撐它們
的Thread)闰歪,以確保在需要時它們是可用的嚎研。
EventLoopGroup 負(fù)責(zé)為每個新創(chuàng)建的Channel 分配一個EventLoop。在當(dāng)前實現(xiàn)中库倘,
使用順序循環(huán)(round-robin)的方式進(jìn)行分配以獲取一個均衡的分布临扮,并且相同的EventLoop
可能會被分配給多個Channel。(這一點在將來的版本中可能會改變教翩。)
一旦一個Channel 被分配給一個EventLoop脚囊,它將在它的整個生命周期中都使用這個
EventLoop(以及相關(guān)聯(lián)的Thread)括儒。請牢記這一點,因為它可以使你從擔(dān)憂你的Channel-
Handler 實現(xiàn)中的線程安全和同步問題中解脫出來。
另外,需要注意的是角溃,EventLoop 的分配方式對ThreadLocal 的使用的影響。因為一個
EventLoop 通常會被用于支撐多個Channel,所以對于所有相關(guān)聯(lián)的Channel 來說蚂且,
ThreadLocal 都將是一樣的。這使得它對于實現(xiàn)狀態(tài)追蹤等功能來說是個糟糕的選擇幅恋。然而杏死,
在一些無狀態(tài)的上下文中,它仍然可以被用于在多個Channel 之間共享一些重度的或者代價昂
貴的對象捆交,甚至是事件淑翼。
2. 阻塞傳輸
用于像OIO(舊的阻塞I/O)這樣的其他傳輸?shù)脑O(shè)計略有不同,如圖7-5 所示品追。這里每一個Channel 都將被分配給一個EventLoop(以及它的Thread)窒舟。如果你開發(fā)的應(yīng)用程序使用過java.io 包中的阻塞I/O 實現(xiàn),你可能就遇到過這種模型诵盼。
但是惠豺,正如同之前一樣,得到的保證是每個Channel 的I/O 事件都將只會被一個Thread
(用于支撐該Channel 的EventLoop 的那個Thread)處理风宁。這也是另一個Netty 設(shè)計一致性
的例子洁墙,它(這種設(shè)計上的一致性)對Netty 的可靠性和易用性做出了巨大貢獻(xiàn)。
實現(xiàn)
介紹完Netty的線程模型后戒财,讓我看一下它是如何實現(xiàn)的热监。之前我們已經(jīng)看到了EventLoop
的類層級結(jié)構(gòu)圖,現(xiàn)在我們不管EventLoop
饮寞,而是先看EventExecutorGroup
以及對應(yīng)的實現(xiàn)孝扛。
EventExecutorGroup
首先我們先明確一件事,EventExecutorGroup
與常見的線程池類似幽崩,而EventExecutor
雖然也繼承了ExecutorService
苦始,但是它是一個特化的只維護(hù)一個線程的線程池,因此可以將其視為擁有線程池職能的線程慌申。而一個EventExecutorGroup
管理多個EventExecutor
陌选,此結(jié)構(gòu)與我們熟悉的線程池模型更加相似。理解了這件事之后蹄溉,關(guān)于EventExecutorGroup
的實現(xiàn)將清晰許多咨油。
AbstractEventExecutorGroup
是EventExecutorGroup
接口的基本實現(xiàn),遵循前面的介紹的輪詢(round-robin)模式柒爵,將每一個任務(wù)輪流分配給它所管理的EventExecutor
役电。
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
/**
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();
MultithreadEventExecutorGroup
則實現(xiàn)了線程池的骨架,正如其名棉胀,它是一個支持多線程執(zhí)行的EventExecutorGroup
法瑟,與java.util.concurrent
中實現(xiàn)的線程池類似囱晴。這個類的核心是它的構(gòu)造方法,展現(xiàn)了它與EventExecutor
之間是如何關(guān)聯(lián)的:
private final EventExecutor[] children;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
其中Executor
只用來構(gòu)造Thread
瓢谢,默認(rèn)為一個EventExecutor
對應(yīng)一個Thread
畸写。注意execute
方法:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
而chooser
字段代表了EventExcutor
的選擇策略(默認(rèn)為輪詢):
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
這里有一點優(yōu)化,即 &
操作相比 %
會快很多氓扛,所以當(dāng)線程池的線程數(shù)量為pow2時枯芬,針對設(shè)計了next()
方法。
choose
字段在EventExecutorGroup#next()
方法中使用采郎,用以選擇執(zhí)行任務(wù)的線程千所。
@Override
public EventExecutor next() {
return chooser.next();
}
構(gòu)造EventExecutor
的newChild()
方法交由子類實現(xiàn),以針對不同的場景進(jìn)行定制蒜埋。默認(rèn)實現(xiàn)如下所示:
@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}
public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, true, maxPendingTasks, rejectedExecutionHandler);
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ThreadExecutorMap.apply(executor, this);
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
注意taskQueue = newTaskQueue(this.maxPendingTasks);
方法淫痰,每個EventExecutor
都自己維護(hù)一個任務(wù)隊列,而不是EventExecutorGroup
維護(hù)一個任務(wù)隊列整份,因此一批任務(wù)都會由固定的EventExecutor
進(jìn)行執(zhí)行待错,這樣可以避免在編寫代碼時考慮并發(fā)控制,這個優(yōu)勢在ChannelHandler
中體現(xiàn)烈评。
關(guān)于EventExecutorGroup
實現(xiàn)的說明到此為止火俄,接下來講述EventExecutor
的實現(xiàn)類。
在上面的繼承體系中讲冠,頂層類AbstractEventExecutor
繼承了jdk提供的骨架類AbstractExecutorService
瓜客,在此基礎(chǔ)上提供了自己的一些功能,比如:
@Override
public EventExecutorGroup parent() {
return parent;
}
@Override
public EventExecutor next() {
return this;
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
其中next()
方法我們在EventExecutorGroup
的實現(xiàn)中提及過竿开,EventExcutor
繼承了EventExecutorGroup
接口谱仪,它的實現(xiàn)就是返回它自己。
而inEventLoop()
則是核心方法否彩,在之前的線程模型中疯攒,我們說過Netty的性能取決于對當(dāng)前線程身份的確定,此方法就用于實現(xiàn)這一功能胳搞,關(guān)于此方法是如何帶來性能提升卸例,我們在后面再討論称杨。
AbstractScheduledEventExecutor
類則增加了任務(wù)調(diào)度的功能肌毅,實現(xiàn)了ScheduledExecutorService
接口。其內(nèi)部維護(hù)了一個優(yōu)先隊列姑原,以任務(wù)下一次執(zhí)行時間為基準(zhǔn)進(jìn)行比較悬而。
SingleThreadEventExecutor
則實現(xiàn)了EventExecutor
的核心功能,它的名字與MultithreadEventExecutorGroup
相對應(yīng)锭汛。
這個類的實現(xiàn)比較繁雜笨奠,有許多的輔助方法袭蝗,我們從核心方法execute()
進(jìn)行切入,我們知道EventExecutor
由EventExecutorGroup
所控制般婆,所以它的啟動需要EventExecutorGroup
幫助:
# EventExecutorGroup.java
@Override
public void execute(Runnable command) {
next().execute(command);
}
# SingleThreadEventExecutor.java -------------------------------------
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
// 將任務(wù)直接增加到任務(wù)隊列中到腥,而不直接執(zhí)行
addTask(task);
if (!inEventLoop) {
// 首次啟動時,thread字段還未初始化蔚袍,所以inEventLoop()方法一定會返回false
startThread();
// 如果EventExecutor已經(jīng)被關(guān)閉乡范,取消此任務(wù)的執(zhí)行(邊界條件)
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
// CAS修改EventExecutor的狀態(tài)為已啟動
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
在介紹真正的啟動過程之前,我們先回憶一下EventExecutor
的構(gòu)造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ThreadExecutorMap.apply(executor, this);
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
這一步對Runnable
進(jìn)行裝飾啤咽,以此將之前EventExecutorGroup
傳入的Executor
所產(chǎn)生的Thread
與EventExecutor
綁定起來晋辆。
doStartThread()
方法調(diào)用了Executor
的executor
的方法,構(gòu)造一個新的線程宇整,并執(zhí)行上面修飾后的Runnable
瓶佳。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
上面的代碼雖然很長,但是核心內(nèi)容只有:
```java
try {
SingleThreadEventExecutor.this.run();
success = true;
}
SingleThreadEventExecutor
并沒有實現(xiàn)run()
方法鳞青,而是讓其在子類中實現(xiàn)霸饲,讓我們看一下默認(rèn)實現(xiàn):
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
run()
方法內(nèi)部是一個死循環(huán),不斷從任務(wù)隊列中獲取任務(wù)然后執(zhí)行臂拓,直到EventExecutor
被關(guān)閉贴彼。
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
// 如果定時隊列中有任務(wù),并且還未到執(zhí)行時間埃儿,
// 那么嘗試從taskQueue中獲取任務(wù)并執(zhí)行器仗,直到定時任務(wù)
// 可以執(zhí)行
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// Waken up.
return null;
}
}
if (task == null) {
// We need to fetch the scheduled tasks now as otherwise there may be a chance that
// scheduled tasks are never executed if there is always one task in the taskQueue.
// This is for example true for the read task of OIO Transport
// See https://github.com/netty/netty/issues/1614
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}
}
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
此處的實現(xiàn)與java.util.concurrent
中線程池的工作線程基本相似,不過最重要的是線程模型之間的區(qū)別童番。
takeTask()
方法在之前的版本如下:
Runnable task;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
return null;
}
} else {
task = taskQueue.poll();
}
if (task == null) {
fetchFromDelayedQueue();
task = taskQueue.poll();
}
這個版本的實現(xiàn)有一個bug精钮,如果taskQueue
中一直有任務(wù),那么定時任務(wù)將無法被執(zhí)行剃斧。
最后轨香,讓我看看一下EventExecutor
是如何被關(guān)閉的,雖然ExecutorService
提供了shutdown()
方法幼东,但是它的接口語義是線程池關(guān)閉后將不能再接受任務(wù)臂容,這會使得某些任務(wù)無法被完成,導(dǎo)致出現(xiàn)一些問題根蟹,所以Netty定義了一個新的接口叫做shutdownGracefully
脓杉,用來取代shutdown()
。
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
if (quietPeriod < 0) {
throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
}
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (isShuttingDown()) {
return terminationFuture();
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
wakeup = true;
oldState = state;
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
newState = oldState;
wakeup = false;
}
}
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (ensureThreadStarted(oldState)) {
return terminationFuture;
}
if (wakeup) {
wakeup(inEventLoop);
}
return terminationFuture();
}
這個接口擁有三個參數(shù)long quietPeriod
, long timeout
, TimeUnit unit
简逮。如果指定了quietPeriod
不為0球散,那么EventExecutor
在執(zhí)行完當(dāng)前任務(wù)列表中的任務(wù)后,并不會立刻關(guān)閉散庶,而是會繼續(xù)等待新任務(wù)蕉堰,timeout
的作用與此類似凌净。調(diào)用此接口后,線程池的狀態(tài)變更為ST_SHUTTING_DOWN
屋讶,進(jìn)入一段靜默等待的時期冰寻,如果有新任務(wù)到來,它可以成功被接受皿渗。
工作線程需要配置shutdownGracefully
工作性雄,讓我們再看一下run()
方法:
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
protected boolean confirmShutdown() {
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
if (runAllTasks() || runShutdownHooks()) {
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
// terminate if the quiet period is 0.
// See https://github.com/netty/netty/issues/4241
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
wakeup(true);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// Check if any tasks were added to the queue every 100ms.
// TODO: Change the behavior of takeTask() so that it returns on timeout.
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// No tasks were added for last quiet period - hopefully safe to shut down.
// (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
return true;
}
任務(wù)線程每執(zhí)行一次任務(wù)后,都會去檢查線程池是否被關(guān)閉羹奉,如果線程池處于ST_SHUTTING_DOWN
狀態(tài)秒旋,工作線程將進(jìn)入靜默期,等待線程池徹底關(guān)閉或者超時诀拭。如果quietPeriod
指定為0迁筛,那么工作線程執(zhí)行完任務(wù)隊列中的所有任務(wù)后,工作線程將停止耕挨,線程池徹底關(guān)閉细卧。否則,它會等待超時或者quietPeriod
到達(dá)筒占,在此期間每100ms檢查一次任務(wù)隊列贪庙,如果有新任務(wù)到來那么執(zhí)行它。
EventLoopGroup
EventLoopGruop
的類層次結(jié)構(gòu)如下所示翰苫,雖然DefaultEventLoopGroup
只是一個簡單的實現(xiàn)類止邮,但是NioEventLoopGroup
等常用組件都和它的繼承體系差不多,在此只看其基本實現(xiàn)奏窑,排除掉和IO關(guān)聯(lián)的實現(xiàn)類导披。
從上圖可見,MultithreadEventLoopGroup
直接繼承自MultithreadEventExecutorGroup
埃唯,在原本并發(fā)架構(gòu)的基礎(chǔ)上撩匕,進(jìn)行定制,實現(xiàn)了EventLoopGroup
接口的一些方法墨叛。在這種設(shè)計下止毕,并發(fā)框架保持了良好的可擴展性。
MultithreadEventExecutorGroup
與DefaultEventLoopGroup
的實現(xiàn)非常簡單漠趁,只定義了一些構(gòu)造方法扁凛,不再多說。
EventLoop
DefaultEventLoop
與DefaultEventLoopGroup
一樣棚潦,實現(xiàn)了EventLoop
接口令漂,將線程與NIO連接到了一起。