本文是Netty文集中“Netty 源碼解析”系列的文章。主要對(duì)Netty的重要流程以及類進(jìn)行源碼解析运悲,以使得我們更好的去使用Netty晶通。Netty是一個(gè)非常優(yōu)秀的網(wǎng)絡(luò)框架,對(duì)其源碼解讀的過(guò)程也是不斷學(xué)習(xí)的過(guò)程华望。
AdaptiveRecvByteBufAllocator主要用于構(gòu)建一個(gè)最優(yōu)大小的緩沖區(qū)來(lái)接收數(shù)據(jù)蕊蝗。比如,在讀事件中就會(huì)通過(guò)該類來(lái)獲取一個(gè)最優(yōu)大小的的緩沖區(qū)來(lái)接收對(duì)端發(fā)送過(guò)來(lái)的可讀取的數(shù)據(jù)赖舟。
關(guān)于AdaptiveRecvByteBufAllocator的分析蓬戚,會(huì)通過(guò)一層層的Java doc來(lái)展開。如果一開始看這些方法說(shuō)明有些個(gè)不大明白宾抓,沒(méi)關(guān)系子漩,在文章的最后會(huì)結(jié)合Netty中真實(shí)的使用場(chǎng)景來(lái)對(duì)AdaptiveRecvByteBufAllocator的使用說(shuō)明分析,這樣就能更好的理解AdaptiveRecvByteBufAllocator的用途石洗。
本文主要針對(duì)NIO網(wǎng)絡(luò)傳輸模式對(duì)AdaptiveRecvByteBufAllocator類展開的分析幢泼。
RecvByteBufAllocator
分配一個(gè)新的接收緩存,該緩存的容量會(huì)盡可能的足夠大以讀入所有的入站數(shù)據(jù)并且該緩存的容量也盡可能的小以不會(huì)浪費(fèi)它的空間讲衫。
Handle newHandle()
創(chuàng)建一個(gè)新的處理器缕棵,該處理器提供一個(gè)真實(shí)的操作并持有內(nèi)部信息,該信息是用于預(yù)測(cè)一個(gè)最優(yōu)緩沖區(qū)大小的必要信息涉兽。
內(nèi)部接口
interface Handle
ByteBuf allocate(ByteBufAllocator alloc);
創(chuàng)建一個(gè)新的接收緩沖區(qū)招驴,該緩沖區(qū)的大小可能足夠大去讀取所有的數(shù)據(jù)并且足夠小以至于不會(huì)浪費(fèi)它的空間。int guess()
類似于一個(gè)「allocate(ByteBufAllocator)」操作花椭,除了它不會(huì)去分配任何東西只是告訴你分配容量的大小忽匈。void reset(ChannelConfig config)
重置所有的計(jì)數(shù)器,該計(jì)數(shù)器已經(jīng)累計(jì)并會(huì)推薦下次讀循環(huán)應(yīng)該讀取的消息次數(shù)以及讀取字節(jié)的數(shù)量矿辽。
它可能被「continueReading()」方式使用去決定是否讀操作應(yīng)該完成丹允。
這僅僅只是一個(gè)暗示并且可能被忽略在實(shí)現(xiàn)的時(shí)候。
參數(shù) config:config是Channel的配置袋倔,它可能會(huì)影響對(duì)象的行為雕蔽。void incMessagesRead(int numMessages)
增加當(dāng)前讀循環(huán)中已經(jīng)讀取消息的次數(shù)(注意,這里不是讀取數(shù)據(jù)的字節(jié)數(shù)宾娜,讀取消息指的是一個(gè)讀操作)批狐。void lastBytesRead(int bytes)
設(shè)置最后一次讀操作已經(jīng)讀取到的字節(jié)數(shù)。
這可能被用于增加已經(jīng)讀取的字節(jié)數(shù)。
參數(shù) bytes:由讀操作提供的字節(jié)數(shù)嚣艇。這可能是一個(gè)負(fù)數(shù)如果一個(gè)讀錯(cuò)誤發(fā)生承冰。如果看到負(fù)值,那么它會(huì)預(yù)計(jì)在下一次調(diào)用「lastBytesRead()」時(shí)返回食零。對(duì)于該類來(lái)說(shuō)一個(gè)負(fù)值將標(biāo)識(shí)一個(gè)外部強(qiáng)制執(zhí)行終止的情況困乒,并且它不要求在「continueReading()」時(shí)執(zhí)行。int lastBytesRead()
獲取最近一次讀操作的字節(jié)數(shù)贰谣。void attemptedBytesRead(int bytes)
設(shè)置有多少字節(jié)讀操作將嘗試讀取娜搂,或已經(jīng)讀取。int attemptedBytesRead()
獲取有多少字節(jié)讀操作將嘗試讀取吱抚,或已經(jīng)讀取百宇。boolean continueReading()
檢測(cè)是否當(dāng)前的讀循環(huán)可以繼續(xù)。
返回true:如果讀循環(huán)應(yīng)該繼續(xù)讀取數(shù)據(jù)秘豹;false:如果讀循環(huán)已經(jīng)完成携御。void readComplete()
讀操作已經(jīng)完成。
interface ExtendedHandle extends Handle
- boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier)
和「Handle#continueReading()」方法一樣憋肖,除了通過(guò)supplier參數(shù)讓更多的數(shù)據(jù)來(lái)決定結(jié)果因痛。
內(nèi)部類
class DelegatingHandle implements Handle
該類只有一個(gè)真實(shí)Handler的引用,所有方法的請(qǐng)求最終都將由這個(gè)真實(shí)Handler來(lái)完成岸更。
該類實(shí)現(xiàn)了代理模式中的靜態(tài)代理。
MaxMessagesRecvByteBufAllocator
限制事件循環(huán)嘗試讀操作(比如膊升,READ事件觸發(fā))時(shí)嘗試讀操作的次數(shù)怎炊。比如,處理READ事件時(shí)廓译,限制讀循環(huán)操作可執(zhí)行的讀操作的次數(shù)评肆。
int maxMessagesPerRead();
返回每個(gè)讀循環(huán)讀取消息的最大次數(shù),即非区,在NIO傳輸模式下一個(gè)讀循環(huán)最大能執(zhí)行幾次循環(huán)操作瓜挽。
如果該值大于1,那么一次事件循環(huán)可能嘗試去讀多次以獲取多個(gè)消息征绸。MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead)
設(shè)置一個(gè)讀循環(huán)可讀取的最大消息個(gè)數(shù)久橙。
如果該值大于1,那么一次事件循環(huán)可能嘗試去讀多次以獲取多個(gè)消息管怠。
DefaultMaxMessagesRecvByteBufAllocator
MaxMessagesRecvByteBufAllocator的默認(rèn)實(shí)現(xiàn)淆衷,它遵守「ChannelConfig#isAutoRead()」并防止溢出。
private volatile int maxMessagesPerRead;
public DefaultMaxMessagesRecvByteBufAllocator() {
this(1);
}
public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
maxMessagesPerRead(maxMessagesPerRead);
}
@Override
public int maxMessagesPerRead() {
return maxMessagesPerRead;
}
@Override
public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
if (maxMessagesPerRead <= 0) {
throw new IllegalArgumentException("maxMessagesPerRead: " + maxMessagesPerRead + " (expected: > 0)");
}
this.maxMessagesPerRead = maxMessagesPerRead;
return this;
}
默認(rèn)構(gòu)造方法提供的每個(gè)讀取循環(huán)最大可讀取消息個(gè)數(shù)為1渤弛。
內(nèi)部類
public abstract class MaxMessageHandle implements ExtendedHandle
專注于施行每次讀操作最多可讀取消息個(gè)數(shù)的條件祝拯,用于「continueReading()」中。
// Channel的配置對(duì)象
private ChannelConfig config;
// 每個(gè)讀循環(huán)可循環(huán)讀取消息的最大次數(shù)她肯。
private int maxMessagePerRead;
// 目前讀循環(huán)已經(jīng)讀取的消息個(gè)數(shù)佳头。即鹰贵,在NIO傳輸模式下也就是讀循環(huán)已經(jīng)執(zhí)行的循環(huán)次數(shù)
private int totalMessages;
// 目前已經(jīng)讀取到的消息字節(jié)總數(shù)
private int totalBytesRead;
// 本次將要進(jìn)行的讀操作,期望讀取的字節(jié)數(shù)康嘉。也就是有這么多個(gè)字節(jié)等待被讀取碉输。
private int attemptedBytesRead;
// 最后一次讀操作讀取到的字節(jié)數(shù)。
private int lastBytesRead;
private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return attemptedBytesRead == lastBytesRead;
}
};
默認(rèn)判斷是否可讀取更多消息的提供器凄鼻,會(huì)在continueReading操作中使用腊瑟。
表示,如果‘最近一次讀操作所期望讀取的字節(jié)數(shù)’與‘最近一次讀操作真實(shí)讀取的字節(jié)數(shù)’一樣块蚌,則表示當(dāng)前的緩沖區(qū)容量已經(jīng)被寫滿了闰非,可能還有數(shù)據(jù)等待著被讀取。
- reset(ChannelConfig config)
public void reset(ChannelConfig config) {
this.config = config;
maxMessagePerRead = maxMessagesPerRead();
totalMessages = totalBytesRead = 0;
}
重新設(shè)置config成員變量峭范,并將totalMessages财松、totalBytesRead重置為0。重置maxMessagePerRead纱控。
- ByteBuf allocate(ByteBufAllocator alloc)
public ByteBuf allocate(ByteBufAllocator alloc) {
return alloc.ioBuffer(guess());
}
根據(jù)給定的緩沖區(qū)分配器辆毡,以及guess()所返回的預(yù)測(cè)的緩存區(qū)容量大小,構(gòu)建一個(gè)新的緩沖區(qū)甜害。
- void lastBytesRead(int bytes)
public final void lastBytesRead(int bytes) {
lastBytesRead = bytes;
if (bytes > 0) {
totalBytesRead += bytes;
}
}
設(shè)置最近一次讀操作的讀取字節(jié)數(shù)舶掖。這里只有當(dāng)bytes>0時(shí)才會(huì)進(jìn)行totalBytesRead的累加。因?yàn)楫?dāng)bytes<0時(shí)尔店,不是真實(shí)的讀取字節(jié)的數(shù)量了眨攘,而標(biāo)識(shí)一個(gè)外部強(qiáng)制執(zhí)行終止的情況。
- continueReading
public boolean continueReading() {
return continueReading(defaultMaybeMoreSupplier);
}
@Override
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
maybeMoreDataSupplier.get() &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
continueReading返回true需要同時(shí)滿足如下條件:
① ChannelConfig的設(shè)置為可自動(dòng)讀取嚣州。即鲫售,autoRead屬性為1。
② maybeMoreDataSupplier.get()返回為true该肴,這個(gè)我們?cè)谏厦嬉呀?jīng)討論過(guò)了情竹。也就是當(dāng)‘最近一次讀操作所期望讀取的字節(jié)數(shù)’與‘最近一次讀操作真實(shí)讀取的字節(jié)數(shù)’一樣,則表示當(dāng)前可能還有數(shù)據(jù)等待被讀取匀哄。則就會(huì)返回true秦效。
③ totalMessages < maxMessagePerRead : 已經(jīng)讀取的消息次數(shù) < 一個(gè)讀循環(huán)最大能讀取消息的次數(shù)
④ totalBytesRead > 0 :因?yàn)閠otalBytesRead是int類型,所以totalBytesRead的最大值是’Integer.MAX_VALUE’(即拱雏,2147483647)棉安。所以,也限制了一個(gè)讀循環(huán)最大能讀取的字節(jié)數(shù)為2147483647铸抑。
- int totalBytesRead()
protected final int totalBytesRead() {
return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
}
返回已經(jīng)讀取的字節(jié)個(gè)數(shù)贡耽,若‘totalBytesRead < 0’則說(shuō)明已經(jīng)讀取的字節(jié)數(shù)已經(jīng)操作了’Integer.MAX_VALUE’,則返回Integer.MAX_VALUE;否則返回真實(shí)的已經(jīng)讀取的字節(jié)數(shù)蒲赂。
同時(shí)阱冶,我們可以留意MaxMessageHandle抽象類,將‘incMessagesRead(int amt)’滥嘴、‘lastBytesRead(int bytes)’木蹬、‘int lastBytesRead()’、‘int totalBytesRead()’這幾個(gè)方法都定義為了final修飾符若皱,這使得子類不能夠?qū)@幾個(gè)方法進(jìn)行重寫镊叁。
AdaptiveRecvByteBufAllocator
RecvByteBufAllocator會(huì)根據(jù)反饋?zhàn)詣?dòng)的增加和減少可預(yù)測(cè)的buffer的大小。
它會(huì)逐漸地增加期望的可讀到的字節(jié)數(shù)如果之前的讀循環(huán)操作所讀取到的字節(jié)數(shù)據(jù)已經(jīng)完全填充滿了分配好的buffer( 也就是走触,上一次的讀循環(huán)操作中執(zhí)行的所有讀取操作所累加的讀到的字節(jié)數(shù)晦譬,已經(jīng)大于等于預(yù)測(cè)分配的buffer的容量大小,那么它就會(huì)很優(yōu)雅的自動(dòng)的去增加可讀的字節(jié)數(shù)量互广,也就是自動(dòng)的增加緩沖區(qū)的大小 )敛腌。它也會(huì)逐漸的減少期望的可讀的字節(jié)數(shù)如果’連續(xù)’兩次讀循環(huán)操作后都沒(méi)有填充滿分配的buffer。否則惫皱,它會(huì)保持相同的預(yù)測(cè)像樊。
// 在調(diào)整緩沖區(qū)大小時(shí),若是增加緩沖區(qū)容量旅敷,那么增加的索引值生棍。
// 比如,當(dāng)前緩沖區(qū)的大小為SIZE_TABLE[20],若預(yù)測(cè)下次需要?jiǎng)?chuàng)建的緩沖區(qū)需要增加容量大小媳谁,
// 則新緩沖區(qū)的大小為SIZE_TABLE[20 + INDEX_INCREMENT]足绅,即SIZE_TABLE[24]
private static final int INDEX_INCREMENT = 4;
// 在調(diào)整緩沖區(qū)大小時(shí),若是減少緩沖區(qū)容量韩脑,那么減少的索引值。
// 比如粹污,當(dāng)前緩沖區(qū)的大小為SIZE_TABLE[20],若預(yù)測(cè)下次需要?jiǎng)?chuàng)建的緩沖區(qū)需要減小容量大小段多,
// 則新緩沖區(qū)的大小為SIZE_TABLE[20 - INDEX_DECREMENT],即SIZE_TABLE[19]
private static final int INDEX_DECREMENT = 1;
// 用于存儲(chǔ)緩沖區(qū)容量大小的數(shù)組
private static final int[] SIZE_TABLE;
static {
List<Integer> sizeTable = new ArrayList<Integer>();
for (int i = 16; i < 512; i += 16) {
sizeTable.add(i);
}
for (int i = 512; i > 0; i <<= 1) {
sizeTable.add(i);
}
SIZE_TABLE = new int[sizeTable.size()];
for (int i = 0; i < SIZE_TABLE.length; i ++) {
SIZE_TABLE[i] = sizeTable.get(i);
}
}
① 依次往sizeTable添加元素:[16 , (512-16)]之間16的倍數(shù)壮吩。即进苍,16、32鸭叙、48...496
② 然后再往sizeTable中添加元素:[512 , 512 * (2^N))觉啊,N > 1; 直到數(shù)值超過(guò)Integer的限制(2^31 - 1);
③ 根據(jù)sizeTable長(zhǎng)度構(gòu)建一個(gè)靜態(tài)成員常量數(shù)組SIZE_TABLE沈贝,并將sizeTable中的元素賦值給SIZE_TABLE數(shù)組杠人。注意List是有序的,所以是根據(jù)插入元素的順序依次的賦值給SIZE_TABLE,SIZE_TABLE從下標(biāo)0開始嗡善。
SIZE_TABLE為預(yù)定義好的以從小到大的順序設(shè)定的可分配緩沖區(qū)的大小值的數(shù)組辑莫。因?yàn)锳daptiveRecvByteBufAllocator作用是可自動(dòng)適配每次讀事件使用的buffer的大小。這樣當(dāng)需要對(duì)buffer大小做調(diào)整時(shí)罩引,只要根據(jù)一定邏輯從SIZE_TABLE中取出值各吨,然后根據(jù)該值創(chuàng)建新buffer即可。
- 構(gòu)造方法
static final int DEFAULT_MINIMUM = 64; // 默認(rèn)緩沖區(qū)的最小容量大小為64
static final int DEFAULT_INITIAL = 1024; // 默認(rèn)緩沖區(qū)的容量大小為1024
static final int DEFAULT_MAXIMUM = 65536; // 默認(rèn)緩沖區(qū)的最大容量大小為65536
// 使用默認(rèn)參數(shù)創(chuàng)建一個(gè)新的AdaptiveRecvByteBufAllocator袁铐。
// 默認(rèn)參數(shù)揭蜒,預(yù)計(jì)緩沖區(qū)大小從1024開始,最小不會(huì)小于64剔桨,最大不會(huì)大于65536屉更。
public AdaptiveRecvByteBufAllocator() {
this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
}
private final int minIndex; // 緩沖區(qū)最小容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
private final int maxIndex; // 緩沖區(qū)最大容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
private final int initial; // 緩沖區(qū)默認(rèn)容量大小
// 使用指定的參數(shù)創(chuàng)建AdaptiveRecvByteBufAllocator對(duì)象。
// 其中minimum领炫、initial偶垮、maximum是正整數(shù)。然后通過(guò)getSizeTableIndex()方法獲取相應(yīng)容量在SIZE_TABLE中的索引位置帝洪。
// 并將計(jì)算出來(lái)的索引賦值給相應(yīng)的成員變量minIndex似舵、maxIndex。同時(shí)保證「SIZE_TABLE[minIndex] >= minimum」以及「SIZE_TABLE[maxIndex] <= maximum」.
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
if (minimum <= 0) {
throw new IllegalArgumentException("minimum: " + minimum);
}
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
- int getSizeTableIndex(final int size)
private static int getSizeTableIndex(final int size) {
for (int low = 0, high = SIZE_TABLE.length - 1;;) {
if (high < low) {
return low;
}
if (high == low) {
return high;
}
int mid = low + high >>> 1;
int a = SIZE_TABLE[mid];
int b = SIZE_TABLE[mid + 1];
if (size > b) {
low = mid + 1;
} else if (size < a) {
high = mid - 1;
} else if (size == a) {
return mid;
} else {
return mid + 1;
}
}
}
因?yàn)镾IZE_TABLE數(shù)組是一個(gè)有序數(shù)組葱峡,因此此處用二分查找法砚哗,查找size在SIZE_TABLE中的位置,如果size存在于SIZE_TABLE中砰奕,則返回對(duì)應(yīng)的索引值蛛芥;否則返回接近于size大小的SIZE_TABLE數(shù)組元素的索引值。
- Handle newHandle()
public Handle newHandle() {
return new HandleImpl(minIndex, maxIndex, initial);
}
創(chuàng)建一個(gè)HandleImpl對(duì)象军援,參數(shù)為minIndex仅淑,maxIndex以及initail為AdaptiveRecvByteBufAllocator對(duì)象的成員變量值。
內(nèi)部類
private final class HandleImpl extends MaxMessageHandle
HandleImpl是AdaptiveRecvByteBufAllocator一個(gè)內(nèi)部類胸哥,該處理器類用于提供真實(shí)的操作并保留預(yù)測(cè)最佳緩沖區(qū)容量所需的內(nèi)部信息涯竟。
// 緩沖區(qū)最小容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置,同外部類AdaptiveRecvByteBufAllocator是一個(gè)值
private final int minIndex;
// 緩沖區(qū)最大容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置空厌,同外部類AdaptiveRecvByteBufAllocator是一個(gè)值
private final int maxIndex;
// 緩沖區(qū)默認(rèn)容量對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置庐船,外部類AdaptiveRecvByteBufAllocator記錄的是容量大小值,而HandleImpl中記錄是其值對(duì)應(yīng)于SIZE_TABLE中的下標(biāo)位置
private int index;
// 下一次創(chuàng)建緩沖區(qū)時(shí)的其容量的大小嘲更。
private int nextReceiveBufferSize;
// 在record()方法中使用筐钟,用于標(biāo)識(shí)是否需要減少下一次創(chuàng)建的緩沖區(qū)的大小。
private boolean decreaseNow;
- 構(gòu)造方法
public HandleImpl(int minIndex, int maxIndex, int initial) {
this.minIndex = minIndex;
this.maxIndex = maxIndex;
index = getSizeTableIndex(initial);
nextReceiveBufferSize = SIZE_TABLE[index];
}
給成員變量minIndex赋朦、maxIndex賦值篓冲。同時(shí)通過(guò)getSizeTableIndex(initial)計(jì)算出初始容量在SIZE_TABLE的索引值李破,將其賦值為成員變量index。并將初始容量大小(即纹因,SIZE_TABLE[index])賦值給成員變量nextReceiveBufferSize喷屋。
- int guess()
public int guess() {
return nextReceiveBufferSize;
}
返回推測(cè)的緩沖區(qū)容量大小,即瞭恰,返回成員變量nextReceiveBufferSize的值屯曹。
- record(int actualReadBytes)
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) {
if (decreaseNow) {
index = Math.max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = Math.min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
record方法很重要,它就是完成預(yù)測(cè)下一個(gè)緩沖區(qū)容量大小的操作惊畏。邏輯如下:
若發(fā)現(xiàn)兩次恶耽,本次讀循環(huán)真實(shí)讀取的字節(jié)總數(shù) <= ‘SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]’ 則,減少預(yù)測(cè)的緩沖區(qū)容量大小颜启。重新給成員變量index賦值為為‘index - 1’偷俭,若‘index - 1’ < minIndex,則index新值為minIndex缰盏。根據(jù)算出來(lái)的新的index索引涌萤,給成員變量nextReceiveBufferSize重新賦值'SIZE_TABLE[index]’。最后將decreaseNow置位false口猜,該字段用于表示是否有’連續(xù)’的兩次真實(shí)讀取的數(shù)據(jù)滿足可減少容量大小的情況负溪。注意,這里說(shuō)的‘連續(xù)’并不是真的連續(xù)發(fā)送济炎,而是指滿足條件(即川抡,‘a(chǎn)ctualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]’)兩次的期間沒(méi)有發(fā)生‘a(chǎn)ctualReadBytes >= nextReceiveBufferSize’的情況。
若须尚,本次讀循環(huán)真實(shí)讀取的字節(jié)總數(shù) >= 預(yù)測(cè)的緩沖區(qū)大小崖堤,則進(jìn)行增加預(yù)測(cè)的緩沖區(qū)容量大小。新的index為‘index + 4’耐床,若‘index + 4’ > maxIndex密幔,則index新值為maxIndex。根據(jù)算出來(lái)的新的index索引撩轰,給成員變量nextReceiveBufferSize重新賦值'SIZE_TABLE[index]’老玛。最后將decreaseNow置位false。
- readComplete()
public void readComplete() {
record(totalBytesRead());
}
每次讀循環(huán)完成后钧敞,會(huì)調(diào)用該方法。根據(jù)本次讀循環(huán)讀取的字節(jié)數(shù)來(lái)調(diào)整預(yù)測(cè)的緩沖區(qū)容量大小麸粮。
結(jié)合Netty源碼中的實(shí)際使用場(chǎng)景對(duì)AdaptiveRecvByteBufAllocator做進(jìn)一步的分析
下面結(jié)合NioServerSocketChannel的ACCEPT事件以及NioSocketChannel的READ事件處理中對(duì)AdaptiveRecvByteBufAllocator使用溉苛,來(lái)進(jìn)一步的了解AdaptiveRecvByteBufAllocator的真正用途。本文僅對(duì)AdaptiveRecvByteBufAllocator的使用進(jìn)行解釋弄诲,具體的事件的處理流程并不是本文的重點(diǎn)愚战。
ACCEPT事件中對(duì)AdaptiveRecvByteBufAllocator的使用
當(dāng)服務(wù)端收到客戶端的一個(gè)連接請(qǐng)求時(shí)娇唯,‘SelectionKey.OP_ACCEPT’將會(huì)觸發(fā)。在NioEventLoop的事件循環(huán)中會(huì)對(duì)該事件進(jìn)行處理:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
我們來(lái)看看unsafe.read()的實(shí)現(xiàn)寂玲,在NioServerSocketChannel中unsafe是一個(gè)NioMessageUnsafe實(shí)例:?
② 對(duì)config成員變量賦值想许,即,Handle中持有NioServerSocketChannelConfig對(duì)象的引用断序;將maxMessagePerRead重置(這里為16流纹,是在初始化NioServerSocketChannel的時(shí)候進(jìn)行的相關(guān)設(shè)置),totalMessages违诗、totalBytesRead重置為0漱凝;
在啟動(dòng)流程中初始化NioServerSocketChannel的時(shí)候,會(huì)同時(shí)構(gòu)建一個(gè)Channel的配置對(duì)象DefaultChannelConfig诸迟,在初始化該對(duì)象的過(guò)程中就會(huì)完成對(duì)AdaptiveRecvByteBufAllocator的創(chuàng)建茸炒,并修改其maxMessagesPerRead屬性:
因此,NioServerSocketChannel關(guān)聯(lián)的AdaptiveRecvByteBufAllocator的maxMessagePerRead屬性值為16慎玖。
?
③ 這步主要是通過(guò)serverSocket.accpet()來(lái)接受生成和客戶端通信的socket贮尖,并將其放入到readBuf集合中。如果該流程操作正確趁怔,則返回’1’湿硝,表示已經(jīng)讀取一條消息(即,在處理ACCEPT事件中消息的讀取指的就是接收一個(gè)客戶端的請(qǐng)求「serverSocket.accpet()」操作)润努;否則返回0关斜,若返回0,則會(huì)退出while讀循環(huán)铺浇。 ④ 在執(zhí)行完消息的讀取后(即痢畜,在處理ACCEPT事件中消息的讀取指的就是接收一個(gè)客戶端的請(qǐng)求「serverSocket.accpet()」操作),將執(zhí)行allocHandle.incMessagesRead(localRead)來(lái)增加已經(jīng)讀取消息的個(gè)數(shù)鳍侣。底層就是根據(jù)localRead的值對(duì)totalMessages屬性進(jìn)行累加丁稀。
b) maybeMoreDataSupplier.get() :
因?yàn)閍ccept中并沒(méi)有真實(shí)讀取字節(jié)的操作惑折,因此此時(shí)授账,attemptedBytesRead枯跑、lastBytesRead都為0。該判斷為true白热;
c) totalMessages < maxMessagePerRead:根據(jù)上面的流程我們可以知道敛助,maxMessagePerRead為16,totalMessages也為1屋确。因?yàn)榇伺袛酁閠rue纳击。
d) totalBytesRead > 0:因?yàn)閍ccept操作只是接收一個(gè)新的連接,并沒(méi)有進(jìn)行真實(shí)的數(shù)據(jù)讀取操作乍恐,因此totalBytesRead為0评疗。因此此判斷為false。
也就是allocHandle.continueReading()將返回false茵烈。因此退出循環(huán)百匆,會(huì)繼續(xù)讀取消息。
⑥ 根據(jù)本次讀取的字節(jié)數(shù)呜投,對(duì)下次創(chuàng)建的緩沖區(qū)大小做調(diào)整加匈。其實(shí)本方法,在ACCEPT事件中并沒(méi)用處仑荐。因?yàn)榈衿矗琒erverSocektChannel是沒(méi)有READ事件的,它只會(huì)處理ACCEPT事件粘招,所以它不會(huì)讀取任何的字節(jié)數(shù)據(jù)啥寇。再者在前面處理ACCEPT事件的流程中,我們也可以看到洒扎,我們并沒(méi)有使用allocHandle.allocate(allocator);來(lái)真實(shí)的創(chuàng)建一個(gè)緩沖區(qū)辑甜。
總結(jié),對(duì)于ACCEPT事件袍冷,每次讀循環(huán)執(zhí)行一次讀操作(但并沒(méi)有讀取任何字節(jié)數(shù)據(jù)磷醋,totalBytesRead > 0 為false)這也是符合NIO規(guī)范的,因?yàn)槊看蜛CCEPT事件被觸發(fā)時(shí)胡诗,都表示有一個(gè)客戶端向服務(wù)器端發(fā)起了連接請(qǐng)求诗充。
READ事件中對(duì)AdaptiveRecvByteBufAllocator的使用
當(dāng)有可讀數(shù)據(jù)準(zhǔn)備被讀取時(shí)肛著,‘SelectionKey.OP_READ’將會(huì)觸發(fā)窑睁。在NioEventLoop的事件循環(huán)中會(huì)對(duì)該事件進(jìn)行處理:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
我們來(lái)看看unsafe.read()的實(shí)現(xiàn)宠页,在NioSocketChannel中unsafe是一個(gè)NioByteUnsafe實(shí)例:
① 同ACCEPT事件流程類似,會(huì)獲取到一個(gè)獲取一個(gè)AdaptiveRecvByteBufAllocator.HandleImpl實(shí)例
② 同ACCEPT事件流程類似瑰抵,對(duì)config成員變量賦值缩歪,即,Handle中持有NioSocketChannelConfig對(duì)象的引用谍憔;將maxMessagePerRead重置(這里為16匪蝙,是在初始化NioSocketChannel的時(shí)候進(jìn)行的相關(guān)設(shè)置),totalMessages习贫、totalBytesRead重置為0逛球。只是這里是在實(shí)例化NioSocketChannel實(shí)例時(shí)完成的。
③ 根據(jù)提供的緩沖區(qū)分配器苫昌,創(chuàng)建一個(gè)由AdaptiveRecvByteBufAllocator.HandleImpl推測(cè)的容量大小的ByteBuf颤绕,并使用該ByteBuf來(lái)接收接下來(lái)要讀取的數(shù)據(jù)。第一次讀循環(huán)祟身,默認(rèn)的大小為1024奥务。
④ allocHandle.lastBytesRead(doReadBytes(byteBuf)) :首先,會(huì)先根據(jù)byteBuf可寫的字節(jié)數(shù)來(lái)設(shè)置AdaptiveRecvByteBufAllocator.HandleImpl的attemptedBytesRead屬性袜硫。正如前面對(duì)AdaptiveRecvByteBufAllocator類的介紹所說(shuō)的:“它會(huì)分配一個(gè)新的接收緩存氯葬,該緩存的容量會(huì)盡可能的足夠大以讀入入站數(shù)據(jù)并且該緩存的容量也盡可能的小以不會(huì)浪費(fèi)它的空間⊥裣荩”因此帚称,它認(rèn)為它分配的ByteBuf中可寫的字節(jié)數(shù),就應(yīng)該是本次嘗試讀取到的字節(jié)數(shù)秽澳。然后闯睹。使用上面構(gòu)造的ByteBuf來(lái)接收SocketChannel的數(shù)據(jù),并且限制了最大讀取的字節(jié)數(shù)為attemptedBytesRead担神,然后將真是讀取的字節(jié)數(shù)設(shè)置到AdaptiveRecvByteBufAllocator.HandleImpl中的lastBytesRead屬性上楼吃。
⑤ 在執(zhí)行完消息的讀取后,將執(zhí)行allocHandle.incMessagesRead(1)來(lái)增加已經(jīng)讀取消息的次數(shù)妄讯。底層就是將totalMessages值+1孩锡。
⑥ 在進(jìn)行下一次循環(huán)進(jìn)行消息的讀取前,會(huì)先執(zhí)行該判斷捞挥,判斷是否可以繼續(xù)的去讀取消息浮创。b) maybeMoreDataSupplier.get() :
當(dāng)本次讀操作讀取到的字節(jié)數(shù)與AdaptiveRecvByteBufAllocator推測(cè)出的ByteBuf容量大小不一樣時(shí)砌函,就會(huì)返回false斩披;否則返回true。當(dāng)然讹俊,如上面所說(shuō)垦沉,如果本次讀操作可讀取的字節(jié)大于了attemptedBytesRead的話,一次讀操作也只會(huì)先讀取attemptedBytesRead的字節(jié)數(shù)仍劈。在滿足allocHandle.continueReading()的條件下厕倍,可以在讀循環(huán)中進(jìn)行下一次的數(shù)據(jù)讀取。每次讀循環(huán)都會(huì)構(gòu)建一個(gè)新的ByteBuf贩疙。但是讹弯,請(qǐng)注意况既,一個(gè)讀循環(huán)(可以包含多次讀操作)中每次的讀操作構(gòu)建的ByteBuf大小都是一樣的。
c) totalMessages < maxMessagePerRead:根據(jù)上面的流程我們可以知道组民,maxMessagePerRead為16棒仍,totalMessages為讀循環(huán)已經(jīng)執(zhí)行的讀操作次數(shù)(即,循環(huán)次數(shù))臭胜。
d) totalBytesRead > 0:當(dāng)本次讀操作有讀取到字節(jié)數(shù)時(shí)莫其,或者以讀取到的字節(jié)數(shù)小于Integer.MAX_VALUE,那么該判斷都會(huì)大于0耸三,即乱陡,為true;否則為false仪壮。
⑦ 最后憨颠,通過(guò)allocHandle.readComplete()來(lái)標(biāo)識(shí)本次讀循環(huán)結(jié)束,并根據(jù)本次讀循環(huán)的數(shù)據(jù)信息來(lái)預(yù)測(cè)下一次讀事件觸發(fā)時(shí)睛驳,應(yīng)該分配多大的ByteBuf容量更加合理些烙心。具體的調(diào)整邏輯,在上面的HandleImpl.record(int actualReadBytes)已經(jīng)進(jìn)行了詳細(xì)的說(shuō)明乏沸。
總結(jié)淫茵,對(duì)于READ事件,一個(gè)讀循環(huán)可能會(huì)執(zhí)行多次讀操作(即蹬跃,循環(huán)的次數(shù))匙瘪,至于進(jìn)行幾次讀操作,這將根據(jù)「allocHandle.continueReading()」以及當(dāng)前這次讀取的字節(jié)數(shù)來(lái)決定蝶缀。若‘a(chǎn)llocHandle.continueReading()’為false丹喻,或者本次讀取到的字節(jié)數(shù)<=0(當(dāng)沒(méi)有數(shù)據(jù)可讀取時(shí)為0,當(dāng)遠(yuǎn)端已經(jīng)關(guān)閉時(shí)為-1)翁都,都不會(huì)繼續(xù)進(jìn)行讀循環(huán)操作碍论。再者,一個(gè)讀循環(huán)中的每次讀操作分配的ByteBuf的容量大小都是一樣的柄慰。我們是在一個(gè)讀循環(huán)操作完成后鳍悠,才會(huì)根據(jù)本次的讀循環(huán)的情況對(duì)下一次讀操作的ByteBuf容量大小做預(yù)測(cè)。
后記
若文章有任何錯(cuò)誤坐搔,望大家不吝指教:)
參考
圣思園《精通并發(fā)與Netty》