在前文分析了ByteBuf的抽象類實(shí)現(xiàn)框架甘邀,現(xiàn)在開始分析最底層的實(shí)現(xiàn)類凝赛。分為兩種情形:Unpooled和Pooled窿侈,首先看Unpooled。
1.UnpooledHeapByteBuf
該Bytebuf的底層為不使用對(duì)象池技術(shù)的JAVA堆字節(jié)數(shù)組淤井,首先看其中的成員變量:
private final ByteBufAllocator alloc; // 分配器
byte[] array; // 底層字節(jié)數(shù)組
private ByteBuffer tmpNioBuf; // NIO的ByteBuffer形式
只需要著重關(guān)注array
變量,它是位于JAVA堆的字節(jié)數(shù)組摊趾。
再看一個(gè)構(gòu)造方法(忽略其中的參數(shù)檢查):
protected UnpooledHeapByteBuf(ByteBufAllocator alloc,
int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
實(shí)現(xiàn)也很簡(jiǎn)單币狠,只需關(guān)注allocateArray()
方法,分配一個(gè)數(shù)組砾层;對(duì)應(yīng)地漩绵,有一個(gè)freeArray()
方法,釋放一個(gè)數(shù)組梢为,代碼如下:渐行、
void freeArray(byte[] array) {
// NOOP
}
由于堆內(nèi)的字節(jié)數(shù)組會(huì)被GC自動(dòng)回收,所以不需要具體實(shí)現(xiàn)代碼铸董。此外祟印,在引用計(jì)數(shù)的分析中,當(dāng)引用計(jì)數(shù)釋放的時(shí)候需要調(diào)用deallocate()
方法釋放該ByteBuf粟害,實(shí)現(xiàn)如下:
protected void deallocate() {
freeArray(array);
array = null;
}
同理蕴忆,使用GC自動(dòng)回收,而設(shè)置array=null
可以幫助GC回收悲幅。
ByteBuf中有關(guān)于判斷底層實(shí)現(xiàn)的方法套鹅,具體實(shí)現(xiàn)也很簡(jiǎn)單:
// 默認(rèn)的字節(jié)序:大端模式
public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
// 底層是否有JAVA堆字節(jié)數(shù)組
public boolean hasArray() { return true; }
// 底層數(shù)組的偏移量
public int arrayOffset() { return 0; }
// 是否直接數(shù)組
public boolean isDirect() { return false; }
// 是否含有os底層的數(shù)組起始地址
public boolean hasMemoryAddress() { return false; }
接下來(lái),看重要的設(shè)置容量方法capacity(int newCapacity)
:
public ByteBuf capacity(int newCapacity) {
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) { // 容量擴(kuò)增
byte[] newArray = allocateArray(newCapacity); // 申請(qǐng)數(shù)組
// 將老數(shù)組的字節(jié)復(fù)制到新數(shù)組
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) { // 容量縮減
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
// 容量縮減導(dǎo)致讀寫索引改變
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
// 只拷貝讀索引之后的數(shù)據(jù)汰具,讀索引之前0填充
System.arraycopy(oldArray, readerIndex,
newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
// 容量相等時(shí)不做處理
return this;
}
設(shè)置容量分為兩種情況:容量擴(kuò)增和容量縮減卓鹿。實(shí)現(xiàn)都是將老數(shù)據(jù)復(fù)制到新的字節(jié)數(shù)組中,有必要的話留荔,調(diào)整讀寫索引位置吟孙。
之前分析過(guò)getXXX()
和readXXX()
的核心實(shí)現(xiàn)是_getXXX(index)
方法,以_getInt(index)
為例進(jìn)行分析聚蝶,代碼如下:
protected int _getInt(int index) {
return HeapByteBufUtil.getInt(array, index);
}
static int getInt(byte[] memory, int index) {
return (memory[index] & 0xff) << 24 |
(memory[index + 1] & 0xff) << 16 |
(memory[index + 2] & 0xff) << 8 |
memory[index + 3] & 0xff;
}
將字節(jié)數(shù)組中指定索引位置處的4個(gè)字節(jié)按照大端模式通過(guò)移位組裝為一個(gè)整數(shù)杰妓。同理,可推斷_setInt(index)
方法將一個(gè)整數(shù)的4個(gè)字節(jié)通過(guò)移位填充到字節(jié)數(shù)組的指定位置碘勉,確實(shí)如此巷挥,核心實(shí)現(xiàn)如下:
static void setInt(byte[] memory, int index, int value) {
memory[index] = (byte) (value >>> 24);
memory[index + 1] = (byte) (value >>> 16);
memory[index + 2] = (byte) (value >>> 8);
memory[index + 3] = (byte) value;
}
可以派生新的ByteBuf的方法中,slice()
和duplicate()
共享底層實(shí)現(xiàn)验靡,在本類中倍宾,就是共享array
變量雏节,但各自維護(hù)獨(dú)立索引,而copy()
方法有自己獨(dú)立的底層字節(jié)數(shù)組高职,通過(guò)將數(shù)據(jù)復(fù)制到一個(gè)新的字節(jié)數(shù)組實(shí)現(xiàn)矾屯,代碼如下:
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
byte[] copiedArray = new byte[length];
System.arraycopy(array, index, copiedArray, 0, length);
return new UnpooledHeapByteBuf(alloc(), copiedArray, maxCapacity());
}
雖然JDK自帶的ByteBuffer
有各種缺憾,但在進(jìn)行IO時(shí)初厚,不得不使用原生的ByteBuffer
件蚕,所以Netty的ByteBuf
也提供方法轉(zhuǎn)化,實(shí)現(xiàn)如下:
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return (ByteBuffer) internalNioBuffer().clear()
.position(index).limit(index + length);
}
private ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = ByteBuffer.wrap(array);
}
return tmpNioBuf;
}
方法將該類轉(zhuǎn)化為JDK的HeapByteBuffer
产禾,可見也是一個(gè)堆緩沖區(qū)排作。clear().position(index).limit(index + length)
的使用是防止原生ByteBuffer
的讀寫模式切換造成的錯(cuò)誤。
至此亚情,UnpooledHeapByteBuf
的實(shí)現(xiàn)分析完畢妄痪,可見并沒有想象中的困難,再接再厲楞件,分析UnpooledDirectByteBuf
衫生。
2. UnpooledDirectByteBuf
Netty的UnpooledDirectByteBuf
在NIO的DirectByteBuf
上采用組合的方式進(jìn)行了封裝,屏蔽了對(duì)程序員不友好的地方土浸,并使其符合Netty的ByteBuf
體系罪针。使用與UnpooledHeapByteBuf
相同的順序進(jìn)行分析,首先看成員變量:
private final ByteBufAllocator alloc; // 分配器
private ByteBuffer buffer; // 底層NIO直接ByteBuffer
private ByteBuffer tmpNioBuf; // 用于IO操作的ByteBuffer
private int capacity; // ByteBuf的容量
private boolean doNotFree; // 釋放標(biāo)記
做一個(gè)簡(jiǎn)介黄伊,buffer
表示底層的直接ByteBuffer泪酱;tmpNioBuf
常用來(lái)進(jìn)行IO操作,實(shí)現(xiàn)實(shí)質(zhì)是buffer.duplicate()
即與buffer
共享底層數(shù)據(jù)結(jié)構(gòu)还最;capacity
表示緩沖區(qū)容量墓阀,即字節(jié)數(shù);doNotFree
是一個(gè)標(biāo)記拓轻,表示是否需要釋放buffer
的底層內(nèi)存斯撮。
接著分析構(gòu)造方法:
protected UnpooledDirectByteBuf(ByteBufAllocator alloc,
int initialCapacity, int maxCapacity) {
super(maxCapacity);
this.alloc = alloc;
setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
return ByteBuffer.allocateDirect(initialCapacity);
}
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf = null;
capacity = buffer.remaining();
}
由于setByteBuffer(buffer)
中含有doNotFree
變量使得理解稍微困難.仔細(xì)分析,當(dāng)doNotFree
為true時(shí)扶叉,調(diào)用后置為false勿锅,而為false時(shí)都需要freeDirect(oldBuffer)
。由此可知辜梳,doNotFree
表示不需要釋放舊的Buffer粱甫,根據(jù)代碼大全泳叠,使用反義Not并不是好的做法作瞄,使用free
表示是否需要釋放舊的Buffer會(huì)更容易讓人理解。另外從代碼可以看出:不需要釋放舊的Buffer只有一種情況危纫,這種情況便是Buffer作為構(gòu)造方法的參數(shù)時(shí)宗挥,代碼如下:
protected UnpooledDirectByteBuf(ByteBufAllocator alloc,
ByteBuffer initialBuffer, int maxCapacity) {
super(maxCapacity);
int initialCapacity = initialBuffer.remaining();
this.alloc = alloc;
doNotFree = true; // 置為true 表示不需要釋放原有buffer
setByteBuffer(initialBuffer.slice().order(ByteOrder.BIG_ENDIAN));
// 此時(shí) doNotFree已經(jīng)為false
writerIndex(initialCapacity);
}
分析完乌庶,發(fā)現(xiàn)doNotFree
是一個(gè)不必要的變量,除非在執(zhí)行構(gòu)造方法的時(shí)候契耿,oldBuffer不為null瞒大。(目前沒想到有什么情況如此)
使用allocateDirect(initialCapacity)
分配內(nèi)存時(shí)實(shí)際委托給NIO的方法,釋放內(nèi)存freeDirect(buffer)
也如此搪桂,委托給了NIO中DirectByteBuffer的cleaner透敌,代碼如下:
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
public void freeDirectBuffer(ByteBuffer buffer) {
if (!buffer.isDirect()) {
return;
}
try {
Object cleaner = PlatformDependent0.getObject(buffer, CLEANER_FIELD_OFFSET);
if (cleaner != null) {
CLEAN_METHOD.invoke(cleaner);
}
} catch (Throwable cause) {
PlatformDependent0.throwException(cause);
}
}
實(shí)際代碼根據(jù)JDK版本不同調(diào)用不同方法,上述只是其中之一踢械,但原理相同酗电,不再列出。
與引用計(jì)數(shù)相關(guān)的deallocate()
方法内列,代碼實(shí)現(xiàn)如下:
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
if (!doNotFree) {
freeDirect(buffer); // 前述分析可知撵术,doNotFree構(gòu)造方法之后一直為false
}
}
判斷底層實(shí)現(xiàn)的方法則如下:
// 默認(rèn)的字節(jié)序:大端模式
public ByteOrder order() { return ByteOrder.BIG_ENDIAN; }
// 是否直接數(shù)組
public boolean isDirect() { return true; }
// 底層是否有JAVA堆字節(jié)數(shù)組
public boolean hasArray() { throw new UnsupportedOperationException("..."); }
// 底層數(shù)組的偏移量
public int arrayOffset() { throw new UnsupportedOperationException("..."); }
// 是否含有os底層的數(shù)組起始地址
public boolean hasMemoryAddress() { return false; }
設(shè)置容量的方法:
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int readerIndex = readerIndex();
int writerIndex = writerIndex();
int oldCapacity = capacity;
if (newCapacity > oldCapacity) { // 容量擴(kuò)增
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
oldBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.put(oldBuffer);
newBuffer.clear();
setByteBuffer(newBuffer);
} else if (newCapacity < oldCapacity) { // 容量縮減
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = allocateDirect(newCapacity);
if (readerIndex < newCapacity) {
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
oldBuffer.position(readerIndex).limit(writerIndex);
newBuffer.position(readerIndex).limit(writerIndex);
newBuffer.put(oldBuffer);
newBuffer.clear();
} else {
setIndex(newCapacity, newCapacity);
}
setByteBuffer(newBuffer);
}
return this;
}
與HeapByteBuf類似,容量改變時(shí)话瞧,都將oldBuffer中的數(shù)據(jù)復(fù)制到新的newBuffer中嫩与,只是在容量縮減時(shí),需要調(diào)整讀寫索引交排。
接著看關(guān)鍵的_getInt(index)
和_setInt(index,value)
方法:
protected int _getInt(int index) {
return buffer.getInt(index);
}
protected void _setInt(int index, int value) {
buffer.putInt(index, value);
}
可見具體實(shí)現(xiàn)委托給了NIO原生的ByteBuffer划滋,追蹤其中的具體實(shí)現(xiàn),一種情況下的實(shí)現(xiàn)如下:
static int getIntB(long a) {
return makeInt(_get(a ),
_get(a + 1),
_get(a + 2),
_get(a + 3));
}
static private int makeInt(byte b3, byte b2, byte b1, byte b0) {
return (((b3 ) << 24) |
((b2 & 0xff) << 16) |
((b1 & 0xff) << 8) |
((b0 & 0xff) ));
}
可見與Netty的HeapByteBuf
實(shí)現(xiàn)一致埃篓。另一種情況是native實(shí)現(xiàn)古毛,沒有找到具體實(shí)現(xiàn)代碼,如果你有興趣可以尋找相關(guān)實(shí)現(xiàn)都许,有相關(guān)發(fā)現(xiàn)請(qǐng)告訴我稻薇。
繼續(xù)看copy()
方法:
public ByteBuf copy(int index, int length) {
ensureAccessible();
ByteBuffer src;
try {
src = (ByteBuffer) buffer.duplicate()
.clear().position(index).limit(index + length);
} catch (IllegalArgumentException ignored) {
throw new IndexOutOfBoundsException(
"Too many bytes to read - Need " + (index + length));
}
return alloc().directBuffer(length, maxCapacity()).writeBytes(src);
}
對(duì)原buffer使用duplicate()
方法,從而不干擾原來(lái)buffer的索引胶征。然后從分配器中申請(qǐng)一個(gè)buffer并寫入原buffer的數(shù)據(jù)塞椎。
最后看internalNioBuffer()
:
public ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
return (ByteBuffer) internalNioBuffer()
.clear().position(index).limit(index + length);
}
private ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
}
return tmpNioBuf;
}
可見,與copy()
相同睛低,使用duplicate()
防止干擾原buffer的索引案狠。
至此,UnpooledDirectByteBuf
的源碼分析完畢钱雷。
3. UnsafeByteBuf
Netty還使用JAVA的后門類sun.misc.Unsafe
實(shí)現(xiàn)了兩個(gè)緩沖區(qū)UnpooledUnsafeHeapByteBuf
和UnpooledUnsafeDirectByteBuf
骂铁。這個(gè)強(qiáng)大的后門類Unsafe
可以暴露出對(duì)象的底層地址,一般不建議使用罩抗,而性能優(yōu)化狂魔Netty則顧不得這些拉庵。簡(jiǎn)單介紹一下這兩個(gè)類的原理,不再對(duì)代碼進(jìn)行分析套蒂。UnpooledUnsafeHeapByteBuf
在使用Unsafe
后钞支,暴露出字節(jié)數(shù)組在JAVA堆中的地址茫蛹,所以不再使用字節(jié)數(shù)組的索引即array[index]訪問(wèn),轉(zhuǎn)而使用baseAddress + Index的得到字節(jié)的地址烁挟,然后從該地址取得字節(jié)婴洼。UnpooledUnsafeDirectByteBuf
也一樣,暴露底層DirectByteBuffer的地址后撼嗓,使用相同的Address + Index方式取得對(duì)應(yīng)字節(jié)柬采。