【golang源碼分析】sync.Mutex
概述
Mutex只有兩個阻塞方法Lock和Unlock海蔽,有兩種工作模式:normal和starvation撼嗓。
goroutine在搶占鎖時,會先自旋一段時間细移,如果搶占失敗會被放在一個FIFO等待隊列中休眠搏予,當(dāng)鎖被釋放時,會優(yōu)先喚醒隊首的goroutine弧轧。
在normal模式中雪侥,被喚醒的waiter會跟新到的goroutine競爭鎖,一般來說新的goroutine已經(jīng)在cpu中進(jìn)行了精绎,并且可能有不止一個goroutine速缨,在這種情況下剛被喚醒的goroutine有很大概率會失敗,而被重新放回FIFO隊首代乃。如果當(dāng)一個waiter等待獲取鎖的時間超過1ms旬牲,會將mutex轉(zhuǎn)換成starvation模式。
在starvation模式搁吓,當(dāng)mutex被unlock時引谜,持有權(quán)會直接移交到隊首的goroutine中。新加入的goroutine不會再參與鎖的競爭擎浴,而是直接放入等待隊列的尾部员咽。
mutex通過一個state變量來維護(hù)鎖的狀態(tài)信息。
- 低一位mutexLocked:表示鎖是否被鎖定
- 低二位mutexWoken:表示是否有g(shù)oroutine在競爭鎖贮预,這個主要應(yīng)用于unlock時判斷是否有必要喚醒等待隊列中的goroutine
- 低三位mutexStarving:表示鎖的模式starvation或normal贝室,剩余高位用于標(biāo)志等待隊列的長度契讲。
代碼分析
定義
Mutex實(shí)現(xiàn)了Locker接口,維護(hù)兩個變量——state用來維護(hù)鎖的狀態(tài)滑频,同時通過操作sema來喚醒和休眠等待goroutine
type Mutex struct {
state int32
sema uint32
}
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
Lock
完整代碼
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
詳解
- 首先默認(rèn)state為0捡偏,通過cas操作將鎖的狀態(tài)更新為mutexLocked,嘗試快速獲取鎖峡迷。在并發(fā)度較小的情況下银伟,mutex多數(shù)處于初始狀態(tài),可以提供加鎖性能绘搞。如果快速獲取鎖失敗彤避,goroutine會不斷地進(jìn)行自旋搶鎖、休眠夯辖、喚醒搶鎖琉预,直到搶到鎖后break。在這期間蒿褂,需要通過cas對state進(jìn)行更新圆米,只有更新成功的goroutine才能進(jìn)行下一階段的操作。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
- 競爭鎖的goroutine會先自旋一段時間直到mutex被unlocked啄栓、或切換到starvation模式娄帖、或自旋次數(shù)達(dá)到上限。自旋過程中昙楚,goroutine也會嘗試通過cas將mutexWoken位置1块茁,以通知Unlock操作不必喚醒等待隊列中的goroutine。starvatione模式是不需要自旋的桂肌,因?yàn)殒i的持有權(quán)會直接移交給等待隊列隊首的goroutine数焊,自旋操作沒有實(shí)際意義。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 當(dāng)前goroutine未更新成功過woken位崎场,woken位為0佩耳,等待隊列非空,通過cas嘗試更新woken位
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // pause一段時間
iter++
old = m.state
continue
}
- 從前面代碼可以看到谭跨,退出自旋有三種原因干厚,及其對應(yīng)的處理如下:
- mutex被Unlock:將mutexLocked置1,代表嘗試搶鎖螃宙,然后cas嘗試更新state
- mutex切換到starvation模式:不改變mutexLocked蛮瞄,waiter數(shù)量+1,cas更新state成功后后直接加入到等待隊列里谆扎。
- 自旋次數(shù)達(dá)到上限:不改變mutexLocked挂捅,waiter數(shù)量+1,如果starving變量為true堂湖,則將mutexStarving位置1闲先。
starving是goroutine被喚醒時通過計算等待時間獲取的状土,大于1ms則為true,表示需要切換到starvation模式伺糠。
這里需要注意蒙谓,當(dāng)mutex為unlocked的時候,不需要轉(zhuǎn)換mutex模式训桶。因?yàn)閟tarving為true累驮,說明該goroutine是normal模式下等待隊列中剛被喚醒的waiter(原因需要看后面的代碼,在starvation模式舵揭。會直接將mutex的持有權(quán)交給喚醒的goroutine谤专,走不到這一步)。如果此時做了狀態(tài)切換琉朽,可能會出現(xiàn)在starvation模式,等待隊列為空的情況稚铣。因此箱叁,只有自旋搶鎖失敗的情況下,才會由等待時間超過閾值的goroutine將mutex模式切換到starvation惕医。
new := old
// 鎖被unlocked耕漱,將mutexLocked置1,代表嘗試搶鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
// starving或自旋次數(shù)到上限, 等待數(shù)量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果已經(jīng)是starvation狀態(tài)抬伺,這一步?jīng)]什么意義
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 把a(bǔ)woke位給清掉
new &^= mutexWoken
}
- cas 更新mutex狀態(tài)螟够,失敗則重復(fù)上述操作,直到狀態(tài)更新成功峡钓。
- 如果是獲取鎖的動作妓笙,則直接break。
- 否則將goroutine放到等待隊列中能岩,若waitStartTime不為0寞宫,則說明是waiter搶鎖失敗,應(yīng)該重新返回隊首拉鹃。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 獲取鎖成功辈赋,break。接下來要處理starving和自旋次數(shù)達(dá)到上限的情況
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// waitStartTime說明是被喚醒的goroutine搶鎖失敗了膏燕,應(yīng)該重新放回隊首(queueLifo=true)
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 休眠
runtime_SemacquireMutex(&m.sema, queueLifo)
- 被喚醒后钥屈,如果當(dāng)前starving為true,或等待時間大于閾值坝辫,則將starving置為true篷就,下一次自旋搶鎖失敗會將mutex切換到starvation模式。在starvation模式近忙,該goroutine直接獲取鎖腻脏,否則的話要重復(fù)前面的操作跟新來的goroutine搶鎖
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// starving模式不應(yīng)該出現(xiàn)mutex被locked或woken鸦泳,等待隊列長度也不能為0,有的話說明這段代碼邏輯有bug
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 獲取鎖并將等待數(shù)-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 等待時間小于閾值永品,或該goroutine為等待隊列中最后一個waiter做鹰,切換到normal模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 把狀態(tài)更新過去,獲取鎖并退出鼎姐。這里不用cas钾麸,因?yàn)榍懊娌僮鞅WC了低三位都不會被更新,高位本來是原子計數(shù)炕桨,所以直接add就可以
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
Unlock
// mutex允許一個goroutine加鎖饭尝,另一個goroutine解鎖,不要求兩個操作保證同一個goroutine
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 直接解鎖献宫,然后判斷一下狀態(tài)钥平,如果不一致的話則拋異常
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// starvation模式,則直接喚醒blocked隊列隊首goroutine
// normal模式姊途,如果等待隊列為空涉瘾,或此時鎖的狀態(tài)發(fā)生了變化,則不用喚醒等待隊列中的goroutine
// 否則的話捷兰,將等待數(shù)減1立叛,cas更新state,成功則喚醒一個waiter
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// starving模式贡茅,直接將持有權(quán)交給下一個waiter秘蛇。mutexLocked是在隊首waiter被喚醒后更新的,所以此時mutexLocked還是0顶考,但是在starving
// 模式下赁还,新來的goroutine不會更新mutexLocked位。配合Lock代碼看驹沿。
runtime_Semrelease(&m.sema, true)
}
}
\