channel的使用及源碼解析

簡介

熟悉Go的人都知道差购,它提倡著不要通過共享內(nèi)存來通訊忍抽,而要通過通訊來共享內(nèi)存。Go提供了一種獨特的并發(fā)同步技術(shù)來實現(xiàn)通過通訊來共享內(nèi)存插掂,此技術(shù)即為通道灰瞻。我們可以把一個通道看作是在一個程序內(nèi)部的一個FIFO數(shù)據(jù)隊列。 一些協(xié)程可以向此通道發(fā)送數(shù)據(jù)辅甥,另外一些協(xié)程可以從此通道接收數(shù)據(jù)酝润。

Example

介紹一下簡單的用法:

func main() {
    c := make(chan int)
    go func() {
        c <- 1
    }()
    t := <-c
    fmt.Println(t)
}

幾個注意點(后面會一一驗證):

  1. 向一個nil通道中發(fā)送一個值,將會永久阻塞肆氓。
  2. 向一個已關(guān)閉的通道中發(fā)送一個值袍祖,將會導(dǎo)致panic。
  3. 可以從關(guān)閉的通道中讀取值谢揪,緩沖區(qū)為空時蕉陋,讀取的是通道類型的零值。
  4. 重復(fù)關(guān)閉一個通道也會導(dǎo)致panic拨扶。
  5. 通道的元素值的傳遞都是復(fù)制過程于样,且至少被復(fù)制過一次以上屠橄。(直接復(fù)制到receiver中經(jīng)過一次復(fù)制,通過緩沖區(qū)的話則經(jīng)歷了兩次復(fù)制)

channel有兩種類型,Unbuffered channels與Buffered channels

Unbuffered channels
    c:=make(chan int)

它是一個阻塞型channel蜀变,必須要receiver也準備好的情況下间唉,sender才能夠?qū)⑾⑼哆f到c中去啥繁「椋可以結(jié)合下圖進行思考一波:

Buffered channels
    c:=make(chan int,1)

在buf未滿之前,它是一個非阻塞型channel印蓖,sender可以將符合channel類型的值投遞到channel中去辽慕,它內(nèi)部會自己維護一個隊列。當buf滿了之后赦肃,sender會阻塞溅蛉。可以結(jié)合下圖進行思考一波:

幾種應(yīng)用模式

for-range
func forRange() {
    c := make(chan int, 5)
    for i := 1; i <= 5; i++ {
        c <- i
    }
    close(c)
    for v := range c {
        fmt.Println(v)
    }
}
  1. 在進行for-range一個通道時他宛,該循環(huán)將源源不斷的從通道中獲取數(shù)據(jù)船侧,直到此通道關(guān)閉并且它的緩沖隊列中為空為止。
  2. 這里的通道一定不能是單向發(fā)送通道(chan <- int)厅各。
  3. 當for-range一個空通道時镜撩,將會永久阻塞
select-case
func selectCase() {
    c := make(chan int, 1)
    c <- 1
    close(c)
    select {
    case <-c:
        fmt.Println("xxxx")
    default:
        fmt.Println("aaaa")
    }
}
  1. 每個case關(guān)鍵字后必須跟隨一個通道接收數(shù)據(jù)操作或者一個通道發(fā)送數(shù)據(jù)操作。

  2. 所有的非阻塞case操作中將有一個被隨機選擇執(zhí)行(而不是按照從上到下的順序)队塘,然后執(zhí)行此操作對應(yīng)的case分支代碼塊琐鲁。

  3. 在所有的case操作均為阻塞的情況下卫旱,如果default分支存在,則default分支代碼塊將得到執(zhí)行围段; 否則,當前協(xié)程將被推入所有阻塞操作中相關(guān)的通道的發(fā)送數(shù)據(jù)協(xié)程隊列或者接收數(shù)據(jù)協(xié)程隊列中投放,并進入阻塞狀態(tài)奈泪。

源碼分析

首先了解下channel是怎么創(chuàng)建的?

func main() {
    c := make(chan int, 1)//line: 9
    close(c) //line: 10
}

通過go tool compile -N -l -S main.go輸出其匯編代碼灸芳,截取一小段觀察一下:

0x0024 00036 (main.go:9)        LEAQ    type.chan int(SB), AX // 將&chantype(元素類型是int)放到AX寄存器中
0x002b 00043 (main.go:9)        PCDATA  $2, $0
0x002b 00043 (main.go:9)        MOVQ    AX, (SP) // 也就是將&chantype放到SP(0)位置
0x002f 00047 (main.go:9)        MOVQ    $1, 8(SP)// 將1放到SP(8)位置
0x0038 00056 (main.go:9)        CALL    runtime.makechan(SB)// makechan(SP0,SP8)
0x003d 00061 (main.go:9)        PCDATA  $2, $1
0x003d 00061 (main.go:9)        MOVQ    16(SP), AX
0x0042 00066 (main.go:9)        MOVQ    AX, "".c+24(SP)
0x0047 00071 (main.go:10)       PCDATA  $2, $0
0x0047 00071 (main.go:10)       MOVQ    AX, (SP)
0x004b 00075 (main.go:10)       CALL    runtime.closechan(SB)

在上面的流程的關(guān)鍵部分加上了注釋涝桅,也就是說咱們的make(chan int, 1)最終調(diào)用到了runtime.makechan這個方法。在進入分析之前先看看channel的結(jié)構(gòu):

type hchan struct {
    qcount   uint           // 隊列中實際有多少個元素
    dataqsiz uint           // channel的總長度(緩沖區(qū)的總長度)
    buf      unsafe.Pointer // 指向底層元素的指針
    elemsize uint16  // 元素類型的size
    closed   uint32  // 是否關(guān)閉烙样,0:未關(guān)閉冯遂, 1:已關(guān)閉
    elemtype *_type // 元素類型
    sendx    uint   // 發(fā)送位置索引
    recvx    uint   // 接收位置索引
    recvq    waitq  // 接收者隊列,一個雙向鏈表
    sendq    waitq  // 發(fā)送者隊列谒获,一個雙向鏈表

    lock mutex  // 鎖蛤肌,并發(fā)發(fā)送的時候需要上鎖
}

咱們可以將hchan中的buf簡單的看成一個數(shù)組緩沖區(qū),qcount是數(shù)組中實際存儲元素的數(shù)量批狱,dataqsiz是數(shù)組的容量裸准,elemtype是數(shù)組元素的類型。sendxrecvx分別是發(fā)送索引位置和接收索引位置赔硫,每次操作都會自增1炒俱,當sendxrecv等于dataqsiz時,會重置為零爪膊。recvqsendq都是雙向鏈表权悟,里面維護著等待接收和等待發(fā)送的goroutine。當多個gouroutine并發(fā)操作同一個channel時推盛,會使用lock進行控制峦阁。

創(chuàng)建流程
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // 緩沖區(qū)中元素類型的尺寸不能超過16k
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    // 判斷是否位數(shù)對齊,
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // 計算緩沖區(qū)的總長度小槐,并判斷是否溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
        // channel長度或者元素類型尺寸為0時拇派,也就是緩沖區(qū)長度為0時,只用分配hchan所占用的內(nèi)存空間凿跳。
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // 元素類型不是指針類型件豌,則將hchan和buf一次性分配出來
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // 緩沖區(qū)buf的指針位置在c+hchanSize(hchanSize補齊為8的倍數(shù))
        c.buf = add(unsafe.Pointer(c), hchanSize,hchanSize補齊為8的倍數(shù))
    default:
        // 元素類型是指針類型,hchan和緩沖區(qū)單獨分配
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    // 元素的尺寸
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)


    return c
}

創(chuàng)建channel時共分為三種情況:

  1. 緩沖區(qū)大小為0的情況下控嗜,只用給hchan分配內(nèi)存即可茧彤。
  2. 當元素類型不為指針時,可以考慮分配一段連續(xù)的內(nèi)存疆栏,這樣方便垃圾回收曾掂。
  3. 當元素類型為指針時惫谤,需要給hchanbuf分別開辟空間。

最終都調(diào)用到mallocgc方法進行內(nèi)存的分配珠洗,分配過程這里不做過多的描述溜歪,后面會考慮寫一篇文章介紹一下分配的相關(guān)流程。

發(fā)送流程
func main() {
    c := make(chan int, 1)
    c <- 1 // line:9
    close(c) //line:10
}

通過go tool compile -N -l -S main.go輸出其匯編代碼许蓖,截取一小段觀察一下:

0x0057 00087 (main.go:9)        CALL    runtime.chansend1(SB)
0x005c 00092 (main.go:10)       PCDATA  $2, $1
0x005c 00092 (main.go:10)       PCDATA  $0, $0
0x005c 00092 (main.go:10)       MOVQ    "".c+24(SP), AX
0x0061 00097 (main.go:10)       PCDATA  $2, $0
0x0061 00097 (main.go:10)       MOVQ    AX, (SP)
0x0065 00101 (main.go:10)       CALL    runtime.closechan(SB)

從上面的匯編代碼可以清晰的看到蝴猪,c<-1就是一個簡單的語法糖,實際上底層調(diào)用的是runtime.chansend1方法膊爪,咱們跟蹤一下這個方法:

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

從上面的注釋

entry point for c <- x from compiled code

也可以看出來自阱,這段代碼是c <- x編譯后的一個切入點。接著看下一個調(diào)用棧:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        // 從這里可以觀察到米酬,向一個nil的channel中發(fā)送一個值將會導(dǎo)致永久阻塞
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
    }
    // 通過chansend1方法調(diào)用是不會進入這個條件語句的沛豌,在非阻塞并且通道未關(guān)閉
    // 的情況下,滿足 ①該channel是unbufferedChannel且接收隊列為空 ②該channel是bufferedChannel
    // 且緩沖區(qū)已滿赃额。 這兩個條件中任意一個條件就可以快速返回false加派。表示投遞失敗
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // 進行上鎖,防止出現(xiàn)并發(fā)問題
    lock(&c.lock)

    if c.closed != 0 {
        // 通道已經(jīng)關(guān)閉爬早,解鎖的同時panic,這也證實了向一個已經(jīng)關(guān)閉的通道發(fā)送值會導(dǎo)致panic
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // 從等待接收的隊列鏈表中取出一個接收者哼丈,進行值的拷貝
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    if c.qcount < c.dataqsiz {
        // 當該通道時bufferedChannel時,緩沖區(qū)還未滿的情況下筛严,從緩沖區(qū)中取出一個內(nèi)存塊
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 將發(fā)送值拷貝到上面取出的內(nèi)存塊上面去
        typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送索引自增1
        c.sendx++
        if c.sendx == c.dataqsiz {
            // 復(fù)用緩沖區(qū)醉旦,當索引位到緩沖區(qū)最后一位時,置位0
            c.sendx = 0
        }
        // 緩沖區(qū)中存儲的元素自增1
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // 獲取當前goroutine
    gp := getg()
    // 從當前goroutine所在的p上獲取一個sudog結(jié)構(gòu)體
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // 將要發(fā)送的值賦值給sudog的elem字段桨啃,后面chanrecv會用到
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 將mysg插入到發(fā)送隊列的尾部
    c.sendq.enqueue(mysg)
    // 進行channel的解鎖车胡,并且將當前goroutine置為waiting狀態(tài)。
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    // 這里進行闭振活一下匈棘,防止接受者還沒有拷貝過去,這個值就已經(jīng)被gc給回收了
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    // 將當前使用的sudog放到空閑池中析命,供下一次使用
    releaseSudog(mysg)
    return true
}

從上面的流程中主卫,咱們也驗證了使用時需要注意的兩個點:

  1. 向一個空的channel中發(fā)送值將會導(dǎo)致永遠阻塞。
  2. 向一個已關(guān)閉的channel中發(fā)送值將會導(dǎo)致panic鹃愤。

咱們可以將上述流程簡單的總結(jié)為三個段:

接收隊列不為空
if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
}
// 常用的鏈表操作手段簇搅,取出隊列中的第一個元素
func (q *waitq) dequeue() *sudog {
    for {
        sgp := q.first
        if sgp == nil {
            return nil
        }
        y := sgp.next
        if y == nil {
            q.first = nil
            q.last = nil
        } else {
            y.prev = nil
            q.first = y
            sgp.next = nil // mark as removed (see dequeueSudog)
        }

        ....

        return sgp
    }
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ....
    if sg.elem != nil {
        // 將sp,也就是發(fā)送值拷貝到sg.elem字段中
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    ....
    // 喚醒當前接收的goroutine
    goready(gp, skip+1)
}

第一步:從接收隊列中取出第一個sudog元素。

第二步:將發(fā)送的值拷貝到sudog的elem字段上软吐。

第三步:喚醒與當前sudog綁定的處于waiting狀態(tài)的goroutine瘩将。

這個情況下,發(fā)送值只經(jīng)過了一次拷貝,就被接收者消費掉了姿现。

緩沖區(qū)還有可用的位置
if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

這個情況下肠仪,發(fā)送值會先進行一次拷貝到緩沖區(qū)中。然后在有接收者的情況下备典,會從緩沖去再拷貝一次异旧,拷貝到接收者指定的內(nèi)存地址上。在這個過程中提佣,會先通過qp := chanbuf(c, c.sendx)方法取出緩沖區(qū)下一個可用的內(nèi)存塊泽艘,然后通過typedmemmove(c.elemtype, qp, ep)方法將值拷貝到相應(yīng)的內(nèi)存塊中去。最后增加相應(yīng)的索引位和緩沖區(qū)元素的個數(shù)镐依。這里在滿足條件的情況下會對索引位進行重置,進入下一個輪回天试。

阻塞發(fā)送
...
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
...

在不滿足上面兩個條件的情況下槐壳,當前goroutine會保存部分信息到channel的發(fā)送者隊列中,并且通過調(diào)用goparkunlock阻塞當前goroutine喜每,直到有接收者消費掉了保存該goroutine的sudog务唐,并調(diào)用goready方法,才會使當前陷入waiting狀態(tài)的goroutine被重新喚醒带兜。

接收流程
func main() {
    c := make(chan int, 1)
    close(c)
    <-c //line: 10
}

通過go tool compile -N -l -S main.go輸出其匯編代碼枫笛,截取一小段觀察一下:

0x0055 00085 (main.go:10)       MOVQ    AX, (SP)
0x0059 00089 (main.go:10)       MOVQ    $0, 8(SP)
0x0062 00098 (main.go:10)       CALL    runtime.chanrecv1(SB)
0x0067 00103 (main.go:11)       MOVQ    32(SP), BP
0x006c 00108 (main.go:11)       ADDQ    $40, SP

實際上在第10行會觸發(fā)runtime.chanrecv1方法,和上面的發(fā)送一樣刚照,也是個語法糖:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

這個方法有兩個參數(shù)刑巧,一個是hchan指針,另一個是一個地址值无畔,后面會將從通道中取出來的值復(fù)制到該地址值中去啊楚。接下來看一下核心的chanrecv方法,for-range包括select-case最終都會調(diào)用到這個方法浑彰。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {
            return
        }
        // 從一個nil通道中讀取一個值也會被永遠的阻塞
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // 通過chanrecv1調(diào)用是不會進入這個代碼塊的恭理,在非阻塞的情況下,如果通道長度為0并且發(fā)送隊列為空郭变,
    // 或者通道長度大于0并且緩沖區(qū)中沒有元素并且通道未關(guān)閉的情況下颜价,進行快速返回
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    // 進行上鎖
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        // 通道已經(jīng)關(guān)閉,并且緩沖區(qū)已經(jīng)空了诉濒,返回該元素類型的零值
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    if sg := c.sendq.dequeue(); sg != nil {
        // 在發(fā)送隊列不為空的情況下周伦,取出一個sender,如果是unbufferedChannel循诉,則直接從sender中拷貝
        // 一個值到接收者横辆;如果是一個bufferedChannel,從緩沖隊列的recvx處取一個值,并且將sender中的
        // 值拷貝到recvx索引的位置
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    if c.qcount > 0 {
        // 直接從緩沖區(qū)中取出對應(yīng)recvx索引位置的值
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
       
        if ep != nil {
            // 拷貝qp中的值到指定的地址ep中
            typedmemmove(c.elemtype, ep, qp)
        }
        // 將qp中的元素值清空成零值
        typedmemclr(c.elemtype, qp)
        // 接收索引進行自增
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // 緩沖區(qū)元素長度遞減
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // 發(fā)送隊列為空,并且緩存區(qū)中也沒有值狈蚤,后續(xù)流程將會阻塞當前goroutine
    gp := getg()
    // 從當前g綁定的p中獲取一個sudog(這一塊是從一個緩存隊列中獲取)
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // 將mysg放到接收隊列的尾部
    c.recvq.enqueue(mysg)
    // 將當前goroutine狀態(tài)變?yōu)閣aiting,并且解開channel的鎖
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    // 釋放sudog(重新丟到p對應(yīng)的緩存隊列中)
    releaseSudog(mysg)
    return true, !closed
}

接收流程和發(fā)送流程類似困肩,可大概歸結(jié)為四個段:

通道關(guān)閉且緩沖區(qū)為0
if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
        raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
        typedmemclr(c.elemtype, ep)
    }
    return true, false
}

上述代碼將直接返回一個對應(yīng)元素類型的零值,分配對應(yīng)元素零值的代碼typedmemclr(c.elemtype, ep)脆侮,感興趣的可以自己追蹤一下锌畸。

發(fā)送隊列不為空

這一塊可能和發(fā)送流程有點區(qū)別:

if sg := c.sendq.dequeue(); sg != nil {
    // 能進這里代表要么緩沖隊列滿了,要么該通道是一個無緩沖通道
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // 代表這個通道是一個無緩沖通道
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // 直接從sender中拷貝值
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 從緩沖區(qū)中直接取出recvx索引位置的值
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // 拷貝緩沖區(qū)中的值到指定的ep地址上
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 這里多了一步流程靖避,會將sender中的值存放到剛剛recvx位置處
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒當前sender的goroutine
    goready(gp, skip+1)
}

這里會分兩種情況進行考慮:

  1. 通道是unbuffered channel潭枣,則直接拷貝從發(fā)送隊列中取出來的值。
  2. 通道是buffered channel幻捏,則拷貝從緩沖區(qū)中取出響應(yīng)的值盆犁,并且需要將發(fā)送隊列中取出來的值拷貝到對應(yīng)緩沖區(qū)的位置上。
sendq_not_null.png

在緩沖區(qū)是滿的情況下篡九,sendx和recvx指向同一個位置谐岁。例如上圖:咱們?nèi)〕鰎ecvx為2處的元素,然后會將sudog中的值拷貝到2位置處榛臼,同時sendx和recvx都指向3位置伊佃。

  1. 最終喚醒與當前sender sudog綁定的goroutine。
緩沖區(qū)不為空
if c.qcount > 0 {
    // 讀取緩沖區(qū)recvx索引處的值
    qp := chanbuf(c, c.recvx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    if ep != nil {
        // 將上述的值拷貝到指定的地址ep中
        typedmemmove(c.elemtype, ep, qp)
    }
    // 將recvx處的值置位零值
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

直接從緩沖區(qū)中讀取recvx索引位置中的值沛善,將其拷貝大指定的指針中航揉,然后將對應(yīng)recvx處置位零值。

sendq_is_null.png

如上圖所示金刁,取出索引位置為3的元素后帅涂,將其置位零值,發(fā)送者下次填充buf時就可以從index=3的位置開始填充胀葱。

阻塞接收

流程和上述的阻塞發(fā)送流程一致漠秋,不做過多介紹。

關(guān)閉

簡單的看一段代碼

func main() {
    c := make(chan int)// line:9
    close(c) //line:10
}

通過go tool compile -N -l -S main.go輸出其匯編代碼抵屿,截取一小段觀察一下:

  0x0038 00056 (main.go:9)        CALL    runtime.makechan(SB)
  0x003d 00061 (main.go:9)        PCDATA  $2, $1
  0x003d 00061 (main.go:9)        MOVQ    16(SP), AX //將makechan返回的結(jié)果放到AX寄存器
  0x0042 00066 (main.go:9)        MOVQ    AX, "".c+24(SP)
  0x0047 00071 (main.go:10)       PCDATA  $2, $0
  0x0047 00071 (main.go:10)       MOVQ    AX, (SP) //將AX寄存器中的值復(fù)制到SP(0)位置
  0x004b 00075 (main.go:10)       CALL    runtime.closechan(SB) //調(diào)用runtime.closechan方法

從上面的匯編代碼可以看出來庆锦,通道的關(guān)閉最終調(diào)用到了runtime.closechan這個方法。

func closechan(c *hchan) {
    // 關(guān)閉一個空的channel將會導(dǎo)致panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        // 關(guān)閉一個已關(guān)閉的channel也會導(dǎo)致panic
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }
    // 將closed置位1轧葛,表示當前通道已關(guān)閉
    c.closed = 1

    var glist gList

    // 取出所有接收隊列中的元素搂抒,將其加入到一個單向鏈表中
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 取出所有發(fā)送隊列中的元素,將其加入到一個單向鏈表中
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // 喚醒阻塞的goroutine
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

關(guān)閉通道這一塊理解起來不太難尿扯,最開始對空channel和已關(guān)閉的channel做了panic處理求晶,后面進行資源的釋放。因為處于發(fā)送隊列和接收隊列的goroutine都是阻塞狀態(tài)的衷笋,咱們在關(guān)閉這個通道時必須得將這些goroutine都喚醒芳杏,防止goroutine泄露。

select-go流程

這里主要是分析一下select-go接收流程,發(fā)送流程類似爵赵。

單case分支(包含一個default)
func main() {
    c := make(chan int)
    close(c)
    select {
        case <-c://line: 11
    default:
    }
}

使用上面的編譯指令生成匯編代碼如下(截取部分):

0x0050 00080 (main.go:11)       MOVQ    $0, (SP)
0x0058 00088 (main.go:11)       PCDATA  $2, $1
0x0058 00088 (main.go:11)       PCDATA  $0, $0
0x0058 00088 (main.go:11)       MOVQ    "".c+24(SP), AX
0x005d 00093 (main.go:11)       PCDATA  $2, $0
0x005d 00093 (main.go:11)       MOVQ    AX, 8(SP)
0x0062 00098 (main.go:11)       CALL    runtime.selectnbrecv(SB)

跟蹤一下調(diào)用鏈:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    // 這里不會進行阻塞吝秕,因為這里默認有個default分支,c中無法取出值的情況下默認執(zhí)行default分支即可
    // 具體實現(xiàn)流程參考上面的接收流程
    selected, _ = chanrecv(c, elem, false)
    return
}

單case分支(不包含default)
func main() {
    c := make(chan int)
    close(c)
    select {
    case <-c: //line:11
    }
}

使用上面的編譯指令生成匯編代碼如下(截取部分):

0x005a 00090 (main.go:11)       MOVQ    "".c+24(SP), AX
0x005f 00095 (main.go:11)       PCDATA  $2, $0
0x005f 00095 (main.go:11)       MOVQ    AX, (SP)
0x0063 00099 (main.go:11)       MOVQ    $0, 8(SP)
0x006c 00108 (main.go:11)       CALL    runtime.chanrecv1(SB)

跟蹤一下調(diào)用鏈:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    // 在從通道無法取出值的情況下空幻,會阻塞當前goroutine烁峭,等待被喚醒
    // 具體實現(xiàn)流程參考上面的接收流程
    chanrecv(c, elem, true)
}

多case分支
func main() {
    c := make(chan int)
    a := make(chan int)
    close(c)
    close(a)
    select { //line: 13
    case <-c:
    case <-a:
    default:

    }
}

使用上面的編譯指令生成匯編代碼如下(截取部分):

0x011d 00285 (main.go:13)       MOVQ    AX, (SP)
0x0121 00289 (main.go:13)       PCDATA  $2, $1
0x0121 00289 (main.go:13)       PCDATA  $0, $0
0x0121 00289 (main.go:13)       MOVQ    ""..autotmp_7+88(SP), AX
0x0126 00294 (main.go:13)       PCDATA  $2, $0
0x0126 00294 (main.go:13)       MOVQ    AX, 8(SP)
0x012b 00299 (main.go:13)       MOVQ    $3, 16(SP) // 表示case長度為3(包括default分支)
0x0134 00308 (main.go:13)       CALL    runtime.selectgo(SB)

跟蹤一下調(diào)用鏈(截取關(guān)鍵性流程代碼,省略了計算pollorderlockorder的部分):

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // 獲取所有的case
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

    for i := range scases {
        // 遍歷所以的case秕铛,將channel為空并且類型不是default的case置位一個空結(jié)構(gòu)體scase
        cas := &scases[i]
        if cas.c == nil && cas.kind != caseDefault {
            *cas = scase{}
        }
    }
    
    // 在進行select的時候會鎖住所有關(guān)聯(lián)的channel
    sellock(scases, lockorder)

    var (
        gp     *g
        sg     *sudog
        c      *hchan
        k      *scase
        sglist *sudog
        sgnext *sudog
        qp     unsafe.Pointer
        nextp  **sudog
    )

loop:
    // pass 1 - look for something already waiting
    var dfli int
    var dfl *scase
    var casi int
    var cas *scase
    var recvOK bool
    for i := 0; i < ncases; i++ {
        casi = int(pollorder[i])
        cas = &scases[casi]
        c = cas.c

        switch cas.kind {
        case caseNil:
            continue

        case caseRecv:
            sg = c.sendq.dequeue()
            if sg != nil {
                // 發(fā)送隊列不為空约郁,取隊列的head,進行值的拷貝
                goto recv
            }
            if c.qcount > 0 {
                // 從緩沖區(qū)去獲取值
                goto bufrecv
            }
            if c.closed != 0 {
                goto rclose
            }

        case caseSend:
            if c.closed != 0 {
                // 這里會panic
                goto sclose
            }
            sg = c.recvq.dequeue()
            if sg != nil {
                // 接收隊列不為空但两,取隊列head鬓梅,進行值的拷貝
                goto send
            }
            if c.qcount < c.dataqsiz {
                // 緩沖區(qū)還有可用位置,拷貝到緩沖區(qū)
                goto bufsend
            }

        case caseDefault:
            // 這個分支不會break-for循環(huán)谨湘,所以優(yōu)先級比較低一些
            dfli = casi
            dfl = cas
        }
    }

    if dfl != nil {
        // default被命中了己肮,直接返回
        selunlock(scases, lockorder)
        casi = dfli
        cas = dfl
        goto retc
    }

    // pass 2 - enqueue on all chans
    gp = getg()
    if gp.waiting != nil {
        throw("gp.waiting != nil")
    }
    nextp = &gp.waiting
   
    for _, casei := range lockorder {
        casi = int(casei)
        cas = &scases[casi]
        if cas.kind == caseNil {
            continue
        }
        c = cas.c
        sg := acquireSudog()
        sg.g = gp
        sg.isSelect = true
        
        sg.elem = cas.elem
        sg.releasetime = 0
        if t0 != 0 {
            sg.releasetime = -1
        }
        sg.c = c
        // Construct waiting list in lock order.
        *nextp = sg
        nextp = &sg.waitlink
         // 這里會將當前sudog放到每個case對應(yīng)的channel隊列中
        switch cas.kind {
        case caseRecv:
            c.recvq.enqueue(sg)

        case caseSend:
            c.sendq.enqueue(sg)
        }
    }

    // wait for someone to wake us up
    gp.param = nil
    // 阻塞當前goroutine,并給所有case對應(yīng)的channel解鎖
    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    // 被喚醒后立即鎖上select對應(yīng)的所有的channel
    sellock(scases, lockorder)

    gp.selectDone = 0
    // 因為喚醒的goroutine會將對應(yīng)的sudog或者nil(channel關(guān)閉)放到當前goroutine的param參數(shù)上
    // 取出命中的sudog供后面進行遍歷比對
    sg = (*sudog)(gp.param)
    gp.param = nil

    casi = -1
    cas = nil
    sglist = gp.waiting
    // 將sglist鏈路中所有sudog狀態(tài)信息和元素指針清空
    for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
        sg1.isSelect = false
        sg1.elem = nil
        sg1.c = nil
    }
    gp.waiting = nil
    // 一個個進行比對
    for _, casei := range lockorder {
        k = &scases[casei]
        if k.kind == caseNil {
            continue
        }
        
        if sg == sglist {
            // 找到命中的sudog悲关,記錄對應(yīng)的索引位
            casi = int(casei)
            cas = k
        } else {
            // 不是對應(yīng)的sudog,從對應(yīng)通道的隊列中移除
            c = k.c
            if k.kind == caseSend {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
        }
        // 從sglist上移除
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        // 將其歸還給緩存
        releaseSudog(sglist)
        sglist = sgnext
    }

    if cas == nil {
        // 通道被關(guān)閉也會喚醒所有隊列中的goroutine娄柳,所以這里再跑一次上面的循環(huán)即可
        // 便會直接走goto rclose或者goto sclose
        goto loop
    }

    c = cas.c

    if cas.kind == caseRecv {
        recvOK = true
    }

    selunlock(scases, lockorder)
    goto retc
// 后面部分配合發(fā)送流程和接收流程理解一下即可寓辱,和前面描述的基本一樣
bufrecv:
    recvOK = true
    qp = chanbuf(c, c.recvx)
    if cas.elem != nil {
        typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    selunlock(scases, lockorder)
    goto retc

bufsend:
    // can send to buffer
    ....
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    selunlock(scases, lockorder)
    goto retc

recv:
    // can receive from sleeping sender (sg)
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    if debugSelect {
        print("syncrecv: cas0=", cas0, " c=", c, "\n")
    }
    recvOK = true
    goto retc

rclose:
    // read at end of closed channel
    selunlock(scases, lockorder)
    recvOK = false
    if cas.elem != nil {
        typedmemclr(c.elemtype, cas.elem)
    }
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

retc:
    return casi, recvOK

sclose:
    // 向一個已關(guān)閉的channel發(fā)送值,panic
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))
}

雖然上面的代碼開起來很長赤拒,但是邏輯十分清晰秫筏,去掉后半段的sendrecv挎挖,close部分这敬,核心的處理流程就不太多了。分幾步總結(jié)一下:

  1. 將通道為nil并且是非default類型的case設(shè)置值為scase{}蕉朵,可以讓后面的邏輯不對channel是否為空做判斷崔涂。
  2. 遍歷所有的case項,分四種情況進行判斷始衅。
    • 類型是空:直接跳過冷蚂。
    • 接收類型:先判斷發(fā)送隊列是否為空,不為空汛闸,結(jié)束循環(huán)蝙茶;再判斷緩沖區(qū)是否有數(shù)據(jù),有數(shù)據(jù)诸老,結(jié)束循環(huán)隆夯;最后判斷通道是否關(guān)閉,已關(guān)閉,結(jié)束循環(huán)蹄衷。
    • 發(fā)送類型:先判斷通道是否關(guān)閉忧额,已關(guān)閉,則panic宦芦;再判斷接收隊列是否為空宙址,不為空,結(jié)束循環(huán)调卑;判斷緩沖區(qū)是否已經(jīng)滿了抡砂,未滿,結(jié)束循環(huán)恬涧。
    • default類型:不會結(jié)束循環(huán)注益,所以優(yōu)先級最低。
  3. 在所有case都不滿足的情況下溯捆,當前goroutine就會進入waiting狀態(tài)丑搔,等待被喚醒。
  4. 喚醒后進行比對提揍,取出對應(yīng)的case索引即可啤月。
總結(jié)

本文對channel這一塊的知識點大概的介紹了一下,包括了使用案例以及源碼分析劳跃。其中源碼占了大半部分谎仲,可能會略顯枯燥,但是相信只要用心去看刨仑,都會get到一部分的郑诺。在咱們熟悉源碼理解原理后,將會幫助我們寫出更優(yōu)質(zhì)的代碼杉武。

參考資料
  1. 深入理解Golang之channel
  2. go101-通道的介紹
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末辙诞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子轻抱,更是在濱河造成了極大的恐慌飞涂,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祈搜,死亡現(xiàn)場離奇詭異封拧,居然都是意外死亡,警方通過查閱死者的電腦和手機夭问,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門泽西,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人缰趋,你說我怎么就攤上這事捧杉∩录” “怎么了?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵味抖,是天一觀的道長评甜。 經(jīng)常有香客問我,道長仔涩,這世上最難降的妖魔是什么忍坷? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮熔脂,結(jié)果婚禮上佩研,老公的妹妹穿的比我還像新娘。我一直安慰自己霞揉,他們只是感情好旬薯,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著适秩,像睡著了一般绊序。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上秽荞,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天骤公,我揣著相機與錄音,去河邊找鬼扬跋。 笑死淋样,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的胁住。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼刊咳,長吁一口氣:“原來是場噩夢啊……” “哼彪见!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起娱挨,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤余指,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后跷坝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酵镜,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年柴钻,在試婚紗的時候發(fā)現(xiàn)自己被綠了淮韭。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡贴届,死狀恐怖靠粪,靈堂內(nèi)的尸體忽然破棺而出蜡吧,到底是詐尸還是另有隱情,我是刑警寧澤占键,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布昔善,位于F島的核電站,受9級特大地震影響畔乙,放射性物質(zhì)發(fā)生泄漏君仆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一牲距、第九天 我趴在偏房一處隱蔽的房頂上張望返咱。 院中可真熱鬧,春花似錦嗅虏、人聲如沸洛姑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽楞艾。三九已至,卻和暖如春龄广,著一層夾襖步出監(jiān)牢的瞬間硫眯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工择同, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留两入,地道東北人。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓敲才,卻偏偏與公主長得像裹纳,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子紧武,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348