設(shè)計(jì)原理
目前的 Channel 收發(fā)操作均遵循了先進(jìn)先出的設(shè)計(jì),具體規(guī)則如下:
- 先從 Channel 讀取數(shù)據(jù)的 Goroutine 會先接收到數(shù)據(jù)亚隅;
- 先向 Channel 發(fā)送數(shù)據(jù)的 Goroutine 會得到先發(fā)送數(shù)據(jù)的權(quán)利;
Go 語言社區(qū)也在 2014 年提出了無鎖 Channel 的實(shí)現(xiàn)方案隘擎,該方案將 Channel 分成了以下三種類型:
- 同步 Channel — 不需要緩沖區(qū),發(fā)送方會直接將數(shù)據(jù)交給(Handoff)接收方震桶;
- 異步 Channel — 基于環(huán)形緩存的傳統(tǒng)生產(chǎn)者消費(fèi)者模型凫岖;
- chan struct{} 類型的異步 Channel — struct{} 類型不占用內(nèi)存空間歼指,不需要實(shí)現(xiàn)緩沖區(qū)和直接發(fā)送(Handoff)的語義爹土;
channel 的構(gòu)造語句 make(chan int),將會被 golang 編譯器翻譯為 runtime.makechan 函數(shù)
func makechan(t *chantype, size int) *hchan
type hchan struct {
qcount uint // buffer 中已放入的元素個(gè)數(shù)
dataqsiz uint // 用戶構(gòu)造 channel 時(shí)指定的 buf 大小
buf unsafe.Pointer // buffer
elemsize uint16 // buffer 中每個(gè)元素的大小
closed uint32 // channel 是否關(guān)閉,== 0 代表未 closed
elemtype *_type // channel 元素的類型信息
sendx uint // buffer 中已發(fā)送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待發(fā)送的 goroutine list of send waiters
lock mutex
}
type waitq struct {
first *sudog // 鏈表
last *sudog
}
Channel 的 ring buffer 實(shí)現(xiàn)
channel 中使用了 ring buffer(環(huán)形緩沖區(qū)) 來緩存寫入的數(shù)據(jù)。ring buffer 有很多好處惯雳,而且非常適合用來實(shí)現(xiàn) FIFO 式的固定長度隊(duì)列。hchan 中有兩個(gè)與 buffer 相關(guān)的變量: recvx 和 sendx损拢。
- sendx 表示 buffer 中可寫的 index邮破。
- recvx 表示 buffer 中可讀的 index诈豌。
- 從 recvx 到 sendx 之間的元素,表示已正常存放入 buffer 中的數(shù)據(jù)抒和。
makechan
代碼根據(jù) Channel 中收發(fā)元素的類型和緩沖區(qū)的大小初始化 runtime.hchan 和緩沖區(qū)
- 如果當(dāng)前 Channel 中不存在緩沖區(qū)矫渔,那么就只會為 runtime.hchan 分配一段內(nèi)存空間;
- 如果當(dāng)前 Channel 中存儲的類型不是指針類型摧莽,會為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間庙洼;
- 在默認(rèn)情況下會單獨(dú)為 runtime.hchan 和緩沖區(qū)分配內(nèi)存;
在函數(shù)的最后會統(tǒng)一更新 runtime.hchan 的 elemsize镊辕、elemtype 和 dataqsiz 幾個(gè)字段油够。
發(fā)送數(shù)據(jù)
channel 的發(fā)送過程 (如 c <- 1), 對應(yīng)于 runtime.chansend 函數(shù)的實(shí)現(xiàn)。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
在嘗試向 channel 中發(fā)送數(shù)據(jù)時(shí)征懈,如果 recvq 隊(duì)列不為空石咬,則首先會從 recvq 中頭部取出一個(gè)等待接收數(shù)據(jù)的 goroutine 出來。并將數(shù)據(jù)直接發(fā)送給該 goroutine受裹。代碼如下
lock(&c.lock)
直接發(fā)送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
recvq 中是正在等待接收數(shù)據(jù)的 goroutine碌补。當(dāng)某個(gè) goroutine 使用 recv 操作 (例如虏束,x := <- c)棉饶,如果此時(shí) channel 的緩存中沒有數(shù)據(jù),且沒有其他 goroutine 正在等待發(fā)送數(shù)據(jù) (即 sendq 為空)镇匀,會將該 goroutine 以及要接收的數(shù)據(jù)地址打包成 sudog 對象照藻,并放入到 recvq 中。
如果此時(shí) recvq 不為空汗侵,則調(diào)用 send 函數(shù)將數(shù)據(jù)拷貝到對應(yīng)的 goroutine 的堆棧上幸缕。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
send 函數(shù)的實(shí)現(xiàn)主要包含兩點(diǎn):
- memmove(dst, src, t.size) 進(jìn)行數(shù)據(jù)的轉(zhuǎn)移,本質(zhì)上就是一個(gè)內(nèi)存拷貝,將發(fā)送的數(shù)據(jù)直接拷貝到 x = <-c 表達(dá)式中變量 x 所在的內(nèi)存地址上晰韵;发乔。
- goready(gp, skip+1) 將等待接收數(shù)據(jù)的 Goroutine 標(biāo)記成可運(yùn)行狀態(tài) Grunnable 并把該 Goroutine 放到發(fā)送方所在的處理器的 runnext 上等待執(zhí)行,該處理器在下一次調(diào)度時(shí)會立刻喚醒數(shù)據(jù)的接收方雪猪;
緩沖區(qū)
如果創(chuàng)建的 Channel 包含緩沖區(qū)并且 Channel 中的數(shù)據(jù)沒有裝滿栏尚,會執(zhí)行下面這段代碼
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
- 首先會使用 runtime.chanbuf 計(jì)算出下一個(gè)可以存儲數(shù)據(jù)的位置,相當(dāng)于 c.buf[c.sendx]
- 然后通過 runtime.typedmemmove 將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中并增加 sendx 索引和 qcount 計(jì)數(shù)器
阻塞發(fā)送
如果用戶使用的是無緩沖 channel 或者此時(shí) buffer 已滿只恨,則 c.qcount < c.dataqsiz 條件不會滿足,以上流程也并不會執(zhí)行到译仗。
- 調(diào)用 runtime.getg 獲取發(fā)送數(shù)據(jù)使用的 Goroutine抬虽;
- 執(zhí)行 runtime.acquireSudog 獲取 runtime.sudog 結(jié)構(gòu)并設(shè)置這一次阻塞發(fā)送的相關(guān)信息,例如發(fā)送的 Channel纵菌、是否在 select 中和待發(fā)送數(shù)據(jù)的內(nèi)存地址等阐污;
- 將剛剛創(chuàng)建并初始化的 runtime.sudog 加入發(fā)送等待隊(duì)列,并設(shè)置到當(dāng)前 Goroutine 的 waiting 上咱圆,表示 Goroutine 正在等待該 sudog 準(zhǔn)備就緒笛辟;
- 調(diào)用 runtime.goparkunlock 將當(dāng)前的 Goroutine 陷入沉睡等待喚醒;
- 被調(diào)度器喚醒后會執(zhí)行一些收尾工作闷堡,將一些屬性置零并且釋放 runtime.sudog 結(jié)構(gòu)體隘膘;
我們在這里可以簡單梳理和總結(jié)一下使用 ch <- i 表達(dá)式向 Channel 發(fā)送數(shù)據(jù)時(shí)遇到的幾種情況:
- 如果當(dāng)前 Channel 的 recvq 上存在已經(jīng)被阻塞的 Goroutine,那么會直接將數(shù)據(jù)發(fā)送給當(dāng)前 Goroutine 并將其設(shè)置成下一個(gè)運(yùn)行的 Goroutine杠览;
- 如果 Channel 存在緩沖區(qū)并且其中還有空閑的容量弯菊,我們會直接將數(shù)據(jù)存儲到緩沖區(qū) sendx 所在的位置上;
- 如果不滿足上面的兩種情況踱阿,會創(chuàng)建一個(gè) runtime.sudog 結(jié)構(gòu)并將其加入 Channel 的 sendq 隊(duì)列中管钳,當(dāng)前 Goroutine 也會陷入阻塞等待其他的協(xié)程從 Channel 接收數(shù)據(jù);
接收數(shù)據(jù)
Channel 的接收過程使用兩種不同的方式去接收
i <- ch
i, ok <- ch
這兩種不同的方法經(jīng)過編譯器的處理都會變成 ORECV 類型的節(jié)點(diǎn)软舌,后者會在類型檢查階段被轉(zhuǎn)換成 OAS2RECV 類型才漆。數(shù)據(jù)的接收操作遵循以下的路線圖:func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
如果 Channel 為空,那么會直接調(diào)用 runtime.gopark 掛起當(dāng)前 Goroutine佛点;
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
如果 Channel 已經(jīng)關(guān)閉并且緩沖區(qū)沒有任何數(shù)據(jù)醇滥,runtime.chanrecv 會直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int){
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
ep 接收數(shù)據(jù)的變量對應(yīng)的地址。例如超营,在 x := <- c 中鸳玩,ep表示變量 x 的地址。
而 sg 代表從 sendq 中取出的第一個(gè) sudog
- typedmemmove(c.elemtype, ep, qp) 表示 buffer 中的當(dāng)前可讀元素拷貝到接收變量的地址處演闭。
- typedmemmove(c.elemtype, qp, sg.elem) 表示將 sendq 中 goroutine 等待發(fā)送的數(shù)據(jù)拷貝到 buffer 中不跟。因?yàn)榇撕筮M(jìn)行了 recv++, 因此相當(dāng)于把 sendq 中的數(shù)據(jù)放到了隊(duì)尾。
c.sendx = c.recvx, 這句話實(shí)際的作用相當(dāng)于 c.sendx = (c.sendx+1) % c.dataqsiz米碰,因?yàn)榇藭r(shí) buffer 依然是滿的窝革,所以 sendx == recvx 是成立的。
如果 Channel 不存在緩沖區(qū)吕座;
- 調(diào)用 runtime.recvDirect 將 Channel 發(fā)送隊(duì)列中 Goroutine 存儲的 elem 數(shù)據(jù)拷貝到目標(biāo)內(nèi)存地址中虐译;
如果 Channel 存在緩沖區(qū); - 將隊(duì)列中的數(shù)據(jù)拷貝到接收方的內(nèi)存地址吴趴;
- 將發(fā)送隊(duì)列頭的數(shù)據(jù)拷貝到緩沖區(qū)中漆诽,釋放一個(gè)阻塞的發(fā)送方;
上圖展示了 Channel 在緩沖區(qū)已經(jīng)沒有空間并且發(fā)送隊(duì)列中存在等待的 Goroutine 時(shí),運(yùn)行 <-ch 的執(zhí)行過程蚪腐。發(fā)送隊(duì)列頭的 runtime.sudog 中的元素會替換接收索引 recvx 所在位置的元素箭昵,原有的元素會被拷貝到接收數(shù)據(jù)的變量對應(yīng)的內(nèi)存空間上。
緩存區(qū)
當(dāng) Channel 的緩沖區(qū)中已經(jīng)包含數(shù)據(jù)時(shí)回季,從 Channel 中接收數(shù)據(jù)會直接從緩沖區(qū)中 recvx 的索引位置中取出數(shù)據(jù)進(jìn)行處理:
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果接收數(shù)據(jù)的內(nèi)存地址不為空家制,那么會使用 runtime.typedmemmove 將緩沖區(qū)中的數(shù)據(jù)拷貝到內(nèi)存中、清除隊(duì)列中的數(shù)據(jù)并完成收尾工作泡一。
阻塞接收
當(dāng) Channel 的發(fā)送隊(duì)列中不存在等待的 Goroutine 并且緩沖區(qū)中也不存在任何數(shù)據(jù)時(shí)颤殴,從管道中接收數(shù)據(jù)的操作會變成阻塞的,然而不是所有的接收操作都是阻塞的鼻忠,與 select 語句結(jié)合使用時(shí)就可能會使用到非阻塞的接收操作:
在正常的接收場景中涵但,我們會使用 runtime.sudog 將當(dāng)前 Goroutine 包裝成一個(gè)處于等待狀態(tài)的 Goroutine 并將其加入到接收隊(duì)列中。
完成入隊(duì)之后帖蔓,上述代碼還會調(diào)用 runtime.goparkunlock 立刻觸發(fā) Goroutine 的調(diào)度矮瘟,讓出處理器的使用權(quán)并等待調(diào)度器的調(diào)度。
- 如果 Channel 為空塑娇,那么會直接調(diào)用 runtime.gopark 掛起當(dāng)前 Goroutine澈侠;
- 如果 Channel 已經(jīng)關(guān)閉并且緩沖區(qū)沒有任何數(shù)據(jù),runtime.chanrecv 會直接返回埋酬;
- 如果 Channel 的 sendq 隊(duì)列中存在掛起的 Goroutine哨啃,會將 recvx 索引所在的數(shù)據(jù)拷貝到接收變量所在的內(nèi)存空間上并將 sendq 隊(duì)列中 Goroutine 的數(shù)據(jù)拷貝到緩沖區(qū);
- 如果 Channel 的緩沖區(qū)中包含數(shù)據(jù)写妥,那么直接讀取 recvx 索引對應(yīng)的數(shù)據(jù)拳球;
- 在默認(rèn)情況下會掛起當(dāng)前的 Goroutine,將 runtime.sudog 結(jié)構(gòu)加入 recvq 隊(duì)列并陷入休眠等待調(diào)度器的喚醒耳标;
從 Channel 接收數(shù)據(jù)時(shí)醇坝,會觸發(fā) Goroutine 調(diào)度的兩個(gè)時(shí)機(jī):
- 當(dāng) Channel 為空時(shí)邑跪;
- 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)并且也不存在數(shù)據(jù)的發(fā)送者時(shí)次坡;
關(guān)閉管道
編譯器會將用于關(guān)閉管道的 close 關(guān)鍵字轉(zhuǎn)換成 OCLOSE 節(jié)點(diǎn)以及 runtime.closechan 函數(shù)。
當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí)画畅,Go 語言運(yùn)行時(shí)都會直接崩潰并拋出異常:
func closechan(c *hchan)
當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí)砸琅,Go 語言運(yùn)行時(shí)都會直接崩潰并拋出異常:
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
將 recvq 和 sendq 兩個(gè)隊(duì)列中的數(shù)據(jù)加入到 Goroutine 列表 gList 中,與此同時(shí)該函數(shù)會清除所有 runtime.sudog 上未被處理的元素
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
該函數(shù)在最后會為所有被阻塞的 Goroutine 調(diào)用 runtime.goready 觸發(fā)調(diào)度轴踱。