Netty 源碼解析 ——— AdaptiveRecvByteBufAllocator

本文是Netty文集中“Netty 源碼解析”系列的文章。主要對(duì)Netty的重要流程以及類進(jìn)行源碼解析运悲,以使得我們更好的去使用Netty晶通。Netty是一個(gè)非常優(yōu)秀的網(wǎng)絡(luò)框架,對(duì)其源碼解讀的過(guò)程也是不斷學(xué)習(xí)的過(guò)程华望。

AdaptiveRecvByteBufAllocator主要用于構(gòu)建一個(gè)最優(yōu)大小的緩沖區(qū)來(lái)接收數(shù)據(jù)蕊蝗。比如,在讀事件中就會(huì)通過(guò)該類來(lái)獲取一個(gè)最優(yōu)大小的的緩沖區(qū)來(lái)接收對(duì)端發(fā)送過(guò)來(lái)的可讀取的數(shù)據(jù)赖舟。

關(guān)于AdaptiveRecvByteBufAllocator的分析蓬戚,會(huì)通過(guò)一層層的Java doc來(lái)展開。如果一開始看這些方法說(shuō)明有些個(gè)不大明白宾抓,沒(méi)關(guān)系子漩,在文章的最后會(huì)結(jié)合Netty中真實(shí)的使用場(chǎng)景來(lái)對(duì)AdaptiveRecvByteBufAllocator的使用說(shuō)明分析,這樣就能更好的理解AdaptiveRecvByteBufAllocator的用途石洗。

本文主要針對(duì)NIO網(wǎng)絡(luò)傳輸模式對(duì)AdaptiveRecvByteBufAllocator類展開的分析幢泼。

RecvByteBufAllocator

分配一個(gè)新的接收緩存,該緩存的容量會(huì)盡可能的足夠大以讀入所有的入站數(shù)據(jù)并且該緩存的容量也盡可能的小以不會(huì)浪費(fèi)它的空間讲衫。

Handle newHandle()
創(chuàng)建一個(gè)新的處理器缕棵,該處理器提供一個(gè)真實(shí)的操作并持有內(nèi)部信息,該信息是用于預(yù)測(cè)一個(gè)最優(yōu)緩沖區(qū)大小的必要信息涉兽。

內(nèi)部接口

interface Handle

  • ByteBuf allocate(ByteBufAllocator alloc);
    創(chuàng)建一個(gè)新的接收緩沖區(qū)招驴,該緩沖區(qū)的大小可能足夠大去讀取所有的數(shù)據(jù)并且足夠小以至于不會(huì)浪費(fèi)它的空間。

  • int guess()
    類似于一個(gè)「allocate(ByteBufAllocator)」操作花椭,除了它不會(huì)去分配任何東西只是告訴你分配容量的大小忽匈。

  • void reset(ChannelConfig config)
    重置所有的計(jì)數(shù)器,該計(jì)數(shù)器已經(jīng)累計(jì)并會(huì)推薦下次讀循環(huán)應(yīng)該讀取的消息次數(shù)以及讀取字節(jié)的數(shù)量矿辽。
    它可能被「continueReading()」方式使用去決定是否讀操作應(yīng)該完成丹允。
    這僅僅只是一個(gè)暗示并且可能被忽略在實(shí)現(xiàn)的時(shí)候。
    參數(shù) config:config是Channel的配置袋倔,它可能會(huì)影響對(duì)象的行為雕蔽。

  • void incMessagesRead(int numMessages)
    增加當(dāng)前讀循環(huán)中已經(jīng)讀取消息的次數(shù)(注意,這里不是讀取數(shù)據(jù)的字節(jié)數(shù)宾娜,讀取消息指的是一個(gè)讀操作)批狐。

  • void lastBytesRead(int bytes)
    設(shè)置最后一次讀操作已經(jīng)讀取到的字節(jié)數(shù)。
    這可能被用于增加已經(jīng)讀取的字節(jié)數(shù)。
    參數(shù) bytes:由讀操作提供的字節(jié)數(shù)嚣艇。這可能是一個(gè)負(fù)數(shù)如果一個(gè)讀錯(cuò)誤發(fā)生承冰。如果看到負(fù)值,那么它會(huì)預(yù)計(jì)在下一次調(diào)用「lastBytesRead()」時(shí)返回食零。對(duì)于該類來(lái)說(shuō)一個(gè)負(fù)值將標(biāo)識(shí)一個(gè)外部強(qiáng)制執(zhí)行終止的情況困乒,并且它不要求在「continueReading()」時(shí)執(zhí)行。

  • int lastBytesRead()
    獲取最近一次讀操作的字節(jié)數(shù)贰谣。

  • void attemptedBytesRead(int bytes)
    設(shè)置有多少字節(jié)讀操作將嘗試讀取娜搂,或已經(jīng)讀取。

  • int attemptedBytesRead()
    獲取有多少字節(jié)讀操作將嘗試讀取吱抚,或已經(jīng)讀取百宇。

  • boolean continueReading()
    檢測(cè)是否當(dāng)前的讀循環(huán)可以繼續(xù)。
    返回true:如果讀循環(huán)應(yīng)該繼續(xù)讀取數(shù)據(jù)秘豹;false:如果讀循環(huán)已經(jīng)完成携御。

  • void readComplete()
    讀操作已經(jīng)完成。

interface ExtendedHandle extends Handle

  • boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier)
    和「Handle#continueReading()」方法一樣憋肖,除了通過(guò)supplier參數(shù)讓更多的數(shù)據(jù)來(lái)決定結(jié)果因痛。

內(nèi)部類

class DelegatingHandle implements Handle
該類只有一個(gè)真實(shí)Handler的引用,所有方法的請(qǐng)求最終都將由這個(gè)真實(shí)Handler來(lái)完成岸更。
該類實(shí)現(xiàn)了代理模式中的靜態(tài)代理。

MaxMessagesRecvByteBufAllocator

限制事件循環(huán)嘗試讀操作(比如膊升,READ事件觸發(fā))時(shí)嘗試讀操作的次數(shù)怎炊。比如,處理READ事件時(shí)廓译,限制讀循環(huán)操作可執(zhí)行的讀操作的次數(shù)评肆。

  • int maxMessagesPerRead();
    返回每個(gè)讀循環(huán)讀取消息的最大次數(shù),即非区,在NIO傳輸模式下一個(gè)讀循環(huán)最大能執(zhí)行幾次循環(huán)操作瓜挽。
    如果該值大于1,那么一次事件循環(huán)可能嘗試去讀多次以獲取多個(gè)消息征绸。

  • MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead)
    設(shè)置一個(gè)讀循環(huán)可讀取的最大消息個(gè)數(shù)久橙。
    如果該值大于1,那么一次事件循環(huán)可能嘗試去讀多次以獲取多個(gè)消息管怠。

DefaultMaxMessagesRecvByteBufAllocator

MaxMessagesRecvByteBufAllocator的默認(rèn)實(shí)現(xiàn)淆衷,它遵守「ChannelConfig#isAutoRead()」并防止溢出。

    private volatile int maxMessagesPerRead;

    public DefaultMaxMessagesRecvByteBufAllocator() {
        this(1);
    }

    public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
        maxMessagesPerRead(maxMessagesPerRead);
    }

    @Override
    public int maxMessagesPerRead() {
        return maxMessagesPerRead;
    }

    @Override
    public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
        if (maxMessagesPerRead <= 0) {
            throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
        }
        this.maxMessagesPerRead = maxMessagesPerRead;
        return this;
    }

默認(rèn)構(gòu)造方法提供的每個(gè)讀取循環(huán)最大可讀取消息個(gè)數(shù)為1渤弛。

內(nèi)部類

public abstract class MaxMessageHandle implements ExtendedHandle
專注于施行每次讀操作最多可讀取消息個(gè)數(shù)的條件祝拯,用于「continueReading()」中。

// Channel的配置對(duì)象
private ChannelConfig config;

// 每個(gè)讀循環(huán)可循環(huán)讀取消息的最大次數(shù)她肯。
private int maxMessagePerRead;

// 目前讀循環(huán)已經(jīng)讀取的消息個(gè)數(shù)佳头。即鹰贵,在NIO傳輸模式下也就是讀循環(huán)已經(jīng)執(zhí)行的循環(huán)次數(shù)
private int totalMessages;

// 目前已經(jīng)讀取到的消息字節(jié)總數(shù)
private int totalBytesRead;

// 本次將要進(jìn)行的讀操作,期望讀取的字節(jié)數(shù)康嘉。也就是有這么多個(gè)字節(jié)等待被讀取碉输。
private int attemptedBytesRead;

// 最后一次讀操作讀取到的字節(jié)數(shù)。
private int lastBytesRead;


        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                return attemptedBytesRead == lastBytesRead;
            }
        };

默認(rèn)判斷是否可讀取更多消息的提供器凄鼻,會(huì)在continueReading操作中使用腊瑟。
表示,如果‘最近一次讀操作所期望讀取的字節(jié)數(shù)’與‘最近一次讀操作真實(shí)讀取的字節(jié)數(shù)’一樣块蚌,則表示當(dāng)前的緩沖區(qū)容量已經(jīng)被寫滿了闰非,可能還有數(shù)據(jù)等待著被讀取。

  • reset(ChannelConfig config)
        public void reset(ChannelConfig config) {
            this.config = config;
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

重新設(shè)置config成員變量峭范,并將totalMessages财松、totalBytesRead重置為0。重置maxMessagePerRead纱控。

  • ByteBuf allocate(ByteBufAllocator alloc)
        public ByteBuf allocate(ByteBufAllocator alloc) {
            return alloc.ioBuffer(guess());
        }

根據(jù)給定的緩沖區(qū)分配器辆毡,以及guess()所返回的預(yù)測(cè)的緩存區(qū)容量大小,構(gòu)建一個(gè)新的緩沖區(qū)甜害。

  • void lastBytesRead(int bytes)
        public final void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

設(shè)置最近一次讀操作的讀取字節(jié)數(shù)舶掖。這里只有當(dāng)bytes>0時(shí)才會(huì)進(jìn)行totalBytesRead的累加。因?yàn)楫?dāng)bytes<0時(shí)尔店,不是真實(shí)的讀取字節(jié)的數(shù)量了眨攘,而標(biāo)識(shí)一個(gè)外部強(qiáng)制執(zhí)行終止的情況。

  • continueReading
        public boolean continueReading() {
            return continueReading(defaultMaybeMoreSupplier);
        }

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   maybeMoreDataSupplier.get() &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

continueReading返回true需要同時(shí)滿足如下條件:
① ChannelConfig的設(shè)置為可自動(dòng)讀取嚣州。即鲫售,autoRead屬性為1。
② maybeMoreDataSupplier.get()返回為true该肴,這個(gè)我們?cè)谏厦嬉呀?jīng)討論過(guò)了情竹。也就是當(dāng)‘最近一次讀操作所期望讀取的字節(jié)數(shù)’與‘最近一次讀操作真實(shí)讀取的字節(jié)數(shù)’一樣,則表示當(dāng)前可能還有數(shù)據(jù)等待被讀取匀哄。則就會(huì)返回true秦效。
③ totalMessages < maxMessagePerRead : 已經(jīng)讀取的消息次數(shù) < 一個(gè)讀循環(huán)最大能讀取消息的次數(shù)
④ totalBytesRead > 0 :因?yàn)閠otalBytesRead是int類型,所以totalBytesRead的最大值是’Integer.MAX_VALUE’(即拱雏,2147483647)棉安。所以,也限制了一個(gè)讀循環(huán)最大能讀取的字節(jié)數(shù)為2147483647铸抑。

  • int totalBytesRead()
        protected final int totalBytesRead() {
            return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
        }

返回已經(jīng)讀取的字節(jié)個(gè)數(shù)贡耽,若‘totalBytesRead < 0’則說(shuō)明已經(jīng)讀取的字節(jié)數(shù)已經(jīng)操作了’Integer.MAX_VALUE’,則返回Integer.MAX_VALUE;否則返回真實(shí)的已經(jīng)讀取的字節(jié)數(shù)蒲赂。

同時(shí)阱冶,我們可以留意MaxMessageHandle抽象類,將‘incMessagesRead(int amt)’滥嘴、‘lastBytesRead(int bytes)’木蹬、‘int lastBytesRead()’、‘int totalBytesRead()’這幾個(gè)方法都定義為了final修飾符若皱,這使得子類不能夠?qū)@幾個(gè)方法進(jìn)行重寫镊叁。

AdaptiveRecvByteBufAllocator

RecvByteBufAllocator會(huì)根據(jù)反饋?zhàn)詣?dòng)的增加和減少可預(yù)測(cè)的buffer的大小。
它會(huì)逐漸地增加期望的可讀到的字節(jié)數(shù)如果之前的讀循環(huán)操作所讀取到的字節(jié)數(shù)據(jù)已經(jīng)完全填充滿了分配好的buffer( 也就是走触,上一次的讀循環(huán)操作中執(zhí)行的所有讀取操作所累加的讀到的字節(jié)數(shù)晦譬,已經(jīng)大于等于預(yù)測(cè)分配的buffer的容量大小,那么它就會(huì)很優(yōu)雅的自動(dòng)的去增加可讀的字節(jié)數(shù)量互广,也就是自動(dòng)的增加緩沖區(qū)的大小 )敛腌。它也會(huì)逐漸的減少期望的可讀的字節(jié)數(shù)如果’連續(xù)’兩次讀循環(huán)操作后都沒(méi)有填充滿分配的buffer。否則惫皱,它會(huì)保持相同的預(yù)測(cè)像樊。

// 在調(diào)整緩沖區(qū)大小時(shí),若是增加緩沖區(qū)容量旅敷,那么增加的索引值生棍。
// 比如,當(dāng)前緩沖區(qū)的大小為SIZE_TABLE[20],若預(yù)測(cè)下次需要?jiǎng)?chuàng)建的緩沖區(qū)需要增加容量大小媳谁,
// 則新緩沖區(qū)的大小為SIZE_TABLE[20 + INDEX_INCREMENT]足绅,即SIZE_TABLE[24]
private static final int INDEX_INCREMENT = 4;    

// 在調(diào)整緩沖區(qū)大小時(shí),若是減少緩沖區(qū)容量韩脑,那么減少的索引值。
// 比如粹污,當(dāng)前緩沖區(qū)的大小為SIZE_TABLE[20],若預(yù)測(cè)下次需要?jiǎng)?chuàng)建的緩沖區(qū)需要減小容量大小段多,
// 則新緩沖區(qū)的大小為SIZE_TABLE[20 - INDEX_DECREMENT],即SIZE_TABLE[19]
private static final int INDEX_DECREMENT = 1;    

// 用于存儲(chǔ)緩沖區(qū)容量大小的數(shù)組
private static final int[] SIZE_TABLE;    
    static {
        List<Integer> sizeTable = new ArrayList<Integer>();
        for (int i = 16; i < 512; i += 16) {
            sizeTable.add(i);
        }

        for (int i = 512; i > 0; i <<= 1) {
            sizeTable.add(i);
        }

        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {
            SIZE_TABLE[i] = sizeTable.get(i);
        }
    }

① 依次往sizeTable添加元素:[16 , (512-16)]之間16的倍數(shù)壮吩。即进苍,16、32鸭叙、48...496
② 然后再往sizeTable中添加元素:[512 , 512 * (2^N))觉啊,N > 1; 直到數(shù)值超過(guò)Integer的限制(2^31 - 1);
③ 根據(jù)sizeTable長(zhǎng)度構(gòu)建一個(gè)靜態(tài)成員常量數(shù)組SIZE_TABLE沈贝,并將sizeTable中的元素賦值給SIZE_TABLE數(shù)組杠人。注意List是有序的,所以是根據(jù)插入元素的順序依次的賦值給SIZE_TABLE,SIZE_TABLE從下標(biāo)0開始嗡善。

SIZE_TABLE為預(yù)定義好的以從小到大的順序設(shè)定的可分配緩沖區(qū)的大小值的數(shù)組辑莫。因?yàn)锳daptiveRecvByteBufAllocator作用是可自動(dòng)適配每次讀事件使用的buffer的大小。這樣當(dāng)需要對(duì)buffer大小做調(diào)整時(shí)罩引,只要根據(jù)一定邏輯從SIZE_TABLE中取出值各吨,然后根據(jù)該值創(chuàng)建新buffer即可。

  • 構(gòu)造方法
static final int DEFAULT_MINIMUM = 64;    // 默認(rèn)緩沖區(qū)的最小容量大小為64
static final int DEFAULT_INITIAL = 1024;    // 默認(rèn)緩沖區(qū)的容量大小為1024
static final int DEFAULT_MAXIMUM = 65536;    // 默認(rèn)緩沖區(qū)的最大容量大小為65536

// 使用默認(rèn)參數(shù)創(chuàng)建一個(gè)新的AdaptiveRecvByteBufAllocator袁铐。
// 默認(rèn)參數(shù)揭蜒,預(yù)計(jì)緩沖區(qū)大小從1024開始,最小不會(huì)小于64剔桨,最大不會(huì)大于65536屉更。
public AdaptiveRecvByteBufAllocator() {
    this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}


    private final int minIndex;    // 緩沖區(qū)最小容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
    private final int maxIndex;    // 緩沖區(qū)最大容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
    private final int initial;     // 緩沖區(qū)默認(rèn)容量大小 

    // 使用指定的參數(shù)創(chuàng)建AdaptiveRecvByteBufAllocator對(duì)象。
    // 其中minimum领炫、initial偶垮、maximum是正整數(shù)。然后通過(guò)getSizeTableIndex()方法獲取相應(yīng)容量在SIZE_TABLE中的索引位置帝洪。
    // 并將計(jì)算出來(lái)的索引賦值給相應(yīng)的成員變量minIndex似舵、maxIndex。同時(shí)保證「SIZE_TABLE[minIndex] >= minimum」以及「SIZE_TABLE[maxIndex] <= maximum」.
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        if (minimum <= 0) {
            throw new IllegalArgumentException("minimum: " + minimum);
        }
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }


  • int getSizeTableIndex(final int size)
    private static int getSizeTableIndex(final int size) {
        for (int low = 0, high = SIZE_TABLE.length - 1;;) {
            if (high < low) {
                return low;
            }
            if (high == low) {
                return high;
            }

            int mid = low + high >>> 1;
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {
                low = mid + 1;
            } else if (size < a) {
                high = mid - 1;
            } else if (size == a) {
                return mid;
            } else {
                return mid + 1;
            }
        }
    }

因?yàn)镾IZE_TABLE數(shù)組是一個(gè)有序數(shù)組葱峡,因此此處用二分查找法砚哗,查找size在SIZE_TABLE中的位置,如果size存在于SIZE_TABLE中砰奕,則返回對(duì)應(yīng)的索引值蛛芥;否則返回接近于size大小的SIZE_TABLE數(shù)組元素的索引值。

  • Handle newHandle()
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }

創(chuàng)建一個(gè)HandleImpl對(duì)象军援,參數(shù)為minIndex仅淑,maxIndex以及initail為AdaptiveRecvByteBufAllocator對(duì)象的成員變量值。

內(nèi)部類

private final class HandleImpl extends MaxMessageHandle
HandleImpl是AdaptiveRecvByteBufAllocator一個(gè)內(nèi)部類胸哥,該處理器類用于提供真實(shí)的操作并保留預(yù)測(cè)最佳緩沖區(qū)容量所需的內(nèi)部信息涯竟。

// 緩沖區(qū)最小容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置,同外部類AdaptiveRecvByteBufAllocator是一個(gè)值
private final int minIndex;    

// 緩沖區(qū)最大容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置空厌,同外部類AdaptiveRecvByteBufAllocator是一個(gè)值
private final int maxIndex;    

// 緩沖區(qū)默認(rèn)容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置庐船,外部類AdaptiveRecvByteBufAllocator記錄的是容量大小值,而HandleImpl中記錄是其值對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
private int index;            

// 下一次創(chuàng)建緩沖區(qū)時(shí)的其容量的大小嘲更。
private int nextReceiveBufferSize;    

// 在record()方法中使用筐钟,用于標(biāo)識(shí)是否需要減少下一次創(chuàng)建的緩沖區(qū)的大小。
private boolean decreaseNow;        
  • 構(gòu)造方法
    public HandleImpl(int minIndex, int maxIndex, int initial) {
        this.minIndex = minIndex;
        this.maxIndex = maxIndex;

        index = getSizeTableIndex(initial);
        nextReceiveBufferSize = SIZE_TABLE[index];
    }

給成員變量minIndex赋朦、maxIndex賦值篓冲。同時(shí)通過(guò)getSizeTableIndex(initial)計(jì)算出初始容量在SIZE_TABLE的索引值李破,將其賦值為成員變量index。并將初始容量大小(即纹因,SIZE_TABLE[index])賦值給成員變量nextReceiveBufferSize喷屋。

  • int guess()
public int guess() {
    return nextReceiveBufferSize;
}

返回推測(cè)的緩沖區(qū)容量大小,即瞭恰,返回成員變量nextReceiveBufferSize的值屯曹。

  • record(int actualReadBytes)
        private void record(int actualReadBytes) {
            if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
                if (decreaseNow) {
                    index = Math.max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {
                    decreaseNow = true;
                }
            } else if (actualReadBytes >= nextReceiveBufferSize) {
                index = Math.min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }

record方法很重要,它就是完成預(yù)測(cè)下一個(gè)緩沖區(qū)容量大小的操作惊畏。邏輯如下:
若發(fā)現(xiàn)兩次恶耽,本次讀循環(huán)真實(shí)讀取的字節(jié)總數(shù) <= ‘SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]’ 則,減少預(yù)測(cè)的緩沖區(qū)容量大小颜启。重新給成員變量index賦值為為‘index - 1’偷俭,若‘index - 1’ < minIndex,則index新值為minIndex缰盏。根據(jù)算出來(lái)的新的index索引涌萤,給成員變量nextReceiveBufferSize重新賦值'SIZE_TABLE[index]’。最后將decreaseNow置位false口猜,該字段用于表示是否有’連續(xù)’的兩次真實(shí)讀取的數(shù)據(jù)滿足可減少容量大小的情況负溪。注意,這里說(shuō)的‘連續(xù)’并不是真的連續(xù)發(fā)送济炎,而是指滿足條件(即川抡,‘a(chǎn)ctualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]’)兩次的期間沒(méi)有發(fā)生‘a(chǎn)ctualReadBytes >= nextReceiveBufferSize’的情況。
若须尚,本次讀循環(huán)真實(shí)讀取的字節(jié)總數(shù) >= 預(yù)測(cè)的緩沖區(qū)大小崖堤,則進(jìn)行增加預(yù)測(cè)的緩沖區(qū)容量大小。新的index為‘index + 4’耐床,若‘index + 4’ > maxIndex密幔,則index新值為maxIndex。根據(jù)算出來(lái)的新的index索引撩轰,給成員變量nextReceiveBufferSize重新賦值'SIZE_TABLE[index]’老玛。最后將decreaseNow置位false。

  • readComplete()
        public void readComplete() {
            record(totalBytesRead());
        }

每次讀循環(huán)完成后钧敞,會(huì)調(diào)用該方法。根據(jù)本次讀循環(huán)讀取的字節(jié)數(shù)來(lái)調(diào)整預(yù)測(cè)的緩沖區(qū)容量大小麸粮。

結(jié)合Netty源碼中的實(shí)際使用場(chǎng)景對(duì)AdaptiveRecvByteBufAllocator做進(jìn)一步的分析

下面結(jié)合NioServerSocketChannel的ACCEPT事件以及NioSocketChannel的READ事件處理中對(duì)AdaptiveRecvByteBufAllocator使用溉苛,來(lái)進(jìn)一步的了解AdaptiveRecvByteBufAllocator的真正用途。本文僅對(duì)AdaptiveRecvByteBufAllocator的使用進(jìn)行解釋弄诲,具體的事件的處理流程并不是本文的重點(diǎn)愚战。

ACCEPT事件中對(duì)AdaptiveRecvByteBufAllocator的使用

當(dāng)服務(wù)端收到客戶端的一個(gè)連接請(qǐng)求時(shí)娇唯,‘SelectionKey.OP_ACCEPT’將會(huì)觸發(fā)。在NioEventLoop的事件循環(huán)中會(huì)對(duì)該事件進(jìn)行處理:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

我們來(lái)看看unsafe.read()的實(shí)現(xiàn)寂玲,在NioServerSocketChannel中unsafe是一個(gè)NioMessageUnsafe實(shí)例:

① 獲取一個(gè)AdaptiveRecvByteBufAllocator.HandleImpl實(shí)例
如果recvHandler不存在則創(chuàng)建一個(gè)AdaptiveRecvByteBufAllocator.HandleImpl實(shí)例塔插,然后返回;否則直接返回拓哟。
?

② 對(duì)config成員變量賦值想许,即,Handle中持有NioServerSocketChannelConfig對(duì)象的引用断序;將maxMessagePerRead重置(這里為16流纹,是在初始化NioServerSocketChannel的時(shí)候進(jìn)行的相關(guān)設(shè)置),totalMessages违诗、totalBytesRead重置為0漱凝;



在啟動(dòng)流程中初始化NioServerSocketChannel的時(shí)候,會(huì)同時(shí)構(gòu)建一個(gè)Channel的配置對(duì)象DefaultChannelConfig诸迟,在初始化該對(duì)象的過(guò)程中就會(huì)完成對(duì)AdaptiveRecvByteBufAllocator的創(chuàng)建茸炒,并修改其maxMessagesPerRead屬性:
而channel.metadata()返回的是NioServerSocketChannel中的靜態(tài)常量「ChannelMetadata METADATA」,它表示一個(gè)hasDisconnect為false阵苇,defaultMaxMessagesPerRead為16的Channel屬性實(shí)現(xiàn)壁公。
因此,NioServerSocketChannel關(guān)聯(lián)的AdaptiveRecvByteBufAllocator的maxMessagePerRead屬性值為16慎玖。
?
③ 這步主要是通過(guò)serverSocket.accpet()來(lái)接受生成和客戶端通信的socket贮尖,并將其放入到readBuf集合中。如果該流程操作正確趁怔,則返回’1’湿硝,表示已經(jīng)讀取一條消息(即,在處理ACCEPT事件中消息的讀取指的就是接收一個(gè)客戶端的請(qǐng)求「serverSocket.accpet()」操作)润努;否則返回0关斜,若返回0,則會(huì)退出while讀循環(huán)铺浇。

④ 在執(zhí)行完消息的讀取后(即痢畜,在處理ACCEPT事件中消息的讀取指的就是接收一個(gè)客戶端的請(qǐng)求「serverSocket.accpet()」操作),將執(zhí)行allocHandle.incMessagesRead(localRead)來(lái)增加已經(jīng)讀取消息的個(gè)數(shù)鳍侣。底層就是根據(jù)localRead的值對(duì)totalMessages屬性進(jìn)行累加丁稀。

⑤ 在進(jìn)行下一次循環(huán)進(jìn)行消息的讀取前,會(huì)先執(zhí)行該判斷倚聚,判斷是否可以繼續(xù)的去讀取消息线衫。

a) config.isAutoRead():
NioServerSocketChannelConfig的autoRead默認(rèn)為1,因此該判斷為true
b) maybeMoreDataSupplier.get() :

因?yàn)閍ccept中并沒(méi)有真實(shí)讀取字節(jié)的操作惑折,因此此時(shí)授账,attemptedBytesRead枯跑、lastBytesRead都為0。該判斷為true白热;
c) totalMessages < maxMessagePerRead:根據(jù)上面的流程我們可以知道敛助,maxMessagePerRead為16,totalMessages也為1屋确。因?yàn)榇伺袛酁閠rue纳击。
d) totalBytesRead > 0:因?yàn)閍ccept操作只是接收一個(gè)新的連接,并沒(méi)有進(jìn)行真實(shí)的數(shù)據(jù)讀取操作乍恐,因此totalBytesRead為0评疗。因此此判斷為false。
也就是allocHandle.continueReading()將返回false茵烈。因此退出循環(huán)百匆,會(huì)繼續(xù)讀取消息。

⑥ 根據(jù)本次讀取的字節(jié)數(shù)呜投,對(duì)下次創(chuàng)建的緩沖區(qū)大小做調(diào)整加匈。其實(shí)本方法,在ACCEPT事件中并沒(méi)用處仑荐。因?yàn)榈衿矗琒erverSocektChannel是沒(méi)有READ事件的,它只會(huì)處理ACCEPT事件粘招,所以它不會(huì)讀取任何的字節(jié)數(shù)據(jù)啥寇。再者在前面處理ACCEPT事件的流程中,我們也可以看到洒扎,我們并沒(méi)有使用allocHandle.allocate(allocator);來(lái)真實(shí)的創(chuàng)建一個(gè)緩沖區(qū)辑甜。

總結(jié),對(duì)于ACCEPT事件袍冷,每次讀循環(huán)執(zhí)行一次讀操作(但并沒(méi)有讀取任何字節(jié)數(shù)據(jù)磷醋,totalBytesRead > 0 為false)這也是符合NIO規(guī)范的,因?yàn)槊看蜛CCEPT事件被觸發(fā)時(shí)胡诗,都表示有一個(gè)客戶端向服務(wù)器端發(fā)起了連接請(qǐng)求诗充。

READ事件中對(duì)AdaptiveRecvByteBufAllocator的使用

當(dāng)有可讀數(shù)據(jù)準(zhǔn)備被讀取時(shí)肛著,‘SelectionKey.OP_READ’將會(huì)觸發(fā)窑睁。在NioEventLoop的事件循環(huán)中會(huì)對(duì)該事件進(jìn)行處理:

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

我們來(lái)看看unsafe.read()的實(shí)現(xiàn)宠页,在NioSocketChannel中unsafe是一個(gè)NioByteUnsafe實(shí)例:



① 同ACCEPT事件流程類似,會(huì)獲取到一個(gè)獲取一個(gè)AdaptiveRecvByteBufAllocator.HandleImpl實(shí)例

② 同ACCEPT事件流程類似瑰抵,對(duì)config成員變量賦值缩歪,即,Handle中持有NioSocketChannelConfig對(duì)象的引用谍憔;將maxMessagePerRead重置(這里為16匪蝙,是在初始化NioSocketChannel的時(shí)候進(jìn)行的相關(guān)設(shè)置),totalMessages习贫、totalBytesRead重置為0逛球。只是這里是在實(shí)例化NioSocketChannel實(shí)例時(shí)完成的。

③ 根據(jù)提供的緩沖區(qū)分配器苫昌,創(chuàng)建一個(gè)由AdaptiveRecvByteBufAllocator.HandleImpl推測(cè)的容量大小的ByteBuf颤绕,并使用該ByteBuf來(lái)接收接下來(lái)要讀取的數(shù)據(jù)。第一次讀循環(huán)祟身,默認(rèn)的大小為1024奥务。

④ allocHandle.lastBytesRead(doReadBytes(byteBuf)) :

首先,會(huì)先根據(jù)byteBuf可寫的字節(jié)數(shù)來(lái)設(shè)置AdaptiveRecvByteBufAllocator.HandleImpl的attemptedBytesRead屬性袜硫。正如前面對(duì)AdaptiveRecvByteBufAllocator類的介紹所說(shuō)的:“它會(huì)分配一個(gè)新的接收緩存氯葬,該緩存的容量會(huì)盡可能的足夠大以讀入入站數(shù)據(jù)并且該緩存的容量也盡可能的小以不會(huì)浪費(fèi)它的空間⊥裣荩”因此帚称,它認(rèn)為它分配的ByteBuf中可寫的字節(jié)數(shù),就應(yīng)該是本次嘗試讀取到的字節(jié)數(shù)秽澳。然后闯睹。使用上面構(gòu)造的ByteBuf來(lái)接收SocketChannel的數(shù)據(jù),并且限制了最大讀取的字節(jié)數(shù)為attemptedBytesRead担神,然后將真是讀取的字節(jié)數(shù)設(shè)置到AdaptiveRecvByteBufAllocator.HandleImpl中的lastBytesRead屬性上楼吃。

⑤ 在執(zhí)行完消息的讀取后,將執(zhí)行allocHandle.incMessagesRead(1)來(lái)增加已經(jīng)讀取消息的次數(shù)妄讯。底層就是將totalMessages值+1孩锡。

⑥ 在進(jìn)行下一次循環(huán)進(jìn)行消息的讀取前,會(huì)先執(zhí)行該判斷捞挥,判斷是否可以繼續(xù)的去讀取消息浮创。

a) config.isAutoRead():
NioSocketChannelConfig的autoRead默認(rèn)為1,因此該判斷為true
b) maybeMoreDataSupplier.get() :

當(dāng)本次讀操作讀取到的字節(jié)數(shù)與AdaptiveRecvByteBufAllocator推測(cè)出的ByteBuf容量大小不一樣時(shí)砌函,就會(huì)返回false斩披;否則返回true。當(dāng)然讹俊,如上面所說(shuō)垦沉,如果本次讀操作可讀取的字節(jié)大于了attemptedBytesRead的話,一次讀操作也只會(huì)先讀取attemptedBytesRead的字節(jié)數(shù)仍劈。在滿足allocHandle.continueReading()的條件下厕倍,可以在讀循環(huán)中進(jìn)行下一次的數(shù)據(jù)讀取。每次讀循環(huán)都會(huì)構(gòu)建一個(gè)新的ByteBuf贩疙。但是讹弯,請(qǐng)注意况既,一個(gè)讀循環(huán)(可以包含多次讀操作)中每次的讀操作構(gòu)建的ByteBuf大小都是一樣的。
c) totalMessages < maxMessagePerRead:根據(jù)上面的流程我們可以知道组民,maxMessagePerRead為16棒仍,totalMessages為讀循環(huán)已經(jīng)執(zhí)行的讀操作次數(shù)(即,循環(huán)次數(shù))臭胜。
d) totalBytesRead > 0:當(dāng)本次讀操作有讀取到字節(jié)數(shù)時(shí)莫其,或者以讀取到的字節(jié)數(shù)小于Integer.MAX_VALUE,那么該判斷都會(huì)大于0耸三,即乱陡,為true;否則為false仪壮。

⑦ 最后憨颠,通過(guò)allocHandle.readComplete()來(lái)標(biāo)識(shí)本次讀循環(huán)結(jié)束,并根據(jù)本次讀循環(huán)的數(shù)據(jù)信息來(lái)預(yù)測(cè)下一次讀事件觸發(fā)時(shí)睛驳,應(yīng)該分配多大的ByteBuf容量更加合理些烙心。具體的調(diào)整邏輯,在上面的HandleImpl.record(int actualReadBytes)已經(jīng)進(jìn)行了詳細(xì)的說(shuō)明乏沸。

總結(jié)淫茵,對(duì)于READ事件,一個(gè)讀循環(huán)可能會(huì)執(zhí)行多次讀操作(即蹬跃,循環(huán)的次數(shù))匙瘪,至于進(jìn)行幾次讀操作,這將根據(jù)「allocHandle.continueReading()」以及當(dāng)前這次讀取的字節(jié)數(shù)來(lái)決定蝶缀。若‘a(chǎn)llocHandle.continueReading()’為false丹喻,或者本次讀取到的字節(jié)數(shù)<=0(當(dāng)沒(méi)有數(shù)據(jù)可讀取時(shí)為0,當(dāng)遠(yuǎn)端已經(jīng)關(guān)閉時(shí)為-1)翁都,都不會(huì)繼續(xù)進(jìn)行讀循環(huán)操作碍论。再者,一個(gè)讀循環(huán)中的每次讀操作分配的ByteBuf的容量大小都是一樣的柄慰。我們是在一個(gè)讀循環(huán)操作完成后鳍悠,才會(huì)根據(jù)本次的讀循環(huán)的情況對(duì)下一次讀操作的ByteBuf容量大小做預(yù)測(cè)。

后記

若文章有任何錯(cuò)誤坐搔,望大家不吝指教:)

參考

圣思園《精通并發(fā)與Netty》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末藏研,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子概行,更是在濱河造成了極大的恐慌蠢挡,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異业踏,居然都是意外死亡禽炬,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門勤家,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)瞎抛,“玉大人,你說(shuō)我怎么就攤上這事却紧。” “怎么了胎撤?”我有些...
    開封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵晓殊,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我伤提,道長(zhǎng)巫俺,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任肿男,我火速辦了婚禮介汹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舶沛。我一直安慰自己嘹承,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開白布如庭。 她就那樣靜靜地躺著叹卷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪坪它。 梳的紋絲不亂的頭發(fā)上骤竹,一...
    開封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音往毡,去河邊找鬼蒙揣。 笑死,一個(gè)胖子當(dāng)著我的面吹牛开瞭,可吹牛的內(nèi)容都是我干的懒震。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼惩阶,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼挎狸!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起断楷,我...
    開封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤锨匆,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體恐锣,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡茅主,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了土榴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片诀姚。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖玷禽,靈堂內(nèi)的尸體忽然破棺而出赫段,到底是詐尸還是另有隱情,我是刑警寧澤矢赁,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布糯笙,位于F島的核電站,受9級(jí)特大地震影響撩银,放射性物質(zhì)發(fā)生泄漏给涕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一额获、第九天 我趴在偏房一處隱蔽的房頂上張望够庙。 院中可真熱鬧,春花似錦抄邀、人聲如沸耘眨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)毅桃。三九已至,卻和暖如春准夷,著一層夾襖步出監(jiān)牢的瞬間钥飞,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工衫嵌, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留读宙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓楔绞,卻偏偏與公主長(zhǎng)得像结闸,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子酒朵,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容