??Netty里的ByteBuf主要用于發(fā)送或接收消息蚁廓。在JDK里有相似功能的類java.nio.ByteBuffer。由于JDK在設(shè)計(jì)ByteBuffer API的時(shí)候?qū)τ脩舨惶押檬夷遥饕憩F(xiàn)在1:寫讀切換的時(shí)候需要調(diào)用flip方法皆愉。2:初使化的時(shí)候長度便固定了,沒有提供自動擴(kuò)容的功能。而Netty在設(shè)計(jì)ByteBuf的時(shí)候考慮到API在使用上的便利精居,對上面提到的兩個(gè)問題很好的進(jìn)行了規(guī)避搂鲫。
java.nio.ByteBuffer源碼解讀
??先來了解一下jdk自帶的ByteBuffer是如何實(shí)現(xiàn)的傍药,有利于我們看Netty里ByteBuf的源碼。在jdk里對基本的數(shù)據(jù)類型都提供了相應(yīng)的Buffer進(jìn)行數(shù)據(jù)的讀然耆浴(除了boolean)拐辽。下圖很好的看出除boolean外的Buffer繼承關(guān)系:
首先來看一下Buffer類的源碼,這里面實(shí)現(xiàn)了一些公共的方法擦酌,比如剛剛提到的flip()方法俱诸。在Buffer里維護(hù)了四個(gè)屬性,分別為mark, position, limit, capacity;他們之前的關(guān)系 是
mark<=position<=limit<=capacity;其中mark屬性用于執(zhí)行些與mark相關(guān)的操作赊舶,主要用于標(biāo)識位置來實(shí)現(xiàn)重復(fù)讀取功能睁搭。部分源碼如下:
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}
/**
這個(gè)就是flip方法的實(shí)現(xiàn)。Buffer類實(shí)現(xiàn)讀寫用同一個(gè)Buffer的核心方法就是需要調(diào)用這個(gè)方法笼平,其實(shí)也就是將讀寫的標(biāo)識位進(jìn)行換一下园骆。
**/
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
//用于移動position操作
final int nextGetIndex() { // package-private
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
}
ByteBuffer通過繼承Buffer類有了一些公共的方法,從內(nèi)存分配的位置來分類可以分為:
1)堆內(nèi)存(HeapByteBufer)寓调,特點(diǎn):分配與回收比較快锌唾,在socket傳輸?shù)倪^程中多了一次內(nèi)存復(fù)制。
2)直接內(nèi)存(DirectByteBufer)夺英,特點(diǎn):分配與回收相對比較慢晌涕,但在socket數(shù)據(jù)傳輸中少了內(nèi)存復(fù)制。
所以在ByteBuffer里也只提供了一些通用的公共方法痛悯,具體的存儲還是留給子類來實(shí)現(xiàn)余黎,部分源碼如下:
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{
//用于存數(shù)據(jù)的數(shù)組 這里只定義了引用
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
/**
調(diào)用這個(gè)方法可以將src里的數(shù)據(jù)往hb里放
**/
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
/**
這個(gè)方法會將src里的數(shù)據(jù)從offset到length的數(shù)據(jù)放入到hb里
**/
public ByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length);
if (length > remaining())
throw new BufferOverflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}
//具體怎么放的留給子類來實(shí)現(xiàn)
public abstract ByteBuffer put(byte b);
//將ByteBuffer里的數(shù)據(jù)往dst里填充
public ByteBuffer get(byte[] dst) {
return get(dst, 0, dst.length);
}
//具體的填充調(diào)用這個(gè)方法,需要傳入填充的開始和結(jié)束位置
public ByteBuffer get(byte[] dst, int offset, int length) {
checkBounds(offset, length, dst.length);
if (length > remaining())
throw new BufferUnderflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
dst[i] = get();
return this;
}
//留給子類來實(shí)現(xiàn)载萌,不同的內(nèi)存類型采用不同的方式
public abstract byte get();
}
下面來看看堆內(nèi)存是如何實(shí)現(xiàn)上面的put與get方法的
class HeapByteBuffer
extends ByteBuffer
{
//這個(gè)構(gòu)造方法傳入的hb是new byte[cap]
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
}
//往byte數(shù)組里放數(shù)據(jù)的方法就是這個(gè)啦惧财,nextPutIndex用于移動position位置,而ix用于根據(jù)offset找到具體的位置放數(shù)據(jù)扭仁,這里的offset是ByteBuffer內(nèi)部的offset.
public ByteBuffer put(byte x) {
hb[ix(nextPutIndex())] = x;
return this;
}
protected int ix(int i) {
return i + offset;
}
//取數(shù)據(jù)與存數(shù)據(jù)是一樣的可缚,都需要通過position確定數(shù)據(jù)的位置,這就是為什么jdk自帶的Buffer需要調(diào)用flip()方法對position位置做轉(zhuǎn)換啦斋枢。
public byte get() {
return hb[ix(nextGetIndex())];
}
}
上面的代碼分析了jdk里ByteBuffer類的實(shí)現(xiàn)原理,通過分析ByteBuffer再來看Netty里ByteBuff的源碼就更簡單了知给。
ByteBuf源碼解讀
??Netty實(shí)現(xiàn)自已的ByteBuf是基于jdk自帶的ByteBuffer有我們上面所說的兩個(gè)缺點(diǎn)瓤帚。Netty通過多加一個(gè)變量就解決了寫讀轉(zhuǎn)換城要調(diào)用flip方法的問題描姚,而通過自動擴(kuò)容解決了ByteBuffer大小固定的問題。下面我們來看看Netty是如何實(shí)現(xiàn)的戈次,首先看主要的類關(guān)系圖如下:
在這里就沒有七種基本類型的Buffer啦轩勘,但是在ByteBuf里提供了對應(yīng)的方法來實(shí)現(xiàn)寫入與讀出不同類型的數(shù)據(jù)。從這個(gè)類圖上我們也可以看出不但對內(nèi)存類型分為DirectByteBuf與HeapByteBuf怯邪。也分為Pooled與Unpooled绊寻。ByteBuf只定義了一些方法,具體的實(shí)現(xiàn)通過模板方法模式悬秉,將通用的方法在AbstractByteBuf類中實(shí)現(xiàn)澄步。下面來看一下AbstractByteBuf里的部分源碼:
public abstract class AbstractByteBuf extends ByteBuf {
//這個(gè)是讀到的位置
int readerIndex;
//這個(gè)是寫到的位置
int writerIndex;
//標(biāo)記讀位置,用于resetReaderIndex方法
private int markedReaderIndex;
//標(biāo)記寫位置和泌,用于resetWriterIndex
private int markedWriterIndex;
//當(dāng)前ByteBuf的最大內(nèi)存
private int maxCapacity;
//構(gòu)造方法村缸,只需要傳入可用的最大內(nèi)存參數(shù)
protected AbstractByteBuf(int maxCapacity) {
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity + " (expected: >= 0)");
}
this.maxCapacity = maxCapacity;
}
}
AbstractByteBuf方法分為以下以內(nèi),下面分別進(jìn)行源碼的注釋:
- 寫類型的方法武氓,比如writeInt方法梯皿,源碼如下:
public abstract class AbstractByteBuf extends ByteBuf {
//寫入一個(gè)int類型的數(shù)據(jù)
@Override
public ByteBuf writeInt(int value) {
//確定能夠?qū)懭?個(gè)字節(jié)大小的數(shù)據(jù),如果寫入不了县恕,這個(gè)方法會進(jìn)行擴(kuò)容或者拋出異常
ensureWritable0(4);
//根據(jù)writerIndex,將數(shù)據(jù)寫到到指定位置
_setInt(writerIndex, value);
//writerIndex 往后移4位
writerIndex += 4;
return this;
}
/**
*這個(gè)方法用于保證能夠?qū)懭胱钚inWritableBytes個(gè)字節(jié)的數(shù)據(jù)
**/
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
//如果當(dāng)前的容量足夠?qū)懭氲亩瑒t直接返回
if (minWritableBytes <= writableBytes()) {
return;
}
//超過設(shè)置的最大容最,那就直接拋出異常忠烛,這種情況不會進(jìn)行擴(kuò)容了
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
//在這里會進(jìn)行擴(kuò)容處理属提,并返回?cái)U(kuò)容后的大小
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
//調(diào)整newCapacity 進(jìn)行擴(kuò)容
// Adjust to the new capacity.
capacity(newCapacity);
}
}
下面來看看Netty里是如何調(diào)整需要擴(kuò)容ByteBuf的大小的。邏輯在AbstractByteBufAllocator 類里源碼如下:
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
// 計(jì)算需要擴(kuò)容的大小
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// 驗(yàn)證參數(shù)的合法性
if (minNewCapacity < 0) {
throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
}
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
//CALCULATE_THRESHOLD這里是個(gè)常量况木,4M
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
//如果需要寫入數(shù)據(jù)的Buffer已經(jīng)超過4M大小了垒拢,這時(shí)會分配4M大小的容量空間,但是不會超過最在允許的maxCapacity
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
//小于4M的話以64比特進(jìn)行翻倍擴(kuò)容
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
}
上面的注釋很清晰的描述了如何確定擴(kuò)容的大小火惊。下面來看一下確定大小后又是如何擴(kuò)容的呢求类,可以肯定不同的內(nèi)存類型有不同的擴(kuò)容方式,我還還是看一下堆內(nèi)存的擴(kuò)容方式吧,源碼如下:
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
/**
**這個(gè)方法里會根據(jù) newCapacity進(jìn)行具體的擴(kuò)容屹耐,其實(shí)也就是對原有的數(shù)據(jù)進(jìn)行復(fù)制
**/
@Override
public ByteBuf capacity(int newCapacity) {
//驗(yàn)證傳入?yún)?shù)的合法性
checkNewCapacity(newCapacity);
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) {
//這里首先分配塊byte[]大小的內(nèi)存
byte[] newArray = allocateArray(newCapacity);
//采用System.arraycopy方法進(jìn)行數(shù)據(jù)的復(fù)制
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
//將新的arry設(shè)置成原來的array對象
setArray(newArray);
//釋放老的array對象
freeArray(oldArray);
} else if (newCapacity < oldCapacity) {
//下面是縮容的邏輯啦尸疆,縮容需要考慮的更多,讀寫指向的位置需要調(diào)整
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
return this;
}
}
寫的方法我們分析到這里惶岭,從源碼分析上看寿弱,在具體應(yīng)用ByteBuf的時(shí)候最后能對ByteBuf的大小有一個(gè)預(yù)估。必竟擴(kuò)容的方法還是很耗性能的按灶。
- 讀方法症革,比如readInt方法,源碼如下:
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public int readInt() {
//保證有數(shù)據(jù)可讀鸯旁,這里有很簡單了噪矛,主要判斷一下readerIndex與writerIndex之間的差是不是比4小
checkReadableBytes0(4);
//_getInt方法肯定是由子類來實(shí)現(xiàn)的啦量蕊,傳入的是讀數(shù)據(jù)的開始位置
int v = _getInt(readerIndex);
//讀指示位往生移4位
readerIndex += 4;
//返回讀到的數(shù)據(jù)
return v;
}
//這里驗(yàn)證傳入的數(shù)據(jù),保證操作的安全
private void checkReadableBytes0(int minimumReadableBytes) {
ensureAccessible();
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
}
- 清空讀過的數(shù)據(jù)艇挨,以重復(fù)利用存儲空間残炮,通過discardReadBytes方法來實(shí)現(xiàn)源碼如下:
public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf discardReadBytes() {
ensureAccessible();
//通過readerIndex 判斷如果沒有讀過的數(shù)據(jù),則直接返回
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
//讀寫位置不一樣時(shí)的處理邏輯缩滨,將byte數(shù)據(jù)進(jìn)行移位操作势就,由不同的子類來實(shí)現(xiàn)
setBytes(0, this, readerIndex, writerIndex - readerIndex);
//重置writerIndex
writerIndex -= readerIndex;
//重置markedReaderIndex 與markedWriterIndex
adjustMarkers(readerIndex);
//readerIndex 變成0
readerIndex = 0;
} else {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
}
堆內(nèi)存的移位操作源碼如下:
public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
//驗(yàn)證傳入的參數(shù)
checkSrcIndex(index, length, srcIndex, src.capacity());
if (src.hasMemoryAddress()) {
PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, array, index, length);
} else if (src.hasArray()) {
//這里通過下面的方法,內(nèi)部還是調(diào)用System.arraycopy方法
setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
} else {
src.getBytes(srcIndex, array, index, length);
}
return this;
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
}
ByteBuf是如何處理引用計(jì)數(shù)
??Netty里通過池技術(shù)來重復(fù)利用ByteBuf對象脉漏,而池必然涉及到回何回收對象苞冯,Netty通過對ByteBuf增加一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)對無引用對象的回收。源碼如下:
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
//這個(gè)對象就有意思了鸠删,這個(gè)對象內(nèi)部通過cas的操作保證修改的安全性抱完。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
//增加引用計(jì)數(shù)的屬性,將這個(gè)值設(shè)為volatile保證各線程在并發(fā)訪問的時(shí)候可見性
private volatile int refCnt;
//通過retain方法將引用計(jì)數(shù)加一
@Override
public ByteBuf retain() {
return retain0(1);
}
@Override
public ByteBuf retain(int increment) {
return retain0(checkPositive(increment, "increment"));
}
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
//通過relaese方法將引用計(jì)數(shù)進(jìn)行減操作
@Override
public boolean release() {
return release0(1);
}
@Override
public boolean release(int decrement) {
return release0(checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
//refCntUpdater的getAndAdd能夠保證操作的原子性刃泡,
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -decrement);
}
return false;
}
//這個(gè)方法用于釋放當(dāng)前ByteBuf空間啦
/**
* Called once {@link #refCnt()} is equals 0.
*/
protected abstract void deallocate();
}