簡介
熟悉Go的人都知道差购,它提倡著不要通過共享內(nèi)存來通訊忍抽,而要通過通訊來共享內(nèi)存。Go提供了一種獨特的并發(fā)同步技術(shù)來實現(xiàn)通過通訊來共享內(nèi)存插掂,此技術(shù)即為通道灰瞻。我們可以把一個通道看作是在一個程序內(nèi)部的一個FIFO數(shù)據(jù)隊列。 一些協(xié)程可以向此通道發(fā)送數(shù)據(jù)辅甥,另外一些協(xié)程可以從此通道接收數(shù)據(jù)酝润。
Example
介紹一下簡單的用法:
func main() {
c := make(chan int)
go func() {
c <- 1
}()
t := <-c
fmt.Println(t)
}
幾個注意點(后面會一一驗證):
- 向一個nil通道中發(fā)送一個值,將會永久阻塞肆氓。
- 向一個已關(guān)閉的通道中發(fā)送一個值袍祖,將會導(dǎo)致panic。
- 可以從關(guān)閉的通道中讀取值谢揪,緩沖區(qū)為空時蕉陋,讀取的是通道類型的零值。
- 重復(fù)關(guān)閉一個通道也會導(dǎo)致panic拨扶。
- 通道的元素值的傳遞都是復(fù)制過程于样,且至少被復(fù)制過一次以上屠橄。(直接復(fù)制到receiver中經(jīng)過一次復(fù)制,通過緩沖區(qū)的話則經(jīng)歷了兩次復(fù)制)
channel有兩種類型,Unbuffered channels與Buffered channels
Unbuffered channels
c:=make(chan int)
它是一個阻塞型channel蜀变,必須要receiver也準備好的情況下间唉,sender才能夠?qū)⑾⑼哆f到c中去啥繁「椋可以結(jié)合下圖進行思考一波:
Buffered channels
c:=make(chan int,1)
在buf未滿之前,它是一個非阻塞型channel印蓖,sender可以將符合channel類型的值投遞到channel中去辽慕,它內(nèi)部會自己維護一個隊列。當buf滿了之后赦肃,sender會阻塞溅蛉。可以結(jié)合下圖進行思考一波:
幾種應(yīng)用模式
for-range
func forRange() {
c := make(chan int, 5)
for i := 1; i <= 5; i++ {
c <- i
}
close(c)
for v := range c {
fmt.Println(v)
}
}
- 在進行for-range一個通道時他宛,該循環(huán)將源源不斷的從通道中獲取數(shù)據(jù)船侧,直到此通道關(guān)閉并且它的緩沖隊列中為空為止。
- 這里的通道一定不能是單向發(fā)送通道(
chan <- int
)厅各。 - 當for-range一個空通道時镜撩,將會永久阻塞
select-case
func selectCase() {
c := make(chan int, 1)
c <- 1
close(c)
select {
case <-c:
fmt.Println("xxxx")
default:
fmt.Println("aaaa")
}
}
每個
case
關(guān)鍵字后必須跟隨一個通道接收數(shù)據(jù)操作或者一個通道發(fā)送數(shù)據(jù)操作。所有的非阻塞
case
操作中將有一個被隨機選擇執(zhí)行(而不是按照從上到下的順序)队塘,然后執(zhí)行此操作對應(yīng)的case
分支代碼塊琐鲁。在所有的
case
操作均為阻塞的情況下卫旱,如果default
分支存在,則default
分支代碼塊將得到執(zhí)行围段; 否則,當前協(xié)程將被推入所有阻塞操作中相關(guān)的通道的發(fā)送數(shù)據(jù)協(xié)程隊列或者接收數(shù)據(jù)協(xié)程隊列中投放,并進入阻塞狀態(tài)奈泪。
源碼分析
首先了解下channel是怎么創(chuàng)建的?
func main() {
c := make(chan int, 1)//line: 9
close(c) //line: 10
}
通過go tool compile -N -l -S main.go
輸出其匯編代碼灸芳,截取一小段觀察一下:
0x0024 00036 (main.go:9) LEAQ type.chan int(SB), AX // 將&chantype(元素類型是int)放到AX寄存器中
0x002b 00043 (main.go:9) PCDATA $2, $0
0x002b 00043 (main.go:9) MOVQ AX, (SP) // 也就是將&chantype放到SP(0)位置
0x002f 00047 (main.go:9) MOVQ $1, 8(SP)// 將1放到SP(8)位置
0x0038 00056 (main.go:9) CALL runtime.makechan(SB)// makechan(SP0,SP8)
0x003d 00061 (main.go:9) PCDATA $2, $1
0x003d 00061 (main.go:9) MOVQ 16(SP), AX
0x0042 00066 (main.go:9) MOVQ AX, "".c+24(SP)
0x0047 00071 (main.go:10) PCDATA $2, $0
0x0047 00071 (main.go:10) MOVQ AX, (SP)
0x004b 00075 (main.go:10) CALL runtime.closechan(SB)
在上面的流程的關(guān)鍵部分加上了注釋涝桅,也就是說咱們的make(chan int, 1)
最終調(diào)用到了runtime.makechan
這個方法。在進入分析之前先看看channel的結(jié)構(gòu):
type hchan struct {
qcount uint // 隊列中實際有多少個元素
dataqsiz uint // channel的總長度(緩沖區(qū)的總長度)
buf unsafe.Pointer // 指向底層元素的指針
elemsize uint16 // 元素類型的size
closed uint32 // 是否關(guān)閉烙样,0:未關(guān)閉冯遂, 1:已關(guān)閉
elemtype *_type // 元素類型
sendx uint // 發(fā)送位置索引
recvx uint // 接收位置索引
recvq waitq // 接收者隊列,一個雙向鏈表
sendq waitq // 發(fā)送者隊列谒获,一個雙向鏈表
lock mutex // 鎖蛤肌,并發(fā)發(fā)送的時候需要上鎖
}
咱們可以將hchan
中的buf
簡單的看成一個數(shù)組緩沖區(qū),qcount
是數(shù)組中實際存儲元素的數(shù)量批狱,dataqsiz
是數(shù)組的容量裸准,elemtype
是數(shù)組元素的類型。sendx
和recvx
分別是發(fā)送索引位置和接收索引位置赔硫,每次操作都會自增1炒俱,當sendx
和recv
等于dataqsiz
時,會重置為零爪膊。recvq
和sendq
都是雙向鏈表权悟,里面維護著等待接收和等待發(fā)送的goroutine
。當多個gouroutine
并發(fā)操作同一個channel
時推盛,會使用lock
進行控制峦阁。
創(chuàng)建流程
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 緩沖區(qū)中元素類型的尺寸不能超過16k
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判斷是否位數(shù)對齊,
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 計算緩沖區(qū)的總長度小槐,并判斷是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// channel長度或者元素類型尺寸為0時拇派,也就是緩沖區(qū)長度為0時,只用分配hchan所占用的內(nèi)存空間凿跳。
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 元素類型不是指針類型件豌,則將hchan和buf一次性分配出來
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 緩沖區(qū)buf的指針位置在c+hchanSize(hchanSize補齊為8的倍數(shù))
c.buf = add(unsafe.Pointer(c), hchanSize,hchanSize補齊為8的倍數(shù))
default:
// 元素類型是指針類型,hchan和緩沖區(qū)單獨分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素的尺寸
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
創(chuàng)建channel時共分為三種情況:
- 緩沖區(qū)大小為0的情況下控嗜,只用給
hchan
分配內(nèi)存即可茧彤。 - 當元素類型不為指針時,可以考慮分配一段連續(xù)的內(nèi)存疆栏,這樣方便垃圾回收曾掂。
- 當元素類型為指針時惫谤,需要給
hchan
和buf
分別開辟空間。
最終都調(diào)用到mallocgc
方法進行內(nèi)存的分配珠洗,分配過程這里不做過多的描述溜歪,后面會考慮寫一篇文章介紹一下分配的相關(guān)流程。
發(fā)送流程
func main() {
c := make(chan int, 1)
c <- 1 // line:9
close(c) //line:10
}
通過go tool compile -N -l -S main.go
輸出其匯編代碼许蓖,截取一小段觀察一下:
0x0057 00087 (main.go:9) CALL runtime.chansend1(SB)
0x005c 00092 (main.go:10) PCDATA $2, $1
0x005c 00092 (main.go:10) PCDATA $0, $0
0x005c 00092 (main.go:10) MOVQ "".c+24(SP), AX
0x0061 00097 (main.go:10) PCDATA $2, $0
0x0061 00097 (main.go:10) MOVQ AX, (SP)
0x0065 00101 (main.go:10) CALL runtime.closechan(SB)
從上面的匯編代碼可以清晰的看到蝴猪,c<-1
就是一個簡單的語法糖,實際上底層調(diào)用的是runtime.chansend1
方法膊爪,咱們跟蹤一下這個方法:
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
從上面的注釋
entry point for c <- x from compiled code
也可以看出來自阱,這段代碼是c <- x
編譯后的一個切入點。接著看下一個調(diào)用棧:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// 從這里可以觀察到米酬,向一個nil的channel中發(fā)送一個值將會導(dǎo)致永久阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// 通過chansend1方法調(diào)用是不會進入這個條件語句的沛豌,在非阻塞并且通道未關(guān)閉
// 的情況下,滿足 ①該channel是unbufferedChannel且接收隊列為空 ②該channel是bufferedChannel
// 且緩沖區(qū)已滿赃额。 這兩個條件中任意一個條件就可以快速返回false加派。表示投遞失敗
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 進行上鎖,防止出現(xiàn)并發(fā)問題
lock(&c.lock)
if c.closed != 0 {
// 通道已經(jīng)關(guān)閉爬早,解鎖的同時panic,這也證實了向一個已經(jīng)關(guān)閉的通道發(fā)送值會導(dǎo)致panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 從等待接收的隊列鏈表中取出一個接收者哼丈,進行值的拷貝
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// 當該通道時bufferedChannel時,緩沖區(qū)還未滿的情況下筛严,從緩沖區(qū)中取出一個內(nèi)存塊
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 將發(fā)送值拷貝到上面取出的內(nèi)存塊上面去
typedmemmove(c.elemtype, qp, ep)
// 發(fā)送索引自增1
c.sendx++
if c.sendx == c.dataqsiz {
// 復(fù)用緩沖區(qū)醉旦,當索引位到緩沖區(qū)最后一位時,置位0
c.sendx = 0
}
// 緩沖區(qū)中存儲的元素自增1
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 獲取當前goroutine
gp := getg()
// 從當前goroutine所在的p上獲取一個sudog結(jié)構(gòu)體
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 將要發(fā)送的值賦值給sudog的elem字段桨啃,后面chanrecv會用到
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 將mysg插入到發(fā)送隊列的尾部
c.sendq.enqueue(mysg)
// 進行channel的解鎖车胡,并且將當前goroutine置為waiting狀態(tài)。
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// 這里進行闭振活一下匈棘,防止接受者還沒有拷貝過去,這個值就已經(jīng)被gc給回收了
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// 將當前使用的sudog放到空閑池中析命,供下一次使用
releaseSudog(mysg)
return true
}
從上面的流程中主卫,咱們也驗證了使用時需要注意的兩個點:
- 向一個空的
channel
中發(fā)送值將會導(dǎo)致永遠阻塞。 - 向一個已關(guān)閉的
channel
中發(fā)送值將會導(dǎo)致panic鹃愤。
咱們可以將上述流程簡單的總結(jié)為三個段:
接收隊列不為空
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 常用的鏈表操作手段簇搅,取出隊列中的第一個元素
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudog)
}
....
return sgp
}
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
....
if sg.elem != nil {
// 將sp,也就是發(fā)送值拷貝到sg.elem字段中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
....
// 喚醒當前接收的goroutine
goready(gp, skip+1)
}
第一步:從接收隊列中取出第一個sudog元素。
第二步:將發(fā)送的值拷貝到sudog的elem字段上软吐。
第三步:喚醒與當前sudog綁定的處于waiting狀態(tài)的goroutine瘩将。
這個情況下,發(fā)送值只經(jīng)過了一次拷貝,就被接收者消費掉了姿现。
緩沖區(qū)還有可用的位置
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
這個情況下肠仪,發(fā)送值會先進行一次拷貝到緩沖區(qū)中。然后在有接收者的情況下备典,會從緩沖去再拷貝一次异旧,拷貝到接收者指定的內(nèi)存地址上。在這個過程中提佣,會先通過qp := chanbuf(c, c.sendx)
方法取出緩沖區(qū)下一個可用的內(nèi)存塊泽艘,然后通過typedmemmove(c.elemtype, qp, ep)
方法將值拷貝到相應(yīng)的內(nèi)存塊中去。最后增加相應(yīng)的索引位和緩沖區(qū)元素的個數(shù)镐依。這里在滿足條件的情況下會對索引位進行重置,進入下一個輪回天试。
阻塞發(fā)送
...
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
...
在不滿足上面兩個條件的情況下槐壳,當前goroutine會保存部分信息到channel的發(fā)送者隊列中,并且通過調(diào)用goparkunlock
阻塞當前goroutine喜每,直到有接收者消費掉了保存該goroutine的sudog
务唐,并調(diào)用goready
方法,才會使當前陷入waiting
狀態(tài)的goroutine被重新喚醒带兜。
接收流程
func main() {
c := make(chan int, 1)
close(c)
<-c //line: 10
}
通過go tool compile -N -l -S main.go
輸出其匯編代碼枫笛,截取一小段觀察一下:
0x0055 00085 (main.go:10) MOVQ AX, (SP)
0x0059 00089 (main.go:10) MOVQ $0, 8(SP)
0x0062 00098 (main.go:10) CALL runtime.chanrecv1(SB)
0x0067 00103 (main.go:11) MOVQ 32(SP), BP
0x006c 00108 (main.go:11) ADDQ $40, SP
實際上在第10行會觸發(fā)runtime.chanrecv1
方法,和上面的發(fā)送一樣刚照,也是個語法糖:
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
這個方法有兩個參數(shù)刑巧,一個是hchan
指針,另一個是一個地址值无畔,后面會將從通道中取出來的值復(fù)制到該地址值中去啊楚。接下來看一下核心的chanrecv
方法,for-range包括select-case最終都會調(diào)用到這個方法浑彰。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
// 從一個nil通道中讀取一個值也會被永遠的阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 通過chanrecv1調(diào)用是不會進入這個代碼塊的恭理,在非阻塞的情況下,如果通道長度為0并且發(fā)送隊列為空郭变,
// 或者通道長度大于0并且緩沖區(qū)中沒有元素并且通道未關(guān)閉的情況下颜价,進行快速返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 進行上鎖
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
// 通道已經(jīng)關(guān)閉,并且緩沖區(qū)已經(jīng)空了诉濒,返回該元素類型的零值
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// 在發(fā)送隊列不為空的情況下周伦,取出一個sender,如果是unbufferedChannel循诉,則直接從sender中拷貝
// 一個值到接收者横辆;如果是一個bufferedChannel,從緩沖隊列的recvx處取一個值,并且將sender中的
// 值拷貝到recvx索引的位置
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
// 直接從緩沖區(qū)中取出對應(yīng)recvx索引位置的值
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 拷貝qp中的值到指定的地址ep中
typedmemmove(c.elemtype, ep, qp)
}
// 將qp中的元素值清空成零值
typedmemclr(c.elemtype, qp)
// 接收索引進行自增
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 緩沖區(qū)元素長度遞減
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 發(fā)送隊列為空,并且緩存區(qū)中也沒有值狈蚤,后續(xù)流程將會阻塞當前goroutine
gp := getg()
// 從當前g綁定的p中獲取一個sudog(這一塊是從一個緩存隊列中獲取)
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 將mysg放到接收隊列的尾部
c.recvq.enqueue(mysg)
// 將當前goroutine狀態(tài)變?yōu)閣aiting,并且解開channel的鎖
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
// 釋放sudog(重新丟到p對應(yīng)的緩存隊列中)
releaseSudog(mysg)
return true, !closed
}
接收流程和發(fā)送流程類似困肩,可大概歸結(jié)為四個段:
通道關(guān)閉且緩沖區(qū)為0
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
上述代碼將直接返回一個對應(yīng)元素類型的零值,分配對應(yīng)元素零值的代碼typedmemclr(c.elemtype, ep)
脆侮,感興趣的可以自己追蹤一下锌畸。
發(fā)送隊列不為空
這一塊可能和發(fā)送流程有點區(qū)別:
if sg := c.sendq.dequeue(); sg != nil {
// 能進這里代表要么緩沖隊列滿了,要么該通道是一個無緩沖通道
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 代表這個通道是一個無緩沖通道
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 直接從sender中拷貝值
recvDirect(c.elemtype, sg, ep)
}
} else {
// 從緩沖區(qū)中直接取出recvx索引位置的值
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// 拷貝緩沖區(qū)中的值到指定的ep地址上
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 這里多了一步流程靖避,會將sender中的值存放到剛剛recvx位置處
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒當前sender的goroutine
goready(gp, skip+1)
}
這里會分兩種情況進行考慮:
- 通道是unbuffered channel潭枣,則直接拷貝從發(fā)送隊列中取出來的值。
- 通道是buffered channel幻捏,則拷貝從緩沖區(qū)中取出響應(yīng)的值盆犁,并且需要將發(fā)送隊列中取出來的值拷貝到對應(yīng)緩沖區(qū)的位置上。
在緩沖區(qū)是滿的情況下篡九,sendx和recvx指向同一個位置谐岁。例如上圖:咱們?nèi)〕鰎ecvx為2處的元素,然后會將sudog中的值拷貝到2位置處榛臼,同時sendx和recvx都指向3位置伊佃。
- 最終喚醒與當前sender sudog綁定的goroutine。
緩沖區(qū)不為空
if c.qcount > 0 {
// 讀取緩沖區(qū)recvx索引處的值
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 將上述的值拷貝到指定的地址ep中
typedmemmove(c.elemtype, ep, qp)
}
// 將recvx處的值置位零值
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
直接從緩沖區(qū)中讀取recvx索引位置中的值沛善,將其拷貝大指定的指針中航揉,然后將對應(yīng)recvx處置位零值。
如上圖所示金刁,取出索引位置為3的元素后帅涂,將其置位零值,發(fā)送者下次填充buf時就可以從index=3的位置開始填充胀葱。
阻塞接收
流程和上述的阻塞發(fā)送流程一致漠秋,不做過多介紹。
關(guān)閉
簡單的看一段代碼
func main() {
c := make(chan int)// line:9
close(c) //line:10
}
通過go tool compile -N -l -S main.go
輸出其匯編代碼抵屿,截取一小段觀察一下:
0x0038 00056 (main.go:9) CALL runtime.makechan(SB)
0x003d 00061 (main.go:9) PCDATA $2, $1
0x003d 00061 (main.go:9) MOVQ 16(SP), AX //將makechan返回的結(jié)果放到AX寄存器
0x0042 00066 (main.go:9) MOVQ AX, "".c+24(SP)
0x0047 00071 (main.go:10) PCDATA $2, $0
0x0047 00071 (main.go:10) MOVQ AX, (SP) //將AX寄存器中的值復(fù)制到SP(0)位置
0x004b 00075 (main.go:10) CALL runtime.closechan(SB) //調(diào)用runtime.closechan方法
從上面的匯編代碼可以看出來庆锦,通道的關(guān)閉最終調(diào)用到了runtime.closechan
這個方法。
func closechan(c *hchan) {
// 關(guān)閉一個空的channel將會導(dǎo)致panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
// 關(guān)閉一個已關(guān)閉的channel也會導(dǎo)致panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 將closed置位1轧葛,表示當前通道已關(guān)閉
c.closed = 1
var glist gList
// 取出所有接收隊列中的元素搂抒,將其加入到一個單向鏈表中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 取出所有發(fā)送隊列中的元素,將其加入到一個單向鏈表中
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 喚醒阻塞的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
關(guān)閉通道這一塊理解起來不太難尿扯,最開始對空channel和已關(guān)閉的channel做了panic處理求晶,后面進行資源的釋放。因為處于發(fā)送隊列和接收隊列的goroutine都是阻塞狀態(tài)的衷笋,咱們在關(guān)閉這個通道時必須得將這些goroutine都喚醒芳杏,防止goroutine泄露。
select-go流程
這里主要是分析一下select-go接收流程,發(fā)送流程類似爵赵。
單case分支(包含一個default)
func main() {
c := make(chan int)
close(c)
select {
case <-c://line: 11
default:
}
}
使用上面的編譯指令生成匯編代碼如下(截取部分):
0x0050 00080 (main.go:11) MOVQ $0, (SP)
0x0058 00088 (main.go:11) PCDATA $2, $1
0x0058 00088 (main.go:11) PCDATA $0, $0
0x0058 00088 (main.go:11) MOVQ "".c+24(SP), AX
0x005d 00093 (main.go:11) PCDATA $2, $0
0x005d 00093 (main.go:11) MOVQ AX, 8(SP)
0x0062 00098 (main.go:11) CALL runtime.selectnbrecv(SB)
跟蹤一下調(diào)用鏈:
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
// 這里不會進行阻塞吝秕,因為這里默認有個default分支,c中無法取出值的情況下默認執(zhí)行default分支即可
// 具體實現(xiàn)流程參考上面的接收流程
selected, _ = chanrecv(c, elem, false)
return
}
單case分支(不包含default)
func main() {
c := make(chan int)
close(c)
select {
case <-c: //line:11
}
}
使用上面的編譯指令生成匯編代碼如下(截取部分):
0x005a 00090 (main.go:11) MOVQ "".c+24(SP), AX
0x005f 00095 (main.go:11) PCDATA $2, $0
0x005f 00095 (main.go:11) MOVQ AX, (SP)
0x0063 00099 (main.go:11) MOVQ $0, 8(SP)
0x006c 00108 (main.go:11) CALL runtime.chanrecv1(SB)
跟蹤一下調(diào)用鏈:
func chanrecv1(c *hchan, elem unsafe.Pointer) {
// 在從通道無法取出值的情況下空幻,會阻塞當前goroutine烁峭,等待被喚醒
// 具體實現(xiàn)流程參考上面的接收流程
chanrecv(c, elem, true)
}
多case分支
func main() {
c := make(chan int)
a := make(chan int)
close(c)
close(a)
select { //line: 13
case <-c:
case <-a:
default:
}
}
使用上面的編譯指令生成匯編代碼如下(截取部分):
0x011d 00285 (main.go:13) MOVQ AX, (SP)
0x0121 00289 (main.go:13) PCDATA $2, $1
0x0121 00289 (main.go:13) PCDATA $0, $0
0x0121 00289 (main.go:13) MOVQ ""..autotmp_7+88(SP), AX
0x0126 00294 (main.go:13) PCDATA $2, $0
0x0126 00294 (main.go:13) MOVQ AX, 8(SP)
0x012b 00299 (main.go:13) MOVQ $3, 16(SP) // 表示case長度為3(包括default分支)
0x0134 00308 (main.go:13) CALL runtime.selectgo(SB)
跟蹤一下調(diào)用鏈(截取關(guān)鍵性流程代碼,省略了計算pollorder
和lockorder
的部分):
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// 獲取所有的case
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
for i := range scases {
// 遍歷所以的case秕铛,將channel為空并且類型不是default的case置位一個空結(jié)構(gòu)體scase
cas := &scases[i]
if cas.c == nil && cas.kind != caseDefault {
*cas = scase{}
}
}
// 在進行select的時候會鎖住所有關(guān)聯(lián)的channel
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
sg = c.sendq.dequeue()
if sg != nil {
// 發(fā)送隊列不為空约郁,取隊列的head,進行值的拷貝
goto recv
}
if c.qcount > 0 {
// 從緩沖區(qū)去獲取值
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
case caseSend:
if c.closed != 0 {
// 這里會panic
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
// 接收隊列不為空但两,取隊列head鬓梅,進行值的拷貝
goto send
}
if c.qcount < c.dataqsiz {
// 緩沖區(qū)還有可用位置,拷貝到緩沖區(qū)
goto bufsend
}
case caseDefault:
// 這個分支不會break-for循環(huán)谨湘,所以優(yōu)先級比較低一些
dfli = casi
dfl = cas
}
}
if dfl != nil {
// default被命中了己肮,直接返回
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// pass 2 - enqueue on all chans
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
// 這里會將當前sudog放到每個case對應(yīng)的channel隊列中
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
case caseSend:
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
gp.param = nil
// 阻塞當前goroutine,并給所有case對應(yīng)的channel解鎖
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
// 被喚醒后立即鎖上select對應(yīng)的所有的channel
sellock(scases, lockorder)
gp.selectDone = 0
// 因為喚醒的goroutine會將對應(yīng)的sudog或者nil(channel關(guān)閉)放到當前goroutine的param參數(shù)上
// 取出命中的sudog供后面進行遍歷比對
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
sglist = gp.waiting
// 將sglist鏈路中所有sudog狀態(tài)信息和元素指針清空
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
// 一個個進行比對
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sg == sglist {
// 找到命中的sudog悲关,記錄對應(yīng)的索引位
casi = int(casei)
cas = k
} else {
// 不是對應(yīng)的sudog,從對應(yīng)通道的隊列中移除
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
// 從sglist上移除
sgnext = sglist.waitlink
sglist.waitlink = nil
// 將其歸還給緩存
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
// 通道被關(guān)閉也會喚醒所有隊列中的goroutine娄柳,所以這里再跑一次上面的循環(huán)即可
// 便會直接走goto rclose或者goto sclose
goto loop
}
c = cas.c
if cas.kind == caseRecv {
recvOK = true
}
selunlock(scases, lockorder)
goto retc
// 后面部分配合發(fā)送流程和接收流程理解一下即可寓辱,和前面描述的基本一樣
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
// can send to buffer
....
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc
rclose:
// read at end of closed channel
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
retc:
return casi, recvOK
sclose:
// 向一個已關(guān)閉的channel發(fā)送值,panic
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
雖然上面的代碼開起來很長赤拒,但是邏輯十分清晰秫筏,去掉后半段的send
,recv
挎挖,close
部分这敬,核心的處理流程就不太多了。分幾步總結(jié)一下:
- 將通道為nil并且是非default類型的case設(shè)置值為
scase{}
蕉朵,可以讓后面的邏輯不對channel是否為空做判斷崔涂。 - 遍歷所有的case項,分四種情況進行判斷始衅。
- 類型是空:直接跳過冷蚂。
- 接收類型:先判斷發(fā)送隊列是否為空,不為空汛闸,結(jié)束循環(huán)蝙茶;再判斷緩沖區(qū)是否有數(shù)據(jù),有數(shù)據(jù)诸老,結(jié)束循環(huán)隆夯;最后判斷通道是否關(guān)閉,已關(guān)閉,結(jié)束循環(huán)蹄衷。
- 發(fā)送類型:先判斷通道是否關(guān)閉忧额,已關(guān)閉,則panic宦芦;再判斷接收隊列是否為空宙址,不為空,結(jié)束循環(huán)调卑;判斷緩沖區(qū)是否已經(jīng)滿了抡砂,未滿,結(jié)束循環(huán)恬涧。
- default類型:不會結(jié)束循環(huán)注益,所以優(yōu)先級最低。
- 在所有case都不滿足的情況下溯捆,當前goroutine就會進入
waiting
狀態(tài)丑搔,等待被喚醒。 - 喚醒后進行比對提揍,取出對應(yīng)的case索引即可啤月。
總結(jié)
本文對channel這一塊的知識點大概的介紹了一下,包括了使用案例以及源碼分析劳跃。其中源碼占了大半部分谎仲,可能會略顯枯燥,但是相信只要用心去看刨仑,都會get到一部分的郑诺。在咱們熟悉源碼理解原理后,將會幫助我們寫出更優(yōu)質(zhì)的代碼杉武。