在netty的池化ByteBuf分配中惑灵,包含ByteBuf對(duì)象的池化和真實(shí)內(nèi)存(array或者DirectByteBuffer)的池化旦棉。本文內(nèi)容為對(duì)象池的細(xì)節(jié),真實(shí)內(nèi)存的池化參見(jiàn):netty-內(nèi)存管理苞尝。
實(shí)際上Recycler不僅可以用于ByteBuf對(duì)象的池化肩祥,他是一個(gè)通用的對(duì)象池化技術(shù),我們可以直接使用Recycler實(shí)現(xiàn)自身系統(tǒng)對(duì)象的池化。
1. 對(duì)象池化的作用
在netty中ByteBuf的創(chuàng)建是個(gè)非常頻繁的過(guò)程擅羞,使用對(duì)象池可以起到對(duì)象可以復(fù)用,減少gc頻率的作用义图。
2. Recycler的關(guān)鍵技術(shù)點(diǎn)
-
多線程競(jìng)爭(zhēng)下的對(duì)象分配和釋放(怎么減小多線程競(jìng)爭(zhēng))
每個(gè)線程都有自己的對(duì)象池减俏,分配時(shí)從自己的對(duì)象池中獲得一個(gè)對(duì)象。其他線程release對(duì)象時(shí)碱工,把對(duì)象歸還到原來(lái)自己的池子中去(分配線程的池子)娃承。
大量使用了ThreadLocal,每個(gè)線程都有自己的stack和weakorderqueue痛垛,做到線程封閉草慧,有力減小競(jìng)爭(zhēng)。 -
對(duì)象的分配
2.1 先從stack中分配匙头,如果stack有,則直接stack.pop獲得對(duì)象
2.2 如果stack中沒(méi)有仔雷,從WeakOrderQueue中一次移取多個(gè)對(duì)象到stack中(每次會(huì)盡可能scavenge整個(gè)head Link到stack中)蹂析,最后stack.pop獲得對(duì)象。 -
對(duì)象的release
3.1 如果release線程就是對(duì)象的分配線程碟婆,直接入棧
3.2 如果release線程和對(duì)象的分配線程不是同一個(gè)線程电抚,則歸還到分配線程的WeakOrderQueue中。release線程維護(hù)的數(shù)據(jù)結(jié)構(gòu)為:ThreadLocal<Map<Stack,WeakOrderQueue>>
3. 未解疑問(wèn)
- 為什么需要WeakOrderQueue竖共?
WeakOrderQueue的存在是用于非分配對(duì)象的線程在release對(duì)象時(shí)將對(duì)象歸還到分配線程的對(duì)象池中蝙叛,如果直接把對(duì)象歸還到release對(duì)象的線程中,也就是release線程直接stack.push豈不是很高效公给?
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
maxCapacity=32768
4. Recycler核心結(jié)構(gòu)
Recycler關(guān)聯(lián)了4個(gè)核心類(lèi):
- DefaultHandle:對(duì)象的包裝類(lèi)借帘,在Recycler中緩存的對(duì)象都會(huì)包裝成DefaultHandle類(lèi)。
- Stack:存儲(chǔ)本線程回收的對(duì)象淌铐。對(duì)象的獲取和回收對(duì)應(yīng)Stack的pop和push肺然,即獲取對(duì)象時(shí)從Stack中pop出1個(gè)DefaultHandle,回收對(duì)象時(shí)將對(duì)象包裝成DefaultHandle push到Stack中腿准。Stack會(huì)與線程綁定际起,即每個(gè)用到Recycler的線程都會(huì)擁有1個(gè)Stack,在該線程中獲取對(duì)象都是在該線程的Stack中pop出一個(gè)可用對(duì)象吐葱。
stack底層以數(shù)組作為存儲(chǔ)結(jié)構(gòu)街望,初始大小為256,最大大小為32768弟跑。 - WeakOrderQueue:存儲(chǔ)其它線程回收到分配線程的對(duì)象灾前,當(dāng)某個(gè)線程從Stack中獲取不到對(duì)象時(shí)會(huì)從WeakOrderQueue中獲取對(duì)象。
3.1 從分配線程的角度來(lái)看窖认,每個(gè)分配線程的Stack擁有1個(gè)WeakOrderQueue鏈表豫柬,每個(gè)WeakOrderQueue元素維持了對(duì)應(yīng)release線程歸還的對(duì)象告希。每個(gè)分配線程的WeakOrderQueue鏈表的對(duì)象池子中的對(duì)象數(shù)量不能超過(guò)availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
,該值默認(rèn)為16K=16384烧给。
3.2 從release角度看燕偶,主要的數(shù)據(jù)結(jié)構(gòu)為:
image.png
Map<Stack<?>, WeakOrderQueue>維持了release線程向每個(gè)分配線程歸還對(duì)象的數(shù)據(jù)結(jié)構(gòu),Map的大小不超過(guò)MAX_DELAYED_QUEUES_PER_THREAD = max(0, SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", // We use the same value as default EventLoop number NettyRuntime.availableProcessors() * 2));
础嫡。
也就是release線程默認(rèn)最多能向核數(shù)*2個(gè)分配線程歸還對(duì)象指么。 - Link: WeakOrderQueue中包含1個(gè)Link鏈表,回收對(duì)象存儲(chǔ)在鏈表某個(gè)Link節(jié)點(diǎn)里榴鼎,當(dāng)Link節(jié)點(diǎn)存儲(chǔ)的回收對(duì)象滿了時(shí)會(huì)新建1個(gè)Link放在Link鏈表尾伯诬。
private static final class Link extends AtomicInteger
每個(gè)Link節(jié)點(diǎn)默認(rèn)能存儲(chǔ)16個(gè)元素,通過(guò)繼承AtomicInteger是為了解決分配線程通過(guò)WeakOrderQueue和回收線程歸還對(duì)象時(shí)的線程競(jìng)爭(zhēng)巫财。
整個(gè)Recycler回收對(duì)象存儲(chǔ)結(jié)構(gòu)如下圖所示:
5. Recycler實(shí)現(xiàn)細(xì)節(jié)
本節(jié)部分內(nèi)容摘錄自[NettyRecycler(http://www.reibang.com/p/4eab8450560c)盗似。
Recycler用來(lái)實(shí)現(xiàn)對(duì)象池,其中對(duì)應(yīng)堆內(nèi)存和直接內(nèi)存的池化實(shí)現(xiàn)分別是PooledHeapByteBuf和PooledDirectByteBuf平项。Recycler主要提供了3個(gè)方法:
- get():獲取一個(gè)對(duì)象赫舒。
- recycle(T, Handle):回收一個(gè)對(duì)象,T為對(duì)象泛型闽瓢。
- newObject(Handle):當(dāng)沒(méi)有可用對(duì)象時(shí)創(chuàng)建對(duì)象的實(shí)現(xiàn)方法接癌。
Recycler的UML圖如下:
下面分析下源碼,首先看下Recycler.recycle(T, Handle)方法扣讼,用于回收1個(gè)對(duì)象:
public final boolean recycle(T o, Handle handle) {
if (handle == NOOP_HANDLE) {
return false;
}
DefaultHandle h = (DefaultHandle) handle;
if (h.stack.parent != this) {
return false;
}
if (o != h.value) {
throw new IllegalArgumentException("o does not belong to handle");
}
h.recycle();
return true;
}
回收1個(gè)對(duì)象會(huì)調(diào)用該對(duì)象DefaultHandle.recycle()方法缺猛,如下:
public void recycle() {
stack.push(this);
}
回收1個(gè)對(duì)象(DefaultHandle)就是把該對(duì)象push到stack中。
void push(DefaultHandle item) {
Thread currentThread = Thread.currentThread();
if (thread == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
/**
* 如果該stack就是本線程的stack椭符,那么直接把DefaultHandle放到該stack的數(shù)組里
*/
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
// happens later.
/**
* 如果該stack不是本線程的stack荔燎,那么把該DefaultHandle放到該stack的WeakOrderQueue中
*/
pushLater(item, currentThread);
}
}
這里分為兩種情況,當(dāng)stack是當(dāng)前線程對(duì)應(yīng)的stack時(shí)艰山,執(zhí)行pushNow(item)方法湖雹,直接把對(duì)象放到該stack的DefaultHandle數(shù)組中,如下:
/**
* 直接把DefaultHandle放到stack的數(shù)組里曙搬,如果數(shù)組滿了那么擴(kuò)展該數(shù)組為當(dāng)前2倍大小
* @param item
*/
private void pushNow(DefaultHandle item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
當(dāng)stack是其它線程的stack時(shí)摔吏,執(zhí)行pushLater(item, currentThread)方法,將對(duì)象放到WeakOrderQueue中纵装,如下:
private void pushLater(DefaultHandle item, Thread thread) {
/**
* Recycler有1個(gè)stack->WeakOrderQueue映射征讲,每個(gè)stack會(huì)映射到1個(gè)WeakOrderQueue,這個(gè)WeakOrderQueue是該stack關(guān)聯(lián)的其它線程WeakOrderQueue鏈表的head WeakOrderQueue橡娄。
* 當(dāng)其它線程回收對(duì)象到該stack時(shí)會(huì)創(chuàng)建1個(gè)WeakOrderQueue中并加到stack的WeakOrderQueue鏈表中诗箍。
*/
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
/**
* 如果delayedRecycled滿了那么將1個(gè)偽造的WeakOrderQueue(DUMMY)放到delayedRecycled中,并丟棄該對(duì)象(DefaultHandle)
*/
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
/**
* 創(chuàng)建1個(gè)WeakOrderQueue
*/
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
/**
* 將對(duì)象放入到該stack對(duì)應(yīng)的WeakOrderQueue中
*/
queue.add(item);
}
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
/**
* 如果該stack的可用共享空間還能再容下1個(gè)WeakOrderQueue挽唉,那么創(chuàng)建1個(gè)WeakOrderQueue滤祖,否則返回null
*/
return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? new WeakOrderQueue(stack, thread) : null;
}
WeakOrderQueue的構(gòu)造函數(shù)如下筷狼,WeakOrderQueue實(shí)現(xiàn)了多線程環(huán)境下回收對(duì)象的機(jī)制,當(dāng)由其它線程回收對(duì)象到stack時(shí)會(huì)為該stack創(chuàng)建1個(gè)WeakOrderQueue匠童,這些由其它線程創(chuàng)建的WeakOrderQueue會(huì)在該stack中按鏈表形式串聯(lián)起來(lái)埂材,每次創(chuàng)建1個(gè)WeakOrderQueue會(huì)把該WeakOrderQueue作為該stack的head WeakOrderQueue:
private WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
owner = new WeakReference<Thread>(thread);
/**
* 每次創(chuàng)建WeakOrderQueue時(shí)會(huì)更新WeakOrderQueue所屬的stack的head為當(dāng)前WeakOrderQueue, 當(dāng)前WeakOrderQueue的next為stack的之前head汤求,
* 這樣把該stack的WeakOrderQueue通過(guò)鏈表串起來(lái)了俏险,當(dāng)下次stack中沒(méi)有可用對(duì)象需要從WeakOrderQueue中轉(zhuǎn)移對(duì)象時(shí)從WeakOrderQueue鏈表的head進(jìn)行scavenge轉(zhuǎn)移到stack的對(duì)DefaultHandle數(shù)組。
*/
synchronized (stack) {
next = stack.head;
stack.head = this;
}
availableSharedCapacity = stack.availableSharedCapacity;
}
下面再看Recycler.get()方法:
public final T get() {
if (maxCapacity == 0) {
return newObject(NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
取出該線程對(duì)應(yīng)的stack扬绪,從stack中pop出1個(gè)DefaultHandle竖独,返回該DefaultHandle的真正對(duì)象。
下面看stack.pop()方法:
DefaultHandle pop() {
int size = this.size;
if (size == 0) {
if (!scavenge()) {
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
如果該stack的DefaultHandle數(shù)組中還有對(duì)象可用挤牛,那么從該DefaultHandle數(shù)組中取出1個(gè)可用對(duì)象返回莹痢,如果該DefaultHandle數(shù)組沒(méi)有可用的對(duì)象了,那么執(zhí)行scavenge()方法赊颠,將head WeakOrderQueue中的head Link中的DefaultHandle數(shù)組轉(zhuǎn)移到stack的DefaultHandle數(shù)組格二,scavenge方法如下:
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
具體執(zhí)行了scavengeSome()方法,清理WeakOrderQueue中部分DefaultHandle到stack竣蹦,每次盡可能清理head WeakOrderQueue的head Link的全部DefaultHandle,如下:
boolean scavengeSome() {
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
cursor = head;
if (cursor == null) {
return false;
}
}
boolean success = false;
WeakOrderQueue prev = this.prev;
do {
/**
* 將當(dāng)前WeakOrderQueue的head Link的DefaultHandle數(shù)組轉(zhuǎn)移到stack的DefaultHandle數(shù)組中
*/
if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) {
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.next = next;
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
WeakOrderQueue.transfer()方法如下沧奴,將WeakOrderQueue的head Link中的DefaultHandle數(shù)組遷移到stack中:
boolean transfer(Stack<?> dst) {
Link head = this.head;
if (head == null) {
return false;
}
/**
* 如果head Link的readIndex到達(dá)了Link的容量LINK_CAPACITY痘括,說(shuō)明該Link已經(jīng)被scavengge完了。
* 這時(shí)需要把下一個(gè)Link作為新的head Link滔吠。
*/
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
final int srcStart = head.readIndex;
/**
* head Link的回收對(duì)象數(shù)組的最大位置
*/
int srcEnd = head.get();
/**
* head Link可以scavenge的DefaultHandle的數(shù)量
*/
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
/**
* 每次會(huì)盡可能scavenge整個(gè)head Link纲菌,如果head Link的DefaultHandle數(shù)組能全部遷移到stack中,stack的DefaultHandle數(shù)組預(yù)期容量
*/
final int expectedCapacity = dstSize + srcSize;
/**
* 如果預(yù)期容量大于stack的DefaultHandle數(shù)組最大長(zhǎng)度疮绷,說(shuō)明本次無(wú)法將head Link的DefaultHandle數(shù)組全部遷移到stack中
*/
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
/**
* head Link的DefaultHandle數(shù)組
*/
final DefaultHandle[] srcElems = head.elements;
/**
* stack的DefaultHandle數(shù)組
*/
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
/**
* 遷移head Link的DefaultHandle數(shù)組到stack的DefaultHandle數(shù)組
*/
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
/**
* 當(dāng)head節(jié)點(diǎn)的對(duì)象全都轉(zhuǎn)移給stack后翰舌,取head下一個(gè)節(jié)點(diǎn)作為head,下次轉(zhuǎn)移的時(shí)候再?gòu)男碌膆ead轉(zhuǎn)移回收的對(duì)象
*/
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
reclaimSpace(LINK_CAPACITY);
this.head = head.next;
}
/**
* 遷移完成后更新原始head Link的readIndex
*/
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}