Golang Channel實現(xiàn)

Channel是Golang實現(xiàn)CSP的核心。

基于channel通信主要涉及buf(數(shù)據(jù))和sendq有额、recvq(維護阻塞的G)氮双,lock保證并發(fā)訪問安全;
本質(zhì)是一個基于環(huán)形緩存的有鎖隊列梧乘,但G的阻塞是在用戶空間;

圖片來源:https://i6448038.github.io/2019/04/11/go-channel/

目錄
新建channel
發(fā)送數(shù)據(jù)
協(xié)程直接發(fā)送數(shù)據(jù)
接收數(shù)據(jù)
協(xié)程直接接收數(shù)據(jù)
關(guān)閉channel
Select原理

新建channel

channel的運行時結(jié)構(gòu)是runtime.hchan
make chan在創(chuàng)建channel的時候會在該進程的heap區(qū)申請一塊內(nèi)存庐杨,創(chuàng)建一個hchan結(jié)構(gòu)體宋下,返回執(zhí)行該內(nèi)存的指針,所以獲取的的ch變量本身就是一個指針辑莫,在函數(shù)之間傳遞的時候是同一個channel学歧。

type hchan struct {
   qcount   uint           // total data in the queue   長度
   dataqsiz uint           // size of the circular queue 容量
   buf      unsafe.Pointer // points to an array of dataqsiz  elements 環(huán)形隊列
   elemsize uint16
   closed   uint32
   elemtype *_type // element type
   sendx    uint   // send index 環(huán)形數(shù)組的index
   recvx    uint   // receive index
   recvq    waitq  // list of recv waiters
   sendq    waitq  // list of send waiters

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

//FIFO的隊列
type waitq struct {
   first *sudog //sudog represents a g in a wait list, such as for sending/receiving on a channel.
   last  *sudog
}

發(fā)送數(shù)據(jù)

  1. 加鎖;
  2. 存在等待的接受者時各吨,直接發(fā)給接收者枝笨;
  3. 緩沖區(qū)存在剩余空間時,寫入緩沖區(qū)揭蜒;
  4. 不存在緩沖區(qū)或者滿了的情況下横浑,掛在sendq上;
  5. 被阻塞的發(fā)送者屉更,接收者會負(fù)責(zé)消息的傳輸徙融,所以被喚醒后進行收尾工作;
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   //向nil的channel發(fā)消息會持續(xù)阻塞
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   //獲取channel的鎖
   lock(&c.lock)

   //向close的channel發(fā)消息會Panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   //已有g(shù)阻塞在接收隊列瑰谜,直接發(fā)消息欺冀,繞過channel的buf; (沒有緩沖也就是這樣了)
   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   // 沒有阻塞的
   // 沒滿,加入buf萨脑,然后返回隐轩;
   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx) //返回位置的指針
      typedmemmove(c.elemtype, qp, ep) //數(shù)據(jù)拷貝
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   // 滿了,發(fā)送方會阻塞
   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep  //發(fā)送的數(shù)據(jù)地址
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg) //當(dāng)前g+數(shù)據(jù)封裝的mysg渤早,掛在channel的發(fā)送隊列上职车;
   //當(dāng)前協(xié)程用戶態(tài)阻塞,釋放lock
   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   // 重新恢復(fù)調(diào)度鹊杖,此時以及不需要傳輸數(shù)據(jù)了悴灵,因為數(shù)據(jù)以及被接受了,釋放資源即可骂蓖;
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}

協(xié)程直接發(fā)送數(shù)據(jù)

如果存在掛在channel的接收者時积瞒,發(fā)送者直接將數(shù)據(jù)傳輸給最早的接收者FIFO,繞過環(huán)形緩存涯竟;

send(c, sg, ep, func() { unlock(&c.lock) }, 3)

// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if sg.elem != nil { //接收者的變量
      sendDirect(c.elemtype, sg, ep)//直接拷貝過去
      sg.elem = nil
   }
   gp := sg.g
   unlockf()//拷貝完畢再釋放channel鎖赡鲜,避免多個發(fā)送者空厌;
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)//喚醒接受者
}

接收數(shù)據(jù)

  1. 加鎖庐船;
  2. channel關(guān)閉&數(shù)據(jù)為空银酬,返回零值;
  3. 如果有掛在sendq的發(fā)送者筐钟,從環(huán)形緩存拿到第一個數(shù)據(jù)揩瞪,然后幫發(fā)送者將數(shù)據(jù)寫入環(huán)形緩存的末尾;和發(fā)送時繞過緩存不同篓冲,保證消息FIFO李破,避免緩存的數(shù)據(jù)被餓死;
  4. 從環(huán)形緩存中接收數(shù)據(jù)壹将;
  5. 數(shù)據(jù)為空嗤攻,掛在recvq上;被喚醒诽俯,收尾工作妇菱;
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // 向nil發(fā)消息普通會阻塞,select直接返回暴区;
   if c == nil {
      if !block {
         return
      }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }


   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 獲取channel的鎖
   lock(&c.lock)

   // case1:channel關(guān)閉&數(shù)據(jù)為空闯团,清空ep->拿到零值,返回仙粱;
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }
    
   // channel關(guān)閉&數(shù)據(jù)不為空   ||  channel沒關(guān)閉

   // channel已滿的情況房交,直接接收阻塞的發(fā)送者消息,繞過channel伐割;
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

//有數(shù)據(jù)
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx) //位置
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp) //數(shù)據(jù)copy
      }
      typedmemclr(c.elemtype, qp)//清楚buf的數(shù)據(jù)
      c.recvx++ //更改位置
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }

//沒數(shù)據(jù)
   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg) //封裝mysg信息候味,阻塞在recvq隊列;
   //讓出調(diào)度
   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
   //恢復(fù)調(diào)度隔心,此時已經(jīng)接受了數(shù)據(jù)负溪,做收尾工作。
   // someone woke us up
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil //沒關(guān)閉會賦值mysg的地址
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}

協(xié)程直接接收數(shù)據(jù)

對于帶緩沖的channel济炎,此處接收者和發(fā)送者并沒有直接數(shù)據(jù)傳輸川抡。

recv(c, sg, ep, func() { unlock(&c.lock) }, 3)

// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 {
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      //接收先拿buf的數(shù)據(jù),然后將發(fā)送者的數(shù)據(jù)放到buf中须尚。
         //避免數(shù)據(jù)buf的數(shù)據(jù)被餓死崖堤;發(fā)的時候不用,因為buf是空的耐床。
      qp := chanbuf(c, c.recvx)
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf() //釋放鎖
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1) //喚醒發(fā)送者
}

關(guān)閉channel

主要是處理channel的recvq和sendq隊列:recvq會拿到零值密幔,sendq中的G都是在關(guān)閉之前阻塞的;

//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
   closechan(c)
}

func closechan(c *hchan) {
    // 關(guān)閉nil撩轰,panic
   if c == nil {
      panic(plainError("close of nil channel"))
   }

   // 加鎖
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel")) //重復(fù)關(guān)閉
   }

   c.closed = 1

   var glist gList

   // release all readers
    //如果有recvq胯甩,此時的buf肯定是空的昧廷,相當(dāng)于給零值然后喚醒;
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil //此時才為nil,被喚醒的g就知道是否關(guān)閉了偎箫。
      glist.push(gp)
   }

   // release all writers (they will panic)
   // 如果有sendq木柬,
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil 
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      glist.push(gp)
   }

   //釋放鎖
   unlock(&c.lock)

   // Ready all Gs now that we've dropped the channel lock. 喚醒
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

Select原理

  • 特點
    1. 可以在channel上進行非阻塞的收發(fā)操作;
    2. 遇到多個channel同時響應(yīng)時淹办,隨機選擇case執(zhí)行眉枕,避免饑餓;
  • 實現(xiàn) https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
    1. 隨機生成一個遍歷的輪詢順序 pollOrder 并根據(jù) Channel 地址生成鎖定順序 lockOrder怜森;
    2. 根據(jù) pollOrder 遍歷所有的 case 查看是否有可以立刻處理的 Channel速挑;
      1. 如果存在就直接獲取 case 對應(yīng)的索引并返回;
      2. 如果不存在就會創(chuàng)建 runtime.sudog 結(jié)構(gòu)體副硅,將當(dāng)前 Goroutine 加入到所有相關(guān) Channel 的收發(fā)隊列姥宝,并調(diào)用 runtime.gopark 掛起當(dāng)前 Goroutine 等待調(diào)度器的喚醒;
    3. 當(dāng)調(diào)度器喚醒當(dāng)前 Goroutine 時就會再次按照 lockOrder 遍歷所有的 case恐疲,從中查找需要被處理的 runtime.sudog 結(jié)構(gòu)對應(yīng)的索引腊满;
// compiler implements
//
// select {
// case v = <-c:
//    ... foo
// default:
//    ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
//    ... foo
// } else {
//    ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
   selected, _ = chanrecv(c, elem, false)  //非阻塞
   return
}

資料
圖解Go的channel底層實現(xiàn)
深入理解Golang Channel
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市流纹,隨后出現(xiàn)的幾起案子糜烹,更是在濱河造成了極大的恐慌,老刑警劉巖漱凝,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疮蹦,死亡現(xiàn)場離奇詭異,居然都是意外死亡茸炒,警方通過查閱死者的電腦和手機愕乎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來壁公,“玉大人感论,你說我怎么就攤上這事∥刹幔” “怎么了比肄?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長囊陡。 經(jīng)常有香客問我芳绩,道長,這世上最難降的妖魔是什么撞反? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任妥色,我火速辦了婚禮,結(jié)果婚禮上遏片,老公的妹妹穿的比我還像新娘嘹害。我一直安慰自己撮竿,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布笔呀。 她就那樣靜靜地躺著幢踏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪凿可。 梳的紋絲不亂的頭發(fā)上惑折,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天授账,我揣著相機與錄音枯跑,去河邊找鬼。 笑死白热,一個胖子當(dāng)著我的面吹牛敛助,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播屋确,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼纳击,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了攻臀?” 一聲冷哼從身側(cè)響起焕数,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎刨啸,沒想到半個月后堡赔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡设联,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年善已,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片离例。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡换团,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出宫蛆,到底是詐尸還是另有隱情艘包,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布耀盗,位于F島的核電站想虎,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏袍冷。R本人自食惡果不足惜磷醋,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望胡诗。 院中可真熱鬧邓线,春花似錦淌友、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至你雌,卻和暖如春器联,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背婿崭。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工拨拓, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人氓栈。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓渣磷,卻偏偏與公主長得像,于是被迫代替她去往敵國和親授瘦。 傳聞我的和親對象是個殘疾皇子醋界,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345