你真的了解 sync.Mutex嗎

【關(guān)注公眾號】「syd3600520」 回復(fù)002 獲取Go相關(guān)學(xué)習(xí)資料

Mutex是一個互斥的排他鎖炕横,零值Mutex為未上鎖狀態(tài)份殿,Mutex一旦被使用 禁止被拷貝。使用起來也比較簡單

package main

import "sync"

func main() {
    m := sync.Mutex{}
    m.Lock()
    defer m.Unlock()
  // do something
}

Mutex有兩種操作模式:

  • 正常模式(非公平模式)

阻塞等待的goroutine保存在FIFO的隊列中颂斜,喚醒的goroutine不直接擁有鎖拾枣,需要與新來的goroutine競爭獲取鎖。因?yàn)樾聛淼?code>goroutine很多已經(jīng)占有了CPU邑茄,所以喚醒的goroutine在競爭中很容易輸俊啼;但如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為饑餓模式授帕。

  • 饑餓模式(公平模式)

這種模式下,直接將等待隊列隊頭goroutine解鎖goroutine彤路;新來的gorountine也不會嘗試獲得鎖芥映,而是直接插入到等待隊列隊尾屏轰。

mutex mode

如果一個goroutine獲得了鎖霎苗,并且他在等待隊列隊尾 或者 他等待小于1ms榛做,則會將Mutex的模式切換回正常模式检眯。正常模式有更好的性能,新來的goroutine通過幾次競爭可以直接獲取到鎖刽严,盡管當(dāng)前仍有等待的goroutine避凝。而饑餓模式則是對正常模式的補(bǔ)充管削,防止等待隊列中的goroutine永遠(yuǎn)沒有機(jī)會獲取鎖。

其數(shù)據(jù)結(jié)構(gòu)為:

type Mutex struct {
    state int32 // 鎖競爭的狀態(tài)值
    sema  uint32 // 信號量
}

state代表了當(dāng)前鎖的狀態(tài)崎弃、 是否是存在自旋、是否是饑餓模式线婚、阻塞goroutine數(shù)量

    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
mutex state

mutex.state & mutexLocked 加鎖狀態(tài) 1 表示已加鎖 0 表示未枷鎖

mutex.state & mutexWoken 喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒

mutex.state & mutexStarving 饑餓狀態(tài) 1 表示饑餓狀體 0表示正常狀態(tài)

mutex.state >> mutexWaiterShift得到當(dāng)前goroutine數(shù)目

Lock

上鎖大致分為fast-pathslow-path

Fast-path

lock通過調(diào)用atomic.CompareAndSwapInt32來競爭更新m.state酌伊,成功則獲得鎖居砖;失敗驴娃,則進(jìn)入slow-path

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

atomic.CompareAndSwapInt32正如簽名一樣唇敞,進(jìn)行比較交換操作,這過程是原子的

// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

源碼中我們并不能看到該函數(shù)的具體實(shí)現(xiàn)咒精,他的實(shí)現(xiàn)跟硬件平臺有關(guān)模叙,我們可以查看匯編代碼一窺究竟鞋屈,go tool compile -S mutex.go也可以對二進(jìn)制文件go tool objdump -s methodname binary

    0x0036 00054 (loop.go:6)    MOVQ    AX, CX
    0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL    AX, AX
    0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL    $1, DX
    0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
    0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL    DX, (CX)
    0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ   AL
    0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB   AL, AL
    0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
    0x004b 00075 (loop.go:8)    MOVL    $8, ""..autotmp_6+16(SP)
    0x0053 00083 (loop.go:8)    LEAQ    sync.(*Mutex).Unlock·f(SB), AX

重點(diǎn)關(guān)注第5行CMPXCHGL DX, (CX)這個CMPXCHGL是x86和Intel架構(gòu)中的compare and exchange指令厂庇,Java的那套AtomicXX底層也是依賴這個指令來保證原子性操作的权旷。

所以我們看到Mutex是互斥排他鎖且不可重入,當(dāng)我們在一個goroutine獲取同一個鎖會導(dǎo)致死鎖。

package main

import "sync"

func main() {
    m := sync.Mutex{}
    m.Lock()
  //這里會導(dǎo)致死鎖
    m.Lock()
    defer m.Unlock()
}

slow-path

如果goroutinefast-path失敗躲查,這調(diào)用m.lockSlow()進(jìn)入slow-path熙含,函數(shù)內(nèi)部主要是一個for{}死循環(huán)艇纺,進(jìn)入循環(huán)的goroutine大致分為兩類:

  • 新來的gorountine
  • 被喚醒的goroutine

Mutex默認(rèn)為正常模式,若新來的goroutine搶占成功蚓聘,則另一個就需要阻塞等待夜牡;阻塞等待一旦超過閾值1ms則會將Mutex切換到饑餓模式,這個模式下新來的goroutine只能阻塞等待在隊列尾部急迂,沒有搶占的資格僚碎。當(dāng)然等待阻塞->喚醒->參與搶占鎖阴幌,這個過程顯示不是很高效矛双,所以這里有一個自旋的優(yōu)化

當(dāng)mutex處于正常模式且能夠自旋,會讓當(dāng)前goroutine自旋等待,同時設(shè)置mutex.state的mutexWoken位為1懒闷,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權(quán)毛雇。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖

golang對自旋做了些限制要求 需要:

  • 多核CPU
  • GOMAXPROCS>1
  • 至少有一個運(yùn)行的P并且local的P隊列為空

感興趣的可以跟下源碼比較簡單

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
    //饑餓模式下不能自旋,也沒有資格搶占侦镇,鎖是手遞手給到等待的goroutine
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {//當(dāng)Mutex處于正常模式且能夠自旋
      //設(shè)置mutexWoken為1 告訴unlock操作壳繁,存在自旋gorountine unlock后不需要喚醒其他goroutine
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
  //  自旋完了 還是沒拿到鎖
        new := old
    //當(dāng)mutex處于正常模式闹炉,將new的mutexLocked設(shè)置為1 即準(zhǔn)備搶占鎖
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
    //加鎖狀態(tài)或饑餓模式下 新來的goroutine進(jìn)入等待隊列
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }

    //將Mutex切換為饑餓模式润樱,若未加鎖則不必切換
    //Unlock操作希望饑餓模式存在等待者
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
      // 當(dāng)前goroutine自旋過 已被被喚醒壹若,則需要將mutexWoken重置
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken //重置mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
      // 當(dāng)前goroutine獲取鎖前mutex處于未加鎖 正常模式下
            if old&(mutexLocked|mutexStarving) == 0 {
                break // 使用CAS成功搶占到鎖
            }
            // waitStartTime!=0表示當(dāng)前goroutine是等待狀態(tài)喚醒的 
      // 為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
        //開始記錄等待時間
                waitStartTime = runtime_nanotime()
            }
      // 將被喚醒但是沒有獲得鎖的goroutine插入到當(dāng)前等待隊列隊首
      // 使用信號量阻塞當(dāng)前goroutine
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 當(dāng)goroutine等待時間超過starvationThresholdNs,mutex進(jìn)入饑餓模式
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
        //如果當(dāng)前goroutine被喚醒且mutex處于饑餓模式 則將鎖手遞手交給當(dāng)前goroutine
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
        //等待狀態(tài)的goroutine - 1
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
        //如果等待時間小于1ms 或 當(dāng)前goroutine是隊列中最后一個
                if !starving || old>>mutexWaiterShift == 1 {
                  // 退出饑餓模式
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}

Unlock

解鎖分兩種情況

  1. 當(dāng)前只有一個goroutine占有鎖 unlock完 直接結(jié)束
func (m *Mutex) Unlock() {

    // 去除加鎖狀態(tài)
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {//存在等待的goroutine
        m.unlockSlow(new)
    }
}
  1. unlock完閉mutex.state!=0 則存在以下可能
    • 正常模式下
      • 當(dāng)前存在等待goroutine 然后喚醒它 但不是第一個goroutine
      • 當(dāng)前存在自旋等待的goroutine 則不喚醒其他等待gorotune
    • 饑餓模式下
      • 直接將鎖交給等待隊列的第一個goroutine
func (m *Mutex) unlockSlow(new int32) {
  //未加鎖的情況下不能多次調(diào)用unlock
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {//正常模式下
        old := new
        for {
      //沒有等待的goroutine 或 已經(jīng)存在一個獲得鎖 或被喚醒 或處于饑餓模式下不需要喚醒任何處于等待的goroutine
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // 等待狀態(tài)goroutine數(shù)量-1 并設(shè)置喚醒狀態(tài)為1 然后喚醒一個等待goroutine
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
        //喚醒一個阻塞的goroutine 但不是第一個等待者
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
    //饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
    //即使期間有新來的goroutine到來,只要處于饑餓模式 鎖就不會被新來的goroutine搶占
        runtime_Semrelease(&m.sema, true, 1)
    }
}

信號量

上面可以看到Mutexgoroutine的阻塞和喚醒操作是利用semaphore來實(shí)現(xiàn)的舶胀,大致的思路是:Go runtime維護(hù)了一個全局的變量semtable,它保持了所有的信號量

// Prime to not correlate with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
    root semaRoot
    pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

每個信號量都由一個變量地址指定嚣伐,Mutex的栗子里就是mutex.sema的地址

type semaRoot struct {
    lock  mutex
    treap *sudog // root of balanced tree of unique waiters.
    nwait uint32 // Number of waiters. Read w/o the lock.
}

大致畫了下其數(shù)據(jù)結(jié)構(gòu)

semtable
  1. 當(dāng)goroutine未獲取到鎖轩端,需要阻塞時調(diào)用sync.runtime_SemacquireMutex 進(jìn)入阻塞邏輯
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    gp := getg()
    if gp != gp.m.curg {
        throw("semacquire not on the G stack")
    }

    // 低成本case
  // 若addr大于1 并通過CAS -1 成功船万,則獲取信號量成功 不需要阻塞
    if cansemacquire(addr) {
        return
    }

    // 復(fù)雜 case:
    //  增加等待goroutine數(shù)量
    //  再次嘗試cansemacquire 成功則返回
    //  失敗則將自己作為一個waiter入隊
    //  sleep
    //  (waiter descriptor is dequeued by signaler)
    s := acquireSudog()
    root := semroot(addr)
    t0 := int64(0)
    s.releasetime = 0
    s.acquiretime = 0
    s.ticket = 0
    if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
        if t0 == 0 {
            t0 = cputicks()
        }
        s.acquiretime = t0
    }
    for {
        lock(&root.lock)
        // 給nwait+1 這樣semrelease中不會進(jìn)低成本路徑了
        atomic.Xadd(&root.nwait, 1)
        // 檢查 cansemacquire 避免錯過喚醒
        if cansemacquire(addr) {
            atomic.Xadd(&root.nwait, -1)
            unlock(&root.lock)
            break
        }
    //cansemacquire之后的semrelease都可以知道我們正在等待
    //上面設(shè)置了nwait耿导,所以會直接進(jìn)入sleep 即goparkunlock
        root.queue(addr, s, lifo)
        goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
    if s.releasetime > 0 {
        blockevent(s.releasetime-t0, 3+skipframes)
    }
    releaseSudog(s)
}

如果addr大于1并通過CAS-1成功則獲取信號量成功舱呻,直接返回

否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root拿到semaRoot(這里個3和251 沒有明白為什么時這兩個數(shù)悠汽??茬高?)怎栽,semaRoot包含了一個sudog鏈表和一個nwait整型字段宿饱。nwait表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護(hù)鏈表强饮。

這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本

簡單來說为黎,sync_runtime_Semacquire就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語

  1. 當(dāng)goroutine要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
    semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
    root := semroot(addr)
    atomic.Xadd(addr, 1)

    // Easy case: no waiters?
    // 這個檢查必須發(fā)生在xadd之后 避免錯過喚醒
    // (see loop in semacquire).
    if atomic.Load(&root.nwait) == 0 {
        return
    }

    // Harder case: 搜索一個waiter 并喚醒它
    lock(&root.lock)
    if atomic.Load(&root.nwait) == 0 {
        // count值已經(jīng)被另一個goroutine消費(fèi)了
        // 所以不需要喚醒其他goroutine
        unlock(&root.lock)
        return
    }
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    unlock(&root.lock)
    if s != nil { // May be slow, so unlock first
        acquiretime := s.acquiretime
        if acquiretime != 0 {
            mutexevent(t0-acquiretime, 3+skipframes)
        }
        if s.ticket != 0 {
            throw("corrupted semaphore ticket")
        }
        if handoff && cansemacquire(addr) {
            s.ticket = 1
        }
        readyWithTime(s, 5+skipframes)
    }
}

關(guān)于信號量更深層的研究可以看下semaphore in plan9

總結(jié)

通過看源碼發(fā)現(xiàn)個有意思的問題: 如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末淮野,一起剝皮案震驚了整個濱河市吹泡,隨后出現(xiàn)的幾起案子爆哑,更是在濱河造成了極大的恐慌,老刑警劉巖队贱,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件柱嫌,死亡現(xiàn)場離奇詭異屯换,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)嘉抓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門抑片,熙熙樓的掌柜王于貴愁眉苦臉地迎上來杨赤,“玉大人望拖,你說我怎么就攤上這事挫鸽。” “怎么了盔沫?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵架诞,是天一觀的道長。 經(jīng)常有香客問我很泊,道長沾谓,這世上最難降的妖魔是什么均驶? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任妇穴,我火速辦了婚禮,結(jié)果婚禮上跑筝,老公的妹妹穿的比我還像新娘携狭。我一直安慰自己,他們只是感情好稀并,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布碘举。 她就那樣靜靜地躺著搁廓,像睡著了一般境蜕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上售滤,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天完箩,我揣著相機(jī)與錄音,去河邊找鬼阻逮。 笑死秩彤,一個胖子當(dāng)著我的面吹牛呐舔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播食呻,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼仅胞,長吁一口氣:“原來是場噩夢啊……” “哼剑辫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起椎眯,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤编整,失蹤者是張志新(化名)和其女友劉穎乳丰,沒想到半個月后产园,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡粘勒,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年仲义,在試婚紗的時候發(fā)現(xiàn)自己被綠了埃撵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虽另。...
    茶點(diǎn)故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖捂刺,靈堂內(nèi)的尸體忽然破棺而出谣拣,到底是詐尸還是另有隱情,我是刑警寧澤族展,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布森缠,位于F島的核電站,受9級特大地震影響仪缸,放射性物質(zhì)發(fā)生泄漏贵涵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一恰画、第九天 我趴在偏房一處隱蔽的房頂上張望宾茂。 院中可真熱鬧拴还,春花似錦跨晴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至费封,卻和暖如春焕妙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背孝偎。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工访敌, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人衣盾。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓寺旺,卻偏偏與公主長得像,于是被迫代替她去往敵國和親势决。 傳聞我的和親對象是個殘疾皇子阻塑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 本文基于 go1.11 版本。 Mutex 使用 在深入源碼之前果复,要先搞清楚一點(diǎn)陈莽,對 Golang 中互斥鎖 sy...
    LLLeon閱讀 1,593評論 0 4
  • Mutext兩種模式 正常模式和饑餓模式。一開始默認(rèn)處于正常模式。在正常模式中走搁,每個新加入競爭鎖行列的協(xié)程都會直接...
    尼桑麻閱讀 859評論 0 2
  • 前言 Golang中有兩種類型的鎖独柑,Mutex (互斥鎖)和RWMutex(讀寫鎖)對于這兩種鎖的使用這里就不多說...
    LinkinStar閱讀 5,766評論 2 3
  • sync.mutex 源代碼分析 [TOC] 針對 Golang 1.10.3 的 sync.Mutex 進(jìn)行分析...
    CoffeeRabbit閱讀 622評論 0 0
  • 久違的晴天,家長會私植。 家長大會開好到教室時忌栅,離放學(xué)已經(jīng)沒多少時間了。班主任說已經(jīng)安排了三個家長分享經(jīng)驗(yàn)曲稼。 放學(xué)鈴聲...
    飄雪兒5閱讀 7,523評論 16 22