golang筆記——channel底層原理

一悯衬、什么是CSP

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享內(nèi)存來通信,而要通過通信來實現(xiàn)內(nèi)存共享。
這就是 Go 的并發(fā)哲學(xué)浅妆,它依賴 CSP 模型皿渗,基于 channel 實現(xiàn)樊拓。

CSP 經(jīng)常被認(rèn)為是 Go 在并發(fā)編程上成功的關(guān)鍵因素洁闰。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發(fā)表在 ACM 的一篇論文迟赃。論文里指出一門編程語言應(yīng)該重視 input 和 output 的原語陪拘,尤其是并發(fā)編程的代碼。

Go 一開始就把 CSP 的思想融入到語言的核心里纤壁,所以并發(fā)編程成為 Go 的一個獨特的優(yōu)勢左刽,而且很容易理解。

Go 的并發(fā)原則非常優(yōu)秀酌媒,目標(biāo)就是簡單:盡量使用 channel欠痴;把 goroutine 當(dāng)作免費的資源,隨便用秒咨。

二喇辽、channel底層數(shù)據(jù)結(jié)構(gòu)

底層數(shù)據(jù)結(jié)構(gòu)源碼:

type hchan struct {
    // chan 里元素數(shù)量
    qcount   uint
    // chan 底層循環(huán)數(shù)組的長度
    dataqsiz uint
    // 指向底層循環(huán)數(shù)組的指針
    // 只針對有緩沖的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被關(guān)閉的標(biāo)志
    closed   uint32
    // chan 中元素類型
    elemtype *_type // element type
    //有緩沖channel內(nèi)的緩沖數(shù)組會被作為一個“環(huán)型”來使用。
    //當(dāng)下標(biāo)超過數(shù)組容量后會回到第一個位置雨席,所以需要有兩個字段記錄當(dāng)前讀和寫的下標(biāo)位置
    sendx    uint   // 下一次發(fā)送數(shù)據(jù)的下標(biāo)位置
    recvx    uint   // 下一次讀取數(shù)據(jù)的下標(biāo)位置
    //當(dāng)循環(huán)數(shù)組中沒有數(shù)據(jù)時菩咨,收到了接收請求,那么接收數(shù)據(jù)的變量地址將會寫入讀等待隊列
    //當(dāng)循環(huán)數(shù)組中數(shù)據(jù)已滿時,收到了發(fā)送請求抽米,那么發(fā)送數(shù)據(jù)的變量地址將寫入寫等待隊列
    recvq    waitq  // 讀等待隊列
    sendq    waitq  // 寫等待隊列

    // 保護(hù) hchan 中所有字段
    lock mutex
}

waitqsudog 的一個雙向鏈表特占,而 sudog 實際上是對 goroutine 的一個封裝:

type waitq struct {
    first *sudog
    last  *sudog
}

例如,創(chuàng)建一個容量為 6 的云茸,元素為 int 型的 channel 數(shù)據(jù)結(jié)構(gòu)如下 :


總結(jié)hchan結(jié)構(gòu)體的主要組成部分有四個:

  • 用來保存goroutine之間傳遞數(shù)據(jù)的循環(huán)鏈表是目。=====> buf。
  • 用來記錄此循環(huán)鏈表當(dāng)前發(fā)送或接收數(shù)據(jù)的下標(biāo)值标捺。=====> sendx和recvx懊纳。
  • 用于保存向該chan發(fā)送和從改chan接收數(shù)據(jù)的goroutine的隊列。=====> sendq 和 recvq
  • 保證channel寫入和讀取數(shù)據(jù)時線程安全的鎖亡容。 =====> lock

創(chuàng)建

我們知道嗤疯,通道有兩個方向,發(fā)送和接收萍倡。理論上來說身弊,我們可以創(chuàng)建一個只發(fā)送或只接收的通道,但是這種通道創(chuàng)建出來后列敲,怎么使用呢?一個只能發(fā)的通道帖汞,怎么接收呢戴而?同樣,一個只能收的通道翩蘸,如何向其發(fā)送數(shù)據(jù)呢所意?

創(chuàng)建 chan 的函數(shù)是 makechan:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 省略了檢查 channel size,align 的代碼
    // ……

    var c *hchan
    // 如果元素類型不含指針 或者 size 大小為 0(無緩沖類型)
    // 只進(jìn)行一次內(nèi)存分配
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // 如果 hchan 結(jié)構(gòu)體中不含指針催首,GC 就不會掃描 chan 中的元素
        // 只分配 "hchan 結(jié)構(gòu)體大小 + 元素大小*個數(shù)" 的內(nèi)存
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 如果是緩沖型 channel 且元素大小不等于 0(大小等于 0的元素類型:struct{})
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            // 1. 非緩沖型的扶踊,buf 沒用,直接指向 chan 起始地址處
            // 2. 緩沖型的郎任,能進(jìn)入到這里秧耗,說明元素?zé)o指針且元素類型為 struct{},也無影響
            // 因為只會用到接收和發(fā)送游標(biāo)舶治,不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內(nèi)容)
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 進(jìn)行兩次內(nèi)存分配操作
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // 循環(huán)數(shù)組長度
    c.dataqsiz = uint(size)

    // 返回 hchan 指針
    return c
}

從函數(shù)原型來看分井,創(chuàng)建的 chan 是一個指針。所以我們能在函數(shù)間直接傳遞 channel霉猛,而不用傳遞 channel 的指針尺锚。

新建一個 chan 后,內(nèi)存在堆上分配惜浅,大概長這樣:


三瘫辩、向channel發(fā)送數(shù)據(jù)

源碼分析

發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù),直接上源碼,同樣大部分都注釋了伐厌,可以看懂主流程:

// 位于 src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞承绸,直接返回 false,表示未發(fā)送成功
        if !block {
            return false
        }
        // 當(dāng)前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關(guān)……

    // 對于不阻塞的 send弧械,快速檢測失敗場景
    //
    // 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間八酒。這可能是:
    // 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
    // 2. channel 是緩沖型的刃唐,但循環(huán)數(shù)組已經(jīng)裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel羞迷,并發(fā)安全
    lock(&c.lock)

    // 如果 channel 關(guān)閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收隊列里有 goroutine,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對于緩沖型的 channel画饥,如果還有緩沖空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將數(shù)據(jù)從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送游標(biāo)值加 1
        c.sendx++
        // 如果發(fā)送游標(biāo)值等于容量值衔瓮,游標(biāo)值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩沖區(qū)的元素數(shù)量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,則直接返回錯誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了抖甘,發(fā)送方會被阻塞热鞍。接下來會構(gòu)造一個 sudog

    // 獲取當(dāng)前 goroutine 的指針
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊列
    c.sendq.enqueue(mysg)

    // 當(dāng)前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這里開始被喚醒了(channel 有機(jī)會可以發(fā)送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒后,channel 關(guān)閉了衔彻∞背瑁坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上綁定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

上面的代碼注釋地比較詳細(xì)了艰额,我們來詳細(xì)看看澄港。

  • 如果檢測到 channel 是空的,當(dāng)前 goroutine 會被掛起柄沮。
  • 對于不阻塞的發(fā)送操作回梧,如果 channel 未關(guān)閉并且沒有多余的緩沖空間(說明:a. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine祖搓;b. channel 是緩沖型的狱意,但循環(huán)數(shù)組已經(jīng)裝滿了元素)

對于這一點,runtime 源碼里注釋了很多拯欧。這一條判斷語句是為了在不阻塞發(fā)送的場景下快速檢測到發(fā)送失敗详囤,好快速返回。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
}
  1. 如果檢測到 channel 已經(jīng)關(guān)閉哈扮,直接 panic纬纪。
  2. 如果能從等待接收隊列 recvq 里出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的滑肉,沒有元素包各,所以才會有等待接收者。這時會調(diào)用 send 函數(shù)將元素直接從發(fā)送者的棸忻恚拷貝到接收者的棧问畅,關(guān)鍵操作由 sendDirect 函數(shù)完成。
// send 函數(shù)處理向一個空的 channel 發(fā)送操作

// ep 指向被發(fā)送的元素,會被直接拷貝到接收的 goroutine
// 之后护姆,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待隊列里有 goroutine矾端,肯定是空的)
// c 必須被上鎖,發(fā)送操作執(zhí)行完后卵皂,會使用 unlockf 函數(shù)解鎖
// sg 必須已經(jīng)從等待隊列里取出來了
// ep 必須是非空秩铆,并且它指向堆或調(diào)用者的棧
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……

    // sg.elem 指向接收到的值存放的位置,如 val <- ch灯变,指的就是 &val
    if sg.elem != nil {
        // 直接拷貝內(nèi)存(從發(fā)送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上綁定的 goroutine
    gp := sg.g
    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒接收的 goroutine. skip 和打印棧相關(guān)殴玛,暫時不理會
    goready(gp, skip+1)
}

繼續(xù)看 sendDirect 函數(shù):

// 向一個非緩沖型的 channel 發(fā)送數(shù)據(jù)、從一個無元素的(非緩沖型或緩沖型但空)的 channel
// 接收數(shù)據(jù)添祸,都會導(dǎo)致一個 goroutine 直接操作另一個 goroutine 的棧
// 由于 GC 假設(shè)對棧的寫操作只能發(fā)生在 goroutine 正在運行中并且由當(dāng)前 goroutine 來寫
// 所以這里實際上違反了這個假設(shè)滚粟。可能會造成一些問題刃泌,所以需要用到寫屏障來規(guī)避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在當(dāng)前 goroutine 的棧上凡壤,dst 是另一個 goroutine 的棧

    // 直接進(jìn)行內(nèi)存"搬遷"
    // 如果目標(biāo)地址的棧發(fā)生了棧收縮,當(dāng)我們讀出了 sg.elem 后
    // 就不能修改真正的 dst 位置的值了
    // 因此需要在讀和寫之前加上一個屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

這里涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作耙替,一般而言亚侠,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設(shè)俗扇。為了不出問題盖奈,寫的過程中增加了寫屏障,保證正確地完成寫操作狐援。這樣做的好處是減少了一次內(nèi)存 copy:不用先拷貝到 channel 的 buf,直接由發(fā)送者到接收者究孕,沒有中間商賺差價啥酱,效率得以提高,完美厨诸。

然后镶殷,解鎖、喚醒接收者微酬,等待調(diào)度器的光臨绘趋,接收者也得以重見天日,可以繼續(xù)執(zhí)行接收操作之后的代碼了颗管。

  • 如果 c.qcount < c.dataqsiz陷遮,說明緩沖區(qū)可用(肯定是緩沖型的 channel)。先通過函數(shù)取出待發(fā)送元素應(yīng)該去到的位置:
qp := chanbuf(c, c.sendx)

// 返回循環(huán)隊列里第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx 指向下一個待發(fā)送元素在循環(huán)數(shù)組中的位置垦江,然后調(diào)用 typedmemmove 函數(shù)將其拷貝到循環(huán)數(shù)組中帽馋。之后 c.sendx 加 1,元素總量加 1 :c.qcount++,最后绽族,解鎖并返回姨涡。

  • 如果沒有命中以上條件的,說明 channel 已經(jīng)滿了吧慢。不管這個 channel 是緩沖型的還是非緩沖型的涛漂,都要將這個 sender “關(guān)起來”(goroutine 被阻塞)。如果 block 為 false检诗,直接解鎖匈仗,返回 false。
  • 最后就是真的需要被阻塞的情況岁诉。先構(gòu)造一個 sudog锚沸,將其入隊(channel 的 sendq 字段)。然后調(diào)用 goparkunlock 將當(dāng)前 goroutine 掛起涕癣,并解鎖哗蜈,等待合適的時機(jī)再喚醒。

喚醒之后坠韩,從 goparkunlock 下一行代碼開始繼續(xù)往下執(zhí)行距潘。

這里有一些綁定操作,sudog 通過 g 字段綁定 goroutine只搁,而 goroutine 通過 waiting 綁定 sudog音比,sudog 還通過 elem 字段綁定待發(fā)送元素的地址,以及 c 字段綁定被“坑”在此處的 channel氢惋。

所以洞翩,待發(fā)送的元素地址其實是存儲在 sudog 結(jié)構(gòu)體里,也就是當(dāng)前 goroutine 里焰望。

四骚亿、從channel接收數(shù)據(jù)

接收操作有兩種寫法,一種帶 “ok”熊赖,反應(yīng) channel 是否關(guān)閉来屠;一種不帶 “ok”,這種寫法震鹉,當(dāng)接收到相應(yīng)類型的零值時無法知道是真實的發(fā)送者發(fā)送過來的值俱笛,還是 channel 被關(guān)閉后,返回給接收者的默認(rèn)類型的零值传趾。兩種寫法迎膜,都有各自的應(yīng)用場景。

經(jīng)過編譯器的處理后墨缘,這兩種寫法最后對應(yīng)源碼里的這兩個函數(shù):

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv1 函數(shù)處理不帶 “ok” 的情形星虹,chanrecv2 則通過返回 “received” 這個字段來反應(yīng) channel 是否被關(guān)閉零抬。接收值則比較特殊,會“放到”參數(shù) elem 所指向的地址了宽涌,這很像 C/C++ 里的寫法平夜。如果代碼里忽略了接收值,這里的 elem 為 nil卸亮。

無論如何忽妒,最終轉(zhuǎn)向了 chanrecv 函數(shù):

// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址。
// 如果 ep 是 nil兼贸,說明忽略了接收值段直。
// 如果 block == false,即非阻塞型接收溶诞,在沒有數(shù)據(jù)可接收的情況下鸯檬,返回 (false, false)
// 否則,如果 c 處于關(guān)閉狀態(tài)螺垢,將 ep 指向的地址清零喧务,返回 (true, false)
// 否則,用返回值填充 ep 指向的內(nèi)存地址枉圃。返回 (true, true)
// 如果 ep 非空功茴,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內(nèi)容 …………

    // 如果是一個 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否則孽亲,接收一個 nil 的 channel坎穿,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會執(zhí)行到這里
        throw("unreachable")
    }

    // 在非阻塞模式下,快速檢測到失敗返劲,不用獲取鎖玲昧,快速返回
    // 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
    // 1. 非緩沖型,等待發(fā)送列隊 sendq 里沒有 goroutine 在等待
    // 2. 緩沖型篮绿,但 buf 里沒有元素
    // 之后酌呆,又觀察到 closed == 0,即 channel 未關(guān)閉搔耕。
    // 因為 channel 不可能被重復(fù)打開,所以前一個觀測的時候 channel 也是未關(guān)閉的痰娱,
    // 因此在這種情況下可以直接宣布接收失敗弃榨,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
    // 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
    // 也就是說即使是關(guān)閉狀態(tài)梨睁,但在緩沖型的 channel鲸睛,
    // buf 里有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
            // 那么接收的值將是一個該類型的零值
            // typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
            typedmemclr(c.elemtype, ep)
        }
        // 從一個已關(guān)閉的 channel 接收坡贺,selected 會返回true
        return true, false
    }

    // 等待發(fā)送隊列里有 goroutine 存在官辈,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩沖型的 channel
    // 2. 緩沖型的 channel箱舞,但 buf 滿了
    // 針對 1,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
    // 針對 2拳亿,接收到循環(huán)數(shù)組頭部的元素晴股,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩沖型,buf 里有元素肺魁,可以正常接收
    if c.qcount > 0 {
        // 直接從循環(huán)數(shù)組里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代碼里电湘,沒有忽略要接收的值,不是 "<- ch"鹅经,而是 "val <- ch"寂呛,ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游標(biāo)向前移動
        c.recvx++
        // 接收游標(biāo)歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數(shù)組里的元素個數(shù)減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解鎖瘾晃。selected 返回 false贷痪,因為沒有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構(gòu)造一個 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收數(shù)據(jù)的地址保存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進(jìn)入channel 的等待接收隊列
    c.recvq.enqueue(mysg)
    // 將當(dāng)前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了,接著從這里繼續(xù)執(zhí)行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
  1. 如果 channel 是一個空值(nil)蹦误,在非阻塞模式下劫拢,會直接返回。在阻塞模式下胖缤,會調(diào)用 gopark 函數(shù)掛起 goroutine尚镰,這個會一直阻塞下去。因為在 channel 是 nil 的情況下哪廓,要想不阻塞狗唉,只有關(guān)閉它,但關(guān)閉一個 nil 的 channel 又會發(fā)生 panic涡真,所以沒有機(jī)會被喚醒了分俯。更詳細(xì)地可以在 closechan 函數(shù)的時候再看。
  2. 和發(fā)送函數(shù)一樣哆料,接下來搞了一個在非阻塞模式下缸剪,不用獲取鎖,快速檢測到失敗并且返回的操作东亦。順帶插一句杏节,我們平時在寫代碼的時候,找到一些邊界條件典阵,快速返回奋渔,能讓代碼邏輯更清晰,因為接下來的正常情況就比較少壮啊,更聚焦了嫉鲸,看代碼的人也更能專注地看核心代碼邏輯了。
// 在非阻塞模式下歹啼,快速檢測到失敗玄渗,不用獲取鎖座菠,快速返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:

  • 非緩沖型,等待發(fā)送列隊里沒有 goroutine 在等待
  • 緩沖型藤树,但 buf 里沒有元素

之后浴滴,又觀察到 closed == 0,即 channel 未關(guān)閉也榄。
因為 channel 不可能被重復(fù)打開巡莹,所以前一個觀測的時候, channel 也是未關(guān)閉的甜紫,因此在這種情況下可以直接宣布接收失敗降宅,快速返回。因為沒被選中囚霸,也沒接收到數(shù)據(jù)腰根,所以返回值為 (false, false)。

接下來的操作拓型,首先會上一把鎖额嘿,粒度比較大。如果 channel 已關(guān)閉劣挫,并且循環(huán)數(shù)組 buf 里沒有元素册养。對應(yīng)非緩沖型關(guān)閉和緩沖型關(guān)閉但 buf 無元素的情況,返回對應(yīng)類型的零值压固,但 received 標(biāo)識是 false球拦,告訴調(diào)用者此 channel 已關(guān)閉,你取出來的值并不是正常由發(fā)送者發(fā)送過來的數(shù)據(jù)帐我。但是如果處于 select 語境下坎炼,這種情況是被選中了的。很多將 channel 用作通知信號的場景就是命中了這里拦键。

接下來谣光,如果有等待發(fā)送的隊列,說明 channel 已經(jīng)滿了芬为,要么是非緩沖型的 channel萄金,要么是緩沖型的 channel,但 buf 滿了媚朦。這兩種情況下都可以正常接收數(shù)據(jù)捡絮。

于是,調(diào)用 recv 函數(shù):

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果是非緩沖型的 channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // 未忽略接收的數(shù)據(jù)
        if ep != nil {
            // 直接拷貝數(shù)據(jù)莲镣,從 sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 緩沖型的 channel,但 buf 已滿涎拉。
        // 將循環(huán)數(shù)組 buf 隊首的元素拷貝到接收數(shù)據(jù)的地址
        // 將發(fā)送者的數(shù)據(jù)入隊瑞侮。實際上這時 revx 和 sendx 值相等
        // 找到接收游標(biāo)
        qp := chanbuf(c, c.recvx)
        // …………
        // 將接收游標(biāo)處的數(shù)據(jù)拷貝給接收者
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }

        // 將發(fā)送者數(shù)據(jù)拷貝到 buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // 更新游標(biāo)值
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx
    }
    sg.elem = nil
    gp := sg.g

    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }

    // 喚醒發(fā)送的 goroutine的圆。需要等到調(diào)度器的光臨
    goready(gp, skip+1)
}

如果是非緩沖型的,就直接從發(fā)送者的棸牖穑拷貝到接收者的棧越妈。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

否則,就是緩沖型 channel钮糖,而 buf 又滿了的情形梅掠。說明發(fā)送游標(biāo)和接收游標(biāo)重合了,因此需要先找到接收游標(biāo):

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

將該處的元素拷貝到接收地址店归。然后將發(fā)送者待發(fā)送的數(shù)據(jù)拷貝到接收游標(biāo)處阎抒。這樣就完成了接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作。接著消痛,分別將發(fā)送游標(biāo)和接收游標(biāo)向前進(jìn)一且叁,如果發(fā)生“環(huán)繞”,再從 0 開始秩伞。

最后逞带,取出 sudog 里的 goroutine,調(diào)用 goready 將其狀態(tài)改成 “runnable”纱新,待發(fā)送者被喚醒展氓,等待調(diào)度器的調(diào)度。

然后脸爱,如果 channel 的 buf 里還有數(shù)據(jù)遇汞,說明可以比較正常地接收。注意阅羹,這里勺疼,即使是在 channel 已經(jīng)關(guān)閉的情況下,也是可以走到這里的捏鱼。這一步比較簡單执庐,正常地將 buf 里接收游標(biāo)處的數(shù)據(jù)拷貝到接收數(shù)據(jù)的地址。

到了最后一步导梆,走到這里來的情形是要阻塞的轨淌。當(dāng)然,如果 block 傳進(jìn)來的值是 false看尼,那就不阻塞递鹉,直接返回就好了。

先構(gòu)造一個 sudog藏斩,接著就是保存各種值了躏结。注意,這里會將接收數(shù)據(jù)的地址存儲到了 elem 字段狰域,當(dāng)被喚醒時媳拴,接收到的數(shù)據(jù)就會保存到這個字段指向的地址黄橘。然后將 sudog 添加到 channel 的 recvq 隊列里。調(diào)用 goparkunlock 函數(shù)將 goroutine 掛起屈溉。

接下來的代碼就是 goroutine 被喚醒后的各種收尾工作了偿荷。

channel操作總結(jié):


注意:關(guān)閉已經(jīng)關(guān)閉的channel也會引發(fā)panic馋辈。


References:
https://cloud.tencent.com/developer/article/1750350
https://golang.design/go-questions/channel
https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/channel.html
https://juejin.cn/post/7037656471210819614

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末硫戈,一起剝皮案震驚了整個濱河市茶敏,隨后出現(xiàn)的幾起案子线梗,更是在濱河造成了極大的恐慌椰于,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件僻造,死亡現(xiàn)場離奇詭異框往,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)闯捎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進(jìn)店門椰弊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓤鼻,你說我怎么就攤上這事秉版。” “怎么了茬祷?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵清焕,是天一觀的道長。 經(jīng)常有香客問我,道長秸妥,這世上最難降的妖魔是什么借卧? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮筛峭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘陪每。我一直安慰自己影晓,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布檩禾。 她就那樣靜靜地躺著挂签,像睡著了一般。 火紅的嫁衣襯著肌膚如雪盼产。 梳的紋絲不亂的頭發(fā)上饵婆,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機(jī)與錄音戏售,去河邊找鬼侨核。 笑死,一個胖子當(dāng)著我的面吹牛灌灾,可吹牛的內(nèi)容都是我干的搓译。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼锋喜,長吁一口氣:“原來是場噩夢啊……” “哼些己!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起嘿般,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤段标,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后炉奴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體逼庞,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年盆佣,在試婚紗的時候發(fā)現(xiàn)自己被綠了往堡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡共耍,死狀恐怖虑灰,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情痹兜,我是刑警寧澤穆咐,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響对湃,放射性物質(zhì)發(fā)生泄漏崖叫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一拍柒、第九天 我趴在偏房一處隱蔽的房頂上張望心傀。 院中可真熱鬧,春花似錦拆讯、人聲如沸脂男。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽宰翅。三九已至,卻和暖如春爽室,著一層夾襖步出監(jiān)牢的瞬間汁讼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工阔墩, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留嘿架,地道東北人。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓戈擒,卻偏偏與公主長得像眶明,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子筐高,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,440評論 2 348