golang channel源碼分析

channel是golang中特有的一種數(shù)據(jù)結(jié)構(gòu),通常與goroutine一起使用邑贴,下面我們就介紹一下這種數(shù)據(jù)結(jié)構(gòu)。

channel數(shù)據(jù)結(jié)構(gòu)

channel最重要的一個(gè)結(jié)構(gòu)體就是hchan,我們創(chuàng)建一個(gè)channel的時(shí)候傀广,實(shí)際上是創(chuàng)建了一個(gè)下面結(jié)構(gòu)體的實(shí)例。

hchan結(jié)構(gòu)體

// src/runtime/chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

字段說明

  • qcount 當(dāng)前channel中的元素?cái)?shù)量
  • dataqsiz 環(huán)形隊(duì)列的大小
  • buf 指向dataqsize的數(shù)組指針彩届,只有緩沖chan有效
  • closed 當(dāng)前channel關(guān)閉狀態(tài)
  • elemsize 存儲(chǔ)元素的大小
  • elemtype 存儲(chǔ)元素的數(shù)據(jù)類型
  • sendx 發(fā)送操作處理到的索引位置伪冰,最大值為數(shù)組buf的最大下標(biāo)值
  • recvx 接收操作處理到的索引位置,最大值為數(shù)組buf的最大下標(biāo)值
  • recvq 接收隊(duì)列樟蠕,雙向鏈表贮聂,阻塞元素
  • sendq 發(fā)送列隊(duì),雙向鏈表寨辩,阻塞元素
  • lock 鎖,吓懈,用來保護(hù)sudog里的所的字段
image

hchan struct

其中elemsizeelemtype 表示存儲(chǔ)數(shù)據(jù)的大小和類型;sendxrecvx是指向底層數(shù)據(jù)的索引位置靡狞,表示當(dāng)前處理的進(jìn)度位置耻警;recvqsendq 是一個(gè)由雙向鏈表實(shí)現(xiàn)的隊(duì)列,它存儲(chǔ)的內(nèi)容是由于隊(duì)列dataqsize過小甸怕,而阻塞的數(shù)據(jù)甘穿。

每次進(jìn)行發(fā)送數(shù)據(jù)和讀取數(shù)據(jù)時(shí)都需要加鎖。

waitq結(jié)構(gòu)體

// src/runtime/chan.go

type waitq struct {
    first *sudog
    last  *sudog
}

sudog結(jié)構(gòu)體

// src/runtime/runtime2.go

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    next *sudog
    prev *sudog
    elem unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool

    parent   *sudog // semaRoot binary tree
    waitlink *sudog // g.waiting list or semaRoot
    waittail *sudog // semaRoot
    c        *hchan // channel
}

這里 sudog 實(shí)際上是對(duì) goroutine 的一個(gè)封裝梢杭,一個(gè)sudog 就是一個(gè)goroutine温兼,如在channal上發(fā)送和接收。

sudogs 是通過一個(gè)特殊的池來分配的武契,通過 acquireSudog()releaseSudog()進(jìn)行獲取和釋放募判。

sudog里的字段是由 hchan.lock 鎖來進(jìn)行保護(hù)荡含。

channel 整體結(jié)構(gòu)圖

image

<figcaption>hchan 結(jié)構(gòu)圖(來源:互聯(lián)網(wǎng)技術(shù)窩)</figcaption>

創(chuàng)建

// 無緩沖通道
ch1 := make(chan int)
// 有緩沖通道
ch2 := make(chan int, 10)

通過編譯可以發(fā)現(xiàn)channel的創(chuàng)建是由[makechan()](https://github.com/golang/go/blob/go1.15.6/src/runtime/chan.go#L71-L118)函數(shù)來完成的。源碼

// src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "n")
    }
    return c
}

函數(shù)返回的是一個(gè)指針類型兰伤,因此我們可以在函數(shù)中通過參數(shù)直接傳遞内颗,不需要再轉(zhuǎn)為指針使傳遞。

步驟

  1. 數(shù)據(jù)合法性檢查敦腔,包括發(fā)送數(shù)據(jù)的類型和大小
  2. 根據(jù)不同場(chǎng)景分配內(nèi)存均澳,主要針對(duì)buf字段
    a. 內(nèi)存大小為0,注意這時(shí)c.buf 的值為c.raceaddr()
    b. 元素不包含指針符衔,一次性分配一段內(nèi)存地址
    c. 元素包含指針找前,分配內(nèi)存
  3. 初始化其它字段

第一個(gè)參數(shù) *chantype 結(jié)構(gòu)定義

// src/runtime/type.go

type chantype struct {
    typ  _type
    elem *_type
    dir  uintptr
}

實(shí)際上創(chuàng)建一個(gè)channel, 只是對(duì)一個(gè)hchan結(jié)構(gòu)體進(jìn)行了一些初始化操作,并返回其指針判族。因此我們?cè)诤瘮?shù)傳遞時(shí)躺盛,不需要傳遞指針,直接使用就可以了形帮,因?yàn)樗旧砭褪且粋€(gè)指針的類型槽惫。

注意:對(duì)于chan內(nèi)存是在heap上分配的。

發(fā)送數(shù)據(jù)

對(duì)于channel的寫操作是由 chansend() 函數(shù)來實(shí)現(xiàn)的辩撑。

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    ...
}

在chan為nil的情況下, 如果是非阻塞則直接返回界斜,否則panic。

對(duì)于分送數(shù)據(jù)chan有三種情況合冀,分別是直接發(fā)送各薇,緩存區(qū)發(fā)送阻塞發(fā)送,其中阻塞發(fā)送涉及到GMP 的調(diào)度君躺,理解起來有些吃力峭判。

在發(fā)送數(shù)據(jù)前需要進(jìn)行加鎖操作,發(fā)送完再解鎖棕叫,保證原子性操作林螃。

直接發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 直接發(fā)送
    // 如果接收隊(duì)列中有接收者,則直接將數(shù)據(jù)發(fā)給接收者谍珊,重點(diǎn)在send()函數(shù),并在函數(shù)里進(jìn)行解鎖
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    ......
}

如果接收隊(duì)列中有接收者治宣,則優(yōu)化從接收者從隊(duì)列中取出來sg(sg := c.recvq.dequeue()),然后再通過調(diào)用 send() 函數(shù)將數(shù)據(jù)發(fā)送給接收者即可砌滞。

image

<figcaption>channel send</figcaption>

在send()函數(shù)里,會(huì)執(zhí)行一個(gè)回調(diào)函數(shù)主要用來進(jìn)行解鎖c.lock侮邀。真正的發(fā)送操作是函數(shù) sendDirect(),通過memmove(dst, src, t.size) 將數(shù)據(jù)復(fù)制過去贝润。

緩沖區(qū)發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 緩沖區(qū)發(fā)送
    // 接收者隊(duì)列中沒有接收者goroutine
    // 當(dāng)前channel中的元素<隊(duì)列的大小绊茧,有緩沖buffer未滿的情況
    // 將數(shù)據(jù)存放在sendx在buf數(shù)組中的索引位置,然后再將sendx索引+1
    // 由于是一個(gè)循環(huán)數(shù)組打掘,所以如果達(dá)到了dataqsize华畏,則從0開始鹏秋,同時(shí)個(gè)數(shù)+1
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }   

    ......
}

如果當(dāng)前recvq 隊(duì)列里沒有處于等待執(zhí)行的sudog的話,則需要將數(shù)據(jù)發(fā)送到緩沖隊(duì)列中(如果當(dāng)前隊(duì)列為緩沖chan)亡笑。

假設(shè)當(dāng)前buffer大小為6(dataqsiz=6)侣夷,數(shù)據(jù)個(gè)數(shù)為0(qcount=0),這里寫入6個(gè)數(shù)據(jù)仑乌,如下圖百拓。

image

<figcaption>channel send</figcaption>

如果當(dāng)前緩沖區(qū)的元素?cái)?shù)量<隊(duì)列的大小,說明緩沖區(qū)還沒有滿晰甚,還可以繼續(xù)裝載數(shù)據(jù)衙传。

這時(shí)第一步先計(jì)算出 s.sendx 索引位置的內(nèi)存地址,然后調(diào)用 typememmove() 函數(shù)將qp復(fù)制到內(nèi)存地址厕九,再將s.sendx索引值+1蓖捶,同時(shí)c.qcount++

當(dāng) sendx = dataqsiz 的時(shí)候,說明已到了數(shù)組最后一個(gè)元素,下次存儲(chǔ)數(shù)據(jù)的話搓劫,則需要重新從0開始了,所以需要重置為0亭引。

buf是一個(gè)由數(shù)組組成的隊(duì)列,滿足隊(duì)列的FIFO的機(jī)制皮获,最新存儲(chǔ)的數(shù)據(jù)也先消費(fèi),最多可以存儲(chǔ) dataqsiz 個(gè)數(shù)量纹冤。超出這個(gè)數(shù)據(jù)量就需要使用第三種 阻塞發(fā)送 方式了洒宝。

sendx 始終保存的是下次存儲(chǔ)數(shù)據(jù)的數(shù)組索引位置,每次使用完記得+1 萌京。每次存儲(chǔ)以前都需要判斷當(dāng)前buffer是否有空間可用 c.qcount < c.dataqsiz 雁歌。

總結(jié)

  • q.sendx 最大值為 c.dataqsiz -1,即數(shù)組的最大索引值知残。
  • q.count 是當(dāng)前chan 存儲(chǔ)的元素個(gè)數(shù)靠瞎,有可能 > c.dataqsiz

阻塞發(fā)送

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......

    // 阻塞發(fā)送
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true 

    ......
}

如果當(dāng)buff也寫滿的話,再send數(shù)據(jù)的話求妹,則需要進(jìn)行阻塞發(fā)送了乏盐。

channel send

假如我們有一個(gè)緩沖chan,但緩沖大小已經(jīng)使用完制恍,再次發(fā)送數(shù)據(jù)的話父能,則需要進(jìn)入sendq隊(duì)列了(將sudog綁定到一個(gè)goroutine,并放在sendq净神,等待讀群瘟摺)

對(duì)于阻塞的情況溉委,理解起來有些吃力,因?yàn)樯婕暗紾MP的關(guān)系和調(diào)度爱榕。

  1. 調(diào)用 getg() 函數(shù)獲取當(dāng)前運(yùn)行的goroutine
  2. 調(diào)用 acquireSudog() 函數(shù)獲取一個(gè)sudog瓣喊,并進(jìn)行數(shù)據(jù)綁定
  3. 將mysg 添加到發(fā)送隊(duì)列sendq,并設(shè)置為gp.waiting
  4. 更改goroutine狀態(tài)
  5. 設(shè)置goroutine為等待喚醒狀態(tài)黔酥,調(diào)用 atomic.Store8(&gp.parkingOnChan, 1)函數(shù)藻三?
  6. 通過keepAlive()函數(shù)可以保證發(fā)送的值一直有效,直到被接收者取走
  7. 進(jìn)行清理工作
  8. 釋放 sudog 結(jié)構(gòu)體

總結(jié)

讀取數(shù)據(jù)

對(duì)于channel的讀取方式:

v <- ch
v, ok <- ch

其中 v<-ch 對(duì)應(yīng)的是 runtime.chanrecv1()趴酣, v, ok <-ch 對(duì)應(yīng)的是`runtime.chanrecv2()。但這兩個(gè)函數(shù)最終調(diào)用的還是同一個(gè)函數(shù)坑夯,即 chanrecv()岖寞。

我們先看一下官方文檔對(duì)這個(gè)函數(shù)的說明

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {}
  • chanrecv 用來從chan 中接收數(shù)據(jù),并將接收的數(shù)據(jù)寫入到ep
  • 如果ep為 nil 的話柜蜈,則接收的數(shù)據(jù)將被忽略
  • 如果非阻塞的且沒有可接收的數(shù)據(jù)將返回 (false ,false)
  • 如果chan已關(guān)閉仗谆,零值 ep 和返回值將是true, false,否則使用一個(gè)元素代替ep并返回 (true, true)
  • 一個(gè)非nil的 ep, 必須指向heap或者調(diào)用stack
// src/runtime/chan.go

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c為nil,表示非法操作淑履,則直接gopark(),表示出讓當(dāng)前GMP中的P的使用權(quán)隶垮,允許其它G使用
    if c == nil {
        // 如果非阻塞的話,直接返回秘噪;如果是阻塞的話狸吞,直接panic
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    ...

    // 如果chan已關(guān)閉且元素個(gè)數(shù)為0
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            // 設(shè)置內(nèi)存內(nèi)容為類型 c.elemtype 的零值
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

}

如果當(dāng)前讀取的 chan 為nil的話,且非阻塞的情況指煎,則會(huì)產(chǎn)生死鎖蹋偏,最終提示

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:

否則返回零值。

同時(shí)出讓自己占用的P至壤,允許其它goroutine搶占使用威始。

如果讀取的chan已關(guān)閉,則讀取出來的值為零值(函數(shù)說明第四條)像街。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    // 在沒有獲取鎖的情況下黎棠,檢查非阻塞操作失敗
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.

        // 如果當(dāng)前chan未關(guān)閉
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    ...

}

這段代碼主要是對(duì)重排讀的情況,進(jìn)行了雙重檢測(cè)镰绎,暫是未明白code中考慮的情況脓斩,改天再消化消化。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖跟狱,下面才是真正要讀取的邏輯
    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    ...
}

讀取之前先加鎖俭厚。

對(duì)chan的讀取與發(fā)送一樣,同樣有三種方式驶臊,為直接讀取挪挤、緩沖區(qū)讀取和阻塞讀取叼丑。

直接讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 直接讀取 
    // 從c.sendq隊(duì)列中取sudog, 將數(shù)據(jù)復(fù)制到sg
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
}

獲取一個(gè)待發(fā)送者,如果buffer大小為0扛门,則直接從發(fā)送者接收數(shù)據(jù)鸠信。否則從隊(duì)列頭部接收,并將發(fā)送者發(fā)送的數(shù)據(jù)放在隊(duì)列尾部论寨。

chan recv

從c.sendq隊(duì)列里讀取一個(gè) *sudog星立,通過調(diào)用 recv() 函數(shù),將數(shù)據(jù)從發(fā)送者復(fù)制到ep中葬凳,并返回true,true绰垂,表示讀取成功。真正讀取函數(shù)為 recvDirect()火焰。

緩沖區(qū)讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...

    // 如果c.qcount>0劲装,說明緩沖區(qū)有元素可直接讀取
    if c.qcount > 0 {
        // Receive directly from queue
        // 直接從隊(duì)列中讀取
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
}

如果c.qcount > 0,則說明緩沖區(qū)里有內(nèi)容可以讀取昌简。則

直接獲取 c.recvx 數(shù)組索引位置的內(nèi)存地址占业,則

  1. r.recvx 索引地址的值讀取出來復(fù)制給 ep,
  2. 然后更新接收數(shù)組索引c.recvx++, 如果>數(shù)組索引最大索引值 纯赎,重置為0
  3. 減少元素個(gè)數(shù)
  4. 釋放鎖 c.qcount--
  5. 最后unlock返回谦疾。
image

<figcaption>chan recv</figcaption>

阻塞讀取

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......

    // c.sendq沒有sender,buffer里也是空的犬金,直接阻塞讀取
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
  1. 通過getg()獲取一個(gè)goroutine
  2. 獲取一個(gè)sudog結(jié)構(gòu)體
  3. 綁定兩者關(guān)系
  4. 加入 c.recvq 隊(duì)列
  5. 設(shè)置goroutine為等待喚醒狀態(tài)
  6. 清理狀態(tài)
chan recv

關(guān)閉chan

關(guān)閉chan語(yǔ)句

close(ch)

對(duì)于已關(guān)閉的chan念恍,是不允許再次關(guān)閉的,否則會(huì)產(chǎn)生panic晚顷。對(duì)應(yīng)的函數(shù)為 runtime.closechan() 樊诺。

// src/runtime/chan.go

func closechan(c *hchan) {
    // 如果chan未初始化,觸發(fā)panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    // 關(guān)閉已關(guān)閉的chan音同,觸發(fā)panicc
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    ......

}

對(duì)于一個(gè)未初始化的chan,或者已關(guān)閉的chan秃嗜,如果再次關(guān)閉則會(huì)觸發(fā)panic权均。

func closechan(c *hchan) {
    ......
    // 設(shè)置chan關(guān)閉狀態(tài)
    c.closed = 1

    // 聲明一個(gè)結(jié)構(gòu)體鏈表gList,主要用來調(diào)度使用
    var glist gList

    // release all readers
    // 釋放所有readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }

        // 設(shè)置元素為nil
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // 釋放所有writers,會(huì)引起panic,見下面說明
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 設(shè)置元素為nil
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // 釋放鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 調(diào)度所有g(shù)
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        // 喚醒goroutine
        goready(gp, 3)
    }
}
  1. 聲明一個(gè)gList 鏈表結(jié)構(gòu)體
  2. 將接收隊(duì)列 c.recvq 中的所有元素添加到gList 中锅锨,并將原來的值設(shè)置為
  3. 將發(fā)送隊(duì)列 c.sendq 中的所有元素添加到 gList 中叽赊,并將原來的值設(shè)置為
  4. 將所有的阻塞goroutine通過函數(shù)goready() 進(jìn)行調(diào)度

文章里提到在對(duì)c.sendq 處理的時(shí)候可能會(huì)觸發(fā)panic。這是因?yàn)殛P(guān)閉chan后必搞,執(zhí)行了 goready() 對(duì)原來sendq里的sudogs 進(jìn)行了進(jìn)行了重新調(diào)度必指,這時(shí)候發(fā)現(xiàn)chan已經(jīng)關(guān)閉了,所以會(huì)panic恕洲。那么又是如何調(diào)度的呢塔橡?

package main

import (
    "fmt"
    "time"
)

var ch chan int

func f() {
}

func main() {
    ch := make(chan int, 10)
    // buffer大小為10,這里發(fā)送11個(gè)梅割,使最后一個(gè)進(jìn)入到c.sendq里面
    for i := 0; i < 11; i++ { // i < 10 則正常
        go func(v int) {
            ch <- v
        }(i)
    }
    time.Sleep(time.Second)
    fmt.Println("發(fā)送完畢")
    // 關(guān)閉chan,將對(duì)sendq里的g進(jìn)行喚醒葛家,喚醒后發(fā)現(xiàn)chan關(guān)閉狀態(tài)户辞,直接panic
    close(ch)
    for v := range ch {
        fmt.Println(v)
    }
    time.Sleep(time.Second)
}

有一條廣泛流傳的關(guān)閉 channel 的原則:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

不要從一個(gè) receiver 側(cè)關(guān)閉 channel,也不要在有多個(gè) sender 時(shí)癞谒,關(guān)閉 channel底燎。對(duì)于只有一個(gè)sender的話,直接在sender端關(guān)閉就可以弹砚。但對(duì)于多個(gè)sender的話双仍,則需要通過一個(gè)信號(hào)量進(jìn)行關(guān)閉,參考這里桌吃。

總結(jié)

close 操作會(huì)觸發(fā)goroutine的調(diào)度行為朱沃。

總結(jié)

  1. 在發(fā)送和讀取 chan的時(shí)候,如果chan為nil的話读存,這時(shí)候就根據(jù)是否阻塞進(jìn)行判斷是否會(huì)發(fā)生panic为流。如果阻塞狀態(tài)的話,則會(huì)發(fā)生panic让簿,否則會(huì)直接返回
  2. 對(duì)chan 發(fā)送或接收數(shù)據(jù)的時(shí)候要保證已初始化狀態(tài)
  3. 對(duì)于已關(guān)閉的chan再次關(guān)閉會(huì)觸發(fā)panic
  4. 對(duì)于發(fā)送和讀取數(shù)據(jù)都有三種處理情況敬察,分別是直接讀寫,緩存區(qū)讀寫和阻塞讀寫
  5. 發(fā)送和接收數(shù)據(jù)的本質(zhì)上是對(duì)值的復(fù)制操作尔当。All transfer of value on the go channels happens with the copy of value.
  6. close(ch)會(huì)觸發(fā)goroutine 的調(diào)度行為
  7. 內(nèi)部使用 sudogs對(duì)goroutine進(jìn)行了一次封裝莲祸。
  8. 如果buffer中的元素?zé)o法保證消費(fèi)完的話,則會(huì)產(chǎn)生內(nèi)存泄漏的危險(xiǎn)椭迎,這時(shí)gc是無法對(duì)這些元素時(shí)間清理的锐帜,過多的 chan就會(huì)占用大量的資源
  9. 對(duì)于chan的分配的內(nèi)存是在哪里,heap還是stack?

參考

本文如有錯(cuò)誤畜号,歡迎大家在下方留言指出缴阎。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市简软,隨后出現(xiàn)的幾起案子蛮拔,更是在濱河造成了極大的恐慌,老刑警劉巖痹升,帶你破解...
    沈念sama閱讀 211,423評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件建炫,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡疼蛾,警方通過查閱死者的電腦和手機(jī)肛跌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,147評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人衍慎,你說我怎么就攤上這事转唉。” “怎么了西饵?”我有些...
    開封第一講書人閱讀 157,019評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵酝掩,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我眷柔,道長(zhǎng)期虾,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,443評(píng)論 1 283
  • 正文 為了忘掉前任驯嘱,我火速辦了婚禮镶苞,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鞠评。我一直安慰自己茂蚓,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,535評(píng)論 6 385
  • 文/花漫 我一把揭開白布剃幌。 她就那樣靜靜地躺著聋涨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪负乡。 梳的紋絲不亂的頭發(fā)上牍白,一...
    開封第一講書人閱讀 49,798評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音抖棘,去河邊找鬼茂腥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛切省,可吹牛的內(nèi)容都是我干的最岗。 我是一名探鬼主播,決...
    沈念sama閱讀 38,941評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼朝捆,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼般渡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起芙盘,我...
    開封第一講書人閱讀 37,704評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤诊杆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后何陆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,152評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡豹储,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,494評(píng)論 2 327
  • 正文 我和宋清朗相戀三年贷盲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,629評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡巩剖,死狀恐怖铝穷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情佳魔,我是刑警寧澤曙聂,帶...
    沈念sama閱讀 34,295評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站鞠鲜,受9級(jí)特大地震影響宁脊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜贤姆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,901評(píng)論 3 313
  • 文/蒙蒙 一榆苞、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧霞捡,春花似錦坐漏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至砰碴,卻和暖如春躏筏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背衣式。 一陣腳步聲響...
    開封第一講書人閱讀 31,978評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工寸士, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人碴卧。 一個(gè)月前我還...
    沈念sama閱讀 46,333評(píng)論 2 360
  • 正文 我出身青樓弱卡,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親住册。 傳聞我的和親對(duì)象是個(gè)殘疾皇子婶博,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,499評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 基礎(chǔ)用法 channel chan T是雙向channel類型,編譯器允許對(duì)雙向channel同時(shí)進(jìn)行發(fā)送和接收荧飞。...
    杰克慢閱讀 290評(píng)論 0 0
  • channel 在 golang 中是一個(gè)非常重要的特性凡人,它為我們提供了一個(gè)并發(fā)模型。對(duì)比鎖叹阔,通過 chan 在多...
    安佳瑋閱讀 710評(píng)論 1 4
  • 簡(jiǎn)書前話: 由于簡(jiǎn)書不支持 mermaid 流程圖挠轴,所以想看完整的版本,可以到我的個(gè)人博客 中查看 01.chan...
    Abson在簡(jiǎn)書閱讀 958評(píng)論 0 0
  • 簡(jiǎn)介 熟悉Go的人都知道耳幢,它提倡著不要通過共享內(nèi)存來通訊岸晦,而要通過通訊來共享內(nèi)存欧啤。Go提供了一種獨(dú)特的并發(fā)同步技術(shù)...
    marsjhe閱讀 2,631評(píng)論 0 2
  • Golang channel 作為Go的核心的數(shù)據(jù)結(jié)構(gòu)和Goroutine之間的通信,是支撐Go語(yǔ)言高并發(fā)的關(guān)鍵 ...
    LegendGo閱讀 421評(píng)論 0 1