Netty源碼分析


源碼分析

NIOEventLoopGroup

NioEventLoopGroup(其實是MultithreadEventExecutorGroup) 內(nèi)部維護(hù)一個類型為 EventExecutor children [], 默認(rèn)大小是處理器核數(shù) * 2, 這樣就構(gòu)成了一個線程池,初始化EventExecutor時NioEventLoopGroup重載newChild方法,所以children元素的實際類型為NioEventLoop。

線程啟動時調(diào)用SingleThreadEventExecutor的構(gòu)造方法晾匠,執(zhí)行NioEventLoop類的run方法,首先會調(diào)用hasTasks()方法判斷當(dāng)前taskQueue是否有元素儒拂。如果taskQueue中有元素监透,執(zhí)行 selectNow() 方法匙头,最終執(zhí)行selector.selectNow()瑟慈,該方法會立即返回桃移。如果taskQueue沒有元素,執(zhí)行 select(oldWakenUp) 方法

select ( oldWakenUp) 方法解決了 Nio 中的 bug葛碧,selectCnt 用來記錄selector.select方法的執(zhí)行次數(shù)和標(biāo)識是否執(zhí)行過selector.selectNow()借杰,若觸發(fā)了epoll的空輪詢bug,則會反復(fù)執(zhí)行selector.select(timeoutMillis)进泼,變量selectCnt 會逐漸變大蔗衡,當(dāng)selectCnt 達(dá)到閾值(默認(rèn)512)纤虽,則執(zhí)行rebuildSelector方法,進(jìn)行selector重建粘都,解決cpu占用100%的bug廓推。

rebuildSelector方法先通過openSelector方法創(chuàng)建一個新的selector刷袍。然后將old selector的selectionKey執(zhí)行cancel翩隧。最后將old selector的channel重新注冊到新的selector中。rebuild后呻纹,需要重新執(zhí)行方法selectNow堆生,檢查是否有已ready的selectionKey。

接下來調(diào)用processSelectedKeys 方法(處理I/O任務(wù))雷酪,當(dāng)selectedKeys != null時淑仆,調(diào)用processSelectedKeysOptimized方法,迭代 selectedKeys 獲取就緒的 IO 事件的selectkey存放在數(shù)組selectedKeys中, 然后為每個事件都調(diào)用 processSelectedKey 來處理它哥力,processSelectedKey 中分別處理OP_READ蔗怠;OP_WRITE;OP_CONNECT事件吩跋。

最后調(diào)用runAllTasks方法(非IO任務(wù))寞射,該方法首先會調(diào)用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已經(jīng)超過延遲執(zhí)行時間的任務(wù)移到taskQueue中等待被執(zhí)行锌钮,然后依次從taskQueue中取任務(wù)執(zhí)行桥温,每執(zhí)行64個任務(wù),進(jìn)行耗時檢查梁丘,如果已執(zhí)行時間超過預(yù)先設(shè)定的執(zhí)行時間侵浸,則停止執(zhí)行非IO任務(wù),避免非IO任務(wù)太多氛谜,影響IO任務(wù)的執(zhí)行掏觉。

每個NioEventLoop對應(yīng)一個線程和一個Selector,NioServerSocketChannel會主動注冊到某一個NioEventLoop的Selector上值漫,NioEventLoop負(fù)責(zé)事件輪詢澳腹。

Outbound 事件都是請求事件, 發(fā)起者是 Channel,處理者是 unsafe惭嚣,通過 Outbound 事件進(jìn)行通知枕磁,傳播方向是 tail到head。Inbound 事件發(fā)起者是 unsafe形庭,事件的處理者是 Channel, 是通知事件萌踱,傳播方向是從頭到尾。

內(nèi)存管理機制槽地,首先會預(yù)申請一大塊內(nèi)存Arena迁沫,Arena由許多Chunk組成芦瘾,而每個Chunk默認(rèn)由2048個page組成。Chunk通過AVL樹的形式組織Page集畅,每個葉子節(jié)點表示一個Page近弟,而中間節(jié)點表示內(nèi)存區(qū)域,節(jié)點自己記錄它在整個Arena中的偏移地址挺智。當(dāng)區(qū)域被分配出去后祷愉,中間節(jié)點上的標(biāo)記位會被標(biāo)記,這樣就表示這個中間節(jié)點以下的所有節(jié)點都已被分配了赦颇。大于8k的內(nèi)存分配在poolChunkList中二鳄,而PoolSubpage用于分配小于8k的內(nèi)存,它會把一個page分割成多段媒怯,進(jìn)行內(nèi)存分配订讼。

ByteBuf的特點:支持自動擴容(4M),保證put方法不會拋出異常扇苞、通過內(nèi)置的復(fù)合緩沖類型欺殿,實現(xiàn)零拷貝(zero-copy);不需要調(diào)用flip()來切換讀/寫模式鳖敷,讀取和寫入索引分開脖苏;方法鏈;引用計數(shù)基于AtomicIntegerFieldUpdater用于內(nèi)存回收哄陶;PooledByteBuf采用二叉樹來實現(xiàn)一個內(nèi)存池帆阳,集中管理內(nèi)存的分配和釋放,不用每次使用都新建一個緩沖區(qū)對象屋吨。UnpooledHeapByteBuf每次都會新建一個緩沖區(qū)對象蜒谤。

NioEventLoop與NioChannel類關(guān)系


一個NioEventLoopGroup下包含多個NioEventLoop
每個NioEventLoop中包含有一個Selector,一個taskQueue至扰,一個delayedTaskQueue
每個NioEventLoop的Selector上可以注冊監(jiān)聽多個AbstractNioChannel.ch
每個AbstractNioChannel只會綁定在唯一的NioEventLoop上
每個AbstractNioChannel都綁定有一個自己的DefaultChannelPipeline

NioEventLoop線程執(zhí)行過程

輪詢監(jiān)聽的IO事件

  1. netty的輪詢注冊機制
    netty將AbstractNioChannel內(nèi)部的jdk類SelectableChannel對象注冊到NioEventLoop中的jdk類Selector對象上去鳍徽,并且將AbstractNioChannel作為SelectableChannel對象的一個attachment附屬上。
    這樣在Selector輪詢到某個SelectableChannel有IO事件發(fā)生時敢课,就可以直接取出IO事件對應(yīng)的AbstractNioChannel進(jìn)行后續(xù)操作阶祭。

  2. 循環(huán)執(zhí)行阻塞selector.select(timeoutMIllis)操作直到以下條件產(chǎn)生
    a)輪詢到了IO事件(selectedKey != 0)
    b)oldWakenUp參數(shù)為true
    c)任務(wù)隊列里面有待處理任務(wù)(hasTasks())
    d)第一個定時任務(wù)即將要被執(zhí)行(hasScheduledTasks())
    e)用戶主動喚醒(wakenUp.get()==true)

  3. 解決JDK的NIO epoll bug
    該bug會導(dǎo)致Selector一直空輪詢,最終導(dǎo)致cpu 100%直秆。
    在每次selector.select(timeoutMillis)后濒募,如果沒有監(jiān)聽到就緒IO事件,會記錄此次select的耗時圾结。如果耗時不足timeoutMillis瑰剃,說明select操作沒有阻塞那么長時間,可能觸發(fā)了空輪詢筝野,進(jìn)行一次計數(shù)晌姚。
    計數(shù)累積超過閾值(默認(rèn)512)后粤剧,開始進(jìn)行Selector重建:
    a)拿到有效的selectionKey集合
    b)取消該selectionKey在舊的selector上的事件注冊
    c)將該selectionKey對應(yīng)的Channel注冊到新的selector上,生成新的selectionKey
    d)重新綁定Channel和新的selectionKey的關(guān)系

  4. netty優(yōu)化了sun.nio.ch.SelectorImpl類中的selectedKeys和publicSelectedKeys這兩個field的實現(xiàn)
    netty通過反射將這兩個filed替換掉挥唠,替換后的field采用數(shù)組實現(xiàn)抵恋。
    這樣每次在輪詢到nio事件的時候,netty只需要O(1)的時間復(fù)雜度就能將SelectionKey塞到set中去宝磨,而jdk原有field底層使用的hashSet需要O(lgn)的時間復(fù)雜度弧关。

處理IO事件

1)對于boss NioEventLoop來說,輪詢到的是基本上是連接事件(OP_ACCEPT):
a)socketChannel = ch.accept()懊烤;
b)將socketChannel綁定到worker NioEventLoop上梯醒;
c)socketChannel在worker NioEventLoop上創(chuàng)建register0任務(wù);
d)pipeline.fireChannelReadComplete();

2)對于worker NioEventLoop來說腌紧,輪詢到的基本上是IO讀寫事件(以O(shè)P_READ為例):
a)ByteBuffer.allocateDirect(capacity);
b)socketChannel.read(dst);
c)pipeline.fireChannelRead();
d)pipeline.fireChannelReadComplete();

處理任務(wù)隊列

1)處理用戶產(chǎn)生的普通任務(wù)
NioEventLoop中的Queue<Runnable> taskQueue被用來承載用戶產(chǎn)生的普通Task畜隶。
taskQueue被實現(xiàn)為netty的mpscQueue壁肋,即多生產(chǎn)者單消費者隊列。netty使用該隊列將外部用戶線程產(chǎn)生的Task聚集籽慢,并在reactor線程內(nèi)部用單線程的方式串行執(zhí)行隊列中的Task浸遗。
當(dāng)用戶在非IO線程調(diào)用Channel的各種方法執(zhí)行Channel相關(guān)的操作時,比如channel.write()箱亿、channel.flush()等跛锌,netty會將相關(guān)操作封裝成一個Task并放入taskQueue中,保證相關(guān)操作在IO線程中串行執(zhí)行届惋。

2)處理用戶產(chǎn)生的定時任務(wù)
NioEventLoop中的Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue被用來承載用戶產(chǎn)生的定時Task髓帽。
當(dāng)用戶在非IO線程需要產(chǎn)生定時操作時,netty將用戶的定時操作封裝成ScheduledFutureTask脑豹,即一個netty內(nèi)部的定時Task郑藏,并將定時Task放入delayedTaskQueue中等待對應(yīng)Channel的IO線程串行執(zhí)行。
為了解決多線程并發(fā)寫入delayedTaskQueue的問題瘩欺,netty將添加ScheduledFutureTask到delayedTaskQueue中的操作封裝成普通Task必盖,放入taskQueue中,通過NioEventLoop的IO線程對delayedTaskQueue進(jìn)行單線程寫操作俱饿。

3)處理任務(wù)隊列的邏輯
a)將已到期的定時Task從delayedTaskQueue中轉(zhuǎn)移到taskQueue中
b)計算本次循環(huán)執(zhí)行的截止時間
c)循環(huán)執(zhí)行taskQueue中的任務(wù)歌粥,每隔64個任務(wù)檢查一下是否已過截止時間,直到taskQueue中任務(wù)全部執(zhí)行完或者超過執(zhí)行截止時間拍埠。

Netty中Reactor線程和worker線程所處理的事件

Server端NioEventLoop處理的事件

Client端NioEventLoop處理的事件


pipeline原理與事件處理

目錄

  • pipeline整體關(guān)系簡述
  • Unsafe的作用
  • 事件的分類及處理
  • pipeline中節(jié)點的添加和刪除

一失驶、pipeline整體關(guān)系簡述

netty中的pipeline模型

當(dāng)EventLoop的selector監(jiān)聽到某Channel產(chǎn)生了就緒的IO事件,并調(diào)用socket API對就緒的IO事件進(jìn)行操作后械拍,需要將操作產(chǎn)生的“IO數(shù)據(jù)”或“操作結(jié)果”告知用戶進(jìn)行相應(yīng)的業(yè)務(wù)處理突勇。

netty將因外部IO事件導(dǎo)致的Channel狀態(tài)變更(Channel被注冊到EventLoop中装盯,Channel狀態(tài)變?yōu)榭捎茫珻hannel讀取到IO數(shù)據(jù)...)或Channel內(nèi)部邏輯操作(添加ChannelHandler...)抽象為不同的回調(diào)事件甲馋,并定義了pipeline對Channel的回調(diào)事件進(jìn)行流式的響應(yīng)處理埂奈。

用戶可在pipeline中添加多個事件處理器(ChannelHandler),并通過實現(xiàn)ChannelHandler中定義的方法定躏,對回調(diào)事件進(jìn)行定制化的業(yè)務(wù)處理账磺。ChannelHandler也可以調(diào)用自身方法對Channel本身進(jìn)行操作。

netty會保證“回調(diào)事件在ChannelHandler之間的流轉(zhuǎn)”及“Channel內(nèi)部IO操作”由EventLoop線程串行執(zhí)行痊远,用戶也可以在ChannelHandler中使用自行構(gòu)建的業(yè)務(wù)線程進(jìn)行業(yè)務(wù)處理垮抗。

pipeline相關(guān)類的關(guān)系圖

  • DefaultChannelPipeline:
    事件處理流,是一個雙向鏈表結(jié)構(gòu)碧聪,鏈表中節(jié)點元素為ChannelHandlerContext冒版。

新的AbstractNioChannel創(chuàng)建時,會創(chuàng)建該Channel對應(yīng)的DefaultChannelPipeline逞姿,用于處理該Channel對應(yīng)的回調(diào)事件辞嗡。

DefaultChannelPipeline創(chuàng)建時,會自動創(chuàng)建并向鏈表中添加兩個ChannelhandlerContext節(jié)點——head和tail滞造。

pipeline的fireXXX()方法:回調(diào)事件的發(fā)起方法续室。會產(chǎn)生相應(yīng)回調(diào)事件并直接調(diào)用ChannelHandlerContext.invokeXXX(head)方法將回調(diào)事件傳遞給pipeline的head節(jié)點。

ChannelHandlerContext:

事件處理器上下文谒养,pipeline中的實際處理節(jié)點挺狰。

每個處理節(jié)點ChannelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應(yīng)的pipeline和Channel的信息买窟,方便ChannelHandler進(jìn)行調(diào)用丰泊。

AbstractChannelHandlerContext具體實現(xiàn)了ChannelHandlerContext接口的功能,并進(jìn)行了相應(yīng)擴展蔑祟。

  • ChannelHandlerContext的fireXXX方法:回調(diào)事件的發(fā)起方法趁耗。會產(chǎn)生相應(yīng)回調(diào)事件并將其交給pipeline中的下一個處理節(jié)點。此方法提供給用戶實現(xiàn)的ChannelHandler使用疆虚,用于將回調(diào)事件向pipeline中的下一個節(jié)點傳遞苛败。
  • AbstractChannelHandlerContext的static invokeXXX(AbstractChannelHandlerContext next)方法:封裝next.invokeXXX()的邏輯并交給EventLoop的IO線程執(zhí)行。
  • ChannelHandlerContext的invokeXXX()方法:回調(diào)事件執(zhí)行方法径簿。執(zhí)行節(jié)點中事件處理器ChannelHandler的XXX方法罢屈,實際處理回調(diào)事件。

ChannelHandler:

ChannelHandler(事件處理器接口)篇亭,由ChannelInboundHandler接口和ChannelOutboundHandler接口繼承缠捌。

ChannelInboundHandler中定義了各個回調(diào)事件的回調(diào)方法,由用戶進(jìn)行具體實現(xiàn)。

ChannelOutboundHandler中定義了方法進(jìn)行Channel內(nèi)部IO操作(Channel發(fā)起bind/connect/close操作曼月,Channel監(jiān)聽OP_READ谊却,Channel寫IO數(shù)據(jù)...),供用戶在回調(diào)方法中使用哑芹。

ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter為接口的默認(rèn)實現(xiàn)類(其實沒干什么事)炎辨,用戶通過繼承這兩個類來實現(xiàn)自己的業(yè)務(wù)處理邏輯

二、Unsafe的作用

概述

Unsafe是Channel的內(nèi)部類聪姿,一個Channel對應(yīng)一個Unsafe碴萧。
Unsafe用于處理Channel對應(yīng)網(wǎng)絡(luò)IO的底層操作。ChannelHandler處理回調(diào)事件時產(chǎn)生的相關(guān)網(wǎng)絡(luò)IO操作最終也會委托給Unsafe執(zhí)行末购。

Unsafe接口中定義了socket相關(guān)操作破喻,包括SocketAddress獲取、selector注冊盟榴、網(wǎng)卡端口綁定曹质、socket建連與斷連、socket寫數(shù)據(jù)曹货。這些操作都和jdk底層socket相關(guān)咆繁。
NioUnsafe在Unsafe基礎(chǔ)上增加了幾個操作,包括訪問jdk的SelectableChannel顶籽、socket讀數(shù)據(jù)等。
NioByteUnsafe實現(xiàn)了與socket連接的字節(jié)數(shù)據(jù)讀取相關(guān)的操作银觅。
NioMessageUnsafe實現(xiàn)了與新連接建立相關(guān)的操作礼饱。

三、事件的分類及處理

在netty的pipeline中包含兩種類型的事件究驴,分別為inbound和outbound镊绪,inbound為上行事件,outbound為下行事件洒忧。
inbound事件為被動觸發(fā)蝴韭,在某些情況發(fā)生時自動觸發(fā);
outbound為主動觸發(fā)熙侍,在需要主動執(zhí)行某些操作時觸發(fā)榄鉴。

Inbound事件

Outbound事件

發(fā)起并處理inbound事件

對于inbound事件,因為需要進(jìn)行業(yè)務(wù)邏輯處理蛉抓,因此pipeline的head節(jié)點會執(zhí)行fireXXX()方法將事件透傳給后面的用戶自己實現(xiàn)inbound處理節(jié)點庆尘,由用戶自己實現(xiàn)的ChannelHandler接收事件并回調(diào)執(zhí)行業(yè)務(wù)邏輯。

發(fā)起并處理outbound事件

對于outbound事件巷送,因為和IO操作相關(guān)驶忌,最后會由pipeline中的head節(jié)點接收處理。head節(jié)點實現(xiàn)了ChannelHandler的事件執(zhí)行方法笑跛,將實際的執(zhí)行操作委托給Unsafe進(jìn)行付魔。



bind聊品、connect、read几苍、writeAndFlush等outbound事件的處理過程可以自己跟代碼看一下翻屈。(最終由head節(jié)點委托給Unsafe類執(zhí)行相關(guān)IO操作)

head、tail節(jié)點的作用

  • head節(jié)點:
    head節(jié)點既是inBound處理節(jié)點擦剑,又是outBound處理節(jié)點妖胀。
    head節(jié)點作為pipeline的頭結(jié)點開始接收并傳遞inbound事件。并作為pipeline的最后一環(huán)最終接收處理outbound事件(委托Unsafe進(jìn)行outbound事件的相關(guān)IO操作)惠勒。

  • tail節(jié)點:
    tail節(jié)點是inBound處理節(jié)點赚抡。
    tail節(jié)點作為pipeline的第一環(huán)傳遞outbound事件,其實就是將outbound事件透傳到前一個outbound處理節(jié)點纠屋。并作為pipeline的最后一環(huán)最終接收inbound事件涂臣,大部分左右是終止inbound事件的傳播。
    tail節(jié)點的exceptionCaught方法:若最終在用戶自定義的處理節(jié)點沒有捕獲處理異常售担,則在tail節(jié)點捕獲異常打印警告日志赁遗。
    tail節(jié)點的channelRead方法:若Channel讀入的ByteBuf在流經(jīng)pipeline過程中沒有被消費掉,最終流入了tail節(jié)點族铆,則將該ByteBuf丟棄回收并打印警告日志岩四。

Ref:
http://www.reibang.com/p/607501fdeb92

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市哥攘,隨后出現(xiàn)的幾起案子剖煌,更是在濱河造成了極大的恐慌,老刑警劉巖逝淹,帶你破解...
    沈念sama閱讀 216,402評論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件耕姊,死亡現(xiàn)場離奇詭異,居然都是意外死亡栅葡,警方通過查閱死者的電腦和手機茉兰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來欣簇,“玉大人规脸,你說我怎么就攤上這事∽硪希” “怎么了燃辖?”我有些...
    開封第一講書人閱讀 162,483評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長网棍。 經(jīng)常有香客問我黔龟,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評論 1 292
  • 正文 為了忘掉前任氏身,我火速辦了婚禮巍棱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蛋欣。我一直安慰自己航徙,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,176評論 6 388
  • 文/花漫 我一把揭開白布陷虎。 她就那樣靜靜地躺著到踏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪尚猿。 梳的紋絲不亂的頭發(fā)上窝稿,一...
    開封第一講書人閱讀 51,146評論 1 297
  • 那天,我揣著相機與錄音凿掂,去河邊找鬼伴榔。 笑死,一個胖子當(dāng)著我的面吹牛庄萎,可吹牛的內(nèi)容都是我干的踪少。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼糠涛,長吁一口氣:“原來是場噩夢啊……” “哼援奢!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起忍捡,我...
    開封第一講書人閱讀 38,896評論 0 274
  • 序言:老撾萬榮一對情侶失蹤萝究,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后锉罐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,311評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡绕娘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,536評論 2 332
  • 正文 我和宋清朗相戀三年脓规,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片险领。...
    茶點故事閱讀 39,696評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡侨舆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出绢陌,到底是詐尸還是另有隱情挨下,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評論 5 343
  • 正文 年R本政府宣布脐湾,位于F島的核電站臭笆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜愁铺,卻給世界環(huán)境...
    茶點故事閱讀 41,008評論 3 325
  • 文/蒙蒙 一鹰霍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧茵乱,春花似錦茂洒、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至斤贰,卻和暖如春智哀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背腋舌。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評論 1 269
  • 我被黑心中介騙來泰國打工盏触, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人块饺。 一個月前我還...
    沈念sama閱讀 47,698評論 2 368
  • 正文 我出身青樓赞辩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親授艰。 傳聞我的和親對象是個殘疾皇子辨嗽,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,592評論 2 353

推薦閱讀更多精彩內(nèi)容