channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu),通常與goroutine一起使用邑贴,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)。
channel數(shù)據(jù)結(jié)構(gòu)
channel最重要的一個(gè)結(jié)構(gòu)體就是hchan
,我們創(chuàng)建一個(gè)channel的時(shí)候傀广,實(shí)際上是創(chuàng)建了一個(gè)下面結(jié)構(gòu)體的實(shí)例。
hchan結(jié)構(gòu)體
// src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
字段說明
-
qcount
當(dāng)前channel中的元素?cái)?shù)量 -
dataqsiz
環(huán)形隊(duì)列的大小 -
buf
指向dataqsize
的數(shù)組指針彩届,只有緩沖chan有效 -
closed
當(dāng)前channel關(guān)閉狀態(tài) -
elemsize
存儲(chǔ)元素的大小 -
elemtype
存儲(chǔ)元素的數(shù)據(jù)類型 -
sendx
發(fā)送操作處理到的索引位置伪冰,最大值為數(shù)組buf的最大下標(biāo)值 -
recvx
接收操作處理到的索引位置,最大值為數(shù)組buf的最大下標(biāo)值 -
recvq
接收隊(duì)列樟蠕,雙向鏈表贮聂,阻塞元素 -
sendq
發(fā)送列隊(duì),雙向鏈表寨辩,阻塞元素 -
lock
鎖,吓懈,用來保護(hù)sudog里的所的字段
hchan struct
其中elemsize
和 elemtype
表示存儲(chǔ)數(shù)據(jù)的大小和類型;sendx
和recvx
是指向底層數(shù)據(jù)的索引位置靡狞,表示當(dāng)前處理的進(jìn)度位置耻警;recvq
和sendq
是一個(gè)由雙向鏈表實(shí)現(xiàn)的隊(duì)列,它存儲(chǔ)的內(nèi)容是由于隊(duì)列dataqsize
過小甸怕,而阻塞的數(shù)據(jù)甘穿。
每次進(jìn)行發(fā)送數(shù)據(jù)和讀取數(shù)據(jù)時(shí)都需要加鎖。
waitq結(jié)構(gòu)體
// src/runtime/chan.go
type waitq struct {
first *sudog
last *sudog
}
sudog結(jié)構(gòu)體
// src/runtime/runtime2.go
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
這里 sudog
實(shí)際上是對(duì) goroutine
的一個(gè)封裝梢杭,一個(gè)sudog 就是一個(gè)goroutine温兼,如在channal上發(fā)送和接收。
sudogs
是通過一個(gè)特殊的池來分配的武契,通過 acquireSudog() 和releaseSudog()進(jìn)行獲取和釋放募判。
sudog里的字段是由 hchan.lock
鎖來進(jìn)行保護(hù)荡含。
channel 整體結(jié)構(gòu)圖
<figcaption>hchan 結(jié)構(gòu)圖(來源:互聯(lián)網(wǎng)技術(shù)窩)</figcaption>
創(chuàng)建
// 無緩沖通道
ch1 := make(chan int)
// 有緩沖通道
ch2 := make(chan int, 10)
通過編譯可以發(fā)現(xiàn)channel的創(chuàng)建是由[makechan()](https://github.com/golang/go/blob/go1.15.6/src/runtime/chan.go#L71-L118)
函數(shù)來完成的。源碼
// src/runtime/chan.go
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
}
return c
}
函數(shù)返回的是一個(gè)指針類型兰伤,因此我們可以在函數(shù)中通過參數(shù)直接傳遞内颗,不需要再轉(zhuǎn)為指針使傳遞。
步驟
- 數(shù)據(jù)合法性檢查敦腔,包括發(fā)送數(shù)據(jù)的類型和大小
- 根據(jù)不同場(chǎng)景分配內(nèi)存均澳,主要針對(duì)buf字段
a. 內(nèi)存大小為0,注意這時(shí)c.buf 的值為c.raceaddr()
b. 元素不包含指針符衔,一次性分配一段內(nèi)存地址
c. 元素包含指針找前,分配內(nèi)存 - 初始化其它字段
第一個(gè)參數(shù) *chantype 結(jié)構(gòu)定義
// src/runtime/type.go
type chantype struct {
typ _type
elem *_type
dir uintptr
}
實(shí)際上創(chuàng)建一個(gè)channel, 只是對(duì)一個(gè)hchan結(jié)構(gòu)體進(jìn)行了一些初始化操作,并返回其指針判族。因此我們?cè)诤瘮?shù)傳遞時(shí)躺盛,不需要傳遞指針,直接使用就可以了形帮,因?yàn)樗旧砭褪且粋€(gè)指針的類型槽惫。
注意:對(duì)于chan內(nèi)存是在heap上分配的。
發(fā)送數(shù)據(jù)
對(duì)于channel的寫操作是由 chansend() 函數(shù)來實(shí)現(xiàn)的辩撑。
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
在chan為nil的情況下, 如果是非阻塞則直接返回界斜,否則panic。
對(duì)于分送數(shù)據(jù)chan有三種情況合冀,分別是直接發(fā)送各薇,緩存區(qū)發(fā)送
和 阻塞發(fā)送
,其中阻塞發(fā)送涉及到GMP 的調(diào)度君躺,理解起來有些吃力峭判。
在發(fā)送數(shù)據(jù)前需要進(jìn)行加鎖
操作,發(fā)送完再解鎖
棕叫,保證原子性操作林螃。
直接發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 直接發(fā)送
// 如果接收隊(duì)列中有接收者,則直接將數(shù)據(jù)發(fā)給接收者谍珊,重點(diǎn)在send()函數(shù),并在函數(shù)里進(jìn)行解鎖
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
......
}
如果接收隊(duì)列中有接收者治宣,則優(yōu)化從接收者從隊(duì)列中取出來sg(sg := c.recvq.dequeue()
),然后再通過調(diào)用 send() 函數(shù)將數(shù)據(jù)發(fā)送給接收者即可砌滞。
<figcaption>channel send</figcaption>
在send()函數(shù)里,會(huì)執(zhí)行一個(gè)回調(diào)函數(shù)主要用來進(jìn)行解鎖c.lock
侮邀。真正的發(fā)送操作是函數(shù) sendDirect(),通過memmove(dst, src, t.size) 將數(shù)據(jù)復(fù)制過去贝润。
緩沖區(qū)發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 緩沖區(qū)發(fā)送
// 接收者隊(duì)列中沒有接收者goroutine
// 當(dāng)前channel中的元素<隊(duì)列的大小绊茧,有緩沖buffer未滿的情況
// 將數(shù)據(jù)存放在sendx在buf數(shù)組中的索引位置,然后再將sendx索引+1
// 由于是一個(gè)循環(huán)數(shù)組打掘,所以如果達(dá)到了dataqsize华畏,則從0開始鹏秋,同時(shí)個(gè)數(shù)+1
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
......
}
如果當(dāng)前recvq
隊(duì)列里沒有處于等待執(zhí)行的sudog
的話,則需要將數(shù)據(jù)發(fā)送到緩沖隊(duì)列中(如果當(dāng)前隊(duì)列為緩沖chan)亡笑。
假設(shè)當(dāng)前buffer大小為6(dataqsiz=6
)侣夷,數(shù)據(jù)個(gè)數(shù)為0(qcount=0
),這里寫入6個(gè)數(shù)據(jù)仑乌,如下圖百拓。
<figcaption>channel send</figcaption>
如果當(dāng)前緩沖區(qū)的元素?cái)?shù)量<隊(duì)列的大小,說明緩沖區(qū)還沒有滿晰甚,還可以繼續(xù)裝載數(shù)據(jù)衙传。
這時(shí)第一步先計(jì)算出 s.sendx 索引位置的內(nèi)存地址,然后調(diào)用 typememmove() 函數(shù)將qp復(fù)制到內(nèi)存地址厕九,再將s.sendx索引值+1蓖捶,同時(shí)c.qcount++
。
當(dāng) sendx = dataqsiz
的時(shí)候,說明已到了數(shù)組最后一個(gè)元素,下次存儲(chǔ)數(shù)據(jù)的話搓劫,則需要重新從0開始了,所以需要重置為0
亭引。
buf
是一個(gè)由數(shù)組組成的隊(duì)列,滿足隊(duì)列的FIFO
的機(jī)制皮获,最新存儲(chǔ)的數(shù)據(jù)也先消費(fèi),最多可以存儲(chǔ) dataqsiz
個(gè)數(shù)量纹冤。超出這個(gè)數(shù)據(jù)量就需要使用第三種 阻塞發(fā)送
方式了洒宝。
sendx
始終保存的是下次存儲(chǔ)數(shù)據(jù)的數(shù)組索引位置,每次使用完記得+1
萌京。每次存儲(chǔ)以前都需要判斷當(dāng)前buffer是否有空間可用 c.qcount < c.dataqsiz
雁歌。
總結(jié)
-
q.sendx
最大值為c.dataqsiz -1
,即數(shù)組的最大索引值知残。 -
q.count
是當(dāng)前chan 存儲(chǔ)的元素個(gè)數(shù)靠瞎,有可能 >c.dataqsiz
阻塞發(fā)送
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
// 阻塞發(fā)送
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
......
}
如果當(dāng)buff也寫滿的話,再send數(shù)據(jù)的話求妹,則需要進(jìn)行阻塞發(fā)送了乏盐。
假如我們有一個(gè)緩沖chan,但緩沖大小已經(jīng)使用完制恍,再次發(fā)送數(shù)據(jù)的話父能,則需要進(jìn)入sendq隊(duì)列了(將sudog綁定到一個(gè)goroutine,并放在sendq净神,等待讀群瘟摺)
對(duì)于阻塞的情況溉委,理解起來有些吃力,因?yàn)樯婕暗紾MP的關(guān)系和調(diào)度爱榕。
- 調(diào)用 getg() 函數(shù)獲取當(dāng)前運(yùn)行的goroutine
- 調(diào)用 acquireSudog() 函數(shù)獲取一個(gè)sudog瓣喊,并進(jìn)行數(shù)據(jù)綁定
- 將mysg 添加到發(fā)送隊(duì)列sendq,并設(shè)置為gp.waiting
- 更改goroutine狀態(tài)
- 設(shè)置goroutine為等待喚醒狀態(tài)黔酥,調(diào)用 atomic.Store8(&gp.parkingOnChan, 1)函數(shù)藻三?
- 通過keepAlive()函數(shù)可以保證發(fā)送的值一直有效,直到被接收者取走
- 進(jìn)行清理工作
- 釋放 sudog 結(jié)構(gòu)體
總結(jié)
- 阻塞發(fā)送并不會(huì)更新
c.qcount
數(shù)量個(gè)數(shù) - acquireSudog() 和 releaseSudog(mysg) 是配對(duì)一起使用絮爷。
讀取數(shù)據(jù)
對(duì)于channel的讀取方式:
v <- ch
v, ok <- ch
其中 v<-ch 對(duì)應(yīng)的是 runtime.chanrecv1()趴酣, v, ok <-ch 對(duì)應(yīng)的是`runtime.chanrecv2()。但這兩個(gè)函數(shù)最終調(diào)用的還是同一個(gè)函數(shù)坑夯,即 chanrecv()岖寞。
我們先看一下官方文檔對(duì)這個(gè)函數(shù)的說明
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
- chanrecv 用來從chan 中接收數(shù)據(jù),并將接收的數(shù)據(jù)寫入到ep
- 如果ep為 nil 的話柜蜈,則接收的數(shù)據(jù)將被忽略
- 如果非阻塞的且沒有可接收的數(shù)據(jù)將返回 (false ,false)
- 如果chan已關(guān)閉仗谆,零值 ep 和返回值將是true, false,否則使用一個(gè)元素代替ep并返回 (true, true)
- 一個(gè)非nil的 ep, 必須指向heap或者調(diào)用stack
// src/runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果c為nil,表示非法操作淑履,則直接gopark(),表示出讓當(dāng)前GMP中的P的使用權(quán)隶垮,允許其它G使用
if c == nil {
// 如果非阻塞的話,直接返回秘噪;如果是阻塞的話狸吞,直接panic
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
// 如果chan已關(guān)閉且元素個(gè)數(shù)為0
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 設(shè)置內(nèi)存內(nèi)容為類型 c.elemtype 的零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
如果當(dāng)前讀取的 chan 為nil的話,且非阻塞的情況指煎,則會(huì)產(chǎn)生死鎖蹋偏,最終提示
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]:
否則返回零值。
同時(shí)出讓自己占用的P至壤,允許其它goroutine搶占使用威始。
如果讀取的chan已關(guān)閉,則讀取出來的值為零值(函數(shù)說明第四條)像街。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 在沒有獲取鎖的情況下黎棠,檢查非阻塞操作失敗
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
// 如果當(dāng)前chan未關(guān)閉
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}
這段代碼主要是對(duì)重排讀的情況,進(jìn)行了雙重檢測(cè)镰绎,暫是未明白code中考慮的情況脓斩,改天再消化消化。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖跟狱,下面才是真正要讀取的邏輯
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
...
}
讀取之前先加鎖俭厚。
對(duì)chan的讀取與發(fā)送一樣,同樣有三種方式驶臊,為直接讀取挪挤、緩沖區(qū)讀取和阻塞讀取叼丑。
直接讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 直接讀取
// 從c.sendq隊(duì)列中取sudog, 將數(shù)據(jù)復(fù)制到sg
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
獲取一個(gè)待發(fā)送者,如果buffer大小為0扛门,則直接從發(fā)送者接收數(shù)據(jù)鸠信。否則從隊(duì)列頭部接收,并將發(fā)送者發(fā)送的數(shù)據(jù)放在隊(duì)列尾部论寨。
從c.sendq隊(duì)列里讀取一個(gè) *sudog星立,通過調(diào)用 recv()
函數(shù),將數(shù)據(jù)從發(fā)送者復(fù)制到ep中葬凳,并返回true,true绰垂,表示讀取成功。真正讀取函數(shù)為 recvDirect()火焰。
緩沖區(qū)讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果c.qcount>0劲装,說明緩沖區(qū)有元素可直接讀取
if c.qcount > 0 {
// Receive directly from queue
// 直接從隊(duì)列中讀取
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
如果c.qcount > 0
,則說明緩沖區(qū)里有內(nèi)容可以讀取昌简。則
直接獲取 c.recvx
數(shù)組索引位置的內(nèi)存地址占业,則
- 將
r.recvx
索引地址的值讀取出來復(fù)制給 ep, - 然后更新接收數(shù)組索引
c.recvx++
, 如果>數(shù)組索引最大索引值 纯赎,重置為0 - 減少元素個(gè)數(shù)
- 釋放鎖 c.qcount--
- 最后unlock返回谦疾。
<figcaption>chan recv</figcaption>
阻塞讀取
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
......
// c.sendq沒有sender,buffer里也是空的犬金,直接阻塞讀取
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
- 通過getg()獲取一個(gè)goroutine
- 獲取一個(gè)sudog結(jié)構(gòu)體
- 綁定兩者關(guān)系
- 加入 c.recvq 隊(duì)列
- 設(shè)置goroutine為等待喚醒狀態(tài)
- 清理狀態(tài)
關(guān)閉chan
關(guān)閉chan語(yǔ)句
close(ch)
對(duì)于已關(guān)閉的chan念恍,是不允許再次關(guān)閉的,否則會(huì)產(chǎn)生panic晚顷。對(duì)應(yīng)的函數(shù)為 runtime.closechan() 樊诺。
// src/runtime/chan.go
func closechan(c *hchan) {
// 如果chan未初始化,觸發(fā)panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 關(guān)閉已關(guān)閉的chan音同,觸發(fā)panicc
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
......
}
對(duì)于一個(gè)未初始化的chan,或者已關(guān)閉的chan秃嗜,如果再次關(guān)閉則會(huì)觸發(fā)panic权均。
func closechan(c *hchan) {
......
// 設(shè)置chan關(guān)閉狀態(tài)
c.closed = 1
// 聲明一個(gè)結(jié)構(gòu)體鏈表gList,主要用來調(diào)度使用
var glist gList
// release all readers
// 釋放所有readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 設(shè)置元素為nil
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 釋放所有writers,會(huì)引起panic,見下面說明
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 設(shè)置元素為nil
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 調(diào)度所有g(shù)
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 喚醒goroutine
goready(gp, 3)
}
}
- 聲明一個(gè)
gList
鏈表結(jié)構(gòu)體 - 將接收隊(duì)列
c.recvq
中的所有元素添加到gList
中锅锨,并將原來的值設(shè)置為零
值 - 將發(fā)送隊(duì)列
c.sendq
中的所有元素添加到gList
中叽赊,并將原來的值設(shè)置為零
值 - 將所有的阻塞goroutine通過函數(shù)
goready()
進(jìn)行調(diào)度
文章里提到在對(duì)c.sendq
處理的時(shí)候可能會(huì)觸發(fā)panic。這是因?yàn)殛P(guān)閉chan后必搞,執(zhí)行了 goready()
對(duì)原來sendq里的sudogs 進(jìn)行了進(jìn)行了重新調(diào)度必指,這時(shí)候發(fā)現(xiàn)chan已經(jīng)關(guān)閉了,所以會(huì)panic恕洲。那么又是如何調(diào)度的呢塔橡?
package main
import (
"fmt"
"time"
)
var ch chan int
func f() {
}
func main() {
ch := make(chan int, 10)
// buffer大小為10,這里發(fā)送11個(gè)梅割,使最后一個(gè)進(jìn)入到c.sendq里面
for i := 0; i < 11; i++ { // i < 10 則正常
go func(v int) {
ch <- v
}(i)
}
time.Sleep(time.Second)
fmt.Println("發(fā)送完畢")
// 關(guān)閉chan,將對(duì)sendq里的g進(jìn)行喚醒葛家,喚醒后發(fā)現(xiàn)chan關(guān)閉狀態(tài)户辞,直接panic
close(ch)
for v := range ch {
fmt.Println(v)
}
time.Sleep(time.Second)
}
有一條廣泛流傳的關(guān)閉 channel 的原則:
don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.
不要從一個(gè) receiver 側(cè)關(guān)閉 channel,也不要在有多個(gè) sender 時(shí)癞谒,關(guān)閉 channel底燎。對(duì)于只有一個(gè)sender的話,直接在sender端關(guān)閉就可以弹砚。但對(duì)于多個(gè)sender的話双仍,則需要通過一個(gè)信號(hào)量進(jìn)行關(guān)閉,參考這里桌吃。
總結(jié)
close 操作會(huì)觸發(fā)goroutine的調(diào)度行為朱沃。
總結(jié)
- 在發(fā)送和讀取 chan的時(shí)候,如果chan為nil的話读存,這時(shí)候就根據(jù)是否阻塞進(jìn)行判斷是否會(huì)發(fā)生panic为流。如果阻塞狀態(tài)的話,則會(huì)發(fā)生panic让簿,否則會(huì)直接返回
- 對(duì)chan 發(fā)送或接收數(shù)據(jù)的時(shí)候要保證已初始化狀態(tài)
- 對(duì)于已關(guān)閉的chan再次關(guān)閉會(huì)觸發(fā)panic
- 對(duì)于發(fā)送和讀取數(shù)據(jù)都有三種處理情況敬察,分別是直接讀寫,緩存區(qū)讀寫和阻塞讀寫
- 發(fā)送和接收數(shù)據(jù)的本質(zhì)上是對(duì)值的
復(fù)制
操作尔当。All transfer of value on the go channels happens with the copy of value.
- close(ch)會(huì)觸發(fā)goroutine 的調(diào)度行為
- 內(nèi)部使用 sudogs對(duì)goroutine進(jìn)行了一次封裝莲祸。
- 如果buffer中的元素?zé)o法保證消費(fèi)完的話,則會(huì)產(chǎn)生內(nèi)存泄漏的危險(xiǎn)椭迎,這時(shí)gc是無法對(duì)這些元素時(shí)間清理的锐帜,過多的 chan就會(huì)占用大量的資源
- 對(duì)于chan的分配的內(nèi)存是在哪里,heap還是stack?
參考
- https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#
- https://studygolang.com/articles/20714
- https://github.com/qcrao/Go-Questions/tree/master/channel
本文如有錯(cuò)誤畜号,歡迎大家在下方留言指出缴阎。