由淺入深剖析 go channel

由淺入深剖析 go channel

channel 是 golang 中最核心的 feature 之一揖闸,因此理解 Channel 的原理對于學(xué)習(xí)和使用 golang 非常重要。

channel 是 goroutine 之間通信的一種方式凯正,可以類比成 Unix 中的進(jìn)程的通信方式管道组题。

CSP 模型

在講 channel 之前姥宝,有必要先提一下 CSP 模型善涨,傳統(tǒng)的并發(fā)模型主要分為 Actor 模型和 CSP 模型掏婶,CSP 模型全稱為 communicating sequential processes啃奴,CSP 模型由并發(fā)執(zhí)行實(shí)體(進(jìn)程,線程或協(xié)程)气堕,和消息通道組成纺腊,實(shí)體之間通過消息通道發(fā)送消息進(jìn)行通信。和 Actor 模型不同茎芭,CSP 模型關(guān)注的是消息發(fā)送的載體揖膜,即通道,而不是發(fā)送消息的執(zhí)行實(shí)體梅桩。關(guān)于 CSP 模型的更進(jìn)一步的介紹壹粟,有興趣的同學(xué)可以閱讀論文 Communicating Sequential Processes,Go 語言的并發(fā)模型參考了 CSP 理論宿百,其中執(zhí)行實(shí)體對應(yīng)的是 goroutine趁仙, 消息通道對應(yīng)的就是 channel。

channel 介紹

channel 提供了一種通信機(jī)制垦页,通過它雀费,一個(gè) goroutine 可以想另一 goroutine 發(fā)送消息。channel 本身還需關(guān)聯(lián)了一個(gè)類型痊焊,也就是 channel 可以發(fā)送數(shù)據(jù)的類型盏袄。例如: 發(fā)送 int 類型消息的 channel 寫作 chan int 。

channel 創(chuàng)建

channel 使用內(nèi)置的 make 函數(shù)創(chuàng)建薄啥,下面聲明了一個(gè) chan int 類型的 channel:

ch := make(chan int)

c和 map 類似辕羽,make 創(chuàng)建了一個(gè)底層數(shù)據(jù)結(jié)構(gòu)的引用,當(dāng)賦值或參數(shù)傳遞時(shí)垄惧,只是拷貝了一個(gè) channel 引用刁愿,指向相同的 channel 對象。和其他引用類型一樣到逊,channel 的空值為 nil 铣口。使用 == 可以對類型相同的 channel 進(jìn)行比較,只有指向相同對象或同為 nil 時(shí)蕾管,才返回 true
枷踏。

channel 的讀寫操作

ch := make(chan int)

// write to channel
ch <- x

// read from channel
x <- ch

// another way to read
x = <- ch

channel 一定要初始化后才能進(jìn)行讀寫操作,否則會永久阻塞掰曾。

關(guān)閉 channel

golang 提供了內(nèi)置的 close 函數(shù)對 channel 進(jìn)行關(guān)閉操作旭蠕。

ch := make(chan int)

close(ch)

有關(guān) channel 的關(guān)閉,你需要注意以下事項(xiàng):

  • 關(guān)閉一個(gè)未初始化(nil) 的 channel 會產(chǎn)生 panic
  • 重復(fù)關(guān)閉同一個(gè) channel 會產(chǎn)生 panic
  • 向一個(gè)已關(guān)閉的 channel 中發(fā)送消息會產(chǎn)生 panic
  • 從已關(guān)閉的 channel 讀取消息不會產(chǎn)生 panic旷坦,且能讀出 channel 中還未被讀取的消息掏熬,若消息均已讀出,則會讀到類型的零值秒梅。從一個(gè)已關(guān)閉的 channel 中讀取消息永遠(yuǎn)不會阻塞旗芬,并且會返回一個(gè)為 false 的 ok-idiom,可以用它來判斷 channel 是否關(guān)閉
  • 關(guān)閉 channel 會產(chǎn)生一個(gè)廣播機(jī)制捆蜀,所有向 channel 讀取消息的 goroutine 都會收到消息
ch := make(chan int, 10)
ch <- 11
ch <- 12

close(ch)

for x := range ch {
    fmt.Println(x)
}

x, ok := <- ch
fmt.Println(x, ok)


-----
output:

11
12
0 false

channel 的類型

channel 分為不帶緩存的 channel 和帶緩存的 channel疮丛。

無緩存的 channel

從無緩存的 channel 中讀取消息會阻塞幔嫂,直到有 goroutine 向該 channel 中發(fā)送消息;同理誊薄,向無緩存的 channel 中發(fā)送消息也會阻塞履恩,直到有 goroutine 從 channel 中讀取消息。

通過無緩存的 channel 進(jìn)行通信時(shí)呢蔫,接收者收到數(shù)據(jù) happens before 發(fā)送者 goroutine 喚醒

有緩存的 channel

有緩存的 channel 的聲明方式為指定 make 函數(shù)的第二個(gè)參數(shù)切心,該參數(shù)為 channel 緩存的容量

ch := make(chan int, 10)

有緩存的 channel 類似一個(gè)阻塞隊(duì)列(采用環(huán)形數(shù)組實(shí)現(xiàn))。當(dāng)緩存未滿時(shí)片吊,向 channel 中發(fā)送消息時(shí)不會阻塞绽昏,當(dāng)緩存滿時(shí),發(fā)送操作將被阻塞俏脊,直到有其他 goroutine 從中讀取消息全谤;相應(yīng)的,當(dāng) channel 中消息不為空時(shí)爷贫,讀取消息不會出現(xiàn)阻塞啼县,當(dāng) channel 為空時(shí),讀取操作會造成阻塞沸久,直到有 goroutine 向 channel 中寫入消息季眷。

ch := make(chan int, 3)

// blocked, read from empty buffered channel
<- ch
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3

// blocked, send to full buffered channel
ch <- 4

通過 len 函數(shù)可以獲得 chan 中的元素個(gè)數(shù),通過 cap 函數(shù)可以得到 channel 的緩存長度卷胯。

channel 的用法

goroutine 通信

看一個(gè) effective go 中的例子:

c := make(chan int)  // Allocate a channel.

// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()

doSomethingForAWhile()
<-c

主 goroutine 會阻塞子刮,直到執(zhí)行 sort 的 goroutine 完成。

range 遍歷

channel 也可以使用 range 取值窑睁,并且會一直從 channel 中讀取數(shù)據(jù)挺峡,直到有 goroutine 對改 channel 執(zhí)行 close 操作,循環(huán)才會結(jié)束担钮。

// consumer worker
ch := make(chan int, 10)
for x := range ch{
    fmt.Println(x)
}

等價(jià)于

for {
    x, ok := <- ch
    if !ok {
        break
    }
    
    fmt.Println(x)
}

配合 select 使用

select 用法類似與 IO 多路復(fù)用橱赠,可以同時(shí)監(jiān)聽多個(gè) channel 的消息狀態(tài),看下面的例子

select {
    case <- ch1:
    ...
    case <- ch2:
    ...
    case ch3 <- 10;
    ...
    default:
    ...
}
  • select 可以同時(shí)監(jiān)聽多個(gè) channel 的寫入或讀取
  • 執(zhí)行 select 時(shí)箫津,若只有一個(gè) case 通過(不阻塞)狭姨,則執(zhí)行這個(gè) case 塊
  • 若有多個(gè) case 通過,則隨機(jī)挑選一個(gè) case 執(zhí)行
  • 若所有 case 均阻塞苏遥,且定義了 default 模塊饼拍,則執(zhí)行 default 模塊。若未定義 default 模塊田炭,則 select 語句阻塞师抄,直到有 case 被喚醒。
  • 使用 break 會跳出 select 塊教硫。

1. 設(shè)置超時(shí)時(shí)間

ch := make(chan struct{})

// finish task while send msg to ch
go doTask(ch)

timeout := time.After(5 * time.Second)
select {
    case <- ch:
        fmt.Println("task finished.")
    case <- timeout:
        fmt.Println("task timeout.")
}

2. quite channel

有一些場景中叨吮,一些 worker goroutine 需要一直循環(huán)處理信息辆布,直到收到 quit 信號

msgCh := make(chan struct{})
quitCh := make(chan struct{})
for {
    select {
    case <- msgCh:
        doWork()
    case <- quitCh:
        finish()
        return
}

單向 channel

即只可寫入或只可讀的channel,事實(shí)上 channel 只讀或只寫都沒有意義茶鉴,所謂的單向 channel 其實(shí)知識聲明時(shí)用谚殊,比如

func foo(ch chan<- int) <-chan int {...}

chan<- int 表示一個(gè)只可寫入的 channel,<-chan int 表示一個(gè)只可讀取的 channel蛤铜。上面這個(gè)函數(shù)約定了 foo 內(nèi)只能從向 ch 中寫入數(shù)據(jù),返回只一個(gè)只能讀取的 channel丛肢,雖然使用普通的 channel 也沒有問題围肥,但這樣在方法聲明時(shí)約定可以防止 channel 被濫用,這種預(yù)防機(jī)制發(fā)生再編譯期間蜂怎。

channel 源碼分析

channel 的主要實(shí)現(xiàn)在 src/runtime/chan.go 中穆刻,以下源碼均基于 go1.9.2。源碼閱讀時(shí)為了更好的理解 channel 特性杠步,幫助正確合理的使用 channel氢伟,閱讀代碼的過程可以回憶前面章節(jié)的 channel 特性。

channel 類結(jié)構(gòu)

channel 相關(guān)類定義如下:

// channel 類型定義
type hchan struct {
    // channel 中的元素?cái)?shù)量, len
    qcount   uint           // total data in the queue
    
    // channel 的大小, cap
    dataqsiz uint           // size of the circular queue
    
    // channel 的緩沖區(qū)幽歼,環(huán)形數(shù)組實(shí)現(xiàn)
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    
    // 單個(gè)元素的大小
    elemsize uint16
    
    // closed 標(biāo)志位
    closed   uint32
    
    // 元素的類型
    elemtype *_type // element type
    
    // send 和 recieve 的索引朵锣,用于實(shí)現(xiàn)環(huán)形數(shù)組隊(duì)列
    sendx    uint   // send index
    recvx    uint   // receive index
    
    // recv goroutine 等待隊(duì)列
    recvq    waitq  // list of recv waiters
    
    // send goroutine 等待隊(duì)列
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

// 等待隊(duì)列的鏈表實(shí)現(xiàn)
type waitq struct {    
    first *sudog       
    last  *sudog       
}

// in src/runtime/runtime2.go
// 對 G 的封裝
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

可以看到,channel 的主要組成有:一個(gè)環(huán)形數(shù)組實(shí)現(xiàn)的隊(duì)列甸私,用于存儲消息元素诚些;兩個(gè)鏈表實(shí)現(xiàn)的 goroutine 等待隊(duì)列,用于存儲阻塞在 recv 和 send 操作上的 goroutine皇型;一個(gè)互斥鎖诬烹,用于各個(gè)屬性變動(dòng)的同步

channel make 實(shí)現(xiàn)

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // case 1: channel 不含有指針
        // case 2: size == 0,即無緩沖 channel
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        
        // 在堆上分配連續(xù)的空間用作 channel
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 有緩沖 channel 初始化
        c = new(hchan)
        // 堆上分配 buf 內(nèi)存
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

make 的過程還比較簡單弃鸦,需要注意一點(diǎn)的是當(dāng)元素不含指針的時(shí)候绞吁,會將整個(gè) hchan 分配成一個(gè)連續(xù)的空間。

channel send

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    // 前面章節(jié)說道的唬格,當(dāng) channel 未初始化或?yàn)?nil 時(shí)家破,向其中發(fā)送數(shù)據(jù)將會永久阻塞
    if c == nil {
        if !block {
            return false
        }
        
        // gopark 會使當(dāng)前 goroutine 休眠,并通過 unlockf 喚醒购岗,但是此時(shí)傳入的 unlockf 為 nil, 因此员舵,goroutine 會一直休眠
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 獲取同步鎖
    lock(&c.lock)

    // 之前章節(jié)提過,向已經(jīng)關(guān)閉的 channel 發(fā)送消息會產(chǎn)生 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // CASE1: 當(dāng)有 goroutine 在 recv 隊(duì)列上等待時(shí)藕畔,跳過緩存隊(duì)列马僻,將消息直接發(fā)給 reciever goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // CASE2: 緩存隊(duì)列未滿,則將消息復(fù)制到緩存隊(duì)列上
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }
    
    // CASE3: 緩存隊(duì)列已滿注服,將goroutine 加入 send 隊(duì)列
    // 初始化 sudog
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // 加入隊(duì)列
    c.sendq.enqueue(mysg)
    // 休眠
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 喚醒 goroutine
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
    

從 send 代碼中可以看到韭邓,之前章節(jié)提到的一些特性都在代碼中有所體現(xiàn)措近,

send 有以下幾種情況:

  • 有 goroutine 阻塞在 channel recv 隊(duì)列上,此時(shí)緩存隊(duì)列為空女淑,直接將消息發(fā)送給 reciever goroutine,只產(chǎn)生一次復(fù)制
  • 當(dāng) channel 緩存隊(duì)列有剩余空間時(shí)瞭郑,將數(shù)據(jù)放到隊(duì)列里,等待接收鸭你,接收后總共產(chǎn)生兩次復(fù)制
  • 當(dāng) channel 緩存隊(duì)列已滿時(shí)屈张,將當(dāng)前 goroutine 加入 send 隊(duì)列并阻塞。

channel recieve

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // 從 nil 的 channel 中接收消息,永久阻塞
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not ready for receiving, we observe that the
    // channel is not closed. Each of these observations is a single word-sized read
    // (first c.sendq.first or c.qcount, and second c.closed).
    // Because a channel cannot be reopened, the later observation of the channel
    // being not closed implies that it was also not closed at the moment of the
    // first observation. We behave as if we observed the channel at that moment
    // and report that the receive cannot proceed.
    //
    // The order of operations is important here: reversing the operations can lead to
    // incorrect behavior when racing with a close.
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
    
    // CASE1: 從已經(jīng) close 且為空的 channel recv 數(shù)據(jù),返回空值
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // CASE2: send 隊(duì)列不為空
    // CASE2.1: 緩存隊(duì)列為空畏浆,直接從 sender recv 元素
    // CASE2.2: 緩存隊(duì)列不為空竟终,此時(shí)只有可能是緩存隊(duì)列已滿,從隊(duì)列頭取出元素,并喚醒 sender 將元素寫入緩存隊(duì)列尾部。由于為環(huán)形隊(duì)列,因此焰盗,隊(duì)列滿時(shí)只需要將隊(duì)列頭復(fù)制給 reciever,同時(shí)將 sender 元素復(fù)制到該位置咒林,并移動(dòng)隊(duì)列頭尾索引熬拒,不需要移動(dòng)隊(duì)列元素
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // CASE3: 緩存隊(duì)列不為空,直接從隊(duì)列取元素垫竞,移動(dòng)頭索引
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
    if !block {
        unlock(&c.lock)
        return false, false
    }
    
    // CASE4: 緩存隊(duì)列為空梦湘,將 goroutine 加入 recv 隊(duì)列,并阻塞
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

channel close

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    
    // 重復(fù) close件甥,產(chǎn)生 panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc(unsafe.Pointer(&c))
        racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
        racerelease(unsafe.Pointer(c))
    }

    c.closed = 1

    var glist *g

    // 喚醒所有 reciever
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // 喚醒所有 sender捌议,并產(chǎn)生 panic
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市引有,隨后出現(xiàn)的幾起案子瓣颅,更是在濱河造成了極大的恐慌,老刑警劉巖譬正,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宫补,死亡現(xiàn)場離奇詭異,居然都是意外死亡曾我,警方通過查閱死者的電腦和手機(jī)粉怕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來抒巢,“玉大人贫贝,你說我怎么就攤上這事。” “怎么了稚晚?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵崇堵,是天一觀的道長。 經(jīng)常有香客問我客燕,道長鸳劳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任也搓,我火速辦了婚禮赏廓,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘傍妒。我一直安慰自己幔摸,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布拍顷。 她就那樣靜靜地躺著,像睡著了一般塘幅。 火紅的嫁衣襯著肌膚如雪昔案。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天电媳,我揣著相機(jī)與錄音踏揣,去河邊找鬼。 笑死匾乓,一個(gè)胖子當(dāng)著我的面吹牛捞稿,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播拼缝,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼娱局,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了咧七?” 一聲冷哼從身側(cè)響起衰齐,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎继阻,沒想到半個(gè)月后耻涛,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瘟檩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年抹缕,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片墨辛。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卓研,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出睹簇,到底是詐尸還是另有隱情鉴分,我是刑警寧澤哮幢,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站志珍,受9級特大地震影響橙垢,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜伦糯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一柜某、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧敛纲,春花似錦喂击、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至旁壮,卻和暖如春监嗜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背抡谐。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工裁奇, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人麦撵。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓刽肠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親免胃。 傳聞我的和親對象是個(gè)殘疾皇子音五,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Go的內(nèi)存模型 看完這篇文章你會明白 一個(gè)Go程序在啟動(dòng)時(shí)的執(zhí)行順序 并發(fā)的執(zhí)行順序 并發(fā)環(huán)境下如何保證數(shù)據(jù)的同步...
    初級賽亞人閱讀 2,838評論 0 2
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,873評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)羔沙,斷路器放仗,智...
    卡卡羅2017閱讀 134,600評論 18 139
  • 這是一篇馬化騰的自述,我做了節(jié)選撬碟,以他所在的高度诞挨,以他的視角講述的過往經(jīng)歷,我們才能知道為什么他會如此成功呢蛤。...
    熙熙Breathe閱讀 239評論 0 2
  • 昨天的文章探討了牛人和普通人的差別是因?yàn)榕H苏莆樟吮绕胀ㄈ烁喔呒壐者m的心理表征惶傻,而這些心理表征都是以知識的形...
    勇哥在進(jìn)化閱讀 859評論 2 10