go channel詳解之源碼分析

作為golang并發(fā)編程思想的重要組成啤握,channel(通道)非常重要垦江,和goroutine(go協(xié)程)一起使用侄非,用來(lái)實(shí)現(xiàn)go的CSP(Communicating Sequential Processes)并發(fā)模型片拍。

Do not communicate by sharing memory; instead, share memory by communicating
不要以共享內(nèi)存的方式來(lái)通信浪读,相反昔榴,要通過(guò)通信來(lái)共享內(nèi)存。

因?yàn)閏hannel的重要性碘橘,有必要對(duì)其原理和源碼進(jìn)行學(xué)習(xí)互订,在參考了網(wǎng)絡(luò)上各種大牛的分享后,將其作為筆記記錄下來(lái)痘拆,如有不足之處仰禽,還請(qǐng)指正。
源碼路徑: go1.11/src/runtime/chan.go

channel結(jié)構(gòu)體

type hchan struct {
    // 通道中實(shí)際的元素個(gè)數(shù)纺蛆,len(ch)的返回值
    qcount uint //total data in the queue
    // 通道的容量吐葵,cap(ch)的返回值,qcount <= dataqsiz
    // ch := make(chan T, x)中的 x桥氏,dataqsiz為0表示非緩沖通道
    dataqsiz uint // size of the circular queue
    //存儲(chǔ)通道元素的緩沖隊(duì)列地址温峭,使用環(huán)形數(shù)組實(shí)現(xiàn)
    buf unsafe.Pointer // points to an array of dataqsiz elements
    //通道內(nèi)單個(gè)元素的大小,單位為字節(jié)
    elemsize uint16
    //通道是否關(guān)閉的標(biāo)志位
    closed uint32
    //通道元素的類型字支,make(chan T, x)中的T凤藏,被go編譯器抽象為_(kāi)type結(jié)構(gòu)體,記錄這該類型的全部屬性
    elemtype *_type // element type
    //待發(fā)送元素在緩沖隊(duì)列中的索引
    sendx uint // send index
    //待接收元素在緩沖隊(duì)列中的索引
    recvx uint // receive index
    //接收goroutine等待隊(duì)列堕伪,當(dāng)通道為空時(shí)揖庄,用來(lái)存放阻塞的接收goroutine
    recvq waitq // list of recv waiters
    //發(fā)送goroutine等待隊(duì)列,當(dāng)通道滿時(shí)欠雌,用來(lái)存放阻塞發(fā)送goroutine
    sendq waitq // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    //操作通道時(shí)使用的互斥鎖
    lock mutex
}

recvq 和 sendq 對(duì)應(yīng)的結(jié)構(gòu) waitq 是一個(gè)鏈表抠艾,包含一個(gè)頭結(jié)點(diǎn)和一個(gè)尾結(jié)點(diǎn),隊(duì)列中的每個(gè)成員是一個(gè)sudog結(jié)構(gòu)體

type waitq struct {
    first *sudog
    last  *sudog
}
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ? synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
// 當(dāng)goroutine遇到阻塞桨昙,或需要等待的場(chǎng)景時(shí)检号,會(huì)被打包成sudog這樣一個(gè)結(jié)構(gòu)(封裝了該goroutine指針)
// 之所以需要 sudog是由于goroutine和同步對(duì)象的關(guān)系是多對(duì)多的
// 一個(gè)goroutine可以在多個(gè)等待隊(duì)列中腌歉,因此一個(gè)goroutine可能被打包為多個(gè)sudog
// 許多goroutine可能在同一個(gè)同步對(duì)象上等待,因此一個(gè)對(duì)象可能有多個(gè)sudog
// sudog是從一個(gè)特殊的池中分配的齐苛。使用AcquireDog和ReleaseSudog分配和釋放它們
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.
    // 以下的這些字段都是被該goroutine所在的通道中的hchan.lock來(lái)保護(hù)的
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    //sudog雙向鏈表對(duì)應(yīng)的指針
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

make(chan T, x)

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    //檢查T類型大小是否超過(guò)限制翘盖,比如傳入一個(gè)大于64k大數(shù)組,會(huì)報(bào)錯(cuò)
    //64位操作系統(tǒng)下
    //ch := make(chan [8192]int64, 1) 64k = 65536 = 8(int64占8字節(jié)) * 8192
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    //判斷對(duì)齊限制
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    //這里做了兩個(gè)判斷:
    //判斷緩沖通道的容量是否為負(fù)
    //判斷當(dāng)緩沖通道滿時(shí)凹蜂,隊(duì)列大小是否超出系統(tǒng)最大內(nèi)存
    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case size == 0 || elem.size == 0:
        //當(dāng)創(chuàng)建的是非緩沖通道
        //或者緩沖通道的元素類型大小為0(如 struct{}{})
        //只需要申請(qǐng)hchan的內(nèi)存而不需要申請(qǐng)緩沖隊(duì)列的內(nèi)存
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        //由于申請(qǐng)的內(nèi)存只給hchan使用
        //c.buf直接指向申請(qǐng)的hchan的內(nèi)存地址
        c.buf = unsafe.Pointer(c)
    case elem.kind&kindNoPointers != 0:
        //當(dāng)創(chuàng)建的是緩沖通道馍驯,并且通道元素類型不是指針類型的
        //需要申請(qǐng)hchan的內(nèi)存和緩沖隊(duì)列的內(nèi)存
        //計(jì)算公式為:hchan內(nèi)存 + 緩沖隊(duì)列元素個(gè)數(shù) * 元素大小
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        //由于申請(qǐng)的內(nèi)存是給hchan和緩沖隊(duì)列一起用的
        //指向內(nèi)存緩沖中,hchan的位置
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        //當(dāng)創(chuàng)建的是緩沖通道玛痊,并且通道元素類型是指針類型的
        //調(diào)用了兩次mallocgc來(lái)申請(qǐng)內(nèi)存汰瘫,hchan和緩沖隊(duì)列不共用內(nèi)存(內(nèi)存空間是不連續(xù)的)
        c = new(hchan)
        c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }

    //記錄單個(gè)元素的大小,元素類型及通道容量
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}
從makechan代碼中擂煞,可以以下總結(jié)幾點(diǎn)

1混弥、當(dāng)創(chuàng)建的是非緩沖通道或者緩沖通道的元素類型大小為0時(shí),是不需要申請(qǐng)緩沖隊(duì)列的內(nèi)存的

非緩沖通道或元素大小為0.jpg

2对省、當(dāng)創(chuàng)建的是緩沖通道蝗拿,并且通道元素類型不是指針類型的,會(huì)向系統(tǒng)申請(qǐng)一塊連續(xù)內(nèi)存蒿涎,用來(lái)存放hchan結(jié)構(gòu)體和緩沖隊(duì)列
緩沖通道并且類型不是指針.jpg

3哀托、當(dāng)創(chuàng)建的是緩沖通道,并且通道元素類型是指針類型的劳秋,會(huì)向系統(tǒng)申請(qǐng)兩塊內(nèi)存仓手,用來(lái)存放hchan結(jié)構(gòu)體和緩沖隊(duì)列
緩沖通道且類型是指針.jpg

4、當(dāng)創(chuàng)建緩沖通道時(shí)玻淑,如果通道元素沒(méi)有實(shí)際意義(如信號(hào)的傳遞)時(shí)嗽冒,可以用 make(chan struct{}, n),因?yàn)?struct{} 類型的大小為0岁忘,創(chuàng)建通道時(shí)辛慰,會(huì)走第一個(gè)case区匠, 不會(huì)為緩沖隊(duì)列分配內(nèi)存

向通道發(fā)送數(shù)據(jù) ch <- x

// entry point for c <- x from compiled code
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        //參數(shù)block是用來(lái)指定通道是否阻塞的
        if !block {
            return false
        }
        // gopark函數(shù)將當(dāng)前goroutine置于等待狀態(tài)并通過(guò)unlockf喚醒
        // 但是傳入的unlockf為nil(第一個(gè)參數(shù))
        // 所以干像,當(dāng)通道為nil時(shí),向其發(fā)送數(shù)據(jù)驰弄,會(huì)永久阻塞
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }
    //在數(shù)據(jù)發(fā)送到通道前麻汰,先獲取互斥鎖,保證線程安全
    lock(&c.lock)

    //向已經(jīng)關(guān)閉的通道發(fā)送數(shù)據(jù)戚篙,會(huì)panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        // 接收goroutine的等待隊(duì)列中五鲫,有等待著的goroutine
        // 說(shuō)明通道為非緩沖通道或者緩沖通道的緩沖隊(duì)列為空?
        // 取出接收隊(duì)列中排在最前邊的goroutine
        // 然后不經(jīng)過(guò)通道的緩沖區(qū),將發(fā)送的數(shù)據(jù)直接拷貝給這個(gè)goroutine
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    //接收goroutine隊(duì)列為空岔擂,緩沖通道的元素個(gè)數(shù)小于通道容量
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        // 獲取指向緩沖通道中第i個(gè)槽的指針位喂,后邊將數(shù)據(jù)拷貝到此指針對(duì)應(yīng)的空間內(nèi)
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        //將發(fā)送goroutine中需要發(fā)送的數(shù)據(jù)拷貝到緩沖通道中
        typedmemmove(c.elemtype, qp, ep)
        //發(fā)送index + 1
        c.sendx++
        //如果緩沖通道元素?cái)?shù)量達(dá)到了通道容量浪耘,就將發(fā)送index改為0,構(gòu)造環(huán)形數(shù)組
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // Block on the channel. Some receiver will complete our operation for us.
    //如果緩沖通道元素?cái)?shù)量達(dá)到了通道容量
    //獲取這個(gè)發(fā)送gouroutine指針
    gp := getg()
    //新建一個(gè)sudog結(jié)構(gòu)
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    //設(shè)置sudog.elem=發(fā)送goroutine中需要發(fā)送的數(shù)據(jù)的地址
    mysg.elem = ep
    mysg.waitlink = nil
    //設(shè)置sudog.g=發(fā)送gouroutine指針
    mysg.g = gp
    mysg.isSelect = false
    //設(shè)置sudog.c=當(dāng)前通道
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    //將sudog結(jié)構(gòu)放到通道的sendq隊(duì)列中
    c.sendq.enqueue(mysg)
    //goparkunlock->用于協(xié)程切換的gopark函數(shù)->mcall(park_m)
    //mcall中會(huì)將此goroutine當(dāng)前的狀態(tài)進(jìn)行保存塑崖,在調(diào)度是恢復(fù)狀態(tài)
    //park_m中邏輯(以后分析goroutine的時(shí)候回詳細(xì)分析七冲,現(xiàn)在先粗略分析下)
    //1、將此發(fā)送gouroutine休眠(狀態(tài)由_Grunning變?yōu)開(kāi)Gwaiting)规婆,等待被喚醒
    //2澜躺、解除M和此gouroutine之間的關(guān)聯(lián)
    //3、調(diào)用schedule調(diào)度函數(shù)抒蚜,讓可以被執(zhí)行的gouroutine放到M上
    //4掘鄙、由于此發(fā)送gouroutine休眠,阻塞
    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    // someone woke us up.
    //發(fā)送gouroutine被喚醒后執(zhí)行的代碼
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        //喚醒后發(fā)現(xiàn)通道被關(guān)嗡髓,直接panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// send處理在通道為非緩沖通道或者緩沖通道的緩沖隊(duì)列為空?
// 直接將數(shù)據(jù)從發(fā)送goroutine操漠,復(fù)制到接收goroutine,而不經(jīng)過(guò)緩沖隊(duì)列
// 接收goroutine接到數(shù)據(jù)后器贩,調(diào)用goready颅夺,喚醒該goroutine,放入P的本地運(yùn)行隊(duì)列蛹稍,并和M對(duì)接
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    if sg.elem != nil {
        //直接將數(shù)據(jù)復(fù)制到接收goroutine
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    //喚醒接收goroutine
    goready(gp, skip+1)
}

原文地址 http://www.reibang.com/p/b9a76325ccc5

從通道接收數(shù)據(jù) <- ch

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
// 從通道接收數(shù)據(jù)吧黄,并將數(shù)據(jù)寫入ep參數(shù)
// ep參數(shù)可能為nil,這樣的話接收的數(shù)據(jù)將被忽略(_, ok := <-ch唆姐,ep 為_(kāi))
// 當(dāng)ep不為nil拗慨,通道關(guān)閉,并且通道內(nèi)無(wú)數(shù)據(jù)時(shí)奉芦,ep會(huì)被賦值為對(duì)應(yīng)類型的零值
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    if c == nil {
        if !block {
            return
        }
        //同發(fā)送一樣赵抢,如果通道為nil,則會(huì)永久阻塞
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not ready for receiving, we observe that the
    // channel is not closed. Each of these observations is a single word-sized read
    // (first c.sendq.first or c.qcount, and second c.closed).
    // Because a channel cannot be reopened, the later observation of the channel
    // being not closed implies that it was also not closed at the moment of the
    // first observation. We behave as if we observed the channel at that moment
    // and report that the receive cannot proceed.
    //
    // The order of operations is important here: reversing the operations can lead to
    // incorrect behavior when racing with a close.
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            //當(dāng)ep不為nil声功,通道關(guān)閉烦却,并且通道內(nèi)無(wú)數(shù)據(jù)時(shí),ep會(huì)被賦值為對(duì)應(yīng)類型的零值
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).

        // 發(fā)送goroutine的等待隊(duì)列中先巴,有等待著的goroutine
        // 說(shuō)明通道為非緩沖通道或者緩沖通道的緩沖隊(duì)列已經(jīng)滿了
        // 當(dāng)通道為非緩沖通道時(shí)
        //   recv邏輯和send一樣其爵,取出發(fā)送隊(duì)列中排在最前邊的goroutine
        //   然后不經(jīng)過(guò)通道的緩沖區(qū),直接拷貝
        // 當(dāng)緩沖通道的緩沖隊(duì)列緩沖通道滿時(shí)
        //   先從緩沖通道中取出排在最前邊的數(shù)據(jù),寫入到ep(若有)
        //   清除緩沖通道對(duì)應(yīng)位置的空間
        //   取出發(fā)送隊(duì)列中排在最前邊的goroutine伸蚯,將其所攜帶的數(shù)據(jù)放入緩沖隊(duì)列尾部
        //   由于此時(shí)緩沖隊(duì)列是滿的
        //   所以從緩沖隊(duì)列中拿出的數(shù)據(jù)地址摩渺,和發(fā)送goroutine放入數(shù)據(jù)的地址,是一個(gè)地址
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩沖隊(duì)列中不為空
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            //ep不為nil剂邮,將緩沖隊(duì)列中摇幻,索引為c.recvx對(duì)應(yīng)的值寫到ep中
            typedmemmove(c.elemtype, ep, qp)
        }
        //將緩沖通道中c.recvx索引指向的值變?yōu)橥ǖ李愋偷牧阒?        //由于上邊已經(jīng)判斷ep != nil的情況了,所以這里直接將值丟棄
        //為緩沖通道清理空間
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            //如果緩沖通道元素?cái)?shù)量達(dá)到了通道容量,就將發(fā)送index改為0绰姻,構(gòu)造環(huán)形數(shù)組
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    // 下邊的邏輯和發(fā)送goroutine隊(duì)列邏輯一樣枉侧,就不重復(fù)分析了
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            //非緩沖通道,并且ep != nil狂芋,直接將發(fā)送goroutine的數(shù)據(jù)寫ep
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        // 緩沖通道棵逊,并且只有緩沖隊(duì)列滿了,才會(huì)走到這里
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            //先從緩沖通道中取出c.recvx對(duì)應(yīng)的數(shù)據(jù)银酗,寫入到ep
            typedmemmove(c.elemtype, ep, qp)
        }
        // 將發(fā)送隊(duì)列中排在最前邊的goroutine所攜帶的數(shù)據(jù)
        // 放入c.recvx對(duì)應(yīng)的空間
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

關(guān)閉通道close(ch)

func closechan(c *hchan) {
    //關(guān)閉未初始化(nil)的通道辆影,會(huì)panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        //關(guān)閉已經(jīng)關(guān)閉的通道,會(huì)panic
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
        racerelease(unsafe.Pointer(c))
    }

    c.closed = 1

    var glist *g

    // release all readers
    // 喚醒所有接收隊(duì)列中的goroutine黍特,清空接收隊(duì)列
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            //釋放內(nèi)存
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 將goroutine入glist
        // 為最后喚醒(goready)全部goroutine做準(zhǔn)備
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // release all writers (they will panic)
    // 喚醒所有發(fā)送隊(duì)列中的goroutine蛙讥,清空發(fā)送隊(duì)列
    // 該操作會(huì)使所有發(fā)送goroutine panic
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        //釋放內(nèi)存
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        //喚醒goroutine(Grunnable)
        goready(gp, 3)
    }
}

channel涉及到select相關(guān)的源碼分析,等和select源碼一起分析吧~
最后灭衷,貼上通道數(shù)據(jù)傳遞的圖次慢,結(jié)合通道發(fā)送、接收的源碼來(lái)看~非常形象


非緩沖通道或接收端阻塞.png

緩沖通道.png

參考:
http://www.cnblogs.com/zkweb/p/7815600.html?utm_campaign=studygolang.com&utm_medium=studygolang.com&utm_source=studygolang.com
https://blog.csdn.net/qq_25870633/article/details/83388952
http://legendtkl.com/2017/07/30/understanding-golang-channel/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末翔曲,一起剝皮案震驚了整個(gè)濱河市迫像,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瞳遍,老刑警劉巖闻妓,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異掠械,居然都是意外死亡由缆,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門猾蒂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)均唉,“玉大人,你說(shuō)我怎么就攤上這事肚菠√蚣” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵蚊逢,是天一觀的道長(zhǎng)层扶。 經(jīng)常有香客問(wèn)我,道長(zhǎng)时捌,這世上最難降的妖魔是什么怒医? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任炉抒,我火速辦了婚禮奢讨,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己拿诸,他們只是感情好扒袖,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著亩码,像睡著了一般季率。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上描沟,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天飒泻,我揣著相機(jī)與錄音,去河邊找鬼吏廉。 笑死泞遗,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的席覆。 我是一名探鬼主播史辙,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼佩伤!你這毒婦竟也來(lái)了聊倔?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤生巡,失蹤者是張志新(化名)和其女友劉穎耙蔑,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體孤荣,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡纵潦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了垃环。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片邀层。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖遂庄,靈堂內(nèi)的尸體忽然破棺而出寥院,到底是詐尸還是另有隱情,我是刑警寧澤涛目,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布秸谢,位于F島的核電站,受9級(jí)特大地震影響霹肝,放射性物質(zhì)發(fā)生泄漏估蹄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一沫换、第九天 我趴在偏房一處隱蔽的房頂上張望臭蚁。 院中可真熱鬧,春花似錦、人聲如沸垮兑。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)系枪。三九已至雀哨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間私爷,已是汗流浹背雾棺。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留衬浑,地道東北人垢村。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嚎卫,于是被迫代替她去往敵國(guó)和親嘉栓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345