背景
Java自帶的Nio ByteBuffer具有局限性和操作的復(fù)雜性汁掠,主要缺點(diǎn)如下:
1、ByteBuffer長度固定集币,一旦分配成功長度不能動態(tài)擴(kuò)展和縮容考阱,很容易發(fā)生越界異常。
2鞠苟、ByteBuffer只有一個標(biāo)識位置的指針乞榨,讀寫切換時需要手工調(diào)用flip方法
為了彌補(bǔ)這些不足,Netty作者重新造輪子当娱,提供了自己實(shí)現(xiàn)的ByteBuf吃既。
ByteBuf原理
ByteBuf也是通過字節(jié)數(shù)組byte[]作為緩沖區(qū)來存取數(shù)據(jù),通過門面模式聚合了JDK NIO元素的ByteBuffer跨细,進(jìn)行封裝.
與ByteBuffer不同的是鹦倚,ByteBuf提供了兩個指針來協(xié)助緩沖區(qū)的讀寫操作,讀操作readerIndex冀惭,寫操作使用writeIndex.
剛開始readerIndex和writeIndex都為0震叙,隨著數(shù)據(jù)的寫入writeIndex會增加,讀取數(shù)據(jù)時readerIndex會增加散休,但是它不會超過writeIndex的大小媒楼,在讀取之后,0~readerIndex的就視作為discard的溃槐,調(diào)用discardBytes方法可以釋放這部分空間匣砖。下圖表示了ByteBuffer這三部分的關(guān)系:
基本功能
讀操作
byte readByte() 從當(dāng)前讀索引讀取一個字節(jié)并且讀索引readerIndex加1,如果可讀字節(jié)數(shù)小于1則拋出異常
boolean readBoolean() 從當(dāng)前讀索引讀取一個布爾字節(jié)并且讀索引readerIndex加1,如果可讀字節(jié)數(shù)小于1則拋出異常
ByteBuf readBytes(int length) 從當(dāng)前讀索引readerIndex取指定長度字節(jié)數(shù)到另外有一個Buf容器并且讀索引readerIndex加length猴鲫,返回的buf讀索引為0对人,寫索引為length
ByteBuf readSlice(int length) 從當(dāng)前讀索引readerIndex取指定長度字節(jié)數(shù)到另外有一個Buf容器并且讀索引readerIndex加length,返回的buf讀索引為0拂共,寫索引為length
寫操作
ByteBuf writeByte(int value) 從當(dāng)前寫索引writeIndex寫入字節(jié)并且writeIndex增加
ByteBuf writeBytes(ByteBuf src) 將src拷貝到當(dāng)前buf并且writeIndex增加
ByteBuf writeBytes(ByteBuf src, int length)將src拷貝length個字節(jié)到當(dāng)前buf并且writeIndex增加
ByteBuf writeBytes(ByteBuf src, int srcIndex, int length)將src從srcindex開始拷貝length個字節(jié)到當(dāng)前buf并且writeIndex增加
還有很多這里不再列出牺弄,大家可以去看API
丟棄字節(jié)(Discardable byte)
discardReadBytes會將readerindex之后的數(shù)據(jù)移動到從0開始,寫索引減少readerindex個宜狐,也就是會丟棄已讀的字節(jié)势告, 增加可寫的字節(jié)空間
丟棄之前
丟棄之后
clear操作
clear操作不會清空緩沖區(qū),而只是將writeindex和readindex置為0
clear操作前
clear操作后
源碼分析
繼承關(guān)系圖:
AbstractByteBuf繼承自ByteBuf,ByteBuf的公共屬性和公共功能都在AbstractByteBuf上實(shí)現(xiàn)抚恒,比如讀咱台、寫,清除等基本功能
AbstractReferenceCountedByteBuf繼承自AbstractByteBuf俭驮,在AbstractByteBuf的基礎(chǔ)上提供了一下功能:
refCnt:獲得該對象的引用計(jì)數(shù)回溺;
retain:增加該對象的引用計(jì)數(shù)(無參數(shù):+1;有參數(shù):+指定的increment)
release:減少該對象的引用計(jì)數(shù)(無參數(shù):-1混萝;有參數(shù):-指定的increment)遗遵,當(dāng)引用計(jì)數(shù)減少到0時,釋放該對象逸嘀。返回值為true车要,當(dāng)且僅當(dāng)引用計(jì)數(shù)變?yōu)?和該對象已釋放。
讀操作源碼分析
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
首先校驗(yàn)緩沖區(qū)的可用空間
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
第一步判斷如果參數(shù)小于0直接拋出異常崭倘,第二部如果寫的字節(jié)數(shù)小于需要讀取的長度也拋出異常
校驗(yàn)通過后調(diào)用geyBytes方法從buffer的readerindex位置讀取length個字節(jié)到目標(biāo)緩沖區(qū)翼岁,其中目標(biāo)緩沖區(qū)從dstindex開始存儲數(shù)據(jù),由于子類的復(fù)制操作的技術(shù)實(shí)現(xiàn)不一樣司光,所以具體實(shí)現(xiàn)由子類實(shí)現(xiàn)
/**
* Transfers this buffer's data to the specified destination starting at
* the specified absolute {@code index}.
* This method does not modify {@code readerIndex} or {@code writerIndex}
* of this buffer.
*
* @param dstIndex the first index of the destination
* @param length the number of bytes to transfer
*
* @throws IndexOutOfBoundsException
* if the specified {@code index} is less than {@code 0},
* if the specified {@code dstIndex} is less than {@code 0},
* if {@code index + length} is greater than
* {@code this.capacity}, or
* if {@code dstIndex + length} is greater than
* {@code dst.length}
*/
public abstract ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);
寫操作源碼分析
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
第一步判斷是否可寫
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
第一步判斷參數(shù)合法性登澜,第二步判斷如果可寫的字節(jié)數(shù)大于需要寫入的字節(jié)數(shù)則直接通過,如果需要寫入的字節(jié)數(shù)大于最大可擴(kuò)容所容納的字節(jié)數(shù)則直接拋異常
否則進(jìn)行自動擴(kuò)容飘庄,這就是bytebuf對比bytebuffer的好處。
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
首先設(shè)置閥值為4M购撼,如果需要的新容量正好等于閥值則使用閥值作為緩沖區(qū)容量跪削,如果大于新容量大于閥值則每次以閥值為倍數(shù)進(jìn)行擴(kuò)容,擴(kuò)張后的內(nèi)存需要跟maxCapacity進(jìn)行比較迂求,不能大于最大的maxCapacity碾盐,如果新容量小于閥值,則每次以64字節(jié)為步長進(jìn)行擴(kuò)容
重新計(jì)算完新的需要擴(kuò)容容量后需要重新創(chuàng)建緩沖區(qū)揩局,將原緩沖區(qū)內(nèi)容復(fù)制到新創(chuàng)建的ByteBuf中毫玖,由于不同子類會有不同的復(fù)制方法,所以該方法也是抽象方法
/**
* Adjusts the capacity of this buffer. If the {@code newCapacity} is less than the current
* capacity, the content of this buffer is truncated. If the {@code newCapacity} is greater
* than the current capacity, the buffer is appended with unspecified data whose length is
* {@code (newCapacity - currentCapacity)}.
*/
public abstract ByteBuf capacity(int newCapacity);
UnpooledHeapByteBuf源碼分析
UnpooledHeapByteBuf基于堆內(nèi)存進(jìn)行內(nèi)存分配的緩沖區(qū)。沒有基于對象池技術(shù)實(shí)現(xiàn)付枫,也就意味著每次IO操作都要創(chuàng)建一個新的UnpooledHeapByteBuf烹玉,頻繁的進(jìn)行大款的堆內(nèi)存分配和回收對性能有一定影響,并且可能會有Full GC風(fēng)險阐滩。
成員變量
private final ByteBufAllocator alloc;
private byte[] array;
private ByteBuffer tmpNioBuf;
ByteBufAllocator用于內(nèi)存的分配二打,byte[]作為字節(jié)緩沖區(qū),tmpNioBuf用于Netty ByteBuf到JDK ByteBuffer的轉(zhuǎn)換用
動態(tài)擴(kuò)展緩沖區(qū)實(shí)現(xiàn):
上面提到ByteBuf寫操作時會有自動擴(kuò)容原理掂榔,而具體是由子類實(shí)現(xiàn)继效,那么UnpooledHeapByteBuf的源碼如下:
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
首先對入?yún)⑿碌娜萘繀?shù)newCapacity進(jìn)行合法校驗(yàn),校驗(yàn)通過后判斷如果新的容量大于原來的緩沖區(qū)容量装获,則進(jìn)行動態(tài)擴(kuò)容瑞信,通過byte[] newArray = new byte[newCapacity]創(chuàng)建新的緩沖區(qū),Syste.arraycopy將原來緩沖區(qū)的數(shù)據(jù)復(fù)制到新的緩沖區(qū)穴豫,再通過setArray將緩沖區(qū)替換就的緩沖區(qū)凡简。
如果新的容量小于原來緩沖區(qū)容量,則首先判斷讀索引是否小于新的容量newCapacity绩郎,如果小于在判斷寫索引是否大于newCapacity,如果讀索引小于newCapacity且寫索引大于newCapacity潘鲫,也就意味著原來的字節(jié)緩沖區(qū)還有(newCapacity-readerIndex)個字節(jié)沒有被讀取,那么需要將這些數(shù)據(jù)拷貝到新的緩沖區(qū)肋杖。如果讀索引大于新的容量newCapacity溉仑,那意味著原來緩沖區(qū)所有字節(jié)都被讀取了,那么直接將byte[] newArray = new byte[newCapacity]更新為新的字節(jié)緩沖區(qū)状植。