在 sync
包下提供了最基本的同步原語,如互斥鎖 Mutex
再芋。除 Once
和 WaitGroup
類型外济赎,大部分是由低級庫提供的司训,更高級別的同步最好是通過 channel
通訊來實現(xiàn)液南。
Mutex
類型的變量默認值是未加鎖狀態(tài)滑凉,在第一次使用后畅姊,此值將不得
復制若未,這點切記4趾稀O毒巍甚淡!
本文基于go version: 1.16.2
Mutex 鎖實現(xiàn)了 Locker
接口。
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
鎖的模式
為了互斥公平性焙贷,Mutex 分為 正常模式
和 饑餓模式
兩種辙芍。
正常模式
在正常模式下故硅,等待者 waiter
會進入到一個FIFO
隊列吃衅,在獲取鎖時waiter
會按照先進先出的順序獲取徘层。當喚醒一個waiter
時它被并不會立即獲取鎖趣效,而是要與新來的goroutine
競爭,這種情況下新來的goroutine比較有優(yōu)勢猪贪,主要是因為它已經(jīng)運行在CPU跷敬,可能它的數(shù)量還不少,所以waiter
大概率下獲取不到鎖热押。在這種waiter
獲取不到鎖的情況下干花,waiter
會被添加到隊列的前面。如果waiter
獲取不到鎖的時間超出了1毫秒楞黄,它將被切換為饑餓模式池凄。
這里的 waiter
是指新來一個goroutine 時會嘗試一次獲取鎖鬼廓,如果獲取不到我們就視其為watier
肿仑,并將其添加到FIFO隊列里。
饑餓模式
在正常模式下碎税,每次新來的goroutine都會搶走鎖尤慰,就這會導致一些 waiter
永遠也獲取不到鎖,產(chǎn)生饑餓問題雷蹂。所以為了應對高并發(fā)搶鎖場景下的公平性伟端,官方引入了饑餓模式。
在饑餓模式下匪煌,鎖將直接交給隊列最前面的waiter
责蝠。新來的goroutine即使在鎖未被持有情況下也不會參與競爭鎖党巾,同時也不會進行自旋,而直接將其添加到隊列的尾部霜医。
如果擁有鎖的waiter
發(fā)現(xiàn)有以下兩種情況齿拂,它將切換回正常模式:
- 它是隊列里的最后一個waiter,再也沒有其它waiter
- 等待時間小于1毫秒
模式區(qū)別
正常模式擁有更好的性能肴敛,因為即使有等待搶鎖的 waiter署海,goroutine 也可以連續(xù)多次獲取到鎖。
饑餓模式是對公平性和性能的一種平衡医男,它避免了某些 goroutine 長時間的等待鎖砸狞。在饑餓模式下,優(yōu)先處理的是那些一直在等待的 waiter镀梭。饑餓模式在一定機時會切換回正常模式刀森。
數(shù)據(jù)結(jié)構(gòu)
Mutex 鎖的方法
type Mutex
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
主要有Lock() 和 Unlock() 兩個方法,實現(xiàn)了 Locker 接口丰辣。
Mutex 結(jié)構(gòu)體
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32 // 4字節(jié)
sema uint32 // 4字節(jié)
}
主要有 state
和 sema
兩個字段組成,共8字節(jié)禽捆。其中 sema
是用于控制鎖狀態(tài)的信號量笙什,state
表示鎖的狀態(tài)。
這里的 state
字段是一個復合型的字段胚想,即一個字段包含多個意義琐凭,這樣就可使用最小的內(nèi)存來表示更多的意義,實現(xiàn)互斥鎖浊服。目前共分四部分统屈,其中低三位分別表示mutexed
、mutexWoken
和 mutexStarving
牙躺,剩下的位則用來表示當前共有多少個goroutine 在等待鎖愁憔。
在默認情況下,互斥鎖的所有狀態(tài)位都是0孽拷, 不同的位表示了不同的狀態(tài)
-
mutexLocked
表示鎖定狀態(tài) -
mutexWoken
表示waiter 喚醒狀態(tài) -
mutexStarving
表示饑餓狀態(tài) -
mutexWaiters
表示waiter的個數(shù)吨掌,最大允許記錄 1<<(32-3) -1個goroutine
實現(xiàn)原理
在此之前先了解幾個與Mutex鎖相關(guān)的常量
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken // 2 二進制 0010
mutexStarving // 4 二進制 0100
mutexWaiterShift = iota // 3
starvationThresholdNs = 1e6 // 1毫秒,用來與waiter的等待時間做比較
)
其中前四個常量會參與位運算脓恕。
對于加鎖取與解鎖主要有兩個步驟膜宋,分別為 fast path
和 slow path
兩個方法。我們看一下加鎖 炼幔。
加鎖
加鎖方法對應的是 Lock() 秋茫,其中還有一個私有方法 lockSlow()。
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
// 鎖未被持有乃秀,則直接獲取持有權(quán)
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
// 嘗試自旋競爭或饑餓狀態(tài)下饑餓goroutine競爭
m.lockSlow()
}
先是fast path 幸運路徑肛著,如果獲取到了鎖就直接返回圆兵。如果獲取不到則走 slow path,這里是 lockSlow()
函數(shù)策泣,其實現(xiàn)比較復雜衙傀。
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 當前waiter開始等待時間
starving := false // 當前饑餓狀態(tài)
awoke := false // 當前喚醒狀態(tài)
iter := 0 // 當前自旋次數(shù)
old := m.state // 當前鎖的狀態(tài)
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
// 在饑餓模式下不需要自旋,直接將鎖移交給waiter(隊列頭部的waiter)萨咕,因此新來的goroutine永遠也不會獲取鎖
// 正常模式下统抬,鎖被其它goroutine持有,如果當前允許spinning, 則嘗試進行自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true // 設(shè)置當前goroutine喚醒成功
}
runtime_doSpin() // 自旋
iter++ // 當前自旋次數(shù)+1
old = m.state // 當前goroutine再次獲取鎖的狀態(tài),之后會檢查是否鎖被釋放了
continue // 重新判斷spinning
}
......
}
......
}
如果當前鎖被其它goroutine持有(低一位為1)且處于正常模式(低三位為0)危队,且當前還允許自旋則進行自旋操作聪建。
重點介紹這下這塊的位運算邏輯,mutexLocked
和 mutexStarving
這兩個位分別代表了鎖 是否被持有
和 饑餓狀態(tài)
茫陆,它們的二進制值表示 0001
和 0100
金麸。
判斷條件 old&(mutexLocked|mutexStarving) == 0001
可以轉(zhuǎn)化為
old & (0001 | 0100)
old & 0101
如果 old & 0101 = 0001 ,由此計算得知 old 的值必須是低一位為1簿盅,低三位為0挥下。
然后通過函數(shù) runtime_canSpin() 判斷是否可以自旋。
// src/runtime/proc.go
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// sync.Mutex is cooperative, so we are conservative with spinning.
// Spin only few times and only if running on a multicore machine and
// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
// As opposed to runtime mutex we don't do passive spinning here,
// because there can be work on global runq or on other Ps.
if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
要想實現(xiàn)自旋桨醋,必須符合以下條件
- 自旋的次數(shù)<4 (active_spin)
- CPU必須為多核處理器
- 當前程序中設(shè)置的
gomaxprocs
個數(shù) >(空閑P個數(shù) + 當前處于自旋m的個數(shù) + 1) - 至少有一個正在運行的P的本地運行隊列為空
如果當前條件同時也滿足自旋的條件棚瘟,則通過 CAS
設(shè)置 mutexWoken
標記以通知解鎖(位運算),并將喚醒變量 awoke
設(shè)置為 true
喜最。
我們看下這塊的位運算邏輯偎蘸,判斷條件共有四個
-
!awoke
表示waiter 處于未喚醒狀態(tài) -
old&mutexWoken == 0
表示未喚醒狀態(tài)。old & 0010 = 0 瞬内,則表示低第二位值為0表示未喚醒狀態(tài) -
old>>mutexWaiterShift != 0
即m.state >> 3
的值不等于0迷雪,則說明當前waitersCount > 0
, 表示當前存在等待釋放鎖的 goroutine -
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
設(shè)置喚醒位的值為1。如0001 | 0010 = 0011
調(diào)用函數(shù) runtime_doSpin()
執(zhí)行自旋虫蝶,并更新自旋次數(shù)章咧,同步新的狀態(tài) m.state。此函數(shù)內(nèi)部會執(zhí)行30次的 PAUSE
指令能真。
自旋邏輯結(jié)束后慧邮,會根據(jù)當前 m.state 最新值進行一些處理。
func (m *Mutex) lockSlow() {
for {
......
// 當前goroutine的 m.state 新狀態(tài)
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果當前處于正常模式舟陆,則加鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 當前處于饑餓模式误澳,則更新waiters數(shù)量 +1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
// 當前goroutine處于饑餓狀態(tài)且鎖被其它goroutine持有,新狀態(tài)則更新鎖為饑餓模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 當前goroutine的waiter被喚醒,則重置flag
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
// 喚醒狀態(tài)不一致秦躯,直接拋出異常
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 新狀態(tài)清除喚醒標記
new &^= mutexWoken
}
// CAS更新 m.state 狀態(tài)成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 鎖已被釋放且為正常模式
if old&(mutexLocked|mutexStarving) == 0 {
// 通過 CAS 函數(shù)獲取了鎖忆谓,直接中止返回
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
// waitStartTime != 0 說明當前已處于等待狀態(tài)
queueLifo := waitStartTime != 0
// 首次設(shè)置當前goroutine的開始等待時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// queueLifo為true,說明已經(jīng)等待了一會踱承,本次循環(huán)則直接將waiter添加到等待隊列的頭部倡缠,使用信號量
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 如果當前goroutine的等待時間>1毫秒則視為饑餓狀態(tài)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果處于饑餓狀態(tài)(有可能等待時間>1毫秒)
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加鎖并且將waiter數(shù)減1(暫時未理解這塊)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 當前goroutine非饑餓狀態(tài) 或者 等待隊列只剩下一個waiter哨免,則退出饑餓模式(清除饑餓標識位)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新狀態(tài)值并中止for循環(huán)
atomic.AddInt32(&m.state, delta)
break
}
// 設(shè)置當前goroutine為喚醒狀態(tài),且重置自璇次數(shù)
awoke = true
iter = 0
} else {
old = m.state
}
}
}
可以看到主要實現(xiàn)原來就是通過一個for循環(huán)實現(xiàn)的昙沦,正常模式下可能發(fā)生spinning琢唾,而允許自旋必須有四個條件,最多允許有四次spinning機會盾饮,否則將轉(zhuǎn)為饑餓模式采桃。饑餓模式下,需要對waiter數(shù)據(jù)進行累加丘损。而當隊列里只剩下一個它自己一個waiter的時候普办,會恢復為正常模式。每次是計算出了新的狀態(tài)值new徘钥,下面通過 cas 實現(xiàn)更新狀態(tài)衔蹲,如果更新失敗,則讀取新的鎖狀態(tài)m.state并開始新一輪的for循環(huán)邏輯呈础。這里的邏輯比較復雜不是太容易理解舆驶。
以上是加鎖的過程,下面我們再看下解鎖的過程.
解鎖
解鎖對應的方法為 Unlock()而钞,同時對應的私有方法為 unlockSlow()沙廉,相比加鎖代碼要簡單的太多了。
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
......
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
// 如果 new=0 表示恢復了鎖的默認初始化狀態(tài)笨忌,否則表示鎖仍在使用
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
這里要注意一下蓝仲,對于Mutexq鎖來說俱病,一個goroutine里進行加鎖官疲,在其它goroutine是可以實現(xiàn)解鎖的,但不要重復解鎖亮隙,否則可能會觸發(fā)panic途凫。一般是哪個goroutine加鎖,就由那個goroutine來解鎖溢吻。
解鎖大致流程和加鎖差不多维费,先是執(zhí)行fast path
原子更新,如果失敗則執(zhí)行 slow path
過程促王。
func (m *Mutex) unlockSlow(new int32) {
// 未加鎖狀態(tài)犀盟,直接解鎖出錯
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式 (當前m.state & mutexStarving ==0,則說明 m.state 的 mutexStarving 位是0)
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果當前隊列里沒有waiters 或 當前goroutine已經(jīng)喚醒 或 持有了鎖,則不需要喚醒其它waiter
// 在饑餓模式下蝇狼,鎖控制權(quán)直接交給下一個waiter
// 如果 當前隊列沒有waiter(old>>mutexWaiterShift == 0) 或 (鎖為被持有狀態(tài)阅畴、喚醒狀態(tài)、饑餓狀態(tài)其中條件之一)迅耘,則直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
// 這里 old-1<<mutexWaiterShift 表示 waiters 數(shù)量-1贱枣,mutexWoken 表示當前goroutine設(shè)置新值喚醒位值為喚醒狀態(tài)监署,然后再通過CAS更新m.state,如果更新失敗纽哥,說明當前m.state 已被其它goroutine修改過了钠乏,然后它再重新讀取m.state的值,開始新一輪的for循環(huán)
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 搶鎖控制權(quán)成功(解鎖成功)
runtime_Semrelease(&m.sema, false, 1)
return
}
// 重新讀取m.state值春塌,開始新一輪判斷
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饑餓模式晓避,將當前鎖控制權(quán)直接交給下一個waiter
runtime_Semrelease(&m.sema, true, 1)
}
}
解鎖源碼比較好理解,對于slow path
而言
- 饑餓模式直接調(diào)用函數(shù)
runtime_Semrelease()
摔笤,通過信號量將鎖控制權(quán)交給下一個waiter够滑。 - 正常模式下分以下情況
- 如果等待隊列里沒有
waiter
或 鎖為被持有狀態(tài)
、喚醒狀態(tài)
吕世、饑餓狀態(tài)
三者其中條件之一彰触,則直接返回并結(jié)束處理邏輯; - 當前goroutine搶鎖的控制權(quán)命辖。先讀取m.state的值况毅,waiters數(shù)量減少1,并修改狀態(tài)為喚醒標記尔艇,最后通過CAS修改m.state尔许,如果修改成功則表示搶鎖控制權(quán)成功,即解鎖成功终娃,則直接結(jié)束
- 否則重新讀取m.state 的值味廊,for 循環(huán)新一輪的邏輯
- 如果等待隊列里沒有
總結(jié)
- 鎖模式分為
正常模式
和饑餓模式
。正常模式下新來的goroutine與waiter競爭鎖棠耕,且新來的goroutine大概率優(yōu)先獲取鎖余佛;饑餓模式下隊列頭部的waiter獲取鎖,新來的goroutine直接進入waiter 隊列窍荧,同時也不會spinning - 饑餓模式下辉巡,擁有鎖的waiter當發(fā)現(xiàn)它是隊列中的最后一個waiter或者
等待時間<1毫秒
時,將自動切換為正常模式 - 在正常模式下蕊退,新來的goroutine如果獲取不到鎖郊楣,則將嘗試
spinning
- 在加鎖和解鎖是分
fast path
和slow path
兩種路徑,加鎖時執(zhí)行runtime_SemacquireMutex()
函數(shù)瓤荔,解鎖時執(zhí)行對應的runtime_Semrelease()
函數(shù)