Golang 常見的并發(fā)模式

常見的并發(fā)模式

Go語言最吸引人的地方是它內(nèi)建的并發(fā)支持。Go語言并發(fā)體系的理論是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process预皇,通訊順序進(jìn)程)菲茬。CSP有著精確的數(shù)學(xué)模型猾担,并實(shí)際應(yīng)用在了Hoare參與設(shè)計(jì)的T9000通用計(jì)算機(jī)上辑奈。從NewSqueak袱瓮、Alef或油、Limbo到現(xiàn)在的Go語言逊拍,對于對CSP有著20多年實(shí)戰(zhàn)經(jīng)驗(yàn)的Rob Pike來說上鞠,他更關(guān)注的是將CSP應(yīng)用在通用編程語言上產(chǎn)生的潛力。作為Go并發(fā)編程核心的CSP理論的核心概念只有一個(gè):同步通信芯丧。關(guān)于同步通信的話題我們在前面一節(jié)已經(jīng)講過芍阎,本節(jié)我們將簡單介紹下Go語言中常見的并發(fā)模式。

首先要明確一個(gè)概念:并發(fā)不是并行缨恒。并發(fā)更關(guān)注的是程序的設(shè)計(jì)層面谴咸,并發(fā)的程序完全是可以順序執(zhí)行的度硝,只有在真正的多核CPU上才可能真正地同時(shí)運(yùn)行。并行更關(guān)注的是程序的運(yùn)行層面寿冕,并行一般是簡單的大量重復(fù)蕊程,例如GPU中對圖像處理都會有大量的并行運(yùn)算。為更好的編寫并發(fā)程序驼唱,從設(shè)計(jì)之初Go語言就注重如何在編程語言層級上設(shè)計(jì)一個(gè)簡潔安全高效的抽象模型藻茂,讓程序員專注于分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力玫恳。

在并發(fā)編程中辨赐,對共享資源的正確訪問需要精確的控制,在目前的絕大多數(shù)語言中京办,都是通過加鎖等線程同步方案來解決這一困難問題掀序,而Go語言卻另辟蹊徑,它將共享的值通過Channel傳遞(實(shí)際上多個(gè)獨(dú)立執(zhí)行的線程很少主動(dòng)共享資源)惭婿。在任意給定的時(shí)刻不恭,最好只有一個(gè)Goroutine能夠擁有該資源。數(shù)據(jù)競爭從設(shè)計(jì)層面上就被杜絕了财饥。為了提倡這種思考方式换吧,Go語言將其并發(fā)編程哲學(xué)化為一句口號:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享內(nèi)存來通信,而應(yīng)通過通信來共享內(nèi)存钥星。

這是更高層次的并發(fā)編程哲學(xué)(通過管道來傳值是Go語言推薦的做法)沾瓦。雖然像引用計(jì)數(shù)這類簡單的并發(fā)問題通過原子操作或互斥鎖就能很好地實(shí)現(xiàn),但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程序谦炒。

并發(fā)版本的Hello world

我們先以在一個(gè)新的Goroutine中輸出“Hello world”贯莺,main等待后臺線程輸出工作完成之后退出,這樣一個(gè)簡單的并發(fā)程序作為熱身宁改。

并發(fā)編程的核心概念是同步通信缕探,但是同步的方式卻有多種。我們先以大家熟悉的互斥量sync.Mutex來實(shí)現(xiàn)同步通信透且。根據(jù)文檔撕蔼,我們不能直接對一個(gè)未加鎖狀態(tài)的sync.Mutex進(jìn)行解鎖豁鲤,這會導(dǎo)致運(yùn)行時(shí)異常秽誊。下面這種方式并不能保證正常工作:

func main() {
    var mu sync.Mutex

    go func(){
        fmt.Println("你好, 世界")
        mu.Lock()
    }()

    mu.Unlock()
}

因?yàn)?code>mu.Lock()和mu.Unlock()并不在同一個(gè)Goroutine中,所以也就不滿足順序一致性內(nèi)存模型琳骡。同時(shí)它們也沒有其它的同步事件可以參考锅论,這兩個(gè)事件不可排序也就是可以并發(fā)的。因?yàn)榭赡苁遣l(fā)的事件楣号,所以main函數(shù)中的mu.Unlock()很有可能先發(fā)生最易,而這個(gè)時(shí)刻mu互斥對象還處于未加鎖的狀態(tài)怒坯,從而會導(dǎo)致運(yùn)行時(shí)異常。

下面是修復(fù)后的代碼:

func main() {
    var mu sync.Mutex

    mu.Lock()
    go func(){
        fmt.Println("你好, 世界")
        mu.Unlock()
    }()

    mu.Lock()
}

修復(fù)的方式是在main函數(shù)所在線程中執(zhí)行兩次mu.Lock()藻懒,當(dāng)?shù)诙渭渔i時(shí)會因?yàn)殒i已經(jīng)被占用(不是遞歸鎖)而阻塞剔猿,main函數(shù)的阻塞狀態(tài)驅(qū)動(dòng)后臺線程繼續(xù)向前執(zhí)行。當(dāng)后臺線程執(zhí)行到mu.Unlock()時(shí)解鎖嬉荆,此時(shí)打印工作已經(jīng)完成了归敬,解鎖會導(dǎo)致main函數(shù)中的第二個(gè)mu.Lock()阻塞狀態(tài)取消,此時(shí)后臺線程和主線程再沒有其它的同步事件參考鄙早,它們退出的事件將是并發(fā)的:在main函數(shù)退出導(dǎo)致程序退出時(shí)汪茧,后臺線程可能已經(jīng)退出了,也可能沒有退出限番。雖然無法確定兩個(gè)線程退出的時(shí)間舱污,但是打印工作是可以正確完成的。

使用sync.Mutex互斥鎖同步是比較低級的做法弥虐。我們現(xiàn)在改用無緩存的管道來實(shí)現(xiàn)同步:

func main() {
    done := make(chan int)

    go func(){
        fmt.Println("你好, 世界")
        <-done
    }()

    done <- 1
}

根據(jù)Go語言內(nèi)存模型規(guī)范扩灯,對于從無緩沖Channel進(jìn)行的接收,發(fā)生在對該Channel進(jìn)行的發(fā)送完成之前霜瘪。因此驴剔,后臺線程<-done接收操作完成之后,main線程的done <- 1發(fā)送操作才可能完成(從而退出main粥庄、退出程序)丧失,而此時(shí)打印工作已經(jīng)完成了。

上面的代碼雖然可以正確同步惜互,但是對管道的緩存大小太敏感:如果管道有緩存的話布讹,就無法保證main退出之前后臺線程能正常打印了。更好的做法是將管道的發(fā)送和接收方向調(diào)換一下训堆,這樣可以避免同步事件受管道緩存大小的影響:

func main() {
    done := make(chan int, 1) // 帶緩存的管道

    go func(){
        fmt.Println("你好, 世界")
        done <- 1
    }()

    <-done
}

對于帶緩沖的Channel描验,對于Channel的第K個(gè)接收完成操作發(fā)生在第K+C個(gè)發(fā)送操作完成之前,其中C是Channel的緩存大小坑鱼。雖然管道是帶緩存的膘流,main線程接收完成是在后臺線程發(fā)送開始但還未完成的時(shí)刻,此時(shí)打印工作也是已經(jīng)完成的鲁沥。

基于帶緩存的管道呼股,我們可以很容易將打印線程擴(kuò)展到N個(gè)。下面的例子是開啟10個(gè)后臺線程分別打踊 :

func main() {
    done := make(chan int, 10) // 帶 10 個(gè)緩存

    // 開N個(gè)后臺打印線程
    for i := 0; i < cap(done); i++ {
        go func(){
            fmt.Println("你好, 世界")
            done <- 1
        }()
    }

    // 等待N個(gè)后臺線程完成
    for i := 0; i < cap(done); i++ {
        <-done
    }
}

對于這種要等待N個(gè)線程完成后再進(jìn)行下一步的同步操作有一個(gè)簡單的做法彭谁,就是使用sync.WaitGroup來等待一組事件:

func main() {
    var wg sync.WaitGroup

    // 開N個(gè)后臺打印線程
    for i := 0; i < 10; i++ {
        wg.Add(1)

        go func() {
            fmt.Println("你好, 世界")
            wg.Done()
        }()
    }

    // 等待N個(gè)后臺線程完成
    wg.Wait()
}

其中wg.Add(1)用于增加等待事件的個(gè)數(shù),必須確保在后臺線程啟動(dòng)之前執(zhí)行(如果放到后臺線程之中執(zhí)行則不能保證被正常執(zhí)行到)允扇。當(dāng)后臺線程完成打印工作之后缠局,調(diào)用wg.Done()表示完成一個(gè)事件则奥。main函數(shù)的wg.Wait()是等待全部的事件完成。

生產(chǎn)者消費(fèi)者模型

并發(fā)編程中最常見的例子就是生產(chǎn)者消費(fèi)者模式狭园,該模式主要通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度读处。簡單地說,就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù)唱矛,然后放到成果隊(duì)列中档泽,同時(shí)消費(fèi)者從成果隊(duì)列中來取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費(fèi)變成了異步的兩個(gè)過程揖赴。當(dāng)成果隊(duì)列中沒有數(shù)據(jù)時(shí)馆匿,消費(fèi)者就進(jìn)入饑餓的等待中;而當(dāng)成果隊(duì)列中數(shù)據(jù)已滿時(shí)燥滑,生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致CPU被剝奪的下崗問題渐北。

Go語言實(shí)現(xiàn)生產(chǎn)者消費(fèi)者并發(fā)很簡單:

// 生產(chǎn)者: 生成 factor 整數(shù)倍的序列
func Producer(factor int, out chan<- int) {
    for i := 0; ; i++ {
        out <- i*factor
    }
}

// 消費(fèi)者
func Consumer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}
func main() {
    ch := make(chan int, 64) // 成果隊(duì)列

    go Producer(3, ch) // 生成 3 的倍數(shù)的序列
    go Producer(5, ch) // 生成 5 的倍數(shù)的序列
    go Consumer(ch)    // 消費(fèi) 生成的隊(duì)列

    // 運(yùn)行一定時(shí)間后退出
    time.Sleep(5 * time.Second)
}

我們開啟了2個(gè)Producer生產(chǎn)流水線,分別用于生成3和5的倍數(shù)的序列铭拧。然后開啟1個(gè)Consumer消費(fèi)者線程赃蛛,打印獲取的結(jié)果。我們通過在main函數(shù)休眠一定的時(shí)間來讓生產(chǎn)者和消費(fèi)者工作一定時(shí)間搀菩。正如前面一節(jié)說的呕臂,這種靠休眠方式是無法保證穩(wěn)定的輸出結(jié)果的。

我們可以讓main函數(shù)保存阻塞狀態(tài)不退出肪跋,只有當(dāng)用戶輸入Ctrl-C時(shí)才真正退出程序:

func main() {
    ch := make(chan int, 64) // 成果隊(duì)列

    go Producer(3, ch) // 生成 3 的倍數(shù)的序列
    go Producer(5, ch) // 生成 5 的倍數(shù)的序列
    go Consumer(ch)    // 消費(fèi) 生成的隊(duì)列

    // Ctrl+C 退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Printf("quit (%v)\n", <-sig)
}

我們這個(gè)例子中有2個(gè)生產(chǎn)者歧蒋,并且2個(gè)生產(chǎn)者之間并無同步事件可參考,它們是并發(fā)的州既。因此谜洽,消費(fèi)者輸出的結(jié)果序列的順序是不確定的,這并沒有問題吴叶,生產(chǎn)者和消費(fèi)者依然可以相互配合工作阐虚。

發(fā)布訂閱模型

發(fā)布訂閱(publish-and-subscribe)模型通常被簡寫為pub/sub模型。在這個(gè)模型中蚌卤,消息生產(chǎn)者成為發(fā)布者(publisher)实束,而消息消費(fèi)者則成為訂閱者(subscriber),生產(chǎn)者和消費(fèi)者是M:N的關(guān)系逊彭。在傳統(tǒng)生產(chǎn)者和消費(fèi)者模型中咸灿,是將消息發(fā)送到一個(gè)隊(duì)列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個(gè)主題诫龙。

為此析显,我們構(gòu)建了一個(gè)名為pubsub的發(fā)布訂閱模型支持包:

// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub

import (
    "sync"
    "time"
)

type (
    subscriber chan interface{}         // 訂閱者為一個(gè)管道
    topicFunc  func(v interface{}) bool // 主題為一個(gè)過濾器
)

// 發(fā)布者對象
type Publisher struct {
    m           sync.RWMutex             // 讀寫鎖
    buffer      int                      // 訂閱隊(duì)列的緩存大小
    timeout     time.Duration            // 發(fā)布超時(shí)時(shí)間
    subscribers map[subscriber]topicFunc // 訂閱者信息
}

// 構(gòu)建一個(gè)發(fā)布者對象, 可以設(shè)置發(fā)布超時(shí)時(shí)間和緩存隊(duì)列的長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
    return &Publisher{
        buffer:      buffer,
        timeout:     publishTimeout,
        subscribers: make(map[subscriber]topicFunc),
    }
}

// 添加一個(gè)新的訂閱者,訂閱全部主題
func (p *Publisher) Subscribe() chan interface{} {
    return p.SubscribeTopic(nil)
}

// 添加一個(gè)新的訂閱者签赃,訂閱過濾器篩選后的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
    ch := make(chan interface{}, p.buffer)
    p.m.Lock()
    p.subscribers[ch] = topic
    p.m.Unlock()
    return ch
}

// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
    p.m.Lock()
    defer p.m.Unlock()

    delete(p.subscribers, sub)
    close(sub)
}

// 發(fā)布一個(gè)主題
func (p *Publisher) Publish(v interface{}) {
    p.m.RLock()
    defer p.m.RUnlock()

    var wg sync.WaitGroup
    for sub, topic := range p.subscribers {
        wg.Add(1)
        go p.sendTopic(sub, topic, v, &wg)
    }
    wg.Wait()
}

// 關(guān)閉發(fā)布者對象谷异,同時(shí)關(guān)閉所有的訂閱者管道。
func (p *Publisher) Close() {
    p.m.Lock()
    defer p.m.Unlock()

    for sub := range p.subscribers {
        delete(p.subscribers, sub)
        close(sub)
    }
}

// 發(fā)送主題锦聊,可以容忍一定的超時(shí)
func (p *Publisher) sendTopic(
    sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
    defer wg.Done()
    if topic != nil && !topic(v) {
        return
    }

    select {
    case sub <- v:
    case <-time.After(p.timeout):
    }
}

下面的例子中歹嘹,有兩個(gè)訂閱者分別訂閱了全部主題和含有"golang"的主題:

import "path/to/pubsub"

func main() {
    p := pubsub.NewPublisher(100*time.Millisecond, 10)
    defer p.Close()

    all := p.Subscribe()
    golang := p.SubscribeTopic(func(v interface{}) bool {
        if s, ok := v.(string); ok {
            return strings.Contains(s, "golang")
        }
        return false
    })

    p.Publish("hello,  world!")
    p.Publish("hello, golang!")

    go func() {
        for  msg := range all {
            fmt.Println("all:", msg)
        }
    } ()

    go func() {
        for  msg := range golang {
            fmt.Println("golang:", msg)
        }
    } ()

    // 運(yùn)行一定時(shí)間后退出
    time.Sleep(3 * time.Second)
}

在發(fā)布訂閱模型中,每條消息都會傳送給多個(gè)訂閱者孔庭。發(fā)布者通常不會知道尺上、也不關(guān)心哪一個(gè)訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)添加圆到,是一種松散的耦合關(guān)系怎抛,這使得系統(tǒng)的復(fù)雜性可以隨時(shí)間的推移而增長。在現(xiàn)實(shí)生活中芽淡,像天氣預(yù)報(bào)之類的應(yīng)用就可以應(yīng)用這個(gè)并發(fā)模式马绝。

控制并發(fā)數(shù)

很多用戶在適應(yīng)了Go語言強(qiáng)大的并發(fā)特性之后,都傾向于編寫最大并發(fā)的程序挣菲,因?yàn)檫@樣似乎可以提供最大的性能富稻。在現(xiàn)實(shí)中我們行色匆匆,但有時(shí)卻需要我們放慢腳步享受生活白胀,并發(fā)的程序也是一樣:有時(shí)候我們需要適當(dāng)?shù)乜刂撇l(fā)的程度椭赋,因?yàn)檫@樣不僅僅可給其它的應(yīng)用/任務(wù)讓出/預(yù)留一定的CPU資源,也可以適當(dāng)降低功耗緩解電池的壓力或杠。

在Go語言自帶的godoc程序?qū)崿F(xiàn)中有一個(gè)vfs的包對應(yīng)虛擬的文件系統(tǒng)哪怔,在vfs包下面有一個(gè)gatefs的子包,gatefs子包的目的就是為了控制訪問該虛擬文件系統(tǒng)的最大并發(fā)數(shù)向抢。gatefs包的應(yīng)用很簡單:

import (
    "golang.org/x/tools/godoc/vfs"
    "golang.org/x/tools/godoc/vfs/gatefs"
)

func main() {
    fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
    // ...
}

其中vfs.OS("/path")基于本地文件系統(tǒng)構(gòu)造一個(gè)虛擬的文件系統(tǒng)蔓涧,然后gatefs.New基于現(xiàn)有的虛擬文件系統(tǒng)構(gòu)造一個(gè)并發(fā)受控的虛擬文件系統(tǒng)。并發(fā)數(shù)控制的原理在前面一節(jié)已經(jīng)講過笋额,就是通過帶緩存管道的發(fā)送和接收規(guī)則來實(shí)現(xiàn)最大并發(fā)阻塞:

var limit = make(chan int, 3)

func main() {
    for _, w := range work {
        go func() {
            limit <- 1
            w()
            <-limit
        }()
    }
    select{}
}

不過gatefs對此做一個(gè)抽象類型gate元暴,增加了enterleave方法分別對應(yīng)并發(fā)代碼的進(jìn)入和離開。當(dāng)超出并發(fā)數(shù)目限制的時(shí)候兄猩,enter方法會阻塞直到并發(fā)數(shù)降下來為止茉盏。

type gate chan bool

func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }

gatefs包裝的新的虛擬文件系統(tǒng)就是將需要控制并發(fā)的方法增加了enterleave調(diào)用而已:

type gatefs struct {
    fs vfs.FileSystem
    gate
}

func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
    fs.enter()
    defer fs.leave()
    return fs.fs.Lstat(p)
}

我們不僅可以控制最大的并發(fā)數(shù)目,而且可以通過帶緩存Channel的使用量和最大容量比例來判斷程序運(yùn)行的并發(fā)率枢冤。當(dāng)管道為空的時(shí)候可以認(rèn)為是空閑狀態(tài)鸠姨,當(dāng)管道滿了時(shí)任務(wù)是繁忙狀態(tài),這對于后臺一些低級任務(wù)的運(yùn)行是有參考價(jià)值的淹真。

贏者為王

采用并發(fā)編程的動(dòng)機(jī)有很多:并發(fā)編程可以簡化問題讶迁,比如一類問題對應(yīng)一個(gè)處理線程會更簡單;并發(fā)編程還可以提升性能核蘸,在一個(gè)多核CPU上開2個(gè)線程一般會比開1個(gè)線程快一些巍糯。其實(shí)對于提升性能而言啸驯,程序并不是簡單地運(yùn)行速度快就表示用戶體驗(yàn)好的;很多時(shí)候程序能快速響應(yīng)用戶請求才是最重要的祟峦,當(dāng)沒有用戶請求需要處理的時(shí)候才合適處理一些低優(yōu)先級的后臺任務(wù)罚斗。

假設(shè)我們想快速地搜索“golang”相關(guān)的主題,我們可能會同時(shí)打開Bing宅楞、Google或百度等多個(gè)檢索引擎针姿。當(dāng)某個(gè)搜索最先返回結(jié)果后,就可以關(guān)閉其它搜索頁面了厌衙。因?yàn)槭芫W(wǎng)絡(luò)環(huán)境和搜索引擎算法的影響距淫,某些搜索引擎可能很快返回搜索結(jié)果,某些搜索引擎也可能等到他們公司倒閉也沒有完成搜索婶希。我們可以采用類似的策略來編寫這個(gè)程序:

func main() {
    ch := make(chan string, 32)

    go func() {
        ch <- searchByBing("golang")
    }()
    go func() {
        ch <- searchByGoogle("golang")
    }()
    go func() {
        ch <- searchByBaidu("golang")
    }()

    fmt.Println(<-ch)
}

首先榕暇,我們創(chuàng)建了一個(gè)帶緩存的管道,管道的緩存數(shù)目要足夠大饲趋,保證不會因?yàn)榫彺娴娜萘恳鸩槐匾淖枞战摇H缓笪覀冮_啟了多個(gè)后臺線程,分別向不同的搜索引擎提交搜索請求奕塑。當(dāng)任意一個(gè)搜索引擎最先有結(jié)果之后堂污,都會馬上將結(jié)果發(fā)到管道中(因?yàn)楣艿缼Я俗銐虻木彺妫@個(gè)過程不會阻塞)龄砰。但是最終我們只從管道取第一個(gè)結(jié)果盟猖,也就是最先返回的結(jié)果。

通過適當(dāng)開啟一些冗余的線程换棚,嘗試用不同途徑去解決同樣的問題式镐,最終以贏者為王的方式提升了程序的相應(yīng)性能。

素?cái)?shù)篩

在“Hello world 的革命”一節(jié)中固蚤,我們?yōu)榱搜菔綨ewsqueak的并發(fā)特性娘汞,文中給出了并發(fā)版本素?cái)?shù)篩的實(shí)現(xiàn)。并發(fā)版本的素?cái)?shù)篩是一個(gè)經(jīng)典的并發(fā)例子夕玩,通過它我們可以更深刻地理解Go語言的并發(fā)特性你弦。“素?cái)?shù)篩”的原理如圖:

[圖片上傳失敗...(image-b304d-1554179460547)]

圖 1-13 素?cái)?shù)篩

我們需要先生成最初的2, 3, 4, ...自然數(shù)序列(不包含開頭的0燎孟、1):

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

GenerateNatural函數(shù)內(nèi)部啟動(dòng)一個(gè)Goroutine生產(chǎn)序列禽作,返回對應(yīng)的管道。

然后是為每個(gè)素?cái)?shù)構(gòu)造一個(gè)篩子:將輸入序列中是素?cái)?shù)倍數(shù)的數(shù)提出揩页,并返回新的序列旷偿,是一個(gè)新的管道。

// 管道過濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

PrimeFilter函數(shù)也是內(nèi)部啟動(dòng)一個(gè)Goroutine生產(chǎn)序列,返回過濾后序列對應(yīng)的管道萍程。

現(xiàn)在我們可以在main函數(shù)中驅(qū)動(dòng)這個(gè)并發(fā)的素?cái)?shù)篩了:

func main() {
    ch := GenerateNatural() // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素?cái)?shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        ch = PrimeFilter(ch, prime) // 基于新素?cái)?shù)構(gòu)造的過濾器
    }
}

我們先是調(diào)用GenerateNatural()生成最原始的從2開始的自然數(shù)序列幢妄。然后開始一個(gè)100次迭代的循環(huán),希望生成100個(gè)素?cái)?shù)尘喝。在每次循環(huán)迭代開始的時(shí)候磁浇,管道中的第一個(gè)數(shù)必定是素?cái)?shù)斋陪,我們先讀取并打印這個(gè)素?cái)?shù)朽褪。然后基于管道中剩余的數(shù)列,并以當(dāng)前取出的素?cái)?shù)為篩子過濾后面的素?cái)?shù)无虚。不同的素?cái)?shù)篩子對應(yīng)的管道是串聯(lián)在一起的缔赠。

素?cái)?shù)篩展示了一種優(yōu)雅的并發(fā)程序結(jié)構(gòu)。但是因?yàn)槊總€(gè)并發(fā)體處理的任務(wù)粒度太細(xì)微友题,程序整體的性能并不理想嗤堰。對于細(xì)粒度的并發(fā)程序,CSP模型中固有的消息傳遞的代價(jià)太高了(多線程并發(fā)模型同樣要面臨線程啟動(dòng)的代價(jià))度宦。

并發(fā)的安全退出

有時(shí)候我們需要通知goroutine停止它正在干的事情踢匣,特別是當(dāng)它工作在錯(cuò)誤的方向上的時(shí)候。Go語言并沒有提供在一個(gè)直接終止Goroutine的方法戈抄,由于這樣會導(dǎo)致goroutine之間的共享變量處在未定義的狀態(tài)上离唬。但是如果我們想要退出兩個(gè)或者任意多個(gè)Goroutine怎么辦呢?

Go語言中不同Goroutine之間主要依靠管道進(jìn)行通信和同步划鸽。要同時(shí)處理多個(gè)管道的發(fā)送或接收操作输莺,我們需要使用select關(guān)鍵字(這個(gè)關(guān)鍵字和網(wǎng)絡(luò)編程中的select函數(shù)的行為類似)。當(dāng)select有多個(gè)分支時(shí)裸诽,會隨機(jī)選擇一個(gè)可用的管道分支嫂用,如果沒有可用的管道分支則選擇default分支,否則會一直保存阻塞狀態(tài)丈冬。

基于select實(shí)現(xiàn)的管道的超時(shí)判斷:

select {
case v := <-in:
    fmt.Println(v)
case <-time.After(time.Second):
    return // 超時(shí)
}

通過selectdefault分支實(shí)現(xiàn)非阻塞的管道發(fā)送或接收操作:

select {
case v := <-in:
    fmt.Println(v)
default:
    // 沒有數(shù)據(jù)
}

通過select來阻止main函數(shù)退出:

func main() {
    // do some thins
    select{}
}

當(dāng)有多個(gè)管道均可操作時(shí)呆奕,select會隨機(jī)選擇一個(gè)管道÷С龋基于該特性我們可以用select實(shí)現(xiàn)一個(gè)生成隨機(jī)數(shù)序列的程序:

func main() {
    ch := make(chan int)
    go func() {
        for {
            select {
            case ch <- 0:
            case ch <- 1:
            }
        }
    }()

    for v := range ch {
        fmt.Println(v)
    }
}

我們通過selectdefault分支可以很容易實(shí)現(xiàn)一個(gè)Goroutine的退出控制:

func worker(cannel chan bool) {
    for {
        select {
        default:
            fmt.Println("hello")
            // 正常工作
        case <-cannel:
            // 退出
        }
    }
}

func main() {
    cannel := make(chan bool)
    go worker(cannel)

    time.Sleep(time.Second)
    cannel <- true
}

但是管道的發(fā)送操作和接收操作是一一對應(yīng)的劈榨,如果要停止多個(gè)Goroutine那么可能需要?jiǎng)?chuàng)建同樣數(shù)量的管道,這個(gè)代價(jià)太大了粒梦。其實(shí)我們可以通過close關(guān)閉一個(gè)管道來實(shí)現(xiàn)廣播的效果亮航,所有從關(guān)閉管道接收的操作均會收到一個(gè)零值和一個(gè)可選的失敗標(biāo)志。

func worker(cannel chan bool) {
    for {
        select {
        default:
            fmt.Println("hello")
            // 正常工作
        case <-cannel:
            // 退出
        }
    }
}

func main() {
    cancel := make(chan bool)

    for i := 0; i < 10; i++ {
        go worker(cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
}

我們通過close來關(guān)閉cancel管道向多個(gè)Goroutine廣播退出的指令匀们。不過這個(gè)程序依然不夠穩(wěn)浇闪堋:當(dāng)每個(gè)Goroutine收到退出指令退出時(shí)一般會進(jìn)行一定的清理工作,但是退出的清理工作并不能保證被完成,因?yàn)?code>main線程并沒有等待各個(gè)工作Goroutine退出工作完成的機(jī)制重抖。我們可以結(jié)合sync.WaitGroup來改進(jìn):

func worker(wg *sync.WaitGroup, cannel chan bool) {
    defer wg.Done()

    for {
        select {
        default:
            fmt.Println("hello")
        case <-cannel:
            return
        }
    }
}

func main() {
    cancel := make(chan bool)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(&wg, cancel)
    }

    time.Sleep(time.Second)
    close(cancel)
    wg.Wait()
}

現(xiàn)在每個(gè)工作者并發(fā)體的創(chuàng)建露氮、運(yùn)行、暫停和退出都是在main函數(shù)的安全控制之下了钟沛。

context包

在Go1.7發(fā)布時(shí)畔规,標(biāo)準(zhǔn)庫增加了一個(gè)context包,用來簡化對于處理單個(gè)請求的多個(gè)Goroutine之間與請求域的數(shù)據(jù)恨统、超時(shí)和退出等操作叁扫,官方有博文對此做了專門介紹。我們可以用context包來重新實(shí)現(xiàn)前面的線程安全退出或超時(shí)的控制:

func worker(ctx context.Context, wg *sync.WaitGroup) error {
    defer wg.Done()

    for {
        select {
        default:
            fmt.Println("hello")
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go worker(ctx, &wg)
    }

    time.Sleep(time.Second)
    cancel()

    wg.Wait()
}

當(dāng)并發(fā)體超時(shí)或main主動(dòng)停止工作者Goroutine時(shí)畜埋,每個(gè)工作者都可以安全退出莫绣。

Go語言是帶內(nèi)存自動(dòng)回收特性的,因此內(nèi)存一般不會泄漏悠鞍。在前面素?cái)?shù)篩的例子中对室,GenerateNaturalPrimeFilter函數(shù)內(nèi)部都啟動(dòng)了新的Goroutine,當(dāng)main函數(shù)不再使用管道時(shí)后臺Goroutine有泄漏的風(fēng)險(xiǎn)咖祭。我們可以通過context包來避免這個(gè)問題掩宜,下面是改進(jìn)的素?cái)?shù)篩實(shí)現(xiàn):

// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            select {
            case <- ctx.Done():
                return
            case ch <- i:
            }
        }
    }()
    return ch
}

// 管道過濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                select {
                case <- ctx.Done():
                    return
                case out <- i:
                }
            }
        }
    }()
    return out
}

func main() {
    // 通過 Context 控制后臺Goroutine狀態(tài)
    ctx, cancel := context.WithCancel(context.Background())

    ch := GenerateNatural(ctx) // 自然數(shù)序列: 2, 3, 4, ...
    for i := 0; i < 100; i++ {
        prime := <-ch // 新出現(xiàn)的素?cái)?shù)
        fmt.Printf("%v: %v\n", i+1, prime)
        ch = PrimeFilter(ctx, ch, prime) // 基于新素?cái)?shù)構(gòu)造的過濾器
    }

    cancel()
}

當(dāng)main函數(shù)完成工作前,通過調(diào)用cancel()來通知后臺Goroutine退出么翰,這樣就避免了Goroutine的泄漏牺汤。

并發(fā)是一個(gè)非常大的主題,我們這里只是展示幾個(gè)非秤舶埃基礎(chǔ)的并發(fā)編程的例子慧瘤。官方文檔也有很多關(guān)于并發(fā)編程的討論,國內(nèi)也有專門討論Go語言并發(fā)編程的書籍固该。讀者可以根據(jù)自己的需求查閱相關(guān)的文獻(xiàn)锅减。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市伐坏,隨后出現(xiàn)的幾起案子怔匣,更是在濱河造成了極大的恐慌,老刑警劉巖桦沉,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件每瞒,死亡現(xiàn)場離奇詭異,居然都是意外死亡纯露,警方通過查閱死者的電腦和手機(jī)剿骨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來埠褪,“玉大人浓利,你說我怎么就攤上這事挤庇。” “怎么了贷掖?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵嫡秕,是天一觀的道長。 經(jīng)常有香客問我苹威,道長昆咽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任牙甫,我火速辦了婚禮掷酗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘腹暖。我一直安慰自己汇在,他們只是感情好翰萨,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布脏答。 她就那樣靜靜地躺著,像睡著了一般亩鬼。 火紅的嫁衣襯著肌膚如雪殖告。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天雳锋,我揣著相機(jī)與錄音黄绩,去河邊找鬼。 笑死玷过,一個(gè)胖子當(dāng)著我的面吹牛爽丹,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播辛蚊,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼粤蝎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了袋马?” 一聲冷哼從身側(cè)響起初澎,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎虑凛,沒想到半個(gè)月后碑宴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡桑谍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年延柠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锣披。...
    茶點(diǎn)故事閱讀 39,731評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贞间,死狀恐怖匕积,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情榜跌,我是刑警寧澤闪唆,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站钓葫,受9級特大地震影響悄蕾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜础浮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一帆调、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧豆同,春花似錦番刊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鸭廷,卻和暖如春枣抱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辆床。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工佳晶, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人讼载。 一個(gè)月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓轿秧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親咨堤。 傳聞我的和親對象是個(gè)殘疾皇子菇篡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容