什么是協(xié)程?
進程和線程
一個應(yīng)用程序時運行在操作系統(tǒng)上的一個進程阱表。進程是一個運行在自己獨立內(nèi)存空間的獨立執(zhí)行體殿如,是操作系統(tǒng)進行資源分配的最小單位。一個進程則有一個或多個線程組成最爬,這些線程是共享進程內(nèi)存地址空間的執(zhí)行體涉馁,是操作系統(tǒng)進行任務(wù)調(diào)度的最小單位。而使用多線程進行工作時爱致,由于共享父進程的內(nèi)存空間等資源烤送,訪問同一個數(shù)據(jù)需要對其進行加鎖,保證同一時間只有一個線程操作一個數(shù)據(jù)糠悯。這樣不僅會提高編碼的復雜度帮坚,還會有多個線程搶占鎖、線程切換帶來的額外開銷互艾。
協(xié)程
在Go中试和,應(yīng)用程序并發(fā)處理的部分被稱作goroutines(協(xié)程),它是一種更輕量級的線程纫普,和操作系統(tǒng)的線程之間并無一對一的關(guān)系阅悍。協(xié)程是根據(jù)一個或多個線程的可用性,映射(多路復用局嘁,執(zhí)行于)在它們之上的溉箕;協(xié)程調(diào)度器負責在Go運行時調(diào)度進行協(xié)程的工作。
通道(Channel)
協(xié)程工作在相同的地址空間中悦昵,所以共享內(nèi)存的方式是同步的,可以使用互斥鎖來實現(xiàn)晌畅,但是Go中更好的方案是使用Channel來同步協(xié)程但指。
通道類型(Chan)就像一個可用于發(fā)送類型化數(shù)據(jù)的管道,由其負責協(xié)程之間的通信,在任何時間棋凳,一個通道數(shù)據(jù)被設(shè)計為只有一個協(xié)程可以對其訪問拦坠,所以不會發(fā)生數(shù)據(jù)競爭。
通道阻塞
默認情況下剩岳,Go創(chuàng)建的通道是同步且無緩沖的:在有接受者接受數(shù)據(jù)之前贞滨,發(fā)送不會結(jié)束,發(fā)送者是直接將數(shù)據(jù)交給接受者的拍棕,所以這種通道的發(fā)送或接受操作在對方準備好之前都是阻塞的晓铆。
例如以下代碼,執(zhí)行會報錯死鎖:
示例1.1:
func main() {
ch := make(chan int)
ch <- 1
<-ch
}
因為對ch的讀寫都在main函數(shù)的主協(xié)程中绰播,執(zhí)行到ch <-1
時由于接收ch的數(shù)據(jù)還沒準備好骄噪,發(fā)送數(shù)據(jù)將被阻塞,程序無法繼續(xù)執(zhí)行蠢箩。必須使用關(guān)鍵字go
來啟動一個新的協(xié)程發(fā)送數(shù)據(jù)链蕊,另一個協(xié)程接收數(shù)據(jù),如下所示:
示例1.2
func main() {
ch := make(chan int)
go func() {
ch <- 1
}()
fmt.Println(<- ch)
}
使用make
創(chuàng)建一個通道的時候可以傳入第二個參數(shù)指定通道緩沖區(qū)大小谬泌,這種方式在通道寫滿之前滔韵,發(fā)送數(shù)據(jù)不會被阻塞,通道不為空時接收操作不會被阻塞掌实,如果將示例1.1中的創(chuàng)建通道傳第二個參數(shù)為1奏属,就可以正常運行不會死鎖了,代碼如下:
示例1.3
func main() {
ch := make(chan int, 1)
ch <- 1
fmt.Println(<- ch)
}
Go協(xié)程調(diào)度原理
調(diào)度器架構(gòu)
Go的調(diào)度器從最開始的單線程經(jīng)過不斷的改進潮峦、優(yōu)化囱皿,發(fā)展到現(xiàn)在的GMP模型,在GMP模型中有三個重要的結(jié)構(gòu):
- G(Goroutine):go協(xié)程忱嘹,一個可執(zhí)行單元嘱腥,調(diào)度器作用就是對所有G的切換
- M(Thread):操作系統(tǒng)上的線程,G運行與M上拘悦,一個G可能由多個不同的M運行齿兔,一個M可以運行多個G
- P(Processor):處理器,他包含了運行G的資源础米,如果線程M想運行G分苇,必須先獲取P,P還包含了可運行的G隊列屁桑。一個M一個時刻只擁有一個P医寿,M和P的數(shù)量是1:1的。
上圖中各個模塊的作用如下:
- 全局隊列:存放等待運行G
- P的本地隊列:和全局隊列類似蘑斧,存放的也是等待運行的G靖秩,存放數(shù)量上限256個须眷。新建G時,G優(yōu)先加入到P的本地隊列沟突,如果隊列滿了花颗,則會把本地隊列中的一半G移動到全局隊列
- P列表:所有的P都在程序啟動時創(chuàng)建,保存在數(shù)組中惠拭,最多有
GOMAXPROCS
個扩劝,可通過runtime.GOMAXPROCS(N)
修改,N表示設(shè)置的個數(shù)
M是Goroutine調(diào)度器和操作系統(tǒng)調(diào)度器的橋梁职辅,每個M代表一個內(nèi)核線程棒呛,操作系統(tǒng)調(diào)度器負責把內(nèi)核線程分配到CPU的核心上執(zhí)行。
調(diào)度策略
復用線程
調(diào)度器核心思想是盡可能避免頻繁的創(chuàng)建罐农、銷毀線程条霜,對線程進行復用以提高效率。
1. work stealing機制(竊取式)
當本線程無G可運行時涵亏,嘗試從其他線程綁定的P竊取G宰睡,而不是直接銷毀線程。
2. hand off機制
當本線程M因為G進行的系統(tǒng)調(diào)用阻塞是气筋,線程釋放綁定的P拆内,把P轉(zhuǎn)移給其他空閑的M'執(zhí)行。
利用多核CPU并行
GOMAXPROCS
設(shè)置P的數(shù)量宠默,最多有GOMAXPROCS
個線程分布在多個CPU核心上運行麸恍。
搶占
一個goroutine最多占用CPU10ms,防止其他goroutine等待太久得不到執(zhí)行被“餓死”搀矫。
全局G隊列
全局G隊列是有互斥鎖保護的抹沪,訪問需要競爭鎖,新的調(diào)度器將其功能弱化了瓤球,當M執(zhí)行work stealing從其他P竊取不到G時融欧,才會去全局G隊列獲取G。
Go并發(fā)編程實例
兩個協(xié)程交替打印1-100
用兩個協(xié)程順序打印出1-100卦羡,要求一個協(xié)程打印1噪馏、3、5绿饵、7...奇數(shù)欠肾,另一個協(xié)程打印2、4拟赊、6刺桃、8...偶數(shù),輸出必須是順序的要门。
示例代碼:
func main() {
// ch用來同步兩個協(xié)程交替執(zhí)行
ch := make(chan int)
// ch_end用來阻塞主程序虏肾,讓兩個協(xié)程可以執(zhí)行完
ch_end := make(chan int)
go func() {
for i := 1; i <= 100; i++ {
ch <- 1
if i % 2 == 0 {
fmt.Println(i)
}
}
ch_end <- 1
}()
go func() {
for i := 1; i <= 100; i++ {
<-ch
if i % 2 != 0 {
fmt.Println(i)
}
}
}()
<-ch_end
}
并行素數(shù)篩選
有一個協(xié)程不斷生2~n的自然數(shù)廓啊,對每個素數(shù)起一個協(xié)程欢搜,用來篩選素數(shù)
示例代碼:
func generate(ch chan int, n int) {
for i := 2; i <= n ; i++ {
fmt.Println("generate:", i)
ch <- i
}
close(ch)
}
func filter(in, out chan int, prime int) {
for i := range in {
fmt.Printf("filter(%d): %d\n", prime, i)
if i % prime != 0 {
out <- i
}
}
close(out)
}
func main() {
res := []int{}
ch := make(chan int)
go generate(ch, 112)
for {
if prime, ok := <- ch; ok {
res = append(res, prime)
ch_out := make(chan int)
go filter(ch, ch_out, prime)
// 前一個素數(shù)過濾協(xié)程的輸出通道是后一個素數(shù)過濾通道的輸入通道
ch = ch_out
} else {
break
}
}
fmt.Println("main:", res)
}
實現(xiàn)超時機制
當設(shè)置的超時時間到達后如果work還不可執(zhí)行就終止等待封豪,返回超時
示例代碼
func TimeOut(timeout time.Duration) {
ch_to := make(chan bool, 1)
go func() {
time.Sleep(timeout)
ch_to <- true
}()
ch_do := make(chan int, 1)
go func() {
time.Sleep(3e9)
ch_do <- 1
}()
select {
case i := <- ch_do:
fmt.Println("do something, id:", i)
case <-ch_to:
fmt.Println("timeout")
break
}
}
實現(xiàn)迭代器
實現(xiàn)一個惰性迭代器,每次調(diào)用返回一個列表元素炒瘟,直到所有的元素被訪問完返回nil
示例代碼:
// 迭代器
func iterator(iterable []interface{}) chan interface{}{
yield := make(chan interface{})
go func() {
for i := 0; i < len(iterable); i++ {
yield <- iterable[i]
}
close(yield)
}()
return yield
}
// 獲取下一個元素
func next(iter chan interface{}) interface{} {
for v := range iter {
return v
}
return nil
}
func main() {
nums := []interface{}{1, 2, 3, 4, 5}
iter := iterator(nums)
for v := next(iter); v != nil; v = next(iter) {
fmt.Println(v)
}
}
參考
【1】《The Way to Go》:并發(fā)吹埠、并行和協(xié)程
【2】Golang的協(xié)程調(diào)度器原理及GMP設(shè)計思想?