【關(guān)注公眾號】「syd3600520」 回復(fù)002 獲取Go相關(guān)學(xué)習(xí)資料
Mutex
是一個互斥的排他鎖炕横,零值Mutex
為未上鎖狀態(tài)份殿,Mutex
一旦被使用 禁止被拷貝。使用起來也比較簡單
package main
import "sync"
func main() {
m := sync.Mutex{}
m.Lock()
defer m.Unlock()
// do something
}
Mutex
有兩種操作模式:
- 正常模式(非公平模式)
阻塞等待的goroutine
保存在FIFO
的隊列中颂斜,喚醒的goroutine
不直接擁有鎖拾枣,需要與新來的goroutine
競爭獲取鎖。因?yàn)樾聛淼?code>goroutine很多已經(jīng)占有了CPU
邑茄,所以喚醒的goroutine
在競爭中很容易輸俊啼;但如果一個goroutine
獲取鎖失敗超過1ms
,則會將Mutex
切換為饑餓模式授帕。
- 饑餓模式(公平模式)
這種模式下,直接將等待隊列隊頭goroutine
解鎖goroutine
彤路;新來的gorountine
也不會嘗試獲得鎖芥映,而是直接插入到等待隊列隊尾屏轰。
如果一個goroutine
獲得了鎖霎苗,并且他在等待隊列隊尾 或者 他等待小于1ms
榛做,則會將Mutex
的模式切換回正常模式检眯。正常模式有更好的性能,新來的goroutine
通過幾次競爭可以直接獲取到鎖刽严,盡管當(dāng)前仍有等待的goroutine
避凝。而饑餓模式則是對正常模式的補(bǔ)充管削,防止等待隊列中的goroutine
永遠(yuǎn)沒有機(jī)會獲取鎖。
其數(shù)據(jù)結(jié)構(gòu)為:
type Mutex struct {
state int32 // 鎖競爭的狀態(tài)值
sema uint32 // 信號量
}
state代表了當(dāng)前鎖的狀態(tài)崎弃、 是否是存在自旋、是否是饑餓模式线婚、阻塞goroutine
數(shù)量
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota
mutex.state & mutexLocked
加鎖狀態(tài) 1 表示已加鎖 0 表示未枷鎖
mutex.state & mutexWoken
喚醒狀態(tài) 1 表示已喚醒狀態(tài) 0 表示未喚醒
mutex.state & mutexStarving
饑餓狀態(tài) 1 表示饑餓狀體 0表示正常狀態(tài)
mutex.state >> mutexWaiterShift
得到當(dāng)前goroutine數(shù)目
Lock
上鎖大致分為fast-path
和slow-path
Fast-path
lock通過調(diào)用atomic.CompareAndSwapInt32
來競爭更新m.state
酌伊,成功則獲得鎖居砖;失敗驴娃,則進(jìn)入slow-path
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
atomic.CompareAndSwapInt32
正如簽名一樣唇敞,進(jìn)行比較和交換操作,這過程是原子的
// CompareAndSwapInt32 executes the compare-and-swap operation for an int32 value.
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
源碼中我們并不能看到該函數(shù)的具體實(shí)現(xiàn)咒精,他的實(shí)現(xiàn)跟硬件平臺有關(guān)模叙,我們可以查看匯編代碼一窺究竟鞋屈,go tool compile -S mutex.go
也可以對二進(jìn)制文件go tool objdump -s methodname binary
0x0036 00054 (loop.go:6) MOVQ AX, CX
0x0039 00057 ($GOROOT/src/sync/mutex.go:74) XORL AX, AX
0x003b 00059 ($GOROOT/src/sync/mutex.go:74) MOVL $1, DX
0x0040 00064 ($GOROOT/src/sync/mutex.go:74) LOCK
0x0041 00065 ($GOROOT/src/sync/mutex.go:74) CMPXCHGL DX, (CX)
0x0044 00068 ($GOROOT/src/sync/mutex.go:74) SETEQ AL
0x0047 00071 ($GOROOT/src/sync/mutex.go:74) TESTB AL, AL
0x0049 00073 ($GOROOT/src/sync/mutex.go:74) JEQ 150
0x004b 00075 (loop.go:8) MOVL $8, ""..autotmp_6+16(SP)
0x0053 00083 (loop.go:8) LEAQ sync.(*Mutex).Unlock·f(SB), AX
重點(diǎn)關(guān)注第5行CMPXCHGL DX, (CX)
這個CMPXCHGL
是x86和Intel架構(gòu)中的compare and exchange
指令厂庇,Java
的那套AtomicXX
底層也是依賴這個指令來保證原子性操作的权旷。
所以我們看到Mutex
是互斥排他鎖且不可重入
,當(dāng)我們在一個goroutine
獲取同一個鎖會導(dǎo)致死鎖。
package main
import "sync"
func main() {
m := sync.Mutex{}
m.Lock()
//這里會導(dǎo)致死鎖
m.Lock()
defer m.Unlock()
}
slow-path
如果goroutine
fast-path失敗躲查,這調(diào)用m.lockSlow()
進(jìn)入slow-path
熙含,函數(shù)內(nèi)部主要是一個for{}
死循環(huán)艇纺,進(jìn)入循環(huán)的goroutine
大致分為兩類:
- 新來的
gorountine
- 被喚醒的
goroutine
Mutex
默認(rèn)為正常模式,若新來的goroutine
搶占成功蚓聘,則另一個就需要阻塞等待夜牡;阻塞等待一旦超過閾值1ms則會將Mutex
切換到饑餓模式,這個模式下新來的goroutine
只能阻塞等待在隊列尾部急迂,沒有搶占的資格僚碎。當(dāng)然等待阻塞->喚醒->參與搶占鎖阴幌,這個過程顯示不是很高效矛双,所以這里有一個自旋
的優(yōu)化
當(dāng)mutex處于正常模式且能夠自旋,會讓當(dāng)前goroutine自旋等待,同時設(shè)置mutex.state的mutexWoken位為1懒闷,保證自旋等待的goroutine一定比新來goroutine更有優(yōu)先權(quán)毛雇。這樣unlock操作也會優(yōu)先保證自旋等待的goroutine獲取鎖
golang對自旋做了些限制要求 需要:
- 多核CPU
- GOMAXPROCS>1
- 至少有一個運(yùn)行的P并且local的P隊列為空
感興趣的可以跟下源碼比較簡單
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
//饑餓模式下不能自旋,也沒有資格搶占侦镇,鎖是手遞手給到等待的goroutine
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {//當(dāng)Mutex處于正常模式且能夠自旋
//設(shè)置mutexWoken為1 告訴unlock操作壳繁,存在自旋gorountine unlock后不需要喚醒其他goroutine
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// 自旋完了 還是沒拿到鎖
new := old
//當(dāng)mutex處于正常模式闹炉,將new的mutexLocked設(shè)置為1 即準(zhǔn)備搶占鎖
if old&mutexStarving == 0 {
new |= mutexLocked
}
//加鎖狀態(tài)或饑餓模式下 新來的goroutine進(jìn)入等待隊列
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
//將Mutex切換為饑餓模式润樱,若未加鎖則不必切換
//Unlock操作希望饑餓模式存在等待者
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// 當(dāng)前goroutine自旋過 已被被喚醒壹若,則需要將mutexWoken重置
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken //重置mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 當(dāng)前goroutine獲取鎖前mutex處于未加鎖 正常模式下
if old&(mutexLocked|mutexStarving) == 0 {
break // 使用CAS成功搶占到鎖
}
// waitStartTime!=0表示當(dāng)前goroutine是等待狀態(tài)喚醒的
// 為了與第一次調(diào)用Lock的goroutine劃分不同的優(yōu)先級
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
//開始記錄等待時間
waitStartTime = runtime_nanotime()
}
// 將被喚醒但是沒有獲得鎖的goroutine插入到當(dāng)前等待隊列隊首
// 使用信號量阻塞當(dāng)前goroutine
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 當(dāng)goroutine等待時間超過starvationThresholdNs,mutex進(jìn)入饑餓模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
//如果當(dāng)前goroutine被喚醒且mutex處于饑餓模式 則將鎖手遞手交給當(dāng)前goroutine
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
//等待狀態(tài)的goroutine - 1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
//如果等待時間小于1ms 或 當(dāng)前goroutine是隊列中最后一個
if !starving || old>>mutexWaiterShift == 1 {
// 退出饑餓模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
}
Unlock
解鎖分兩種情況
- 當(dāng)前只有一個goroutine占有鎖 unlock完 直接結(jié)束
func (m *Mutex) Unlock() {
// 去除加鎖狀態(tài)
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {//存在等待的goroutine
m.unlockSlow(new)
}
}
- unlock完閉mutex.state!=0 則存在以下可能
- 正常模式下
- 當(dāng)前存在等待goroutine 然后喚醒它 但不是第一個goroutine
- 當(dāng)前存在自旋等待的goroutine 則不喚醒其他等待gorotune
- 饑餓模式下
- 直接將鎖交給等待隊列的第一個goroutine
- 正常模式下
func (m *Mutex) unlockSlow(new int32) {
//未加鎖的情況下不能多次調(diào)用unlock
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {//正常模式下
old := new
for {
//沒有等待的goroutine 或 已經(jīng)存在一個獲得鎖 或被喚醒 或處于饑餓模式下不需要喚醒任何處于等待的goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 等待狀態(tài)goroutine數(shù)量-1 并設(shè)置喚醒狀態(tài)為1 然后喚醒一個等待goroutine
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
//喚醒一個阻塞的goroutine 但不是第一個等待者
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
//饑餓模式下手遞手將鎖交給隊列第一個等待的goroutine
//即使期間有新來的goroutine到來,只要處于饑餓模式 鎖就不會被新來的goroutine搶占
runtime_Semrelease(&m.sema, true, 1)
}
}
信號量
上面可以看到Mutex
對goroutine
的阻塞和喚醒操作是利用semaphore
來實(shí)現(xiàn)的舶胀,大致的思路是:Go runtime維護(hù)了一個全局的變量semtable
,它保持了所有的信號量
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
每個信號量都由一個變量地址指定嚣伐,Mutex的栗子里就是mutex.sema
的地址
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}
大致畫了下其數(shù)據(jù)結(jié)構(gòu)
- 當(dāng)
goroutine
未獲取到鎖轩端,需要阻塞時調(diào)用sync.runtime_SemacquireMutex
進(jìn)入阻塞邏輯
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// 低成本case
// 若addr大于1 并通過CAS -1 成功船万,則獲取信號量成功 不需要阻塞
if cansemacquire(addr) {
return
}
// 復(fù)雜 case:
// 增加等待goroutine數(shù)量
// 再次嘗試cansemacquire 成功則返回
// 失敗則將自己作為一個waiter入隊
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// 給nwait+1 這樣semrelease中不會進(jìn)低成本路徑了
atomic.Xadd(&root.nwait, 1)
// 檢查 cansemacquire 避免錯過喚醒
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
//cansemacquire之后的semrelease都可以知道我們正在等待
//上面設(shè)置了nwait耿导,所以會直接進(jìn)入sleep 即goparkunlock
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}
如果addr大于1并通過CAS-1成功則獲取信號量成功舱呻,直接返回
否則通過對信號量地址偏移取模&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
拿到semaRoot
(這里個3和251 沒有明白為什么時這兩個數(shù)悠汽??茬高?)怎栽,semaRoot
包含了一個sudog
鏈表和一個nwait
整型字段宿饱。nwait
表示該信號量上阻塞等待的g的數(shù)量,同時為了保證線程安全需要一個互斥量來保護(hù)鏈表强饮。
這里需要注意的是 此處的runtime.mutex并不是之前所說的sync.Mutex,是內(nèi)部的一個簡單版本
簡單來說为黎,sync_runtime_Semacquire
就是wait知道*s>0 然后原子的遞減它,來完成同步過程中簡單的睡眠原語
- 當(dāng)
goroutine
要釋放鎖 喚醒等待的g時調(diào)用sync.runtime_Semrelease
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
// Easy case: no waiters?
// 這個檢查必須發(fā)生在xadd之后 避免錯過喚醒
// (see loop in semacquire).
if atomic.Load(&root.nwait) == 0 {
return
}
// Harder case: 搜索一個waiter 并喚醒它
lock(&root.lock)
if atomic.Load(&root.nwait) == 0 {
// count值已經(jīng)被另一個goroutine消費(fèi)了
// 所以不需要喚醒其他goroutine
unlock(&root.lock)
return
}
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // May be slow, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5+skipframes)
}
}
關(guān)于信號量更深層的研究可以看下semaphore in plan9
總結(jié)
通過看源碼發(fā)現(xiàn)個有意思的問題: 如果goroutine g1加的鎖 可以被另一個goroutine g2解鎖,但是等到g1解鎖的時候就會panic