Channel是Golang實現(xiàn)CSP的核心。
基于channel通信主要涉及buf(數(shù)據(jù))和sendq有额、recvq(維護阻塞的G)氮双,lock保證并發(fā)訪問安全;
本質(zhì)是一個基于環(huán)形緩存的有鎖隊列梧乘,但G的阻塞是在用戶空間;
圖片來源:https://i6448038.github.io/2019/04/11/go-channel/
目錄
新建channel
發(fā)送數(shù)據(jù)
協(xié)程直接發(fā)送數(shù)據(jù)
接收數(shù)據(jù)
協(xié)程直接接收數(shù)據(jù)
關(guān)閉channel
Select原理
新建channel
channel的運行時結(jié)構(gòu)是runtime.hchan
make chan在創(chuàng)建channel的時候會在該進程的heap區(qū)申請一塊內(nèi)存庐杨,創(chuàng)建一個hchan結(jié)構(gòu)體宋下,返回執(zhí)行該內(nèi)存的指針,所以獲取的的ch變量本身就是一個指針辑莫,在函數(shù)之間傳遞的時候是同一個channel学歧。
type hchan struct {
qcount uint // total data in the queue 長度
dataqsiz uint // size of the circular queue 容量
buf unsafe.Pointer // points to an array of dataqsiz elements 環(huán)形隊列
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index 環(huán)形數(shù)組的index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
//FIFO的隊列
type waitq struct {
first *sudog //sudog represents a g in a wait list, such as for sending/receiving on a channel.
last *sudog
}
發(fā)送數(shù)據(jù)
- 加鎖;
- 存在等待的接受者時各吨,直接發(fā)給接收者枝笨;
- 緩沖區(qū)存在剩余空間時,寫入緩沖區(qū)揭蜒;
- 不存在緩沖區(qū)或者滿了的情況下横浑,掛在sendq上;
- 被阻塞的發(fā)送者屉更,接收者會負(fù)責(zé)消息的傳輸徙融,所以被喚醒后進行收尾工作;
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//向nil的channel發(fā)消息會持續(xù)阻塞
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//獲取channel的鎖
lock(&c.lock)
//向close的channel發(fā)消息會Panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//已有g(shù)阻塞在接收隊列瑰谜,直接發(fā)消息欺冀,繞過channel的buf; (沒有緩沖也就是這樣了)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 沒有阻塞的
// 沒滿,加入buf萨脑,然后返回隐轩;
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) //返回位置的指針
typedmemmove(c.elemtype, qp, ep) //數(shù)據(jù)拷貝
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 滿了,發(fā)送方會阻塞
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep //發(fā)送的數(shù)據(jù)地址
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) //當(dāng)前g+數(shù)據(jù)封裝的mysg渤早,掛在channel的發(fā)送隊列上职车;
//當(dāng)前協(xié)程用戶態(tài)阻塞,釋放lock
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
// 重新恢復(fù)調(diào)度鹊杖,此時以及不需要傳輸數(shù)據(jù)了悴灵,因為數(shù)據(jù)以及被接受了,釋放資源即可骂蓖;
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
協(xié)程直接發(fā)送數(shù)據(jù)
如果存在掛在channel的接收者時积瞒,發(fā)送者直接將數(shù)據(jù)傳輸給最早的接收者FIFO,繞過環(huán)形緩存涯竟;
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil { //接收者的變量
sendDirect(c.elemtype, sg, ep)//直接拷貝過去
sg.elem = nil
}
gp := sg.g
unlockf()//拷貝完畢再釋放channel鎖赡鲜,避免多個發(fā)送者空厌;
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)//喚醒接受者
}
接收數(shù)據(jù)
- 加鎖庐船;
- channel關(guān)閉&數(shù)據(jù)為空银酬,返回零值;
- 如果有掛在sendq的發(fā)送者筐钟,從環(huán)形緩存拿到第一個數(shù)據(jù)揩瞪,然后幫發(fā)送者將數(shù)據(jù)寫入環(huán)形緩存的末尾;和發(fā)送時繞過緩存不同篓冲,保證消息FIFO李破,避免緩存的數(shù)據(jù)被餓死;
- 從環(huán)形緩存中接收數(shù)據(jù)壹将;
- 數(shù)據(jù)為空嗤攻,掛在recvq上;被喚醒诽俯,收尾工作妇菱;
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 向nil發(fā)消息普通會阻塞,select直接返回暴区;
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 獲取channel的鎖
lock(&c.lock)
// case1:channel關(guān)閉&數(shù)據(jù)為空闯团,清空ep->拿到零值,返回仙粱;
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// channel關(guān)閉&數(shù)據(jù)不為空 || channel沒關(guān)閉
// channel已滿的情況房交,直接接收阻塞的發(fā)送者消息,繞過channel伐割;
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//有數(shù)據(jù)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) //位置
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //數(shù)據(jù)copy
}
typedmemclr(c.elemtype, qp)//清楚buf的數(shù)據(jù)
c.recvx++ //更改位置
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//沒數(shù)據(jù)
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) //封裝mysg信息候味,阻塞在recvq隊列;
//讓出調(diào)度
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
//恢復(fù)調(diào)度隔心,此時已經(jīng)接受了數(shù)據(jù)负溪,做收尾工作。
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil //沒關(guān)閉會賦值mysg的地址
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
協(xié)程直接接收數(shù)據(jù)
對于帶緩沖的channel济炎,此處接收者和發(fā)送者并沒有直接數(shù)據(jù)傳輸川抡。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// recv processes a receive operation on a full channel c.
// There are 2 parts:
// 1) The value sent by the sender sg is put into the channel
// and the sender is woken up to go on its merry way.
// 2) The value received by the receiver (the current G) is
// written to ep.
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
//接收先拿buf的數(shù)據(jù),然后將發(fā)送者的數(shù)據(jù)放到buf中须尚。
//避免數(shù)據(jù)buf的數(shù)據(jù)被餓死崖堤;發(fā)的時候不用,因為buf是空的耐床。
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf() //釋放鎖
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) //喚醒發(fā)送者
}
關(guān)閉channel
主要是處理channel的recvq和sendq隊列:recvq會拿到零值密幔,sendq中的G都是在關(guān)閉之前阻塞的;
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
closechan(c)
}
func closechan(c *hchan) {
// 關(guān)閉nil撩轰,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) //重復(fù)關(guān)閉
}
c.closed = 1
var glist gList
// release all readers
//如果有recvq胯甩,此時的buf肯定是空的昧廷,相當(dāng)于給零值然后喚醒;
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil //此時才為nil,被喚醒的g就知道是否關(guān)閉了偎箫。
glist.push(gp)
}
// release all writers (they will panic)
// 如果有sendq木柬,
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
//釋放鎖
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock. 喚醒
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
Select原理
- 特點
- 可以在channel上進行非阻塞的收發(fā)操作;
- 遇到多個channel同時響應(yīng)時淹办,隨機選擇case執(zhí)行眉枕,避免饑餓;
- 實現(xiàn) https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
- 隨機生成一個遍歷的輪詢順序
pollOrder
并根據(jù) Channel 地址生成鎖定順序lockOrder
怜森; - 根據(jù)
pollOrder
遍歷所有的case
查看是否有可以立刻處理的 Channel速挑;- 如果存在就直接獲取
case
對應(yīng)的索引并返回; - 如果不存在就會創(chuàng)建
runtime.sudog
結(jié)構(gòu)體副硅,將當(dāng)前 Goroutine 加入到所有相關(guān) Channel 的收發(fā)隊列姥宝,并調(diào)用runtime.gopark
掛起當(dāng)前 Goroutine 等待調(diào)度器的喚醒;
- 如果存在就直接獲取
- 當(dāng)調(diào)度器喚醒當(dāng)前 Goroutine 時就會再次按照
lockOrder
遍歷所有的case
恐疲,從中查找需要被處理的runtime.sudog
結(jié)構(gòu)對應(yīng)的索引腊满;
- 隨機生成一個遍歷的輪詢順序
// compiler implements
//
// select {
// case v = <-c:
// ... foo
// default:
// ... bar
// }
//
// as
//
// if selectnbrecv(&v, c) {
// ... foo
// } else {
// ... bar
// }
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false) //非阻塞
return
}
資料:
圖解Go的channel底層實現(xiàn)
深入理解Golang Channel
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-channel/#64-channel