netty學(xué)習(xí)系列五:write&flush

一、ChannelOutboundBuffer

1、定義

是AbstractUnsafe使用的數(shù)據(jù)結(jié)構(gòu),用來存儲待發(fā)送的數(shù)據(jù)拌消。
在channel.unsafe實例化時,ChannelOutboundBuffer一起被初始化安券。每個channel都有一個自己的ChannelOutboundBuffer墩崩。

2、ChannelOutboundBuffer中的field

    Channel channel           -->    所綁定的Channel
    Entry flushedEntry        -->    表示下一個要被flush的Entry
    Entry unflushedEntry    -->    表示下一次要flush截止的Entry
    Entry tailEntry
    int flushed
    int nioBufferCount
    int nioBufferSize
    long totalPendingSize   -->    已存儲的需要被write到socket發(fā)送緩存中的byte大小
    int unwritable                 -->    表示當(dāng)前channel的待發(fā)送緩存是否可以繼續(xù)寫入數(shù)據(jù)

ChannelOutboundBuffer中維護(hù)了節(jié)點(diǎn)元素為Entry的單向鏈表侯勉。
Entry為待發(fā)送數(shù)據(jù)的抽象鹦筹,實際待發(fā)送數(shù)據(jù)保存在Entry的Object msg中。

二址貌、write的過程

1铐拐、入口

AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的write(Object msg)方法练对,最終都會由HeadContext.write方法執(zhí)行遍蟋,最終交由AbstractUnsafe.write(Object msg,ChannelPromise promise)實現(xiàn)。

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }

2螟凭、取得ChannelOutboundBuffer

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

3虚青、對要進(jìn)行write操作的數(shù)據(jù)msg,如果是ByteBuf類型則統(tǒng)一轉(zhuǎn)換為DirectByteBuf實現(xiàn)

msg = filterOutboundMessage(msg);

內(nèi)部調(diào)用AbstractNioByteChannel.filterOutboundMessage(Object msg)方法螺男,如果msg是ByteBuf類型棒厘,則將其轉(zhuǎn)換為DirectByteBuf的實現(xiàn)纵穿。(調(diào)用ByteBufAllocator.directBuffer(initialCapacity)分配一塊直接緩存空間并將原msg中的字節(jié)流放入)

4、 計算msg的大小

int size = pipeline.estimatorHandle().size(msg);

對于ByteBuf類型的msg奢人,直接調(diào)用readableBytes()方法谓媒。

5、將ByteBuf msg存入ChannelOutboundBuffer中

outboundBuffer.addMessage(msg, size, promise);

1)將msg封裝成Entry對象达传,并放入單向鏈表的尾部tailEntry

Entry entry = Entry.newInstance(msg, size, total(msg), promise);
tailEntry = entry;

netty使用基于thread-local的輕量級對象池Recycler對Entry進(jìn)行回收,避免多次實例化的垃圾回收和開銷迫筑。
2)更新ChannelOutboundBuffer的totalPendingSize宪赶,累加上本次新增的大小

incrementPendingOutboundBytes(size, false);

若totalPendingSize超過了channel的高水位線:
-將unwritable狀態(tài)更新為不可寫;
-執(zhí)行pipeline.fileChannelWritabilityChanged()脯燃;

三搂妻、flush過程

1、入口

AbstractChannel辕棚、DefaultChannelPipeline或AbstractChannelHandlerContext提供的flush()方法欲主,都會由HeadContext.flush(ChannelHandlerContext ctx)方法執(zhí)行,最終交由AbstractUnsafe.flush()實現(xiàn)逝嚎。

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

2扁瓢、更新ChannelOutboundBuffer中本次將要flush的Entry區(qū)間

outboundBuffer.addFlush();

將ChannelOutboundBuffer的unflushedEntry向后不斷移動到tailEntry,操作結(jié)束后本次要flush的鏈表區(qū)間就是flushedEntry->unflushedEntry补君。

3引几、檢查ChannelOutboundBuffer是否有待flush的數(shù)據(jù),如果沒有則直接返回挽铁,終止flush過程

4伟桅、將ChannelOutboundBuffer的要flush的鏈表區(qū)間數(shù)據(jù)寫入TCP發(fā)送緩沖區(qū)

NioByteUnsafe.doWrite(ChannelOutboundBuffer in)

對Entry鏈表區(qū)間中的每個Entry.msg(即ByteBuf)執(zhí)行以下邏輯--->
1)如果當(dāng)前flushedEntry為空,則將OP_WRITE事件從對應(yīng)Channel的interestOp中移除叽掘,跳出遍歷直接到步驟5)

            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

2)采用類似自旋鎖的邏輯不斷調(diào)用NioSocketChannel.doWriteBytes(ByteBuf buf)楣铁,將Entry.msg(即ByteBuf)中的數(shù)據(jù)寫入套接字的發(fā)送緩沖區(qū)
-內(nèi)部調(diào)用ByteBuf.readBytes(Channel out, int length)接口;
-實際底層最終調(diào)用nio.SocketChannel.write(ByteBuffer src)更扁;
-如果自旋過程中出現(xiàn)nio.SocketChannel.write(ByteBuffer src)返回結(jié)果為0盖腕,說明此時TCP發(fā)送緩沖隊列已滿,則退出自旋write并將OP_WRITE添加到ch.selectionKey.interestOps中浓镜,等待TCP發(fā)送緩沖隊列可寫時重新出發(fā)write操作赊堪;
-如果Entry的自旋write達(dá)到一定次數(shù)還沒有將Entry中的數(shù)據(jù)寫完,則直接跳出鏈表遍歷操作竖哩,執(zhí)行最后的incompleteWrite哭廉;
3)構(gòu)造ChannelPromise通知當(dāng)前已write的數(shù)據(jù)進(jìn)度

in.progress(flushedAmount);

4)如果當(dāng)前flushedEntry中的數(shù)據(jù)已寫完,將Entry從ChannelOutboundBuffer中清理回收

in.remove();

-將當(dāng)前entry從Entry鏈表中刪除相叁;
-從totalPendingSize中減去entry已write出去的字節(jié)數(shù)遵绰;
-若totalPendingSize小于了channel的低水位線辽幌,將unwritable狀態(tài)更新為可寫,并調(diào)用pipeline.fileChannelWritabilityChanged()產(chǎn)生ChannelWritabilityChanged事件椿访。
<---對Entry鏈表區(qū)間中的每個Entry.msg(即ByteBuf)執(zhí)行以上邏輯
5)根據(jù)當(dāng)前socket的可寫狀態(tài)乌企,進(jìn)行后續(xù)操作

incompleteWrite(setOpWrite);

-若當(dāng)前TCP發(fā)送緩沖區(qū)已滿,則將OP_WRITE添加到ch.selectionKey.interestOps中成玫,等待TCP發(fā)送緩沖隊列可寫時重新觸發(fā)write操作加酵;
-若當(dāng)前TCP發(fā)送緩沖區(qū)未滿,構(gòu)造一個flush()事件哭当,等待EventLoop的下一個循環(huán)重新檢測ChannelOutboundBuffer中有無待flush的數(shù)據(jù)猪腕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市钦勘,隨后出現(xiàn)的幾起案子陋葡,更是在濱河造成了極大的恐慌,老刑警劉巖彻采,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腐缤,死亡現(xiàn)場離奇詭異,居然都是意外死亡肛响,警方通過查閱死者的電腦和手機(jī)岭粤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來特笋,“玉大人绍在,你說我怎么就攤上這事”⒂校” “怎么了偿渡?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長霸奕。 經(jīng)常有香客問我溜宽,道長,這世上最難降的妖魔是什么质帅? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任适揉,我火速辦了婚禮,結(jié)果婚禮上煤惩,老公的妹妹穿的比我還像新娘嫉嘀。我一直安慰自己,他們只是感情好魄揉,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布剪侮。 她就那樣靜靜地躺著,像睡著了一般洛退。 火紅的嫁衣襯著肌膚如雪瓣俯。 梳的紋絲不亂的頭發(fā)上杰标,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機(jī)與錄音彩匕,去河邊找鬼腔剂。 笑死,一個胖子當(dāng)著我的面吹牛驼仪,可吹牛的內(nèi)容都是我干的掸犬。 我是一名探鬼主播,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼绪爸,長吁一口氣:“原來是場噩夢啊……” “哼湾碎!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起毡泻,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤胜茧,失蹤者是張志新(化名)和其女友劉穎粘优,沒想到半個月后仇味,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡雹顺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年丹墨,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嬉愧。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡贩挣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出没酣,到底是詐尸還是另有隱情王财,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布裕便,位于F島的核電站绒净,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏偿衰。R本人自食惡果不足惜挂疆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望下翎。 院中可真熱鬧缤言,春花似錦、人聲如沸视事。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽俐东。三九已至鸳碧,卻和暖如春盾鳞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背瞻离。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工腾仅, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人套利。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓推励,卻偏偏與公主長得像,于是被迫代替她去往敵國和親肉迫。 傳聞我的和親對象是個殘疾皇子验辞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容