channel 的實(shí)現(xiàn)

1. 數(shù)據(jù)對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)

runtime.chan.go

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

2.channel 創(chuàng)建

通過(guò)make 創(chuàng)建channel ,在編譯階段午衰,對(duì)make 進(jìn)行類型檢查和展開亚兄,最終調(diào)用runtime.makechan()

func walkexpr(n *Node, init *Nodes) *Node {
    switch n.Op {
    case OMAKECHAN:
        size := n.Left
        fnname := "makechan64"
        argtype := types.Types[TINT64]

        if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
            fnname = "makechan"
            argtype = types.Types[TINT]
        }
        n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
    }
}

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    mem, _ := math.MulUintptr(elem.size, uintptr(size))

    var c *hchan
    switch {
    case mem == 0:
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

3. 發(fā)送數(shù)據(jù)過(guò)程分析

使用 ch <- i 發(fā)送數(shù)據(jù),通過(guò)編譯器 最終執(zhí)行 runtime.chansend()

  • 直接發(fā)送數(shù)據(jù)
    如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine包蓝,那么 runtime.chansend會(huì)從接收隊(duì)列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù)
    if c.closed != 0 { //如果已經(jīng)關(guān)閉迎捺,仍然向chan 發(fā)送數(shù)據(jù),則拋異常
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
       //已經(jīng)存在接收者箕别,則直接copy 數(shù)據(jù)且喚醒
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
  • 具有緩存隊(duì)列
    緩存區(qū)未滿時(shí),將要發(fā)送的數(shù)據(jù),加入緩存循環(huán)隊(duì)列
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
  • 阻塞發(fā)送
    如果當(dāng)前既沒有接收者且循環(huán)隊(duì)列已滿或不存在钢坦,則 發(fā)送進(jìn)入阻塞狀態(tài)
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

4.接收數(shù)據(jù)過(guò)程分析

i <- ch
i, ok <- ch
經(jīng)過(guò)編譯器的檢查和類型展開究孕,最終調(diào)用runtime. chanrecv()

當(dāng)我們從一個(gè)空 Channel 接收數(shù)據(jù)時(shí)會(huì)直接調(diào)用 runtime.gopark讓出處理器的使用權(quán);
如果當(dāng)前 Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何數(shù)據(jù)啥酱,那么會(huì)清除 ep 指針中的數(shù)據(jù)并立刻返回;

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
  • 直接接收
     //出現(xiàn)在2種場(chǎng)景下:1. 沒有緩存隊(duì)列 2.緩存隊(duì)列已經(jīng)滿了
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }


// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
//    and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
//    written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 { //沒有緩存隊(duì)列的場(chǎng)景
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {// 緩存隊(duì)列已經(jīng)滿了
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}
  • 緩存區(qū)接收
    ...
    if c.qcount > 0 {//緩存區(qū)有數(shù)據(jù)且緩存區(qū)未滿時(shí)
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    ...
}

1.當(dāng) Channel 的緩沖區(qū)中已經(jīng)包含數(shù)據(jù)時(shí)爹凹,從 Channel 中接收數(shù)據(jù)會(huì)直接從緩沖區(qū)中 recvx 的索引位置中取出數(shù)據(jù)進(jìn)行處理
2.如果接收數(shù)據(jù)的內(nèi)存地址不為空,那么會(huì)使用 runtime.typedmemmove 將緩沖區(qū)中的數(shù)據(jù)拷貝到內(nèi)存中镶殷、清除隊(duì)列中的數(shù)據(jù)并完成收尾工作禾酱。
3.收尾工作包括遞增 recvx,一旦發(fā)現(xiàn)索引超過(guò)了 Channel 的容量時(shí),會(huì)將它歸零重置循環(huán)隊(duì)列的索引颤陶;除此之外颗管,該函數(shù)還會(huì)減少 qcount 計(jì)數(shù)器并釋放持有 Channel 的鎖

  • 阻塞接收
    當(dāng) Channel 的發(fā)送隊(duì)列中不存在等待的 Goroutine 并且緩沖區(qū)中也不存在任何數(shù)據(jù)時(shí),從管道中接收數(shù)據(jù)的操作會(huì)變成阻塞的
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ...
    if !block {
        unlock(&c.lock)
        return false, false
    }

    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

    gp.waiting = nil
    closed := gp.param == nil
    gp.param = nil
    releaseSudog(mysg)
    return true, !closed
}

5. 關(guān)閉流程分析

編譯器會(huì)將用于關(guān)閉管道的 close 關(guān)鍵字轉(zhuǎn)換成 OCLOSE 節(jié)點(diǎn)以及 runtime.closechan 函數(shù)滓走。

當(dāng) Channel 是一個(gè)空指針或者已經(jīng)被關(guān)閉時(shí)垦江,Go 語(yǔ)言運(yùn)行時(shí)都會(huì)直接崩潰并拋出異常:

func closechan(c *hchan) {
    if c == nil { //為空時(shí) 關(guān)閉 拋異常
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 { //已經(jīng)關(guān)閉,再次關(guān)閉拋異常
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

    c.closed = 1

    var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
       .....
        // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末搅方,一起剝皮案震驚了整個(gè)濱河市比吭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌姨涡,老刑警劉巖衩藤,帶你破解...
    沈念sama閱讀 210,835評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異涛漂,居然都是意外死亡赏表,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,900評(píng)論 2 383
  • 文/潘曉璐 我一進(jìn)店門匈仗,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)瓢剿,“玉大人,你說(shuō)我怎么就攤上這事悠轩“涎。” “怎么了?”我有些...
    開封第一講書人閱讀 156,481評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵哗蜈,是天一觀的道長(zhǎng)前标。 經(jīng)常有香客問(wèn)我,道長(zhǎng)距潘,這世上最難降的妖魔是什么炼列? 我笑而不...
    開封第一講書人閱讀 56,303評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮音比,結(jié)果婚禮上俭尖,老公的妹妹穿的比我還像新娘。我一直安慰自己洞翩,他們只是感情好稽犁,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,375評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著骚亿,像睡著了一般已亥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上来屠,一...
    開封第一講書人閱讀 49,729評(píng)論 1 289
  • 那天虑椎,我揣著相機(jī)與錄音震鹉,去河邊找鬼。 笑死捆姜,一個(gè)胖子當(dāng)著我的面吹牛传趾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播泥技,決...
    沈念sama閱讀 38,877評(píng)論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼浆兰,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了珊豹?” 一聲冷哼從身側(cè)響起镊讼,我...
    開封第一講書人閱讀 37,633評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎平夜,沒想到半個(gè)月后蝶棋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,088評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡忽妒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,443評(píng)論 2 326
  • 正文 我和宋清朗相戀三年玩裙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片段直。...
    茶點(diǎn)故事閱讀 38,563評(píng)論 1 339
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡吃溅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鸯檬,到底是詐尸還是另有隱情决侈,我是刑警寧澤,帶...
    沈念sama閱讀 34,251評(píng)論 4 328
  • 正文 年R本政府宣布喧务,位于F島的核電站赖歌,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏功茴。R本人自食惡果不足惜庐冯,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,827評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望坎穿。 院中可真熱鬧展父,春花似錦、人聲如沸玲昧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,712評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)孵延。三九已至吕漂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間隙袁,已是汗流浹背痰娱。 一陣腳步聲響...
    開封第一講書人閱讀 31,943評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工弃榨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留菩收,地道東北人梨睁。 一個(gè)月前我還...
    沈念sama閱讀 46,240評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像娜饵,于是被迫代替她去往敵國(guó)和親坡贺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,435評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容