先上結論吧
- select 是針對 chan 類型的，所以 case 只有 default 和 chan（讀/寫）兩種
- 遍歷 case 的時候順序不確定，但 chan 的優先級比 default 高。當有 default 和可執行的 chan 時，總是執行 chan
- 當沒有 default，且無可執行的 chan 時，阻塞
- select{}, 阻塞
開始看源碼吧
scase
// Kinds of select case, stored in scase.kind.
const (
caseNil = iota // zero value; selectgo resets send/recv cases on a nil channel to scase{}, i.e. this kind
caseRecv // receive from a channel
caseSend // send on a channel
caseDefault // the default clause
)
// scase describes a single case of a select statement as seen by selectgo.
type scase struct {
c *hchan // channel the case operates on (left unset for default cases)
elem unsafe.Pointer // data element: copy destination for a receive, copy source for a send
kind uint16 // one of the case* constants (caseNil, caseRecv, caseSend, caseDefault)
pc uintptr // race pc (for race detector / msan)
releasetime int64 // block-profiling timestamp; selectgo reports it through blockevent when > 0
}
入口
// reflect_rselect converts a slice of runtimeSelect descriptors into the
// scase array that selectgo operates on and then runs the select.
//
// It returns selectgo's results unchanged: the index of the case that was
// chosen and whether a receive obtained a value.
func reflect_rselect(cases []runtimeSelect) (int, bool) {
	n := len(cases)
	if n == 0 {
		// A select with no cases blocks. block is expected not to return;
		// the indexing at the bottom would otherwise hit an empty slice.
		block()
	}

	scases := make([]scase, n)
	// Twice as many slots as cases: selectgo uses the first half as the
	// poll order and the second half as the lock order.
	orders := make([]uint16, 2*n)

	for idx := range cases {
		src := &cases[idx]
		dst := &scases[idx]

		switch src.dir {
		case selectDefault:
			*dst = scase{kind: caseDefault}
		case selectSend:
			*dst = scase{kind: caseSend, c: src.ch, elem: src.val}
		case selectRecv:
			*dst = scase{kind: caseRecv, c: src.ch, elem: src.val}
		}

		if raceenabled || msanenabled {
			selectsetpc(dst)
		}
	}

	return selectgo(&scases[0], &orders[0], n)
}
這裡主要是初始化 case 數組。重點在 selectgo 中
// selectgo implements the select statement.
//
// cas0 points to the first element of an array of ncases scase values, and
// order0 points to scratch space of 2*ncases uint16 values that is split
// into the poll order (first half) and the lock order (second half).
//
// The function works in up to three passes while holding the locks of all
// channels involved:
//   - pass 1 polls every case in random order and takes the first channel
//     operation that can proceed; if none can and a default case exists,
//     the default case is chosen;
//   - pass 2 (no default, nothing ready) enqueues the current goroutine on
//     every channel and parks it;
//   - pass 3 runs after wake-up: it removes the goroutine from the channels
//     that did not fire and identifies the case that did.
//
// It returns the index of the chosen case and, for receive cases, whether a
// value was received (false when the channel was closed). A send on a closed
// channel panics.
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
if debugSelect {
print("select: cas0=", cas0, "\n")
}
// View the case array through a fixed-size array pointer so it can be sliced.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// order1 is twice as long as cas1.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// Three-index slice a[x:y:z]: length is y-x, capacity is z-x.
scases := cas1[:ncases:ncases]
// Order in which cases are polled.
pollorder := order1[:ncases:ncases]
// Order in which channels are locked. Only 2*ncases entries of order1 are
// used in total: pollorder takes the first ncases, lockorder the next ncases.
lockorder := order1[ncases:][:ncases:ncases]
// Replace send/receive cases involving nil channels with
// caseNil so logic below can assume non-nil channel.
for i := range scases {
cas := &scases[i]
if cas.c == nil && cas.kind != caseDefault {
*cas = scase{}
}
}
// t0 stays 0 unless block profiling is on; it is the start time passed to
// blockevent at retc.
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
for i := 0; i < ncases; i++ {
scases[i].releasetime = -1
}
}
// Shuffle: build a random permutation of the case indices in pollorder.
// NOTE(review): relies on fastrandn(i+1) returning a value in [0, i] - confirm.
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// First loop: insert the cases one by one into a max-heap stored in lockorder.
for i := 0; i < ncases; i++ {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
// sortkey is defined elsewhere; per the comment above it presumably
// yields the channel's address.
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
// Second loop: repeatedly move the heap's maximum to position i and sift
// the displaced element down, leaving lockorder sorted in ascending key order.
for i := ncases - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
if debugSelect {
for i := 0; i+1 < ncases; i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
}
// lock all the channels involved in the select
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// pass 1 - look for something already waiting
// Either some channel operation can proceed right now, or a default case
// exists and is taken.
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
// Walk the cases in the shuffled poll order.
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
// Nil channel: this case can never proceed, move on to the next one.
case caseNil:
continue
case caseRecv: // receive case
sg = c.sendq.dequeue()
// A sender is waiting on the channel's send queue.
if sg != nil {
goto recv
}
// The channel's buffer holds at least one element.
if c.qcount > 0 {
goto bufrecv
}
// The channel is closed.
if c.closed != 0 {
goto rclose
}
case caseSend: // send case
if raceenabled {
racereadpc(c.raceaddr(), cas.pc, chansendpc)
}
// The channel is closed.
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
// A receiver is waiting on the channel's receive queue.
if sg != nil {
goto send
}
// The channel's buffer still has free space.
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault: // only remembered, the loop keeps going: a ready channel case takes priority over default
dfli = casi
dfl = cas
}
}
// No channel case was ready; take the default case if there is one.
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// pass 2 - enqueue on all chans
// Put the current goroutine on the wait queue of every channel involved.
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
switch cas.kind {
case caseRecv:
// Join the channel's receive wait queue.
// The queues do not grow across repeated trips through loop: pass 3
// below dequeues the sudogs of every case that did not fire.
c.recvq.enqueue(sg)
case caseSend:
// Join the channel's send wait queue.
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
gp.param = nil
// Park the current goroutine.
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
sellock(scases, lockorder)
gp.selectDone = 0
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
// This point is reached only after being woken up, which can only happen
// when the select has no default case.
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
// sg was read from gp.param after wake-up; it is presumably set by the
// waker (the channel send/recv code, not visible here) to the sudog it
// serviced. The waiting list was built in lock order, so walking it in
// step with lockorder and comparing pointers identifies the case that fired.
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
// No case matched sg (for example because gp.param was nil): start over
// from pass 1.
if cas == nil {
goto loop
}
c = cas.c
if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
if cas.kind == caseRecv {
recvOK = true
}
if raceenabled {
if cas.kind == caseRecv && cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
} else if cas.kind == caseSend {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
}
if msanenabled {
if cas.kind == caseRecv && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
} else if cas.kind == caseSend {
msanread(cas.elem, c.elemtype.size)
}
}
selunlock(scases, lockorder)
goto retc
bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
// Copy the buffered value into cas.elem; e.g. for a := <-ch, cas.elem
// is the address of a.
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
// Advance the receive index, wrapping around at the end of the buffer.
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
// Copy the value at cas.elem into the channel buffer; e.g. for ch <- a,
// cas.elem is the address of a.
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
// Advance the send index, wrapping around at the end of the buffer.
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc
rclose:
// read at end of closed channel
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc
send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
retc:
// Common exit: report blocking time if a release time was recorded for the
// chosen case, then return its index.
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
return casi, recvOK
sclose:
// send on closed channel
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
bufrecv、bufsend、recv、rclose、send 最終都會跳轉到 retc。這裡涉及到一些 channel 的知識，有興趣的可以看我另一篇關於 channel 的文章 http://www.reibang.com/p/9dd5e77469da