What it is
sync.Cond is a condition variable used to coordinate goroutines that want to access a shared resource. When the state of the shared resource changes, it can be used to notify the goroutines that are blocked waiting for that change.
sync.Cond is built on a mutex or a read-write lock (any Locker), and is typically used when multiple goroutines wait and a single goroutine notifies them that an event has occurred.
How to use it
Structure of sync.Cond
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}
The four methods of sync.Cond
// NewCond creates a Cond instance; a lock must be associated with it.
func NewCond(l Locker) *Cond{}
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast(){}
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal(){}
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait(){}
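The Wait-in-a-loop pattern from the comment above can be sketched with a small queue. This is only an illustration under assumptions: the queue type and the put, get, and sumViaQueue names are hypothetical and are not part of the sync package.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical unbounded FIFO guarded by a sync.Cond.
type queue struct {
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	return &queue{cond: sync.NewCond(&sync.Mutex{})}
}

// put appends an item while holding the lock, then wakes at most one waiter.
func (q *queue) put(v int) {
	q.cond.L.Lock()
	q.items = append(q.items, v)
	q.cond.L.Unlock()
	q.cond.Signal()
}

// get blocks until an item is available. The condition is re-checked
// after every wake-up, as the Wait documentation recommends.
func (q *queue) get() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// sumViaQueue sends 1..n through the queue and returns the consumer's sum.
func sumViaQueue(n int) int {
	q := newQueue()
	result := make(chan int)
	go func() {
		sum := 0
		for i := 0; i < n; i++ {
			sum += q.get()
		}
		result <- sum
	}()
	for i := 1; i <= n; i++ {
		q.put(i)
	}
	return <-result
}

func main() {
	fmt.Println(sumViaQueue(10)) // prints 55
}
```

Because get checks len(q.items) under the lock before it ever calls Wait, an item that was added before the consumer started waiting is not missed.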
Usage demo
package main

import (
	"log"
	"sync"
	"time"
)

// done is the shared condition; it is only read or written while holding c.L.
var done = false

// read blocks until done becomes true, then proceeds.
func read(name string, c *sync.Cond) {
	c.L.Lock()
	for !done {
		c.Wait()
	}
	log.Println(name, "starts reading")
	c.L.Unlock()
}

// write simulates one second of work, sets done, and wakes every waiting reader.
func write(name string, c *sync.Cond) {
	log.Println(name, "starts writing")
	time.Sleep(time.Second)
	c.L.Lock()
	done = true
	c.L.Unlock()
	log.Println(name, "wakes")
	c.Broadcast()
	//c.Signal()
}

func main() {
	cond := sync.NewCond(&sync.Mutex{})

	go read("reader1", cond)
	go read("reader2", cond)
	go read("reader3", cond)
	write("writer", cond)

	// Give the readers time to log before main exits.
	time.Sleep(time.Second * 3)
}
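The demo leaves c.Signal() commented out. To make the difference from Broadcast concrete, here is a rough sketch that parks n goroutines in Wait, calls Signal once, and then calls Broadcast. The signalThenBroadcast function and its counters are hypothetical, and the polling loop is only a test convenience to be sure every goroutine is already waiting; it assumes n is at least 1.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// signalThenBroadcast reports how many of n waiting goroutines had resumed
// after one Signal call, and how many after a following Broadcast call.
func signalThenBroadcast(n int) (afterSignal, afterBroadcast int) {
	cond := sync.NewCond(&sync.Mutex{})
	done := false
	waiting, woken := 0, 0
	resumed := make(chan struct{}, n)

	for i := 0; i < n; i++ {
		go func() {
			cond.L.Lock()
			waiting++ // the lock stays held until Wait releases it
			for !done {
				cond.Wait()
			}
			woken++
			cond.L.Unlock()
			resumed <- struct{}{}
		}()
	}

	// Poll until all n goroutines are parked in Wait, then set the condition.
	// Seeing waiting == n under the lock means each goroutine has already
	// released the lock inside Wait.
	for {
		cond.L.Lock()
		if waiting == n {
			done = true
			cond.L.Unlock()
			break
		}
		cond.L.Unlock()
		runtime.Gosched()
	}

	cond.Signal() // wakes one waiter only
	<-resumed
	cond.L.Lock()
	afterSignal = woken
	cond.L.Unlock()

	cond.Broadcast() // wakes every goroutine still waiting
	for i := 1; i < n; i++ {
		<-resumed
	}
	cond.L.Lock()
	afterBroadcast = woken
	cond.L.Unlock()
	return afterSignal, afterBroadcast
}

func main() {
	one, all := signalThenBroadcast(3)
	fmt.Println(one, all) // prints 1 3
}
```

In the demo above, swapping Broadcast for a single Signal would therefore leave any other readers that were already waiting blocked.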
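Use cases
sync.Cond is typically used when multiple goroutines wait and a single goroutine notifies them that an event has occurred.
For example: one goroutine receives data asynchronously, and the remaining goroutines must wait until it has finished receiving before they can read the correct data.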
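The scenario above might look like the following sketch. The feed type, the fetchAndRead function, and the hard-coded payload are assumptions made for illustration; a sync.WaitGroup stands in for the time.Sleep used in the demo so that the caller returns only after every reader has finished.

```go
package main

import (
	"fmt"
	"sync"
)

// feed is a hypothetical container for data that arrives asynchronously.
type feed struct {
	cond  *sync.Cond
	ready bool
	data  []int
}

// fetchAndRead starts the given number of readers, lets one receiver
// goroutine publish the data, and returns the sum each reader computed.
func fetchAndRead(readers int) []int {
	f := &feed{cond: sync.NewCond(&sync.Mutex{})}
	sums := make([]int, readers)

	var wg sync.WaitGroup
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			f.cond.L.Lock()
			for !f.ready { // wait until the receiver has finished
				f.cond.Wait()
			}
			data := f.data
			f.cond.L.Unlock()
			for _, v := range data {
				sums[id] += v // each reader writes only its own slot
			}
		}(i)
	}

	// The receiver: stands in for an asynchronous download or network read.
	go func() {
		received := []int{1, 2, 3, 4}
		f.cond.L.Lock()
		f.data = received
		f.ready = true
		f.cond.L.Unlock()
		f.cond.Broadcast() // every reader may proceed
	}()

	wg.Wait()
	return sums
}

func main() {
	fmt.Println(fetchAndRead(3)) // prints [10 10 10]
}
```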
Real-world usage
ants (https://github.com/panjf2000/ants) is a third-party goroutine pool package.
The code below is from the pool.go file in the ants package:
// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
	spawnWorker := func() {
		w = p.workerCache.Get().(*goWorker)
		w.run()
	}

	p.lock.Lock()
	w = p.workers.detach()
	if w != nil { // first try to fetch the worker from the queue
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		// if the worker queue is empty and we don't run out of the pool capacity,
		// then just spawn a new worker goroutine.
		p.lock.Unlock()
		spawnWorker()
	} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
		if p.options.Nonblocking {
			p.lock.Unlock()
			return
		}
	retry:
		if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
			p.lock.Unlock()
			return
		}
		p.blockingNum++
		p.cond.Wait() // block and wait for an available worker
		p.blockingNum--
		var nw int
		if nw = p.Running(); nw == 0 { // awakened by the scavenger
			p.lock.Unlock()
			if !p.IsClosed() {
				spawnWorker()
			}
			return
		}
		if w = p.workers.detach(); w == nil {
			if nw < capacity {
				p.lock.Unlock()
				spawnWorker()
				return
			}
			goto retry
		}

		p.lock.Unlock()
	}
	return
}
// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}
	worker.recycleTime = time.Now()
	p.lock.Lock()

	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}

	err := p.workers.insert(worker)
	if err != nil {
		p.lock.Unlock()
		return false
	}

	// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()
	return true
}
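Stripped of the worker queue, scavenger, and non-blocking options, the Wait/Signal pairing in retrieveWorker and revertWorker reduces to something like the sketch below. This is not the ants implementation: slotPool, acquire, release, and runTasks are hypothetical names, and the pool only counts free slots instead of recycling worker goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// slotPool is a hypothetical fixed-capacity pool that only tracks free slots.
type slotPool struct {
	mu   sync.Mutex
	cond *sync.Cond
	free int
}

func newSlotPool(capacity int) *slotPool {
	p := &slotPool{free: capacity}
	p.cond = sync.NewCond(&p.mu)
	return p
}

// acquire blocks while the pool is exhausted, similar in spirit to the
// p.cond.Wait() branch of retrieveWorker.
func (p *slotPool) acquire() {
	p.mu.Lock()
	for p.free == 0 {
		p.cond.Wait()
	}
	p.free--
	p.mu.Unlock()
}

// release returns a slot and wakes one blocked caller, similar in spirit
// to the p.cond.Signal() call in revertWorker.
func (p *slotPool) release() {
	p.mu.Lock()
	p.free++
	p.cond.Signal()
	p.mu.Unlock()
}

// runTasks runs the given number of empty tasks through the pool and reports
// how many completed and the highest number that held a slot at once.
func runTasks(capacity, tasks int) (completed, peak int) {
	p := newSlotPool(capacity)
	var (
		wg      sync.WaitGroup
		statsMu sync.Mutex
		running int
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p.acquire()
			statsMu.Lock()
			running++
			if running > peak {
				peak = running
			}
			statsMu.Unlock()

			// The task body would run here.

			statsMu.Lock()
			running--
			completed++
			statsMu.Unlock()
			p.release()
		}()
	}
	wg.Wait()
	return completed, peak
}

func main() {
	completed, peak := runTasks(3, 20)
	fmt.Println(completed, peak <= 3) // prints 20 true
}
```

Signal is enough in release because each returned slot can satisfy only one blocked caller; ants uses Broadcast on the path where the pool is closed or over capacity, so that every blocked caller gets a chance to re-check its state.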