并發(fā)是編程里面一個非常重要的概念蹂风,Go語言在語言層面天生支持并發(fā),這也是Go語言流行的一個很重要的原因沟沙。
Go語言中的并發(fā)編程
并發(fā)與并行
并發(fā):同一時間段內(nèi)執(zhí)行多個任務(wù)(你在用微信和兩個女朋友聊天)河劝。
并行:同一時刻執(zhí)行多個任務(wù)(你和你朋友都在用微信和女朋友聊天)。
Go語言的并發(fā)通過goroutine
實(shí)現(xiàn)矛紫。goroutine
類似于線程赎瞎,屬于用戶態(tài)的線程,我們可以根據(jù)需要創(chuàng)建成千上萬個goroutine
并發(fā)工作颊咬。goroutine
是由Go語言的運(yùn)行時(runtime)調(diào)度完成务甥,而線程是由操作系統(tǒng)調(diào)度完成。
Go語言還提供channel
在多個goroutine
間進(jìn)行通信喳篇。goroutine
和channel
是 Go 語言秉承的 CSP(Communicating Sequential Process)并發(fā)模式的重要實(shí)現(xiàn)基礎(chǔ)敞临。
goroutine
在java/c++中我們要實(shí)現(xiàn)并發(fā)編程的時候,我們通常需要自己維護(hù)一個線程池麸澜,并且需要自己去包裝一個又一個的任務(wù)挺尿,同時需要自己去調(diào)度線程執(zhí)行任務(wù)并維護(hù)上下文切換,這一切通常會耗費(fèi)程序員大量的心智炊邦。那么能不能有一種機(jī)制编矾,程序員只需要定義很多個任務(wù),讓系統(tǒng)去幫助我們把這些任務(wù)分配到CPU上實(shí)現(xiàn)并發(fā)執(zhí)行呢馁害?
Go語言中的goroutine
就是這樣一種機(jī)制窄俏,goroutine
的概念類似于線程,但 goroutine
是由Go的運(yùn)行時(runtime)調(diào)度和管理的碘菜。Go程序會智能地將 goroutine 中的任務(wù)合理地分配給每個CPU凹蜈。Go語言之所以被稱為現(xiàn)代化的編程語言限寞,就是因?yàn)樗谡Z言層面已經(jīng)內(nèi)置了調(diào)度和上下文切換的機(jī)制。
在Go語言編程中你不需要去自己寫進(jìn)程踪区、線程昆烁、協(xié)程,你的技能包里只有一個技能–goroutine
缎岗,當(dāng)你需要讓某個任務(wù)并發(fā)執(zhí)行的時候静尼,你只需要把這個任務(wù)包裝成一個函數(shù),開啟一個goroutine
去執(zhí)行這個函數(shù)就可以了传泊,就是這么簡單粗暴鼠渺。
使用goroutine
Go語言中使用goroutine
非常簡單,只需要在調(diào)用函數(shù)的時候在前面加上go
關(guān)鍵字眷细,就可以為一個函數(shù)創(chuàng)建一個goroutine
拦盹。
一個goroutine
必定對應(yīng)一個函數(shù),可以創(chuàng)建多個goroutine
去執(zhí)行相同的函數(shù)溪椎。
啟動單個goroutine
啟動goroutine的方式非常簡單普舆,只需要在調(diào)用的函數(shù)(普通函數(shù)和匿名函數(shù))前面加上一個go
關(guān)鍵字。
舉個例子如下:
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
這個示例中hello函數(shù)和下面的語句是串行的校读,執(zhí)行的結(jié)果是打印完Hello Goroutine!
后打印main goroutine done!
沼侣。
接下來我們在調(diào)用hello函數(shù)前面加上關(guān)鍵字go
,也就是啟動一個goroutine去執(zhí)行hello這個函數(shù)歉秫。
func main() {
go hello() // 啟動另外一個goroutine去執(zhí)行hello函數(shù)
fmt.Println("main goroutine done!")
}
這一次的執(zhí)行結(jié)果只打印了main goroutine done!
蛾洛,并沒有打印Hello Goroutine!
。為什么呢?
在程序啟動時,Go程序就會為main()
函數(shù)創(chuàng)建一個默認(rèn)的goroutine
葛假。
當(dāng)main()函數(shù)返回的時候該goroutine
就結(jié)束了绑谣,所有在main()
函數(shù)中啟動的goroutine
會一同結(jié)束,main
函數(shù)所在的goroutine
就像是權(quán)利的游戲中的夜王,其他的goroutine
都是異鬼,夜王一死它轉(zhuǎn)化的那些異鬼也就全部GG了。
所以我們要想辦法讓main函數(shù)等一等hello函數(shù)椿浓,最簡單粗暴的方式就是time.Sleep
了。
func main() {
go hello() // 啟動另外一個goroutine去執(zhí)行hello函數(shù)
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
執(zhí)行上面的代碼你會發(fā)現(xiàn)闽晦,這一次先打印main goroutine done!
扳碍,然后緊接著打印Hello Goroutine!
。
首先為什么會先打印main goroutine done!
是因?yàn)槲覀冊趧?chuàng)建新的goroutine的時候需要花費(fèi)一些時間仙蛉,而此時main函數(shù)所在的goroutine
是繼續(xù)執(zhí)行的笋敞。
啟動多個goroutine
在Go語言中實(shí)現(xiàn)并發(fā)就是這樣簡單,我們還可以啟動多個goroutine
荠瘪。讓我們再來一個例子: (這里使用了sync.WaitGroup
來實(shí)現(xiàn)goroutine的同步)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine結(jié)束就登記-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 啟動一個goroutine就登記+1
go hello(i)
}
wg.Wait() // 等待所有登記的goroutine都結(jié)束
}
多次執(zhí)行上面的代碼夯巷,會發(fā)現(xiàn)每次打印的數(shù)字的順序都不一致赛惩。這是因?yàn)?0個goroutine
是并發(fā)執(zhí)行的,而goroutine
的調(diào)度是隨機(jī)的趁餐。
goroutine與線程
可增長的棧
OS線程(操作系統(tǒng)線程)一般都有固定的棧內(nèi)存(通常為2MB),一個goroutine
的棧在其生命周期開始時只有很小的棧(典型情況下2KB)喷兼,goroutine
的棧不是固定的,他可以按需增大和縮小后雷,goroutine
的棧大小限制可以達(dá)到1GB季惯,雖然極少會用到這么大。所以在Go語言中一次創(chuàng)建十萬左右的goroutine
也是可以的臀突。
goroutine調(diào)度
GPM
是Go語言運(yùn)行時(runtime)層面的實(shí)現(xiàn)勉抓,是go語言自己實(shí)現(xiàn)的一套調(diào)度系統(tǒng)。區(qū)別于操作系統(tǒng)調(diào)度OS線程候学。
-
G
很好理解藕筋,就是個goroutine的,里面除了存放本goroutine信息外 還有與所在P的綁定等信息梳码。 -
P
管理著一組goroutine隊(duì)列隐圾,P里面會存儲當(dāng)前goroutine運(yùn)行的上下文環(huán)境(函數(shù)指針,堆棧地址及地址邊界)掰茶,P會對自己管理的goroutine隊(duì)列做一些調(diào)度(比如把占用CPU時間較長的goroutine暫停翎承、運(yùn)行后續(xù)的goroutine等等)當(dāng)自己的隊(duì)列消費(fèi)完了就去全局隊(duì)列里取,如果全局隊(duì)列里也消費(fèi)完了會去其他P的隊(duì)列里搶任務(wù)符匾。 -
M(machine)
是Go運(yùn)行時(runtime)對操作系統(tǒng)內(nèi)核線程的虛擬, M與內(nèi)核線程一般是一一映射的關(guān)系瘩例, 一個groutine最終是要放到M上執(zhí)行的啊胶;
P與M一般也是一一對應(yīng)的。他們關(guān)系是: P管理著一組G掛載在M上運(yùn)行垛贤。當(dāng)一個G長久阻塞在一個M上時焰坪,runtime會新建一個M,阻塞G所在的P會把其他的G 掛載在新建的M上聘惦。當(dāng)舊的G阻塞完成或者認(rèn)為其已經(jīng)死掉時 回收舊的M某饰。
P的個數(shù)是通過runtime.GOMAXPROCS
設(shè)定(最大256),Go1.5版本之后默認(rèn)為物理線程數(shù)善绎。 在并發(fā)量大的時候會增加一些P和M黔漂,但不會太多,切換太頻繁的話得不償失禀酱。
單從線程調(diào)度講炬守,Go語言相比起其他語言的優(yōu)勢在于OS線程是由OS內(nèi)核來調(diào)度的,goroutine
則是由Go運(yùn)行時(runtime)自己的調(diào)度器調(diào)度的剂跟,這個調(diào)度器使用一個稱為m:n調(diào)度的技術(shù)(復(fù)用/調(diào)度m個goroutine到n個OS線程)减途。 其一大特點(diǎn)是goroutine的調(diào)度是在用戶態(tài)下完成的酣藻, 不涉及內(nèi)核態(tài)與用戶態(tài)之間的頻繁切換,包括內(nèi)存的分配與釋放鳍置,都是在用戶態(tài)維護(hù)著一塊大的內(nèi)存池辽剧, 不直接調(diào)用系統(tǒng)的malloc函數(shù)(除非內(nèi)存池需要改變),成本比調(diào)度OS線程低很多税产。 另一方面充分利用了多核的硬件資源怕轿,近似的把若干goroutine均分在物理線程上, 再加上本身goroutine的超輕量砖第,以上種種保證了go調(diào)度方面的性能撤卢。
GOMAXPROCS
Go運(yùn)行時的調(diào)度器使用GOMAXPROCS
參數(shù)來確定需要使用多少個OS線程來同時執(zhí)行Go代碼。默認(rèn)值是機(jī)器上的CPU核心數(shù)梧兼。例如在一個8核心的機(jī)器上放吩,調(diào)度器會把Go代碼同時調(diào)度到8個OS線程上(GOMAXPROCS是m:n調(diào)度中的n)。
Go語言中可以通過runtime.GOMAXPROCS()
函數(shù)設(shè)置當(dāng)前程序并發(fā)時占用的CPU邏輯核心數(shù)羽杰。
Go1.5版本之前渡紫,默認(rèn)使用的是單核心執(zhí)行。Go1.5版本之后考赛,默認(rèn)使用全部的CPU邏輯核心數(shù)惕澎。
我們可以通過將任務(wù)分配到不同的CPU邏輯核心上實(shí)現(xiàn)并行的效果,這里舉個例子:
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(1)
go a()
go b()
time.Sleep(time.Second)
}
兩個任務(wù)只有一個邏輯核心颜骤,此時是做完一個任務(wù)再做另一個任務(wù)唧喉。 將邏輯核心數(shù)設(shè)為2,此時兩個任務(wù)并行執(zhí)行忍抽,代碼如下八孝。
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}
Go語言中的操作系統(tǒng)線程和goroutine的關(guān)系:
- 一個操作系統(tǒng)線程對應(yīng)用戶態(tài)多個goroutine。
- go程序可以同時使用多個操作系統(tǒng)線程鸠项。
- goroutine和OS線程是多對多的關(guān)系干跛,即m:n。
channel
單純地將函數(shù)并發(fā)執(zhí)行是沒有意義的祟绊。函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義楼入。
雖然可以使用共享內(nèi)存進(jìn)行數(shù)據(jù)交換,但是共享內(nèi)存在不同的goroutine
中容易發(fā)生競態(tài)問題牧抽。為了保證數(shù)據(jù)交換的正確性嘉熊,必須使用互斥量對內(nèi)存進(jìn)行加鎖,這種做法勢必造成性能問題扬舒。
Go語言的并發(fā)模型是CSP(Communicating Sequential Processes)
记舆,提倡通過通信共享內(nèi)存而不是通過共享內(nèi)存而實(shí)現(xiàn)通信。
如果說goroutine
是Go程序并發(fā)的執(zhí)行體呼巴,channel
就是它們之間的連接泽腮。channel
是可以讓一個goroutine
發(fā)送特定值到另一個goroutine
的通信機(jī)制御蒲。
Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊(duì)列诊赊,總是遵循先入先出(First In First Out)的規(guī)則厚满,保證收發(fā)數(shù)據(jù)的順序。每一個通道都是一個具體類型的導(dǎo)管碧磅,也就是聲明channel的時候需要為其指定元素類型碘箍。
channel類型
channel
是一種類型,一種引用類型鲸郊。聲明通道類型的格式如下:
var 變量 chan 元素類型
舉幾個例子:
var ch1 chan int // 聲明一個傳遞整型的通道
var ch2 chan bool // 聲明一個傳遞布爾型的通道
var ch3 chan []int // 聲明一個傳遞int切片的通道
創(chuàng)建channel
通道是引用類型丰榴,通道類型的空值是nil
。
var ch chan int
fmt.Println(ch) // <nil>
聲明的通道后需要使用make
函數(shù)初始化之后才能使用秆撮。
創(chuàng)建channel的格式如下:
make(chan 元素類型, [緩沖大小])
channel的緩沖大小是可選的四濒。
舉幾個例子:
ch4 := make(chan int)
ch5 := make(chan bool)
ch6 := make(chan []int)
channel操作
通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作职辨。
發(fā)送和接收都使用<-
符號盗蟆。
現(xiàn)在我們先使用以下語句定義一個通道:
ch := make(chan int)
發(fā)送
將一個值發(fā)送到通道中。
ch <- 10 // 把10發(fā)送到ch中
接收
從一個通道中接收值舒裤。
x := <- ch // 從ch中接收值并賦值給變量x
<-ch // 從ch中接收值喳资,忽略結(jié)果
關(guān)閉
我們通過調(diào)用內(nèi)置的close
函數(shù)來關(guān)閉通道腾供。
close(ch)
關(guān)于關(guān)閉通道需要注意的事情是,只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時候才需要關(guān)閉通道伴鳖。通道是可以被垃圾回收機(jī)制回收的节值,它和關(guān)閉文件是不一樣的峻汉,在結(jié)束操作之后關(guān)閉文件是必須要做的业簿,但關(guān)閉通道不是必須的赡盘。
關(guān)閉后的通道有以下特點(diǎn):
- 對一個關(guān)閉的通道再發(fā)送值就會導(dǎo)致panic赞厕。
- 對一個關(guān)閉的通道進(jìn)行接收會一直獲取值直到通道為空。
- 對一個關(guān)閉的并且沒有值的通道執(zhí)行接收操作會得到對應(yīng)類型的零值抄肖。
- 關(guān)閉一個已經(jīng)關(guān)閉的通道會導(dǎo)致panic腿椎。
無緩沖的通道
無緩沖的通道又稱為阻塞的通道。我們來看一下下面的代碼:
func main() {
ch := make(chan int)
ch <- 10
fmt.Println("發(fā)送成功")
}
上面這段代碼能夠通過編譯涩拙,但是執(zhí)行的時候會出現(xiàn)以下錯誤:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/Q1mi/studygo/day06/channel02/main.go:8 +0x54
為什么會出現(xiàn)deadlock
錯誤呢?
因?yàn)槲覀兪褂?code>ch := make(chan int)創(chuàng)建的是無緩沖的通道耸采,無緩沖的通道只有在有人接收值的時候才能發(fā)送值兴泥。就像你住的小區(qū)沒有快遞柜和代收點(diǎn),快遞員給你打電話必須要把這個物品送到你的手中虾宇,簡單來說就是無緩沖的通道必須有接收才能發(fā)送搓彻。
上面的代碼會阻塞在ch <- 10
這一行代碼形成死鎖,那如何解決這個問題呢嘱朽?
一種方法是啟用一個goroutine
去接收值旭贬,例如:
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch) // 啟用goroutine從通道接收值
ch <- 10
fmt.Println("發(fā)送成功")
}
無緩沖通道上的發(fā)送操作會阻塞,直到另一個goroutine
在該通道上執(zhí)行接收操作搪泳,這時值才能發(fā)送成功稀轨,兩個goroutine
將繼續(xù)執(zhí)行。相反岸军,如果接收操作先執(zhí)行奋刽,接收方的goroutine將阻塞,直到另一個goroutine
在該通道上發(fā)送一個值艰赞。
使用無緩沖通道進(jìn)行通信將導(dǎo)致發(fā)送和接收的goroutine
同步化佣谐。因此,無緩沖通道也被稱為同步通道
方妖。
有緩沖的通道
解決上面問題的方法還有一種就是使用有緩沖區(qū)的通道狭魂。我們可以在使用make函數(shù)初始化通道的時候?yàn)槠渲付ㄍǖ赖娜萘浚纾?/p>
func main() {
ch := make(chan int, 1) // 創(chuàng)建一個容量為1的有緩沖區(qū)通道
ch <- 10
fmt.Println("發(fā)送成功")
}
只要通道的容量大于零党觅,那么該通道就是有緩沖的通道雌澄,通道的容量表示通道中能存放元素的數(shù)量。就像你小區(qū)的快遞柜只有那么個多格子仔役,格子滿了就裝不下了掷伙,就阻塞了是己,等到別人取走一個快遞員就能往里面放一個又兵。
我們可以使用內(nèi)置的len
函數(shù)獲取通道內(nèi)元素的數(shù)量,使用cap
函數(shù)獲取通道的容量,雖然我們很少會這么做沛厨。
for range從通道循環(huán)取值
當(dāng)向通道中發(fā)送完數(shù)據(jù)時宙地,我們可以通過close
函數(shù)來關(guān)閉通道。
當(dāng)通道被關(guān)閉時逆皮,再往該通道發(fā)送值會引發(fā)panic
宅粥,從該通道取值的操作會先取完通道中的值,再然后取到的值一直都是對應(yīng)類型的零值电谣。那如何判斷一個通道是否被關(guān)閉了呢秽梅?
我們來看下面這個例子:
// channel 練習(xí)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
// 開啟goroutine將0~100的數(shù)發(fā)送到ch1中
go func() {
for i := 0; i < 100; i++ {
ch1 <- i
}
close(ch1)
}()
// 開啟goroutine從ch1中接收值,并將該值的平方發(fā)送到ch2中
go func() {
for {
i, ok := <-ch1 // 通道關(guān)閉后再取值ok=false
if !ok {
break
}
ch2 <- i * i
}
close(ch2)
}()
// 在主goroutine中從ch2中接收值打印
for i := range ch2 { // 通道關(guān)閉后會退出for range循環(huán)
fmt.Println(i)
}
}
從上面的例子中我們看到有兩種方式在接收值的時候判斷該通道是否被關(guān)閉剿牺,不過我們通常使用的是for range
的方式企垦。使用for range
遍歷通道,當(dāng)通道被關(guān)閉的時候就會退出for range
晒来。
單向通道
有的時候我們會將通道作為參數(shù)在多個任務(wù)函數(shù)間傳遞钞诡,很多時候我們在不同的任務(wù)函數(shù)中使用通道都會對其進(jìn)行限制,比如限制通道在函數(shù)中只能發(fā)送或只能接收湃崩。
Go語言中提供了單向通道來處理這種情況荧降。例如,我們把上面的例子改造如下:
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
其中攒读,
-
chan<- int
是一個只寫單向通道(只能對其寫入int類型值)朵诫,可以對其執(zhí)行發(fā)送操作但是不能執(zhí)行接收操作; -
<-chan int
是一個只讀單向通道(只能從其讀取int類型值)整陌,可以對其執(zhí)行接收操作但是不能執(zhí)行發(fā)送操作拗窃。
在函數(shù)傳參及任何賦值操作中可以將雙向通道轉(zhuǎn)換為單向通道,但反過來是不可以的泌辫。
通道總結(jié)
channel
常見的異乘婵洌總結(jié),如下圖:
關(guān)閉已經(jīng)關(guān)閉的channel
也會引發(fā)panic
宾毒。
worker pool(goroutine池)
在工作中我們通常會使用可以指定啟動的goroutine數(shù)量–worker pool
模式,控制goroutine
的數(shù)量殿遂,防止goroutine
泄漏和暴漲诈铛。
一個簡易的work pool
示例代碼如下:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Printf("worker:%d start job:%d\n", id, j)
time.Sleep(time.Second)
fmt.Printf("worker:%d end job:%d\n", id, j)
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 開啟3個goroutine
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// 5個任務(wù)
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// 輸出結(jié)果
for a := 1; a <= 5; a++ {
<-results
}
}
select多路復(fù)用
在某些場景下我們需要同時從多個通道接收數(shù)據(jù)。通道在接收數(shù)據(jù)時墨礁,如果沒有數(shù)據(jù)可以接收將會發(fā)生阻塞幢竹。你也許會寫出如下代碼使用遍歷的方式來實(shí)現(xiàn):
for{
// 嘗試從ch1接收值
data, ok := <-ch1
// 嘗試從ch2接收值
data, ok := <-ch2
…
}
這種方式雖然可以實(shí)現(xiàn)從多個通道接收值的需求,但是運(yùn)行性能會差很多恩静。為了應(yīng)對這種場景焕毫,Go內(nèi)置了select
關(guān)鍵字蹲坷,可以同時響應(yīng)多個通道的操作。
select
的使用類似于switch語句邑飒,它有一系列case分支和一個默認(rèn)的分支循签。每個case會對應(yīng)一個通道的通信(接收或發(fā)送)過程。select
會一直等待疙咸,直到某個case
的通信操作完成時县匠,就會執(zhí)行case
分支對應(yīng)的語句。具體格式如下:
select{
case <-ch1:
...
case data := <-ch2:
...
case ch3<-data:
...
default:
默認(rèn)操作
}
舉個小例子來演示下select
的使用:
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
使用select
語句能提高代碼的可讀性撒轮。
- 可處理一個或多個channel的發(fā)送/接收操作乞旦。
- 如果多個
case
同時滿足,select
會隨機(jī)選擇一個题山。 - 對于沒有
case
的select{}
會一直等待杆查,可用于阻塞main函數(shù)。
并發(fā)安全和鎖
有時候在Go代碼中可能會存在多個goroutine
同時操作一個資源(臨界區(qū))臀蛛,這種情況會發(fā)生競態(tài)問題
(數(shù)據(jù)競態(tài))亲桦。類比現(xiàn)實(shí)生活中的例子有十字路口被各個方向的的汽車競爭;還有火車上的衛(wèi)生間被車廂里的人競爭浊仆。
舉個例子:
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 5000; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上面的代碼中我們開啟了兩個goroutine
去累加變量x的值客峭,這兩個goroutine
在訪問和修改x
變量的時候就會存在數(shù)據(jù)競爭,導(dǎo)致最后的結(jié)果與期待的不符抡柿。
互斥鎖
互斥鎖是一種常用的控制共享資源訪問的方法舔琅,它能夠保證同時只有一個goroutine
可以訪問共享資源。Go語言中使用sync
包的Mutex
類型來實(shí)現(xiàn)互斥鎖洲劣。 使用互斥鎖來修復(fù)上面代碼的問題:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 5000; i++ {
lock.Lock() // 加鎖
x = x + 1
lock.Unlock() // 解鎖
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥鎖能夠保證同一時間有且只有一個goroutine
進(jìn)入臨界區(qū)备蚓,其他的goroutine
則在等待鎖;當(dāng)互斥鎖釋放后囱稽,等待的goroutine
才可以獲取鎖進(jìn)入臨界區(qū)郊尝,多個goroutine
同時等待一個鎖時,喚醒的策略是隨機(jī)的战惊。
讀寫互斥鎖
互斥鎖是完全互斥的流昏,但是有很多實(shí)際的場景下是讀多寫少的,當(dāng)我們并發(fā)的去讀取一個資源不涉及資源修改的時候是沒有必要加鎖的吞获,這種場景下使用讀寫鎖是更好的一種選擇况凉。讀寫鎖在Go語言中使用sync
包中的RWMutex
類型。
讀寫鎖分為兩種:讀鎖和寫鎖各拷。當(dāng)一個goroutine獲取讀鎖之后刁绒,其他的goroutine
如果是獲取讀鎖會繼續(xù)獲得鎖,如果是獲取寫鎖就會等待烤黍;當(dāng)一個goroutine
獲取寫鎖之后知市,其他的goroutine
無論是獲取讀鎖還是寫鎖都會等待粮坞。
讀寫鎖示例:
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥鎖
rwlock.Lock() // 加寫鎖
x = x + 1
time.Sleep(10 * time.Millisecond) // 假設(shè)讀操作耗時10毫秒
rwlock.Unlock() // 解寫鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func read() {
// lock.Lock() // 加互斥鎖
rwlock.RLock() // 加讀鎖
time.Sleep(time.Millisecond) // 假設(shè)讀操作耗時1毫秒
rwlock.RUnlock() // 解讀鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
需要注意的是讀寫鎖非常適合讀多寫少的場景,如果讀和寫的操作差別不大初狰,讀寫鎖的優(yōu)勢就發(fā)揮不出來。
sync.WaitGroup
在代碼中生硬的使用time.Sleep
肯定是不合適的互例,Go語言中可以使用sync.WaitGroup
來實(shí)現(xiàn)并發(fā)任務(wù)的同步奢入。 sync.WaitGroup
有以下幾個方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 計數(shù)器+delta |
(wg *WaitGroup) Done() | 計數(shù)器-1 |
(wg *WaitGroup) Wait() | 阻塞直到計數(shù)器變?yōu)? |
sync.WaitGroup
內(nèi)部維護(hù)著一個計數(shù)器,計數(shù)器的值可以增加和減少媳叨。例如當(dāng)我們啟動了N 個并發(fā)任務(wù)時腥光,就將計數(shù)器值增加N。每個任務(wù)完成時通過調(diào)用Done()方法將計數(shù)器減1糊秆。通過調(diào)用Wait()來等待并發(fā)任務(wù)執(zhí)行完武福,當(dāng)計數(shù)器值為0時,表示所有并發(fā)任務(wù)已經(jīng)完成痘番。
我們利用sync.WaitGroup
將上面的代碼優(yōu)化一下:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("Hello Goroutine!")
}
func main() {
wg.Add(1)
go hello() // 啟動另外一個goroutine去執(zhí)行hello函數(shù)
fmt.Println("main goroutine done!")
wg.Wait()
}
需要注意sync.WaitGroup
是一個結(jié)構(gòu)體捉片,傳遞的時候要傳遞指針。
sync.Once
說在前面的話:這是一個進(jìn)階知識點(diǎn)汞舱。
在編程的很多場景下我們需要確保某些操作在高并發(fā)的場景下只執(zhí)行一次伍纫,例如只加載一次配置文件、只關(guān)閉一次通道等昂芜。
Go語言中的sync
包中提供了一個針對只執(zhí)行一次場景的解決方案–sync.Once
莹规。
sync.Once
只有一個Do
方法,其簽名如下:
func (o *Once) Do(f func()) {}
備注:如果要執(zhí)行的函數(shù)f
需要傳遞參數(shù)就需要搭配閉包來使用泌神。
加載配置文件示例
延遲一個開銷很大的初始化操作到真正用到它的時候再執(zhí)行是一個很好的實(shí)踐良漱。因?yàn)轭A(yù)先初始化一個變量(比如在init函數(shù)中完成初始化)會增加程序的啟動耗時,而且有可能實(shí)際執(zhí)行過程中這個變量沒有用上欢际,那么這個初始化操作就不是必須要做的母市。我們來看一個例子:
var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 被多個goroutine調(diào)用時不是并發(fā)安全的
func Icon(name string) image.Image {
if icons == nil {
loadIcons()
}
return icons[name]
}
多個goroutine
并發(fā)調(diào)用Icon函數(shù)時不是并發(fā)安全的,現(xiàn)代的編譯器和CPU可能會在保證每個goroutine
都滿足串行一致的基礎(chǔ)上自由地重排訪問內(nèi)存的順序损趋。loadIcons函數(shù)可能會被重排為以下結(jié)果:
func loadIcons() {
icons = make(map[string]image.Image)
icons["left"] = loadIcon("left.png")
icons["up"] = loadIcon("up.png")
icons["right"] = loadIcon("right.png")
icons["down"] = loadIcon("down.png")
}
在這種情況下就會出現(xiàn)即使判斷了icons
不是nil也不意味著變量初始化完成了窒篱。考慮到這種情況舶沿,我們能想到的辦法就是添加互斥鎖墙杯,保證初始化icons
的時候不會被其他的goroutine
操作,但是這樣做又會引發(fā)性能問題括荡。
使用sync.Once
改造的示例代碼如下:
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcons() {
icons = map[string]image.Image{
"left": loadIcon("left.png"),
"up": loadIcon("up.png"),
"right": loadIcon("right.png"),
"down": loadIcon("down.png"),
}
}
// Icon 是并發(fā)安全的
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
并發(fā)安全的單例模式
下面是借助sync.Once
實(shí)現(xiàn)的并發(fā)安全的單例模式:
package singleton
import (
"sync"
)
type singleton struct {}
var instance *singleton
var once sync.Once
func GetInstance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync.Once
其實(shí)內(nèi)部包含一個互斥鎖和一個布爾值高镐,互斥鎖保證布爾值和數(shù)據(jù)的安全,而布爾值用來記錄初始化是否完成畸冲。這樣設(shè)計就能保證初始化操作的時候是并發(fā)安全的并且初始化操作也不會被執(zhí)行多次嫉髓。
sync.Map
Go語言中內(nèi)置的map不是并發(fā)安全的观腊。請看下面的示例:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上面的代碼開啟少量幾個goroutine
的時候可能沒什么問題,當(dāng)并發(fā)多了之后執(zhí)行上面的代碼就會報fatal error: concurrent map writes
錯誤算行。
像這種場景下就需要為map加鎖來保證并發(fā)的安全性了梧油,Go語言的sync
包中提供了一個開箱即用的并發(fā)安全版map–sync.Map
。開箱即用表示不用像內(nèi)置的map一樣使用make函數(shù)初始化就能直接使用州邢。同時sync.Map
內(nèi)置了諸如Store
儡陨、Load
、LoadOrStore
量淌、Delete
骗村、Range
等操作方法。
var m = sync.Map{}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
m.Store(key, n)
value, _ := m.Load(key)
fmt.Printf("k=:%v,v:=%v\n", key, value)
wg.Done()
}(i)
}
wg.Wait()
}
原子操作
在上面的代碼中的我們通過鎖操作來實(shí)現(xiàn)同步呀枢。而鎖機(jī)制的底層是基于原子操作的胚股,其一般直接通過CPU指令實(shí)現(xiàn)。Go語言中原子操作由內(nèi)置的標(biāo)準(zhǔn)庫sync/atomic
提供裙秋。
atomic包
方法 | 解釋 |
---|---|
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer) |
讀取操作 |
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintptr, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer) |
寫入操作 |
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr) |
修改操作 |
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer) |
交換操作 |
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool) |
比較并交換操作 |
示例
我們填寫一個示例來比較下互斥鎖和原子操作的性能琅拌。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Counter interface {
Inc()
Load() int64
}
// 普通版
type CommonCounter struct {
counter int64
}
func (c CommonCounter) Inc() {
c.counter++
}
func (c CommonCounter) Load() int64 {
return c.counter
}
// 互斥鎖版
type MutexCounter struct {
counter int64
lock sync.Mutex
}
func (m *MutexCounter) Inc() {
m.lock.Lock()
defer m.lock.Unlock()
m.counter++
}
func (m *MutexCounter) Load() int64 {
m.lock.Lock()
defer m.lock.Unlock()
return m.counter
}
// 原子操作版
type AtomicCounter struct {
counter int64
}
func (a *AtomicCounter) Inc() {
atomic.AddInt64(&a.counter, 1)
}
func (a *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&a.counter)
}
func test(c Counter) {
var wg sync.WaitGroup
start := time.Now()
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
c.Inc()
wg.Done()
}()
}
wg.Wait()
end := time.Now()
fmt.Println(c.Load(), end.Sub(start))
}
func main() {
c1 := CommonCounter{} // 非并發(fā)安全
test(c1)
c2 := MutexCounter{} // 使用互斥鎖實(shí)現(xiàn)并發(fā)安全
test(&c2)
c3 := AtomicCounter{} // 并發(fā)安全且比互斥鎖效率更高
test(&c3)
}
atomic
包提供了底層的原子級內(nèi)存操作,對于同步算法的實(shí)現(xiàn)很有用摘刑。這些函數(shù)必須謹(jǐn)慎地保證正確使用财忽。除了某些特殊的底層應(yīng)用,使用通道或者sync包的函數(shù)/類型實(shí)現(xiàn)同步更好泣侮。