Netty源碼解析—— EventLoop(二)之 EventLoopGroup

Netty源碼解析—— EventLoop(二)之 EventLoopGroup

1.類結(jié)構(gòu)圖

NioEventLoopGroup.png

2. EventExecutorGroup

EventExecutorGroup 實現(xiàn) ScheduledExecutorService 凿叠、Iterable接口涩笤,這兩個接口都是jdk原生接口嚼吞,具體看EventExecutorGroup接口中的方法,代碼如下:

   // ========== 自定義接口 ===================================
   
    //是否正在關(guān)閉
    boolean isShuttingDown();

    //優(yōu)雅關(guān)閉線程池
    Future<?> shutdownGracefully();
  
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    //返回線程池終止時的異步結(jié)果
    Future<?> terminationFuture();

    //選擇一個 EventExecutor 對象
    EventExecutor next();


  // ========== 實現(xiàn)自 Iterable 接口 ==========
    @Override
    Iterator<EventExecutor> iterator();


// ========== 實現(xiàn)自 ExecutorService 接口 ==========

    @Override
    @Deprecated
    void shutdown();

   
    @Override
    @Deprecated
    List<Runnable> shutdownNow();


    @Override
    Future<?> submit(Runnable task);

    @Override
    <T> Future<T> submit(Runnable task, T result);

    @Override
    <T> Future<T> submit(Callable<T> task);

    // ========== 實現(xiàn)自 ScheduledExecutorService 接口 ==========


    @Override
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    @Override
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • 重點關(guān)注next()方法蹬碧,該方法的功能是從線程池中選擇一個線程
  • 比較特殊的是舱禽,接口方法返回類型為 Future 不是 Java 原生的 java.util.concurrent.Future ,而是 Netty 自己實現(xiàn)的 Future 接口恩沽,如下代碼:
public interface Future<V> extends java.util.concurrent.Future<V> 

public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> 

3. AbstractEventExecutorGroup

io.netty.util.concurrent.AbstractEventExecutorGroup 誊稚,實現(xiàn) EventExecutorGroup 接口,EventExecutor ( 事件執(zhí)行器 )的分組抽象類罗心。

3.1 submit

#submit(...) 方法里伯,提交一個普通任務(wù)到 EventExecutor 中, 提交的 EventExecutor ,通過 #next() 方法選擇

   @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

3.2 schedule

#schedule(...) 方法渤闷,提交一個定時任務(wù)到 EventExecutor 中,提交的 EventExecutor 疾瓮,通過 #next() 方法選擇。代碼如下:

@Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return next().schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return next().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

3.3 execute

#execute(...) 方法飒箭,在 EventExecutor 中執(zhí)行一個普通任務(wù),不需要返回結(jié)果狼电,代碼如下:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

3.4 invokeAll

#invokeAll(...) 方法,在 EventExecutor 中執(zhí)行多個普通任務(wù), 多個任務(wù)使用同一個 EventExecuto弦蹂。代碼如下:

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return next().invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return next().invokeAll(tasks, timeout, unit);
    }

3.5 invokeAny

#invokeAll(...) 方法肩碟,在 EventExecutor 中執(zhí)行多個普通任務(wù),有一個執(zhí)行完成即可凸椿,多個任務(wù)使用同一個 EventExecutor 腾务。代碼如下:

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return next().invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return next().invokeAny(tasks, timeout, unit);
    }

3.6 shutdown

#shutdown(...) 方法,關(guān)閉 EventExecutorGroup 削饵。代碼如下:

    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    
    @Override
    @Deprecated
    public abstract void shutdown();

  
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

  • 具體的 #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown() 方法岩瘦,由子類實現(xiàn)。

4. MultithreadEventExecutorGroup

4.1 構(gòu)造方法

/**
     * EventExecutor 數(shù)組
     */
    private final EventExecutor[] children;
    /**
     * 不可變( 只讀 )的 EventExecutor 數(shù)組
     */
    private final Set<EventExecutor> readonlyChildren;
    /**
     * 已終止的 EventExecutor 數(shù)量
     */
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    /**
     * 用于終止 EventExecutor 的異步 Future
     */
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    /**
     * EventExecutor 選擇器
     */
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

   
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

   
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 創(chuàng)建執(zhí)行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 創(chuàng)建 EventExecutor 數(shù)組
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            // 是否創(chuàng)建成功
            boolean success = false;
            try {
                // 創(chuàng)建 EventExecutor 對象窿撬,newChild抽象方法启昧,具體有子類實現(xiàn)
                children[i] = newChild(executor, args);
                // 標記創(chuàng)建成功
                success = true;
            } catch (Exception e) {
                // 創(chuàng)建失敗,拋出 IllegalStateException 異常
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 創(chuàng)建失敗劈伴,關(guān)閉所有已創(chuàng)建的 EventExecutor
                if (!success) {
                    // 優(yōu)雅的關(guān)閉所有已創(chuàng)建的 EventExecutor密末,只負責關(guān)閉線程,并不知道關(guān)閉的結(jié)果
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    // 確保所有已創(chuàng)建的 EventExecutor 已關(guān)閉
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            //isTerminated() 若關(guān)閉后所有任務(wù)都已完成跛璧,則返回true严里。注意除非首先調(diào)用shutdown或shutdownNow,否則isTerminated永不為true追城。
                            // 返回:若關(guān)閉后所有任務(wù)都已完成刹碾,則返回true。
                            while (!e.isTerminated()) {
                                //等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完
                                //或者等超時時間到
                                //或者線程被中斷座柱,拋出InterruptedException
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 創(chuàng)建 EventExecutor 選擇器
        chooser = chooserFactory.newChooser(children);

        // 創(chuàng)建監(jiān)聽器迷帜,用于 EventExecutor 終止時的監(jiān)聽
        //回調(diào)的具體邏輯是物舒,當所有 EventExecutor 都終止完成時,
        // 通過調(diào)用 Future#setSuccess(V result) 方法戏锹,通知監(jiān)聽器們冠胯。至于為什么設(shè)置的值是 null ,因為監(jiān)聽器們不關(guān)注具體的結(jié)果锦针。
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                 // 線程池中的線程每終止一個增加記錄數(shù)荠察,直到全部終止設(shè)置線程池異步終止結(jié)果為成功
                if (terminatedChildren.incrementAndGet() == children.length) {// 全部關(guān)閉
                    terminationFuture.setSuccess(null);// 設(shè)置結(jié)果,并通知監(jiān)聽器們奈搜。
                }
            }
        };

        // 設(shè)置監(jiān)聽器到每個 EventExecutor 上
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        // 創(chuàng)建不可變( 只讀 )的 EventExecutor 數(shù)組
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        //設(shè)置不可變的EventExecutor集合
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

4.2 ThreadPerTaskExecutor

  • 創(chuàng)建執(zhí)行器的代碼如下:
// 創(chuàng)建執(zhí)行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

具體看下 ThreadPerTaskExecutor這個類割粮,代碼如下:


/**
 * 實現(xiàn) Executor 接口,每個任務(wù)一個線程的執(zhí)行器實現(xiàn)類
 */
public final class ThreadPerTaskExecutor implements Executor {
    /**
     * 線程工廠對象
     * Netty 實現(xiàn)自定義的 ThreadFactory 類媚污,為 io.netty.util.concurrent.DefaultThreadFactory
     */
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    /**
     * 執(zhí)行任務(wù)
     *
     * @param command 任務(wù)
     *
     *  通過 ThreadFactory#newThread(Runnable) 方法舀瓢,創(chuàng)建一個 Thread ,然后調(diào)用 Thread#start() 方法耗美,啟動線程執(zhí)行任務(wù)
     */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

  • io.netty.util.concurrent.ThreadPerTaskExecutor 京髓,實現(xiàn) Executor 接口,每個任務(wù)一個線程的執(zhí)行器實現(xiàn)類

  • threadFactory 屬性商架,線程工程實例堰怨,通過構(gòu)造方法來初始化,Netty 實現(xiàn)自定義的 ThreadFactory 類蛇摸,為 io.netty.util.concurrent.DefaultThreadFactory 具體的創(chuàng)建看如下方法,創(chuàng)建默認的線程工廠類

  • /**
        * 創(chuàng)建線程工廠對象,并且使用類名作為 poolType
        * @return
        */
       protected ThreadFactory newDefaultThreadFactory() {
           return new DefaultThreadFactory(getClass());
       }
    
  • #execute(Runnable command) 方法备图,通過 ThreadFactory#newThread(Runnable) 方法,創(chuàng)建一個 Thread 赶袄,然后調(diào)用 Thread#start() 方法揽涮,啟動線程執(zhí)行任務(wù)

4.3 DefaultThreadFactory

4.4 EventExecutorChooserFactory

io.netty.util.concurrent.EventExecutorChooserFactory ,EventExecutorChooser 工廠接口饿肺。代碼如下:

/**
 * Factory that creates new {@link EventExecutorChooser}s.
 *
 * EventExecutorChooser 工廠接口
 */
@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     *
     * 創(chuàng)建一個 EventExecutorChooser 對象
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     *
     * EventExecutor 選擇器接口
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         *
         * 選擇下一個 EventExecutor 對象
         */
        EventExecutor next();
    }
}
  • #newChooser(EventExecutor[] executors) 方法蒋困,創(chuàng)建一個 EventExecutorChooser 對象;
  • EventExecutorChooser 接口,EventExecutor 選擇器接口敬辣。
    • #next() 方法選擇下一個 EventExecutor對象雪标;

4.4.1 DefaultEventExecutorChooserFactory

io.netty.util.concurrent.DefaultEventExecutorChooserFactory ,實現(xiàn) EventExecutorChooserFactory 接口溉跃,默認 EventExecutorChooser 工廠實現(xiàn)類村刨。代碼如下


/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 *
 * 實現(xiàn) EventExecutorChooserFactory 接口,默認 EventExecutorChooser 工廠實現(xiàn)類
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    /**
     * 單例
     */
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {// 是否為 2 的冪次方
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    /**
     * 是否為 2 的冪次方
     * @param val
     * @return
     */
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }  
}

  • DefaultEventExecutorChooserFactory是個單例撰茎;
  • Netty實現(xiàn)了兩個線程選擇器嵌牺,雖然代碼不一致,功能都是一樣的:每次選擇索引為上一次所選線程索引+1的線程
  • #newChooser(EventExecutor[] executors)創(chuàng)建具體的選擇器,根據(jù)#isPowerOfTwo(executors.length)方法來判斷髓梅,創(chuàng)建哪種選擇器,EventExecutor 數(shù)組的大小是否為 2 的冪次方绎签,如果是枯饿,創(chuàng)建PowerOfTwoEventExecutorChooser選擇器,不是則創(chuàng)建GenericEventExecutorChooser選擇器诡必;
  • #isPowerOfTwo(int val) 方法奢方,為什么 (val & -val) == val 可以判斷數(shù)字是否為 2 的冪次方呢?

? 我們以 8 來舉個例子:

      - 8 的二進制為 `1000` 
      - -8 的二進制使用補碼表示爸舒。所以蟋字,先求反生成反碼為 `0111` ,然后加一生成補碼為 `1000` 
      - 8 和 -8 與操作后扭勉,還是 8 鹊奖。與操作是都為1則為1,其他都為0涂炎,所以結(jié)果還是1000&1000還是1000忠聚;
      - 實際上,以 2 為冪次方的數(shù)字唱捣,都是最高位為 1 两蟀,剩余位為 0 ,所以對應(yīng)的負數(shù)震缭,求完補碼還是自己

4.4.2 PowerOfTwoEventExecutorChooser

PowerOfTwoEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口赂毯,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類。這是一個優(yōu)化的實現(xiàn)拣宰,線程池數(shù)量使用2的冪次方党涕,這樣線程池選擇線程時使用位操作,能使性能最高巡社,PowerOfTwoEventExecutorChooser 是 DefaultEventExecutorChooserFactory 的靜態(tài)內(nèi)部類遣鼓,代碼如下:

 /**
     *  實現(xiàn) EventExecutorChooser 接口,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類
     */
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        /**
         * 自增序列
         */
        private final AtomicInteger idx = new AtomicInteger();
        /**
         * EventExecutor 數(shù)組
         */
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 因為 - ( 二元操作符 ) 的計算優(yōu)先級高于 & ( 一元操作符 ) 重贺。
         *
         * 因為 EventExecutor 數(shù)組的大小是以 2 為冪次方的數(shù)字骑祟,那么減一后,除了最高位是 0 气笙,剩余位都為 1          ( 例如 8 減一后等于 7 次企,而 7 的二進制為 0111 。)潜圃,
         * 那么無論 idx 無論如何遞增缸棵,再進行 & 并操作,都不會超過 EventExecutor 數(shù)組的大小谭期。并且堵第,還能保            證順序遞增吧凉。
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

4.4.3 GenericEventExecutorChooser

GenericEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口,通用的 EventExecutor 選擇器實現(xiàn)類踏志。代碼如下:

GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中阀捅。

?

/**
     * GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中。
     *  實現(xiàn) EventExecutorChooser 接口针余,通用的 EventExecutor 選擇器實現(xiàn)類
     */
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 使用 idx 自增饲鄙,并使用 EventExecutor 數(shù)組的大小來取余
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

4.5 next

#next() 方法,選擇下一個 EventExecutor 對象圆雁。代碼如下:

 /**
     * 選擇下一個 EventExecutor 對象
     * @return
     */
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

4.6 iterator

 /**
     * 獲得 EventExecutor 數(shù)組的迭代器
     * 為了避免調(diào)用方忍级,獲得迭代器后,對 EventExecutor 數(shù)組進行修改伪朽,
     * 所以返回是不可變的 EventExecutor 數(shù)組 readonlyChildren 的迭代器
     * @return
     */
    @Override
    public Iterator<EventExecutor> iterator() {
        return readonlyChildren.iterator();
    }

4.7 executorCount

 /**
     * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
     * 1:1 to the threads it use.
     *
     * 獲得 EventExecutor 數(shù)組的大小
     */
    public final int executorCount() {
        return children.length;
    }

4.8 newChild

   /**
     * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
     * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
     *
     * 抽象方法轴咱,子類實現(xiàn)該方法,創(chuàng)建其對應(yīng)的 EventExecutor 實現(xiàn)類的對象
     *
     */
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

4.9 shutdownGracefully

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

  • 優(yōu)雅的關(guān)閉EventExecutor線程組烈涮,返回terminationFuture嗦玖,在構(gòu)造方法中由于已經(jīng)設(shè)置了監(jiān)聽,如下代碼,通過success屬性來判斷是否全部都關(guān)閉跃脊;

  • final FutureListener<Object> terminationListener = new FutureListener<Object>() {
               @Override
               public void operationComplete(Future<Object> future) throws Exception {
                   // 線程池中的線程每終止一個增加記錄數(shù)宇挫,直到全部終止設(shè)置線程池異步終止結(jié)果為成功
                   if (terminatedChildren.incrementAndGet() == children.length) {// 全部關(guān)閉
                       terminationFuture.setSuccess(null);// 設(shè)置結(jié)果,并通知監(jiān)聽器們酪术。
                   }
               }
           };
    

4.10 terminationFuture

    /**
     * 返回用于終止 EventExecutor 的異步 Future
     * @return
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

4.11 shutdown

   /**
     * 廢棄的方法器瘪,EventExecutor線程組關(guān)閉
     */
    @Override
    @Deprecated
    public void shutdown() {
        for (EventExecutor l: children) {
            l.shutdown();
        }
    }

4.12 isShuttingDown

    /**
     * 判斷所有的EventExecutor是否在優(yōu)雅的關(guān)閉,或者已經(jīng)關(guān)閉,
     * 任何一個EventExecutor沒有關(guān)閉則返回false
     * @return
     */
    @Override
    public boolean isShuttingDown() {
        for (EventExecutor l: children) {
            if (!l.isShuttingDown()) {
                return false;
            }
        }
        return true;
    }

4.13 isShutdown

    /**
     * 判斷所有的EventExecutor是否都關(guān)閉
     * @return
     */
    @Override
    public boolean isShutdown() {
        for (EventExecutor l: children) {
            if (!l.isShutdown()) {
                return false;
            }
        }
        return true;
    }

4.14 isTerminated

    /**
     * EventExecutor線程組關(guān)閉后绘雁,所有任務(wù)是否都已完成
     * @return
     */
    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

4.15 awaitTermination

/**
     * 等待所有的EventExecutor任務(wù)都執(zhí)行完或者等待時間超時,返回任務(wù)是否都已經(jīng)執(zhí)行完
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        loop: for (EventExecutor l: children) {
            for (;;) {
                //超時則跳出loop循環(huán)
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    break loop;
                }
                //等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完
                //或者等超時時間到
                //或者線程被中斷橡疼,拋出InterruptedException
                //跳出for(;;)循環(huán)
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }

5. EventLoopGroup

io.netty.channel.EventExecutorGroup ,繼承 EventExecutorGroup 接口庐舟,EventLoop 的分組接口欣除。代碼如下:

 // ========== 實現(xiàn)自 EventExecutorGroup 接口 ==========
    /**
     * Return the next {@link EventLoop} to use
     * 覆蓋父類接口的方法,選擇下一個 EventLoop 對象
     */
    @Override
    EventLoop next();

    // ========== 自定義接口 ==========

    /**
     * 注冊 Channel 到 EventLoopGroup 中的一個線程上。實際上挪略,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊
     */
    ChannelFuture register(Channel channel);

  
    ChannelFuture register(ChannelPromise promise);

   
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
  • #next() 方法历帚,覆蓋父類接口的方法,選擇下一個 EventLoop 對象
  • #register(...) 方法,注冊 Channel 到 EventLoopGroup 中的一個線程上杠娱。實際上挽牢,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊

6.MultithreadEventLoopGroup

io.netty.channel.MultithreadEventLoopGroup ,實現(xiàn) EventLoopGroup 接口摊求,繼承 MultithreadEventExecutorGroup 抽象類禽拔,基于多線程的 EventLoop 的分組抽象類。

6.1 構(gòu)造方法

/**
     * 默認 EventLoop 線程數(shù)
     * EventLoopGroup 默認擁有的 EventLoop 數(shù)量。因為一個 EventLoop 對應(yīng)一個線程睹栖,所以為 CPU 數(shù)量 * 2 硫惕。
     * 為什么會 * 2 呢?因為目前 CPU 基本都是超線程野来,一個 CPU 可對應(yīng) 2 個線程恼除。
     * 在構(gòu)造方法未傳入 nThreads 方法參數(shù)時,使用 DEFAULT_EVENT_LOOP_THREADS 梁只。
     */
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    /**
     * 初始化線程數(shù)
     */
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                     Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }
  • 主要初始化了線程數(shù)缚柳,然后調(diào)用父類的構(gòu)造方法
  • 默認情況埃脏,線程數(shù)最小為1搪锣,如果配置了系統(tǒng)參數(shù)io.netty.eventLoopThreads,設(shè)置為該系統(tǒng)參數(shù)值彩掐,否則設(shè)置為核心數(shù)的2倍构舟。

6.2 newDefaultThreadFactory

#newDefaultThreadFactory() 方法,創(chuàng)建線程工廠對象堵幽,覆蓋父類方法狗超,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY ,代碼如下:

    /**
     * 創(chuàng)建線程工廠對象
     *
     * 覆蓋父類方法朴下,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY 努咐。
     * @return
     */
    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

6.3 next()

#next()方法,選擇下一個 EventLoop 對象,覆蓋父類方法,將返回值轉(zhuǎn)換成 EventLoop 類,代碼如下:

    /**
     * 選擇下一個 EventLoop 對象
     *
     * 覆蓋父類方法殴胧,將返回值轉(zhuǎn)換成 EventLoop 類
     * @return
     */
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

6.4 newChild

#newChild(...) 抽象方法渗稍,創(chuàng)建 EventExecutor 對象,覆蓋父類方法团滥,返回值改為 EventLoop 類竿屹。

    /**
     * 抽象方法,創(chuàng)建 EventExecutor 對象
     *
     * 覆蓋父類方法灸姊,返回值改為 EventLoop 類拱燃。
     * @param executor
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

6.5 register(

#register(...)方法,注冊 Channel 到 EventLoopGroup 中力惯,通過#next() 方法來選擇一個EventLoop來注冊,也就是通過EventExecutorChooser選擇器從線程組中選擇一個碗誉;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

7. NioEventLoopGroup

io.netty.channel.nio.NioEventLoopGroup ,繼承 MultithreadEventLoopGroup 抽象類父晶,NioEventLoop 的分組實現(xiàn)類诗充。

7.1 構(gòu)造方法

    public NioEventLoopGroup() {
        this(0);
    }

    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

  
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
       
        this(nThreads, executor, SelectorProvider.provider());
    }

    
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
       
        this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
       
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

構(gòu)造方法比較多,主要是明確了父構(gòu)造方法的 Object ... args方法參數(shù)

  • 第一個參數(shù)诱建,selectorProvider 蝴蜓,java.nio.channels.spi.SelectorProvider ,用于創(chuàng)建 Java NIO Selector 對象。
  • 第二個參數(shù)茎匠,selectStrategyFactory 格仲,io.netty.channel.SelectStrategyFactory ,選擇策略工廠诵冒。詳細解析凯肋,見后續(xù)文章。
  • 第三個參數(shù)汽馋,rejectedExecutionHandler 侮东,io.netty.channel.SelectStrategyFactory ,拒絕執(zhí)行處理器豹芯。詳細解析悄雅,見后續(xù)文章。

7.2 newChild

#newChild(Executor executor, Object... args) 方法铁蹈,創(chuàng)建 NioEventLoop 對象

 
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
  • 模板方法newChild()宽闲,用來創(chuàng)建線程池中的單個線程,現(xiàn)在我們知道MultithreadEventExecutorGroupEventExecutor[] children 保存的就是NioEventLoop

7.3 setIoRatio

#setIoRatio(int ioRatio) 方法,設(shè)置所有 EventLoop 的 IO 任務(wù)占用執(zhí)行時間的比例

public void setIoRatio(int ioRatio) {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }

7.4 rebuildSelectors

#rebuildSelectors() 方法,重建所有 EventLoop 的 Selector 對象

   /**
     * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
     * around the  infamous epoll 100% CPU bug.
     *
     * 重建所有 EventLoop 的 Selector 對象
     *
     * 因為 JDK 有 epoll 100% CPU Bug 。實際上屹堰,NioEventLoop 當觸發(fā)該 Bug 時,
     * 也會自動調(diào)用 NioEventLoop#rebuildSelector() 方法览徒,進行重建 Selector 對象,以修復(fù)該問題颂龙。
     */
    public void rebuildSelectors() {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }

源碼解析好文

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末习蓬,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子厘托,更是在濱河造成了極大的恐慌,老刑警劉巖铅匹,帶你破解...
    沈念sama閱讀 221,273評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件包斑,死亡現(xiàn)場離奇詭異流礁,居然都是意外死亡,警方通過查閱死者的電腦和手機罗丰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評論 3 398
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來找御,“玉大人元镀,你說我怎么就攤上這事■Γ” “怎么了栖疑?”我有些...
    開封第一講書人閱讀 167,709評論 0 360
  • 文/不壞的土叔 我叫張陵遇革,是天一觀的道長揭糕。 經(jīng)常有香客問我,道長揪漩,這世上最難降的妖魔是什么雇寇? 我笑而不...
    開封第一講書人閱讀 59,520評論 1 296
  • 正文 為了忘掉前任锨侯,我火速辦了婚禮冬殃,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘审葬。我一直安慰自己涣觉,他們只是感情好,可當我...
    茶點故事閱讀 68,515評論 6 397
  • 文/花漫 我一把揭開白布生兆。 她就那樣靜靜地躺著膝宁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪合蔽。 梳的紋絲不亂的頭發(fā)上介返,一...
    開封第一講書人閱讀 52,158評論 1 308
  • 那天,我揣著相機與錄音刃宵,去河邊找鬼。 笑死鞍陨,一個胖子當著我的面吹牛从隆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播寿烟,決...
    沈念sama閱讀 40,755評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼筛武,長吁一口氣:“原來是場噩夢啊……” “哼挎塌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起待锈,我...
    開封第一講書人閱讀 39,660評論 0 276
  • 序言:老撾萬榮一對情侶失蹤嘴高,失蹤者是張志新(化名)和其女友劉穎拴驮,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體套啤,經(jīng)...
    沈念sama閱讀 46,203評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡纲岭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,287評論 3 340
  • 正文 我和宋清朗相戀三年止潮,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片袄琳。...
    茶點故事閱讀 40,427評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宛琅,靈堂內(nèi)的尸體忽然破棺而出逗旁,到底是詐尸還是另有隱情,我是刑警寧澤红伦,帶...
    沈念sama閱讀 36,122評論 5 349
  • 正文 年R本政府宣布昙读,位于F島的核電站膨桥,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏沮稚。R本人自食惡果不足惜介牙,卻給世界環(huán)境...
    茶點故事閱讀 41,801評論 3 333
  • 文/蒙蒙 一澳厢、第九天 我趴在偏房一處隱蔽的房頂上張望剩拢。 院中可真熱鬧,春花似錦徐伐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽需曾。三九已至祈远,卻和暖如春商源,著一層夾襖步出監(jiān)牢的瞬間牡彻,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評論 1 272
  • 我被黑心中介騙來泰國打工充甚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留霸褒,地道東北人。 一個月前我還...
    沈念sama閱讀 48,808評論 3 376
  • 正文 我出身青樓技矮,卻偏偏與公主長得像衰倦,于是被迫代替她去往敵國和親旁理。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,440評論 2 359