Netty之EventLoop

EventLoop:英譯事件循環(huán)器,感覺起來就是是不停的處理事件
EventLoopGroup:事件循環(huán)器組,感覺就是一大推的EventLoop處理事件润努。那么究竟是怎么回事呢?往下面看看

本篇文章先通過查看EventLoop和EventLoopGroup實現(xiàn)的部分細節(jié)序六,然后分析我們在使用代碼的啟動流程任连,當(dāng)然主要了解NioEventLoop和NioEventLoopGroup

1.EventLoopGoup

? 首先,我們先縱覽一下EventLoopGroup 的類結(jié)構(gòu)圖例诀,如下圖所示:

image

從上面圖中先分析下EventLoopGroup有哪些功能:

  • 首先通過ScheduledExecutorService發(fā)現(xiàn)使用的jdk的api随抠,包含了調(diào)度任務(wù)的定時或某種頻率執(zhí)行任務(wù),
  • EventExecutorGoup 除了添加了迭代功能,next()和shutdown的功能等就是繼承ScheduledExecutorService相關(guān)功能了
  • EventLoopGroup 主要的功能有next獲取下一個NextLoop,和register注冊服務(wù)繁涂,類似將某一個channel注冊到具體的EventLoop中
   EventLoop next();
   ChannelFuture register(Channel channel);

然后我們接著分析實現(xiàn)細節(jié)

1.1 AbstractEventExecutorGroup

明顯發(fā)現(xiàn)沒有成員變量拱她,只有部分接口實現(xiàn)∪幼铮可以簡單的發(fā)現(xiàn)全部都是通過next()調(diào)用調(diào)度任務(wù)的方法秉沼,意味著next()就表示線程和線程池。所以接下來就需要看看next的實現(xiàn)了

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
    @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }
....

1.2 MultithreadEventExecutorGroup

繼續(xù)看成員變量

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final EventExecutor[] children;   //持有 多個事件執(zhí)行器數(shù)組
    private final Set<EventExecutor> readonlyChildren;  //只讀執(zhí)行器列表
    private final AtomicInteger terminatedChildren = new AtomicInteger();  //結(jié)束的執(zhí)行器個數(shù)
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;    //執(zhí)行器選擇器
}

核心API看下

    //構(gòu)造器中數(shù)據(jù)初始化
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //一個線程對應(yīng)一個eventExecutor
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
              //這里存儲具體的引用矿酵,這里是抽象接口留給子類實現(xiàn)
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        //初始化不同的選擇器唬复,這里實現(xiàn)會根據(jù)線程的奇偶數(shù)不同返回不同的選擇器,以用來提供執(zhí)行效率
        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                //所有的線程都關(guān)閉的狀態(tài)
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        //添加事件監(jiān)聽器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

         //由當(dāng)前的線程組生成只讀readOnlyChildren
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

    //next方法實現(xiàn)全肮,很明顯是通過上面生成的chooser方法從children數(shù)組中選擇一個執(zhí)行事件執(zhí)行器(線程)
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

1.3 MultithreadEventLoopGroup

這個類比較簡單敞咧,繼承了MultithreadEventExecutorGroup的功能,然后實現(xiàn)了EventLoopGroup功能(主要是注冊和next返回eventLoop)辜腺。
主要實現(xiàn)功能有幾點:

  • 1) 構(gòu)造默認的線程數(shù)量
  • 2)調(diào)用next()返回EventLoop休建,而不是EventExector(EventLoop是子接口)
  • 3)調(diào)用next()的register完成注冊功能
    -4)仍然將父類留下來的newChild留給子類實現(xiàn)
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
  protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

1.4 NioEventLoopGroup

這個類就更加簡單了,除了構(gòu)造方法就看newChild的實現(xiàn)了

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

// 评疗。测砂。。省略
// 主要是線程大小百匆,執(zhí)行器砌些,selector的提供者以及選擇策略,還有拒接處理器
//不過我們平時用的多的就是設(shè)置線程數(shù)量了加匈,拒絕策略寄症,最多還有線程的命名了
 public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
      //返回一個NioEventLoop,再次應(yīng)征了EventLoopGroup就是多個EventLoop的組合矩动,每一個EventLoop就是一個線程
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

總結(jié)下:上面的EventLoopGoup功能很明確:

  • 1)構(gòu)造一組EventLoop池有巧,初始化chooser,調(diào)用next()返回不同的EventLoop
    1. 持有調(diào)度功能悲没,不過是通過next()獲取的EventLoop實現(xiàn)的
  • 3)含有register功能篮迎,同樣也是需要EventLoop實現(xiàn)

2.EventLoop

EventLoopGroup的分析告訴我們男图,事件處理還是需要我們EventLoop來做具體實現(xiàn),上面Group只是管理EventLoop的創(chuàng)建和調(diào)用甜橱;就是線程池和線程的概念逊笆,只是包裝了一層殼罷了。具體實現(xiàn)的情況繼續(xù)分析:

先看類繼承圖


image

圖中很明顯的發(fā)現(xiàn)我們的EventLoop接口繼承了EventLoopGroup岂傲,說明我們EventLoop除了擁有上面的所有功能還做了功能增強难裆。

2.1 AbstractEventExecutor

主要看看這個parent成員變量,這個就將EventLoop和EventLoopGroup給聯(lián)系起來了镊掖,EventLoopGroup作為父類被EventLoop給持有

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
    //持有父類
    private final EventExecutorGroup parent;

    //netxt()返回自身乃戈,相對group明顯不同
    @Override
    public EventExecutor next() {
        return this;
    }
    //通過英文名稱表示當(dāng)前線程 是否處于事件循環(huán)中(線程執(zhí)行)了
    @Override
    public boolean inEventLoop() {
      //具體實現(xiàn)留給子類了
        return inEventLoop(Thread.currentThread());
    }

2.2 AbstractScheduledEventExecutor

功能很明確主要是Schedule功能,持有一個調(diào)度隊列亩进,

  • 對于任務(wù)隊列有crud功能症虑,
  • 另外實現(xiàn)了就是調(diào)度任務(wù) 執(zhí)行大體功能了,核心執(zhí)行留給子類實現(xiàn)
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
        }
        return scheduledTaskQueue;
    }
  ....

  //查看調(diào)度任務(wù)
   final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        if (scheduledTaskQueue == null) {
            return null;
        }
        return scheduledTaskQueue.peek();
    }

     //執(zhí)行某一個調(diào)度任務(wù)
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        return schedule(new ScheduledFutureTask<Void>(
                this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }

    //執(zhí)行調(diào)度任務(wù)归薛,按時間間隔
   @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (initialDelay < 0) {
            throw new IllegalArgumentException(
                    String.format("initialDelay: %d (expected: >= 0)", initialDelay));
        }
        if (period <= 0) {
            throw new IllegalArgumentException(
                    String.format("period: %d (expected: > 0)", period));
        }

        return schedule(new ScheduledFutureTask<Void>(
                this, Executors.<Void>callable(command, null),
                ScheduledFutureTask.deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
    }


    //上面的最終執(zhí)行會交給
  <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {  
            scheduledTaskQueue().add(task);   //會添加到隊列中
        } else {
            execute(new Runnable() {   //這里交給子類實現(xiàn)
                @Override
                public void run() {  //目的還是運行添加到隊列中
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }
}

2.3 SingleThreadEventExecutor

這個才是關(guān)鍵核心實現(xiàn)類谍憔,從名稱也可以發(fā)現(xiàn)單線程的事件執(zhí)行器。主要是持有一個線程的應(yīng)用主籍,然后使用當(dāng)前線程的做某些處理习贫,也是最核心的。

先看成員變量千元,可以發(fā)現(xiàn)線程苫昌,線程任務(wù),線程參數(shù)等變量诅炉。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //線程pending的最大任務(wù)數(shù)量了
    static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
    
   //線程狀態(tài)了
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

    private static final Runnable WAKEUP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    private static final Runnable NOOP_TASK = new Runnable() {
        @Override
        public void run() {
            // Do nothing.
        }
    };
    //原子性處理線程 state值了
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    //原子性處理線程參數(shù)了
    private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
    //任務(wù)隊列
    private final Queue<Runnable> taskQueue;
   //線程了
    private volatile Thread thread;
    @SuppressWarnings("unused")
    private volatile ThreadProperties threadProperties;
    private final Executor executor;
    private volatile boolean interrupted;

    private final Semaphore threadLock = new Semaphore(0);
    private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
    private final boolean addTaskWakesUp;
    private final int maxPendingTasks;
    private final RejectedExecutionHandler rejectedExecutionHandler;

    private long lastExecutionTime;

    @SuppressWarnings({ "FieldMayBeFinal", "unused" })
    private volatile int state = ST_NOT_STARTED;

    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    private long gracefulShutdownStartTime;

    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
...
}
    //簡單看下構(gòu)造器
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

接著我們再瞧瞧任務(wù)執(zhí)行實現(xiàn):

   @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();  //這里就是eventLoop線程和執(zhí)行線程是否同一個
        if (inEventLoop) {
            addTask(task);  //滿足當(dāng)前線程,將任務(wù)放入隊列中
        } else {
            startThread(); // 開啟一個線程
            addTask(task); //然后把任務(wù)添加到后臺任務(wù)與對壘中
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }


  @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
   //開啟線程
   private void startThread() {
        if (state == ST_NOT_STARTED) {
            //原子操作屋厘,保證只啟動一次
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

 private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                //更新上次執(zhí)行時間
                updateLastExecutionTime();
                try {
                    //抽象接口交給子類實現(xiàn)
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            //hooks 對于關(guān)閉
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

2.4 SingleThreadEventLoop

相對SingleThreadEventExecutor多實現(xiàn)了EventLoop功能涕烧,這個提供parent接口,然后也是持有一個任務(wù)隊列信息汗洒。


public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private final Queue<Runnable> tailTasks;
.....
}

2.5 NioEventLoop

主要是核心API --> run方法

public final class NioEventLoop extends SingleThreadEventLoop {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);

    private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

    private static final boolean DISABLE_KEYSET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

    private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
    private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;

    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return selectNow();
        }
    };
    private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            return NioEventLoop.super.pendingTasks();
        }
    };

    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;   //核心的選擇器
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;

    private final SelectorProvider provider;

    /**
     * Boolean that controls determines if a blocked Selector.select should
     * break out of its selection process. In our case we use a timeout for
     * the select method and the select method will block for that time unless
     * waken up.
     */
    private final AtomicBoolean wakenUp = new AtomicBoolean();

    private final SelectStrategy selectStrategy;

    private volatile int ioRatio = 50;
    private int cancelledKeys;
    private boolean needsToSelectAgain;

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();  //這里打開selector
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    }

如下事NIO邏輯操作议纯,只有NioEventLoop收到退出指令才會退出,否則一直下面方法一直執(zhí)行下去溢谤,這也是NIO線程的執(zhí)行方式瞻凤。

 @Override
protected void run() {
        for (;;) {
            try {
              //selectorNowSupplier和hasTasks判斷是否有需要處理的channel
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // 'wakenUp.compareAndSet(false, true)' is always evaluated
                        // before calling 'selector.wakeup()' to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when 'wakenUp' is set to
                        // true too early.
                        //
                        // 'wakenUp' is set to true too early if:
                        // 1) Selector is waken up between 'wakenUp.set(false)' and
                        //    'selector.select(...)'. (BAD)
                        // 2) Selector is waken up between 'selector.select(...)' and
                        //    'if (wakenUp.get()) { ... }'. (OK)
                        //
                        // In the first case, 'wakenUp' is set to true and the
                        // following 'selector.select(...)' will wake up immediately.
                        // Until 'wakenUp' is set to false again in the next round,
                        // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following 'selector.select(...)' call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;  //表示IO的執(zhí)行比例  
                if (ioRatio == 100) {    //IO如果是100,那么全部優(yōu)先執(zhí)行完IO在執(zhí)行調(diào)度任務(wù)
                    try {    
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys(); //先執(zhí)行IO任務(wù)
                    } finally {
                         //這里給出了IO執(zhí)行的時長世杀,然后根據(jù)IO比例計算調(diào)度任務(wù)應(yīng)該執(zhí)行的超時時長
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }

IO執(zhí)行流程:主要分析主流程阀参,其他支路沒有分析,如果感興趣可以自己分析下

    
   private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            //執(zhí)行這個    
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                //之前我們分析channel有提到附件是把自身給當(dāng)做附件的
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

   private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops); //注銷掉連接操作位
                
                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            //如果還有消息沒有寫出去瞻坝,通過flush全部寫出
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //如果可讀或者可以接受信息這讀取數(shù)據(jù)
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

調(diào)度任務(wù)執(zhí)行:主要分析主流程蛛壳,其他支路沒有分析,如果感興趣可以自己分析下,如下分析的是IO線程執(zhí)行不是100%的情況下

  protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task); //直接調(diào)用task的run方法進行調(diào)用了
 
            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {  //超時需要返回
                    break;
                }
            }

            task = pollTask();
            if (task == null) {  //任務(wù)執(zhí)行完了也返回
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

總結(jié)一下:
EventLoop就是持有一個線程的執(zhí)行器,執(zhí)行的大體邏輯流程由SingleThreadEventExecutor實現(xiàn)衙荐,然后具體的IO流程實現(xiàn)有NioEventLoop來實現(xiàn)捞挥,從而完成IO任務(wù)和調(diào)度任務(wù)。

3.業(yè)務(wù)啟動代碼源碼

我們業(yè)務(wù)代碼一般服務(wù)端會初始化如下兩個group忧吟,這里需要結(jié)合之前提到的Reactor模型砌函,第一個bossGroup負責(zé)處理連接操作,后者處理具體的IO操作溜族。

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);

我們就看看構(gòu)造NioEventLoopGroup()做了什么事情讹俊?

 public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
 public NioEventLoopGroup(int nThreads, Executor executor) {
        this(nThreads, executor, SelectorProvider.provider());
    }
// 接著繼續(xù)看,執(zhí)行到了MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
 //這里就回到我們介紹eventGroup的信息了。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //根據(jù)線程數(shù)量構(gòu)建對應(yīng)的EventExecutor數(shù)目
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末斩祭,一起剝皮案震驚了整個濱河市劣像,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌摧玫,老刑警劉巖耳奕,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異诬像,居然都是意外死亡屋群,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門坏挠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芍躏,“玉大人,你說我怎么就攤上這事降狠《钥ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵榜配,是天一觀的道長否纬。 經(jīng)常有香客問我,道長蛋褥,這世上最難降的妖魔是什么临燃? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮烙心,結(jié)果婚禮上膜廊,老公的妹妹穿的比我還像新娘。我一直安慰自己淫茵,他們只是感情好爪瓜,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著匙瘪,像睡著了一般钥勋。 火紅的嫁衣襯著肌膚如雪炬转。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天算灸,我揣著相機與錄音扼劈,去河邊找鬼。 笑死菲驴,一個胖子當(dāng)著我的面吹牛荐吵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播赊瞬,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼先煎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了巧涧?” 一聲冷哼從身側(cè)響起薯蝎,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎谤绳,沒想到半個月后占锯,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡缩筛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年消略,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瞎抛。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡艺演,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出桐臊,到底是詐尸還是另有隱情胎撤,我是刑警寧澤,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布断凶,位于F島的核電站伤提,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏懒浮。R本人自食惡果不足惜飘弧,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一识藤、第九天 我趴在偏房一處隱蔽的房頂上張望砚著。 院中可真熱鬧,春花似錦痴昧、人聲如沸稽穆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舌镶。三九已至柱彻,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間餐胀,已是汗流浹背哟楷。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留否灾,地道東北人卖擅。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像墨技,于是被迫代替她去往敵國和親惩阶。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361