[翻譯]GO并發(fā)模型一:Pipeline和Cancellation

image.png

簡書不維護(hù)了肴捉,歡迎關(guān)注我的知乎:波羅學(xué)的個(gè)人主頁

這篇文章較長將會(huì)分兩篇來翻譯。原文地址:https://blog.golang.org/pipelines

介紹

Go并發(fā)模型使構(gòu)建能有效利用IO和多核CPU的實(shí)時(shí)流式數(shù)據(jù)的pipeline非常方便办成。這篇文章將對(duì)此進(jìn)行介紹,同時(shí)會(huì)著重強(qiáng)調(diào)一些在實(shí)踐中的易犯錯(cuò)誤以及對(duì)應(yīng)的解決方法。

什么是Pipeline

在GO中,pipeline無明確定義壤躲;它是語言提供的一種并發(fā)編程方式,由連接各個(gè)chanel而形成的一系列階段組成备燃。在其各個(gè)階段碉克,可能分別運(yùn)行著很多的goroutine。這些goroutine

  • 從輸入channel接收數(shù)據(jù)
  • 對(duì)數(shù)據(jù)作相應(yīng)處理并齐,例如在此基礎(chǔ)上產(chǎn)生新數(shù)據(jù)
  • 再通過輸出channel把數(shù)據(jù)發(fā)送出去

除了開始和結(jié)束漏麦,每個(gè)階段都會(huì)包含任意多個(gè)輸入和輸出channel。開始階段只有輸出channel况褪,結(jié)束階段只有輸入channel撕贞。相應(yīng)地,開始階段可被稱為生產(chǎn)者测垛,結(jié)束階段可被稱為消費(fèi)者捏膨。

我們先通過一個(gè)簡單的例子來說明。

并發(fā)計(jì)算平方數(shù)

首先來舉一個(gè)三階段pipeline的例子

第一階段食侮,創(chuàng)建輸入?yún)?shù)為可變長int整數(shù)的gen函數(shù)号涯,它通過goroutine發(fā)送所有輸入?yún)?shù),并在發(fā)送完成后關(guān)閉相應(yīng)channel:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    
    reutrn out
}

第二階段锯七,sq函數(shù)链快,負(fù)責(zé)從輸入channel中接收數(shù)據(jù)并作平方處理再發(fā)送到輸出channel中。在輸入channel關(guān)閉并把所有數(shù)據(jù)都成功發(fā)送至輸出channel眉尸,關(guān)閉輸出channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()

    return out
}

主函數(shù)main中創(chuàng)建了pipeline域蜗,并執(zhí)行了最后階段的任務(wù),從管道中接收了第二階段的數(shù)據(jù)并打印了出來:

func main() {
    // Set up the pipeline
    c := gen(2, 3)
    out := sql(c)

    // Consume the output
    fmt.Println(<-out)
    fmt.Println(<-out)
}

此處sq函數(shù)的輸入和輸出參數(shù)為相同類型的channel噪猾,因此我們可以對(duì)其進(jìn)行組合霉祸。重寫main函數(shù),如下:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n)
    }
}

此處相等于在pipeline中增加了一個(gè)階段袱蜡,即涉及到了三個(gè)階段脉执,其中2、3階段的goroutine由同一函數(shù)產(chǎn)生戒劫。

Fan-out和Fan-in (扇出和扇入)

多個(gè)函數(shù)可同時(shí)從同一個(gè)channel中讀取數(shù)據(jù)半夷,直到channel關(guān)閉,稱為fan-out迅细。這為我們提供了一種將任務(wù)分發(fā)給多個(gè)worker的途徑巫橄,從而實(shí)現(xiàn)CPU和I/O的高效利用。

通過多路復(fù)用技術(shù)將多個(gè)channel合并到單個(gè)channel實(shí)現(xiàn)從多個(gè)輸入讀取數(shù)據(jù)的能力茵典,只有當(dāng)所有的輸入都關(guān)閉湘换,才會(huì)停止數(shù)據(jù)的讀取。這個(gè)稱作 fan-in

重寫之前的main彩倚,我們調(diào)用兩次sq筹我,且兩次都從同一個(gè)channel中讀取數(shù)據(jù)。我們將引入一個(gè)新的函數(shù)帆离,通過fan-in方式獲取數(shù)據(jù):

func main() {
    in := gen(2, 3)
    
    // Distribute the sq work across two goroutines that both read from in
    c1 := sq(in)
    c2 := sq(in)
    
    // Comsume the merged output from c1 and c2
    for n := range merge(c1, c2) {
        fmt.Println(n)  // 4 then 9, or 9 then 4
    }
}

merge函數(shù)通過為每個(gè)輸入channel啟動(dòng)一個(gè)goroutine實(shí)現(xiàn)將數(shù)據(jù)發(fā)送同一個(gè)channel中蔬蕊,從完成將channel列表轉(zhuǎn)化為單個(gè)channel的功能。一旦所有的輸出channel(生產(chǎn)者)啟動(dòng)哥谷,merge就會(huì)啟動(dòng)一或多個(gè)goroutine接收所有數(shù)據(jù)并在結(jié)束后關(guān)閉對(duì)應(yīng)channel岸夯。

在已關(guān)閉的channel發(fā)送數(shù)據(jù)會(huì)導(dǎo)致panic,因此保證關(guān)閉channel前所有數(shù)據(jù)都發(fā)送完畢是非常重要的们妥。sync.WaitGroup為我們提供了一種實(shí)現(xiàn)該同步的方式猜扮。示例如下:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        
        wg.Done()
    }
    wg.Add(len(cs)
    
    for _, c := range(cs) {
        go output(c)
    }
    
    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

goroutine突然停止

一些使用原則:

  • 在數(shù)據(jù)發(fā)送完畢后,關(guān)閉輸出channel监婶;
  • 持續(xù)不停地從輸入中接收數(shù)據(jù)旅赢,直到channel關(guān)閉;

我們可使用for循環(huán)來接收數(shù)據(jù),一旦數(shù)據(jù)所有數(shù)據(jù)接收完畢將會(huì)自動(dòng)退出惑惶。

但在真實(shí)場(chǎng)景中煮盼,并非所有情況都需要接收完所有的數(shù)據(jù)。有時(shí)設(shè)計(jì)如此集惋,我們只需一部分的數(shù)據(jù)并可運(yùn)行。更常見地踩娘,如果輸入早早就拋出了一個(gè)錯(cuò)誤刮刑,這時(shí)該階段便會(huì)早早地退出。還有一些情況养渴,如接收者不再等待數(shù)據(jù)接收雷绢,此時(shí)也需停止數(shù)據(jù)的生產(chǎn)。

在我們的例子中理卑,如果一個(gè)階段沒有成功接收完數(shù)據(jù)就退出翘紊,我們的goroutine仍會(huì)嘗試發(fā)送數(shù)據(jù),這將會(huì)導(dǎo)致channel進(jìn)入無限期的阻塞藐唠。

func main() {
    // Consume the first value from the output
    out := merge(c1, c2)
    fmt.Println(<-out)
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutine is hung attempting to send it
}

這是資源泄露:goroutine會(huì)繼續(xù)消耗內(nèi)存帆疟、運(yùn)行時(shí)資源,而且在棧中的堆引用也不能被回收宇立。goroutine必須退出踪宠,才能啟動(dòng)垃圾回收機(jī)制。

當(dāng)下游不能完全接收所有數(shù)據(jù)時(shí)妈嘹,我們需要準(zhǔn)備將上游goroutine退出柳琢。一種方式,我們可以把上游的channel設(shè)定為一個(gè)buffer。當(dāng)buffer還有空間時(shí)柬脸,發(fā)送操作將會(huì)立刻完成他去。

c := make(chan int, 2)
c <- 1      // succeeds immediately
c <- 2      // succeeds immediately
c <- 3      // blocks until another goroutine does <-c and receives 1

當(dāng)channel被創(chuàng)建時(shí),如果我們已經(jīng)知道數(shù)據(jù)的大小倒堕,可以如此來簡化我們的代碼灾测。比如,我們重寫gen函數(shù)涩馆,拷貝數(shù)據(jù)到buffer channel中行施,可以避免創(chuàng)建新的goroutine。

func gen(nums ...int) int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

我們可以考慮給merge函數(shù)輸出channel指定固定大小空間魂那。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan, int, 1)   // enough space for the unread inputs
    // ... the rest is unchanged ...
}

雖然這解決了程序中g(shù)oroutine阻塞的問題蛾号,但并不是好代碼。指定buffer的長度我們需知道m(xù)erge將接收值的數(shù)量涯雅。如果下游讀取少量數(shù)據(jù)便結(jié)束鲜结,依然會(huì)阻塞。

我們需要一種方式活逆,使下游通知上游以表明它們不再接收信息精刷。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蔗候,隨后出現(xiàn)的幾起案子怒允,更是在濱河造成了極大的恐慌,老刑警劉巖锈遥,帶你破解...
    沈念sama閱讀 216,919評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纫事,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡所灸,警方通過查閱死者的電腦和手機(jī)丽惶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,567評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來爬立,“玉大人钾唬,你說我怎么就攤上這事∠姥保” “怎么了抡秆?”我有些...
    開封第一講書人閱讀 163,316評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長吟策。 經(jīng)常有香客問我琅轧,道長,這世上最難降的妖魔是什么踊挠? 我笑而不...
    開封第一講書人閱讀 58,294評(píng)論 1 292
  • 正文 為了忘掉前任乍桂,我火速辦了婚禮冲杀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘睹酌。我一直安慰自己权谁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,318評(píng)論 6 390
  • 文/花漫 我一把揭開白布憋沿。 她就那樣靜靜地躺著旺芽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪辐啄。 梳的紋絲不亂的頭發(fā)上采章,一...
    開封第一講書人閱讀 51,245評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音壶辜,去河邊找鬼悯舟。 笑死,一個(gè)胖子當(dāng)著我的面吹牛砸民,可吹牛的內(nèi)容都是我干的抵怎。 我是一名探鬼主播,決...
    沈念sama閱讀 40,120評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼岭参,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼反惕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起演侯,我...
    開封第一講書人閱讀 38,964評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤姿染,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后秒际,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體悬赏,經(jīng)...
    沈念sama閱讀 45,376評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,592評(píng)論 2 333
  • 正文 我和宋清朗相戀三年程癌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了舷嗡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轴猎。...
    茶點(diǎn)故事閱讀 39,764評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡嵌莉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捻脖,到底是詐尸還是另有隱情锐峭,我是刑警寧澤,帶...
    沈念sama閱讀 35,460評(píng)論 5 344
  • 正文 年R本政府宣布可婶,位于F島的核電站沿癞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏矛渴。R本人自食惡果不足惜椎扬,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,070評(píng)論 3 327
  • 文/蒙蒙 一惫搏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蚕涤,春花似錦筐赔、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,697評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至天吓,卻和暖如春贿肩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背龄寞。 一陣腳步聲響...
    開封第一講書人閱讀 32,846評(píng)論 1 269
  • 我被黑心中介騙來泰國打工汰规, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人萄焦。 一個(gè)月前我還...
    沈念sama閱讀 47,819評(píng)論 2 370
  • 正文 我出身青樓控轿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拂封。 傳聞我的和親對(duì)象是個(gè)殘疾皇子茬射,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,665評(píng)論 2 354