NioEventLoop的創(chuàng)建
NioEventLoop是netty及其重要的組成部件超燃,它的首要職責(zé)就是為注冊在它上的channels服務(wù),發(fā)現(xiàn)這些channels上發(fā)生的新連接晨仑、讀寫等I/O事件,然后將事件轉(zhuǎn)交 channel 流水線處理拆檬。使用netty時(shí)洪己,我們首先要做的就是創(chuàng)建NioEventLoopGroup,這是一組NioEventLoop的集合竟贯,類似線程與線程池答捕。通常,服務(wù)端會創(chuàng)建2個(gè)group,一個(gè)叫做bossGroup,一個(gè)叫做workerGroup屑那。bossGroup負(fù)責(zé)監(jiān)聽綁定的端口拱镐,接受請求并創(chuàng)建新連接,初始化后交由workerGroup處理后續(xù)IO事件持际。
NioEventLoop和NioEventLoopGroup的類圖
首先看看NioEventLoop和NioEventLoopGroup的類關(guān)系圖
類多但不亂沃琅,可以發(fā)現(xiàn)三個(gè)特點(diǎn):
- 兩者都繼承了ExecutorService,從而與線程池建立了聯(lián)系
- NioEventLoop繼承的都是SingleThread蜘欲,NioEventLoop繼承的是MultiThread
- NioEventLoop還繼承了AbstractScheduledEventExecutor益眉,不難猜出這是個(gè)和定時(shí)任務(wù)調(diào)度有關(guān)的線程池
NioEventLoopGroup的創(chuàng)建
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
我們先看看bossGroup和workerGroup的構(gòu)造方法。
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
除此之外姥份,還有多達(dá)8種構(gòu)造方法郭脂,這些構(gòu)造方法可以指定5種參數(shù):
1、最大線程數(shù)量澈歉。如果指定為0展鸡,那么Netty會將線程數(shù)量設(shè)置為CPU邏輯處理器數(shù)量的2倍
2、線程工廠闷祥。要求線程工廠類必須實(shí)現(xiàn)java.util.concurrent.ThreadFactory接口娱颊。如果沒有指定線程工廠傲诵,那么默認(rèn)DefaultThreadFactory。
3箱硕、SelectorProvider拴竹。如果沒有指定SelectorProvider,那么默認(rèn)的SelectorProvider為SelectorProvider.provider()剧罩。
4栓拜、SelectStrategyFactory。如果沒有指定則默認(rèn)為DefaultSelectStrategyFactory.INSTANCE
5惠昔、RejectedExecutionHandler幕与。拒絕策略處理類,如果這個(gè)EventLoopGroup已被關(guān)閉镇防,那么之后提交的Runnable任務(wù)會默認(rèn)調(diào)用RejectedExecutionHandler的reject方法進(jìn)行處理啦鸣。如果沒有指定,則默認(rèn)調(diào)用拒絕策略来氧。
最終诫给,NioEventLoopGroup會重載到父類MultiThreadEventExecutorGroup的構(gòu)造方法上,這里省略了一些健壯性代碼。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
// 步驟1
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 步驟2
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
// 步驟3
chooser = chooserFactory.newChooser(children);
// 步驟4
final FutureListener<Object> terminationListener = future -> {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// 步驟5
Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
這里可以分解為5個(gè)步驟啦扬,下面一步步講解
步驟1
第一個(gè)步驟是創(chuàng)建線程池executor中狂。從workerGroup構(gòu)造方法可知,默認(rèn)傳進(jìn)來的executor為null扑毡,所以首先創(chuàng)建executor胃榕。newDefaultThreadFactory的作用是設(shè)置線程的前綴名和線程優(yōu)先級,默認(rèn)情況下瞄摊,前綴名是nioEventLoopGroup-x-y這樣的命名規(guī)則,而線程優(yōu)先級則是5勋又,處于中間位置。
創(chuàng)建完newDefaultThreadFactory后泉褐,進(jìn)入到ThreadPerTaskExecutor赐写。它直接實(shí)現(xiàn)了juc包的線程池頂級接口,從構(gòu)造方法可以看到它只是簡單的把factory賦值給自己的成員變量膜赃。而它實(shí)現(xiàn)的接口方法調(diào)用了threadFactory的newThread方法。從名字可以看出揉忘,它構(gòu)造了一個(gè)thread跳座,并立即啟動(dòng)thread。
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
那么我們回過頭來看下DefaultThreadFactory的newThread方法,發(fā)現(xiàn)他創(chuàng)建了一個(gè)FastThreadLocalThread泣矛。這是netty自定義的一個(gè)線程類疲眷,顧名思義,netty認(rèn)為它的性能更快您朽。關(guān)于它的解析留待以后狂丝。這里步驟1創(chuàng)建線程池就完成了换淆。總的來說他與我們通常使用的線程池不太一樣几颜,不設(shè)置線程池的線程數(shù)和任務(wù)隊(duì)列倍试,而是來一個(gè)任務(wù)啟動(dòng)一個(gè)線程。(問題:那任務(wù)一多豈不是直接線程爆炸蛋哭?)
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
步驟2
步驟2是創(chuàng)建workerGroup中的NioEventLoop县习。在示例代碼中,傳進(jìn)來的線程數(shù)是0谆趾,顯然不可能真的只創(chuàng)建0個(gè)nioEventLoop線程躁愿。在調(diào)用父類MultithreadEventLoopGroup構(gòu)造函數(shù)時(shí),對線程數(shù)進(jìn)行了判斷沪蓬,若為0彤钟,則傳入默認(rèn)線程數(shù),該值默認(rèn)為2倍CPU核心數(shù)
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 靜態(tài)代碼塊初始化DEFAULT_EVENT_LOOP_THREADS
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
接下來是通過newChild方法為每一個(gè)EventExecutor創(chuàng)建一個(gè)對應(yīng)的NioEventLoop跷叉。這個(gè)方法傳入了一些args到NioEventLoop中样勃,前三個(gè)是在NioEventLoopGroup創(chuàng)建時(shí)傳過來的參數(shù)。默認(rèn)值見上文
- SlectorProvider.provider, 用于創(chuàng)建 Java NIO Selector 對象;
- SelectStrategyFactory, 選擇策略工廠;
- RejectedExecutionHandlers, 拒絕執(zhí)行處理器;
- EventLoopTaskQueueFactory,任務(wù)隊(duì)列工廠,默認(rèn)為null;
進(jìn)入NioEventLoop的構(gòu)造函數(shù)性芬,如下:
NioEventLoop構(gòu)造函數(shù)
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// 父類構(gòu)造函數(shù)
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue");
rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler");
}
首先調(diào)用一個(gè)newTaskQueue方法創(chuàng)建一個(gè)任務(wù)隊(duì)列峡眶。這是一個(gè)mpsc即多生產(chǎn)者單消費(fèi)者的無鎖隊(duì)列。之后調(diào)用父類的構(gòu)造函數(shù)植锉,在父類的構(gòu)造函數(shù)中辫樱,將NioEventLoopGroup設(shè)置為自己的parent,并通過匿名內(nèi)部類創(chuàng)建了這樣一個(gè)Executor————通過ThreadPerTaskExecutor執(zhí)行傳進(jìn)來的任務(wù)俊庇,并且在執(zhí)行時(shí)將當(dāng)前線程與NioEventLoop綁定狮暑。其他屬性也一一設(shè)置。
在nioEventLoop構(gòu)造函數(shù)中辉饱,我們發(fā)現(xiàn)創(chuàng)建了一個(gè)selector搬男,不妨看一看netty對它的包裝。
unwrappedSelector = provider.openSelector();
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
首先看到netty定義了一個(gè)常量DISABLE_KEY_SET_OPTIMIZATION彭沼,如果這個(gè)常量設(shè)置為true缔逛,也即不對keyset進(jìn)行優(yōu)化,則直接返回未包裝的selector姓惑。那么netty對selector進(jìn)行了哪些優(yōu)化褐奴?
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
}
往下,我們看到了一個(gè)叫做selectedSelectionKeySet的類于毙,點(diǎn)進(jìn)去可以看到敦冬,它繼承了AbstractSet,然而它的成員變量卻讓我們想到了ArrayList,再看看它定義的方法唯沮,除了不支持remove和contains外脖旱,活脫脫一個(gè)簡化版的ArrayList堪遂,甚至也支持?jǐn)U容。
沒錯(cuò)萌庆,netty確實(shí)通過反射的方式溶褪,將selectionKey從Set替換為了ArrayList。仔細(xì)一想踊兜,卻又覺得此番做法有些道理竿滨。眾所周知,雖然HashSet和ArrayList隨機(jī)查找的時(shí)間復(fù)雜度都是o(1)捏境,但相比數(shù)組直接通過偏移量定位于游,HashSet由于需要Hash運(yùn)算,時(shí)間消耗上又稍稍遜色了些垫言。再加上使用場景上贰剥,都是獲取selectionKey集合然后遍歷,Set去重的特性完全用不上,也無怪乎追求性能的netty想要替換它了筷频。
步驟3
創(chuàng)建完workerGroup的NioEventLoop后,如何挑選一個(gè)nioEventLoop進(jìn)行工作是netty接下來想要做的事蚌成。一般來說輪詢是一個(gè)很容易想到的方案,為此需要?jiǎng)?chuàng)建一個(gè)類似負(fù)載均衡作用的線程選擇器凛捏。當(dāng)然追求性能到喪心病狂的netty是不會輕易滿足的担忧。我們看看netty在這樣常見的方案里又做了哪些操作。
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// PowerOfTwo
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
// Generic
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
可以看到坯癣,netty根據(jù)workerGroup內(nèi)線程的數(shù)量采取了2種不同的線程選擇器瓶盛,當(dāng)線程數(shù)x是2的冪次方時(shí),可以通過&(x-1)來達(dá)到對x取模的效果示罗,其他情況則需要直接取模惩猫。這與hashmap強(qiáng)制設(shè)置容量為2的冪次方有異曲同工之妙。
步驟4
步驟4就是添加一些保證健壯性而添加的監(jiān)聽器了蚜点,這些監(jiān)聽器會在EventLoop被關(guān)閉后得到通知轧房。
步驟5
創(chuàng)建一個(gè)只讀的NioEventLoop線程組
到此NioEventLoopGroup及其包含的NioEventLoop組就創(chuàng)建完成了