Go Channel 底層實現(xiàn)
目錄
- channel 是什么
- channel 的創(chuàng)建
- channel 的發(fā)送
- channel 的接收
- channel 的關(guān)閉
channel 是什么
“Do not communicate by sharing memory; instead, share memory by communicating.”
不要通過共享內(nèi)存通信,通過通信來共享內(nèi)存刊苍。很經(jīng)典的go的并發(fā)哲學(xué)叹卷,依賴csp模型捏题, 通過channel實現(xiàn)爆存。
go的并發(fā)主要靠goroutine實現(xiàn)的面粮,而channel則像一個管道一樣服爷, 實現(xiàn)goroutine之間的通信。雖然也可以通過鎖他爸,原子操作能夠?qū)崿F(xiàn)數(shù)據(jù)的同步,但使用channel更優(yōu)雅果善。
總結(jié)一下诊笤,channel 是并發(fā)安全的管道,主要用于goroutine之間的消息傳遞和事件通知巾陕。
再來看一下在go中channel的數(shù)據(jù)結(jié)構(gòu)
type hchan struct {
// 元素數(shù)量
qcount uint
// 底層循環(huán)數(shù)組的長度
dataqsiz uint
// 指向底層循環(huán)數(shù)組的指針
buf unsafe.Pointer
// 元素大小
elemsize uint16
// 是否被關(guān)閉
closed uint32
// 元素類型
elemtype *_type
// 已發(fā)送元素在循環(huán)數(shù)組中的索引
sendx uint
// 已接收元素在循環(huán)數(shù)組中的索引
recvx uint
// 等待接收的 goroutine 隊列
recvq waitq
// 等待發(fā)送的 goroutine 隊列
sendq waitq
// 鎖
lock mutex
}
結(jié)合下圖更直觀的看一下
channel 的創(chuàng)建
根據(jù)channel的size分為無緩沖和有緩沖channel
// 無緩沖channel
ch1 := make(chan int)
// 有緩沖channel
ch2 := make(chan int, 2)
根據(jù)channel的發(fā)送接收能力來將它分為三種類型, 不指明方向的channel技能發(fā)送也能接收讨跟,<-chan 只能發(fā)送, chan<- 只能接收鄙煤。
// 可以發(fā)送和接收T類型的數(shù)據(jù)
chan T
// 只能發(fā)送T類型的數(shù)據(jù)
chan <- T
// 只能接收T類型的數(shù)據(jù)
<-chan T
使用make函數(shù)就可以創(chuàng)建一個能夠收發(fā)的channel, 只能讀或者寫的channel 一般作為函數(shù)參數(shù)做限制
var wg sync.WaitGroup
ch1 := make(chan int)
ch2 := make(chan int, 2)
wg.Add(2)
// 只能從recvCh接收數(shù)據(jù)
go func(recvCh <-chan int) {
data := <-recvCh
fmt.Println(data)
wg.Done()
}(ch1)
// 只能向sendCh發(fā)送數(shù)據(jù)
go func(sendCh chan<- int) {
sendCh <- 1
wg.Done()
}(ch2)
wg.Wait()
make 函數(shù)最終調(diào)用了 /go/src/runtime/chan.go 下面的makechan方法晾匠,主要做了chan的初始化內(nèi)存分配
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel的發(fā)送
ch := make(chan int, 3)
ch <- 1
channel的發(fā)送的代碼很簡單,不過這只是編譯器表面的語法糖而已梯刚, 實際上調(diào)用了runtime.chansend函數(shù)
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞凉馆,直接返回 false,表示未發(fā)送成功
if !block {
return false
}
// 當(dāng)前 goroutine 被掛起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相關(guān)……
// 對于不阻塞的 send亡资,快速檢測失敗場景
//
// 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間澜共。這可能是:
// 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
// 2. channel 是緩沖型的锥腻,但循環(huán)數(shù)組已經(jīng)裝滿了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 鎖住 channel嗦董,并發(fā)安全
lock(&c.lock)
// 如果 channel 關(guān)閉了
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 對于緩沖型的 channel瘦黑,如果還有緩沖空間
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 將數(shù)據(jù)從 ep 處拷貝到 qp
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送游標(biāo)值加 1
c.sendx++
// 如果發(fā)送游標(biāo)值等于容量值京革,游標(biāo)值歸 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)的元素數(shù)量加一
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果不需要阻塞,則直接返回錯誤
if !block {
unlock(&c.lock)
return false
}
// channel 滿了幸斥,發(fā)送方會被阻塞匹摇。接下來會構(gòu)造一個 sudog
// 獲取當(dāng)前 goroutine 的指針
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當(dāng)前 goroutine 進入發(fā)送等待隊列
c.sendq.enqueue(mysg)
// 當(dāng)前 goroutine 被掛起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 從這里開始被喚醒了(channel 有機會可以發(fā)送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被喚醒后,channel 關(guān)閉了甲葬±炔坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上綁定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
上面的代碼注釋很清楚說明了channel的發(fā)送流程演顾,下面小結(jié)一下發(fā)送的過程:
- 向一個nil channel發(fā)送數(shù)據(jù)供搀,會調(diào)用gopark函數(shù)將當(dāng)前goroutine掛起
- 向一個已經(jīng)關(guān)閉的channel發(fā)送數(shù)據(jù),直接會panic
- 如果channel的recvq當(dāng)前隊列中有被阻塞的接收者钠至,則直接將數(shù)據(jù)發(fā)送給當(dāng)前goroutine, 并將它設(shè)置成下一個運行的goroutine
- 當(dāng)channel的緩沖區(qū)還有空閑空間葛虐,則將數(shù)據(jù)發(fā)送到sendx指向緩沖區(qū)的位置
- 當(dāng)沒有緩沖區(qū)或者緩沖區(qū)滿了,則會創(chuàng)建一個sudog的結(jié)構(gòu)體將其放到channel的sendq隊列當(dāng)中陷入休眠等待被喚醒
channel 的接收
ch := make(chan int)
// 不帶ok的方式
data := <- ch
// 帶ok的方式
data, ok := <-ch
channel的接收分為兩種方式棉钧,也是編譯器語法糖的結(jié)果屿脐,最終會調(diào)用runtime.chanrecv1 和 runtime.chanrecv2,而這兩個函數(shù)最終調(diào)用了runtime.chanrecv
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
接下來看一下chanrecv的代碼
// 位于 src/runtime/chan.go
// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil,說明忽略了接收值的诵。
// 如果 block == false万栅,即非阻塞型接收,在沒有數(shù)據(jù)可接收的情況下西疤,返回 (false, false)
// 否則烦粒,如果 c 處于關(guān)閉狀態(tài),將 ep 指向的地址清零代赁,返回 (true, false)
// 否則扰她,用返回值填充 ep 指向的內(nèi)存地址。返回 (true, true)
// 如果 ep 非空芭碍,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 內(nèi)容 …………
// 如果是一個 nil 的 channel
if c == nil {
// 如果不阻塞徒役,直接返回 (false, false)
if !block {
return
}
// 否則,接收一個 nil 的 channel窖壕,goroutine 掛起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不會執(zhí)行到這里
throw("unreachable")
}
// 在非阻塞模式下忧勿,快速檢測到失敗,不用獲取鎖瞻讽,快速返回
// 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
// 1. 非緩沖型鸳吸,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
// 2. 緩沖型,但 buf 里沒有元素
// 之后卸夕,又觀察到 closed == 0层释,即 channel 未關(guān)閉。
// 因為 channel 不可能被重復(fù)打開快集,所以前一個觀測的時候 channel 也是未關(guān)閉的,
// 因此在這種情況下可以直接宣布接收失敗廉白,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// channel 已關(guān)閉个初,并且循環(huán)數(shù)組 buf 里沒有元素
// 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
// 也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel猴蹂,
// buf 里有元素的情況下還能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解鎖
unlock(&c.lock)
if ep != nil {
// 從一個已關(guān)閉的 channel 執(zhí)行接收操作院溺,且未忽略返回值
// 那么接收的值將是一個該類型的零值
// typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
typedmemclr(c.elemtype, ep)
}
// 從一個已關(guān)閉的 channel 接收,selected 會返回true
return true, false
}
// 等待發(fā)送隊列里有 goroutine 存在磅轻,說明 buf 是滿的
// 這有可能是:
// 1. 非緩沖型的 channel
// 2. 緩沖型的 channel珍逸,但 buf 滿了
// 針對 1,直接進行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
// 針對 2聋溜,接收到循環(huán)數(shù)組頭部的元素谆膳,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩沖型,buf 里有元素撮躁,可以正常接收
if c.qcount > 0 {
// 直接從循環(huán)數(shù)組里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代碼里漱病,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch"杨帽,ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
typedmemclr(c.elemtype, qp)
// 接收游標(biāo)向前移動
c.recvx++
// 接收游標(biāo)歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 數(shù)組里的元素個數(shù)減 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收漓穿,解鎖。selected 返回 false注盈,因為沒有接收到值
unlock(&c.lock)
return false, false
}
// 接下來就是要被阻塞的情況了
// 構(gòu)造一個 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收數(shù)據(jù)的地址保存下來
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 進入channel 的等待接收隊列
c.recvq.enqueue(mysg)
// 將當(dāng)前 goroutine 掛起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被喚醒了晃危,接著從這里繼續(xù)執(zhí)行一些掃尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
上面的代碼注釋很清楚說明了channel的接收數(shù)據(jù)的流程,下面小結(jié)一下接收數(shù)據(jù)的過程:
- 從一個nil channel接收數(shù)據(jù)老客,會調(diào)用gopark函數(shù)將當(dāng)前goroutine掛起山害,讓出處理器的使用權(quán)
- 從一個已經(jīng)關(guān)閉并且緩沖區(qū)中沒有元素的channel中接收數(shù)據(jù),則會接收到該類型的默認元素沿量,并且第二個返回值返回false
- 如果channel沒有緩沖區(qū)且sendq的隊列有阻塞的goroutine浪慌,則把sendq隊列頭的sudog中保存的元素值copy到目標(biāo)地址中
- 如果channel有緩沖區(qū)且緩沖區(qū)里面有元素,則把recvx指向緩沖區(qū)的元素值copy到目標(biāo)地址當(dāng)中朴则,sendq隊列頭的sudog的元素值copy到recvx指向緩沖區(qū)位置的地址當(dāng)中, 這塊有點不太好理解权纤,看下圖
- 當(dāng)上面的條件都不符合時,則會創(chuàng)建一個sudog的結(jié)構(gòu)體將其放到channel的recvq隊列當(dāng)中陷入休眠等待被喚醒
channel的關(guān)閉
ch := make(chan int)
close(ch)
關(guān)閉channel相對簡單乌妒,編譯器會轉(zhuǎn)換成runtime.closechan函數(shù)
func closechan(c *hchan) {
// 關(guān)閉一個 nil channel汹想,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 上鎖
lock(&c.lock)
// 如果 channel 已經(jīng)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
// panic
panic(plainError("close of closed channel"))
}
// …………
// 修改關(guān)閉狀態(tài)
c.closed = 1
var glist *g
// 將 channel 所有等待接收隊列的里 sudog 釋放
for {
// 從接收隊列里出隊一個 sudog
sg := c.recvq.dequeue()
// 出隊完畢,跳出循環(huán)
if sg == nil {
break
}
// 如果 elem 不為空撤蚊,說明此 receiver 未忽略接收數(shù)據(jù)
// 給它賦一個相應(yīng)類型的零值
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 取出 goroutine
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 相連古掏,形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 將 channel 等待發(fā)送隊列里的 sudog 釋放
// 如果存在,這些 goroutine 將會 panic
for {
// 從發(fā)送隊列里出隊一個 sudog
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 發(fā)送者會 panic
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
// 形成鏈表
gp.schedlink.set(glist)
glist = gp
}
// 解鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍歷鏈表
for glist != nil {
// 取最后一個
gp := glist
// 向前走一步侦啸,下一個喚醒的 g
glist = glist.schedlink.ptr()
gp.schedlink = 0
// 喚醒相應(yīng) goroutine
goready(gp, 3)
}
}
小結(jié)一下channel的關(guān)閉流程:
- 關(guān)閉一個nil channel 會panic
- 關(guān)閉一個已經(jīng)關(guān)閉的channel會panic
- 把sendq和recvq 隊列中的元素加入到 glist當(dāng)中槽唾,清除所有sudog上所有未被處理的元素
- 最后把所有的阻塞的sudog全部喚醒,做一些掃尾工作
參考資料
【go語言的設(shè)計與實現(xiàn)】https://draveness.me/golang/
【GitBook 碼農(nóng)桃花源開源書】https://qcrao91.gitbook.io/go/