源碼目錄 ///sync.pool.go (1.14.1)
前言
sync.pool對象池是個好東西跌宛,避免對象的反復創(chuàng)建和回收描沟。對于一些需要頻繁創(chuàng)建的對象我們可以使用它來避免內(nèi)存的頻繁申請囱嫩、回收洞辣。但應避免用于連接池胚吁,因為sync.pool存儲的對象僅存活在三次STW之間延窜。
數(shù)據(jù)結構
//sync.pool.go
type Pool struct {
noCopy noCopy // 不可復制標準
local unsafe.Pointer // 指向poolLocal數(shù)組
localSize uintptr // poolLocal數(shù)組長度
victim unsafe.Pointer // 指向poolLocal數(shù)組
victimSize uintptr // poolLocal數(shù)組長度
New func() interface{} // 自定義的對象創(chuàng)建方法
}
local和victim都指向poolLocal數(shù)組恋脚,它們的區(qū)別是什么呢? pool里面有兩個poolLocal數(shù)組腺办,當經(jīng)歷一次STW后victim置零,將local賦值給victim糟描,local置零怀喉。獲取對象(Get方法)的順序 local --> victim --> New。那poolLocal數(shù)組有多少個呢船响?一個P對應一個poolLocal躬拢,pid作為下標躲履。
//sync.pool.go
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 沒發(fā)現(xiàn)用在什么地方
}
type poolLocalInternal struct {
private interface{} // 當前P私有的
shared poolChain // 公共的,當前P可以在頭部壓入和彈出聊闯,其他P可以再尾部彈出
}
poolLocal 就可以當成poolLocalInternal工猜。有一個私有變量存儲單個對象,和一個所有P都可以訪問的公共對象數(shù)組菱蔬。
//sync.poolqueue.go
type poolChain struct {
// 生產(chǎn)者(當前P)所有篷帅。壓入和彈出。只有一個P對他操作拴泌,不存在競爭魏身,不需要同步
head *poolChainElt
// 消費者(所有P)所有。彈出蚪腐。多個P存在競爭箭昵,需要原子操作
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
next, prev *poolChainElt // 雙向鏈表
}
// 循環(huán)隊列
type poolDequeue struct {
headTail uint64 // 通過它計算數(shù)組的頭和尾,head高32位,tal低32位
vals []eface // 對象數(shù)組
}
全局變量
//sync.pool.go
var (
allPoolsMu Mutex // 全局鎖
allPools []*Pool // 存儲所有的pool
oldPools []*Pool
)
這里的allPools和oldPools都存儲的Pool集合回季,那他們有什么區(qū)別呢家制?allPools存儲的是未經(jīng)歷STW的Pool,使用的是pool的local字段;allPools經(jīng)歷STW之后將local變成victim,allPools變成oldPools茧跋。這兩個變量不參與存取過程慰丛,僅在STW的時候使用。
Put 存
//sync.pool.go
func (p *Pool) Put(x interface{}) {
if x == nil { // 空對象不讓存
return
}
....
l, _ := p.pin() // 返回poolLocal
// 先往private中放
if l.private == nil {
l.private = x
x = nil
}
// x != nil 說明往private中放失敗
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
....
}
通過pin()獲取poolLocal瘾杭。如果private為空則放置在private中诅病,否則放置在公共的區(qū)域。在后面會分析pushHead()粥烁。
pin
func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin() // 該方法的作用是該P禁止搶占贤笆,意味著當前P一直執(zhí)行當前G
s := atomic.LoadUintptr(&p.localSize) // poolLocal長度
l := p.local // poolLocal首地址
if uintptr(pid) < s { // pid作為poolLocal數(shù)組下標,所以必須小于數(shù)組長度
return indexLocal(l, pid), pid
}
return p.pinSlow() //獲取其它P的poolLocal或生成poolLocal
}
// 根據(jù)首地址 和 下標計算對應的poolLocal的地址
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
獲取當前P的poolLocal, 當然這里的P可能是兩個不同的P,因為中間過程存在調(diào)度讨阻,執(zhí)行該G的P可能變化芥永。
pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) {
// 在上面pin中執(zhí)行了runtime_procPin,所以需要Unpin
runtime_procUnpin()
// 可能需要創(chuàng)建poolLocal,添加到allPools,所以需要加鎖
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin() // 再次禁止搶占
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
// 在上面的pin中已經(jīng)判斷了uintptr(pid) < s,為什么還需要再次判斷呢?
// 因為上面執(zhí)行了Unpin以及l(fā)ock,所以此時的P可能已經(jīng)不是之前的P了
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p) //加入到allPools中
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size) // 從這里可以看出poolLocal的個數(shù)就是P的個數(shù)
// 使用了新的poolLocal數(shù)組钝吮,指明首地址和長度
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
- 一個P對應一個poolLocal埋涧,pid作為poolLocal下標;
- 獲取poolLocal的時候,會根據(jù)下標pid獲取對應的poolLocal奇瘦。但這不是總是成功的棘催,當修改了GOMAXPROCS導致P的個數(shù)大于poolLocal的個數(shù)時候會重建poolLocal數(shù)組。
Get 獲取
func (p *Pool) Get() interface{} {
...
l, pid := p.pin() // 通過pin獲取當前G的P的poolLocal
x := l.private // 首先尋找的是private
l.private = nil
if x == nil { // private 不存在
x, _ = l.shared.popHead() // 在當前P的公共部分獲取
if x == nil { // 如果在當前P獲取失敗耳标,則從其他P中偷一個
x = p.getSlow(pid)
}
}
runtime_procUnpin()
...
if x == nil && p.New != nil { // 如果在所有pooLocal和victim都沒有獲取到醇坝,就只有執(zhí)行自定義的New()了
x = p.New()
}
return x
}
流程: 當前P的private --> 當前P的shared --> 其他P的shared --> New
getSlow
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍歷所有P的公共區(qū)
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果遍歷了全部poolLocal還是沒獲取到,那就從victim中獲取次坡。victim又是什么呢呼猪?
// victim就是上一輪STW的poolLocal画畅。查找流程和poolLocal一樣,先找pid對應的宋距,再找全部的轴踱。
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果在victim中沒獲取到,則將其長度設為0乡革,免得下次在其中查找
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
getSlow就是"偷"寇僧,當然只能在公共區(qū)偷摊腋。先遍歷poolLocal,再遍歷victim沸版。
接下看看如何在poolChain中存取
pushHead
func (c *poolChain) pushHead(val interface{}) {
d := c.head
// 如果head不存在就初始化一個
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d) // head不存在也就意味著tail也不存在
}
// 壓入dequeue中
if d.pushHead(val) {
return
}
// 到了這一步說明上面pushHead失敗了,說head對應的dequeue滿了兴蒸,需要添加一個dequeue
newSize := len(d.vals) * 2 // 當前head的2倍
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 如果首尾剛好相差一個queue長度视粮,說明一個queue滿了
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil { // 為什么要判斷nil,因為在popTail中是先設置headTail,再將typ置零
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
存的流程:
- private 可用就存在private中
- 往shared的queue的head中存橙凳,如果queue為空蕾殴,則新建一個queue
- 如果queue滿了,就添加一個2倍的queue節(jié)點
popHead
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface{}
return val, true
}
這個邏輯比較簡單岛啸,就是head 減 1
popTail
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
ptrs2 := d.pack(head, tail+1)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
// 這里為什么不 *slot = eface{}呢钓觉?
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
}
tail先加1,取值坚踩,置零
poolCleanup STW清除
func poolCleanup() {
// victim清空
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// poolLocal移到victim荡灾,并清空
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// allPools 變成 oldPools
oldPools, allPools = allPools, nil
}