[toc]
基礎概念
Channel 是 Golang 的核心類型摧找,常用于多個 Goroutine 之間的通信核行。可以把 Channel 理解成是一個單向的管道蹬耘,具有 FIFO 特性芝雪。
image.png
Channel 是有容量限制的
- 當容量是 0 時,稱為無緩沖 Channel综苔。發(fā)送和接收只有一方就緒時惩系,就緒方會被阻塞直到另一方也就緒。
- 當容量大于 0 時如筛,稱為有緩沖 Channel堡牡。當傳輸中的元素個數超過容量時,發(fā)送方將會被阻塞直到有可用的緩沖空間出現杨刨;當傳輸中的元素個數為 0 時晤柄,消費方將會被阻塞直到緩沖空間出現新的數據。
數據結構
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
- qcount拭嫁,緩沖隊列的大小可免,記錄實際元素數量
- dataqsiz抓于,緩沖隊列的容量,記錄最大可存儲元素數量
- buf浇借,指向環(huán)形緩沖隊列的指針
- elemsize捉撮,每個元素的大小
- closed,記錄 channel 的關閉狀態(tài)
- elemtype妇垢,元素的類型
- sendx巾遭,緩沖隊列中即將發(fā)送的數據下標
- recvx,緩沖隊列中即將接收的數據下標
- recvq闯估,等待從 channel 接收數據的 goroutine 雙向鏈表
- sendq灼舍,等待向 channel 發(fā)送數據的 goroutine 雙向鏈表
- lock,多 goroutine 讀寫的并發(fā)保護鎖
圖解發(fā)送數據
image.png
注:無緩沖 Channel 原理類似不做贅述
圖解接收數據
image.png
注:無緩沖 Channel 原理類似不做贅述
源碼解讀
發(fā)送數據
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil { // 關鍵點1
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz { // 關鍵點2
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
...
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 關鍵點3
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
...
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
關鍵點
- 當 recvq 有等待的接收者時涨薪,說明緩沖隊列是空的骑素,則將數據直接發(fā)送給接收者,然后將接收者的 Goroutine 標記成可運行的狀態(tài)刚夺,并加入到本地可運行隊列中献丑。
- 當緩沖隊列未滿時,則將數據直接寫入緩沖隊列侠姑。
- 當緩沖隊列滿了或者無緩沖隊列時创橄,則將發(fā)送數據的指針和當前 Goroutine 等信息組裝成 sudog 并加入到 sendq 中,等待合適機會執(zhí)行莽红。
接收數據
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock)
...
if sg := c.sendq.dequeue(); sg != nil { // 關鍵點1
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 { // 關鍵點2
// Receive directly from queue
qp := chanbuf(c, c.recvx)
...
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 關鍵點3
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
return true, success
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
...
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
...
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
關鍵點
- 當 sendq 有等待的發(fā)送者時妥畏,如果是無緩沖隊列,則直接從發(fā)送者獲取數據安吁;如果是緩沖隊列滿了醉蚁,則從緩沖隊列取出一個數據,然后將發(fā)送者的數據寫入緩沖隊列柳畔。最后將發(fā)送者的 Goroutine 標記成可運行的狀態(tài)馍管,并加入到本地可運行隊列中。
- 當緩沖隊列有數據時薪韩,則直接從緩沖隊列讀取數據确沸。
- 當緩沖無數據時,則將接收數據的指針和當前 Goroutine 等信息組裝成 sudog 并加入到 recvq 中俘陷,等待合適機會執(zhí)行罗捎。