管道模式
一等舔、類Unix串行管道:使用通道實現(xiàn)串行管道功能
我們在使用類Unix系統(tǒng)時常常用到管道命令,如"ls |grep 'path/to' "谣旁,它可以讓數(shù)據(jù)在多個命令操作中串行處理螟够。Go的通道也可以做到如此,利用通道通信的特性我們可以創(chuàng)建多個連續(xù)通道野舶,讓一個函數(shù)的輸出作為另一個函數(shù)的輸入,而另一個函數(shù)的輸出也可以作為其他函數(shù)的輸入宰衙。
Go標(biāo)準(zhǔn)庫中的io.Pipe()可以創(chuàng)建類Unix風(fēng)格管道平道,它適合純粹的IO系統(tǒng)原語的管道操作。然而go語言原語中的通道也可以做到類似的操作供炼,以下是Go并發(fā)編程的范式之一一屋,可以普適到更多的應(yīng)用場景。
// 管道過濾器范式
func Demo() {
a(b(c(source("source1", "source2", "source3"))))
}
func a(in <-chan string) {
for i := range in {
fmt.Println("a" + i)
}
}
func b(in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
defer close(out)
for i := range in {
out <- "b" + i
}
}()
return out
}
func c(in <-chan string) <-chan string {
out := make(chan string, cap(in))
go func() {
defer close(out)
for i := range in {
out <- "c" + i
}
}()
return out
}
// 管道輸入源
func source(inputs ...string) <-chan string {
out := make(chan string, len(inputs))
go func() {
defer close(out)
for _, item := range inputs {
out <- item
fmt.Println("source input:", item)
}
}()
return out
}
運行輸出:
=== RUN TestDemo21
source input: source1
source input: source2
source input: source3
abcsource1
abcsource2
abcsource3
--- PASS: TestDemo21 (0.00s)
PASS
二袋哼、構(gòu)建管道最佳實踐
以上為最簡單的模擬管道冀墨,但仔細(xì)一看還是存在問題的,我們在《防止Goroutine泄露》中討論過先嬉,管道每個階段都有可能出現(xiàn)協(xié)程泄露的風(fēng)險轧苫,我們可以引入done管道解決這個問題,下面看一個比較有實質(zhì)性的例子:通過一個生成器產(chǎn)生一些值并進(jìn)入管道運算,最后輸出值含懊。
// 生成器
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
// 乘法階段
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier:
}
}
}()
return multipliedStream
}
// 加法階段
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive:
}
}
}()
return addedStream
}
// 防止協(xié)程泄露的done管道
done := make(chan interface{})
defer close(done)
// 生成一些值身冬,并進(jìn)行管道運算
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
// 輸出管道運算結(jié)果
for v := range pipeline {
fmt.Println(v)
}
在這個管道模式中,我們看到兩件事:
- 在管道的末尾岔乔,可以使用range語句來提取值酥筝;
- 在每個階段可以安全地并發(fā)執(zhí)行,因為輸入和輸出在并發(fā)上下文中是安全的雏门。
看到關(guān)閉done通道是如何影響到管道的了么嘿歌?這是通過管道每個階段的兩件事情實現(xiàn)的:
- 對傳入的頻道進(jìn)行遍歷。當(dāng)輸入通道關(guān)閉時茁影,遍歷操作將退出宙帝。
- 發(fā)送操作與done通道共享select語句。
無論流水線階段處于等待數(shù)據(jù)通道的狀態(tài)募闲,還是處在等待發(fā)送通道關(guān)閉的狀態(tài)步脓,都會強制管道各階段終止。這里有一個復(fù)發(fā)關(guān)系浩螺。在管道開始時靴患,我們已經(jīng)確定必須將傳入的切片值轉(zhuǎn)換為通道。在這個過程中有兩點必須是可搶占的:
- 在生成器通道上創(chuàng)建值要出。
- 在其頻道上發(fā)送離散值鸳君。
在管道開始和結(jié)束之間,代碼總是在一個通道上遍歷患蹂,并在包含done通道的select語句內(nèi)的另一個通道上發(fā)送或颊。如果某個階段在傳入通道檢索到值時被阻塞,則該通道關(guān)閉時它將變?yōu)槲醋枞麪顟B(tài)传于。 如果某個階段在發(fā)送值時被阻塞饭宾,則由于select語句而可搶占。因此格了,整個管道始終可以通過關(guān)閉done通道來搶占。
三徽鼎、生成器模式:
在上面的generator中我們看到一個簡單的生成器盛末,咋一看還比較死板,我們可以利用通道構(gòu)建一個可獲取特定重復(fù)值的生成器否淤。
以下函數(shù)會重復(fù)你傳給它的值悄但,直到你告訴它停止:
var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
以下函數(shù)會從其傳入的valueStream中取出第一個元素然后退出:
var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
OK,讓我們組合它們使用石抡,看一個用例:
func Demo() {
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
}
在這個基本的例子中檐嚣,我們創(chuàng)建了一個repeat生成器來生成無限數(shù)量的重復(fù)生成器,但是只取前10個。repeat生成器由take接收嚎京。雖然我們可以生成無線數(shù)量的流嗡贺,但只會生成n+1個實例,其中n是我們傳入take的數(shù)量鞍帝。
除了生成特定的固定數(shù)量的值诫睬,我們還可以擴(kuò)展一下,如果把repeat擴(kuò)展成repeatFn帕涌,我們可以生成任何數(shù)據(jù):
var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
繼續(xù)看用例:
func Demo() {
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
}