go-channel初識(shí)

了解過go的都知道因块,go最為突出的優(yōu)點(diǎn)就是它天然支持高并發(fā)震贵,但是所有高并發(fā)情況都面臨著一個(gè)很明顯的問題淹魄,就是并發(fā)的多線程或多協(xié)程之間如何通信郁惜,而channel就是go中g(shù)oroutine通信的‘管道’。

channel在go中時(shí)如何使用的

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit = make(chan string, 1)

func main() {
  go dealSignal()
  exited := make(chan struct{}, 1)
  go channel1(exited)
  count := 0
  t := time.Tick(time.Second)
Loop:
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exited:
      fmt.Println("main exit begin")
      break Loop
    }
  }
  fmt.Println("main exit end")
}

func dealSignal() {
  c := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    exit <- "shutdown"
  }()
}

func channel1(exited chan<- struct{}) {
  t := time.Tick(time.Second)
  count := 0
  for {
    select {
    case <-t:
      count++
      fmt.Printf("channel1 run %d\n", count)
    case <-exit:
      fmt.Println("channel1 exit")
      close(exited)
      return
    }
  }
}

這個(gè)例子首先并發(fā)出一個(gè)dealsign方法甲锡,用來接收關(guān)閉信號(hào)兆蕉,如果接收到關(guān)閉信號(hào)后往exit channel發(fā)送一條消息,然后并發(fā)運(yùn)行channel1缤沦,channel1中定了一個(gè)ticker虎韵,正常情況下channel1每秒打印第一個(gè)case語(yǔ)句,如果接收到exit的信號(hào)缸废,進(jìn)入第二個(gè)case包蓝,然后關(guān)閉傳入的exited channel缩多,那么main中的Loop,接收到exited關(guān)閉的信號(hào)后养晋,打印“main exit begin”衬吆, 然后退出循環(huán),進(jìn)程成功退出绳泉。這個(gè)例子演示了channel在goroutine中起到的傳遞消息的作用逊抡。這個(gè)例子是為了向大家展示channel在多個(gè)goroutine之間進(jìn)行通信。

Channel在底層是什么樣的


type hchan struct {
  qcount   uint           // total data in the queue零酪;chan中的元素總數(shù)
  dataqsiz uint           // size of the circular queue冒嫡;底層循環(huán)數(shù)組的size
  buf      unsafe.Pointer // points to an array of dataqsiz elements,指向底層循環(huán)數(shù)組的指針四苇,只針對(duì)有緩沖的channel
  elemsize uint16  //chan中元素的大小
  closed   uint32  //chan是否關(guān)閉
  elemtype *_type // element type孝凌;元素類型
  sendx    uint   // send index;已發(fā)送元素在循環(huán)數(shù)組中的索引
  recvx    uint   // receive index月腋;已接收元素在循環(huán)數(shù)組中的索引
  recvq    waitq  // list of recv waiters蟀架,等待接收消息的goroutine隊(duì)列
  sendq    waitq  // list of send waiters,等待發(fā)送消息的goroutine隊(duì)列

  // lock protects all fields in hchan, as well as several
  // fields in sudogs blocked on this channel.
  //
  // Do not change another G's status while holding this lock
  // (in particular, do not ready a G), as this can deadlock
  // with stack shrinking.
  lock mutex
}

type waitq struct {
  first *sudog
  last  *sudog
}

創(chuàng)建一個(gè)底層數(shù)組容量為5榆骚,元素類型為int片拍,那么channel的數(shù)據(jù)結(jié)構(gòu)如下圖所示:


創(chuàng)建channel的時(shí)候到底發(fā)生了什么

創(chuàng)建channel的時(shí)候,其實(shí)底層是調(diào)用makechan方法妓肢,我們來看下源碼:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // compiler checks this but be safe.
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }

  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
  // buf points into the same allocation, elemtype is persistent.
  // SudoG's are referenced from their owning thread so they can't be collected.
  // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  var c *hchan
  switch {
  case mem == 0:
    // Queue or element size is zero.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // Elements do not contain pointers.
    // Allocate hchan and buf in one call.
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  }
  return c
}

從函數(shù)原型來看捌省,創(chuàng)建的 chan 是一個(gè)指針。所以我們能在函數(shù)間直接傳遞 channel碉钠,而不用傳遞 channel 的指針纲缓。

具體來看下代碼:
可以看出makechan中其實(shí)主要的代碼就是一個(gè)switch,針對(duì)不同的情況:

1喊废、case mem == 0代表無緩沖型channel祝高,只分配hchan本身結(jié)構(gòu)體大小的內(nèi)存
2、case elem.ptrdata==0 代表元素類型不含指針操禀,只分配hchan本身結(jié)構(gòu)體大小+元素大小*個(gè)數(shù)的內(nèi)存褂策,是連續(xù)的內(nèi)存空間
3横腿、default元素類型包括指針颓屑,兩次分配內(nèi)存的操作

channel的接收與發(fā)送

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先創(chuàng)建了一個(gè)無緩沖型的channel,然后啟動(dòng)兩個(gè)goroutine去消費(fèi)channel的數(shù)據(jù)耿焊,緊接著向channel中發(fā)送數(shù)據(jù)揪惦。我們一步一步來分析channel是如何接收和發(fā)送數(shù)據(jù)的,首先來看接收罗侯,golang中接收channel數(shù)據(jù)有兩種方式:

i <- ch
i, ok <- ch

// 位于 src/runtime/chan.go

// chanrecv 函數(shù)接收 channel c 的元素并將其寫入 ep 所指向的內(nèi)存地址器腋。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false纫塌,即非阻塞型接收诊县,在沒有數(shù)據(jù)可接收的情況下,返回 (false, false)
// 否則措左,如果 c 處于關(guān)閉狀態(tài)依痊,將 ep 指向的地址清零,返回 (true, false)
// 否則怎披,用返回值填充 ep 指向的內(nèi)存地址胸嘁。返回 (true, true)
// 如果 ep 非空,則應(yīng)該指向堆或者函數(shù)調(diào)用者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內(nèi)容 …………

    // 如果是一個(gè) nil 的 channel
    if c == nil {
        // 如果不阻塞凉逛,直接返回 (false, false)
        if !block {
            return
        }
        // 否則性宏,接收一個(gè) nil 的 channel,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會(huì)執(zhí)行到這里
        throw("unreachable")
    }

    // 在非阻塞模式下状飞,快速檢測(cè)到失敗毫胜,不用獲取鎖,快速返回
    // 當(dāng)我們觀察到 channel 沒準(zhǔn)備好接收:
    // 1. 非緩沖型诬辈,等待發(fā)送列隊(duì) sendq 里沒有 goroutine 在等待
    // 2. 緩沖型指蚁,但 buf 里沒有元素
    // 之后,又觀察到 closed == 0自晰,即 channel 未關(guān)閉凝化。
    // 因?yàn)?channel 不可能被重復(fù)打開,所以前一個(gè)觀測(cè)的時(shí)候 channel 也是未關(guān)閉的酬荞,
    // 因此在這種情況下可以直接宣布接收失敗搓劫,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關(guān)閉,并且循環(huán)數(shù)組 buf 里沒有元素
    // 這里可以處理非緩沖型關(guān)閉 和 緩沖型關(guān)閉但 buf 無元素的情況
    // 也就是說即使是關(guān)閉狀態(tài)混巧,但在緩沖型的 channel枪向,
    // buf 里有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個(gè)已關(guān)閉的 channel 執(zhí)行接收操作,且未忽略返回值
            // 那么接收的值將是一個(gè)該類型的零值
            // typedmemclr 根據(jù)類型清理相應(yīng)地址的內(nèi)存
            typedmemclr(c.elemtype, ep)
        }
        // 從一個(gè)已關(guān)閉的 channel 接收咧党,selected 會(huì)返回true
        return true, false
    }

    // 等待發(fā)送隊(duì)列里有 goroutine 存在秘蛔,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩沖型的 channel
    // 2. 緩沖型的 channel翰萨,但 buf 滿了
    // 針對(duì) 1泰演,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)
    // 針對(duì) 2芙委,接收到循環(huán)數(shù)組頭部的元素坟募,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩沖型爽丹,buf 里有元素缺虐,可以正常接收
    if c.qcount > 0 {
        // 直接從循環(huán)數(shù)組里找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 代碼里蹂季,沒有忽略要接收的值麻汰,不是 "<- ch"绣的,而是 "val <- ch"叠赐,ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉循環(huán)數(shù)組里相應(yīng)位置的值
        typedmemclr(c.elemtype, qp)
        // 接收游標(biāo)向前移動(dòng)
        c.recvx++
        // 接收游標(biāo)歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數(shù)組里的元素個(gè)數(shù)減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收欲账,解鎖。selected 返回 false芭概,因?yàn)闆]有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構(gòu)造一個(gè) sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收數(shù)據(jù)的地址保存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進(jìn)入channel 的等待接收隊(duì)列
    c.recvq.enqueue(mysg)
    // 將當(dāng)前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了赛不,接著從這里繼續(xù)執(zhí)行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
Step1

如果channel是nil:如果是非阻塞模式,直接返回(false罢洲,false)俄删;如果是阻塞模式,調(diào)用goprak掛起goroutine奏路,會(huì)阻塞下去畴椰。

if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
Step2

快速操作(不用獲取鎖,快速返回)鸽粉,三組條件全部滿足斜脂,快速返(false,false)

條件1:首先是在非阻塞模式下
條件2:如果是非緩沖型(datasiz=0)并且等待發(fā)送goroutine隊(duì)列為空(sendq.first=nil触机,就是沒人往channel寫數(shù)據(jù))帚戳,或者緩沖型channel(datasiz>0)并且buf中沒有數(shù)據(jù);
條件3:channel未關(guān)閉


//##################step2####################
  if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
    c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
    atomic.Load(&c.closed) == 0 {
    return
  }
Step3

首先加鎖儡首,如果channel已經(jīng)關(guān)閉片任,并且buf中沒有元素,返回對(duì)應(yīng)類型的0值蔬胯,但是received為false对供;兩種情況

情形1:非緩沖型,channel已關(guān)閉
情形2:緩沖型氛濒,channel已關(guān)閉产场,并且buf無元素

也就是說即使是關(guān)閉狀態(tài),但在緩沖型的 channel舞竿,
buf 里有元素的情況下還能接收到元素


//##################step3####################
  lock(&c.lock)

  if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)
    }
    return true, false
  }
step4

如果等待發(fā)送隊(duì)列中有元素京景,證明channel已經(jīng)滿了,兩種情形

情形1:非緩沖型骗奖,無buf
情形2:緩沖型确徙,buf滿了

//##################step4####################
if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

兩種情形都正常進(jìn)入recv方法,我們來看下源碼:


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  //##################step4-1####################
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      // copy data from sender
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
     //##################step4-2####################
    // Queue is full. Take the item at the
    // head of the queue. Make the sender enqueue
    // its item at the tail of the queue. Since the
    // queue is full, those are both the same slot.
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
    }
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

針對(duì) 1执桌,直接進(jìn)行內(nèi)存拷貝(從 sender goroutine -> receiver goroutine)(從發(fā)送者的棧copy到接收者的棧)
針對(duì) 2鄙皇,接收到循環(huán)數(shù)組頭部的元素,并將發(fā)送者的元素放到循環(huán)數(shù)組尾部.
然后喚醒等待發(fā)送隊(duì)列中的goroutine鼻吮,等待調(diào)度器調(diào)度育苟。

step5

沒有等待發(fā)送的隊(duì)列,并且buf中有元素椎木,直接把接收游標(biāo)處的數(shù)據(jù)copy到接收數(shù)據(jù)的地址违柏,然后改變hchan中元素?cái)?shù)據(jù)。

if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
step6

如果是非阻塞香椎,那么直接返回漱竖;如果是阻塞的,構(gòu)造sudog畜伐,保存各種值馍惹;將sudog保存到channel的recvq中,調(diào)用goparkunlock將goroutine掛起

if !block {
    unlock(&c.lock)
    return false, false
  }
// no sender available: block on this channel.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

  // someone woke us up
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil
  releaseSudog(mysg)
  return true, !closed

非阻塞接收玛界,解鎖万矾。selected 返回 false,因?yàn)闆]有接收到值

我們繼續(xù)之前的例子慎框。前面說到第 14 行良狈,創(chuàng)建了一個(gè)非緩沖型的 channel,接著笨枯,第 15薪丁、16 行分別創(chuàng)建了一個(gè) goroutine,各自執(zhí)行了一個(gè)接收操作馅精。通過前面的源碼分析严嗜,我們知道,這兩個(gè) goroutine (后面稱為 G1 和 G2 好了)都會(huì)被阻塞在接收操作洲敢。G1 和 G2 會(huì)掛在 channel 的 recq 隊(duì)列中漫玄,形成一個(gè)雙向循環(huán)鏈表。

在程序的 17 行之前压彭,chan 的整體數(shù)據(jù)結(jié)構(gòu)如下:


buf 指向一個(gè)長(zhǎng)度為 0 的數(shù)組称近,qcount 為 0,表示 channel 中沒有元素哮塞。重點(diǎn)關(guān)注 recvq 和 sendq刨秆,它們是 waitq 結(jié)構(gòu)體,而 waitq 實(shí)際上就是一個(gè)雙向鏈表忆畅,鏈表的元素是 sudog衡未,里面包含 g 字段,g 表示一個(gè) goroutine家凯,所以 sudog 可以看成一個(gè) goroutine缓醋。recvq 存儲(chǔ)那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則存儲(chǔ)那些嘗試寫入 channel绊诲,但被阻塞的 goroutine送粱。

此時(shí),我們可以看到掂之,recvq 里掛了兩個(gè) goroutine抗俄,也就是前面啟動(dòng)的 G1 和 G2脆丁。因?yàn)闆]有 goroutine 接收,而 channel 又是無緩沖類型动雹,所以 G1 和 G2 被阻塞槽卫。sendq 沒有被阻塞的 goroutine。

再?gòu)恼w上來看一下 chan 此時(shí)的狀態(tài):


當(dāng)一個(gè)channel關(guān)閉后胰蝠,我們依然可以從中讀出數(shù)據(jù)歼培,如果chan的buf中有元素,則讀出的是chan中buf的數(shù)據(jù)茸塞,如果buf為空躲庄,則輸出對(duì)應(yīng)元素類型的零值。那么我們來看下如下的一段程序:

package main

import (
  "fmt"
  "os"
  "os/signal"
  "syscall"
  "time"
)

var exit1 = make(chan struct{}, 1)

func main() {
  go dealSignal1()
  count := 0
  t := time.Tick(time.Second)
  for {
    select {
    case <-t:
      count++
      fmt.Printf("main run %d\n", count)
    case <-exit1:
      fmt.Println("main exit begin")
    }
  }
  fmt.Println("main exit over")
}

func dealSignal1() {
  c := make(chan os.Signal, 2)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-c
    close(exit1)
  }()
}

發(fā)送

接著上面的例子钾虐,G1 和 G2 現(xiàn)在都在 recvq 隊(duì)列里了噪窘。
17 行向 channel 發(fā)送了一個(gè)元素 3。

發(fā)送操作最終轉(zhuǎn)化為 chansend 函數(shù)禾唁,直接上源碼效览,同樣大部分都注釋了,可以看懂主流程:

// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞荡短,直接返回 false丐枉,表示未發(fā)送成功
        if !block {
            return false
        }
        // 當(dāng)前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關(guān)……

    // 對(duì)于不阻塞的 send,快速檢測(cè)失敗場(chǎng)景
    //
    // 如果 channel 未關(guān)閉且 channel 沒有多余的緩沖空間掘托。這可能是:
    // 1. channel 是非緩沖型的瘦锹,且等待接收隊(duì)列里沒有 goroutine
    // 2. channel 是緩沖型的,但循環(huán)數(shù)組已經(jīng)裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel闪盔,并發(fā)安全
    lock(&c.lock)

    // 如果 channel 關(guān)閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收隊(duì)列里有 goroutine弯院,直接將要發(fā)送的數(shù)據(jù)拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對(duì)于緩沖型的 channel,如果還有緩沖空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將數(shù)據(jù)從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 發(fā)送游標(biāo)值加 1
        c.sendx++
        // 如果發(fā)送游標(biāo)值等于容量值泪掀,游標(biāo)值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩沖區(qū)的元素?cái)?shù)量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞听绳,則直接返回錯(cuò)誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了,發(fā)送方會(huì)被阻塞异赫。接下來會(huì)構(gòu)造一個(gè) sudog

    // 獲取當(dāng)前 goroutine 的指針
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當(dāng)前 goroutine 進(jìn)入發(fā)送等待隊(duì)列
    c.sendq.enqueue(mysg)

    // 當(dāng)前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這里開始被喚醒了(channel 有機(jī)會(huì)可以發(fā)送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒后椅挣,channel 關(guān)閉了∷坑爹啊鼠证,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上綁定的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

我們繼續(xù)往下走,G1靠抑、G2被掛起后量九,往channel中發(fā)送一個(gè)數(shù)據(jù)3,其實(shí)調(diào)用的是chansend方法,我們還是逐步的去講解

step1

如果channel=nil荠列,當(dāng)前goroutine會(huì)被掛起


if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }
step2

依然是一個(gè)不加鎖的快速操作类浪,三組條件

條件1:非阻塞
條件2:channel未關(guān)閉
條件3:channel是非緩沖型,并且等待接收隊(duì)列為空弯予;或者緩沖型戚宦,并且循環(huán)數(shù)組已經(jīng)滿了

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
  }
step3

加鎖个曙,如果channel已經(jīng)關(guān)閉锈嫩,直接panic

lock(&c.lock)

if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
}
step4

如果等待接收隊(duì)列不為空,說明什么垦搬?

情形1:非緩沖型呼寸,等待接收隊(duì)列不為空
情形2:緩沖型,等待接收隊(duì)列不為空(說明buf為空)

兩種情形猴贰,都是直接將待發(fā)送數(shù)據(jù)直接copy到接收處

if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)//直接從ep copy到sg
    return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if raceenabled {
    if c.dataqsiz == 0 {
      racesync(c, sg)
    } else {
      // Pretend we go through the buffer, even though
      // we copy directly. Note that we need to increment
      // the head/tail locations only when raceenabled.
      qp := chanbuf(c, c.recvx)
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
        c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  }
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

兩種情形对雪,都直接從一個(gè)用一個(gè)goroutine操作另一個(gè)goroutine的棧,因此在sendDirect方法中會(huì)有一次寫屏障

step5

如果等待隊(duì)列為空米绕,并且緩沖區(qū)未滿瑟捣,肯定是緩沖型的channel


if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

將元素放在sendx處,然后sendx加1栅干,channel總量加1

step6

如果以上情況都沒有命中迈套,說明什么?說明channel已經(jīng)滿了碱鳞,如果是非阻塞的直接返回桑李,否則需要調(diào)用gopack將這個(gè)goroutine掛起,等待被喚醒窿给。

if !block {
    unlock(&c.lock)
    return false
  }

  // Block on the channel. Some receiver will complete our operation for us.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
  // Ensure the value being sent is kept alive until the
  // receiver copies it out. The sudog has a pointer to the
  // stack object, but sudogs aren't considered as roots of the
  // stack tracer.
  KeepAlive(ep)

我們對(duì)照程序分析下贵白,在前一個(gè)小節(jié)G1、G2被掛起來了崩泡,等待sender的解救禁荒;這時(shí)候往ch中發(fā)送了一個(gè)3,(step4)這時(shí)sender發(fā)現(xiàn)ch的等待接收隊(duì)列recvq中有receiver角撞,就會(huì)出隊(duì)一個(gè)sudog呛伴,然后將元素直接copy到sudog的elem處,然后調(diào)用goready將G1喚醒靴寂,繼續(xù)執(zhí)行G1原來的代碼磷蜀,打印出結(jié)果。如下圖:



當(dāng)調(diào)度器光顧 G1 時(shí)百炬,將 G1 變成 running 狀態(tài)褐隆,執(zhí)行 goroutineA 接下來的代碼。G 表示其他可能有的 goroutine剖踊。

這里其實(shí)涉及到一個(gè)協(xié)程寫另一個(gè)協(xié)程棧的操作庶弃。有兩個(gè) receiver 在 channel 的一邊虎視眈眈地等著衫贬,這時(shí) channel 另一邊來了一個(gè) sender 準(zhǔn)備向 channel 發(fā)送數(shù)據(jù),為了高效歇攻,用不著通過 channel 的 buf “中轉(zhuǎn)”一次固惯,直接從源地址把數(shù)據(jù) copy 到目的地址就可以了,效率高敖墒亍葬毫!

關(guān)閉

close一個(gè)channel會(huì)調(diào)用closechan方法,比較簡(jiǎn)單屡穗,我們也來看下

func closechan(c *hchan) {
    // 關(guān)閉一個(gè) nil channel贴捡,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上鎖
    lock(&c.lock)
    // 如果 channel 已經(jīng)關(guān)閉
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改關(guān)閉狀態(tài)
    c.closed = 1

    var glist *g

    // 將 channel 所有等待接收隊(duì)列的里 sudog 釋放
    for {
        // 從接收隊(duì)列里出隊(duì)一個(gè) sudog
        sg := c.recvq.dequeue()
        // 出隊(duì)完畢,跳出循環(huán)
        if sg == nil {
            break
        }

        // 如果 elem 不為空村砂,說明此 receiver 未忽略接收數(shù)據(jù)
        // 給它賦一個(gè)相應(yīng)類型的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相連烂斋,形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }

    // 將 channel 等待發(fā)送隊(duì)列里的 sudog 釋放
    // 如果存在,這些 goroutine 將會(huì) panic
    for {
        // 從發(fā)送隊(duì)列里出隊(duì)一個(gè) sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 發(fā)送者會(huì) panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成鏈表
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍歷鏈表
    for glist != nil {
        // 取最后一個(gè)
        gp := glist
        // 向前走一步础废,下一個(gè)喚醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 喚醒相應(yīng) goroutine
        goready(gp, 3)
    }
}
step1

如果channel為nil汛骂,會(huì)直接panic

if c == nil {
    panic(plainError("close of nil channel"))
  }
step2

加鎖,如果channel已經(jīng)關(guān)閉评腺,再次關(guān)閉會(huì)panic


lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
step3

首選將hchan對(duì)應(yīng)close標(biāo)志置為1帘瞭,然后聲明一個(gè)鏈表;將等待接收隊(duì)列中的所有sudog加入到鏈表歇僧,并將其elem賦予一個(gè)相應(yīng)類型的0值图张;


c.closed = 1

  var glist gList

  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
step4

向所有等待發(fā)送隊(duì)列的sudog加入鏈表


// release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)
step5

喚醒sudog所有g(shù)oroutine

for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0
    goready(gp, 3)
  }

close 邏輯比較簡(jiǎn)單,對(duì)于一個(gè) channel诈悍,recvq 和 sendq 中分別保存了阻塞的發(fā)送者和接收者祸轮。關(guān)閉 channel 后,對(duì)于等待接收者而言侥钳,會(huì)收到一個(gè)相應(yīng)類型的零值适袜。對(duì)于等待發(fā)送者,會(huì)直接 panic舷夺。所以苦酱,在不了解 channel 還有沒有接收者的情況下,不能貿(mào)然關(guān)閉 channel给猾。

close 函數(shù)先上一把大鎖疫萤,接著把所有掛在這個(gè) channel 上的 sender 和 receiver 全都連成一個(gè) sudog 鏈表,再解鎖敢伸。最后扯饶,再將所有的 sudog 全都喚醒。

喚醒之后,該干嘛干嘛尾序。sender 會(huì)繼續(xù)執(zhí)行 chansend 函數(shù)里 goparkunlock 函數(shù)之后的代碼钓丰,很不幸,檢測(cè)到 channel 已經(jīng)關(guān)閉了每币,panic携丁。receiver 則比較幸運(yùn),進(jìn)行一些掃尾工作后兰怠,返回梦鉴。這里,selected 返回 true痕慢,而返回值 received 則要根據(jù) channel 是否關(guān)閉尚揣,返回不同的值涌矢。如果 channel 關(guān)閉掖举,received 為 false,否則為 true娜庇。

總結(jié)

總結(jié)一下塔次,發(fā)生 panic 的情況有三種:

1.向一個(gè)關(guān)閉的 channel 進(jìn)行寫操作;

  1. 關(guān)閉一個(gè) nil 的 channel名秀;
  2. 重復(fù)關(guān)閉一個(gè) channel励负。

讀、寫一個(gè) nil channel 都會(huì)被阻塞匕得。

channel發(fā)送和接收元素的本質(zhì)還是值得拷貝
channel是并發(fā)安全的(加鎖)

參考:博客園-深度解密Go語(yǔ)言只channel
好未來Golang源碼系列三:Channel實(shí)現(xiàn)原理分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末继榆,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子汁掠,更是在濱河造成了極大的恐慌略吨,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件考阱,死亡現(xiàn)場(chǎng)離奇詭異翠忠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)乞榨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門秽之,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人吃既,你說我怎么就攤上這事考榨。” “怎么了鹦倚?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵河质,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我,道長(zhǎng)愤诱,這世上最難降的妖魔是什么云头? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮淫半,結(jié)果婚禮上溃槐,老公的妹妹穿的比我還像新娘。我一直安慰自己科吭,他們只是感情好昏滴,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著对人,像睡著了一般谣殊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上牺弄,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天姻几,我揣著相機(jī)與錄音,去河邊找鬼势告。 笑死蛇捌,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的咱台。 我是一名探鬼主播络拌,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼回溺!你這毒婦竟也來了春贸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤遗遵,失蹤者是張志新(化名)和其女友劉穎萍恕,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瓮恭,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡雄坪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了屯蹦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片维哈。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖登澜,靈堂內(nèi)的尸體忽然破棺而出阔挠,到底是詐尸還是另有隱情,我是刑警寧澤脑蠕,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布购撼,位于F島的核電站跪削,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏迂求。R本人自食惡果不足惜碾盐,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望揩局。 院中可真熱鬧毫玖,春花似錦、人聲如沸凌盯。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)驰怎。三九已至阐滩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間县忌,已是汗流浹背掂榔。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留芹枷,地道東北人衅疙。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像鸳慈,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子喧伞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容