1. 數(shù)據(jù)對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)
runtime.chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
2.channel 創(chuàng)建
通過(guò)make 創(chuàng)建channel ,在編譯階段午衰,對(duì)make 進(jìn)行類型檢查和展開亚兄,最終調(diào)用runtime.makechan()
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
3. 發(fā)送數(shù)據(jù)過(guò)程分析
使用 ch <- i 發(fā)送數(shù)據(jù),通過(guò)編譯器 最終執(zhí)行 runtime.chansend()
- 直接發(fā)送數(shù)據(jù)
如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine包蓝,那么runtime.chansend
會(huì)從接收隊(duì)列recvq
中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù)
if c.closed != 0 { //如果已經(jīng)關(guān)閉迎捺,仍然向chan 發(fā)送數(shù)據(jù),則拋異常
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//已經(jīng)存在接收者箕别,則直接copy 數(shù)據(jù)且喚醒
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
- 具有緩存隊(duì)列
緩存區(qū)未滿時(shí),將要發(fā)送的數(shù)據(jù),加入緩存循環(huán)隊(duì)列
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
- 阻塞發(fā)送
如果當(dāng)前既沒有接收者且循環(huán)隊(duì)列已滿或不存在钢坦,則 發(fā)送進(jìn)入阻塞狀態(tài)
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
4.接收數(shù)據(jù)過(guò)程分析
i <- ch
i, ok <- ch
經(jīng)過(guò)編譯器的檢查和類型展開究孕,最終調(diào)用runtime. chanrecv()
當(dāng)我們從一個(gè)空 Channel 接收數(shù)據(jù)時(shí)會(huì)直接調(diào)用 runtime.gopark
讓出處理器的使用權(quán);
如果當(dāng)前 Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù)啥酱,那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回;
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
- 直接接收
//出現(xiàn)在2種場(chǎng)景下:1. 沒有緩存隊(duì)列 2.緩存隊(duì)列已經(jīng)滿了
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { //沒有緩存隊(duì)列的場(chǎng)景
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {// 緩存隊(duì)列已經(jīng)滿了
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
- 緩存區(qū)接收
...
if c.qcount > 0 {//緩存區(qū)有數(shù)據(jù)且緩存區(qū)未滿時(shí)
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}
1.當(dāng) Channel 的緩沖區(qū)中已經(jīng)包含數(shù)據(jù)時(shí)爹凹,從 Channel 中接收數(shù)據(jù)會(huì)直接從緩沖區(qū)中 recvx 的索引位置中取出數(shù)據(jù)進(jìn)行處理
2.如果接收數(shù)據(jù)的內(nèi)存地址不為空,那么會(huì)使用 runtime.typedmemmove
將緩沖區(qū)中的數(shù)據(jù)拷貝到內(nèi)存中镶殷、清除隊(duì)列中的數(shù)據(jù)并完成收尾工作禾酱。
3.收尾工作包括遞增 recvx,一旦發(fā)現(xiàn)索引超過(guò)了 Channel 的容量時(shí),會(huì)將它歸零重置循環(huán)隊(duì)列的索引颤陶;除此之外颗管,該函數(shù)還會(huì)減少 qcount 計(jì)數(shù)器并釋放持有 Channel 的鎖
- 阻塞接收
當(dāng) Channel 的發(fā)送隊(duì)列中不存在等待的 Goroutine 并且緩沖區(qū)中也不存在任何數(shù)據(jù)時(shí),從管道中接收數(shù)據(jù)的操作會(huì)變成阻塞的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}
5. 關(guān)閉流程分析
編譯器會(huì)將用于關(guān)閉管道的 close
關(guān)鍵字轉(zhuǎn)換成 OCLOSE
節(jié)點(diǎn)以及 runtime.closechan
函數(shù)滓走。
當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí)垦江,Go 語(yǔ)言運(yùn)行時(shí)都會(huì)直接崩潰并拋出異常:
func closechan(c *hchan) {
if c == nil { //為空時(shí) 關(guān)閉 拋異常
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 { //已經(jīng)關(guān)閉,再次關(guān)閉拋異常
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
.....
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
}