sync 源碼解析

type Locker interface {
   Lock()
   Unlock()
}

Mutex 互斥鎖

互斥即不可同時(shí)運(yùn)行米绕。即使用了互斥鎖的兩個(gè)代碼片段互相排斥瑟捣,只有其中一個(gè)代碼片段執(zhí)行完成后,另一個(gè)才能執(zhí)行义郑。

type Mutex struct {
   state int32
   sema  uint32
}

state 表示當(dāng)前互斥鎖的狀態(tài)蝶柿,而 sema 是用于控制鎖狀態(tài)的信號量。

互斥鎖的狀態(tài)比較復(fù)雜非驮,如下圖所示,最低三位分別表示 mutexLocked雏赦、mutexWoken 和 mutexStarving劫笙,剩下的位置用來表示當(dāng)前有多少個(gè) Goroutine 在等待互斥鎖的釋放

image.png

在默認(rèn)情況下,互斥鎖的所有狀態(tài)位都是 0星岗,int32 中的不同位分別表示了不同的狀態(tài):

  • mutexLocked — 表示互斥鎖的鎖定狀態(tài)填大;
  • mutexWoken — 表示從正常模式被從喚醒;
  • mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài)俏橘;
  • waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個(gè)數(shù)允华;

Mutex 正常模式和饑餓模式

  • 在正常模式下,鎖的等待者會(huì)按照先進(jìn)先出的順序獲取鎖寥掐。
  • 但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時(shí)靴寂,大概率會(huì)獲取不到鎖,為了減少這種情況的出現(xiàn)召耘,一旦 Goroutine 超過 1ms 沒有獲取到鎖百炬,它就會(huì)將當(dāng)前互斥鎖切換饑餓模式,防止部分 Goroutine 被『餓死』污它。
  • 在饑餓模式中剖踊,互斥鎖會(huì)直接交給等待隊(duì)列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖衫贬、也不會(huì)進(jìn)入自旋狀態(tài)德澈,它們只會(huì)在隊(duì)列的末尾等待。
  • 如果一個(gè) Goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時(shí)間少于 1ms固惯,那么當(dāng)前的互斥鎖就會(huì)切換回正常模式梆造。
  • 與饑餓模式相比,正常模式下的互斥鎖能夠提供更好地性能缝呕,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時(shí)澳窑。
const (
   mutexLocked = 1 << iota      //1         互斥鎖被鎖定 + 未被喚醒
   mutexWoken                   //2         互斥鎖未鎖定 + 被喚醒
   mutexStarving                //4         互斥鎖未鎖定 + 未被喚醒 + 饑餓模式啟動(dòng) 
   mutexWaiterShift = iota      //3         互斥鎖被鎖定 + 被喚醒
   starvationThresholdNs = 1e6
)

加鎖和解鎖

func (m *Mutex) Lock() {
   // 當(dāng)鎖的狀態(tài)是 0 時(shí),將state置位 mutexLocked 為 1:
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // 如果互斥鎖的狀態(tài)不是 0 時(shí)就會(huì)調(diào)用 sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)
   m.lockSlow()
}
func (m *Mutex) lockSlow() {

該方法的主體是一個(gè)非常大 for 循環(huán)供常,這里將它分成幾個(gè)部分介紹獲取鎖的過程:

  • 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋摊聋;
  • 通過自旋等待互斥鎖的釋放;
  • 計(jì)算互斥鎖的最新狀態(tài)栈暇;
  • 更新互斥鎖的狀態(tài)并獲取鎖麻裁;

自旋是一種多線程同步機(jī)制,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會(huì)一直保持 CPU 的占用,持續(xù)檢查某個(gè)條件是否為真

在多核的 CPU 上煎源,自旋可以避免 Goroutine 的切換色迂,使用恰當(dāng)會(huì)對性能帶來很大的增益,但是使用的不恰當(dāng)就會(huì)拖慢整個(gè)程序手销,所以 Goroutine 進(jìn)入自旋的條件非承苛刻

  • 互斥鎖只有在普通模式才能進(jìn)入自旋;
  • runtime.sync_runtime_canSpin 需要返回 true:
    1. 運(yùn)行在多 CPU 的機(jī)器上锋拖;
    2. 當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次诈悍;
    3. 當(dāng)前機(jī)器上至少存在一個(gè)正在運(yùn)行的處理器 P 并且處理的運(yùn)行隊(duì)列為空;

下面代碼為 進(jìn)入自旋 邏輯

   // 開始等待時(shí)間戳
   var waitStartTime int64
   // 饑餓模式標(biāo)識
   starving := false
   // 喚醒標(biāo)識
   awoke := false
   // 自旋次數(shù)
   iter := 0
   // 保存當(dāng)前對象鎖狀態(tài)
   old := m.state
   for {
      // 鎖是非饑餓狀態(tài)兽埃,鎖還沒被釋放侥钳,嘗試自旋
      // 判斷相當(dāng)于xxxx...x0xx & 0101 = 01,當(dāng)前對象狀態(tài)為:xxxx…x0xx 當(dāng)前對象鎖被使用
      // runtime_canSpin(iter) 根據(jù)iter自旋次數(shù)判斷
      // 互斥鎖被鎖定時(shí) 進(jìn)入自旋狀態(tài) continue
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         if !awoke && 
            // 判斷相當(dāng)于: xxxx...xx0x & 0010 = 0
            // 當(dāng)前對象狀態(tài)為未被喚醒時(shí) 
            old&mutexWoken == 0 && 
            // 當(dāng)前有 Goroutine 在等待互斥鎖的釋放時(shí)柄错,也就是有g(shù)oroution在排隊(duì)時(shí)  old>>mutexWaiterShift != 0 為 true
            old>>mutexWaiterShift != 0 &&
            // 將對象 改為喚醒狀態(tài):xxxx...xx0x | 0010 = xxxx...xx1x
            // 在將 m.state 改為被喚醒狀態(tài) 
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 
         {
                // 告知解鎖舷夺,不要喚醒其他阻塞的goroutines
                awoke = true
         }
         // 進(jìn)入自旋狀態(tài) 當(dāng)前goroutine并不掛起,仍然在占用cpu資源售貌,重試一定次數(shù)后给猾,退出自旋狀態(tài)
         runtime_doSpin()
         //表示自旋次數(shù)
         iter++
         // 保存mutex對象即將被設(shè)置成的狀態(tài)
         //再次獲取鎖的狀態(tài),之后會(huì)檢查是否鎖被釋放了
         old = m.state
         continue
      }

一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會(huì)調(diào)用runtime.sync_runtime_doSpin 和 runtime.procyield 并執(zhí)行 30 次的 PAUSE 指令趁矾,該指令只會(huì)占用 CPU 并消耗 CPU 時(shí)間:

func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL cycles+0(FP), AX
again:
    PAUSE
    SUBL $1, AX
    JNZ again
    RET

處理了自旋相關(guān)的特殊邏輯之后耙册,互斥鎖會(huì)根據(jù)上下文計(jì)算當(dāng)前互斥鎖最新的狀態(tài)。幾個(gè)不同的條件分別會(huì)更新 state 字段中存儲(chǔ)的不同信息 — mutexLocked毫捣、mutexStarving详拙、mutexWoken 和 mutexWaiterShift:

      new := old
      // old 后三位為 mutexLocked,mutexWaiterShift,mutexWoken 時(shí) old&mutexStarving == 0 為true
      // xxxx...x0xx & 0100 = 0   xxxx...x0xx
      // old&mutexStarving == 0  篩選當(dāng)前狀態(tài)為正常模式(非饑餓模式)
      // 將新來的goroutines 互斥鎖鎖定
      if old&mutexStarving == 0 {
         // 將未鎖定狀態(tài)改為鎖定狀態(tài)
        // 非饑餓狀態(tài),加鎖
         new |= mutexLocked
      }
      // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;
      // old 互斥鎖被鎖定時(shí)但是狀態(tài)為饑餓模式時(shí)蔓同,mutex的等待goroutine數(shù)目加1  
      if old&(mutexLocked|mutexStarving) != 0 {
        // new + 8  在等待互斥鎖的釋放的Goroutine數(shù)量 +1
        // 更新阻塞goroutine的數(shù)量,表示mutex的等待goroutine數(shù)目加1
         new += 1 << mutexWaiterShift
      }
      // xxxx...xxx1 & 0001 != 0饶辙;鎖狀態(tài)為鎖定狀態(tài) 
      if starving && old&mutexLocked != 0 {
        // new + 4 表示從正常模式進(jìn)入饑餓模式
        // xxxx...xxx | 0100 => xxxx...x1xx
         new |= mutexStarving
      }
      // 當(dāng) 喚醒標(biāo)志為 true時(shí) 
      if awoke {
        // xxxx...xx1x & 0010 = 0, 如果喚醒標(biāo)志為0 ,表示為被喚醒時(shí)斑粱,panic
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x :設(shè)置喚醒狀態(tài)位0,goroutine是被喚醒的弃揽,新狀態(tài)清除喚醒標(biāo)志
         new &^= mutexWoken
      }

計(jì)算了新的互斥鎖狀態(tài)之后,會(huì)使用 CAS 函數(shù) sync/atomic.compareandswapint32 更新狀態(tài)

     //判斷cas鎖是否 設(shè)置新狀態(tài) 成功 
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        //原來鎖的狀態(tài)已釋放则北,并且不是饑餓狀態(tài)矿微,正常請求到了鎖,返回
        // xxxx...x0x0 & 0101 = 0尚揣, 互斥鎖未加鎖且為正常模式(非饑餓模式)
         if old&(mutexLocked|mutexStarving) == 0 {
            //結(jié)束cas
            break
         }
         // 如果以前就在隊(duì)列里面涌矢,加入到隊(duì)列頭
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 { 
          //獲取當(dāng)前運(yùn)行納秒時(shí)間戳
            waitStartTime = runtime_nanotime()
         }
         //通過信號量保證資源不會(huì)被兩個(gè) Goroutine 獲取
        //runtime.sync_runtime_SemacquireMutex 會(huì)在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放,一旦當(dāng)前 Goroutine 可以獲取信號量快骗,它就會(huì)立刻返回
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         // 判斷 等待時(shí)間 是否超出限制 1e6 將starving 標(biāo)志為 true 
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
         old = m.state
         // xxxx...x1xx & 0100 != 0   判斷鎖狀態(tài)是否 進(jìn)入饑餓狀態(tài)
         if old&mutexStarving != 0 {
            //當(dāng)前 Goroutine 會(huì)獲得互斥鎖娜庇,如果等待隊(duì)列中只存在當(dāng)前 Goroutine塔次,互斥鎖還會(huì)從饑餓模式中退出
            // xxxx...xx00 & 0011 = 0
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            // 加鎖并且將waiter數(shù)減1
            // delta -7  7 = 0111
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            //判斷 等待隊(duì)列中 是否只存在當(dāng)前 Goroutine
            if !starving || old>>mutexWaiterShift == 1 {
               // delta -11  11 = 1011 
               delta -= mutexStarving
            }
            // m.state - 7 表示當(dāng)前 Goroutine 會(huì)獲得互斥鎖
            //  m.state - 11 表示當(dāng)前 Goroutine 會(huì)獲得互斥鎖還會(huì)從饑餓模式中退出
            atomic.AddInt32(&m.state, delta)
            break
         }
        // 設(shè)置喚醒標(biāo)記 為true 
         awoke = true
        // 重置迭代次數(shù)
         iter = 0
      } else {
         old = m.state
      }
   }
   if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
   }
}

如果沒有通過 CAS 獲得鎖,會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 通過信號量保證資源不會(huì)被兩個(gè) Goroutine 獲取

runtime.sync_runtime_SemacquireMutex 會(huì)在方法中不斷嘗試獲取鎖并陷入休眠等待信號量的釋放名秀,一旦當(dāng)前 Goroutine 可以獲取信號量励负,它就會(huì)立刻返回,sync.Mutex.Lock 的剩余代碼也會(huì)繼續(xù)執(zhí)行匕得。

  • 在正常模式下继榆,這段代碼會(huì)設(shè)置喚醒和饑餓標(biāo)記、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán)耗跛;
  • 在饑餓模式下裕照,當(dāng)前 Goroutine 會(huì)獲得互斥鎖,如果等待隊(duì)列中只存在當(dāng)前 Goroutine调塌,互斥鎖還會(huì)從饑餓模式中退出;

互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單惠猿,該過程會(huì)先使用 sync/atomic.AddInt32 函數(shù)快速解鎖羔砾,這時(shí)會(huì)發(fā)生下面的兩種情況:

  • 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖偶妖;
  • 如果該函數(shù)返回的新狀態(tài)不等于 0姜凄,這段代碼會(huì)調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖:
func (m *Mutex) Unlock() {
   if race.Enabled {
      _ = m.state
      race.Release(unsafe.Pointer(m))
   }
   // m.state - 1 
   // 新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖
   // 新狀態(tài)不等于 0趾访,這段代碼會(huì)調(diào)用 sync.Mutex.unlockSlow 開始慢速解鎖
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      m.unlockSlow(new)
   }
}

sync.Mutex.unlockSlow 會(huì)先校驗(yàn)鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了會(huì)直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序态秧。
在正常情況下會(huì)根據(jù)當(dāng)前互斥鎖的狀態(tài),分別處理正常模式和饑餓模式下的互斥鎖:

func (m *Mutex) unlockSlow(new int32) {
   // 校驗(yàn)鎖狀態(tài)的合法性
   if (new+mutexLocked)&mutexLocked == 0 { 
      //如果 當(dāng)前互斥鎖已經(jīng)被解鎖過了會(huì)直接拋出異常 “sync: unlock of unlocked mutex” 中止當(dāng)前程序
      throw("sync: unlock of unlocked mutex")
   }
   // 分別處理正常模式和饑餓模式下的互斥鎖
   if new&mutexStarving == 0 {
      // 正常狀態(tài)
      old := new
      for {
         // 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked扼鞋、mutexStarving申鱼、mutexWoken 狀態(tài)不都為 0(有喚醒者或鎖已經(jīng)加鎖),那么當(dāng)前方法可以直接返回云头,不需要喚醒其他等待者捐友;
         //如果沒有其它的 waiter,說明對這個(gè)鎖的競爭的 goroutine 只有一個(gè)溃槐,那就可以直接返回了匣砖;如果這個(gè)時(shí)候有喚醒的 goroutine,或者是又被別人加了鎖昏滴,那么猴鲫,無需我們操勞,其它 goroutine 自己干得都很好谣殊,當(dāng)前的這個(gè) goroutine 就可以放心返回了拂共。
         //xxxx…x000 & (0001 | 0010 | 0100) => xxxx…x000 & 0111 = 0
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 如果互斥鎖存在等待者,會(huì)通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán)蟹倾;
         //如果有等待者匣缘,并且沒有喚醒的 waiter猖闪,那就需要喚醒一個(gè)等待的 waiter。在喚醒之前肌厨,需要將 waiter 數(shù)量減 1培慌,并且將 mutexWoken 標(biāo)志設(shè)置上,這樣柑爸,Unlock 就可以返回了吵护。
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
      // 饑餓狀態(tài)
      // 當(dāng)前鎖交給下一個(gè)正在嘗試獲取鎖的等待者,等待者被喚醒后會(huì)得到鎖表鳍,在這時(shí)互斥鎖還不會(huì)退出饑餓狀態(tài)
      runtime_Semrelease(&m.sema, true, 1)
   }
}

在正常模式下馅而,上述代碼會(huì)使用如下所示的處理過程:

  • 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked、mutexStarving譬圣、mutexWoken 狀態(tài)不都為 0瓮恭,那么當(dāng)前方法可以直接返回,不需要喚醒其他等待者厘熟;
  • 如果互斥鎖存在等待者屯蹦,會(huì)通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán);

在饑餓模式下绳姨,上述代碼會(huì)直接調(diào)用 sync.runtime_Semrelease 將當(dāng)前鎖交給下一個(gè)正在嘗試獲取鎖的等待者登澜,等待者被喚醒后會(huì)得到鎖,在這時(shí)互斥鎖還不會(huì)退出饑餓狀態(tài)飘庄;

總結(jié)

互斥鎖的加鎖過程比較復(fù)雜脑蠕,它涉及自旋、信號量以及調(diào)度等概念:

  • 如果互斥鎖處于初始化狀態(tài)跪削,會(huì)通過置位 mutexLocked 加鎖谴仙;
  • 如果互斥鎖處于 mutexLocked 狀態(tài)并且在普通模式下工作,會(huì)進(jìn)入自旋切揭,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時(shí)間等待鎖的釋放狞甚;
  • 如果當(dāng)前 Goroutine 等待鎖的時(shí)間超過了 1ms,互斥鎖就會(huì)切換到饑餓模式廓旬;
  • 互斥鎖在正常情況下會(huì)通過 runtime.sync_runtime_SemacquireMutex 將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài)哼审,等待鎖的持有者喚醒;
  • 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個(gè)等待的協(xié)程或者等待的時(shí)間小于 1ms孕豹,那么它會(huì)將互斥鎖切換回正常模式涩盾;

互斥鎖的解鎖過程與之相比就比較簡單,其代碼行數(shù)不多励背、邏輯清晰春霍,也比較容易理解:

  • 當(dāng)互斥鎖已經(jīng)被解鎖時(shí),調(diào)用 sync.Mutex.Unlock 會(huì)直接拋出異常叶眉;
  • 當(dāng)互斥鎖處于饑餓模式時(shí)址儒,將鎖的所有權(quán)交給隊(duì)列中的下一個(gè)等待者芹枷,等待者會(huì)負(fù)責(zé)設(shè)置 mutexLocked 標(biāo)志位;
  • 當(dāng)互斥鎖處于普通模式時(shí)莲趣,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖鸳慈,會(huì)直接返回;在其他情況下會(huì)通過 sync.runtime_Semrelease 喚醒對應(yīng)的 Goroutine喧伞;

RWMutex 讀寫鎖

讀寫互斥鎖 sync.RWMutex 是細(xì)粒度的互斥鎖走芋,它不限制資源的并發(fā)讀,但是讀寫潘鲫、寫寫操作無法并行執(zhí)行翁逞。
sync.RWMutex 中總共包含以下 5 個(gè)字段:

type RWMutex struct {
   w           Mutex  // held if there are pending writers
   writerSem   uint32 // semaphore for writers to wait for completing readers
   readerSem   uint32 // semaphore for readers to wait for completing writers
   readerCount int32  // number of pending readers
   readerWait  int32  // number of departing readers
}
  • w — 復(fù)用互斥鎖提供的能力;
  • writerSem 和 readerSem — 分別用于寫等待讀和讀等待寫:
  • readerCount 存儲(chǔ)了當(dāng)前正在執(zhí)行的讀操作數(shù)量溉仑;
  • readerWait 表示當(dāng)寫操作被阻塞時(shí)等待的讀操作個(gè)數(shù)挖函;

寫鎖

當(dāng)資源的使用者想要獲取寫鎖時(shí)娃惯,需要調(diào)用 sync.RWMutex.Lock 方法:

func (rw *RWMutex) Lock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   rw.w.Lock()
   r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders 
   if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
      //進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒
      runtime_SemacquireMutex(&rw.writerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
      race.Acquire(unsafe.Pointer(&rw.writerSem))
   }
}
  1. 調(diào)用結(jié)構(gòu)體持有的 sync.Mutex 結(jié)構(gòu)體的 sync.Mutex.Lock 阻塞后續(xù)的寫操作叽掘;因?yàn)榛コ怄i已經(jīng)被獲取谆棱,其他 Goroutine 在獲取寫鎖時(shí)會(huì)進(jìn)入自旋或者休眠邻储;
  2. 調(diào)用 sync/atomic.AddInt32 函數(shù)阻塞后續(xù)的讀操作:
  3. 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖,該 Goroutine 會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒偏序;

寫鎖的釋放會(huì)調(diào)用 sync.RWMutex.Unlock:

func (rw *RWMutex) Unlock() {
   if race.Enabled {
      _ = rw.w.state
      race.Release(unsafe.Pointer(&rw.readerSem))
      race.Disable()
   }
   r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
   if r >= rwmutexMaxReaders {
      race.Enable()
      throw("sync: Unlock of unlocked RWMutex")
   }
   for i := 0; i < int(r); i++ {
      runtime_Semrelease(&rw.readerSem, false, 0)
   }
   rw.w.Unlock()
   if race.Enabled {
      race.Enable()
   }
}

與加鎖的過程正好相反,寫鎖的釋放分以下幾個(gè)執(zhí)行:

  • 調(diào)用 sync/atomic.AddInt32 函數(shù)將 readerCount 變回正數(shù),釋放讀鎖吩案;
  • 通過 for 循環(huán)釋放所有因?yàn)楂@取讀鎖而陷入等待的 Goroutine:
  • 調(diào)用 sync.Mutex.Unlock 釋放寫鎖;

獲取寫鎖時(shí)會(huì)先阻塞寫鎖的獲取帝簇,后阻塞讀鎖的獲取徘郭,這種策略能夠保證讀操作不會(huì)被連續(xù)的寫操作『餓死』。

讀鎖

讀鎖的加鎖方法 sync.RWMutex.RLock 很簡單丧肴,該方法會(huì)通過 sync/atomic.AddInt32 將 readerCount 加一:

func (rw *RWMutex) RLock() {
   if race.Enabled {
      _ = rw.w.state
      race.Disable()
   }
   if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      // A writer is pending, wait for it.
      runtime_SemacquireMutex(&rw.readerSem, false, 0)
   }
   if race.Enabled {
      race.Enable()
      race.Acquire(unsafe.Pointer(&rw.readerSem))
   }
}
  1. 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖残揉,當(dāng)前 Goroutine 就會(huì)調(diào)用 runtime.sync_runtime_SemacquireMutex 陷入休眠等待鎖的釋放;
  2. 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖芋浮,當(dāng)前方法會(huì)成功返回抱环;

當(dāng) Goroutine 想要釋放讀鎖時(shí),會(huì)調(diào)用如下所示的 sync.RWMutex.RUnlock 方法:

func (rw *RWMutex) RUnlock() {
   if race.Enabled {
      _ = rw.w.state
      race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      race.Disable()
   }
   if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      // Outlined slow-path to allow the fast-path to be inlined
      rw.rUnlockSlow(r)
   }
   if race.Enabled {
      race.Enable()
   }
}

該方法會(huì)先減少正在讀資源的 readerCount 整數(shù)纸巷,根據(jù) sync/atomic.AddInt32 的返回值不同會(huì)分別進(jìn)行處理:

  • 如果返回值大于等于零 — 讀鎖直接解鎖成功镇草;
  • 如果返回值小于零 — 有一個(gè)正在執(zhí)行的寫操作,在這時(shí)會(huì)調(diào)用sync.RWMutex.rUnlockSlow 方法瘤旨;
func (rw *RWMutex) rUnlockSlow(r int32) {
   if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      race.Enable()
      throw("sync: RUnlock of unlocked RWMutex")
   }
   // A writer is pending.
   if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      // The last reader unblocks the writer.
      runtime_Semrelease(&rw.writerSem, false, 1)
   }
}

sync.RWMutex.rUnlockSlow 會(huì)減少獲取鎖的寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem梯啤,該信號量被觸發(fā)時(shí),調(diào)度器就會(huì)喚醒嘗試獲取寫鎖的 Goroutine存哲。

雖然讀寫互斥鎖 sync.RWMutex 提供的功能比較復(fù)雜因宇,但是因?yàn)樗⒃?sync.Mutex 上七婴,所以實(shí)現(xiàn)會(huì)簡單很多。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:

  • 調(diào)用 sync.RWMutex.Lock 嘗試獲取寫鎖時(shí)察滑;
    • 每次 sync.RWMutex.RUnlock 都會(huì)將 readerCount 其減一打厘,當(dāng)它歸零時(shí)該 Goroutine 會(huì)獲得寫鎖;
    • 將 readerCount 減少 rwmutexMaxReaders 個(gè)數(shù)以阻塞后續(xù)的讀操作杭棵;
  • 調(diào)用 sync.RWMutex.Unlock 釋放寫鎖時(shí)婚惫,會(huì)先通知所有的讀操作,然后才會(huì)釋放持有的互斥鎖魂爪;

讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制先舷,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時(shí)提升性能。

WaitGroup

sync.WaitGroup 可以等待一組 Goroutine 的返回滓侍,一個(gè)比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個(gè) Goroutine 中并發(fā)執(zhí)行蒋川,加快程序處理的速度。


image.png

WaitGroup 等待多個(gè) Goroutine

結(jié)構(gòu)體

sync.WaitGroup 結(jié)構(gòu)體中只包含兩個(gè)成員變量:

type WaitGroup struct {
   noCopy noCopy
   state1 [3]uint32
}
  • noCopy — 保證 sync.WaitGroup 不會(huì)被開發(fā)者通過再賦值的方式拷貝撩笆;
  • state1 — 存儲(chǔ)著狀態(tài)和信號量捺球;

sync.noCopy 是一個(gè)特殊的私有結(jié)構(gòu)體,tools/go/analysis/passes/copylock 包中的分析器會(huì)在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy 或者實(shí)現(xiàn)了 Lock 和 Unlock 方法
sync.WaitGroup 結(jié)構(gòu)體中包含一個(gè)總共占用 12 字節(jié)的數(shù)組夕冲,這個(gè)數(shù)組會(huì)存儲(chǔ)當(dāng)前結(jié)構(gòu)體的狀態(tài)氮兵,在 64 位與 32 位的機(jī)器上表現(xiàn)也非常不同。


image.png

WaitGroup 在 64 位和 32 位機(jī)器的不同狀態(tài)

  • 64 位機(jī)器上本身就能保證 64 位對齊歹鱼,所以按照 64 位對齊來取數(shù)據(jù)泣栈,拿到 state1[0], state1[1] 本身就是64 位對齊的。但是 32 位機(jī)器上并不能保證 64 位對齊弥姻,因?yàn)?32 位機(jī)器是 4 字節(jié)對齊南片,如果也按照 64 位機(jī)器取 state[0],state[1] 就有可能會(huì)造成 atmoic 的使用錯(cuò)誤庭敦。
  • 32 位機(jī)器上空出第一個(gè) 32 位疼进,也就使后面 64 位天然滿足 64 位對齊,第一個(gè) 32 位放入 sema 剛好合適

WaitGroup.state1 其實(shí)代表三個(gè)字段:counter秧廉,waiter伞广,sema

  • counter :可以理解為一個(gè)計(jì)數(shù)器,計(jì)算經(jīng)過 wg.Add(N), wg.Done() 后的值定血。
  • waiter :當(dāng)前等待 WaitGroup 任務(wù)結(jié)束的等待者數(shù)量赔癌。其實(shí)就是調(diào)用 wg.Wait() 的次數(shù),所以通常這個(gè)值是 1 澜沟。
  • sema : 信號量灾票,用來喚醒 Wait() 函數(shù)。

sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出它的狀態(tài)和信號量茫虽。

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
   if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
      return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
   } else {
      return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
   }
}

接口

sync.WaitGroup 對外暴露了三個(gè)方法 — sync.WaitGroup.Add刊苍、sync.WaitGroup.Wait 和 sync.WaitGroup.Done.

func (wg *WaitGroup) Add(delta int) {
   statep, semap := wg.state()
   if race.Enabled {
      _ = *statep // trigger nil deref early
      if delta < 0 {
         race.ReleaseMerge(unsafe.Pointer(wg))
      }
      race.Disable()
      defer race.Enable()
   }

   //更新worker計(jì)數(shù)器
   state := atomic.AddUint64(statep, uint64(delta)<<32)

   //worker計(jì)數(shù)器:v 是 statep *uint64 的左32位
   //waiter計(jì)數(shù)器:w 是 statep *uint64 的右32位
   v := int32(state >> 32)
   w := uint32(state)

   if race.Enabled && delta > 0 && v == int32(delta) {
      race.Read(unsafe.Pointer(semap))
   }

   if v < 0 {
      panic("sync: negative WaitGroup counter")
   }

   if w != 0 && delta > 0 && v == int32(delta) {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }

  // worker 大于 0 或者 waiter 等于 0 說明還有Goroutine沒有執(zhí)行完既们,直接返回
   if v > 0 || w == 0 {
      return
   }
   
   if *statep != state {
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")
   }
   
   // 狀態(tài)設(shè)置為0 
   *statep = 0

   // 通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine(喚醒 Wait())
   for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
   }
}

sync.WaitGroup.Add 可以更新 sync.WaitGroup 中的計(jì)數(shù)器 counter。
雖然 sync.WaitGroup.Add 方法傳入的參數(shù)可以為負(fù)數(shù)正什,但是計(jì)數(shù)器只能是非負(fù)數(shù)啥纸,一旦出現(xiàn)負(fù)數(shù)就會(huì)發(fā)生程序崩潰。
當(dāng)調(diào)用計(jì)數(shù)器歸零婴氮,即所有任務(wù)都執(zhí)行完成時(shí)斯棒,才會(huì)通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的 Goroutine。

sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1

func (wg *WaitGroup) Done() {
   wg.Add(-1)
}

sync.WaitGroup 的另一個(gè)方法 sync.WaitGroup.Wait 會(huì)在計(jì)數(shù)器大于 0 并且不存在等待的 Goroutine 時(shí)主经,調(diào)用 runtime.sync_runtime_Semacquire 陷入睡眠荣暮。

func (wg *WaitGroup) Wait() {
   statep, semap := wg.state()

   if race.Enabled {
      _ = *statep // trigger nil deref early
      race.Disable()
   }

   // 循環(huán)判斷 是否滿足退出條件 
   for {

      //worker計(jì)數(shù)器:v 是 statep *uint64 的左32位
      //waiter計(jì)數(shù)器:w 是 statep *uint64 的右32位
      state := atomic.LoadUint64(statep)
      v := int32(state >> 32)
      w := uint32(state)

      // 當(dāng)計(jì)數(shù)器為0時(shí),表示所有Goroutine 都執(zhí)行完成罩驻,立即返回
      if v == 0 {
         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }
         return
      }

      // 更新waiter計(jì)數(shù)器  atomic.CompareAndSwapUint64 對uint64值執(zhí)行比較和交換操作
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         if race.Enabled && w == 0 {
            race.Write(unsafe.Pointer(semap))
         }
         // Goroutine 進(jìn)入睡眠狀態(tài)穗酥,等待信號量喚醒
         runtime_Semacquire(semap)

         if *statep != 0 {
            panic("sync: WaitGroup is reused before previous Wait has returned")
         }

         if race.Enabled {
            race.Enable()
            race.Acquire(unsafe.Pointer(wg))
         }

         return
      } 
      
   }
}

當(dāng) sync.WaitGroup 的計(jì)數(shù)器歸零時(shí),陷入睡眠狀態(tài)的 Goroutine 會(huì)被喚醒惠遏,上述方法也會(huì)立刻返回砾跃。

通過對 sync.WaitGroup 的分析和研究,我們能夠得出以下結(jié)論:

  • sync.WaitGroup 必須在 sync.WaitGroup.Wait 方法返回之后才能被重新使用节吮;
  • sync.WaitGroup.Done 只是對 sync.WaitGroup.Add 方法的簡單封裝抽高,我們可以向 sync.WaitGroup.Add 方法傳入任意負(fù)數(shù)(需要保證計(jì)數(shù)器非負(fù))快速將計(jì)數(shù)器歸零以喚醒等待的 Goroutine;
  • 可以同時(shí)有多個(gè) Goroutine 等待當(dāng)前 sync.WaitGroup 計(jì)數(shù)器的歸零透绩,這些 Goroutine 會(huì)被同時(shí)喚醒厨内;

once

Go 語言標(biāo)準(zhǔn)庫中 sync.Once 可以保證在 Go 程序運(yùn)行期間的某段代碼只會(huì)執(zhí)行一次

結(jié)構(gòu)體

每一個(gè) sync.Once 結(jié)構(gòu)體中都只包含一個(gè)用于標(biāo)識代碼塊是否執(zhí)行過的 done 以及一個(gè)互斥鎖 sync.Mutex:

type Once struct {
   done uint32
   m    Mutex
}

接口

sync.Once.Do 是 sync.Once 結(jié)構(gòu)體對外唯一暴露的方法,該方法會(huì)接收一個(gè)入?yún)榭盏暮瘮?shù):

  • 如果傳入的函數(shù)已經(jīng)執(zhí)行過渺贤,會(huì)直接返回;
  • 如果傳入的函數(shù)沒有執(zhí)行過请毛,會(huì)調(diào)用 sync.Once.doSlow 執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
   if atomic.LoadUint32(&o.done) == 0 {
      o.doSlow(f)
   }
}

func (o *Once) doSlow(f func()) {
   o.m.Lock()
   defer o.m.Unlock()
   if o.done == 0 {
      defer atomic.StoreUint32(&o.done, 1)
      f()
   }
}
  1. 為當(dāng)前 Goroutine 獲取互斥鎖志鞍;
  2. 執(zhí)行傳入的無入?yún)⒑瘮?shù);
  3. 運(yùn)行延遲函數(shù)調(diào)用方仿,將成員變量 done 更新成 1固棚;

sync.Once 會(huì)通過成員變量 done 確保函數(shù)不會(huì)執(zhí)行第二次。

作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once 結(jié)構(gòu)體仙蚜,它使用互斥鎖和 sync/atomic 包提供的方法實(shí)現(xiàn)了某個(gè)函數(shù)在程序運(yùn)行期間只能執(zhí)行一次的語義此洲。在使用該結(jié)構(gòu)體時(shí),我們也需要注意以下的問題:

  • sync.Once.Do方法中傳入的函數(shù)只會(huì)被執(zhí)行一次委粉,哪怕函數(shù)中發(fā)生了 panic呜师;
  • 兩次調(diào)用 sync.Once.Do 方法傳入不同的函數(shù)只會(huì)執(zhí)行第一次調(diào)傳入的函數(shù);

Cond

sync.Cond 用來協(xié)調(diào)想要訪問共享資源的 goroutine贾节。
sync.Cond 經(jīng)常用在多個(gè) goroutine 等待汁汗,一個(gè) goroutine 通知(事件發(fā)生)的場景衷畦。如果是一個(gè)通知,一個(gè)等待知牌,使用互斥鎖或 channel 就能搞定了祈争。
使用場景

  • 有一個(gè)協(xié)程在異步地接收數(shù)據(jù),剩下的多個(gè)協(xié)程必須等待這個(gè)協(xié)程接收完數(shù)據(jù)角寸,才能讀取到正確的數(shù)據(jù)菩混。在這種情況下,如果單純使用 chan 或互斥鎖扁藕,那么只能有一個(gè)協(xié)程可以等待沮峡,并讀取到數(shù)據(jù),沒辦法通知其他的協(xié)程也讀取數(shù)據(jù)纹磺。
  • 這個(gè)時(shí)候帖烘,就需要有個(gè)全局的變量來標(biāo)志第一個(gè)協(xié)程數(shù)據(jù)是否接受完畢,剩下的協(xié)程橄杨,反復(fù)檢查該變量的值秘症,直到滿足要求∈浇茫或者創(chuàng)建多個(gè) channel乡摹,每個(gè)協(xié)程阻塞在一個(gè) channel 上,由接收數(shù)據(jù)的協(xié)程在數(shù)據(jù)接收完畢后采转,逐個(gè)通知聪廉。總之故慈,需要額外的復(fù)雜度來完成這件事

結(jié)構(gòu)體

type Cond struct {
   noCopy noCopy
   L Locker
   notify  notifyList
   checker copyChecker
}
  • noCopy — 用于保證結(jié)構(gòu)體不會(huì)在編譯期間拷貝板熊;
  • copyChecker — 用于禁止運(yùn)行期間發(fā)生的拷貝;
  • L — 用于保護(hù)內(nèi)部的 notify 字段察绷,Locker 接口類型的變量干签;
  • notify — 一個(gè) Goroutine 的鏈表,它是實(shí)現(xiàn)同步機(jī)制的核心結(jié)構(gòu)拆撼;
type notifyList struct {
   wait   uint32
   notify uint32
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer
   tail   unsafe.Pointer
}

在 sync.notifyList 結(jié)構(gòu)體中容劳,head 和 tail 分別指向的鏈表的頭和尾,wait 和 notify 分別表示當(dāng)前正在等待的和已經(jīng)通知到的 Goroutine 的索引闸度。

type copyChecker uintptr

func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}
newCond
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

NewCond 創(chuàng)建 Cond 實(shí)例時(shí)竭贩,需要關(guān)聯(lián)一個(gè)鎖。

接口

wait

sync.Cond 對外暴露的 sync.Cond.Wait 方法會(huì)將當(dāng)前 Goroutine 陷入休眠狀態(tài)莺禁,它的執(zhí)行過程分成以下兩個(gè)步驟:

  • 調(diào)用 runtime.notifyListAdd 將等待計(jì)數(shù)器加一并解鎖留量;
  • 調(diào)用 runtime.notifyListWait 等待其他 Goroutine 的喚醒并加鎖:
func (c *Cond) Wait() {
   // 檢查c是否是被復(fù)制的,如果是就panic
   c.checker.check()
   // 將當(dāng)前goroutine加入等待隊(duì)列
   t := runtime_notifyListAdd(&c.notify)
   // 解鎖
   c.L.Unlock()
   // 等待隊(duì)列中的所有的goroutine執(zhí)行等待喚醒操作
   runtime_notifyListWait(&c.notify, t)
   // 鎖
   c.L.Lock()
}

調(diào)用 Wait 會(huì)自動(dòng)釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine肪获,因此當(dāng)前協(xié)程會(huì)阻塞在 Wait 方法調(diào)用的地方寝凌。
如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時(shí)孝赫,會(huì)重新給 c.L 加鎖较木,并且繼續(xù)執(zhí)行 Wait 后面的代碼。

runtime.notifyListWait 會(huì)獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:

func notifyListWait(l *notifyList, t uint32) {
        s := acquireSudog()
        s.g = getg()
        s.ticket = t
        if l.tail == nil {
                l.head = s
        } else {
                l.tail.next = s
        }
        l.tail = s
        goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
        releaseSudog(s)
}

除了將當(dāng)前 Goroutine 追加到鏈表的末端之外青柄,我們還會(huì)調(diào)用 runtime.goparkunlock 將當(dāng)前 Goroutine 陷入休眠伐债,該函數(shù)也是在 Go 語言切換 Goroutine 時(shí)經(jīng)常會(huì)使用的方法,它會(huì)直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒致开。


image.png

sync.Cond.Signal 和 sync.Cond.Broadcast 就是用來喚醒陷入休眠的 Goroutine 的方法峰锁,它們的實(shí)現(xiàn)有一些細(xì)微的差別:

  • sync.Cond.Signal 方法會(huì)喚醒隊(duì)列最前面的 Goroutine;
  • sync.Cond.Broadcast 方法會(huì)喚醒隊(duì)列中全部的 Goroutine双戳;
func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}

Signal 只喚醒任意 1 個(gè)等待條件變量 c 的 goroutine虹蒋,無需鎖保護(hù)。

func (c *Cond) Broadcast() {
   c.checker.check()
   runtime_notifyListNotifyAll(&c.notify)
}

Broadcast 喚醒所有等待條件變量 c 的 goroutine飒货,無需鎖保護(hù)魄衅。

runtime.notifyListNotifyOne 只會(huì)從 sync.notifyList 鏈表中找到滿足 sudog.ticket == l.notify 條件的 Goroutine 并通過 runtime.readyWithTime 喚醒:

func notifyListNotifyOne(l *notifyList) {
        t := l.notify
        atomic.Store(&l.notify, t+1)

        for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
                if s.ticket == t {
                        n := s.next
                        if p != nil {
                                p.next = n
                        } else {
                                l.head = n
                        }
                        if n == nil {
                                l.tail = p
                        }
                        s.next = nil
                        readyWithTime(s, 4)
                        return
                }
        }
}

runtime.notifyListNotifyAll 會(huì)依次通過 runtime.readyWithTime 喚醒鏈表中 Goroutine:

func notifyListNotifyAll(l *notifyList) {
        s := l.head
        l.head = nil
        l.tail = nil

        atomic.Store(&l.notify, atomic.Load(&l.wait))

        for s != nil {
                next := s.next
                s.next = nil
                readyWithTime(s, 4)
                s = next
        }
}

Goroutine 的喚醒順序也是按照加入隊(duì)列的先后順序,先加入的會(huì)先被喚醒塘辅,而后加入的可能 Goroutine 需要等待調(diào)度器的調(diào)度晃虫。
在一般情況下,我們都會(huì)先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件扣墩,當(dāng)滿足喚醒條件時(shí)哲银,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個(gè)或者全部的 Goroutine。
Cond使用例子

var status int64

func main() {
        c := sync.NewCond(&sync.Mutex{})
        for i := 0; i < 10; i++ {
                go listen(c)
        }
        time.Sleep(1 * time.Second)
        go broadcast(c)

        ch := make(chan os.Signal, 1)
        signal.Notify(ch, os.Interrupt)
        <-ch
}

func broadcast(c *sync.Cond) {
        c.L.Lock()
        atomic.StoreInt64(&status, 1)
        c.Broadcast()
        c.L.Unlock()
}

func listen(c *sync.Cond) {
        c.L.Lock()
        for atomic.LoadInt64(&status) != 1 {
                c.Wait()
        }
        fmt.Println("listen")
        c.L.Unlock()
}

$ go run main.go
listen
...
listen

sync.Cond 不是一個(gè)常用的同步機(jī)制呻惕,但是在條件長時(shí)間無法滿足時(shí)荆责,與使用 for {} 進(jìn)行忙碌等待相比,sync.Cond 能夠讓出處理器的使用權(quán)亚脆,提高 CPU 的利用率草巡。使用時(shí)我們也需要注意以下問題:

  • sync.Cond.Wait 在調(diào)用之前一定要使用獲取互斥鎖,否則會(huì)觸發(fā)程序崩潰型酥;
  • sync.Cond.Signal 喚醒的 Goroutine 都是隊(duì)列最前面、等待最久的 Goroutine查乒;
  • sync.Cond.Broadcast 會(huì)按照一定順序廣播通知等待的全部 Goroutine弥喉;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市玛迄,隨后出現(xiàn)的幾起案子由境,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件虏杰,死亡現(xiàn)場離奇詭異讥蟆,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)纺阔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進(jìn)店門瘸彤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人笛钝,你說我怎么就攤上這事质况。” “怎么了玻靡?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵结榄,是天一觀的道長。 經(jīng)常有香客問我囤捻,道長臼朗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任蝎土,我火速辦了婚禮视哑,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘瘟则。我一直安慰自己黎炉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布醋拧。 她就那樣靜靜地躺著慷嗜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪丹壕。 梳的紋絲不亂的頭發(fā)上庆械,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機(jī)與錄音菌赖,去河邊找鬼缭乘。 笑死,一個(gè)胖子當(dāng)著我的面吹牛琉用,可吹牛的內(nèi)容都是我干的堕绩。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼邑时,長吁一口氣:“原來是場噩夢啊……” “哼奴紧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起晶丘,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤黍氮,失蹤者是張志新(化名)和其女友劉穎唐含,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體沫浆,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡捷枯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了专执。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,605評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡他炊,死狀恐怖争剿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情痊末,我是刑警寧澤蚕苇,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站凿叠,受9級特大地震影響涩笤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盒件,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一蹬碧、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧炒刁,春花似錦恩沽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至城瞎,卻和暖如春渤闷,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脖镀。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工飒箭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蜒灰。 一個(gè)月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓弦蹂,卻偏偏與公主長得像,于是被迫代替她去往敵國和親强窖。 傳聞我的和親對象是個(gè)殘疾皇子凸椿,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容