概要
初始化
static {
// 默認(rèn)的pagesize 8k
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
// 默認(rèn)poolsubpage二叉樹(shù)的深度11
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 11);
final Runtime runtime = Runtime.getRuntime();
// 每個(gè)ByteBufAllocator所能擁有的最小arena個(gè)數(shù)是cpu核心的2倍,跟NioEventLoop一樣
// 換句話說(shuō),每個(gè)EventLoop都有自己專屬的Arena,節(jié)省了同步的開(kāi)銷
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
// runtime.maxMemory() / defaultChunkSize味抖,當(dāng)然是當(dāng)前系統(tǒng)內(nèi)存總共能切分多少個(gè)chunk
// 除以2溅漾,相當(dāng)于不能消耗超過(guò)系統(tǒng)內(nèi)存的一半
// 除以3,相當(dāng)于3*arena=chunk也颤,也就是每個(gè)arena至少需要3個(gè)chunk
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
// 設(shè)置cachesize伊滋,tiny有512碳却,small有256,normal有64
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
}
構(gòu)造方法
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
super(preferDirect);
// 可以看成是一個(gè)ThreadLocal變量笑旺,跟當(dāng)前線程綁定
threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
// chunksize = pagesize*2^maxOrder = 16M 默認(rèn)
final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
// Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize)
// 也就是計(jì)算pagesize從尾部開(kāi)始一共有多少位連續(xù)的0
// 比如8k=10000000000000昼浦,那么pageShifts=13
int pageShifts = validateAndCalculatePageShifts(pageSize);
// 上面初始化的時(shí)候,nHeapArena已經(jīng)知道了筒主,最少是CPUCORE*2
if (nHeapArena > 0) {
// 這里先創(chuàng)建一個(gè)CPUCORE*2長(zhǎng)度的PoolArena數(shù)組
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
// 正式開(kāi)始初始化PoolArena.HeapArena
PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
// 加到heapArenas里面
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
// 堆外的跟上面類似
if (nDirectArena > 0) {
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
}
申請(qǐng)
newHeapBuffer
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// 拿到線程綁定的PoolThreadCache的heapArena
// 也就是先去線程本地緩存里找是否有所屬的arena
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
// 如果有的話座柱,那么直接去找arena申請(qǐng)內(nèi)存
if (heapArena != null) {
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 如果沒(méi)有的話迷帜,那么去申請(qǐng)非池化的內(nèi)存
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 比heap多了一次優(yōu)化,使用unsafe的方式直接去申請(qǐng)堆外內(nèi)存
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
PoolThreadLocalCache
這里有個(gè)問(wèn)題是色洞,既然Allocator初始化的時(shí)候新建了這么多的arena,那么它是怎樣跟線程綁定的呢冠胯?而且線程是怎樣決定綁定哪一個(gè)arena的呢火诸?我們繼續(xù)。
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
// 前面Allocator初始化的時(shí)候已經(jīng)新建了一個(gè)PoolThreadLocalCache
// 同時(shí)荠察,它內(nèi)部ThreadLocalMap初始化的時(shí)候會(huì)觸發(fā)到這里
// 總結(jié)下置蜀,這個(gè)Allocator在該線程綁定一個(gè)threadCache的本地變量
// 而這個(gè)threadCache對(duì)應(yīng)的value是initialValue的返回值
@Override
protected synchronized PoolThreadCache initialValue() {
// 從heapArenas和directArenas分別選中一個(gè),包裝成PoolThreadCache返回
// 最少被使用
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
// 很簡(jiǎn)單悉盆,遍歷整個(gè)Allocator所擁有的所有arena盯荤,看誰(shuí)綁定的線程數(shù)最少就用誰(shuí)
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}