由淺入深剖析 go channel
channel 是 golang 中最核心的 feature 之一揖闸,因此理解 Channel 的原理對于學(xué)習(xí)和使用 golang 非常重要。
channel 是 goroutine 之間通信的一種方式凯正,可以類比成 Unix 中的進(jìn)程的通信方式管道组题。
CSP 模型
在講 channel 之前姥宝,有必要先提一下 CSP 模型善涨,傳統(tǒng)的并發(fā)模型主要分為 Actor 模型和 CSP 模型掏婶,CSP 模型全稱為 communicating sequential processes啃奴,CSP 模型由并發(fā)執(zhí)行實(shí)體(進(jìn)程,線程或協(xié)程)气堕,和消息通道組成纺腊,實(shí)體之間通過消息通道發(fā)送消息進(jìn)行通信。和 Actor 模型不同茎芭,CSP 模型關(guān)注的是消息發(fā)送的載體揖膜,即通道,而不是發(fā)送消息的執(zhí)行實(shí)體梅桩。關(guān)于 CSP 模型的更進(jìn)一步的介紹壹粟,有興趣的同學(xué)可以閱讀論文 Communicating Sequential Processes,Go 語言的并發(fā)模型參考了 CSP 理論宿百,其中執(zhí)行實(shí)體對應(yīng)的是 goroutine趁仙, 消息通道對應(yīng)的就是 channel。
channel 介紹
channel 提供了一種通信機(jī)制垦页,通過它雀费,一個(gè) goroutine 可以想另一 goroutine 發(fā)送消息。channel 本身還需關(guān)聯(lián)了一個(gè)類型痊焊,也就是 channel 可以發(fā)送數(shù)據(jù)的類型盏袄。例如: 發(fā)送 int 類型消息的 channel 寫作 chan int 。
channel 創(chuàng)建
channel 使用內(nèi)置的 make 函數(shù)創(chuàng)建薄啥,下面聲明了一個(gè) chan int 類型的 channel:
ch := make(chan int)
c和 map 類似辕羽,make 創(chuàng)建了一個(gè)底層數(shù)據(jù)結(jié)構(gòu)的引用,當(dāng)賦值或參數(shù)傳遞時(shí)垄惧,只是拷貝了一個(gè) channel 引用刁愿,指向相同的 channel 對象。和其他引用類型一樣到逊,channel 的空值為 nil 铣口。使用 == 可以對類型相同的 channel 進(jìn)行比較,只有指向相同對象或同為 nil 時(shí)蕾管,才返回 true
枷踏。
channel 的讀寫操作
ch := make(chan int)
// write to channel
ch <- x
// read from channel
x <- ch
// another way to read
x = <- ch
channel 一定要初始化后才能進(jìn)行讀寫操作,否則會永久阻塞掰曾。
關(guān)閉 channel
golang 提供了內(nèi)置的 close 函數(shù)對 channel 進(jìn)行關(guān)閉操作旭蠕。
ch := make(chan int)
close(ch)
有關(guān) channel 的關(guān)閉,你需要注意以下事項(xiàng):
- 關(guān)閉一個(gè)未初始化(nil) 的 channel 會產(chǎn)生 panic
- 重復(fù)關(guān)閉同一個(gè) channel 會產(chǎn)生 panic
- 向一個(gè)已關(guān)閉的 channel 中發(fā)送消息會產(chǎn)生 panic
- 從已關(guān)閉的 channel 讀取消息不會產(chǎn)生 panic旷坦,且能讀出 channel 中還未被讀取的消息掏熬,若消息均已讀出,則會讀到類型的零值秒梅。從一個(gè)已關(guān)閉的 channel 中讀取消息永遠(yuǎn)不會阻塞旗芬,并且會返回一個(gè)為 false 的 ok-idiom,可以用它來判斷 channel 是否關(guān)閉
- 關(guān)閉 channel 會產(chǎn)生一個(gè)廣播機(jī)制捆蜀,所有向 channel 讀取消息的 goroutine 都會收到消息
ch := make(chan int, 10)
ch <- 11
ch <- 12
close(ch)
for x := range ch {
fmt.Println(x)
}
x, ok := <- ch
fmt.Println(x, ok)
-----
output:
11
12
0 false
channel 的類型
channel 分為不帶緩存的 channel 和帶緩存的 channel疮丛。
無緩存的 channel
從無緩存的 channel 中讀取消息會阻塞幔嫂,直到有 goroutine 向該 channel 中發(fā)送消息;同理誊薄,向無緩存的 channel 中發(fā)送消息也會阻塞履恩,直到有 goroutine 從 channel 中讀取消息。
通過無緩存的 channel 進(jìn)行通信時(shí)呢蔫,接收者收到數(shù)據(jù) happens before 發(fā)送者 goroutine 喚醒
有緩存的 channel
有緩存的 channel 的聲明方式為指定 make 函數(shù)的第二個(gè)參數(shù)切心,該參數(shù)為 channel 緩存的容量
ch := make(chan int, 10)
有緩存的 channel 類似一個(gè)阻塞隊(duì)列(采用環(huán)形數(shù)組實(shí)現(xiàn))。當(dāng)緩存未滿時(shí)片吊,向 channel 中發(fā)送消息時(shí)不會阻塞绽昏,當(dāng)緩存滿時(shí),發(fā)送操作將被阻塞俏脊,直到有其他 goroutine 從中讀取消息全谤;相應(yīng)的,當(dāng) channel 中消息不為空時(shí)爷贫,讀取消息不會出現(xiàn)阻塞啼县,當(dāng) channel 為空時(shí),讀取操作會造成阻塞沸久,直到有 goroutine 向 channel 中寫入消息季眷。
ch := make(chan int, 3)
// blocked, read from empty buffered channel
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
// blocked, send to full buffered channel
ch <- 4
通過 len 函數(shù)可以獲得 chan 中的元素個(gè)數(shù),通過 cap 函數(shù)可以得到 channel 的緩存長度卷胯。
channel 的用法
goroutine 通信
看一個(gè) effective go 中的例子:
c := make(chan int) // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
list.Sort()
c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c
主 goroutine 會阻塞子刮,直到執(zhí)行 sort 的 goroutine 完成。
range 遍歷
channel 也可以使用 range 取值窑睁,并且會一直從 channel 中讀取數(shù)據(jù)挺峡,直到有 goroutine 對改 channel 執(zhí)行 close 操作,循環(huán)才會結(jié)束担钮。
// consumer worker
ch := make(chan int, 10)
for x := range ch{
fmt.Println(x)
}
等價(jià)于
for {
x, ok := <- ch
if !ok {
break
}
fmt.Println(x)
}
配合 select 使用
select 用法類似與 IO 多路復(fù)用橱赠,可以同時(shí)監(jiān)聽多個(gè) channel 的消息狀態(tài),看下面的例子
select {
case <- ch1:
...
case <- ch2:
...
case ch3 <- 10;
...
default:
...
}
- select 可以同時(shí)監(jiān)聽多個(gè) channel 的寫入或讀取
- 執(zhí)行 select 時(shí)箫津,若只有一個(gè) case 通過(不阻塞)狭姨,則執(zhí)行這個(gè) case 塊
- 若有多個(gè) case 通過,則隨機(jī)挑選一個(gè) case 執(zhí)行
- 若所有 case 均阻塞苏遥,且定義了 default 模塊饼拍,則執(zhí)行 default 模塊。若未定義 default 模塊田炭,則 select 語句阻塞师抄,直到有 case 被喚醒。
- 使用 break 會跳出 select 塊教硫。
1. 設(shè)置超時(shí)時(shí)間
ch := make(chan struct{})
// finish task while send msg to ch
go doTask(ch)
timeout := time.After(5 * time.Second)
select {
case <- ch:
fmt.Println("task finished.")
case <- timeout:
fmt.Println("task timeout.")
}
2. quite channel
有一些場景中叨吮,一些 worker goroutine 需要一直循環(huán)處理信息辆布,直到收到 quit 信號
msgCh := make(chan struct{})
quitCh := make(chan struct{})
for {
select {
case <- msgCh:
doWork()
case <- quitCh:
finish()
return
}
單向 channel
即只可寫入或只可讀的channel,事實(shí)上 channel 只讀或只寫都沒有意義茶鉴,所謂的單向 channel 其實(shí)知識聲明時(shí)用谚殊,比如
func foo(ch chan<- int) <-chan int {...}
chan<- int
表示一個(gè)只可寫入的 channel,<-chan int
表示一個(gè)只可讀取的 channel蛤铜。上面這個(gè)函數(shù)約定了 foo 內(nèi)只能從向 ch 中寫入數(shù)據(jù),返回只一個(gè)只能讀取的 channel丛肢,雖然使用普通的 channel 也沒有問題围肥,但這樣在方法聲明時(shí)約定可以防止 channel 被濫用,這種預(yù)防機(jī)制發(fā)生再編譯期間蜂怎。
channel 源碼分析
channel 的主要實(shí)現(xiàn)在 src/runtime/chan.go
中穆刻,以下源碼均基于 go1.9.2。源碼閱讀時(shí)為了更好的理解 channel 特性杠步,幫助正確合理的使用 channel氢伟,閱讀代碼的過程可以回憶前面章節(jié)的 channel 特性。
channel 類結(jié)構(gòu)
channel 相關(guān)類定義如下:
// channel 類型定義
type hchan struct {
// channel 中的元素?cái)?shù)量, len
qcount uint // total data in the queue
// channel 的大小, cap
dataqsiz uint // size of the circular queue
// channel 的緩沖區(qū)幽歼,環(huán)形數(shù)組實(shí)現(xiàn)
buf unsafe.Pointer // points to an array of dataqsiz elements
// 單個(gè)元素的大小
elemsize uint16
// closed 標(biāo)志位
closed uint32
// 元素的類型
elemtype *_type // element type
// send 和 recieve 的索引朵锣,用于實(shí)現(xiàn)環(huán)形數(shù)組隊(duì)列
sendx uint // send index
recvx uint // receive index
// recv goroutine 等待隊(duì)列
recvq waitq // list of recv waiters
// send goroutine 等待隊(duì)列
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
// 等待隊(duì)列的鏈表實(shí)現(xiàn)
type waitq struct {
first *sudog
last *sudog
}
// in src/runtime/runtime2.go
// 對 G 的封裝
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
可以看到,channel 的主要組成有:一個(gè)環(huán)形數(shù)組實(shí)現(xiàn)的隊(duì)列甸私,用于存儲消息元素诚些;兩個(gè)鏈表實(shí)現(xiàn)的 goroutine 等待隊(duì)列,用于存儲阻塞在 recv 和 send 操作上的 goroutine皇型;一個(gè)互斥鎖诬烹,用于各個(gè)屬性變動(dòng)的同步
channel make 實(shí)現(xiàn)
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// case 1: channel 不含有指針
// case 2: size == 0,即無緩沖 channel
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
// 在堆上分配連續(xù)的空間用作 channel
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
// 有緩沖 channel 初始化
c = new(hchan)
// 堆上分配 buf 內(nèi)存
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
make 的過程還比較簡單弃鸦,需要注意一點(diǎn)的是當(dāng)元素不含指針的時(shí)候绞吁,會將整個(gè) hchan 分配成一個(gè)連續(xù)的空間。
channel send
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 前面章節(jié)說道的唬格,當(dāng) channel 未初始化或?yàn)?nil 時(shí)家破,向其中發(fā)送數(shù)據(jù)將會永久阻塞
if c == nil {
if !block {
return false
}
// gopark 會使當(dāng)前 goroutine 休眠,并通過 unlockf 喚醒购岗,但是此時(shí)傳入的 unlockf 為 nil, 因此员舵,goroutine 會一直休眠
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取同步鎖
lock(&c.lock)
// 之前章節(jié)提過,向已經(jīng)關(guān)閉的 channel 發(fā)送消息會產(chǎn)生 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// CASE1: 當(dāng)有 goroutine 在 recv 隊(duì)列上等待時(shí)藕畔,跳過緩存隊(duì)列马僻,將消息直接發(fā)給 reciever goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// CASE2: 緩存隊(duì)列未滿,則將消息復(fù)制到緩存隊(duì)列上
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// CASE3: 緩存隊(duì)列已滿注服,將goroutine 加入 send 隊(duì)列
// 初始化 sudog
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 加入隊(duì)列
c.sendq.enqueue(mysg)
// 休眠
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// 喚醒 goroutine
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
從 send 代碼中可以看到韭邓,之前章節(jié)提到的一些特性都在代碼中有所體現(xiàn)措近,
send 有以下幾種情況:
- 有 goroutine 阻塞在 channel recv 隊(duì)列上,此時(shí)緩存隊(duì)列為空女淑,直接將消息發(fā)送給 reciever goroutine,只產(chǎn)生一次復(fù)制
- 當(dāng) channel 緩存隊(duì)列有剩余空間時(shí)瞭郑,將數(shù)據(jù)放到隊(duì)列里,等待接收鸭你,接收后總共產(chǎn)生兩次復(fù)制
- 當(dāng) channel 緩存隊(duì)列已滿時(shí)屈张,將當(dāng)前 goroutine 加入 send 隊(duì)列并阻塞。
channel recieve
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 從 nil 的 channel 中接收消息,永久阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// CASE1: 從已經(jīng) close 且為空的 channel recv 數(shù)據(jù),返回空值
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// CASE2: send 隊(duì)列不為空
// CASE2.1: 緩存隊(duì)列為空畏浆,直接從 sender recv 元素
// CASE2.2: 緩存隊(duì)列不為空竟终,此時(shí)只有可能是緩存隊(duì)列已滿,從隊(duì)列頭取出元素,并喚醒 sender 將元素寫入緩存隊(duì)列尾部。由于為環(huán)形隊(duì)列,因此焰盗,隊(duì)列滿時(shí)只需要將隊(duì)列頭復(fù)制給 reciever,同時(shí)將 sender 元素復(fù)制到該位置咒林,并移動(dòng)隊(duì)列頭尾索引熬拒,不需要移動(dòng)隊(duì)列元素
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// CASE3: 緩存隊(duì)列不為空,直接從隊(duì)列取元素垫竞,移動(dòng)頭索引
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// CASE4: 緩存隊(duì)列為空梦湘,將 goroutine 加入 recv 隊(duì)列,并阻塞
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
channel close
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 重復(fù) close件甥,產(chǎn)生 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc(unsafe.Pointer(&c))
racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
racerelease(unsafe.Pointer(c))
}
c.closed = 1
var glist *g
// 喚醒所有 reciever
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
// 喚醒所有 sender捌议,并產(chǎn)生 panic
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}
}