GO閱讀-同步編程-channel和select

非原創(chuàng)找默,搬運(yùn)工

目錄總匯

鏈接

思考

  • chanel的零值是什么趾访,對(duì)其發(fā)送數(shù)據(jù)會(huì)怎么樣
  • channel是并發(fā)安全的嗎
  • channel使用不當(dāng)會(huì)引發(fā)協(xié)程泄漏,請(qǐng)舉例

1.Channel的使用場(chǎng)景和基本用法

在GO語言中流行一句很廣的諺語

Don’t communicate by sharing memory, share memory by communicating.
--Go Proverbs by Rob Pike
直白來講就是執(zhí)行業(yè)務(wù)處理的協(xié)程不要通過共享內(nèi)存的方式通信翼抠,而是要通過channel的通信方式分享數(shù)據(jù)

  • communicate by sharing memory是傳統(tǒng)的并發(fā)編程處理方式咙轩,即對(duì)臨界區(qū)加鎖
  • share memory by communicating是類似CSP模型的方式,通過通信的方式阴颖,一個(gè)協(xié)程g可以把數(shù)據(jù)的“所有權(quán)”交給另一個(gè)g

也就是說channel類型可應(yīng)用于一下場(chǎng)景

  • 數(shù)據(jù)交流和傳遞:類似消息中間件的生產(chǎn)者和消費(fèi)者活喊,傳遞的數(shù)據(jù)也可以是信號(hào)
  • 提供并發(fā)保護(hù):類似鎖
  • 任務(wù)編排:既然可以傳遞信號(hào),意味著可以根據(jù)信號(hào)來讓g按一定順序執(zhí)行量愧,這就是編排功能

channel的元語定義如下

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType 

當(dāng)然這個(gè)elem也可以是一個(gè)通道

chan<- chan int 
chan (<-chan int)

箭頭“<-”遵循最左原則钾菊,總是盡量和左邊的chan結(jié)合帅矗,所以需要注意配合括號(hào)使用
特別需要注意的是未初始化的chan的零值是nil(nil是chan的零值),可以后續(xù)使用make初始化

var c chan int //nil
c=make(chan int)

chan還可用于for-range

    for v := range ch {
        fmt.Println(v)
    }

那么忽略讀取的值结缚,只是清空chan就是

   for range ch {
    }

2.源碼走讀

2.1.創(chuàng)建通道m(xù)akechan

后續(xù)就以該demo代碼進(jìn)行邏輯梳理
我們分別創(chuàng)建了帶緩存和不帶緩存的chan

func main() {
    c := make(chan int)
    c1 := make(chan int, 5)
    close(c)
    close(c1)
    fmt.Println("vim-go")
}

對(duì)變量c定義的那一行打斷點(diǎn)

dlv debug main.go
b main.go:6
c
disass

channel匯編代碼

通過查看匯編代碼我們發(fā)現(xiàn)實(shí)際上是調(diào)用runtime.makechan方法(在之前學(xué)習(xí)make方法的時(shí)候我們就知道底層是這樣調(diào)用的损晤,現(xiàn)在是驗(yàn)證)
runtime/chan.go/makechan函數(shù)的定義

func makechan(t *chantype, size int) *hchan {
}

這個(gè)方法有點(diǎn)難看懂,但是看簽名可以知道红竭,最后得到了一個(gè)hchan的結(jié)構(gòu)體(1.17和1.20沒區(qū)別)

2.2chan數(shù)據(jù)結(jié)構(gòu)

type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters

        lock mutex
}
  • qcount 記錄循環(huán)隊(duì)列的元素?cái)?shù)量尤勋,直接看成chan中元素個(gè)數(shù)就行
  • datasize 循環(huán)隊(duì)列大小,就是make創(chuàng)建指定的長度
  • buf 循環(huán)隊(duì)列的指針
  • elemtype chan中元素類型
  • elemtsize chan中單個(gè)元素大小
  • sendx 發(fā)送指針send在buf中位置/索引
  • recvx 接收指針recv在buf中位置/索引
  • sendq和recvq 使用雙向鏈表(循環(huán)隊(duì)列)用于存儲(chǔ)等待的gorutine
  • lock 使用鎖保護(hù)所有字段茵宪,所有chan是并發(fā)安全的

2.3初始化

實(shí)際上go在編譯時(shí)會(huì)根據(jù)容量大小選擇調(diào)用makechan64或者makechan最冰,但兩者邏輯基本相同(makechan64只做了size檢查,底層還是調(diào)用makechan)
makechan會(huì)根據(jù)是否有緩沖區(qū)和元素類型是否為指針來初始化成不同的runtime.hchan

func makechan(t *chantype, size int) *hchan {
// elem是channel元素稀火,這里我們是chan int
        elem := t.elem

        ...略過一連串檢測(cè)代碼...
//math.MulUintptr是判斷申請(qǐng)的內(nèi)存空間會(huì)不會(huì)超過最大申請(qǐng)限制
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
       ......  

        var c *hchan
        switch {
//無緩沖區(qū)情況分支暖哨,只給hchan分配一段內(nèi)存
//即不創(chuàng)建buf
        case mem == 0:
                c = (*hchan)(mallocgc(hchanSize, nil, true))
                c.buf = c.raceaddr()
//chan元素不是指針的情況
//給hchan和緩沖區(qū)buf分配一塊連續(xù)內(nèi)存
        case elem.ptrdata == 0:
                c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//hchan數(shù)據(jù)結(jié)構(gòu)后面緊接著就是buf
                c.buf = add(unsafe.Pointer(c), hchanSize)
//元素包含指針
//buf單獨(dú)分配一塊內(nèi)存
        default:
                c = new(hchan)
                c.buf = mallocgc(mem, elem, true)
        }

        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
//locakinit不太重要,根據(jù)函數(shù)名字可以推測(cè)是鎖初始化
        lockInit(&c.lock, lockRankHchan)

        return c
}

func (c *hchan) raceaddr() unsafe.Pointer {
        return unsafe.Pointer(&c.buf)
}

makechan無非是干了兩件數(shù):

  • 參數(shù)校驗(yàn)
  • 初始化hchan結(jié)構(gòu)體凰狞,其中根據(jù)元素是否為指針又對(duì)buf進(jìn)行了不同的內(nèi)存分配處理

擴(kuò)展-在線方法查看匯編代碼

ssa是編譯器經(jīng)過優(yōu)化生成的中間代碼篇裁,在網(wǎng)頁寫代碼之后直接查看,跟dlv一樣赡若,只是多一種同類型工具

2.4數(shù)據(jù)發(fā)送chansend

接下來會(huì)經(jīng)常遇到這兩個(gè)函數(shù),先有個(gè)映像:

  • gopark阻塞當(dāng)前協(xié)程
  • goready喚醒一個(gè)協(xié)程

查看底層匯編


image.png

ssa網(wǎng)頁查看結(jié)果

chan的發(fā)送實(shí)際上是調(diào)用runtime.chansend1方法达布,是chansend的封裝,分段學(xué)習(xí)他的邏輯(同樣略過一堆參數(shù)驗(yàn)證)

func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

其中參數(shù)block為true時(shí)表示發(fā)送操作是阻塞的逾冬,反之false就是想要發(fā)送是不阻塞的(遇到阻塞情況就直接返回false)

  • 第一部分判斷是否為nil黍聂,前面說過chan的零值是nil,會(huì)調(diào)用gopark讓調(diào)用者永久阻塞(所以接下來的throw也永遠(yuǎn)不會(huì)執(zhí)行)
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

驗(yàn)證阻塞身腻,測(cè)試結(jié)果顯示超時(shí)

func Test_chan(t *testing.T) {
    var c chan int
    c <- 1
    t.Log("hi")
}
  • 第二部分产还,如果chan沒被關(guān)閉,但是滿了嘀趟,又要求不阻塞發(fā)送脐区,就直接返回false
    if !block && c.closed == 0 && full(c) {
        return false
    }
  • 第三部分,若對(duì)一個(gè)已經(jīng)關(guān)閉的chan發(fā)送數(shù)據(jù)直接panic
     lock(&c.lock)
//驗(yàn)證通道是否關(guān)閉
//這里可以得知往已經(jīng)關(guān)閉的通道發(fā)送會(huì)引發(fā)panic
        if c.closed != 0 {
                unlock(&c.lock)
                panic(plainError("send on closed channel"))
        }

至此我們對(duì)chan的nil她按、close的情況進(jìn)行了檢查牛隅,接下里就開始發(fā)送數(shù)據(jù)

  • 第四部分,如果循環(huán)隊(duì)列有等待的人就直接扔給他(實(shí)際就是把通道內(nèi)的元素拷貝到接收變量的內(nèi)存地址上去)
     if sg := c.recvq.dequeue(); sg != nil {
                send(c, sg, ep, func() { unlock(&c.lock) }, 3)
                return true
        }

以下是擴(kuò)展閱讀
關(guān)于recvq是用于存儲(chǔ)等待的gorutine的尤溜,是waitq類型,實(shí)際就是一個(gè)循環(huán)隊(duì)列/雙向鏈表

type waitq struct {
        first *sudog
        last  *sudog
}

waitq.dequeue邏輯很簡單汗唱,就是遞歸讀取鏈表

func (q *waitq) dequeue() *sudog {
        for {
//先獲取頭部元素宫莱,這個(gè)是最后要被彈出去的
//頭部都沒有就說明是個(gè)空鏈表
                sgp := q.first
                if sgp == nil {
                        return nil
                }
                y := sgp.next
//彈出頭部后整理鏈表,更新first
//沒有下一個(gè)就意味著彈出頭部后是個(gè)空鏈表
//有就將下一個(gè)作為頭部元素
                if y == nil {
                        q.first = nil
                        q.last = nil
                } else {
                        y.prev = nil
                        q.first = y
                        sgp.next = nil 
                }
//如果第一個(gè)不滿足條件就扔掉重新來
                if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
                        continue
                }

                return sgp
        }
}

循環(huán)隊(duì)列調(diào)用send方法發(fā)送數(shù)據(jù)哩罪,主要邏輯是以下部分

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//拷貝變量地址到接收端的內(nèi)存地址
        if sg.elem != nil {
                sendDirect(c.elemtype, sg, ep)
                sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {
                sg.releasetime = cputicks()
        }
        goready(gp, skip+1)

主要干了以下兩件事情:

  • 調(diào)用sendDirect將發(fā)送數(shù)據(jù)直接拷貝到 接收channel通道元素的內(nèi)存地址上去
    即 x = c <-a 授霸,拷貝到x內(nèi)存地址上
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//memove是內(nèi)存拷貝
        memmove(dst, src, t.size)
}
  • 調(diào)用goready
//這里進(jìn)行協(xié)程調(diào)度是為了喚醒那個(gè)在等待數(shù)據(jù)的協(xié)程
func goready(gp *g, traceskip int) {
    // 切換到g0的棧
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

該函數(shù)主要功能就是切換的g0棧執(zhí)行ready方法巡验,核心方法是gorutine狀態(tài)切換到runable,放入隊(duì)列等待P調(diào)度碘耳,這個(gè)在之前學(xué)習(xí)GMP的時(shí)候已經(jīng)很熟悉了显设,這就是喚醒協(xié)程(這里是數(shù)據(jù)接收方)的方法(注意是放入runnext,并沒有立即執(zhí)行辛辨,要等待P調(diào)度)

至此發(fā)送數(shù)據(jù)的第一種情況分析完畢:若有等待協(xié)程則直接發(fā)送捕捂,具體發(fā)送過程是將元素拷貝給接收變量內(nèi)存地址,喚醒數(shù)據(jù)接收方進(jìn)入?yún)f(xié)程調(diào)度
發(fā)送過程
  • 第五部分斗搞,若沒有等待協(xié)程指攒,且buf沒滿,數(shù)據(jù)放入buf僻焚,然后退出
//datasize緩沖區(qū)大小,qcount為放入元素?cái)?shù)量
//進(jìn)入該分支時(shí)此時(shí)緩沖區(qū)未滿允悦,且接收端沒人等
        if c.qcount < c.dataqsiz {
//計(jì)算出下一個(gè)存儲(chǔ)數(shù)據(jù)位置,sendx是發(fā)送指針位置
                qp := chanbuf(c, c.sendx)
//將剛進(jìn)入channel的元素發(fā)送到計(jì)算出的位置
                typedmemmove(c.elemtype, qp, ep)
//發(fā)送指針自增
                c.sendx++
// 緩沖區(qū)滿了虑啤,
//重置sendx
                if c.sendx == c.dataqsiz {
                        c.sendx = 0
                }
                c.qcount++
                unlock(&c.lock)
                return true
        }

qcount<datasize即buf中還有剩余空間隙弛,利用chanbuf計(jì)算出下一個(gè)存儲(chǔ)數(shù)據(jù)的位置,再利用typedmemove將發(fā)送數(shù)據(jù)拷貝到計(jì)算出的位置狞山,發(fā)送指針sendx+1,通道元素?cái)?shù)量qcount+1

  • 第6部分全闷,也就是第三種發(fā)送情況,走到這里說明是條件都不滿足:
    1.要么沒有緩沖區(qū)铣墨,也沒人消費(fèi)
    2.要么緩沖區(qū)buf大小不夠室埋,也沒人消費(fèi)
    需要阻塞當(dāng)前協(xié)程
    if !block {
        unlock(&c.lock)
        return false
    }
//func getg() *g嘗試獲取發(fā)送數(shù)據(jù)使用的協(xié)程
//就是自己了沒錯(cuò)
    gp := getg()
//獲取sudog結(jié)構(gòu)體并設(shè)置一次阻塞發(fā)送的配置
//將要發(fā)送的元素進(jìn)行單獨(dú)特例包裝
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
//扔到sendq鏈表里面去
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
//你個(gè)協(xié)程睡覺去吧你
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

acquireSudog返回一個(gè)sudog結(jié)構(gòu)體,并設(shè)置這一次阻塞發(fā)送的相關(guān)信息伊约,包括發(fā)送的協(xié)程姚淆,是否在select中發(fā)送等
sendq.enqueue就是和recvq的dequeue相反,是將其入隊(duì)屡律,等待條件滿足喚醒
調(diào)用gopark切走協(xié)程主動(dòng)讓出cpu執(zhí)行權(quán)限
剩下代碼就是被喚醒后做一些收尾的工作腌逢,如釋放內(nèi)存,return true表示成功發(fā)送數(shù)據(jù)

2.5只發(fā)送不接收引發(fā)協(xié)程泄漏

在上述源碼閱讀中我們發(fā)現(xiàn)超埋,如果到達(dá)第三種情況搏讶,即要么沒緩沖區(qū)也沒人接收,要么有緩沖區(qū)但是滿了霍殴,最后會(huì)阻塞協(xié)程進(jìn)入休眠

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

在以后學(xué)完GMP后我們會(huì)知道協(xié)程是分配在堆上面的媒惕,一直休眠就意味著該堆空間一直沒法回收,就引發(fā)了協(xié)程泄漏来庭,同理只接收不發(fā)送也會(huì)如此(指到達(dá)第三種情況)

2.6小結(jié)

至此通過閱讀chansend源碼妒蔚,數(shù)據(jù)發(fā)送(chan<-i)的三種情況已經(jīng)分析完畢:
1.剛好有協(xié)程在recvq中等待則直接給他
2.緩沖區(qū)有空位置(qcount<datasize)就放上去
3.都不滿足就創(chuàng)建sudo結(jié)構(gòu)體,包裝元素,扔到sendq鏈表中肴盏,阻塞當(dāng)前協(xié)程等待被喚醒
發(fā)送數(shù)據(jù)時(shí)觸發(fā)Gorutine調(diào)度的幾個(gè)時(shí)機(jī):
1.直接發(fā)送時(shí)會(huì)喚醒協(xié)程(send調(diào)用goready)科盛,等待調(diào)度
2.發(fā)送數(shù)據(jù)時(shí)沒找到接收方且緩存已滿就將自己入隊(duì)等待調(diào)度(gopark)

2.7.接收數(shù)據(jù)chanrecv

ssa截圖

本質(zhì)上是調(diào)用runtime.chanrecv1(是對(duì)chanrecv的封裝),
如果返回兩個(gè)值編譯器是調(diào)用chanrecv2

    func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
  }
  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
  }

chanrecv函數(shù)簽名菜皂,同樣也是分段學(xué)習(xí)

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
  • 第一部分同理檢查nil贞绵,后面的非阻塞也一樣就先跳過
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
  • 第二部分,討論被關(guān)閉的情況恍飘,如果已經(jīng)關(guān)閉且沒元素了就返回true和false榨崩,映射到開發(fā)者操作已關(guān)閉的chan,如果已關(guān)閉且沒元素常侣,接收chan的第二個(gè)參數(shù)返回false蜡饵;反之還有元素就返回true和true,映射到開發(fā)者就是返回true
    lock(&c.lock)

    if c.closed != 0 {
        if c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
//這個(gè)是1.20新增的代碼胳施,但是總體邏輯還是一樣的
//如果通道被關(guān)閉且緩沖區(qū)還有數(shù)據(jù)
//還能繼續(xù)接收
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }

因此這也是開發(fā)者不能單憑第二個(gè)參數(shù)來判斷通道是否已關(guān)閉的原因溯祸,實(shí)際開發(fā)中會(huì)類似content.Context再使用一個(gè)通道發(fā)送關(guān)閉信號(hào)


又到了真正接收數(shù)據(jù)的邏輯部分

  • 第三部分,有等待的發(fā)送方就直接去找他要數(shù)據(jù)
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

這里要注意recv會(huì)根據(jù)緩沖區(qū)大小分別處理

  • 不存在則調(diào)用recvDirect從sendq隊(duì)列上取

  • 存在則將數(shù)據(jù)拷貝到內(nèi)存舞肆,將發(fā)送隊(duì)列頭的數(shù)據(jù)拷貝到緩沖區(qū)中焦辅,釋放一個(gè)阻塞的發(fā)送方

  • 第四部分,沒等待的發(fā)送方椿胯,buf有數(shù)據(jù)直接拿

    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)

        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

邏輯類似上面發(fā)送代碼筷登,存到緩沖區(qū)中

  • 第五部分,buf沒元素哩盲,阻塞
    if !block {
        unlock(&c.lock)
        return false, false
    }

    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

還是獲取接收協(xié)程信息前方,sudog包裝入隊(duì)然后阻塞,最后收尾工作

2.8.關(guān)閉通道close

調(diào)用closechan方法
image.png

前面是異常處理廉油,跳過

c.closed = 1

    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
    }

    for {
        sg := c.sendq.dequeue()
        ...
    }
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

清除sendq上的未被處理協(xié)程惠险,最后為所有被阻塞的調(diào)用goready觸發(fā)調(diào)度

3.channel使用容易犯的錯(cuò)誤

使用chan最常見的錯(cuò)誤是引發(fā)panic和協(xié)程泄露(一直gopark),其實(shí)我們通過源碼學(xué)習(xí)也知道了部分原因
引發(fā)panic的情況:

  • close為nil的chan
  • close已經(jīng)close的chan
  • send已經(jīng)close的chan

引發(fā)協(xié)程泄露的情況

  • 對(duì)nil的chan進(jìn)行發(fā)送和接收抒线,也是卡在gopark
  • 只接收不發(fā)送和只發(fā)送不接收班巩,這很好理解直接卡在gopark了
  • 還有一些特殊情況都是上面第二種的衍生
func process(timeout time.Duration) bool {
    ch := make(chan bool)
    go func() {
        // 模擬處理耗時(shí)的業(yè)務(wù)
        time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

此時(shí)如果time.After先超時(shí)了,那么go開啟的協(xié)程就會(huì)阻塞永遠(yuǎn)結(jié)束不了嘶炭,解決辦法就是給chan增加容量

4.select

在上面分析阻塞接收和阻塞發(fā)送時(shí)抱慌,我們都遇到結(jié)構(gòu)體sudo(進(jìn)入recvq或sendq隊(duì)列的結(jié)構(gòu)體,保存協(xié)程信息)眨猎,它有一個(gè)字段叫做isSelect抑进,判斷當(dāng)前執(zhí)行環(huán)境是否在select中,其實(shí)在學(xué)習(xí)select的時(shí)候我們就知道了睡陪,我們常將select和channel搭配使用寺渗,這也說明了兩者代碼是存在一定關(guān)系的夕凝。
但是當(dāng)我們想使用dlv對(duì)select打斷點(diǎn)時(shí)發(fā)現(xiàn)有些沒有效果,這段代碼會(huì)在編譯階段被優(yōu)化成其他户秤!

4.1無case的select

select{}
image.png

這個(gè)可以通過dlv捕捉到,內(nèi)部實(shí)現(xiàn)是runtime.block方法

func block() {
    gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
}

同時(shí)內(nèi)部也調(diào)用gopark(讓出當(dāng)前 Goroutine 對(duì)處理器的使用權(quán)并傳入等待原因 waitReasonSelectNoCases逮矛,上文的非阻塞發(fā)送有遇到過)進(jìn)行永久阻塞

4.2單一channel

demo.go

func main() {
    c := make(chan int)
    x := 0
    select {
    case c <- x:
        fmt.Println("hi")
    }
}
image.png

實(shí)際上是直接被編譯器優(yōu)化成接收方法鸡号,并沒有select的痕跡

4.3單一channel+default

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    default:
        fmt.Println("bye")
    }
image.png

調(diào)用selectnbsend,將c<-x改為 <-c


image.png

變成了調(diào)用selectnbrecv
阻塞發(fā)送的情況

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
}

內(nèi)部是對(duì)chansend和chanrecv的封裝须鼎,區(qū)別在于傳入的block=false鲸伴,還記得上文分析過,當(dāng)剛好沒有等待協(xié)程且緩沖區(qū)已滿時(shí)會(huì)進(jìn)入block判斷分支:

//chansend函數(shù)
if sg:= c.recvq.dequeue
   ....
if c.qucount < c.dataqsize
  ...

        if !block {
                unlock(&c.lock)
                return false
        }

也就是說遇到上面情況(沒有消費(fèi)協(xié)程且沒緩沖區(qū))會(huì)快速失敗晋控,而不是繼續(xù)向下執(zhí)行阻塞協(xié)程汞窗,來讓select后面的代碼有可執(zhí)行機(jī)會(huì)

4.3.1 default的應(yīng)用舉例

比如手寫一個(gè)消息隊(duì)列(這里我們采用一個(gè)發(fā)布者對(duì)應(yīng)多個(gè)訂閱者時(shí)使用多個(gè)通道的方式實(shí)現(xiàn)),有如下結(jié)構(gòu)體

type Msg struct {
    context string
}

type Broker struct {
    l  sync.RWMutex
    ch []chan Msg
}

訂閱方法如下

func (b *Broker) SubScribe(c int) (<-chan Msg, error) {
    b.l.Lock()
    defer b.l.Unlock()
    ch := make(chan Msg, c)
    b.ch = append(b.ch, ch)
    return ch, nil
}

發(fā)送消息如下

func (b *Broker) Send(m Msg) error {
    b.l.Lock()
    defer b.l.Unlock()
    for _, c := range b.ch {
        c<-m
    }
    return nil
}

這時(shí)我們就遇到問題赡译,如果一個(gè)訂閱者中的消息沒有被消費(fèi)導(dǎo)致他的通道滿了(即通道容量本來是10仲吏,現(xiàn)在發(fā)送第11個(gè)消息,前面都沒消費(fèi))蝌焚,就會(huì)導(dǎo)致后面全部卡住裹唆,此時(shí)我們可以修改成以下方式發(fā)送

        select {
        case c <- m:
        default:
            return errors.New("cant send")
        }

4.3.2多路復(fù)用(多個(gè)channel)

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    case <-c:
        fmt.Println(x)
    default:
        fmt.Println(x)
    }

image.png

變成了selectgo函數(shù),這是select里面的重頭戲(位于runtime/select.go)
先來看函數(shù)簽名

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
  • case0是一個(gè)類型為[ncases]scase的數(shù)組
  • order0 是一個(gè)指向[2*ncases]uint16數(shù)組只洒,值都為0

為什么 selectgo 還需要傳遞一個(gè) order0许帐,而不是直接根據(jù) ncase 直接分配呢
編譯轉(zhuǎn)換會(huì)使用 temp 函數(shù)來構(gòu)造生成數(shù)組的語句,而這個(gè)語句便可以保證數(shù)據(jù)會(huì)分配到棧上毕谴,而不是堆上成畦,避免了不必要的堆分配

  • selectgo 會(huì)返回選中的序號(hào),如果是個(gè)接收涝开,還會(huì)返回是否接收到一個(gè)值

select在go語言不存在相應(yīng)的結(jié)構(gòu)體循帐,但是使用的分支case在go中使用scase結(jié)構(gòu)體表示

type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}
  • c 類型hchan已經(jīng)很熟悉了,就是一個(gè)通道的結(jié)構(gòu)體忠寻,來存儲(chǔ)case中使用的channel
    回到selectgo函數(shù)
//將case0數(shù)組和order0轉(zhuǎn)為slice結(jié)構(gòu)
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
   // [:n:n]的方式會(huì)讓slice的len和cap相等
    ncases := nsends + nrecvs
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

下面看不懂沒關(guān)系惧浴,我也看不懂索性直接copy,只要知道它是隨機(jī)選取分支就好
首先會(huì)進(jìn)行執(zhí)行必要的初始化奕剃,決定處理case的兩個(gè)順序 :

  • 輪詢順序pollorder
  • 加鎖順序 lockorder

order1會(huì)被分為pollorder和lockorder衷旅,這兩個(gè)slice將會(huì)真正決定select的隨機(jī)選擇以及死鎖問題

    norder := 0
    for i := range scases {
        cas := &scases[i]
//對(duì)于channel為nil的收發(fā)操作,將他們的elem設(shè)置為nil
        if cas.c == nil {
            cas.elem = nil // allow GC
            continue
        }
// fastrabdb為生成隨機(jī)數(shù)函數(shù)
//porder剛開始是0纵朋,循環(huán)結(jié)束后是隨機(jī)順序的scases索引
        j := fastrandn(uint32(norder + 1))
        pollorder[norder] = pollorder[j]
        pollorder[j] = uint16(i)
        norder++
    }
    pollorder = pollorder[:norder]
    lockorder = lockorder[:norder]

     ....
     sellock(scases, lockorder)
     ....

這里的輪詢順序pollorder是隨機(jī)的柿顶,避免channel的饑餓問題,保證公平性操软,之后根據(jù)channel的地址順序確定加鎖順序來避免死鎖發(fā)生(sellock函數(shù))

如果多個(gè) goroutine 都需要鎖定 ch1 ch2嘁锯,而他們加鎖的順序不固定,那么很可能會(huì)出現(xiàn)死鎖問題
這個(gè)時(shí)候,對(duì)加鎖的順序就有要求了家乘,按照同樣的順序的話蝗羊,沒有競(jìng)爭到 ch1.lock 的 goroutine,會(huì)等待加鎖 ch1.lcok,而不會(huì)直接去加鎖 ch2.lock

func sellock(scases []scases, lockorder []int16) {
    var c *hchan
    for _, o := range lockorder {
        c0 := scases[0].c // 根據(jù)加鎖順序獲取 case

        // c 記錄了上次加鎖的 hchan 地址仁锯,如果和當(dāng)前 *hchan 相同耀找,那么就不會(huì)再次加鎖
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}

加鎖完成,進(jìn)入selectgo主循環(huán)邏輯
第一階段的主要職責(zé)是查找所有 case 中是否有可以立刻被處理的 Channel业崖。無論是在等待的 Goroutine 上還是緩沖區(qū)中野芒,只要存在數(shù)據(jù)滿足條件就會(huì)立刻處理

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder {
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()
        sg.g = gp
        sg.c = c

        if casi < nsends {
            c.sendq.enqueue(sg)
        } else {
            c.recvq.enqueue(sg)
        }
    }

    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    ...
}

如果不能立刻找到活躍的 Channel 就會(huì)進(jìn)入循環(huán)的下一階段,按照需要將當(dāng)前 Goroutine 加入到 Channel 的 sendq 或者 recvq 隊(duì)列中
除了將當(dāng)前 Goroutine 對(duì)應(yīng)的 runtime.sudog 結(jié)構(gòu)體加入隊(duì)列之外双炕,這些結(jié)構(gòu)體都會(huì)被串成鏈表附著在 Goroutine 上狞悲。在入隊(duì)之后會(huì)調(diào)用 runtime.gopark 掛起當(dāng)前 Goroutine 等待調(diào)度器的喚醒。
等到 select 中的一些 Channel 準(zhǔn)備就緒之后妇斤,當(dāng)前 Goroutine 就會(huì)被調(diào)度器喚醒摇锋。這時(shí)會(huì)繼續(xù)執(zhí)行 runtime.selectgo 函數(shù)的第三部分,從 runtime.sudog 中讀取數(shù)據(jù):

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    sg = (*sudog)(gp.param)
    gp.param = nil

    casi = -1
    cas = nil
    sglist = gp.waiting
    for _, casei := range lockorder {
        k = &scases[casei]
        if sg == sglist {
            casi = int(casei)
            cas = k
        } else {
            c = k.c
            if int(casei) < nsends {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
        }
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        releaseSudog(sglist)
        sglist = sgnext
    }

    c = cas.c
    goto retc
    ...
}

第三次遍歷全部 case 時(shí)站超,我們會(huì)先獲取當(dāng)前 Goroutine 接收到的參數(shù) sudog 結(jié)構(gòu)乱投,我們會(huì)依次對(duì)比所有 case 對(duì)應(yīng)的 sudog 結(jié)構(gòu)找到被喚醒的 case球凰,獲取該 case 對(duì)應(yīng)的索引并返回骤公。

由于當(dāng)前的 select 結(jié)構(gòu)找到了一個(gè) case 執(zhí)行,那么剩下 case 中沒有被用到的 sudog 就會(huì)被忽略并且釋放掉墨吓。為了不影響 Channel 的正常使用媳纬,我們還是需要將這些廢棄的 sudog 從 Channel 中出隊(duì)双肤。

當(dāng)我們?cè)谘h(huán)中發(fā)現(xiàn)緩沖區(qū)中有元素或者緩沖區(qū)未滿時(shí)就會(huì)通過 goto 關(guān)鍵字跳轉(zhuǎn)到 bufrecv 和 bufsend 兩個(gè)代碼段,這兩段代碼的執(zhí)行過程都很簡單钮惠,它們只是向 Channel 中發(fā)送數(shù)據(jù)或者從緩沖區(qū)中獲取新數(shù)據(jù):

bufrecv:
    recvOK = true
    qp = chanbuf(c, c.recvx)
    if cas.elem != nil {
        typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    selunlock(scases, lockorder)
    goto retc

bufsend:
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    selunlock(scases, lockorder)
    goto retc

這里在緩沖區(qū)進(jìn)行的操作和直接調(diào)用 runtime.chansendruntime.chanrecv 差不多茅糜,上述兩個(gè)過程在執(zhí)行結(jié)束之后都會(huì)直接跳到 retc 字段。

兩個(gè)直接收發(fā) Channel 的情況會(huì)調(diào)用運(yùn)行時(shí)函數(shù) runtime.sendruntime.recv素挽,這兩個(gè)函數(shù)會(huì)與處于休眠狀態(tài)的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

兩個(gè)直接收發(fā) Channel 的情況會(huì)調(diào)用運(yùn)行時(shí)函數(shù) runtime.sendruntime.recv蔑赘,這兩個(gè)函數(shù)會(huì)與處于休眠狀態(tài)的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

不過如果向關(guān)閉的 Channel 發(fā)送數(shù)據(jù)或者從關(guān)閉的 Channel 中接收數(shù)據(jù),情況就稍微有一點(diǎn)復(fù)雜了:

  • 從一個(gè)關(guān)閉 Channel 中接收數(shù)據(jù)會(huì)直接清除 Channel 中的相關(guān)內(nèi)容(1.20版本需要判斷有無緩沖區(qū))预明;
  • 向一個(gè)關(guān)閉的 Channel 發(fā)送數(shù)據(jù)就會(huì)直接 panic 造成程序崩潰:
rclose:
    selunlock(scases, lockorder)
    recvOK = false
    if cas.elem != nil {
        typedmemclr(c.elemtype, cas.elem)
    }
    goto retc

sclose:
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))

總體來看缩赛,select 語句中的 Channel 收發(fā)操作和直接操作 Channel 沒有太多出入,只是由于 select 多出了 default 關(guān)鍵字所以會(huì)支持非阻塞的收發(fā)撰糠。

參考

1.go語言channel
2.go 深入刨析
3.go ready函數(shù)
4.go夜讀

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末酥馍,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子阅酪,更是在濱河造成了極大的恐慌旨袒,老刑警劉巖汁针,帶你破解...
    沈念sama閱讀 211,423評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異砚尽,居然都是意外死亡施无,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門必孤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來帆精,“玉大人,你說我怎么就攤上這事隧魄。” “怎么了隘蝎?”我有些...
    開封第一講書人閱讀 157,019評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵购啄,是天一觀的道長。 經(jīng)常有香客問我嘱么,道長狮含,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,443評(píng)論 1 283
  • 正文 為了忘掉前任曼振,我火速辦了婚禮几迄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘冰评。我一直安慰自己映胁,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,535評(píng)論 6 385
  • 文/花漫 我一把揭開白布甲雅。 她就那樣靜靜地躺著解孙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抛人。 梳的紋絲不亂的頭發(fā)上弛姜,一...
    開封第一講書人閱讀 49,798評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音妖枚,去河邊找鬼廷臼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛绝页,可吹牛的內(nèi)容都是我干的荠商。 我是一名探鬼主播,決...
    沈念sama閱讀 38,941評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼续誉,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼结啼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起屈芜,我...
    開封第一講書人閱讀 37,704評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤郊愧,失蹤者是張志新(化名)和其女友劉穎朴译,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體属铁,經(jīng)...
    沈念sama閱讀 44,152評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡眠寿,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,494評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了焦蘑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盯拱。...
    茶點(diǎn)故事閱讀 38,629評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖例嘱,靈堂內(nèi)的尸體忽然破棺而出狡逢,到底是詐尸還是另有隱情,我是刑警寧澤拼卵,帶...
    沈念sama閱讀 34,295評(píng)論 4 329
  • 正文 年R本政府宣布奢浑,位于F島的核電站,受9級(jí)特大地震影響腋腮,放射性物質(zhì)發(fā)生泄漏雀彼。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,901評(píng)論 3 313
  • 文/蒙蒙 一即寡、第九天 我趴在偏房一處隱蔽的房頂上張望徊哑。 院中可真熱鬧,春花似錦聪富、人聲如沸莺丑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窒盐。三九已至,卻和暖如春钢拧,著一層夾襖步出監(jiān)牢的瞬間蟹漓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評(píng)論 1 266
  • 我被黑心中介騙來泰國打工源内, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留葡粒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,333評(píng)論 2 360
  • 正文 我出身青樓膜钓,卻偏偏與公主長得像嗽交,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子颂斜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,499評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 設(shè)計(jì)理念 執(zhí)行業(yè)務(wù)處理的 goroutine 不要通過共享內(nèi)存的方式通信夫壁,而是要通過 Channel 通信的方式分...
    kyo1992閱讀 588評(píng)論 0 0
  • Don't communicate by sharing memory, share memory by comm...
    IceberGu閱讀 601評(píng)論 0 0
  • GO 中 Chan 實(shí)現(xiàn)原理分享 嗨,我是小魔童哪吒沃疮,還記得咱們之前分享過GO 通道 和sync包的使用嗎盒让?咱們來...
    阿兵云原生閱讀 371評(píng)論 0 5
  • channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu)梅肤,通常與goroutine一起使用,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)...
    cfanbo閱讀 282評(píng)論 0 0
  • 設(shè)計(jì)原理 目前的 Channel 收發(fā)操作均遵循了先進(jìn)先出的設(shè)計(jì)邑茄,具體規(guī)則如下: 先從 Channel 讀取數(shù)據(jù)的...
    Xuenqlve閱讀 1,593評(píng)論 0 0