目的
Many Go programs and packages try to reuse memory either for locality reasons or to reduce GC pressure。
緩解GC壓力
GC(garbage collector):
- 自動垃圾回收妓忍,減輕了程序員的壓力
- 減輕壓力的同時萍鲸,也增加了運(yùn)行時開銷破讨。
sync.pool應(yīng)運(yùn)而生,設(shè)計(jì)的目的是用來保存和復(fù)用臨時對象峭状,減小GC分配莫瞬,降低GC壓力。
Pool設(shè)計(jì)用意是在全局變量里維護(hù)的釋放鏈表搓蚪,尤其是被多個 goroutine 同時訪問的全局變量蛤售。使用Pool代替自己寫的釋放鏈表,可以讓程序運(yùn)行的時候妒潭,在恰當(dāng)?shù)膱鼍跋聫某乩?重用-某項(xiàng)值悴能。
sync.Pool的一種使用場景是,為臨時緩沖區(qū)創(chuàng)建一個池雳灾,多個客戶端使用這個緩沖區(qū)來共享全局資源漠酿。
另一方面,不恰當(dāng)?shù)氖褂美踊涯叮绻尫沛湵硎悄硞€對象的一部分炒嘲,并由這個對象維護(hù),而這個對象只由一個客戶端使用匈庭,在這個客戶端工作完成后釋放鏈表夫凸,那么用Pool實(shí)現(xiàn)這個釋放鏈表是不合適的。
由來討論
Brad Fizpatrick曾建議在創(chuàng)建一個工友的Cache
類型阱持。這個建議引發(fā)了一長串的討論夭拌。Go 語言應(yīng)該在標(biāo)準(zhǔn)庫里提供一個這個樣子的類型,還是應(yīng)當(dāng)將這個類型作為私下的實(shí)現(xiàn)?這個實(shí)現(xiàn)應(yīng)該真的釋放內(nèi)存么鸽扁?如果釋放道逗,什么時候釋放?這個類型應(yīng)當(dāng)叫做Cache
献烦,或者更應(yīng)該叫做Pool
https://github.com/golang/go/issues/4720
https://my.oschina.net/u/115763/blog/282376
簡單介紹
A Pool is a set of temporary objects that may be individually saved and retrieved.
池是一組可以單獨(dú)保存和檢索的臨時對象
Any item stored in the Pool may be removed automatically at any time without notification. If the Pool holds the only reference when this happens, the item might be deallocated.
存儲在池中的任何項(xiàng)目都可以隨時自動刪除滓窍,而無需通知。如果發(fā)生這種情況時池保存唯一的引用巩那,則可能會釋放該項(xiàng)
A Pool is safe for use by multiple goroutines simultaneously
并發(fā)安全
上面三句是pool源碼上的摘抄解釋
pool特性
-沒有大小限制吏夯,大小只受限與GC的臨界值
-對象的最大緩存周期是GC周期,當(dāng)GC調(diào)用時即横,沒有被引用的對象的會被清理掉
-Get方法返回的都是池子中的任意一個對象噪生,沒有順序,注意是沒有順序的东囚;如果當(dāng)期池子為空跺嗽,會調(diào)用New方法創(chuàng)建一個對象,沒有New方法則會返回nil
使用場景
高并發(fā)場景下页藻,當(dāng)多個goroutine都需要創(chuàng)建同?個臨時對象的時候桨嫁,因?yàn)閷ο笫钦純?nèi)存的,進(jìn)?導(dǎo)致的就是內(nèi)存回收的GC壓?增加份帐。
造成 “并發(fā)?大-占?內(nèi)存?大-GC緩慢-處理理并發(fā)能?力力降低-并發(fā)更更 ?大”這樣的惡性循環(huán)璃吧。
業(yè)界使用
Echo:
使用了sync.pool來從用內(nèi)存,實(shí)現(xiàn)了0動態(tài)內(nèi)存分配
https://echo.labstack.com/guide/routing
Gin:
上面是gin使用pool作為context的緩存
https://github.com/gin-gonic/gin/blob/73ccfea3ba5a115e74177dbfbc1ea0fff88c13f4/gin.go
fmt:
原生的fmt包里废境,也包含了sync.pool的調(diào)用畜挨。
源碼分析
如上圖所示:在go的M、P噩凹、G模型中巴元,每個P綁定了一個poolLocalInternal,這結(jié)合了go的優(yōu)勢驮宴,使得當(dāng)前P綁定等待隊(duì)列中的任何G對poolLocalInternal的訪問都不需要加鎖逮刨。每個poolLocalInternal中包含private和shared。private為單個對象幻赚,為每個P單獨(dú)維護(hù)禀忆,不具有共享特質(zhì),每次獲取和添加都會首先設(shè)置private落恼;shared為一系列的臨時對象箩退,為共享隊(duì)列,各個P之間通過shard共享對象集佳谦,在go1.13之前戴涝,shard為數(shù)組,在1.13之后修改為使用環(huán)形數(shù)組,通過CAS實(shí)現(xiàn)了lock-free啥刻。
從全局的角度來看奸鸯,全局維護(hù)了一個統(tǒng)一的結(jié)構(gòu),如上圖所示的紅色的pool可帽,pool維護(hù)每個產(chǎn)生的local娄涩,每個local指向每個P綁定的poolLocalInternal。
before Go1.13:
// A Pool must not be copied after first use.
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
// Local per-P Pool appendix.
// 1.13之前
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared []interface{} // Can be used by any P.
Mutex // Protects shared.
}
上面定義了一個Pool結(jié)構(gòu)體映跟,其中聲明了noCopy蓄拣;poolLocalInternal是每個P的一個附件,其中包含一個private的私有對象努隙,只能當(dāng)前P訪問球恤,在獲取和設(shè)置的時候會優(yōu)先從改私有對象中獲取和一個shared的數(shù)組,可以被任意的P訪問荸镊。
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l := p.pin()
if l.private == nil {
l.private = x
x = nil
}
runtime_procUnpin()
if x != nil {
l.Lock()
l.shared = append(l.shared, x)
l.Unlock()
}
if race.Enabled {
race.Enable()
}
}
Put函數(shù)為sync.pool的主要函數(shù)咽斧,用于添加對象。調(diào)用了p.pin()獲取當(dāng)前P的綁定附件躬存,runtime_procUnpin解除綁定關(guān)系张惹,并且設(shè)計(jì)設(shè)置禁止關(guān)系(不禁止強(qiáng)占可能造成并發(fā)問題),通過P先判斷是否可以放進(jìn)private對象中优构,否則放進(jìn)shard數(shù)組中诵叁。
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l := p.pin()
x := l.private
l.private = nil
runtime_procUnpin()
if x == nil {
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
}
l.Unlock()
if x == nil {
x = p.getSlow()
}
}
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (p *Pool) getSlow() (x interface{}) {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
local := p.local // load-consume
// Try to steal one element from other procs.
pid := runtime_procPin()
runtime_procUnpin()
for i := 0; i < int(size); i++ {
l := indexLocal(local, (pid+i+1)%int(size))
l.Lock()
last := len(l.shared) - 1
if last >= 0 {
x = l.shared[last]
l.shared = l.shared[:last]
l.Unlock()
break
}
l.Unlock()
}
return x
}
Get函數(shù)和Put函數(shù)一致,通過pin()獲取當(dāng)前P綁定的附件钦椭。先從private中獲取,再沖shard中獲取碑诉,獲取失敗再調(diào)用getslow函數(shù)彪腔,在getslow函數(shù)中,通過遍歷獲取其余P的shared資源进栽,會偷取最后一個德挣,最后再偷取失敗才會使用出事化函數(shù)New()
Get執(zhí)行流程:private->shard->getslow()->New()
// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
pid := runtime_procPin()
// In pinSlow we store to localSize and then to local, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid)
}
return p.pinSlow()
}
func (p *Pool) pinSlow() *poolLocal {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid)
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid]
}
pin函數(shù)索引當(dāng)前G對應(yīng)的綁定的P,通過runtime_procPin設(shè)置禁止強(qiáng)占快毛,返回當(dāng)前P擁有的poolLocal格嗅,獲取不到時調(diào)用pinslow進(jìn)行第二次獲取。第二次調(diào)用會先使用runtime_procUnpin()進(jìn)行強(qiáng)占解除唠帝,對全局鎖加鎖屯掖,這是如果local為空(第一次創(chuàng)建),則加入全局隊(duì)列中襟衰。
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Defensively zero out everything, 2 reasons:
// 1. To prevent false retention of whole Pools.
// 2. If GC happens while a goroutine works with l.shared in Put/Get,
// it will retain whole Pool. So next cycle memory consumption would be doubled.
for i, p := range allPools {
allPools[i] = nil
for i := 0; i < int(p.localSize); i++ {
l := indexLocal(p.local, i)
l.private = nil
for j := range l.shared {
l.shared[j] = nil
}
l.shared = nil
}
p.local = nil
p.localSize = 0
}
allPools = []*Pool{}
}
var (
allPoolsMu Mutex
allPools []*Pool
)
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
poolCleanup為運(yùn)行時的注冊函數(shù)贴铜,在GC開始時調(diào)用,邏輯很暴力,三層for循環(huán)賦空绍坝!
這個版本有啥缺點(diǎn)
- 對全局shared加鎖讀寫徘意,性能較低
- 三層for循環(huán)賦空很暴力,容易造成GC的尖峰
- 每次GC對全量清空轩褐,造成的緩存命中率下降
After Go1.13
在GO1.13之后椎咧,優(yōu)化了以上的問題:
- 對全局的shard加鎖,使用了CAS實(shí)現(xiàn)了lock-free
- 對GC造成的尖峰問題把介,引入了受害者緩存邑退。延長了緩存的聲明周期,增加了緩存的命中效率
可以很清楚的發(fā)現(xiàn)劳澄,和之前的數(shù)據(jù)結(jié)構(gòu)相比地技,1.13之后的版本增加了黃色的poolDequene,那這這和黃色部分又是何方神圣呢秒拔?
// 1.13之后
// Local per-P Pool appendix.
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}
type poolChainElt struct {
poolDequeue
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals []eface
}
對鎖的優(yōu)化:
Go在1.13之后增加了poolDequene:
- lock-free
- 生產(chǎn)者可以進(jìn)行pushHead和popTail
- 消費(fèi)者只能進(jìn)行popTail
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func (d *poolDequeue) pushHead(val interface{}) bool {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
新版本使用l.shared.pushHead(x)莫矗,進(jìn)行頭添加,刪除了鎖的使用砂缩。
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
l, pid := p.pin()
x := l.private
l.private = nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
if x == nil && p.New != nil {
x = p.New()
}
return x
}
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface{}
return val, true
}
在獲取臨時對象的時候作谚,會首先從private中獲取,private為空會接著從shard變量中拉取庵芭,shared變量中也沒有空閑妹懒,接著調(diào)用getSlow從其他P中偷取,偷取失敗的時候双吆,這時候會使用受害者緩存眨唬,這一步是新添加,接著才會調(diào)用New()好乐。
Get執(zhí)行流程:private->shard->getslow()->victim→New()
針對GC尖峰的優(yōu)化:
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// Drop victim caches from all pools.
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// Move primary cache to victim cache.
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
oldPools, allPools = allPools, nil
}
受害者緩存(Victim Cache):是一個與直接匹配或低相聯(lián)緩存并用的匾竿、容量很小的全相聯(lián)緩存。當(dāng)一個數(shù)據(jù)塊被逐出緩存時蔚万,并不直接丟棄岭妖,而是暫先進(jìn)入受害者緩存。如果受害者緩存已滿反璃,就替換掉其中一項(xiàng)昵慌。當(dāng)進(jìn)行緩存標(biāo)簽匹配時,在與索引指向標(biāo)簽匹配的同時淮蜈,并行查看受害者緩存斋攀,如果在受害者緩存發(fā)現(xiàn)匹配,就將其此數(shù)據(jù)塊與緩存中的不匹配數(shù)據(jù)塊做交換礁芦,同時返回給處理器蜻韭。
新版本的poolCleanup增加了victim悼尾,對于原來應(yīng)該被GC的緩存,添加到了victim,銷毀滯后到了下一輪肖方,以此來解決緩存命中率低的問題闺魏。
基準(zhǔn)測試
package main
import (
"sync"
"testing"
)
type info struct {
Val int
}
func BenchmarkNoPool(b *testing.B) {
b.ResetTimer()
var k *info
for i := 0; i < b.N; i++ {
k = &info{Val: 1}
k.Val += 1
}
}
var pInfo = sync.Pool{New: func() interface{} {
return new(info)
}}
func BenchmarkWithPool(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
k := pInfo.Get().(*info)
// 重置
k.Val = 0
k.Val += 1
pInfo.Put(k)
}
}
測試結(jié)果
go test -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: pool_test
BenchmarkNoPool-4 78748666 13.7 ns/op 8 B/op 1 allocs/op
BenchmarkWithPool-4 75934996 16.2 ns/op 0 B/op 0 allocs/op
PASS
ok pool_test 3.962s
函數(shù) | MAXPEOCESS | 總執(zhí)行次數(shù) | 單次平均耗時(ns) | 單詞平均內(nèi)存(B) | 單次分配次數(shù) |
---|---|---|---|---|---|
BenchmarkNoPool | 4 | 78748666 | 13.7 | 8 | 1 |
BenchmarkWithPool | 4 | 75934996 | 16.2 | 0 | 0 |