Go sync.WaitGroup

問題

main goroutine為了等待work goroutine都運行完畢唾戚,不得不在程序末尾使用time.Sleep()來休眠一段時間掸犬,等待work goroutine充分運行站粟。

$ vim ./test/goroutine_test.go
package test

import (
    "fmt"
    "testing"
    "time"
)

func TestGoRoutine(t *testing.T) {
    for i := 0; i < 10; i++ {
        go fmt.Println(i)
    }
    time.Sleep(time.Second)
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN   TestGoRoutine
9
3
1
2
4
5
6
7
8
0
--- PASS: TestGoRoutine (1.00s)
PASS
ok      command-line-arguments  1.291s

但對于實際應用中微王,休眠1秒是完全不夠的顽馋,同時大部分時間都無法預知for循環(huán)內(nèi)代碼運行時間的長短皱碘,此時就不能使用time.Sleep()來完成等待操作。

可以使用管道來完成上述操作

func TestGoRoutine(t *testing.T) {
    count := 10
    ch := make(chan bool, count)
    for i := 0; i < count; i++ {
        go func(i int) {
            fmt.Println(i)
            ch <- true
        }(i)
    }
    for i := 0; i < count; i++ {
        <-ch
    }
}
$ go test -v -run TestGoRoutine goroutine_test.go
=== RUN   TestGoRoutine
9
0
5
6
7
8
2
1
4
3
--- PASS: TestGoRoutine (0.00s)
PASS
ok      command-line-arguments  0.304s

使用管道可以達到目的均唉,但有些大材小用是晨,因為管道被設計出來不僅僅只是在這里做簡單的同步處理的,因此這里使用管道實際上是不合適的舔箭。假如有上萬罩缴、上十萬蚊逢、上百萬的循環(huán),也要申請同樣數(shù)量大小的管道箫章,對內(nèi)存會是一個不小的開銷烙荷。

對于這種情況,Golang中有一種工具sync.WaitGroup能更加方便地幫助達到目的檬寂。

sync.WaitGroup

Golang中除了使用Channel通道和Mutex互斥鎖實現(xiàn)兩個并發(fā)程序之間的同步外终抽,還可以通過WaitGroup等待組實現(xiàn)多個任務的同步,WaitGroup可以保證在并發(fā)環(huán)境中完成指定數(shù)量的任務桶至。

  • WaitGroup在Golang中用于goroutine同步昼伴,解決同步阻塞等外的問題。

通俗來講goroutine分為兩類角色镣屹,一種gorouine作為一個worker小弟圃郊,老老實實的干活。另一種goroutine作為master管理者來監(jiān)督小弟干活女蜈,當然master自身也是一個worker描沟。

當有很多worker干活時,master沒事干歇著鞭光,但同時master又希望得到一個通知,了解所有worker們什么時候干完泞遗。

從程序開發(fā)角度來看惰许,就是維護一個worker總數(shù)和一個channel,每個worker干完就向channel發(fā)送一個空message史辙。master阻塞在channel的監(jiān)聽上汹买,來一個message就說明有一個worker干完活了,記錄下有多少message聊倔,messageworker總數(shù)一致則說明全干完活晦毙。master就可以關閉channel,驗收worker的工作成果耙蔑。

  • WaitGroup是指等待(Wait)一系列執(zhí)行(Group)完成后才會繼續(xù)向下執(zhí)行
  • WaitGroup能一直等到所有的work goroutine執(zhí)行完畢见妒,同時阻塞main goroutine的執(zhí)行,直到所有的goroutine執(zhí)行完成甸陌。
  • WaitGroup類似發(fā)布訂閱须揣,只不過訂閱者接收到的不是消息,而是一種事件信號钱豁。

計數(shù)器

WaitGroup內(nèi)部擁有一個計數(shù)器耻卡,最初從0開始。

type WaitGroup struct{
  noCopy noCopy
  state1 [3]byte
}
WaitGroup
  • Counter:Worker計數(shù)器
    master gortouine調(diào)用WaitGroup.Add(delta int)時會增加delta牲尺,調(diào)用WaitGroup.Done()時會減少1卵酪。
  • Waiter:Waiter計數(shù)器
    調(diào)用WaitGroup.Wait()Waiter計數(shù)器加1,worker goroutine計數(shù)器降低到0時,會重置Waiter計數(shù)器溃卡。
  • Sema:信號量
    用于阻塞master goroutine溢豆,調(diào)用WaitGroup.Wait()時會通過runtime_Semacquire獲取信號量。降低Waiter計數(shù)器時塑煎,通過runtime_Semrelease釋放信號量沫换。

方法

WaitGroup擁有三個方法分別是Add()Done()最铁、Wait()用來控制計數(shù)器的數(shù)量

Wai't'Group
  • Add()將計數(shù)器設置為n讯赏,用于增加或減少worker goroutine的數(shù)量。
func (wg *WaitGroup) Add(delta int)
  • Done()每次會將計數(shù)器減少1
func (wg *WaitGroup) Done()

WaitGroup.Done()WaitGroup.Add(-1)完全等價

  • Wait()會阻塞代碼的運行冷尉,直到計數(shù)器的值減少為0漱挎。
func (wg *WaitGroup) Wait()

使用方法

  1. master goroutine通過調(diào)用WaitGroup.Add(delta int)來設置worker goroutine的個數(shù),然后創(chuàng)建work goroutine雀哨。
  2. worker goroutine執(zhí)行結束后需調(diào)用WaitGroup.Done()
  3. master goroutine調(diào)用WaitGroup.Wait()且被block阻塞磕谅,直到所有的worker goroutine全部執(zhí)行結束后返回。

例如:

$ vim ./test/sync_test.go
package test

import (
    "fmt"
    "sync"
    "testing"
)

func TestWaitGroup(t *testing.T) {
    count := 10
    //添加goroutine數(shù)量
    wg := sync.WaitGroup{}
    wg.Add(count)
    //循環(huán)模擬并發(fā)
    for i := 0; i < count; i++ {
        go func(i int) {
            fmt.Println(i)
            wg.Done() //設置gorooutine為-1
        }(i)
    }
    //執(zhí)行main goroutine阻塞雾棺,直到所有WaitGroup數(shù)量為0膊夹。
    wg.Wait()
}
$ go test -v -run TestWaitGroup sync_test.go
=== RUN   TestWaitGroup
9
4
5
6
7
8
2
3
1
0
--- PASS: TestWaitGroup (0.00s)
PASS
ok      command-line-arguments  0.294s

注意

  • WaitGroup對象不是一個引用類型,函數(shù)傳值時需使用地址(地址傳值)捌浩。
  • WaitGroup的計數(shù)器不能為負數(shù)放刨,不能使用Add()給WaitGroup對象設置一個負值。

應用

需要一個用戶的畫像服務尸饺,當一個請求到來時需要

  • 從請求中解析出用戶ID和用戶畫像維度參數(shù)
  • 根據(jù)用戶ID從五個服務比如數(shù)據(jù)庫进统、存儲、RPC等拉取不同維度的數(shù)據(jù)
  • 將讀取到的數(shù)據(jù)進行整合返回給調(diào)用方

假如每個服務的響應時間是20ms到50ms浪听,如果順序調(diào)用服務讀取數(shù)據(jù)不考慮數(shù)據(jù)整合消耗的時間螟碎,服務端整體的響應時間將會在100ms到250ms。先不說業(yè)務能不能接受迹栓,響應時間顯然存在很大的優(yōu)化空間掉分。最直接的優(yōu)化方向是取數(shù)邏輯總時間應該是單個服務最大消耗時間。

func TestTask(t *testing.T) {
    var wg sync.WaitGroup

    for _,task := range tasks{
        task := task
        wg.Add(1)

        go func(){
            defer wg.Done()
            task()
        }()
    }

    wg.Wait()
}

使用注意

  • WaitGroup.Done()必須在所有WaitGroup.Add()之后執(zhí)行克伊,要保證兩個函數(shù)都在master goroutine中調(diào)用叉抡。
  • WaitGroup.Done()worker goroutine中調(diào)用,尤其要保證調(diào)用一次答毫,不能因為panic或任何原因?qū)е聸]有執(zhí)行褥民,因此建議使用defer WaitGroup.Done()
  • WaitGroup.Done()WaitGroup.Wait()在時序上沒有先后順序
task := task

由于Golang對切片遍歷時runtime會將tasks[i]拷貝到task的內(nèi)存地址中洗搂,下標i會變化消返,而task的內(nèi)存地址是不會改變的载弄。如果不做此次賦值操作,所有的goroutine可能讀取到的都是最后一個task撵颊。

例如:

func TestTask(t *testing.T) {
    tasks := []func(){
        func() { fmt.Printf("task1 ") },
        func() { fmt.Printf("task2 ") },
    }

    for index, task := range tasks {
        task()
        fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
    }
}
$ go test -v -run TestTask sync_test.go
=== RUN   TestTask
task1 0xc000006040 0xc00003c500
task2 0xc000006040 0xc00003c508
--- PASS: TestTask (0.00s)
PASS
ok      command-line-arguments  0.296s

執(zhí)行結果說明

  • 遍歷時數(shù)據(jù)的內(nèi)存地址不變unsafe.Pointer(&task)
  • 遍歷時通過下標獲取數(shù)據(jù)時內(nèi)存地址不同unsafe.Pointer(&tasks[index])
func TestTask(t *testing.T) {
    tasks := []func(){
        func() { fmt.Printf("task1 ") },
        func() { fmt.Printf("task2 ") },
    }

    for index, task := range tasks {
        task := task
        task()
        fmt.Printf("%v %v\n", unsafe.Pointer(&task), unsafe.Pointer(&tasks[index]))
    }
}
$ go test -v -run TestTask sync_test.go
=== RUN   TestTask
task1 0xc0000c0030 0xc0000884f0
task2 0xc0000c0038 0xc0000884f8
--- PASS: TestTask (0.00s)
PASS
ok      command-line-arguments  0.320s

執(zhí)行結果說明

  • 遍歷內(nèi)部創(chuàng)建的局部變量宇攻,即使名稱相同,內(nèi)存地址也不會復用倡勇。
  • 遍歷時數(shù)據(jù)的內(nèi)存地址不同unsafe.Pointer(&task)
  • 遍歷時通過下標獲取數(shù)據(jù)時內(nèi)存地址不同unsafe.Pointer(&tasks[index])
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末逞刷,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子妻熊,更是在濱河造成了極大的恐慌夸浅,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,029評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扔役,死亡現(xiàn)場離奇詭異帆喇,居然都是意外死亡,警方通過查閱死者的電腦和手機亿胸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,395評論 3 385
  • 文/潘曉璐 我一進店門坯钦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人侈玄,你說我怎么就攤上這事婉刀。” “怎么了序仙?”我有些...
    開封第一講書人閱讀 157,570評論 0 348
  • 文/不壞的土叔 我叫張陵突颊,是天一觀的道長。 經(jīng)常有香客問我诱桂,道長,這世上最難降的妖魔是什么呈昔? 我笑而不...
    開封第一講書人閱讀 56,535評論 1 284
  • 正文 為了忘掉前任挥等,我火速辦了婚禮,結果婚禮上堤尾,老公的妹妹穿的比我還像新娘肝劲。我一直安慰自己,他們只是感情好郭宝,可當我...
    茶點故事閱讀 65,650評論 6 386
  • 文/花漫 我一把揭開白布辞槐。 她就那樣靜靜地躺著,像睡著了一般粘室。 火紅的嫁衣襯著肌膚如雪榄檬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,850評論 1 290
  • 那天衔统,我揣著相機與錄音鹿榜,去河邊找鬼海雪。 笑死,一個胖子當著我的面吹牛舱殿,可吹牛的內(nèi)容都是我干的奥裸。 我是一名探鬼主播,決...
    沈念sama閱讀 39,006評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼沪袭,長吁一口氣:“原來是場噩夢啊……” “哼湾宙!你這毒婦竟也來了?” 一聲冷哼從身側響起冈绊,我...
    開封第一講書人閱讀 37,747評論 0 268
  • 序言:老撾萬榮一對情侶失蹤侠鳄,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后焚碌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體畦攘,經(jīng)...
    沈念sama閱讀 44,207評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,536評論 2 327
  • 正文 我和宋清朗相戀三年十电,在試婚紗的時候發(fā)現(xiàn)自己被綠了知押。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,683評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡鹃骂,死狀恐怖台盯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情畏线,我是刑警寧澤静盅,帶...
    沈念sama閱讀 34,342評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站寝殴,受9級特大地震影響蒿叠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蚣常,卻給世界環(huán)境...
    茶點故事閱讀 39,964評論 3 315
  • 文/蒙蒙 一市咽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抵蚊,春花似錦施绎、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,772評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冈闭,卻和暖如春俱尼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背萎攒。 一陣腳步聲響...
    開封第一講書人閱讀 32,004評論 1 266
  • 我被黑心中介騙來泰國打工号显, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留臭猜,地道東北人。 一個月前我還...
    沈念sama閱讀 46,401評論 2 360
  • 正文 我出身青樓押蚤,卻偏偏與公主長得像蔑歌,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子揽碘,可洞房花燭夜當晚...
    茶點故事閱讀 43,566評論 2 349

推薦閱讀更多精彩內(nèi)容