1)chan結(jié)構(gòu)體-hchan
- channel內(nèi)部是固定長(zhǎng)度的雙向循環(huán)鏈表储矩,make時(shí)確認(rèn)size大小
- 環(huán)形隊(duì)列有關(guān)的變量:
qcount 入隊(duì)元素?cái)?shù)
dataqsiz 隊(duì)列容量
sendx發(fā)送索引
recvx接收索引 - 關(guān)于發(fā)送緩沖隊(duì)列和接收緩沖隊(duì)列:
當(dāng)寫阻塞隊(duì)列不空屡拨,說(shuō)明chan的buf已經(jīng)無(wú)法寫了,并且讀阻塞隊(duì)列為空
當(dāng)讀阻塞隊(duì)列不空,說(shuō)明chan的buf和sendq均為空
type hchan struct {
qcount uint // 當(dāng)前隊(duì)列中總元素個(gè)數(shù)
dataqsiz uint // 環(huán)形隊(duì)列長(zhǎng)度十饥,即緩沖區(qū)大芯炖ァ(申明channel時(shí)指定的大凶髅摹)
buf unsafe.Pointer // 環(huán)形隊(duì)列指針
elemsize uint16 // buf中每個(gè)元素的大小
closed uint32 // 0為正常厚掷,關(guān)閉時(shí)字段為1
elemtype *_type // 元素類型,用于傳值過(guò)程的賦值
// 環(huán)形緩沖區(qū)中已發(fā)送位置索引
sendx uint // send index
// 環(huán)形緩沖區(qū)中已接收位置索引
recvx uint // receive index
// 等待讀消息的groutine隊(duì)列。
// 當(dāng)讀阻塞隊(duì)列不空,說(shuō)明chan的buf和sendq均為空
recvq waitq // list of recv waiters,放的是等待接收的睡眠協(xié)程
// 等待寫消息的groutine隊(duì)列
// 當(dāng)寫阻塞隊(duì)列不空,說(shuō)明chan的buf已經(jīng)無(wú)法寫了调缨,并且讀阻塞隊(duì)列為空
sendq waitq // list of send waiters伤哺,放的是等待發(fā)送的睡眠協(xié)程
// 互斥鎖杖虾,為每個(gè)讀寫操作鎖定通道(發(fā)送和接收必須互斥)
lock mutex
}
// 等待讀寫的隊(duì)列數(shù)據(jù)結(jié)構(gòu)媒熊,保證先進(jìn)先出
type waitq struct {
first *sudog
last *sudog
}
2)make chan
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
//hchanSize返回實(shí)際占用的內(nèi)存大小奇适,扣去字節(jié)對(duì)齊部分
// 對(duì)應(yīng)的源碼為 c := make(chan int, size)
// c := make(chan int) 這種情況下,size = 0
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//判斷a*b是否會(huì)溢出(對(duì)于64位機(jī)器芦鳍,大于2的64次方)
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 空隊(duì)列或隊(duì)列中元素為空
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// raceaddr內(nèi)部實(shí)現(xiàn)為:return unsafe.Pointer(&c.buf)
//c.buf存c.buf自己的地址嚷往,用于競(jìng)爭(zhēng)探測(cè)。
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// elem不包含指針柠衅,直接申請(qǐng)hchan+mem內(nèi)存大小
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
//包含指針皮仁,則為buf單獨(dú)開(kāi)辟空間
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
3) 發(fā)送數(shù)據(jù)
sudog 存儲(chǔ)
sudoG 結(jié)構(gòu)體:
// sudog 代表在等待列表里的 g,比如向 channel 發(fā)送/接收內(nèi)容時(shí)
// 之所以需要 sudog 是因?yàn)?g 和同步對(duì)象之間的關(guān)系是多對(duì)多的
// 一個(gè) g 可能會(huì)在多個(gè)等待隊(duì)列中菲宴,所以一個(gè) g 可能被打包為多個(gè) sudog
// 多個(gè) g 也可以等待在同一個(gè)同步對(duì)象上
// 因此對(duì)于一個(gè)同步對(duì)象就會(huì)有很多 sudog 了
// sudog 是從一個(gè)特殊的池中進(jìn)行分配的贷祈。用 acquireSudog 和 releaseSudog 來(lái)分配和釋放 sudog
//block表示是否阻塞(正常都是阻塞的,在select中有default就是非阻塞的)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//第一種情況喝峦,空chan
if c == nil {
//非阻塞立即返回
if !block {
return false
}
//協(xié)程掛起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
//未關(guān)閉的chan且非阻塞狀態(tài)如果滿了就立即返回
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//互斥鎖上鎖
lock(&c.lock)
//向已經(jīng)關(guān)閉的chan發(fā)數(shù)據(jù)會(huì)panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//從接收者隊(duì)列中拿一個(gè)goroutine势誊,把數(shù)據(jù)發(fā)給對(duì)應(yīng)的goroutine,直接返回
//接收隊(duì)列有數(shù)據(jù)谣蠢,說(shuō)明buf環(huán)為空
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//沒(méi)有等待接收的goroutine粟耻,如果隊(duì)列不空钙勃,則放入到buf中
if c.qcount < c.dataqsiz {
//緩存環(huán)形buf未滿椿肩,直接放進(jìn)去,返回
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
//非阻塞酪刀,緩存滿了谈喳,則放入失敗
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
//獲取一個(gè)sudog饭玲,把send協(xié)程信息和send內(nèi)容放入到sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//把sudong放入chan的發(fā)送阻塞隊(duì)列中
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
//休眠當(dāng)前協(xié)程,等待某次接收流程被喚醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
//保持ep不被gc掉
KeepAlive(ep)
// someone woke us up.
//判斷協(xié)程是否被其他處喚醒過(guò)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)//釋放sudog
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
//直接發(fā)送的recv goroutine
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
//釋放chan的互斥鎖
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//喚醒recv 協(xié)程
goready(gp, skip+1)
}
發(fā)送過(guò)程:
1當(dāng)對(duì)一個(gè)nil chan進(jìn)行寫操作時(shí)叁执,如果是非阻塞調(diào)用,直接返回矮冬;否則將當(dāng)前協(xié)程掛起
2 非阻塞模式且chan未close谈宛,沒(méi)有緩沖區(qū)且沒(méi)有等待接收或者緩沖區(qū)滿的情況下,直接return false胎署。
3 c.recvq中有等待讀的接收者(說(shuō)明喚醒buf為空)吆录,將其出隊(duì),將數(shù)據(jù)直接copy給接收者琼牧,并喚醒接收者恢筝。 返回
4 緩沖區(qū)未滿的情況下哀卫,數(shù)據(jù)放入環(huán)形緩沖區(qū)即可。 返回
5緩沖區(qū)滿了撬槽,把go協(xié)程信息和element指針?lè)湃氲叫律暾?qǐng)sudog此改,把sudog放入到sendq(發(fā)送阻塞隊(duì)列)。
總結(jié)一下:先看接收阻塞隊(duì)列侄柔,再看環(huán)形緩沖隊(duì)列共啃,最后不行就掛起當(dāng)前協(xié)程放入到發(fā)送阻塞隊(duì)列(對(duì)于block類型)。
4) chan讀操作
1 讀空chan直接阻塞當(dāng)前協(xié)程
2 非阻塞狀態(tài)且chan為空的話已經(jīng)close直接返回0值暂题,沒(méi)close的直接返回
3 對(duì)于阻塞狀態(tài)移剪,如果已經(jīng)關(guān)閉且隊(duì)列為空的話,直接返回0值
開(kāi)始正常步驟:
4 如果有等待發(fā)送數(shù)據(jù)的groutine薪者,從sendq中取出一個(gè)等待發(fā)送數(shù)據(jù)的Groutine纵苛,取出數(shù)據(jù)
5 如果沒(méi)有等待的groutine,且環(huán)形隊(duì)列中有數(shù)據(jù)言津,從隊(duì)列中取出數(shù)據(jù)
6 如果沒(méi)有等待的groutine攻人,且環(huán)形隊(duì)列中也沒(méi)有數(shù)據(jù),則阻塞該Groutine纺念,并將groutine打包為sudogo加入到recevq等待隊(duì)列中
總結(jié)一下:先從發(fā)送阻塞隊(duì)列取贝椿,沒(méi)有就從緩沖區(qū)取,還沒(méi)有就掛起當(dāng)前協(xié)程到接收緩沖隊(duì)列(block型).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//非阻塞時(shí) 先檢查chan中是否有可以接收的數(shù)據(jù)(empty())
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
//chan沒(méi)被關(guān)閉陷谱,直接返回
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
//已經(jīng)關(guān)閉且無(wú)任何緩存數(shù)據(jù)的chan烙博,直接返回
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
//已經(jīng)關(guān)閉的chan,隊(duì)列沒(méi)有數(shù)據(jù)烟逊,存放數(shù)據(jù)清零直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//如果發(fā)送隊(duì)列有阻塞go協(xié)程(說(shuō)明buf隊(duì)列已經(jīng)滿了)渣窜,則拿一個(gè)進(jìn)行接收
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//buf隊(duì)列中有數(shù)據(jù),直接取出來(lái)一個(gè)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//未關(guān)閉且沒(méi)數(shù)據(jù)可以取宪躯,非阻塞直接返回
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
//沒(méi)任何數(shù)據(jù)可讀乔宿,當(dāng)前協(xié)程放入讀阻塞隊(duì)列
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
5)關(guān)閉chan操作
把讀阻塞隊(duì)列所有協(xié)程釋放,寫阻塞隊(duì)列所有協(xié)程釋放(并把數(shù)據(jù)丟棄)
func closechan(c *hchan) {
//關(guān)閉空chan访雪,直接panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
//已經(jīng)關(guān)閉的chan详瑞,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// 釋放讀阻塞隊(duì)列
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 釋放寫阻塞隊(duì)列,同時(shí)elem都直接釋放(數(shù)據(jù)丟棄臣缀,無(wú)法被讀)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
//釋放所有被阻塞的協(xié)程
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
https://juejin.cn/post/7036279988131201054#heading-4
https://juejin.cn/post/6875325172249788429