一悯衬、什么是CSP
Do not communicate by sharing memory; instead, share memory by communicating.
不要通過共享內(nèi)存來通信,而要通過通信來實現(xiàn)內(nèi)存共享。
這就是 Go 的并發(fā)哲學(xué)浅妆,它依賴 CSP 模型皿渗,基于 channel 實現(xiàn)樊拓。
CSP 經(jīng)常被認(rèn)為是 Go 在并發(fā)編程上成功的關(guān)鍵因素洁闰。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發(fā)表在 ACM 的一篇論文迟赃。論文里指出一門編程語言應(yīng)該重視 input 和 output 的原語陪拘,尤其是并發(fā)編程的代碼。
Go 一開始就把 CSP 的思想融入到語言的核心里纤壁,所以并發(fā)編程成為 Go 的一個獨特的優(yōu)勢左刽,而且很容易理解。
Go 的并發(fā)原則非常優(yōu)秀酌媒,目標(biāo)就是簡單:盡量使用 channel欠痴;把 goroutine 當(dāng)作免費的資源,隨便用秒咨。
二喇辽、channel底層數(shù)據(jù)結(jié)構(gòu)
底層數(shù)據(jù)結(jié)構(gòu)源碼:
type hchan struct {
// chan 里元素數(shù)量
qcount uint
// chan 底層循環(huán)數(shù)組的長度
dataqsiz uint
// 指向底層循環(huán)數(shù)組的指針
// 只針對有緩沖的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被關(guān)閉的標(biāo)志
closed uint32
// chan 中元素類型
elemtype *_type // element type
//有緩沖channel內(nèi)的緩沖數(shù)組會被作為一個“環(huán)型”來使用。
//當(dāng)下標(biāo)超過數(shù)組容量后會回到第一個位置雨席,所以需要有兩個字段記錄當(dāng)前讀和寫的下標(biāo)位置
sendx uint // 下一次發(fā)送數(shù)據(jù)的下標(biāo)位置
recvx uint // 下一次讀取數(shù)據(jù)的下標(biāo)位置
//當(dāng)循環(huán)數(shù)組中沒有數(shù)據(jù)時菩咨,收到了接收請求,那么接收數(shù)據(jù)的變量地址將會寫入讀等待隊列
//當(dāng)循環(huán)數(shù)組中數(shù)據(jù)已滿時,收到了發(fā)送請求抽米,那么發(fā)送數(shù)據(jù)的變量地址將寫入寫等待隊列
recvq waitq // 讀等待隊列
sendq waitq // 寫等待隊列
// 保護(hù) hchan 中所有字段
lock mutex
}
waitq
是 sudog
的一個雙向鏈表特占,而 sudog
實際上是對 goroutine
的一個封裝:
type waitq struct {
first *sudog
last *sudog
}
例如,創(chuàng)建一個容量為 6 的云茸,元素為 int 型的 channel 數(shù)據(jù)結(jié)構(gòu)如下 :
總結(jié)hchan結(jié)構(gòu)體的主要組成部分有四個:
- 用來保存goroutine之間傳遞數(shù)據(jù)的循環(huán)鏈表是目。=====> buf。
- 用來記錄此循環(huán)鏈表當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值标捺。=====> sendx和recvx懊纳。
- 用于保存向該chan發(fā)送和從改chan接收數(shù)據(jù)的goroutine的隊列。=====> sendq 和 recvq
- 保證channel寫入和讀取數(shù)據(jù)時線程安全的鎖亡容。 =====> lock
創(chuàng)建
我們知道嗤疯,通道有兩個方向,發(fā)送和接收萍倡。理論上來說身弊,我們可以創(chuàng)建一個只發(fā)送或只接收的通道,但是這種通道創(chuàng)建出來后列敲,怎么使用呢?一個只能發(fā)的通道帖汞,怎么接收呢戴而?同樣,一個只能收的通道翩蘸,如何向其發(fā)送數(shù)據(jù)呢所意?
創(chuàng)建 chan 的函數(shù)是 makechan:
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 省略了檢查 channel size,align 的代碼
// ……
var c *hchan
// 如果元素類型不含指針 或者 size 大小為 0(無緩沖類型)
// 只進(jìn)行一次內(nèi)存分配
if elem.kind&kindNoPointers != 0 || size == 0 {
// 如果 hchan 結(jié)構(gòu)體中不含指針催首,GC 就不會掃描 chan 中的元素
// 只分配 "hchan 結(jié)構(gòu)體大小 + 元素大小*個數(shù)" 的內(nèi)存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是緩沖型 channel 且元素大小不等于 0(大小等于 0的元素類型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非緩沖型的扶踊,buf 沒用,直接指向 chan 起始地址處
// 2. 緩沖型的郎任,能進(jìn)入到這里秧耗,說明元素?zé)o指針且元素類型為 struct{},也無影響
// 因為只會用到接收和發(fā)送游標(biāo)舶治,不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內(nèi)容)
c.buf = unsafe.Pointer(c)
}
} else {
// 進(jìn)行兩次內(nèi)存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循環(huán)數(shù)組長度
c.dataqsiz = uint(size)
// 返回 hchan 指針
return c
}
從函數(shù)原型來看分井,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel霉猛,而不用傳遞 channel 的指針尺锚。
新建一個 chan 后,內(nèi)存在堆上分配惜浅,大概長這樣:
三瘫辩、向channel發(fā)送數(shù)據(jù)
源碼分析
發(fā)送操作最終轉(zhuǎn)化為 chansend
函數(shù),直接上源碼,同樣大部分都注釋了伐厌,可以看懂主流程:
// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞承绸,直接返回 false,表示未發(fā)送成功
if !block {
return false
}
// 當(dāng)前 goroutine 被掛起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// 省略 debug 相關(guān)……
// 對于不阻塞的 send弧械,快速檢測失敗場景
//
// 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間八酒。這可能是:
// 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
// 2. channel 是緩沖型的刃唐,但循環(huán)數(shù)組已經(jīng)裝滿了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 鎖住 channel羞迷,并發(fā)安全
lock(&c.lock)
// 如果 channel 關(guān)閉了
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}
// 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 對于緩沖型的 channel画饥,如果還有緩沖空間
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)
// ……
// 將數(shù)據(jù)從 ep 處拷貝到 qp
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送游標(biāo)值加 1
c.sendx++
// 如果發(fā)送游標(biāo)值等于容量值衔瓮,游標(biāo)值歸 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)的元素數(shù)量加一
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果不需要阻塞,則直接返回錯誤
if !block {
unlock(&c.lock)
return false
}
// channel 滿了抖甘,發(fā)送方會被阻塞热鞍。接下來會構(gòu)造一個 sudog
// 獲取當(dāng)前 goroutine 的指針
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊列
c.sendq.enqueue(mysg)
// 當(dāng)前 goroutine 被掛起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 從這里開始被喚醒了(channel 有機(jī)會可以發(fā)送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被喚醒后,channel 關(guān)閉了衔彻∞背瑁坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上綁定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}
上面的代碼注釋地比較詳細(xì)了艰额,我們來詳細(xì)看看澄港。
- 如果檢測到 channel 是空的,當(dāng)前 goroutine 會被掛起柄沮。
- 對于不阻塞的發(fā)送操作回梧,如果 channel 未關(guān)閉并且沒有多余的緩沖空間(說明:a. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine祖搓;b. channel 是緩沖型的狱意,但循環(huán)數(shù)組已經(jīng)裝滿了元素)
對于這一點,runtime 源碼里注釋了很多拯欧。這一條判斷語句是為了在不阻塞發(fā)送的場景下快速檢測到發(fā)送失敗详囤,好快速返回。
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
- 如果檢測到 channel 已經(jīng)關(guān)閉哈扮,直接 panic纬纪。
- 如果能從等待接收隊列 recvq 里出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的滑肉,沒有元素包各,所以才會有等待接收者。這時會調(diào)用 send 函數(shù)將元素直接從發(fā)送者的棸忻恚拷貝到接收者的棧问畅,關(guān)鍵操作由 sendDirect 函數(shù)完成。
// send 函數(shù)處理向一個空的 channel 發(fā)送操作
// ep 指向被發(fā)送的元素,會被直接拷貝到接收的 goroutine
// 之后护姆,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待隊列里有 goroutine矾端,肯定是空的)
// c 必須被上鎖,發(fā)送操作執(zhí)行完后卵皂,會使用 unlockf 函數(shù)解鎖
// sg 必須已經(jīng)從等待隊列里取出來了
// ep 必須是非空秩铆,并且它指向堆或調(diào)用者的棧
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 省略一些用不到的
// ……
// sg.elem 指向接收到的值存放的位置,如 val <- ch灯变,指的就是 &val
if sg.elem != nil {
// 直接拷貝內(nèi)存(從發(fā)送者到接收者)
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// sudog 上綁定的 goroutine
gp := sg.g
// 解鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒接收的 goroutine. skip 和打印棧相關(guān)殴玛,暫時不理會
goready(gp, skip+1)
}
繼續(xù)看 sendDirect 函數(shù):
// 向一個非緩沖型的 channel 發(fā)送數(shù)據(jù)、從一個無元素的(非緩沖型或緩沖型但空)的 channel
// 接收數(shù)據(jù)添祸,都會導(dǎo)致一個 goroutine 直接操作另一個 goroutine 的棧
// 由于 GC 假設(shè)對棧的寫操作只能發(fā)生在 goroutine 正在運行中并且由當(dāng)前 goroutine 來寫
// 所以這里實際上違反了這個假設(shè)滚粟。可能會造成一些問題刃泌,所以需要用到寫屏障來規(guī)避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 在當(dāng)前 goroutine 的棧上凡壤,dst 是另一個 goroutine 的棧
// 直接進(jìn)行內(nèi)存"搬遷"
// 如果目標(biāo)地址的棧發(fā)生了棧收縮,當(dāng)我們讀出了 sg.elem 后
// 就不能修改真正的 dst 位置的值了
// 因此需要在讀和寫之前加上一個屏障
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
這里涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作耙替,一般而言亚侠,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設(shè)俗扇。為了不出問題盖奈,寫的過程中增加了寫屏障,保證正確地完成寫操作狐援。這樣做的好處是減少了一次內(nèi)存 copy:不用先拷貝到 channel 的 buf,直接由發(fā)送者到接收者究孕,沒有中間商賺差價啥酱,效率得以提高,完美厨诸。
然后镶殷,解鎖、喚醒接收者微酬,等待調(diào)度器的光臨绘趋,接收者也得以重見天日,可以繼續(xù)執(zhí)行接收操作之后的代碼了颗管。
- 如果 c.qcount < c.dataqsiz陷遮,說明緩沖區(qū)可用(肯定是緩沖型的 channel)。先通過函數(shù)取出待發(fā)送元素應(yīng)該去到的位置:
qp := chanbuf(c, c.sendx)
// 返回循環(huán)隊列里第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
c.sendx
指向下一個待發(fā)送元素在循環(huán)數(shù)組中的位置垦江,然后調(diào)用 typedmemmove
函數(shù)將其拷貝到循環(huán)數(shù)組中帽馋。之后 c.sendx
加 1,元素總量加 1 :c.qcount++,最后绽族,解鎖并返回姨涡。
- 如果沒有命中以上條件的,說明 channel 已經(jīng)滿了吧慢。不管這個 channel 是緩沖型的還是非緩沖型的涛漂,都要將這個 sender “關(guān)起來”(goroutine 被阻塞)。如果 block 為 false检诗,直接解鎖匈仗,返回 false。
- 最后就是真的需要被阻塞的情況岁诉。先構(gòu)造一個 sudog锚沸,將其入隊(channel 的 sendq 字段)。然后調(diào)用 goparkunlock 將當(dāng)前 goroutine 掛起涕癣,并解鎖哗蜈,等待合適的時機(jī)再喚醒。
喚醒之后坠韩,從 goparkunlock
下一行代碼開始繼續(xù)往下執(zhí)行距潘。
這里有一些綁定操作,sudog 通過 g 字段綁定 goroutine只搁,而 goroutine 通過 waiting 綁定 sudog音比,sudog 還通過 elem 字段綁定待發(fā)送元素的地址,以及 c 字段綁定被“坑”在此處的 channel氢惋。
所以洞翩,待發(fā)送的元素地址其實是存儲在 sudog 結(jié)構(gòu)體里,也就是當(dāng)前 goroutine 里焰望。
四骚亿、從channel接收數(shù)據(jù)
接收操作有兩種寫法,一種帶 “ok”熊赖,反應(yīng) channel 是否關(guān)閉来屠;一種不帶 “ok”,這種寫法震鹉,當(dāng)接收到相應(yīng)類型的零值時無法知道是真實的發(fā)送者發(fā)送過來的值俱笛,還是 channel 被關(guān)閉后,返回給接收者的默認(rèn)類型的零值传趾。兩種寫法迎膜,都有各自的應(yīng)用場景。
經(jīng)過編譯器的處理后墨缘,這兩種寫法最后對應(yīng)源碼里的這兩個函數(shù):
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv1
函數(shù)處理不帶 “ok” 的情形星虹,chanrecv2
則通過返回 “received” 這個字段來反應(yīng) channel 是否被關(guān)閉零抬。接收值則比較特殊,會“放到”參數(shù) elem 所指向的地址了宽涌,這很像 C/C++ 里的寫法平夜。如果代碼里忽略了接收值,這里的 elem 為 nil卸亮。
無論如何忽妒,最終轉(zhuǎn)向了 chanrecv
函數(shù):
// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil兼贸,說明忽略了接收值段直。
// 如果 block == false,即非阻塞型接收溶诞,在沒有數(shù)據(jù)可接收的情況下鸯檬,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài)螺垢,將 ep 指向的地址清零喧务,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址枉圃。返回 (true, true)
// 如果 ep 非空功茴,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 省略 debug 內(nèi)容 …………
// 如果是一個 nil 的 channel
if c == nil {
// 如果不阻塞,直接返回 (false, false)
if !block {
return
}
// 否則孽亲,接收一個 nil 的 channel坎穿,goroutine 掛起
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
// 不會執(zhí)行到這里
throw("unreachable")
}
// 在非阻塞模式下,快速檢測到失敗返劲,不用獲取鎖玲昧,快速返回
// 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
// 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
// 2. 緩沖型篮绿,但 buf 里沒有元素
// 之后酌呆,又觀察到 closed == 0,即 channel 未關(guān)閉搔耕。
// 因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候 channel 也是未關(guān)閉的痰娱,
// 因此在這種情況下可以直接宣布接收失敗弃榨,返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
// 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
// 也就是說即使是關(guān)閉狀態(tài)梨睁,但在緩沖型的 channel鲸睛,
// buf 里有元素的情況下還能接收到元素
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
// 解鎖
unlock(&c.lock)
if ep != nil {
// 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
// 那么接收的值將是一個該類型的零值
// typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
typedmemclr(c.elemtype, ep)
}
// 從一個已關(guān)閉的 channel 接收坡贺,selected 會返回true
return true, false
}
// 等待發(fā)送隊列里有 goroutine 存在官辈,說明 buf 是滿的
// 這有可能是:
// 1. 非緩沖型的 channel
// 2. 緩沖型的 channel箱舞,但 buf 滿了
// 針對 1,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
// 針對 2拳亿,接收到循環(huán)數(shù)組頭部的元素晴股,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 緩沖型,buf 里有元素肺魁,可以正常接收
if c.qcount > 0 {
// 直接從循環(huán)數(shù)組里找到要接收的元素
qp := chanbuf(c, c.recvx)
// …………
// 代碼里电湘,沒有忽略要接收的值,不是 "<- ch"鹅经,而是 "val <- ch"寂呛,ep 指向 val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
typedmemclr(c.elemtype, qp)
// 接收游標(biāo)向前移動
c.recvx++
// 接收游標(biāo)歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf 數(shù)組里的元素個數(shù)減 1
c.qcount--
// 解鎖
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解鎖瘾晃。selected 返回 false贷痪,因為沒有接收到值
unlock(&c.lock)
return false, false
}
// 接下來就是要被阻塞的情況了
// 構(gòu)造一個 sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 待接收數(shù)據(jù)的地址保存下來
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
// 進(jìn)入channel 的等待接收隊列
c.recvq.enqueue(mysg)
// 將當(dāng)前 goroutine 掛起
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
- 如果 channel 是一個空值(nil)蹦误,在非阻塞模式下劫拢,會直接返回。在阻塞模式下胖缤,會調(diào)用 gopark 函數(shù)掛起 goroutine尚镰,這個會一直阻塞下去。因為在 channel 是 nil 的情況下哪廓,要想不阻塞狗唉,只有關(guān)閉它,但關(guān)閉一個 nil 的 channel 又會發(fā)生 panic涡真,所以沒有機(jī)會被喚醒了分俯。更詳細(xì)地可以在 closechan 函數(shù)的時候再看。
- 和發(fā)送函數(shù)一樣哆料,接下來搞了一個在非阻塞模式下缸剪,不用獲取鎖,快速檢測到失敗并且返回的操作东亦。順帶插一句杏节,我們平時在寫代碼的時候,找到一些邊界條件典阵,快速返回奋渔,能讓代碼邏輯更清晰,因為接下來的正常情況就比較少壮啊,更聚焦了嫉鲸,看代碼的人也更能專注地看核心代碼邏輯了。
// 在非阻塞模式下歹啼,快速檢測到失敗玄渗,不用獲取鎖座菠,快速返回 (false, false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
- 非緩沖型,等待發(fā)送列隊里沒有 goroutine 在等待
- 緩沖型藤树,但 buf 里沒有元素
之后浴滴,又觀察到 closed == 0,即 channel 未關(guān)閉也榄。
因為 channel 不可能被重復(fù)打開巡莹,所以前一個觀測的時候, channel 也是未關(guān)閉的甜紫,因此在這種情況下可以直接宣布接收失敗降宅,快速返回。因為沒被選中囚霸,也沒接收到數(shù)據(jù)腰根,所以返回值為 (false, false)。
接下來的操作拓型,首先會上一把鎖额嘿,粒度比較大。如果 channel 已關(guān)閉劣挫,并且循環(huán)數(shù)組 buf 里沒有元素册养。對應(yīng)非緩沖型關(guān)閉和緩沖型關(guān)閉但 buf 無元素的情況,返回對應(yīng)類型的零值压固,但 received 標(biāo)識是 false球拦,告訴調(diào)用者此 channel 已關(guān)閉,你取出來的值并不是正常由發(fā)送者發(fā)送過來的數(shù)據(jù)帐我。但是如果處于 select 語境下坎炼,這種情況是被選中了的。很多將 channel 用作通知信號的場景就是命中了這里拦键。
接下來谣光,如果有等待發(fā)送的隊列,說明 channel 已經(jīng)滿了芬为,要么是非緩沖型的 channel萄金,要么是緩沖型的 channel,但 buf 滿了媚朦。這兩種情況下都可以正常接收數(shù)據(jù)捡絮。
于是,調(diào)用 recv 函數(shù):
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是非緩沖型的 channel
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 未忽略接收的數(shù)據(jù)
if ep != nil {
// 直接拷貝數(shù)據(jù)莲镣,從 sender goroutine -> receiver goroutine
recvDirect(c.elemtype, sg, ep)
}
} else {
// 緩沖型的 channel,但 buf 已滿涎拉。
// 將循環(huán)數(shù)組 buf 隊首的元素拷貝到接收數(shù)據(jù)的地址
// 將發(fā)送者的數(shù)據(jù)入隊瑞侮。實際上這時 revx 和 sendx 值相等
// 找到接收游標(biāo)
qp := chanbuf(c, c.recvx)
// …………
// 將接收游標(biāo)處的數(shù)據(jù)拷貝給接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 將發(fā)送者數(shù)據(jù)拷貝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
// 更新游標(biāo)值
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
// 解鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒發(fā)送的 goroutine的圆。需要等到調(diào)度器的光臨
goready(gp, skip+1)
}
如果是非緩沖型的,就直接從發(fā)送者的棸牖穑拷貝到接收者的棧越妈。
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
否則,就是緩沖型 channel钮糖,而 buf 又滿了的情形梅掠。說明發(fā)送游標(biāo)和接收游標(biāo)重合了,因此需要先找到接收游標(biāo):
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
將該處的元素拷貝到接收地址店归。然后將發(fā)送者待發(fā)送的數(shù)據(jù)拷貝到接收游標(biāo)處阎抒。這樣就完成了接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作。接著消痛,分別將發(fā)送游標(biāo)和接收游標(biāo)向前進(jìn)一且叁,如果發(fā)生“環(huán)繞”,再從 0 開始秩伞。
最后逞带,取出 sudog 里的 goroutine,調(diào)用 goready 將其狀態(tài)改成 “runnable”纱新,待發(fā)送者被喚醒展氓,等待調(diào)度器的調(diào)度。
然后脸爱,如果 channel 的 buf 里還有數(shù)據(jù)遇汞,說明可以比較正常地接收。注意阅羹,這里勺疼,即使是在 channel 已經(jīng)關(guān)閉的情況下,也是可以走到這里的捏鱼。這一步比較簡單执庐,正常地將 buf 里接收游標(biāo)處的數(shù)據(jù)拷貝到接收數(shù)據(jù)的地址。
到了最后一步导梆,走到這里來的情形是要阻塞的轨淌。當(dāng)然,如果 block 傳進(jìn)來的值是 false看尼,那就不阻塞递鹉,直接返回就好了。
先構(gòu)造一個 sudog藏斩,接著就是保存各種值了躏结。注意,這里會將接收數(shù)據(jù)的地址存儲到了 elem 字段狰域,當(dāng)被喚醒時媳拴,接收到的數(shù)據(jù)就會保存到這個字段指向的地址黄橘。然后將 sudog 添加到 channel 的 recvq 隊列里。調(diào)用 goparkunlock 函數(shù)將 goroutine 掛起屈溉。
接下來的代碼就是 goroutine 被喚醒后的各種收尾工作了偿荷。
channel操作總結(jié):
注意:關(guān)閉已經(jīng)關(guān)閉的channel也會引發(fā)panic馋辈。
References:
https://cloud.tencent.com/developer/article/1750350
https://golang.design/go-questions/channel
https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html
https://juejin.cn/post/7037656471210819614