Golang 學(xué)習(xí)筆記九 并發(fā)編程 協(xié)程 通道

參考
《快學(xué) Go 語言》第 11 課 —— 千軍萬馬跑協(xié)程
《快學(xué) Go 語言》第 12 課 —— 通道
let's GoLang(三): Goroutine&Channel

知識(shí)回顧
并發(fā)編程基礎(chǔ)知識(shí)一 并發(fā)和并行
并發(fā)編程基礎(chǔ)知識(shí)二 線程和進(jìn)程
并發(fā)編程基礎(chǔ)知識(shí)三 異步严肪,非阻塞和 IO 復(fù)用

協(xié)程和通道是 Go 語言作為并發(fā)編程語言最為重要的特色之一近哟,初學(xué)者可以完全將協(xié)程理解為線程,但是用起來比線程更加簡(jiǎn)單,占用的資源也更少袍患。通常在一個(gè)進(jìn)程里啟動(dòng)上萬個(gè)線程就已經(jīng)不堪重負(fù)别渔,但是 Go 語言允許你啟動(dòng)百萬協(xié)程也可以輕松應(yīng)付们衙。如果把協(xié)程比喻成小島剃斧,那通道就是島嶼之間的交流橋梁,數(shù)據(jù)搭乘通道從一個(gè)協(xié)程流轉(zhuǎn)到另一個(gè)協(xié)程檐迟。通道是并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)补胚,它類似于內(nèi)存消息隊(duì)列,允許很多的協(xié)程并發(fā)對(duì)通道進(jìn)行讀寫追迟。


image.png

Go 語言里面的協(xié)程稱之為 goroutine溶其,通道稱之為 channel。

一敦间、協(xié)程

1.協(xié)程的啟動(dòng)
Go 語言里創(chuàng)建一個(gè)協(xié)程非常簡(jiǎn)單瓶逃,使用 go 關(guān)鍵詞加上一個(gè)函數(shù)調(diào)用就可以了。Go 語言會(huì)啟動(dòng)一個(gè)新的協(xié)程廓块,函數(shù)調(diào)用將成為這個(gè)協(xié)程的入口厢绝。

package main

import "fmt"
import "time"

func main() {
    fmt.Println("run in main goroutine")
    go func() {
        fmt.Println("run in child goroutine")
        go func() {
            fmt.Println("run in grand child goroutine")
            go func() {
                fmt.Println("run in grand grand child goroutine")
            }()
        }()
    }()
    time.Sleep(time.Second)
    fmt.Println("main goroutine will quit")
}

-------
run in main goroutine
run in child goroutine
run in grand child goroutine
run in grand grand child goroutine
main goroutine will quit

main 函數(shù)運(yùn)行在主協(xié)程(main goroutine)里面,上面的例子中我們?cè)谥鲄f(xié)程里面啟動(dòng)了一個(gè)子協(xié)程带猴,子協(xié)程又啟動(dòng)了一個(gè)孫子協(xié)程昔汉,孫子協(xié)程又啟動(dòng)了一個(gè)曾孫子協(xié)程。這些協(xié)程之間似乎形成了父子拴清、子孫靶病、關(guān)系,但是實(shí)際上協(xié)程之間并不存在這么多的層級(jí)關(guān)系口予,在 Go 語言里只有一個(gè)主協(xié)程娄周,其它都是它的子協(xié)程,子協(xié)程之間是平行關(guān)系沪停。

值得注意的是這里的 go 關(guān)鍵字語法和前面的 defer 關(guān)鍵字語法是一樣的煤辨,它后面跟了一個(gè)匿名函數(shù)裳涛,然后還要帶上一對(duì)(),表示對(duì)匿名函數(shù)的調(diào)用众辨。

上面的代碼中主協(xié)程睡眠了 1s端三,等待子協(xié)程們執(zhí)行完畢。如果將睡眠的這行代碼去掉泻轰,將會(huì)看不到子協(xié)程運(yùn)行的痕跡(注意測(cè)試時(shí)如果不睡眠技肩,會(huì)看不到輸出,莫名其妙)

-------------
run in main goroutine
main goroutine will quit

這是因?yàn)橹鲄f(xié)程運(yùn)行結(jié)束浮声,其它協(xié)程就會(huì)立即消亡,不管它們是否已經(jīng)開始運(yùn)行旋奢。

2.協(xié)程的本質(zhì)
一個(gè)進(jìn)程內(nèi)部可以運(yùn)行多個(gè)線程泳挥,而每個(gè)線程又可以運(yùn)行很多協(xié)程。線程要負(fù)責(zé)對(duì)協(xié)程進(jìn)行調(diào)度至朗,保證每個(gè)協(xié)程都有機(jī)會(huì)得到執(zhí)行屉符。當(dāng)一個(gè)協(xié)程睡眠時(shí),它要將線程的運(yùn)行權(quán)讓給其它的協(xié)程來運(yùn)行锹引,而不能持續(xù)霸占這個(gè)線程矗钟。同一個(gè)線程內(nèi)部最多只會(huì)有一個(gè)協(xié)程正在運(yùn)行。

image.png

線程的調(diào)度是由操作系統(tǒng)負(fù)責(zé)的嫌变,調(diào)度算法運(yùn)行在內(nèi)核態(tài)吨艇,而協(xié)程的調(diào)用是由 Go 語言的運(yùn)行時(shí)負(fù)責(zé)的,調(diào)度算法運(yùn)行在用戶態(tài)腾啥。

協(xié)程可以簡(jiǎn)化為三個(gè)狀態(tài)东涡,運(yùn)行態(tài)、就緒態(tài)和休眠態(tài)倘待。同一個(gè)線程中最多只會(huì)存在一個(gè)處于運(yùn)行態(tài)的協(xié)程疮跑,就緒態(tài)的協(xié)程是指那些具備了運(yùn)行能力但是還沒有得到運(yùn)行機(jī)會(huì)的協(xié)程,它們隨時(shí)會(huì)被調(diào)度到運(yùn)行態(tài)凸舵,休眠態(tài)的協(xié)程還不具備運(yùn)行能力祖娘,它們是在等待某些條件的發(fā)生,比如 IO 操作的完成啊奄、睡眠時(shí)間的結(jié)束等渐苏。


image.png

操作系統(tǒng)對(duì)線程的調(diào)度是搶占式的,也就是說單個(gè)線程的死循環(huán)不會(huì)影響其它線程的執(zhí)行增热,每個(gè)線程的連續(xù)運(yùn)行受到時(shí)間片的限制整以。

Go 語言運(yùn)行時(shí)對(duì)協(xié)程的調(diào)度并不是搶占式的。如果單個(gè)協(xié)程通過死循環(huán)霸占了線程的執(zhí)行權(quán)峻仇,那這個(gè)線程就沒有機(jī)會(huì)去運(yùn)行其它協(xié)程了公黑,你可以說這個(gè)線程假死了。不過一個(gè)進(jìn)程內(nèi)部往往有多個(gè)線程,假死了一個(gè)線程沒事凡蚜,全部假死了才會(huì)導(dǎo)致整個(gè)進(jìn)程卡死人断。

每個(gè)線程都會(huì)包含多個(gè)就緒態(tài)的協(xié)程形成了一個(gè)就緒隊(duì)列,如果這個(gè)線程因?yàn)槟硞€(gè)別協(xié)程死循環(huán)導(dǎo)致假死朝蜘,那這個(gè)隊(duì)列上所有的就緒態(tài)協(xié)程是不是就沒有機(jī)會(huì)得到運(yùn)行了呢恶迈?Go 語言運(yùn)行時(shí)調(diào)度器采用了 work-stealing 算法,當(dāng)某個(gè)線程空閑時(shí)谱醇,也就是該線程上所有的協(xié)程都在休眠(或者一個(gè)協(xié)程都沒有)暇仲,它就會(huì)去其它線程的就緒隊(duì)列上去偷一些協(xié)程來運(yùn)行。也就是說這些線程會(huì)主動(dòng)找活干副渴,在正常情況下奈附,運(yùn)行時(shí)會(huì)盡量平均分配工作任務(wù)。

3.設(shè)置線程數(shù)
默認(rèn)情況下煮剧,Go 運(yùn)行時(shí)會(huì)將線程數(shù)會(huì)被設(shè)置為機(jī)器 CPU 邏輯核心數(shù)斥滤。同時(shí)它內(nèi)置的 runtime 包提供了 GOMAXPROCS(n int) 函數(shù)允許我們動(dòng)態(tài)調(diào)整線程數(shù),注意這個(gè)函數(shù)名字是全大寫勉盅,Go 語言的設(shè)計(jì)者就是這么任性佑颇,該函數(shù)會(huì)返回修改前的線程數(shù),如果參數(shù) n <=0 草娜,就不會(huì)產(chǎn)生修改效果挑胸,等價(jià)于讀操作。

package main

import "fmt"
import "runtime"

func main() {
// 讀取默認(rèn)的線程數(shù)
fmt.Println(runtime.GOMAXPROCS(0))
// 設(shè)置線程數(shù)為 10
runtime.GOMAXPROCS(10)
// 讀取當(dāng)前的線程數(shù)
fmt.Println(runtime.GOMAXPROCS(0))
}

--------
4
10

獲取當(dāng)前的協(xié)程數(shù)量可以使用 runtime 包提供的 NumGoroutine() 方法

package main

import "fmt"
import "time"
import "runtime"

func main() {
    fmt.Println(runtime.NumGoroutine())
    for i:=0;i<10;i++ {
        go func(){
            for {
                time.Sleep(time.Second)
            }
        }()
    }
    fmt.Println(runtime.NumGoroutine())
}

------
1
11
二驱还、使用atmoic包和互斥鎖mutex解決競(jìng)爭(zhēng)狀態(tài)

摘自《go語言實(shí)戰(zhàn)》P132
參考Go語言實(shí)戰(zhàn)筆記(十三)| Go 并發(fā)資源競(jìng)爭(zhēng)

package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    count int32
    wg    sync.WaitGroup
)

func main() {
    wg.Add(2)
    go incCount()
    go incCount()
    wg.Wait()
    fmt.Println(count)
}

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        value := count
        runtime.Gosched()
        value++
        count = value
    }
}

這是一個(gè)資源競(jìng)爭(zhēng)的例子嗜暴,我們可以多運(yùn)行幾次這個(gè)程序,會(huì)發(fā)現(xiàn)結(jié)果可能是2议蟆,也可以是3闷沥,也可能是4。因?yàn)楣蚕碣Y源count變量沒有任何同步保護(hù)咐容,所以兩個(gè)goroutine都會(huì)對(duì)其進(jìn)行讀寫舆逃,會(huì)導(dǎo)致對(duì)已經(jīng)計(jì)算好的結(jié)果覆蓋,以至于產(chǎn)生錯(cuò)誤結(jié)果.
1.atmoic包
如果兩個(gè)或者多個(gè) goroutine 在沒有互相同步的情況下戳粒,訪問某個(gè)共享的資源路狮,并試圖同時(shí)讀和寫這個(gè)資源,就處于相互競(jìng)爭(zhēng)的狀態(tài)蔚约,這種情況被稱作競(jìng)爭(zhēng)狀態(tài)(race candition)奄妨。競(jìng)爭(zhēng)狀態(tài)的存在是讓并發(fā)程序變得復(fù)雜的地方,十分容易引起潛在問題苹祟。對(duì)一個(gè)共享資源的讀和寫操作必須是原子化的砸抛,換句話說评雌,同一時(shí)刻只能有一個(gè) goroutine 對(duì)共享資源進(jìn)行讀和寫操作。

func incCount() {
    defer wg.Done()
    for i := 0; i < 2; i++ {
        value := atomic.LoadInt32(&count)
        runtime.Gosched()
        value++
        atomic.StoreInt32(&count,value)
    }
}

留意這里atomic.LoadInt32和atomic.StoreInt32兩個(gè)函數(shù)直焙,一個(gè)讀取int32類型變量的值景东,一個(gè)是修改int32類型變量的值,這兩個(gè)都是原子性的操作奔誓,Go已經(jīng)幫助我們?cè)诘讓邮褂眉渔i機(jī)制斤吐,保證了共享資源的同步和安全,所以我們可以得到正確的結(jié)果厨喂,這時(shí)候我們?cè)偈褂觅Y源競(jìng)爭(zhēng)檢測(cè)工具go build -race檢查和措,也不會(huì)提示有問題了。

atom包的Addint64會(huì)同步整型值的加法杯聚,強(qiáng)制同一時(shí)刻只能有一個(gè)goroutine運(yùn)行并完成這個(gè)加法操作臼婆。atomic包里還有很多原子化的函數(shù)可以保證并發(fā)下資源同步訪問修改的問題,比如函數(shù)atomic.AddInt32可以直接對(duì)一個(gè)int32類型的變量進(jìn)行修改幌绍,在原值的基礎(chǔ)上再增加多少的功能,也是原子性的故响,這里不再舉例傀广,大家自己可以試試。

atomic雖然可以解決資源競(jìng)爭(zhēng)問題彩届,但是比較都是比較簡(jiǎn)單的伪冰,支持的數(shù)據(jù)類型也有限,所以Go語言還提供了一個(gè)sync包樟蠕,這個(gè)sync包里提供了一種互斥型的鎖贮聂,可以讓我們自己靈活的控制哪些代碼,同時(shí)只能有一個(gè)goroutine訪問寨辩,被sync互斥鎖控制的這段代碼范圍吓懈,被稱之為臨界區(qū),臨界區(qū)的代碼靡狞,同一時(shí)間耻警,只能又一個(gè)goroutine訪問。

更多可參考Go語言atomic原子操作

原子操作由底層硬件支持甸怕,而鎖則由操作系統(tǒng)提供的API實(shí)現(xiàn)甘穿。若實(shí)現(xiàn)相同的功能,前者通常會(huì)更有效率梢杭。

2.互斥鎖

// This sample program demonstrates how to use a mutex
// to define critical sections of code that need synchronous
// access.
package main

import (
    "fmt"
    "runtime"
    "sync"
)

var (
    // counter is a variable incremented by all goroutines.
    counter int

    // wg is used to wait for the program to finish.
    wg sync.WaitGroup

    // mutex is used to define a critical section of code.
    mutex sync.Mutex
)

// main is the entry point for all Go programs.
func main() {
    // Add a count of two, one for each goroutine.
    wg.Add(2)

    // Create two goroutines.
    go incCounter(1)
    go incCounter(2)

    // Wait for the goroutines to finish.
    wg.Wait()
    fmt.Printf("Final Counter: %d\n", counter)
}

// incCounter increments the package level Counter variable
// using the Mutex to synchronize and provide safe access.
func incCounter(id int) {
    // Schedule the call to Done to tell main we are done.
    defer wg.Done()

    for count := 0; count < 2; count++ {
        // Only allow one goroutine through this
        // critical section at a time.
        mutex.Lock()
        {
            // Capture the value of counter.
            value := counter

            // Yield the thread and be placed back in queue.
            runtime.Gosched()

            // Increment our local value of counter.
            value++

            // Store the value back into counter.
            counter = value
        }
        mutex.Unlock()
        // Release the lock and allow any
        // waiting goroutine through.
    }
}

對(duì) counter 變量的操作在第 46 行和第 60 行的 Lock()和 Unlock()函數(shù)調(diào)用定義的臨界區(qū)里被保護(hù)起來温兼。使用大括號(hào)只是為了讓臨界區(qū)看起來更清晰,并不是必需的武契。同一時(shí)刻只有一個(gè) goroutine 可以進(jìn)入臨界區(qū)募判。之后荡含,直到調(diào)用 Unlock()函數(shù)之后,其他 goroutine 才能進(jìn)入臨界區(qū)兰伤。當(dāng)?shù)?52 行強(qiáng)制將當(dāng)前 goroutine 退出當(dāng)前線程后内颗,調(diào)度器會(huì)再次分配這個(gè) goroutine 繼續(xù)運(yùn)行。當(dāng)程序結(jié)束時(shí)敦腔,我們得到正確的值 4均澳,競(jìng)爭(zhēng)狀態(tài)不再存在。

三符衔、通道

原子函數(shù)和互斥鎖都能工作找前,但是依靠它們都不會(huì)讓編寫并發(fā)程序變得更簡(jiǎn)單,更不容易出錯(cuò)判族,或者更有趣躺盛。在 Go 語言里,你不僅可以使用原子函數(shù)和互斥鎖來保證對(duì)共享資源的安全訪問以及消除競(jìng)爭(zhēng)狀態(tài)形帮,還可以使用通道槽惫,通過發(fā)送和接收需要共享的資源,在 goroutine 之間做同步辩撑。

不同的并行協(xié)程之間交流的方式有兩種界斜,一種是通過共享變量,另一種是通過隊(duì)列合冀。Go 語言鼓勵(lì)使用隊(duì)列的形式來交流各薇,它單獨(dú)為協(xié)程之間的隊(duì)列數(shù)據(jù)交流定制了特殊的語法 —— 通道。

通道是協(xié)程的輸入和輸出君躺。作為協(xié)程的輸出峭判,通道是一個(gè)容器,它可以容納數(shù)據(jù)棕叫。作為協(xié)程的輸入林螃,通道是一個(gè)生產(chǎn)者,它可以向協(xié)程提供數(shù)據(jù)谍珊。通道作為容器是有限定大小的治宣,滿了就寫不進(jìn)去,空了就讀不出來砌滞。通道還有它自己的類型侮邀,它可以限定進(jìn)入通道的數(shù)據(jù)的類型。


image.png

1.創(chuàng)建通道
創(chuàng)建通道只有一種語法贝润,那就是 make 全局函數(shù)绊茧,提供第一個(gè)類型參數(shù)限定通道可以容納的數(shù)據(jù)類型,再提供第二個(gè)整數(shù)參數(shù)作為通道的容器大小打掘。大小參數(shù)是可選的华畏,如果不填鹏秋,那這個(gè)通道的容量為零,叫著「非緩沖型通道」亡笑,非緩沖型通道必須確保有協(xié)程正在嘗試讀取當(dāng)前通道侣夷,否則寫操作就會(huì)阻塞直到有其它協(xié)程來從通道中讀東西。

如果兩個(gè) goroutine沒有同時(shí)準(zhǔn)備好仑乌,通道會(huì)導(dǎo)致先執(zhí)行發(fā)送或接收操作的 goroutine 阻塞等待百拓。這種對(duì)通道進(jìn)行發(fā)送和接收的交互行為本身就是同步的。其中任意一個(gè)操作都無法離開另一個(gè)操作單獨(dú)存在晰甚。非緩沖型通道總是處于既滿又空的狀態(tài)衙传。

與之對(duì)應(yīng)的有限定大小的通道就是緩沖型通道。在 Go 語言里不存在無界通道厕九,每個(gè)通道都是有限定最大容量的蓖捶。

// 緩沖型通道,里面只能放整數(shù)
var bufferedChannel = make(chan int, 1024)
// 非緩沖型通道
var unbufferedChannel = make(chan int)

2.讀寫通道
Go 語言為通道的讀寫設(shè)計(jì)了特殊的箭頭語法糖 <-扁远,讓我們使用通道時(shí)非常方便俊鱼。可以把箭頭的方向理解為數(shù)據(jù)的流向。把箭頭寫在通道變量的右邊就是寫通道畅买,把箭頭寫在通道的左邊就是讀通道亭引。一次只能讀寫一個(gè)元素。

package main

import "fmt"

func main() {
 var ch chan int = make(chan int, 4)
 for i:=0; i<cap(ch); i++ {
  ch <- i   // 寫通道
 }
 for len(ch) > 0 {
  var value int = <- ch  // 讀通道
  fmt.Println(value)
 }
}
--------------
0
1
2
3

通道作為容器皮获,它可以像切片一樣,使用 cap() 和 len() 全局函數(shù)獲得通道的容量和當(dāng)前內(nèi)部的元素個(gè)數(shù)纹冤。通道一般作為不同的協(xié)程交流的媒介洒宝,在同一個(gè)協(xié)程里它也是可以使用的。注意看輸出結(jié)果萌京,先寫進(jìn)去的先出來雁歌,說明通道是一個(gè)隊(duì)列

func writeChan(ch chan <- int){
    for i:=0; i<cap(ch); i++ {
        ch <- i   // 寫通道
    }

    for len(ch) > 0 {
        var value int = <- ch  // 讀通道
        fmt.Println(value)
    }
}

上面的代碼編譯不通過知残,會(huì)報(bào)錯(cuò):Invalid operation: <- ch (receive from send-only type chan<- int)靠瞎。也就是說,當(dāng)函數(shù)的參數(shù)中出現(xiàn)通道時(shí)求妹,可以不加箭頭乏盐,表示 雙向的。如果在chan后面加上<-就表示數(shù)據(jù)要流入通道制恍,也就是這個(gè)參數(shù)只能寫父能,不能讀。反過來净神,改成ch <- chan int何吝,就是只能讀溉委,不能寫的通道參數(shù)。

3.讀寫阻塞
通道滿了蒿讥,寫操作就會(huì)阻塞滑臊,協(xié)程就會(huì)進(jìn)入休眠鸽粉,直到有其它協(xié)程讀通道挪出了空間,協(xié)程才會(huì)被喚醒藻三。如果有多個(gè)協(xié)程的寫操作都阻塞了,一個(gè)讀操作只會(huì)喚醒一個(gè)協(xié)程絮爷。

通道空了趴酣,讀操作就會(huì)阻塞,協(xié)程也會(huì)進(jìn)入睡眠坑夯,直到有其它協(xié)程寫通道裝進(jìn)了數(shù)據(jù)才會(huì)被喚醒岖寞。如果有多個(gè)協(xié)程的讀操作阻塞了,一個(gè)寫操作也只會(huì)喚醒一個(gè)協(xié)程柜蜈。

package main

import "fmt"
import "time"
import "math/rand"

func send(ch chan int) {
 for {
  var value = rand.Intn(100)
  ch <- value
  fmt.Printf("send %d\n", value)
 }
}

func recv(ch chan int) {
 for {
  value := <- ch
  fmt.Printf("recv %d\n", value)
  time.Sleep(time.Second)
 }
}

func main() {
 var ch = make(chan int, 1)
 // 子協(xié)程循環(huán)讀
 go recv(ch)
 // 主協(xié)程循環(huán)寫
 send(ch)
}

--------
send 81
send 87
recv 81
recv 87
send 47
recv 47
send 59

4.關(guān)閉通道
Go 語言的通道有點(diǎn)像文件仗谆,不但支持讀寫操作, 還支持關(guān)閉淑履。讀取一個(gè)已經(jīng)關(guān)閉的通道會(huì)立即返回通道類型的「零值」隶垮,而寫一個(gè)已經(jīng)關(guān)閉的通道會(huì)拋異常。如果通道里的元素是整型的秘噪,讀操作是不能通過返回值來確定通道是否關(guān)閉的狸吞。

package main

import "fmt"

func main() {
 var ch = make(chan int, 4)
 ch <- 1
 ch <- 2
 close(ch)

 value := <- ch
 fmt.Println(value)
 value = <- ch
 fmt.Println(value)
 value = <- ch
 fmt.Println(value)
}

-------
1
2
0

這時(shí)候就需要引入一個(gè)新的知識(shí)點(diǎn) —— 使用 for range 語法糖來遍歷通道

for range 語法我們已經(jīng)見了很多次了,它是多功能的指煎,除了可以遍歷數(shù)組蹋偏、切片、字典至壤,還可以遍歷通道威始,取代箭頭操作符。當(dāng)通道空了像街,循環(huán)會(huì)暫停阻塞黎棠,當(dāng)通道關(guān)閉時(shí),阻塞停止镰绎,循環(huán)也跟著結(jié)束了脓斩。當(dāng)循環(huán)結(jié)束時(shí),我們就知道通道已經(jīng)關(guān)閉了跟狱。

package main

import "fmt"

func main() {
 var ch = make(chan int, 4)
 ch <- 1
 ch <- 2
 close(ch)

 // for range 遍歷通道
 for value := range ch {
  fmt.Println(value)
 }
}

------
1
2

通道如果沒有顯式關(guān)閉俭厚,當(dāng)它不再被程序使用的時(shí)候,會(huì)自動(dòng)關(guān)閉被垃圾回收掉驶臊。不過優(yōu)雅的程序應(yīng)該將通道看成資源挪挤,顯式關(guān)閉每個(gè)不再使用的資源是一種良好的習(xí)慣叼丑。

5.通道寫安全
上面提到向一個(gè)已經(jīng)關(guān)閉的通道執(zhí)行寫操作會(huì)拋出異常,這意味著我們?cè)趯懲ǖ罆r(shí)一定要確保通道沒有被關(guān)閉扛门。

package main

import "fmt"

func send(ch chan int) {
 i := 0
 for {
  i++
  ch <- i
 }
}

func recv(ch chan int) {
 value := <- ch
 fmt.Println(value)
 value = <- ch
 fmt.Println(value)
 close(ch)
}

func main() {
 var ch = make(chan int, 4)
 go recv(ch)
 send(ch)
}

---------
1
2
panic: send on closed channel

goroutine 1 [running]:
main.send(0xc42008a000)
 /Users/qianwp/go/src/github.com/pyloque/practice/main.go:9 +0x44
main.main()
 /Users/qianwp/go/src/github.com/pyloque/practice/main.go:24 +0x66
exit status 2

那如何確保呢鸠信?Go 語言并不存在一個(gè)內(nèi)置函數(shù)可以判斷出通道是否已經(jīng)被關(guān)閉。即使存在這樣一個(gè)函數(shù)论寨,當(dāng)你判斷時(shí)通道沒有關(guān)閉星立,并不意味著當(dāng)你往通道里寫數(shù)據(jù)時(shí)它就一定沒有被關(guān)閉,并發(fā)環(huán)境下葬凳,它是可能被其它協(xié)程隨時(shí)關(guān)閉的绰垂。

確保通道寫安全的最好方式是由負(fù)責(zé)寫通道的協(xié)程自己來關(guān)閉通道,讀通道的協(xié)程不要去關(guān)閉通道火焰。

package main

import "fmt"

func send(ch chan int) {
 ch <- 1
 ch <- 2
 ch <- 3
 ch <- 4
 close(ch)
}

func recv(ch chan int) {
 for v := range ch {
  fmt.Println(v)
 }
}

func main() {
 var ch = make(chan int, 1)
 go send(ch)
 recv(ch)
}

-----------
1
2
3
4

這個(gè)方法確實(shí)可以解決單寫多讀的場(chǎng)景劲装,可要是遇上了多寫單讀的場(chǎng)合該怎么辦呢?任意一個(gè)讀寫通道的協(xié)程都不可以隨意關(guān)閉通道昌简,否則會(huì)導(dǎo)致其它寫通道協(xié)程拋出異常占业。這時(shí)候就必須讓其它不相干的協(xié)程來干這件事,這個(gè)協(xié)程需要等待所有的寫通道協(xié)程都結(jié)束運(yùn)行后才能關(guān)閉通道纯赎。那其它協(xié)程要如何才能知道所有的寫通道已經(jīng)結(jié)束運(yùn)行了呢谦疾?這個(gè)就需要使用到內(nèi)置 sync 包提供的 WaitGroup 對(duì)象,它使用計(jì)數(shù)來等待指定事件完成犬金。

package main

import "fmt"
import "time"
import "sync"

func send(ch chan int, wg *sync.WaitGroup) {
 defer wg.Done() // 計(jì)數(shù)值減一
 i := 0
 for i < 4 {
  i++
  ch <- i
 }
}

func recv(ch chan int) {
 for v := range ch {
  fmt.Println(v)
 }
}

func main() {
 var ch = make(chan int, 4)
 var wg = new(sync.WaitGroup)
 wg.Add(2) // 增加計(jì)數(shù)值
 go send(ch, wg)  // 寫
 go send(ch, wg)  // 寫
 go recv(ch)
 // Wait() 阻塞等待所有的寫通道協(xié)程結(jié)束
 // 待計(jì)數(shù)值變成零念恍,Wait() 才會(huì)返回
 wg.Wait()
 // 關(guān)閉通道
 close(ch)
 time.Sleep(time.Second)
}

---------
1
2
3
4
1
2
3
4

6.多路通道
這里可以先參考并發(fā)編程基礎(chǔ)知識(shí)三 異步,非阻塞和 IO 復(fù)用
在真實(shí)的世界中晚顷,還有一種消息傳遞場(chǎng)景樊诺,那就是消費(fèi)者有多個(gè)消費(fèi)來源,只要有一個(gè)來源生產(chǎn)了數(shù)據(jù)音同,消費(fèi)者就可以讀這個(gè)數(shù)據(jù)進(jìn)行消費(fèi)。這時(shí)候可以將多個(gè)來源通道的數(shù)據(jù)匯聚到目標(biāo)通道秃嗜,然后統(tǒng)一在目標(biāo)通道進(jìn)行消費(fèi)权均。

package main

import "fmt"
import "time"

// 每隔一會(huì)生產(chǎn)一個(gè)數(shù)
func send(ch chan int, gap time.Duration) {
 i := 0
 for {
  i++
  ch <- i
  time.Sleep(gap)
 }
}

// 將多個(gè)原通道內(nèi)容拷貝到單一的目標(biāo)通道
func collect(source chan int, target chan int) {
 for v := range source {
  target <- v
 }
}

// 從目標(biāo)通道消費(fèi)數(shù)據(jù)
func recv(ch chan int) {
 for v := range ch {
  fmt.Printf("receive %d\n", v)
 }
}


func main() {
 var ch1 = make(chan int)
 var ch2 = make(chan int)
 var ch3 = make(chan int)
 go send(ch1, time.Second)
 go send(ch2, 2 * time.Second)
 go collect(ch1, ch3)
 go collect(ch2, ch3)
 recv(ch3)
}

---------
receive 1
receive 1
receive 2
receive 2
receive 3
receive 4
receive 3
receive 5
receive 6
receive 4
receive 7
receive 8
receive 5
receive 9
....

但是上面這種形式比較繁瑣,需要為每一種消費(fèi)來源都單獨(dú)啟動(dòng)一個(gè)匯聚協(xié)程锅锨。Go 語言為這種使用場(chǎng)景帶來了「多路復(fù)用」語法糖叽赊,也就是下面要講的 select 語句,它可以同時(shí)管理多個(gè)通道讀寫必搞,如果所有通道都不能讀寫必指,它就整體阻塞,只要有一個(gè)通道可以讀寫恕洲,它就會(huì)繼續(xù)塔橡。下面我們使用 select 語句來簡(jiǎn)化上面的邏輯

package main

import "fmt"
import "time"

func send(ch chan int, gap time.Duration) {
 i := 0
 for {
  i++
  ch <- i
  time.Sleep(gap)
 }
}

func recv(ch1 chan int, ch2 chan int) {
 for {
  select {
   case v := <- ch1:
    fmt.Printf("recv %d from ch1\n", v)
   case v := <- ch2:
    fmt.Printf("recv %d from ch2\n", v)
  }
 }
}

func main() {
 var ch1 = make(chan int)
 var ch2 = make(chan int)
 go send(ch1, time.Second)
 go send(ch2, 2 * time.Second)
 recv(ch1, ch2)
}

------------
recv 1 from ch2
recv 1 from ch1
recv 2 from ch1
recv 3 from ch1
recv 2 from ch2
recv 4 from ch1
recv 3 from ch2
recv 5 from ch1

上面是多路復(fù)用 select 語句的讀通道形式梅割,下面是它的寫通道形式,只要有一個(gè)通道能寫進(jìn)去葛家,它就會(huì)打破阻塞户辞。

select {
  case ch1 <- v:
      fmt.Println("send to ch1")
  case ch2 <- v:
      fmt.Println("send to ch2")
}

7.非阻塞讀寫
前面我們講的讀寫都是阻塞讀寫,Go 語言還提供了通道的非阻塞讀寫癞谒。當(dāng)通道空時(shí)底燎,讀操作不會(huì)阻塞,當(dāng)通道滿時(shí)弹砚,寫操作也不會(huì)阻塞双仍。非阻塞讀寫需要依靠 select 語句的 default 分支。當(dāng) select 語句所有通道都不可讀寫時(shí)桌吃,如果定義了 default 分支朱沃,那就會(huì)執(zhí)行 default 分支邏輯,這樣就起到了不阻塞的效果读存。下面我們演示一個(gè)單生產(chǎn)者多消費(fèi)者的場(chǎng)景为流。生產(chǎn)者同時(shí)向兩個(gè)通道寫數(shù)據(jù),寫不進(jìn)去就丟棄让簿。

package main

import "fmt"
import "time"

func send(ch1 chan int, ch2 chan int) {
 i := 0
 for {
  i++
  select {
   case ch1 <- i:
    fmt.Printf("send ch1 %d\n", i)
   case ch2 <- i:
    fmt.Printf("send ch2 %d\n", i)
   default:
  }
 }
}

func recv(ch chan int, gap time.Duration, name string) {
 for v := range ch {
  fmt.Printf("receive %s %d\n", name, v)
  time.Sleep(gap)
 }
}

func main() {
 // 無緩沖通道
 var ch1 = make(chan int)
 var ch2 = make(chan int)
 // 兩個(gè)消費(fèi)者的休眠時(shí)間不一樣敬察,名稱不一樣
 go recv(ch1, time.Second, "ch1")
 go recv(ch2, 2 * time.Second, "ch2")
 send(ch1, ch2)
}

------------
send ch1 27
send ch2 28
receive ch1 27
receive ch2 28
send ch1 6708984
receive ch1 6708984
send ch2 13347544
send ch1 13347775
receive ch2 13347544
receive ch1 13347775
send ch1 20101642
receive ch1 20101642
send ch2 26775795
receive ch2 26775795
...

從輸出中可以明顯看出有很多的數(shù)據(jù)都丟棄了,消費(fèi)者讀到的數(shù)據(jù)是不連續(xù)的尔当。如果將 select 語句里面的 default 分支干掉莲祸,再運(yùn)行一次,結(jié)果如下

send ch2 1
send ch1 2
receive ch1 2
receive ch2 1
receive ch1 3
send ch1 3
receive ch2 4
send ch2 4
send ch1 5
receive ch1 5
receive ch1 6
send ch1 6
receive ch1 7

可以看到消費(fèi)者讀到的數(shù)據(jù)都連續(xù)了椭迎,但是每個(gè)數(shù)據(jù)只給了一個(gè)消費(fèi)者锐帜。select 語句的 default 分支非常關(guān)鍵,它是決定通道讀寫操作阻塞與否的關(guān)鍵畜号。

8.通道內(nèi)部結(jié)構(gòu)
Go 語言的通道內(nèi)部結(jié)構(gòu)是一個(gè)循環(huán)數(shù)組缴阎,通過讀寫偏移量來控制元素發(fā)送和接受。它為了保證線程安全简软,內(nèi)部會(huì)有一個(gè)全局鎖來控制并發(fā)蛮拔。對(duì)于發(fā)送和接受操作都會(huì)有一個(gè)隊(duì)列來容納處于阻塞狀態(tài)的協(xié)程。


image.png
type hchan struct {
  qcount uint  // 通道有效元素個(gè)數(shù)
  dataqsize uint   // 通道容量痹升,循環(huán)數(shù)組總長度
  buf unsafe.Pointer // 數(shù)組地址
  elemsize uint16 // 內(nèi)部元素的大小
  closed uint32 // 是否已關(guān)閉 0或者1
  elemtype *_type // 內(nèi)部元素類型信息
  sendx uint // 循環(huán)數(shù)組的寫偏移量
  recvx uint // 循環(huán)數(shù)組的讀偏移量
  recvq waitq // 阻塞在讀操作上的協(xié)程隊(duì)列
  sendq waitq // 阻塞在寫操作上的協(xié)程隊(duì)列
  
  lock mutex // 全局鎖
}

這個(gè)循環(huán)隊(duì)列和 Java 語言內(nèi)置的 ArrayBlockingQueue 結(jié)構(gòu)如出一轍建炫。從這個(gè)數(shù)據(jù)結(jié)構(gòu)中我們也可以得出結(jié)論:隊(duì)列在本質(zhì)上是使用共享變量加鎖的方式來實(shí)現(xiàn)的,共享變量才是并行交流的本質(zhì)疼蛾。

class ArrayBlockingQueue extends AbstractQueue {
  Object[] items;
  int takeIndex;
  int putIndex;
  int count;
  ReentrantLock lock;
  ...
}

所以讀者請(qǐng)不要認(rèn)為 Go 語言的通道很神奇肛跌,Go 語言只是對(duì)通道設(shè)計(jì)了一套便于使用的語法糖,讓這套數(shù)據(jù)結(jié)構(gòu)顯的平易近人。它在內(nèi)部實(shí)現(xiàn)上和其它語言的并發(fā)隊(duì)列大同小異衍慎。

四转唉、sync.Once的實(shí)現(xiàn)分析

sync.once可以控制函數(shù)只能被調(diào)用一次,不能多次重復(fù)調(diào)用西饵。

我們可以用下面的代碼實(shí)現(xiàn)一個(gè)線程安全的單例模式

package singleton
import (
    "fmt"
    "sync"
)
type object struct {
    name string
}
var once sync.Once
var obj *object //單例指針
//公開方法 外包調(diào)用
func Instance() *object {
    once.Do(getObj)
    return obj
}
func getObj() {
    if obj == nil {
        obj = new(object)
        //可以做其他初始化事件
    }
}
//單例測(cè)試
func (obj *object) Test() {
    fmt.Println(obj.name)
}

如果我們要自己實(shí)現(xiàn)這么一個(gè)功能如何做呢酝掩?

  • 定義一個(gè)status變量用來描述是否已經(jīng)執(zhí)行過了
  • 使用sync.Mutex 或者sync.Atomic實(shí)現(xiàn)線程安全的獲取status狀態(tài), 根據(jù)狀態(tài)判斷是否執(zhí)行特定的函數(shù)

然后看下sync.Once實(shí)際是如何實(shí)現(xiàn)的

// Once is an object that will perform exactly one action.
type Once struct {
    m    Mutex
    done uint32
}
//使用了雙層檢查機(jī)制 
func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }
    // Slow-path.
    o.m.Lock()
    defer o.m.Unlock()
    //這里需要再次重新判斷下眷柔,因?yàn)?atomic.LoadUint32取出狀態(tài)值
    //到  o.m.Lock() 之間是有可能存在其它gotoutine改變status的狀態(tài)值的
    if o.done == 0 {
        f()
        atomic.StoreUint32(&o.done, 1)
    }
}

也有網(wǎng)友寫出了更簡(jiǎn)潔的代碼,不知道官方為什么沒有采用下面的實(shí)現(xiàn)方式期虾。

type Once struct {
    done int32
}
func (o *Once) Do(f func()) {
    if atomic.LoadInt32(&o.done) == 1 {
        return
    }
    // Slow-path.
    if atomic.CompareAndSwapInt32(&o.done, 0, 1) {
        f()
    }
}
五、擴(kuò)展閱讀

golang 關(guān)于鎖 mutex,踩過的坑
Golang并發(fā):再也不愁選channel還是選鎖

面對(duì)一個(gè)并發(fā)問題的時(shí)候驯嘱,應(yīng)當(dāng)選擇合適的并發(fā)方式:channel還是mutex镶苞。選擇的依據(jù)是他們的能力/特性:channel的能力是讓數(shù)據(jù)流動(dòng)起來,擅長的是數(shù)據(jù)流動(dòng)的場(chǎng)景鞠评,mutex的能力是數(shù)據(jù)不動(dòng)茂蚓,某段時(shí)間只給一個(gè)協(xié)程訪問數(shù)據(jù)的權(quán)限擅長數(shù)據(jù)位置固定的場(chǎng)景

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市剃幌,隨后出現(xiàn)的幾起案子聋涨,更是在濱河造成了極大的恐慌,老刑警劉巖负乡,帶你破解...
    沈念sama閱讀 221,820評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牍白,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡抖棘,警方通過查閱死者的電腦和手機(jī)茂腥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來切省,“玉大人最岗,你說我怎么就攤上這事〕Γ” “怎么了般渡?”我有些...
    開封第一講書人閱讀 168,324評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長芙盘。 經(jīng)常有香客問我诊杆,道長,這世上最難降的妖魔是什么何陆? 我笑而不...
    開封第一講書人閱讀 59,714評(píng)論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮豹储,結(jié)果婚禮上贷盲,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好巩剖,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,724評(píng)論 6 397
  • 文/花漫 我一把揭開白布铝穷。 她就那樣靜靜地躺著,像睡著了一般佳魔。 火紅的嫁衣襯著肌膚如雪曙聂。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評(píng)論 1 310
  • 那天鞠鲜,我揣著相機(jī)與錄音宁脊,去河邊找鬼。 笑死贤姆,一個(gè)胖子當(dāng)著我的面吹牛榆苞,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播霞捡,決...
    沈念sama閱讀 40,897評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼坐漏,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了碧信?” 一聲冷哼從身側(cè)響起赊琳,我...
    開封第一講書人閱讀 39,804評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎砰碴,沒想到半個(gè)月后躏筏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡衣式,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,431評(píng)論 3 340
  • 正文 我和宋清朗相戀三年寸士,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碴卧。...
    茶點(diǎn)故事閱讀 40,561評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡弱卡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出住册,到底是詐尸還是另有隱情婶博,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評(píng)論 5 350
  • 正文 年R本政府宣布荧飞,位于F島的核電站凡人,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏叹阔。R本人自食惡果不足惜挠轴,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,928評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望耳幢。 院中可真熱鬧岸晦,春花似錦欧啤、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至冈在,卻和暖如春倒慧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背包券。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評(píng)論 1 272
  • 我被黑心中介騙來泰國打工纫谅, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人兴使。 一個(gè)月前我還...
    沈念sama閱讀 48,983評(píng)論 3 376
  • 正文 我出身青樓系宜,卻偏偏與公主長得像,于是被迫代替她去往敵國和親发魄。 傳聞我的和親對(duì)象是個(gè)殘疾皇子盹牧,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,573評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容