本文從上下文Context句占、同步原語與鎖岛杀、Channel、調(diào)度器四個方面介紹Go語言是如何實(shí)現(xiàn)并發(fā)的壶唤。本文絕大部分內(nèi)容是從go并發(fā)編程系列文章學(xué)習(xí)總結(jié)而來雳灵。
上下文Context
上下文context.Context是用來設(shè)置截止日期、同步信號视粮,傳遞請求相關(guān)值的結(jié)構(gòu)體细办。上下文與Gorutine有比較密切的關(guān)系。context.Context是Go語言中獨(dú)特的設(shè)計(java中至少沒有類似設(shè)計)蕾殴。context.Context是Go語言在1.7版本中引入標(biāo)準(zhǔn)庫的接口笑撞,該接口定義了四個需要實(shí)現(xiàn)的方法,如下:
type Context interface {
Deadline() (deadline time.Time, ok bool) //返回context.Context被取消的時間钓觉,也就是完成工作的截止日期茴肥;
Done() <-chan struct{} //返回一個Channel,這個Channel會在當(dāng)前工作完成或者上下文被取消之后關(guān)閉荡灾,多次調(diào)用Done方法返回同一個Channel瓤狐;
Err() error //返回context.Context結(jié)束的原因,它只會在Done返回的Channel被關(guān)閉時才會返回非空的值:a)如果context.Context被取消批幌,會返回Canceled錯誤础锐;b)如果context.Context超時如贷,會返回DeadlineExceeded錯誤婿屹。
Value(key interface{}) interface{} //從context.Context中獲取鍵對應(yīng)的值
}
context包中提供的 context.Background
、context.TODO
溢十、context.WithDeadline
和 context.WithValue
函數(shù)會返回實(shí)現(xiàn)該接口的私有結(jié)構(gòu)體
設(shè)計原理
使用 Context 同步信號
我們可以通過一個代碼片段了解 context.Context
是如何對信號進(jìn)行同步的截粗。在這段代碼中信姓,我們創(chuàng)建了一個過期時間為 1s 的上下文,并向上下文傳入 handle
函數(shù)绸罗,該方法會使用 500ms 的時間處理傳入的『請求』:
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) //1秒之后會往ctx通道推入一個錯誤信息
defer cancel()
go handle(ctx, 500*time.Millisecond)
select {
case <-ctx.Done(): // ctx.Done()返回通道意推,等待通道信息
fmt.Println("main", ctx.Err())
}
}
func handle(ctx context.Context, duration time.Duration) {
select {
case <-ctx.Done():
fmt.Println("handle", ctx.Err())
case <-time.After(duration):
fmt.Println("process request with", duration)
}
}
因?yàn)檫^期時間大于處理時間,所以我們有足夠的時間處理該『請求』珊蟀,運(yùn)行上述代碼會打印出如下所示的內(nèi)容:
$ go run context.go
process request with 500ms
main context deadline exceeded
handle
函數(shù)沒有進(jìn)入超時的 select
分支菊值,但是 main
函數(shù)的 select
卻會等待 context.Context
的超時并打印出 main context deadline exceeded
。
取消信號
context.WithCancel
函數(shù)能夠從 context.Context
中衍生出一個新的子上下文并返回用于取消該上下文的函數(shù)(CancelFunc)。一旦我們執(zhí)行返回的取消函數(shù)俊性,當(dāng)前上下文以及它的子上下文都會被取消略步,所有的 Goroutine 都會同步收到這一取消信號。
我們直接從 context.WithCancel
函數(shù)的實(shí)現(xiàn)來看它到底做了什么:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
-
context.newCancelCtx
將傳入的上下文包裝成私有結(jié)構(gòu)體context.cancelCtx
定页; -
context.propagateCancel
會構(gòu)建父子上下文之間的關(guān)聯(lián)趟薄,當(dāng)父上下文被取消時,子上下文也會被取消:
其中cancaler是一個接口典徊,ontext.cancelCtx
實(shí)現(xiàn)了對應(yīng)的方法
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil { //當(dāng) parent.Done() == nil杭煎,也就是 parent 不會觸發(fā)取消事件時,當(dāng)前函數(shù)會直接返回卒落;
return
}
select {
case <-done:
child.cancel(false, parent.Err()) // 父上下文已經(jīng)被取消
return
default:
}
if p, ok := parentCancelCtx(parent); ok { //判斷parent是否是帶cancel的上下文
p.mu.Lock()
if p.err != nil { //如果已經(jīng)被取消羡铲,child 會立刻被取消;
child.cancel(false, p.err)
} else {
p.children[child] = struct{}{} //如果沒有被取消儡毕,child 會被加入 parent 的 children 列表中也切,等待 parent 釋放取消信號
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
context.propagateCancel
的作用是在 parent
和 child
之間同步取消和結(jié)束的信號,保證在 parent
被取消時腰湾,child
也會收到對應(yīng)的信號雷恃,不會發(fā)生狀態(tài)不一致的問題。
context.cancelCtx
實(shí)現(xiàn)的幾個接口方法也沒有太多值得分析的地方费坊,該結(jié)構(gòu)體最重要的方法是 cancel
倒槐,這個方法會關(guān)閉上下文中的 Channel 并向所有的子上下文同步取消信號:
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
除了 context.WithCancel
之外,context
包中的另外兩個函數(shù) context.WithDeadline
和 context.WithTimeout
也都能創(chuàng)建可以被取消的計時器上下文 context.timerCtx
:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // 已經(jīng)過了截止日期
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
context.WithDeadline
方法在創(chuàng)建 context.timerCtx
的過程中附井,判斷了父上下文的截止日期與當(dāng)前日期讨越,并通過 time.AfterFunc
創(chuàng)建定時器,當(dāng)時間超過了截止日期后會調(diào)用 context.timerCtx.cancel
方法同步取消信號永毅。
context.timerCtx
結(jié)構(gòu)體內(nèi)部不僅通過嵌入了context.cancelCtx
結(jié)構(gòu)體繼承了相關(guān)的變量和方法把跨,還通過持有的定時器 timer
和截止時間 deadline
實(shí)現(xiàn)了定時取消這一功能:
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
context.timerCtx.cancel
方法不僅調(diào)用了 context.cancelCtx.cancel
,還會停止持有的定時器減少不必要的資源浪費(fèi)沼死。
同步原語與鎖
Go 語言作為一個原生支持用戶態(tài)進(jìn)程(Goroutine)的語言着逐,當(dāng)提到并發(fā)編程、多線程編程時漫雕,往往都離不開鎖這一概念。鎖是一種并發(fā)編程中的同步原語(Synchronization Primitives)峰鄙,它能保證多個 Goroutine 在訪問同一片內(nèi)存時不會出現(xiàn)競爭條件(Race condition)等問題浸间。
Go 語言在 sync
包中提供了用于同步的一些基本原語,包括常見的 sync.Mutex
吟榴、sync.RWMutex
魁蒜、sync.WaitGroup
、sync.Once
和 sync.Cond
:
這些基本原語提高了較為基礎(chǔ)的同步功能,但是它們是一種相對原始的同步機(jī)制兜看,在多數(shù)情況下锥咸,我們都應(yīng)該使用抽象層級的更高的 Channel 實(shí)現(xiàn)同步。
Mutex
Go 語言的 sync.Mutex
由兩個字段 state
和 sema
組成细移。其中 state
表示當(dāng)前互斥鎖的狀態(tài)搏予,而 sema
是用于控制鎖狀態(tài)的信號量。
type Mutex struct {
state int32
sema uint32
}
上述兩個加起來只占 8 字節(jié)空間的結(jié)構(gòu)體表示了 Go 語言中的互斥鎖弧轧,非常輕量級雪侥。
狀態(tài)
互斥鎖的狀態(tài)比較復(fù)雜,如下圖所示精绎,最低三位分別表示 mutexLocked
速缨、mutexWoken
和 mutexStarving
,剩下的位置用來表示當(dāng)前有多少個 Goroutine 等待互斥鎖的釋放:
在默認(rèn)情況下代乃,互斥鎖的所有狀態(tài)位都是
0
旬牲,int32
中的不同位分別表示了不同的狀態(tài):
-
mutexLocked
— 表示互斥鎖的鎖定狀態(tài); -
mutexWoken
— 表示從正常模式被從喚醒搁吓; -
mutexStarving
— 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài)原茅; -
waitersCount
— 當(dāng)前互斥鎖上等待的 Goroutine 個數(shù);
正常模式和饑餓模式
sync.Mutex
有兩種模式 — 正常模式和饑餓模式擎浴。我們需要在這里先了解正常模式和饑餓模式都是什么员咽,它們有什么樣的關(guān)系。
在正常模式下贮预,鎖的等待者會按照先進(jìn)先出的順序獲取鎖贝室。但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時,大概率會獲取不到鎖仿吞,為了減少這種情況的出現(xiàn)滑频,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當(dāng)前互斥鎖切換饑餓模式唤冈,防止部分 Goroutine 被『餓死』峡迷。
饑餓模式是在 Go 語言 1.9 版本引入的優(yōu)化,引入的目的是保證互斥鎖的公平性(Fairness)你虹。
在饑餓模式中绘搞,互斥鎖會直接交給等待隊(duì)列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖傅物、也不會進(jìn)入自旋狀態(tài)夯辖,它們只會在隊(duì)列的末尾等待。如果一個 Goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時間少于 1ms董饰,那么當(dāng)前的互斥鎖就會被切換回正常模式蒿褂。
相比于饑餓模式圆米,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時啄栓。
加鎖和解鎖
互斥鎖的加鎖和解鎖過程使用 sync.Mutex.Lock
和 sync.Mutex.Unlock
方法娄帖。
互斥鎖的加鎖是靠 sync.Mutex.Lock
完成的,最新的 Go 語言源代碼中已經(jīng)將 sync.Mutex.Lock
方法進(jìn)行了簡化昙楚,方法的主干只保留最常見近速、簡單的情況 — 當(dāng)鎖的狀態(tài)是 0 時,將 mutexLocked
位置成 1:
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}
如果互斥鎖的狀態(tài)不是 0 時就會調(diào)用 sync.Mutex.lockSlow
嘗試通過自旋(Spinnig)等方式等待鎖的釋放桂肌,該方法的主體是一個非常大 for 循環(huán)数焊,這里將該方法分成幾個部分介紹獲取鎖的過程:
- 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋;
- 通過自旋等待互斥鎖的釋放崎场;
- 計算互斥鎖的最新狀態(tài)佩耳;
- 更新互斥鎖的狀態(tài)并獲取鎖;
我們先來介紹互斥鎖是如何判斷當(dāng)前 Goroutine 能否進(jìn)入自旋等互斥鎖的釋放:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
//互斥鎖只有在普通模式才能進(jìn)入自旋谭跨。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { //runtime_canSpin為true條件:1)運(yùn)行在多 CPU 的機(jī)器上干厚;2)當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;3)當(dāng)前機(jī)器上至少存在一個正在運(yùn)行的處理器 P 并且處理的運(yùn)行隊(duì)列為空螃宙;
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin() // 自旋蛮瞄,該指令只會占用 CPU 并消耗 CPU 時間。
iter++
old = m.state
continue
}
......
自旋是一種多線程同步機(jī)制谆扎,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會一直保持 CPU 的占用挂捅,持續(xù)檢查某個條件是否為真。在多核的 CPU 上堂湖,自旋可以避免 Goroutine 的切換闲先,使用恰當(dāng)會對性能帶來很大的增益,但是使用的不恰當(dāng)就會拖慢整個程序无蜂。
一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會調(diào)用sync.runtime_doSpin
和 runtime.procyield
并執(zhí)行 30 次的 PAUSE
指令伺糠,該指令只會占用 CPU 并消耗 CPU 時間。
處理了自旋相關(guān)的特殊邏輯之后斥季,互斥鎖會根據(jù)上下文計算當(dāng)前互斥鎖最新的狀態(tài)训桶。幾個不同的條件分別會更新 state
字段中存儲的不同信息 — mutexLocked
、mutexStarving
酣倾、mutexWoken
和 mutexWaiterShift
舵揭。計算了新的互斥鎖狀態(tài)之后,就會使用 CAS 函數(shù)更新狀態(tài)躁锡。
如果我們沒有通過 CAS 獲得鎖午绳,會調(diào)用 sync.runtime_SemacquireMutex
使用信號量(關(guān)于信號量實(shí)現(xiàn)進(jìn)程/線程之間通信原理可以自行搜索,此處不深入講解)保證資源不會被兩個 Goroutine 獲取稚铣。sync.runtime_SemacquireMutex
會在方法中不斷調(diào)用嘗試獲取鎖并休眠當(dāng)前 Goroutine 等待信號量的釋放箱叁,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回惕医,sync.Mutex.Lock
方法的剩余代碼也會繼續(xù)執(zhí)行耕漱。
- 在正常模式下,這段代碼會設(shè)置喚醒和饑餓標(biāo)記抬伺、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán)螟够;
- 在饑餓模式下,當(dāng)前 Goroutine 會獲得互斥鎖峡钓,如果等待隊(duì)列中只存在當(dāng)前 Goroutine妓笙,互斥鎖還會從饑餓模式中退出;
互斥鎖的解鎖過程 sync.Mutex.Unlock
與加鎖過程相比就很簡單能岩,該過程會先使用 AddInt32
函數(shù)快速解鎖寞宫,這時會發(fā)生下面的兩種情況:
- 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖拉鹃;
- 如果該函數(shù)返回的新狀態(tài)不等于 0辈赋,這段代碼會調(diào)用
sync.Mutex.unlockSlow
方法開始慢速解鎖:
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
sync.Mutex.unlockSlow
方法首先會校驗(yàn)鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex
中止當(dāng)前程序。
在正常情況下會根據(jù)當(dāng)前互斥鎖的狀態(tài)膏燕,分別處理正常模式和饑餓模式下的互斥鎖:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 { // 正常模式
old := new
for {
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 饑餓模式
runtime_Semrelease(&m.sema, true, 1)
}
}
- 在正常模式下钥屈,這段代碼會分別處理以下兩種情況處理;
- 如果互斥鎖不存在等待者或者互斥鎖的
mutexLocked
坝辫、mutexStarving
篷就、mutexWoken
狀態(tài)不都為 0,那么當(dāng)前方法就可以直接返回近忙,不需要喚醒其他等待者竭业; - 如果互斥鎖存在等待者,會通過
sync.runtime_Semrelease
喚醒等待者并移交鎖的所有權(quán)银锻;
- 如果互斥鎖不存在等待者或者互斥鎖的
- 在饑餓模式下永品,上述代碼會直接調(diào)用
sync.runtime_Semrelease
方法將當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖击纬,在這時互斥鎖還不會退出饑餓狀態(tài)鼎姐;
小結(jié)
我們已經(jīng)從多個方面分析了互斥鎖 sync.Mutex
的實(shí)現(xiàn)原理,在這里我們從加鎖和解鎖兩個方面總結(jié)一下結(jié)論和注意事項(xiàng)更振。
互斥鎖的加鎖過程比較復(fù)雜炕桨,它涉及自旋、信號量以及調(diào)度等概念:
- 如果互斥鎖處于初始化狀態(tài)肯腕,就會直接通過置位
mutexLocked
加鎖献宫; - 如果互斥鎖處于
mutexLocked
并且在普通模式下工作,就會進(jìn)入自旋实撒,執(zhí)行 30 次PAUSE
指令消耗 CPU 時間等待鎖的釋放姊途; - 如果當(dāng)前 Goroutine 等待鎖的時間超過了 1ms涉瘾,互斥鎖就會切換到饑餓模式;
- 互斥鎖在正常情況下會通過
sync.runtime_SemacquireMutex
函數(shù)將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài)捷兰,等待鎖的持有者喚醒當(dāng)前 Goroutine立叛; - 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,當(dāng)前 Goroutine 會將互斥鎖切換回正常模式贡茅;
互斥鎖的解鎖過程與之相比就比較簡單秘蛇,其代碼行數(shù)不多、邏輯清晰顶考,也比較容易理解:
- 當(dāng)互斥鎖已經(jīng)被解鎖時赁还,那么調(diào)用
sync.Mutex.Unlock
會直接拋出異常; - 當(dāng)互斥鎖處于饑餓模式時驹沿,會直接將鎖的所有權(quán)交給隊(duì)列中的下一個等待者艘策,等待者會負(fù)責(zé)設(shè)置
mutexLocked
標(biāo)志位; - 當(dāng)互斥鎖處于普通模式時渊季,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖柬焕,就會直接返回;在其他情況下會通過
sync.runtime_Semrelease
喚醒對應(yīng)的 Goroutine梭域;
RWMutex
讀寫互斥鎖 sync.RWMutex
是細(xì)粒度的互斥鎖斑举,它不限制資源的并發(fā)讀,但是讀寫病涨、寫寫操作無法并行執(zhí)行富玷。
結(jié)構(gòu)體
sync.RWMutex
中總共包含以下 5 個字段:
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
-
w
— 復(fù)用互斥鎖提供的能力; -
writerSem
和readerSem
— 分別用于寫等待讀和讀等待寫: -
readerCount
存儲了當(dāng)前正在執(zhí)行的讀操作的數(shù)量既穆; -
readerWait
表示當(dāng)寫操作被阻塞時等待的讀操作個數(shù)赎懦;
我們會依次分析獲取寫鎖和讀鎖的實(shí)現(xiàn)原理,其中:
- 寫操作使用
sync.RWMutex.Lock
和sync.RWMutex.Unlock
方法幻工; - 讀操作使用
sync.RWMutex.RLock
和sync.RWMutex.RUnlock
方法励两;
寫鎖
當(dāng)資源的使用者想要獲取寫鎖時,需要調(diào)用 sync.RWMutex.Lock
方法:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
- 調(diào)用結(jié)構(gòu)體持有的
sync.Mutex
的sync.Mutex.Lock
方法阻塞后續(xù)的寫操作囊颅;- 因?yàn)榛コ怄i已經(jīng)被獲取当悔,其他 Goroutine 在獲取寫鎖時就會進(jìn)入自旋或者休眠;
- 調(diào)用
atomic.AddInt32
方法阻塞后續(xù)的讀操作: - 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖(r != 0)踢代,該 Goroutine 會調(diào)用
sync.runtime_SemacquireMutex
進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放writerSem
信號量將當(dāng)前協(xié)程喚醒盲憎。
寫鎖的釋放會調(diào)用 sync.RWMutex.Unlock
方法:
func (rw *RWMutex) Unlock() {
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
throw("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}
與加鎖的過程正好相反,寫鎖的釋放分以下幾個執(zhí)行:
- 調(diào)用
atomic.AddInt32
函數(shù)將變回正數(shù)胳挎,釋放讀鎖饼疙; - 通過 for 循環(huán)觸發(fā)所有由于獲取讀鎖而陷入等待的 Goroutine:
- 調(diào)用
sync.Mutex.Unlock
方法釋放寫鎖;
獲取寫鎖時會先阻塞寫鎖的獲取慕爬,后阻塞讀鎖的獲取窑眯,這種策略能夠保證讀操作不會被連續(xù)的寫操作『餓死』屏积。
讀鎖
讀鎖的加鎖方法 sync.RWMutex.RLock
很簡單,該方法會通過 atomic.AddInt32
將 readerCount
加一:
func (rw *RWMutex) RLock() {
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
- 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖磅甩,當(dāng)前 Goroutine 就會調(diào)用
sync.runtime_SemacquireMutex
陷入休眠等待鎖的釋放肾请; - 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖,當(dāng)前方法就會成功返回更胖;
當(dāng) Goroutine 想要釋放讀鎖時,會調(diào)用如下所示的 sync.RWMutex.RUnlock
方法:
func (rw *RWMutex) RUnlock() {
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r)
}
}
該方法會先減少正在讀資源的 readerCount
整數(shù)隔显,根據(jù) atomic.AddInt32
的返回值不同會分別進(jìn)行處理:
- 如果返回值大于等于零 — 讀鎖直接解鎖成功却妨;
- 如果返回值小于零 — 有一個正在執(zhí)行的寫操作,在這時會調(diào)用
sync.RWMutex.rUnlockSlow
方法括眠;
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
throw("sync: RUnlock of unlocked RWMutex")
}
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
sync.RWMutex.rUnlockSlow
會減少獲取鎖的寫操作等待的讀操作數(shù) readerWait
并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem
彪标,該信號量被觸發(fā)時晌块,調(diào)度器就會喚醒嘗試獲取寫鎖的 Goroutine憎蛤。
小結(jié)
讀寫互斥鎖 sync.RWMutex
雖然提供的功能非常復(fù)雜,不過因?yàn)樗⒃?sync.Mutex
上王悍,所以整體的實(shí)現(xiàn)上會簡單很多当船。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:
- 調(diào)用
sync.RWMutex.Lock
嘗試獲取寫鎖時题画;- 每次
sync.RWMutex.RUnlock
都會將readerWait
其減一,當(dāng)它歸零時該 Goroutine 就會獲得寫鎖德频; - 將
readerCount
減少rwmutexMaxReaders
個數(shù)以阻塞后續(xù)的讀操作苍息;
- 每次
- 調(diào)用
sync.RWMutex.Unlock
釋放寫鎖時,會先通知所有的讀操作壹置,然后才會釋放持有的互斥鎖竞思;
讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時提升性能钞护。
WaitGroup
sync.WaitGroup
可以等待一組 Goroutine 的返回盖喷,一個比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:
requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(r *Request) {
defer wg.Done()
// res, err := service.call(r)
}(request)
}
wg.Wait()
我們可以通過 sync.WaitGroup
將原本順序執(zhí)行的代碼在多個 Goroutine 中并發(fā)執(zhí)行,加快程序處理的速度难咕。
結(jié)構(gòu)體
sync.WaitGroup
結(jié)構(gòu)體中的成員變量非常簡單课梳,其中只包含兩個成員變量:
type WaitGroup struct {
noCopy noCopy
state1 [3]uint32
}
-
noCopy
— 保證sync.WaitGroup
不會被開發(fā)者通過再賦值的方式拷貝; -
state1
— 存儲著狀態(tài)和信號量余佃;
sync.noCopy
是一個特殊的私有結(jié)構(gòu)體惦界,tools/go/analysis/passes/copylock
包中的分析器會在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy
結(jié)構(gòu)體,如果包含該結(jié)構(gòu)體就會在運(yùn)行時報錯咙冗。
sync.WaitGroup
` 結(jié)構(gòu)體中還包含一個總共占用 12 字節(jié)(8 字節(jié)用戶存儲狀態(tài)沾歪,高32位及state1[1]代表counter,低32位state1[0]是等待gorutine數(shù)雾消,另外32位state1[2]是信號量)的數(shù)組灾搏,這個數(shù)組會存儲當(dāng)前結(jié)構(gòu)體的狀態(tài)挫望。
sync.WaitGroup
提供的私有方法 sync.WaitGroup.state
能夠幫我們從 state1
字段中取出它的狀態(tài)和信號量。
接口
sync.WaitGroup
對外暴露了三個方法 — sync.WaitGroup.Add
狂窑、sync.WaitGroup.Wait
和 sync.WaitGroup.Done
媳板。
因?yàn)槠渲械?sync.WaitGroup.Done
只是向 sync.WaitGroup.Add
方法傳入了 -1,所以我們重點(diǎn)分析另外兩個方法 sync.WaitGroup.Add
和 sync.WaitGroup.Wait
:
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if v > 0 || w == 0 {
return
}
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
sync.WaitGroup.Add
方法可以更新 sync.WaitGroup
中的計數(shù)器 counter
泉哈。雖然 sync.WaitGroup.Add
方法傳入的參數(shù)可以為負(fù)數(shù)蛉幸,但是計數(shù)器只能是非負(fù)數(shù),一旦出現(xiàn)負(fù)數(shù)就會發(fā)生程序崩潰丛晦。當(dāng)調(diào)用計數(shù)器歸零奕纫,也就是所有任務(wù)都執(zhí)行完成時,就會通過 sync.runtime_Semrelease
喚醒處于等待狀態(tài)的所有 Goroutine烫沙。
sync.WaitGroup
的另一個方法 sync.WaitGroup.Wait
會在計數(shù)器大于 0 并且不存在等待的 Goroutine 時匹层,調(diào)用 sync.runtime_Semacquire
陷入睡眠狀態(tài)。
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
if v == 0 {
return
}
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if +statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
當(dāng) sync.WaitGroup
的計數(shù)器歸零時锌蓄,當(dāng)陷入睡眠狀態(tài)的 Goroutine 就被喚醒升筏,上述方法會立刻返回。
小結(jié)
通過對 sync.WaitGroup
的分析和研究瘸爽,我們能夠得出以下結(jié)論:
-
sync.WaitGroup
必須在sync.WaitGroup.Wait
方法返回之后才能被重新使用您访; -
sync.WaitGroup.Done
只是對sync.WaitGroup.Add
方法的簡單封裝,我們可以向sync.WaitGroup.Add
方法傳入任意負(fù)數(shù)(需要保證計數(shù)器非負(fù))快速將計數(shù)器歸零以喚醒其他等待的 Goroutine剪决; - 可以同時有多個 Goroutine 等待當(dāng)前
sync.WaitGroup
計數(shù)器的歸零洋只,這些 Goroutine 會被同時喚醒;
Once
Go 語言標(biāo)準(zhǔn)庫中 sync.Once
可以保證在 Go 程序運(yùn)行期間的某段代碼只會執(zhí)行一次昼捍。在運(yùn)行如下所示的代碼時识虚,我們會看到如下所示的運(yùn)行結(jié)果:
func main() {
o := &sync.Once{}
for i := 0; i < 10; i++ {
o.Do(func() {
fmt.Println("only once")
})
}
}
$ go run main.go
only once
結(jié)構(gòu)體
每一個 sync.Once
結(jié)構(gòu)體中都只包含一個用于標(biāo)識代碼塊是否執(zhí)行過的 done
以及一個互斥鎖 sync.Mutex
:
type Once struct {
done uint32
m Mutex
}
接口
sync.Once.Do
是 sync.Once
結(jié)構(gòu)體對外唯一暴露的方法,該方法會接收一個入?yún)榭盏暮瘮?shù):
- 如果傳入的函數(shù)已經(jīng)執(zhí)行過妒茬,就會直接返回担锤;
- 如果傳入的函數(shù)沒有執(zhí)行過,就會調(diào)用
sync.Once.doSlow
執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
- 為當(dāng)前 Goroutine 獲取互斥鎖乍钻;
- 執(zhí)行傳入的無入?yún)⒑瘮?shù)肛循;
- 運(yùn)行延遲函數(shù)調(diào)用,將成員變量
done
更新成 1银择;
sync.Once
就會通過成員變量 done
確保函數(shù)不會執(zhí)行第二次多糠。
小結(jié)
作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once
結(jié)構(gòu)體,它使用互斥鎖和 sync/atomic
包提供的方法實(shí)現(xiàn)了某個函數(shù)在程序運(yùn)行期間只能執(zhí)行一次的語義浩考。在使用該結(jié)構(gòu)體時夹孔,我們也需要注意以下的問題:
-
sync.Once.Do
方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了panic
; - 兩次調(diào)用
sync.Once.Do
方法傳入不同的函數(shù)也只會執(zhí)行第一次調(diào)用的函數(shù)搭伤;
Cond
Go 語言標(biāo)準(zhǔn)庫中的 sync.Cond
一個條件變量只怎,它可以讓一系列的 Goroutine 都在滿足特定條件時被喚醒。每一個 sync.Cond
結(jié)構(gòu)體在初始化時都需要傳入一個互斥鎖怜俐,我們可以通過下面的例子了解它的使用方法:
func main() {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go listen(c)
}
time.Sleep(1*time.Second)
go broadcast(c)
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt)
<-ch
}
func broadcast(c *sync.Cond) {
c.L.Lock()
c.Broadcast() //喚起c等待gorutine隊(duì)列中所有的gorutine
c.L.Unlock()
}
func listen(c *sync.Cond) {
c.L.Lock()
c.Wait() //阻塞等待信號
fmt.Println("listen")
c.L.Unlock()
}
$ go run main.go
listen
...
listen
上述代碼同時運(yùn)行了 11 個 Goroutine身堡,這 11 個 Goroutine 分別做了不同事情:
- 10 個 Goroutine 通過
sync.Cond.Wait
等待特定條件的滿足; - 1 個 Goroutine 會調(diào)用
sync.Cond.Broadcast
方法通知所有陷入等待的 Goroutine拍鲤;
結(jié)構(gòu)體
sync.Cond
的結(jié)構(gòu)體中包含以下 4 個字段:
type Cond struct {
noCopy noCopy //用于保證結(jié)構(gòu)體不會在編譯期間拷貝
L Locker //用于保護(hù)內(nèi)部的 `notify` 字段贴谎,`Locker` 接口類型的變量
notify notifyList // 一個 Goroutine 的鏈表,它是實(shí)現(xiàn)同步機(jī)制的核心結(jié)構(gòu)
checker copyChecker //用于禁止運(yùn)行期間發(fā)生的拷貝
}
type notifyList struct {
wait uint32 //表示當(dāng)前正在等待的 Goroutine
notify uint32 //表示當(dāng)前已經(jīng)通知到的 Goroutine
lock mutex
head *sudog //指向的鏈表的頭
tail *sudog //指向的鏈表的尾
}
接口
sync.Cond
對外暴露的 sync.Cond.Wait
方法會將當(dāng)前 Goroutine 陷入休眠狀態(tài)季稳,它的執(zhí)行過程如下:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的鏈接名擅这。將等待計數(shù)器加一并解鎖
c.L.Unlock()
runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的鏈接名。 等待其他 Goroutine 的喚醒并加鎖
c.L.Lock()
}
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
runtime.notifyListWait
函數(shù)會獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
除了將當(dāng)前 Goroutine 追加到鏈表的末端之外绞幌,我們還會調(diào)用 runtime.goparkunlock
將當(dāng)前 Goroutine 陷入休眠狀態(tài),該函數(shù)也是在 Go 語言切換 Goroutine 時經(jīng)常會使用的方法一忱,它會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒莲蜘。
sync.Cond.Signal
和 sync.Cond.Broadcast
方法就是用來喚醒調(diào)用 sync.Cond.Wait
陷入休眠的 Goroutine,它們兩個的實(shí)現(xiàn)有一些細(xì)微差別:
-
sync.Cond.Signal
方法會喚醒隊(duì)列最前面的 Goroutine帘营; -
sync.Cond.Broadcast
方法會喚醒隊(duì)列中全部的 Goroutine票渠;
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
runtime.notifyListNotifyOne
函數(shù)只會從 sync.notifyList
鏈表中找到滿足 sudog.ticket == l.notify
條件的 Goroutine 并通過 readyWithTime
喚醒:
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
runtime.notifyListNotifyAll
會依次通過 runtime.readyWithTime
函數(shù)喚醒鏈表中 Goroutine:
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}
Goroutine 的喚醒順序也是按照加入隊(duì)列的先后順序,先加入的會先被喚醒芬迄,而后加入的 Goroutine 需要等待調(diào)度器的調(diào)度问顷。
在一般情況下,我們都會先調(diào)用 sync.Cond.Wait
陷入休眠等待滿足期望條件禀梳,當(dāng)滿足喚醒條件時杜窄,就可以選擇使用 sync.Cond.Signal
或者 sync.Cond.Broadcast
喚醒一個或者全部的 Goroutine。
小結(jié)
sync.Cond
不是一個常用的同步機(jī)制算途,在遇到長時間條件無法滿足時塞耕,與使用 for {}
進(jìn)行忙碌等待相比,sync.Cond
能夠讓出處理器的使用權(quán)嘴瓤。在使用的過程中我們需要注意以下問題:
-
sync.Cond.Wait
方法在調(diào)用之前一定要使用獲取互斥鎖扫外,否則會觸發(fā)程序崩潰; -
sync.Cond.Signal
方法喚醒的 Goroutine 都是隊(duì)列最前面廓脆、等待最久的 Goroutine筛谚; -
sync.Cond.Broadcast
會按照一定順序廣播通知等待的全部 Goroutine;
擴(kuò)展原語
除了標(biāo)準(zhǔn)庫中提供的同步原語之外停忿,Go 語言還在子倉庫 sync 中提供了四種擴(kuò)展原語驾讲,x/sync/errgroup.Group
、x/sync/semaphore.Weighted
、x/sync/singleflight.Group
和 x/sync/syncmap.Map
蝎毡,其中的 x/sync/syncmap.Map
在 1.9 版本中被移植到了標(biāo)準(zhǔn)庫中厚柳。
下面簡單介紹 Go 語言在擴(kuò)展包中提供的三種同步原語,也就是 x/sync/errgroup.Group
沐兵、x/sync/semaphore.Weighted
和 x/sync/singleflight.Group
别垮。
ErrGroup
x/sync/errgroup.Group
就為我們在一組 Goroutine 中提供了同步、錯誤傳播以及上下文取消的功能扎谎。
x/sync/errgroup.Group.Go
方法能夠創(chuàng)建一個 Goroutine 并在其中執(zhí)行傳入的函數(shù)碳想,而 x/sync/errgroup.Group.Wait
會等待所有 Goroutine 全部返回,該方法的不同返回結(jié)果也有不同的含義:
- 如果返回錯誤 — 這一組 Goroutine 最少返回一個錯誤毁靶;
- 如果返回空值 — 所有 Goroutine 都成功執(zhí)行胧奔;
結(jié)構(gòu)體
x/sync/errgroup.Group
結(jié)構(gòu)體組成如下:
type Group struct {
cancel func() //創(chuàng)建context.Context時返回的取消函數(shù),用于在多個 Goroutine 之間同步取消信號预吆;
wg sync.WaitGroup //用于等待一組 Goroutine 完成子任務(wù)的同步原語
errOnce sync.Once //用于保證只接收一個子任務(wù)返回的錯誤
err error
}
這些字段共同組成了 x/sync/errgroup.Group
結(jié)構(gòu)體并為我們提供同步龙填、錯誤傳播以及上下文取消等功能。
x/sync/errgroup.Group
的實(shí)現(xiàn)沒有涉及底層和運(yùn)行時包中的 API拐叉,它只是對基本同步語義進(jìn)行了封裝以提供更加復(fù)雜的功能岩遗。在使用時,我們也需要注意以下的幾個問題:
-
x/sync/errgroup.Group
在出現(xiàn)錯誤或者等待結(jié)束后都會調(diào)用context.Context
的cancel
方法同步取消信號凤瘦; - 只有第一個出現(xiàn)的錯誤才會被返回宿礁,剩余的錯誤都會被直接拋棄;
Semaphore
信號量是在并發(fā)編程中常見的一種同步機(jī)制蔬芥,在需要控制訪問資源的進(jìn)程數(shù)量時就會用到信號量梆靖,它會保證持有的計數(shù)器在 0 到初始化的權(quán)重之間波動。
- 每次獲取資源時都會將信號量中的計數(shù)器減去對應(yīng)的數(shù)值笔诵,在釋放時重新加回來返吻;
- 當(dāng)遇到計數(shù)器大于信號量大小時就會進(jìn)入休眠等待其他線程釋放信號;
Go 語言的擴(kuò)展包中就提供了帶權(quán)重的信號量 x/sync/semaphore.Weighted
乎婿,我們可以按照不同的權(quán)重對資源的訪問進(jìn)行管理思喊,這個結(jié)構(gòu)體對外也只暴露了四個方法:
-
x/sync/semaphore.NewWeighted
用于創(chuàng)建新的信號量; -
x/sync/semaphore.Weighted.Acquire
阻塞地獲取指定權(quán)重的資源次酌,如果當(dāng)前沒有空閑資源恨课,就會陷入休眠等待; -
x/sync/semaphore.Weighted.TryAcquire
非阻塞地獲取指定權(quán)重的資源岳服,如果當(dāng)前沒有空閑資源剂公,就會直接返回false
; -
x/sync/semaphore.Weighted.Release
用于釋放指定權(quán)重的資源吊宋;
結(jié)構(gòu)體
x/sync/semaphore.NewWeighted
方法能根據(jù)傳入的信號量最大權(quán)重創(chuàng)建一個 x/sync/semaphore.Weighted
結(jié)構(gòu)體指針:
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
x/sync/semaphore.Weighted
結(jié)構(gòu)體中包含一個 waiters
列表纲辽,其中存儲著等待獲取資源的 Goroutine,除此之外它還包含當(dāng)前信號量的上限以及一個計數(shù)器 cur
,這個計數(shù)器的范圍就是 [0, size]:
信號量中的計數(shù)器會隨著用戶對資源的訪問和釋放進(jìn)行改變拖吼,引入的權(quán)重概念能夠提供更細(xì)粒度的資源的訪問控制鳞上,盡可能滿足常見的用例。
帶權(quán)重的信號量確實(shí)有著更多的應(yīng)用場景吊档,這也是 Go 語言對外提供的唯一一種信號量實(shí)現(xiàn)篙议,在使用的過程中我們需要注意以下的幾個問題:
-
x/sync/semaphore.Weighted.Acquire
和x/sync/semaphore.Weighted.TryAcquire
方法都可以用于獲取資源,前者會阻塞地獲取信號量怠硼,后者會非阻塞地獲取信號量鬼贱; -
x/sync/semaphore.Weighted.Release
方法會按照 FIFO 的順序喚醒可以被喚醒的 Goroutine; - 如果一個 Goroutine 獲取了較多地資源香璃,由于
x/sync/semaphore.Weighted.Release
的釋放策略可能會等待比較長的時間这难;
SingleFlight
x/sync/singleflight.Group
是 Go 語言擴(kuò)展包中提供了另一種同步原語,它能夠在一個服務(wù)中抑制對下游的多次重復(fù)請求葡秒。一個比較常見的使用場景是 — 我們在使用 Redis 對數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行緩存姻乓,發(fā)生緩存擊穿時,大量的流量都會打到數(shù)據(jù)庫上進(jìn)而影響服務(wù)的尾延時眯牧。
x/sync/singleflight.Group
能有效地解決這個問題蹋岩,它能夠限制對同一個 Key
的多次重復(fù)請求,減少對下游的瞬時流量炸站。
在資源的獲取非常昂貴時(例如:訪問緩存星澳、數(shù)據(jù)庫)疚顷,就很適合使用 x/sync/singleflight.Group
對服務(wù)進(jìn)行優(yōu)化旱易。我們來了解一下它的使用方法:
type service struct {
requestGroup singleflight.Group
}
func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
rows, err := // select * from tables
if err != nil {
return nil, err
}
return rows, nil
})
if err != nil {
return nil, err
}
return Response{
rows: rows,
}, nil
}
因?yàn)檎埱蟮墓T跇I(yè)務(wù)上一般表示相同的請求,所以上述代碼使用它作為請求的鍵腿堤。當(dāng)然阀坏,我們也可以選擇其他的唯一字段作為 x/sync/singleflight.Group.Do
方法的第一個參數(shù)減少重復(fù)的請求。
結(jié)構(gòu)體
x/sync/singleflight.Group
結(jié)構(gòu)體由一個互斥鎖sync.Mutex
和一個映射表組成笆檀,每一個 x/sync/singleflight.call
結(jié)構(gòu)體都保存了當(dāng)前調(diào)用對應(yīng)的信息:
type Group struct {
mu sync.Mutex
m map[string]*call
}
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
x/sync/singleflight.call
結(jié)構(gòu)體中的 val
和 err
字段都只會在執(zhí)行傳入的函數(shù)時賦值一次并在 sync.WaitGroup.Wait
返回時被讀燃商谩;dups
和 chans
兩個字段分別存儲了抑制的請求數(shù)量以及用于同步結(jié)果的 Channel酗洒。
當(dāng)我們需要減少對下游的相同請求時士修,就可以使用 x/sync/singleflight.Group
來增加吞吐量和服務(wù)質(zhì)量,不過在使用的過程中我們也需要注意以下的幾個問題:
-
x/sync/singleflight.Group.Do
和x/sync/singleflight.Group.DoChan
一個用于同步阻塞調(diào)用傳入的函數(shù)樱衷,一個用于異步調(diào)用傳入的參數(shù)并通過 Channel 接收函數(shù)的返回值棋嘲; -
x/sync/singleflight.Group.Forget
方法可以通知x/sync/singleflight.Group
在持有的映射表中刪除某個鍵,接下來對該鍵的調(diào)用就不會等待前面的函數(shù)返回了矩桂; - 一旦調(diào)用的函數(shù)返回了錯誤沸移,所有在等待的 Goroutine 也都會接收到同樣的錯誤;
Channel
下面介紹管道 Channel 的設(shè)計原理、數(shù)據(jù)結(jié)構(gòu)和常見操作雹锣,例如 Channel 的創(chuàng)建网沾、發(fā)送、接收和關(guān)閉蕊爵。作為 Go 核心的數(shù)據(jù)結(jié)構(gòu)和 Goroutine 之間的通信方式辉哥,Channel 是支撐 Go 語言高性能并發(fā)編程模型的重要結(jié)構(gòu),我們首先需要了解 Channel 背后的設(shè)計原理以及它的底層數(shù)據(jù)結(jié)構(gòu)在辆。
設(shè)計原理
Go 語言中最常見的证薇、也是經(jīng)常被人提及的設(shè)計模式就是 — 不要通過共享內(nèi)存的方式進(jìn)行通信,而是應(yīng)該通過通信的方式共享內(nèi)存匆篓。在很多主流的編程語言中浑度,多個線程傳遞數(shù)據(jù)的方式一般都是共享內(nèi)存,為了解決線程沖突的問題鸦概,我們需要限制同一時間能夠讀寫這些變量的線程數(shù)量箩张,這與 Go 語言鼓勵的方式并不相同。
雖然我們在 Go 語言中也能使用共享內(nèi)存加互斥鎖進(jìn)行通信窗市,但是 Go 語言提供了一種不同的并發(fā)模型先慷,也就是通信順序進(jìn)程(Communicating sequential processes,CSP)咨察。兩個 Goroutine论熙,一個會向 Channel 中發(fā)送數(shù)據(jù),另一個會從 Channel 中接收數(shù)據(jù)摄狱。Channel 分為同步Channel和異步Channel:
- 同步 Channel — 不需要緩沖區(qū)脓诡,發(fā)送方會直接將數(shù)據(jù)交給(Handoff)接收方;
- 異步 Channel — 基于環(huán)形緩存的傳統(tǒng)生產(chǎn)者消費(fèi)者模型媒役;
數(shù)據(jù)結(jié)構(gòu)
Go 語言的 Channel 在運(yùn)行時使用 runtime.hchan
結(jié)構(gòu)體表示祝谚。我們在 Go 語言中創(chuàng)建新的 Channel 時,實(shí)際上創(chuàng)建的都是如下所示的結(jié)構(gòu)體:
type hchan struct {
qcount uint //Channel 中的元素個數(shù)
dataqsiz uint //Channel 中的循環(huán)隊(duì)列的長度
buf unsafe.Pointer //Channel 的緩沖區(qū)數(shù)據(jù)指針
elemsize uint16 //當(dāng)前 Channel 能夠收發(fā)的元素大小
closed uint32
elemtype *_type //當(dāng)前 Channel 能夠收發(fā)的元素類型
sendx uint //Channel 的發(fā)送操作處理到的位置
recvx uint //Channel 的接收操作處理到的位置
recvq waitq // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的接收 Goroutine 列表
sendq waitq //當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的發(fā)送 Goroutine 列表
lock mutex
}
等待隊(duì)列使用雙向鏈表 `runtime.waitq`
type waitq struct {
first *sudog
last *sudog
}
runtime.sudog
表示一個在等待列表中的 Goroutine酣衷,該結(jié)構(gòu)體中存儲了阻塞的相關(guān)信息以及兩個分別指向前后 runtime.sudog
的指針交惯。
創(chuàng)建管道
Go 語言中所有 Channel 的創(chuàng)建都會使用 make
關(guān)鍵字。編譯器會將 make(chan int, 10)
表達(dá)式被轉(zhuǎn)換成 OMAKE
類型的節(jié)點(diǎn)穿仪,并在類型檢查階段將 OMAKE
類型的節(jié)點(diǎn)轉(zhuǎn)換成 OMAKECHAN
類型:
func typecheck1(n *Node, top int) (res *Node) {
switch n.Op {
case OMAKE:
...
switch t.Etype {
case TCHAN:
l = nil
if i < len(args) { // 帶緩沖區(qū)的異步 Channel
...
n.Left = l
} else { // 不帶緩沖區(qū)的同步 Channel
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN
}
}
}
這一階段會對傳入 make
關(guān)鍵字的緩沖區(qū)大小進(jìn)行檢查席爽,如果我們不向 make
傳遞表示緩沖區(qū)大小的參數(shù),那么就會設(shè)置一個默認(rèn)值 0啊片,也就是當(dāng)前的 Channel 不存在緩沖區(qū)只锻。
OMAKECHAN
類型的節(jié)點(diǎn)最終都會在 SSA 中間代碼生成階段之前被轉(zhuǎn)換成調(diào)用 runtime.makechan
或者 runtime.makechan64
的函數(shù):
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
runtime.makechan
和 runtime.makechan64
會根據(jù)傳入的參數(shù)類型和緩沖區(qū)大小創(chuàng)建一個新的 Channel 結(jié)構(gòu),其中后者用于處理緩沖區(qū)大小大于 2 的 32 次方的情況钠龙,我們重點(diǎn)關(guān)注 runtime.makechan
函數(shù):
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, _ := math.MulUintptr(elem.size, uintptr(size))
var c *hchan
switch {
case mem == 0: //當(dāng)前 Channel 中不存在緩沖區(qū)炬藤,那么就只會為 runtime.hchan分配一段內(nèi)存空間御铃;
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0: //Channel 中存儲的類型不是指針類型,就會為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間沈矿;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 在默認(rèn)情況下會單獨(dú)為runtime.hchan和緩沖區(qū)分配內(nèi)存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
在函數(shù)的最后會統(tǒng)一更新 runtime.hchan
的 elemsize
上真、elemtype
和 dataqsiz
幾個字段。
發(fā)送數(shù)據(jù)
當(dāng)我們想要向 Channel 發(fā)送數(shù)據(jù)時羹膳,就需要使用 ch <- i
語句睡互,編譯器會將它解析成 OSEND
節(jié)點(diǎn)并在 cmd/compile/internal/gc.walkexpr
函數(shù)中轉(zhuǎn)換成 runtime.chansend1
:
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OSEND:
n1 := n.Right
n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
n1 = walkexpr(n1, init)
n1 = nod(OADDR, n1, nil)
n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
}
}
runtime.chansend1
只是調(diào)用了 runtime.chansend
并傳入 Channel 和需要發(fā)送的數(shù)據(jù)。runtime.chansend
是向 Channel 中發(fā)送數(shù)據(jù)時最終會調(diào)用的函數(shù)陵像,這個函數(shù)負(fù)責(zé)了發(fā)送數(shù)據(jù)的全部邏輯就珠,如果我們在調(diào)用時將 block
參數(shù)設(shè)置成 true
,那么就表示當(dāng)前發(fā)送操作是一個阻塞操作:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
在發(fā)送數(shù)據(jù)的邏輯執(zhí)行之前會先為當(dāng)前 Channel 加鎖醒颖,防止發(fā)生競爭條件妻怎。如果 Channel 已經(jīng)關(guān)閉,那么向該 Channel 發(fā)送數(shù)據(jù)時就會報"send on closed channel"
錯誤并中止程序泞歉。
因?yàn)?runtime.chansend
函數(shù)的實(shí)現(xiàn)比較復(fù)雜逼侦,所以我們這里將該函數(shù)的執(zhí)行過程分成以下的三個部分:
- 當(dāng)存在等待的接收者時,通過
runtime.send
直接將數(shù)據(jù)發(fā)送給阻塞的接收者腰耙; - 當(dāng)緩沖區(qū)存在空余空間時榛丢,將發(fā)送的數(shù)據(jù)寫入 Channel 的緩沖區(qū);
- 當(dāng)不存在緩沖區(qū)或者緩沖區(qū)已滿時挺庞,等待其他 Goroutine 從 Channel 接收數(shù)據(jù)晰赞;
直接發(fā)送
如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine色罚,那么 runtime.chansend
函數(shù)會從接收隊(duì)列 recvq
中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù):
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
發(fā)送數(shù)據(jù)時會調(diào)用 runtime.send
呻率,該函數(shù)的執(zhí)行可以分成兩個部分:
- 調(diào)用
runtime.sendDirect
函數(shù)將發(fā)送的數(shù)據(jù)直接拷貝到x = <-c
表達(dá)式中變量x
所在的內(nèi)存地址上娶牌; - 調(diào)用
runtime.goready
將等待接收數(shù)據(jù)的 Goroutine 標(biāo)記成可運(yùn)行狀態(tài)Grunnable
并把該 Goroutine 放到發(fā)送方所在的處理器的runnext
上等待執(zhí)行翰铡,該處理器在下一次調(diào)度時就會立刻喚醒數(shù)據(jù)的接收方;
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1)
}
需要注意的是蚓哩,發(fā)送數(shù)據(jù)的過程只是將接收方的 Goroutine 放到了處理器的 runnext
中,程序沒有立刻執(zhí)行該 Goroutine。
緩沖區(qū)
如果創(chuàng)建的 Channel 包含緩沖區(qū)并且 Channel 中的數(shù)據(jù)沒有裝滿隘谣,就會執(zhí)行下面這段代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
在這里我們首先會使用 chanbuf
計算出下一個可以存儲數(shù)據(jù)的位置,然后通過 runtime.typedmemmove
將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中并增加 sendx
索引和 qcount
計數(shù)器啄巧。
如果當(dāng)前 Channel 的緩沖區(qū)未滿寻歧,向 Channel 發(fā)送的數(shù)據(jù)會存儲在 Channel 中 sendx
索引所在的位置并將 sendx
索引加一,由于這里的 buf
是一個循環(huán)數(shù)組秩仆,所以當(dāng) sendx
等于 dataqsiz
時就會重新回到數(shù)組開始的位置码泛。
阻塞發(fā)送
當(dāng) Channel 沒有接收者能夠處理數(shù)據(jù)時,向 Channel 發(fā)送數(shù)據(jù)就會被下游阻塞澄耍,當(dāng)然使用 select
關(guān)鍵字可以向 Channel 非阻塞地發(fā)送消息噪珊。向 Channel 阻塞地發(fā)送數(shù)據(jù)會執(zhí)行下面的代碼晌缘,我們可以簡單梳理一下這段代碼的邏輯:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if !block {
unlock(&c.lock)
return false
}
gp := getg() //獲取發(fā)送數(shù)據(jù)使用的 Goroutine
mysg := acquireSudog() //函數(shù)獲取 runtime.sudog結(jié)構(gòu)體并設(shè)置這一次阻塞發(fā)送的相關(guān)信息,例如發(fā)送的 Channel痢站、是否在 Select 控制結(jié)構(gòu)中和待發(fā)送數(shù)據(jù)的內(nèi)存地址等
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg) //將剛剛創(chuàng)建并初始化的 runtime.sudog加入發(fā)送等待隊(duì)列磷箕,并設(shè)置到當(dāng)前 Goroutine 的 `waiting` 上,表示 Goroutine 正在等待該 `sudog` 準(zhǔn)備就緒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) //將當(dāng)前的 Goroutine 陷入沉睡等待喚醒
//被調(diào)度器喚醒后會執(zhí)行一些收尾工作阵难,將一些屬性置零并且釋放 untime.sudog結(jié)構(gòu)體
gp.waiting = nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true //返回 `true` 表示這向 Channel 發(fā)送數(shù)據(jù)的結(jié)束
}
小結(jié)
我們在這里可以簡單梳理和總結(jié)一下使用 ch <- i
表達(dá)式向 Channel 發(fā)送數(shù)據(jù)時遇到的幾種情況:
- 如果當(dāng)前 Channel 的
recvq
上存在已經(jīng)被阻塞的 Goroutine岳枷,那么會直接將數(shù)據(jù)發(fā)送給當(dāng)前的 Goroutine 并將其設(shè)置成下一個運(yùn)行的 Goroutine; - 如果 Channel 存在緩沖區(qū)并且其中還有空閑的容量呜叫,我們就會直接將數(shù)據(jù)直接存儲到當(dāng)前緩沖區(qū)
sendx
所在的位置上空繁; - 如果不滿足上面的兩種情況,就會創(chuàng)建一個
runtime.sudog
結(jié)構(gòu)并將其加入 Channel 的sendq
隊(duì)列中朱庆,當(dāng)前 Goroutine 也會陷入阻塞等待其他的協(xié)程從 Channel 接收數(shù)據(jù)盛泡;
發(fā)送數(shù)據(jù)的過程中包含幾個會觸發(fā) Goroutine 調(diào)度的時機(jī):
- 發(fā)送數(shù)據(jù)時發(fā)現(xiàn) Channel 上存在等待接收數(shù)據(jù)的 Goroutine,立刻設(shè)置處理器的
runnext
屬性娱颊,但是并不會立刻觸發(fā)調(diào)度饭于; - 發(fā)送數(shù)據(jù)時并沒有找到接收方并且緩沖區(qū)已經(jīng)滿了,這時就會將自己加入 Channel 的
sendq
隊(duì)列并調(diào)用runtime.goparkunlock
觸發(fā) Goroutine 的調(diào)度讓出處理器的使用權(quán)维蒙;
接收數(shù)據(jù)
我們接下來繼續(xù)介紹 Channel 操作的另一方 — 數(shù)據(jù)的接收掰吕。Go 語言中可以使用兩種不同的方式去接收 Channel 中的數(shù)據(jù):
i <- ch
i, ok <- ch
這兩種不同的方法經(jīng)過編譯器的處理都會變成 ORECV
類型的節(jié)點(diǎn),后者會在類型檢查階段被轉(zhuǎn)換成 OAS2RECV
類型颅痊。
雖然不同的接收方式會被轉(zhuǎn)換成 runtime.chanrecv1
和 runtime.chanrecv2
兩種不同函數(shù)的調(diào)用殖熟,但是這兩個函數(shù)最終還是會調(diào)用 runtime.chanrecv
。
當(dāng)我們從一個空 Channel 接收數(shù)據(jù)時會直接調(diào)用 runtime.gopark
直接讓出處理器的使用權(quán)斑响。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
如果當(dāng)前 Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何的數(shù)據(jù)菱属,那么就會清除 ep
指針中的數(shù)據(jù)并立刻返回。
除了上述兩種特殊情況舰罚,使用 runtime.chanrecv
從 Channel 接收數(shù)據(jù)時還包含以下三種不同情況:
- 當(dāng)存在等待的發(fā)送者時纽门,通過
runtime.recv
直接從阻塞的發(fā)送者或者緩沖區(qū)中獲取數(shù)據(jù); - 當(dāng)緩沖區(qū)存在數(shù)據(jù)時营罢,從 Channel 的緩沖區(qū)中接收數(shù)據(jù)赏陵;
- 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)時,等待其他 Goroutine 向 Channel 發(fā)送數(shù)據(jù)饲漾;
這三種情況和信息發(fā)送是一一對應(yīng)的蝙搔,此處不再贅述。
小結(jié)
我們梳理一下從 Channel 中接收數(shù)據(jù)時可能會發(fā)生的五種情況:
- 如果 Channel 為空考传,那么就會直接調(diào)用
runtime.gopark
掛起當(dāng)前 Goroutine吃型; - 如果 Channel 已經(jīng)關(guān)閉并且緩沖區(qū)沒有任何數(shù)據(jù),
runtime.chanrecv
函數(shù)會直接返回僚楞; - 如果 Channel 的
sendq
隊(duì)列中存在掛起的 Goroutine勤晚,就會將recvx
索引所在的數(shù)據(jù)拷貝到接收變量所在的內(nèi)存空間上并將sendq
隊(duì)列中 Goroutine 的數(shù)據(jù)拷貝到緩沖區(qū)枉层; - 如果 Channel 的緩沖區(qū)中包含數(shù)據(jù)就會直接讀取
recvx
索引對應(yīng)的數(shù)據(jù); - 在默認(rèn)情況下會掛起當(dāng)前的 Goroutine赐写,將
runtime.sudog
結(jié)構(gòu)加入recvq
隊(duì)列并陷入休眠等待調(diào)度器的喚醒返干;
我們總結(jié)一下從 Channel 接收數(shù)據(jù)時,會觸發(fā) Goroutine 調(diào)度的兩個時機(jī):
- 當(dāng) Channel 為空時血淌;
- 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)并且也不存在數(shù)據(jù)的發(fā)送者時矩欠;
關(guān)閉管道
編譯器會將用于關(guān)閉管道的 close
關(guān)鍵字轉(zhuǎn)換成 OCLOSE
節(jié)點(diǎn)以及 runtime.closechan
的函數(shù)調(diào)用。
當(dāng) Channel 是一個空指針或者已經(jīng)被關(guān)閉時悠夯,Go 語言運(yùn)行時都會直接 panic
并拋出異常:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
處理完了這些異常的情況之后就可以開始執(zhí)行關(guān)閉 Channel 的邏輯了癌淮,下面這段代碼的主要工作就是將 recvq
和 sendq
兩個隊(duì)列中的數(shù)據(jù)加入到 Goroutine 列表 gList
中,與此同時該函數(shù)會清除所有 sudog
上未被處理的元素:
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
該函數(shù)在最后會為所有被阻塞的 Goroutine 調(diào)用 runtime.goready
觸發(fā)調(diào)度沦补。
調(diào)度器
每個線程會都占用 1M以上的內(nèi)存空間乳蓄,在對線程進(jìn)行切換時不止會消耗較多的內(nèi)存,恢復(fù)寄存器中的內(nèi)容還需要向操作系統(tǒng)申請或者銷毀對應(yīng)的資源夕膀,每一次線程上下文的切換都需要消耗 ~1us 左右的時間虚倒,但是 Go 調(diào)度器對 Goroutine 的上下文切換約為 ~0.2us,減少了 80% 的額外開銷产舞。
Go 語言的調(diào)度器通過使用與CPU 數(shù)量相等的線程減少線程頻繁切換的內(nèi)存開銷魂奥,同時在每一個線程上執(zhí)行額外開銷更低的 Goroutine 來降低操作系統(tǒng)和硬件的負(fù)載。
設(shè)計原理
基于任務(wù)竊取的 Go 語言調(diào)度器使用了沿用至今的 G-M-P 模型易猫,我們能在 runtime: improved scheduler 提交中找到任務(wù)竊取調(diào)度器剛被實(shí)現(xiàn)時的源代碼耻煤,調(diào)度器的 runtime.schedule
函數(shù)在這個版本的調(diào)度器中反而更簡單了:
static void schedule(void) {
G *gp;
top:
if(runtime·gcwaiting) {
gcstopm(); //如果當(dāng)前運(yùn)行時在等待垃圾回收
goto top;
}
//從本地或者全局的運(yùn)行隊(duì)列中獲取待執(zhí)行的 Goroutine
gp = runqget(m->p);
if(gp == nil)
gp = findrunnable();
...
//在當(dāng)前線程 M 上運(yùn)行 Goroutine
execute(gp);
}
當(dāng)前處理器本地的運(yùn)行隊(duì)列中不包含 Goroutine 時,調(diào)用 findrunnable
函數(shù)會觸發(fā)工作竊取准颓,從其它的處理器的隊(duì)列中隨機(jī)獲取一些 Goroutine哈蝇。
運(yùn)行時 G-M-P 模型中引入的處理器 P 是線程和 Goroutine 的中間層,我們從它的結(jié)構(gòu)體中就能看到處理器與 M 和 G 的關(guān)系:
struct P {
Lock;
uint32 status;
P* link;
uint32 tick;
M* m;
MCache* mcache;
G** runq;
int32 runqhead;
int32 runqtail;
int32 runqsize;
G* gfree;
int32 gfreecnt;
};
處理器持有一個由可運(yùn)行的 Goroutine 組成的運(yùn)行隊(duì)列 runq
攘已,還反向持有一個線程炮赦。調(diào)度器在調(diào)度時會從處理器的隊(duì)列中選擇隊(duì)列頭的 Goroutine 放到線程 M 上執(zhí)行。
基于工作竊取的多線程調(diào)度器將每一個線程綁定到了獨(dú)立的 CPU 上样勃,這些線程會被不同處理器管理吠勘,不同的處理器通過工作竊取對任務(wù)進(jìn)行再分配實(shí)現(xiàn)任務(wù)的平衡,也能提升調(diào)度器和 Go 語言程序的整體性能彤灶,今天所有的 Go 語言服務(wù)都受益于這一改動看幼。
數(shù)據(jù)結(jié)構(gòu)
調(diào)度器的三個重要組成部分 — 線程 M批旺、Goroutine G 和處理器 P:
G — 表示 Goroutine幌陕,它是一個待執(zhí)行的任務(wù);
M — 表示操作系統(tǒng)的線程汽煮,它由操作系統(tǒng)的調(diào)度器調(diào)度和管理搏熄;
P — 表示處理器棚唆,它可以被看做運(yùn)行在線程上的本地調(diào)度器;
下面詳細(xì)介紹它們的作用心例、數(shù)據(jù)結(jié)構(gòu)以及在運(yùn)行期間可能處于的狀態(tài)宵凌。
G
Gorotuine 就是 Go 語言調(diào)度器中待執(zhí)行的任務(wù),它在運(yùn)行時調(diào)度器中的地位與線程在操作系統(tǒng)中差不多止后,但是它占用了更小的內(nèi)存空間瞎惫,也降低了上下文切換的開銷。
Goroutine 只存在于 Go 語言的運(yùn)行時译株,它是 Go 語言在用戶態(tài)提供的線程瓜喇,作為一種粒度更細(xì)的資源調(diào)度單元,如果使用得當(dāng)能夠在高并發(fā)的場景下更高效地利用機(jī)器的 CPU歉糜。
Goroutine 在 Go 語言運(yùn)行時使用私有結(jié)構(gòu)體 runtime.g
表示乘寒。這個私有結(jié)構(gòu)體非常復(fù)雜,總共包含 40 多個用于表示各種狀態(tài)的成員變量匪补,我們在這里也不會介紹全部字段伞辛,而是會挑選其中的一部分進(jìn)行介紹:
type g struct {
stack stack //stack 字段描述了當(dāng)前 Goroutine 的棧內(nèi)存范圍 [stack.lo, stack.hi)
stackguard0 uintptr //用于調(diào)度器搶占式調(diào)度
preempt bool // 搶占信號
preemptStop bool // 搶占時將狀態(tài)修改成 `_Gpreempted`
preemptShrink bool // 在同步安全點(diǎn)收縮棧
_panic *_panic // 最內(nèi)側(cè)的 panic 結(jié)構(gòu)體
_defer *_defer // 最內(nèi)側(cè)的延遲函數(shù)結(jié)構(gòu)體
m *m //當(dāng)前 Goroutine 占用的線程,可能為空夯缺;
sched gobuf //存儲 Goroutine 的調(diào)度相關(guān)的數(shù)據(jù)
atomicstatus uint32 //Goroutine 的狀態(tài)
goid int64 //Goroutine 的 ID蚤氏,該字段對開發(fā)者不可見,Go 團(tuán)隊(duì)認(rèn)為引入 ID 會讓部分 Goroutine 變得更特殊踊兜,從而限制語言的并發(fā)能力
}
type gobuf struct {
sp uintptr //棧指針(Stack Pointer)
pc uintptr //程序計數(shù)器(Program Counter)
g guintptr //持有 runtime.gobuf
ret sys.Uintreg //系統(tǒng)調(diào)用的返回值
...
}
這個在調(diào)度器保存或者恢復(fù)上下文的時候用到瞧捌,其中的棧指針和程序計數(shù)器會用來存儲或者恢復(fù)寄存器中的值,改變程序即將執(zhí)行的代碼润文。
結(jié)構(gòu)體 runtime.g
的 atomicstatus
字段就存儲了當(dāng)前 Goroutine 的狀態(tài)姐呐。除了幾個已經(jīng)不被使用的以及與 GC 相關(guān)的狀態(tài)之外,Goroutine 可能處于以下 9 個狀態(tài):
狀態(tài) | 描述 |
---|---|
_Gidle |
剛剛被分配并且還沒有被初始化 |
_Grunnable |
沒有執(zhí)行代碼典蝌,沒有棧的所有權(quán)曙砂,存儲在運(yùn)行隊(duì)列中 |
_Grunning |
可以執(zhí)行代碼,擁有棧的所有權(quán)骏掀,被賦予了內(nèi)核線程 M 和處理器 P |
_Gsyscall |
正在執(zhí)行系統(tǒng)調(diào)用鸠澈,擁有棧的所有權(quán),沒有執(zhí)行用戶代碼截驮,被賦予了內(nèi)核線程 M 但是不在運(yùn)行隊(duì)列上 |
_Gwaiting |
由于運(yùn)行時而被阻塞笑陈,沒有執(zhí)行用戶代碼并且不在運(yùn)行隊(duì)列上,但是可能存在于 Channel 的等待隊(duì)列上 |
_Gdead |
沒有被使用葵袭,沒有執(zhí)行代碼涵妥,可能有分配的棧 |
_Gcopystack |
棧正在被拷貝,沒有執(zhí)行代碼坡锡,不在運(yùn)行隊(duì)列上 |
_Gpreempted |
由于搶占而被阻塞蓬网,沒有執(zhí)行用戶代碼并且不在運(yùn)行隊(duì)列上窒所,等待喚醒 |
_Gscan |
GC 正在掃描棧空間帆锋,沒有執(zhí)行代碼吵取,可以與其他狀態(tài)同時存在 |
上述狀態(tài)中比較常見是 _Grunnable
、_Grunning
锯厢、_Gsyscall
皮官、_Gwaiting
和 _Gpreempted
五個狀態(tài),我們會重點(diǎn)介紹這幾個狀態(tài)实辑,Goroutine 的狀態(tài)遷移是一個復(fù)雜的過程臣疑,觸發(fā) Goroutine 狀態(tài)遷移的方法也很多,在這里我們也沒有辦法介紹全部的遷移線路徙菠,我們會從中選擇一些進(jìn)行介紹讯沈。
雖然 Goroutine 在運(yùn)行時中定義的狀態(tài)非常多而且復(fù)雜,但是我們可以將這些不同的狀態(tài)聚合成最終的三種:等待中婿奔、可運(yùn)行缺狠、運(yùn)行中,在運(yùn)行期間我們會在這三種不同的狀態(tài)來回切換:
- 等待中:Goroutine 正在等待某些條件滿足萍摊,例如:系統(tǒng)調(diào)用結(jié)束等挤茄,包括
_Gwaiting
、_Gsyscall
和_Gpreempted
幾個狀態(tài)冰木; - 可運(yùn)行:Goroutine 已經(jīng)準(zhǔn)備就緒穷劈,可以在線程運(yùn)行,如果當(dāng)前程序中有非常多的 Goroutine踊沸,每個 Goroutine 就可能會等待更多的時間歇终,即
_Grunnable
; - 運(yùn)行中:Goroutine 正在某個線程上運(yùn)行逼龟,即
_Grunning
评凝;
上圖展示了 Goroutine 狀態(tài)遷移的常見路徑,其中包括創(chuàng)建 Goroutine 到 Goroutine 被執(zhí)行腺律、觸發(fā)系統(tǒng)調(diào)用或者搶占式調(diào)度器的狀態(tài)遷移過程奕短。
M
Go 語言并發(fā)模型中的 M 是操作系統(tǒng)線程。調(diào)度器最多可以創(chuàng)建 10000 個線程匀钧,但是其中大多數(shù)的線程都不會執(zhí)行用戶代碼(可能陷入系統(tǒng)調(diào)用)翎碑,最多只會有 GOMAXPROCS
個活躍線程能夠正常運(yùn)行。
在默認(rèn)情況下之斯,運(yùn)行時會將 GOMAXPROCS
設(shè)置成當(dāng)前機(jī)器的核數(shù)日杈,我們也可以使用 runtime.GOMAXPROCS
來改變程序中最大的線程數(shù)。
在默認(rèn)情況下,一個四核機(jī)器上會創(chuàng)建四個活躍的操作系統(tǒng)線程达椰,每一個線程都對應(yīng)一個運(yùn)行時中的 runtime.m
結(jié)構(gòu)體翰蠢。
在大多數(shù)情況下项乒,我們都會使用 Go 的默認(rèn)設(shè)置啰劲,也就是線程數(shù)等于 CPU 個數(shù),在這種情況下不會觸發(fā)操作系統(tǒng)的線程調(diào)度和上下文切換檀何,所有的調(diào)度都會發(fā)生在用戶態(tài)蝇裤,由 Go 語言調(diào)度器觸發(fā),能夠減少非常多的額外開銷频鉴。
操作系統(tǒng)線程在 Go 語言中會使用私有結(jié)構(gòu)體 runtime.m
來表示栓辜,這個結(jié)構(gòu)體中也包含了幾十個私有的字段,我們依然對其進(jìn)行了刪減垛孔,先來了解幾個與 Goroutine 直接相關(guān)的字段:
type m struct {
g0 *g
curg *g
...
}
其中 g0 是持有調(diào)度棧的 Goroutine藕甩,curg
是在當(dāng)前線程上運(yùn)行的用戶 Goroutine,這也是操作系統(tǒng)線程唯一關(guān)心的兩個 Goroutine周荐。
g0 是一個運(yùn)行時中比較特殊的 Goroutine狭莱,它會深度參與運(yùn)行時的調(diào)度過程,包括 Goroutine 的創(chuàng)建概作、大內(nèi)存分配和 CGO 函數(shù)的執(zhí)行腋妙。在后面的小節(jié)中,我們會經(jīng)逞堕牛看到 g0 的身影骤素。runtime.m
結(jié)構(gòu)體中還存在著三個處理器字段,它們分別表示正在運(yùn)行代碼的處理器 p
愚屁、暫存的處理器 nextp
和執(zhí)行系統(tǒng)調(diào)用之前的使用線程的處理器 oldp
:
type m struct {
p puintptr
nextp puintptr
oldp puintptr
}
除了在上面介紹的字段之外济竹,runtime.m
中還包含大量與線程狀態(tài)、鎖霎槐、調(diào)度规辱、系統(tǒng)調(diào)用有關(guān)的字段,我們會在分析調(diào)度過程時詳細(xì)介紹栽燕。
P
調(diào)度器中的處理器 P 是線程和 Goroutine 的中間層罕袋,它能提供線程需要的上下文環(huán)境,也會負(fù)責(zé)調(diào)度線程上的等待隊(duì)列碍岔,通過處理器 P 的調(diào)度浴讯,每一個內(nèi)核線程都能夠執(zhí)行多個 Goroutine,它能在 Goroutine 進(jìn)行一些 I/O 操作時及時切換蔼啦,提高線程的利用率榆纽。
因?yàn)檎{(diào)度器在啟動時就會創(chuàng)建 GOMAXPROCS
個處理器,所以 Go 語言程序的處理器數(shù)量一定會等于 GOMAXPROCS
,這些處理器會綁定到不同的內(nèi)核線程上并利用線程的計算資源運(yùn)行 Goroutine奈籽。
runtime.p
是處理器的運(yùn)行時表示饥侵,作為調(diào)度器的內(nèi)部實(shí)現(xiàn),它包含的字段也非常多衣屏,其中包括與性能追蹤躏升、垃圾回收和計時器相關(guān)的字段,這些字段也非常重要狼忱,但是在這里就不一一展示了膨疏,我們主要關(guān)注處理器中的線程和運(yùn)行隊(duì)列:
type p struct {
m muintptr
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
...
}
反向存儲的線程維護(hù)著線程與處理器之間的關(guān)系,而 runhead
钻弄、runqtail
和 runq
三個字段表示處理器持有的運(yùn)行隊(duì)列佃却,其中存儲著待執(zhí)行的 Goroutine 列表,runnext
中是線程下一個需要執(zhí)行的 Goroutine窘俺。
runtime.p
結(jié)構(gòu)體中的狀態(tài) status
字段會是以下五種中的一種:
狀態(tài) | 描述 |
---|---|
_Pidle |
處理器沒有運(yùn)行用戶代碼或者調(diào)度器饲帅,被空閑隊(duì)列或者改變其狀態(tài)的結(jié)構(gòu)持有,運(yùn)行隊(duì)列為空 |
_Prunning |
被線程 M 持有瘤泪,并且正在執(zhí)行用戶代碼或者調(diào)度器 |
_Psyscall |
沒有執(zhí)行用戶代碼灶泵,當(dāng)前線程陷入系統(tǒng)調(diào)用 |
_Pgcstop |
被線程 M 持有,當(dāng)前處理器由于垃圾回收被停止 |
_Pdead |
當(dāng)前處理器已經(jīng)不被使用 |
通過分析處理器 P 的狀態(tài)均芽,我們能夠?qū)μ幚砥鞯墓ぷ鬟^程有一些簡單理解丘逸,例如處理器在執(zhí)行用戶代碼時會處于 _Prunning
狀態(tài),在當(dāng)前線程執(zhí)行 I/O 操作時會陷入 _Psyscall
狀態(tài)掀宋。
小結(jié)
我們簡單介紹了 Go 語言調(diào)度器中常見的數(shù)據(jù)結(jié)構(gòu)深纲,包括線程 M、處理器 P 和 Goroutine G劲妙,它們在 Go 語言運(yùn)行時中分別使用不同的私有結(jié)構(gòu)體表示湃鹊,我們在下面會深入分析 Go 語言調(diào)度器的實(shí)現(xiàn)原理。
調(diào)度器啟動
調(diào)度器的啟動過程是我們平時比較難以接觸的過程镣奋,不過作為程序啟動前的準(zhǔn)備工作币呵,理解調(diào)度器的啟動過程對我們理解調(diào)度器的實(shí)現(xiàn)原理很有幫助,運(yùn)行時通過 runtime.schedinit
函數(shù)初始化調(diào)度器:
func schedinit() {
_g_ := getg()
...
sched.maxmcount = 10000
...
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
在調(diào)度器初始函數(shù)執(zhí)行的過程中會將 maxmcount
設(shè)置成 10000侨颈,這也就是一個 Go 語言程序能夠創(chuàng)建的最大線程數(shù)余赢,雖然最多可以創(chuàng)建 10000 個線程,但是可以同時運(yùn)行的線程還是由 GOMAXPROCS
變量控制哈垢。
我們從環(huán)境變量 GOMAXPROCS
獲取了程序能夠同時運(yùn)行的最大處理器數(shù)之后就會調(diào)用 runtime.procresize
更新程序中處理器的數(shù)量妻柒,在這時整個程序不會執(zhí)行任何用戶 Goroutine,調(diào)度器也會進(jìn)入鎖定狀態(tài)耘分,runtime.procresize
是調(diào)度器啟動的最后一步举塔,在這一步過后調(diào)度器會完成相應(yīng)數(shù)量處理器的啟動绑警,等待用戶創(chuàng)建運(yùn)行新的 Goroutine 并為 Goroutine 調(diào)度處理器資源。
創(chuàng)建 Goroutine
想要啟動一個新的 Goroutine 來執(zhí)行任務(wù)時央渣,我們需要使用 Go 語言中的 go
關(guān)鍵字计盒,這個關(guān)鍵字會在編譯期間通過以下方法 cmd/compile/internal/gc.state.stmt
和 cmd/compile/internal/gc.state.call
兩個方法將該關(guān)鍵字轉(zhuǎn)換成 runtime.newproc
函數(shù)調(diào)用,該函數(shù)會接收大小和表示函數(shù)的指針 funcval
芽丹。在這個函數(shù)中我們還會獲取 Goroutine 以及調(diào)用方的程序計數(shù)器北启,然后調(diào)用 runtime.newproc1
函數(shù):
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
newproc1(fn, (*uint8)(argp), siz, gp, pc)
}
runtime.newproc1
會根據(jù)傳入?yún)?shù)初始化一個 g
結(jié)構(gòu)體,該函數(shù)實(shí)現(xiàn)可以分成以下幾個部分:
- 獲取或者創(chuàng)建新的 Goroutine 結(jié)構(gòu)體志衍,先從處理器的
gFree
列表中查找空閑的 Goroutine暖庄,如果不存在空閑的 Goroutine聊替,就會通過runtime.malg
函數(shù)創(chuàng)建一個棧大小足夠的新結(jié)構(gòu)體楼肪; - 將傳入的參數(shù)移到 Goroutine 的棧上,調(diào)用
runtime.memmove
函數(shù)將fn
函數(shù)的全部參數(shù)拷貝到棧上惹悄; - 更新 Goroutine 調(diào)度相關(guān)的屬性春叫,
runtime.newproc1
會設(shè)置新的 Goroutine 結(jié)構(gòu)體的參數(shù),包括棧指針泣港、程序計數(shù)器并更新其狀態(tài)到_Grunnable
暂殖; - 將 Goroutine 加入處理器的運(yùn)行隊(duì)列,將初始化好的 Goroutine 加入處理器的運(yùn)行隊(duì)列并在滿足條件時調(diào)用
runtime.wakep
函數(shù)喚醒新的處理執(zhí)行 Goroutine当纱;
調(diào)度循環(huán)
調(diào)度器啟動之后呛每,Go 語言運(yùn)行時會調(diào)用 runtime.mstart
以及 runtime.mstart1
,前者會初始化 g0 的 stackguard0
和 stackguard1
字段坡氯,后者會初始化線程并調(diào)用 runtime.schedule
進(jìn)入調(diào)度循環(huán):
func schedule() {
_g_ := getg()
top:
var gp *g
var inheritTime bool
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable()
}
execute(gp, inheritTime)
}
runtime.schedule
函數(shù)的會從不同地方查找待執(zhí)行的 Goroutine:
- 為了保證公平,當(dāng)全局運(yùn)行隊(duì)列中有待執(zhí)行的 Goroutine 時,通過
schedtick
保證有一定幾率會從全局的運(yùn)行隊(duì)列中查找對應(yīng)的 Goroutine烁涌; - 從處理器本地的運(yùn)行隊(duì)列中查找待執(zhí)行的 Goroutine帐姻;
- 如果前兩種方法都沒有找到 Goroutine,就會通過
runtime.findrunnable
進(jìn)行阻塞地查找 Goroutine悯恍;
runtime.findrunnable
函數(shù)的實(shí)現(xiàn)非常復(fù)雜库糠,這個 300 多行的函數(shù)通過以下的過程獲取可運(yùn)行的 Goroutine:
- 從本地運(yùn)行隊(duì)列、全局運(yùn)行隊(duì)列中查找涮毫;
- 從網(wǎng)絡(luò)輪詢器中查找是否有 Goroutine 等待運(yùn)行瞬欧;
- 通過
runtime.runqsteal
函數(shù)嘗試從其他隨機(jī)的處理器中竊取待運(yùn)行的 Goroutine,在該過程中還可能竊取處理器中的計時器罢防;
因?yàn)楹瘮?shù)的實(shí)現(xiàn)過于復(fù)雜艘虎,上述執(zhí)行過程是經(jīng)過大量簡化的,總而言之篙梢,當(dāng)前函數(shù)一定會返回一個可執(zhí)行的 Goroutine顷帖,如果當(dāng)前不存在就會阻塞等待美旧。
接下來由 runtime.execute
函數(shù)執(zhí)行獲取的 Goroutine,做好準(zhǔn)備工作后贬墩,它會通過 runtime.gogo
將 Goroutine 調(diào)度到當(dāng)前線程上榴嗅。
func execute(gp *g, inheritTime bool) {
_g_ := getg()
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
gogo(&gp.sched)
}
runtime.gogo
在不同處理器架構(gòu)上的實(shí)現(xiàn)都不同,但是不同的實(shí)現(xiàn)也都大同小異陶舞,下面是該函數(shù)在 386 架構(gòu)上的實(shí)現(xiàn):
TEXT runtime·gogo(SB), NOSPLIT, $8-4
MOVL buf+0(FP), BX // 獲取調(diào)度信息
MOVL gobuf_g(BX), DX
MOVL 0(DX), CX // 保證 Goroutine 不為空
get_tls(CX)
MOVL DX, g(CX)
MOVL gobuf_sp(BX), SP // 將 runtime.goexit 函數(shù)的 PC 恢復(fù)到 SP 中
MOVL gobuf_ret(BX), AX
MOVL gobuf_ctxt(BX), DX
MOVL $0, gobuf_sp(BX)
MOVL $0, gobuf_ret(BX)
MOVL $0, gobuf_ctxt(BX)
MOVL gobuf_pc(BX), BX // 獲取待執(zhí)行函數(shù)的程序計數(shù)器
JMP BX // 開始執(zhí)行
該函數(shù)的實(shí)現(xiàn)非常巧妙嗽测,它從 runtime.gobuf
中取出了 runtime.goexit
的程序計數(shù)器和待執(zhí)行函數(shù)的程序計數(shù)器,其中:
-
runtime.goexit
的程序計數(shù)器被放到了棧 SP 上肿孵; - 待執(zhí)行函數(shù)的程序計數(shù)器被放到了寄存器 BX 上唠粥;
runtime.gogo
就利用了 Go 語言的調(diào)用慣例成功模擬這一調(diào)用過程,通過以下幾個關(guān)鍵指令模擬 CALL
的過程:
MOVL gobuf_sp(BX), SP // 將 runtime.goexit 函數(shù)的 PC 恢復(fù)到 SP 中
MOVL gobuf_pc(BX), BX // 獲取待執(zhí)行函數(shù)的程序計數(shù)器
JMP BX // 開始執(zhí)行
當(dāng) Goroutine 中運(yùn)行的函數(shù)返回時就會跳轉(zhuǎn)到 runtime.goexit
所在位置執(zhí)行該函數(shù):
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)
func goexit1() {
mcall(goexit0)
}
經(jīng)過套娃似的一系列函數(shù)調(diào)用停做,我們最終在當(dāng)前線程的 g0 的棧上調(diào)用 runtime.goexit0
函數(shù)晤愧,該函數(shù)會將 Goroutine 轉(zhuǎn)換會 _Gdead
狀態(tài)、清理其中的字段蛉腌、移除 Goroutine 和線程的關(guān)聯(lián)并調(diào)用 runtime.gfput
重新加入處理器的 Goroutine 空閑列表 gFree
:
func goexit0(gp *g) {
_g_ := getg()
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
...
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg()
gfput(_g_.m.p.ptr(), gp)
schedule()
}
在最后 runtime.goexit0
函數(shù)會重新調(diào)用 runtime.schedule
觸發(fā)新的 Goroutine 調(diào)度官份,我們可以認(rèn)為調(diào)度循環(huán)永遠(yuǎn)都不會返回。
Go 語言中的運(yùn)行時調(diào)度循環(huán)會從 runtime.schedule
函數(shù)開始烙丛,最終又回到 runtime.schedule
舅巷;這里介紹的是 Goroutine 正常執(zhí)行并退出的邏輯,實(shí)際情況會復(fù)雜得多河咽,多數(shù)情況下 Goroutine 的執(zhí)行的過程中都會經(jīng)歷協(xié)作式或者搶占式調(diào)度钠右,這時會讓出線程的使用權(quán)等待調(diào)度器的喚醒。