常見的并發(fā)模式
Go語言最吸引人的地方是它內(nèi)建的并發(fā)支持。Go語言并發(fā)體系的理論是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process预皇,通訊順序進(jìn)程)菲茬。CSP有著精確的數(shù)學(xué)模型猾担,并實(shí)際應(yīng)用在了Hoare參與設(shè)計(jì)的T9000通用計(jì)算機(jī)上辑奈。從NewSqueak袱瓮、Alef或油、Limbo到現(xiàn)在的Go語言逊拍,對于對CSP有著20多年實(shí)戰(zhàn)經(jīng)驗(yàn)的Rob Pike來說上鞠,他更關(guān)注的是將CSP應(yīng)用在通用編程語言上產(chǎn)生的潛力。作為Go并發(fā)編程核心的CSP理論的核心概念只有一個(gè):同步通信芯丧。關(guān)于同步通信的話題我們在前面一節(jié)已經(jīng)講過芍阎,本節(jié)我們將簡單介紹下Go語言中常見的并發(fā)模式。
首先要明確一個(gè)概念:并發(fā)不是并行缨恒。并發(fā)更關(guān)注的是程序的設(shè)計(jì)層面谴咸,并發(fā)的程序完全是可以順序執(zhí)行的度硝,只有在真正的多核CPU上才可能真正地同時(shí)運(yùn)行。并行更關(guān)注的是程序的運(yùn)行層面寿冕,并行一般是簡單的大量重復(fù)蕊程,例如GPU中對圖像處理都會有大量的并行運(yùn)算。為更好的編寫并發(fā)程序驼唱,從設(shè)計(jì)之初Go語言就注重如何在編程語言層級上設(shè)計(jì)一個(gè)簡潔安全高效的抽象模型藻茂,讓程序員專注于分解問題和組合方案,而且不用被線程管理和信號互斥這些繁瑣的操作分散精力玫恳。
在并發(fā)編程中辨赐,對共享資源的正確訪問需要精確的控制,在目前的絕大多數(shù)語言中京办,都是通過加鎖等線程同步方案來解決這一困難問題掀序,而Go語言卻另辟蹊徑,它將共享的值通過Channel傳遞(實(shí)際上多個(gè)獨(dú)立執(zhí)行的線程很少主動(dòng)共享資源)惭婿。在任意給定的時(shí)刻不恭,最好只有一個(gè)Goroutine能夠擁有該資源。數(shù)據(jù)競爭從設(shè)計(jì)層面上就被杜絕了财饥。為了提倡這種思考方式换吧,Go語言將其并發(fā)編程哲學(xué)化為一句口號:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享內(nèi)存來通信,而應(yīng)通過通信來共享內(nèi)存钥星。
這是更高層次的并發(fā)編程哲學(xué)(通過管道來傳值是Go語言推薦的做法)沾瓦。雖然像引用計(jì)數(shù)這類簡單的并發(fā)問題通過原子操作或互斥鎖就能很好地實(shí)現(xiàn),但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程序谦炒。
并發(fā)版本的Hello world
我們先以在一個(gè)新的Goroutine中輸出“Hello world”贯莺,main
等待后臺線程輸出工作完成之后退出,這樣一個(gè)簡單的并發(fā)程序作為熱身宁改。
并發(fā)編程的核心概念是同步通信缕探,但是同步的方式卻有多種。我們先以大家熟悉的互斥量sync.Mutex
來實(shí)現(xiàn)同步通信透且。根據(jù)文檔撕蔼,我們不能直接對一個(gè)未加鎖狀態(tài)的sync.Mutex
進(jìn)行解鎖豁鲤,這會導(dǎo)致運(yùn)行時(shí)異常秽誊。下面這種方式并不能保證正常工作:
func main() {
var mu sync.Mutex
go func(){
fmt.Println("你好, 世界")
mu.Lock()
}()
mu.Unlock()
}
因?yàn)?code>mu.Lock()和mu.Unlock()
并不在同一個(gè)Goroutine中,所以也就不滿足順序一致性內(nèi)存模型琳骡。同時(shí)它們也沒有其它的同步事件可以參考锅论,這兩個(gè)事件不可排序也就是可以并發(fā)的。因?yàn)榭赡苁遣l(fā)的事件楣号,所以main
函數(shù)中的mu.Unlock()
很有可能先發(fā)生最易,而這個(gè)時(shí)刻mu
互斥對象還處于未加鎖的狀態(tài)怒坯,從而會導(dǎo)致運(yùn)行時(shí)異常。
下面是修復(fù)后的代碼:
func main() {
var mu sync.Mutex
mu.Lock()
go func(){
fmt.Println("你好, 世界")
mu.Unlock()
}()
mu.Lock()
}
修復(fù)的方式是在main
函數(shù)所在線程中執(zhí)行兩次mu.Lock()
藻懒,當(dāng)?shù)诙渭渔i時(shí)會因?yàn)殒i已經(jīng)被占用(不是遞歸鎖)而阻塞剔猿,main
函數(shù)的阻塞狀態(tài)驅(qū)動(dòng)后臺線程繼續(xù)向前執(zhí)行。當(dāng)后臺線程執(zhí)行到mu.Unlock()
時(shí)解鎖嬉荆,此時(shí)打印工作已經(jīng)完成了归敬,解鎖會導(dǎo)致main
函數(shù)中的第二個(gè)mu.Lock()
阻塞狀態(tài)取消,此時(shí)后臺線程和主線程再沒有其它的同步事件參考鄙早,它們退出的事件將是并發(fā)的:在main
函數(shù)退出導(dǎo)致程序退出時(shí)汪茧,后臺線程可能已經(jīng)退出了,也可能沒有退出限番。雖然無法確定兩個(gè)線程退出的時(shí)間舱污,但是打印工作是可以正確完成的。
使用sync.Mutex
互斥鎖同步是比較低級的做法弥虐。我們現(xiàn)在改用無緩存的管道來實(shí)現(xiàn)同步:
func main() {
done := make(chan int)
go func(){
fmt.Println("你好, 世界")
<-done
}()
done <- 1
}
根據(jù)Go語言內(nèi)存模型規(guī)范扩灯,對于從無緩沖Channel進(jìn)行的接收,發(fā)生在對該Channel進(jìn)行的發(fā)送完成之前霜瘪。因此驴剔,后臺線程<-done
接收操作完成之后,main
線程的done <- 1
發(fā)送操作才可能完成(從而退出main粥庄、退出程序)丧失,而此時(shí)打印工作已經(jīng)完成了。
上面的代碼雖然可以正確同步惜互,但是對管道的緩存大小太敏感:如果管道有緩存的話布讹,就無法保證main退出之前后臺線程能正常打印了。更好的做法是將管道的發(fā)送和接收方向調(diào)換一下训堆,這樣可以避免同步事件受管道緩存大小的影響:
func main() {
done := make(chan int, 1) // 帶緩存的管道
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
<-done
}
對于帶緩沖的Channel描验,對于Channel的第K個(gè)接收完成操作發(fā)生在第K+C個(gè)發(fā)送操作完成之前,其中C是Channel的緩存大小坑鱼。雖然管道是帶緩存的膘流,main
線程接收完成是在后臺線程發(fā)送開始但還未完成的時(shí)刻,此時(shí)打印工作也是已經(jīng)完成的鲁沥。
基于帶緩存的管道呼股,我們可以很容易將打印線程擴(kuò)展到N個(gè)。下面的例子是開啟10個(gè)后臺線程分別打踊 :
func main() {
done := make(chan int, 10) // 帶 10 個(gè)緩存
// 開N個(gè)后臺打印線程
for i := 0; i < cap(done); i++ {
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
}
// 等待N個(gè)后臺線程完成
for i := 0; i < cap(done); i++ {
<-done
}
}
對于這種要等待N個(gè)線程完成后再進(jìn)行下一步的同步操作有一個(gè)簡單的做法彭谁,就是使用sync.WaitGroup
來等待一組事件:
func main() {
var wg sync.WaitGroup
// 開N個(gè)后臺打印線程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
fmt.Println("你好, 世界")
wg.Done()
}()
}
// 等待N個(gè)后臺線程完成
wg.Wait()
}
其中wg.Add(1)
用于增加等待事件的個(gè)數(shù),必須確保在后臺線程啟動(dòng)之前執(zhí)行(如果放到后臺線程之中執(zhí)行則不能保證被正常執(zhí)行到)允扇。當(dāng)后臺線程完成打印工作之后缠局,調(diào)用wg.Done()
表示完成一個(gè)事件则奥。main
函數(shù)的wg.Wait()
是等待全部的事件完成。
生產(chǎn)者消費(fèi)者模型
并發(fā)編程中最常見的例子就是生產(chǎn)者消費(fèi)者模式狭园,該模式主要通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度读处。簡單地說,就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù)唱矛,然后放到成果隊(duì)列中档泽,同時(shí)消費(fèi)者從成果隊(duì)列中來取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費(fèi)變成了異步的兩個(gè)過程揖赴。當(dāng)成果隊(duì)列中沒有數(shù)據(jù)時(shí)馆匿,消費(fèi)者就進(jìn)入饑餓的等待中;而當(dāng)成果隊(duì)列中數(shù)據(jù)已滿時(shí)燥滑,生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致CPU被剝奪的下崗問題渐北。
Go語言實(shí)現(xiàn)生產(chǎn)者消費(fèi)者并發(fā)很簡單:
// 生產(chǎn)者: 生成 factor 整數(shù)倍的序列
func Producer(factor int, out chan<- int) {
for i := 0; ; i++ {
out <- i*factor
}
}
// 消費(fèi)者
func Consumer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
ch := make(chan int, 64) // 成果隊(duì)列
go Producer(3, ch) // 生成 3 的倍數(shù)的序列
go Producer(5, ch) // 生成 5 的倍數(shù)的序列
go Consumer(ch) // 消費(fèi) 生成的隊(duì)列
// 運(yùn)行一定時(shí)間后退出
time.Sleep(5 * time.Second)
}
我們開啟了2個(gè)Producer
生產(chǎn)流水線,分別用于生成3和5的倍數(shù)的序列铭拧。然后開啟1個(gè)Consumer
消費(fèi)者線程赃蛛,打印獲取的結(jié)果。我們通過在main
函數(shù)休眠一定的時(shí)間來讓生產(chǎn)者和消費(fèi)者工作一定時(shí)間搀菩。正如前面一節(jié)說的呕臂,這種靠休眠方式是無法保證穩(wěn)定的輸出結(jié)果的。
我們可以讓main
函數(shù)保存阻塞狀態(tài)不退出肪跋,只有當(dāng)用戶輸入Ctrl-C
時(shí)才真正退出程序:
func main() {
ch := make(chan int, 64) // 成果隊(duì)列
go Producer(3, ch) // 生成 3 的倍數(shù)的序列
go Producer(5, ch) // 生成 5 的倍數(shù)的序列
go Consumer(ch) // 消費(fèi) 生成的隊(duì)列
// Ctrl+C 退出
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("quit (%v)\n", <-sig)
}
我們這個(gè)例子中有2個(gè)生產(chǎn)者歧蒋,并且2個(gè)生產(chǎn)者之間并無同步事件可參考,它們是并發(fā)的州既。因此谜洽,消費(fèi)者輸出的結(jié)果序列的順序是不確定的,這并沒有問題吴叶,生產(chǎn)者和消費(fèi)者依然可以相互配合工作阐虚。
發(fā)布訂閱模型
發(fā)布訂閱(publish-and-subscribe)模型通常被簡寫為pub/sub模型。在這個(gè)模型中蚌卤,消息生產(chǎn)者成為發(fā)布者(publisher)实束,而消息消費(fèi)者則成為訂閱者(subscriber),生產(chǎn)者和消費(fèi)者是M:N的關(guān)系逊彭。在傳統(tǒng)生產(chǎn)者和消費(fèi)者模型中咸灿,是將消息發(fā)送到一個(gè)隊(duì)列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個(gè)主題诫龙。
為此析显,我們構(gòu)建了一個(gè)名為pubsub
的發(fā)布訂閱模型支持包:
// Package pubsub implements a simple multi-topic pub-sub library.
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} // 訂閱者為一個(gè)管道
topicFunc func(v interface{}) bool // 主題為一個(gè)過濾器
)
// 發(fā)布者對象
type Publisher struct {
m sync.RWMutex // 讀寫鎖
buffer int // 訂閱隊(duì)列的緩存大小
timeout time.Duration // 發(fā)布超時(shí)時(shí)間
subscribers map[subscriber]topicFunc // 訂閱者信息
}
// 構(gòu)建一個(gè)發(fā)布者對象, 可以設(shè)置發(fā)布超時(shí)時(shí)間和緩存隊(duì)列的長度
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一個(gè)新的訂閱者,訂閱全部主題
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// 添加一個(gè)新的訂閱者签赃,訂閱過濾器篩選后的主題
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// 退出訂閱
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 發(fā)布一個(gè)主題
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
var wg sync.WaitGroup
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, &wg)
}
wg.Wait()
}
// 關(guān)閉發(fā)布者對象谷异,同時(shí)關(guān)閉所有的訂閱者管道。
func (p *Publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
}
// 發(fā)送主題锦聊,可以容忍一定的超時(shí)
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub <- v:
case <-time.After(p.timeout):
}
}
下面的例子中歹嘹,有兩個(gè)訂閱者分別訂閱了全部主題和含有"golang"的主題:
import "path/to/pubsub"
func main() {
p := pubsub.NewPublisher(100*time.Millisecond, 10)
defer p.Close()
all := p.Subscribe()
golang := p.SubscribeTopic(func(v interface{}) bool {
if s, ok := v.(string); ok {
return strings.Contains(s, "golang")
}
return false
})
p.Publish("hello, world!")
p.Publish("hello, golang!")
go func() {
for msg := range all {
fmt.Println("all:", msg)
}
} ()
go func() {
for msg := range golang {
fmt.Println("golang:", msg)
}
} ()
// 運(yùn)行一定時(shí)間后退出
time.Sleep(3 * time.Second)
}
在發(fā)布訂閱模型中,每條消息都會傳送給多個(gè)訂閱者孔庭。發(fā)布者通常不會知道尺上、也不關(guān)心哪一個(gè)訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)添加圆到,是一種松散的耦合關(guān)系怎抛,這使得系統(tǒng)的復(fù)雜性可以隨時(shí)間的推移而增長。在現(xiàn)實(shí)生活中芽淡,像天氣預(yù)報(bào)之類的應(yīng)用就可以應(yīng)用這個(gè)并發(fā)模式马绝。
控制并發(fā)數(shù)
很多用戶在適應(yīng)了Go語言強(qiáng)大的并發(fā)特性之后,都傾向于編寫最大并發(fā)的程序挣菲,因?yàn)檫@樣似乎可以提供最大的性能富稻。在現(xiàn)實(shí)中我們行色匆匆,但有時(shí)卻需要我們放慢腳步享受生活白胀,并發(fā)的程序也是一樣:有時(shí)候我們需要適當(dāng)?shù)乜刂撇l(fā)的程度椭赋,因?yàn)檫@樣不僅僅可給其它的應(yīng)用/任務(wù)讓出/預(yù)留一定的CPU資源,也可以適當(dāng)降低功耗緩解電池的壓力或杠。
在Go語言自帶的godoc程序?qū)崿F(xiàn)中有一個(gè)vfs
的包對應(yīng)虛擬的文件系統(tǒng)哪怔,在vfs
包下面有一個(gè)gatefs
的子包,gatefs
子包的目的就是為了控制訪問該虛擬文件系統(tǒng)的最大并發(fā)數(shù)向抢。gatefs
包的應(yīng)用很簡單:
import (
"golang.org/x/tools/godoc/vfs"
"golang.org/x/tools/godoc/vfs/gatefs"
)
func main() {
fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
// ...
}
其中vfs.OS("/path")
基于本地文件系統(tǒng)構(gòu)造一個(gè)虛擬的文件系統(tǒng)蔓涧,然后gatefs.New
基于現(xiàn)有的虛擬文件系統(tǒng)構(gòu)造一個(gè)并發(fā)受控的虛擬文件系統(tǒng)。并發(fā)數(shù)控制的原理在前面一節(jié)已經(jīng)講過笋额,就是通過帶緩存管道的發(fā)送和接收規(guī)則來實(shí)現(xiàn)最大并發(fā)阻塞:
var limit = make(chan int, 3)
func main() {
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
select{}
}
不過gatefs
對此做一個(gè)抽象類型gate
元暴,增加了enter
和leave
方法分別對應(yīng)并發(fā)代碼的進(jìn)入和離開。當(dāng)超出并發(fā)數(shù)目限制的時(shí)候兄猩,enter
方法會阻塞直到并發(fā)數(shù)降下來為止茉盏。
type gate chan bool
func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }
gatefs
包裝的新的虛擬文件系統(tǒng)就是將需要控制并發(fā)的方法增加了enter
和leave
調(diào)用而已:
type gatefs struct {
fs vfs.FileSystem
gate
}
func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
fs.enter()
defer fs.leave()
return fs.fs.Lstat(p)
}
我們不僅可以控制最大的并發(fā)數(shù)目,而且可以通過帶緩存Channel的使用量和最大容量比例來判斷程序運(yùn)行的并發(fā)率枢冤。當(dāng)管道為空的時(shí)候可以認(rèn)為是空閑狀態(tài)鸠姨,當(dāng)管道滿了時(shí)任務(wù)是繁忙狀態(tài),這對于后臺一些低級任務(wù)的運(yùn)行是有參考價(jià)值的淹真。
贏者為王
采用并發(fā)編程的動(dòng)機(jī)有很多:并發(fā)編程可以簡化問題讶迁,比如一類問題對應(yīng)一個(gè)處理線程會更簡單;并發(fā)編程還可以提升性能核蘸,在一個(gè)多核CPU上開2個(gè)線程一般會比開1個(gè)線程快一些巍糯。其實(shí)對于提升性能而言啸驯,程序并不是簡單地運(yùn)行速度快就表示用戶體驗(yàn)好的;很多時(shí)候程序能快速響應(yīng)用戶請求才是最重要的祟峦,當(dāng)沒有用戶請求需要處理的時(shí)候才合適處理一些低優(yōu)先級的后臺任務(wù)罚斗。
假設(shè)我們想快速地搜索“golang”相關(guān)的主題,我們可能會同時(shí)打開Bing宅楞、Google或百度等多個(gè)檢索引擎针姿。當(dāng)某個(gè)搜索最先返回結(jié)果后,就可以關(guān)閉其它搜索頁面了厌衙。因?yàn)槭芫W(wǎng)絡(luò)環(huán)境和搜索引擎算法的影響距淫,某些搜索引擎可能很快返回搜索結(jié)果,某些搜索引擎也可能等到他們公司倒閉也沒有完成搜索婶希。我們可以采用類似的策略來編寫這個(gè)程序:
func main() {
ch := make(chan string, 32)
go func() {
ch <- searchByBing("golang")
}()
go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}
首先榕暇,我們創(chuàng)建了一個(gè)帶緩存的管道,管道的緩存數(shù)目要足夠大饲趋,保證不會因?yàn)榫彺娴娜萘恳鸩槐匾淖枞战摇H缓笪覀冮_啟了多個(gè)后臺線程,分別向不同的搜索引擎提交搜索請求奕塑。當(dāng)任意一個(gè)搜索引擎最先有結(jié)果之后堂污,都會馬上將結(jié)果發(fā)到管道中(因?yàn)楣艿缼Я俗銐虻木彺妫@個(gè)過程不會阻塞)龄砰。但是最終我們只從管道取第一個(gè)結(jié)果盟猖,也就是最先返回的結(jié)果。
通過適當(dāng)開啟一些冗余的線程换棚,嘗試用不同途徑去解決同樣的問題式镐,最終以贏者為王的方式提升了程序的相應(yīng)性能。
素?cái)?shù)篩
在“Hello world 的革命”一節(jié)中固蚤,我們?yōu)榱搜菔綨ewsqueak的并發(fā)特性娘汞,文中給出了并發(fā)版本素?cái)?shù)篩的實(shí)現(xiàn)。并發(fā)版本的素?cái)?shù)篩是一個(gè)經(jīng)典的并發(fā)例子夕玩,通過它我們可以更深刻地理解Go語言的并發(fā)特性你弦。“素?cái)?shù)篩”的原理如圖:
[圖片上傳失敗...(image-b304d-1554179460547)]
圖 1-13 素?cái)?shù)篩
我們需要先生成最初的2, 3, 4, ...
自然數(shù)序列(不包含開頭的0燎孟、1):
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
GenerateNatural
函數(shù)內(nèi)部啟動(dòng)一個(gè)Goroutine生產(chǎn)序列禽作,返回對應(yīng)的管道。
然后是為每個(gè)素?cái)?shù)構(gòu)造一個(gè)篩子:將輸入序列中是素?cái)?shù)倍數(shù)的數(shù)提出揩页,并返回新的序列旷偿,是一個(gè)新的管道。
// 管道過濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
PrimeFilter
函數(shù)也是內(nèi)部啟動(dòng)一個(gè)Goroutine生產(chǎn)序列,返回過濾后序列對應(yīng)的管道萍程。
現(xiàn)在我們可以在main
函數(shù)中驅(qū)動(dòng)這個(gè)并發(fā)的素?cái)?shù)篩了:
func main() {
ch := GenerateNatural() // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ch, prime) // 基于新素?cái)?shù)構(gòu)造的過濾器
}
}
我們先是調(diào)用GenerateNatural()
生成最原始的從2開始的自然數(shù)序列幢妄。然后開始一個(gè)100次迭代的循環(huán),希望生成100個(gè)素?cái)?shù)尘喝。在每次循環(huán)迭代開始的時(shí)候磁浇,管道中的第一個(gè)數(shù)必定是素?cái)?shù)斋陪,我們先讀取并打印這個(gè)素?cái)?shù)朽褪。然后基于管道中剩余的數(shù)列,并以當(dāng)前取出的素?cái)?shù)為篩子過濾后面的素?cái)?shù)无虚。不同的素?cái)?shù)篩子對應(yīng)的管道是串聯(lián)在一起的缔赠。
素?cái)?shù)篩展示了一種優(yōu)雅的并發(fā)程序結(jié)構(gòu)。但是因?yàn)槊總€(gè)并發(fā)體處理的任務(wù)粒度太細(xì)微友题,程序整體的性能并不理想嗤堰。對于細(xì)粒度的并發(fā)程序,CSP模型中固有的消息傳遞的代價(jià)太高了(多線程并發(fā)模型同樣要面臨線程啟動(dòng)的代價(jià))度宦。
并發(fā)的安全退出
有時(shí)候我們需要通知goroutine停止它正在干的事情踢匣,特別是當(dāng)它工作在錯(cuò)誤的方向上的時(shí)候。Go語言并沒有提供在一個(gè)直接終止Goroutine的方法戈抄,由于這樣會導(dǎo)致goroutine之間的共享變量處在未定義的狀態(tài)上离唬。但是如果我們想要退出兩個(gè)或者任意多個(gè)Goroutine怎么辦呢?
Go語言中不同Goroutine之間主要依靠管道進(jìn)行通信和同步划鸽。要同時(shí)處理多個(gè)管道的發(fā)送或接收操作输莺,我們需要使用select
關(guān)鍵字(這個(gè)關(guān)鍵字和網(wǎng)絡(luò)編程中的select
函數(shù)的行為類似)。當(dāng)select
有多個(gè)分支時(shí)裸诽,會隨機(jī)選擇一個(gè)可用的管道分支嫂用,如果沒有可用的管道分支則選擇default
分支,否則會一直保存阻塞狀態(tài)丈冬。
基于select
實(shí)現(xiàn)的管道的超時(shí)判斷:
select {
case v := <-in:
fmt.Println(v)
case <-time.After(time.Second):
return // 超時(shí)
}
通過select
的default
分支實(shí)現(xiàn)非阻塞的管道發(fā)送或接收操作:
select {
case v := <-in:
fmt.Println(v)
default:
// 沒有數(shù)據(jù)
}
通過select
來阻止main
函數(shù)退出:
func main() {
// do some thins
select{}
}
當(dāng)有多個(gè)管道均可操作時(shí)呆奕,select
會隨機(jī)選擇一個(gè)管道÷С龋基于該特性我們可以用select
實(shí)現(xiàn)一個(gè)生成隨機(jī)數(shù)序列的程序:
func main() {
ch := make(chan int)
go func() {
for {
select {
case ch <- 0:
case ch <- 1:
}
}
}()
for v := range ch {
fmt.Println(v)
}
}
我們通過select
和default
分支可以很容易實(shí)現(xiàn)一個(gè)Goroutine的退出控制:
func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cannel := make(chan bool)
go worker(cannel)
time.Sleep(time.Second)
cannel <- true
}
但是管道的發(fā)送操作和接收操作是一一對應(yīng)的劈榨,如果要停止多個(gè)Goroutine那么可能需要?jiǎng)?chuàng)建同樣數(shù)量的管道,這個(gè)代價(jià)太大了粒梦。其實(shí)我們可以通過close
關(guān)閉一個(gè)管道來實(shí)現(xiàn)廣播的效果亮航,所有從關(guān)閉管道接收的操作均會收到一個(gè)零值和一個(gè)可選的失敗標(biāo)志。
func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}
time.Sleep(time.Second)
close(cancel)
}
我們通過close
來關(guān)閉cancel
管道向多個(gè)Goroutine廣播退出的指令匀们。不過這個(gè)程序依然不夠穩(wěn)浇闪堋:當(dāng)每個(gè)Goroutine收到退出指令退出時(shí)一般會進(jìn)行一定的清理工作,但是退出的清理工作并不能保證被完成,因?yàn)?code>main線程并沒有等待各個(gè)工作Goroutine退出工作完成的機(jī)制重抖。我們可以結(jié)合sync.WaitGroup
來改進(jìn):
func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
return
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel)
wg.Wait()
}
現(xiàn)在每個(gè)工作者并發(fā)體的創(chuàng)建露氮、運(yùn)行、暫停和退出都是在main
函數(shù)的安全控制之下了钟沛。
context包
在Go1.7發(fā)布時(shí)畔规,標(biāo)準(zhǔn)庫增加了一個(gè)context
包,用來簡化對于處理單個(gè)請求的多個(gè)Goroutine之間與請求域的數(shù)據(jù)恨统、超時(shí)和退出等操作叁扫,官方有博文對此做了專門介紹。我們可以用context
包來重新實(shí)現(xiàn)前面的線程安全退出或超時(shí)的控制:
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done():
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}
當(dāng)并發(fā)體超時(shí)或main
主動(dòng)停止工作者Goroutine時(shí)畜埋,每個(gè)工作者都可以安全退出莫绣。
Go語言是帶內(nèi)存自動(dòng)回收特性的,因此內(nèi)存一般不會泄漏悠鞍。在前面素?cái)?shù)篩的例子中对室,GenerateNatural
和PrimeFilter
函數(shù)內(nèi)部都啟動(dòng)了新的Goroutine,當(dāng)main
函數(shù)不再使用管道時(shí)后臺Goroutine有泄漏的風(fēng)險(xiǎn)咖祭。我們可以通過context
包來避免這個(gè)問題掩宜,下面是改進(jìn)的素?cái)?shù)篩實(shí)現(xiàn):
// 返回生成自然數(shù)序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道過濾器: 刪除能被素?cái)?shù)整除的數(shù)
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
select {
case <- ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
// 通過 Context 控制后臺Goroutine狀態(tài)
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然數(shù)序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出現(xiàn)的素?cái)?shù)
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ctx, ch, prime) // 基于新素?cái)?shù)構(gòu)造的過濾器
}
cancel()
}
當(dāng)main函數(shù)完成工作前,通過調(diào)用cancel()
來通知后臺Goroutine退出么翰,這樣就避免了Goroutine的泄漏牺汤。
并發(fā)是一個(gè)非常大的主題,我們這里只是展示幾個(gè)非秤舶埃基礎(chǔ)的并發(fā)編程的例子慧瘤。官方文檔也有很多關(guān)于并發(fā)編程的討論,國內(nèi)也有專門討論Go語言并發(fā)編程的書籍固该。讀者可以根據(jù)自己的需求查閱相關(guān)的文獻(xiàn)锅减。