鎖
type Locker interface {
Lock()
Unlock()
}
Mutex 互斥鎖
互斥即不可同時(shí)運(yùn)行米绕。即使用了互斥鎖的兩個(gè)代碼片段互相排斥瑟捣,只有其中一個(gè)代碼片段執(zhí)行完成后,另一個(gè)才能執(zhí)行义郑。
type Mutex struct {
state int32
sema uint32
}
state 表示當(dāng)前互斥鎖的狀態(tài)蝶柿,而 sema 是用于控制鎖狀態(tài)的信號量。
互斥鎖的狀態(tài)比較復(fù)雜非驮,如下圖所示,最低三位分別表示 mutexLocked雏赦、mutexWoken 和 mutexStarving劫笙,剩下的位置用來表示當(dāng)前有多少個(gè) Goroutine 在等待互斥鎖的釋放
在默認(rèn)情況下,互斥鎖的所有狀態(tài)位都是 0星岗,int32 中的不同位分別表示了不同的狀態(tài):
- mutexLocked — 表示互斥鎖的鎖定狀態(tài)填大;
- mutexWoken — 表示從正常模式被從喚醒;
- mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài)俏橘;
- waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個(gè)數(shù)允华;
Mutex 正常模式和饑餓模式
- 在正常模式下,鎖的等待者會(huì)按照先進(jìn)先出的順序獲取鎖寥掐。
- 但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時(shí)靴寂,大概率會(huì)獲取不到鎖,為了減少這種情況的出現(xiàn)召耘,一旦 Goroutine 超過 1ms 沒有獲取到鎖百炬,它就會(huì)將當(dāng)前互斥鎖切換饑餓模式,防止部分 Goroutine 被『餓死』污它。
- 在饑餓模式中剖踊,互斥鎖會(huì)直接交給等待隊(duì)列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖衫贬、也不會(huì)進(jìn)入自旋狀態(tài)德澈,它們只會(huì)在隊(duì)列的末尾等待。
- 如果一個(gè) Goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時(shí)間少于 1ms固惯,那么當(dāng)前的互斥鎖就會(huì)切換回正常模式梆造。
- 與饑餓模式相比,正常模式下的互斥鎖能夠提供更好地性能缝呕,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時(shí)澳窑。
const (
mutexLocked = 1 << iota //1 互斥鎖被鎖定 + 未被喚醒
mutexWoken //2 互斥鎖未鎖定 + 被喚醒
mutexStarving //4 互斥鎖未鎖定 + 未被喚醒 + 饑餓模式啟動(dòng)
mutexWaiterShift = iota //3 互斥鎖被鎖定 + 被喚醒
starvationThresholdNs = 1e6
)
加鎖和解鎖
func (m *Mutex) Lock() {
// 當(dāng)鎖的狀態(tài)是 0 時(shí),將state置位 mutexLocked 為 1:
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 如果互斥鎖的狀態(tài)不是 0 時(shí)就會(huì)調(diào)用 sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)
m.lockSlow()
}
func (m *Mutex) lockSlow() {
該方法的主體是一個(gè)非常大 for 循環(huán)供常,這里將它分成幾個(gè)部分介紹獲取鎖的過程:
- 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋摊聋;
- 通過自旋等待互斥鎖的釋放;
- 計(jì)算互斥鎖的最新狀態(tài)栈暇;
- 更新互斥鎖的狀態(tài)并獲取鎖麻裁;
自旋是一種多線程同步機(jī)制,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會(huì)一直保持 CPU 的占用,持續(xù)檢查某個(gè)條件是否為真
在多核的 CPU 上煎源,自旋可以避免 Goroutine 的切換色迂,使用恰當(dāng)會(huì)對性能帶來很大的增益,但是使用的不恰當(dāng)就會(huì)拖慢整個(gè)程序手销,所以 Goroutine 進(jìn)入自旋的條件非承苛刻
- 互斥鎖只有在普通模式才能進(jìn)入自旋;
- runtime.sync_runtime_canSpin 需要返回 true:
- 運(yùn)行在多 CPU 的機(jī)器上锋拖;
- 當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次诈悍;
- 當(dāng)前機(jī)器上至少存在一個(gè)正在運(yùn)行的處理器 P 并且處理的運(yùn)行隊(duì)列為空;
下面代碼為 進(jìn)入自旋 邏輯
// 開始等待時(shí)間戳
var waitStartTime int64
// 饑餓模式標(biāo)識
starving := false
// 喚醒標(biāo)識
awoke := false
// 自旋次數(shù)
iter := 0
// 保存當(dāng)前對象鎖狀態(tài)
old := m.state
for {
// 鎖是非饑餓狀態(tài)兽埃,鎖還沒被釋放侥钳,嘗試自旋
// 判斷相當(dāng)于xxxx...x0xx & 0101 = 01,當(dāng)前對象狀態(tài)為:xxxx…x0xx 當(dāng)前對象鎖被使用
// runtime_canSpin(iter) 根據(jù)iter自旋次數(shù)判斷
// 互斥鎖被鎖定時(shí) 進(jìn)入自旋狀態(tài) continue
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke &&
// 判斷相當(dāng)于: xxxx...xx0x & 0010 = 0
// 當(dāng)前對象狀態(tài)為未被喚醒時(shí)
old&mutexWoken == 0 &&
// 當(dāng)前有 Goroutine 在等待互斥鎖的釋放時(shí)柄错,也就是有g(shù)oroution在排隊(duì)時(shí) old>>mutexWaiterShift != 0 為 true
old>>mutexWaiterShift != 0 &&
// 將對象 改為喚醒狀態(tài):xxxx...xx0x | 0010 = xxxx...xx1x
// 在將 m.state 改為被喚醒狀態(tài)
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)
{
// 告知解鎖舷夺,不要喚醒其他阻塞的goroutines
awoke = true
}
// 進(jìn)入自旋狀態(tài) 當(dāng)前goroutine并不掛起,仍然在占用cpu資源售貌,重試一定次數(shù)后给猾,退出自旋狀態(tài)
runtime_doSpin()
//表示自旋次數(shù)
iter++
// 保存mutex對象即將被設(shè)置成的狀態(tài)
//再次獲取鎖的狀態(tài),之后會(huì)檢查是否鎖被釋放了
old = m.state
continue
}
一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會(huì)調(diào)用runtime.sync_runtime_doSpin 和 runtime.procyield 并執(zhí)行 30 次的 PAUSE 指令趁矾,該指令只會(huì)占用 CPU 并消耗 CPU 時(shí)間:
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET
處理了自旋相關(guān)的特殊邏輯之后耙册,互斥鎖會(huì)根據(jù)上下文計(jì)算當(dāng)前互斥鎖最新的狀態(tài)。幾個(gè)不同的條件分別會(huì)更新 state 字段中存儲(chǔ)的不同信息 — mutexLocked毫捣、mutexStarving详拙、mutexWoken 和 mutexWaiterShift:
new := old
// old 后三位為 mutexLocked,mutexWaiterShift,mutexWoken 時(shí) old&mutexStarving == 0 為true
// xxxx...x0xx & 0100 = 0 xxxx...x0xx
// old&mutexStarving == 0 篩選當(dāng)前狀態(tài)為正常模式(非饑餓模式)
// 將新來的goroutines 互斥鎖鎖定
if old&mutexStarving == 0 {
// 將未鎖定狀態(tài)改為鎖定狀態(tài)
// 非饑餓狀態(tài),加鎖
new |= mutexLocked
}
// xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;
// old 互斥鎖被鎖定時(shí)但是狀態(tài)為饑餓模式時(shí)蔓同,mutex的等待goroutine數(shù)目加1
if old&(mutexLocked|mutexStarving) != 0 {
// new + 8 在等待互斥鎖的釋放的Goroutine數(shù)量 +1
// 更新阻塞goroutine的數(shù)量,表示mutex的等待goroutine數(shù)目加1
new += 1 << mutexWaiterShift
}
// xxxx...xxx1 & 0001 != 0饶辙;鎖狀態(tài)為鎖定狀態(tài)
if starving && old&mutexLocked != 0 {
// new + 4 表示從正常模式進(jìn)入饑餓模式
// xxxx...xxx | 0100 => xxxx...x1xx
new |= mutexStarving
}
// 當(dāng) 喚醒標(biāo)志為 true時(shí)
if awoke {
// xxxx...xx1x & 0010 = 0, 如果喚醒標(biāo)志為0 ,表示為被喚醒時(shí)斑粱,panic
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設(shè)置喚醒狀態(tài)位0,goroutine是被喚醒的弃揽,新狀態(tài)清除喚醒標(biāo)志
new &^= mutexWoken
}
計(jì)算了新的互斥鎖狀態(tài)之后,會(huì)使用 CAS 函數(shù) sync/atomic.compareandswapint32 更新狀態(tài)
//判斷cas鎖是否 設(shè)置新狀態(tài) 成功
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//原來鎖的狀態(tài)已釋放则北,并且不是饑餓狀態(tài)矿微,正常請求到了鎖,返回
// xxxx...x0x0 & 0101 = 0尚揣, 互斥鎖未加鎖且為正常模式(非饑餓模式)
if old&(mutexLocked|mutexStarving) == 0 {
//結(jié)束cas
break
}
// 如果以前就在隊(duì)列里面涌矢,加入到隊(duì)列頭
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
//獲取當(dāng)前運(yùn)行納秒時(shí)間戳
waitStartTime = runtime_nanotime()
}
//通過信號量保證資源不會(huì)被兩個(gè) Goroutine 獲取
//runtime.sync_runtime_SemacquireMutex 會(huì)在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量快骗,它就會(huì)立刻返回
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 判斷 等待時(shí)間 是否超出限制 1e6 將starving 標(biāo)志為 true
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// xxxx...x1xx & 0100 != 0 判斷鎖狀態(tài)是否 進(jìn)入饑餓狀態(tài)
if old&mutexStarving != 0 {
//當(dāng)前 Goroutine 會(huì)獲得互斥鎖娜庇,如果等待隊(duì)列中只存在當(dāng)前 Goroutine塔次,互斥鎖還會(huì)從饑餓模式中退出
// xxxx...xx00 & 0011 = 0
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 加鎖并且將waiter數(shù)減1
// delta -7 7 = 0111
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//判斷 等待隊(duì)列中 是否只存在當(dāng)前 Goroutine
if !starving || old>>mutexWaiterShift == 1 {
// delta -11 11 = 1011
delta -= mutexStarving
}
// m.state - 7 表示當(dāng)前 Goroutine 會(huì)獲得互斥鎖
// m.state - 11 表示當(dāng)前 Goroutine 會(huì)獲得互斥鎖還會(huì)從饑餓模式中退出
atomic.AddInt32(&m.state, delta)
break
}
// 設(shè)置喚醒標(biāo)記 為true
awoke = true
// 重置迭代次數(shù)
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
如果沒有通過 CAS 獲得鎖,會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 通過信號量保證資源不會(huì)被兩個(gè) Goroutine 獲取
runtime.sync_runtime_SemacquireMutex 會(huì)在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放名秀,一旦當(dāng)前 Goroutine 可以獲取信號量励负,它就會(huì)立刻返回,sync.Mutex.Lock 的剩余代碼也會(huì)繼續(xù)執(zhí)行匕得。
- 在正常模式下继榆,這段代碼會(huì)設(shè)置喚醒和饑餓標(biāo)記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán)耗跛;
- 在饑餓模式下裕照,當(dāng)前 Goroutine 會(huì)獲得互斥鎖,如果等待隊(duì)列中只存在當(dāng)前 Goroutine调塌,互斥鎖還會(huì)從饑餓模式中退出;
互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單惠猿,該過程會(huì)先使用 sync/atomic.AddInt32 函數(shù)快速解鎖羔砾,這時(shí)會(huì)發(fā)生下面的兩種情況:
- 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖偶妖;
- 如果該函數(shù)返回的新狀態(tài)不等于 0姜凄,這段代碼會(huì)調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖:
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// m.state - 1
// 新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖
// 新狀態(tài)不等于 0趾访,這段代碼會(huì)調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow 會(huì)先校驗(yàn)鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了會(huì)直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序态秧。
在正常情況下會(huì)根據(jù)當(dāng)前互斥鎖的狀態(tài),分別處理正常模式和饑餓模式下的互斥鎖:
func (m *Mutex) unlockSlow(new int32) {
// 校驗(yàn)鎖狀態(tài)的合法性
if (new+mutexLocked)&mutexLocked == 0 {
//如果 當(dāng)前互斥鎖已經(jīng)被解鎖過了會(huì)直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序
throw("sync: unlock of unlocked mutex")
}
// 分別處理正常模式和饑餓模式下的互斥鎖
if new&mutexStarving == 0 {
// 正常狀態(tài)
old := new
for {
// 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked扼鞋、mutexStarving申鱼、mutexWoken 狀態(tài)不都為 0(有喚醒者或鎖已經(jīng)加鎖),那么當(dāng)前方法可以直接返回云头,不需要喚醒其他等待者捐友;
//如果沒有其它的 waiter,說明對這個(gè)鎖的競爭的 goroutine 只有一個(gè)溃槐,那就可以直接返回了匣砖;如果這個(gè)時(shí)候有喚醒的 goroutine,或者是又被別人加了鎖昏滴,那么猴鲫,無需我們操勞,其它 goroutine 自己干得都很好谣殊,當(dāng)前的這個(gè) goroutine 就可以放心返回了拂共。
//xxxx…x000 & (0001 | 0010 | 0100) => xxxx…x000 & 0111 = 0
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 如果互斥鎖存在等待者,會(huì)通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán)蟹倾;
//如果有等待者匣缘,并且沒有喚醒的 waiter猖闪,那就需要喚醒一個(gè)等待的 waiter。在喚醒之前肌厨,需要將 waiter 數(shù)量減 1培慌,并且將 mutexWoken 標(biāo)志設(shè)置上,這樣柑爸,Unlock 就可以返回了吵护。
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饑餓狀態(tài)
// 當(dāng)前鎖交給下一個(gè)正在嘗試獲取鎖的等待者,等待者被喚醒后會(huì)得到鎖表鳍,在這時(shí)互斥鎖還不會(huì)退出饑餓狀態(tài)
runtime_Semrelease(&m.sema, true, 1)
}
}
在正常模式下馅而,上述代碼會(huì)使用如下所示的處理過程:
- 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving譬圣、mutexWoken 狀態(tài)不都為 0瓮恭,那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者厘熟;
- 如果互斥鎖存在等待者屯蹦,會(huì)通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);
在饑餓模式下绳姨,上述代碼會(huì)直接調(diào)用 sync.runtime_Semrelease 將當(dāng)前鎖交給下一個(gè)正在嘗試獲取鎖的等待者登澜,等待者被喚醒后會(huì)得到鎖,在這時(shí)互斥鎖還不會(huì)退出饑餓狀態(tài)飘庄;
總結(jié)
互斥鎖的加鎖過程比較復(fù)雜脑蠕,它涉及自旋、信號量以及調(diào)度等概念:
- 如果互斥鎖處于初始化狀態(tài)跪削,會(huì)通過置位 mutexLocked 加鎖谴仙;
- 如果互斥鎖處于 mutexLocked 狀態(tài)并且在普通模式下工作,會(huì)進(jìn)入自旋切揭,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時(shí)間等待鎖的釋放狞甚;
- 如果當(dāng)前 Goroutine 等待鎖的時(shí)間超過了 1ms,互斥鎖就會(huì)切換到饑餓模式廓旬;
- 互斥鎖在正常情況下會(huì)通過 runtime.sync_runtime_SemacquireMutex 將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài)哼审,等待鎖的持有者喚醒;
- 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個(gè)等待的協(xié)程或者等待的時(shí)間小于 1ms孕豹,那么它會(huì)將互斥鎖切換回正常模式涩盾;
互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數(shù)不多励背、邏輯清晰春霍,也比較容易理解:
- 當(dāng)互斥鎖已經(jīng)被解鎖時(shí),調(diào)用 sync.Mutex.Unlock 會(huì)直接拋出異常叶眉;
- 當(dāng)互斥鎖處于饑餓模式時(shí)址儒,將鎖的所有權(quán)交給隊(duì)列中的下一個(gè)等待者芹枷,等待者會(huì)負(fù)責(zé)設(shè)置 mutexLocked 標(biāo)志位;
- 當(dāng)互斥鎖處于普通模式時(shí)莲趣,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖鸳慈,會(huì)直接返回;在其他情況下會(huì)通過 sync.runtime_Semrelease 喚醒對應(yīng)的 Goroutine喧伞;
RWMutex 讀寫鎖
讀寫互斥鎖 sync.RWMutex 是細(xì)粒度的互斥鎖走芋,它不限制資源的并發(fā)讀,但是讀寫潘鲫、寫寫操作無法并行執(zhí)行翁逞。
sync.RWMutex 中總共包含以下 5 個(gè)字段:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- w — 復(fù)用互斥鎖提供的能力;
- writerSem 和 readerSem — 分別用于寫等待讀和讀等待寫:
- readerCount 存儲(chǔ)了當(dāng)前正在執(zhí)行的讀操作數(shù)量溉仑;
- readerWait 表示當(dāng)寫操作被阻塞時(shí)等待的讀操作個(gè)數(shù)挖函;
寫鎖
當(dāng)資源的使用者想要獲取寫鎖時(shí)娃惯,需要調(diào)用 sync.RWMutex.Lock 方法:
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
//進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
- 調(diào)用結(jié)構(gòu)體持有的 sync.Mutex 結(jié)構(gòu)體的 sync.Mutex.Lock 阻塞后續(xù)的寫操作叽掘;因?yàn)榛コ怄i已經(jīng)被獲取谆棱,其他 Goroutine 在獲取寫鎖時(shí)會(huì)進(jìn)入自旋或者休眠邻储;
- 調(diào)用 sync/atomic.AddInt32 函數(shù)阻塞后續(xù)的讀操作:
- 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒偏序;
寫鎖的釋放會(huì)調(diào)用 sync.RWMutex.Unlock:
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
與加鎖的過程正好相反,寫鎖的釋放分以下幾個(gè)執(zhí)行:
- 調(diào)用 sync/atomic.AddInt32 函數(shù)將 readerCount 變回正數(shù),釋放讀鎖吩案;
- 通過 for 循環(huán)釋放所有因?yàn)楂@取讀鎖而陷入等待的 Goroutine:
- 調(diào)用 sync.Mutex.Unlock 釋放寫鎖;
獲取寫鎖時(shí)會(huì)先阻塞寫鎖的獲取帝簇,后阻塞讀鎖的獲取徘郭,這種策略能夠保證讀操作不會(huì)被連續(xù)的寫操作『餓死』。
讀鎖
讀鎖的加鎖方法 sync.RWMutex.RLock 很簡單丧肴,該方法會(huì)通過 sync/atomic.AddInt32 將 readerCount 加一:
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
- 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖残揉,當(dāng)前 Goroutine 就會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待鎖的釋放;
- 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖芋浮,當(dāng)前方法會(huì)成功返回抱环;
當(dāng) Goroutine 想要釋放讀鎖時(shí),會(huì)調(diào)用如下所示的 sync.RWMutex.RUnlock 方法:
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
該方法會(huì)先減少正在讀資源的 readerCount 整數(shù)纸巷,根據(jù) sync/atomic.AddInt32 的返回值不同會(huì)分別進(jìn)行處理:
- 如果返回值大于等于零 — 讀鎖直接解鎖成功镇草;
- 如果返回值小于零 — 有一個(gè)正在執(zhí)行的寫操作,在這時(shí)會(huì)調(diào)用sync.RWMutex.rUnlockSlow 方法瘤旨;
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow 會(huì)減少獲取鎖的寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem梯啤,該信號量被觸發(fā)時(shí),調(diào)度器就會(huì)喚醒嘗試獲取寫鎖的 Goroutine存哲。
雖然讀寫互斥鎖 sync.RWMutex 提供的功能比較復(fù)雜因宇,但是因?yàn)樗⒃?sync.Mutex 上七婴,所以實(shí)現(xiàn)會(huì)簡單很多。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:
- 調(diào)用 sync.RWMutex.Lock 嘗試獲取寫鎖時(shí)察滑;
- 每次 sync.RWMutex.RUnlock 都會(huì)將 readerCount 其減一打厘,當(dāng)它歸零時(shí)該 Goroutine 會(huì)獲得寫鎖;
- 將 readerCount 減少 rwmutexMaxReaders 個(gè)數(shù)以阻塞后續(xù)的讀操作杭棵;
- 調(diào)用 sync.RWMutex.Unlock 釋放寫鎖時(shí)婚惫,會(huì)先通知所有的讀操作,然后才會(huì)釋放持有的互斥鎖魂爪;
讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制先舷,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時(shí)提升性能。
WaitGroup
sync.WaitGroup 可以等待一組 Goroutine 的返回滓侍,一個(gè)比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個(gè) Goroutine 中并發(fā)執(zhí)行蒋川,加快程序處理的速度。
WaitGroup 等待多個(gè) Goroutine
結(jié)構(gòu)體
sync.WaitGroup 結(jié)構(gòu)體中只包含兩個(gè)成員變量:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
- noCopy — 保證 sync.WaitGroup 不會(huì)被開發(fā)者通過再賦值的方式拷貝撩笆;
- state1 — 存儲(chǔ)著狀態(tài)和信號量捺球;
sync.noCopy 是一個(gè)特殊的私有結(jié)構(gòu)體,tools/go/analysis/passes/copylock 包中的分析器會(huì)在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy 或者實(shí)現(xiàn)了 Lock 和 Unlock 方法
sync.WaitGroup 結(jié)構(gòu)體中包含一個(gè)總共占用 12 字節(jié)的數(shù)組夕冲,這個(gè)數(shù)組會(huì)存儲(chǔ)當(dāng)前結(jié)構(gòu)體的狀態(tài)氮兵,在 64 位與 32 位的機(jī)器上表現(xiàn)也非常不同。
WaitGroup 在 64 位和 32 位機(jī)器的不同狀態(tài)
- 64 位機(jī)器上本身就能保證 64 位對齊歹鱼,所以按照 64 位對齊來取數(shù)據(jù)泣栈,拿到 state1[0], state1[1] 本身就是64 位對齊的。但是 32 位機(jī)器上并不能保證 64 位對齊弥姻,因?yàn)?32 位機(jī)器是 4 字節(jié)對齊南片,如果也按照 64 位機(jī)器取 state[0],state[1] 就有可能會(huì)造成 atmoic 的使用錯(cuò)誤庭敦。
- 32 位機(jī)器上空出第一個(gè) 32 位疼进,也就使后面 64 位天然滿足 64 位對齊,第一個(gè) 32 位放入 sema 剛好合適
WaitGroup.state1 其實(shí)代表三個(gè)字段:counter秧廉,waiter伞广,sema
- counter :可以理解為一個(gè)計(jì)數(shù)器,計(jì)算經(jīng)過 wg.Add(N), wg.Done() 后的值定血。
- waiter :當(dāng)前等待 WaitGroup 任務(wù)結(jié)束的等待者數(shù)量赔癌。其實(shí)就是調(diào)用 wg.Wait() 的次數(shù),所以通常這個(gè)值是 1 澜沟。
- sema : 信號量灾票,用來喚醒 Wait() 函數(shù)。
sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出它的狀態(tài)和信號量茫虽。
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
接口
sync.WaitGroup 對外暴露了三個(gè)方法 — sync.WaitGroup.Add刊苍、sync.WaitGroup.Wait 和 sync.WaitGroup.Done.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
//更新worker計(jì)數(shù)器
state := atomic.AddUint64(statep, uint64(delta)<<32)
//worker計(jì)數(shù)器:v 是 statep *uint64 的左32位
//waiter計(jì)數(shù)器:w 是 statep *uint64 的右32位
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// worker 大于 0 或者 waiter 等于 0 說明還有Goroutine沒有執(zhí)行完既们,直接返回
if v > 0 || w == 0 {
return
}
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 狀態(tài)設(shè)置為0
*statep = 0
// 通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine(喚醒 Wait())
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計(jì)數(shù)器 counter。
雖然 sync.WaitGroup.Add 方法傳入的參數(shù)可以為負(fù)數(shù)正什,但是計(jì)數(shù)器只能是非負(fù)數(shù)啥纸,一旦出現(xiàn)負(fù)數(shù)就會(huì)發(fā)生程序崩潰。
當(dāng)調(diào)用計(jì)數(shù)器歸零婴氮,即所有任務(wù)都執(zhí)行完成時(shí)斯棒,才會(huì)通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine。
sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
sync.WaitGroup 的另一個(gè)方法 sync.WaitGroup.Wait 會(huì)在計(jì)數(shù)器大于 0 并且不存在等待的 Goroutine 時(shí)主经,調(diào)用 runtime.sync_runtime_Semacquire 陷入睡眠荣暮。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 循環(huán)判斷 是否滿足退出條件
for {
//worker計(jì)數(shù)器:v 是 statep *uint64 的左32位
//waiter計(jì)數(shù)器:w 是 statep *uint64 的右32位
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// 當(dāng)計(jì)數(shù)器為0時(shí),表示所有Goroutine 都執(zhí)行完成罩驻,立即返回
if v == 0 {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 更新waiter計(jì)數(shù)器 atomic.CompareAndSwapUint64 對uint64值執(zhí)行比較和交換操作
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
// Goroutine 進(jìn)入睡眠狀態(tài)穗酥,等待信號量喚醒
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
當(dāng) sync.WaitGroup 的計(jì)數(shù)器歸零時(shí),陷入睡眠狀態(tài)的 Goroutine 會(huì)被喚醒惠遏,上述方法也會(huì)立刻返回砾跃。
通過對 sync.WaitGroup 的分析和研究,我們能夠得出以下結(jié)論:
- sync.WaitGroup 必須在 sync.WaitGroup.Wait 方法返回之后才能被重新使用节吮;
- sync.WaitGroup.Done 只是對 sync.WaitGroup.Add 方法的簡單封裝抽高,我們可以向 sync.WaitGroup.Add 方法傳入任意負(fù)數(shù)(需要保證計(jì)數(shù)器非負(fù))快速將計(jì)數(shù)器歸零以喚醒等待的 Goroutine;
- 可以同時(shí)有多個(gè) Goroutine 等待當(dāng)前 sync.WaitGroup 計(jì)數(shù)器的歸零透绩,這些 Goroutine 會(huì)被同時(shí)喚醒厨内;
once
Go 語言標(biāo)準(zhǔn)庫中 sync.Once 可以保證在 Go 程序運(yùn)行期間的某段代碼只會(huì)執(zhí)行一次
結(jié)構(gòu)體
每一個(gè) sync.Once 結(jié)構(gòu)體中都只包含一個(gè)用于標(biāo)識代碼塊是否執(zhí)行過的 done 以及一個(gè)互斥鎖 sync.Mutex:
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do 是 sync.Once 結(jié)構(gòu)體對外唯一暴露的方法,該方法會(huì)接收一個(gè)入?yún)榭盏暮瘮?shù):
- 如果傳入的函數(shù)已經(jīng)執(zhí)行過渺贤,會(huì)直接返回;
- 如果傳入的函數(shù)沒有執(zhí)行過请毛,會(huì)調(diào)用 sync.Once.doSlow 執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 為當(dāng)前 Goroutine 獲取互斥鎖志鞍;
- 執(zhí)行傳入的無入?yún)⒑瘮?shù);
- 運(yùn)行延遲函數(shù)調(diào)用方仿,將成員變量 done 更新成 1固棚;
sync.Once 會(huì)通過成員變量 done 確保函數(shù)不會(huì)執(zhí)行第二次。
作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once 結(jié)構(gòu)體仙蚜,它使用互斥鎖和 sync/atomic 包提供的方法實(shí)現(xiàn)了某個(gè)函數(shù)在程序運(yùn)行期間只能執(zhí)行一次的語義此洲。在使用該結(jié)構(gòu)體時(shí),我們也需要注意以下的問題:
- sync.Once.Do方法中傳入的函數(shù)只會(huì)被執(zhí)行一次委粉,哪怕函數(shù)中發(fā)生了 panic呜师;
- 兩次調(diào)用 sync.Once.Do 方法傳入不同的函數(shù)只會(huì)執(zhí)行第一次調(diào)傳入的函數(shù);
Cond
sync.Cond 用來協(xié)調(diào)想要訪問共享資源的 goroutine贾节。
sync.Cond 經(jīng)常用在多個(gè) goroutine 等待汁汗,一個(gè) goroutine 通知(事件發(fā)生)的場景衷畦。如果是一個(gè)通知,一個(gè)等待知牌,使用互斥鎖或 channel 就能搞定了祈争。
使用場景
- 有一個(gè)協(xié)程在異步地接收數(shù)據(jù),剩下的多個(gè)協(xié)程必須等待這個(gè)協(xié)程接收完數(shù)據(jù)角寸,才能讀取到正確的數(shù)據(jù)菩混。在這種情況下,如果單純使用 chan 或互斥鎖扁藕,那么只能有一個(gè)協(xié)程可以等待沮峡,并讀取到數(shù)據(jù),沒辦法通知其他的協(xié)程也讀取數(shù)據(jù)纹磺。
- 這個(gè)時(shí)候帖烘,就需要有個(gè)全局的變量來標(biāo)志第一個(gè)協(xié)程數(shù)據(jù)是否接受完畢,剩下的協(xié)程橄杨,反復(fù)檢查該變量的值秘症,直到滿足要求∈浇茫或者創(chuàng)建多個(gè) channel乡摹,每個(gè)協(xié)程阻塞在一個(gè) channel 上,由接收數(shù)據(jù)的協(xié)程在數(shù)據(jù)接收完畢后采转,逐個(gè)通知聪廉。總之故慈,需要額外的復(fù)雜度來完成這件事
結(jié)構(gòu)體
type Cond struct {
noCopy noCopy
L Locker
notify notifyList
checker copyChecker
}
- noCopy — 用于保證結(jié)構(gòu)體不會(huì)在編譯期間拷貝板熊;
- copyChecker — 用于禁止運(yùn)行期間發(fā)生的拷貝;
- L — 用于保護(hù)內(nèi)部的 notify 字段察绷,Locker 接口類型的變量干签;
- notify — 一個(gè) Goroutine 的鏈表,它是實(shí)現(xiàn)同步機(jī)制的核心結(jié)構(gòu)拆撼;
type notifyList struct {
wait uint32
notify uint32
lock uintptr // key field of the mutex
head unsafe.Pointer
tail unsafe.Pointer
}
在 sync.notifyList 結(jié)構(gòu)體中容劳,head 和 tail 分別指向的鏈表的頭和尾,wait 和 notify 分別表示當(dāng)前正在等待的和已經(jīng)通知到的 Goroutine 的索引闸度。
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
newCond
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
NewCond 創(chuàng)建 Cond 實(shí)例時(shí)竭贩,需要關(guān)聯(lián)一個(gè)鎖。
接口
wait
sync.Cond 對外暴露的 sync.Cond.Wait 方法會(huì)將當(dāng)前 Goroutine 陷入休眠狀態(tài)莺禁,它的執(zhí)行過程分成以下兩個(gè)步驟:
- 調(diào)用 runtime.notifyListAdd 將等待計(jì)數(shù)器加一并解鎖留量;
- 調(diào)用 runtime.notifyListWait 等待其他 Goroutine 的喚醒并加鎖:
func (c *Cond) Wait() {
// 檢查c是否是被復(fù)制的,如果是就panic
c.checker.check()
// 將當(dāng)前goroutine加入等待隊(duì)列
t := runtime_notifyListAdd(&c.notify)
// 解鎖
c.L.Unlock()
// 等待隊(duì)列中的所有的goroutine執(zhí)行等待喚醒操作
runtime_notifyListWait(&c.notify, t)
// 鎖
c.L.Lock()
}
調(diào)用 Wait 會(huì)自動(dòng)釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine肪获,因此當(dāng)前協(xié)程會(huì)阻塞在 Wait 方法調(diào)用的地方寝凌。
如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時(shí)孝赫,會(huì)重新給 c.L 加鎖较木,并且繼續(xù)執(zhí)行 Wait 后面的代碼。
runtime.notifyListWait 會(huì)獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
除了將當(dāng)前 Goroutine 追加到鏈表的末端之外青柄,我們還會(huì)調(diào)用 runtime.goparkunlock 將當(dāng)前 Goroutine 陷入休眠伐债,該函數(shù)也是在 Go 語言切換 Goroutine 時(shí)經(jīng)常會(huì)使用的方法,它會(huì)直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒致开。
sync.Cond.Signal 和 sync.Cond.Broadcast 就是用來喚醒陷入休眠的 Goroutine 的方法峰锁,它們的實(shí)現(xiàn)有一些細(xì)微的差別:
- sync.Cond.Signal 方法會(huì)喚醒隊(duì)列最前面的 Goroutine;
- sync.Cond.Broadcast 方法會(huì)喚醒隊(duì)列中全部的 Goroutine双戳;
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
Signal 只喚醒任意 1 個(gè)等待條件變量 c 的 goroutine虹蒋,無需鎖保護(hù)。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
Broadcast 喚醒所有等待條件變量 c 的 goroutine飒货,無需鎖保護(hù)魄衅。
runtime.notifyListNotifyOne 只會(huì)從 sync.notifyList 鏈表中找到滿足 sudog.ticket == l.notify 條件的 Goroutine 并通過 runtime.readyWithTime 喚醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll 會(huì)依次通過 runtime.readyWithTime 喚醒鏈表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的喚醒順序也是按照加入隊(duì)列的先后順序,先加入的會(huì)先被喚醒塘辅,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度晃虫。
在一般情況下,我們都會(huì)先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件扣墩,當(dāng)滿足喚醒條件時(shí)哲银,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個(gè)或者全部的 Goroutine。
Cond使用例子
var status int64
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1 * time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
atomic.StoreInt64(&status, 1)
c.Broadcast()
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
for atomic.LoadInt64(&status) != 1 {
c.Wait()
}
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
sync.Cond 不是一個(gè)常用的同步機(jī)制呻惕,但是在條件長時(shí)間無法滿足時(shí)荆责,與使用 for {} 進(jìn)行忙碌等待相比,sync.Cond 能夠讓出處理器的使用權(quán)亚脆,提高 CPU 的利用率草巡。使用時(shí)我們也需要注意以下問題:
- sync.Cond.Wait 在調(diào)用之前一定要使用獲取互斥鎖,否則會(huì)觸發(fā)程序崩潰型酥;
- sync.Cond.Signal 喚醒的 Goroutine 都是隊(duì)列最前面、等待最久的 Goroutine查乒;
- sync.Cond.Broadcast 會(huì)按照一定順序廣播通知等待的全部 Goroutine弥喉;