了解過go的都知道因块,go最為突出的優(yōu)點(diǎn)就是它天然支持高并發(fā)震贵,但是所有高并發(fā)情況都面臨著一個(gè)很明顯的問題淹魄,就是并發(fā)的多線程或多協(xié)程之間如何通信郁惜,而channel就是go中g(shù)oroutine通信的‘管道’。
channel在go中時(shí)如何使用的
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var exit = make(chan string, 1)
func main() {
go dealSignal()
exited := make(chan struct{}, 1)
go channel1(exited)
count := 0
t := time.Tick(time.Second)
Loop:
for {
select {
case <-t:
count++
fmt.Printf("main run %d\n", count)
case <-exited:
fmt.Println("main exit begin")
break Loop
}
}
fmt.Println("main exit end")
}
func dealSignal() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
exit <- "shutdown"
}()
}
func channel1(exited chan<- struct{}) {
t := time.Tick(time.Second)
count := 0
for {
select {
case <-t:
count++
fmt.Printf("channel1 run %d\n", count)
case <-exit:
fmt.Println("channel1 exit")
close(exited)
return
}
}
}
這個(gè)例子首先并發(fā)出一個(gè)dealsign方法甲锡,用來接收關(guān)閉信號(hào)兆蕉,如果接收到關(guān)閉信號(hào)后往exit channel發(fā)送一條消息,然后并發(fā)運(yùn)行channel1缤沦,channel1中定了一個(gè)ticker虎韵,正常情況下channel1每秒打印第一個(gè)case語(yǔ)句,如果接收到exit的信號(hào)缸废,進(jìn)入第二個(gè)case包蓝,然后關(guān)閉傳入的exited channel缩多,那么main中的Loop,接收到exited關(guān)閉的信號(hào)后养晋,打印“main exit begin”衬吆, 然后退出循環(huán),進(jìn)程成功退出绳泉。這個(gè)例子演示了channel在goroutine中起到的傳遞消息的作用逊抡。這個(gè)例子是為了向大家展示channel在多個(gè)goroutine之間進(jìn)行通信。
Channel在底層是什么樣的
type hchan struct {
qcount uint // total data in the queue零酪;chan中的元素總數(shù)
dataqsiz uint // size of the circular queue冒嫡;底層循環(huán)數(shù)組的size
buf unsafe.Pointer // points to an array of dataqsiz elements,指向底層循環(huán)數(shù)組的指針四苇,只針對(duì)有緩沖的channel
elemsize uint16 //chan中元素的大小
closed uint32 //chan是否關(guān)閉
elemtype *_type // element type孝凌;元素類型
sendx uint // send index;已發(fā)送元素在循環(huán)數(shù)組中的索引
recvx uint // receive index月腋;已接收元素在循環(huán)數(shù)組中的索引
recvq waitq // list of recv waiters蟀架,等待接收消息的goroutine隊(duì)列
sendq waitq // list of send waiters,等待發(fā)送消息的goroutine隊(duì)列
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
創(chuàng)建一個(gè)底層數(shù)組容量為5榆骚,元素類型為int片拍,那么channel的數(shù)據(jù)結(jié)構(gòu)如下圖所示:
創(chuàng)建channel的時(shí)候到底發(fā)生了什么
創(chuàng)建channel的時(shí)候,其實(shí)底層是調(diào)用makechan方法妓肢,我們來看下源碼:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
從函數(shù)原型來看捌省,創(chuàng)建的 chan 是一個(gè)指針。所以我們能在函數(shù)間直接傳遞 channel碉钠,而不用傳遞 channel 的指針纲缓。
具體來看下代碼:
可以看出makechan中其實(shí)主要的代碼就是一個(gè)switch,針對(duì)不同的情況:
1喊废、case mem == 0代表無緩沖型channel祝高,只分配hchan本身結(jié)構(gòu)體大小的內(nèi)存
2、case elem.ptrdata==0 代表元素類型不含指針操禀,只分配hchan本身結(jié)構(gòu)體大小+元素大小*個(gè)數(shù)的內(nèi)存褂策,是連續(xù)的內(nèi)存空間
3横腿、default元素類型包括指針颓屑,兩次分配內(nèi)存的操作
channel的接收與發(fā)送
func goroutineA(a <-chan int) {
val := <- a
fmt.Println("G1 received data: ", val)
return
}
func goroutineB(b <-chan int) {
val := <- b
fmt.Println("G2 received data: ", val)
return
}
func main() {
ch := make(chan int)
go goroutineA(ch)
go goroutineB(ch)
ch <- 3
time.Sleep(time.Second)
}
首先創(chuàng)建了一個(gè)無緩沖型的channel,然后啟動(dòng)兩個(gè)goroutine去消費(fèi)channel的數(shù)據(jù)耿焊,緊接著向channel中發(fā)送數(shù)據(jù)揪惦。我們一步一步來分析channel是如何接收和發(fā)送數(shù)據(jù)的,首先來看接收罗侯,golang中接收channel數(shù)據(jù)有兩種方式:
i <- ch
i, ok <- ch
// 位于 src/runtime/chan.go
// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址器腋。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false纫塌,即非阻塞型接收诊县,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則措左,如果 c 處于關(guān)閉狀態(tài)依痊,將 ep 指向的地址清零,返回 (true, false)
// 否則怎披,用返回值填充 ep 指向的內(nèi)存地址胸嘁。返回 (true, true)
// 如果 ep 非空,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 內(nèi)容 …………
// 如果是一個(gè) nil 的 channel
if c == nil {
// 如果不阻塞凉逛,直接返回 (false, false)
if !block {
return
}
// 否則性宏,接收一個(gè) nil 的 channel,goroutine 掛起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不會(huì)執(zhí)行到這里
throw("unreachable")
}
// 在非阻塞模式下状飞,快速檢測(cè)到失敗毫胜,不用獲取鎖,快速返回
// 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
// 1. 非緩沖型诬辈,等待發(fā)送列隊(duì) sendq 里沒有 goroutine 在等待
// 2. 緩沖型指蚁,但 buf 里沒有元素
// 之后,又觀察到 closed == 0自晰,即 channel 未關(guān)閉凝化。
// 因?yàn)?channel 不可能被重復(fù)打開,所以前一個(gè)觀測(cè)的時(shí)候 channel 也是未關(guān)閉的酬荞,
// 因此在這種情況下可以直接宣布接收失敗搓劫,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
// 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
// 也就是說即使是關(guān)閉狀態(tài)混巧,但在緩沖型的 channel枪向,
// buf 里有元素的情況下還能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解鎖
unlock(&c.lock)
if ep != nil {
// 從一個(gè)已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
// 那么接收的值將是一個(gè)該類型的零值
// typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
typedmemclr(c.elemtype, ep)
}
// 從一個(gè)已關(guān)閉的 channel 接收咧党,selected 會(huì)返回true
return true, false
}
// 等待發(fā)送隊(duì)列里有 goroutine 存在秘蛔,說明 buf 是滿的
// 這有可能是:
// 1. 非緩沖型的 channel
// 2. 緩沖型的 channel翰萨,但 buf 滿了
// 針對(duì) 1泰演,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
// 針對(duì) 2芙委,接收到循環(huán)數(shù)組頭部的元素坟募,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩沖型爽丹,buf 里有元素缺虐,可以正常接收
if c.qcount > 0 {
// 直接從循環(huán)數(shù)組里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代碼里蹂季,沒有忽略要接收的值麻汰,不是 "<- ch"绣的,而是 "val <- ch"叠赐,ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
typedmemclr(c.elemtype, qp)
// 接收游標(biāo)向前移動(dòng)
c.recvx++
// 接收游標(biāo)歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 數(shù)組里的元素個(gè)數(shù)減 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收欲账,解鎖。selected 返回 false芭概,因?yàn)闆]有接收到值
unlock(&c.lock)
return false, false
}
// 接下來就是要被阻塞的情況了
// 構(gòu)造一個(gè) sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收數(shù)據(jù)的地址保存下來
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 進(jìn)入channel 的等待接收隊(duì)列
c.recvq.enqueue(mysg)
// 將當(dāng)前 goroutine 掛起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被喚醒了赛不,接著從這里繼續(xù)執(zhí)行一些掃尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
Step1
如果channel是nil:如果是非阻塞模式,直接返回(false罢洲,false)俄删;如果是阻塞模式,調(diào)用goprak掛起goroutine奏路,會(huì)阻塞下去畴椰。
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
Step2
快速操作(不用獲取鎖,快速返回)鸽粉,三組條件全部滿足斜脂,快速返(false,false)
條件1:首先是在非阻塞模式下
條件2:如果是非緩沖型(datasiz=0)并且等待發(fā)送goroutine隊(duì)列為空(sendq.first=nil触机,就是沒人往channel寫數(shù)據(jù))帚戳,或者緩沖型channel(datasiz>0)并且buf中沒有數(shù)據(jù);
條件3:channel未關(guān)閉
//##################step2####################
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
Step3
首先加鎖儡首,如果channel已經(jīng)關(guān)閉片任,并且buf中沒有元素,返回對(duì)應(yīng)類型的0值蔬胯,但是received為false对供;兩種情況
情形1:非緩沖型,channel已關(guān)閉
情形2:緩沖型氛濒,channel已關(guān)閉产场,并且buf無元素
也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel舞竿,
buf 里有元素的情況下還能接收到元素
//##################step3####################
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
step4
如果等待發(fā)送隊(duì)列中有元素京景,證明channel已經(jīng)滿了,兩種情形
情形1:非緩沖型骗奖,無buf
情形2:緩沖型确徙,buf滿了
//##################step4####################
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
兩種情形都正常進(jìn)入recv方法,我們來看下源碼:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//##################step4-1####################
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
//##################step4-2####################
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
針對(duì) 1执桌,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)(從發(fā)送者的棧copy到接收者的棧)
針對(duì) 2鄙皇,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部.
然后喚醒等待發(fā)送隊(duì)列中的goroutine鼻吮,等待調(diào)度器調(diào)度育苟。
step5
沒有等待發(fā)送的隊(duì)列,并且buf中有元素椎木,直接把接收游標(biāo)處的數(shù)據(jù)copy到接收數(shù)據(jù)的地址违柏,然后改變hchan中元素?cái)?shù)據(jù)。
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
step6
如果是非阻塞香椎,那么直接返回漱竖;如果是阻塞的,構(gòu)造sudog畜伐,保存各種值馍惹;將sudog保存到channel的recvq中,調(diào)用goparkunlock將goroutine掛起
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
非阻塞接收玛界,解鎖万矾。selected 返回 false,因?yàn)闆]有接收到值
我們繼續(xù)之前的例子慎框。前面說到第 14 行良狈,創(chuàng)建了一個(gè)非緩沖型的 channel,接著笨枯,第 15薪丁、16 行分別創(chuàng)建了一個(gè) goroutine,各自執(zhí)行了一個(gè)接收操作馅精。通過前面的源碼分析严嗜,我們知道,這兩個(gè) goroutine (后面稱為 G1 和 G2 好了)都會(huì)被阻塞在接收操作洲敢。G1 和 G2 會(huì)掛在 channel 的 recq 隊(duì)列中漫玄,形成一個(gè)雙向循環(huán)鏈表。
在程序的 17 行之前压彭,chan 的整體數(shù)據(jù)結(jié)構(gòu)如下:
buf 指向一個(gè)長(zhǎng)度為 0 的數(shù)組称近,qcount 為 0,表示 channel 中沒有元素哮塞。重點(diǎn)關(guān)注 recvq 和 sendq刨秆,它們是 waitq 結(jié)構(gòu)體,而 waitq 實(shí)際上就是一個(gè)雙向鏈表忆畅,鏈表的元素是 sudog衡未,里面包含 g 字段,g 表示一個(gè) goroutine家凯,所以 sudog 可以看成一個(gè) goroutine缓醋。recvq 存儲(chǔ)那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲(chǔ)那些嘗試寫入 channel绊诲,但被阻塞的 goroutine送粱。
此時(shí),我們可以看到掂之,recvq 里掛了兩個(gè) goroutine抗俄,也就是前面啟動(dòng)的 G1 和 G2脆丁。因?yàn)闆]有 goroutine 接收,而 channel 又是無緩沖類型动雹,所以 G1 和 G2 被阻塞槽卫。sendq 沒有被阻塞的 goroutine。
再?gòu)恼w上來看一下 chan 此時(shí)的狀態(tài):
當(dāng)一個(gè)channel關(guān)閉后胰蝠,我們依然可以從中讀出數(shù)據(jù)歼培,如果chan的buf中有元素,則讀出的是chan中buf的數(shù)據(jù)茸塞,如果buf為空躲庄,則輸出對(duì)應(yīng)元素類型的零值。那么我們來看下如下的一段程序:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
var exit1 = make(chan struct{}, 1)
func main() {
go dealSignal1()
count := 0
t := time.Tick(time.Second)
for {
select {
case <-t:
count++
fmt.Printf("main run %d\n", count)
case <-exit1:
fmt.Println("main exit begin")
}
}
fmt.Println("main exit over")
}
func dealSignal1() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
close(exit1)
}()
}
發(fā)送
接著上面的例子钾虐,G1 和 G2 現(xiàn)在都在 recvq 隊(duì)列里了噪窘。
17 行向 channel 發(fā)送了一個(gè)元素 3。
發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù)禾唁,直接上源碼效览,同樣大部分都注釋了,可以看懂主流程:
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞荡短,直接返回 false丐枉,表示未發(fā)送成功
if !block {
return false
}
// 當(dāng)前 goroutine 被掛起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相關(guān)……
// 對(duì)于不阻塞的 send,快速檢測(cè)失敗場(chǎng)景
//
// 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間掘托。這可能是:
// 1. channel 是非緩沖型的瘦锹,且等待接收隊(duì)列里沒有 goroutine
// 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 鎖住 channel闪盔,并發(fā)安全
lock(&c.lock)
// 如果 channel 關(guān)閉了
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收隊(duì)列里有 goroutine弯院,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 對(duì)于緩沖型的 channel,如果還有緩沖空間
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 將數(shù)據(jù)從 ep 處拷貝到 qp
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送游標(biāo)值加 1
c.sendx++
// 如果發(fā)送游標(biāo)值等于容量值泪掀,游標(biāo)值歸 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)的元素?cái)?shù)量加一
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果不需要阻塞听绳,則直接返回錯(cuò)誤
if !block {
unlock(&c.lock)
return false
}
// channel 滿了,發(fā)送方會(huì)被阻塞异赫。接下來會(huì)構(gòu)造一個(gè) sudog
// 獲取當(dāng)前 goroutine 的指針
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊(duì)列
c.sendq.enqueue(mysg)
// 當(dāng)前 goroutine 被掛起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 從這里開始被喚醒了(channel 有機(jī)會(huì)可以發(fā)送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被喚醒后椅挣,channel 關(guān)閉了∷坑爹啊鼠证,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上綁定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
我們繼續(xù)往下走,G1靠抑、G2被掛起后量九,往channel中發(fā)送一個(gè)數(shù)據(jù)3,其實(shí)調(diào)用的是chansend方法,我們還是逐步的去講解
step1
如果channel=nil荠列,當(dāng)前goroutine會(huì)被掛起
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
step2
依然是一個(gè)不加鎖的快速操作类浪,三組條件
條件1:非阻塞
條件2:channel未關(guān)閉
條件3:channel是非緩沖型,并且等待接收隊(duì)列為空弯予;或者緩沖型戚宦,并且循環(huán)數(shù)組已經(jīng)滿了
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
step3
加鎖个曙,如果channel已經(jīng)關(guān)閉锈嫩,直接panic
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
step4
如果等待接收隊(duì)列不為空,說明什么垦搬?
情形1:非緩沖型呼寸,等待接收隊(duì)列不為空
情形2:緩沖型,等待接收隊(duì)列不為空(說明buf為空)
兩種情形猴贰,都是直接將待發(fā)送數(shù)據(jù)直接copy到接收處
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接從ep copy到sg
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
兩種情形对雪,都直接從一個(gè)用一個(gè)goroutine操作另一個(gè)goroutine的棧,因此在sendDirect方法中會(huì)有一次寫屏障
step5
如果等待隊(duì)列為空米绕,并且緩沖區(qū)未滿瑟捣,肯定是緩沖型的channel
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
將元素放在sendx處,然后sendx加1栅干,channel總量加1
step6
如果以上情況都沒有命中迈套,說明什么?說明channel已經(jīng)滿了碱鳞,如果是非阻塞的直接返回桑李,否則需要調(diào)用gopack將這個(gè)goroutine掛起,等待被喚醒窿给。
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
我們對(duì)照程序分析下贵白,在前一個(gè)小節(jié)G1、G2被掛起來了崩泡,等待sender的解救禁荒;這時(shí)候往ch中發(fā)送了一個(gè)3,(step4)這時(shí)sender發(fā)現(xiàn)ch的等待接收隊(duì)列recvq中有receiver角撞,就會(huì)出隊(duì)一個(gè)sudog呛伴,然后將元素直接copy到sudog的elem處,然后調(diào)用goready將G1喚醒靴寂,繼續(xù)執(zhí)行G1原來的代碼磷蜀,打印出結(jié)果。如下圖:
當(dāng)調(diào)度器光顧 G1 時(shí)百炬,將 G1 變成 running 狀態(tài)褐隆,執(zhí)行 goroutineA 接下來的代碼。G 表示其他可能有的 goroutine剖踊。
這里其實(shí)涉及到一個(gè)協(xié)程寫另一個(gè)協(xié)程棧的操作庶弃。有兩個(gè) receiver 在 channel 的一邊虎視眈眈地等著衫贬,這時(shí) channel 另一邊來了一個(gè) sender 準(zhǔn)備向 channel 發(fā)送數(shù)據(jù),為了高效歇攻,用不著通過 channel 的 buf “中轉(zhuǎn)”一次固惯,直接從源地址把數(shù)據(jù) copy 到目的地址就可以了,效率高敖墒亍葬毫!
關(guān)閉
close一個(gè)channel會(huì)調(diào)用closechan方法,比較簡(jiǎn)單屡穗,我們也來看下
func closechan(c *hchan) {
// 關(guān)閉一個(gè) nil channel贴捡,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上鎖
lock(&c.lock)
// 如果 channel 已經(jīng)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// …………
// 修改關(guān)閉狀態(tài)
c.closed = 1
var glist *g
// 將 channel 所有等待接收隊(duì)列的里 sudog 釋放
for {
// 從接收隊(duì)列里出隊(duì)一個(gè) sudog
sg := c.recvq.dequeue()
// 出隊(duì)完畢,跳出循環(huán)
if sg == nil {
break
}
// 如果 elem 不為空村砂,說明此 receiver 未忽略接收數(shù)據(jù)
// 給它賦一個(gè)相應(yīng)類型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 相連烂斋,形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 將 channel 等待發(fā)送隊(duì)列里的 sudog 釋放
// 如果存在,這些 goroutine 將會(huì) panic
for {
// 從發(fā)送隊(duì)列里出隊(duì)一個(gè) sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 發(fā)送者會(huì) panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 解鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍歷鏈表
for glist != nil {
// 取最后一個(gè)
gp := glist
// 向前走一步础废,下一個(gè)喚醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 喚醒相應(yīng) goroutine
goready(gp, 3)
}
}
step1
如果channel為nil汛骂,會(huì)直接panic
if c == nil {
panic(plainError("close of nil channel"))
}
step2
加鎖,如果channel已經(jīng)關(guān)閉评腺,再次關(guān)閉會(huì)panic
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
step3
首選將hchan對(duì)應(yīng)close標(biāo)志置為1帘瞭,然后聲明一個(gè)鏈表;將等待接收隊(duì)列中的所有sudog加入到鏈表歇僧,并將其elem賦予一個(gè)相應(yīng)類型的0值图张;
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
step4
向所有等待發(fā)送隊(duì)列的sudog加入鏈表
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
step5
喚醒sudog所有g(shù)oroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
close 邏輯比較簡(jiǎn)單,對(duì)于一個(gè) channel诈悍,recvq 和 sendq 中分別保存了阻塞的發(fā)送者和接收者祸轮。關(guān)閉 channel 后,對(duì)于等待接收者而言侥钳,會(huì)收到一個(gè)相應(yīng)類型的零值适袜。對(duì)于等待發(fā)送者,會(huì)直接 panic舷夺。所以苦酱,在不了解 channel 還有沒有接收者的情況下,不能貿(mào)然關(guān)閉 channel给猾。
close 函數(shù)先上一把大鎖疫萤,接著把所有掛在這個(gè) channel 上的 sender 和 receiver 全都連成一個(gè) sudog 鏈表,再解鎖敢伸。最后扯饶,再將所有的 sudog 全都喚醒。
喚醒之后,該干嘛干嘛尾序。sender 會(huì)繼續(xù)執(zhí)行 chansend 函數(shù)里 goparkunlock 函數(shù)之后的代碼钓丰,很不幸,檢測(cè)到 channel 已經(jīng)關(guān)閉了每币,panic携丁。receiver 則比較幸運(yùn),進(jìn)行一些掃尾工作后兰怠,返回梦鉴。這里,selected 返回 true痕慢,而返回值 received 則要根據(jù) channel 是否關(guān)閉尚揣,返回不同的值涌矢。如果 channel 關(guān)閉掖举,received 為 false,否則為 true娜庇。
總結(jié)
總結(jié)一下塔次,發(fā)生 panic 的情況有三種:
1.向一個(gè)關(guān)閉的 channel 進(jìn)行寫操作;
- 關(guān)閉一個(gè) nil 的 channel名秀;
- 重復(fù)關(guān)閉一個(gè) channel励负。
讀、寫一個(gè) nil channel 都會(huì)被阻塞匕得。
channel發(fā)送和接收元素的本質(zhì)還是值得拷貝
channel是并發(fā)安全的(加鎖)
參考:博客園-深度解密Go語(yǔ)言只channel
好未來Golang源碼系列三:Channel實(shí)現(xiàn)原理分析