golang源碼學習之sync.pool

源碼目錄 ///sync.pool.go (1.14.1)

前言

sync.pool對象池是個好東西跌宛,避免對象的反復創(chuàng)建和回收描沟。對于一些需要頻繁創(chuàng)建的對象我們可以使用它來避免內(nèi)存的頻繁申請囱嫩、回收洞辣。但應避免用于連接池胚吁,因為sync.pool存儲的對象僅存活在三次STW之間延窜。

數(shù)據(jù)結構

//sync.pool.go
type Pool struct {
    noCopy noCopy  // 不可復制標準

    local     unsafe.Pointer // 指向poolLocal數(shù)組
    localSize uintptr        // poolLocal數(shù)組長度

    victim     unsafe.Pointer // 指向poolLocal數(shù)組
    victimSize uintptr        // poolLocal數(shù)組長度

    New func() interface{}  // 自定義的對象創(chuàng)建方法
}

local和victim都指向poolLocal數(shù)組恋脚,它們的區(qū)別是什么呢? pool里面有兩個poolLocal數(shù)組腺办,當經(jīng)歷一次STW后victim置零,將local賦值給victim糟描,local置零怀喉。獲取對象(Get方法)的順序 local --> victim --> New。那poolLocal數(shù)組有多少個呢船响?一個P對應一個poolLocal躬拢,pid作為下標躲履。

//sync.pool.go

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 沒發(fā)現(xiàn)用在什么地方
}

type poolLocalInternal struct {
    private interface{} // 當前P私有的
    shared  poolChain   // 公共的,當前P可以在頭部壓入和彈出聊闯,其他P可以再尾部彈出
}

poolLocal 就可以當成poolLocalInternal工猜。有一個私有變量存儲單個對象,和一個所有P都可以訪問的公共對象數(shù)組菱蔬。

//sync.poolqueue.go
type poolChain struct {

    // 生產(chǎn)者(當前P)所有篷帅。壓入和彈出。只有一個P對他操作拴泌,不存在競爭魏身,不需要同步
    head *poolChainElt

    // 消費者(所有P)所有。彈出蚪腐。多個P存在競爭箭昵,需要原子操作
    tail *poolChainElt
}

type poolChainElt struct {
    poolDequeue

    next, prev *poolChainElt // 雙向鏈表
}

// 循環(huán)隊列
type poolDequeue struct {
    
    headTail uint64 // 通過它計算數(shù)組的頭和尾,head高32位,tal低32位

    vals []eface  // 對象數(shù)組
}

全局變量

//sync.pool.go
var (
    allPoolsMu Mutex // 全局鎖

    allPools []*Pool // 存儲所有的pool

    oldPools []*Pool 
)

這里的allPools和oldPools都存儲的Pool集合回季,那他們有什么區(qū)別呢家制?allPools存儲的是未經(jīng)歷STW的Pool,使用的是pool的local字段;allPools經(jīng)歷STW之后將local變成victim,allPools變成oldPools茧跋。這兩個變量不參與存取過程慰丛,僅在STW的時候使用。

Put 存

//sync.pool.go
func (p *Pool) Put(x interface{}) {
    if x == nil {  // 空對象不讓存
        return
    }
    
    ....

    l, _ := p.pin()  // 返回poolLocal
    
    // 先往private中放
    if l.private == nil {
        l.private = x
        x = nil
    }
    
    // x != nil 說明往private中放失敗
    if x != nil {
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    ....
}

通過pin()獲取poolLocal瘾杭。如果private為空則放置在private中诅病,否則放置在公共的區(qū)域。在后面會分析pushHead()粥烁。

pin


func (p *Pool) pin() (*poolLocal, int) {
    pid := runtime_procPin()  // 該方法的作用是該P禁止搶占贤笆,意味著當前P一直執(zhí)行當前G
    s := atomic.LoadUintptr(&p.localSize) // poolLocal長度
    l := p.local                          // poolLocal首地址
    if uintptr(pid) < s {   // pid作為poolLocal數(shù)組下標,所以必須小于數(shù)組長度
        return indexLocal(l, pid), pid
    }
    return p.pinSlow() //獲取其它P的poolLocal或生成poolLocal
}

// 根據(jù)首地址 和 下標計算對應的poolLocal的地址
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
    lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
    return (*poolLocal)(lp)
}

獲取當前P的poolLocal, 當然這里的P可能是兩個不同的P,因為中間過程存在調(diào)度讨阻,執(zhí)行該G的P可能變化芥永。

pinSlow

func (p *Pool) pinSlow() (*poolLocal, int) {

    // 在上面pin中執(zhí)行了runtime_procPin,所以需要Unpin
    runtime_procUnpin()
    // 可能需要創(chuàng)建poolLocal,添加到allPools,所以需要加鎖
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin() // 再次禁止搶占
    // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    // 在上面的pin中已經(jīng)判斷了uintptr(pid) < s,為什么還需要再次判斷呢?
    // 因為上面執(zhí)行了Unpin以及l(fā)ock,所以此時的P可能已經(jīng)不是之前的P了
    if uintptr(pid) < s { 
        return indexLocal(l, pid), pid
    }
    if p.local == nil {
        allPools = append(allPools, p) //加入到allPools中
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size) // 從這里可以看出poolLocal的個數(shù)就是P的個數(shù)
    // 使用了新的poolLocal數(shù)組钝吮,指明首地址和長度
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
    return &local[pid], pid
}

  • 一個P對應一個poolLocal埋涧,pid作為poolLocal下標;
  • 獲取poolLocal的時候,會根據(jù)下標pid獲取對應的poolLocal奇瘦。但這不是總是成功的棘催,當修改了GOMAXPROCS導致P的個數(shù)大于poolLocal的個數(shù)時候會重建poolLocal數(shù)組。

Get 獲取


func (p *Pool) Get() interface{} {

  ...
  
  l, pid := p.pin() // 通過pin獲取當前G的P的poolLocal
  x := l.private  // 首先尋找的是private
  l.private = nil
  if x == nil {  // private 不存在
      x, _ = l.shared.popHead()  // 在當前P的公共部分獲取
      if x == nil {  // 如果在當前P獲取失敗耳标,則從其他P中偷一個
          x = p.getSlow(pid)
      }
  }
  runtime_procUnpin()
  ...
  if x == nil && p.New != nil {  // 如果在所有pooLocal和victim都沒有獲取到醇坝,就只有執(zhí)行自定義的New()了
      x = p.New()
  }
  return x
}


流程: 當前P的private --> 當前P的shared --> 其他P的shared --> New

getSlow


func (p *Pool) getSlow(pid int) interface{} {
   // See the comment in pin regarding ordering of the loads.
   size := atomic.LoadUintptr(&p.localSize) // load-acquire
   locals := p.local                        // load-consume
   // 遍歷所有P的公共區(qū)
   for i := 0; i < int(size); i++ {
       l := indexLocal(locals, (pid+i+1)%int(size))
       if x, _ := l.shared.popTail(); x != nil {
           return x
       }
   }


// 如果遍歷了全部poolLocal還是沒獲取到,那就從victim中獲取次坡。victim又是什么呢呼猪?
// victim就是上一輪STW的poolLocal画畅。查找流程和poolLocal一樣,先找pid對應的宋距,再找全部的轴踱。
   size = atomic.LoadUintptr(&p.victimSize)
   if uintptr(pid) >= size {
       return nil
   }
   locals = p.victim
   l := indexLocal(locals, pid)
   if x := l.private; x != nil {
       l.private = nil
       return x
   }
   for i := 0; i < int(size); i++ {
       l := indexLocal(locals, (pid+i)%int(size))
       if x, _ := l.shared.popTail(); x != nil {
           return x
       }
   }

   // 如果在victim中沒獲取到,則將其長度設為0乡革,免得下次在其中查找
   atomic.StoreUintptr(&p.victimSize, 0)

   return nil
}


getSlow就是"偷"寇僧,當然只能在公共區(qū)偷摊腋。先遍歷poolLocal,再遍歷victim沸版。

接下看看如何在poolChain中存取

pushHead


   func (c *poolChain) pushHead(val interface{}) {
       d := c.head
       
       // 如果head不存在就初始化一個
       if d == nil {
           // Initialize the chain.
           const initSize = 8 // Must be a power of 2
           d = new(poolChainElt)
           d.vals = make([]eface, initSize)
           c.head = d
           storePoolChainElt(&c.tail, d) // head不存在也就意味著tail也不存在
       }
   
       // 壓入dequeue中
       if d.pushHead(val) {
           return
       }
       
   
       // 到了這一步說明上面pushHead失敗了,說head對應的dequeue滿了兴蒸,需要添加一個dequeue
       newSize := len(d.vals) * 2  // 當前head的2倍
       if newSize >= dequeueLimit {
           // Can't make it any bigger.
           newSize = dequeueLimit
       }
   
       d2 := &poolChainElt{prev: d}
       d2.vals = make([]eface, newSize)
       c.head = d2
       storePoolChainElt(&d.next, d2)
       d2.pushHead(val)
   }

func (d *poolDequeue) pushHead(val interface{}) bool {
       ptrs := atomic.LoadUint64(&d.headTail)
       head, tail := d.unpack(ptrs)
       
       // 如果首尾剛好相差一個queue長度视粮,說明一個queue滿了
       if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
           // Queue is full.
           return false
       }
       slot := &d.vals[head&uint32(len(d.vals)-1)]
   
       // Check if the head slot has been released by popTail.
       typ := atomic.LoadPointer(&slot.typ)
       if typ != nil { // 為什么要判斷nil,因為在popTail中是先設置headTail,再將typ置零
           // Another goroutine is still cleaning up the tail, so
           // the queue is actually still full.
           return false
       }
   
       // The head slot is free, so we own it.
       if val == nil {
           val = dequeueNil(nil)
       }
       *(*interface{})(unsafe.Pointer(slot)) = val
   
       // Increment head. This passes ownership of slot to popTail
       // and acts as a store barrier for writing the slot.
       atomic.AddUint64(&d.headTail, 1<<dequeueBits)
       return true
   }

存的流程:

  • private 可用就存在private中
  • 往shared的queue的head中存橙凳,如果queue為空蕾殴,則新建一個queue
  • 如果queue滿了,就添加一個2倍的queue節(jié)點

popHead


func (d *poolDequeue) popHead() (interface{}, bool) {
       var slot *eface
       for {
           ptrs := atomic.LoadUint64(&d.headTail)
           head, tail := d.unpack(ptrs)
           if tail == head {
               // Queue is empty.
               return nil, false
           }
   
           // Confirm tail and decrement head. We do this before
           // reading the value to take back ownership of this
           // slot.
           head--
           ptrs2 := d.pack(head, tail)
           if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
               // We successfully took back slot.
               slot = &d.vals[head&uint32(len(d.vals)-1)]
               break
           }
       }
   
       val := *(*interface{})(unsafe.Pointer(slot))
       if val == dequeueNil(nil) {
           val = nil
       }
       // Zero the slot. Unlike popTail, this isn't racing with
       // pushHead, so we don't need to be careful here.
       *slot = eface{}
       return val, true
}

這個邏輯比較簡單岛啸,就是head 減 1

popTail

func (d *poolDequeue) popTail() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        if tail == head {
            // Queue is empty.
            return nil, false
        }

        // Confirm head and tail (for our speculative check
        // above) and increment tail. If this succeeds, then
        // we own the slot at tail.
        ptrs2 := d.pack(head, tail+1)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            // Success.
            slot = &d.vals[tail&uint32(len(d.vals)-1)]
            break
        }
    }

    // We now own slot.
    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }

    // Tell pushHead that we're done with this slot. Zeroing the
    // slot is also important so we don't leave behind references
    // that could keep this object live longer than necessary.
    //
    // We write to val first and then publish that we're done with
    // this slot by atomically writing to typ.
    // 這里為什么不 *slot = eface{}呢钓觉?
    slot.val = nil
    atomic.StorePointer(&slot.typ, nil)
    // At this point pushHead owns the slot.

    return val, true
}

tail先加1,取值坚踩,置零

poolCleanup STW清除


func poolCleanup() {

    
    // victim清空
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // poolLocal移到victim荡灾,并清空
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    // allPools 變成 oldPools
    oldPools, allPools = allPools, nil
}


最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市瞬铸,隨后出現(xiàn)的幾起案子批幌,更是在濱河造成了極大的恐慌,老刑警劉巖嗓节,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件荧缘,死亡現(xiàn)場離奇詭異,居然都是意外死亡拦宣,警方通過查閱死者的電腦和手機截粗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來鸵隧,“玉大人绸罗,你說我怎么就攤上這事£桑” “怎么了从诲?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長靡羡。 經(jīng)常有香客問我系洛,道長俊性,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任描扯,我火速辦了婚禮定页,結果婚禮上,老公的妹妹穿的比我還像新娘绽诚。我一直安慰自己典徊,他們只是感情好,可當我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布恩够。 她就那樣靜靜地躺著卒落,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蜂桶。 梳的紋絲不亂的頭發(fā)上儡毕,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天,我揣著相機與錄音扑媚,去河邊找鬼腰湾。 笑死,一個胖子當著我的面吹牛疆股,可吹牛的內(nèi)容都是我干的费坊。 我是一名探鬼主播,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼旬痹,長吁一口氣:“原來是場噩夢啊……” “哼附井!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起唱凯,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤羡忘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后磕昼,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體卷雕,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年票从,在試婚紗的時候發(fā)現(xiàn)自己被綠了漫雕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡峰鄙,死狀恐怖浸间,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布倾哺,位于F島的核電站继榆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏踏揣。R本人自食惡果不足惜输硝,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一柬采、第九天 我趴在偏房一處隱蔽的房頂上張望细移。 院中可真熱鬧搏予,春花似錦、人聲如沸弧轧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽精绎。三九已至速缨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間捺典,已是汗流浹背鸟廓。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留襟己,地道東北人。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓牍陌,卻偏偏與公主長得像擎浴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子毒涧,可洞房花燭夜當晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容

  • 前言 在 golang 中有一個池贮预,它特別神奇,你只要和它有個約定契讲,你要什么它就給什么仿吞,你用完了還可以還回去,但是...
    LinkinStar閱讀 19,643評論 3 8
  • golang sync.pool對象復用 并發(fā)原理 緩存池 在go http每一次go serve(l)都會構建R...
    fjxCode閱讀 3,957評論 2 7
  • Dispatch Queues dispatch queues是執(zhí)行任務的強大工具捡偏,允許你同步或異步地執(zhí)行任意代碼...
    YangPu閱讀 640評論 0 4
  • 參考go語言的官方包sync.Pool的實現(xiàn)原理和適用場景深入Golang之sync.Pool詳解偽共享(fals...
    合肥黑閱讀 2,247評論 0 0
  • 一個夢改變了我的生活唤冈。 我在坐電梯,突然電梯失控了银伟,一下掉了好幾層你虹,我懷著僥幸的心理,心想也許會正常運行的彤避,結果電...
    希望_604c閱讀 178評論 0 0