Netty源碼解析—— EventLoop(二)之 EventLoopGroup
1.類結(jié)構(gòu)圖
2. EventExecutorGroup
EventExecutorGroup
實現(xiàn) ScheduledExecutorService 凿叠、Iterable接口涩笤,這兩個接口都是jdk原生接口嚼吞,具體看EventExecutorGroup接口中的方法,代碼如下:
// ========== 自定義接口 ===================================
//是否正在關(guān)閉
boolean isShuttingDown();
//優(yōu)雅關(guān)閉線程池
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
//返回線程池終止時的異步結(jié)果
Future<?> terminationFuture();
//選擇一個 EventExecutor 對象
EventExecutor next();
// ========== 實現(xiàn)自 Iterable 接口 ==========
@Override
Iterator<EventExecutor> iterator();
// ========== 實現(xiàn)自 ExecutorService 接口 ==========
@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();
@Override
Future<?> submit(Runnable task);
@Override
<T> Future<T> submit(Runnable task, T result);
@Override
<T> Future<T> submit(Callable<T> task);
// ========== 實現(xiàn)自 ScheduledExecutorService 接口 ==========
@Override
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
@Override
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
@Override
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
- 重點關(guān)注next()方法蹬碧,該方法的功能是從線程池中選擇一個線程
- 比較特殊的是舱禽,接口方法返回類型為 Future 不是 Java 原生的
java.util.concurrent.Future
,而是 Netty 自己實現(xiàn)的 Future 接口恩沽,如下代碼:
public interface Future<V> extends java.util.concurrent.Future<V>
public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V>
3. AbstractEventExecutorGroup
io.netty.util.concurrent.AbstractEventExecutorGroup
誊稚,實現(xiàn) EventExecutorGroup 接口,EventExecutor ( 事件執(zhí)行器 )的分組抽象類罗心。
3.1 submit
#submit(...)
方法里伯,提交一個普通任務(wù)到 EventExecutor 中, 提交的 EventExecutor ,通過 #next()
方法選擇
@Override
public Future<?> submit(Runnable task) {
return next().submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
}
3.2 schedule
#schedule(...)
方法渤闷,提交一個定時任務(wù)到 EventExecutor 中,提交的 EventExecutor 疾瓮,通過 #next()
方法選擇。代碼如下:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return next().schedule(command, delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return next().schedule(callable, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
3.3 execute
#execute(...)
方法飒箭,在 EventExecutor 中執(zhí)行一個普通任務(wù),不需要返回結(jié)果狼电,代碼如下:
@Override
public void execute(Runnable command) {
next().execute(command);
}
3.4 invokeAll
#invokeAll(...)
方法,在 EventExecutor 中執(zhí)行多個普通任務(wù), 多個任務(wù)使用同一個 EventExecuto弦蹂。代碼如下:
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return next().invokeAll(tasks);
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return next().invokeAll(tasks, timeout, unit);
}
3.5 invokeAny
#invokeAll(...)
方法肩碟,在 EventExecutor 中執(zhí)行多個普通任務(wù),有一個執(zhí)行完成即可凸椿,多個任務(wù)使用同一個 EventExecutor 腾务。代碼如下:
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return next().invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return next().invokeAny(tasks, timeout, unit);
}
3.6 shutdown
#shutdown(...)
方法,關(guān)閉 EventExecutorGroup 削饵。代碼如下:
@Override
public Future<?> shutdownGracefully() {
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
@Override
@Deprecated
public abstract void shutdown();
@Override
@Deprecated
public List<Runnable> shutdownNow() {
shutdown();
return Collections.emptyList();
}
- 具體的
#shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)
和#shutdown()
方法岩瘦,由子類實現(xiàn)。
4. MultithreadEventExecutorGroup
4.1 構(gòu)造方法
/**
* EventExecutor 數(shù)組
*/
private final EventExecutor[] children;
/**
* 不可變( 只讀 )的 EventExecutor 數(shù)組
*/
private final Set<EventExecutor> readonlyChildren;
/**
* 已終止的 EventExecutor 數(shù)量
*/
private final AtomicInteger terminatedChildren = new AtomicInteger();
/**
* 用于終止 EventExecutor 的異步 Future
*/
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
/**
* EventExecutor 選擇器
*/
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 創(chuàng)建執(zhí)行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 創(chuàng)建 EventExecutor 數(shù)組
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
// 是否創(chuàng)建成功
boolean success = false;
try {
// 創(chuàng)建 EventExecutor 對象窿撬,newChild抽象方法启昧,具體有子類實現(xiàn)
children[i] = newChild(executor, args);
// 標記創(chuàng)建成功
success = true;
} catch (Exception e) {
// 創(chuàng)建失敗,拋出 IllegalStateException 異常
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 創(chuàng)建失敗劈伴,關(guān)閉所有已創(chuàng)建的 EventExecutor
if (!success) {
// 優(yōu)雅的關(guān)閉所有已創(chuàng)建的 EventExecutor密末,只負責關(guān)閉線程,并不知道關(guān)閉的結(jié)果
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
// 確保所有已創(chuàng)建的 EventExecutor 已關(guān)閉
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
//isTerminated() 若關(guān)閉后所有任務(wù)都已完成跛璧,則返回true严里。注意除非首先調(diào)用shutdown或shutdownNow,否則isTerminated永不為true追城。
// 返回:若關(guān)閉后所有任務(wù)都已完成刹碾,則返回true。
while (!e.isTerminated()) {
//等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完
//或者等超時時間到
//或者線程被中斷座柱,拋出InterruptedException
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 創(chuàng)建 EventExecutor 選擇器
chooser = chooserFactory.newChooser(children);
// 創(chuàng)建監(jiān)聽器迷帜,用于 EventExecutor 終止時的監(jiān)聽
//回調(diào)的具體邏輯是物舒,當所有 EventExecutor 都終止完成時,
// 通過調(diào)用 Future#setSuccess(V result) 方法戏锹,通知監(jiān)聽器們冠胯。至于為什么設(shè)置的值是 null ,因為監(jiān)聽器們不關(guān)注具體的結(jié)果锦针。
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 線程池中的線程每終止一個增加記錄數(shù)荠察,直到全部終止設(shè)置線程池異步終止結(jié)果為成功
if (terminatedChildren.incrementAndGet() == children.length) {// 全部關(guān)閉
terminationFuture.setSuccess(null);// 設(shè)置結(jié)果,并通知監(jiān)聽器們奈搜。
}
}
};
// 設(shè)置監(jiān)聽器到每個 EventExecutor 上
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 創(chuàng)建不可變( 只讀 )的 EventExecutor 數(shù)組
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
//設(shè)置不可變的EventExecutor集合
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
4.2 ThreadPerTaskExecutor
- 創(chuàng)建執(zhí)行器的代碼如下:
// 創(chuàng)建執(zhí)行器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
具體看下 ThreadPerTaskExecutor這個類割粮,代碼如下:
/**
* 實現(xiàn) Executor 接口,每個任務(wù)一個線程的執(zhí)行器實現(xiàn)類
*/
public final class ThreadPerTaskExecutor implements Executor {
/**
* 線程工廠對象
* Netty 實現(xiàn)自定義的 ThreadFactory 類媚污,為 io.netty.util.concurrent.DefaultThreadFactory
*/
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
/**
* 執(zhí)行任務(wù)
*
* @param command 任務(wù)
*
* 通過 ThreadFactory#newThread(Runnable) 方法舀瓢,創(chuàng)建一個 Thread ,然后調(diào)用 Thread#start() 方法耗美,啟動線程執(zhí)行任務(wù)
*/
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
io.netty.util.concurrent.ThreadPerTaskExecutor
京髓,實現(xiàn) Executor 接口,每個任務(wù)一個線程的執(zhí)行器實現(xiàn)類threadFactory 屬性商架,線程工程實例堰怨,通過構(gòu)造方法來初始化,Netty 實現(xiàn)自定義的 ThreadFactory 類蛇摸,為
io.netty.util.concurrent.DefaultThreadFactory
具體的創(chuàng)建看如下方法,創(chuàng)建默認的線程工廠類/** * 創(chuàng)建線程工廠對象,并且使用類名作為 poolType * @return */ protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }
#execute(Runnable command)
方法备图,通過ThreadFactory#newThread(Runnable)
方法,創(chuàng)建一個 Thread 赶袄,然后調(diào)用Thread#start()
方法揽涮,啟動線程執(zhí)行任務(wù)
4.3 DefaultThreadFactory
4.4 EventExecutorChooserFactory
io.netty.util.concurrent.EventExecutorChooserFactory
,EventExecutorChooser 工廠接口饿肺。代碼如下:
/**
* Factory that creates new {@link EventExecutorChooser}s.
*
* EventExecutorChooser 工廠接口
*/
@UnstableApi
public interface EventExecutorChooserFactory {
/**
* Returns a new {@link EventExecutorChooser}.
*
* 創(chuàng)建一個 EventExecutorChooser 對象
*/
EventExecutorChooser newChooser(EventExecutor[] executors);
/**
* Chooses the next {@link EventExecutor} to use.
*
* EventExecutor 選擇器接口
*/
@UnstableApi
interface EventExecutorChooser {
/**
* Returns the new {@link EventExecutor} to use.
*
* 選擇下一個 EventExecutor 對象
*/
EventExecutor next();
}
}
-
#newChooser(EventExecutor[] executors)
方法蒋困,創(chuàng)建一個 EventExecutorChooser 對象; - EventExecutorChooser 接口,EventExecutor 選擇器接口敬辣。
-
#next()
方法選擇下一個 EventExecutor對象雪标;
-
4.4.1 DefaultEventExecutorChooserFactory
io.netty.util.concurrent.DefaultEventExecutorChooserFactory
,實現(xiàn) EventExecutorChooserFactory 接口溉跃,默認 EventExecutorChooser 工廠實現(xiàn)類村刨。代碼如下
/**
* Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
*
* 實現(xiàn) EventExecutorChooserFactory 接口,默認 EventExecutorChooser 工廠實現(xiàn)類
*/
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
/**
* 單例
*/
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {// 是否為 2 的冪次方
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
/**
* 是否為 2 的冪次方
* @param val
* @return
*/
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
}
- DefaultEventExecutorChooserFactory是個單例撰茎;
- Netty實現(xiàn)了兩個線程選擇器嵌牺,雖然代碼不一致,功能都是一樣的:每次選擇索引為上一次所選線程索引+1的線程
-
#newChooser(EventExecutor[] executors)
創(chuàng)建具體的選擇器,根據(jù)#isPowerOfTwo(executors.length)
方法來判斷髓梅,創(chuàng)建哪種選擇器,EventExecutor 數(shù)組的大小是否為 2 的冪次方绎签,如果是枯饿,創(chuàng)建PowerOfTwoEventExecutorChooser選擇器,不是則創(chuàng)建GenericEventExecutorChooser選擇器诡必; -
#isPowerOfTwo(int val)
方法奢方,為什么(val & -val) == val
可以判斷數(shù)字是否為 2 的冪次方呢?
? 我們以 8 來舉個例子:
- 8 的二進制為 `1000`
- -8 的二進制使用補碼表示爸舒。所以蟋字,先求反生成反碼為 `0111` ,然后加一生成補碼為 `1000`
- 8 和 -8 與操作后扭勉,還是 8 鹊奖。與操作是都為1則為1,其他都為0涂炎,所以結(jié)果還是1000&1000還是1000忠聚;
- 實際上,以 2 為冪次方的數(shù)字唱捣,都是最高位為 1 两蟀,剩余位為 0 ,所以對應(yīng)的負數(shù)震缭,求完補碼還是自己
4.4.2 PowerOfTwoEventExecutorChooser
PowerOfTwoEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口赂毯,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類。這是一個優(yōu)化的實現(xiàn)拣宰,線程池數(shù)量使用2的冪次方党涕,這樣線程池選擇線程時使用位操作,能使性能最高巡社,PowerOfTwoEventExecutorChooser 是 DefaultEventExecutorChooserFactory 的靜態(tài)內(nèi)部類遣鼓,代碼如下:
/**
* 實現(xiàn) EventExecutorChooser 接口,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類
*/
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
/**
* 自增序列
*/
private final AtomicInteger idx = new AtomicInteger();
/**
* EventExecutor 數(shù)組
*/
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
/**
* 因為 - ( 二元操作符 ) 的計算優(yōu)先級高于 & ( 一元操作符 ) 重贺。
*
* 因為 EventExecutor 數(shù)組的大小是以 2 為冪次方的數(shù)字骑祟,那么減一后,除了最高位是 0 气笙,剩余位都為 1 ( 例如 8 減一后等于 7 次企,而 7 的二進制為 0111 。)潜圃,
* 那么無論 idx 無論如何遞增缸棵,再進行 & 并操作,都不會超過 EventExecutor 數(shù)組的大小谭期。并且堵第,還能保 證順序遞增吧凉。
* @return
*/
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
4.4.3 GenericEventExecutorChooser
GenericEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口,通用的 EventExecutor 選擇器實現(xiàn)類踏志。代碼如下:
GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中阀捅。
?
/**
* GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中。
* 實現(xiàn) EventExecutorChooser 接口针余,通用的 EventExecutor 選擇器實現(xiàn)類
*/
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
/**
* 使用 idx 自增饲鄙,并使用 EventExecutor 數(shù)組的大小來取余
* @return
*/
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
4.5 next
#next()
方法,選擇下一個 EventExecutor 對象圆雁。代碼如下:
/**
* 選擇下一個 EventExecutor 對象
* @return
*/
@Override
public EventExecutor next() {
return chooser.next();
}
4.6 iterator
/**
* 獲得 EventExecutor 數(shù)組的迭代器
* 為了避免調(diào)用方忍级,獲得迭代器后,對 EventExecutor 數(shù)組進行修改伪朽,
* 所以返回是不可變的 EventExecutor 數(shù)組 readonlyChildren 的迭代器
* @return
*/
@Override
public Iterator<EventExecutor> iterator() {
return readonlyChildren.iterator();
}
4.7 executorCount
/**
* Return the number of {@link EventExecutor} this implementation uses. This number is the maps
* 1:1 to the threads it use.
*
* 獲得 EventExecutor 數(shù)組的大小
*/
public final int executorCount() {
return children.length;
}
4.8 newChild
/**
* Create a new EventExecutor which will later then accessible via the {@link #next()} method. This method will be
* called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
*
* 抽象方法轴咱,子類實現(xiàn)該方法,創(chuàng)建其對應(yīng)的 EventExecutor 實現(xiàn)類的對象
*
*/
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
4.9 shutdownGracefully
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
l.shutdownGracefully(quietPeriod, timeout, unit);
}
return terminationFuture();
}
優(yōu)雅的關(guān)閉EventExecutor線程組烈涮,返回terminationFuture嗦玖,在構(gòu)造方法中由于已經(jīng)設(shè)置了監(jiān)聽,如下代碼,通過success屬性來判斷是否全部都關(guān)閉跃脊;
final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { // 線程池中的線程每終止一個增加記錄數(shù)宇挫,直到全部終止設(shè)置線程池異步終止結(jié)果為成功 if (terminatedChildren.incrementAndGet() == children.length) {// 全部關(guān)閉 terminationFuture.setSuccess(null);// 設(shè)置結(jié)果,并通知監(jiān)聽器們酪术。 } } };
4.10 terminationFuture
/**
* 返回用于終止 EventExecutor 的異步 Future
* @return
*/
@Override
public Future<?> terminationFuture() {
return terminationFuture;
}
4.11 shutdown
/**
* 廢棄的方法器瘪,EventExecutor線程組關(guān)閉
*/
@Override
@Deprecated
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}
4.12 isShuttingDown
/**
* 判斷所有的EventExecutor是否在優(yōu)雅的關(guān)閉,或者已經(jīng)關(guān)閉,
* 任何一個EventExecutor沒有關(guān)閉則返回false
* @return
*/
@Override
public boolean isShuttingDown() {
for (EventExecutor l: children) {
if (!l.isShuttingDown()) {
return false;
}
}
return true;
}
4.13 isShutdown
/**
* 判斷所有的EventExecutor是否都關(guān)閉
* @return
*/
@Override
public boolean isShutdown() {
for (EventExecutor l: children) {
if (!l.isShutdown()) {
return false;
}
}
return true;
}
4.14 isTerminated
/**
* EventExecutor線程組關(guān)閉后绘雁,所有任務(wù)是否都已完成
* @return
*/
@Override
public boolean isTerminated() {
for (EventExecutor l: children) {
if (!l.isTerminated()) {
return false;
}
}
return true;
}
4.15 awaitTermination
/**
* 等待所有的EventExecutor任務(wù)都執(zhí)行完或者等待時間超時,返回任務(wù)是否都已經(jīng)執(zhí)行完
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
loop: for (EventExecutor l: children) {
for (;;) {
//超時則跳出loop循環(huán)
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
//等所有已提交的任務(wù)(包括正在跑的和隊列中等待的)執(zhí)行完
//或者等超時時間到
//或者線程被中斷橡疼,拋出InterruptedException
//跳出for(;;)循環(huán)
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
5. EventLoopGroup
io.netty.channel.EventExecutorGroup
,繼承 EventExecutorGroup 接口庐舟,EventLoop 的分組接口欣除。代碼如下:
// ========== 實現(xiàn)自 EventExecutorGroup 接口 ==========
/**
* Return the next {@link EventLoop} to use
* 覆蓋父類接口的方法,選擇下一個 EventLoop 對象
*/
@Override
EventLoop next();
// ========== 自定義接口 ==========
/**
* 注冊 Channel 到 EventLoopGroup 中的一個線程上。實際上挪略,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊
*/
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
-
#next()
方法历帚,覆蓋父類接口的方法,選擇下一個 EventLoop 對象 -
#register(...)
方法,注冊 Channel 到 EventLoopGroup 中的一個線程上杠娱。實際上挽牢,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊
6.MultithreadEventLoopGroup
io.netty.channel.MultithreadEventLoopGroup
,實現(xiàn) EventLoopGroup 接口摊求,繼承 MultithreadEventExecutorGroup 抽象類禽拔,基于多線程的 EventLoop 的分組抽象類。
6.1 構(gòu)造方法
/**
* 默認 EventLoop 線程數(shù)
* EventLoopGroup 默認擁有的 EventLoop 數(shù)量。因為一個 EventLoop 對應(yīng)一個線程睹栖,所以為 CPU 數(shù)量 * 2 硫惕。
* 為什么會 * 2 呢?因為目前 CPU 基本都是超線程野来,一個 CPU 可對應(yīng) 2 個線程恼除。
* 在構(gòu)造方法未傳入 nThreads 方法參數(shù)時,使用 DEFAULT_EVENT_LOOP_THREADS 梁只。
*/
private static final int DEFAULT_EVENT_LOOP_THREADS;
/**
* 初始化線程數(shù)
*/
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
* EventExecutorChooserFactory, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
}
- 主要初始化了線程數(shù)缚柳,然后調(diào)用父類的構(gòu)造方法
- 默認情況埃脏,線程數(shù)最小為1搪锣,如果配置了系統(tǒng)參數(shù)
io.netty.eventLoopThreads
,設(shè)置為該系統(tǒng)參數(shù)值彩掐,否則設(shè)置為核心數(shù)的2倍构舟。
6.2 newDefaultThreadFactory
#newDefaultThreadFactory()
方法,創(chuàng)建線程工廠對象堵幽,覆蓋父類方法狗超,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY
,代碼如下:
/**
* 創(chuàng)建線程工廠對象
*
* 覆蓋父類方法朴下,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY 努咐。
* @return
*/
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
6.3 next()
#next()
方法,選擇下一個 EventLoop 對象,覆蓋父類方法,將返回值轉(zhuǎn)換成 EventLoop 類,代碼如下:
/**
* 選擇下一個 EventLoop 對象
*
* 覆蓋父類方法殴胧,將返回值轉(zhuǎn)換成 EventLoop 類
* @return
*/
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
6.4 newChild
#newChild(...)
抽象方法渗稍,創(chuàng)建 EventExecutor 對象,覆蓋父類方法团滥,返回值改為 EventLoop 類竿屹。
/**
* 抽象方法,創(chuàng)建 EventExecutor 對象
*
* 覆蓋父類方法灸姊,返回值改為 EventLoop 類拱燃。
* @param executor
* @param args
* @return
* @throws Exception
*/
@Override
protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
6.5 register(
#register(...)
方法,注冊 Channel 到 EventLoopGroup 中力惯,通過#next()
方法來選擇一個EventLoop來注冊,也就是通過EventExecutorChooser選擇器從線程組中選擇一個碗誉;
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Deprecated
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return next().register(channel, promise);
}
}
7. NioEventLoopGroup
io.netty.channel.nio.NioEventLoopGroup
,繼承 MultithreadEventLoopGroup 抽象類父晶,NioEventLoop 的分組實現(xiàn)類诗充。
7.1 構(gòu)造方法
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
}
構(gòu)造方法比較多,主要是明確了父構(gòu)造方法的 Object ... args
方法參數(shù)
- 第一個參數(shù)诱建,
selectorProvider
蝴蜓,java.nio.channels.spi.SelectorProvider
,用于創(chuàng)建 Java NIO Selector 對象。 - 第二個參數(shù)茎匠,
selectStrategyFactory
格仲,io.netty.channel.SelectStrategyFactory
,選擇策略工廠诵冒。詳細解析凯肋,見后續(xù)文章。 - 第三個參數(shù)汽馋,
rejectedExecutionHandler
侮东,io.netty.channel.SelectStrategyFactory
,拒絕執(zhí)行處理器豹芯。詳細解析悄雅,見后續(xù)文章。
7.2 newChild
#newChild(Executor executor, Object... args)
方法铁蹈,創(chuàng)建 NioEventLoop 對象
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
- 模板方法newChild()宽闲,用來創(chuàng)建線程池中的單個線程,現(xiàn)在我們知道
MultithreadEventExecutorGroup
中EventExecutor[] children
保存的就是NioEventLoop
7.3 setIoRatio
#setIoRatio(int ioRatio)
方法,設(shè)置所有 EventLoop 的 IO 任務(wù)占用執(zhí)行時間的比例
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
7.4 rebuildSelectors
#rebuildSelectors()
方法,重建所有 EventLoop 的 Selector 對象
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*
* 重建所有 EventLoop 的 Selector 對象
*
* 因為 JDK 有 epoll 100% CPU Bug 。實際上屹堰,NioEventLoop 當觸發(fā)該 Bug 時,
* 也會自動調(diào)用 NioEventLoop#rebuildSelector() 方法览徒,進行重建 Selector 對象,以修復(fù)該問題颂龙。
*/
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}