sync.pool
主要用于暫時(shí)保存對象戈锻,提供存取操作编兄,可以復(fù)用對象以避免頻繁的創(chuàng)建對象轩性,當(dāng)goroutine很多,頻繁的創(chuàng)建某個(gè)對象時(shí)狠鸳,可能會形成并發(fā)?-占?內(nèi)存?-GC 緩慢-處理并發(fā)能?降低-并發(fā)更?這樣的惡性循環(huán)揣苏,不過sync.pool
不能用于數(shù)據(jù)庫連接池悯嗓,因?yàn)?code>pool池會定期自動觸發(fā)GC回收對象,至于用法就不贅述了卸察,下面主要解讀源碼
結(jié)構(gòu)
// pool池
type Pool struct {
// 禁止copy绅作,之前的文章講過,這里不多說了
noCopy noCopy
// 對象池蛾派,指向[P]poolLocal切片的指針
// 這里的P是通過runtime.GOMAXPROCS獲得俄认,不過不知道什么是P,可以先看下go的GPM模型
// 這里默認(rèn)poolLocal切片的長度是P主要有兩個(gè)好處
// 一個(gè)是將緩存池進(jìn)行了分段洪乍,減少了操作鎖粒度眯杏,類似mysql的組提交
// 另外一個(gè)是同一個(gè)P綁定到M之后,同一時(shí)間只會調(diào)度一個(gè)G壳澳,也就天然的防止了P維度下的并發(fā)
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
// poolLocal元素個(gè)數(shù)
localSize uintptr // size of the local array
// victim會在一輪GC到來的時(shí)候做兩件事
// 一個(gè)是釋放自己占用的內(nèi)存
// 另外一個(gè)是接管local
// 也就是說pool池的內(nèi)存釋放會有兩輪GC的間隔岂贩,具體看后續(xù)源碼
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
// 新建對象的方法,當(dāng)pool池中沒有可用的對象時(shí)巷波,會調(diào)用該方法創(chuàng)建一個(gè)新的對象
New func() interface{}
}
// pool池的分段結(jié)構(gòu)體
type poolLocal struct {
// 內(nèi)嵌poolLocalInternal
// 這里內(nèi)嵌主要是為了下面好計(jì)算size
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
// 這里加一些pad主要是為了防止cpu緩存的偽共享萎津,也是為了提升頻繁存取的性能
// 至于偽共享,可以看看 https://zhuanlan.zhihu.com/p/65394173
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// 內(nèi)嵌的存儲結(jié)構(gòu)體
// Local per-P Pool appendix.
type poolLocalInternal struct {
// 私有抹镊,只能被當(dāng)前P使用锉屈,比從poolChain更快獲取到對象,也是為了提升頻繁存取的性能
private interface{} // Can be used only by the respective P.
// 共享垮耳,所有P都能使用颈渊,但是有部分限制,這個(gè)后面再說
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
// 實(shí)際存儲對象的邏輯結(jié)構(gòu)是一個(gè)雙向鏈表终佛,然后每個(gè)鏈表節(jié)點(diǎn)是一個(gè)環(huán)形隊(duì)列
// 這里是鏈表的頭尾節(jié)點(diǎn)
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
// 鏈表節(jié)點(diǎn)
type poolChainElt struct {
// 環(huán)形隊(duì)列
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
// 前后向指針
next, prev *poolChainElt
}
// 環(huán)形隊(duì)列
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
// 64位的整型俊嗽,高32位用來記錄環(huán)head的位置,低32位用來記錄環(huán)tail的位置
// 而且head代表的是當(dāng)前要寫入的位置铃彰,所以實(shí)際的存儲區(qū)間是[tail, head)
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
// 環(huán)绍豁,這里用eface這個(gè)結(jié)構(gòu)體也是有妙用的,后面具體會說
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
這里附上一張整體的結(jié)構(gòu)體輔助理解
Put
var (
// 全局鎖
allPoolsMu Mutex
// allPools is the set of pools that have non-empty primary
// caches. Protected by either 1) allPoolsMu and pinning or 2)
// STW.
// 存儲所有的pool池
allPools []*Pool
// oldPools is the set of pools that may have non-empty victim
// caches. Protected by STW.
// 需要被GC回收的pool池
oldPools []*Pool
)
// 存入
func (p *Pool) Put(x interface{}) {
// 不允許存入nil
if x == nil {
return
}
// 競態(tài)檢測
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 獲取指向poolLocal的指針
l, _ := p.pin()
// 正如上面說的牙捉,private可用于快速的存取
// 這里判斷后會存入對象到private中
if l.private == nil {
l.private = x
x = nil
}
// 如果沒有存入private
if x != nil {
// 那就存到環(huán)形隊(duì)列中竹揍,pushHead方法后面會說
l.shared.pushHead(x)
}
// 因?yàn)閜in方法中會禁止當(dāng)前M被搶占,也就是綁定的P和M不會改變
// 所以這里需要解除禁止
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
// pin主要是獲取當(dāng)前P的pid和poolLocal
func (p *Pool) pin() (*poolLocal, int) {
// 這里禁止M被搶占鹃共,也就會防止GC觸發(fā)pool池回收鬼佣,
// 具體為啥看下GMP的調(diào)度就知道了,一輪GC的時(shí)候會嘗試搶占所有的P并停掉霜浴,只有STW之后才能進(jìn)行GC
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
// 獲取localSize
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 如果pid < s晶衷,那說明localSize已經(jīng)有值了,poolLocal已經(jīng)初始化過了
// poolLocal通過pid和P對應(yīng),所以poolLocal[pid]就是當(dāng)前要找的目標(biāo)分段
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 到這里說uintptr(pid) >= s
// 只有兩種可能晌纫,一種是pool池還沒初始化税迷,也就是說poolLocal還沒創(chuàng)建,localSize為0
// 第二種就是調(diào)大了P的數(shù)量
// 這兩種都會觸發(fā)重新初始化
return p.pinSlow()
}
// 通過索引找到目標(biāo)poolLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
// 簡單的指針計(jì)算锹漱,l指向poolLocal數(shù)組箭养,l+i*sizeof(poolLocal) = poolLocal[i]
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
// 初始化操作
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
// 到這里說明需要重新初始化了,而且后面有加全局鎖操作哥牍,可能導(dǎo)致當(dāng)前g睡眠
// 這里是可以進(jìn)行GC的毕泌,所以解除禁止M被搶占
runtime_procUnpin()
// 加全局鎖,因?yàn)闀僮魅值腶llPools
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 到這里又要防止GC觸發(fā)了
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
// 再檢查一次嗅辣,因?yàn)橛锌赡茉谏厦娑虝旱慕獬筂被搶占后撼泛,可能M調(diào)度了別的G,然后該G進(jìn)行了初始化
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 第一次初始化澡谭,加到全局pool池中愿题,方便后續(xù)定期的GC回收
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
// 獲取P的數(shù)量
size := runtime.GOMAXPROCS(0)
// 創(chuàng)建poolLocal切片
local := make([]poolLocal, size)
// local指向poolLocal
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
// localSize=P的數(shù)量
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
// 返回具體的poolLocal和pid
return &local[pid], pid
}
這里的Put操作只說到了poolLocal,其實(shí)后面還會繼續(xù)向poolChain中存入對象蛙奖,這個(gè)先放到后面
Get
// 獲取對象
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
// 跟存入一樣的操作潘酗,可能會觸發(fā)初始化
l, pid := p.pin()
// 優(yōu)先取私有的private
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
// private沒有就從share里面取,popHead后面統(tǒng)一說
x, _ = l.shared.popHead()
if x == nil {
// 如果還沒有就只能從其他分段的poolLocal里面取了
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 還沒取到雁仲,那只能調(diào)用New方法創(chuàng)建一個(gè)新的了
if x == nil && p.New != nil {
x = p.New()
}
return x
}
// 從別的poolLocal中獲取對象
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
// 獲取local和對應(yīng)的size
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 這里會循環(huán)找除自身外其他的所有的poolLocal
for i := 0; i < int(size); i++ {
// 注意這里的取模仔夺,pid是當(dāng)前poolLocal的索引,pid+i+1隨著i的遞增會遍歷所有其他的poolLocal
l := indexLocal(locals, (pid+i+1)%int(size))
// 因?yàn)槭菑钠渌膒oolLocal獲取對象伯顶,所以不能獲取private囚灼,只能獲取share
// 如果這里也能獲取private骆膝,那么又得考慮并發(fā)安全祭衩,無論是加鎖還是使用復(fù)雜的邏輯結(jié)構(gòu),都跟private的初衷即加快存取性能有違背
// 注意這里只能從popTail取阅签,popTail后面會說掐暮,后面再解釋為啥這里只能從popTail取
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
// 還沒找到,只能嘗試從victim中找了
// 其實(shí)victim將在下一輪GC中被回收政钟,此處可以當(dāng)做二級緩存來用路克,可以增加pool池的命中率
// 查找的操作基本跟上面一致
size = atomic.LoadUintptr(&p.victimSize)
// 這里會通過前置判斷來減少不必要的查找
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
// 到這里說明victim也取不到對象,而victim此時(shí)只會靜靜等待GC回收了养交,不會有改變了
// 所以將victimSize置為0精算,跟前面的前置判斷相呼應(yīng)
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
到這里,sync.pool
的存取對象操作流程就說完了碎连,繼續(xù)往下一層看看poolChain
的pushHead
灰羽、popHead
和popTail
操作
poolChain
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
}
func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
}
// 從頭存入對象
func (c *poolChain) pushHead(val interface{}) {
// 獲取鏈表的頭節(jié)點(diǎn)
d := c.head
// 如果還沒有頭節(jié)點(diǎn)就創(chuàng)建一個(gè)
if d == nil {
// Initialize the chain.
// 節(jié)點(diǎn)環(huán)隊(duì)列的長度初始為8,后續(xù)每增加一個(gè)節(jié)點(diǎn),長度*2(即始終是2的倍數(shù))廉嚼,原因后面說
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
// 初始化環(huán)隊(duì)列
d.vals = make([]eface, initSize)
// 新節(jié)點(diǎn)連到鏈表中
c.head = d
storePoolChainElt(&c.tail, d)
}
// 向環(huán)隊(duì)列頭添加val玫镐,pushHead后面會說,現(xiàn)在先說鏈表維度的
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
// 如果頭節(jié)點(diǎn)的環(huán)隊(duì)列滿了怠噪,插入失敗恐似,就再新建一個(gè)節(jié)點(diǎn),size*2
newSize := len(d.vals) * 2
// 這里有一個(gè)環(huán)隊(duì)列的長度門限值
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
// 同樣的操作
// 注意這是prev指針指向d而不是next
// 因?yàn)檫@里是根據(jù)創(chuàng)建順序來的傍念,d在d2之前創(chuàng)建的矫夷,所以是prev指向d
// 而此時(shí)d2是頭節(jié)點(diǎn),所以感知上是反序的
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
// 將新節(jié)點(diǎn)加入到鏈表中且是新的頭節(jié)點(diǎn)
c.head = d2
// d.next指向d2
storePoolChainElt(&d.next, d2)
// 存入val
d2.pushHead(val)
}
// 從頭部取出對象
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
// 如果沒有頭節(jié)點(diǎn)憋槐,那說明還沒存入口四,返回nil
for d != nil {
// 從環(huán)隊(duì)列中取
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
// 如果當(dāng)前節(jié)點(diǎn)沒找到,繼續(xù)向后找
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
func (c *poolChain) popTail() (interface{}, bool) {
// 取到尾部節(jié)點(diǎn)
d := loadPoolChainElt(&c.tail)
// 還未存入對象
if d == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
// 向前獲取一個(gè)節(jié)點(diǎn)
d2 := loadPoolChainElt(&d.next)
// 如果獲取到了則返回
if val, ok := d.popTail(); ok {
return val, ok
}
// 否則如果尾節(jié)點(diǎn)環(huán)隊(duì)列是空的秦陋,并且前一個(gè)節(jié)點(diǎn)的環(huán)隊(duì)列也是空的
// 就說明所有的節(jié)點(diǎn)的環(huán)隊(duì)列現(xiàn)在都是空的蔓彩,不用往前找了
// 因?yàn)槌藀opTail操作外就只有pushHead和popHead操作,這兩個(gè)操作可以認(rèn)為是入棧和出棧驳概,如果棧底是空的那么整個(gè)棧都是空的
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
// 這里是個(gè)原子操作赤嚼,因?yàn)闀卸鄠€(gè)其他的goroutine來竊取對象
// 將tail指針往前挪,刪除掉當(dāng)前空的tail節(jié)點(diǎn)
// 為什么要?jiǎng)h掉顺又,因?yàn)檫@部分空的節(jié)點(diǎn)不可能再使用了更卒,因?yàn)榇嫒胫挥衟ushHead操作,而pushHead始終都是往頭部插入的
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
// 繼續(xù)往前遍歷
d = d2
}
}
到這里鏈表poolChain
也說完了稚照,再往下一層就是節(jié)點(diǎn)poolDequeue
蹂空,也是真正存取的對象
poolDequeue
type dequeueNil *struct{}
// 還記得吧,前面說的poolDequeue的headTail是通過高低位來表示head和tail的
// 解出head和tail
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
// const dequeueBits = 32
const mask = 1<<dequeueBits - 1
// 取高32位
head = uint32((ptrs >> dequeueBits) & mask)
// 取低32位
tail = uint32(ptrs & mask)
return
}
// 組合headTail
func (d *poolDequeue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
// 獲取head和tail
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 判斷隊(duì)列是否滿了
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
// 因?yàn)閔ead有可能是一直遞增的(配合popTail)果录,這里通過取模mask來得到正確的索引
// 這里就是為什么環(huán)隊(duì)列的長度始終是2的倍數(shù)了上枕,就是為了方便得到mask進(jìn)行取模
// 這種取模的方式類似redis的sds
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
// 獲取目標(biāo)slot的typ
typ := atomic.LoadPointer(&slot.typ)
// 如果typ不為空,說明有popTail還沒執(zhí)行完弱恒,不能并發(fā)進(jìn)行辨萍,放棄
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
// val=nil本身是用來表示空slot的,如果存入的是nil返弹,需要用dequeueNil進(jìn)行封裝來區(qū)分
// dequeueNil就是*struct{}
if val == nil {
val = dequeueNil(nil)
}
// 這里是個(gè)騷操作锈玉,注意eface的定義,跟interface底層的無接口方法的eface是不是很像
// 直接將val轉(zhuǎn)成interface义起,底層typ標(biāo)注類型拉背,val標(biāo)注數(shù)據(jù)
// 這也是為啥當(dāng)存入的是nil的時(shí)候需要轉(zhuǎn)成*struct{},為了讓typ不等于nil默终,從而不會被判定為是空slot
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
// head+1
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
// 從頭部彈出對象
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 如果環(huán)隊(duì)列是空的
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
// 這里是個(gè)原子操作椅棺,用來更新headTail
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
// 如果更新成功
// 獲取對應(yīng)的slot
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
// 否則就重新再來一次
}
// 注意這里不同于pushHead抡诞,是先更新headTail再進(jìn)行取操作,具體原因后面會說
val := *(*interface{})(unsafe.Pointer(slot))
// 如果是存入的是nil土陪,解封裝
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
// slot置空
*slot = eface{}
return val, true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
// 從尾部彈出對象
// 該方法只有在當(dāng)前P被其他的P竊取對象時(shí)才會用昼汗,除了自身的并發(fā)競爭外還會跟上面的pushHead、popHead產(chǎn)生并發(fā)競爭
func (d *poolDequeue) popTail() (interface{}, bool) {
var slot *eface
for {
// 這里操作跟popHead基本一樣
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
ptrs2 := d.pack(head, tail+1)
// 也是原子操作鬼雀,這里跟popHead中對headTail的原子操作天然形成了互斥
// 也就是說popTail和popHead之間是無鎖且并發(fā)安全的顷窒,同時(shí)也支持自身的并發(fā)安全
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
// 也是同樣的操作
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
// 這里可能有人有疑問,為啥只對slot.typ進(jìn)行原子操作
// 主要吧slot是eface源哩,兩個(gè)unsafe.pointer鞋吉,占用16字節(jié),atomic還無法對16字節(jié)進(jìn)行原子操作
// 所以割了一半励烦,只對slot.typ進(jìn)行原子操作
// 還記得pushHead中的typ := atomic.LoadPointer(&slot.typ)吧谓着,跟這里是呼應(yīng)的
// 只有這里執(zhí)行完后置slot.typ為nil,pushHead才能獲得該slot進(jìn)行后續(xù)操作
// 所以這里又通過兩個(gè)原子操作來解決了popTail和pushHead的并發(fā)問題
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
}
總結(jié)
sync.pool
的前幾個(gè)版本還沒有這么復(fù)雜坛掠,同樣性能也比較差赊锚,后續(xù)迭代持續(xù)做了優(yōu)化,下面說說sync.pool
實(shí)現(xiàn)的一些亮點(diǎn)
- 利用P的原生隔離屬性屉栓,對緩存池進(jìn)行分段舷蒲,減少了鎖粒度,降低了并發(fā)競爭的概率
- 使用victim cache來進(jìn)行緩存池的新老替換友多,實(shí)現(xiàn)了定期觸發(fā)GC回收減少內(nèi)存占用牲平,也可作為二級緩存來增加命中率
- 通過增加pad來避免cpu緩存的偽共享,提升讀取性能
- 底層存儲使用eface結(jié)構(gòu)體域滥,方便進(jìn)行判空和賦值(使用interface{})纵柿,并且環(huán)隊(duì)列單節(jié)點(diǎn)使用固定2^n大小,方便通過mask計(jì)算存取位置启绰,同時(shí)通過鏈表來實(shí)現(xiàn)擴(kuò)容和收縮昂儒,然后定義通過定義頭部存取和尾部取的行為來控制O(1)復(fù)雜度
- 支持P之間共享分段池,通過限制其他P只能從尾部獲取對象以及最小粒度的原子操作來實(shí)現(xiàn)了無鎖共享
- 環(huán)隊(duì)列使用一個(gè)uint64的headTail來實(shí)現(xiàn)酬土,通過位移操作來解出head和tail荆忍,方便進(jìn)行原子操作,而且head和tail都是uint32撤缴,考慮溢出的話,邏輯上也是個(gè)環(huán)形結(jié)構(gòu)叽唱,跟實(shí)際存儲的環(huán)形結(jié)構(gòu)保持一致屈呕,更方便計(jì)算位置索引