channel
channel的實(shí)現(xiàn)相對(duì)map簡單了不少予跌,通過鎖mutex來保證并發(fā)安全榆纽,同時(shí)只提供讀寫和關(guān)閉操作,channel支持有/無緩沖區(qū)芬探,對(duì)于有緩沖區(qū)的channel神得,緩沖區(qū)大小也是在初始化的時(shí)候確定了,后續(xù)不會(huì)有擴(kuò)容操作偷仿,一起來看看源碼吧
源碼
初始化
// channel結(jié)構(gòu)體
type hchan struct {
// 目前緩沖區(qū)已使用數(shù)量哩簿,對(duì)于無緩沖區(qū)的channel,qcount=0
qcount uint // total data in the queue
// 緩沖區(qū)大小 make(chan int, 3)其中3就是申請(qǐng)的緩沖區(qū)大小
dataqsiz uint // size of the circular queue
// 指向緩沖區(qū)的指針炎疆,用于讀/寫緩沖區(qū)
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel的元素size
elemsize uint16
// channel是否已關(guān)閉卡骂,還記得close(ch)吧
closed uint32
// channel的元素type
elemtype *_type // element type
// 寫buf索引,通過buf + sendx可以算出寫入位置
sendx uint // send index
// 讀buf索引形入,通過buf + recvx可以算出取出位置
recvx uint // receive index
// 讀channel隊(duì)列(當(dāng)緩存區(qū)已寫滿或無緩沖區(qū)的時(shí)候)全跨,讀動(dòng)作會(huì)進(jìn)行排隊(duì)
recvq waitq // list of recv waiters
// 寫channel隊(duì)列,同上亿遂,寫動(dòng)作也會(huì)進(jìn)行排隊(duì)
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
// 并發(fā)鎖
lock mutex
}
// 排隊(duì)隊(duì)列結(jié)構(gòu)
// 這里面包含了一個(gè)頭指針和一個(gè)尾指針
// go通過雙向鏈表實(shí)現(xiàn)讀寫channel隊(duì)列浓若,后面源碼的時(shí)候會(huì)看到
// 至于sudog這里不做詳細(xì)闡述,你可以認(rèn)為是g在某個(gè)事件等待隊(duì)列中的一個(gè)等待實(shí)體
// 因?yàn)橐粋€(gè)g可能需要等待多個(gè)事件蛇数,所以需要sudog作為委托去等待挪钓,一旦sudog被喚醒,它就會(huì)通知g
type waitq struct {
first *sudog
last *sudog
}
// channel初始化
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 控制channel elem的size耳舅,你可以試試構(gòu)造一個(gè)size很大的struct碌上,然后make對(duì)應(yīng)的channel,就會(huì)報(bào)錯(cuò)
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 控制緩存區(qū)大小
// 不能小于0
// 計(jì)算分配字節(jié)數(shù)的時(shí)候不能溢出
// 不能超過可分配內(nèi)存數(shù)
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
// 聲明一個(gè)hchan指針
var c *hchan
// 這里分三種情況進(jìn)行初始化
switch {
// 第一種無緩存區(qū)
case mem == 0:
// Queue or element size is zero.
// 不用分配buf浦徊,只分配hchan
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// 用于競態(tài)檢測馏予,本次源碼不闡述,感興趣自己去翻閱
c.buf = c.raceaddr()
// 第二種有緩沖區(qū)且channel元素不包含指針類型
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 直接申請(qǐng)一整塊內(nèi)存盔性,一個(gè)是方便gc霞丧,另外一個(gè)是減少內(nèi)存碎片
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 第三種有緩沖區(qū)且channel元素包含指針類型
default:
// Elements contain pointers.
// 分開申請(qǐng)hchan和buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化工作
// 元素size、元素類型冕香、緩沖區(qū)大小和鎖
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
寫channel
寫channel的核心函數(shù)是chansend蛹尝,同時(shí)有兩個(gè)對(duì)chansend包裝的函數(shù),分別是chansend1和selectnbsend悉尾,對(duì)應(yīng)阻塞和非阻塞模式突那,阻塞模式我們都知道,比如ch <- x构眯,就可能發(fā)生阻塞愕难,而非阻塞模式就是通過select...case來調(diào)用,這里集中看下chansend的源碼
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果channel是nil
if c == nil {
// 如果非阻塞模式,返回false
if !block {
return false
}
// 如果是阻塞模式务漩,就讓當(dāng)前g睡眠等待即掛起
// 至于gopark是怎么做的,后面會(huì)有單獨(dú)文章來聊聊g調(diào)度它褪,這里知道是做什么的就行
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
// 這里是對(duì)非阻塞模式的一個(gè)快速判斷饵骨,可以不用加鎖,減少鎖的頻率茫打,提升性能
// 如果非阻塞模式 + channel沒有關(guān)閉 + channel緩存區(qū)已經(jīng)滿了
// 這個(gè)時(shí)候肯定是寫不進(jìn)去了居触,返回false
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 加鎖
lock(&c.lock)
// 如果channle關(guān)閉了
// 還記得嗎,對(duì)一個(gè)closed的channel進(jìn)行寫入操作老赤,是會(huì)引發(fā)panic的轮洋,即使是select語句也不例外
if c.closed != 0 {
// 解鎖
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果channel沒關(guān)閉并且讀等待隊(duì)列中有等待的sg,直接取出并將ep傳遞過去
// dequeue是從雙向鏈中取頭一個(gè)sg抬旺,尾部排隊(duì)弊予,頭部取出,嚴(yán)格FIFO开财,保證recvq的順序性
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// send就是將要寫入的ep傳遞給取出的sg汉柒,同時(shí)會(huì)調(diào)用unlock解鎖
// 這里只是將sg喚醒,具體后續(xù)是sg對(duì)應(yīng)的g的動(dòng)作了
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果channel沒關(guān)閉责鳍,沒有等待讀的sg碾褂,且緩沖區(qū)沒空,就寫到緩沖區(qū)中
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// chanbuf就是通過c + sendx來找到寫入位置历葛,sendx下標(biāo)是從0開始的
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 傳遞ep
typedmemmove(c.elemtype, qp, ep)
// 索引+1
c.sendx++
// 這里可以指導(dǎo)正塌,buf邏輯上是一個(gè)環(huán)形的結(jié)構(gòu)體,當(dāng)sendx大于總長時(shí)恤溶,就從0開始乓诽,即從頭開始
// 有點(diǎn)類似mysql的redo log結(jié)構(gòu),一個(gè)環(huán) + 寫入標(biāo)志 + 讀取(擦除)標(biāo)志
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)元素?cái)?shù)量+1
c.qcount++
// 解鎖
unlock(&c.lock)
return true
}
// 如果channel沒關(guān)閉宏娄,沒有等待讀的sg问裕,沒有緩沖區(qū)或者緩沖區(qū)滿了
// 如果是非阻塞模式,解鎖孵坚,返回false即可
if !block {
unlock(&c.lock)
return false
}
// 如果是阻塞模式粮宛,不好意思,構(gòu)造本g的等待實(shí)體mysg卖宠,掛起等待
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// enqueue就是將mysq掛到sendq中
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
// gopark功能同上巍杈,將當(dāng)前的g置為等待狀態(tài)并解鎖c.lock
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 對(duì)應(yīng)喚醒后的動(dòng)作
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
讀channel
讀channel的核心函數(shù)是chanrecv,有對(duì)應(yīng)三個(gè)包裝函數(shù)扛伍,分別是chanrecv1筷畦、chanrecv2和selectnbrecv,前面兩個(gè)對(duì)應(yīng)阻塞模式,后面對(duì)應(yīng)非阻塞模式鳖宾,即配合select...case使用吼砂,chanrecv1和chanrecv2區(qū)別就是chanrecv2會(huì)多返回一個(gè)bool類型值,注意這個(gè)不可用于判斷channel是否關(guān)閉鼎文,只能用于判斷是否從channel中讀取到數(shù)據(jù)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 同chansend
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
// 無鎖模式快速判斷非阻塞模式下是否會(huì)讀channel失敗
// empty會(huì)確認(rèn)是否是無緩存區(qū)或者緩存區(qū)是空的
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
// empty無法確認(rèn)channel是否關(guān)閉
// 如果channel沒關(guān)閉渔肩,且無緩沖區(qū)或者緩沖區(qū)是空的,返回false
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
// 這里selected是false拇惋,配合select...case來看就明白了
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
// 如果channel關(guān)閉了周偎,再次通過empty函數(shù)確認(rèn)
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
// 這里會(huì)將ep指向的內(nèi)存清零,還記得嗎撑帖,讀取一個(gè)關(guān)閉的channel蓉坎,返回的是類型零值,就是這里清零的
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// 這里selected是true
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 上鎖
lock(&c.lock)
// 如果channel關(guān)閉了并且沒有緩沖區(qū)或者緩沖區(qū)是空的
// 同樣返回類型零值胡嘿,selected是true
// 這里說明一下蛉艾,就是即使channel關(guān)閉了,如果buf中還有數(shù)據(jù)沒讀完衷敌,是可以繼續(xù)讀的
// 這也是為什么還要判斷c.qcount=0的原因
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果channel沒關(guān)閉并且有正在等待寫的sg
// 直接將sg要寫的數(shù)據(jù)傳遞給ep
// dequeue方法上面說過了
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// recv方法與send方法動(dòng)作幾乎一樣
// 將sg要寫的數(shù)據(jù)傳遞給ep
// 喚醒sg繼續(xù)做后面的事
// 有一個(gè)不同的地方伺通,就是對(duì)于send,如果有等待讀的sg逢享,那么要么無緩沖區(qū)罐监,要么是空的緩沖區(qū)
// 這個(gè)時(shí)候是不需要改變sendx和recvx的,因?yàn)閎uf是空的環(huán)瞒爬,只要sendx和recvx的相對(duì)位置不變弓柱,在哪里無所謂
// 但是對(duì)于recv就不同了,如果有等待寫的sg侧但,那么要么無緩沖區(qū)矢空,要么緩沖區(qū)滿了,這個(gè)時(shí)候recvx=sendx
// 如果無緩沖區(qū)禀横,也不用改變sendx和recvx
// 如果有緩沖區(qū)屁药,那么需要將緩沖區(qū)對(duì)應(yīng)recvx位置的數(shù)據(jù)傳遞給ep
// 然后將sg的的數(shù)據(jù)傳遞給recvx對(duì)應(yīng)的內(nèi)存,然后recvx和sendx都需要加1柏锄,此時(shí)從sg讀取到的數(shù)據(jù)就會(huì)在buf環(huán)的最后
// 這樣做才能保證channel的讀取順序性
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果channel的buf中還有數(shù)據(jù)就繼續(xù)讀取
if c.qcount > 0 {
// Receive directly from queue
// 通過c + recvx找到對(duì)應(yīng)的讀取位置
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 傳遞給ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 擦除qp
typedmemclr(c.elemtype, qp)
// 讀索引加1
c.recvx++
// 回環(huán)
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// buf數(shù)據(jù)量減1
c.qcount--
// 解鎖
unlock(&c.lock)
// 返回
return true, true
}
// 如果channel沒有關(guān)閉且沒有等待寫的sg且無緩沖區(qū)或緩沖區(qū)是空的
// 非阻塞模式下返回false
if !block {
unlock(&c.lock)
return false, false
}
// 阻塞模式下回將當(dāng)前g掛起等待
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 將mysg掛到recvq中
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
關(guān)閉channel
// 關(guān)閉channel做幾件事
// closed置為1
// 收集讀等待隊(duì)列recvq的所有sg酿箭,每個(gè)sg的elem都設(shè)為類型零值
// 收集寫等待隊(duì)列sendq的所有sg,每個(gè)sg的elem都設(shè)為nil
// 喚醒所有收集的sg
func closechan(c *hchan) {
// close一個(gè)nil的channel是會(huì)panic的
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 重復(fù)close一個(gè)channel也是會(huì)panic的
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 設(shè)置關(guān)閉標(biāo)志
c.closed = 1
var glist gList
// release all readers
// 收集讀sg
for {
sg := c.recvq.dequeue()
// 空隊(duì)列趾娃,跳出循環(huán)
if sg == nil {
break
}
// 清零elem
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 收集寫sg
for {
sg := c.sendq.dequeue()
// 空隊(duì)列缭嫡,跳出循環(huán)
if sg == nil {
break
}
// elem置為nil
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 喚醒所有收集到的sg
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
總結(jié)
拋開g的調(diào)度那些跟channel無關(guān)的代碼,channel的實(shí)現(xiàn)還是挺簡單的抬闷,通過兩個(gè)等待FIFO隊(duì)列妇蛀、一個(gè)環(huán)形buf和一把鎖實(shí)現(xiàn)了通道并發(fā)安全的通信,不過細(xì)細(xì)琢磨還是有點(diǎn)疑問的,比如chansend和chanrecv針對(duì)非阻塞模式的無鎖快速試錯(cuò)部分评架,不加鎖是否有可能造成詭異的結(jié)果眷茁,為什么只有這部分可以無鎖,無鎖的范圍還能擴(kuò)大嗎纵诞?今天腦殼疼俱萍,就不細(xì)想了囱稽,留給讀者吧