sync.mutex 源代碼分析
[TOC]
針對 Golang 1.10.3 的 sync.Mutex 進(jìn)行分析殊橙,代碼位置:sync/mutex.go
結(jié)構(gòu)體定義
type Mutex struct {
state int32 // 指代mutex鎖當(dāng)前的狀態(tài)
sema uint32 // 信號量踩叭,用于喚醒goroutine
}
Mutex 中的 state 用于指代鎖當(dāng)前的狀態(tài)险毁,如下所示
1111 1111 ...... 1111 1111
\_________29__________/|||
存儲等待 goroutine 數(shù)量
[0]第0個bit 表示當(dāng)前 mutex 是否已被某個goroutine所擁有,1=加鎖
[1]第1個bit 表示當(dāng)前 mutex 是否被喚醒睹酌,也就是有某個喚醒的goroutine要嘗試獲取鎖
[2]第2個bit 表示 mutex 當(dāng)前是否處于饑餓狀態(tài)
常量定義
const (
mutexLocked = 1 << iota
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
mutexLocked 值為1,根據(jù) mutex.state & mutexLocked 得到 mutex 的加鎖狀態(tài)剩檀,結(jié)果為1表示已加鎖憋沿,0表示未加鎖
mutexWoken 值為2(二進(jìn)制:10),根據(jù) mutex.state & mutexWoken 得到 mutex 的喚醒狀態(tài)沪猴,結(jié)果為1表示已喚醒辐啄,0表示未喚醒
mutexStarving 值為4(二進(jìn)制:100),根據(jù) mutex.state & mutexStarving 得到 mutex 的饑餓狀態(tài)运嗜,結(jié)果為1表示處于饑餓狀態(tài)壶辜,0表示處于正常狀態(tài)
mutexWaiterShift 值為3,根據(jù) mutex.state >> mutexWaiterShift 得到當(dāng)前等待的 goroutine 數(shù)目
starvationThresholdNs 值為1e6納秒担租,也就是1毫秒砸民,當(dāng)?shù)却?duì)列中隊(duì)首 goroutine 等待時間超過 starvationThresholdNs,mutex 進(jìn)入饑餓模式
饑餓模式與正常模式
根據(jù)Mutex的注釋,當(dāng)前的Mutex有如下的性質(zhì)岭参。這些注釋將極大的幫助我們理解Mutex的實(shí)現(xiàn)便贵。
Mutex 有兩種工作模式:正常模式和饑餓模式
正常模式
在正常模式中,所有等待鎖的 goroutine 按照 FIFO 的順序排隊(duì)獲取鎖冗荸,但是一個被喚醒的等待者有時候并不能獲取 mutex承璃,它還需要和新到來的 goroutine 們競爭 mutex 的使用權(quán)。新到來的 goroutine 具有優(yōu)勢蚌本,它們已經(jīng)在 CPU 上運(yùn)行且它們數(shù)量很多盔粹,因此一個被喚醒的等待者有很大的概率獲取不到鎖。在這種情況下程癌,這個被喚醒的 goroutine 會加入到等待隊(duì)列的前面舷嗡。如果一個等待的 goroutine 超過 1ms 沒有獲取鎖,它就會將 mutex 切換到饑餓模式
饑餓模式
在饑餓模式中嵌莉,鎖的所有權(quán)將從 unlock 的 gorutine 直接交給交給等待隊(duì)列中的第一個进萄。新來的 goroutine 將不會嘗試去獲得鎖,即使鎖看起來是 unlock 狀態(tài), 也不會去嘗試自旋操作锐峭,而是放在等待隊(duì)列的尾部中鼠。
如果一個等待的goroutine獲取了鎖,并且滿足一以下其中的任何一個條件沿癞,它會將鎖的狀態(tài)轉(zhuǎn)換為正常狀態(tài)援雇。
- 它是等待隊(duì)列中的最后一個;
- 它等待的時間少于1ms。
函數(shù)
在分析源代碼之前椎扬, 我們要從多線程(goroutine)的并發(fā)場景去理解為什么實(shí)現(xiàn)中有很多的分支惫搏。
當(dāng)一個goroutine獲取這個鎖的時候, 有可能這個鎖根本沒有競爭者蚕涤, 那么這個goroutine輕輕松松獲取了這個鎖筐赔。
而如果這個鎖已經(jīng)被別的goroutine擁有, 就需要考慮怎么處理當(dāng)前的期望獲取鎖的goroutine揖铜。
同時茴丰, 當(dāng)并發(fā)goroutine很多的時候,有可能會有多個競爭者蛮位, 而且還會有通過信號量喚醒的等待者较沪。
以下代碼已經(jīng)去掉了與核心代碼無關(guān)的 race 代碼。
Lock
Lock 方法申請對 mutex 加鎖失仁,Lock 執(zhí)行的時候尸曼,分三種情況:
- 無沖突 通過 CAS 操作把當(dāng)前狀態(tài)設(shè)置為加鎖狀態(tài)
- 有沖突 開始自旋,并等待鎖釋放萄焦,如果其他 goroutine 在這段時間內(nèi)釋放了該鎖控轿,直接獲得該鎖冤竹;如果沒有釋放,進(jìn)入3
- 有沖突茬射,且已經(jīng)過了自旋階段 通過調(diào)用 semacquire 函數(shù)來讓當(dāng)前 goroutine 進(jìn)入等待狀態(tài)
func (m *Mutex) Lock() {
// 如果 mutext 的 state 沒有被鎖鹦蠕,也沒有等待/喚醒的 goroutine , 鎖處于正常狀態(tài),那么獲得鎖在抛,返回.
// 比如鎖第一次被 goroutine 請求時钟病,就是這種狀態(tài)「账螅或者鎖處于空閑的時候肠阱,也是這種狀態(tài)。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
var waitStartTime int64 // 當(dāng)前 goroutine 開始等待的時間
starving := false // 當(dāng)前 goroutine 是否已經(jīng)處于饑餓狀態(tài)
awoke := false // 當(dāng)前 goroutine 是否被喚醒
iter := 0 // 自旋迭代的次數(shù)
old := m.state // old 保存當(dāng)前 mutex 的狀態(tài)
for {
// 第一個條件是 state 已被鎖朴读,但是不是饑餓狀態(tài)屹徘。如果是饑餓狀態(tài),自旋是沒有用的衅金,鎖的擁有權(quán)直接交給了等待隊(duì)列的第一個噪伊。
// 第二個條件是還可以自旋,多核氮唯、壓力不大并且在一定次數(shù)內(nèi)可以自旋鉴吹, 具體的條件可以參考`sync_runtime_canSpin`的實(shí)現(xiàn)(匯編實(shí)現(xiàn),內(nèi)部持續(xù)調(diào)用 PAUSE 指令您觉,消耗 CPU 時間)拙寡。
// 如果滿足這兩個條件,不斷自旋來等待鎖被釋放琳水、或者進(jìn)入饑餓狀態(tài)、或者不能再自旋般堆。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 自旋的過程中如果發(fā)現(xiàn) state 還沒有設(shè)置 woken 標(biāo)識在孝,則設(shè)置它的 woken 標(biāo)識, 并標(biāo)記自己為被喚醒淮摔。
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 到了這一步私沮, state的狀態(tài)可能是:
// 1. 鎖還沒有被釋放,鎖處于正常狀態(tài)
// 2. 鎖還沒有被釋放和橙, 鎖處于饑餓狀態(tài)
// 3. 鎖已經(jīng)被釋放仔燕, 鎖處于正常狀態(tài)
// 4. 鎖已經(jīng)被釋放, 鎖處于饑餓狀態(tài)
// 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已經(jīng)設(shè)置了state的woken標(biāo)識)
// new 復(fù)制 state的當(dāng)前狀態(tài)魔招, 用來設(shè)置新的狀態(tài)晰搀。old 是鎖當(dāng)前的狀態(tài)
new := old
// 如果 old state 狀態(tài)不是饑餓狀態(tài), new state 設(shè)置鎖, 嘗試通過CAS獲取鎖办斑。
// 如果 old state 狀態(tài)是饑餓狀態(tài), 則不設(shè)置 new state 的鎖外恕,因?yàn)轲囸I狀態(tài)下鎖直接轉(zhuǎn)給等待隊(duì)列的第一個.
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 當(dāng) mutex 處于加鎖狀態(tài)或饑餓狀態(tài)的時候杆逗,新到來的 goroutine 進(jìn)入等待隊(duì)列
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 當(dāng)前 goroutine 將 mutex 切換為饑餓狀態(tài),但如果當(dāng)前 mutex 未加鎖鳞疲,則不需要切換
// Unlock 操作希望饑餓模式存在等待者
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 如果本 goroutine 已經(jīng)設(shè)置為喚醒狀態(tài), 需要清除 new state 的喚醒標(biāo)記,
// 因?yàn)楸?goroutine 要么獲得了鎖罪郊,要么進(jìn)入休眠,總之state的新狀態(tài)不再是woken狀態(tài).
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 調(diào)用 CAS 更新 state 狀態(tài)
// 注意 new 的鎖標(biāo)記不一定是 true, 也可能只是標(biāo)記一下鎖的 state 是饑餓狀態(tài).
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果 old state 的狀態(tài)是未被鎖狀態(tài)尚洽,并且鎖不處于饑餓狀態(tài),
// 那么當(dāng)前 goroutine 已經(jīng)獲取了鎖的擁有權(quán)悔橄,返回
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 設(shè)置/計算本 goroutine 的等待時間
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 既然未能獲取到鎖, 那么就使用 sleep 原語阻塞本 goroutine
// 如果是新來的 goroutine, queueLifo=false, 加入到等待隊(duì)列的尾部腺毫,耐心等待
// 如果是喚醒的 goroutine, queueLifo=true, 加入到等待隊(duì)列的頭部
runtime_SemacquireMutex(&m.sema, queueLifo)
// 如果當(dāng)前 goroutine 等待時間超過 starvationThresholdNs癣疟,mutex 進(jìn)入饑餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果當(dāng)前的 state 已經(jīng)是饑餓狀態(tài)
// 那么鎖應(yīng)該處于 Unlock 狀態(tài),鎖被直接交給了本 goroutine
if old&mutexStarving != 0 {
// 如果當(dāng)前的 state 已被鎖拴曲,或者已標(biāo)記為喚醒争舞, 或者等待的隊(duì)列中不為空,
// 那么 state 是一個非法狀態(tài)
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 等待狀態(tài)的 goroutine - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不是饑餓模式了或者當(dāng)前等待著只剩下一個,退出饑餓模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新狀態(tài)
// 因?yàn)橐呀?jīng)獲得了鎖澈灼,退出竞川、返回
atomic.AddInt32(&m.state, delta)
break
}
// 如果當(dāng)前的鎖是正常模式,本 goroutine 被喚醒叁熔,自旋次數(shù)清零委乌,從 for 循環(huán)開始處重新開始
awoke = true
iter = 0
} else {
// 如果CAS不成功,重新獲取鎖的 state, 從 for 循環(huán)開始處重新開始
old = m.state
}
}
}
Unlock
Unlock方法釋放所申請的鎖
func (m *Mutex) Unlock() {
// 如果 state 不是處于鎖的狀態(tài), 那么就是 Unlock 根本沒有加鎖的 mutex, panic
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 釋放鎖荣回,并通知其它等待者
// 鎖如果處于饑餓狀態(tài)遭贸,直接交給等待隊(duì)列的第一個, 喚醒它,讓它去獲取鎖
// mutex 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有等待者心软,或者已經(jīng)存在一個 goroutine 被喚醒或得到鎖壕吹、或處于饑餓模式
// 直接返回.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 將等待的 goroutine-1,并設(shè)置 woken 標(biāo)識
new = (old - 1<<mutexWaiterShift) | mutexWoken
// 設(shè)置新的 state, 這里通過信號量會喚醒一個阻塞的 goroutine 去獲取鎖.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// mutex 饑餓模式删铃,直接將 mutex 擁有權(quán)移交給等待隊(duì)列最前端的 goroutine
// 注意此時 state 的 mutex 還沒有加鎖耳贬,喚醒的 goroutine 會設(shè)置它。
// 在此期間猎唁,如果有新的 goroutine 來請求鎖咒劲, 因?yàn)?mutex 處于饑餓狀態(tài), mutex 還是被認(rèn)為處于鎖狀態(tài)诫隅,
// 新來的 goroutine 不會把鎖搶過去.
runtime_Semrelease(&m.sema, true)
}
}
sync.RWMutex 源碼分析
RWMutex 是讀寫互斥鎖腐魂,鎖可以由任意數(shù)量的讀取器或單個寫入器來保持
RWMutex 的零值是一個解鎖的互斥鎖
RWMutex 是搶占式的讀寫鎖,寫鎖之后來的讀鎖是加不上的
代碼位置:sync/rwmutex.go
結(jié)構(gòu)體定義
type RWMutex struct {
w Mutex // 互斥鎖
writerSem uint32 // 寫鎖信號量
readerSem uint32 // 讀鎖信號量
readerCount int32 // 讀鎖計數(shù)器
readerWait int32 // 獲取寫鎖時需要等待的讀鎖釋放數(shù)量
}
常量定義
const rwmutexMaxReaders = 1 << 30 // 支持最多2^30個讀鎖
函數(shù)
以下是 sync.RWMutex 提供的4個方法
Lock
提供寫鎖加鎖操作
func (rw *RWMutex) Lock() {
// 使用 Mutex 鎖
rw.w.Lock()
// 將當(dāng)前的 readerCount 置為負(fù)數(shù)逐纬,告訴 RUnLock 當(dāng)前存在寫鎖等待
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 等待讀鎖釋放
runtime_Semacquire(&rw.writerSem)
}
}
Unlock
提供寫鎖釋放操作
func (rw *RWMutex) Unlock() {
// 加上 Lock 的時候減去的 rwmutexMaxReaders
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 沒執(zhí)行Lock調(diào)用Unlock蛔屹,拋出異常
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 通知當(dāng)前等待的讀鎖
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 釋放 Mutex 鎖
rw.w.Unlock()
}
RLock
提供讀鎖操作
func (rw *RWMutex) RLock() {
// 每次 goroutine 獲取讀鎖時,readerCount+1
// 如果寫鎖已經(jīng)被獲取风题,那么 readerCount 在 -rwmutexMaxReaders 與 0 之間判导,這時掛起獲取讀鎖的 goroutine
// 如果寫鎖沒有被獲取嫉父,那么 readerCount > 0,獲取讀鎖, 不阻塞
// 通過 readerCount 判斷讀鎖與寫鎖互斥, 如果有寫鎖存在就掛起goroutine, 多個讀鎖可以并行
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 將 goroutine 排到G隊(duì)列的后面,掛起 goroutine
runtime_Semacquire(&rw.readerSem)
}
}
RUnLock
對讀鎖進(jìn)行解鎖
func (rw *RWMutex) RUnlock() {
// 寫鎖等待狀態(tài)眼刃,檢查當(dāng)前是否可以進(jìn)行獲取
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// r + 1 == 0表示直接執(zhí)行RUnlock()
// r + 1 == -rwmutexMaxReaders表示執(zhí)行Lock()再執(zhí)行RUnlock()
// 兩總情況均拋出異常
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 當(dāng)讀鎖釋放完畢后绕辖,通知寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
}
總結(jié)
sync.Mutex
- 同一個時刻只有一個線程能夠拿到鎖
注意:
- 不要重復(fù)鎖定互斥鎖
- 不要忘記解鎖互斥鎖
- 不要在多個函數(shù)之間直接傳遞互斥鎖
sync.RWMutex
- 如果設(shè)置了一個寫鎖,那么其它讀的線程以及寫的線程都拿不到鎖擂红,這個時候仪际,與互斥鎖的功能相同
- 如果設(shè)置了一個讀鎖,那么其它寫的線程是拿不到鎖的昵骤,但是其它讀的線程是可以拿到鎖
讀寫互斥鎖的實(shí)現(xiàn)比較有技巧性一些树碱,需要幾點(diǎn)
- 讀鎖不能阻塞讀鎖,引入readerCount實(shí)現(xiàn)
- 讀鎖需要阻塞寫鎖变秦,直到所有讀鎖都釋放成榜,引入readerSem實(shí)現(xiàn)
- 寫鎖需要阻塞讀鎖,直到所有寫鎖都釋放蹦玫,引入wirterSem實(shí)現(xiàn)
- 寫鎖需要阻塞寫鎖赎婚,引入Metux實(shí)現(xiàn)