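EventLoop: literally an "event looper"; as the name suggests, it processes events in a continuous loop.
EventLoopGroup: a group of event loops, that is, a pool of EventLoop instances that process events. How do they actually work? Read on.
This article first walks through selected implementation details of EventLoop and EventLoopGroup, then traces the startup flow of the code we write when using them. The focus is on NioEventLoop and NioEventLoopGroup.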
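1.EventLoopGroup
First, an overview of the EventLoopGroup class hierarchy, shown in the class diagram:
From the diagram, EventLoopGroup gets its capabilities from the following layers:
- ScheduledExecutorService is the JDK API; it covers scheduling tasks to run once after a delay or repeatedly at a fixed rate or delay.
- EventExecutorGroup adds iteration, next(), and the shutdown-related methods; everything else is inherited from ScheduledExecutorService.
- EventLoopGroup mainly adds next(), which returns the next EventLoop, and register(), which registers a Channel with one specific EventLoop: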
EventLoop next();
ChannelFuture register(Channel channel);
Next, the implementation details.
1.1 AbstractEventExecutorGroup
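This class clearly has no fields, only implementations of some interface methods. Every one of them calls next() and forwards to the method of the same name on the result. In other words, whatever next() returns does the real work: the group plays the role of the thread pool and the returned executor the role of one of its threads. So the next thing to look at is how next() is implemented.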
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
....
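The delegation pattern described above can be illustrated without Netty. The following is a minimal sketch, assuming plain JDK single-thread executors as children; the class name DelegatingGroupSketch and the round-robin next() are hypothetical stand-ins, not Netty code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an executor group: it holds no task logic of its own
// and forwards every submission to whichever child next() returns.
final class DelegatingGroupSketch {
    private final ExecutorService[] children;
    private final AtomicInteger index = new AtomicInteger();

    DelegatingGroupSketch(int nThreads) {
        children = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            children[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Round-robin selection; an assumption standing in for the abstract next().
    ExecutorService next() {
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }

    // Same shape as AbstractEventExecutorGroup.submit: pure delegation.
    <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

    void shutdown() {
        for (ExecutorService child : children) {
            child.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        DelegatingGroupSketch group = new DelegatingGroupSketch(2);
        try {
            Future<String> a = group.submit(() -> Thread.currentThread().getName());
            Future<String> b = group.submit(() -> Thread.currentThread().getName());
            // Two consecutive submissions are served by two different children.
            System.out.println(a.get() + " / " + b.get());
        } finally {
            group.shutdown();
        }
    }
}
```

The group itself never runs a task; it only decides which child does.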
1.2 MultithreadEventExecutorGroup
Start with the fields:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children; // the array of event executors owned by this group
private final Set<EventExecutor> readonlyChildren; // read-only view of the executors
private final AtomicInteger terminatedChildren = new AtomicInteger(); // number of executors that have terminated
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser; // picks the executor returned by next()
}
Now the core API:
// The constructor initializes the fields
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// One EventExecutor per thread
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// Store the concrete executor; newChild is abstract and left to subclasses
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
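// Create the chooser. The default factory returns a different chooser depending on whether the number of children is a power of two, so that selection can use a cheaper bit mask when possible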
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// All children have terminated
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// Register the termination listener on every child
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// Build the read-only set readonlyChildren from the children array
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
// next() simply asks the chooser built above to pick one event executor (thread) from the children array
@Override
public EventExecutor next() {
return chooser.next();
}
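To make the chooser idea concrete, here is a minimal sketch of the two selection strategies, assuming it mirrors what Netty 4.1's default chooser factory does (a bit mask when the child count is a power of two, a modulo otherwise). ChooserSketch and its Chooser interface are hypothetical names, and the sketch returns indices rather than executors.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch {
    // Hypothetical simplification: returns the index of the child to use next.
    interface Chooser {
        int nextIndex();
    }

    // A positive int is a power of two when it has exactly one bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // length is assumed to be > 0, as the group constructor enforces.
    static Chooser newChooser(int length) {
        AtomicInteger idx = new AtomicInteger();
        if (isPowerOfTwo(length)) {
            // Masking with (length - 1) is equivalent to modulo for powers of two.
            return () -> idx.getAndIncrement() & (length - 1);
        }
        // General case: round-robin with a modulo that stays non-negative on overflow.
        return () -> Math.floorMod(idx.getAndIncrement(), length);
    }

    public static void main(String[] args) {
        Chooser chooser = newChooser(4);
        StringBuilder picked = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            picked.append(chooser.nextIndex());
        }
        System.out.println(picked); // prints 012301
    }
}
```

Either way the result is round-robin; the power-of-two variant only saves the cost of a division.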
1.3 MultithreadEventLoopGroup
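This class is fairly simple: it inherits everything from MultithreadEventExecutorGroup and implements EventLoopGroup (mainly register() and a next() that returns an EventLoop).
It does four things:
- 1) Computes the default number of threads
- 2) Overrides next() to return an EventLoop rather than an EventExecutor (EventLoop is a sub-interface)
- 3) Implements register() by calling next().register(channel)
- 4) Still leaves newChild, inherited from the parent class, for subclasses to implement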
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
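The default thread count logic in the static initializer and constructor above can be reproduced with the JDK alone. This is a rough sketch: Integer.getInteger and Runtime.availableProcessors() are substituted for Netty's SystemPropertyUtil and NettyRuntime, and the class and method names are hypothetical.

```java
final class DefaultThreadsSketch {
    // Mirrors the static initializer: use the configured value if present,
    // otherwise 2 x available processors, and never go below 1.
    static int defaultEventLoopThreads(Integer configured, int availableProcessors) {
        int fallback = availableProcessors * 2;
        return Math.max(1, configured != null ? configured : fallback);
    }

    // Mirrors the constructor: nThreads == 0 means "use the default".
    static int effectiveThreads(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    public static void main(String[] args) {
        // Returns null when -Dio.netty.eventLoopThreads is not set.
        Integer configured = Integer.getInteger("io.netty.eventLoopThreads");
        int cores = Runtime.getRuntime().availableProcessors();
        int defaults = defaultEventLoopThreads(configured, cores);
        System.out.println("default threads: " + effectiveThreads(0, defaults));
        System.out.println("explicit threads: " + effectiveThreads(3, defaults));
    }
}
```

On a machine with 4 available processors and no system property set, a group created without an explicit thread count would therefore get 8 event loops.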
1.4 NioEventLoopGroup
This class is simpler still: apart from the constructors, the only thing to look at is the implementation of newChild.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
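// ... omitted
// The parameters are the thread count, the executor, the selector provider, the select strategy, and the rejected-execution handler
// In everyday use we mostly set just the thread count, sometimes the rejection policy, and at most the thread naming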
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// Returns a NioEventLoop, which confirms again that an EventLoopGroup is a collection of EventLoops, each one backed by a single thread
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
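To summarize, the role of EventLoopGroup is clear:
- 1) It builds a pool of EventLoops, initializes the chooser, and returns a different EventLoop on successive next() calls
- 2) It offers scheduling, but delegates it to the EventLoop obtained from next()
- 3) It offers register(), which is likewise carried out by an EventLoop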
2.EventLoop
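The analysis of EventLoopGroup shows that the actual event handling is done by EventLoop; the group only manages the creation and selection of EventLoops. It is the familiar thread-pool-and-thread relationship with one extra layer of wrapping. Let's look at the implementation:
First, the class hierarchy diagram.
The diagram shows that the EventLoop interface extends EventLoopGroup, so an EventLoop has all of the capabilities above plus some additions.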
2.1 AbstractEventExecutor
The field to note is parent: it links an EventLoop to its EventLoopGroup. The EventLoop holds a reference to the group that owns it (parent here means the owning group, not a superclass).
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);
static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
// The group that owns this executor
private final EventExecutorGroup parent;
// next() returns this executor itself, unlike the group's next()
@Override
public EventExecutor next() {
return this;
}
// As the name says: is the current thread this executor's event loop thread?
@Override
public boolean inEventLoop() {
// The actual check is left to subclasses
return inEventLoop(Thread.currentThread());
}
2.2 AbstractScheduledEventExecutor
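Its purpose is clear: scheduling. It holds a queue of scheduled tasks and:
- provides the basic add, peek, poll and remove operations on that queue,
- implements the general flow of scheduling a task, leaving the actual execution to subclasses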
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
}
return scheduledTaskQueue;
}
....
// Peek at the next scheduled task
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
}
// Schedule a one-shot task to run after a delay
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
// Schedule a periodic task at a fixed rate
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(unit, "unit");
if (initialDelay < 0) {
throw new IllegalArgumentException(
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
}
if (period <= 0) {
throw new IllegalArgumentException(
String.format("period: %d (expected: > 0)", period));
}
return schedule(new ScheduledFutureTask<Void>(
this, Executors.<Void>callable(command, null),
ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
}
// All of the public schedule methods above end up here
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
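scheduledTaskQueue().add(task); // already on the event loop thread: add to the queue directly
} else {
execute(new Runnable() { // execute() is implemented by subclasses
@Override
public void run() { // the submitted task only adds to the queue, now from the event loop thread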
scheduledTaskQueue().add(task);
}
});
}
return task;
}
}
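The scheduled-task queue can be pictured as a priority queue ordered by deadline. The sketch below is a simplified illustration under that assumption; Task is a hypothetical stand-in for ScheduledFutureTask, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ScheduledQueueSketch {
    // Hypothetical, simplified stand-in for ScheduledFutureTask:
    // a name plus an absolute deadline in nanoseconds.
    static final class Task implements Comparable<Task> {
        final String name;
        final long deadlineNanos;

        Task(String name, long deadlineNanos) {
            this.name = name;
            this.deadlineNanos = deadlineNanos;
        }

        @Override
        public int compareTo(Task other) {
            return Long.compare(deadlineNanos, other.deadlineNanos);
        }
    }

    // The head of the queue is always the task with the earliest deadline.
    private final PriorityQueue<Task> queue = new PriorityQueue<>();

    // Negative delays are clamped to 0, as in schedule(Runnable, long, TimeUnit).
    void schedule(String name, long nowNanos, long delayNanos) {
        queue.add(new Task(name, nowNanos + Math.max(0, delayNanos)));
    }

    // Removes and returns, in deadline order, every task that is due at nowNanos.
    List<String> pollDue(long nowNanos) {
        List<String> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().deadlineNanos <= nowNanos) {
            due.add(queue.poll().name);
        }
        return due;
    }

    public static void main(String[] args) {
        ScheduledQueueSketch sketch = new ScheduledQueueSketch();
        sketch.schedule("c", 0, 300);
        sketch.schedule("a", 0, 100);
        sketch.schedule("b", 0, 200);
        System.out.println(sketch.pollDue(250)); // prints [a, b]
    }
}
```

Because the queue in the real class is a plain, non-thread-safe PriorityQueue, it must only be touched from the event loop thread, which is why schedule() hands the add over to execute() when called from another thread.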
2.3 SingleThreadEventExecutor
This is the core implementation class. As the name suggests, it is a single-threaded event executor: it holds a reference to one thread and does all of its work on that thread.
Start with the fields: the thread, the task queue, the thread properties, and so on.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// Default maximum number of pending tasks
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
// Executor lifecycle states
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
// Atomic updater for the state field
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
// Atomic updater for the threadProperties field
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(
SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
// The task queue
private final Queue<Runnable> taskQueue;
// The thread bound to this executor
private volatile Thread thread;
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;
private final Semaphore threadLock = new Semaphore(0);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;
private long lastExecutionTime;
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
...
}
// A quick look at the constructor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
Next, how a task gets executed:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
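boolean inEventLoop = inEventLoop(); // is the calling thread the event loop's own thread?
if (inEventLoop) {
addTask(task); // already on the event loop thread: just enqueue the task
} else {
startThread(); // start the event loop thread if it has not been started yet
addTask(task); // then add the task to the task queue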
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
// Start the thread
private void startThread() {
if (state == ST_NOT_STARTED) {
// The CAS guarantees the thread is started only once
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
// Record the last execution time
updateLastExecutionTime();
try {
// run() is abstract and implemented by subclasses
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
// confirmShutdown() runs the remaining tasks and the shutdown hooks
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
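The essence of execute(), startThread() and inEventLoop() is a lazily started thread that drains a task queue. Below is a minimal sketch under simplifying assumptions: only two states, a blocking queue instead of Netty's task queue, a hypothetical POISON task for shutdown, and a thread created directly rather than through an Executor.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleThreadLoopSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    // Hypothetical marker task that tells the loop to exit.
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private volatile Thread thread;

    // True only when called from the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread(); // no-op if the thread is already running
        }
        taskQueue.add(task);
    }

    private void startThread() {
        // The CAS ensures that only one caller ever starts the thread.
        if (state.get() == ST_NOT_STARTED && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            new Thread(this::run, "loop-sketch").start();
        }
    }

    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                if (task == POISON) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void shutdown() {
        execute(POISON);
    }

    // Submits two tasks and reports whether both ran on the same loop thread,
    // distinct from the caller's thread.
    static boolean demo() {
        SingleThreadLoopSketch loop = new SingleThreadLoopSketch();
        CountDownLatch done = new CountDownLatch(2);
        Thread[] seen = new Thread[2];
        loop.execute(() -> { seen[0] = Thread.currentThread(); done.countDown(); });
        loop.execute(() -> { seen[1] = Thread.currentThread(); done.countDown(); });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.shutdown();
        }
        return seen[0] == seen[1] && seen[0] != Thread.currentThread() && !loop.inEventLoop();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The real class adds much more (shutdown states, rejection handling, wake-up logic), but the start-once-then-drain structure is the same.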
2.4 SingleThreadEventLoop
Compared with SingleThreadEventExecutor, this class additionally implements EventLoop: it exposes the parent accessor, and it holds one more task queue (tailTasks).
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private final Queue<Runnable> tailTasks;
.....
}
2.5 NioEventLoop
The core API here is the run() method.
public final class NioEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return NioEventLoop.super.pendingTasks();
}
};
/**
* The NIO {@link Selector}.
*/
private Selector selector; // the core selector
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector(); // the selector is opened here
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
}
Below is the NIO loop logic. The method keeps running until the NioEventLoop is told to shut down; this is how an NIO thread executes.
@Override
protected void run() {
for (;;) {
try {
// selectNowSupplier and hasTasks() determine the strategy: whether to block in select() or go on to process what is already ready
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // percentage of loop time intended for I/O
if (ioRatio == 100) { // at 100, process all I/O first, then run all pending tasks with no time limit
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // handle I/O first
} finally {
// Measure how long I/O took, then derive the time budget for tasks from the I/O ratio
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
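The task time budget is plain integer arithmetic on the expression ioTime * (100 - ioRatio) / ioRatio from the loop above. A small worked sketch (the class and method names are hypothetical):

```java
final class IoRatioSketch {
    // Same expression as in run(): the time allowed for tasks, given how long
    // I/O took and the percentage of time that I/O is supposed to get.
    // ioRatio is assumed to be in the range 1..99 here; 100 takes the other branch.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent on I/O
        System.out.println(taskBudgetNanos(ioTime, 50)); // prints 8000000
        System.out.println(taskBudgetNanos(ioTime, 80)); // prints 2000000
        System.out.println(taskBudgetNanos(ioTime, 20)); // prints 32000000
    }
}
```

With the default ioRatio of 50, tasks get as much time as I/O just used; raising the ratio shrinks the task budget, lowering it grows the budget.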
I/O processing flow: only the main path is analyzed here; the other branches are left for interested readers.
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
// this is the branch followed in this walkthrough
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
// As noted in the earlier Channel analysis, the channel attaches itself as the key's attachment
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); // clear the OP_CONNECT interest bit
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// If there is still unwritten data, flush it all out
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// If the key is readable or acceptable, read the data (or accept the connection)
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
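The order of the readiness checks in processSelectedKey can be isolated into a small function over the readyOps bit mask. The sketch below uses the real java.nio.channels.SelectionKey constants, but returns action names as strings instead of calling into a channel; ReadyOpsSketch and actionsFor are hypothetical names.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

final class ReadyOpsSketch {
    // Returns, in order, the actions the loop would take for a given readyOps mask:
    // connect completion first, then write, then read/accept.
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is also treated as readable, as in the original code.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(actionsFor(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // prints [forceFlush, read]
        System.out.println(actionsFor(SelectionKey.OP_ACCEPT));                       // prints [read]
        System.out.println(actionsFor(0));                                            // prints [read]
    }
}
```

Writes are handled before reads so that queued buffers can be released before more data is pulled in.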
Task execution: again only the main path is analyzed; the other branches are left for interested readers. The code below covers the case where ioRatio is not 100.
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task); // simply calls task.run(), catching and logging any Throwable
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) { // time budget exhausted: stop
break;
}
}
task = pollTask();
if (task == null) { // no tasks left: stop as well
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
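The time-budgeted loop above can be reduced to a small stand-alone sketch. It assumes a plain Queue of Runnables and System.nanoTime() in place of Netty's internal clock; BudgetedRunnerSketch and runTasks are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class BudgetedRunnerSketch {
    // Runs queued tasks until the queue is empty or the time budget is used up.
    // The clock is only consulted after every 64th task, so a batch of up to
    // 64 tasks always runs even when the budget is already exhausted.
    // Returns the number of tasks that were run.
    static int runTasks(Queue<Runnable> tasks, long timeoutNanos) {
        Runnable task = tasks.poll();
        if (task == null) {
            return 0;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        int ran = 0;
        for (;;) {
            task.run();
            ran++;
            // (ran & 0x3F) == 0 is true for 64, 128, 192, ...
            if ((ran & 0x3F) == 0 && System.nanoTime() - deadline >= 0) {
                break;
            }
            task = tasks.poll();
            if (task == null) {
                break;
            }
        }
        return ran;
    }

    public static void main(String[] args) {
        Queue<Runnable> tasks = new ArrayDeque<>();
        for (int i = 0; i < 200; i++) {
            tasks.add(() -> { });
        }
        // With a zero budget the loop stops at the first clock check, after 64 tasks.
        System.out.println(runTasks(tasks, 0)); // prints 64
        System.out.println(tasks.size());       // prints 136
    }
}
```

The budget is therefore a soft limit: its granularity is 64 tasks, trading precision for fewer clock reads.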
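To summarize:
An EventLoop is an executor that owns a single thread. The general execution flow is implemented by SingleThreadEventExecutor, and the concrete I/O handling by NioEventLoop; together they run both I/O work and scheduled tasks.
3.Application startup code
In server-side application code we usually create the two groups below. This ties in with the Reactor model discussed earlier: the first, bossGroup, handles accepting connections, and the second, workerGroup, handles the actual I/O.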
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
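So what does constructing a NioEventLoopGroup actually do? The no-argument constructor passes 0 as nThreads, which later selects the default thread count: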
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// Next, the call reaches MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// And this brings us back to the constructor covered in the EventLoopGroup section
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// Create as many EventExecutors as there are threads
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}