簡書不維護(hù)了肴捉,歡迎關(guān)注我的知乎:波羅學(xué)的個(gè)人主頁
這篇文章較長將會(huì)分兩篇來翻譯。原文地址:https://blog.golang.org/pipelines
介紹
Go并發(fā)模型使構(gòu)建能有效利用IO和多核CPU的實(shí)時(shí)流式數(shù)據(jù)的pipeline非常方便办成。這篇文章將對(duì)此進(jìn)行介紹,同時(shí)會(huì)著重強(qiáng)調(diào)一些在實(shí)踐中的易犯錯(cuò)誤以及對(duì)應(yīng)的解決方法。
什么是Pipeline
在GO中,pipeline無明確定義壤躲;它是語言提供的一種并發(fā)編程方式,由連接各個(gè)chanel而形成的一系列階段組成备燃。在其各個(gè)階段碉克,可能分別運(yùn)行著很多的goroutine。這些goroutine
- 從輸入channel接收數(shù)據(jù)
- 對(duì)數(shù)據(jù)作相應(yīng)處理并齐,例如在此基礎(chǔ)上產(chǎn)生新數(shù)據(jù)
- 再通過輸出channel把數(shù)據(jù)發(fā)送出去
除了開始和結(jié)束漏麦,每個(gè)階段都會(huì)包含任意多個(gè)輸入和輸出channel。開始階段只有輸出channel况褪,結(jié)束階段只有輸入channel撕贞。相應(yīng)地,開始階段可被稱為生產(chǎn)者测垛,結(jié)束階段可被稱為消費(fèi)者捏膨。
我們先通過一個(gè)簡單的例子來說明。
并發(fā)計(jì)算平方數(shù)
首先來舉一個(gè)三階段pipeline的例子
第一階段食侮,創(chuàng)建輸入?yún)?shù)為可變長int整數(shù)的gen
函數(shù)号涯,它通過goroutine發(fā)送所有輸入?yún)?shù),并在發(fā)送完成后關(guān)閉相應(yīng)channel:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
reutrn out
}
第二階段锯七,sq函數(shù)链快,負(fù)責(zé)從輸入channel中接收數(shù)據(jù)并作平方處理再發(fā)送到輸出channel中。在輸入channel關(guān)閉并把所有數(shù)據(jù)都成功發(fā)送至輸出channel眉尸,關(guān)閉輸出channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
主函數(shù)main
中創(chuàng)建了pipeline域蜗,并執(zhí)行了最后階段的任務(wù),從管道中接收了第二階段的數(shù)據(jù)并打印了出來:
func main() {
// Set up the pipeline
c := gen(2, 3)
out := sql(c)
// Consume the output
fmt.Println(<-out)
fmt.Println(<-out)
}
此處sq
函數(shù)的輸入和輸出參數(shù)為相同類型的channel噪猾,因此我們可以對(duì)其進(jìn)行組合霉祸。重寫main
函數(shù),如下:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n)
}
}
此處相等于在pipeline中增加了一個(gè)階段袱蜡,即涉及到了三個(gè)階段脉执,其中2、3階段的goroutine由同一函數(shù)產(chǎn)生戒劫。
Fan-out和Fan-in (扇出和扇入)
多個(gè)函數(shù)可同時(shí)從同一個(gè)channel中讀取數(shù)據(jù)半夷,直到channel關(guān)閉,稱為fan-out
迅细。這為我們提供了一種將任務(wù)分發(fā)給多個(gè)worker的途徑巫橄,從而實(shí)現(xiàn)CPU和I/O的高效利用。
通過多路復(fù)用技術(shù)將多個(gè)channel合并到單個(gè)channel實(shí)現(xiàn)從多個(gè)輸入讀取數(shù)據(jù)的能力茵典,只有當(dāng)所有的輸入都關(guān)閉湘换,才會(huì)停止數(shù)據(jù)的讀取。這個(gè)稱作 fan-in
。
重寫之前的main
彩倚,我們調(diào)用兩次sq
筹我,且兩次都從同一個(gè)channel中讀取數(shù)據(jù)。我們將引入一個(gè)新的函數(shù)帆离,通過fan-in方式獲取數(shù)據(jù):
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in
c1 := sq(in)
c2 := sq(in)
// Comsume the merged output from c1 and c2
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函數(shù)通過為每個(gè)輸入channel啟動(dòng)一個(gè)goroutine實(shí)現(xiàn)將數(shù)據(jù)發(fā)送同一個(gè)channel中蔬蕊,從完成將channel列表轉(zhuǎn)化為單個(gè)channel的功能。一旦所有的輸出channel(生產(chǎn)者)啟動(dòng)哥谷,merge
就會(huì)啟動(dòng)一或多個(gè)goroutine接收所有數(shù)據(jù)并在結(jié)束后關(guān)閉對(duì)應(yīng)channel岸夯。
在已關(guān)閉的channel發(fā)送數(shù)據(jù)會(huì)導(dǎo)致panic,因此保證關(guān)閉channel前所有數(shù)據(jù)都發(fā)送完畢是非常重要的们妥。sync.WaitGroup
為我們提供了一種實(shí)現(xiàn)該同步的方式猜扮。示例如下:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs)
for _, c := range(cs) {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
goroutine突然停止
一些使用原則:
- 在數(shù)據(jù)發(fā)送完畢后,關(guān)閉輸出channel监婶;
- 持續(xù)不停地從輸入中接收數(shù)據(jù)旅赢,直到channel關(guān)閉;
我們可使用for循環(huán)來接收數(shù)據(jù),一旦數(shù)據(jù)所有數(shù)據(jù)接收完畢將會(huì)自動(dòng)退出惑惶。
但在真實(shí)場(chǎng)景中煮盼,并非所有情況都需要接收完所有的數(shù)據(jù)。有時(shí)設(shè)計(jì)如此集惋,我們只需一部分的數(shù)據(jù)并可運(yùn)行。更常見地踩娘,如果輸入早早就拋出了一個(gè)錯(cuò)誤刮刑,這時(shí)該階段便會(huì)早早地退出。還有一些情況养渴,如接收者不再等待數(shù)據(jù)接收雷绢,此時(shí)也需停止數(shù)據(jù)的生產(chǎn)。
在我們的例子中理卑,如果一個(gè)階段沒有成功接收完數(shù)據(jù)就退出翘紊,我們的goroutine仍會(huì)嘗試發(fā)送數(shù)據(jù),這將會(huì)導(dǎo)致channel進(jìn)入無限期的阻塞藐唠。
func main() {
// Consume the first value from the output
out := merge(c1, c2)
fmt.Println(<-out)
return
// Since we didn't receive the second value from out,
// one of the output goroutine is hung attempting to send it
}
這是資源泄露:goroutine會(huì)繼續(xù)消耗內(nèi)存帆疟、運(yùn)行時(shí)資源,而且在棧中的堆引用也不能被回收宇立。goroutine必須退出踪宠,才能啟動(dòng)垃圾回收機(jī)制。
當(dāng)下游不能完全接收所有數(shù)據(jù)時(shí)妈嘹,我們需要準(zhǔn)備將上游goroutine退出柳琢。一種方式,我們可以把上游的channel設(shè)定為一個(gè)buffer。當(dāng)buffer還有空間時(shí)柬脸,發(fā)送操作將會(huì)立刻完成他去。
c := make(chan int, 2)
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
當(dāng)channel被創(chuàng)建時(shí),如果我們已經(jīng)知道數(shù)據(jù)的大小倒堕,可以如此來簡化我們的代碼灾测。比如,我們重寫gen函數(shù)涩馆,拷貝數(shù)據(jù)到buffer channel中行施,可以避免創(chuàng)建新的goroutine。
func gen(nums ...int) int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
我們可以考慮給merge函數(shù)輸出channel指定固定大小空間魂那。
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan, int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
}
雖然這解決了程序中g(shù)oroutine阻塞的問題蛾号,但并不是好代碼。指定buffer的長度我們需知道m(xù)erge將接收值的數(shù)量涯雅。如果下游讀取少量數(shù)據(jù)便結(jié)束鲜结,依然會(huì)阻塞。
我們需要一種方式活逆,使下游通知上游以表明它們不再接收信息精刷。