read

AbstractNioByteChannel.read()

@Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);//每讀一次珊随,會加一次read
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);//觸發(fā)事件
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();//調(diào)用record()每次讀完會進(jìn)行胞此,預(yù)分配調(diào)整斧账,默認(rèn)第一次是1024
                pipeline.fireChannelReadComplete();//觸發(fā)事件

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
 

DefaultMaxBytesRecvByteBufAllocator

   @Override
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   maybeMoreDataSupplier.get() &&
                   totalMessages < maxMessagePerRead &&  //有判斷讀的次數(shù)
                   totalBytesRead > 0;
        }
     private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptBytesRead == lastBytesRead;//默認(rèn)分配了1024 只讀了420,所以不繼續(xù)讀
            }
        };

AdaptiveRecvByteBufAllocator.HandleImpl計算每次需要的byteSize平均值

private final class HandleImpl extends MaxMessageHandle {
        private final int minIndex;
        private final int maxIndex;
        private int index;
        private int nextReceiveBufferSize;
        private boolean decreaseNow;

        public HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            index = getSizeTableIndex(initial);
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        @Override
        public int guess() {
            return nextReceiveBufferSize;
        }
//每次讀完會進(jìn)行合蔽,預(yù)分配調(diào)整,默認(rèn)第一次是1024
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = Math.max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = Math.min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

        @Override
        public void readComplete() {
            record(totalBytesRead());
        }
    }

    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

導(dǎo)讀

AbstractNioByteChannel.read()
  AdaptiveRecvByteBufAllocator allocHandle = recvBufAllocHandle()
  byteBuf = allocHandle.allocate(allocator);
  allocHandle.lastBytesRead(doReadBytes(byteBuf));
  allocHandle.incMessagesRead(1);
  readPending = false;
  pipeline.fireChannelRead(byteBuf);
  allocHandle.continueReading()
    continueReading(defaultMaybeMoreSupplier)
    bytesToRead > 0 && maybeMoreDataSupplier.get()
        UncheckedBooleanSupplier.get()
          return attemptBytesRead == lastBytesRead
  
  allocHandle.readComplete();
     record(totalBytesRead());
       nextReceiveBufferSize = SIZE_TABLE[index]
  pipeline.fireChannelReadComplete();
  closeOnRead(pipeline)
    if (!isInputShutdown0()) {
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
           shutdownInput();
           pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
       }
    else
      doClose()
          doClose0(promise);
          outboundBuffer.failFlushed(cause, notify);
          outboundBuffer.close(closeCause);
          fireChannelInactiveAndDeregister(wasActive);
    pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE)
  if (!readPending && !config.isAutoRead()) {
      removeReadOp();
  }

AbstractNioMessageChannel.NioMessageUnsafe


        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市何恶,隨后出現(xiàn)的幾起案子捌议,更是在濱河造成了極大的恐慌哼拔,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瓣颅,死亡現(xiàn)場離奇詭異倦逐,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)宫补,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門檬姥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人粉怕,你說我怎么就攤上這事健民。” “怎么了贫贝?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵秉犹,是天一觀的道長蛉谜。 經(jīng)常有香客問我,道長崇堵,這世上最難降的妖魔是什么型诚? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮筑辨,結(jié)果婚禮上俺驶,老公的妹妹穿的比我還像新娘。我一直安慰自己棍辕,他們只是感情好暮现,可當(dāng)我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著楚昭,像睡著了一般栖袋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抚太,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天塘幅,我揣著相機(jī)與錄音,去河邊找鬼尿贫。 笑死电媳,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的庆亡。 我是一名探鬼主播匾乓,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼又谋!你這毒婦竟也來了拼缝?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤彰亥,失蹤者是張志新(化名)和其女友劉穎咧七,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體任斋,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡继阻,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了废酷。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片穴翩。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖锦积,靈堂內(nèi)的尸體忽然破棺而出芒帕,到底是詐尸還是另有隱情,我是刑警寧澤丰介,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布背蟆,位于F島的核電站鉴分,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏带膀。R本人自食惡果不足惜志珍,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望垛叨。 院中可真熱鬧伦糯,春花似錦、人聲如沸嗽元。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽剂癌。三九已至淤翔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間佩谷,已是汗流浹背旁壮。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留谐檀,地道東北人抡谐。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像桐猬,于是被迫代替她去往敵國和親童叠。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容

  • boss線程主要負(fù)責(zé)監(jiān)聽并處理accept事件课幕,將socketChannel注冊到work線程的selector,...
    美團(tuán)Java閱讀 14,307評論 11 16
  • 用兩張圖告訴你五垮,為什么你的 App 會卡頓? - Android - 掘金 Cover 有什么料乍惊? 從這篇文章中你...
    hw1212閱讀 12,693評論 2 59
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)放仗,斷路器润绎,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 簡介 yii2的自動登錄的原理很簡單。主要就是利用cookie來實現(xiàn)的诞挨,在第一次登錄的時候莉撇,如果登錄成功并且選中了...
    Hanrydy閱讀 2,161評論 2 2
  • 前天,心情很低落惶傻,看著素材寫不出一個字棍郎,加上看到其他老鐵都完成了任務(wù),內(nèi)心各種焦急银室,就是一個字也寫不出來涂佃,只能拖延...
    祎祎閱讀 126評論 0 0