Go 并發(fā)編程:通道應(yīng)用范式之管道模式

管道模式

一等舔、類Unix串行管道:使用通道實現(xiàn)串行管道功能

我們在使用類Unix系統(tǒng)時常常用到管道命令,如"ls |grep 'path/to' "谣旁,它可以讓數(shù)據(jù)在多個命令操作中串行處理螟够。Go的通道也可以做到如此,利用通道通信的特性我們可以創(chuàng)建多個連續(xù)通道野舶,讓一個函數(shù)的輸出作為另一個函數(shù)的輸入,而另一個函數(shù)的輸出也可以作為其他函數(shù)的輸入宰衙。

Go標(biāo)準(zhǔn)庫中的io.Pipe()可以創(chuàng)建類Unix風(fēng)格管道平道,它適合純粹的IO系統(tǒng)原語的管道操作。然而go語言原語中的通道也可以做到類似的操作供炼,以下是Go并發(fā)編程的范式之一一屋,可以普適到更多的應(yīng)用場景。

// 管道過濾器范式
func Demo() {
    a(b(c(source("source1", "source2", "source3"))))
}

func a(in <-chan string) {
    for i := range in {
        fmt.Println("a" + i)
    }
}

func b(in <-chan string) <-chan string {
    out := make(chan string, cap(in))
    go func() {
        defer close(out)
        for i := range in {
            out <- "b" + i
        }
    }()
    return out
}

func c(in <-chan string) <-chan string {
    out := make(chan string, cap(in))
    go func() {
        defer close(out)
        for i := range in {
            out <- "c" + i
        }
    }()
    return out
}

// 管道輸入源
func source(inputs ...string) <-chan string {
    out := make(chan string, len(inputs))

    go func() {
        defer close(out)
        for _, item := range inputs {
            out <- item
            fmt.Println("source input:", item)
        }
    }()
    return out
}

運行輸出:

=== RUN   TestDemo21
source input: source1
source input: source2
source input: source3
abcsource1
abcsource2
abcsource3
--- PASS: TestDemo21 (0.00s)
PASS

二袋哼、構(gòu)建管道最佳實踐

以上為最簡單的模擬管道冀墨,但仔細(xì)一看還是存在問題的,我們在《防止Goroutine泄露》中討論過先嬉,管道每個階段都有可能出現(xiàn)協(xié)程泄露的風(fēng)險轧苫,我們可以引入done管道解決這個問題,下面看一個比較有實質(zhì)性的例子:通過一個生成器產(chǎn)生一些值并進(jìn)入管道運算,最后輸出值含懊。

// 生成器
generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

// 乘法階段
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i * multiplier:
            }
        }
    }()

    return multipliedStream
}

// 加法階段
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i + additive:
            }
        }
    }()
    return addedStream
}

// 防止協(xié)程泄露的done管道
done := make(chan interface{})
defer close(done)

// 生成一些值身冬,并進(jìn)行管道運算
intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

// 輸出管道運算結(jié)果
for v := range pipeline {
    fmt.Println(v)
}

在這個管道模式中,我們看到兩件事:

  • 在管道的末尾岔乔,可以使用range語句來提取值酥筝;
  • 在每個階段可以安全地并發(fā)執(zhí)行,因為輸入和輸出在并發(fā)上下文中是安全的雏门。

看到關(guān)閉done通道是如何影響到管道的了么嘿歌?這是通過管道每個階段的兩件事情實現(xiàn)的:

  • 對傳入的頻道進(jìn)行遍歷。當(dāng)輸入通道關(guān)閉時茁影,遍歷操作將退出宙帝。
  • 發(fā)送操作與done通道共享select語句。

無論流水線階段處于等待數(shù)據(jù)通道的狀態(tài)募闲,還是處在等待發(fā)送通道關(guān)閉的狀態(tài)步脓,都會強制管道各階段終止。這里有一個復(fù)發(fā)關(guān)系浩螺。在管道開始時靴患,我們已經(jīng)確定必須將傳入的切片值轉(zhuǎn)換為通道。在這個過程中有兩點必須是可搶占的:

  • 在生成器通道上創(chuàng)建值要出。
  • 在其頻道上發(fā)送離散值鸳君。

在管道開始和結(jié)束之間,代碼總是在一個通道上遍歷患蹂,并在包含done通道的select語句內(nèi)的另一個通道上發(fā)送或颊。如果某個階段在傳入通道檢索到值時被阻塞,則該通道關(guān)閉時它將變?yōu)槲醋枞麪顟B(tài)传于。 如果某個階段在發(fā)送值時被阻塞饭宾,則由于select語句而可搶占。因此格了,整個管道始終可以通過關(guān)閉done通道來搶占。

三徽鼎、生成器模式:

在上面的generator中我們看到一個簡單的生成器盛末,咋一看還比較死板,我們可以利用通道構(gòu)建一個可獲取特定重復(fù)值的生成器否淤。

以下函數(shù)會重復(fù)你傳給它的值悄但,直到你告訴它停止:

var repeat = func(done <-chan interface{}, values ...interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

以下函數(shù)會從其傳入的valueStream中取出第一個元素然后退出:

var take = func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {

    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

OK,讓我們組合它們使用石抡,看一個用例:

func  Demo() {
    done := make(chan interface{})
    defer close(done)

    for num := range take(done, repeat(done, 1), 10) {
        fmt.Printf("%v ", num)
    }
}

在這個基本的例子中檐嚣,我們創(chuàng)建了一個repeat生成器來生成無限數(shù)量的重復(fù)生成器,但是只取前10個。repeat生成器由take接收嚎京。雖然我們可以生成無線數(shù)量的流嗡贺,但只會生成n+1個實例,其中n是我們傳入take的數(shù)量鞍帝。

除了生成特定的固定數(shù)量的值诫睬,我們還可以擴(kuò)展一下,如果把repeat擴(kuò)展成repeatFn帕涌,我們可以生成任何數(shù)據(jù):

var repeatFn = func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

繼續(xù)看用例:

func Demo() {
    done := make(chan interface{})
    defer close(done)
    rand := func() interface{} {
        return rand.Int()
    }
    for num := range take(done, repeatFn(done, rand), 10) {
        fmt.Println(num)
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末摄凡,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子蚓曼,更是在濱河造成了極大的恐慌亲澡,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纫版,死亡現(xiàn)場離奇詭異床绪,居然都是意外死亡,警方通過查閱死者的電腦和手機捎琐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門会涎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瑞凑,你說我怎么就攤上這事末秃。” “怎么了籽御?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵练慕,是天一觀的道長。 經(jīng)常有香客問我技掏,道長铃将,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任哑梳,我火速辦了婚禮劲阎,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鸠真。我一直安慰自己悯仙,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布吠卷。 她就那樣靜靜地躺著锡垄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪祭隔。 梳的紋絲不亂的頭發(fā)上货岭,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天,我揣著相機與錄音,去河邊找鬼千贯。 笑死屯仗,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丈牢。 我是一名探鬼主播祭钉,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼己沛!你這毒婦竟也來了慌核?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤申尼,失蹤者是張志新(化名)和其女友劉穎垮卓,沒想到半個月后共虑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體褐奴,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡晦嵌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年油狂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片兼都。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡泄伪,死狀恐怖失都,靈堂內(nèi)的尸體忽然破棺而出后控,到底是詐尸還是另有隱情庙曙,我是刑警寧澤,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布浩淘,位于F島的核電站捌朴,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏张抄。R本人自食惡果不足惜砂蔽,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望署惯。 院中可真熱鬧左驾,春花似錦、人聲如沸极谊。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽怀酷。三九已至,卻和暖如春嗜闻,著一層夾襖步出監(jiān)牢的瞬間蜕依,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留样眠,地道東北人友瘤。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像檐束,于是被迫代替她去往敵國和親辫秧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容