整體來說比較正常司恳,就是保存三個(gè)狀態(tài)
- 計(jì)數(shù)值 // add到了多少
- waiter數(shù)量 // 有多少調(diào)用了Wait方法
- seme // 用來阻塞的信號(hào)量,掛載等待的協(xié)程
只是為了節(jié)省內(nèi)存徽职,這三個(gè)字段合成了一個(gè)字段,三個(gè)uint32切片共96位剧蹂。針對(duì)32位和64位系統(tǒng)也有不同實(shí)現(xiàn)爆安。
- Add 方法主要操作的是 state 的計(jì)數(shù)部分氢烘。計(jì)數(shù)值增加一個(gè) delta 值怀偷,內(nèi)部通過原子操作把這個(gè)值加到計(jì)數(shù)值上。需要注意的是播玖,這個(gè) delta 也可以是個(gè)負(fù)數(shù)椎工,相當(dāng)于為計(jì)數(shù)值減去一個(gè)值,Done 方法內(nèi)部其實(shí)就是通過Add(-1) 實(shí)現(xiàn)的黎棠。
- Wait 方法的實(shí)現(xiàn)邏輯是:不斷檢查 state 的值。如果其中的計(jì)數(shù)值變?yōu)榱?0镰绎,那么說明所有的任務(wù)已完成脓斩,調(diào)用者不必再等待,直接返回畴栖。如果計(jì)數(shù)值大于 0随静,說明此時(shí)還有任務(wù)沒完成,那么調(diào)用者就變成了等待者吗讶,需要加入 waiter 隊(duì)列燎猛,并且阻塞住自己。
Add的代碼
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Wait的代碼
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
需要注意的就是所有的Add要在Wait前調(diào)用 (在源碼中也有狀態(tài)的檢查)照皆。比如Add的調(diào)用要在開啟新協(xié)程前完成