WaitGroup
WaitGroup is mainly used to wait for a group of goroutines to finish. Its basic usage is well known, so it is not covered here.
Structure
// The WaitGroup struct
type WaitGroup struct {
	// noCopy: as the name says, a WaitGroup must not be copied.
	// The conventional way to forbid copying in Go is to declare a field of type noCopy
	// in the target struct; go vet (its copylocks check) can then detect copies.
	// See https://github.com/golang/go/issues/8005#issuecomment-190753527
	noCopy noCopy
	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	// An array of three uint32s holding the goroutine counter (driven by Add and Done),
	// the waiter count (driven by Wait) and the semaphore; the semaphore is used to
	// wake the goroutines sleeping in Wait.
	// Which uint32 of state1 holds which of these three values is only known at run time.
	// The reason is that Add uses the atomic operation atomic.AddUint64, which requires
	// 8-byte alignment; on memory alignment, see https://gfw.go101.org/article/memory-layout.html
	// On a 32-bit machine the default alignment is 4, so the address of state1 may or may
	// not be a multiple of 8. When it is not, state1[0] holds the semaphore, and state1[1]
	// and state1[2] together hold the 64-bit counter/waiter state; the address of state1[1]
	// is then guaranteed to be a multiple of 8.
	// The benefit: on both 32-bit and 64-bit machines state1 always takes exactly 12 bytes,
	// and no memory is wasted on alignment padding.
	state1 [3]uint32
}
// state returns pointers to the state and sema fields stored within wg.state1.
// Locates the goroutine counter, the waiter count and the semaphore at run time;
// below they are called c, w and p.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	// If the address of state1 is a multiple of 8
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// state1[0] and state1[1] hold c and w, and state1[2] is p.
		// c and w are returned together as a single uint64: c in the high 32 bits, w in the low 32 bits.
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// Otherwise state1[1] and state1[2] hold c and w, and state1[0] is p.
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}
Add and Done
// Adds delta to the goroutine counter; note that delta may be positive or negative.
// A negative delta corresponds to Done.
func (wg *WaitGroup) Add(delta int) {
	// Get the pointer to the c/w state and the pointer to the semaphore p
	statep, semap := wg.state()
	// Race detection
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// Recall from above that c lives in the high 32 bits, so delta is shifted LEFT by 32 bits before being added.
	// This is an atomic operation, so it is safe under concurrency.
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	// The high 32 bits are c
	v := int32(state >> 32)
	// The low 32 bits are w
	w := uint32(state)
	// Race detection
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	// If the counter went negative after the addition, something is definitely wrong: panic immediately.
	// So Done must be called no more times than the total number of goroutines added via Add.
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// Reaching here means v >= 0.
	// A WaitGroup can be reused, but only once all previous waiters have been released (w back to 0).
	// This check catches an Add that raises the counter from 0 while waiters are still registered,
	// i.e. Add running concurrently with Wait.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// If the counter is still > 0 after the addition, some goroutines have not called Done yet,
	// or this call merely added goroutines; in that case just return.
	// Otherwise v <= 0, which together with v >= 0 above means v == 0; if w == 0 as well,
	// nobody is waiting, so we can also return.
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	// One last sanity check: if a concurrent call to Add, Done or Wait has changed
	// the uint32s in state1 that statep points to, panic immediately.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	// At this point v == 0 and w != 0.
	// The counter is zero, so every goroutine has called Done.
	// Now every goroutine sleeping in Wait must be woken, and the waiter count w says how many there are.
	*statep = 0
	for ; w != 0; w-- {
		// Release the semaphore once, waking one waiter blocked in runtime_Semacquire
		runtime_Semrelease(semap, false, 0)
	}
}
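The shift-and-add on the packed word is easy to get wrong, so here is the same arithmetic without atomics. `pack`, `addDelta` and `unpack` are hypothetical helpers written for this illustration. Two points show up: a negative `delta` converts to a huge `uint64` (two's complement), yet the wrapping addition still yields `counter + delta`; and the low 32 bits (the waiter count) are untouched because `delta<<32` has zero low bits.

```go
package main

import "fmt"

// pack builds a state word the way WaitGroup stores it:
// counter in the high 32 bits, waiter count in the low 32 bits.
func pack(counter int32, waiters uint32) uint64 {
	return uint64(uint32(counter))<<32 | uint64(waiters)
}

// addDelta applies Add's arithmetic: delta is shifted into the high half.
// For negative delta, uint64(delta) wraps around, and the modular
// addition still produces counter+delta in the high half.
func addDelta(state uint64, delta int) uint64 {
	return state + uint64(delta)<<32
}

// unpack splits the word back into (v, w) exactly as Add does.
func unpack(state uint64) (v int32, w uint32) {
	return int32(state >> 32), uint32(state)
}

func main() {
	s := pack(3, 2)     // 3 outstanding goroutines, 2 waiters
	s = addDelta(s, -1) // one Done
	fmt.Println(unpack(s))
	// Done with nothing outstanding: v becomes -1, which Add turns into a panic.
	fmt.Println(unpack(addDelta(pack(0, 0), -1)))
}
```

This prints `2 2` and then `-1 0`.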
// Done decrements the WaitGroup counter by one.
// Calls Add to decrement the goroutine counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
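Because Done is just `Add(-1)`, one Done too many drives the counter to -1 and trips the `v < 0` check in Add. The sketch below uses the real `sync.WaitGroup` and a hypothetical helper `extraDone` that recovers the panic. The exact panic value may differ between Go versions, so the example only checks that a panic happened.

```go
package main

import (
	"fmt"
	"sync"
)

// extraDone calls Done once more than Add and returns the recovered
// panic value (nil if, unexpectedly, nothing panicked).
func extraDone() (recovered interface{}) {
	defer func() { recovered = recover() }()
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // one Done too many: the counter would become -1
	return nil
}

func main() {
	fmt.Println(extraDone())
}
```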
Wait
// Wait blocks until the WaitGroup counter is zero.
// Every call to Wait that actually has to block increments the waiter count by 1.
func (wg *WaitGroup) Wait() {
	// Same as in Add
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		// Note that this is an atomic load: a 32-bit machine reads four bytes at a time,
		// so fetching a uint64 takes two reads, and the atomic load guarantees the value
		// cannot change between them.
		state := atomic.LoadUint64(statep)
		// Extract the goroutine counter and the waiter count
		v := int32(state >> 32)
		w := uint32(state)
		// If the counter is 0 (no goroutines outstanding), Wait has nothing to do and returns immediately
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		// Another atomic operation: compare, then add 1.
		// If Wait is called concurrently, this CAS may fail; the loop then reloads
		// the state and retries, so no waiter is lost.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this is as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
			// This function pairs with runtime_Semrelease.
			// It wakes up once *semap > 0 and decrements *semap by 1; the two steps happen atomically.
			runtime_Semacquire(semap)
			// From Add we know that *statep is reset to 0 before any waiter is woken.
			// A further sanity check: if *statep is not 0, the WaitGroup was reused
			// (goroutines were added) before all previous Waits had returned.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			// Once woken there is nothing left to do: Wait unblocks and the calling goroutine continues.
			return
		}
	}
}
Summary
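WaitGroup cleverly chooses the layout of state1 at run time to satisfy the alignment rules of different hardware architectures, which saves memory. When we design our own structs and memory footprint matters, we should likewise arrange fields to suit alignment and keep the size minimal. Also, do not misuse the Add and Wait methods, especially in concurrent code; just stick to the usual pattern.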
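For reference, the usual pattern looks roughly like this (`sumSquares` is a hypothetical example function): call Add before starting each goroutine, never inside it; call Done exactly once per Add, typically via defer; and call Wait after all the Adds. If a helper function needs the WaitGroup, pass a `*sync.WaitGroup`, since copying one is exactly what noCopy and go vet guard against.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares squares each input in its own goroutine and waits for all of them.
func sumSquares(nums []int) int {
	var wg sync.WaitGroup
	results := make([]int, len(nums)) // one slot per goroutine, so no lock is needed
	for i, n := range nums {
		wg.Add(1) // Add before starting the goroutine, never inside it
		go func(i, n int) {
			defer wg.Done() // exactly one Done per Add, even on early return
			results[i] = n * n
		}(i, n)
	}
	wg.Wait() // wait once, after all Adds
	total := 0
	for _, r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```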