非原創(chuàng)找默,搬運(yùn)工
目錄總匯
思考
- chanel的零值是什么趾访,對(duì)其發(fā)送數(shù)據(jù)會(huì)怎么樣
- channel是并發(fā)安全的嗎
- channel使用不當(dāng)會(huì)引發(fā)協(xié)程泄漏,請(qǐng)舉例
1.Channel的使用場(chǎng)景和基本用法
在GO語言中流行一句很廣的諺語
Don’t communicate by sharing memory, share memory by communicating.
--Go Proverbs by Rob Pike
直白來講就是執(zhí)行業(yè)務(wù)處理的協(xié)程不要通過共享內(nèi)存的方式通信翼抠,而是要通過channel的通信方式分享數(shù)據(jù)
- communicate by sharing memory是傳統(tǒng)的并發(fā)編程處理方式咙轩,即對(duì)臨界區(qū)加鎖
- share memory by communicating是類似CSP模型的方式,通過通信的方式阴颖,一個(gè)協(xié)程g可以把數(shù)據(jù)的“所有權(quán)”交給另一個(gè)g
也就是說channel類型可應(yīng)用于一下場(chǎng)景
- 數(shù)據(jù)交流和傳遞:類似消息中間件的生產(chǎn)者和消費(fèi)者活喊,傳遞的數(shù)據(jù)也可以是信號(hào)
- 提供并發(fā)保護(hù):類似鎖
- 任務(wù)編排:既然可以傳遞信號(hào),意味著可以根據(jù)信號(hào)來讓g按一定順序執(zhí)行量愧,這就是編排功能
channel的元語定義如下
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType
當(dāng)然這個(gè)elem也可以是一個(gè)通道
chan<- chan int
chan (<-chan int)
箭頭“<-”遵循最左原則钾菊,總是盡量和左邊的chan結(jié)合帅矗,所以需要注意配合括號(hào)使用
特別需要注意的是未初始化的chan的零值是nil(nil是chan的零值),可以后續(xù)使用make初始化
var c chan int //nil
c=make(chan int)
chan還可用于for-range
for v := range ch {
fmt.Println(v)
}
那么忽略讀取的值结缚,只是清空chan就是
for range ch {
}
2.源碼走讀
2.1.創(chuàng)建通道m(xù)akechan
后續(xù)就以該demo代碼進(jìn)行邏輯梳理
我們分別創(chuàng)建了帶緩存和不帶緩存的chan
func main() {
c := make(chan int)
c1 := make(chan int, 5)
close(c)
close(c1)
fmt.Println("vim-go")
}
對(duì)變量c定義的那一行打斷點(diǎn)
dlv debug main.go
b main.go:6
c
disass
通過查看匯編代碼我們發(fā)現(xiàn)實(shí)際上是調(diào)用runtime.makechan方法(在之前學(xué)習(xí)make方法的時(shí)候我們就知道底層是這樣調(diào)用的损晤,現(xiàn)在是驗(yàn)證)
runtime/chan.go/makechan函數(shù)的定義
func makechan(t *chantype, size int) *hchan {
}
這個(gè)方法有點(diǎn)難看懂,但是看簽名可以知道红竭,最后得到了一個(gè)hchan的結(jié)構(gòu)體(1.17和1.20沒區(qū)別)
2.2chan數(shù)據(jù)結(jié)構(gòu)
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
- qcount 記錄循環(huán)隊(duì)列的元素?cái)?shù)量尤勋,直接看成chan中元素個(gè)數(shù)就行
- datasize 循環(huán)隊(duì)列大小,就是make創(chuàng)建指定的長度
- buf 循環(huán)隊(duì)列的指針
- elemtype chan中元素類型
- elemtsize chan中單個(gè)元素大小
- sendx 發(fā)送指針send在buf中位置/索引
- recvx 接收指針recv在buf中位置/索引
- sendq和recvq 使用雙向鏈表(循環(huán)隊(duì)列)用于存儲(chǔ)等待的gorutine
- lock 使用鎖保護(hù)所有字段茵宪,所有chan是并發(fā)安全的
2.3初始化
實(shí)際上go在編譯時(shí)會(huì)根據(jù)容量大小選擇調(diào)用makechan64或者makechan最冰,但兩者邏輯基本相同(makechan64只做了size檢查,底層還是調(diào)用makechan)
makechan會(huì)根據(jù)是否有緩沖區(qū)和元素類型是否為指針來初始化成不同的runtime.hchan
func makechan(t *chantype, size int) *hchan {
// elem是channel元素稀火,這里我們是chan int
elem := t.elem
...略過一連串檢測(cè)代碼...
//math.MulUintptr是判斷申請(qǐng)的內(nèi)存空間會(huì)不會(huì)超過最大申請(qǐng)限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
......
var c *hchan
switch {
//無緩沖區(qū)情況分支暖哨,只給hchan分配一段內(nèi)存
//即不創(chuàng)建buf
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
//chan元素不是指針的情況
//給hchan和緩沖區(qū)buf分配一塊連續(xù)內(nèi)存
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//hchan數(shù)據(jù)結(jié)構(gòu)后面緊接著就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
//元素包含指針
//buf單獨(dú)分配一塊內(nèi)存
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
//locakinit不太重要,根據(jù)函數(shù)名字可以推測(cè)是鎖初始化
lockInit(&c.lock, lockRankHchan)
return c
}
func (c *hchan) raceaddr() unsafe.Pointer {
return unsafe.Pointer(&c.buf)
}
makechan無非是干了兩件數(shù):
- 參數(shù)校驗(yàn)
- 初始化hchan結(jié)構(gòu)體凰狞,其中根據(jù)元素是否為指針又對(duì)buf進(jìn)行了不同的內(nèi)存分配處理
擴(kuò)展-在線方法查看匯編代碼
ssa是編譯器經(jīng)過優(yōu)化生成的中間代碼篇裁,在網(wǎng)頁寫代碼之后直接查看,跟dlv一樣赡若,只是多一種同類型工具
2.4數(shù)據(jù)發(fā)送chansend
接下來會(huì)經(jīng)常遇到這兩個(gè)函數(shù),先有個(gè)映像:
- gopark阻塞當(dāng)前協(xié)程
- goready喚醒一個(gè)協(xié)程
查看底層匯編
chan的發(fā)送實(shí)際上是調(diào)用runtime.chansend1方法达布,是chansend的封裝,分段學(xué)習(xí)他的邏輯(同樣略過一堆參數(shù)驗(yàn)證)
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
其中參數(shù)block為true時(shí)表示發(fā)送操作是阻塞的逾冬,反之false就是想要發(fā)送是不阻塞的(遇到阻塞情況就直接返回false)
- 第一部分判斷是否為nil黍聂,前面說過chan的零值是nil,會(huì)調(diào)用gopark讓調(diào)用者永久阻塞(所以接下來的throw也永遠(yuǎn)不會(huì)執(zhí)行)
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
驗(yàn)證阻塞身腻,測(cè)試結(jié)果顯示超時(shí)
func Test_chan(t *testing.T) {
var c chan int
c <- 1
t.Log("hi")
}
- 第二部分产还,如果chan沒被關(guān)閉,但是滿了嘀趟,又要求不阻塞發(fā)送脐区,就直接返回false
if !block && c.closed == 0 && full(c) {
return false
}
- 第三部分,若對(duì)一個(gè)已經(jīng)關(guān)閉的chan發(fā)送數(shù)據(jù)直接panic
lock(&c.lock)
//驗(yàn)證通道是否關(guān)閉
//這里可以得知往已經(jīng)關(guān)閉的通道發(fā)送會(huì)引發(fā)panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
至此我們對(duì)chan的nil她按、close的情況進(jìn)行了檢查牛隅,接下里就開始發(fā)送數(shù)據(jù)
- 第四部分,如果循環(huán)隊(duì)列有等待的人就直接扔給他(實(shí)際就是把通道內(nèi)的元素拷貝到接收變量的內(nèi)存地址上去)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
以下是擴(kuò)展閱讀
關(guān)于recvq是用于存儲(chǔ)等待的gorutine的尤溜,是waitq類型,實(shí)際就是一個(gè)循環(huán)隊(duì)列/雙向鏈表
type waitq struct {
first *sudog
last *sudog
}
waitq.dequeue邏輯很簡單汗唱,就是遞歸讀取鏈表
func (q *waitq) dequeue() *sudog {
for {
//先獲取頭部元素宫莱,這個(gè)是最后要被彈出去的
//頭部都沒有就說明是個(gè)空鏈表
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
//彈出頭部后整理鏈表,更新first
//沒有下一個(gè)就意味著彈出頭部后是個(gè)空鏈表
//有就將下一個(gè)作為頭部元素
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil
}
//如果第一個(gè)不滿足條件就扔掉重新來
if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
continue
}
return sgp
}
}
循環(huán)隊(duì)列調(diào)用send方法發(fā)送數(shù)據(jù)哩罪,主要邏輯是以下部分
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//拷貝變量地址到接收端的內(nèi)存地址
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
主要干了以下兩件事情:
- 調(diào)用sendDirect將發(fā)送數(shù)據(jù)直接拷貝到 接收channel通道元素的內(nèi)存地址上去
即 x = c <-a 授霸,拷貝到x內(nèi)存地址上
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//memove是內(nèi)存拷貝
memmove(dst, src, t.size)
}
- 調(diào)用goready
//這里進(jìn)行協(xié)程調(diào)度是為了喚醒那個(gè)在等待數(shù)據(jù)的協(xié)程
func goready(gp *g, traceskip int) {
// 切換到g0的棧
systemstack(func() {
ready(gp, traceskip, true)
})
}
該函數(shù)主要功能就是切換的g0棧執(zhí)行ready方法巡验,核心方法是gorutine狀態(tài)切換到runable,放入隊(duì)列等待P調(diào)度碘耳,這個(gè)在之前學(xué)習(xí)GMP的時(shí)候已經(jīng)很熟悉了显设,這就是喚醒協(xié)程(這里是數(shù)據(jù)接收方)的方法(注意是放入runnext,并沒有立即執(zhí)行辛辨,要等待P調(diào)度)
至此發(fā)送數(shù)據(jù)的第一種情況分析完畢:若有等待協(xié)程則直接發(fā)送捕捂,具體發(fā)送過程是將元素拷貝給接收變量內(nèi)存地址,喚醒數(shù)據(jù)接收方進(jìn)入?yún)f(xié)程調(diào)度- 第五部分斗搞,若沒有等待協(xié)程指攒,且buf沒滿,數(shù)據(jù)放入buf僻焚,然后退出
//datasize緩沖區(qū)大小,qcount為放入元素?cái)?shù)量
//進(jìn)入該分支時(shí)此時(shí)緩沖區(qū)未滿允悦,且接收端沒人等
if c.qcount < c.dataqsiz {
//計(jì)算出下一個(gè)存儲(chǔ)數(shù)據(jù)位置,sendx是發(fā)送指針位置
qp := chanbuf(c, c.sendx)
//將剛進(jìn)入channel的元素發(fā)送到計(jì)算出的位置
typedmemmove(c.elemtype, qp, ep)
//發(fā)送指針自增
c.sendx++
// 緩沖區(qū)滿了虑啤,
//重置sendx
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
qcount<datasize即buf中還有剩余空間隙弛,利用chanbuf計(jì)算出下一個(gè)存儲(chǔ)數(shù)據(jù)的位置,再利用typedmemove將發(fā)送數(shù)據(jù)拷貝到計(jì)算出的位置狞山,發(fā)送指針sendx+1,通道元素?cái)?shù)量qcount+1
- 第6部分全闷,也就是第三種發(fā)送情況,走到這里說明是條件都不滿足:
1.要么沒有緩沖區(qū)铣墨,也沒人消費(fèi)
2.要么緩沖區(qū)buf大小不夠室埋,也沒人消費(fèi)
需要阻塞當(dāng)前協(xié)程
if !block {
unlock(&c.lock)
return false
}
//func getg() *g嘗試獲取發(fā)送數(shù)據(jù)使用的協(xié)程
//就是自己了沒錯(cuò)
gp := getg()
//獲取sudog結(jié)構(gòu)體并設(shè)置一次阻塞發(fā)送的配置
//將要發(fā)送的元素進(jìn)行單獨(dú)特例包裝
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//扔到sendq鏈表里面去
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
//你個(gè)協(xié)程睡覺去吧你
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
acquireSudog返回一個(gè)sudog結(jié)構(gòu)體,并設(shè)置這一次阻塞發(fā)送的相關(guān)信息伊约,包括發(fā)送的協(xié)程姚淆,是否在select中發(fā)送等
sendq.enqueue就是和recvq的dequeue相反,是將其入隊(duì)屡律,等待條件滿足喚醒
調(diào)用gopark切走協(xié)程主動(dòng)讓出cpu執(zhí)行權(quán)限
剩下代碼就是被喚醒后做一些收尾的工作腌逢,如釋放內(nèi)存,return true表示成功發(fā)送數(shù)據(jù)
2.5只發(fā)送不接收引發(fā)協(xié)程泄漏
在上述源碼閱讀中我們發(fā)現(xiàn)超埋,如果到達(dá)第三種情況搏讶,即要么沒緩沖區(qū)也沒人接收,要么有緩沖區(qū)但是滿了霍殴,最后會(huì)阻塞協(xié)程進(jìn)入休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
在以后學(xué)完GMP后我們會(huì)知道協(xié)程是分配在堆上面的媒惕,一直休眠就意味著該堆空間一直沒法回收,就引發(fā)了協(xié)程泄漏来庭,同理只接收不發(fā)送也會(huì)如此(指到達(dá)第三種情況)
2.6小結(jié)
至此通過閱讀chansend源碼妒蔚,數(shù)據(jù)發(fā)送(chan<-i)的三種情況已經(jīng)分析完畢:
1.剛好有協(xié)程在recvq中等待則直接給他
2.緩沖區(qū)有空位置(qcount<datasize)就放上去
3.都不滿足就創(chuàng)建sudo結(jié)構(gòu)體,包裝元素,扔到sendq鏈表中肴盏,阻塞當(dāng)前協(xié)程等待被喚醒
發(fā)送數(shù)據(jù)時(shí)觸發(fā)Gorutine調(diào)度的幾個(gè)時(shí)機(jī):
1.直接發(fā)送時(shí)會(huì)喚醒協(xié)程(send調(diào)用goready)科盛,等待調(diào)度
2.發(fā)送數(shù)據(jù)時(shí)沒找到接收方且緩存已滿就將自己入隊(duì)等待調(diào)度(gopark)
2.7.接收數(shù)據(jù)chanrecv
本質(zhì)上是調(diào)用runtime.chanrecv1(是對(duì)chanrecv的封裝),
如果返回兩個(gè)值編譯器是調(diào)用chanrecv2
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv函數(shù)簽名菜皂,同樣也是分段學(xué)習(xí)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
- 第一部分同理檢查nil贞绵,后面的非阻塞也一樣就先跳過
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
- 第二部分,討論被關(guān)閉的情況恍飘,如果已經(jīng)關(guān)閉且沒元素了就返回true和false榨崩,映射到開發(fā)者操作已關(guān)閉的chan,如果已關(guān)閉且沒元素常侣,接收chan的第二個(gè)參數(shù)返回false蜡饵;反之還有元素就返回true和true,映射到開發(fā)者就是返回true
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
//這個(gè)是1.20新增的代碼胳施,但是總體邏輯還是一樣的
//如果通道被關(guān)閉且緩沖區(qū)還有數(shù)據(jù)
//還能繼續(xù)接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
因此這也是開發(fā)者不能單憑第二個(gè)參數(shù)來判斷通道是否已關(guān)閉的原因溯祸,實(shí)際開發(fā)中會(huì)類似content.Context再使用一個(gè)通道發(fā)送關(guān)閉信號(hào)
又到了真正接收數(shù)據(jù)的邏輯部分
- 第三部分,有等待的發(fā)送方就直接去找他要數(shù)據(jù)
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
這里要注意recv會(huì)根據(jù)緩沖區(qū)大小分別處理
不存在則調(diào)用recvDirect從sendq隊(duì)列上取
存在則將數(shù)據(jù)拷貝到內(nèi)存舞肆,將發(fā)送隊(duì)列頭的數(shù)據(jù)拷貝到緩沖區(qū)中焦辅,釋放一個(gè)阻塞的發(fā)送方
第四部分,沒等待的發(fā)送方椿胯,buf有數(shù)據(jù)直接拿
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
邏輯類似上面發(fā)送代碼筷登,存到緩沖區(qū)中
- 第五部分,buf沒元素哩盲,阻塞
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
還是獲取接收協(xié)程信息前方,sudog包裝入隊(duì)然后阻塞,最后收尾工作
2.8.關(guān)閉通道close
調(diào)用closechan方法前面是異常處理廉油,跳過
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
清除sendq上的未被處理協(xié)程惠险,最后為所有被阻塞的調(diào)用goready觸發(fā)調(diào)度
3.channel使用容易犯的錯(cuò)誤
使用chan最常見的錯(cuò)誤是引發(fā)panic和協(xié)程泄露(一直gopark),其實(shí)我們通過源碼學(xué)習(xí)也知道了部分原因
引發(fā)panic的情況:
- close為nil的chan
- close已經(jīng)close的chan
- send已經(jīng)close的chan
引發(fā)協(xié)程泄露的情況
- 對(duì)nil的chan進(jìn)行發(fā)送和接收抒线,也是卡在gopark
- 只接收不發(fā)送和只發(fā)送不接收班巩,這很好理解直接卡在gopark了
- 還有一些特殊情況都是上面第二種的衍生
如
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模擬處理耗時(shí)的業(yè)務(wù)
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println("exit goroutine")
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
此時(shí)如果time.After先超時(shí)了,那么go開啟的協(xié)程就會(huì)阻塞永遠(yuǎn)結(jié)束不了嘶炭,解決辦法就是給chan增加容量
4.select
在上面分析阻塞接收和阻塞發(fā)送時(shí)抱慌,我們都遇到結(jié)構(gòu)體sudo(進(jìn)入recvq或sendq隊(duì)列的結(jié)構(gòu)體,保存協(xié)程信息)眨猎,它有一個(gè)字段叫做isSelect抑进,判斷當(dāng)前執(zhí)行環(huán)境是否在select中,其實(shí)在學(xué)習(xí)select的時(shí)候我們就知道了睡陪,我們常將select和channel搭配使用寺渗,這也說明了兩者代碼是存在一定關(guān)系的夕凝。
但是當(dāng)我們想使用dlv對(duì)select打斷點(diǎn)時(shí)發(fā)現(xiàn)有些沒有效果,這段代碼會(huì)在編譯階段被優(yōu)化成其他户秤!
4.1無case的select
select{}
這個(gè)可以通過dlv捕捉到,內(nèi)部實(shí)現(xiàn)是runtime.block方法
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
}
同時(shí)內(nèi)部也調(diào)用gopark(讓出當(dāng)前 Goroutine 對(duì)處理器的使用權(quán)并傳入等待原因 waitReasonSelectNoCases逮矛,上文的非阻塞發(fā)送有遇到過)進(jìn)行永久阻塞
4.2單一channel
demo.go
func main() {
c := make(chan int)
x := 0
select {
case c <- x:
fmt.Println("hi")
}
}
實(shí)際上是直接被編譯器優(yōu)化成接收方法鸡号,并沒有select的痕跡
4.3單一channel+default
demo.go
select {
case c <- x:
fmt.Println("hi")
default:
fmt.Println("bye")
}
調(diào)用selectnbsend,將c<-x改為 <-c
變成了調(diào)用selectnbrecv
阻塞發(fā)送的情況
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
內(nèi)部是對(duì)chansend和chanrecv的封裝须鼎,區(qū)別在于傳入的block=false鲸伴,還記得上文分析過,當(dāng)剛好沒有等待協(xié)程且緩沖區(qū)已滿時(shí)會(huì)進(jìn)入block判斷分支:
//chansend函數(shù)
if sg:= c.recvq.dequeue
....
if c.qucount < c.dataqsize
...
if !block {
unlock(&c.lock)
return false
}
也就是說遇到上面情況(沒有消費(fèi)協(xié)程且沒緩沖區(qū))會(huì)快速失敗晋控,而不是繼續(xù)向下執(zhí)行阻塞協(xié)程汞窗,來讓select后面的代碼有可執(zhí)行機(jī)會(huì)
4.3.1 default的應(yīng)用舉例
比如手寫一個(gè)消息隊(duì)列(這里我們采用一個(gè)發(fā)布者對(duì)應(yīng)多個(gè)訂閱者時(shí)使用多個(gè)通道的方式實(shí)現(xiàn)),有如下結(jié)構(gòu)體
type Msg struct {
context string
}
type Broker struct {
l sync.RWMutex
ch []chan Msg
}
訂閱方法如下
func (b *Broker) SubScribe(c int) (<-chan Msg, error) {
b.l.Lock()
defer b.l.Unlock()
ch := make(chan Msg, c)
b.ch = append(b.ch, ch)
return ch, nil
}
發(fā)送消息如下
func (b *Broker) Send(m Msg) error {
b.l.Lock()
defer b.l.Unlock()
for _, c := range b.ch {
c<-m
}
return nil
}
這時(shí)我們就遇到問題赡译,如果一個(gè)訂閱者中的消息沒有被消費(fèi)導(dǎo)致他的通道滿了(即通道容量本來是10仲吏,現(xiàn)在發(fā)送第11個(gè)消息,前面都沒消費(fèi))蝌焚,就會(huì)導(dǎo)致后面全部卡住裹唆,此時(shí)我們可以修改成以下方式發(fā)送
select {
case c <- m:
default:
return errors.New("cant send")
}
4.3.2多路復(fù)用(多個(gè)channel)
demo.go
select {
case c <- x:
fmt.Println("hi")
case <-c:
fmt.Println(x)
default:
fmt.Println(x)
}
變成了selectgo函數(shù),這是select里面的重頭戲(位于runtime/select.go)
先來看函數(shù)簽名
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
- case0是一個(gè)類型為[ncases]scase的數(shù)組
- order0 是一個(gè)指向[2*ncases]uint16數(shù)組只洒,值都為0
為什么 selectgo 還需要傳遞一個(gè) order0许帐,而不是直接根據(jù) ncase 直接分配呢
編譯轉(zhuǎn)換會(huì)使用 temp 函數(shù)來構(gòu)造生成數(shù)組的語句,而這個(gè)語句便可以保證數(shù)據(jù)會(huì)分配到棧上毕谴,而不是堆上成畦,避免了不必要的堆分配
- selectgo 會(huì)返回選中的序號(hào),如果是個(gè)接收涝开,還會(huì)返回是否接收到一個(gè)值
select在go語言不存在相應(yīng)的結(jié)構(gòu)體循帐,但是使用的分支case在go中使用scase結(jié)構(gòu)體表示
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}
- c 類型hchan已經(jīng)很熟悉了,就是一個(gè)通道的結(jié)構(gòu)體忠寻,來存儲(chǔ)case中使用的channel
回到selectgo函數(shù)
//將case0數(shù)組和order0轉(zhuǎn)為slice結(jié)構(gòu)
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// [:n:n]的方式會(huì)讓slice的len和cap相等
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]
下面看不懂沒關(guān)系惧浴,我也看不懂索性直接copy,只要知道它是隨機(jī)選取分支就好
首先會(huì)進(jìn)行執(zhí)行必要的初始化奕剃,決定處理case的兩個(gè)順序 :
- 輪詢順序pollorder
- 加鎖順序 lockorder
order1會(huì)被分為pollorder和lockorder衷旅,這兩個(gè)slice將會(huì)真正決定select的隨機(jī)選擇以及死鎖問題
norder := 0
for i := range scases {
cas := &scases[i]
//對(duì)于channel為nil的收發(fā)操作,將他們的elem設(shè)置為nil
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
// fastrabdb為生成隨機(jī)數(shù)函數(shù)
//porder剛開始是0纵朋,循環(huán)結(jié)束后是隨機(jī)順序的scases索引
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]
....
sellock(scases, lockorder)
....
這里的輪詢順序pollorder是隨機(jī)的柿顶,避免channel的饑餓問題,保證公平性操软,之后根據(jù)channel的地址順序確定加鎖順序來避免死鎖發(fā)生(sellock函數(shù))
如果多個(gè) goroutine 都需要鎖定 ch1 ch2嘁锯,而他們加鎖的順序不固定,那么很可能會(huì)出現(xiàn)死鎖問題
這個(gè)時(shí)候,對(duì)加鎖的順序就有要求了家乘,按照同樣的順序的話蝗羊,沒有競(jìng)爭到 ch1.lock 的 goroutine,會(huì)等待加鎖 ch1.lcok,而不會(huì)直接去加鎖 ch2.lock
func sellock(scases []scases, lockorder []int16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[0].c // 根據(jù)加鎖順序獲取 case
// c 記錄了上次加鎖的 hchan 地址仁锯,如果和當(dāng)前 *hchan 相同耀找,那么就不會(huì)再次加鎖
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}
加鎖完成,進(jìn)入selectgo主循環(huán)邏輯
第一階段的主要職責(zé)是查找所有 case 中是否有可以立刻被處理的 Channel业崖。無論是在等待的 Goroutine 上還是緩沖區(qū)中野芒,只要存在數(shù)據(jù)滿足條件就會(huì)立刻處理
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.c = c
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
...
}
如果不能立刻找到活躍的 Channel 就會(huì)進(jìn)入循環(huán)的下一階段,按照需要將當(dāng)前 Goroutine 加入到 Channel 的 sendq 或者 recvq 隊(duì)列中
除了將當(dāng)前 Goroutine 對(duì)應(yīng)的 runtime.sudog
結(jié)構(gòu)體加入隊(duì)列之外双炕,這些結(jié)構(gòu)體都會(huì)被串成鏈表附著在 Goroutine 上狞悲。在入隊(duì)之后會(huì)調(diào)用 runtime.gopark
掛起當(dāng)前 Goroutine 等待調(diào)度器的喚醒。
等到 select
中的一些 Channel 準(zhǔn)備就緒之后妇斤,當(dāng)前 Goroutine 就會(huì)被調(diào)度器喚醒摇锋。這時(shí)會(huì)繼續(xù)執(zhí)行 runtime.selectgo
函數(shù)的第三部分,從 runtime.sudog
中讀取數(shù)據(jù):
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
...
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
sglist = gp.waiting
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
casi = int(casei)
cas = k
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
c = cas.c
goto retc
...
}
第三次遍歷全部 case 時(shí)站超,我們會(huì)先獲取當(dāng)前 Goroutine 接收到的參數(shù) sudog 結(jié)構(gòu)乱投,我們會(huì)依次對(duì)比所有 case 對(duì)應(yīng)的 sudog 結(jié)構(gòu)找到被喚醒的 case球凰,獲取該 case 對(duì)應(yīng)的索引并返回骤公。
由于當(dāng)前的 select 結(jié)構(gòu)找到了一個(gè) case 執(zhí)行,那么剩下 case 中沒有被用到的 sudog 就會(huì)被忽略并且釋放掉墨吓。為了不影響 Channel 的正常使用媳纬,我們還是需要將這些廢棄的 sudog 從 Channel 中出隊(duì)双肤。
當(dāng)我們?cè)谘h(huán)中發(fā)現(xiàn)緩沖區(qū)中有元素或者緩沖區(qū)未滿時(shí)就會(huì)通過 goto 關(guān)鍵字跳轉(zhuǎn)到 bufrecv 和 bufsend 兩個(gè)代碼段,這兩段代碼的執(zhí)行過程都很簡單钮惠,它們只是向 Channel 中發(fā)送數(shù)據(jù)或者從緩沖區(qū)中獲取新數(shù)據(jù):
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
這里在緩沖區(qū)進(jìn)行的操作和直接調(diào)用 runtime.chansend
和 runtime.chanrecv
差不多茅糜,上述兩個(gè)過程在執(zhí)行結(jié)束之后都會(huì)直接跳到 retc
字段。
兩個(gè)直接收發(fā) Channel 的情況會(huì)調(diào)用運(yùn)行時(shí)函數(shù) runtime.send
和 runtime.recv
素挽,這兩個(gè)函數(shù)會(huì)與處于休眠狀態(tài)的 Goroutine 打交道:
recv:
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
兩個(gè)直接收發(fā) Channel 的情況會(huì)調(diào)用運(yùn)行時(shí)函數(shù) runtime.send
和 runtime.recv
蔑赘,這兩個(gè)函數(shù)會(huì)與處于休眠狀態(tài)的 Goroutine 打交道:
recv:
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc
不過如果向關(guān)閉的 Channel 發(fā)送數(shù)據(jù)或者從關(guān)閉的 Channel 中接收數(shù)據(jù),情況就稍微有一點(diǎn)復(fù)雜了:
- 從一個(gè)關(guān)閉 Channel 中接收數(shù)據(jù)會(huì)直接清除 Channel 中的相關(guān)內(nèi)容(1.20版本需要判斷有無緩沖區(qū))预明;
- 向一個(gè)關(guān)閉的 Channel 發(fā)送數(shù)據(jù)就會(huì)直接 panic 造成程序崩潰:
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
總體來看缩赛,select 語句中的 Channel 收發(fā)操作和直接操作 Channel 沒有太多出入,只是由于 select 多出了 default 關(guān)鍵字所以會(huì)支持非阻塞的收發(fā)撰糠。
參考
1.go語言channel
2.go 深入刨析
3.go ready函數(shù)
4.go夜讀