本文基于 go1.11 版本愤诱。
Mutex 使用
在深入源碼之前贬派,要先搞清楚一點晒屎,對 Golang 中互斥鎖 sync.Mutex
的操作是程序員的主動行為肄渗,可以看作是是一種協(xié)議后雷,而不是強制在操作前必須先獲取鎖季惯。
這樣說可能有點抽象,看下面這段代碼:
package main
import (
"fmt"
"sync"
"time"
)
type People struct {
mux sync.Mutex
Name string
Age uint8
}
func (p *People) IncAge() {
p.mux.Lock()
time.Sleep(3 * time.Second)
p.Age++
p.mux.Unlock()
}
func main() {
leo := &People{Name: "leo", Age: 18}
innerIncTime := time.Now().Second()
fmt.Println("with mutex inc time:", innerIncTime)
go leo.IncAge()
time.Sleep(time.Second)
outerIncTime := time.Now().Second()
fmt.Println("without mutex inc time:", outerIncTime)
leo.Age++
fmt.Println("without mutex inc result:", leo.Age)
fmt.Println("mutex status:", leo.mux)
time.Sleep(2 * time.Second)
fmt.Println("with mutex inc result:", leo.Age)
fmt.Println("Two seconds later mutex status:", leo.mux)
}
在執(zhí)行 leo.Age++
之前已經(jīng)加鎖了喷面,如果是需要強制獲取鎖的話星瘾,這里會等待 3 秒直到鎖釋放后才能執(zhí)行,而這里沒有獲取鎖就可以直接對 Age 字段進行操作惧辈,輸出結(jié)果:
with mutex inc time: 19
without mutex inc time: 20
without mutex inc result: 19
mutex status: {1 0}
with mutex inc result: 20
Two seconds later mutex status: {0 0}
所以琳状,如果在一個 goroutine 中對鎖執(zhí)行了 Lock(),在另一個 goroutine 可以不用理會這個鎖盒齿,直接進行操作(當(dāng)然不建議這么做)念逞。
還有一點需要注意的是困食,鎖只和具體變量關(guān)聯(lián),與特定 goroutine 無關(guān)翎承。雖然可以在一個 goroutine 中加鎖硕盹,在另一個 goroutine 中解鎖(如通過指針傳遞變量,或全局變量都可以)叨咖,但還是建議在同一個代碼塊中進行成對的加鎖解鎖操作瘩例。
源碼分析
這是 Mutex 的 源碼鏈接 。
Mutex 結(jié)構(gòu)
Mutex 表示一個互斥鎖甸各,其零值就是未加鎖狀態(tài)的 mutex垛贤,無需初始化。在首次使用后不要做值拷貝趣倾,這樣可能會使鎖失效聘惦。
type Mutex struct {
state int32 // 表示鎖當(dāng)前的狀態(tài)
sema uint32 // 信號量,用于向處于 Gwaitting 的 G 發(fā)送信號
}
幾個常量
這里用位操作來表示鎖的不同狀態(tài)儒恋。
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
mutexLocked
值為 1善绎,第一位為 1,表示 mutex 已經(jīng)被加鎖诫尽。根據(jù) mutex.state & mutexLocked
的結(jié)果來判斷 mutex 的狀態(tài):該位為 1 表示已加鎖禀酱,0 表示未加鎖。
mutexWoken
值為 2牧嫉,第二位為 1比勉,表示 mutex 是否被喚醒。根據(jù) mutex.state & mutexWoken
的結(jié)果判斷 mutex 是否被喚醒:該位為 1 表示已被喚醒驹止,0 表示未被喚醒浩聋。
mutexStarving
值為 4,第三位為 1臊恋,表示 mutex 是否處于饑餓模式衣洁。根據(jù) mutex.state & mutexWoken
的結(jié)果判斷 mutex 是否處于饑餓模式:該位為 1 表示處于饑餓模式,0 表示正常模式抖仅。
mutexWaiterShift
值為 3坊夫,表示 mutex.state
右移 3 位后即為等待的 goroutine
的數(shù)量。
starvationThresholdNs
值為 1000000 納秒撤卢,即 1ms环凿,表示將 mutex 切換到饑餓模式的等待時間閾值。這個常量在源碼中有大篇幅的注釋放吩,理解這段注釋對理解程序邏輯至關(guān)重要智听,翻譯整理如下:
引入這個常量是為了保證出現(xiàn) mutex 競爭情況時的公平性。mutex 有兩種操作模式:正常模式和饑餓模式。
正常模式下到推,等待者以 FIFO 的順序排隊來獲取鎖考赛,但被喚醒的等待者發(fā)現(xiàn)并沒有獲取到 mutex,并且還要與新到達的 goroutine 們競爭 mutex 的所有權(quán)莉测。新到達的 goroutine 們有一個優(yōu)勢 —— 它們已經(jīng)運行在 CPU 上且可能數(shù)量很多颜骤,所以一個醒來的等待者有很大可能會獲取不到鎖。在這種情況下它處在等待隊列的前面捣卤。如果一個 goroutine 等待 mutex 釋放的時間超過 1ms忍抽,它就會將 mutex 切換到饑餓模式。
在饑餓模式下董朝,mutex 的所有權(quán)直接從對 mutex 執(zhí)行解鎖的 goroutine 傳遞給等待隊列前面的等待者梯找。新到達的 goroutine 們不要嘗試去獲取 mutex,即使它看起來是在解鎖狀態(tài)益涧,也不要試圖自旋(等也白等,在饑餓模式下是不會給你的)驯鳖,而是自己乖乖到等待隊列的尾部排隊去闲询。
如果一個等待者獲得 mutex 的所有權(quán),并且看到以下兩種情況中的任一種:1) 它是等待隊列中的最后一個浅辙,或者 2) 它等待的時間少于 1ms扭弧,它便將 mutex 切換回正常操作模式。
正常模式有更好地性能记舆,因為一個 goroutine 可以連續(xù)獲得好幾次 mutex鸽捻,即使有阻塞的等待者。而饑餓模式可以有效防止出現(xiàn)位于等待隊列尾部的等待者一直無法獲取到 mutex 的情況泽腮。
自旋鎖操作
在開始看 Mutex 源碼前要先介紹幾個與自旋鎖相關(guān)的函數(shù)御蒲,源碼中通過這幾個函數(shù)實現(xiàn)了對自旋鎖的操作。這幾個函數(shù)實際執(zhí)行的代碼都是在 runtime 包中實現(xiàn)的诊赊。
runtime_canSpin
代碼具體位置厚满。
由于 Mutex 的特性,自旋需要比較保守的進行碧磅,原因參考上面 starvationThresholdNs
常量的注釋碘箍。
限制條件是:只能自旋少于 4 次,而且僅當(dāng)運行在多核機器上并且 GOMAXPROCS>1鲸郊;最少有一個其它正在運行的 P丰榴,并且本地的運行隊列 runq 里沒有 G 在等待。與 runtime mutex 相反秆撮,不做被動自旋四濒,因為可以在全局 runq 上或其它 P 上工作。
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
runtime_doSpin
代碼具體位置同上。
執(zhí)行自旋操作峻黍,這個函數(shù)是用匯編實現(xiàn)的复隆,函數(shù)內(nèi)部循環(huán)調(diào)用 PAUSE 指令。PAUSE 指令什么都不做姆涩,但是會消耗 CPU 時間挽拂。
/go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
runtime_SemacquireMutex
代碼具體位置。發(fā)送獲取到 Mutex 的信號骨饿。
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}
runtime_Semrelease
代碼具體位置同上亏栈。發(fā)送釋放 Mutex 的信號。
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool) {
semrelease1(addr, handoff)
}
Lock
這是 Lock 方法的全部源碼宏赘,先看一下整體邏輯绒北,下面會分段解釋:
首先是直接調(diào)用 CAS 嘗試獲取鎖,如果獲取到則將鎖的狀態(tài)從 0 切換為 1 并返回察署。獲取不到就進入 for 循環(huán)闷游,通過自旋來等待鎖被其它 goroutine 釋放,只有兩個地方 break 退出 for 循環(huán)而獲取到鎖贴汪。
源碼實現(xiàn)分析
剛進入函數(shù)脐往,會嘗試獲取鎖:
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
直接通過調(diào)用 CompareAndSwapInt32
這個方法來檢查鎖的狀態(tài)是否是 0,如果是則表示可以加鎖扳埂,將其狀態(tài)轉(zhuǎn)換為 1业簿,當(dāng)前 goroutine 加鎖成功,函數(shù)返回阳懂。
CompareAndSwapInt32
這個方法是匯編實現(xiàn)的梅尤,在單核CPU上運行是可以保證原子性的,但在多核 CPU 上運行時岩调,需要加上 LOCK 前綴來對總線加鎖巷燥,從而保證了該指令的原子性。
至于 race.Enabled
号枕,這里是判斷是否啟用了競爭檢測矾湃,即程序編譯或運行時是否加上了 -race
子命令。關(guān)于競爭檢測如想深入了解可以看官方博客堕澄,見參考目錄 1邀跃。在這里可以不用理會。
如果 mutex 已經(jīng)被其它 goroutine 持有蛙紫,則進入下面的邏輯拍屑。先定義了幾個變量:
var waitStartTime int64 // 當(dāng)前 goroutine 開始等待的時間
starving := false // mutex 當(dāng)前的所處的模式
awoke := false // 當(dāng)前 goroutine 是否被喚醒
iter := 0 // 自旋迭代的次數(shù)
old := m.state // 保存 mutex 當(dāng)前狀態(tài)
進入 for 循環(huán)后,先檢查是否可以進行自旋:
如上所述坑傅,不要在饑餓模式下進行自旋僵驰,因為在饑餓模式下只有等待者們可以獲得 mutex 的所有權(quán),這時自旋是不可能獲取到鎖的。
能進入執(zhí)行自旋邏輯部分的條件:當(dāng)前不是饑餓模式蒜茴,而且當(dāng)前還可以進行自旋(見上面的 runtime_canSpin 函數(shù))星爪。
然后是判斷能否喚醒當(dāng)前 goroutine 的四個條件:根據(jù) 1)!awoke
和 2)old&mutexWoken == 0
來判斷當(dāng)前 goroutine 還沒有被喚醒哀卫;3)old>>mutexWaiterShift != 0
表示還有其它在等待的 goroutine往扔;4)如果當(dāng)前 goroutine 狀態(tài)還沒有變纽乱,就將其狀態(tài)切換為 old|mutexWoken
叉庐, 即喚醒狀態(tài) 。
// old&(mutexLocked|mutexStarving) == mutexLocked 表示 mutex 當(dāng)前不處于饑餓模式披粟。
// 即 old & 0101 == 0001庄岖,old 的第一位必定為 1许蓖,第三位必定為 0窖杀,即未處于饑餓模式漓摩。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 這時自旋是有意義的,通過把 mutexWoken 標(biāo)識為 true入客,以通知 Unlock 方法就先不要叫醒其它
// 阻塞著的 goroutine 了管毙。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 將當(dāng)前 goroutine 標(biāo)識為喚醒狀態(tài)后,執(zhí)行自旋操作桌硫,計數(shù)器加一夭咬,將當(dāng)前狀態(tài)記錄到 old,繼續(xù)循環(huán)等待
runtime_doSpin()
iter++
old = m.state
continue
}
如果不能進行自旋操作鞍泉,進入下面的邏輯:
如果 mutex 當(dāng)前處于正常模式,將 new 的第一位即鎖位設(shè)置為 1肮帐;如果 mutex 當(dāng)前已經(jīng)被加鎖或處于饑餓模式咖驮,則當(dāng)前 goroutine 進入等待隊列;如果 mutex 當(dāng)前處于饑餓模式训枢,而且 mutex 已被加鎖托修,則將 new 的第三位即饑餓模式位設(shè)置為 1。
new := old
// 不要嘗試獲取處于饑餓模式的鎖恒界,新到達的 goroutine 們必須排隊睦刃。
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 沒有獲取到鎖,當(dāng)前 goroutine 進入等待隊列十酣。
// old & 0101 != 0涩拙,那么 old 的第一位和第三位至少有一個為 1,即 mutex 已加鎖或處于饑餓模式耸采。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 當(dāng)前 goroutine 將 mutex 切換到饑餓模式兴泥。但如果當(dāng)前 mutex 是解鎖狀態(tài),不要切換虾宇。
// Unlock 期望處于饑餓模式的 mutex 有等待者搓彻,在這種情況下不會這樣。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
設(shè)置好 new 后,繼續(xù)下面的邏輯:
當(dāng) goroutine 被喚醒時旭贬,如果 new 還沒有被喚醒怔接,則發(fā)生了不一致的 mutex 狀態(tài),拋出錯誤稀轨;否則就重置 new 的第二位即喚醒位為 0扼脐。
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
接下來會調(diào)用 CAS 來將 mutex 當(dāng)前狀態(tài)由 old 更新為 new:
如更新成功,old&(mutexLocked|mutexStarving) == 0
表示 mutex 未鎖定且未處于饑餓模式靶端,則 break 跳出循環(huán)谎势,當(dāng)前 goroutine 獲取到鎖。
如果當(dāng)前的 goroutine 之前已經(jīng)在排隊了杨名,就排到隊列的前面脏榆。runtime_SemacquireMutex(&m.sema, queueLifo)
這個函數(shù)就是做插隊操作的,如果 queueLifo == true台谍,就把當(dāng)前 goroutine 插入到等待隊列的前面须喂。
繼續(xù)往下,如果 mutex 當(dāng)前是處于饑餓模式趁蕊,則修改等待的 goroutine 數(shù)量和第三位即饑餓模式位坞生,break 跳出循環(huán),當(dāng)前 goroutine 獲取到鎖掷伙;如果是正常模式是己,繼續(xù)循環(huán)。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// old & 0101 == 0任柜,old 的第一位和第三位必定不是 1卒废,即 mutex 未鎖定且未處于饑餓模式。
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
// 之前沒排過隊宙地,開始計時摔认。
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
// 確定 mutex 當(dāng)前所處模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不是饑餓模式或只剩一個等待者了,退出饑餓模式
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 未處于饑餓模式宅粥,讓新到來的 goroutine 先獲取鎖参袱,繼續(xù)循環(huán)
awoke = true
iter = 0
} else {
// 上面的 CAS 沒有成功更新為 new,記錄當(dāng)前狀態(tài)秽梅,繼續(xù)循環(huán)
old = m.state
}
Unlock
Unlock 的代碼比較少抹蚀,直接在代碼中注釋:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
// 將 mutex 當(dāng)前狀態(tài)第一位即鎖位置為 0 保存到 new
new := atomic.AddInt32(&m.state, -mutexLocked)
// 當(dāng) new 狀態(tài)鎖位為 1 時會滿足此條件,即對未加鎖狀態(tài)的 mutex 進行解鎖企垦,拋出錯誤
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果未處于饑餓模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有等待者况鸣,或者一個 goroutine 已經(jīng)被喚醒或獲取到鎖,或處于饑餓模式竹观,
// 無需喚醒任何其它被掛起的 goroutine镐捧。
// 在饑餓模式中潜索,所有權(quán)直接從執(zhí)行解鎖的 goroutine 傳遞給下一個等待者。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 等待者數(shù)量減 1懂酱,并將喚醒位改成 1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 喚醒一個阻塞的 goroutine竹习,但不是喚醒第一個等待者
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// 饑餓模式:將 mutex 所有權(quán)傳遞給下個等待者。
// 注意:mutexLocked 沒有設(shè)置列牺,等待者將在被喚醒后設(shè)置它整陌。
// 但是如果設(shè)置了 mutexStarving,仍然認為 mutex 是鎖定的瞎领,所以新來的 goroutine 不會獲取到它泌辫。
runtime_Semrelease(&m.sema, true)
}
}