sync.pool

sync.pool 主要用于暫時(shí)保存對象戈锻,提供存取操作编兄,可以復(fù)用對象以避免頻繁的創(chuàng)建對象轩性,當(dāng)goroutine很多,頻繁的創(chuàng)建某個(gè)對象時(shí)狠鸳,可能會形成并發(fā)?-占?內(nèi)存?-GC 緩慢-處理并發(fā)能?降低-并發(fā)更?這樣的惡性循環(huán)揣苏,不過sync.pool不能用于數(shù)據(jù)庫連接池悯嗓,因?yàn)?code>pool池會定期自動觸發(fā)GC回收對象,至于用法就不贅述了卸察,下面主要解讀源碼

結(jié)構(gòu)

// pool池
type Pool struct {
    // 禁止copy绅作,之前的文章講過,這里不多說了
    noCopy noCopy

    // 對象池蛾派,指向[P]poolLocal切片的指針
    // 這里的P是通過runtime.GOMAXPROCS獲得俄认,不過不知道什么是P,可以先看下go的GPM模型
    // 這里默認(rèn)poolLocal切片的長度是P主要有兩個(gè)好處
    // 一個(gè)是將緩存池進(jìn)行了分段洪乍,減少了操作鎖粒度眯杏,類似mysql的組提交
    // 另外一個(gè)是同一個(gè)P綁定到M之后,同一時(shí)間只會調(diào)度一個(gè)G壳澳,也就天然的防止了P維度下的并發(fā)
    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    // poolLocal元素個(gè)數(shù)  
    localSize uintptr        // size of the local array

    // victim會在一輪GC到來的時(shí)候做兩件事
    // 一個(gè)是釋放自己占用的內(nèi)存
    // 另外一個(gè)是接管local
    // 也就是說pool池的內(nèi)存釋放會有兩輪GC的間隔岂贩,具體看后續(xù)源碼
    victim     unsafe.Pointer // local from previous cycle
    victimSize uintptr        // size of victims array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    // 新建對象的方法,當(dāng)pool池中沒有可用的對象時(shí)巷波,會調(diào)用該方法創(chuàng)建一個(gè)新的對象
    New func() interface{}
}

// pool池的分段結(jié)構(gòu)體
type poolLocal struct {
    // 內(nèi)嵌poolLocalInternal
    // 這里內(nèi)嵌主要是為了下面好計(jì)算size
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    // 這里加一些pad主要是為了防止cpu緩存的偽共享萎津,也是為了提升頻繁存取的性能
    // 至于偽共享,可以看看 https://zhuanlan.zhihu.com/p/65394173
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// 內(nèi)嵌的存儲結(jié)構(gòu)體
// Local per-P Pool appendix.
type poolLocalInternal struct {
    // 私有抹镊,只能被當(dāng)前P使用锉屈,比從poolChain更快獲取到對象,也是為了提升頻繁存取的性能
    private interface{} // Can be used only by the respective P.
    // 共享垮耳,所有P都能使用颈渊,但是有部分限制,這個(gè)后面再說
    shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
}

// 實(shí)際存儲對象的邏輯結(jié)構(gòu)是一個(gè)雙向鏈表终佛,然后每個(gè)鏈表節(jié)點(diǎn)是一個(gè)環(huán)形隊(duì)列
// 這里是鏈表的頭尾節(jié)點(diǎn)
type poolChain struct {
    // head is the poolDequeue to push to. This is only accessed
    // by the producer, so doesn't need to be synchronized.
    head *poolChainElt

    // tail is the poolDequeue to popTail from. This is accessed
    // by consumers, so reads and writes must be atomic.
    tail *poolChainElt
}

// 鏈表節(jié)點(diǎn)
type poolChainElt struct {
    // 環(huán)形隊(duì)列
    poolDequeue

    // next and prev link to the adjacent poolChainElts in this
    // poolChain.
    //
    // next is written atomically by the producer and read
    // atomically by the consumer. It only transitions from nil to
    // non-nil.
    //
    // prev is written atomically by the consumer and read
    // atomically by the producer. It only transitions from
    // non-nil to nil.
    // 前后向指針
    next, prev *poolChainElt
}

// 環(huán)形隊(duì)列
type poolDequeue struct {
    // headTail packs together a 32-bit head index and a 32-bit
    // tail index. Both are indexes into vals modulo len(vals)-1.
    //
    // tail = index of oldest data in queue
    // head = index of next slot to fill
    //
    // Slots in the range [tail, head) are owned by consumers.
    // A consumer continues to own a slot outside this range until
    // it nils the slot, at which point ownership passes to the
    // producer.
    //
    // The head index is stored in the most-significant bits so
    // that we can atomically add to it and the overflow is
    // harmless.
    // 64位的整型俊嗽,高32位用來記錄環(huán)head的位置,低32位用來記錄環(huán)tail的位置
    // 而且head代表的是當(dāng)前要寫入的位置铃彰,所以實(shí)際的存儲區(qū)間是[tail, head)
    headTail uint64

    // vals is a ring buffer of interface{} values stored in this
    // dequeue. The size of this must be a power of 2.
    //
    // vals[i].typ is nil if the slot is empty and non-nil
    // otherwise. A slot is still in use until *both* the tail
    // index has moved beyond it and typ has been set to nil. This
    // is set to nil atomically by the consumer and read
    // atomically by the producer.
    // 環(huán)绍豁,這里用eface這個(gè)結(jié)構(gòu)體也是有妙用的,后面具體會說
    vals []eface
}

type eface struct {
    typ, val unsafe.Pointer
}

這里附上一張整體的結(jié)構(gòu)體輔助理解


image.png

Put

var (
    // 全局鎖
    allPoolsMu Mutex

    // allPools is the set of pools that have non-empty primary
    // caches. Protected by either 1) allPoolsMu and pinning or 2)
    // STW.
    // 存儲所有的pool池
    allPools []*Pool

    // oldPools is the set of pools that may have non-empty victim
    // caches. Protected by STW.
    // 需要被GC回收的pool池
    oldPools []*Pool
)

// 存入
func (p *Pool) Put(x interface{}) {
    // 不允許存入nil
    if x == nil {
        return
    }
    // 競態(tài)檢測
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    // 獲取指向poolLocal的指針
    l, _ := p.pin()
    // 正如上面說的牙捉,private可用于快速的存取
    // 這里判斷后會存入對象到private中
    if l.private == nil {
        l.private = x
        x = nil
    }
    // 如果沒有存入private
    if x != nil {
        // 那就存到環(huán)形隊(duì)列中竹揍,pushHead方法后面會說
        l.shared.pushHead(x)
    }
    // 因?yàn)閜in方法中會禁止當(dāng)前M被搶占,也就是綁定的P和M不會改變
    // 所以這里需要解除禁止
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
    }
}


// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
// pin主要是獲取當(dāng)前P的pid和poolLocal
func (p *Pool) pin() (*poolLocal, int) {
    // 這里禁止M被搶占鹃共,也就會防止GC觸發(fā)pool池回收鬼佣,
    // 具體為啥看下GMP的調(diào)度就知道了,一輪GC的時(shí)候會嘗試搶占所有的P并停掉霜浴,只有STW之后才能進(jìn)行GC
    pid := runtime_procPin()
    // In pinSlow we store to local and then to localSize, here we load in opposite order.
    // Since we've disabled preemption, GC cannot happen in between.
    // Thus here we must observe local at least as large localSize.
    // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
    // 獲取localSize
    s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    l := p.local                              // load-consume
    // 如果pid < s晶衷,那說明localSize已經(jīng)有值了,poolLocal已經(jīng)初始化過了
    // poolLocal通過pid和P對應(yīng),所以poolLocal[pid]就是當(dāng)前要找的目標(biāo)分段
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // 到這里說uintptr(pid) >= s
    // 只有兩種可能晌纫,一種是pool池還沒初始化税迷,也就是說poolLocal還沒創(chuàng)建,localSize為0
    // 第二種就是調(diào)大了P的數(shù)量
    // 這兩種都會觸發(fā)重新初始化
    return p.pinSlow()
}

// 通過索引找到目標(biāo)poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    // 簡單的指針計(jì)算锹漱,l指向poolLocal數(shù)組箭养,l+i*sizeof(poolLocal) = poolLocal[i]
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

// 初始化操作
func (p *Pool) pinSlow() (*poolLocal, int) {
    // Retry under the mutex.
    // Can not lock the mutex while pinned.
    // 到這里說明需要重新初始化了,而且后面有加全局鎖操作哥牍,可能導(dǎo)致當(dāng)前g睡眠
    // 這里是可以進(jìn)行GC的毕泌,所以解除禁止M被搶占
    runtime_procUnpin()
    // 加全局鎖,因?yàn)闀僮魅值腶llPools
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    // 到這里又要防止GC觸發(fā)了
    pid := runtime_procPin()
    // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    // 再檢查一次嗅辣,因?yàn)橛锌赡茉谏厦娑虝旱慕獬筂被搶占后撼泛,可能M調(diào)度了別的G,然后該G進(jìn)行了初始化
    if uintptr(pid) < s {
        return indexLocal(l, pid), pid
    }
    // 第一次初始化澡谭,加到全局pool池中愿题,方便后續(xù)定期的GC回收
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    // 獲取P的數(shù)量
    size := runtime.GOMAXPROCS(0)
    // 創(chuàng)建poolLocal切片
    local := make([]poolLocal, size)
    // local指向poolLocal
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    // localSize=P的數(shù)量
    runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
    // 返回具體的poolLocal和pid
    return &local[pid], pid
}

這里的Put操作只說到了poolLocal,其實(shí)后面還會繼續(xù)向poolChain中存入對象蛙奖,這個(gè)先放到后面

Get

// 獲取對象
func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    // 跟存入一樣的操作潘酗,可能會觸發(fā)初始化
    l, pid := p.pin()
    // 優(yōu)先取私有的private
    x := l.private
    l.private = nil
    if x == nil {
        // Try to pop the head of the local shard. We prefer
        // the head over the tail for temporal locality of
        // reuse.
        // private沒有就從share里面取,popHead后面統(tǒng)一說
        x, _ = l.shared.popHead()
        if x == nil {
            // 如果還沒有就只能從其他分段的poolLocal里面取了
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    // 還沒取到雁仲,那只能調(diào)用New方法創(chuàng)建一個(gè)新的了
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

// 從別的poolLocal中獲取對象
func (p *Pool) getSlow(pid int) interface{} {
    // See the comment in pin regarding ordering of the loads.
    // 獲取local和對應(yīng)的size
    size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
    locals := p.local                            // load-consume
    // Try to steal one element from other procs.
    // 這里會循環(huán)找除自身外其他的所有的poolLocal
    for i := 0; i < int(size); i++ {
        // 注意這里的取模仔夺,pid是當(dāng)前poolLocal的索引,pid+i+1隨著i的遞增會遍歷所有其他的poolLocal
        l := indexLocal(locals, (pid+i+1)%int(size))
        // 因?yàn)槭菑钠渌膒oolLocal獲取對象伯顶,所以不能獲取private囚灼,只能獲取share
        // 如果這里也能獲取private骆膝,那么又得考慮并發(fā)安全祭衩,無論是加鎖還是使用復(fù)雜的邏輯結(jié)構(gòu),都跟private的初衷即加快存取性能有違背
        // 注意這里只能從popTail取阅签,popTail后面會說掐暮,后面再解釋為啥這里只能從popTail取
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Try the victim cache. We do this after attempting to steal
    // from all primary caches because we want objects in the
    // victim cache to age out if at all possible.
    // 還沒找到,只能嘗試從victim中找了
    // 其實(shí)victim將在下一輪GC中被回收政钟,此處可以當(dāng)做二級緩存來用路克,可以增加pool池的命中率
    // 查找的操作基本跟上面一致
    size = atomic.LoadUintptr(&p.victimSize)
    // 這里會通過前置判斷來減少不必要的查找
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Mark the victim cache as empty for future gets don't bother
    // with it.
    // 到這里說明victim也取不到對象,而victim此時(shí)只會靜靜等待GC回收了养交,不會有改變了
    // 所以將victimSize置為0精算,跟前面的前置判斷相呼應(yīng)
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

到這里,sync.pool的存取對象操作流程就說完了碎连,繼續(xù)往下一層看看poolChainpushHead灰羽、popHeadpopTail操作

poolChain

func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
    atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}

func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
    return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}

// 從頭存入對象
func (c *poolChain) pushHead(val interface{}) {
    // 獲取鏈表的頭節(jié)點(diǎn)
    d := c.head
    // 如果還沒有頭節(jié)點(diǎn)就創(chuàng)建一個(gè)
    if d == nil {
        // Initialize the chain.
        // 節(jié)點(diǎn)環(huán)隊(duì)列的長度初始為8,后續(xù)每增加一個(gè)節(jié)點(diǎn),長度*2(即始終是2的倍數(shù))廉嚼,原因后面說
        const initSize = 8 // Must be a power of 2
        d = new(poolChainElt)
        // 初始化環(huán)隊(duì)列
        d.vals = make([]eface, initSize)
        // 新節(jié)點(diǎn)連到鏈表中
        c.head = d
        storePoolChainElt(&c.tail, d)
    }

    // 向環(huán)隊(duì)列頭添加val玫镐,pushHead后面會說,現(xiàn)在先說鏈表維度的
    if d.pushHead(val) {
        return
    }

    // The current dequeue is full. Allocate a new one of twice
    // the size.
    // 如果頭節(jié)點(diǎn)的環(huán)隊(duì)列滿了怠噪,插入失敗恐似,就再新建一個(gè)節(jié)點(diǎn),size*2
    newSize := len(d.vals) * 2
    // 這里有一個(gè)環(huán)隊(duì)列的長度門限值
    if newSize >= dequeueLimit {
        // Can't make it any bigger.
        newSize = dequeueLimit
    }

    // 同樣的操作
    // 注意這是prev指針指向d而不是next
    // 因?yàn)檫@里是根據(jù)創(chuàng)建順序來的傍念,d在d2之前創(chuàng)建的矫夷,所以是prev指向d
    // 而此時(shí)d2是頭節(jié)點(diǎn),所以感知上是反序的
    d2 := &poolChainElt{prev: d}
    d2.vals = make([]eface, newSize)
    // 將新節(jié)點(diǎn)加入到鏈表中且是新的頭節(jié)點(diǎn)
    c.head = d2
    // d.next指向d2
    storePoolChainElt(&d.next, d2)
    // 存入val
    d2.pushHead(val)
}

// 從頭部取出對象
func (c *poolChain) popHead() (interface{}, bool) {
    d := c.head
    // 如果沒有頭節(jié)點(diǎn)憋槐,那說明還沒存入口四,返回nil
    for d != nil {
        // 從環(huán)隊(duì)列中取
        if val, ok := d.popHead(); ok {
            return val, ok
        }
        // There may still be unconsumed elements in the
        // previous dequeue, so try backing up.
        // 如果當(dāng)前節(jié)點(diǎn)沒找到,繼續(xù)向后找
        d = loadPoolChainElt(&d.prev)
    }
    return nil, false
}

func (c *poolChain) popTail() (interface{}, bool) {
    // 取到尾部節(jié)點(diǎn)
    d := loadPoolChainElt(&c.tail)
    // 還未存入對象
    if d == nil {
        return nil, false
    }

    for {
        // It's important that we load the next pointer
        // *before* popping the tail. In general, d may be
        // transiently empty, but if next is non-nil before
        // the pop and the pop fails, then d is permanently
        // empty, which is the only condition under which it's
        // safe to drop d from the chain.
        // 向前獲取一個(gè)節(jié)點(diǎn)
        d2 := loadPoolChainElt(&d.next)

        // 如果獲取到了則返回
        if val, ok := d.popTail(); ok {
            return val, ok
        }

        // 否則如果尾節(jié)點(diǎn)環(huán)隊(duì)列是空的秦陋,并且前一個(gè)節(jié)點(diǎn)的環(huán)隊(duì)列也是空的
        // 就說明所有的節(jié)點(diǎn)的環(huán)隊(duì)列現(xiàn)在都是空的蔓彩,不用往前找了
        // 因?yàn)槌藀opTail操作外就只有pushHead和popHead操作,這兩個(gè)操作可以認(rèn)為是入棧和出棧驳概,如果棧底是空的那么整個(gè)棧都是空的
        if d2 == nil {
            // This is the only dequeue. It's empty right
            // now, but could be pushed to in the future.
            return nil, false
        }

        // The tail of the chain has been drained, so move on
        // to the next dequeue. Try to drop it from the chain
        // so the next pop doesn't have to look at the empty
        // dequeue again.
        // 這里是個(gè)原子操作赤嚼,因?yàn)闀卸鄠€(gè)其他的goroutine來竊取對象
        // 將tail指針往前挪,刪除掉當(dāng)前空的tail節(jié)點(diǎn)
        // 為什么要?jiǎng)h掉顺又,因?yàn)檫@部分空的節(jié)點(diǎn)不可能再使用了更卒,因?yàn)榇嫒胫挥衟ushHead操作,而pushHead始終都是往頭部插入的
        if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
            // We won the race. Clear the prev pointer so
            // the garbage collector can collect the empty
            // dequeue and so popHead doesn't back up
            // further than necessary.
            storePoolChainElt(&d2.prev, nil)
        }
        // 繼續(xù)往前遍歷
        d = d2
    }
}

到這里鏈表poolChain也說完了稚照,再往下一層就是節(jié)點(diǎn)poolDequeue蹂空,也是真正存取的對象

poolDequeue

type dequeueNil *struct{}

// 還記得吧,前面說的poolDequeue的headTail是通過高低位來表示head和tail的
// 解出head和tail
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
    // const dequeueBits = 32
    const mask = 1<<dequeueBits - 1
    // 取高32位
    head = uint32((ptrs >> dequeueBits) & mask)
    // 取低32位
    tail = uint32(ptrs & mask)
    return
}

// 組合headTail
func (d *poolDequeue) pack(head, tail uint32) uint64 {
    const mask = 1<<dequeueBits - 1
    return (uint64(head) << dequeueBits) |
        uint64(tail&mask)
}

// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
    // 獲取head和tail
    ptrs := atomic.LoadUint64(&d.headTail)
    head, tail := d.unpack(ptrs)
    // 判斷隊(duì)列是否滿了
    if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        // Queue is full.
        return false
    }
    // 因?yàn)閔ead有可能是一直遞增的(配合popTail)果录,這里通過取模mask來得到正確的索引
    // 這里就是為什么環(huán)隊(duì)列的長度始終是2的倍數(shù)了上枕,就是為了方便得到mask進(jìn)行取模
    // 這種取模的方式類似redis的sds
    slot := &d.vals[head&uint32(len(d.vals)-1)]

    // Check if the head slot has been released by popTail.
    // 獲取目標(biāo)slot的typ
    typ := atomic.LoadPointer(&slot.typ)
    // 如果typ不為空,說明有popTail還沒執(zhí)行完弱恒,不能并發(fā)進(jìn)行辨萍,放棄
    if typ != nil {
        // Another goroutine is still cleaning up the tail, so
        // the queue is actually still full.
        return false
    }

    // The head slot is free, so we own it.
    // val=nil本身是用來表示空slot的,如果存入的是nil返弹,需要用dequeueNil進(jìn)行封裝來區(qū)分
    // dequeueNil就是*struct{}
    if val == nil {
        val = dequeueNil(nil)
    }
    // 這里是個(gè)騷操作锈玉,注意eface的定義,跟interface底層的無接口方法的eface是不是很像
    // 直接將val轉(zhuǎn)成interface义起,底層typ標(biāo)注類型拉背,val標(biāo)注數(shù)據(jù)
    // 這也是為啥當(dāng)存入的是nil的時(shí)候需要轉(zhuǎn)成*struct{},為了讓typ不等于nil默终,從而不會被判定為是空slot
    *(*interface{})(unsafe.Pointer(slot)) = val

    // Increment head. This passes ownership of slot to popTail
    // and acts as a store barrier for writing the slot.
    // head+1
    atomic.AddUint64(&d.headTail, 1<<dequeueBits)
    return true
}

// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
// 從頭部彈出對象
func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        // 如果環(huán)隊(duì)列是空的
        if tail == head {
            // Queue is empty.
            return nil, false
        }

        // Confirm tail and decrement head. We do this before
        // reading the value to take back ownership of this
        // slot.
        head--
        ptrs2 := d.pack(head, tail)
        // 這里是個(gè)原子操作椅棺,用來更新headTail
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // We successfully took back slot.
            // 如果更新成功
            // 獲取對應(yīng)的slot
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
        // 否則就重新再來一次
    }
    // 注意這里不同于pushHead抡诞,是先更新headTail再進(jìn)行取操作,具體原因后面會說

    val := *(*interface{})(unsafe.Pointer(slot))
    // 如果是存入的是nil土陪,解封裝
    if val == dequeueNil(nil) {
        val = nil
    }
    // Zero the slot. Unlike popTail, this isn't racing with
    // pushHead, so we don't need to be careful here.
    // slot置空
    *slot = eface{}
    return val, true
}

// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
// 從尾部彈出對象
// 該方法只有在當(dāng)前P被其他的P竊取對象時(shí)才會用昼汗,除了自身的并發(fā)競爭外還會跟上面的pushHead、popHead產(chǎn)生并發(fā)競爭
func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        // 這里操作跟popHead基本一樣
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        if tail == head {
            // Queue is empty.
            return nil, false
        }

        // Confirm head and tail (for our speculative check
        // above) and increment tail. If this succeeds, then
        // we own the slot at tail.
        ptrs2 := d.pack(head, tail+1)
        // 也是原子操作鬼雀,這里跟popHead中對headTail的原子操作天然形成了互斥
        // 也就是說popTail和popHead之間是無鎖且并發(fā)安全的顷窒,同時(shí)也支持自身的并發(fā)安全
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // Success.
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }

    // We now own slot.
    // 也是同樣的操作
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // Tell pushHead that we're done with this slot. Zeroing the
    // slot is also important so we don't leave behind references
    // that could keep this object live longer than necessary.
    //
    // We write to val first and then publish that we're done with
    // this slot by atomically writing to typ.
    // 這里可能有人有疑問,為啥只對slot.typ進(jìn)行原子操作
    // 主要吧slot是eface源哩,兩個(gè)unsafe.pointer鞋吉,占用16字節(jié),atomic還無法對16字節(jié)進(jìn)行原子操作
    // 所以割了一半励烦,只對slot.typ進(jìn)行原子操作
    // 還記得pushHead中的typ := atomic.LoadPointer(&slot.typ)吧谓着,跟這里是呼應(yīng)的
    // 只有這里執(zhí)行完后置slot.typ為nil,pushHead才能獲得該slot進(jìn)行后續(xù)操作
    // 所以這里又通過兩個(gè)原子操作來解決了popTail和pushHead的并發(fā)問題
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    // At this point pushHead owns the slot.

    return val, true
}

總結(jié)

sync.pool的前幾個(gè)版本還沒有這么復(fù)雜坛掠,同樣性能也比較差赊锚,后續(xù)迭代持續(xù)做了優(yōu)化,下面說說sync.pool實(shí)現(xiàn)的一些亮點(diǎn)

  • 利用P的原生隔離屬性屉栓,對緩存池進(jìn)行分段舷蒲,減少了鎖粒度,降低了并發(fā)競爭的概率
  • 使用victim cache來進(jìn)行緩存池的新老替換友多,實(shí)現(xiàn)了定期觸發(fā)GC回收減少內(nèi)存占用牲平,也可作為二級緩存來增加命中率
  • 通過增加pad來避免cpu緩存的偽共享,提升讀取性能
  • 底層存儲使用eface結(jié)構(gòu)體域滥,方便進(jìn)行判空和賦值(使用interface{})纵柿,并且環(huán)隊(duì)列單節(jié)點(diǎn)使用固定2^n大小,方便通過mask計(jì)算存取位置启绰,同時(shí)通過鏈表來實(shí)現(xiàn)擴(kuò)容和收縮昂儒,然后定義通過定義頭部存取和尾部取的行為來控制O(1)復(fù)雜度
  • 支持P之間共享分段池,通過限制其他P只能從尾部獲取對象以及最小粒度的原子操作來實(shí)現(xiàn)了無鎖共享
  • 環(huán)隊(duì)列使用一個(gè)uint64的headTail來實(shí)現(xiàn)酬土,通過位移操作來解出head和tail荆忍,方便進(jìn)行原子操作,而且head和tail都是uint32撤缴,考慮溢出的話,邏輯上也是個(gè)環(huán)形結(jié)構(gòu)叽唱,跟實(shí)際存儲的環(huán)形結(jié)構(gòu)保持一致屈呕,更方便計(jì)算位置索引
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市棺亭,隨后出現(xiàn)的幾起案子虎眨,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嗽桩,死亡現(xiàn)場離奇詭異岳守,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)碌冶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門湿痢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扑庞,你說我怎么就攤上這事譬重。” “怎么了罐氨?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵臀规,是天一觀的道長。 經(jīng)常有香客問我栅隐,道長塔嬉,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任租悄,我火速辦了婚禮邑遏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘恰矩。我一直安慰自己记盒,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布外傅。 她就那樣靜靜地躺著纪吮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪萎胰。 梳的紋絲不亂的頭發(fā)上碾盟,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天,我揣著相機(jī)與錄音技竟,去河邊找鬼冰肴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛榔组,可吹牛的內(nèi)容都是我干的熙尉。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼搓扯,長吁一口氣:“原來是場噩夢啊……” “哼检痰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起锨推,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤铅歼,失蹤者是張志新(化名)和其女友劉穎公壤,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體椎椰,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡厦幅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了慨飘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片确憨。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖套媚,靈堂內(nèi)的尸體忽然破棺而出缚态,到底是詐尸還是另有隱情,我是刑警寧澤堤瘤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布玫芦,位于F島的核電站,受9級特大地震影響本辐,放射性物質(zhì)發(fā)生泄漏桥帆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一慎皱、第九天 我趴在偏房一處隱蔽的房頂上張望老虫。 院中可真熱鬧,春花似錦茫多、人聲如沸祈匙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夺欲。三九已至,卻和暖如春今膊,著一層夾襖步出監(jiān)牢的瞬間些阅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工斑唬, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留市埋,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓恕刘,卻偏偏與公主長得像缤谎,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子雪营,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容