源碼分析
NIOEventLoopGroup
NioEventLoopGroup(其實是MultithreadEventExecutorGroup) 內(nèi)部維護(hù)一個類型為 EventExecutor children [], 默認(rèn)大小是處理器核數(shù) * 2, 這樣就構(gòu)成了一個線程池,初始化EventExecutor時NioEventLoopGroup重載newChild方法,所以children元素的實際類型為NioEventLoop。
線程啟動時調(diào)用SingleThreadEventExecutor的構(gòu)造方法晾匠,執(zhí)行NioEventLoop類的run方法,首先會調(diào)用hasTasks()方法判斷當(dāng)前taskQueue是否有元素儒拂。如果taskQueue中有元素监透,執(zhí)行 selectNow() 方法匙头,最終執(zhí)行selector.selectNow()瑟慈,該方法會立即返回桃移。如果taskQueue沒有元素,執(zhí)行 select(oldWakenUp) 方法
select ( oldWakenUp) 方法解決了 Nio 中的 bug葛碧,selectCnt 用來記錄selector.select方法的執(zhí)行次數(shù)和標(biāo)識是否執(zhí)行過selector.selectNow()借杰,若觸發(fā)了epoll的空輪詢bug,則會反復(fù)執(zhí)行selector.select(timeoutMillis)进泼,變量selectCnt 會逐漸變大蔗衡,當(dāng)selectCnt 達(dá)到閾值(默認(rèn)512)纤虽,則執(zhí)行rebuildSelector方法,進(jìn)行selector重建粘都,解決cpu占用100%的bug廓推。
rebuildSelector方法先通過openSelector方法創(chuàng)建一個新的selector刷袍。然后將old selector的selectionKey執(zhí)行cancel翩隧。最后將old selector的channel重新注冊到新的selector中。rebuild后呻纹,需要重新執(zhí)行方法selectNow堆生,檢查是否有已ready的selectionKey。
接下來調(diào)用processSelectedKeys 方法(處理I/O任務(wù))雷酪,當(dāng)selectedKeys != null時淑仆,調(diào)用processSelectedKeysOptimized方法,迭代 selectedKeys 獲取就緒的 IO 事件的selectkey存放在數(shù)組selectedKeys中, 然后為每個事件都調(diào)用 processSelectedKey 來處理它哥力,processSelectedKey 中分別處理OP_READ蔗怠;OP_WRITE;OP_CONNECT事件吩跋。
最后調(diào)用runAllTasks方法(非IO任務(wù))寞射,該方法首先會調(diào)用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已經(jīng)超過延遲執(zhí)行時間的任務(wù)移到taskQueue中等待被執(zhí)行锌钮,然后依次從taskQueue中取任務(wù)執(zhí)行桥温,每執(zhí)行64個任務(wù),進(jìn)行耗時檢查梁丘,如果已執(zhí)行時間超過預(yù)先設(shè)定的執(zhí)行時間侵浸,則停止執(zhí)行非IO任務(wù),避免非IO任務(wù)太多氛谜,影響IO任務(wù)的執(zhí)行掏觉。
每個NioEventLoop對應(yīng)一個線程和一個Selector,NioServerSocketChannel會主動注冊到某一個NioEventLoop的Selector上值漫,NioEventLoop負(fù)責(zé)事件輪詢澳腹。
Outbound 事件都是請求事件, 發(fā)起者是 Channel,處理者是 unsafe惭嚣,通過 Outbound 事件進(jìn)行通知枕磁,傳播方向是 tail到head。Inbound 事件發(fā)起者是 unsafe形庭,事件的處理者是 Channel, 是通知事件萌踱,傳播方向是從頭到尾。
內(nèi)存管理機制槽地,首先會預(yù)申請一大塊內(nèi)存Arena迁沫,Arena由許多Chunk組成芦瘾,而每個Chunk默認(rèn)由2048個page組成。Chunk通過AVL樹的形式組織Page集畅,每個葉子節(jié)點表示一個Page近弟,而中間節(jié)點表示內(nèi)存區(qū)域,節(jié)點自己記錄它在整個Arena中的偏移地址挺智。當(dāng)區(qū)域被分配出去后祷愉,中間節(jié)點上的標(biāo)記位會被標(biāo)記,這樣就表示這個中間節(jié)點以下的所有節(jié)點都已被分配了赦颇。大于8k的內(nèi)存分配在poolChunkList中二鳄,而PoolSubpage用于分配小于8k的內(nèi)存,它會把一個page分割成多段媒怯,進(jìn)行內(nèi)存分配订讼。
ByteBuf的特點:支持自動擴容(4M),保證put方法不會拋出異常扇苞、通過內(nèi)置的復(fù)合緩沖類型欺殿,實現(xiàn)零拷貝(zero-copy);不需要調(diào)用flip()來切換讀/寫模式鳖敷,讀取和寫入索引分開脖苏;方法鏈;引用計數(shù)基于AtomicIntegerFieldUpdater用于內(nèi)存回收哄陶;PooledByteBuf采用二叉樹來實現(xiàn)一個內(nèi)存池帆阳,集中管理內(nèi)存的分配和釋放,不用每次使用都新建一個緩沖區(qū)對象屋吨。UnpooledHeapByteBuf每次都會新建一個緩沖區(qū)對象蜒谤。
NioEventLoop與NioChannel類關(guān)系
一個NioEventLoopGroup下包含多個NioEventLoop
每個NioEventLoop中包含有一個Selector,一個taskQueue至扰,一個delayedTaskQueue
每個NioEventLoop的Selector上可以注冊監(jiān)聽多個AbstractNioChannel.ch
每個AbstractNioChannel只會綁定在唯一的NioEventLoop上
每個AbstractNioChannel都綁定有一個自己的DefaultChannelPipeline
NioEventLoop線程執(zhí)行過程
輪詢監(jiān)聽的IO事件
netty的輪詢注冊機制
netty將AbstractNioChannel內(nèi)部的jdk類SelectableChannel對象注冊到NioEventLoop中的jdk類Selector對象上去鳍徽,并且將AbstractNioChannel作為SelectableChannel對象的一個attachment附屬上。
這樣在Selector輪詢到某個SelectableChannel有IO事件發(fā)生時敢课,就可以直接取出IO事件對應(yīng)的AbstractNioChannel進(jìn)行后續(xù)操作阶祭。循環(huán)執(zhí)行阻塞selector.select(timeoutMIllis)操作直到以下條件產(chǎn)生
a)輪詢到了IO事件(selectedKey != 0)
b)oldWakenUp參數(shù)為true
c)任務(wù)隊列里面有待處理任務(wù)(hasTasks())
d)第一個定時任務(wù)即將要被執(zhí)行(hasScheduledTasks())
e)用戶主動喚醒(wakenUp.get()==true)解決JDK的NIO epoll bug
該bug會導(dǎo)致Selector一直空輪詢,最終導(dǎo)致cpu 100%直秆。
在每次selector.select(timeoutMillis)后濒募,如果沒有監(jiān)聽到就緒IO事件,會記錄此次select的耗時圾结。如果耗時不足timeoutMillis瑰剃,說明select操作沒有阻塞那么長時間,可能觸發(fā)了空輪詢筝野,進(jìn)行一次計數(shù)晌姚。
計數(shù)累積超過閾值(默認(rèn)512)后粤剧,開始進(jìn)行Selector重建:
a)拿到有效的selectionKey集合
b)取消該selectionKey在舊的selector上的事件注冊
c)將該selectionKey對應(yīng)的Channel注冊到新的selector上,生成新的selectionKey
d)重新綁定Channel和新的selectionKey的關(guān)系netty優(yōu)化了sun.nio.ch.SelectorImpl類中的selectedKeys和publicSelectedKeys這兩個field的實現(xiàn)
netty通過反射將這兩個filed替換掉挥唠,替換后的field采用數(shù)組實現(xiàn)抵恋。
這樣每次在輪詢到nio事件的時候,netty只需要O(1)的時間復(fù)雜度就能將SelectionKey塞到set中去宝磨,而jdk原有field底層使用的hashSet需要O(lgn)的時間復(fù)雜度弧关。
處理IO事件
1)對于boss NioEventLoop來說,輪詢到的是基本上是連接事件(OP_ACCEPT):
a)socketChannel = ch.accept()懊烤;
b)將socketChannel綁定到worker NioEventLoop上梯醒;
c)socketChannel在worker NioEventLoop上創(chuàng)建register0任務(wù);
d)pipeline.fireChannelReadComplete();
2)對于worker NioEventLoop來說腌紧,輪詢到的基本上是IO讀寫事件(以O(shè)P_READ為例):
a)ByteBuffer.allocateDirect(capacity);
b)socketChannel.read(dst);
c)pipeline.fireChannelRead();
d)pipeline.fireChannelReadComplete();
處理任務(wù)隊列
1)處理用戶產(chǎn)生的普通任務(wù)
NioEventLoop中的Queue<Runnable> taskQueue被用來承載用戶產(chǎn)生的普通Task畜隶。
taskQueue被實現(xiàn)為netty的mpscQueue壁肋,即多生產(chǎn)者單消費者隊列。netty使用該隊列將外部用戶線程產(chǎn)生的Task聚集籽慢,并在reactor線程內(nèi)部用單線程的方式串行執(zhí)行隊列中的Task浸遗。
當(dāng)用戶在非IO線程調(diào)用Channel的各種方法執(zhí)行Channel相關(guān)的操作時,比如channel.write()箱亿、channel.flush()等跛锌,netty會將相關(guān)操作封裝成一個Task并放入taskQueue中,保證相關(guān)操作在IO線程中串行執(zhí)行届惋。
2)處理用戶產(chǎn)生的定時任務(wù)
NioEventLoop中的Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue被用來承載用戶產(chǎn)生的定時Task髓帽。
當(dāng)用戶在非IO線程需要產(chǎn)生定時操作時,netty將用戶的定時操作封裝成ScheduledFutureTask脑豹,即一個netty內(nèi)部的定時Task郑藏,并將定時Task放入delayedTaskQueue中等待對應(yīng)Channel的IO線程串行執(zhí)行。
為了解決多線程并發(fā)寫入delayedTaskQueue的問題瘩欺,netty將添加ScheduledFutureTask到delayedTaskQueue中的操作封裝成普通Task必盖,放入taskQueue中,通過NioEventLoop的IO線程對delayedTaskQueue進(jìn)行單線程寫操作俱饿。
3)處理任務(wù)隊列的邏輯
a)將已到期的定時Task從delayedTaskQueue中轉(zhuǎn)移到taskQueue中
b)計算本次循環(huán)執(zhí)行的截止時間
c)循環(huán)執(zhí)行taskQueue中的任務(wù)歌粥,每隔64個任務(wù)檢查一下是否已過截止時間,直到taskQueue中任務(wù)全部執(zhí)行完或者超過執(zhí)行截止時間拍埠。
Netty中Reactor線程和worker線程所處理的事件
Server端NioEventLoop處理的事件
Client端NioEventLoop處理的事件
pipeline原理與事件處理
目錄
- pipeline整體關(guān)系簡述
- Unsafe的作用
- 事件的分類及處理
- pipeline中節(jié)點的添加和刪除
一失驶、pipeline整體關(guān)系簡述
netty中的pipeline模型
當(dāng)EventLoop的selector監(jiān)聽到某Channel產(chǎn)生了就緒的IO事件,并調(diào)用socket API對就緒的IO事件進(jìn)行操作后械拍,需要將操作產(chǎn)生的“IO數(shù)據(jù)”或“操作結(jié)果”告知用戶進(jìn)行相應(yīng)的業(yè)務(wù)處理突勇。
netty將因外部IO事件導(dǎo)致的Channel狀態(tài)變更(Channel被注冊到EventLoop中装盯,Channel狀態(tài)變?yōu)榭捎茫珻hannel讀取到IO數(shù)據(jù)...)或Channel內(nèi)部邏輯操作(添加ChannelHandler...)抽象為不同的回調(diào)事件甲馋,并定義了pipeline對Channel的回調(diào)事件進(jìn)行流式的響應(yīng)處理埂奈。
用戶可在pipeline中添加多個事件處理器(ChannelHandler),并通過實現(xiàn)ChannelHandler中定義的方法定躏,對回調(diào)事件進(jìn)行定制化的業(yè)務(wù)處理账磺。ChannelHandler也可以調(diào)用自身方法對Channel本身進(jìn)行操作。
netty會保證“回調(diào)事件在ChannelHandler之間的流轉(zhuǎn)”及“Channel內(nèi)部IO操作”由EventLoop線程串行執(zhí)行痊远,用戶也可以在ChannelHandler中使用自行構(gòu)建的業(yè)務(wù)線程進(jìn)行業(yè)務(wù)處理垮抗。
pipeline相關(guān)類的關(guān)系圖
- DefaultChannelPipeline:
事件處理流,是一個雙向鏈表結(jié)構(gòu)碧聪,鏈表中節(jié)點元素為ChannelHandlerContext冒版。
新的AbstractNioChannel創(chuàng)建時,會創(chuàng)建該Channel對應(yīng)的DefaultChannelPipeline逞姿,用于處理該Channel對應(yīng)的回調(diào)事件辞嗡。
DefaultChannelPipeline創(chuàng)建時,會自動創(chuàng)建并向鏈表中添加兩個ChannelhandlerContext節(jié)點——head和tail滞造。
pipeline的fireXXX()方法:回調(diào)事件的發(fā)起方法续室。會產(chǎn)生相應(yīng)回調(diào)事件并直接調(diào)用ChannelHandlerContext.invokeXXX(head)方法將回調(diào)事件傳遞給pipeline的head節(jié)點。
ChannelHandlerContext:
事件處理器上下文谒养,pipeline中的實際處理節(jié)點挺狰。
每個處理節(jié)點ChannelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應(yīng)的pipeline和Channel的信息买窟,方便ChannelHandler進(jìn)行調(diào)用丰泊。
AbstractChannelHandlerContext具體實現(xiàn)了ChannelHandlerContext接口的功能,并進(jìn)行了相應(yīng)擴展蔑祟。
- ChannelHandlerContext的fireXXX方法:回調(diào)事件的發(fā)起方法趁耗。會產(chǎn)生相應(yīng)回調(diào)事件并將其交給pipeline中的下一個處理節(jié)點。此方法提供給用戶實現(xiàn)的ChannelHandler使用疆虚,用于將回調(diào)事件向pipeline中的下一個節(jié)點傳遞苛败。
- AbstractChannelHandlerContext的static invokeXXX(AbstractChannelHandlerContext next)方法:封裝next.invokeXXX()的邏輯并交給EventLoop的IO線程執(zhí)行。
- ChannelHandlerContext的invokeXXX()方法:回調(diào)事件執(zhí)行方法径簿。執(zhí)行節(jié)點中事件處理器ChannelHandler的XXX方法罢屈,實際處理回調(diào)事件。
ChannelHandler:
ChannelHandler(事件處理器接口)篇亭,由ChannelInboundHandler接口和ChannelOutboundHandler接口繼承缠捌。
ChannelInboundHandler中定義了各個回調(diào)事件的回調(diào)方法,由用戶進(jìn)行具體實現(xiàn)。
ChannelOutboundHandler中定義了方法進(jìn)行Channel內(nèi)部IO操作(Channel發(fā)起bind/connect/close操作曼月,Channel監(jiān)聽OP_READ谊却,Channel寫IO數(shù)據(jù)...),供用戶在回調(diào)方法中使用哑芹。
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter為接口的默認(rèn)實現(xiàn)類(其實沒干什么事)炎辨,用戶通過繼承這兩個類來實現(xiàn)自己的業(yè)務(wù)處理邏輯
二、Unsafe的作用
概述
Unsafe是Channel的內(nèi)部類聪姿,一個Channel對應(yīng)一個Unsafe碴萧。
Unsafe用于處理Channel對應(yīng)網(wǎng)絡(luò)IO的底層操作。ChannelHandler處理回調(diào)事件時產(chǎn)生的相關(guān)網(wǎng)絡(luò)IO操作最終也會委托給Unsafe執(zhí)行末购。
Unsafe接口中定義了socket相關(guān)操作破喻,包括SocketAddress獲取、selector注冊盟榴、網(wǎng)卡端口綁定曹质、socket建連與斷連、socket寫數(shù)據(jù)曹货。這些操作都和jdk底層socket相關(guān)咆繁。
NioUnsafe在Unsafe基礎(chǔ)上增加了幾個操作,包括訪問jdk的SelectableChannel顶籽、socket讀數(shù)據(jù)等。
NioByteUnsafe實現(xiàn)了與socket連接的字節(jié)數(shù)據(jù)讀取相關(guān)的操作银觅。
NioMessageUnsafe實現(xiàn)了與新連接建立相關(guān)的操作礼饱。
三、事件的分類及處理
在netty的pipeline中包含兩種類型的事件究驴,分別為inbound和outbound镊绪,inbound為上行事件,outbound為下行事件洒忧。
inbound事件為被動觸發(fā)蝴韭,在某些情況發(fā)生時自動觸發(fā);
outbound為主動觸發(fā)熙侍,在需要主動執(zhí)行某些操作時觸發(fā)榄鉴。
Inbound事件
Outbound事件
發(fā)起并處理inbound事件
對于inbound事件,因為需要進(jìn)行業(yè)務(wù)邏輯處理蛉抓,因此pipeline的head節(jié)點會執(zhí)行fireXXX()方法將事件透傳給后面的用戶自己實現(xiàn)inbound處理節(jié)點庆尘,由用戶自己實現(xiàn)的ChannelHandler接收事件并回調(diào)執(zhí)行業(yè)務(wù)邏輯。
發(fā)起并處理outbound事件
對于outbound事件巷送,因為和IO操作相關(guān)驶忌,最后會由pipeline中的head節(jié)點接收處理。head節(jié)點實現(xiàn)了ChannelHandler的事件執(zhí)行方法笑跛,將實際的執(zhí)行操作委托給Unsafe進(jìn)行付魔。
bind聊品、connect、read几苍、writeAndFlush等outbound事件的處理過程可以自己跟代碼看一下翻屈。(最終由head節(jié)點委托給Unsafe類執(zhí)行相關(guān)IO操作)
head、tail節(jié)點的作用
head節(jié)點:
head節(jié)點既是inBound處理節(jié)點擦剑,又是outBound處理節(jié)點妖胀。
head節(jié)點作為pipeline的頭結(jié)點開始接收并傳遞inbound事件。并作為pipeline的最后一環(huán)最終接收處理outbound事件(委托Unsafe進(jìn)行outbound事件的相關(guān)IO操作)惠勒。tail節(jié)點:
tail節(jié)點是inBound處理節(jié)點赚抡。
tail節(jié)點作為pipeline的第一環(huán)傳遞outbound事件,其實就是將outbound事件透傳到前一個outbound處理節(jié)點纠屋。并作為pipeline的最后一環(huán)最終接收inbound事件涂臣,大部分左右是終止inbound事件的傳播。
tail節(jié)點的exceptionCaught方法:若最終在用戶自定義的處理節(jié)點沒有捕獲處理異常售担,則在tail節(jié)點捕獲異常打印警告日志赁遗。
tail節(jié)點的channelRead方法:若Channel讀入的ByteBuf在流經(jīng)pipeline過程中沒有被消費掉,最終流入了tail節(jié)點族铆,則將該ByteBuf丟棄回收并打印警告日志岩四。