問題
當main goroutine
為了等待work goroutine
都運行完畢唾戚,不得不在程序末尾使用time.Sleep()
來休眠一段時間掸犬,等待work goroutine
充分運行站粟。
$ vim ./test/goroutine_test.go
package test
import (
"fmt"
"testing"
"time"
)
func TestGoRoutine(t *testing.T) {
for i := 0; i < 10; i++ {
go fmt.Println(i)
}
time.Sleep(time.Second)
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
3
1
2
4
5
6
7
8
0
--- PASS: TestGoRoutine (1.00s)
PASS
ok command-line-arguments 1.291s
但對于實際應用中微王,休眠1秒是完全不夠的顽馋,同時大部分時間都無法預知for
循環(huán)內(nèi)代碼運行時間的長短皱碘,此時就不能使用time.Sleep()
來完成等待操作。
可以使用管道來完成上述操作
func TestGoRoutine(t *testing.T) {
count := 10
ch := make(chan bool, count)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
ch <- true
}(i)
}
for i := 0; i < count; i++ {
<-ch
}
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN TestGoRoutine
9
0
5
6
7
8
2
1
4
3
--- PASS: TestGoRoutine (0.00s)
PASS
ok command-line-arguments 0.304s
使用管道可以達到目的均唉,但有些大材小用是晨,因為管道被設計出來不僅僅只是在這里做簡單的同步處理的,因此這里使用管道實際上是不合適的舔箭。假如有上萬罩缴、上十萬蚊逢、上百萬的循環(huán),也要申請同樣數(shù)量大小的管道箫章,對內(nèi)存會是一個不小的開銷烙荷。
對于這種情況,Golang中有一種工具sync.WaitGroup
能更加方便地幫助達到目的檬寂。
sync.WaitGroup
Golang中除了使用Channel通道和Mutex互斥鎖實現(xiàn)兩個并發(fā)程序之間的同步外终抽,還可以通過WaitGroup等待組實現(xiàn)多個任務的同步,WaitGroup可以保證在并發(fā)環(huán)境中完成指定數(shù)量的任務桶至。
-
WaitGroup
在Golang中用于goroutine
同步昼伴,解決同步阻塞等外的問題。
通俗來講goroutine
分為兩類角色镣屹,一種gorouine
作為一個worker
小弟圃郊,老老實實的干活。另一種goroutine
作為master
管理者來監(jiān)督小弟干活女蜈,當然master
自身也是一個worker
描沟。
當有很多worker
干活時,master
沒事干歇著鞭光,但同時master
又希望得到一個通知,了解所有worker
們什么時候干完泞遗。
從程序開發(fā)角度來看惰许,就是維護一個worker
總數(shù)和一個channel
,每個worker
干完就向channel
發(fā)送一個空message
史辙。master
阻塞在channel
的監(jiān)聽上汹买,來一個message
就說明有一個worker
干完活了,記錄下有多少message
聊倔,message
和worker
總數(shù)一致則說明全干完活晦毙。master
就可以關閉channel
,驗收worker
的工作成果耙蔑。
-
WaitGroup
是指等待(Wait)一系列執(zhí)行(Group)完成后才會繼續(xù)向下執(zhí)行 -
WaitGroup
能一直等到所有的work goroutine
執(zhí)行完畢见妒,同時阻塞main goroutine
的執(zhí)行,直到所有的goroutine
執(zhí)行完成甸陌。 -
WaitGroup
類似發(fā)布訂閱须揣,只不過訂閱者接收到的不是消息,而是一種事件信號钱豁。
計數(shù)器
WaitGroup
內(nèi)部擁有一個計數(shù)器耻卡,最初從0開始。
type WaitGroup struct{
noCopy noCopy
state1 [3]byte
}
- Counter:Worker計數(shù)器
master gortouine
調(diào)用WaitGroup.Add(delta int)
時會增加delta
牲尺,調(diào)用WaitGroup.Done()
時會減少1卵酪。 - Waiter:Waiter計數(shù)器
調(diào)用WaitGroup.Wait()
時Waiter
計數(shù)器加1,worker goroutine
計數(shù)器降低到0時,會重置Waiter
計數(shù)器溃卡。 - Sema:信號量
用于阻塞master goroutine
溢豆,調(diào)用WaitGroup.Wait()
時會通過runtime_Semacquire
獲取信號量。降低Waiter
計數(shù)器時塑煎,通過runtime_Semrelease
釋放信號量沫换。
方法
WaitGroup
擁有三個方法分別是Add()
、Done()
最铁、Wait()
用來控制計數(shù)器的數(shù)量
-
Add()
將計數(shù)器設置為n
讯赏,用于增加或減少worker goroutine
的數(shù)量。
func (wg *WaitGroup) Add(delta int)
-
Done()
每次會將計數(shù)器減少1
func (wg *WaitGroup) Done()
WaitGroup.Done()
和WaitGroup.Add(-1)
完全等價
-
Wait()
會阻塞代碼的運行冷尉,直到計數(shù)器的值減少為0漱挎。
func (wg *WaitGroup) Wait()
使用方法
-
master goroutine
通過調(diào)用WaitGroup.Add(delta int)
來設置worker goroutine
的個數(shù),然后創(chuàng)建work goroutine
雀哨。 -
worker goroutine
執(zhí)行結束后需調(diào)用WaitGroup.Done()
-
master goroutine
調(diào)用WaitGroup.Wait()
且被block
阻塞磕谅,直到所有的worker goroutine
全部執(zhí)行結束后返回。
例如:
$ vim ./test/sync_test.go
package test
import (
"fmt"
"sync"
"testing"
)
func TestWaitGroup(t *testing.T) {
count := 10
//添加goroutine數(shù)量
wg := sync.WaitGroup{}
wg.Add(count)
//循環(huán)模擬并發(fā)
for i := 0; i < count; i++ {
go func(i int) {
fmt.Println(i)
wg.Done() //設置gorooutine為-1
}(i)
}
//執(zhí)行main goroutine阻塞雾棺,直到所有WaitGroup數(shù)量為0膊夹。
wg.Wait()
}
$ go test -v -run TestWaitGroup sync_test.go
=== RUN TestWaitGroup
9
4
5
6
7
8
2
3
1
0
--- PASS: TestWaitGroup (0.00s)
PASS
ok command-line-arguments 0.294s
注意
- WaitGroup對象不是一個引用類型,函數(shù)傳值時需使用地址(地址傳值)捌浩。
- WaitGroup的計數(shù)器不能為負數(shù)放刨,不能使用
Add()
給WaitGroup對象設置一個負值。
應用
需要一個用戶的畫像服務尸饺,當一個請求到來時需要
- 從請求中解析出用戶ID和用戶畫像維度參數(shù)
- 根據(jù)用戶ID從五個服務比如數(shù)據(jù)庫进统、存儲、RPC等拉取不同維度的數(shù)據(jù)
- 將讀取到的數(shù)據(jù)進行整合返回給調(diào)用方
假如每個服務的響應時間是20ms到50ms浪听,如果順序調(diào)用服務讀取數(shù)據(jù)不考慮數(shù)據(jù)整合消耗的時間螟碎,服務端整體的響應時間將會在100ms到250ms。先不說業(yè)務能不能接受迹栓,響應時間顯然存在很大的優(yōu)化空間掉分。最直接的優(yōu)化方向是取數(shù)邏輯總時間應該是單個服務最大消耗時間。
func TestTask(t *testing.T) {
var wg sync.WaitGroup
for _,task := range tasks{
task := task
wg.Add(1)
go func(){
defer wg.Done()
task()
}()
}
wg.Wait()
}
使用注意
-
WaitGroup.Done()
必須在所有WaitGroup.Add()
之后執(zhí)行克伊,要保證兩個函數(shù)都在master goroutine
中調(diào)用叉抡。 -
WaitGroup.Done()
在worker goroutine
中調(diào)用,尤其要保證調(diào)用一次答毫,不能因為panic
或任何原因?qū)е聸]有執(zhí)行褥民,因此建議使用defer WaitGroup.Done()
。 -
WaitGroup.Done()
和WaitGroup.Wait()
在時序上沒有先后順序
task := task
由于Golang對切片遍歷時runtime
會將tasks[i]
拷貝到task
的內(nèi)存地址中洗搂,下標i
會變化消返,而task
的內(nèi)存地址是不會改變的载弄。如果不做此次賦值操作,所有的goroutine
可能讀取到的都是最后一個task
撵颊。
例如:
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc000006040 0xc00003c500
task2 0xc000006040 0xc00003c508
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.296s
執(zhí)行結果說明
- 遍歷時數(shù)據(jù)的內(nèi)存地址不變
unsafe.Pointer(&task)
- 遍歷時通過下標獲取數(shù)據(jù)時內(nèi)存地址不同
unsafe.Pointer(&tasks[index])
func TestTask(t *testing.T) {
tasks := []func(){
func() { fmt.Printf("task1 ") },
func() { fmt.Printf("task2 ") },
}
for index, task := range tasks {
task := task
task()
fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
}
}
$ go test -v -run TestTask sync_test.go
=== RUN TestTask
task1 0xc0000c0030 0xc0000884f0
task2 0xc0000c0038 0xc0000884f8
--- PASS: TestTask (0.00s)
PASS
ok command-line-arguments 0.320s
執(zhí)行結果說明
- 遍歷內(nèi)部創(chuàng)建的局部變量宇攻,即使名稱相同,內(nèi)存地址也不會復用倡勇。
- 遍歷時數(shù)據(jù)的內(nèi)存地址不同
unsafe.Pointer(&task)
- 遍歷時通過下標獲取數(shù)據(jù)時內(nèi)存地址不同
unsafe.Pointer(&tasks[index])