The previous section analyzed UnpooledByteBufAllocator: how heap and direct memory are allocated, and how the underlying data is accessed. This section analyzes PooledByteBufAllocator to see how it manages Pooled memory.
- Entry points: PooledByteBufAllocator#newHeapBuffer() and PooledByteBufAllocator#newDirectBuffer(). Heap and direct allocation both follow the same fixed pattern:
  - get the thread-local cache PoolThreadCache
  - get the arena of the matching type from it
  - use that arena to allocate the memory
```java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // get the thread-local cache
    PoolThreadCache cache = threadCache.get();
    // get the heapArena
    PoolArena<byte[]> heapArena = cache.heapArena;
    final ByteBuf buf;
    if (heapArena != null) {
        // allocate memory with the heapArena
        buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // get the thread-local cache
    PoolThreadCache cache = threadCache.get();
    // get the directArena
    PoolArena<ByteBuffer> directArena = cache.directArena;
    final ByteBuf buf;
    if (directArena != null) {
        // allocate memory with the directArena
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = PlatformDependent.hasUnsafe() ?
                UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}
```
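The fixed three-step pattern above (thread-local cache, then arena, then allocate, with an unpooled fallback) can be sketched without Netty. All names below (SketchArena, SketchCache, AllocPattern) are invented for illustration; the real arena reuses pooled chunks rather than calling new:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for PoolArena / PoolThreadCache, illustrating only
// the "thread-local cache -> arena -> allocate, else fall back" pattern.
class SketchArena {
    final AtomicInteger allocations = new AtomicInteger();

    byte[] allocate(int capacity) {
        allocations.incrementAndGet();
        return new byte[capacity]; // real Netty hands out a slice of a pooled chunk here
    }
}

class SketchCache {
    final SketchArena heapArena;

    SketchCache(SketchArena heapArena) {
        this.heapArena = heapArena;
    }
}

public class AllocPattern {
    // each thread gets its own cache, mirroring PooledByteBufAllocator's threadCache
    static final ThreadLocal<SketchCache> threadCache =
            ThreadLocal.withInitial(() -> new SketchCache(new SketchArena()));

    static byte[] newHeapBuffer(int initialCapacity) {
        SketchCache cache = threadCache.get();      // 1. thread-local cache
        SketchArena arena = cache.heapArena;        // 2. arena of the right type
        if (arena != null) {
            return arena.allocate(initialCapacity); // 3. allocate via the arena
        }
        return new byte[initialCapacity];           // fallback: "unpooled" allocation
    }

    public static void main(String[] args) {
        System.out.println(newHeapBuffer(256).length);
    }
}
```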
- Tracing threadCache.get() shows it calls FastThreadLocal#get(), so threadCache is in fact a FastThreadLocal. It can be seen as the JDK's ThreadLocal, except that it provides a faster implementation. The get() method mainly calls the initialization method initialize():
```java
public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // call the initialization method
    V value = initialize(threadLocalMap);
    registerCleaner(threadLocalMap);
    return value;
}
```
threadCache is declared as:

```java
private final PoolThreadLocalCache threadCache;
```

The logic of its initialValue() method is as follows:

- take the least-used arena from the pre-built heapArenas and directArenas
- instantiate a PoolThreadCache with those arenas as constructor arguments and return it
```java
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
    private final boolean useCacheForAllThreads;

    PoolThreadLocalCache(boolean useCacheForAllThreads) {
        this.useCacheForAllThreads = useCacheForAllThreads;
    }

    @Override
    protected synchronized PoolThreadCache initialValue() {
        /**
         * An "arena" is a competition ground: all memory allocation
         * logic happens inside these arenas.
         */
        // get the heapArena: take the least-used arena from the heapArenas array
        final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
        // get the directArena: take the least-used arena from the directArenas array
        final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

        Thread current = Thread.currentThread();
        if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
            // create the PoolThreadCache: this cache ends up used by a single thread.
            // Via heapArena and directArena it manages the two big memory areas (heap and direct);
            // via tinyCacheSize, smallCacheSize and normalCacheSize it manages the
            // ByteBuf cache lists that hold memory regions for reuse.
            return new PoolThreadCache(
                    heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                    DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
        }
        // No caching so just use 0 as sizes.
        return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
    }

    // code omitted ......
}
```
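leastUsedArena(...) simply scans the arena array for the arena currently serving the fewest thread caches, so load spreads evenly across arenas. A minimal sketch of that selection (ArenaStub is a made-up stand-in for PoolArena and its numThreadCaches counter):

```java
import java.util.concurrent.atomic.AtomicInteger;

// ArenaStub is a hypothetical stand-in for PoolArena: the only part that
// matters for arena selection is the count of threads bound to it.
class ArenaStub {
    final AtomicInteger numThreadCaches = new AtomicInteger();
}

public class LeastUsed {
    // pick the arena with the fewest bound thread caches (linear scan)
    static ArenaStub leastUsedArena(ArenaStub[] arenas) {
        if (arenas == null || arenas.length == 0) {
            return null;
        }
        ArenaStub minArena = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            ArenaStub arena = arenas[i];
            if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                minArena = arena;
            }
        }
        return minArena;
    }

    public static void main(String[] args) {
        ArenaStub[] arenas = { new ArenaStub(), new ArenaStub(), new ArenaStub() };
        arenas[0].numThreadCaches.set(2);
        arenas[1].numThreadCaches.set(0);
        arenas[2].numThreadCaches.set(1);
        System.out.println(leastUsedArena(arenas) == arenas[1]); // the idle arena wins
    }
}
```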
Looking at PoolThreadCache: it maintains two allocation strategies. One is via the heapArena and directArena it holds, as described above; the other maintains the cache lists for the tiny, small and normal size classes, which hold memory regions for reuse.
```java
final class PoolThreadCache {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    // memory managed via the arenas
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // cache lists for the three size classes: tiny, small and normal
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        // hold heapArena and directArena: allocation managed the arena way
        this.heapArena = heapArena;
        this.directArena = directArena;
        // create the cache lists of each type from tinyCacheSize, smallCacheSize
        // and normalCacheSize and store them in the fields
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);
            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            // create the normalized cache queues
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);
            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }
        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            // MemoryRegionCache: the object that holds one cache
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // each MemoryRegionCache (tiny, small, normal) is a queue
                // for one memory size (one size class)
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            // MemoryRegionCache: the object that holds one cache
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                // each MemoryRegionCache (tiny, small, normal) is a queue
                // for one memory size (one size class)
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }
    ......
}
```
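The arraySize computation in createNormalCaches determines how many normal-size queues a thread cache gets: one per power-of-two size from pageSize up to min(chunkSize, maxCachedBufferCapacity). A sketch of just that math, using the usual defaults (pageSize 8 KiB, maxOrder 11, hence a 16 MiB chunk) as assumed inputs:

```java
// Sketch of the createNormalCaches sizing math: with pageSize 8 KiB and
// maxCachedBufferCapacity 32 KiB, the normal caches cover 8K, 16K and 32K,
// i.e. log2(32K / 8K) + 1 = 3 queues.
public class NormalCacheSizing {
    static int log2(int v) {
        return 31 - Integer.numberOfLeadingZeros(v);
    }

    static int normalCacheCount(int chunkSize, int maxCachedBufferCapacity, int pageSize) {
        int max = Math.min(chunkSize, maxCachedBufferCapacity);
        return Math.max(1, log2(max / pageSize) + 1);
    }

    public static void main(String[] args) {
        int pageSize = 8192;        // default page size
        int chunkSize = 8192 << 11; // default chunk: pageSize << maxOrder(11) = 16 MiB
        System.out.println(normalCacheCount(chunkSize, 32 * 1024, pageSize)); // 3
    }
}
```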
Looking at the cache-creation method PoolThreadCache#createSubPageCaches(), the cache list object MemoryRegionCache actually maintains a Queue&lt;Entry&lt;T&gt;&gt; queue:
```java
private abstract static class MemoryRegionCache<T> {
    private final int size;
    private final Queue<Entry<T>> queue;
    private final SizeClass sizeClass;
    private int allocations;

    MemoryRegionCache(int size, SizeClass sizeClass) {
        // a simple normalization to a power of two
        this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
        // the cache queue for this size class
        queue = PlatformDependent.newFixedMpscQueue(this.size);
        this.sizeClass = sizeClass;
    }
    ......
}
```
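The normalization done by MathUtil.safeFindNextPositivePowerOfTwo rounds the requested cache size up to the next power of two, which fixed-size MPSC queues require. A sketch of the standard bit-twiddling version (Netty's variant additionally clamps non-positive and oversized inputs):

```java
// Round a positive int up to the next power of two by smearing the highest
// set bit of (value - 1) into all lower positions, then adding one.
public class Pow2 {
    static int nextPositivePowerOfTwo(int value) {
        if (value <= 0) {
            return 1;
        }
        int n = value - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return n + 1;
    }

    public static void main(String[] args) {
        System.out.println(nextPositivePowerOfTwo(512)); // already a power of two: 512
        System.out.println(nextPositivePowerOfTwo(600)); // rounded up: 1024
    }
}
```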
- About the pre-built memory arenas: the heapArenas and directArenas arrays are held by PooledByteBufAllocator and are initialized when the allocator is instantiated:
```java
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
// lengths of the three cache lists
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
```
Tracing the initialization shows that heapArenas and directArenas are each a PoolArena[]. PoolArena defines two inner classes, PoolArena.HeapArena and PoolArena.DirectArena, representing the heap arena and the direct-memory arena respectively.
```java
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                              boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    this.tinyCacheSize = tinyCacheSize;
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
    chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

    checkPositiveOrZero(nHeapArena, "nHeapArena");
    checkPositiveOrZero(nDirectArena, "nDirectArena");
    checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
    if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
        throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
    }
    if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
        throw new IllegalArgumentException("directMemoryCacheAlignment: "
                + directMemoryCacheAlignment + " (expected: power of two)");
    }

    int pageShifts = validateAndCalculatePageShifts(pageSize);

    // create the two PoolArena arrays used for allocation: heapArenas and directArenas
    if (nHeapArena > 0) {
        // create the heapArenas arena array (actually a PoolArena[])
        // nHeapArena: the array size
        heapArenas = newArenaArray(nHeapArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
        for (int i = 0; i < heapArenas.length; i ++) {
            // heap: the PoolArena[] holds HeapArena instances
            PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                    pageSize, maxOrder, pageShifts, chunkSize,
                    directMemoryCacheAlignment);
            heapArenas[i] = arena;
            metrics.add(arena);
        }
        heapArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        heapArenas = null;
        heapArenaMetrics = Collections.emptyList();
    }

    if (nDirectArena > 0) {
        // create the directArenas arena array (actually a PoolArena[])
        directArenas = newArenaArray(nDirectArena);
        List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
        for (int i = 0; i < directArenas.length; i ++) {
            // direct: the PoolArena[] holds DirectArena instances
            PoolArena.DirectArena arena = new PoolArena.DirectArena(
                    this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
            directArenas[i] = arena;
            metrics.add(arena);
        }
        directArenaMetrics = Collections.unmodifiableList(metrics);
    } else {
        directArenas = null;
        directArenaMetrics = Collections.emptyList();
    }
    metric = new PooledByteBufAllocatorMetric(this);
}

private static <T> PoolArena<T>[] newArenaArray(int size) {
    // create the PoolArena array
    return new PoolArena[size];
}
```
The default size of the arena arrays is defaultMinNumArena, twice the number of CPU cores, so that at runtime each thread can have an arena to itself and no lock is needed during allocation:
```java
public PooledByteBufAllocator(boolean preferDirect) {
    this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}

// twice the number of CPU cores; the Arena arrays default to this size
// (the same number as the NioEventLoop array, so every thread can have its own
// arena, and allocation from the arenas in this array needs no locking)
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numHeapArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numDirectArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
```
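The default-count formula can be evaluated in isolation. A sketch with assumed inputs (2 cores, the default 16 MiB chunk) showing when the 2 * cores term wins and when the memory cap (at most maxMemory / chunkSize / 2 / 3 arenas) wins:

```java
// Sketch of the DEFAULT_NUM_HEAP_ARENA formula: twice the CPU core count,
// capped so the arenas cannot claim too large a share of the heap.
public class DefaultArenas {
    static int defaultNumArenas(int cores, long maxMemory, long chunkSize) {
        int defaultMinNumArena = cores * 2;
        return Math.max(0, (int) Math.min(defaultMinNumArena, maxMemory / chunkSize / 2 / 3));
    }

    public static void main(String[] args) {
        long chunkSize = 8192L << 11; // 16 MiB default chunk
        // 2 cores, 4 GiB heap: the 2 * cores term (4) is the smaller bound
        System.out.println(defaultNumArenas(2, 4L << 30, chunkSize));
        // 2 cores, 128 MiB heap: the memory cap (128M / 16M / 2 / 3 = 1) wins
        System.out.println(defaultNumArenas(2, 128L << 20, chunkSize));
    }
}
```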
- Overall allocation architecture, as shown in the figure. Suppose 4 NioEventLoops (a 4-thread array) are initialized and the default CPU core count is 2; then the allocator PooledByteBufAllocator also holds 4 arenas. Creating a ByteBuf works as follows:
- First, a matching arena object is obtained through PoolThreadCache. The job of PoolThreadCache is to use the ThreadLocal mechanism to stash one arena (the least used) from the arena array held by PooledByteBufAllocator into one of its own fields.
- Then, when each thread calls get() through threadCache, it obtains its underlying arena: the first thread gets the first arena, the second thread the second, and so on. In this way each thread is bound to an arena.

Besides allocating directly from the memory managed by its arena, PoolThreadCache can also allocate from the ByteBuf cache lists it maintains. PooledByteBufAllocator holds tinyCacheSize, smallCacheSize and normalCacheSize; when threadCache.get() instantiates a PoolThreadCache during allocation, these values are passed into its constructor to create the corresponding cache lists.