Netty源碼解析 —— concurrency

線程模型

概述

因為具有多核心或多個CPU 的計算機現(xiàn)在已經(jīng)司空見慣吁峻,大多數(shù)的現(xiàn)代應(yīng)用程序都利用了
復(fù)雜的多線程處理技術(shù)以有效地利用系統(tǒng)資源。相比之下射赛,在早期的Java 語言中倾芝,我們使用多
線程處理的主要方式無非是按需創(chuàng)建和啟動新的Thread來執(zhí)行并發(fā)的任務(wù)單元——一種在高
負(fù)載下工作得很差的原始方式。Java 5 隨后引入了Executor API购裙,其線程池通過緩存和重用
Thread 極大地提高了性能。

基本的線程池化模式可以描述為:

  • 從池的空閑線程列表中選擇一個Thread鹃栽,并且指派它去運行一個已提交的任務(wù)(一個
    Runnable 的實現(xiàn))躏率;
  • 當(dāng)任務(wù)完成時,將該Thread 返回給該列表民鼓,使其可被重用薇芝。

雖然池化和重用線程相對于簡單地為每個任務(wù)都創(chuàng)建和銷毀線程是一種進(jìn)步,但是它并不能
消除由上下文切換所帶來的開銷丰嘉,其將隨著線程數(shù)量的增加很快變得明顯夯到,并且在高負(fù)載下愈演
愈烈。此外饮亏,僅僅由于應(yīng)用程序的整體復(fù)雜性或者并發(fā)需求耍贾,在項目的生命周期內(nèi)也可能會出現(xiàn)
其他和線程相關(guān)的問題。

Executor的執(zhí)行邏輯

EventLoop

Netty 的EventLoop 是協(xié)同設(shè)計的一部分路幸,它采用了兩個基本的API:并發(fā)和網(wǎng)絡(luò)編程荐开。
首先,io.netty.util.concurrent 包構(gòu)建在JDK 的java.util.concurrent 包上简肴,用
來提供線程執(zhí)行器晃听。其次,io.netty.channel 包中的類砰识,為了與Channel 的事件進(jìn)行交互能扒,
擴展了這些接口/類。

EventLoop類繼承結(jié)構(gòu)

在這個模型中仍翰,一個EventLoop 將由一個永遠(yuǎn)都不會改變的Thread 驅(qū)動赫粥,同時任務(wù)
Runnable 或者Callable)可以直接提交給EventLoop 實現(xiàn),以立即執(zhí)行或者調(diào)度執(zhí)行予借。
根據(jù)配置和可用核心的不同越平,可能會創(chuàng)建多個EventLoop 實例用以優(yōu)化資源的使用频蛔,并且單個
EventLoop 可能會被指派用于服務(wù)多個Channel

實現(xiàn)細(xì)節(jié)

Netty線程模型的卓越性能取決于對于當(dāng)前執(zhí)行的Thread的身份的確定秦叛,也就是說晦溪,確定它是否是分配給當(dāng)前Channel以及它的EventLoop的那一個線程(EventLoop將負(fù)責(zé)處理一個Channel的整個生命周期內(nèi)的所有事件)。

如果(當(dāng)前)調(diào)用線程正是支撐EventLoop 的線程挣跋,那么所提交的代碼塊將會被(直接)
執(zhí)行三圆。否則,EventLoop 將調(diào)度該任務(wù)以便稍后執(zhí)行避咆,并將它放入到內(nèi)部隊列中舟肉。當(dāng)EventLoop
下次處理它的事件時,它會執(zhí)行隊列中的那些任務(wù)/事件查库。這也就解釋了Thread 是如何
Channel 直接交互而無需在ChannelHandler 中進(jìn)行額外同步的路媚。

注意,每個EventLoop 都有它自已的任務(wù)隊列樊销,獨立于任何其他的EventLoop整慎。下圖
展示了EventLoop 用于調(diào)度任務(wù)的執(zhí)行邏輯。這是Netty 線程模型的關(guān)鍵組成部分围苫。

EventLoop的執(zhí)行邏輯

服務(wù)于Channel 的I/O 和事件的EventLoop 包含在EventLoopGroup 中裤园。根據(jù)不同的
傳輸實現(xiàn),EventLoop 的創(chuàng)建和分配方式也不同剂府。

1. 異步傳輸

異步傳輸實現(xiàn)只使用了少量的EventLoop(以及和它們相關(guān)聯(lián)的Thread)拧揽,而且在當(dāng)前的
線程模型中,它們可能會被多個Channel 所共享周循。這使得可以通過盡可能少量的Thread 來支
撐大量的Channel强法,而不是每個Channel 分配一個Thread万俗。
圖7-4 顯示了一個EventLoopGroup湾笛,它具有3 個固定大小的EventLoop(每個EventLoop
都由一個Thread 支撐)。在創(chuàng)建EventLoopGroup 時就直接分配了EventLoop(以及支撐它們
的Thread)闰歪,以確保在需要時它們是可用的嚎研。

用于非阻塞傳輸(如NIO 和AIO)的EventLoop 分配方式

EventLoopGroup 負(fù)責(zé)為每個新創(chuàng)建的Channel 分配一個EventLoop。在當(dāng)前實現(xiàn)中库倘,
使用順序循環(huán)(round-robin)的方式進(jìn)行分配以獲取一個均衡的分布临扮,并且相同的EventLoop
可能會被分配給多個Channel。(這一點在將來的版本中可能會改變教翩。)

一旦一個Channel 被分配給一個EventLoop脚囊,它將在它的整個生命周期中都使用這個
EventLoop(以及相關(guān)聯(lián)的Thread)括儒。請牢記這一點,因為它可以使你從擔(dān)憂你的Channel-
Handler 實現(xiàn)中的線程安全和同步問題中解脫出來。

另外,需要注意的是角溃,EventLoop 的分配方式對ThreadLocal 的使用的影響。因為一個
EventLoop 通常會被用于支撐多個Channel,所以對于所有相關(guān)聯(lián)的Channel 來說蚂且,
ThreadLocal 都將是一樣的。這使得它對于實現(xiàn)狀態(tài)追蹤等功能來說是個糟糕的選擇幅恋。然而杏死,
在一些無狀態(tài)的上下文中,它仍然可以被用于在多個Channel 之間共享一些重度的或者代價昂
貴的對象捆交,甚至是事件淑翼。

2. 阻塞傳輸

用于像OIO(舊的阻塞I/O)這樣的其他傳輸?shù)脑O(shè)計略有不同,如圖7-5 所示品追。這里每一個Channel 都將被分配給一個EventLoop(以及它的Thread)窒舟。如果你開發(fā)的應(yīng)用程序使用過java.io 包中的阻塞I/O 實現(xiàn),你可能就遇到過這種模型诵盼。

阻塞傳輸(如OIO)的EventLoop 分配方式

但是惠豺,正如同之前一樣,得到的保證是每個Channel 的I/O 事件都將只會被一個Thread
(用于支撐該Channel 的EventLoop 的那個Thread)處理风宁。這也是另一個Netty 設(shè)計一致性
的例子洁墙,它(這種設(shè)計上的一致性)對Netty 的可靠性和易用性做出了巨大貢獻(xiàn)。

實現(xiàn)

介紹完Netty的線程模型后戒财,讓我看一下它是如何實現(xiàn)的热监。之前我們已經(jīng)看到了EventLoop的類層級結(jié)構(gòu)圖,現(xiàn)在我們不管EventLoop饮寞,而是先看EventExecutorGroup以及對應(yīng)的實現(xiàn)孝扛。

EventExecutorGroup

首先我們先明確一件事,EventExecutorGroup與常見的線程池類似幽崩,而EventExecutor雖然也繼承了ExecutorService苦始,但是它是一個特化的只維護(hù)一個線程的線程池,因此可以將其視為擁有線程池職能的線程慌申。而一個EventExecutorGroup管理多個EventExecutor陌选,此結(jié)構(gòu)與我們熟悉的線程池模型更加相似。理解了這件事之后蹄溉,關(guān)于EventExecutorGroup的實現(xiàn)將清晰許多咨油。

EventExecutorGroup

AbstractEventExecutorGroupEventExecutorGroup接口的基本實現(xiàn),遵循前面的介紹的輪詢(round-robin)模式柒爵,將每一個任務(wù)輪流分配給它所管理的EventExecutor役电。

@Override
public Future<?> submit(Runnable task) {
    return next().submit(task);
}

/**
 * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
 */
EventExecutor next();

MultithreadEventExecutorGroup則實現(xiàn)了線程池的骨架,正如其名棉胀,它是一個支持多線程執(zhí)行的EventExecutorGroup法瑟,與java.util.concurrent中實現(xiàn)的線程池類似囱晴。這個類的核心是它的構(gòu)造方法,展現(xiàn)了它與EventExecutor之間是如何關(guān)聯(lián)的:

private final EventExecutor[] children;


protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

其中Executor只用來構(gòu)造Thread瓢谢,默認(rèn)為一個EventExecutor對應(yīng)一個Thread畸写。注意execute方法:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

chooser字段代表了EventExcutor的選擇策略(默認(rèn)為輪詢):

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

這里有一點優(yōu)化,即 & 操作相比 % 會快很多氓扛,所以當(dāng)線程池的線程數(shù)量為pow2時枯芬,針對設(shè)計了next()方法。

choose字段在EventExecutorGroup#next()方法中使用采郎,用以選擇執(zhí)行任務(wù)的線程千所。

@Override
public EventExecutor next() {
    return chooser.next();
}

構(gòu)造EventExecutornewChild()方法交由子類實現(xiàn),以針對不同的場景進(jìn)行定制蒜埋。默認(rèn)實現(xiàn)如下所示:

@Override
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
    return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
}

public DefaultEventExecutor(EventExecutorGroup parent, Executor executor, int maxPendingTasks,
                            RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, true, maxPendingTasks, rejectedExecutionHandler);
}

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ThreadExecutorMap.apply(executor, this);
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

注意taskQueue = newTaskQueue(this.maxPendingTasks);方法淫痰,每個EventExecutor都自己維護(hù)一個任務(wù)隊列,而不是EventExecutorGroup維護(hù)一個任務(wù)隊列整份,因此一批任務(wù)都會由固定的EventExecutor進(jìn)行執(zhí)行待错,這樣可以避免在編寫代碼時考慮并發(fā)控制,這個優(yōu)勢在ChannelHandler中體現(xiàn)烈评。

關(guān)于EventExecutorGroup實現(xiàn)的說明到此為止火俄,接下來講述EventExecutor的實現(xiàn)類。

EventExecutor

在上面的繼承體系中讲冠,頂層類AbstractEventExecutor繼承了jdk提供的骨架類AbstractExecutorService瓜客,在此基礎(chǔ)上提供了自己的一些功能,比如:

@Override
public EventExecutorGroup parent() {
    return parent;
}

@Override
public EventExecutor next() {
    return this;
}

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

其中next()方法我們在EventExecutorGroup的實現(xiàn)中提及過竿开,EventExcutor繼承了EventExecutorGroup接口谱仪,它的實現(xiàn)就是返回它自己。

inEventLoop()則是核心方法否彩,在之前的線程模型中疯攒,我們說過Netty的性能取決于對當(dāng)前線程身份的確定,此方法就用于實現(xiàn)這一功能胳搞,關(guān)于此方法是如何帶來性能提升卸例,我們在后面再討論称杨。

AbstractScheduledEventExecutor類則增加了任務(wù)調(diào)度的功能肌毅,實現(xiàn)了ScheduledExecutorService接口。其內(nèi)部維護(hù)了一個優(yōu)先隊列姑原,以任務(wù)下一次執(zhí)行時間為基準(zhǔn)進(jìn)行比較悬而。

SingleThreadEventExecutor則實現(xiàn)了EventExecutor的核心功能,它的名字與MultithreadEventExecutorGroup相對應(yīng)锭汛。

這個類的實現(xiàn)比較繁雜笨奠,有許多的輔助方法袭蝗,我們從核心方法execute()進(jìn)行切入,我們知道EventExecutorEventExecutorGroup所控制般婆,所以它的啟動需要EventExecutorGroup幫助:

# EventExecutorGroup.java

@Override
public void execute(Runnable command) {
    next().execute(command);
}


# SingleThreadEventExecutor.java    -------------------------------------

@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // 將任務(wù)直接增加到任務(wù)隊列中到腥,而不直接執(zhí)行
    addTask(task);
    if (!inEventLoop) {
        // 首次啟動時,thread字段還未初始化蔚袍,所以inEventLoop()方法一定會返回false
        startThread();
        // 如果EventExecutor已經(jīng)被關(guān)閉乡范,取消此任務(wù)的執(zhí)行(邊界條件)
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}


private void startThread() {
    if (state == ST_NOT_STARTED) {
        // CAS修改EventExecutor的狀態(tài)為已啟動
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

在介紹真正的啟動過程之前,我們先回憶一下EventExecutor的構(gòu)造方法:

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ThreadExecutorMap.apply(executor, this);
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(apply(command, eventExecutor));
        }
    };
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}

private static void setCurrentEventExecutor(EventExecutor executor) {
    mappings.set(executor);
}

private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();

這一步對Runnable進(jìn)行裝飾啤咽,以此將之前EventExecutorGroup傳入的Executor所產(chǎn)生的ThreadEventExecutor綁定起來晋辆。

doStartThread()方法調(diào)用了Executorexecutor的方法,構(gòu)造一個新的線程宇整,并執(zhí)行上面修飾后的Runnable瓶佳。

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

上面的代碼雖然很長,但是核心內(nèi)容只有:

```java
try {
    SingleThreadEventExecutor.this.run();
    success = true;
}

SingleThreadEventExecutor并沒有實現(xiàn)run()方法鳞青,而是讓其在子類中實現(xiàn)霸饲,讓我們看一下默認(rèn)實現(xiàn):

@Override
protected void run() {
    for (;;) {
        Runnable task = takeTask();
        if (task != null) {
            task.run();
            updateLastExecutionTime();
        }

        if (confirmShutdown()) {
            break;
        }
    }
}

run()方法內(nèi)部是一個死循環(huán),不斷從任務(wù)隊列中獲取任務(wù)然后執(zhí)行臂拓,直到EventExecutor被關(guān)閉贴彼。

protected Runnable takeTask() {
    assert inEventLoop();
    if (!(taskQueue instanceof BlockingQueue)) {
        throw new UnsupportedOperationException();
    }

    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            Runnable task = null;
            try {
                task = taskQueue.take();
                if (task == WAKEUP_TASK) {
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            // 如果定時隊列中有任務(wù),并且還未到執(zhí)行時間埃儿,
            // 那么嘗試從taskQueue中獲取任務(wù)并執(zhí)行器仗,直到定時任務(wù)
            // 可以執(zhí)行
            if (delayNanos > 0) {
                try {
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            }
            if (task == null) {
                // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                // scheduled tasks are never executed if there is always one task in the taskQueue.
                // This is for example true for the read task of OIO Transport
                // See https://github.com/netty/netty/issues/1614
                fetchFromScheduledTaskQueue();
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}

private boolean fetchFromScheduledTaskQueue() {
    long nanoTime = AbstractScheduledEventExecutor.nanoTime();
    Runnable scheduledTask  = pollScheduledTask(nanoTime);
    while (scheduledTask != null) {
        if (!taskQueue.offer(scheduledTask)) {
            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
            return false;
        }
        scheduledTask  = pollScheduledTask(nanoTime);
    }
    return true;
}

此處的實現(xiàn)與java.util.concurrent中線程池的工作線程基本相似,不過最重要的是線程模型之間的區(qū)別童番。

takeTask()方法在之前的版本如下:

Runnable task;
if (delayNanos > 0) {
    try {
        task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
        return null;
    }
} else {
    task = taskQueue.poll();
}
if (task == null) {
    fetchFromDelayedQueue();
    task = taskQueue.poll();
}

這個版本的實現(xiàn)有一個bug精钮,如果taskQueue中一直有任務(wù),那么定時任務(wù)將無法被執(zhí)行剃斧。

最后轨香,讓我看看一下EventExecutor是如何被關(guān)閉的,雖然ExecutorService提供了shutdown()方法幼东,但是它的接口語義是線程池關(guān)閉后將不能再接受任務(wù)臂容,這會使得某些任務(wù)無法被完成,導(dǎo)致出現(xiàn)一些問題根蟹,所以Netty定義了一個新的接口叫做shutdownGracefully脓杉,用來取代shutdown()

@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    if (quietPeriod < 0) {
        throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
    }
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    if (isShuttingDown()) {
        return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);

    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }

    if (wakeup) {
        wakeup(inEventLoop);
    }

    return terminationFuture();
}

這個接口擁有三個參數(shù)long quietPeriod, long timeout, TimeUnit unit简逮。如果指定了quietPeriod不為0球散,那么EventExecutor在執(zhí)行完當(dāng)前任務(wù)列表中的任務(wù)后,并不會立刻關(guān)閉散庶,而是會繼續(xù)等待新任務(wù)蕉堰,timeout的作用與此類似凌净。調(diào)用此接口后,線程池的狀態(tài)變更為ST_SHUTTING_DOWN屋讶,進(jìn)入一段靜默等待的時期冰寻,如果有新任務(wù)到來,它可以成功被接受皿渗。

工作線程需要配置shutdownGracefully工作性雄,讓我們再看一下run()方法:

protected void run() {
    for (;;) {
        Runnable task = takeTask();
        if (task != null) {
            task.run();
            updateLastExecutionTime();
        }

        if (confirmShutdown()) {
            break;
        }
    }
}


protected boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }

    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }

    cancelScheduledTasks();

    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }

    if (runAllTasks() || runShutdownHooks()) {
        if (isShutdown()) {
            // Executor shut down - no new tasks anymore.
            return true;
        }

        // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
        // terminate if the quiet period is 0.
        // See https://github.com/netty/netty/issues/4241
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        wakeup(true);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();

    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check if any tasks were added to the queue every 100ms.
        // TODO: Change the behavior of takeTask() so that it returns on timeout.
        wakeup(true);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }

        return false;
    }

    // No tasks were added for last quiet period - hopefully safe to shut down.
    // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
    return true;
}

任務(wù)線程每執(zhí)行一次任務(wù)后,都會去檢查線程池是否被關(guān)閉羹奉,如果線程池處于ST_SHUTTING_DOWN狀態(tài)秒旋,工作線程將進(jìn)入靜默期,等待線程池徹底關(guān)閉或者超時诀拭。如果quietPeriod指定為0迁筛,那么工作線程執(zhí)行完任務(wù)隊列中的所有任務(wù)后,工作線程將停止耕挨,線程池徹底關(guān)閉细卧。否則,它會等待超時或者quietPeriod到達(dá)筒占,在此期間每100ms檢查一次任務(wù)隊列贪庙,如果有新任務(wù)到來那么執(zhí)行它。

EventLoopGroup

EventLoopGruop的類層次結(jié)構(gòu)如下所示翰苫,雖然DefaultEventLoopGroup只是一個簡單的實現(xiàn)類止邮,但是NioEventLoopGroup等常用組件都和它的繼承體系差不多,在此只看其基本實現(xiàn)奏窑,排除掉和IO關(guān)聯(lián)的實現(xiàn)類导披。

EventLoopGroup

從上圖可見,MultithreadEventLoopGroup直接繼承自MultithreadEventExecutorGroup埃唯,在原本并發(fā)架構(gòu)的基礎(chǔ)上撩匕,進(jìn)行定制,實現(xiàn)了EventLoopGroup接口的一些方法墨叛。在這種設(shè)計下止毕,并發(fā)框架保持了良好的可擴展性。

MultithreadEventExecutorGroupDefaultEventLoopGroup的實現(xiàn)非常簡單漠趁,只定義了一些構(gòu)造方法扁凛,不再多說。

EventLoop

DefaultEventLoopDefaultEventLoopGroup一樣棚潦,實現(xiàn)了EventLoop接口令漂,將線程與NIO連接到了一起。

DefaultEventLoop
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末丸边,一起剝皮案震驚了整個濱河市叠必,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌妹窖,老刑警劉巖纬朝,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異骄呼,居然都是意外死亡共苛,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門蜓萄,熙熙樓的掌柜王于貴愁眉苦臉地迎上來隅茎,“玉大人,你說我怎么就攤上這事嫉沽”傧” “怎么了?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵绸硕,是天一觀的道長堂竟。 經(jīng)常有香客問我,道長玻佩,這世上最難降的妖魔是什么出嘹? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮咬崔,結(jié)果婚禮上税稼,老公的妹妹穿的比我還像新娘。我一直安慰自己垮斯,他們只是感情好娶聘,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著甚脉,像睡著了一般丸升。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上牺氨,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天狡耻,我揣著相機與錄音,去河邊找鬼猴凹。 笑死夷狰,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的郊霎。 我是一名探鬼主播沼头,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了进倍?” 一聲冷哼從身側(cè)響起土至,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎猾昆,沒想到半個月后陶因,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡垂蜗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年楷扬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贴见。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡烘苹,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出片部,到底是詐尸還是另有隱情镣衡,我是刑警寧澤,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布吞琐,位于F島的核電站捆探,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏站粟。R本人自食惡果不足惜黍图,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望奴烙。 院中可真熱鬧助被,春花似錦、人聲如沸切诀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽幅虑。三九已至丰滑,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間倒庵,已是汗流浹背褒墨。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留擎宝,地道東北人郁妈。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像绍申,于是被迫代替她去往敵國和親噩咪。 傳聞我的和親對象是個殘疾皇子顾彰,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容