netty學(xué)習(xí)系列四:讀操作

一臼勉、緩存空間分配器:ByteBufAllocator

ByteBufAllocator接口為ByteBuf分配器剪况,用于分配新的ByteBuf存儲IO數(shù)據(jù)冒签。

ByteBufAllocator類關(guān)系圖

1闯两、ByteBufAllocator

接口定義了 ByteBuf ioBuffer(int initialCapacity) 方法俘枫,用于分配一個ByteBuf

2腥沽、AbstractByteBufAllocator

實(shí)現(xiàn)了ByteBuf ioBuffer(int initialCapacity)方法,根據(jù)系統(tǒng)配置實(shí)際調(diào)用自己的抽象方法

ByteBuf newDirectBuffer(int initialCapacity,int maxCapacity);
ByteBuf newHeapBuffer(int initialCapacity,int maxCapacity);

其子類UnpooledByteBufAllocator和PooledByteBufAllocator實(shí)現(xiàn)了上述抽象方法鸠蚪。

3今阳、UnpooledByteBufAllocator

非池化的ByteBuf分配器,具體實(shí)現(xiàn)了抽象方法邓嘹。
1)對于newDirectBuffer
根據(jù)平臺是否支持Unsafe方式酣栈,將實(shí)例化出一個UnpooledDirectByteBuf/UnpooledUnsafeDirectByteBuf。
UnpooledDirectByteBuf是一個基于NIO的Buffer汹押,其內(nèi)部持有一個NIO ByteBuffer buffer矿筝,并通過ByteBuffer.allocateDirect(initcapacity)方法進(jìn)行實(shí)例化。
2)對于newHeapBuffer
根據(jù)平臺是否支持Unsafe方式棚贾,將實(shí)例化出一個UnpooledHeapByteBuf/UnpooledUnsafeHeapByteBuf窖维。
UnpooledHeapByteBuf是一個基于java heap的Buffer,其內(nèi)部直接在java heap中申請byte[] array空間進(jìn)行IO數(shù)據(jù)存儲妙痹。

二铸史、接收緩存分配器:RecvByteBufAllocator

1、接口概述

RecvByteBufAllocator接口用于分配一塊大小合理的buffer空間怯伊,存儲Channel讀入的IO數(shù)據(jù)琳轿。具體功能交由內(nèi)部接口Handle定義。

    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Increment the number of messages that have been read for the current read loop.
         * @param numMessages The amount to increment by.
         */
        void incMessagesRead(int numMessages);

        /**
         * Set the bytes that have been read for the last read operation.
         * This may be used to increment the number of bytes that have been read.
         * @param bytes The number of bytes from the previous read operation. This may be negative if an read error
         * occurs. If a negative value is seen it is expected to be return on the next call to
         * {@link #lastBytesRead()}. A negative value will signal a termination condition enforced externally
         * to this class and is not required to be enforced in {@link #continueReading()}.
         */
        void lastBytesRead(int bytes);

        /**
         * Determine if the current read loop should should continue.
         * @return {@code true} if the read loop should continue reading. {@code false} if the read loop is complete.
         */
        boolean continueReading();

        /**
         * The read has completed.
         */
        void readComplete();
    }

allocate(ByteBufAllocator alloc)方法用于創(chuàng)建存放讀入IO數(shù)據(jù)的ByteBuf耿芹。
readComplete()在讀操作完成后調(diào)用崭篡,在實(shí)現(xiàn)類HandleImpl中執(zhí)行record(int actualReadBytes)做了調(diào)整分配空間大小的邏輯。

2吧秕、實(shí)現(xiàn)類AdaptiveRecvByteBufAllocator

接口RecvByteBufAllocator的實(shí)現(xiàn)類琉闪,能根據(jù)前一次實(shí)際讀取的字節(jié)數(shù)量,自適應(yīng)調(diào)整當(dāng)前緩存分配的大小砸彬。

三颠毙、NioEventLoop處理OP_READ事件

1斯入、代碼入口

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //上面省略...
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            // Connection already closed - no need to handle write.
            return;
        }
    }
    //下面省略...
}

NioEventLoop在處理其selector監(jiān)聽到的OP_READ事件時,會執(zhí)行上面的代碼邏輯蛀蜜,將OP_READ事件最終交由Unsafe處理刻两,即執(zhí)行NioByteUnsafe.read()方法。

        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

2涵防、執(zhí)行邏輯

1)獲取緩存分配器

final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

循環(huán)執(zhí)行以下邏輯直到跳出--->
2)分配緩存ByteBuf

ByteBuf byteBuf = allocHandle.allocate(allocator);

3)將數(shù)據(jù)從nio.SocketChannel讀取到byteBuf中

doReadBytes(byteBuf)

實(shí)際調(diào)用了NioSocketChannel.doReadBytes(ByteBuf byteBuf)方法闹伪。
進(jìn)一步調(diào)用ByteBuf.writeBytes(ScatteringByteChannel in, int length)方法。
最終底層調(diào)用nio的ReadableByteChannel.read(ByteBuffer dst)方法壮池。

                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

若本次讀取到的數(shù)據(jù)長度==0偏瓤,表示本次OP_READ事件的數(shù)據(jù)已讀取完畢,退出循環(huán)椰憋。
若本次讀取到的數(shù)據(jù)長度<0厅克,表示對端已斷開socket連接,退出循環(huán)橙依,執(zhí)行NioByteUnsafe.closeOnRead()方法關(guān)閉Channel证舟,關(guān)閉Channel的過程中執(zhí)行了pipeline.fireChannelInactive()pipeline.fireChannelUnregistered()
4)觸發(fā)ChannelRead事件窗骑,將讀ByteBuf交給pipeline流轉(zhuǎn)

pipeline.fireChannelRead(byteBuf);
byteBuf = null;

<----循環(huán)結(jié)束女责。結(jié)束條件:a、localReadAmount == 0或-1创译,b抵知、循環(huán)讀取到ByteBuf個數(shù)超過指定閾值

5)根據(jù)本次已讀字節(jié)數(shù),調(diào)整RecvByteBufAllocator的下次分配的緩存大小

allocHandle.readComplete();

6)觸發(fā)ChannelReadComplete事件

pipeline.fireChannelReadComplete();
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末软族,一起剝皮案震驚了整個濱河市刷喜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌立砸,老刑警劉巖掖疮,帶你破解...
    沈念sama閱讀 206,723評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異颗祝,居然都是意外死亡浊闪,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評論 2 382
  • 文/潘曉璐 我一進(jìn)店門螺戳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來规揪,“玉大人,你說我怎么就攤上這事温峭。” “怎么了字支?”我有些...
    開封第一講書人閱讀 152,998評論 0 344
  • 文/不壞的土叔 我叫張陵凤藏,是天一觀的道長奸忽。 經(jīng)常有香客問我,道長揖庄,這世上最難降的妖魔是什么栗菜? 我笑而不...
    開封第一講書人閱讀 55,323評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮蹄梢,結(jié)果婚禮上疙筹,老公的妹妹穿的比我還像新娘。我一直安慰自己禁炒,他們只是感情好而咆,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著幕袱,像睡著了一般暴备。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上们豌,一...
    開封第一講書人閱讀 49,079評論 1 285
  • 那天涯捻,我揣著相機(jī)與錄音,去河邊找鬼望迎。 笑死障癌,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的辩尊。 我是一名探鬼主播涛浙,決...
    沈念sama閱讀 38,389評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼对省!你這毒婦竟也來了蝗拿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,019評論 0 259
  • 序言:老撾萬榮一對情侶失蹤蒿涎,失蹤者是張志新(化名)和其女友劉穎哀托,沒想到半個月后劳秋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仓手,經(jīng)...
    沈念sama閱讀 43,519評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評論 2 325
  • 正文 我和宋清朗相戀三年玻淑,在試婚紗的時候發(fā)現(xiàn)自己被綠了嗽冒。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,100評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡补履,死狀恐怖添坊,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情箫锤,我是刑警寧澤贬蛙,帶...
    沈念sama閱讀 33,738評論 4 324
  • 正文 年R本政府宣布雨女,位于F島的核電站,受9級特大地震影響阳准,放射性物質(zhì)發(fā)生泄漏氛堕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評論 3 307
  • 文/蒙蒙 一野蝇、第九天 我趴在偏房一處隱蔽的房頂上張望讼稚。 院中可真熱鬧,春花似錦绕沈、人聲如沸锐想。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽痛倚。三九已至,卻和暖如春澜躺,著一層夾襖步出監(jiān)牢的瞬間蝉稳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評論 1 262
  • 我被黑心中介騙來泰國打工掘鄙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留耘戚,地道東北人。 一個月前我還...
    沈念sama閱讀 45,547評論 2 354
  • 正文 我出身青樓操漠,卻偏偏與公主長得像收津,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子浊伙,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評論 2 345

推薦閱讀更多精彩內(nèi)容