go并發(fā)編程總結(jié)

本文從上下文Context句占、同步原語與鎖岛杀、Channel、調(diào)度器四個方面介紹Go語言是如何實(shí)現(xiàn)并發(fā)的壶唤。本文絕大部分內(nèi)容是從go并發(fā)編程系列文章學(xué)習(xí)總結(jié)而來雳灵。

上下文Context

上下文context.Context是用來設(shè)置截止日期、同步信號视粮,傳遞請求相關(guān)值的結(jié)構(gòu)體细办。上下文與Gorutine有比較密切的關(guān)系。context.Context是Go語言中獨(dú)特的設(shè)計(java中至少沒有類似設(shè)計)蕾殴。context.Context是Go語言在1.7版本中引入標(biāo)準(zhǔn)庫的接口笑撞,該接口定義了四個需要實(shí)現(xiàn)的方法,如下:

type Context interface {
    Deadline() (deadline time.Time, ok bool) //返回context.Context被取消的時間钓觉,也就是完成工作的截止日期茴肥;
    Done() <-chan struct{} //返回一個Channel,這個Channel會在當(dāng)前工作完成或者上下文被取消之后關(guān)閉荡灾,多次調(diào)用Done方法返回同一個Channel瓤狐;
    Err() error //返回context.Context結(jié)束的原因,它只會在Done返回的Channel被關(guān)閉時才會返回非空的值:a)如果context.Context被取消批幌,會返回Canceled錯誤础锐;b)如果context.Context超時如贷,會返回DeadlineExceeded錯誤婿屹。
    Value(key interface{}) interface{} //從context.Context中獲取鍵對應(yīng)的值
}

context包中提供的 context.Backgroundcontext.TODO溢十、context.WithDeadlinecontext.WithValue 函數(shù)會返回實(shí)現(xiàn)該接口的私有結(jié)構(gòu)體

設(shè)計原理

使用 Context 同步信號
我們可以通過一個代碼片段了解 context.Context 是如何對信號進(jìn)行同步的截粗。在這段代碼中信姓,我們創(chuàng)建了一個過期時間為 1s 的上下文,并向上下文傳入 handle 函數(shù)绸罗,該方法會使用 500ms 的時間處理傳入的『請求』:

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) //1秒之后會往ctx通道推入一個錯誤信息
    defer cancel()

    go handle(ctx, 500*time.Millisecond)
    select {
    case <-ctx.Done(): // ctx.Done()返回通道意推,等待通道信息
        fmt.Println("main", ctx.Err())
    }
}

func handle(ctx context.Context, duration time.Duration) {
    select {
    case <-ctx.Done():
        fmt.Println("handle", ctx.Err())
    case <-time.After(duration):
        fmt.Println("process request with", duration)
    }
}

因?yàn)檫^期時間大于處理時間,所以我們有足夠的時間處理該『請求』珊蟀,運(yùn)行上述代碼會打印出如下所示的內(nèi)容:

$ go run context.go
process request with 500ms
main context deadline exceeded

handle 函數(shù)沒有進(jìn)入超時的 select 分支菊值,但是 main 函數(shù)的 select 卻會等待 context.Context 的超時并打印出 main context deadline exceeded

取消信號

context.WithCancel 函數(shù)能夠從 context.Context 中衍生出一個新的子上下文并返回用于取消該上下文的函數(shù)(CancelFunc)。一旦我們執(zhí)行返回的取消函數(shù)俊性,當(dāng)前上下文以及它的子上下文都會被取消略步,所有的 Goroutine 都會同步收到這一取消信號。
我們直接從 context.WithCancel 函數(shù)的實(shí)現(xiàn)來看它到底做了什么:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := newCancelCtx(parent)
    propagateCancel(parent, &c)
    return &c, func() { c.cancel(true, Canceled) }
}

func propagateCancel(parent Context, child canceler) {
    done := parent.Done()
    if done == nil { //當(dāng) parent.Done() == nil杭煎,也就是 parent 不會觸發(fā)取消事件時,當(dāng)前函數(shù)會直接返回卒落;
        return 
    }
    select {
    case <-done:
        child.cancel(false, parent.Err()) // 父上下文已經(jīng)被取消
        return
    default:
    }

    if p, ok := parentCancelCtx(parent); ok { //判斷parent是否是帶cancel的上下文
        p.mu.Lock()
        if p.err != nil { //如果已經(jīng)被取消羡铲,child 會立刻被取消;
            child.cancel(false, p.err)
        } else { 
            p.children[child] = struct{}{} //如果沒有被取消儡毕,child 會被加入 parent 的 children 列表中也切,等待 parent 釋放取消信號
        }
        p.mu.Unlock()
    } else {
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

context.propagateCancel 的作用是在 parentchild 之間同步取消和結(jié)束的信號,保證在 parent 被取消時腰湾,child 也會收到對應(yīng)的信號雷恃,不會發(fā)生狀態(tài)不一致的問題。

context.cancelCtx 實(shí)現(xiàn)的幾個接口方法也沒有太多值得分析的地方费坊,該結(jié)構(gòu)體最重要的方法是 cancel倒槐,這個方法會關(guān)閉上下文中的 Channel 并向所有的子上下文同步取消信號:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return
    }
    c.err = err
    if c.done == nil {
        c.done = closedchan
    } else {
        close(c.done)
    }
    for child := range c.children {
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
        removeChild(c.Context, c)
    }
}

除了 context.WithCancel 之外,context 包中的另外兩個函數(shù) context.WithDeadlinecontext.WithTimeout 也都能創(chuàng)建可以被取消的計時器上下文 context.timerCtx

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // 已經(jīng)過了截止日期
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}

context.WithDeadline 方法在創(chuàng)建 context.timerCtx 的過程中附井,判斷了父上下文的截止日期與當(dāng)前日期讨越,并通過 time.AfterFunc 創(chuàng)建定時器,當(dāng)時間超過了截止日期后會調(diào)用 context.timerCtx.cancel 方法同步取消信號永毅。
context.timerCtx 結(jié)構(gòu)體內(nèi)部不僅通過嵌入了context.cancelCtx 結(jié)構(gòu)體繼承了相關(guān)的變量和方法把跨,還通過持有的定時器 timer 和截止時間 deadline 實(shí)現(xiàn)了定時取消這一功能:

type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}

context.timerCtx.cancel 方法不僅調(diào)用了 context.cancelCtx.cancel,還會停止持有的定時器減少不必要的資源浪費(fèi)沼死。

同步原語與鎖

Go 語言作為一個原生支持用戶態(tài)進(jìn)程(Goroutine)的語言着逐,當(dāng)提到并發(fā)編程、多線程編程時漫雕,往往都離不開鎖這一概念。鎖是一種并發(fā)編程中的同步原語(Synchronization Primitives)峰鄙,它能保證多個 Goroutine 在訪問同一片內(nèi)存時不會出現(xiàn)競爭條件(Race condition)等問題浸间。
Go 語言在 sync 包中提供了用于同步的一些基本原語,包括常見的 sync.Mutex吟榴、sync.RWMutex魁蒜、sync.WaitGroupsync.Oncesync.Cond
這些基本原語提高了較為基礎(chǔ)的同步功能,但是它們是一種相對原始的同步機(jī)制兜看,在多數(shù)情況下锥咸,我們都應(yīng)該使用抽象層級的更高的 Channel 實(shí)現(xiàn)同步。

Mutex

Go 語言的 sync.Mutex 由兩個字段 statesema 組成细移。其中 state 表示當(dāng)前互斥鎖的狀態(tài)搏予,而 sema 是用于控制鎖狀態(tài)的信號量。

type Mutex struct {
    state int32
    sema  uint32
}

上述兩個加起來只占 8 字節(jié)空間的結(jié)構(gòu)體表示了 Go 語言中的互斥鎖弧轧,非常輕量級雪侥。
狀態(tài)
互斥鎖的狀態(tài)比較復(fù)雜,如下圖所示精绎,最低三位分別表示 mutexLocked速缨、mutexWokenmutexStarving,剩下的位置用來表示當(dāng)前有多少個 Goroutine 等待互斥鎖的釋放:

golang-mutex-state

在默認(rèn)情況下代乃,互斥鎖的所有狀態(tài)位都是 0旬牲,int32 中的不同位分別表示了不同的狀態(tài):

  • mutexLocked — 表示互斥鎖的鎖定狀態(tài);
  • mutexWoken — 表示從正常模式被從喚醒搁吓;
  • mutexStarving — 當(dāng)前的互斥鎖進(jìn)入饑餓狀態(tài)原茅;
  • waitersCount — 當(dāng)前互斥鎖上等待的 Goroutine 個數(shù);

正常模式和饑餓模式
sync.Mutex 有兩種模式 — 正常模式和饑餓模式擎浴。我們需要在這里先了解正常模式和饑餓模式都是什么员咽,它們有什么樣的關(guān)系。
在正常模式下贮预,鎖的等待者會按照先進(jìn)先出的順序獲取鎖贝室。但是剛被喚起的 Goroutine 與新創(chuàng)建的 Goroutine 競爭時,大概率會獲取不到鎖仿吞,為了減少這種情況的出現(xiàn)滑频,一旦 Goroutine 超過 1ms 沒有獲取到鎖,它就會將當(dāng)前互斥鎖切換饑餓模式唤冈,防止部分 Goroutine 被『餓死』峡迷。
饑餓模式是在 Go 語言 1.9 版本引入的優(yōu)化,引入的目的是保證互斥鎖的公平性(Fairness)你虹。
在饑餓模式中绘搞,互斥鎖會直接交給等待隊(duì)列最前面的 Goroutine。新的 Goroutine 在該狀態(tài)下不能獲取鎖傅物、也不會進(jìn)入自旋狀態(tài)夯辖,它們只會在隊(duì)列的末尾等待。如果一個 Goroutine 獲得了互斥鎖并且它在隊(duì)列的末尾或者它等待的時間少于 1ms董饰,那么當(dāng)前的互斥鎖就會被切換回正常模式蒿褂。
相比于饑餓模式圆米,正常模式下的互斥鎖能夠提供更好地性能,饑餓模式的能避免 Goroutine 由于陷入等待無法獲取鎖而造成的高尾延時啄栓。

加鎖和解鎖
互斥鎖的加鎖和解鎖過程使用 sync.Mutex.Locksync.Mutex.Unlock 方法娄帖。
互斥鎖的加鎖是靠 sync.Mutex.Lock 完成的,最新的 Go 語言源代碼中已經(jīng)將 sync.Mutex.Lock 方法進(jìn)行了簡化昙楚,方法的主干只保留最常見近速、簡單的情況 — 當(dāng)鎖的狀態(tài)是 0 時,將 mutexLocked 位置成 1:

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

如果互斥鎖的狀態(tài)不是 0 時就會調(diào)用 sync.Mutex.lockSlow 嘗試通過自旋(Spinnig)等方式等待鎖的釋放桂肌,該方法的主體是一個非常大 for 循環(huán)数焊,這里將該方法分成幾個部分介紹獲取鎖的過程:

  1. 判斷當(dāng)前 Goroutine 能否進(jìn)入自旋;
  2. 通過自旋等待互斥鎖的釋放崎场;
  3. 計算互斥鎖的最新狀態(tài)佩耳;
  4. 更新互斥鎖的狀態(tài)并獲取鎖;

我們先來介紹互斥鎖是如何判斷當(dāng)前 Goroutine 能否進(jìn)入自旋等互斥鎖的釋放:

func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
                //互斥鎖只有在普通模式才能進(jìn)入自旋谭跨。
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { //runtime_canSpin為true條件:1)運(yùn)行在多 CPU 的機(jī)器上干厚;2)當(dāng)前 Goroutine 為了獲取該鎖進(jìn)入自旋的次數(shù)小于四次;3)當(dāng)前機(jī)器上至少存在一個正在運(yùn)行的處理器 P 并且處理的運(yùn)行隊(duì)列為空螃宙;
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin() // 自旋蛮瞄,該指令只會占用 CPU 并消耗 CPU 時間。
            iter++
            old = m.state
            continue
        }
        ......

自旋是一種多線程同步機(jī)制谆扎,當(dāng)前的進(jìn)程在進(jìn)入自旋的過程中會一直保持 CPU 的占用挂捅,持續(xù)檢查某個條件是否為真。在多核的 CPU 上堂湖,自旋可以避免 Goroutine 的切換闲先,使用恰當(dāng)會對性能帶來很大的增益,但是使用的不恰當(dāng)就會拖慢整個程序无蜂。

一旦當(dāng)前 Goroutine 能夠進(jìn)入自旋就會調(diào)用sync.runtime_doSpinruntime.procyield 并執(zhí)行 30 次的 PAUSE 指令伺糠,該指令只會占用 CPU 并消耗 CPU 時間。
處理了自旋相關(guān)的特殊邏輯之后斥季,互斥鎖會根據(jù)上下文計算當(dāng)前互斥鎖最新的狀態(tài)训桶。幾個不同的條件分別會更新 state 字段中存儲的不同信息 — mutexLockedmutexStarving酣倾、mutexWokenmutexWaiterShift舵揭。計算了新的互斥鎖狀態(tài)之后,就會使用 CAS 函數(shù)更新狀態(tài)躁锡。
如果我們沒有通過 CAS 獲得鎖午绳,會調(diào)用 sync.runtime_SemacquireMutex 使用信號量(關(guān)于信號量實(shí)現(xiàn)進(jìn)程/線程之間通信原理可以自行搜索,此處不深入講解)保證資源不會被兩個 Goroutine 獲取稚铣。sync.runtime_SemacquireMutex 會在方法中不斷調(diào)用嘗試獲取鎖并休眠當(dāng)前 Goroutine 等待信號量的釋放箱叁,一旦當(dāng)前 Goroutine 可以獲取信號量,它就會立刻返回惕医,sync.Mutex.Lock 方法的剩余代碼也會繼續(xù)執(zhí)行耕漱。

  • 在正常模式下,這段代碼會設(shè)置喚醒和饑餓標(biāo)記抬伺、重置迭代次數(shù)并重新執(zhí)行獲取鎖的循環(huán)螟够;
  • 在饑餓模式下,當(dāng)前 Goroutine 會獲得互斥鎖峡钓,如果等待隊(duì)列中只存在當(dāng)前 Goroutine妓笙,互斥鎖還會從饑餓模式中退出;

互斥鎖的解鎖過程 sync.Mutex.Unlock 與加鎖過程相比就很簡單能岩,該過程會先使用 AddInt32 函數(shù)快速解鎖寞宫,這時會發(fā)生下面的兩種情況:

  • 如果該函數(shù)返回的新狀態(tài)等于 0,當(dāng)前 Goroutine 就成功解鎖了互斥鎖拉鹃;
  • 如果該函數(shù)返回的新狀態(tài)不等于 0辈赋,這段代碼會調(diào)用 sync.Mutex.unlockSlow 方法開始慢速解鎖:
func (m *Mutex) Unlock() {
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

sync.Mutex.unlockSlow 方法首先會校驗(yàn)鎖狀態(tài)的合法性 — 如果當(dāng)前互斥鎖已經(jīng)被解鎖過了就會直接拋出異常 sync: unlock of unlocked mutex 中止當(dāng)前程序。

在正常情況下會根據(jù)當(dāng)前互斥鎖的狀態(tài)膏燕,分別處理正常模式和饑餓模式下的互斥鎖:

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 { // 正常模式
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else { // 饑餓模式
        runtime_Semrelease(&m.sema, true, 1)
    }
}

  • 在正常模式下钥屈,這段代碼會分別處理以下兩種情況處理;
    • 如果互斥鎖不存在等待者或者互斥鎖的 mutexLocked坝辫、mutexStarving篷就、mutexWoken 狀態(tài)不都為 0,那么當(dāng)前方法就可以直接返回近忙,不需要喚醒其他等待者竭业;
    • 如果互斥鎖存在等待者,會通過 sync.runtime_Semrelease 喚醒等待者并移交鎖的所有權(quán)银锻;
  • 在饑餓模式下永品,上述代碼會直接調(diào)用 sync.runtime_Semrelease 方法將當(dāng)前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖击纬,在這時互斥鎖還不會退出饑餓狀態(tài)鼎姐;

小結(jié)
我們已經(jīng)從多個方面分析了互斥鎖 sync.Mutex 的實(shí)現(xiàn)原理,在這里我們從加鎖和解鎖兩個方面總結(jié)一下結(jié)論和注意事項(xiàng)更振。
互斥鎖的加鎖過程比較復(fù)雜炕桨,它涉及自旋、信號量以及調(diào)度等概念:

  • 如果互斥鎖處于初始化狀態(tài)肯腕,就會直接通過置位 mutexLocked 加鎖献宫;
  • 如果互斥鎖處于 mutexLocked 并且在普通模式下工作,就會進(jìn)入自旋实撒,執(zhí)行 30 次 PAUSE 指令消耗 CPU 時間等待鎖的釋放姊途;
  • 如果當(dāng)前 Goroutine 等待鎖的時間超過了 1ms涉瘾,互斥鎖就會切換到饑餓模式;
  • 互斥鎖在正常情況下會通過 sync.runtime_SemacquireMutex 函數(shù)將嘗試獲取鎖的 Goroutine 切換至休眠狀態(tài)捷兰,等待鎖的持有者喚醒當(dāng)前 Goroutine立叛;
  • 如果當(dāng)前 Goroutine 是互斥鎖上的最后一個等待的協(xié)程或者等待的時間小于 1ms,當(dāng)前 Goroutine 會將互斥鎖切換回正常模式贡茅;

互斥鎖的解鎖過程與之相比就比較簡單秘蛇,其代碼行數(shù)不多、邏輯清晰顶考,也比較容易理解:

  • 當(dāng)互斥鎖已經(jīng)被解鎖時赁还,那么調(diào)用 sync.Mutex.Unlock 會直接拋出異常;
  • 當(dāng)互斥鎖處于饑餓模式時驹沿,會直接將鎖的所有權(quán)交給隊(duì)列中的下一個等待者艘策,等待者會負(fù)責(zé)設(shè)置 mutexLocked 標(biāo)志位;
  • 當(dāng)互斥鎖處于普通模式時渊季,如果沒有 Goroutine 等待鎖的釋放或者已經(jīng)有被喚醒的 Goroutine 獲得了鎖柬焕,就會直接返回;在其他情況下會通過 sync.runtime_Semrelease 喚醒對應(yīng)的 Goroutine梭域;

RWMutex

讀寫互斥鎖 sync.RWMutex 是細(xì)粒度的互斥鎖斑举,它不限制資源的并發(fā)讀,但是讀寫病涨、寫寫操作無法并行執(zhí)行富玷。

結(jié)構(gòu)體
sync.RWMutex 中總共包含以下 5 個字段:

type RWMutex struct {
    w           Mutex
    writerSem   uint32
    readerSem   uint32
    readerCount int32
    readerWait  int32
}

  • w — 復(fù)用互斥鎖提供的能力;
  • writerSemreaderSem — 分別用于寫等待讀和讀等待寫:
  • readerCount 存儲了當(dāng)前正在執(zhí)行的讀操作的數(shù)量既穆;
  • readerWait 表示當(dāng)寫操作被阻塞時等待的讀操作個數(shù)赎懦;

我們會依次分析獲取寫鎖和讀鎖的實(shí)現(xiàn)原理,其中:

寫鎖
當(dāng)資源的使用者想要獲取寫鎖時,需要調(diào)用 sync.RWMutex.Lock 方法:

func (rw *RWMutex) Lock() {
    rw.w.Lock()
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}
  1. 調(diào)用結(jié)構(gòu)體持有的 sync.Mutexsync.Mutex.Lock 方法阻塞后續(xù)的寫操作囊颅;
    • 因?yàn)榛コ怄i已經(jīng)被獲取当悔,其他 Goroutine 在獲取寫鎖時就會進(jìn)入自旋或者休眠;
  2. 調(diào)用 atomic.AddInt32 方法阻塞后續(xù)的讀操作:
  3. 如果仍然有其他 Goroutine 持有互斥鎖的讀鎖(r != 0)踢代,該 Goroutine 會調(diào)用 sync.runtime_SemacquireMutex 進(jìn)入休眠狀態(tài)等待所有讀鎖所有者執(zhí)行結(jié)束后釋放 writerSem 信號量將當(dāng)前協(xié)程喚醒盲憎。

寫鎖的釋放會調(diào)用 sync.RWMutex.Unlock 方法:

func (rw *RWMutex) Unlock() {
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        throw("sync: Unlock of unlocked RWMutex")
    }
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    rw.w.Unlock()
}

與加鎖的過程正好相反,寫鎖的釋放分以下幾個執(zhí)行:

  1. 調(diào)用 atomic.AddInt32 函數(shù)將變回正數(shù)胳挎,釋放讀鎖饼疙;
  2. 通過 for 循環(huán)觸發(fā)所有由于獲取讀鎖而陷入等待的 Goroutine:
  3. 調(diào)用 sync.Mutex.Unlock 方法釋放寫鎖;

獲取寫鎖時會先阻塞寫鎖的獲取慕爬,后阻塞讀鎖的獲取窑眯,這種策略能夠保證讀操作不會被連續(xù)的寫操作『餓死』屏积。

讀鎖
讀鎖的加鎖方法 sync.RWMutex.RLock 很簡單,該方法會通過 atomic.AddInt32readerCount 加一:

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

  1. 如果該方法返回負(fù)數(shù) — 其他 Goroutine 獲得了寫鎖磅甩,當(dāng)前 Goroutine 就會調(diào)用 sync.runtime_SemacquireMutex 陷入休眠等待鎖的釋放肾请;
  2. 如果該方法的結(jié)果為非負(fù)數(shù) — 沒有 Goroutine 獲得寫鎖,當(dāng)前方法就會成功返回更胖;

當(dāng) Goroutine 想要釋放讀鎖時,會調(diào)用如下所示的 sync.RWMutex.RUnlock 方法:

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        rw.rUnlockSlow(r)
    }
}

該方法會先減少正在讀資源的 readerCount 整數(shù)隔显,根據(jù) atomic.AddInt32 的返回值不同會分別進(jìn)行處理:

  • 如果返回值大于等于零 — 讀鎖直接解鎖成功却妨;
  • 如果返回值小于零 — 有一個正在執(zhí)行的寫操作,在這時會調(diào)用sync.RWMutex.rUnlockSlow 方法括眠;
func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        throw("sync: RUnlock of unlocked RWMutex")
    }
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

sync.RWMutex.rUnlockSlow 會減少獲取鎖的寫操作等待的讀操作數(shù) readerWait 并在所有讀操作都被釋放之后觸發(fā)寫操作的信號量 writerSem彪标,該信號量被觸發(fā)時晌块,調(diào)度器就會喚醒嘗試獲取寫鎖的 Goroutine憎蛤。

小結(jié)
讀寫互斥鎖 sync.RWMutex 雖然提供的功能非常復(fù)雜,不過因?yàn)樗⒃?sync.Mutex 上王悍,所以整體的實(shí)現(xiàn)上會簡單很多当船。我們總結(jié)一下讀鎖和寫鎖的關(guān)系:

  • 調(diào)用 sync.RWMutex.Lock 嘗試獲取寫鎖時题画;
    • 每次 sync.RWMutex.RUnlock 都會將 readerWait 其減一,當(dāng)它歸零時該 Goroutine 就會獲得寫鎖德频;
    • readerCount 減少 rwmutexMaxReaders 個數(shù)以阻塞后續(xù)的讀操作苍息;
  • 調(diào)用 sync.RWMutex.Unlock 釋放寫鎖時,會先通知所有的讀操作壹置,然后才會釋放持有的互斥鎖竞思;

讀寫互斥鎖在互斥鎖之上提供了額外的更細(xì)粒度的控制,能夠在讀操作遠(yuǎn)遠(yuǎn)多于寫操作時提升性能钞护。

WaitGroup

sync.WaitGroup 可以等待一組 Goroutine 的返回盖喷,一個比較常見的使用場景是批量發(fā)出 RPC 或者 HTTP 請求:

requests := []*Request{...}
wg := &sync.WaitGroup{}
wg.Add(len(requests))

for _, request := range requests {
    go func(r *Request) {
        defer wg.Done()
        // res, err := service.call(r)
    }(request)
}
wg.Wait()

我們可以通過 sync.WaitGroup 將原本順序執(zhí)行的代碼在多個 Goroutine 中并發(fā)執(zhí)行,加快程序處理的速度难咕。

結(jié)構(gòu)體
sync.WaitGroup 結(jié)構(gòu)體中的成員變量非常簡單课梳,其中只包含兩個成員變量:

type WaitGroup struct {
    noCopy noCopy
    state1 [3]uint32
}

  • noCopy — 保證 sync.WaitGroup 不會被開發(fā)者通過再賦值的方式拷貝;
  • state1 — 存儲著狀態(tài)和信號量余佃;

sync.noCopy 是一個特殊的私有結(jié)構(gòu)體惦界,tools/go/analysis/passes/copylock 包中的分析器會在編譯期間檢查被拷貝的變量中是否包含 sync.noCopy 結(jié)構(gòu)體,如果包含該結(jié)構(gòu)體就會在運(yùn)行時報錯咙冗。
sync.WaitGroup` 結(jié)構(gòu)體中還包含一個總共占用 12 字節(jié)(8 字節(jié)用戶存儲狀態(tài)沾歪,高32位及state1[1]代表counter,低32位state1[0]是等待gorutine數(shù)雾消,另外32位state1[2]是信號量)的數(shù)組灾搏,這個數(shù)組會存儲當(dāng)前結(jié)構(gòu)體的狀態(tài)挫望。

sync.WaitGroup 提供的私有方法 sync.WaitGroup.state 能夠幫我們從 state1 字段中取出它的狀態(tài)和信號量。

接口
sync.WaitGroup 對外暴露了三個方法 — sync.WaitGroup.Add狂窑、sync.WaitGroup.Waitsync.WaitGroup.Done媳板。

因?yàn)槠渲械?sync.WaitGroup.Done 只是向 sync.WaitGroup.Add 方法傳入了 -1,所以我們重點(diǎn)分析另外兩個方法 sync.WaitGroup.Addsync.WaitGroup.Wait

func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if v > 0 || w == 0 {
        return
    }
    *statep = 0
    for ; w != 0; w-- {
        runtime_Semrelease(semap, false, 0)
    }
}

sync.WaitGroup.Add 方法可以更新 sync.WaitGroup 中的計數(shù)器 counter泉哈。雖然 sync.WaitGroup.Add 方法傳入的參數(shù)可以為負(fù)數(shù)蛉幸,但是計數(shù)器只能是非負(fù)數(shù),一旦出現(xiàn)負(fù)數(shù)就會發(fā)生程序崩潰丛晦。當(dāng)調(diào)用計數(shù)器歸零奕纫,也就是所有任務(wù)都執(zhí)行完成時,就會通過 sync.runtime_Semrelease 喚醒處于等待狀態(tài)的所有 Goroutine烫沙。

sync.WaitGroup 的另一個方法 sync.WaitGroup.Wait 會在計數(shù)器大于 0 并且不存在等待的 Goroutine 時匹层,調(diào)用 sync.runtime_Semacquire 陷入睡眠狀態(tài)。

func (wg *WaitGroup) Wait() {
    statep, semap := wg.state()
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        if v == 0 {
            return
        }
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(semap)
            if +statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

當(dāng) sync.WaitGroup 的計數(shù)器歸零時锌蓄,當(dāng)陷入睡眠狀態(tài)的 Goroutine 就被喚醒升筏,上述方法會立刻返回。

小結(jié)
通過對 sync.WaitGroup 的分析和研究瘸爽,我們能夠得出以下結(jié)論:

Once

Go 語言標(biāo)準(zhǔn)庫中 sync.Once 可以保證在 Go 程序運(yùn)行期間的某段代碼只會執(zhí)行一次昼捍。在運(yùn)行如下所示的代碼時识虚,我們會看到如下所示的運(yùn)行結(jié)果:

func main() {
    o := &sync.Once{}
    for i := 0; i < 10; i++ {
        o.Do(func() {
            fmt.Println("only once")
        })
    }
}

$ go run main.go
only once

結(jié)構(gòu)體
每一個 sync.Once 結(jié)構(gòu)體中都只包含一個用于標(biāo)識代碼塊是否執(zhí)行過的 done 以及一個互斥鎖 sync.Mutex

type Once struct {
    done uint32
    m    Mutex
}

接口
sync.Once.Dosync.Once 結(jié)構(gòu)體對外唯一暴露的方法,該方法會接收一個入?yún)榭盏暮瘮?shù):

  • 如果傳入的函數(shù)已經(jīng)執(zhí)行過妒茬,就會直接返回担锤;
  • 如果傳入的函數(shù)沒有執(zhí)行過,就會調(diào)用 sync.Once.doSlow 執(zhí)行傳入的函數(shù):
func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 0 {
        o.doSlow(f)
    }
}

func (o *Once) doSlow(f func()) {
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

  1. 為當(dāng)前 Goroutine 獲取互斥鎖乍钻;
  2. 執(zhí)行傳入的無入?yún)⒑瘮?shù)肛循;
  3. 運(yùn)行延遲函數(shù)調(diào)用,將成員變量 done 更新成 1银择;

sync.Once 就會通過成員變量 done 確保函數(shù)不會執(zhí)行第二次多糠。

小結(jié)

作為用于保證函數(shù)執(zhí)行次數(shù)的 sync.Once 結(jié)構(gòu)體,它使用互斥鎖和 sync/atomic 包提供的方法實(shí)現(xiàn)了某個函數(shù)在程序運(yùn)行期間只能執(zhí)行一次的語義浩考。在使用該結(jié)構(gòu)體時夹孔,我們也需要注意以下的問題:

  • sync.Once.Do 方法中傳入的函數(shù)只會被執(zhí)行一次,哪怕函數(shù)中發(fā)生了 panic
  • 兩次調(diào)用 sync.Once.Do 方法傳入不同的函數(shù)也只會執(zhí)行第一次調(diào)用的函數(shù)搭伤;

Cond

Go 語言標(biāo)準(zhǔn)庫中的 sync.Cond 一個條件變量只怎,它可以讓一系列的 Goroutine 都在滿足特定條件時被喚醒。每一個 sync.Cond 結(jié)構(gòu)體在初始化時都需要傳入一個互斥鎖怜俐,我們可以通過下面的例子了解它的使用方法:

func main() {
    c := sync.NewCond(&sync.Mutex{})
    for i := 0; i < 10; i++ {
        go listen(c)
    }
    time.Sleep(1*time.Second)
    go broadcast(c)

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

func broadcast(c *sync.Cond) {
    c.L.Lock()
    c.Broadcast() //喚起c等待gorutine隊(duì)列中所有的gorutine
    c.L.Unlock()
}

func listen(c *sync.Cond) {
    c.L.Lock()
    c.Wait() //阻塞等待信號
    fmt.Println("listen")
    c.L.Unlock()
}

$ go run main.go
listen
...
listen

上述代碼同時運(yùn)行了 11 個 Goroutine身堡,這 11 個 Goroutine 分別做了不同事情:

  • 10 個 Goroutine 通過 sync.Cond.Wait 等待特定條件的滿足;
  • 1 個 Goroutine 會調(diào)用 sync.Cond.Broadcast 方法通知所有陷入等待的 Goroutine拍鲤;

結(jié)構(gòu)體
sync.Cond 的結(jié)構(gòu)體中包含以下 4 個字段:

type Cond struct {
    noCopy  noCopy //用于保證結(jié)構(gòu)體不會在編譯期間拷貝
    L       Locker //用于保護(hù)內(nèi)部的 `notify` 字段贴谎,`Locker` 接口類型的變量
    notify  notifyList // 一個 Goroutine 的鏈表,它是實(shí)現(xiàn)同步機(jī)制的核心結(jié)構(gòu)
    checker copyChecker //用于禁止運(yùn)行期間發(fā)生的拷貝
}

type notifyList struct {
    wait uint32 //表示當(dāng)前正在等待的 Goroutine
    notify uint32 //表示當(dāng)前已經(jīng)通知到的 Goroutine

    lock mutex
    head *sudog //指向的鏈表的頭
    tail *sudog //指向的鏈表的尾
}

接口
sync.Cond 對外暴露的 sync.Cond.Wait 方法會將當(dāng)前 Goroutine 陷入休眠狀態(tài)季稳,它的執(zhí)行過程如下:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify) // runtime.notifyListAdd 的鏈接名擅这。將等待計數(shù)器加一并解鎖
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t) // runtime.notifyListWait 的鏈接名。 等待其他 Goroutine 的喚醒并加鎖
    c.L.Lock()
}

func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

runtime.notifyListWait 函數(shù)會獲取當(dāng)前 Goroutine 并將它追加到 Goroutine 通知鏈表的最末端:


func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

除了將當(dāng)前 Goroutine 追加到鏈表的末端之外绞幌,我們還會調(diào)用 runtime.goparkunlock 將當(dāng)前 Goroutine 陷入休眠狀態(tài),該函數(shù)也是在 Go 語言切換 Goroutine 時經(jīng)常會使用的方法一忱,它會直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒莲蜘。

sync.Cond.Signalsync.Cond.Broadcast 方法就是用來喚醒調(diào)用 sync.Cond.Wait 陷入休眠的 Goroutine,它們兩個的實(shí)現(xiàn)有一些細(xì)微差別:

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

runtime.notifyListNotifyOne 函數(shù)只會從 sync.notifyList 鏈表中找到滿足 sudog.ticket == l.notify 條件的 Goroutine 并通過 readyWithTime 喚醒:

func notifyListNotifyOne(l *notifyList) {
    t := l.notify
    atomic.Store(&l.notify, t+1)

    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
}

runtime.notifyListNotifyAll 會依次通過 runtime.readyWithTime 函數(shù)喚醒鏈表中 Goroutine:

func notifyListNotifyAll(l *notifyList) {
    s := l.head
    l.head = nil
    l.tail = nil

    atomic.Store(&l.notify, atomic.Load(&l.wait))

    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Goroutine 的喚醒順序也是按照加入隊(duì)列的先后順序,先加入的會先被喚醒芬迄,而后加入的 Goroutine 需要等待調(diào)度器的調(diào)度问顷。

在一般情況下,我們都會先調(diào)用 sync.Cond.Wait 陷入休眠等待滿足期望條件禀梳,當(dāng)滿足喚醒條件時杜窄,就可以選擇使用 sync.Cond.Signal 或者 sync.Cond.Broadcast 喚醒一個或者全部的 Goroutine。

小結(jié)
sync.Cond 不是一個常用的同步機(jī)制算途,在遇到長時間條件無法滿足時塞耕,與使用 for {} 進(jìn)行忙碌等待相比,sync.Cond 能夠讓出處理器的使用權(quán)嘴瓤。在使用的過程中我們需要注意以下問題:

  • sync.Cond.Wait 方法在調(diào)用之前一定要使用獲取互斥鎖扫外,否則會觸發(fā)程序崩潰;
  • sync.Cond.Signal 方法喚醒的 Goroutine 都是隊(duì)列最前面廓脆、等待最久的 Goroutine筛谚;
  • sync.Cond.Broadcast 會按照一定順序廣播通知等待的全部 Goroutine;

擴(kuò)展原語

除了標(biāo)準(zhǔn)庫中提供的同步原語之外停忿,Go 語言還在子倉庫 sync 中提供了四種擴(kuò)展原語驾讲,x/sync/errgroup.Groupx/sync/semaphore.Weightedx/sync/singleflight.Groupx/sync/syncmap.Map蝎毡,其中的 x/sync/syncmap.Map 在 1.9 版本中被移植到了標(biāo)準(zhǔn)庫中厚柳。

下面簡單介紹 Go 語言在擴(kuò)展包中提供的三種同步原語,也就是 x/sync/errgroup.Group沐兵、x/sync/semaphore.Weightedx/sync/singleflight.Group别垮。

ErrGroup

x/sync/errgroup.Group 就為我們在一組 Goroutine 中提供了同步、錯誤傳播以及上下文取消的功能扎谎。
x/sync/errgroup.Group.Go 方法能夠創(chuàng)建一個 Goroutine 并在其中執(zhí)行傳入的函數(shù)碳想,而 x/sync/errgroup.Group.Wait 會等待所有 Goroutine 全部返回,該方法的不同返回結(jié)果也有不同的含義:

  • 如果返回錯誤 — 這一組 Goroutine 最少返回一個錯誤毁靶;
  • 如果返回空值 — 所有 Goroutine 都成功執(zhí)行胧奔;

結(jié)構(gòu)體
x/sync/errgroup.Group 結(jié)構(gòu)體組成如下:

type Group struct {
    cancel func() //創(chuàng)建context.Context時返回的取消函數(shù),用于在多個 Goroutine 之間同步取消信號预吆;
    wg sync.WaitGroup //用于等待一組 Goroutine 完成子任務(wù)的同步原語
    errOnce sync.Once //用于保證只接收一個子任務(wù)返回的錯誤
    err     error
}

這些字段共同組成了 x/sync/errgroup.Group 結(jié)構(gòu)體并為我們提供同步龙填、錯誤傳播以及上下文取消等功能。
x/sync/errgroup.Group 的實(shí)現(xiàn)沒有涉及底層和運(yùn)行時包中的 API拐叉,它只是對基本同步語義進(jìn)行了封裝以提供更加復(fù)雜的功能岩遗。在使用時,我們也需要注意以下的幾個問題:

  • x/sync/errgroup.Group 在出現(xiàn)錯誤或者等待結(jié)束后都會調(diào)用 context.Contextcancel 方法同步取消信號凤瘦;
  • 只有第一個出現(xiàn)的錯誤才會被返回宿礁,剩余的錯誤都會被直接拋棄;

Semaphore

信號量是在并發(fā)編程中常見的一種同步機(jī)制蔬芥,在需要控制訪問資源的進(jìn)程數(shù)量時就會用到信號量梆靖,它會保證持有的計數(shù)器在 0 到初始化的權(quán)重之間波動。

  • 每次獲取資源時都會將信號量中的計數(shù)器減去對應(yīng)的數(shù)值笔诵,在釋放時重新加回來返吻;
  • 當(dāng)遇到計數(shù)器大于信號量大小時就會進(jìn)入休眠等待其他線程釋放信號;

Go 語言的擴(kuò)展包中就提供了帶權(quán)重的信號量 x/sync/semaphore.Weighted乎婿,我們可以按照不同的權(quán)重對資源的訪問進(jìn)行管理思喊,這個結(jié)構(gòu)體對外也只暴露了四個方法:

結(jié)構(gòu)體
x/sync/semaphore.NewWeighted 方法能根據(jù)傳入的信號量最大權(quán)重創(chuàng)建一個 x/sync/semaphore.Weighted 結(jié)構(gòu)體指針:

func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

x/sync/semaphore.Weighted 結(jié)構(gòu)體中包含一個 waiters 列表纲辽,其中存儲著等待獲取資源的 Goroutine,除此之外它還包含當(dāng)前信號量的上限以及一個計數(shù)器 cur,這個計數(shù)器的范圍就是 [0, size]:

信號量中的計數(shù)器會隨著用戶對資源的訪問和釋放進(jìn)行改變拖吼,引入的權(quán)重概念能夠提供更細(xì)粒度的資源的訪問控制鳞上,盡可能滿足常見的用例。

帶權(quán)重的信號量確實(shí)有著更多的應(yīng)用場景吊档,這也是 Go 語言對外提供的唯一一種信號量實(shí)現(xiàn)篙议,在使用的過程中我們需要注意以下的幾個問題:

SingleFlight

x/sync/singleflight.Group 是 Go 語言擴(kuò)展包中提供了另一種同步原語,它能夠在一個服務(wù)中抑制對下游的多次重復(fù)請求葡秒。一個比較常見的使用場景是 — 我們在使用 Redis 對數(shù)據(jù)庫中的數(shù)據(jù)進(jìn)行緩存姻乓,發(fā)生緩存擊穿時,大量的流量都會打到數(shù)據(jù)庫上進(jìn)而影響服務(wù)的尾延時眯牧。
x/sync/singleflight.Group 能有效地解決這個問題蹋岩,它能夠限制對同一個 Key 的多次重復(fù)請求,減少對下游的瞬時流量炸站。
在資源的獲取非常昂貴時(例如:訪問緩存星澳、數(shù)據(jù)庫)疚顷,就很適合使用 x/sync/singleflight.Group 對服務(wù)進(jìn)行優(yōu)化旱易。我們來了解一下它的使用方法:

type service struct {
    requestGroup singleflight.Group
}

func (s *service) handleRequest(ctx context.Context, request Request) (Response, error) {
    v, err, _ := requestGroup.Do(request.Hash(), func() (interface{}, error) {
        rows, err := // select * from tables
        if err != nil {
            return nil, err
        }
        return rows, nil
    })
    if err != nil {
        return nil, err
    }
    return Response{
        rows: rows,
    }, nil
}

因?yàn)檎埱蟮墓T跇I(yè)務(wù)上一般表示相同的請求,所以上述代碼使用它作為請求的鍵腿堤。當(dāng)然阀坏,我們也可以選擇其他的唯一字段作為 x/sync/singleflight.Group.Do 方法的第一個參數(shù)減少重復(fù)的請求。
結(jié)構(gòu)體
x/sync/singleflight.Group 結(jié)構(gòu)體由一個互斥鎖sync.Mutex 和一個映射表組成笆檀,每一個 x/sync/singleflight.call 結(jié)構(gòu)體都保存了當(dāng)前調(diào)用對應(yīng)的信息:

type Group struct {
    mu sync.Mutex
    m  map[string]*call
}

type call struct {
    wg sync.WaitGroup

    val interface{}
    err error

    dups  int
    chans []chan<- Result
}

x/sync/singleflight.call 結(jié)構(gòu)體中的 valerr 字段都只會在執(zhí)行傳入的函數(shù)時賦值一次并在 sync.WaitGroup.Wait 返回時被讀燃商谩;dupschans 兩個字段分別存儲了抑制的請求數(shù)量以及用于同步結(jié)果的 Channel酗洒。

當(dāng)我們需要減少對下游的相同請求時士修,就可以使用 x/sync/singleflight.Group 來增加吞吐量和服務(wù)質(zhì)量,不過在使用的過程中我們也需要注意以下的幾個問題:

Channel

下面介紹管道 Channel 的設(shè)計原理、數(shù)據(jù)結(jié)構(gòu)和常見操作雹锣,例如 Channel 的創(chuàng)建网沾、發(fā)送、接收和關(guān)閉蕊爵。作為 Go 核心的數(shù)據(jù)結(jié)構(gòu)和 Goroutine 之間的通信方式辉哥,Channel 是支撐 Go 語言高性能并發(fā)編程模型的重要結(jié)構(gòu),我們首先需要了解 Channel 背后的設(shè)計原理以及它的底層數(shù)據(jù)結(jié)構(gòu)在辆。

設(shè)計原理

Go 語言中最常見的证薇、也是經(jīng)常被人提及的設(shè)計模式就是 — 不要通過共享內(nèi)存的方式進(jìn)行通信,而是應(yīng)該通過通信的方式共享內(nèi)存匆篓。在很多主流的編程語言中浑度,多個線程傳遞數(shù)據(jù)的方式一般都是共享內(nèi)存,為了解決線程沖突的問題鸦概,我們需要限制同一時間能夠讀寫這些變量的線程數(shù)量箩张,這與 Go 語言鼓勵的方式并不相同。
雖然我們在 Go 語言中也能使用共享內(nèi)存加互斥鎖進(jìn)行通信窗市,但是 Go 語言提供了一種不同的并發(fā)模型先慷,也就是通信順序進(jìn)程(Communicating sequential processes,CSP)咨察。兩個 Goroutine论熙,一個會向 Channel 中發(fā)送數(shù)據(jù),另一個會從 Channel 中接收數(shù)據(jù)摄狱。Channel 分為同步Channel和異步Channel:

  • 同步 Channel — 不需要緩沖區(qū)脓诡,發(fā)送方會直接將數(shù)據(jù)交給(Handoff)接收方;
  • 異步 Channel — 基于環(huán)形緩存的傳統(tǒng)生產(chǎn)者消費(fèi)者模型媒役;

數(shù)據(jù)結(jié)構(gòu)

Go 語言的 Channel 在運(yùn)行時使用 runtime.hchan 結(jié)構(gòu)體表示祝谚。我們在 Go 語言中創(chuàng)建新的 Channel 時,實(shí)際上創(chuàng)建的都是如下所示的結(jié)構(gòu)體:

type hchan struct {
    qcount   uint //Channel 中的元素個數(shù)
    dataqsiz uint //Channel 中的循環(huán)隊(duì)列的長度
    buf      unsafe.Pointer //Channel 的緩沖區(qū)數(shù)據(jù)指針
    elemsize uint16 //當(dāng)前 Channel 能夠收發(fā)的元素大小
    closed   uint32
    elemtype *_type //當(dāng)前 Channel 能夠收發(fā)的元素類型 
    sendx    uint  //Channel 的發(fā)送操作處理到的位置
    recvx    uint //Channel 的接收操作處理到的位置
    recvq    waitq // 當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的接收 Goroutine 列表
    sendq    waitq //當(dāng)前 Channel 由于緩沖區(qū)空間不足而阻塞的發(fā)送 Goroutine 列表

    lock mutex
}
等待隊(duì)列使用雙向鏈表 `runtime.waitq`
type waitq struct {
    first *sudog
    last  *sudog
}

runtime.sudog 表示一個在等待列表中的 Goroutine酣衷,該結(jié)構(gòu)體中存儲了阻塞的相關(guān)信息以及兩個分別指向前后 runtime.sudog 的指針交惯。

創(chuàng)建管道

Go 語言中所有 Channel 的創(chuàng)建都會使用 make 關(guān)鍵字。編譯器會將 make(chan int, 10) 表達(dá)式被轉(zhuǎn)換成 OMAKE 類型的節(jié)點(diǎn)穿仪,并在類型檢查階段OMAKE 類型的節(jié)點(diǎn)轉(zhuǎn)換成 OMAKECHAN 類型:

func typecheck1(n *Node, top int) (res *Node) {
    switch n.Op {
    case OMAKE:
        ...
        switch t.Etype {
        case TCHAN:
            l = nil
            if i < len(args) { // 帶緩沖區(qū)的異步 Channel
                ...
                n.Left = l
            } else { // 不帶緩沖區(qū)的同步 Channel
                n.Left = nodintconst(0)
            }
            n.Op = OMAKECHAN
        }
    }
}

這一階段會對傳入 make 關(guān)鍵字的緩沖區(qū)大小進(jìn)行檢查席爽,如果我們不向 make 傳遞表示緩沖區(qū)大小的參數(shù),那么就會設(shè)置一個默認(rèn)值 0啊片,也就是當(dāng)前的 Channel 不存在緩沖區(qū)只锻。

OMAKECHAN 類型的節(jié)點(diǎn)最終都會在 SSA 中間代碼生成階段之前被轉(zhuǎn)換成調(diào)用 runtime.makechan 或者 runtime.makechan64 的函數(shù):

func walkexpr(n *Node, init *Nodes) *Node {
    switch n.Op {
    case OMAKECHAN:
        size := n.Left
        fnname := "makechan64"
        argtype := types.Types[TINT64]

        if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
            fnname = "makechan"
            argtype = types.Types[TINT]
        }
        n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
    }
}

runtime.makechanruntime.makechan64 會根據(jù)傳入的參數(shù)類型和緩沖區(qū)大小創(chuàng)建一個新的 Channel 結(jié)構(gòu),其中后者用于處理緩沖區(qū)大小大于 2 的 32 次方的情況钠龙,我們重點(diǎn)關(guān)注 runtime.makechan 函數(shù):

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    mem, _ := math.MulUintptr(elem.size, uintptr(size))

    var c *hchan
    switch {
    case mem == 0: //當(dāng)前 Channel 中不存在緩沖區(qū)炬藤,那么就只會為 runtime.hchan分配一段內(nèi)存空間御铃;
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0: //Channel 中存儲的類型不是指針類型,就會為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間沈矿;
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default: //  在默認(rèn)情況下會單獨(dú)為runtime.hchan和緩沖區(qū)分配內(nèi)存
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

在函數(shù)的最后會統(tǒng)一更新 runtime.hchanelemsize上真、elemtypedataqsiz 幾個字段。

發(fā)送數(shù)據(jù)

當(dāng)我們想要向 Channel 發(fā)送數(shù)據(jù)時羹膳,就需要使用 ch <- i 語句睡互,編譯器會將它解析成 OSEND 節(jié)點(diǎn)并在 cmd/compile/internal/gc.walkexpr 函數(shù)中轉(zhuǎn)換成 runtime.chansend1

func walkexpr(n *Node, init *Nodes) *Node {
    switch n.Op {
    case OSEND:
        n1 := n.Right
        n1 = assignconv(n1, n.Left.Type.Elem(), "chan send")
        n1 = walkexpr(n1, init)
        n1 = nod(OADDR, n1, nil)
        n = mkcall1(chanfn("chansend1", 2, n.Left.Type), nil, init, n.Left, n1)
    }
}

runtime.chansend1 只是調(diào)用了 runtime.chansend 并傳入 Channel 和需要發(fā)送的數(shù)據(jù)。runtime.chansend 是向 Channel 中發(fā)送數(shù)據(jù)時最終會調(diào)用的函數(shù)陵像,這個函數(shù)負(fù)責(zé)了發(fā)送數(shù)據(jù)的全部邏輯就珠,如果我們在調(diào)用時將 block 參數(shù)設(shè)置成 true,那么就表示當(dāng)前發(fā)送操作是一個阻塞操作:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

在發(fā)送數(shù)據(jù)的邏輯執(zhí)行之前會先為當(dāng)前 Channel 加鎖醒颖,防止發(fā)生競爭條件妻怎。如果 Channel 已經(jīng)關(guān)閉,那么向該 Channel 發(fā)送數(shù)據(jù)時就會報"send on closed channel" 錯誤并中止程序泞歉。

因?yàn)?runtime.chansend 函數(shù)的實(shí)現(xiàn)比較復(fù)雜逼侦,所以我們這里將該函數(shù)的執(zhí)行過程分成以下的三個部分:

  • 當(dāng)存在等待的接收者時,通過 runtime.send 直接將數(shù)據(jù)發(fā)送給阻塞的接收者腰耙;
  • 當(dāng)緩沖區(qū)存在空余空間時榛丢,將發(fā)送的數(shù)據(jù)寫入 Channel 的緩沖區(qū);
  • 當(dāng)不存在緩沖區(qū)或者緩沖區(qū)已滿時挺庞,等待其他 Goroutine 從 Channel 接收數(shù)據(jù)晰赞;

直接發(fā)送
如果目標(biāo) Channel 沒有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine色罚,那么 runtime.chansend 函數(shù)會從接收隊(duì)列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù):

    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

發(fā)送數(shù)據(jù)時會調(diào)用 runtime.send呻率,該函數(shù)的執(zhí)行可以分成兩個部分:

  1. 調(diào)用 runtime.sendDirect 函數(shù)將發(fā)送的數(shù)據(jù)直接拷貝到 x = <-c 表達(dá)式中變量 x 所在的內(nèi)存地址上娶牌;
  2. 調(diào)用 runtime.goready 將等待接收數(shù)據(jù)的 Goroutine 標(biāo)記成可運(yùn)行狀態(tài) Grunnable 并把該 Goroutine 放到發(fā)送方所在的處理器的 runnext 上等待執(zhí)行翰铡,該處理器在下一次調(diào)度時就會立刻喚醒數(shù)據(jù)的接收方;
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    goready(gp, skip+1)
}

需要注意的是蚓哩,發(fā)送數(shù)據(jù)的過程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執(zhí)行該 Goroutine。

緩沖區(qū)
如果創(chuàng)建的 Channel 包含緩沖區(qū)并且 Channel 中的數(shù)據(jù)沒有裝滿隘谣,就會執(zhí)行下面這段代碼:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    if c.qcount < c.dataqsiz {
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    ...
}

在這里我們首先會使用 chanbuf 計算出下一個可以存儲數(shù)據(jù)的位置,然后通過 runtime.typedmemmove 將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中并增加 sendx 索引和 qcount 計數(shù)器啄巧。

如果當(dāng)前 Channel 的緩沖區(qū)未滿寻歧,向 Channel 發(fā)送的數(shù)據(jù)會存儲在 Channel 中 sendx 索引所在的位置并將 sendx 索引加一,由于這里的 buf 是一個循環(huán)數(shù)組秩仆,所以當(dāng) sendx 等于 dataqsiz 時就會重新回到數(shù)組開始的位置码泛。

阻塞發(fā)送
當(dāng) Channel 沒有接收者能夠處理數(shù)據(jù)時,向 Channel 發(fā)送數(shù)據(jù)就會被下游阻塞澄耍,當(dāng)然使用 select 關(guān)鍵字可以向 Channel 非阻塞地發(fā)送消息噪珊。向 Channel 阻塞地發(fā)送數(shù)據(jù)會執(zhí)行下面的代碼晌缘,我們可以簡單梳理一下這段代碼的邏輯:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ...
    if !block {
        unlock(&c.lock)
        return false
    }

    gp := getg() //獲取發(fā)送數(shù)據(jù)使用的 Goroutine
    mysg := acquireSudog() //函數(shù)獲取 runtime.sudog結(jié)構(gòu)體并設(shè)置這一次阻塞發(fā)送的相關(guān)信息,例如發(fā)送的 Channel痢站、是否在 Select 控制結(jié)構(gòu)中和待發(fā)送數(shù)據(jù)的內(nèi)存地址等
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg) //將剛剛創(chuàng)建并初始化的 runtime.sudog加入發(fā)送等待隊(duì)列磷箕,并設(shè)置到當(dāng)前 Goroutine 的 `waiting` 上,表示 Goroutine 正在等待該 `sudog` 準(zhǔn)備就緒

    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) //將當(dāng)前的 Goroutine 陷入沉睡等待喚醒
    //被調(diào)度器喚醒后會執(zhí)行一些收尾工作阵难,將一些屬性置零并且釋放 untime.sudog結(jié)構(gòu)體
    gp.waiting = nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true //返回 `true` 表示這向 Channel 發(fā)送數(shù)據(jù)的結(jié)束
}

小結(jié)
我們在這里可以簡單梳理和總結(jié)一下使用 ch <- i 表達(dá)式向 Channel 發(fā)送數(shù)據(jù)時遇到的幾種情況:

  1. 如果當(dāng)前 Channel 的 recvq 上存在已經(jīng)被阻塞的 Goroutine岳枷,那么會直接將數(shù)據(jù)發(fā)送給當(dāng)前的 Goroutine 并將其設(shè)置成下一個運(yùn)行的 Goroutine;
  2. 如果 Channel 存在緩沖區(qū)并且其中還有空閑的容量呜叫,我們就會直接將數(shù)據(jù)直接存儲到當(dāng)前緩沖區(qū) sendx 所在的位置上空繁;
  3. 如果不滿足上面的兩種情況,就會創(chuàng)建一個 runtime.sudog 結(jié)構(gòu)并將其加入 Channel 的 sendq 隊(duì)列中朱庆,當(dāng)前 Goroutine 也會陷入阻塞等待其他的協(xié)程從 Channel 接收數(shù)據(jù)盛泡;

發(fā)送數(shù)據(jù)的過程中包含幾個會觸發(fā) Goroutine 調(diào)度的時機(jī):

  1. 發(fā)送數(shù)據(jù)時發(fā)現(xiàn) Channel 上存在等待接收數(shù)據(jù)的 Goroutine,立刻設(shè)置處理器的 runnext 屬性娱颊,但是并不會立刻觸發(fā)調(diào)度饭于;
  2. 發(fā)送數(shù)據(jù)時并沒有找到接收方并且緩沖區(qū)已經(jīng)滿了,這時就會將自己加入 Channel 的 sendq 隊(duì)列并調(diào)用 runtime.goparkunlock 觸發(fā) Goroutine 的調(diào)度讓出處理器的使用權(quán)维蒙;

接收數(shù)據(jù)

我們接下來繼續(xù)介紹 Channel 操作的另一方 — 數(shù)據(jù)的接收掰吕。Go 語言中可以使用兩種不同的方式去接收 Channel 中的數(shù)據(jù):

i <- ch
i, ok <- ch

這兩種不同的方法經(jīng)過編譯器的處理都會變成 ORECV 類型的節(jié)點(diǎn),后者會在類型檢查階段被轉(zhuǎn)換成 OAS2RECV 類型颅痊。
雖然不同的接收方式會被轉(zhuǎn)換成 runtime.chanrecv1runtime.chanrecv2 兩種不同函數(shù)的調(diào)用殖熟,但是這兩個函數(shù)最終還是會調(diào)用 runtime.chanrecv
當(dāng)我們從一個空 Channel 接收數(shù)據(jù)時會直接調(diào)用 runtime.gopark 直接讓出處理器的使用權(quán)斑响。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

如果當(dāng)前 Channel 已經(jīng)被關(guān)閉并且緩沖區(qū)中不存在任何的數(shù)據(jù)菱属,那么就會清除 ep 指針中的數(shù)據(jù)并立刻返回。

除了上述兩種特殊情況舰罚,使用 runtime.chanrecv 從 Channel 接收數(shù)據(jù)時還包含以下三種不同情況:

  • 當(dāng)存在等待的發(fā)送者時纽门,通過 runtime.recv 直接從阻塞的發(fā)送者或者緩沖區(qū)中獲取數(shù)據(jù);
  • 當(dāng)緩沖區(qū)存在數(shù)據(jù)時营罢,從 Channel 的緩沖區(qū)中接收數(shù)據(jù)赏陵;
  • 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)時,等待其他 Goroutine 向 Channel 發(fā)送數(shù)據(jù)饲漾;

這三種情況和信息發(fā)送是一一對應(yīng)的蝙搔,此處不再贅述。

小結(jié)
我們梳理一下從 Channel 中接收數(shù)據(jù)時可能會發(fā)生的五種情況:

  1. 如果 Channel 為空考传,那么就會直接調(diào)用 runtime.gopark 掛起當(dāng)前 Goroutine吃型;
  2. 如果 Channel 已經(jīng)關(guān)閉并且緩沖區(qū)沒有任何數(shù)據(jù),runtime.chanrecv 函數(shù)會直接返回僚楞;
  3. 如果 Channel 的 sendq 隊(duì)列中存在掛起的 Goroutine勤晚,就會將 recvx 索引所在的數(shù)據(jù)拷貝到接收變量所在的內(nèi)存空間上并將 sendq 隊(duì)列中 Goroutine 的數(shù)據(jù)拷貝到緩沖區(qū)枉层;
  4. 如果 Channel 的緩沖區(qū)中包含數(shù)據(jù)就會直接讀取 recvx 索引對應(yīng)的數(shù)據(jù);
  5. 在默認(rèn)情況下會掛起當(dāng)前的 Goroutine赐写,將 runtime.sudog 結(jié)構(gòu)加入 recvq 隊(duì)列并陷入休眠等待調(diào)度器的喚醒返干;

我們總結(jié)一下從 Channel 接收數(shù)據(jù)時,會觸發(fā) Goroutine 調(diào)度的兩個時機(jī):

  1. 當(dāng) Channel 為空時血淌;
  2. 當(dāng)緩沖區(qū)中不存在數(shù)據(jù)并且也不存在數(shù)據(jù)的發(fā)送者時矩欠;

關(guān)閉管道

編譯器會將用于關(guān)閉管道的 close 關(guān)鍵字轉(zhuǎn)換成 OCLOSE 節(jié)點(diǎn)以及 runtime.closechan 的函數(shù)調(diào)用。
當(dāng) Channel 是一個空指針或者已經(jīng)被關(guān)閉時悠夯,Go 語言運(yùn)行時都會直接 panic 并拋出異常:

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

處理完了這些異常的情況之后就可以開始執(zhí)行關(guān)閉 Channel 的邏輯了癌淮,下面這段代碼的主要工作就是將 recvqsendq 兩個隊(duì)列中的數(shù)據(jù)加入到 Goroutine 列表 gList 中,與此同時該函數(shù)會清除所有 sudog 上未被處理的元素:

    c.closed = 1

    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
    }

    for {
        sg := c.sendq.dequeue()
        ...
    }
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

該函數(shù)在最后會為所有被阻塞的 Goroutine 調(diào)用 runtime.goready 觸發(fā)調(diào)度沦补。

調(diào)度器

每個線程會都占用 1M以上的內(nèi)存空間乳蓄,在對線程進(jìn)行切換時不止會消耗較多的內(nèi)存,恢復(fù)寄存器中的內(nèi)容還需要向操作系統(tǒng)申請或者銷毀對應(yīng)的資源夕膀,每一次線程上下文的切換都需要消耗 ~1us 左右的時間虚倒,但是 Go 調(diào)度器對 Goroutine 的上下文切換約為 ~0.2us,減少了 80% 的額外開銷产舞。
Go 語言的調(diào)度器通過使用與CPU 數(shù)量相等的線程減少線程頻繁切換的內(nèi)存開銷魂奥,同時在每一個線程上執(zhí)行額外開銷更低的 Goroutine 來降低操作系統(tǒng)和硬件的負(fù)載。

設(shè)計原理

基于任務(wù)竊取的 Go 語言調(diào)度器使用了沿用至今的 G-M-P 模型易猫,我們能在 runtime: improved scheduler 提交中找到任務(wù)竊取調(diào)度器剛被實(shí)現(xiàn)時的源代碼耻煤,調(diào)度器的 runtime.schedule 函數(shù)在這個版本的調(diào)度器中反而更簡單了:

static void schedule(void) {
    G *gp;
 top:
    if(runtime·gcwaiting) {
        gcstopm(); //如果當(dāng)前運(yùn)行時在等待垃圾回收
        goto top;
    }
    //從本地或者全局的運(yùn)行隊(duì)列中獲取待執(zhí)行的 Goroutine
    gp = runqget(m->p);
    if(gp == nil)
        gp = findrunnable();

    ...
    //在當(dāng)前線程 M 上運(yùn)行 Goroutine
    execute(gp);
}

當(dāng)前處理器本地的運(yùn)行隊(duì)列中不包含 Goroutine 時,調(diào)用 findrunnable 函數(shù)會觸發(fā)工作竊取准颓,從其它的處理器的隊(duì)列中隨機(jī)獲取一些 Goroutine哈蝇。

運(yùn)行時 G-M-P 模型中引入的處理器 P 是線程和 Goroutine 的中間層,我們從它的結(jié)構(gòu)體中就能看到處理器與 M 和 G 的關(guān)系:

struct P {
    Lock;

    uint32  status;
    P*  link;
    uint32  tick;
    M*  m;
    MCache* mcache;

    G** runq;
    int32   runqhead;
    int32   runqtail;
    int32   runqsize;

    G*  gfree;
    int32   gfreecnt;
};

處理器持有一個由可運(yùn)行的 Goroutine 組成的運(yùn)行隊(duì)列 runq攘已,還反向持有一個線程炮赦。調(diào)度器在調(diào)度時會從處理器的隊(duì)列中選擇隊(duì)列頭的 Goroutine 放到線程 M 上執(zhí)行。
基于工作竊取的多線程調(diào)度器將每一個線程綁定到了獨(dú)立的 CPU 上样勃,這些線程會被不同處理器管理吠勘,不同的處理器通過工作竊取對任務(wù)進(jìn)行再分配實(shí)現(xiàn)任務(wù)的平衡,也能提升調(diào)度器和 Go 語言程序的整體性能彤灶,今天所有的 Go 語言服務(wù)都受益于這一改動看幼。

數(shù)據(jù)結(jié)構(gòu)

調(diào)度器的三個重要組成部分 — 線程 M批旺、Goroutine G 和處理器 P:
G — 表示 Goroutine幌陕,它是一個待執(zhí)行的任務(wù);
M — 表示操作系統(tǒng)的線程汽煮,它由操作系統(tǒng)的調(diào)度器調(diào)度和管理搏熄;
P — 表示處理器棚唆,它可以被看做運(yùn)行在線程上的本地調(diào)度器;
下面詳細(xì)介紹它們的作用心例、數(shù)據(jù)結(jié)構(gòu)以及在運(yùn)行期間可能處于的狀態(tài)宵凌。
G
Gorotuine 就是 Go 語言調(diào)度器中待執(zhí)行的任務(wù),它在運(yùn)行時調(diào)度器中的地位與線程在操作系統(tǒng)中差不多止后,但是它占用了更小的內(nèi)存空間瞎惫,也降低了上下文切換的開銷。
Goroutine 只存在于 Go 語言的運(yùn)行時译株,它是 Go 語言在用戶態(tài)提供的線程瓜喇,作為一種粒度更細(xì)的資源調(diào)度單元,如果使用得當(dāng)能夠在高并發(fā)的場景下更高效地利用機(jī)器的 CPU歉糜。
Goroutine 在 Go 語言運(yùn)行時使用私有結(jié)構(gòu)體 runtime.g 表示乘寒。這個私有結(jié)構(gòu)體非常復(fù)雜,總共包含 40 多個用于表示各種狀態(tài)的成員變量匪补,我們在這里也不會介紹全部字段伞辛,而是會挑選其中的一部分進(jìn)行介紹:

type g struct {
    stack       stack //stack 字段描述了當(dāng)前 Goroutine 的棧內(nèi)存范圍 [stack.lo, stack.hi)
    stackguard0 uintptr //用于調(diào)度器搶占式調(diào)度
    preempt       bool // 搶占信號
    preemptStop   bool // 搶占時將狀態(tài)修改成 `_Gpreempted`
    preemptShrink bool // 在同步安全點(diǎn)收縮棧
    _panic       *_panic // 最內(nèi)側(cè)的 panic 結(jié)構(gòu)體
    _defer       *_defer // 最內(nèi)側(cè)的延遲函數(shù)結(jié)構(gòu)體

    m              *m //當(dāng)前 Goroutine 占用的線程,可能為空夯缺;
    sched          gobuf //存儲 Goroutine 的調(diào)度相關(guān)的數(shù)據(jù)
    atomicstatus   uint32 //Goroutine 的狀態(tài)
    goid           int64 //Goroutine 的 ID蚤氏,該字段對開發(fā)者不可見,Go 團(tuán)隊(duì)認(rèn)為引入 ID 會讓部分 Goroutine 變得更特殊踊兜,從而限制語言的并發(fā)能力
}

type gobuf struct {
    sp   uintptr //棧指針(Stack Pointer)
    pc   uintptr //程序計數(shù)器(Program Counter)
    g    guintptr //持有 runtime.gobuf
    ret  sys.Uintreg //系統(tǒng)調(diào)用的返回值
    ...
}

這個在調(diào)度器保存或者恢復(fù)上下文的時候用到瞧捌,其中的棧指針和程序計數(shù)器會用來存儲或者恢復(fù)寄存器中的值,改變程序即將執(zhí)行的代碼润文。

結(jié)構(gòu)體 runtime.gatomicstatus 字段就存儲了當(dāng)前 Goroutine 的狀態(tài)姐呐。除了幾個已經(jīng)不被使用的以及與 GC 相關(guān)的狀態(tài)之外,Goroutine 可能處于以下 9 個狀態(tài):

狀態(tài) 描述
_Gidle 剛剛被分配并且還沒有被初始化
_Grunnable 沒有執(zhí)行代碼典蝌,沒有棧的所有權(quán)曙砂,存儲在運(yùn)行隊(duì)列中
_Grunning 可以執(zhí)行代碼,擁有棧的所有權(quán)骏掀,被賦予了內(nèi)核線程 M 和處理器 P
_Gsyscall 正在執(zhí)行系統(tǒng)調(diào)用鸠澈,擁有棧的所有權(quán),沒有執(zhí)行用戶代碼截驮,被賦予了內(nèi)核線程 M 但是不在運(yùn)行隊(duì)列上
_Gwaiting 由于運(yùn)行時而被阻塞笑陈,沒有執(zhí)行用戶代碼并且不在運(yùn)行隊(duì)列上,但是可能存在于 Channel 的等待隊(duì)列上
_Gdead 沒有被使用葵袭,沒有執(zhí)行代碼涵妥,可能有分配的棧
_Gcopystack 棧正在被拷貝,沒有執(zhí)行代碼坡锡,不在運(yùn)行隊(duì)列上
_Gpreempted 由于搶占而被阻塞蓬网,沒有執(zhí)行用戶代碼并且不在運(yùn)行隊(duì)列上窒所,等待喚醒
_Gscan GC 正在掃描棧空間帆锋,沒有執(zhí)行代碼吵取,可以與其他狀態(tài)同時存在

上述狀態(tài)中比較常見是 _Grunnable_Grunning锯厢、_Gsyscall皮官、_Gwaiting_Gpreempted 五個狀態(tài),我們會重點(diǎn)介紹這幾個狀態(tài)实辑,Goroutine 的狀態(tài)遷移是一個復(fù)雜的過程臣疑,觸發(fā) Goroutine 狀態(tài)遷移的方法也很多,在這里我們也沒有辦法介紹全部的遷移線路徙菠,我們會從中選擇一些進(jìn)行介紹讯沈。
雖然 Goroutine 在運(yùn)行時中定義的狀態(tài)非常多而且復(fù)雜,但是我們可以將這些不同的狀態(tài)聚合成最終的三種:等待中婿奔、可運(yùn)行缺狠、運(yùn)行中,在運(yùn)行期間我們會在這三種不同的狀態(tài)來回切換:

  • 等待中:Goroutine 正在等待某些條件滿足萍摊,例如:系統(tǒng)調(diào)用結(jié)束等挤茄,包括 _Gwaiting_Gsyscall_Gpreempted 幾個狀態(tài)冰木;
  • 可運(yùn)行:Goroutine 已經(jīng)準(zhǔn)備就緒穷劈,可以在線程運(yùn)行,如果當(dāng)前程序中有非常多的 Goroutine踊沸,每個 Goroutine 就可能會等待更多的時間歇终,即 _Grunnable
  • 運(yùn)行中:Goroutine 正在某個線程上運(yùn)行逼龟,即 _Grunning评凝;
Goroutine 的常見狀態(tài)遷移

上圖展示了 Goroutine 狀態(tài)遷移的常見路徑,其中包括創(chuàng)建 Goroutine 到 Goroutine 被執(zhí)行腺律、觸發(fā)系統(tǒng)調(diào)用或者搶占式調(diào)度器的狀態(tài)遷移過程奕短。

M
Go 語言并發(fā)模型中的 M 是操作系統(tǒng)線程。調(diào)度器最多可以創(chuàng)建 10000 個線程匀钧,但是其中大多數(shù)的線程都不會執(zhí)行用戶代碼(可能陷入系統(tǒng)調(diào)用)翎碑,最多只會有 GOMAXPROCS 個活躍線程能夠正常運(yùn)行。

在默認(rèn)情況下之斯,運(yùn)行時會將 GOMAXPROCS 設(shè)置成當(dāng)前機(jī)器的核數(shù)日杈,我們也可以使用 runtime.GOMAXPROCS 來改變程序中最大的線程數(shù)。
在默認(rèn)情況下,一個四核機(jī)器上會創(chuàng)建四個活躍的操作系統(tǒng)線程达椰,每一個線程都對應(yīng)一個運(yùn)行時中的 runtime.m 結(jié)構(gòu)體翰蠢。
在大多數(shù)情況下项乒,我們都會使用 Go 的默認(rèn)設(shè)置啰劲,也就是線程數(shù)等于 CPU 個數(shù),在這種情況下不會觸發(fā)操作系統(tǒng)的線程調(diào)度和上下文切換檀何,所有的調(diào)度都會發(fā)生在用戶態(tài)蝇裤,由 Go 語言調(diào)度器觸發(fā),能夠減少非常多的額外開銷频鉴。
操作系統(tǒng)線程在 Go 語言中會使用私有結(jié)構(gòu)體 runtime.m 來表示栓辜,這個結(jié)構(gòu)體中也包含了幾十個私有的字段,我們依然對其進(jìn)行了刪減垛孔,先來了解幾個與 Goroutine 直接相關(guān)的字段:

type m struct {
    g0   *g 
    curg *g
    ...
}

其中 g0 是持有調(diào)度棧的 Goroutine藕甩,curg 是在當(dāng)前線程上運(yùn)行的用戶 Goroutine,這也是操作系統(tǒng)線程唯一關(guān)心的兩個 Goroutine周荐。
g0 是一個運(yùn)行時中比較特殊的 Goroutine狭莱,它會深度參與運(yùn)行時的調(diào)度過程,包括 Goroutine 的創(chuàng)建概作、大內(nèi)存分配和 CGO 函數(shù)的執(zhí)行腋妙。在后面的小節(jié)中,我們會經(jīng)逞堕牛看到 g0 的身影骤素。runtime.m 結(jié)構(gòu)體中還存在著三個處理器字段,它們分別表示正在運(yùn)行代碼的處理器 p愚屁、暫存的處理器 nextp 和執(zhí)行系統(tǒng)調(diào)用之前的使用線程的處理器 oldp

type m struct {
    p             puintptr
    nextp         puintptr
    oldp          puintptr
}

除了在上面介紹的字段之外济竹,runtime.m 中還包含大量與線程狀態(tài)、鎖霎槐、調(diào)度规辱、系統(tǒng)調(diào)用有關(guān)的字段,我們會在分析調(diào)度過程時詳細(xì)介紹栽燕。

P
調(diào)度器中的處理器 P 是線程和 Goroutine 的中間層罕袋,它能提供線程需要的上下文環(huán)境,也會負(fù)責(zé)調(diào)度線程上的等待隊(duì)列碍岔,通過處理器 P 的調(diào)度浴讯,每一個內(nèi)核線程都能夠執(zhí)行多個 Goroutine,它能在 Goroutine 進(jìn)行一些 I/O 操作時及時切換蔼啦,提高線程的利用率榆纽。
因?yàn)檎{(diào)度器在啟動時就會創(chuàng)建 GOMAXPROCS 個處理器,所以 Go 語言程序的處理器數(shù)量一定會等于 GOMAXPROCS,這些處理器會綁定到不同的內(nèi)核線程上并利用線程的計算資源運(yùn)行 Goroutine奈籽。
runtime.p 是處理器的運(yùn)行時表示饥侵,作為調(diào)度器的內(nèi)部實(shí)現(xiàn),它包含的字段也非常多衣屏,其中包括與性能追蹤躏升、垃圾回收和計時器相關(guān)的字段,這些字段也非常重要狼忱,但是在這里就不一一展示了膨疏,我們主要關(guān)注處理器中的線程和運(yùn)行隊(duì)列:

type p struct {
    m           muintptr

    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    runnext guintptr
    ...
}

反向存儲的線程維護(hù)著線程與處理器之間的關(guān)系,而 runhead钻弄、runqtailrunq 三個字段表示處理器持有的運(yùn)行隊(duì)列佃却,其中存儲著待執(zhí)行的 Goroutine 列表,runnext 中是線程下一個需要執(zhí)行的 Goroutine窘俺。

runtime.p 結(jié)構(gòu)體中的狀態(tài) status 字段會是以下五種中的一種:

狀態(tài) 描述
_Pidle 處理器沒有運(yùn)行用戶代碼或者調(diào)度器饲帅,被空閑隊(duì)列或者改變其狀態(tài)的結(jié)構(gòu)持有,運(yùn)行隊(duì)列為空
_Prunning 被線程 M 持有瘤泪,并且正在執(zhí)行用戶代碼或者調(diào)度器
_Psyscall 沒有執(zhí)行用戶代碼灶泵,當(dāng)前線程陷入系統(tǒng)調(diào)用
_Pgcstop 被線程 M 持有,當(dāng)前處理器由于垃圾回收被停止
_Pdead 當(dāng)前處理器已經(jīng)不被使用

通過分析處理器 P 的狀態(tài)均芽,我們能夠?qū)μ幚砥鞯墓ぷ鬟^程有一些簡單理解丘逸,例如處理器在執(zhí)行用戶代碼時會處于 _Prunning 狀態(tài),在當(dāng)前線程執(zhí)行 I/O 操作時會陷入 _Psyscall 狀態(tài)掀宋。

小結(jié)
我們簡單介紹了 Go 語言調(diào)度器中常見的數(shù)據(jù)結(jié)構(gòu)深纲,包括線程 M、處理器 P 和 Goroutine G劲妙,它們在 Go 語言運(yùn)行時中分別使用不同的私有結(jié)構(gòu)體表示湃鹊,我們在下面會深入分析 Go 語言調(diào)度器的實(shí)現(xiàn)原理。

調(diào)度器啟動

調(diào)度器的啟動過程是我們平時比較難以接觸的過程镣奋,不過作為程序啟動前的準(zhǔn)備工作币呵,理解調(diào)度器的啟動過程對我們理解調(diào)度器的實(shí)現(xiàn)原理很有幫助,運(yùn)行時通過 runtime.schedinit 函數(shù)初始化調(diào)度器:

func schedinit() {
    _g_ := getg()
    ...

    sched.maxmcount = 10000

    ...
    sched.lastpoll = uint64(nanotime())
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
}

在調(diào)度器初始函數(shù)執(zhí)行的過程中會將 maxmcount 設(shè)置成 10000侨颈,這也就是一個 Go 語言程序能夠創(chuàng)建的最大線程數(shù)余赢,雖然最多可以創(chuàng)建 10000 個線程,但是可以同時運(yùn)行的線程還是由 GOMAXPROCS 變量控制哈垢。

我們從環(huán)境變量 GOMAXPROCS 獲取了程序能夠同時運(yùn)行的最大處理器數(shù)之后就會調(diào)用 runtime.procresize 更新程序中處理器的數(shù)量妻柒,在這時整個程序不會執(zhí)行任何用戶 Goroutine,調(diào)度器也會進(jìn)入鎖定狀態(tài)耘分,runtime.procresize 是調(diào)度器啟動的最后一步举塔,在這一步過后調(diào)度器會完成相應(yīng)數(shù)量處理器的啟動绑警,等待用戶創(chuàng)建運(yùn)行新的 Goroutine 并為 Goroutine 調(diào)度處理器資源。

創(chuàng)建 Goroutine

想要啟動一個新的 Goroutine 來執(zhí)行任務(wù)時央渣,我們需要使用 Go 語言中的 go 關(guān)鍵字计盒,這個關(guān)鍵字會在編譯期間通過以下方法 cmd/compile/internal/gc.state.stmtcmd/compile/internal/gc.state.call 兩個方法將該關(guān)鍵字轉(zhuǎn)換成 runtime.newproc 函數(shù)調(diào)用,該函數(shù)會接收大小和表示函數(shù)的指針 funcval芽丹。在這個函數(shù)中我們還會獲取 Goroutine 以及調(diào)用方的程序計數(shù)器北启,然后調(diào)用 runtime.newproc1 函數(shù):

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()
    pc := getcallerpc()
    newproc1(fn, (*uint8)(argp), siz, gp, pc)
}

runtime.newproc1 會根據(jù)傳入?yún)?shù)初始化一個 g 結(jié)構(gòu)體,該函數(shù)實(shí)現(xiàn)可以分成以下幾個部分:

  1. 獲取或者創(chuàng)建新的 Goroutine 結(jié)構(gòu)體志衍,先從處理器的 gFree 列表中查找空閑的 Goroutine暖庄,如果不存在空閑的 Goroutine聊替,就會通過 runtime.malg 函數(shù)創(chuàng)建一個棧大小足夠的新結(jié)構(gòu)體楼肪;
  2. 將傳入的參數(shù)移到 Goroutine 的棧上,調(diào)用 runtime.memmove 函數(shù)將 fn 函數(shù)的全部參數(shù)拷貝到棧上惹悄;
  3. 更新 Goroutine 調(diào)度相關(guān)的屬性春叫,runtime.newproc1 會設(shè)置新的 Goroutine 結(jié)構(gòu)體的參數(shù),包括棧指針泣港、程序計數(shù)器并更新其狀態(tài)到 _Grunnable暂殖;
  4. 將 Goroutine 加入處理器的運(yùn)行隊(duì)列,將初始化好的 Goroutine 加入處理器的運(yùn)行隊(duì)列并在滿足條件時調(diào)用 runtime.wakep 函數(shù)喚醒新的處理執(zhí)行 Goroutine当纱;

調(diào)度循環(huán)

調(diào)度器啟動之后呛每,Go 語言運(yùn)行時會調(diào)用 runtime.mstart 以及 runtime.mstart1,前者會初始化 g0 的 stackguard0stackguard1 字段坡氯,后者會初始化線程并調(diào)用 runtime.schedule 進(jìn)入調(diào)度循環(huán):

func schedule() {
    _g_ := getg()

top:
    var gp *g
    var inheritTime bool

    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    if gp == nil {
        gp, inheritTime = findrunnable()
    }

    execute(gp, inheritTime)
}

runtime.schedule 函數(shù)的會從不同地方查找待執(zhí)行的 Goroutine:

  1. 為了保證公平,當(dāng)全局運(yùn)行隊(duì)列中有待執(zhí)行的 Goroutine 時,通過 schedtick 保證有一定幾率會從全局的運(yùn)行隊(duì)列中查找對應(yīng)的 Goroutine烁涌;
  2. 從處理器本地的運(yùn)行隊(duì)列中查找待執(zhí)行的 Goroutine帐姻;
  3. 如果前兩種方法都沒有找到 Goroutine,就會通過 runtime.findrunnable 進(jìn)行阻塞地查找 Goroutine悯恍;

runtime.findrunnable 函數(shù)的實(shí)現(xiàn)非常復(fù)雜库糠,這個 300 多行的函數(shù)通過以下的過程獲取可運(yùn)行的 Goroutine:

  1. 從本地運(yùn)行隊(duì)列、全局運(yùn)行隊(duì)列中查找涮毫;
  2. 從網(wǎng)絡(luò)輪詢器中查找是否有 Goroutine 等待運(yùn)行瞬欧;
  3. 通過 runtime.runqsteal 函數(shù)嘗試從其他隨機(jī)的處理器中竊取待運(yùn)行的 Goroutine,在該過程中還可能竊取處理器中的計時器罢防;

因?yàn)楹瘮?shù)的實(shí)現(xiàn)過于復(fù)雜艘虎,上述執(zhí)行過程是經(jīng)過大量簡化的,總而言之篙梢,當(dāng)前函數(shù)一定會返回一個可執(zhí)行的 Goroutine顷帖,如果當(dāng)前不存在就會阻塞等待美旧。

接下來由 runtime.execute 函數(shù)執(zhí)行獲取的 Goroutine,做好準(zhǔn)備工作后贬墩,它會通過 runtime.gogo 將 Goroutine 調(diào)度到當(dāng)前線程上榴嗅。

func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    _g_.m.curg = gp
    gp.m = _g_.m
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        _g_.m.p.ptr().schedtick++
    }

    gogo(&gp.sched)
}

runtime.gogo 在不同處理器架構(gòu)上的實(shí)現(xiàn)都不同,但是不同的實(shí)現(xiàn)也都大同小異陶舞,下面是該函數(shù)在 386 架構(gòu)上的實(shí)現(xiàn):

TEXT runtime·gogo(SB), NOSPLIT, $8-4
    MOVL buf+0(FP), BX     // 獲取調(diào)度信息
    MOVL gobuf_g(BX), DX
    MOVL 0(DX), CX         // 保證 Goroutine 不為空
    get_tls(CX)
    MOVL DX, g(CX)
    MOVL gobuf_sp(BX), SP  // 將 runtime.goexit 函數(shù)的 PC 恢復(fù)到 SP 中
    MOVL gobuf_ret(BX), AX
    MOVL gobuf_ctxt(BX), DX
    MOVL $0, gobuf_sp(BX)
    MOVL $0, gobuf_ret(BX)
    MOVL $0, gobuf_ctxt(BX)
    MOVL gobuf_pc(BX), BX  // 獲取待執(zhí)行函數(shù)的程序計數(shù)器
    JMP  BX                // 開始執(zhí)行

該函數(shù)的實(shí)現(xiàn)非常巧妙嗽测,它從 runtime.gobuf 中取出了 runtime.goexit 的程序計數(shù)器和待執(zhí)行函數(shù)的程序計數(shù)器,其中:

  • runtime.goexit 的程序計數(shù)器被放到了棧 SP 上肿孵;
  • 待執(zhí)行函數(shù)的程序計數(shù)器被放到了寄存器 BX 上唠粥;

runtime.gogo 就利用了 Go 語言的調(diào)用慣例成功模擬這一調(diào)用過程,通過以下幾個關(guān)鍵指令模擬 CALL 的過程:

    MOVL gobuf_sp(BX), SP  // 將 runtime.goexit 函數(shù)的 PC 恢復(fù)到 SP 中
    MOVL gobuf_pc(BX), BX  // 獲取待執(zhí)行函數(shù)的程序計數(shù)器
    JMP  BX                // 開始執(zhí)行

當(dāng) Goroutine 中運(yùn)行的函數(shù)返回時就會跳轉(zhuǎn)到 runtime.goexit 所在位置執(zhí)行該函數(shù):

TEXT runtime·goexit(SB),NOSPLIT,$0-0
    CALL    runtime·goexit1(SB)

func goexit1() {
    mcall(goexit0)
}

經(jīng)過套娃似的一系列函數(shù)調(diào)用停做,我們最終在當(dāng)前線程的 g0 的棧上調(diào)用 runtime.goexit0 函數(shù)晤愧,該函數(shù)會將 Goroutine 轉(zhuǎn)換會 _Gdead 狀態(tài)、清理其中的字段蛉腌、移除 Goroutine 和線程的關(guān)聯(lián)并調(diào)用 runtime.gfput 重新加入處理器的 Goroutine 空閑列表 gFree

func goexit0(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Grunning, _Gdead)
    gp.m = nil
    ...
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    dropg()
    gfput(_g_.m.p.ptr(), gp)
    schedule()
}

在最后 runtime.goexit0 函數(shù)會重新調(diào)用 runtime.schedule 觸發(fā)新的 Goroutine 調(diào)度官份,我們可以認(rèn)為調(diào)度循環(huán)永遠(yuǎn)都不會返回。

調(diào)度循環(huán)

Go 語言中的運(yùn)行時調(diào)度循環(huán)會從 runtime.schedule 函數(shù)開始烙丛,最終又回到 runtime.schedule舅巷;這里介紹的是 Goroutine 正常執(zhí)行并退出的邏輯,實(shí)際情況會復(fù)雜得多河咽,多數(shù)情況下 Goroutine 的執(zhí)行的過程中都會經(jīng)歷協(xié)作式或者搶占式調(diào)度钠右,這時會讓出線程的使用權(quán)等待調(diào)度器的喚醒。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末忘蟹,一起剝皮案震驚了整個濱河市飒房,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌寒瓦,老刑警劉巖情屹,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異杂腰,居然都是意外死亡垃你,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進(jìn)店門喂很,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惜颇,“玉大人,你說我怎么就攤上這事少辣×枭悖” “怎么了?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵漓帅,是天一觀的道長锨亏。 經(jīng)常有香客問我痴怨,道長,這世上最難降的妖魔是什么器予? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任浪藻,我火速辦了婚禮,結(jié)果婚禮上乾翔,老公的妹妹穿的比我還像新娘爱葵。我一直安慰自己,他們只是感情好反浓,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布萌丈。 她就那樣靜靜地躺著,像睡著了一般雷则。 火紅的嫁衣襯著肌膚如雪辆雾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天巧婶,我揣著相機(jī)與錄音乾颁,去河邊找鬼涂乌。 笑死艺栈,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的湾盒。 我是一名探鬼主播湿右,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼罚勾!你這毒婦竟也來了毅人?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤尖殃,失蹤者是張志新(化名)和其女友劉穎丈莺,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體送丰,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡缔俄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了器躏。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片俐载。...
    茶點(diǎn)故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖登失,靈堂內(nèi)的尸體忽然破棺而出遏佣,到底是詐尸還是另有隱情,我是刑警寧澤揽浙,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布状婶,位于F島的核電站意敛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏膛虫。R本人自食惡果不足惜空闲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望走敌。 院中可真熱鬧碴倾,春花似錦、人聲如沸掉丽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽捶障。三九已至僧须,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間项炼,已是汗流浹背担平。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留锭部,地道東北人暂论。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像拌禾,于是被迫代替她去往敵國和親取胎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評論 2 359