《聊一聊 Netty 數(shù)據(jù)搬運工 ByteBuf 體系的設(shè)計與實現(xiàn)(上)》
2.7 ByteBuf 的視圖設(shè)計
和 JDK 的設(shè)計一樣咖城,Netty 中的 ByteBuf 也可以通過 slice()
方法以及 duplicate()
方法創(chuàng)建一個視圖 ByteBuf 出來,原生 ByteBuf 和它的視圖 ByteBuf 底層都是共用同一片內(nèi)存區(qū)域呼奢,也就是說在視圖 ByteBuf 上做的任何改動都會反應(yīng)到原生 ByteBuf 上宜雀。同理,在原生 ByteBuf 上做的任何改動也會反應(yīng)到它的視圖 ByteBuf 上握础。我們可以將視圖 ByteBuf 看做是原生 ByteBuf 的一份淺拷貝辐董。
原生 ByteBuf 和它的視圖 ByteBuf 不同的是,它們都有各自獨立的 readerIndex禀综,writerIndex简烘,capacity,maxCapacity定枷。
slice()
方法是在原生 ByteBuf 的 [readerIndex , writerIndex)
這段內(nèi)存區(qū)域內(nèi)創(chuàng)建一個視圖 ByteBuf孤澎。也就是原生 ByteBuf 和視圖 ByteBuf 共用 [readerIndex , writerIndex)
這段內(nèi)存區(qū)域。視圖 ByteBuf 的數(shù)據(jù)區(qū)域其實就是原生 ByteBuf 的可讀字節(jié)區(qū)域欠窒。
視圖 ByteBuf 的 readerIndex = 0 覆旭, writerIndex = capacity = maxCapacity = 原生 ByteBuf 的 readableBytes()
。
@Override
public int readableBytes() {
// 原生 ByteBuf
return writerIndex - readerIndex;
}
下面我們來看一下 slice()
方法創(chuàng)建視圖 ByteBuf 的邏輯實現(xiàn):
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf slice() {
return slice(readerIndex, readableBytes());
}
@Override
public ByteBuf slice(int index, int length) {
// 確保 ByteBuf 的引用計數(shù)不為 0
ensureAccessible();
return new UnpooledSlicedByteBuf(this, index, length);
}
}
Netty 會將 slice 視圖 ByteBuf 封裝在 UnpooledSlicedByteBuf 類中,在這里會初始化 slice 視圖 ByteBuf 的 readerIndex型将,writerIndex寂祥,capacity,maxCapacity七兜。
class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
// index = readerIndex
// length = readableBytes()
super(buffer, index, length);
}
@Override
public int capacity() {
// 視圖 ByteBuf 的 capacity 和 maxCapacity 相等
// 均為原生 ByteBuf 的 readableBytes()
return maxCapacity();
}
}
如上圖所示丸凭,這里的 index 就是原生 ByteBuf 的 readerIndex = 4 ,index 用于表示視圖 ByteBuf 的內(nèi)存區(qū)域相對于原生 ByteBuf 的偏移惊搏,因為視圖 ByteBuf 與原生 ByteBuf 共用的是同一片內(nèi)存區(qū)域贮乳,針對視圖 ByteBuf 的操作其實底層最終是轉(zhuǎn)換為對原生 ByteBuf 的操作。
但由于視圖 ByteBuf 和原生 ByteBuf 各自都有獨立的 readerIndex 和 writerIndex恬惯,比如上圖中向拆,視圖 ByteBuf 中的 readerIndex = 0 其實指向的是原生 ByteBuf 中 readerIndex = 4 的位置。所以每次在我們對視圖 ByteBuf 進(jìn)行讀寫的時候都需要將視圖 ByteBuf 的 readerIndex 加上一個偏移(index)轉(zhuǎn)換成原生 ByteBuf 的 readerIndex酪耳,近而從原生 ByteBuf 中來讀寫數(shù)據(jù)浓恳。
@Override
protected byte _getByte(int index) {
// 底層其實是對原生 ByteBuf 的訪問
return unwrap()._getByte(idx(index));
}
@Override
protected void _setByte(int index, int value) {
unwrap()._setByte(idx(index), value);
}
/**
* Returns the index with the needed adjustment.
*/
final int idx(int index) {
// 轉(zhuǎn)換為原生 ByteBuf 的 readerIndex 或者 writerIndex
return index + adjustment;
}
idx(int index)
方法中的 adjustment 就是上面 UnpooledSlicedByteBuf 構(gòu)造函數(shù)中的 index 偏移,初始化為原生 ByteBuf 的 readerIndex碗暗。
length 則初始化為原生 ByteBuf 的 readableBytes()
颈将,視圖 ByteBuf 中的 writerIndex,capacity言疗,maxCapacity 都是用 length 來初始化晴圾。
abstract class AbstractUnpooledSlicedByteBuf extends AbstractDerivedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
// 視圖 ByteBuf 相對于原生 ByteBuf的數(shù)據(jù)區(qū)域偏移
private final int adjustment;
AbstractUnpooledSlicedByteBuf(ByteBuf buffer, int index, int length) {
// 設(shè)置視圖 ByteBuf 的 maxCapacity,readerIndex 為 0
super(length);
// 原生 ByteBuf
this.buffer = buffer;
// 數(shù)據(jù)偏移為原生 ByteBuf 的 readerIndex
adjustment = index;
// 設(shè)置視圖 ByteBuf 的 writerIndex
writerIndex(length);
}
}
但是通過 slice()
方法創(chuàng)建出來的視圖 ByteBuf 并不會改變原生 ByteBuf 的引用計數(shù)噪奄,這會存在一個問題死姚,就是由于視圖 ByteBuf 和原生 ByteBuf 底層共用的是同一片內(nèi)存區(qū)域,在原生 ByteBuf 或者視圖 ByteBuf 各自的應(yīng)用上下文中他們可能并不會意識到對方的存在勤篮。
如果對原生 ByteBuf 調(diào)用 release 方法都毒,恰好引用計數(shù)就為 0 了,接著就會釋放原生 ByteBuf 的 Native Memory 碰缔。此時再對視圖 ByteBuf 進(jìn)行訪問就有問題了账劲,因為 Native Memory 已經(jīng)被原生 ByteBuf 釋放了。同樣的道理金抡,對視圖 ByteBuf 調(diào)用 release 方法 瀑焦,也會對原生 ByteBuf 產(chǎn)生影響。
為此 Netty 提供了一個 retainedSlice()
方法梗肝,在創(chuàng)建 slice 視圖 ByteBuf 的同時對原生 ByteBuf 的引用計數(shù)加 1 蝠猬,兩者共用同一個引用計數(shù)。
@Override
public ByteBuf retainedSlice() {
// 原生 ByteBuf 的引用計數(shù)加 1
return slice().retain();
}
除了 slice()
之外统捶,Netty 也提供了 duplicate()
方法來創(chuàng)建視圖 ByteBuf 榆芦。
@Override
public ByteBuf duplicate() {
// 確保 ByteBuf 的引用計數(shù)不為 0
ensureAccessible();
return new UnpooledDuplicatedByteBuf(this);
}
但和 slice()
不同的是柄粹, duplicate()
是完全復(fù)刻了原生 ByteBuf,復(fù)刻出來的視圖 ByteBuf 雖然與原生 ByteBuf 都有各自獨立的 readerIndex匆绣,writerIndex驻右,capacity,maxCapacity崎淳。但他們的值都是相同的堪夭。duplicate 視圖 ByteBuf 也是和原生 ByteBuf 共用同一塊 Native Memory 。
public class DuplicatedByteBuf extends AbstractDerivedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
public DuplicatedByteBuf(ByteBuf buffer) {
this(buffer, buffer.readerIndex(), buffer.writerIndex());
}
DuplicatedByteBuf(ByteBuf buffer, int readerIndex, int writerIndex) {
// 初始化視圖 ByteBuf 的 maxCapacity 與原生的相同
super(buffer.maxCapacity());
// 原生 ByteBuf
this.buffer = buffer;
// 視圖 ByteBuf 的 readerIndex 拣凹, writerIndex 也與原生相同
setIndex(readerIndex, writerIndex);
markReaderIndex();
markWriterIndex();
}
@Override
public int capacity() {
// 視圖 ByteBuf 的 capacity 也與原生相同
return unwrap().capacity();
}
}
Netty 同樣也提供了對應(yīng)的 retainedDuplicate()
方法森爽,用于創(chuàng)建 duplicate 視圖 ByteBuf 的同時增加原生 ByteBuf 的引用計數(shù)。視圖 ByteBuf 與原生 ByteBuf 之間共用同一個引用計數(shù)嚣镜。
@Override
public ByteBuf retainedDuplicate() {
return duplicate().retain();
}
上面介紹的兩種視圖 ByteBuf 可以理解為是對原生 ByteBuf 的一層淺拷貝爬迟,Netty 也提供了 copy()
方法來實現(xiàn)對原生 ByteBuf 的深拷貝,copy 出來的 ByteBuf 是原生 ByteBuf 的一個副本菊匿,兩者底層依賴的 Native Memory 是不同的付呕,各自都有獨立的 readerIndex,writerIndex跌捆,capacity徽职,maxCapacity 。
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf copy() {
// 從原生 ByteBuf 中的 readerIndex 開始佩厚,拷貝 readableBytes 個字節(jié)到新的 ByteBuf 中
return copy(readerIndex, readableBytes());
}
}
copy()
方法是對原生 ByteBuf 的 [readerIndex , writerIndex)
這段數(shù)據(jù)范圍內(nèi)容進(jìn)行拷貝姆钉。copy 出來的 ByteBuf,它的 readerIndex = 0 抄瓦, writerIndex = capacity = 原生 ByteBuf 的 readableBytes()
潮瓶。maxCapacity 與原生 maxCapacity 相同。
public class UnpooledDirectByteBuf {
@Override
public ByteBuf copy(int index, int length) {
ensureAccessible();
ByteBuffer src;
try {
// 將原生 ByteBuf 中 [index , index + lengh) 這段范圍的數(shù)據(jù)拷貝到新的 ByteBuf 中
src = (ByteBuffer) buffer.duplicate().clear().position(index).limit(index + length);
} catch (IllegalArgumentException ignored) {
throw new IndexOutOfBoundsException("Too many bytes to read - Need " + (index + length));
}
// 首先新申請一段 native memory , 新的 ByteBuf 初始容量為 length (真實容量)闺鲸,最大容量與原生 ByteBuf 的 maxCapacity 相等
// readerIndex = 0 , writerIndex = length
return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
}
}
2.8 CompositeByteBuf 的零拷貝設(shè)計
這里的零拷貝并不是我們經(jīng)常提到的那種 OS 層面上的零拷貝筋讨,而是 Netty 在用戶態(tài)層面自己實現(xiàn)的避免內(nèi)存拷貝的設(shè)計埃叭。比如在傳統(tǒng)意義上摸恍,如果我們想要將多個獨立的 ByteBuf 聚合成一個 ByteBuf 的時候,我們首先需要向 OS 申請一段更大的內(nèi)存赤屋,然后依次將多個 ByteBuf 中的內(nèi)容拷貝到這段新申請的內(nèi)存上立镶,最后在釋放這些 ByteBuf 的內(nèi)存。
這樣一來就涉及到兩個性能開銷點类早,一個是我們需要向 OS 重新申請更大的內(nèi)存媚媒,另一個是內(nèi)存的拷貝。Netty 引入 CompositeByteBuf 的目的就是為了解決這兩個問題涩僻。巧妙地利用原有 ByteBuf 所占的內(nèi)存缭召,在此基礎(chǔ)之上栈顷,將它們組合成一個邏輯意義上的 CompositeByteBuf ,提供一個統(tǒng)一的邏輯視圖嵌巷。
CompositeByteBuf 其實也是一種視圖 ByteBuf 萄凤,這一點和上小節(jié)中我們介紹的
SlicedByteBuf , DuplicatedByteBuf 一樣搪哪,它們本身并不會占用 Native Memory靡努,底層數(shù)據(jù)的存儲全部依賴于原生的 ByteBuf。
不同點在于晓折,SlicedByteBuf惑朦,DuplicatedByteBuf 它們是在單一的原生 ByteBuf 基礎(chǔ)之上創(chuàng)建出的視圖 ByteBuf。而 CompositeByteBuf 是基于多個原生 ByteBuf 創(chuàng)建出的統(tǒng)一邏輯視圖 ByteBuf漓概。
CompositeByteBuf 對于我們用戶來說和其他的普通 ByteBuf 沒有任何區(qū)別漾月,有自己獨立的 readerIndex,writerIndex垛耳,capacity栅屏,maxCapacity,前面幾個小節(jié)中介紹的各種 ByteBuf 的設(shè)計要素堂鲜,在 CompositeByteBuf 身上也都會體現(xiàn)栈雳。
但從實現(xiàn)的角度來說,CompositeByteBuf 只是一個邏輯上的 ByteBuf缔莲,其本身并不會占用任何的 Native Memory 哥纫,對于 CompositeByteBuf 的任何操作,最終都需要轉(zhuǎn)換到其內(nèi)部具體的 ByteBuf 上痴奏。本小節(jié)我們就來深入到 CompositeByteBuf 的內(nèi)部蛀骇,來看一下 Netty 的巧妙設(shè)計。
2.8.1 CompositeByteBuf 的總體架構(gòu)
從總體設(shè)計上來講读拆,CompositeByteBuf 包含如下五個重要屬性栓袖,其中最為核心的就是 components 數(shù)組,那些需要被聚合的原生 ByteBuf 會被 Netty 封裝在 Component 類中主儡,并統(tǒng)一組織在 components 數(shù)組中逗威。后續(xù)針對 CompositeByteBuf 的所有操作都需要和這個數(shù)組打交道。
public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
// 內(nèi)部 ByteBuf 的分配器辟灰,用于后續(xù)擴(kuò)容个榕,copy , 合并等操作
private final ByteBufAllocator alloc;
// compositeDirectBuffer 還是 compositeHeapBuffer ?
private final boolean direct;
// 最大的 components 數(shù)組容量(16)
private final int maxNumComponents;
// 當(dāng)前 CompositeByteBuf 中包含的 components 個數(shù)
private int componentCount;
// 存儲 component 的數(shù)組
private Component[] components; // resized when needed
}
maxNumComponents 表示 components 數(shù)組最大的容量,CompositeByteBuf 默認(rèn)能夠包含 Component 的最大個數(shù)為 16芥喇,如果超過這個數(shù)量的話西采,Netty 會將當(dāng)前 CompositeByteBuf 中包含的所有 Components 重新合并成一個更大的 Component。
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
static final int DEFAULT_MAX_COMPONENTS = 16;
}
componentCount 表示當(dāng)前 CompositeByteBuf 中包含的 Component 個數(shù)继控。每當(dāng)我們通過 addComponent
方法向 CompositeByteBuf 添加一個新的 ByteBuf 時械馆,Netty 都會用一個新的 Component 實例來包裝這個 ByteBuf胖眷,然后存放在 components 數(shù)組中,最后 componentCount 的個數(shù)加 1 霹崎。
CompositeByteBuf 與其底層聚合的真實 ByteBuf 架構(gòu)設(shè)計關(guān)系瘦材,如下圖所示:
而創(chuàng)建一個 CompositeByteBuf 的核心其實就是創(chuàng)建底層的 components 數(shù)組,后續(xù)添加到該 CompositeByteBuf 的所有原生 ByteBuf 都會被組織在這里仿畸。
private CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents, int initSize) {
// 設(shè)置 maxCapacity
super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
this.alloc = ObjectUtil.checkNotNull(alloc, "alloc");
this.direct = direct;
this.maxNumComponents = maxNumComponents;
// 初始 Component 數(shù)組的容量為 maxNumComponents
components = newCompArray(initSize, maxNumComponents);
}
這里的參數(shù) initSize
表示的并不是 CompositeByteBuf 所包含的字節(jié)數(shù)食棕,而是初始包裝的原生 ByteBuf 個數(shù),也就是初始 Component 的個數(shù)错沽。components 數(shù)組的總體大小由參數(shù) maxNumComponents 決定簿晓,但不能超過 16 。
private static Component[] newCompArray(int initComponents, int maxNumComponents) {
// MAX_COMPONENT
int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
// 初始 Component 數(shù)組的容量為 maxNumComponents
return new Component[Math.max(initComponents, capacityGuess)];
}
現(xiàn)在我們只是清楚了 CompositeByteBuf 的一個基本骨架千埃,那么接下來 Netty 如何根據(jù)這個基本的骨架將多個原生 ByteBuf 組裝成一個邏輯上的統(tǒng)一視圖 ByteBuf 呢 憔儿?
也就是說我們依據(jù) CompositeByteBuf 中的 readerIndex 以及 writerIndex 進(jìn)行的讀寫操作邏輯如何轉(zhuǎn)換到對應(yīng)的底層原生 ByteBuf 之上呢 ? 這個是整個設(shè)計的核心所在放可。
下面筆者就帶著大家從外到內(nèi)谒臼,從易到難地一一拆解 CompositeByteBuf 中的那些核心設(shè)計要素。從 CompositeByteBuf 的最外層來看耀里,其實我們并不陌生蜈缤,對于用戶來說它就是一個普通的 ByteBuf,擁有自己獨立的 readerIndex 冯挎,writerIndex 底哥。
但 CompositeByteBuf 中那些邏輯上看起來連續(xù)的字節(jié),背后其實存儲在不同的原生 ByteBuf 中房官。不同 ByteBuf 的內(nèi)存之間其實是不連續(xù)的趾徽。
那么現(xiàn)在問題的關(guān)鍵就是我們?nèi)绾闻袛?CompositeByteBuf 中的某一段邏輯數(shù)據(jù)背后對應(yīng)的究竟是哪一個真實的 ByteBuf,如果我們能夠通過 CompositeByteBuf 的相關(guān) Index , 找到這個 Index 背后對應(yīng)的 ByteBuf翰守,近而可以找到 ByteBuf 的 Index 孵奶,這樣是不是就可以將 CompositeByteBuf 的邏輯操作轉(zhuǎn)換成對真實內(nèi)存的讀寫操作了。
CompositeByteBuf 到原生 ByteBuf 的轉(zhuǎn)換關(guān)系蜡峰,Netty 封裝在 Component 類中了袁,每一個被包裝在 CompositeByteBuf 中的原生 ByteBuf 都對應(yīng)一個 Component 實例。它們會按照順序統(tǒng)一組織在 components 數(shù)組中事示。
private static final class Component {
// 原生 ByteBuf
final ByteBuf srcBuf;
// CompositeByteBuf 的 index 加上 srcAdjustment 就得到了srcBuf 的相關(guān) index
int srcAdjustment;
// srcBuf 可能是一個被包裝過的 ByteBuf早像,比如 SlicedByteBuf 僻肖, DuplicatedByteBuf
// 被 srcBuf 包裝的最底層的 ByteBuf 就存放在 buf 字段中
final ByteBuf buf;
// CompositeByteBuf 的 index 加上 adjustment 就得到了 buf 的相關(guān) index
int adjustment;
// 該 Component 在 CompositeByteBuf 視角中表示的數(shù)據(jù)范圍 [offset , endOffset)
int offset;
int endOffset;
}
一個 Component 在 CompositeByteBuf 的視角中所能表示的數(shù)據(jù)邏輯范圍是 [offset , endOffset)
肖爵。
比如上圖中第一個綠色的 ByteBuf , 它里邊存儲的數(shù)據(jù)組成了 CompositeByteBuf 中 [0 , 4)
這段邏輯數(shù)據(jù)范圍。第二個黃色的 ByteBuf臀脏,它里邊存儲的數(shù)據(jù)組成了 CompositeByteBuf 中 [4 , 8)
這段邏輯數(shù)據(jù)范圍劝堪。第三個藍(lán)色的 ByteBuf冀自,它里邊存儲的數(shù)據(jù)組成了 CompositeByteBuf 中 [8 , 12)
這段邏輯數(shù)據(jù)范圍。 上一個 Component 的 endOffset 恰好是下一個 Component 的 offset 秒啦。
而這些真實存儲數(shù)據(jù)的 ByteBuf 則存儲在對應(yīng) Component 中的 srcBuf 字段中熬粗,當(dāng)我們通過 CompositeByteBuf 的 readerIndex 或者 writerIndex 進(jìn)行讀寫操作的時候,首先需要確定相關(guān) index 所對應(yīng)的 srcBuf余境,然后將 CompositeByteBuf 的 index 轉(zhuǎn)換為 srcBuf 的 srcIndex驻呐,近而通過 srcIndex 對 srcBuf 進(jìn)行讀寫。
這個 index 的轉(zhuǎn)換就是通過 srcAdjustment 來進(jìn)行的芳来,比如含末,當(dāng)前 CompositeByteBuf 的 readerIndex 為 5 ,它對應(yīng)的是第二個黃色的 ByteBuf即舌。而 ByteBuf 的 readerIndex 卻是 1 佣盒。
所以第二個 Component 的 srcAdjustment 就是 -4 , 這樣我們讀取 CompositeByteBuf 的時候顽聂,首先將它的 readerIndex 加上 srcAdjustment 就得到了 ByteBuf 的 readerIndex 肥惭,后面就是普通的 ByteBuf 讀取操作了。
在比如說紊搪,我們要對 CompositeByteBuf 進(jìn)行寫操作蜜葱,當(dāng)前的 writerIndex 為 10 ,對應(yīng)的是第三個藍(lán)色的 ByteBuf耀石,它的 writerIndex 為 2 笼沥。
所以第三個 Component 的 srcAdjustment 就是 -8 ,CompositeByteBuf 的 writerIndex 加上 srcAdjustment 就得到了 ByteBuf 的 writerIndex娶牌,后續(xù)就是普通的 ByteBuf 寫入操作奔浅。
int srcIdx(int index) {
// CompositeByteBuf 相關(guān)的 index 轉(zhuǎn)換成 srcBuf 的相關(guān) index
return index + srcAdjustment;
}
除了 srcBuf 之外,Component 實例中還有一個 buf 字段诗良,這里大家可能會比較好奇汹桦,為什么設(shè)計了兩個 ByteBuf 字段呢 ?Component 實例與 ByteBuf 不是一對一的關(guān)系嗎 鉴裹?
srcBuf 是指我們通過 addComponent
方法添加到 CompositeByteBuf 中的原始 ByteBuf舞骆。而這個 srcBuf 可能是一個視圖 ByteBuf,比如上一小節(jié)中介紹到的 SlicedByteBuf 和 DuplicatedByteBuf径荔。srcBuf 還可能是一個被包裝過的 ByteBuf督禽,比如 WrappedByteBuf , SwappedByteBuf。
假如 srcBuf 是一個 SlicedByteBuf 的話总处,我們需要將它的原生 ByteBuf 拆解出來并保存在 Component 實例的 buf 字段中狈惫。事實上 Component 中的 buf 才是真正存儲數(shù)據(jù)的地方。
abstract class AbstractUnpooledSlicedByteBuf {
// 原生 ByteBuf
private final ByteBuf buffer;
}
與 buf 對應(yīng)的就是 adjustment 鹦马, 它用于將 CompositeByteBuf 的相關(guān) index 轉(zhuǎn)換成 buf 相關(guān)的 index 胧谈,假如我們在向一個 CompositeByteBuf 執(zhí)行 read 操作忆肾,它的當(dāng)前 readerIndex 是 5,而 buf 的 readerIndex 是 6 菱肖。
所以在讀取操作之前客冈,我們需要將 CompositeByteBuf 的 readerIndex 加上 adjustment 得到 buf 的 readerIndex,近而將讀取操作轉(zhuǎn)移到 buf 中稳强。其實就和上小節(jié)中介紹的視圖 ByteBuf 是一模一樣的场仲,在讀寫之前都需要修正相關(guān)的 index 。
@Override
public byte getByte(int index) {
// 通過 CompositeByteBuf 的 index , 找到數(shù)據(jù)所屬的 component
Component c = findComponent(index);
// 首先通過 idx 轉(zhuǎn)換為 buf 相關(guān)的 index
// 將對 CompositeByteBuf 的讀寫操作轉(zhuǎn)換為 buf 的讀寫操作
return c.buf.getByte(c.idx(index));
}
int idx(int index) {
// 將 CompositeByteBuf 的相關(guān) index 轉(zhuǎn)換為 buf 的相關(guān) index
return index + adjustment;
}
那么我們?nèi)绾胃鶕?jù)指定的 CompositeByteBuf 的 index 來查找其對應(yīng)的底層數(shù)據(jù)究竟存儲在哪個 Component 中呢 退疫?
核心思想其實很簡單燎窘,因為每個 Component 都會描述自己表示 CompositeByteBuf 中的哪一段數(shù)據(jù)范圍 —— [offset , endOffset)
。所有的 Components 都被有序的組織在 components 數(shù)組中蹄咖。我們可以通過二分查找的方法來尋找這個 index 到底是落在了哪個 Component 表示的范圍中褐健。
這個查找的過程是在 findComponent
方法中實現(xiàn)的,Netty 會將最近一次訪問到的 Component 緩存在 CompositeByteBuf 的 lastAccessed 字段中澜汤,每次進(jìn)行查找的時候首先會判斷 index 是否落在了 lastAccessed 所表示的數(shù)據(jù)范圍內(nèi) —— [ la.offset , la.endOffset)
蚜迅。
如果 index 恰好被緩存的 Component(lastAccessed)所包含,那么就直接返回 lastAccessed 俊抵。
// 緩存最近一次查找到的 Component
private Component lastAccessed;
private Component findComponent(int offset) {
Component la = lastAccessed;
// 首先查找 offset 是否恰好落在 lastAccessed 的區(qū)間中
if (la != null && offset >= la.offset && offset < la.endOffset) {
return la;
}
// 在所有 Components 中進(jìn)行二分查找
return findIt(offset);
}
如果 index 不巧沒有命中緩存谁不,那么就在整個 components 數(shù)組中進(jìn)行二分查找 :
private Component findIt(int offset) {
for (int low = 0, high = componentCount; low <= high;) {
int mid = low + high >>> 1;
Component c = components[mid];
if (offset >= c.endOffset) {
low = mid + 1;
} else if (offset < c.offset) {
high = mid - 1;
} else {
lastAccessed = c;
return c;
}
}
throw new Error("should not reach here");
}
2.8.2 CompositeByteBuf 的創(chuàng)建
好了,現(xiàn)在我們已經(jīng)熟悉了 CompositeByteBuf 的總體架構(gòu)徽诲,那么接下來我們就來看一下 Netty 是如何將多個 ByteBuf 邏輯聚合成一個 CompositeByteBuf 的刹帕。
public final class Unpooled {
public static ByteBuf wrappedBuffer(ByteBuf... buffers) {
return wrappedBuffer(buffers.length, buffers);
}
}
CompositeByteBuf 的初始 maxNumComponents 為 buffers 數(shù)組的長度,如果我們只是傳入一個 ByteBuf 的話谎替,那么就無需創(chuàng)建 CompositeByteBuf偷溺,而是直接返回該 ByteBuf 的 slice 視圖。
如果我們傳入的是多個 ByteBuf 的話钱贯,則將這多個 ByteBuf 包裝成 CompositeByteBuf 返回挫掏。
public final class Unpooled {
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
switch (buffers.length) {
case 0:
break;
case 1:
ByteBuf buffer = buffers[0];
if (buffer.isReadable()) {
// 直接返回 buffer.slice() 視圖
return wrappedBuffer(buffer.order(BIG_ENDIAN));
} else {
buffer.release();
}
break;
default:
for (int i = 0; i < buffers.length; i++) {
ByteBuf buf = buffers[i];
if (buf.isReadable()) {
// 從第一個可讀的 ByteBuf —— buffers[i] 開始創(chuàng)建 CompositeByteBuf
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i);
}
// buf 不可讀則 release
buf.release();
}
break;
}
return EMPTY_BUFFER;
}
}
在進(jìn)入 CompositeByteBuf 的創(chuàng)建流程之后,首先是創(chuàng)建出一個空的 CompositeByteBuf秩命,也就是先把 CompositeByteBuf 的骨架搭建起來尉共,這時它的 initSize 為 buffers.length - offset
。
注意 initSize 表示的并不是 CompositeByteBuf 初始包含的字節(jié)個數(shù)弃锐,而是表示初始 Component 的個數(shù)袄友。offset 則表示從 buffers 數(shù)組中的哪一個索引開始創(chuàng)建 CompositeByteBuf,就是上面 CompositeByteBuf 構(gòu)造函數(shù)中最后一個參數(shù) i 霹菊。
隨后通過 addComponents0
方法為 buffers 數(shù)組中的每一個 ByteBuf 創(chuàng)建初始化 Component 實例剧蚣,并將他們有序的添加到 CompositeByteBuf 的 components 數(shù)組中。
但這時 Component 實例的個數(shù)可能已經(jīng)超過 maxNumComponents 限制的個數(shù),那么接下來就會在 consolidateIfNeeded()
方法中將當(dāng)前 CompositeByteBuf 中的所有 Components 合并成一個更大的 Component券敌。CompositeByteBuf 中的 components 數(shù)組長度是不可以超過 maxNumComponents 限制的,如果超過就需要在這里合并柳洋。
最后設(shè)置當(dāng)前 CompositeByteBuf 的 readerIndex 和 writerIndex待诅,在初始狀態(tài)下 CompositeByteBuf 的 readerIndex 會被設(shè)置為 0 ,writerIndex 會被設(shè)置為最后一個 Component 的 endOffset 熊镣。
CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents,
ByteBuf[] buffers, int offset) {
// 先初始化一個空的 CompositeByteBuf
// initSize 為 buffers.length - offset
this(alloc, direct, maxNumComponents, buffers.length - offset);
// 為所有的 buffers 創(chuàng)建 Component 實例卑雁,并添加到 components 數(shù)組中
addComponents0(false, 0, buffers, offset);
// 如果當(dāng)前 component 的個數(shù)已經(jīng)超過了 maxNumComponents,則將所有 component 合并成一個
consolidateIfNeeded();
// 設(shè)置 CompositeByteBuf 的 readerIndex = 0
// writerIndex 為最后一個 component 的 endOffset
setIndex0(0, capacity());
}
2.8.3 shiftComps 為新的 ByteBuf 騰挪空間
在整個 CompositeByteBuf 的構(gòu)造過程中绪囱,最核心也是最復(fù)雜的步驟其實就是 addComponents0
方法测蹲,將多個 ByteBuf 有序的添加到 CompositeByteBuf 的 components 數(shù)組中看似簡單,其實還有很多種復(fù)雜的情況需要考慮鬼吵。
復(fù)雜之處在于這些 ByteBuf 需要插在 components 數(shù)組的哪個位置上 扣甲? 比較簡單直觀的情況是我們直接在 components 數(shù)組的末尾插入,也就是說要插入的位置索引 cIndex 等于 componentCount齿椅。這里分為兩種情況:
cIndex = componentCount = 0
琉挖,這種情況表示我們在向一個空的 CompositeByteBuf 插入 ByteBufs , 很簡單,直接插入即可涣脚。cIndex = componentCount > 0
示辈, 這種情況表示我們再向一個非空的 CompositeByteBuf 插入 ByteBufs,正如上圖所示遣蚀。同樣也很簡單矾麻,直接在 componentCount 的位置處插入即可。
稍微復(fù)雜一點的情況是我們在 components 數(shù)組的中間位置進(jìn)行插入而不是在末尾芭梯,也就是 cIndex < componentCount
的情況险耀。如下如圖所示,假設(shè)我們現(xiàn)在需要在 cIndex = 3
的位置處插入兩個 ByteBuf 進(jìn)來玖喘,但現(xiàn)在 components[3] 以及 components[4] 的位置已經(jīng)被占用了胰耗。所以我們需要將這兩個位置上的原有 component 向后移動兩個位置,將 components[3] 和 components[4] 的位置騰出來芒涡。
// i = 3 , count = 2 , size = 5
System.arraycopy(components, i, components, i + count, size - i);
在復(fù)雜一點的情況就是 components 數(shù)組需要擴(kuò)容柴灯,當(dāng)一個 CompositeByteBuf 剛剛被初始化出來的時候,它的 components 數(shù)組長度等于 maxNumComponents费尽。
如果當(dāng)前 components 數(shù)組中包含的 component 個數(shù) —— componentCount 加上本次需要添加的 ByteBuf 個數(shù) —— count 已經(jīng)超過了 maxNumComponents 的時候赠群,就需要對 components 數(shù)組進(jìn)行擴(kuò)容。
// 初始為 0旱幼,當(dāng)前 CompositeByteBuf 中包含的 component 個數(shù)
final int size = componentCount,
// 本次 addComponents0 操作之后查描,新的 component 個數(shù)
newSize = size + count;
// newSize 超過了 maxNumComponents 則對 components 數(shù)組進(jìn)行擴(kuò)容
if (newSize > components.length) {
....... 擴(kuò)容 ....
// 擴(kuò)容后的新數(shù)組
components = newArr;
}
擴(kuò)容之后的 components 數(shù)組長度是在 newSize 與原來長度的 3 / 2
之間取一個最大值。
int newArrSize = Math.max(size + (size >> 1), newSize);
如果我們原來恰好是希望在 components 數(shù)組的末尾插入,也就是 cIndex = componentCount
的情況冬三,那么就需要通過 Arrays.copyOf
首先申請一段長度為 newArrSize 的數(shù)組匀油,然后將原來的 components 數(shù)組中的內(nèi)容原樣拷貝過去。
newArr = Arrays.copyOf(components, newArrSize, Component[].class);
這樣新的 components 數(shù)組就有位置可以容納本次需要加入的 ByteBuf 了勾笆。
如果我們希望在原來 components 數(shù)組的中間插入敌蚜,也就是 cIndex < componentCount
的情況,如下圖所示:
這種情況在擴(kuò)容的時候就不能原樣拷貝原 components 數(shù)組了窝爪,而是首先通過 System.arraycopy
將 [0 , cIndex)
這段范圍的內(nèi)容拷貝過去弛车,在將 [cIndex , componentCount)
這段范圍的內(nèi)容拷貝到新數(shù)組的 cIndex + count
位置處。
這樣一來蒲每,就在新 components 數(shù)組的 cIndex 索引處纷跛,空出了兩個位置出來用來添加本次這兩個 ByteBuf。最后更新 componentCount 的值邀杏。以上騰挪空間的邏輯封裝在 shiftComps 方法中:
private void shiftComps(int i, int count) {
// 初始為 0贫奠,當(dāng)前 CompositeByteBuf 中包含的 component 個數(shù)
final int size = componentCount,
// 本次 addComponents0 操作之后,新的 component 個數(shù)
newSize = size + count;
// newSize 超過了 max components(16) 則對 components 數(shù)組進(jìn)行擴(kuò)容
if (newSize > components.length) {
// grow the array望蜡,擴(kuò)容到原來的 3 / 2
int newArrSize = Math.max(size + (size >> 1), newSize);
Component[] newArr;
if (i == size) {
// 在 Component[] 數(shù)組的末尾進(jìn)行插入
// 初始狀態(tài) i = size = 0
// size - 1 是 Component[] 數(shù)組的最后一個元素叮阅,指定的 i 恰好越界
// 原來 Component[] 數(shù)組中的內(nèi)容全部拷貝到 newArr 中
newArr = Arrays.copyOf(components, newArrSize, Component[].class);
} else {
// 在 Component[] 數(shù)組的中間進(jìn)行插入
newArr = new Component[newArrSize];
if (i > 0) {
// [0 , i) 之間的內(nèi)容拷貝到 newArr 中
System.arraycopy(components, 0, newArr, 0, i);
}
if (i < size) {
// 將剩下的 [i , size) 內(nèi)容從 newArr 的 i + count 位置處開始拷貝。
// 因為需要將原來的 [ i , i+count ) 這些位置讓出來泣特,添加本次新的 components浩姥,
System.arraycopy(components, i, newArr, i + count, size - i);
}
}
// 擴(kuò)容后的新數(shù)組
components = newArr;
} else if (i < size) {
// i < size 本次操作要覆蓋原來的 [ i , i+count ) 之間的位置,所以這里需要將原來位置上的 component 向后移動
System.arraycopy(components, i, components, i + count, size - i);
}
// 更新 componentCount
componentCount = newSize;
}
2.8.4 Component 如何封裝 ByteBuf
經(jīng)過上一小節(jié) shiftComps 方法的輾轉(zhuǎn)騰挪之后状您,現(xiàn)在 CompositeByteBuf 中的 components 數(shù)組終于有位置可以容納本次需要添加的 ByteBuf 了勒叠。接下來就需要為每一個 ByteBuf 創(chuàng)建初始化一個 Component 實例,最后將這些 Component 實例放到 components 數(shù)組對應(yīng)的位置上膏孟。
private static final class Component {
// 原生 ByteBuf
final ByteBuf srcBuf;
// CompositeByteBuf 的 index 加上 srcAdjustment 就得到了srcBuf 的相關(guān) index
int srcAdjustment;
// srcBuf 可能是一個被包裝過的 ByteBuf眯分,比如 SlicedByteBuf , DuplicatedByteBuf
// 被 srcBuf 包裝的最底層的 ByteBuf 就存放在 buf 字段中
final ByteBuf buf;
// CompositeByteBuf 的 index 加上 adjustment 就得到了 buf 的相關(guān) index
int adjustment;
// 該 Component 在 CompositeByteBuf 視角中表示的數(shù)據(jù)范圍 [offset , endOffset)
int offset;
int endOffset;
}
我們首先需要初始化 Component 實例的 offset 柒桑, endOffset 屬性弊决,前面我們已經(jīng)介紹了,一個 Component 在 CompositeByteBuf 的視角中所能表示的數(shù)據(jù)邏輯范圍是 [offset , endOffset)
魁淳。在 components 數(shù)組中飘诗,一般前一個 Component 的 endOffset 往往是后一個 Component 的 offset。
如果我們期望從 components 數(shù)組的第一個位置處開始插入(cIndex = 0)界逛,那么第一個 Component 的 offset 自然是 0 昆稿。
如果 cIndex > 0 , 那么我們就需要找到它上一個 Component —— components[cIndex - 1] , 上一個 Component 的 endOffset 恰好就是當(dāng)前 Component 的 offset息拜。
然后通過 newComponent
方法利用 ByteBuf 相關(guān)屬性以及 offset 來初始化 Component 實例溉潭。隨后將創(chuàng)建出來的 Component 實例放置在對應(yīng)的位置上 —— components[cIndex] 净响。
// 獲取當(dāng)前正在插入 Component 的 offset
int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
// 待插入 ByteBuf
ByteBuf b = buffers[arrOffset];
if (b == null) {
break;
}
// 將 ByteBuf 封裝在 Component 中
Component c = newComponent(ensureAccessible(b), nextOffset);
components[ci] = c;
// 下一個 Component 的 Offset 是上一個 Component 的 endOffset
nextOffset = c.endOffset;
}
假設(shè)現(xiàn)在有一個空的 CompositeByteBuf,我們需要將一個數(shù)據(jù)范圍為 [1 , 4]
, readerIndex = 1 的 srcBuf 喳瓣, 插入到 CompositeByteBuf 的 components 數(shù)組中馋贤。
但是如果該 srcBuf 是一個視圖 ByteBuf 的話,比如:SlicedByteBuf 畏陕, DuplicatedByteBuf配乓。或者是一個被包裝過的 ByteBuf 蹭秋,比如:WrappedByteBuf 扰付, SwappedByteBuf堤撵。
那么我們就需要對 srcBuf 不斷的執(zhí)行 unwrap()
, 將其最底層的原生 ByteBuf 提取出來仁讨,如上圖所示,原生 buf 的數(shù)據(jù)范圍為 [4 , 7]
, srcBuf 與 buf 之間相關(guān) index 的偏移 adjustment 等于 3 , 原生 buf 的 readerIndex = 4 实昨。
最后我們會根據(jù) srcBuf 洞豁, srcIndex(srcBuf 的 readerIndex),原生 buf 荒给,unwrappedIndex(buf 的 readerIndex)丈挟,offset , len (srcBuf 中的可讀字節(jié)數(shù))來初始化 Component 實例志电。
private Component newComponent(final ByteBuf buf, final int offset) {
// srcBuf 的 readerIndex = 1
final int srcIndex = buf.readerIndex();
// srcBuf 中的可讀字節(jié)數(shù) = 4
final int len = buf.readableBytes();
// srcBuf 可能是一個被包裝過的 ByteBuf曙咽,比如 SlicedByteBuf,DuplicatedByteBuf
// 獲取 srcBuf 底層的原生 ByteBuf
ByteBuf unwrapped = buf;
// 原生 ByteBuf 的 readerIndex
int unwrappedIndex = srcIndex;
while (unwrapped instanceof WrappedByteBuf || unwrapped instanceof SwappedByteBuf) {
unwrapped = unwrapped.unwrap();
}
// unwrap if already sliced
if (unwrapped instanceof AbstractUnpooledSlicedByteBuf) {
// 獲取視圖 ByteBuf 相對于 原生 ByteBuf 的相關(guān) index 偏移
// adjustment = 3
// unwrappedIndex = srcIndex + adjustment = 4
unwrappedIndex += ((AbstractUnpooledSlicedByteBuf) unwrapped).idx(0);
// 獲取原生 ByteBuf
unwrapped = unwrapped.unwrap();
} else if (unwrapped instanceof PooledSlicedByteBuf) {
unwrappedIndex += ((PooledSlicedByteBuf) unwrapped).adjustment;
unwrapped = unwrapped.unwrap();
} else if (unwrapped instanceof DuplicatedByteBuf || unwrapped instanceof PooledDuplicatedByteBuf) {
unwrapped = unwrapped.unwrap();
}
return new Component(buf.order(ByteOrder.BIG_ENDIAN), srcIndex,
unwrapped.order(ByteOrder.BIG_ENDIAN), unwrappedIndex, offset, len, slice);
}
由于當(dāng)前的 CompositeByteBuf 還是空的挑辆,里面沒有包含任何邏輯數(shù)據(jù)例朱,當(dāng)長度為 4 的 srcBuf 加入之后,CompositeByteBuf 就產(chǎn)生了 [0 , 3]
這段邏輯數(shù)據(jù)范圍鱼蝉,所以 srcBuf 所屬 Component 的 offset = 0 , endOffset = 4 洒嗤,srcAdjustment = 1 ,adjustment = 4魁亦。
Component(ByteBuf srcBuf, int srcOffset, ByteBuf buf, int bufOffset,
int offset, int len, ByteBuf slice) {
this.srcBuf = srcBuf;
// 用于將 CompositeByteBuf 的 index 轉(zhuǎn)換為 srcBuf 的index
// 1 - 0 = 1
this.srcAdjustment = srcOffset - offset;
this.buf = buf;
// 用于將 CompositeByteBuf 的 index 轉(zhuǎn)換為 buf 的index
// 4 - 0 = 4
this.adjustment = bufOffset - offset;
// CompositeByteBuf [offset , endOffset) 這段范圍的字節(jié)存儲在該 Component 中
// 0
this.offset = offset;
// 下一個 Component 的 offset
// 4
this.endOffset = offset + len;
}
當(dāng)我們繼續(xù)初始化下一個 Component 的時候渔隶,它的 Offset 其實就是這個 Component 的 endOffset 。后面的流程都是一樣的了洁奈。
2.8.5 addComponents0
在我們清楚了以上背景知識之后间唉,在看 addComponents0 方法的邏輯就很清晰了:
private CompositeByteBuf addComponents0(boolean increaseWriterIndex,
final int cIndex, ByteBuf[] buffers, int arrOffset) {
// buffers 數(shù)組長度
final int len = buffers.length,
// 本次批量添加的 ByteBuf 個數(shù)
count = len - arrOffset;
// ci 表示從 components 數(shù)組的哪個索引位置處開始添加
// 這里先給一個初始值,后續(xù) shiftComps 完成之后還會重新設(shè)置
int ci = Integer.MAX_VALUE;
try {
// cIndex >= 0 && cIndex <= componentCount
checkComponentIndex(cIndex);
// 為新添加進(jìn)來的 ByteBuf 騰挪位置利术,以及增加 componentCount 計數(shù)
shiftComps(cIndex, count); // will increase componentCount
// 獲取當(dāng)前正在插入 Component 的 offset
int nextOffset = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (ci = cIndex; arrOffset < len; arrOffset++, ci++) {
ByteBuf b = buffers[arrOffset];
if (b == null) {
break;
}
// 將 ByteBuf 封裝在 Component 中
Component c = newComponent(ensureAccessible(b), nextOffset);
components[ci] = c;
// 下一個 Component 的 Offset 是上一個 Component 的 endOffset
nextOffset = c.endOffset;
}
return this;
} finally {
// ci is now the index following the last successfully added component
// ci = componentCount 說明是一直按照順序向后追加 component
// ci < componentCount 表示在 components 數(shù)組的中間插入新的 component
if (ci < componentCount) {
// 如果上面 for 循環(huán)完整的走完终吼,ci = cIndex + count
if (ci < cIndex + count) {
// 上面 for 循環(huán)中有 break 的情況出現(xiàn)或者有異常發(fā)生
// ci < componentCount ,在上面的 shiftComps 中將會涉及到 component 移動氯哮,因為要騰出位置
// 如果發(fā)生異常际跪,則將后面沒有加入 components 數(shù)組的 component 位置刪除掉
// [ci, cIndex + count) 這段位置要刪除商佛,因為在 ci-1 處已經(jīng)發(fā)生異常,重新調(diào)整 components 數(shù)組
removeCompRange(ci, cIndex + count);
for (; arrOffset < len; ++arrOffset) {
ReferenceCountUtil.safeRelease(buffers[arrOffset]);
}
}
// (在中間插入的情況下)需要調(diào)整 ci 到 size -1 之間的 component 的相關(guān) Offset
updateComponentOffsets(ci); // only need to do this here for components after the added ones
}
if (increaseWriterIndex && ci > cIndex && ci <= componentCount) {
// 本次添加的最后一個 components[ci - 1]
// 本次添加的第一個 components[cIndex]
// 最后一個 endOffset 減去第一個的 offset 就是本次添加的字節(jié)個數(shù)
writerIndex += components[ci - 1].endOffset - components[cIndex].offset;
}
}
}
這里我們重點介紹下 finally {}
代碼塊中的邏輯姆打。首先 addComponents0 方法中的核心邏輯是先通過 shiftComps 方法為接下來新創(chuàng)建出來的 Component 騰挪位置良姆,因為我們有可能是在原有 components 數(shù)組的中間位置插入。
然后會在一個 for ()
循環(huán)中不停的將新創(chuàng)建的 Component 放置到 components[ci]
位置上幔戏。
當(dāng)跳出 for 循環(huán)進(jìn)入 finally 代碼塊的時候玛追,ci 的值恰恰就是最后一個成功加入 components 數(shù)組的 Component 下一個位置,如下圖所示,假設(shè) components[0] 琐鲁, components[1] 晃洒,components[2] 是我們剛剛在 for 循環(huán)中插入的新值,那么 for 循環(huán)結(jié)束之后陆馁,ci 的值就是 3 。
如果 ci = componentCount
這恰恰說明我們一直是在 components 數(shù)組的末尾進(jìn)行插入合愈,這種情況下各個 Component 實例中的 [offset , endOffset) 都是連續(xù)的不需要做任何調(diào)整叮贩。
但如果 ci < componentCount
這就說明了我們是在原來 components 數(shù)組的中間位置處開始插入,下圖中的 components[3] 佛析,components[4] 是插入位置益老,當(dāng)插入完成之后 ci 的值為 5。
這時候就需要重新調(diào)整 components[5]寸莫,components[6] 中的 [offset , endOffset)
范圍捺萌,因為 shiftComps 方法只負(fù)責(zé)幫你騰挪位置,不負(fù)責(zé)重新調(diào)整 [offset , endOffset)
范圍膘茎,當(dāng)新的 Component 實例插入之后桃纯,原來彼此相鄰的 Component 實例之間的 [offset , endOffset)
就不連續(xù)了,所以這里需要重新調(diào)整辽狈。
比如下圖中所展示的情況慈参,原來的 components 數(shù)組包含五個 Component 實例,分別在 0 - 4 位置刮萌,它們之間原本的是連續(xù)的 [offset , endOffset)
驮配。
現(xiàn)在我們要在位置 3 ,4 處插入兩個新的 Component 實例着茸,所以原來的 components[3] 壮锻,components[4] 需要移動到 components[5] ,components[6] 的位置上涮阔,但 shiftComps 只負(fù)責(zé)移動而不負(fù)責(zé)重新調(diào)整它們的 [offset , endOffset)
猜绣。
當(dāng)新的 Component 實例插入之后,components[4]敬特,components[5] 掰邢,components[6] 之間的 [offset , endOffset)
就不連續(xù)了牺陶。所以需要通過 updateComponentOffsets
方法重新調(diào)整。
private void updateComponentOffsets(int cIndex) {
int size = componentCount;
if (size <= cIndex) {
return;
}
// 重新調(diào)整 components[5] 辣之,components[6] 之間的 [offset , endOffset)
int nextIndex = cIndex > 0 ? components[cIndex - 1].endOffset : 0;
for (; cIndex < size; cIndex++) {
Component c = components[cIndex];
// 重新調(diào)整 Component 的 offset 掰伸, endOffset
c.reposition(nextIndex);
nextIndex = c.endOffset;
}
}
void reposition(int newOffset) {
int move = newOffset - offset;
endOffset += move;
srcAdjustment -= move;
adjustment -= move;
offset = newOffset;
}
以上介紹的是正常情況下的邏輯,如果在執(zhí)行 for 循環(huán)的過程中出現(xiàn)了 break 或者發(fā)生了異常怀估,那么 ci 的值一定是小于 cIndex + count
的狮鸭。什么意思呢 ?
比如我們要向一個 components 數(shù)組 cIndex = 0
的位置插入 count = 5
個 Component 實例多搀,但是在插入第四個 Component 的時候歧蕉,也就是在 components[3] 的位置處出現(xiàn)了 break 或者異常的情況,那么就會退出 for 循環(huán)來到這里的 finally 代碼塊康铭。
此時的 ci 值為 3 惯退,cIndex + count 的值為 5,那么就說明出現(xiàn)了異常情況麻削。
值得我們注意的是蒸痹,components[3] 以及 components[4] 這兩個位置是之前通過 shiftComps 方法騰挪出來的春弥,由于異常情況的發(fā)生呛哟,這兩個位置將不會放置任何 Component 實例。
這樣一來 components 數(shù)組就出現(xiàn)了空洞匿沛,所以接下來我們還需要將 components[5] 扫责, components[6] 位置上的 Component 實例重新移動回 components[3] 以及 components[4] 的位置上。
由于異常情況逃呼,那些 ByteBuf 數(shù)組中沒有被添加進(jìn) CompositeByteBuf 的 ByteBuf 需要執(zhí)行 release 鳖孤。
2.8.6 consolidateIfNeeded
到現(xiàn)在為止一個空的 CompositeByteBuf 就算被填充好了,但是這里有一個問題抡笼,就是 CompositeByteBuf 中所能包含的 Component 實例個數(shù)是受到 maxNumComponents 限制的苏揣。
我們回顧一下整個 addComponents 的過程,好像還沒有一個地方對 Component 的個數(shù)做出限制推姻,甚至在 shiftComps 方法中還會對 components 數(shù)組進(jìn)行擴(kuò)容平匈。
那么這樣一來,Component 的個數(shù)有很大可能會超過 maxNumComponents 的限制藏古,如果當(dāng)前 CompositeByteBuf 中包含的 component 個數(shù)已經(jīng)超過了 maxNumComponents 增炭,那么就需要在 consolidate0
方法中,將所有的 component 合并拧晕。
private void consolidateIfNeeded() {
int size = componentCount;
// 如果當(dāng)前 component 的個數(shù)已經(jīng)超過了 maxNumComponents隙姿,則將所有 component 合并成一個
if (size > maxNumComponents) {
consolidate0(0, size);
}
}
在這里,Netty 會將當(dāng)前 CompositeByteBuf 中包含的所有 Component 合并成一個更大的 Component厂捞。合并之后 输玷,CompositeByteBuf 中就只包含一個 Component 了队丝。合并的核心邏輯如下:
根據(jù)當(dāng)前 CompositeByteBuf 的 capacity 重新申請一個更大的 ByteBuf ,該 ByteBuf 需要容納下 CompositeByteBuf 所能表示的所有字節(jié)欲鹏。
將所有 Component 底層的 buf 中存儲的內(nèi)容全部轉(zhuǎn)移到新的 ByteBuf 中炭玫,并釋放原有 buf 的內(nèi)存。
刪除 Component 數(shù)組中所有的 Component貌虾。
根據(jù)新的 ByteBuf 創(chuàng)建一個新的 Component 實例吞加,并放置在 components 數(shù)組的第一個位置上。
private void consolidate0(int cIndex, int numComponents) {
if (numComponents <= 1) {
return;
}
// 將 [cIndex , endCIndex) 之間的 Components 合并成一個
final int endCIndex = cIndex + numComponents;
final int startOffset = cIndex != 0 ? components[cIndex].offset : 0;
// 計算合并范圍內(nèi) Components 的存儲的字節(jié)總數(shù)
final int capacity = components[endCIndex - 1].endOffset - startOffset;
// 重新申請一個新的 ByteBuf
final ByteBuf consolidated = allocBuffer(capacity);
// 將合并范圍內(nèi)的 Components 中的數(shù)據(jù)全部轉(zhuǎn)移到新的 ByteBuf 中
for (int i = cIndex; i < endCIndex; i ++) {
components[i].transferTo(consolidated);
}
lastAccessed = null;
// 數(shù)據(jù)轉(zhuǎn)移完成之后尽狠,將合并之前的這些 components 刪除
removeCompRange(cIndex + 1, endCIndex);
// 將合并之后的新 Component 存儲在 cIndex 位置處
components[cIndex] = newComponent(consolidated, 0);
if (cIndex != 0 || numComponents != componentCount) {
// 如果 cIndex 不是從 0 開始的衔憨,那么就更新 newComponent 的相關(guān) offset
updateComponentOffsets(cIndex);
}
}
2.8.7 CompositeByteBuf 的應(yīng)用
當(dāng)我們在傳輸層采用 TCP 協(xié)議進(jìn)行數(shù)據(jù)傳輸?shù)臅r候,經(jīng)常會遇到半包或者粘包的問題袄膏,我們從 socket 中讀取出來的 ByteBuf 很大可能還構(gòu)不成一個完整的包践图,這樣一來,我們就需要將每次從 socket 中讀取出來的 ByteBuf 在用戶態(tài)緩存累加起來沉馆。
當(dāng)累加起來的 ByteBuf 達(dá)到一個完整的數(shù)據(jù)包之后码党,我們在從這個被緩存的 ByteBuf 中讀取字節(jié),然后進(jìn)行解碼斥黑,最后將解碼出來的對象沿著 pipeline 向后傳遞揖盘。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 緩存累加起來的 ByteBuf
ByteBuf cumulation;
// ByteBuf 的累加聚合器
private Cumulator cumulator = MERGE_CUMULATOR;
// 是否是第一次收包
private boolean first;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 用于存儲解碼之后的對象
CodecOutputList out = CodecOutputList.newInstance();
try {
// 第一次收包
first = cumulation == null;
// 將新進(jìn)來的 (ByteBuf) msg 與之前緩存的 cumulation 聚合累加起來
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 解碼
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
........ 省略 ........
// 解碼成功之后,就將解碼出來的對象沿著 pipeline 向后傳播
fireChannelRead(ctx, out, size);
}
} else {
ctx.fireChannelRead(msg);
}
}
}
Netty 為此專門定義了一個 Cumulator 接口锌奴,用于將每次從 socket 中讀取到的 ByteBuf 聚合累積起來兽狭。參數(shù) alloc 是一個 ByteBuf 分配器,用于在聚合的過程中如果涉及到擴(kuò)容鹿蜀,合并等操作可以用它來申請內(nèi)存箕慧。
參數(shù) cumulation 就是之前緩存起來的 ByteBuf,當(dāng)?shù)谝淮问瞻臅r候茴恰,這里的 cumulation 就是一個空的 ByteBuf —— Unpooled.EMPTY_BUFFER 颠焦。
參數(shù) in 則是本次剛剛從 socket 中讀取出來的 ByteBuf,可能是一個半包往枣,Cumulator 的作用就是將新讀取出來的 ByteBuf (in)伐庭,累加合并到之前緩存的 ByteBuf (cumulation)中。
public interface Cumulator {
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
Netty 提供了 Cumulator 接口的兩個實現(xiàn)婉商,一個是 MERGE_CUMULATOR 似忧, 另一個是 COMPOSITE_CUMULATOR 。
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
public static final Cumulator MERGE_CUMULATOR
public static final Cumulator COMPOSITE_CUMULATOR
}
MERGE_CUMULATOR 是 Netty 默認(rèn)的 Cumulator 丈秩,也是傳統(tǒng)意義上最為普遍的一種聚合 ByteBuf 的實現(xiàn)盯捌,它的核心思想是在聚合多個 ByteBuf 的時候,首先會申請一塊更大的內(nèi)存蘑秽,然后將這些需要被聚合的 ByteBuf 中的內(nèi)容全部拷貝到新的 ByteBuf 中饺著。然后釋放掉原來的 ByteBuf 箫攀。
效果就是將多個 ByteBuf 重新聚合成一個更大的 ByteBuf ,但這種方式涉及到內(nèi)存申請以及內(nèi)存拷貝的開銷幼衰,優(yōu)勢就是內(nèi)存都是連續(xù)的靴跛,讀取速度快。
另外一種實現(xiàn)就是 COMPOSITE_CUMULATOR 渡嚣,也是本小節(jié)的主題梢睛,它的核心思想是將多個 ByteBuf 聚合到一個 CompositeByteBuf 中,不需要額外申請內(nèi)存识椰,更不需要內(nèi)存的拷貝绝葡。
但由于 CompositeByteBuf 只是邏輯上的一個視圖 ByteBuf,其底層依賴的內(nèi)存還是原來的那些 ByteBuf腹鹉,所以就導(dǎo)致了 CompositeByteBuf 中的內(nèi)存不是連續(xù)的藏畅,在加上 CompositeByteBuf 的相關(guān) index 設(shè)計的比較復(fù)雜,所以在讀取速度方面可能會比 MERGE_CUMULATOR 更慢一點功咒,所以我們需要根據(jù)自己的場景來權(quán)衡考慮愉阎,靈活選擇。
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable()) {
// 之前緩存的已經(jīng)解碼完畢力奋,這里將它釋放榜旦,并從 in 開始重新累加。
cumulation.release();
return in;
}
CompositeByteBuf composite = null;
try {
// cumulation 是一個 CompositeByteBuf刊侯,說明 cumulation 之前是一個被聚合過的 ByteBuf
if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
composite = (CompositeByteBuf) cumulation;
// 這里需要保證 CompositeByteBuf 的 writerIndex 與 capacity 相等
// 因為我們需要每次在 CompositeByteBuf 的末尾聚合添加新的 ByteBuf
if (composite.writerIndex() != composite.capacity()) {
composite.capacity(composite.writerIndex());
}
} else {
// 如果 cumulation 不是 CompositeByteBuf章办,只是一個普通的 ByteBuf
// 說明 cumulation 之前還沒有被聚合過锉走,這里是第一次聚合滨彻,所以需要先創(chuàng)建一個空的 CompositeByteBuf
// 然后將 cumulation 添加到 CompositeByteBuf 中
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
}
// 將本次新接收到的 ByteBuf(in)添加累積到 CompositeByteBuf 中
composite.addFlattenedComponents(true, in);
in = null;
return composite;
} finally {
........ 省略聚合失敗的處理 ..........
}
}
};
3. Heap or Direct
在前面的幾個小節(jié)中,我們討論了很多 ByteBuf 的設(shè)計細(xì)節(jié)挪蹭,接下來讓我們跳出這些細(xì)節(jié)亭饵,重新站在全局的視角下來看一下 ByteBuf 的總體設(shè)計。
在 ByteBuf 的整個設(shè)計體系中梁厉,Netty 從 ByteBuf 內(nèi)存布局的角度上辜羊,將整個體系分為了 HeapByteBuf 和 DirectByteBuf 兩個大類。Netty 提供了 PlatformDependent.directBufferPreferred()
方法來指定在默認(rèn)情況下词顾,是否偏向于分配 Direct Memory八秃。
public final class PlatformDependent {
// 是否偏向于分配 Direct Memory
private static final boolean DIRECT_BUFFER_PREFERRED;
public static boolean directBufferPreferred() {
return DIRECT_BUFFER_PREFERRED;
}
}
要想使得 DIRECT_BUFFER_PREFERRED 為 true ,必須同時滿足以下兩個條件:
-Dio.netty.noPreferDirect
參數(shù)必須指定為 false(默認(rèn))肉盹。CLEANER 不為 NULL , 也就是需要 JDK 中包含有效的 CLEANER 機(jī)制昔驱。
static {
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
&& !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
}
}
如果是安卓平臺,那么 CLEANER 直接就是 NOOP上忍,不會做任何判斷骤肛,默認(rèn)情況下直接走 Heap Memory , 除非特殊指定要走 Direct Memory纳本。
if (!isAndroid()) {
if (javaVersion() >= 9) {
// 檢查 sun.misc.Unsafe 類中是否包含有效的 invokeCleaner 方法
CLEANER = CleanerJava9.isSupported() ? new CleanerJava9() : NOOP;
} else {
// 檢查 java.nio.ByteBuffer 中是否包含了 cleaner 字段
CLEANER = CleanerJava6.isSupported() ? new CleanerJava6() : NOOP;
}
} else {
CLEANER = NOOP;
}
如果是 JDK 9 以上的版本,Netty 會檢查是否可以通過 sun.misc.Unsafe
的 invokeCleaner
方法正確執(zhí)行 DirectBuffer 的 Cleaner腋颠,如果執(zhí)行過程中發(fā)生異常繁成,那么 CLEANER 就為 NOOP,Netty 在默認(rèn)情況下就會走 Heap Memory淑玫。
public final class Unsafe {
public void invokeCleaner(java.nio.ByteBuffer directBuffer) {
if (!directBuffer.isDirect())
throw new IllegalArgumentException("buffer is non-direct");
theInternalUnsafe.invokeCleaner(directBuffer);
}
}
如果是 JDK 9 以下的版本巾腕,Netty 就會通過反射的方式先去獲取 DirectByteBuffer 的 cleaner 字段,如果 cleaner 為 null 或者在執(zhí)行 clean 方法的過程中出現(xiàn)了異常絮蒿,那么 CLEANER 就為 NOOP祠墅,Netty 在默認(rèn)情況下就會走 Heap Memory。
class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
{
private final Cleaner cleaner;
DirectByteBuffer(int cap) { // package-private
...... 省略 .....
base = UNSAFE.allocateMemory(size);
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
}
}
如果 PlatformDependent.directBufferPreferred()
方法返回 true ,那么 ByteBufAllocator 接下來在分配內(nèi)存的時候歌径,默認(rèn)情況下就會分配 directBuffer毁嗦。
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
// ByteBuf 分配器
public static final UnpooledByteBufAllocator DEFAULT =
new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());
}
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
// 是否默認(rèn)分配 directBuffer
private final boolean directByDefault;
protected AbstractByteBufAllocator(boolean preferDirect) {
directByDefault = preferDirect && PlatformDependent.hasUnsafe();
}
@Override
public ByteBuf buffer() {
if (directByDefault) {
return directBuffer();
}
return heapBuffer();
}
}
一般情況下,JDK 都會包含有效的 CLEANER 機(jī)制回铛,所以我們完全可以僅是通過 -Dio.netty.noPreferDirect
(默認(rèn) false)來控制 Netty 默認(rèn)情況下走 Direct Memory狗准。
但如果是安卓平臺,那么無論 -Dio.netty.noPreferDirect
如何設(shè)置茵肃,Netty 默認(rèn)情況下都會走 Heap Memory 腔长。
4. Cleaner or NoCleaner
站在內(nèi)存回收的角度,Netty 將 ByteBuf 分為了帶有 Cleaner 的 DirectByteBuf 和沒有 Cleaner 的 DirectByteBuf 兩個大類验残。在之前的文章《以 ZGC 為例捞附,談一談 JVM 是如何實現(xiàn) Reference 語義的》 中的第三小節(jié),筆者詳細(xì)的介紹過您没,JVM 如何利用 Cleaner 機(jī)制來回收 DirectByteBuffer 背后的 Native Memory 鸟召。
而 Cleaner 回收 DirectByteBuffer 的 Native Memory 需要依賴 GC 的發(fā)生,當(dāng)一個 DirectByteBuffer 沒有任何強(qiáng)引用或者軟引用的時候氨鹏,如果此時發(fā)生 GC , Cleaner 才會去回收 Native Memory欧募。如果很久都沒發(fā)生 GC ,那么這些 DirectByteBuffer 所引用的 Native Memory 將一直不會釋放。
所以僅僅是依賴 Cleaner 來釋放 Native Memory 是有一定延遲的仆抵,極端情況下跟继,如果一直等不來 GC ,很有可能就會發(fā)生 OOM 。
而 Netty 的 ByteBuf 設(shè)計相當(dāng)于是對 NIO ByteBuffer 的一種完善擴(kuò)展镣丑,其底層其實都會依賴一個 JDK 的 ByteBuffer舔糖。比如,前面介紹的 UnpooledDirectByteBuf 莺匠, UnpooledUnsafeDirectByteBuf 其底層依賴的就是 JDK DirectByteBuffer , 而這個 DirectByteBuffer 就是帶有 Cleaner 的 ByteBuf 金吗。
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
// 底層依賴的 JDK DirectByteBuffer
ByteBuffer buffer;
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 創(chuàng)建 DirectByteBuffer
setByteBuffer(allocateDirect(initialCapacity), false);
}
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
// 底層依賴的 JDK DirectByteBuffer 的內(nèi)存地址
long memoryAddress;
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 調(diào)用父類 UnpooledDirectByteBuf 構(gòu)建函數(shù)創(chuàng)建底層依賴的 JDK DirectByteBuffer
super(alloc, initialCapacity, maxCapacity);
}
@Override
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
super.setByteBuffer(buffer, tryFree);
// 獲取 JDK DirectByteBuffer 的內(nèi)存地址
memoryAddress = PlatformDependent.directBufferAddress(buffer);
}
在 JDK NIO 中,凡是通過 ByteBuffer.allocateDirect
方法申請到 DirectByteBuffer 都是帶有 Cleaer 的。
public abstract class ByteBuffer {
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
}
class DirectByteBuffer extends MappedByteBuffer implements DirectBuffer
{
private final Cleaner cleaner;
DirectByteBuffer(int cap) { // package-private
...... 省略 .....
// 通過該構(gòu)造函數(shù)申請到的 Direct Memory 會受到 -XX:MaxDirectMemorySize 參數(shù)的限制
Bits.reserveMemory(size, cap);
// 底層調(diào)用 malloc 申請內(nèi)存
base = UNSAFE.allocateMemory(size);
...... 省略 .....
// 創(chuàng)建 Cleaner
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
}
}
而帶有 Cleaner 的 DirectByteBuffer 背后所能引用的 Direct Memory 是受到 -XX:MaxDirectMemorySize
JVM 參數(shù)限制的辽聊。由于 UnpooledDirectByteBuf 以及 UnpooledUnsafeDirectByteBuf 都帶有 Cleaner纪挎,所以當(dāng)他們在系統(tǒng)中沒有任何強(qiáng)引用或者軟引用的時候,如果發(fā)生 GC , Cleaner 就會釋放他們的 Direct Memory 跟匆。
由于 Cleaner 執(zhí)行會依賴 GC , 而 GC 的發(fā)生往往不那么及時异袄,會有一定的延時,所以 Netty 為了可以及時的釋放 Direct Memory 玛臂,往往選擇不依賴 JDK 的 Cleaner 機(jī)制烤蜕,手動進(jìn)行釋放。所以就有了 NoCleaner 類型的 DirectByteBuf —— UnpooledUnsafeNoCleanerDirectByteBuf 迹冤。
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
// 創(chuàng)建沒有 Cleaner 的 JDK DirectByteBuffer
return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
}
@Override
protected void freeDirect(ByteBuffer buffer) {
// 既然沒有了 Cleaner 讽营, 所以 Netty 要手動進(jìn)行釋放
PlatformDependent.freeDirectNoCleaner(buffer);
}
}
UnpooledUnsafeNoCleanerDirectByteBuf 的底層同樣也會依賴一個 JDK DirectByteBuffer , 但和之前不同的是,這里的 DirectByteBuffer 是不帶有 cleaner 的泡徙。
我們通過 JNI 來調(diào)用 DirectByteBuffer(long addr, int cap)
構(gòu)造函數(shù)創(chuàng)建出來的 JDK DirectByteBuffer 都是沒有 cleaner 的橱鹏。但通過這種方式創(chuàng)建出來的 DirectByteBuffer 背后引用的 Native Memory 是不會受到 -XX:MaxDirectMemorySize
JVM 參數(shù)限制的。
class DirectByteBuffer {
// Invoked only by JNI: NewDirectByteBuffer(void*, long)
private DirectByteBuffer(long addr, int cap) {
super(-1, 0, cap, cap, null);
address = addr;
// cleaner 為 null
cleaner = null;
}
}
既然沒有了 cleaner 堪藐, 所以 Netty 就無法依賴 GC 來釋放 Direct Memory 了莉兰,這就要求 Netty 必須手動調(diào)用 freeDirect
方法及時地釋放 Direct Memory。
事實上礁竞,無論 Netty 中的 DirectByteBuf 有沒有 Cleaner糖荒, Netty 都會選擇手動的進(jìn)行釋放,目的就是為了避免 GC 的延遲 模捂, 從而及時的釋放 Direct Memory捶朵。
那么 Netty 中的 DirectByteBuf 在什么情況下帶有 Cleaner,又在什么情況下不帶 Cleaner 呢 狂男?我們可以通過 PlatformDependent.useDirectBufferNoCleaner
方法的返回值進(jìn)行判斷:
public final class PlatformDependent {
// Netty 的 DirectByteBuf 是否帶有 Cleaner
private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;
public static boolean useDirectBufferNoCleaner() {
return USE_DIRECT_BUFFER_NO_CLEANER;
}
}
USE_DIRECT_BUFFER_NO_CLEANER = TRUE 表示 Netty 創(chuàng)建出來的 DirectByteBuf 不帶有 Cleaner 综看。 Direct Memory 的用量不會受到 JVM 參數(shù) -
XX:MaxDirectMemorySize
的限制。USE_DIRECT_BUFFER_NO_CLEANER = FALSE 表示 Netty 創(chuàng)建出來的 DirectByteBuf 帶有 Cleaner 并淋。 Direct Memory 的用量會受到 JVM 參數(shù) -
XX:MaxDirectMemorySize
的限制寓搬。
我們可以通過 -Dio.netty.maxDirectMemory
來設(shè)置 USE_DIRECT_BUFFER_NO_CLEANER 的值,除此之外县耽,該參數(shù)還可以指定在 Netty 層面上可以使用的最大 DirectMemory 用量。
io.netty.maxDirectMemory = 0
那么 USE_DIRECT_BUFFER_NO_CLEANER 就為 FALSE , 表示在 Netty 層面創(chuàng)建出來的 DirectByteBuf 都是帶有 Cleaner 的镣典,這種情況下 Netty 并不會限制 maxDirectMemory 的用量兔毙,因為限制了也沒用,具體能用多少 maxDirectMemory兄春,還是由 JVM 參數(shù) -XX:MaxDirectMemorySize
決定的澎剥。
io.netty.maxDirectMemory < 0
,默認(rèn)為 -1赶舆,也就是在默認(rèn)情況下 USE_DIRECT_BUFFER_NO_CLEANER 為 TRUE , 創(chuàng)建出來的 DirectByteBuf 都是不帶 Cleaner 的哑姚。由于在這種情況下 maxDirectMemory 的用量并不會受到 JVM 參數(shù) -XX:MaxDirectMemorySize
的限制祭饭,所以在 Netty 層面上必須限制 maxDirectMemory 的用量,默認(rèn)值就是 -XX:MaxDirectMemorySize
指定的值叙量。
這里需要特別注意的是倡蝙,Netty 層面對于 maxDirectMemory 的容量限制和 JVM 層面對于 maxDirectMemory 的容量限制是單獨分別計算的,互不影響绞佩。因此站在 JVM 進(jìn)程的角度來說寺鸥,總體 maxDirectMemory 的用量是 -XX:MaxDirectMemorySize
的兩倍。
io.netty.maxDirectMemory > 0
的情況和小于 0 的情況一樣品山,唯一不同的是 Netty 層面的 maxDirectMemory 用量是專門由 -Dio.netty.maxDirectMemory
參數(shù)指定胆建,仍然獨立于 JVM 層面的 maxDirectMemory 限制之外單獨計算。
所以從這個層面來說肘交,Netty 設(shè)計 NoCleaner 類型的 DirectByteBuf 的另外一個目的就是為了突破 JVM 對于 maxDirectMemory 用量的限制笆载。
public final class PlatformDependent {
// Netty 層面 Direct Memory 的用量統(tǒng)計
// 為 NULL 表示在 Netty 層面不進(jìn)行特殊限制,完全由 JVM 進(jìn)行限制 Direct Memory 的用量
private static final AtomicLong DIRECT_MEMORY_COUNTER;
// Netty 層面 Direct Memory 的最大用量
private static final long DIRECT_MEMORY_LIMIT;
// JVM 指定的 -XX:MaxDirectMemorySize 最大堆外內(nèi)存
private static final long MAX_DIRECT_MEMORY = maxDirectMemory0();
static {
long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
// maxDirectMemory = 0 表示后續(xù)創(chuàng)建的 DirectBuffer 是帶有 Cleaner 的涯呻,Netty 自己不會強(qiáng)制限定 maxDirectMemory 的用量宰译,完全交給 JDK 的 maxDirectMemory 來限制
// 因為 Netty 限制了也沒用,其底層依然依賴的是 JDK DirectBuffer(Cleaner)魄懂,JDK 會限制 maxDirectMemory 的用量
// 在沒有 Unsafe 的情況下沿侈,那么就必須使用 Cleaner,因為如果不使用 Cleaner 的話市栗,又沒有 Unsafe缀拭,我們就無法釋放 Native Memory 了
// 如果 JDK 本身不包含創(chuàng)建 NoCleaner DirectBuffer 的構(gòu)造函數(shù) —— DirectByteBuffer(long, int),那么自然只能使用 Cleaner
USE_DIRECT_BUFFER_NO_CLEANER = false;
// Netty 自身不會統(tǒng)計 Direct Memory 的用量填帽,完全交給 JDK 來統(tǒng)計
DIRECT_MEMORY_COUNTER = null;
} else {
USE_DIRECT_BUFFER_NO_CLEANER = true;
if (maxDirectMemory < 0) {
// maxDirectMemory < 0 (默認(rèn) -1) 后續(xù)創(chuàng)建 NoCleaner DirectBuffer
// Netty 層面會單獨限制 maxDirectMemory 用量蛛淋,maxDirectMemory 的值與 -XX:MaxDirectMemorySize 的值相同
// 因為 JDK 不會統(tǒng)計和限制 NoCleaner DirectBuffer 的用量
// 注意,這里 Netty 的 maxDirectMemory 和 JDK 的 maxDirectMemory 是分別單獨統(tǒng)計的
// 在 JVM 進(jìn)程的角度來說篡腌,整體 maxDirectMemory 的用量是 -XX:MaxDirectMemorySize 的兩倍(Netty用的和 JDK 用的之和)
maxDirectMemory = MAX_DIRECT_MEMORY;
if (maxDirectMemory <= 0) {
DIRECT_MEMORY_COUNTER = null;
} else {
// 統(tǒng)計 Netty DirectMemory 的用量
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
} else {
// maxDirectMemory > 0 后續(xù)創(chuàng)建 NoCleaner DirectBuffer,Netty 層面的 maxDirectMemory 就是 io.netty.maxDirectMemory 指定的值
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
}
logger.debug("-Dio.netty.maxDirectMemory: {} bytes", maxDirectMemory);
DIRECT_MEMORY_LIMIT = maxDirectMemory >= 1 ? maxDirectMemory : MAX_DIRECT_MEMORY;
}
}
當(dāng) Netty 層面的 direct memory 用量超過了 -Dio.netty.maxDirectMemory
參數(shù)指定的值時褐荷,那么就會拋出 OutOfDirectMemoryError
,分配 DirectByteBuf 將會失敗嘹悼。
private static void incrementMemoryCounter(int capacity) {
if (DIRECT_MEMORY_COUNTER != null) {
long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet(capacity);
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
throw new OutOfDirectMemoryError("failed to allocate " + capacity
+ " byte(s) of direct memory (used: " + (newUsedMemory - capacity)
+ ", max: " + DIRECT_MEMORY_LIMIT + ')');
}
}
}
5. Unsafe or NoUnsafe
站在內(nèi)存訪問方式的角度上來說 叛甫, Netty 又會將 ByteBuf 分為了 Unsafe 和 NoUnsafe 兩個大類,其中 NoUnsafe 的內(nèi)存訪問方式是依賴底層的 JDK ByteBuffer杨伙,對于 Netty ByteBuf 的任何操作最終都是會代理給底層 JDK 的 ByteBuffer其监。
public class UnpooledDirectByteBuf extends AbstractReferenceCountedByteBuf {
// 底層依賴的 JDK DirectByteBuffer
ByteBuffer buffer;
@Override
protected byte _getByte(int index) {
return buffer.get(index);
}
@Override
protected void _setByte(int index, int value) {
buffer.put(index, (byte) value);
}
}
而 Unsafe 的內(nèi)存訪問方式則是通過 sun.misc.Unsafe
類中提供的眾多 low-level direct buffer access API 來對內(nèi)存地址直接進(jìn)行訪問,由于是脫離 JVM 相關(guān)規(guī)范直接對內(nèi)存地址進(jìn)行訪問限匣,所以我們在調(diào)用 Unsafe 相關(guān)方法的時候需要考慮 JVM 以及 OS 的各種細(xì)節(jié)抖苦,一不小心就會踩坑出錯,所以它是一種不安全的訪問方式,但是足夠靈活锌历,高效贮庞。
public class UnpooledUnsafeDirectByteBuf extends UnpooledDirectByteBuf {
// 底層依賴的 JDK DirectByteBuffer 的內(nèi)存地址
long memoryAddress;
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
final long addr(int index) {
// 直接通過內(nèi)存地址進(jìn)行訪問
return memoryAddress + index;
}
@Override
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(addr(index), value);
}
}
Netty 提供了 -Dio.netty.noUnsafe
參數(shù)來讓我們決定是否采用 Unsafe 的內(nèi)存訪問方式,默認(rèn)值是 false , 表示 Netty 默認(rèn)開啟 Unsafe 訪問方式究西。
final class PlatformDependent0 {
// 是否明確禁用 Unsafe窗慎,null 表示開啟 Unsafe
private static final Throwable EXPLICIT_NO_UNSAFE_CAUSE = explicitNoUnsafeCause0();
private static Throwable explicitNoUnsafeCause0() {
final boolean noUnsafe = SystemPropertyUtil.getBoolean("io.netty.noUnsafe", false);
logger.debug("-Dio.netty.noUnsafe: {}", noUnsafe);
if (noUnsafe) {
logger.debug("sun.misc.Unsafe: unavailable (io.netty.noUnsafe)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (io.netty.noUnsafe)");
}
return null;
}
}
在確認(rèn)開啟了 Unsafe 方式之后,我們就需要近一步確認(rèn)在當(dāng)前 JRE 的 classpath 下是否存在 sun.misc.Unsafe
類怔揩,是否能通過反射的方式獲取到 Unsafe 實例 —— theUnsafe 捉邢。
public final class Unsafe {
// Unsafe 實例
private static final Unsafe theUnsafe = new Unsafe();
}
final class PlatformDependent0 {
// 驗證 Unsafe 是否可用,null 表示 Unsafe 是可用狀態(tài)
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE;
static {
// 嘗試通過反射的方式拿到 theUnsafe 實例
final Object maybeUnsafe = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
Throwable cause = ReflectionUtil.trySetAccessible(unsafeField, false);
if (cause != null) {
return cause;
}
// the unsafe instance
return unsafeField.get(null);
} catch (NoSuchFieldException e) {
return e;
} catch (SecurityException e) {
return e;
} catch (IllegalAccessException e) {
return e;
} catch (NoClassDefFoundError e) {
// Also catch NoClassDefFoundError in case someone uses for example OSGI and it made
// Unsafe unloadable.
return e;
}
}
});
}
}
在獲取到 Unsafe 實例之后商膊,我們還需要檢查 Unsafe 中是否包含所有 Netty 用到的 low-level direct buffer access API 伏伐,確保這些 API 可以正常有效的運行。比如晕拆,是否包含 copyMemory
方法藐翎。
public final class Unsafe {
@ForceInline
public void copyMemory(Object srcBase, long srcOffset,
Object destBase, long destOffset,
long bytes) {
theInternalUnsafe.copyMemory(srcBase, srcOffset, destBase, destOffset, bytes);
}
}
是否可以通過 Unsafe 訪問到 NIO Buffer 的 address 字段,因為后續(xù)我們需要直接操作內(nèi)存地址实幕。
public abstract class Buffer {
// 內(nèi)存地址
long address;
}
在整個過程中如果發(fā)生任何異常吝镣,則表示在當(dāng)前 classpath 下,不存在 sun.misc.Unsafe
類或者是由于不同版本 JDK 的設(shè)計昆庇,Unsafe 中沒有 Netty 所需要的一些必要的訪存 API 末贾。這樣一來我們就無法使用 Unsafe,內(nèi)存的訪問方式就需要回退到 NoUnsafe整吆。
if (maybeUnsafe instanceof Throwable) {
unsafe = null;
unsafeUnavailabilityCause = (Throwable) maybeUnsafe;
logger.debug("sun.misc.Unsafe.theUnsafe: unavailable", (Throwable) maybeUnsafe);
} else {
unsafe = (Unsafe) maybeUnsafe;
logger.debug("sun.misc.Unsafe.theUnsafe: available");
}
// 為 null 表示 Unsafe 可用
UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause;
UNSAFE = unsafe;
如果在整個過程中沒有發(fā)生任何異常拱撵,我們獲取到了一個有效的 UNSAFE 實例,那么后續(xù)將正式開啟 Unsafe 的內(nèi)存訪問方式表蝙。
final class PlatformDependent0 {
static boolean hasUnsafe() {
return UNSAFE != null;
}
}
完整的 hasUnsafe()
判斷邏輯如下:
如果當(dāng)前平臺是安卓或者 .NET 拴测,則不能開啟 Unsafe,因為這些平臺并不包含
sun.misc.Unsafe
類府蛇。-Dio.netty.noUnsafe
參數(shù)需要設(shè)置為 false (默認(rèn)開啟)集索。
3.. 當(dāng)前 classpath 下是否包含有效的 sun.misc.Unsafe
類。
- Unsafe 實例需要包含必要的訪存 API 汇跨。
public final class PlatformDependent {
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE = unsafeUnavailabilityCause0();
public static boolean hasUnsafe() {
return UNSAFE_UNAVAILABILITY_CAUSE == null;
}
private static Throwable unsafeUnavailabilityCause0() {
if (isAndroid()) {
logger.debug("sun.misc.Unsafe: unavailable (Android)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (Android)");
}
if (isIkvmDotNet()) {
logger.debug("sun.misc.Unsafe: unavailable (IKVM.NET)");
return new UnsupportedOperationException("sun.misc.Unsafe: unavailable (IKVM.NET)");
}
Throwable cause = PlatformDependent0.getUnsafeUnavailabilityCause();
if (cause != null) {
return cause;
}
try {
boolean hasUnsafe = PlatformDependent0.hasUnsafe();
logger.debug("sun.misc.Unsafe: {}", hasUnsafe ? "available" : "unavailable");
return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();
} catch (Throwable t) {
logger.trace("Could not determine if Unsafe is available", t);
// Probably failed to initialize PlatformDependent0.
return new UnsupportedOperationException("Could not determine if Unsafe is available", t);
}
}
}
如果 PlatformDependent.hasUnsafe()
方法返回 true , 那么后續(xù) Netty 都會創(chuàng)建 Unsafe 類型的 ByteBuf务荆。
6. Pooled or Unpooled
站在內(nèi)存管理的角度上來講,Netty 將 ByteBuf 分為了 池化(Pooled) 和 非池化(Unpooled)兩個大類扰法,其中 Unpooled 類型的 ByteBuf 是用到的時候才去臨時創(chuàng)建蛹含,使用完的時候再去釋放。
而 Direct Memory 的申請和釋放開銷相較于 Heap Memory 會大很多垮刹,Netty 在面對高并發(fā)網(wǎng)絡(luò)通信的場景下匾嘱,Direct Memory 的申請和釋放是一個非常頻繁的操作遭京,這種大量頻繁地內(nèi)存申請釋放操作對程序的性能影響是巨大的惶凝,因此 Netty 引入了內(nèi)存池將這些 Direct Memory 統(tǒng)一池化管理起來。
Netty 提供了 -Dio.netty.allocator.type
參數(shù)來讓我們決定是否采用內(nèi)存池來管理 ByteBuf 贫橙, 默認(rèn)值是 pooled
, 也就是說 Netty 默認(rèn)是采用池化的方式來管理 PooledByteBuf 。如果是安卓平臺,那么默認(rèn)是使用非池化的 ByteBuf (unpooled)蓬推。
當(dāng)參數(shù)
io.netty.allocator.type
的值為 pooled 時,Netty 的默認(rèn) ByteBufAllocator 是PooledByteBufAllocator.DEFAULT
澡腾。當(dāng)參數(shù)
io.netty.allocator.type
的值為 unpooled 時沸伏,Netty 的默認(rèn) ByteBufAllocator 是UnpooledByteBufAllocator.DEFAULT
。
public final class ByteBufUtil {
// 默認(rèn) PooledByteBufAllocator动分,池化管理 ByteBuf
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static {
// 默認(rèn)為 pooled
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = alloc;
}
}
后續(xù) Netty 在創(chuàng)建 SocketChannel 的時候毅糟,在 SocketChannelConfig 中指定的 ByteBufAllocator 就是這里的 ByteBufUtil.DEFAULT_ALLOCATOR
,默認(rèn)情況下為 PooledByteBufAllocator澜公。
public interface ByteBufAllocator {
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
}
public class DefaultChannelConfig implements ChannelConfig {
// PooledByteBufAllocator
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
}
當(dāng) Netty 讀取 Socket 中的網(wǎng)絡(luò)數(shù)據(jù)時姆另,首先會從 DefaultChannelConfig 中將 ByteBufAllocator 獲取到,然后利用 ByteBufAllocator 從內(nèi)存池中獲取一個 DirectByteBuf 坟乾,最后將 Socket 中的數(shù)據(jù)讀取到 DirectByteBuf 中迹辐,隨后沿著 pipeline 向后傳播,進(jìn)行 IO 處理甚侣。
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
// 獲取 SocketChannelConfig
final ChannelConfig config = config();
// 獲取 ByteBufAllocator 明吩, 默認(rèn)為 PooledByteBufAllocator
final ByteBufAllocator allocator = config.getAllocator();
// 從內(nèi)存池中獲取 byteBuf
byteBuf = allocHandle.allocate(allocator);
// 讀取 socket 中的數(shù)據(jù)到 byteBuf
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 將 byteBuf 沿著 pipeline 向后傳播
pipeline.fireChannelRead(byteBuf);
....... 省略 .......
}
}
除此之外,Netty 還提供了 ChannelOption.ALLOCATOR
選項殷费,讓我們可以在配置 ServerBootstrap 的時候為 SocketChannel 靈活指定自定義的 ByteBufAllocator 印荔。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
// 靈活配置 ByteBufAllocator
.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT;);
這里通過 ChannelOption 來配置 Socket 相關(guān)的屬性是最高優(yōu)先級的,它會覆蓋掉一切默認(rèn)配置宗兼。
7. Metric
在第四小節(jié)中躏鱼,我們介紹了 Cleaner 和 NoCleaner 這兩種 DirectByteBuf,其中 CleanerDirectByteBuf 的整體 Direct Memory 的用量是受到 JVM 參數(shù) -XX:MaxDirectMemorySize
限制的殷绍,而 NoCleanerDirectByteBuf 的整體 Direct Memory 可以突破該參數(shù)的限制染苛,JVM 并不會統(tǒng)計這塊 Direct Memory 的用量。
Netty 為了及時地釋放這些 Direct Memory主到,通常默認(rèn)選擇 NoCleanerDirectByteBuf茶行,這就要求 Netty 需要對這部分 Direct Memory 的用量進(jìn)行自行統(tǒng)計限制。NoCleanerDirectByteBuf 的最大可用 Direct Memory 我們可以通過 -Dio.netty.maxDirectMemory
來指定登钥,默認(rèn)情況下等于 -XX:MaxDirectMemorySize
設(shè)置的值畔师。
PlatformDependent 類中的 DIRECT_MEMORY_COUNTER
字段用于統(tǒng)計在 Netty 層面上,所有 NoCleanerDirectByteBuf 占用的 Direct Memory 大小牧牢。注意這里并不會統(tǒng)計 CleanerDirectByteBuf 的 Direct Memory 占用看锉,這部分統(tǒng)計由 JVM 負(fù)責(zé)姿锭。
public final class PlatformDependent {
// 用于統(tǒng)計 NoCleaner 的 DirectByteBuf 所引用的 Native Memory 大小
private static final AtomicLong DIRECT_MEMORY_COUNTER;
public static ByteBuffer allocateDirectNoCleaner(int capacity) {
// 增加 Native Memory 用量統(tǒng)計
incrementMemoryCounter(capacity);
try {
// 分配 Native Memory
// 初始化 NoCleaner 的 DirectByteBuffer
return PlatformDependent0.allocateDirectNoCleaner(capacity);
} catch (Throwable e) {
decrementMemoryCounter(capacity);
throwException(e);
return null;
}
public static void freeDirectNoCleaner(ByteBuffer buffer) {
int capacity = buffer.capacity();
// 釋放 Native Memory
PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
// 減少 Native Memory 用量統(tǒng)計
decrementMemoryCounter(capacity);
}
}
PlatformDependent 類是 Netty 最底層的一個類,所有內(nèi)存的分配伯铣,釋放動作最終都是在該類中執(zhí)行呻此,因此 DIRECT_MEMORY_COUNTER 字段統(tǒng)計的是全局的 Direct Memory 大小(Netty 層面)腔寡。
每一次的內(nèi)存申請 —— allocateDirectNoCleaner 焚鲜, 都會增加 DIRECT_MEMORY_COUNTER 計數(shù),每一次的內(nèi)存釋放 —— freeDirectNoCleaner放前,都會減少 DIRECT_MEMORY_COUNTER 計數(shù)忿磅。
我們可以通過 PlatformDependent.usedDirectMemory()
方法來獲取 Netty 當(dāng)前所占用的 Direct Memory 大小。但如果我們特殊指定了需要使用 CleanerDirectByteBuf 凭语, 比如葱她,將 -Dio.netty.maxDirectMemory
參數(shù)設(shè)置為 0
, 那么這里將會返回 -1 。
private static void incrementMemoryCounter(int capacity) {
// 只統(tǒng)計 NoCleaner 的 DirectByteBuf 所引用的 Native Memory
if (DIRECT_MEMORY_COUNTER != null) {
long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet(capacity);
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
throw new OutOfDirectMemoryError("failed to allocate " + capacity
+ " byte(s) of direct memory (used: " + (newUsedMemory - capacity)
+ ", max: " + DIRECT_MEMORY_LIMIT + ')');
}
}
}
private static void decrementMemoryCounter(int capacity) {
if (DIRECT_MEMORY_COUNTER != null) {
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
assert usedMemory >= 0;
}
}
public static long usedDirectMemory() {
return DIRECT_MEMORY_COUNTER != null ? DIRECT_MEMORY_COUNTER.get() : -1;
}
除了 PlatformDependent 這里的全局統(tǒng)計之外叽粹,Netty 還提供了以 ByteBufAllocator 為粒度的內(nèi)存占用統(tǒng)計览效,統(tǒng)計的維度包括 Heap Memory 的占用和 Direct Memory 的占用。
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
// 從該 ByteBufAllocator 分配出去的內(nèi)存統(tǒng)計
private final UnpooledByteBufAllocatorMetric metric = new UnpooledByteBufAllocatorMetric();
@Override
public ByteBufAllocatorMetric metric() {
return metric;
}
// 統(tǒng)計 Direct Memory 的占用
void incrementDirect(int amount) {
metric.directCounter.add(amount);
}
void decrementDirect(int amount) {
metric.directCounter.add(-amount);
}
// 統(tǒng)計 Heap Memory 的占用
void incrementHeap(int amount) {
metric.heapCounter.add(amount);
}
void decrementHeap(int amount) {
metric.heapCounter.add(-amount);
}
}
Netty 定義的每一個 ByteBufAllocator 中虫几,都會有一個 ByteBufAllocatorMetric 類型的字段锤灿,該類定義兩個計數(shù)字段:directCounter,heapCounter辆脸。 分別用于統(tǒng)計 Direct Memory 和 Heap Memory 的占用但校。
private static final class UnpooledByteBufAllocatorMetric implements ByteBufAllocatorMetric {
final LongCounter directCounter = PlatformDependent.newLongCounter();
final LongCounter heapCounter = PlatformDependent.newLongCounter();
@Override
public long usedHeapMemory() {
return heapCounter.value();
}
@Override
public long usedDirectMemory() {
return directCounter.value();
}
@Override
public String toString() {
return StringUtil.simpleClassName(this) +
"(usedHeapMemory: " + usedHeapMemory() + "; usedDirectMemory: " + usedDirectMemory() + ')';
}
}
因此從內(nèi)存占用統(tǒng)計的角度上來說,Netty 又會將整個 ByteBuf 體系分為 Instrumented 和 NoInstrumented 兩大類啡氢,帶有 Instrumented 前綴的 ByteBuf 状囱,無論你是 Heap or Direct , Cleaner or NoCleaner倘是,Unsafe or NoUnsafe 類型的 ByteBuf 亭枷,Netty 都會統(tǒng)計這部分內(nèi)存占用。
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
extends UnpooledUnsafeNoCleanerDirectByteBuf {
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 構(gòu)造普通的 UnpooledUnsafeNoCleanerDirectByteBuf
super(alloc, initialCapacity, maxCapacity);
}
// 分配搀崭,釋放 的時候更新 Direct Memory
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// 構(gòu)造普通的 UnpooledUnsafeDirectByteBuf
super(alloc, initialCapacity, maxCapacity);
}
// 分配叨粘,釋放 的時候更新 Direct Memory
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).incrementDirect(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).decrementDirect(capacity);
}
}
8. ByteBufAllocator
在 Netty 中,ByteBuf 的創(chuàng)建必須通過 ByteBufAllocator 進(jìn)行瘤睹,不能直接顯示地調(diào)用 ByteBuf 相關(guān)的構(gòu)造函數(shù)自行創(chuàng)建升敲。Netty 定義了兩種類型的 ByteBufAllocator :
PooledByteBufAllocator 負(fù)責(zé)池化 ByteBuf,這里正是 Netty 內(nèi)存管理的核心轰传,在下一篇文章中驴党,筆者會詳細(xì)的和大家介紹它。
UnpooledByteBufAllocator 負(fù)責(zé)分配非池化的 ByteBuf获茬,創(chuàng)建 ByteBuf 的時候臨時向 OS 申請 Native Memory 港庄,使用完之后倔既,需要及時的手動調(diào)用 release 將 Native Memory 釋放給 OS 。
-Dio.netty.allocator.type
參數(shù)可以讓我們自行選擇 ByteBufAllocator 的類型攘轩,默認(rèn)值為 pooled
, Netty 默認(rèn)是采用池化的方式來管理 ByteBuf 叉存。
public interface ByteBufAllocator {
// 默認(rèn)為 PooledByteBufAllocator
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
}
除了以上兩種官方定義的 ByteBufAllocator 之外码俩,我們還可以根據(jù)自己實際業(yè)務(wù)場景來自行定制 ByteBufAllocator 度帮, 然后通過第六小節(jié)中介紹的 ChannelOption.ALLOCATOR
選項,將 ByteBufAllocator 靈活指定為我們自行定制的實現(xiàn)稿存。
對于 UnpooledByteBuf 來說笨篷,Netty 還專門提供了一個工具類 Unpooled
,這里定義實現(xiàn)了很多針對 ByteBuf 的實用操作瓣履,比如率翅,allocate,wrapped袖迎,copied 等冕臭。這里筆者以 DirectByteBuf 的創(chuàng)建為例進(jìn)行說明:
public final class Unpooled {
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
public static ByteBuf directBuffer() {
return ALLOC.directBuffer();
}
}
Unpooled 底層依賴了 UnpooledByteBufAllocator , 所有對 ByteBuf 的創(chuàng)建動作最終都會代理給這個 Allocator 燕锥。在 DirectBuffer 的創(chuàng)建過程中辜贵,我們可以看到前面介紹的所有類型的 ByteBuf。
public final class UnpooledByteBufAllocator {
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 是否啟動內(nèi)存泄露探測归形,如果啟動則額外用 LeakAwareByteBuf 進(jìn)行包裝返回
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
}
首先 Netty 創(chuàng)建出來的所有 ByteBuf 都是帶有 Metric 統(tǒng)計的托慨,具體的 ByteBuf 類型都會帶有 Instrumented 前綴。
如果當(dāng)前 JRE 環(huán)境支持 Unsafe 暇榴, 那么后續(xù)就會通過 Unsafe 的方式來對 ByteBuf 進(jìn)行相關(guān)操作(默認(rèn))厚棵,具體的 ByteBuf 類型都會帶有 Unsafe 前綴。
如果我們明確指定了 NoCleaner 類型的 DirectByteBuf(默認(rèn))蔼紧,那么創(chuàng)建出來的 ByteBuf 類型就會帶有 NoCleaner 前綴婆硬,由于沒有 Cleaner ,這就要求我們使用完 ByteBuf 的時候必須及時地手動進(jìn)行釋放奸例。
如果我們開啟了內(nèi)存泄露探測彬犯,那么創(chuàng)建流程的最后,Netty 會用一個 LeakAwareByteBuf 去包裝新創(chuàng)建出來的 ByteBuf哩至,當(dāng)這個 ByteBuf 被 GC 的時候躏嚎,Netty 會通過相關(guān)引用計數(shù)來判斷是否存在忘記 release 的情況,從而確定出是否發(fā)生內(nèi)存泄露菩貌。
總結(jié)
本文筆者從八個角度為大家詳細(xì)的剖析了 ByteBuf 的整體設(shè)計卢佣,這八個角度分別是:內(nèi)存區(qū)域分布的角度,內(nèi)存管理的角度箭阶,內(nèi)存訪問的角度虚茶,內(nèi)存回收的角度戈鲁,內(nèi)存統(tǒng)計 Metric 的角度,零拷貝的角度嘹叫,引用計數(shù)的角度婆殿,擴(kuò)容的角度。
到現(xiàn)在為止罩扇,我們只是掃清了 Netty 內(nèi)存管理外圍的一些障礙婆芦,那么下一篇文章,筆者將帶大家深入到內(nèi)存管理的核心喂饥,徹底讓大家弄懂 Netty 的內(nèi)存管理機(jī)制消约。好了,本文的內(nèi)容就到這里员帮,我們下篇文章見~~~