同步原語(yǔ)和鎖
Golang作為一個(gè)原生支持用戶態(tài)的語(yǔ)言健霹,當(dāng)提到并發(fā)進(jìn)程旺上,多線程的時(shí)候,是離不開鎖的糖埋,鎖是一種并發(fā)編程中的同步原語(yǔ)(Synchronization Primitives)宣吱,它能保證多個(gè) Goroutine 在訪問同一片內(nèi)存時(shí)不會(huì)出現(xiàn)競(jìng)爭(zhēng)條件(Race condition)等問題。
基于原語(yǔ)
go語(yǔ)言在sync包中提供了用于同步的一些基本原語(yǔ)瞳别,包括常見的sync.Mutex,sync.RWMutex,sync.WaitGroup,
sync.Once,sync.Cond.
這些基本原語(yǔ)提高了較為基礎(chǔ)的同步功能征候,但是它們是一種相對(duì)原始的同步機(jī)制,在多數(shù)情況下祟敛,我們都應(yīng)該使用抽象層級(jí)的更高的 Channel 實(shí)現(xiàn)同步疤坝。
Mutex
Mutex由兩個(gè)字段:state,sema組成,其中state表示當(dāng)前互斥鎖的狀態(tài)馆铁,而sema是用于控制鎖的狀態(tài)的信號(hào)量卒煞。上述兩個(gè)加起來,只占用8個(gè)字節(jié)
-
狀態(tài)
- 在默認(rèn)的情況下叼架,互斥鎖的所有的狀態(tài)都是0畔裕,int32中不同位分別表示了不同的狀態(tài)
- mutexLocked — 表示互斥鎖的鎖定狀態(tài);
- mutexWoken — 表示從正常模式被從喚醒乖订;
- mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài)扮饶;
- waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個(gè)數(shù);
- 在默認(rèn)的情況下叼架,互斥鎖的所有的狀態(tài)都是0畔裕,int32中不同位分別表示了不同的狀態(tài)
-
正常模式和饑餓模
- 正常模式:鎖的的等待者會(huì)按照新出的順序獲取鎖乍构,但是剛被喚起的goroutine和新創(chuàng)造的進(jìn)程競(jìng)爭(zhēng)的時(shí)候甜无,大概率會(huì)獲得鎖,為了防止這種情況哥遮,一旦goroutine超過1ms沒有獲得鎖岂丘,就會(huì)將當(dāng)前的狀態(tài)切換為饑餓模式,防止部分 Goroutine 被『餓死』眠饮。
- 在饑餓模式中奥帘,互斥鎖會(huì)直接交給等待隊(duì)列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖仪召、也不會(huì)進(jìn)入自旋狀態(tài)寨蹋,它們只會(huì)在隊(duì)列的末尾等待松蒜。如果一個(gè) Goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時(shí)間少于 1ms,那么當(dāng)前的互斥鎖就會(huì)被切換回正常模式已旧。
-
加鎖和解鎖
- 互斥鎖的加鎖是靠 sync.Mutex.Lock 完成的秸苗,最新的 Go 語(yǔ)言源代碼中已經(jīng)將 sync.Mutex.Lock 方法進(jìn)行了簡(jiǎn)化,方法的主干只保留最常見运褪、簡(jiǎn)單的情況 — 當(dāng)鎖的狀態(tài)是 0 時(shí)惊楼,將 mutexLocked 位置成 1:如果互斥鎖的狀態(tài)不是0的時(shí)候就會(huì)調(diào)用sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)等方式等待鎖的釋放,該方法的主體是一個(gè)非常大 for 循環(huán)秸讹,這里將該方法分成幾個(gè)部分介紹獲取鎖的過程:
- 判斷當(dāng)前goroutine是否進(jìn)入自旋轉(zhuǎn)
- 通過自旋等待互斥鎖的釋放檀咙;
- 計(jì)算互斥鎖的最新狀態(tài);
- 更新互斥鎖的狀態(tài)并獲取鎖
- 自旋是一種多線程同步機(jī)制嗦枢,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會(huì)一直保持 CPU 的占用,持續(xù)檢查某個(gè)條件是否為真屯断。在多核的 CPU 上文虏,自旋可以避免 Goroutine 的切換殖演,使用恰當(dāng)會(huì)對(duì)性能帶來很大的增益氧秘,但是使用的不恰當(dāng)就會(huì)拖慢整個(gè)程序,所以 Goroutine 進(jìn)入自旋的條件非撑烤茫苛刻:
- 互斥鎖只有在普通模式下才會(huì)進(jìn)入自旋
- sync.runtime_canSpin 需要返回 true:
- 運(yùn)行在多 CPU 的機(jī)器上丸相;
- 當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;
- 當(dāng)前機(jī)器上至少存在一個(gè)正在運(yùn)行的處理器 P 并且處理的運(yùn)行隊(duì)列為空彼棍;
- 如果沒有通過CAS 獲得鎖灭忠,會(huì)調(diào)用 sync.runtime_SemacquireMutex 使用信號(hào)量保證資源不會(huì)被兩個(gè) Goroutine 獲取。sync.runtime_SemacquireMutex 會(huì)在方法中不斷調(diào)用嘗試獲取鎖并休眠當(dāng)前 Goroutine 等待信號(hào)量的釋放座硕,一旦當(dāng)前 Goroutine 可以獲取信號(hào)量弛作,它就會(huì)立刻返回。
- 互斥鎖的加鎖是靠 sync.Mutex.Lock 完成的秸苗,最新的 Go 語(yǔ)言源代碼中已經(jīng)將 sync.Mutex.Lock 方法進(jìn)行了簡(jiǎn)化,方法的主干只保留最常見运褪、簡(jiǎn)單的情況 — 當(dāng)鎖的狀態(tài)是 0 時(shí)惊楼,將 mutexLocked 位置成 1:如果互斥鎖的狀態(tài)不是0的時(shí)候就會(huì)調(diào)用sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)等方式等待鎖的釋放,該方法的主體是一個(gè)非常大 for 循環(huán)秸讹,這里將該方法分成幾個(gè)部分介紹獲取鎖的過程:
RWMutex
讀寫互斥鎖sync.RWMutex华匾,是細(xì)粒度的互斥鎖映琳,她并不限制資源的并發(fā)讀,但是讀寫蜘拉,寫寫操作無法并行執(zhí)行萨西。一個(gè)常見的服務(wù)對(duì)資源的讀寫比例會(huì)非常高,因?yàn)榇蠖鄶?shù)的讀請(qǐng)求之間不會(huì)相互影響旭旭,所以我們可以讀寫資源操作的分離谎脯,在類似場(chǎng)景下提高服務(wù)的性能。
結(jié)構(gòu)體
sync.RWMutex 中總共包含以下 5 個(gè)字段:
type RWMUtex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
- w 復(fù)用互斥鎖提供的能力
- writerSem和readSem 分別用于寫等待和讀等待
- readerCount 存儲(chǔ)了當(dāng)前正在執(zhí)行的讀操作的數(shù)量
- readerWait 表示當(dāng)寫操作被阻塞時(shí)等待的讀操作的個(gè)數(shù)
我們會(huì)依次分析獲取寫鎖和讀鎖的實(shí)現(xiàn)能力持寄,其中:
- 寫操作使用 sync.RWMutex.Lock 和 sync.RWMutex.Unlock 方法穿肄;
- 讀操作使用 sync.RWMutex.RLock 和 sync.RWMutex.RUnlock 方法年局;
寫鎖
- 當(dāng)資源的使用者想要獲取寫鎖時(shí),需要調(diào)用 sync.RWMutex.Lock 方法
- 寫鎖的釋放會(huì)調(diào)用 sync.RWMutex.Unlock 方法
與加鎖的過程正好相反咸产,寫鎖的釋放分以下幾個(gè)執(zhí)行
- 調(diào)用atomic.AddInt32 函數(shù)將變回正數(shù)矢否,釋放讀鎖;
- 通過 for 循環(huán)觸發(fā)所有由于獲取讀鎖而陷入等待的 Goroutine
- 調(diào)用 sync.Mutex.Unlock 方法釋放寫鎖
讀鎖
讀鎖的加鎖方法 sync.RWMutex.RLock
func (rw *RWMutex) RLock(){
if atomic.AddInt32(&rw.readerCount,1) < 0 {
runtime_SemacquireMutex(&rw.readerSem,false,0)
}
}
- 如果該方法返回函數(shù)-其他 Goroutine 獲得了寫鎖脑溢,當(dāng)前 Goroutine 就會(huì)調(diào)用 sync.runtime_SemacquireMutex 陷入休眠等待鎖的釋放僵朗。
- 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖,當(dāng)前方法就會(huì)成功返回.
當(dāng) Goroutine 想要釋放讀鎖時(shí)屑彻,會(huì)調(diào)用如下所示的 sync.RWMutex.RUnlock 方法
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount,-1);r<0{
rw.rUnlockSlow(r)
}
}
WaitGroup
sync.WaitGroup 可以等待一組 Goroutine 的返回验庙,一個(gè)比較常見的使用場(chǎng)景是批量發(fā)出 RPC 或者 HTTP 請(qǐng)求:
reuqests := []*Requests{...}
wg := &sync.WaitGroup()
wg.Add(len(requests))
for _,request := range requests {
go func(r *Request){
defer wg.Done()
}(request)
}
wg.Wait()
我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個(gè) Goroutine 中并發(fā)執(zhí)行,加快程序處理的速度社牲。
結(jié)構(gòu)體
sync.WaitGroup 結(jié)構(gòu)體中的成員變量非常簡(jiǎn)單粪薛,其中只包含兩個(gè)成員變量
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
- noCopy 保證 sync.WaitGroup 不會(huì)被開發(fā)者通過再賦值的方式拷貝
- state1 存儲(chǔ)著狀態(tài)和信號(hào)量
接口
其中的 sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1,所以我們重點(diǎn)分析另外兩個(gè)方法 sync.WaitGroup.Add 和 sync.WaitGroup.Wait
func (wg *WaitGroup) Add(delta int){
statep,semap := wg.state()
state := atomic.AddUint64(statep,uint64(delta)<<32)
v := int32(state >>32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0{
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
另一個(gè)方法 sync.WaitGroup.Wait
func (wg *WaitGroup) Wait(){
statep,semp := wg.state()
for {
state := atomic.LoadUint64(statep)
v :=int32(state >> 32)
if v == 0{
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
當(dāng) sync.WaitGroup 的計(jì)數(shù)器歸零時(shí)搏恤,當(dāng)陷入睡眠狀態(tài)的 Goroutine 就被喚醒
Once
Go 語(yǔ)言標(biāo)準(zhǔn)庫(kù)中 sync.Once 可以保證在 Go 程序運(yùn)行期間的某段代碼只會(huì)執(zhí)行一次违寿。在運(yùn)行如下所示的代碼時(shí),我們會(huì)看到如下所示的運(yùn)行結(jié)果
func main() {
o := &sync.Once{}
for i:=0;i<10;i++{
o.Do(func(){
fmt.Println("ddd)
})
}
}
結(jié)構(gòu)體
每一個(gè) sync.Once 結(jié)構(gòu)體中都只包含一個(gè)用于標(biāo)識(shí)代碼塊是否執(zhí)行過的 done 以及一個(gè)互斥鎖 sync.Mutex
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do 是 sync.Once 結(jié)構(gòu)體對(duì)外唯一暴露的方法
- 如果傳入的函數(shù)已經(jīng)執(zhí)行過了熟空,就會(huì)直接返回
- 如果傳入的函數(shù)沒有執(zhí)行過藤巢,就會(huì)調(diào)用sync.Once.doSlow執(zhí)行傳入函數(shù)
func (o *Once) Do(f func()){
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()){
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUinit32(&o.done,1)
f()
}
}
Cond
Go標(biāo)準(zhǔn)庫(kù)的中的sync.Cond是一個(gè)條件變量,它可以讓一系列的goroutine都在滿足特定條件下時(shí)候被喚醒息罗,每一個(gè) sync.Cond 結(jié)構(gòu)體在初始化時(shí)都需要傳入一個(gè)互斥鎖掂咒,我們可以通過下面的例子了解它的使用方法
func main() {
c := sync.NewCond(&sync.Mutex{})
for i :=0;i<10;i++{
go listen(c)
}
time.Sleep(1*time.Second)
go broadcast(c)
ch := make(chan os.Signal,1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond){
c.l.Lock()
c.Broadcast()
c.l.Unlock()
}
func listen(c *sync.Cond) {
c.l.Lock()
c.wait()
fmt.Println("ddd")
c.l,Unlock()
}
上述代碼同時(shí)運(yùn)行了 11 個(gè) Goroutine,這 11 個(gè) Goroutine 分別做了不同事情:
- 10 個(gè) Goroutine 通過 sync.Cond.Wait 等待特定條件的滿足迈喉;
- 1 個(gè) Goroutine 會(huì)調(diào)用 sync.Cond.Broadcast 方法通知所有陷入等待的 Goroutine绍刮;
sync.Cond.Signal 和 sync.Cond.Broadcast 方法就是用來喚醒調(diào)用 sync.Cond.Wait 陷入休眠的 Goroutine,它們兩個(gè)的實(shí)現(xiàn)有一些細(xì)微差別:
- sync.Cond.Signal 方法會(huì)喚醒隊(duì)列最前面的 Goroutine挨摸;
- sync.Cond.Broadcast 方法會(huì)喚醒隊(duì)列中全部的 Goroutine录淡;
在一般情況下,我們都會(huì)先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件油坝,當(dāng)滿足喚醒條件時(shí)嫉戚,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個(gè)或者全部的 Goroutine。
ErrGroup
x/sync/errgroup.Group 就為我們?cè)谝唤M Goroutine 中提供了同步澈圈、錯(cuò)誤傳播以及上下文取消的功能彬檀,我們可以使用如下所示的方式并行獲取網(wǎng)頁(yè)的數(shù)據(jù)
var g errgroup.Group
var urls = []string{
"http://www.golang.org"
"http://www.baidu.com"
}
for i := range urls {
url := urls[i]
g.Go(func() error {
resp,err := http.Get(url)
if err == nil{
resp.Body.Close()
}
return err
})
}
if err := g.Wait();err == nil{
fmt.Println("Successfully fetched all URLs.")
}
x/sync/errgroup.Group.Go 方法能夠創(chuàng)建一個(gè) Goroutine 并在其中執(zhí)行傳入的函數(shù),而 x/sync/errgroup.Group.Wait 會(huì)等待所有 Goroutine 全部返回瞬女,該方法的不同返回結(jié)果也有不同的含義:
- 如果返回錯(cuò)誤 — 這一組 Goroutine 最少返回一個(gè)錯(cuò)誤窍帝;
- 如果返回空值 — 所有 Goroutine 都成功執(zhí)行
Semaphore
信號(hào)量是在并發(fā)編程中常見的一種同步機(jī)制,在需要控制訪問資源的進(jìn)程數(shù)量時(shí)就會(huì)用到信號(hào)量诽偷,它會(huì)保證持有的計(jì)數(shù)器在 0 到初始化的權(quán)重之間波動(dòng)
- 每次獲取資源時(shí)都會(huì)將信號(hào)量中的計(jì)數(shù)器減去對(duì)應(yīng)的數(shù)值坤学,在釋放時(shí)重新加回來
- 當(dāng)遇到計(jì)數(shù)器大于信號(hào)量大小時(shí)就會(huì)進(jìn)入休眠等待其他線程釋放信號(hào)
這個(gè)結(jié)構(gòu)體對(duì)外也只暴露了四個(gè)方法:
- x/sync/semaphore.NewWeighted 用于創(chuàng)建新的信號(hào)量纵竖;
- x/sync/semaphore.Weighted.Acquire 阻塞地獲取指定權(quán)重的資源焚鲜,如果當(dāng)前沒有空閑資源,就會(huì)陷入休眠等待;
- x/sync/semaphore.Weighted.TryAcquire 非阻塞地獲取指定權(quán)重的資源堕担,如果當(dāng)前沒有空閑資源洲尊,就會(huì)直接返回 false贩猎;
- x/sync/semaphore.Weighted.Release 用于釋放指定權(quán)重的資源收夸;
在使用過程中需要注意以下幾個(gè)問題
- x/sync/semaphore.Weighted.Acquire 和 x/sync/semaphore.Weighted.TryAcquire 方法都可以用于獲取資源,前者會(huì)阻塞地獲取信號(hào)量布卡,后者會(huì)非阻塞地獲取信號(hào)量雨让;
- x/sync/semaphore.Weighted.Release 方法會(huì)按照 FIFO 的順序喚醒可以被喚醒的 Goroutine;
- 如果一個(gè) Goroutine 獲取了較多地資源忿等,由于 x/sync/semaphore.Weighted.Release 的釋放策略可能會(huì)等待比較長(zhǎng)的時(shí)間
SingleFlight
這個(gè)是Go語(yǔ)言的擴(kuò)展包中提供的另外一個(gè)信號(hào)量栖忠,它能夠在一個(gè)服務(wù)中抑制對(duì)下游的多次重復(fù)請(qǐng)求,比如在redis的緩存雪崩中贸街,能夠限制對(duì)同一個(gè) Key 的多次重復(fù)請(qǐng)求庵寞,減少對(duì)下游的瞬時(shí)流量。
在資源獲取非常昂貴的時(shí)候匾浪,就很適合使用x/sync/singleflight.Group
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response error){
v,err,_ := requestGroup.Do(request.Hash(),func() (interface{},error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
})
if err != nil{
return nil,err
}
return Response {
rows:rows,
},nil
}
因?yàn)檎?qǐng)求的哈希在業(yè)務(wù)上一般表示相同的請(qǐng)求皇帮,所以上述代碼使用它作為請(qǐng)求的鍵卷哩。當(dāng)然蛋辈,我們也可以選擇其他的唯一字段作為 x/sync/singleflight.Group.Do 方法的第一個(gè)參數(shù)減少重復(fù)的請(qǐng)求。