One of Netty's main features is its ByteBuf implementation. In NIO, java.nio.ByteBuffer has only a single position field to record where reads and writes happen, so switching between writing and reading requires calling flip(). Netty implements its own ByteBuf, which maintains two independent indexes for the read and write positions: readerIndex and writerIndex. ByteBuf is further split into HeapByteBuf and DirectByteBuf, representing heap memory and off-heap (direct) memory. Because Netty is built for network transport, allocating and releasing memory happens very frequently, so Netty pools ByteBuf instances and allocates them through PooledByteBufAllocator. Once ByteBuf is pooled, allocation and release have to be managed more effectively: keeping an allocation in one contiguous region, making allocation fast, controlling fragmentation, and so on. For this, Netty borrows the Slab algorithm and the Buddy algorithm to manage memory allocation.
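As a minimal illustration of the two-index model (plain public Netty and JDK APIs; Unpooled is used here only for brevity and the capacities are arbitrary), a ByteBuf can be written and then read without any mode switch, while a java.nio.ByteBuffer must be flipped first:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
public class IndexDemo {
    public static void main(String[] args) {
        // Netty ByteBuf: writes move writerIndex, reads move readerIndex
        ByteBuf buf = Unpooled.buffer(16);
        buf.writeInt(42);                 // writerIndex = 4, readerIndex = 0
        int fromNetty = buf.readInt();    // readerIndex = 4, no mode switch needed
        buf.release();
        // JDK ByteBuffer: one position shared by reads and writes
        ByteBuffer nioBuf = ByteBuffer.allocate(16);
        nioBuf.putInt(42);                // position = 4
        nioBuf.flip();                    // switch from writing to reading
        int fromNio = nioBuf.getInt();
        System.out.println(fromNetty + " " + fromNio);
    }
}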
Slab algorithm:
Netty borrows the allocation approach used by Memcached: memory is divided into a number of size classes, and a request is served from the first class whose size is greater than or equal to the requested size. Netty splits allocations into the following classes (heap and direct behave the same; direct memory is used as the example here, and a small routing sketch follows this list):
- MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches: allocations with normCapacity < 512
- MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches: allocations with normCapacity < pageSize (pageSize defaults to 8KB)
- MemoryRegionCache<ByteBuffer>[] normalDirectCaches: allocations with normCapacity <= chunkSize (chunkSize defaults to 8KB * 2^11 = 16MB; how this value comes about is explained with the Buddy algorithm below)
- allocateHuge(buf, reqCapacity): allocations with normCapacity > chunkSize
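To make the boundaries concrete, here is a simplified routing sketch using the default pageSize and chunkSize values above (an illustration only, not Netty's code):
// Simplified routing of a normalized request size to the four classes above.
static String sizeClass(int normCapacity) {
    final int pageSize = 8 * 1024;           // 8KB default
    final int chunkSize = pageSize << 11;    // 16MB default (pageSize * 2^maxOrder)
    if (normCapacity < 512)        return "tiny   -> tinySubPageDirectCaches";
    if (normCapacity < pageSize)   return "small  -> smallSubPageDirectCaches";
    if (normCapacity <= chunkSize) return "normal -> normalDirectCaches";
    return "huge -> allocateHuge(buf, reqCapacity)";
}
// sizeClass(100) == tiny, sizeClass(4096) == small, sizeClass(64 * 1024) == normal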
Buddy algorithm:
Netty's allocations are performed by a PoolChunk, which is a very large block of memory (8KB * 2^11 = 16MB by default) described by a complete balanced binary tree.
1: A byte[] memoryMap manages the pages, with every parent node directly managing its child nodes. maxOrder defaults to 11, so the first page (pageSize = 8KB) is managed directly by memoryMap[2^11 = 2048], the next by memoryMap[2049], and so on. Whether a node is allocated is encoded in value = memoryMap[index]: ① value == the node's own depth means the node is unallocated; ② value == unusable, with unusable = (byte) (maxOrder + 1) = 12, means the node and all of its children are fully allocated; ③ depth < value < unusable means part of the node has been allocated, but some of its children still have free memory.
2: To allocate 8KB, the pages are scanned in order from memoryMap[2^11 = 2048] to memoryMap[2^12 - 1 = 4095] until an unallocated page is found.
3: To allocate more than pageSize, for example 16KB: each node at depth d = 11 manages one pageSize = 8KB block and each node at depth d = 10 manages two d = 11 nodes, so a single node at depth d = 10 covers 16KB. The scan therefore runs from memoryMap[2^10 = 1024] to memoryMap[2^11 - 1 = 2047] looking for an unallocated node (the depth arithmetic is sketched right after this list).
4: Allocations smaller than pageSize are served by splitting a single page into equal-sized segments, which is what backs the tiny (tinySubPageDirectCaches) and small (smallSubPageDirectCaches) classes above.
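The depth selection can be sketched as follows (assumes the default pageSize = 8KB, i.e. pageShifts = 13, and maxOrder = 11; an illustration, not Netty's code):
// Which tree depth serves a normalized size, and which memoryMap range is scanned.
static void describeLevel(int normCapacity) {
    int pageShifts = 13;
    int maxOrder = 11;
    int log2Size = 31 - Integer.numberOfLeadingZeros(normCapacity);
    int d = maxOrder - (log2Size - pageShifts); // depth whose nodes manage blocks of this size
    int first = 1 << d;                         // first memoryMap index at depth d
    int last = (1 << (d + 1)) - 1;              // last memoryMap index at depth d
    System.out.printf("%d bytes -> depth %d, scan memoryMap[%d..%d]%n",
            normCapacity, d, first, last);
}
// describeLevel(8 * 1024)  prints: 8192 bytes -> depth 11, scan memoryMap[2048..4095]
// describeLevel(16 * 1024) prints: 16384 bytes -> depth 10, scan memoryMap[1024..2047]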
Additional notes:
1: To improve the success rate of large allocations, Netty also groups PoolChunks by usage rate: a chunk with a low usage rate naturally has more unused memory left, so a large allocation from it is more likely to succeed. Each PoolArena maintains a chain of usage-ranked PoolChunkLists. On every allocation, PoolChunkList.allocate(..) compares the chunk's PoolChunk.usage() against the list's PoolChunkList.maxUsage; if it exceeds maxUsage, the PoolChunk is removed from the current list and added to the next PoolChunkList. On every release, PoolChunkList.free(...) compares PoolChunk.usage() against PoolChunkList.minUsage; if it falls below minUsage, the PoolChunk is removed from the current list and added to the previous PoolChunkList. Note that for both allocation and release the insertion check is applied recursively until the PoolChunkList whose range fits the chunk is found (a minimal model of this movement is sketched after these notes).
2: To improve concurrency, Netty spreads allocations over a PoolArena<ByteBuffer>[] to reduce contention. Each thread looks up its current PoolArena through PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache>; when the arena is first bound to a thread, it is chosen with a least-used strategy (PoolThreadLocalCache.leastUsedArena(PoolArena<T>[] arenas)).
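A minimal sketch of that recursive movement between usage-ranked lists (illustrative only; the class and field names are stand-ins, not Netty's PoolChunkList implementation):
final class UsageList {
    final int minUsage;
    final int maxUsage;          // e.g. q025 covers usage in [25, 75)
    UsageList prev, next;
    UsageList(int minUsage, int maxUsage) { this.minUsage = minUsage; this.maxUsage = maxUsage; }
    // after an allocation raised a chunk's usage: keep moving towards "fuller" lists
    UsageList placeAfterAllocate(int usagePercent) {
        if (usagePercent >= maxUsage && next != null) {
            return next.placeAfterAllocate(usagePercent);
        }
        return this;
    }
    // after a free lowered a chunk's usage: keep moving towards "emptier" lists
    UsageList placeAfterFree(int usagePercent) {
        if (usagePercent < minUsage && prev != null) {
            return prev.placeAfterFree(usagePercent);
        }
        return this;
    }
}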
The rest of this article walks through the corresponding implementation in Netty's code.
Memory allocation starts from the PooledByteBufAllocator class:
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
...
private final PoolThreadLocalCache threadCache;
...
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
//initialize the thread-local cache
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
...
}
...
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
//first look up the PoolArena bound to this thread from the ThreadLocal cache
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
}
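For reference, a pooled direct buffer can be obtained and returned through this allocator as follows (a minimal usage sketch; the capacities are arbitrary and PooledByteBufAllocator.DEFAULT is the shared default instance):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class AllocatorDemo {
    public static void main(String[] args) {
        // directBuffer(..) eventually reaches the newDirectBuffer(..) shown above
        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
        ByteBuf buf = allocator.directBuffer(256, 1024);   // initialCapacity, maxCapacity
        try {
            buf.writeBytes(new byte[]{1, 2, 3});
        } finally {
            buf.release();   // hands the memory back to the pool (possibly via the thread cache)
        }
    }
}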
Let's first look at the definition of the PoolThreadLocalCache class:
//extends FastThreadLocal
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
//pick the least-used PoolArena as the arena for this thread's allocations
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
if (useCacheForAllThreads || Thread.currentThread() instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching for non FastThreadLocalThreads.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
Next, look at the directArena.allocate method to see how the allocation is done:
abstract class PoolArena<T> implements PoolArenaMetric {
...
//subpage pools for the tiny size classes
private final PoolSubpage<T>[] tinySubpagePools;
//subpage pools for the small size classes
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
...
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
...
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
//numTinySubpagePools = 512 >>> 4 = 32
//tiny sizes are multiples of 2^4 = 16 below 512, so there are 32 tiny size classes, one pool head per class
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
//small sizes start at 512 = 2^9 and double up to pageSize / 2, so there are pageShifts - 9 small size classes
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
...
}
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
...
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//obtain a PooledByteBuf instance from the RECYCLER object pool
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
/**
Allocation sizes fall into 4 classes:
Tiny:   (0, 512)
Small:  [512, pageSize)
Normal: [pageSize, chunkSize]
Huge:   > chunkSize
*/
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//normalize reqCapacity: tiny requests round up to a multiple of 16, larger ones to the next power of two
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
//on the first allocation the cache queue is still empty, so this fails and returns false
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//find the index in tinySubpagePools that serves an allocation of normCapacity
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//find the index in smallSubpagePools that serves an allocation of normCapacity
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
//the PoolSubpage at that index is the head of the linked list for this size class
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
//the head PoolSubpage is created by newSubpagePoolHead, which sets head.prev = head and head.next = head
//so on the very first allocation s == head and this branch is skipped
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
//allocate the Normal class (pageSize <= normCapacity <= chunkSize)
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
// Method must be called inside synchronized(this) { ... } block
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
//the returned handle identifies what was allocated: the memoryMap node of the page run, plus (for < pageSize allocations) the bitmap index of the segment; initBuf binds the buffer to that region
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
...
}
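The normCapacity used throughout allocate(..) comes from normalizeCapacity(reqCapacity). Its rounding behaviour can be approximated as follows (a sketch only; it ignores directMemoryCacheAlignment and the > chunkSize huge case):
// Approximate sketch of the request-size normalization used above.
static int normalize(int reqCapacity) {
    if (reqCapacity >= 512) {
        // small/normal requests: round up to the next power of two
        int highest = Integer.highestOneBit(reqCapacity);
        return reqCapacity == highest ? reqCapacity : highest << 1;
    }
    // tiny requests: round up to a multiple of 16
    return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
}
// normalize(100) == 112, normalize(1000) == 1024, normalize(8192) == 8192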
From the code above, the actual allocation is ultimately performed by PoolChunk's allocate(int normCapacity) method; let's look at the allocation logic:
final class PoolChunk<T> implements PoolChunkMetric {
...
long allocate(int normCapacity) {
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
}
}
/**
* Allocate a run of pages (>= 1 page) for normCapacity
*
* @param normCapacity normalized capacity
* @return index in memoryMap
*/
private long allocateRun(int normCapacity) {
int d = maxOrder - (log2(normCapacity) - pageShifts);
int id = allocateNode(d);
if (id < 0) {
return id;
}
freeBytes -= runLength(id);
return id;
}
/**
* Allocate a node at depth d in memoryMap to manage the requested memory
*
* @param d depth
* @return index in memoryMap
*/
private int allocateNode(int d) {
int id = 1;
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // the subtree under the root (id = 1) cannot provide a block at depth d: not enough free memory
return -1;
}
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
setValue(id, unusable); // mark this node as fully allocated
updateParentsAlloc(id); // propagate the allocation state up through the parent nodes
return id;
}
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
//a subpage is smaller than pageSize, so a single node managing one page is enough
//search directly at depth maxOrder (the leaves) for a free node
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);
if (id < 0) { // allocation failed: no free leaf node
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
private int runOffset(int id) {
// represents the 0-based offset in #bytes from start of the byte-array chunk
int shift = id ^ 1 << depth(id);
return shift * runLength(id);
}
...
}
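To make the arithmetic concrete, the helpers below mirror depth/runLength/runOffset for the default layout (chunkSize = 16MB, so log2(chunkSize) = 24); they are illustrative re-implementations, not Netty's code:
static int depth(int id)     { return 31 - Integer.numberOfLeadingZeros(id); }
static int runLength(int id) { return 1 << (24 - depth(id)); }                    // bytes managed by node id
static int runOffset(int id) { return (id ^ (1 << depth(id))) * runLength(id); }  // byte offset inside the chunk
// runLength(2048) == 8192  (one page),  runOffset(2049) == 8192
// runLength(1024) == 16384 (two pages), runOffset(1025) == 16384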
For allocations >= pageSize, PoolChunk allocates directly; for allocations smaller than pageSize, one pageSize node is first allocated from the PoolChunk and then handed to a PoolSubpage for the actual allocation:
final class PoolSubpage<T> implements PoolSubpageMetric {
...
/** Special constructor that creates a linked list head */
PoolSubpage(int pageSize) {
chunk = null;
memoryMapIdx = -1;
runOffset = -1;
elemSize = -1;
this.pageSize = pageSize;
bitmap = null;
}
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
//a long holds 64 bits
//the smallest element size is 16, so a page splits into at most pageSize / 16 segments
//pageSize / 16 / 64 longs are therefore enough to track the state of every segment
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
//bitmapLength = maxNumElems / 64, plus one more long if there is a remainder
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
addToPool(head);
}
/**
* Returns the bitmap index of the subpage allocation.
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
//find the index of the next free segment
final int bitmapIdx = getNextAvail();
//which long in the bitmap array tracks this segment
int q = bitmapIdx >>> 6;
//which bit inside that long tracks this segment
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
//mark the segment as used in the bitmap
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
//pack bitmapIdx into the high bits and memoryMapIdx into the low bits of one long and return it
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
...
}
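The handle produced by toHandle(..) can be unpacked again as sketched below (illustrative helpers only; PoolChunk does the equivalent when it initializes a buffer from a subpage handle):
// The 0x4000000000000000L marker bit keeps a subpage handle with bitmapIdx == 0
// distinguishable from a plain page-run handle, whose upper 32 bits are all zero.
static long toHandle(int memoryMapIdx, int bitmapIdx) {
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
static int memoryMapIdx(long handle) { return (int) handle; }                        // low 32 bits
static int bitmapIdx(long handle)    { return (int) (handle >>> 32) & 0x3FFFFFFF; }  // high bits, marker stripped
// toHandle(2048, 3) refers to the 4th segment of the page managed by memoryMap[2048]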
For memory release, the call eventually reaches PooledByteBuf.deallocate():
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}
}
private void recycle() {
recyclerHandle.recycle(this);
}
Now look at the chunk.arena.free(chunk, handle, maxLength, cache) method:
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass);
}
}
As shown, cache.add is tried first: the released memory region ends up in the Queue<Entry<T>> queue of the thread-local MemoryRegionCache, so a later allocation of the same size class can reuse it directly.
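Conceptually, each per-size region cache keeps a bounded queue of recently freed (chunk, handle) pairs; a much-simplified model (not Netty's actual MemoryRegionCache, which also trims itself and recycles its Entry objects) looks like this:
import java.util.ArrayDeque;
import java.util.Queue;
final class RegionCacheModel<T> {
    static final class Entry<T> {
        final T chunk; final long handle;
        Entry(T chunk, long handle) { this.chunk = chunk; this.handle = handle; }
    }
    private final Queue<Entry<T>> queue = new ArrayDeque<>();
    private final int maxEntries;
    RegionCacheModel(int maxEntries) { this.maxEntries = maxEntries; }
    // called on free: keep the region for this thread instead of returning it to the chunk
    boolean add(T chunk, long handle) {
        if (queue.size() >= maxEntries) {
            return false;                 // cache full -> caller frees back to the chunk
        }
        return queue.offer(new Entry<>(chunk, handle));
    }
    // called on allocate: reuse a cached region of the same size class if one is available
    Entry<T> poll() {
        return queue.poll();
    }
}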