[翻譯]GO并發(fā)模型二:Pipeline和Cancellation

image.png

簡(jiǎn)書不維護(hù)了匕垫,歡迎關(guān)注我的知乎:波羅學(xué)的個(gè)人主頁(yè)

緊接上文:[翻譯]GO并發(fā)模型一:Pipeline和Cancellation

明確地取消

當(dāng)主函數(shù)在沒有從輸出channel中接收完所有值便退出時(shí)搂抒,它必須告訴上游停止數(shù)據(jù)發(fā)送腌巾≥┢担可以通過(guò)向done channel發(fā)送停止信號(hào)實(shí)現(xiàn)瓤鼻。此處有兩個(gè)可能阻塞的goroutine宁脊,所以需發(fā)兩個(gè)值渐夸。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutine that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out)

    // Tell the remaining senders we're leaving
    done <- struct{}{}
    done <- struct{}{}
}

merge中的發(fā)送goroutine用select語(yǔ)句取代了原來(lái)的發(fā)送操作箍镜,它將負(fù)責(zé)將數(shù)據(jù)發(fā)出和接收done channel的消息源祈。Done將接收的值是空結(jié)構(gòu)體,因?yàn)樵撝禌]有任何意義:它僅僅是用來(lái)表明應(yīng)該停止向輸出channel發(fā)送數(shù)據(jù)了色迂。該goroutine將會(huì)不停循環(huán)地從輸入channel中接收數(shù)據(jù)香缺,以確保上游不被阻塞。(待會(huì)我們將會(huì)討論怎么提早從循環(huán)退出)歇僧。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
                    case out <- n:
                    case <-done:
            }
            }
        wg.Done()
    }
    // ... the rest is unchanged ...
}

這種方式的問(wèn)題是:每個(gè)下游都需知道上游將發(fā)送的數(shù)據(jù)量图张,以便向其發(fā)送消息實(shí)現(xiàn)提早退出锋拖。但毫無(wú)疑問(wèn),時(shí)刻監(jiān)控已發(fā)送數(shù)量是非郴雎郑荒誕兽埃,也是非常容易出錯(cuò)的。

我們需要一種在上游goroutine數(shù)量未知或無(wú)限大的情況下使其停止的方式适袜。在GO中柄错,我們可以通過(guò)關(guān)閉channel來(lái)實(shí)現(xiàn),因?yàn)樵谝殃P(guān)閉的channel上接收數(shù)據(jù)會(huì)被立刻處理并返回一個(gè)零值苦酱。

這意味著main函數(shù)中可僅僅通過(guò)關(guān)閉done channel來(lái)使發(fā)送方解除阻塞售貌。該關(guān)閉操作會(huì)產(chǎn)生一個(gè)有效的廣播信號(hào)并傳遞給發(fā)送方。我們可以擴(kuò)展pipeline中的函數(shù)疫萤,使其可以多接受一個(gè)done參數(shù)颂跨,然后通過(guò)defer語(yǔ)句對(duì)執(zhí)行關(guān)閉以便于在main退出時(shí)發(fā)送給各階段完成信號(hào)來(lái)實(shí)現(xiàn)退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

一旦done channel關(guān)閉扯饶,各個(gè)階段就可以成功返回退出恒削。當(dāng)done被關(guān)閉,merge就會(huì)知道上游會(huì)停止發(fā)送數(shù)據(jù)尾序,merge函數(shù)就會(huì)停止從輸入channel接收數(shù)據(jù)并返回钓丰。輸出channel通過(guò)defer語(yǔ)句確保所有的wg.Done在函數(shù)時(shí)能被調(diào)用。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
}

相似地蹲诀,只要done channel一關(guān)閉斑粱,sq函數(shù)也會(huì)立刻返回。通過(guò)defer語(yǔ)句脯爪,sql函數(shù)確保它們輸出channel一定能被順利關(guān)閉则北。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

pipeline幾個(gè)指導(dǎo)原則:

  • 各個(gè)階段在所有的發(fā)送操作完成便會(huì)關(guān)閉輸出channels;
  • 各個(gè)階段會(huì)不停的接收數(shù)據(jù)痕慢,直到這些channel都被關(guān)閉或者發(fā)送方不再阻塞尚揣;

Pipelines中可以通過(guò)為數(shù)據(jù)發(fā)送提供足夠的buffer大小或在接收方確定放棄繼續(xù)接收數(shù)據(jù)時(shí)發(fā)送完成信號(hào)來(lái)解除發(fā)送方的阻塞。

對(duì)目錄中的文件執(zhí)行摘要

讓我們來(lái)看一個(gè)更實(shí)際的例子.

MD5是一種消息摘要算法掖举,在checksum校驗(yàn)文件方面非常有用快骗。通過(guò)命令行工具md5sum,我們打印了一系列文件的摘要值塔次。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我們的例子是一個(gè)類似于md5sum的程序方篮,它接受單一目錄作為參數(shù),并打印該目錄下每個(gè)文件的摘要值励负。文件是按文件名升序排列打印藕溅。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

程序的main主函數(shù)調(diào)用了一個(gè)名為MD5All的函數(shù),它返回的是一個(gè)map继榆,key為路徑名巾表,value為摘要值汁掠。最后,對(duì)結(jié)果進(jìn)行了排序和打印集币。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All函數(shù)我們將重點(diǎn)討論考阱。在serial.go文件中,并沒有使用并發(fā)技術(shù)鞠苟,僅僅是依次讀取每個(gè)文件內(nèi)容并對(duì)其調(diào)用md5.Sum乞榨。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行執(zhí)行摘要

在parellel.go文件中,我們把MD5ALL拆成了兩階段偶妖。第一階段姜凄,在sumFiles函數(shù)中政溃,它遍歷目錄并在各個(gè)goroutine中執(zhí)行文件摘要趾访,最后將結(jié)果發(fā)送給channel。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles函數(shù)返回了兩個(gè)channels:一個(gè)用于傳輸結(jié)果董虱,另一個(gè)用于返回filepath.Walk的錯(cuò)誤扼鞋。walk函數(shù)為每個(gè)文件啟動(dòng)了一個(gè)新的goroutine來(lái)處理它們,同時(shí)也檢查是否done 愤诱。如果done被關(guān)閉云头,walk函數(shù)將立刻返回。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5All函數(shù)從c(channel)中接收摘要值淫半。但發(fā)現(xiàn)錯(cuò)誤溃槐,它會(huì)提早返回,并通過(guò)defer語(yǔ)句關(guān)閉done科吭。

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

對(duì)并行增加限制

parallel.go中的MD5All通過(guò)為每個(gè)文件系統(tǒng)一個(gè)新的goroutine實(shí)現(xiàn)昏滴。如果一個(gè)目錄中有太多的大文件,這可能會(huì)導(dǎo)致分配的內(nèi)存超過(guò)機(jī)器的可用內(nèi)存对人。

我們可以通過(guò)限制并行讀取文件的數(shù)量來(lái)限制內(nèi)容分配谣殊。在bounded.go文件中,我們創(chuàng)建了固定數(shù)量的goroutines來(lái)讀取文件∥現(xiàn)在我們的pipeline涉及了三個(gè)階段:遍歷目錄樹姻几、讀取文件并執(zhí)行摘要和收集摘要結(jié)果。

第一階段势告,walkFiles蛇捌,負(fù)責(zé)發(fā)送目錄樹中文件路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

第二階段,我們?yōu)閐igester函數(shù)啟動(dòng)了固定數(shù)量的goroutine咱台,它將從paths中接收文件名處理并發(fā)送摘要結(jié)果給channel c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

不同于之前的例子络拌,digester不會(huì)關(guān)閉輸出channel,因?yàn)樘嗟膅oroutine共用了一個(gè)channel吵护。取而代之盒音,在所有digester執(zhí)行完畢表鳍,MD5All會(huì)著手關(guān)閉channel。

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我們可以為每個(gè)digester創(chuàng)建獨(dú)立的channel祥诽,然后返回譬圣。但是這樣我們就需要增加額外的goroutines來(lái)對(duì)結(jié)果進(jìn)行合并。

最后階段雄坪,我們從channel c中接收所有的結(jié)果并檢查errc是否返回了錯(cuò)誤厘熟。該檢查無(wú)法過(guò)早執(zhí)行,因?yàn)檫^(guò)早檢查维哈,可能會(huì)導(dǎo)致walkFile阻塞绳姨。

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

總結(jié)

這篇文章給我們展示了如何在GO中構(gòu)建流式數(shù)據(jù)pipeline。pipeline中處理失敗是需要一定的技巧的阔挠,因?yàn)槊總€(gè)嘗試給下游發(fā)送數(shù)據(jù)的階段都可能被阻塞飘庄,因?yàn)橄掠慰赡懿辉诮邮丈嫌蔚妮斎霐?shù)據(jù)。我們展示了如何通過(guò)關(guān)閉channel來(lái)給所有的goroutine發(fā)送 "done" 信號(hào)和定義了正確構(gòu)建pipeline的指導(dǎo)原則购撼。

作者:Sameer Ajmani

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末跪削,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子迂求,更是在濱河造成了極大的恐慌碾盐,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,843評(píng)論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件揩局,死亡現(xiàn)場(chǎng)離奇詭異毫玖,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)凌盯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門付枫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人十气,你說(shuō)我怎么就攤上這事励背。” “怎么了砸西?”我有些...
    開封第一講書人閱讀 163,187評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵叶眉,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我芹枷,道長(zhǎng)衅疙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,264評(píng)論 1 292
  • 正文 為了忘掉前任鸳慈,我火速辦了婚禮饱溢,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘走芋。我一直安慰自己绩郎,他們只是感情好潘鲫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,289評(píng)論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著肋杖,像睡著了一般溉仑。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上状植,一...
    開封第一講書人閱讀 51,231評(píng)論 1 299
  • 那天浊竟,我揣著相機(jī)與錄音,去河邊找鬼津畸。 笑死振定,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的肉拓。 我是一名探鬼主播后频,決...
    沈念sama閱讀 40,116評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼帝簇!你這毒婦竟也來(lái)了徘郭?” 一聲冷哼從身側(cè)響起靠益,我...
    開封第一講書人閱讀 38,945評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丧肴,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后胧后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芋浮,經(jīng)...
    沈念sama閱讀 45,367評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,581評(píng)論 2 333
  • 正文 我和宋清朗相戀三年壳快,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了纸巷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,754評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡眶痰,死狀恐怖瘤旨,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情竖伯,我是刑警寧澤存哲,帶...
    沈念sama閱讀 35,458評(píng)論 5 344
  • 正文 年R本政府宣布,位于F島的核電站七婴,受9級(jí)特大地震影響祟偷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜打厘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,068評(píng)論 3 327
  • 文/蒙蒙 一修肠、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧户盯,春花似錦嵌施、人聲如沸饲化。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)滓侍。三九已至,卻和暖如春牲芋,著一層夾襖步出監(jiān)牢的瞬間撩笆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工缸浦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留夕冲,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,797評(píng)論 2 369
  • 正文 我出身青樓裂逐,卻偏偏與公主長(zhǎng)得像歹鱼,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子卜高,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,654評(píng)論 2 354