本篇文章內(nèi)容基于go1.14.2分析
golang的chan是一個(gè)內(nèi)置類型咨油,作為csp編程的核心數(shù)據(jù)結(jié)構(gòu)析二,其底層數(shù)據(jù)結(jié)構(gòu)是一個(gè)叫hchan的struct:
type hchan struct {
qcount uint // 隊(duì)列中的元素?cái)?shù)量
dataqsiz uint // (環(huán)形)隊(duì)列的大小
buf unsafe.Pointer // 隊(duì)列的指針
elemsize uint16 // 元素大小
closed uint32 // 是否已close
elemtype *_type // 元素類型
sendx uint // 環(huán)形隊(duì)列中惫周,send的位置
recvx uint // 環(huán)形隊(duì)列中 recv的位置
recvq waitq // 讀取等待隊(duì)列
sendq waitq // 發(fā)送等待隊(duì)列
lock mutex // 互斥鎖
}
如圖所示钥勋,chan最核心的部分由一個(gè)環(huán)形隊(duì)列和2個(gè)waitq組成,環(huán)形隊(duì)列用于存放數(shù)據(jù)(帶緩沖的情況下)科阎,waitq用于實(shí)現(xiàn)阻塞和恢復(fù)goroutine述吸。
chan的相關(guān)操作
對(duì)chan的操作有:make、讀锣笨、寫(xiě)蝌矛、close,當(dāng)然還有select错英,這里只討論前面四個(gè)操作入撒。
創(chuàng)建 chan
當(dāng)在代碼中使用make創(chuàng)建chan時(shí),編譯器會(huì)根據(jù)情況自動(dòng)替換成makechan64 或者makechan走趋,makechan64 其實(shí)還是調(diào)用了makechan函數(shù)衅金。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 確保元素類型的size < 2^16噪伊,
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 檢查內(nèi)存對(duì)齊
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 計(jì)算緩沖區(qū)所需分配內(nèi)存大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 即不帶緩沖區(qū)的情況簿煌,只需要調(diào)用mallocgc分配
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 理解為空地址
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素類型不包含指針的情況
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默認(rèn)情況下:包含指針
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
chan 寫(xiě)操作
當(dāng)對(duì)chan進(jìn)行寫(xiě)入“ch <- interface{}” 時(shí),會(huì)被編譯器替換成chansend1函數(shù)的調(diào)用,最終還是調(diào)用了chansend函數(shù):
//elem 是待寫(xiě)入元素的地址
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
先看看chansend的函數(shù)簽名鉴吹,只需關(guān)注ep和block這個(gè)兩個(gè)參數(shù)即可姨伟,ep是要寫(xiě)入數(shù)據(jù)的地址,block表示是否阻塞式的調(diào)用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
chansend有以下幾種處理流程:
-
當(dāng)對(duì)一個(gè)nil chan進(jìn)行寫(xiě)操作時(shí)豆励,如果是非阻塞調(diào)用夺荒,直接返回瞒渠;否則將當(dāng)前協(xié)程掛起
// chansend 對(duì)一個(gè) nil chan發(fā)送數(shù)據(jù)時(shí),如果是非阻塞則直接返回技扼,否則將當(dāng)前協(xié)程掛起 if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
-
非阻塞模式且chan未close伍玖,沒(méi)有緩沖區(qū)且沒(méi)有等待接收或者緩沖區(qū)滿的情況下,直接return false剿吻。
// 1. 非阻塞模式且chan未close // 2. 沒(méi)有緩沖區(qū)且沒(méi)有等待接收 或者 緩沖區(qū)滿的情況下 // 滿足以上條件直接return false if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false }
-
c.recvq中有等待讀的接收者窍箍,將其出隊(duì),將數(shù)據(jù)直接copy給接收者丽旅,并喚醒接收者椰棘。
// 有等待的接收的goroutine // 出隊(duì),傳遞數(shù)據(jù) if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
recvq是一個(gè)雙向鏈表榄笙,每個(gè)sudog會(huì)關(guān)聯(lián)上一個(gè)reader(被阻塞的g)
當(dāng)sudog出隊(duì)后邪狞,會(huì)調(diào)用send方法,通過(guò)sendDirect 實(shí)現(xiàn)數(shù)據(jù)在兩個(gè)地址之間拷貝茅撞,最后調(diào)用goready喚醒reader(被阻塞的g)
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ... 剔除無(wú)關(guān)代碼 if sg.elem != nil { // 直接將數(shù)據(jù)拷貝到變量ep所在的地址 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } //將reader的goroutine喚起 goready(gp, skip+1) }
-
緩沖區(qū)未滿的情況下帆卓,數(shù)據(jù)放入環(huán)形緩沖區(qū)即可。
// 緩沖區(qū)未滿 // 將數(shù)據(jù)放到緩沖區(qū) if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 存放位置 qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) // 指針自增 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
-
緩沖區(qū)已滿米丘,阻塞模式下關(guān)聯(lián)一個(gè)sudog數(shù)據(jù)結(jié)構(gòu)并進(jìn)入c.sendq隊(duì)列鳞疲,掛起當(dāng)前協(xié)程。
// 阻塞的情況 gp := getg() //拿到當(dāng)前g mysg := acquireSudog() // 獲取一個(gè)sudog mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 mysg.elem = ep //關(guān)聯(lián)ep蠕蚜,即待寫(xiě)入的數(shù)據(jù)地址 mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // 入隊(duì) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 將g休眠尚洽,讓出cpu // gopark后,需等待reader來(lái)喚醒它 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 喚醒過(guò)后 // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. // 保持?jǐn)?shù)據(jù)不被回收 KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true
chan 讀操作
當(dāng)對(duì)chan進(jìn)行讀操作時(shí)靶累,編譯器會(huì)替換成 chanrecv1或者chanrecv2函數(shù)腺毫,最終會(huì)調(diào)用chanrecv函數(shù)處理讀取
// v := <- ch
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// v, ok := <- ch
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
和chansend一樣,chanrecv也是支持非阻塞式的調(diào)用
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
chanrecv有以下幾種處理流程:
-
讀nil chan挣柬,如果是非阻塞潮酒,直接返回;如果是阻塞式邪蛔,將當(dāng)前協(xié)程掛起急黎。
// 讀阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
-
非阻塞模式下,沒(méi)有緩沖區(qū)且沒(méi)有等待寫(xiě)的writer或者緩沖區(qū)沒(méi)數(shù)據(jù)侧到,直接返回勃教。
if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return }
-
chan已經(jīng)被close,并且隊(duì)列中沒(méi)有數(shù)據(jù)時(shí)匠抗,會(huì)將存放值的變量清零故源,然后返回。
// c已經(jīng)被close 并且 沒(méi)有數(shù)據(jù) // 清除ep指針 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
-
sendq中有等待的writer汞贸,writer出隊(duì)绳军,并調(diào)用recv函數(shù)
// 從sendq中取出sender if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 從sender中讀取數(shù)據(jù) recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
recv在這分兩種處理:如果ch不帶緩沖區(qū)的話印机,直接將writer的sg.elem數(shù)據(jù)拷貝到ep;如果帶緩沖區(qū)的話门驾,此時(shí)緩沖區(qū)肯定滿了射赛,那么就從緩沖區(qū)隊(duì)列頭部取出數(shù)據(jù)拷貝至ep,然后將writer的sg.elem數(shù)據(jù)拷貝到緩沖區(qū)中奶是,最后喚醒writer(g)
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 不帶緩沖區(qū)的情況 // 直接copy from sender if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. // 隊(duì)列已滿 // 隊(duì)列元素出隊(duì) qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver // 數(shù)據(jù)拷貝給ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue // 將sender的數(shù)據(jù)拷貝到這個(gè)槽中 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } // 置空 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒sender協(xié)程 goready(gp, skip+1) }
-
直接從緩沖隊(duì)列中讀數(shù)咒劲。
// 帶緩沖區(qū) if c.qcount > 0 { // Receive directly from queue // 直接buf中取 qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } // 拷貝數(shù)據(jù)到ep指針 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 清除qp typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
-
阻塞的情況,緩沖區(qū)沒(méi)有數(shù)據(jù)诫隅,且沒(méi)有writer
// 阻塞 gp := getg() //拿到當(dāng)前的goroutine mysg := acquireSudog() // 獲取一個(gè)sudog mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } //sudog 關(guān)聯(lián) mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) //入隊(duì) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) // 掛起當(dāng)前goroutine腐魂,等待writer喚醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 喚醒后 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil // sudog解除關(guān)聯(lián) mysg.c = nil // 釋放sudog releaseSudog(mysg)
close 關(guān)閉操作
當(dāng)close一個(gè)chan時(shí),編譯器會(huì)替換成對(duì)closechan函數(shù)的調(diào)用逐纬,將closed字段置為1蛔屹,并將recvq和sendq中的goroutine釋放喚醒,對(duì)sendq中未寫(xiě)入的數(shù)據(jù)做清除豁生,且writer會(huì)發(fā)生panic異常兔毒。
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖
lock(&c.lock)
// 不可重復(fù)close
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// 釋放所有的
for {
// 出隊(duì)
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 清零
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放所有writer
for {
// 出隊(duì)
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 丟棄數(shù)據(jù)
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 喚醒所有g(shù)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
chan使用小技巧
-
避免read、write一個(gè)nil chan
func main() { ch := make(chan int,1) go func() { time.Sleep(1*time.Second) ch = nil }() ch<-1 ch<-1 // 協(xié)程直接掛起 }
-
從chan中read時(shí)甸箱,使用帶指示的訪問(wèn)方式育叁,讀取的時(shí)候無(wú)法感知到close的關(guān)閉
func main() { ch := make(chan int) go func() { ch <- 10 close(ch) }() for { select { // case i, ok := <-ch: // if ok { // break //} case i := <-ch: fmt.Println(i) time.Sleep(100 * time.Millisecond) } } }
-
從chan中read時(shí),不要使用已存在變量接收, chan close之后芍殖,緩沖區(qū)沒(méi)有數(shù)據(jù)的話豪嗽,使用存在變量讀取時(shí),會(huì)將變量清零
func main() { a := 10 ch := make(chan int,1) fmt.Println("before close a is: ", a) // a is 10 close(ch) a = <-ch fmt.Println("after close a is: ", a) // a is 0 }
-
使用select+default可以實(shí)現(xiàn) chan的無(wú)阻塞讀取
// 使用select反射包實(shí)現(xiàn)無(wú)阻塞讀寫(xiě) func tryRead(ch chan int) (int, bool) { var cases []reflect.SelectCase caseRead := reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), } cases = append(cases, caseRead) cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectDefault, }) _, v, ok := reflect.Select(cases) if ok { return (v.Interface()).(int), ok } return 0, ok } func tryWrite(ch chan int, data int) bool { var cases []reflect.SelectCase caseWrite := reflect.SelectCase{ Dir: reflect.SelectSend, Chan: reflect.ValueOf(ch), Send: reflect.ValueOf(data), } cases = append(cases, caseWrite) cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectDefault, }) chosen, _, _ := reflect.Select(cases) return chosen == 0 } // 使用select + default實(shí)現(xiàn)無(wú)阻塞讀寫(xiě) func tryRead2(ch chan int) (int, bool) { select { case v, ok := <-ch: return v, ok default: return 0, false } } func tryWrite2(ch chan int, data int) bool { select { case ch <- data: return true default: return false } }
原因是如果select的case中存在default豌骏,對(duì)chan的讀寫(xiě)會(huì)使用無(wú)阻塞的方法
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return }