I. Overview of EventLoop
In the previous article we analyzed the core capabilities of EventLoopGroup: executing tasks, registering Channels, and scheduling its executors. Today we look at EventLoop, starting with its position in the class hierarchy.
The EventLoop interface extends the EventLoopGroup interface. Why? From the previous article we know that the main job of an EventLoopGroup is to manage and schedule EventLoops, and that it delegates most of its other work to the EventLoops it manages. EventLoop extends EventLoopGroup so that it inherits EventLoopGroup's entry points for task execution, graceful shutdown and Channel registration.
Besides EventLoopGroup, EventLoop also extends the EventExecutor interface. Here is EventExecutor:
/**
* The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
* with some handy methods to see if a {@link Thread} is executed in a event loop.
* Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
* way to access methods.
*
*/
public interface EventExecutor extends EventExecutorGroup {
/**
* Returns a reference to itself.
*/
@Override
EventExecutor next();
/**
* Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
*/
EventExecutorGroup parent();
/**
* Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
*/
boolean inEventLoop();
/**
* Return {@code true} if the given {@link Thread} is executed in the event loop,
* {@code false} otherwise.
*/
boolean inEventLoop(Thread thread);
/**
* Return a new {@link Promise}.
*/
<V> Promise<V> newPromise();
/**
* Create a new {@link ProgressivePromise}.
*/
<V> ProgressivePromise<V> newProgressivePromise();
/**
* Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
* will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newSucceededFuture(V result);
/**
* Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
* will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
* every call of blocking methods will just return without blocking.
*/
<V> Future<V> newFailedFuture(Throwable cause);
}
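The interface's header comment tells us that EventExecutor is a special EventExecutorGroup that provides convenience methods for checking whether a thread is executing in the event loop. We analyzed EventExecutorGroup's capabilities in the previous article, so we won't repeat them here. Let's look at the important methods of EventExecutor: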
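First, next() and parent(). An EventExecutor only executes tasks and does not schedule other executors, so next() simply returns the executor itself, while EventExecutorGroup parent() returns the EventExecutorGroup that owns this executor.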
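Next come the two overloaded inEventLoop methods, which check whether a thread is the one executing the event loop; the no-argument version simply calls inEventLoop(Thread) with Thread.currentThread().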
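To make the inEventLoop idiom concrete, here is a minimal sketch that assumes a plain JDK single-thread executor stands in for the event loop. MiniLoop and runOnLoop are hypothetical names for illustration, not Netty API. A task runs inline when the caller is already on the loop thread and is enqueued otherwise. Netty code commonly builds this pattern on top of inEventLoop().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not Netty's EventExecutor.
final class MiniLoop {
    // Remembers the thread created by the executor: that thread is "the event loop".
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-loop");
        loopThread = t;
        return t;
    });

    // Mirrors EventExecutor.inEventLoop(Thread)
    boolean inEventLoop(Thread thread) {
        return thread == loopThread;
    }

    // Mirrors EventExecutor.inEventLoop(): passes the current thread
    boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

    // Run inline when already on the loop thread, otherwise enqueue for the loop.
    void runOnLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the nested call in a loop task runs inline, it completes before the rest of the outer task. Enqueuing it instead would make it run only after the outer task has finished.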
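Then there are two methods that create Promises, newPromise() and newProgressivePromise(). If you are not familiar with what a Promise is for, it is worth looking up; we'll analyze its implementation in a later article.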
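Finally, there is a pair of methods that create Futures. As their comments say, they create a Future that is already marked as succeeded or failed. Every FutureListener added to it is notified immediately, and every blocking method returns without blocking.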
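Netty's pre-completed futures have a close analog in the JDK. The sketch below uses CompletableFuture.completedFuture and CompletableFuture.failedFuture (Java 9+) as stand-ins for newSucceededFuture and newFailedFuture. This is an analogy only, not Netty's Future API. It shows the two properties described above: a listener added to an already completed future runs right away, and join() returns or throws without blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// JDK analogy only: CompletableFuture stands in for Netty's pre-completed futures.
final class CompletedFutureDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Like newSucceededFuture("ok"): already done and already successful.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        // The future is complete, so the callback runs immediately on this thread.
        ok.whenComplete((v, e) -> events.add("listener saw " + v));
        events.add("join returned " + ok.join()); // returns without blocking

        // Like newFailedFuture(cause); failedFuture requires Java 9+.
        CompletableFuture<String> failed =
                CompletableFuture.failedFuture(new IllegalStateException("boom"));
        failed.whenComplete((v, e) -> {
            Throwable cause = e instanceof CompletionException ? e.getCause() : e;
            events.add("listener saw failure " + cause.getMessage());
        });
        try {
            failed.join(); // throws immediately instead of blocking
        } catch (CompletionException e) {
            events.add("join threw " + e.getCause().getClass().getSimpleName());
        }
        return events;
    }
}
```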
EventLoop extends OrderedEventExecutor, which directly extends EventExecutor and declares no methods of its own. Its header comment, however, shows that OrderedEventExecutor is a marker interface guaranteeing that tasks are executed in order and serially. So we can rely on any class implementing OrderedEventExecutor to execute tasks in the order they were submitted, one task at a time.
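A JDK single-thread executor gives the same "in order, one at a time" contract that OrderedEventExecutor marks, so the contract can be checked with a small sketch. SerialOrderDemo is a hypothetical class. It records the order in which tasks run and the largest number of tasks ever running at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a JDK single-thread executor gives the "ordered and serial" guarantee
// that the OrderedEventExecutor marker interface promises. Not Netty code.
final class SerialOrderDemo {
    static final class Result {
        final List<Integer> order;
        final int maxConcurrent;

        Result(List<Integer> order, int maxConcurrent) {
            this.order = order;
            this.maxConcurrent = maxConcurrent;
        }
    }

    static Result run(int taskCount) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            single.execute(() -> {
                int now = running.incrementAndGet(); // tasks running right now
                maxRunning.accumulateAndGet(now, Math::max);
                order.add(id);
                running.decrementAndGet();
            });
        }
        single.shutdown(); // already submitted tasks still run
        try {
            single.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(order, maxRunning.get());
    }
}
```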
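We now know EventLoop's core capability: it is a task executor that supports graceful shutdown and guarantees that submitted tasks are executed serially, in order. Next we'll use a concrete implementation, NioEventLoop, to get a more direct feel for what an EventLoop can do.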
II. The role EventLoop plays in Netty, seen through NioEventLoop
Let's start with NioEventLoop's constructor:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
Following the chain of super calls, we find that this constructor ends up calling the constructor of its superclass SingleThreadEventExecutor:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Note the line this.executor = ThreadExecutorMap.apply(executor, this);. The executor argument passed to this constructor is the ThreadPerTaskExecutor that NioEventLoopGroup hands to each NioEventLoop it creates, as discussed in the previous article. Before being stored in the field it is passed through ThreadExecutorMap.apply(executor, this), so let's look at that code:
// Relevant parts of ThreadExecutorMap
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
Executor apply(final Executor executor, final EventExecutor eventExecutor) creates a new Executor. The new Executor still delegates execution to the executor passed in, but it wraps each task in a static proxy. Before the task runs, the proxy binds the EventExecutor that runs it to a FastThreadLocal held by ThreadExecutorMap, and it unbinds it afterwards. FastThreadLocal is Netty's own implementation of per-thread storage. So where does its speed advantage over the JDK's ThreadLocal come from? Let's look at its set method (this assumes you already know how the JDK's ThreadLocal is implemented):
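The wrapping done by ThreadExecutorMap.apply is an ordinary decorator. Below is a minimal sketch of the same idea. It assumes a plain JDK ThreadLocal in place of FastThreadLocal and a String owner name in place of an EventExecutor. BindingExecutors and currentOwner are hypothetical names.

```java
import java.util.concurrent.Executor;

// Sketch of the ThreadExecutorMap idea with a plain JDK ThreadLocal (not FastThreadLocal)
// and a String owner standing in for the EventExecutor. All names are hypothetical.
final class BindingExecutors {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Which "event loop" is running the current task, or null outside wrapped tasks.
    static String currentOwner() {
        return CURRENT.get();
    }

    // Wrap an Executor: delegate execution, but wrap each submitted task.
    static Executor apply(Executor executor, String owner) {
        return command -> executor.execute(apply(command, owner));
    }

    // Static proxy around the task: bind before it runs, always unbind afterwards.
    static Runnable apply(Runnable command, String owner) {
        return () -> {
            CURRENT.set(owner);
            try {
                command.run();
            } finally {
                CURRENT.set(null);
            }
        };
    }
}
```

Code inside a wrapped task can ask "which loop am I on?" via currentOwner(). The finally block guarantees the binding never leaks into whatever the thread runs next.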
// FastThreadLocal.set
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
And InternalThreadLocalMap.get():
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
When FastThreadLocal obtains the map, it checks whether the current thread is a FastThreadLocalThread. If it is, it calls fastGet(FastThreadLocalThread thread), which reads the InternalThreadLocalMap stored directly on the thread object and creates it if absent. Otherwise it calls slowGet(), which fetches the InternalThreadLocalMap bound to the current thread from a static JDK ThreadLocal and creates one if there is none. Once it has the InternalThreadLocalMap, how does it store the value? Here is the logic that actually performs the assignment in FastThreadLocal.set:
// FastThreadLocal.setKnownNotUnset (called by set)
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
//index is the slot this FastThreadLocal owns in the map's array
if (threadLocalMap.setIndexedVariable(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
}
}
// InternalThreadLocalMap.setIndexedVariable
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
InternalThreadLocalMap is backed by an array. Each FastThreadLocal records, in its index field, the position of its value in that array. index is assigned when the FastThreadLocal is created, taken from a static, incrementing counter in InternalThreadLocalMap. The JDK's ThreadLocalMap stores values in a hash table; compared with that, this approach skips the hash computation on every get and set and never has to deal with hash collisions. So it is indeed faster than ThreadLocalMap. This is one example of Netty's performance work, and we'll see many more detail-level optimizations later on.
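The array-indexed scheme can be sketched in a few lines. This is a simplified model, not Netty's class, under the following assumptions:

- IndexedLocal is a hypothetical name.
- The per-thread array lives in a JDK ThreadLocal, like the slowGet() path; there is no FastThreadLocalThread fast path.
- The growth policy is this sketch's own choice.

Each instance takes a fixed index from a static counter when it is constructed. Apart from the one ThreadLocal lookup for the array itself, get and set are therefore plain array accesses with no hashing.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of FastThreadLocal + InternalThreadLocalMap (hypothetical names).
// Each thread owns an Object[]; each IndexedLocal owns one fixed slot in it.
final class IndexedLocal<V> {
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    private static final Object UNSET = new Object();
    // Per-thread table, kept in a JDK ThreadLocal like the slowGet() path.
    private static final ThreadLocal<Object[]> TABLE =
            ThreadLocal.withInitial(() -> newTable(8));

    // Assigned once at construction from a static counter, like FastThreadLocal.index.
    private final int index = NEXT_INDEX.getAndIncrement();

    private static Object[] newTable(int size) {
        Object[] table = new Object[size];
        Arrays.fill(table, UNSET);
        return table;
    }

    int index() {
        return index;
    }

    void set(V value) {
        Object[] table = TABLE.get();
        if (index >= table.length) { // grow, in the spirit of expandIndexedVariableTableAndSet
            int oldLength = table.length;
            table = Arrays.copyOf(table, Math.max(index + 1, oldLength * 2));
            Arrays.fill(table, oldLength, table.length, UNSET);
            TABLE.set(table);
        }
        table[index] = value; // direct slot write: no hash, no collision handling
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] table = TABLE.get();
        Object value = index < table.length ? table[index] : UNSET;
        return value == UNSET ? null : (V) value;
    }
}
```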
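We've spent a lot of space on NioEventLoop's constructor to show that Netty's performance optimizations go right down to the details. Let's continue with the rest of the constructor, where we'll see another Netty optimization. You should be familiar with Java NIO before reading on; otherwise what follows will be very hard to understand!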
NioEventLoop has the following fields:
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
These fields are initialized during NioEventLoop's construction.
First, the constructor calls openSelector() to create a SelectorTuple. It then assigns the tuple's selector and unwrappedSelector to the corresponding fields of NioEventLoop. Here is openSelector():
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
//Create a new Selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
//Optimization disabled: just wrap the plain Selector in a SelectorTuple and return it
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
//The optimization starts here
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//Look up the Selector implementation's fields via reflection
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
//Overwrite the Selector's fields directly through Unsafe, using their offsets
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
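So what openSelector() does is this. It checks whether the optimization is allowed; it is skipped when DISABLE_KEY_SET_OPTIMIZATION is set or when the Selector is not a sun.nio.ch.SelectorImpl. If the optimization is allowed, it replaces two fields of the newly created Selector, selectedKeys and publicSelectedKeys, with a SelectedSelectionKeySet instance. It does this through Unsafe on Java 9+ when possible, and through reflection otherwise. See the Selector API documentation for what these two fields mean; we won't repeat that here. Why reassign these two fields, and why does that count as an optimization? Let's first look at what they are inside the Selector: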
//Excerpt from SelectorImpl
protected Set<SelectionKey> selectedKeys = new HashSet();
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
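So the original selectedKeys and publicSelectedKeys are, in the end, both a HashSet (publicSelectedKeys is an ungrowable view of selectedKeys). And what is the SelectedSelectionKeySet that replaces them? Let's take a look: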
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public int size() {
return size;
}
@Override
public Iterator<SelectionKey> iterator() {
//omitted
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
//omitted
}
}
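SelectedSelectionKeySet extends AbstractSet, but it hardly counts as a Set: its add method does nothing to keep elements unique, and contains and remove always return false. Why design it this way? Let's look at how the JDK adds elements to the selectedKeys set. The snippet below is decompiled code from WindowsSelectorImpl, so the variables have names like var9 and var10, but that doesn't get in the way of understanding the logic: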
if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
if (var9.clearedCount != var1) {
if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
} else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
var9.updateCount = var1;
++var6;
}
var9.clearedCount = var1;
} else {
if (var9.clearedCount != var1) {
var10.channel.translateAndSetReadyOps(var4, var10);
if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
} else {
var10.channel.translateAndUpdateReadyOps(var4, var10);
if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
WindowsSelectorImpl.this.selectedKeys.add(var10);
var9.updateCount = var1;
++var6;
}
}
var9.clearedCount = var1;
}
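Before adding an element to selectedKeys, the JDK checks whether selectedKeys already contains it. If it does, only the key's ready operations are updated and nothing is added; the key is added only when it is absent. SelectedSelectionKeySet's contains method, however, always returns false, so the same SelectionKey can end up in selectedKeys more than once. Does that cause problems? Before Netty handles a ready SelectionKey it checks the key's ready state, and once the event has been handled that readiness is cleared, so adding the same SelectionKey twice has no effect here.
So what does replacing the HashSet with a plain array buy us? First, NioEventLoop extends SingleThreadEventLoop, so selectedKeys is only ever accessed by a single thread. A single thread walking a contiguous array makes good use of the CPU cache, and because no other thread touches the array there is no false sharing. Second, when Netty processes selectedKeys it clears the array only after it has handled all ready SelectionKeys, and then calls select again. So adding never has to look for a free slot; elements are simply appended in order. This is far cheaper than adding to and removing from a HashSet, which is backed by a HashMap. (In a small test, removing elements from an array and from a HashMap, each holding 10,000 elements, the HashMap took about ten times as long as the array.)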
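The append-only behaviour described above can be shown with a small generic sketch in the spirit of SelectedSelectionKeySet. AppendOnlyKeySet is a hypothetical name. The article omits increaseCapacity(), so doubling the array is an assumption of this sketch. contains always returns false, so duplicates are simply appended, and reset() clears only the used prefix before the next select.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch in the spirit of SelectedSelectionKeySet: an append-only array that only
// pretends to be a Set. Hypothetical name; doubling on growth is an assumption.
final class AppendOnlyKeySet<E> extends AbstractSet<E> {
    Object[] keys = new Object[4];
    int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        keys[size++] = e; // no uniqueness check: just append
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1);
        }
        return true;
    }

    @Override
    public boolean contains(Object o) {
        return false; // callers always take the "not present yet" branch
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[idx++];
            }
        };
    }

    // Clear only the used prefix once all ready keys have been processed.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }
}
```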
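We spent a lot of space on EventLoop's constructor, mainly to show that Netty's performance optimizations really are everywhere, and that Netty goes to great lengths to optimize! This is one reason Netty is so widely used; many high-performance, high-throughput middleware systems, such as RocketMQ and Spark, use Netty for communication. When we read the source of a high-performance framework like Netty, we must pay attention to these optimization details. Only then do we understand what makes the framework good and how to use it correctly to get the most out of it!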
Let's continue with NioEventLoop's main logic, the run() method:
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
run() is NioEventLoop's implementation of the template method declared by its superclass SingleThreadEventExecutor. It is an endless loop. What happens in each iteration? First, it calls selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) to obtain the select strategy. Let's look at the SelectStrategy interface first:
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
SelectStrategy defines three select strategies: SELECT, CONTINUE and BUSY_WAIT. Netty's default implementation, DefaultSelectStrategy, computes the strategy like this:
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
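If the EventLoop's task queue is empty, the strategy is SELECT, i.e. a blocking select. Otherwise it returns the result of calling selectNow() on the Selector held by the NioEventLoop. That result is the number of ready IO events, which is 0 or more and therefore matches none of the select strategies. This means a non-blocking selectNow has already been performed at this point: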
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
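The strategy decision fits in a few lines. This sketch copies the three constant values from SelectStrategy and the rule from DefaultSelectStrategy. StrategySketch and nextStep are hypothetical names. java.util.function.IntSupplier stands in for Netty's own IntSupplier, whose get() may throw. nextStep summarizes what run() does next for each returned value.

```java
import java.util.function.IntSupplier;

// Simplified model of DefaultSelectStrategy plus the switch in run(). The constant
// values mirror SelectStrategy; everything else is a hypothetical stand-in.
final class StrategySketch {
    static final int SELECT = -1;
    static final int CONTINUE = -2;
    static final int BUSY_WAIT = -3;

    // Same rule as DefaultSelectStrategy.calculateStrategy
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }

    // What one iteration of run() does next for a given strategy value.
    static String nextStep(int strategy) {
        switch (strategy) {
            case CONTINUE:
                return "retry loop";
            case BUSY_WAIT: // NIO has no busy-wait: falls through to SELECT
            case SELECT:
                return "blocking select, then process keys and run tasks";
            default: // >= 0: selectNow already ran and returned this many ready keys
                return "process " + strategy + " ready keys, then run tasks";
        }
    }
}
```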
Once the strategy is known, run() acts on it. CONTINUE skips the rest of the iteration and goes back to choosing a strategy. NIO does not support busy-waiting, so BUSY_WAIT behaves exactly like SELECT: both call select(wakenUp.getAndSet(false));. Before going further, we need to understand what the wakenUp field means:
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
wakenUp determines whether a blocked Selector.select should break out of its selection. Selector.select is always called with a timeout here. Once wakenUp has been set to true, Selector.select is not called again after it returns, and the select loop ends.
With the meaning of wakenUp clear, let's look at the logic of NioEventLoop's select method:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//Deadline for the blocking select: now plus the delay until the nearest scheduled task on this EventLoop is due
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// Blocking time in ms; adding 500000L (0.5 ms) rounds to the nearest millisecond. Once selectDeadLineNanos has passed, the method ends
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// If a task was added and wanted Selector#wakeup to make select return early, that wakeup would be lost without the check below, and select would block for the full timeoutMillis.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
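The timeoutMillis expression rounds to the nearest millisecond. Adding 500000 ns (half a millisecond) before integer division by 1,000,000 rounds half up. Here is a quick check of the arithmetic, with the expression copied from select() into a hypothetical helper.

```java
// The timeoutMillis expression from NioEventLoop.select, isolated in a hypothetical helper.
final class SelectTimeout {
    static long timeoutMillis(long selectDeadLineNanos, long currentTimeNanos) {
        // +500000 ns (0.5 ms) before dividing by 1,000,000: round half up to whole ms
        return (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    }
}
```

A remaining delay under 0.5 ms yields 0, which lands in the timeoutMillis <= 0 branch of select() instead of blocking.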
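Let's go through the logic of this code. First, keep in mind that, in principle, select is only called when the EventLoop's task queue is empty. SelectStrategy returns SELECT only when hasTasks() is false, and wakenUp is set to false just before NioEventLoop.select is called.
The method first computes the longest time the select may block, timeoutMillis. If that is already zero or less, it performs a non-blocking selectNow() (if it hasn't selected yet in this call) and returns.
Next it checks hasTasks(), i.e. whether tasks have been added to the EventLoop. If so, a new task arrived after SelectStrategy chose the strategy. To keep that task from waiting until the blocking select finishes, the code checks wakenUp. If wakenUp is false, it sets it to true, performs a non-blocking selector.selectNow() and leaves NioEventLoop.select; otherwise it goes on to the blocking Selector.select(timeoutMillis).
When Selector.select returns, the loop ends if any of the following holds:
- there are ready IO events;
- wakenUp was already true when NioEventLoop.select was called, or has been set to true since;
- the EventLoop has a pending task;
- a scheduled task is ready to run.
If none holds, it checks whether the current thread has been interrupted and, if so, also leaves the loop.
Finally it checks whether this iteration took at least the select timeout, i.e. whether Selector.select returned because it timed out. If so, selectCnt is reset to 1 and the loop continues. If not, it checks whether selectCnt has reached SELECTOR_AUTO_REBUILD_THRESHOLD, and if it has, it rebuilds the Selector and leaves the loop. That branch works around a JDK 6 NIO bug, and we can ignore it for now. That is the whole of NioEventLoop.select.
This flow looks messy. To make sense of it, we first need to understand the lifecycle of the wakenUp field. There is only one entry point through which code outside the loop can modify it: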
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
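This method is protected, so it is not part of the API offered to other packages; it is an internal Netty hook. Its callers invoke it when a task is submitted from outside the event loop (from SingleThreadEventExecutor.execute) and during shutdown. In both cases the goal is to wake Selector.select out of its blocking call so that the new task or the shutdown request is handled promptly.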
Careful readers may notice that wakenUp is set to false right before NioEventLoop.select is called. Why? So that when NioEventLoop.wakeup is later called from outside, wakenUp.compareAndSet(false, true) succeeds, and selector.wakeup() can then be called to wake up the Selector.
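The wakenUp handshake can be reproduced against a real JDK Selector. In this sketch, WakeupGuard and its methods are hypothetical; only Selector.open(), wakeup(), select(long) and close() are real API. The compareAndSet(false, true) step lets only the first of several wakeup requests pay for the expensive selector.wakeup(). Resetting the flag before each select re-arms it. Per the Selector documentation, a wakeup issued while no select is in progress makes the next select return immediately.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the wakenUp protocol around a real JDK Selector (hypothetical class).
final class WakeupGuard implements AutoCloseable {
    final Selector selector;
    final AtomicBoolean wakenUp = new AtomicBoolean();
    final AtomicInteger realWakeups = new AtomicInteger(); // calls that reached selector.wakeup()

    WakeupGuard() throws IOException {
        selector = Selector.open();
    }

    // Like NioEventLoop.wakeup(inEventLoop): only the first caller pays for selector.wakeup().
    void wakeupFromOutside() {
        if (wakenUp.compareAndSet(false, true)) {
            realWakeups.incrementAndGet();
            selector.wakeup();
        }
    }

    // Like select(wakenUp.getAndSet(false)): re-arm the flag, then block. Returns elapsed ms.
    long resetAndSelect(long timeoutMillis) throws IOException {
        wakenUp.getAndSet(false);
        long start = System.nanoTime();
        selector.select(timeoutMillis);
        return (System.nanoTime() - start) / 1_000_000L;
    }

    @Override
    public void close() throws IOException {
        selector.close();
    }
}
```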
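With that in mind, look back at NioEventLoop.select. What it really does is block on Selector.select to collect ready IO events while staying responsive to a wakeup at any moment. If scheduled tasks have been added to the NioEventLoop, it blocks at most until the nearest one is due; with no scheduled tasks, it blocks for at most 1 s. Isn't that much simpler to understand!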
Careful readers might ask: why limit NioEventLoop.select to block at most until the next scheduled task is due? Let's keep that question in mind as we continue with NioEventLoop.run.
After select returns, run() checks the wakenUp flag again. If it is true, run() calls selector.wakeup(); so that the next selector.select does not block.
Next, run() reads the current ioRatio. As mentioned before, this parameter sets the share of time spent on IO within each loop iteration. The logic is as follows. If ioRatio is 100, run() calls processSelectedKeys() and then runAllTasks(), with no time limit on the tasks. Otherwise it calls processSelectedKeys(), records how long that took (ioTime), and limits runAllTasks to the remaining budget of ioTime * (100 - ioRatio) / ioRatio nanoseconds.
What do these two methods do? In short, processSelectedKeys() handles all ready selectedKeys, i.e. the ready IO events, and the two runAllTasks overloads execute the submitted tasks. Now we can see why NioEventLoop.select blocks at most until the next scheduled task is due. IO handling and task execution happen on the same thread. Without that limit, the thread might still be blocked in select when the next scheduled task should already be running!
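The task budget is plain arithmetic. The helper below is a hypothetical stand-alone copy of the expression in run(). Returning -1 for ioRatio == 100 is this sketch's own convention for "no limit"; in that case run() calls the no-argument runAllTasks() instead. Restricting ioRatio to 1..100 is likewise an assumption of the sketch.

```java
// Stand-alone copy of the budget expression in NioEventLoop.run (hypothetical helper).
final class IoRatio {
    // Nanoseconds runAllTasks may use; -1 means "no limit" (this sketch's convention).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("this sketch accepts ioRatio in 1..100");
        }
        if (ioRatio == 100) {
            return -1L; // run() calls runAllTasks() without a timeout in this case
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

Here is how the budget works out if processSelectedKeys() took 40 ms:

| ioRatio | Task budget | Relation to IO time |
| --- | --- | --- |
| 50 (NioEventLoop's default) | 40 ms | same as the IO time |
| 80 | 10 ms | a quarter of the IO time |
| 20 | 160 ms | four times the IO time |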
Next, let's see how processSelectedKeys() and runAllTasks handle IO events and submitted tasks respectively, starting with processSelectedKeys():
private void processSelectedKeys() {
//With the optimized Selector, loop over the array; otherwise loop over the JDK set's iterator.
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
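The method first checks whether the Selector optimization that replaces the HashSet with an array is in use; this is what we saw in NioEventLoop's constructor. We only look at the optimized path here. Both paths do the same thing and differ only in how they iterate: each walks over the ready SelectionKeys and hands every key to one of the two processSelectedKey overloads. The overload is chosen by the type of the key's attachment. Don't worry yet about when the attachment is set; we'll cover that in a later article. Let's look at the two overloads: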
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}
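processSelectedKey(SelectionKey k, AbstractNioChannel ch): let's skip the validity check at the start of the method and focus on how ready events are handled.
- It reads the ready operations first.
- If they include OP_CONNECT, it removes OP_CONNECT from the key's interest set; otherwise Selector.select would keep returning immediately. It then lets the unsafe of the ready AbstractNioChannel complete the connection by calling unsafe.finishConnect(). We won't go into that method in this article.
- If they include OP_WRITE, it delegates to unsafe.forceFlush().
- If they include OP_READ or OP_ACCEPT, it delegates to unsafe.read(). It also calls unsafe.read() when readyOps is 0, to work around a possible JDK bug.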
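The bit tests are easy to check in isolation. This sketch uses the real SelectionKey constants but replaces the unsafe calls with strings; ReadyOpsSketch and its Outcome type are hypothetical. It also shows the interestOps &= ~OP_CONNECT step that keeps select() from returning immediately forever once the connection is established.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Models only the bit tests in processSelectedKey(SelectionKey, AbstractNioChannel);
// the strings stand in for the unsafe calls Netty makes. Hypothetical names.
final class ReadyOpsSketch {
    static final class Outcome {
        final int interestOpsAfter;
        final List<String> actions;

        Outcome(int interestOpsAfter, List<String> actions) {
            this.interestOpsAfter = interestOpsAfter;
            this.actions = actions;
        }
    }

    static Outcome dispatch(int readyOps, int interestOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            interestOps &= ~SelectionKey.OP_CONNECT; // otherwise select() keeps returning at once
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is the JDK-bug workaround mentioned in the source comment
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return new Outcome(interestOps, actions);
    }
}
```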
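processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task): this one is simple. It calls the NioTask's channelReady method. If that throws, it cancels the key and calls channelUnregistered; channelUnregistered is also called when channelReady itself cancelled the key. We can guess that NioTask is a callback for IO readiness events.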
Handling ready IO events is that simple. Next, let's look at how submitted tasks are processed. We'll only examine the overload that takes a timeout, protected boolean runAllTasks(long timeoutNanos):
protected boolean runAllTasks(long timeoutNanos) {
//Move scheduled tasks that are due from the scheduled task queue into the regular task queue
fetchFromScheduledTaskQueue();
//Poll a task from the regular task queue
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
runAllTasks first moves scheduled tasks that are due into the regular (non-scheduled) task queue, then polls a task from that queue. If there is none, it calls afterRunningAllTasks();, a hook method we won't look into here, and returns. Otherwise it enters a loop. Each iteration runs the task through safeExecute, which simply executes it inside a try-catch so that an exception cannot end the loop. It then increments the count of executed tasks. Whenever the count ANDed with 0x3F is 0, i.e. every 64 tasks, it checks whether the time budget has been used up. If it has, it leaves the loop and calls afterRunningAllTasks();. Otherwise it polls the next task and continues. This logic is fairly simple too.
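Here is a simplified model of the time-budgeted loop. It assumes a plain Queue<Runnable> and no scheduled tasks; BudgetedRunner is a hypothetical name. It keeps the two details discussed above. Each task runs inside a try-catch. The clock is read only when runTasks & 0x3F == 0, so even a zero budget lets 64 tasks run before the loop stops. The deadline test is written as now - deadline >= 0, the overflow-safe way to compare System.nanoTime() values.

```java
import java.util.Queue;

// Simplified model of runAllTasks(timeoutNanos); BudgetedRunner is a hypothetical name.
final class BudgetedRunner {
    // Returns how many tasks were run in this call.
    static int runAllTasks(Queue<Runnable> queue, long timeoutNanos) {
        Runnable task = queue.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        int runTasks = 0;
        for (;;) {
            try {
                task.run(); // like safeExecute: a failing task must not end the loop
            } catch (Throwable t) {
                System.err.println("task failed: " + t);
            }
            runTasks++;
            // Read the clock only every 64 tasks; nanoTime() is comparatively expensive.
            if ((runTasks & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break; // budget used up: the rest waits for the next loop iteration
            }
            task = queue.poll();
            if (task == null) {
                break;
            }
        }
        return runTasks;
    }
}
```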
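That completes our analysis of NioEventLoop.run. In one sentence, run handles ready IO events and submitted tasks. So when is run called? Following the call chain, we find that the first call to execute(Runnable task) on NioEventLoop's superclass SingleThreadEventExecutor starts the event loop's thread, and that thread calls run(). Of course, run() is an overridden template method; what we analyzed above is NioEventLoop's implementation.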
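The lazy start can be modelled with a toy executor. LazyLoop is hypothetical and much simpler than SingleThreadEventExecutor; for example, it stops via a poison-pill task instead of a state machine. It does show the same shape. execute() enqueues the task, and only the first call starts the thread, via a compare-and-set on a state field. That thread then runs the overridable run() loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of SingleThreadEventExecutor's lazy start (hypothetical names; the
// poison-pill shutdown is a simplification, not Netty's state machine).
class LazyLoop {
    private static final Runnable STOP = () -> { };
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger state = new AtomicInteger(); // 0 = not started, 1 = started
    final AtomicInteger threadsStarted = new AtomicInteger();
    private volatile Thread thread;

    void execute(Runnable task) {
        taskQueue.add(task);
        if (state.compareAndSet(0, 1)) { // only the first call starts the loop thread
            thread = new Thread(this::run, "lazy-loop");
            threadsStarted.incrementAndGet();
            thread.start();
        }
    }

    // Template method: a subclass such as a NIO loop would put select + task handling here.
    protected void run() {
        for (;;) {
            Runnable task;
            try {
                task = taskQueue.take();
            } catch (InterruptedException e) {
                return;
            }
            if (task == STOP) {
                return;
            }
            task.run();
        }
    }

    void shutdownAndJoin() {
        execute(STOP);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```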
That essentially concludes our walk through NioEventLoop's key code.
III. Recap
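In this article we analyzed NioEventLoop. Besides executing submitted tasks, NioEventLoop also listens for IO events on the Channels registered with it, and it uses ioRatio to control how time is split between the two. All of this is done in its run() method.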
NioEventLoop's role in Netty is therefore clear: it is the component that actually executes the work. On top of EventLoop, Netty provides the abstract class SingleThreadEventLoop, which also extends SingleThreadEventExecutor. This gives SingleThreadEventLoop an open template method, run(), through which we can implement our own task-processing logic. NioEventLoop implements run() so that it can handle both submitted tasks and ready IO events.
In the next article, we'll look at how Netty wires its components together.