從源碼角度看Golang的調(diào)度

從源碼角度看Golang的調(diào)度

本章主要從源碼角度針對Go調(diào)度相關(guān)進行分析罕模。僅關(guān)注linux系統(tǒng)下的邏輯陕截。代碼版本GO1.9.2寺董。

本章例子中的代碼對應(yīng)詳細注釋參考:gosrc-reader


目錄

先來個目錄方便讀者理解文本結(jié)構(gòu)

簡單概念

調(diào)度器的三個抽象概念:G碌奉、M短曾、P

  • G:代表一個goroutine,每個goroutine都有自己獨立的棧存放當(dāng)前的運行內(nèi)存及狀態(tài)赐劣〖倒眨可以把一個G當(dāng)做一個任務(wù)。
  • M: 代表內(nèi)核線程(Pthread)魁兼,它本身就與一個內(nèi)核線程進行綁定婉徘,goroutine運行在M上。
  • P:代表一個處理器咐汞,可以認(rèn)為一個“有運行任務(wù)”的P占了一個CPU線程的資源盖呼,且只要處于調(diào)度的時候就有P。

注:內(nèi)核線程CPU線程的區(qū)別化撕,在系統(tǒng)里可以有上萬個內(nèi)核線程几晤,但CPU線程并沒有那么多,CPU線程也就是Top命令里看到的CPU0植阴、CPU1蟹瘾、CPU2......的數(shù)量。

三者關(guān)系大致如下圖:

image.png

圖1掠手、圖2代表2個有運行任務(wù)時的狀態(tài)憾朴。M與一個內(nèi)核線程綁定,可運行的goroutine列表存放到P里面喷鸽,然后占用了一個CPU線程來運行众雷。

圖3代表沒有運行任務(wù)時的狀態(tài),M依然與一個內(nèi)核線程綁定做祝,由于沒有運行任務(wù)因此不占用CPU線程砾省,同時也不占用P。

調(diào)度的大致輪廓

image.png

圖中表述了由go func觸發(fā)的調(diào)度剖淀。先創(chuàng)建M通過M啟動調(diào)度循環(huán)纯蛾,然后調(diào)度循環(huán)過程中獲取G來執(zhí)行纤房,執(zhí)行過程中遇到圖中running G后面幾個case再次進入下一循環(huán)纵隔。

下面從程序啟動、調(diào)度循環(huán)炮姨、G的來源三個角度分析調(diào)度的實現(xiàn)捌刮。

進程啟動時都做了什么

下面先看一段程序啟動的代碼

// runtime/asm_amd64.s

TEXT runtime·rt0_go(SB),NOSPLIT,$0
......此處省略N多代碼......
ok:
        // set the per-goroutine and per-mach "registers"
        get_tls(BX)  // 將 g0 放到 tls(thread local storage)里
        LEAQ    runtime·g0(SB), CX
        MOVQ    CX, g(BX)
        LEAQ    runtime·m0(SB), AX

        // save m->g0 = g0  // 將全局M0與全局G0綁定
        MOVQ    CX, m_g0(AX)
        // save m0 to g0->m
        MOVQ    AX, g_m(CX)

        CLD                             // convention is D is always left cleared
        CALL    runtime·check(SB)

        MOVL    16(SP), AX              // copy argc
        MOVL    AX, 0(SP)
        MOVQ    24(SP), AX              // copy argv
        MOVQ    AX, 8(SP)
        CALL    runtime·args(SB) // 解析命令行參數(shù)
        CALL    runtime·osinit(SB) // 只初始化了CPU核數(shù)
        CALL    runtime·schedinit(SB) // 內(nèi)存分配器、棧舒岸、P绅作、GC回收器等初始化

        // create a new goroutine to start program
        MOVQ    $runtime·mainPC(SB), AX         // 
        PUSHQ   AX
        PUSHQ   $0                      // arg size
        CALL    runtime·newproc(SB) // 創(chuàng)建一個新的G來啟動runtime.main
        POPQ    AX
        POPQ    AX

        // start this M
        CALL    runtime·mstart(SB) // 啟動M0,開始等待空閑G,正式進入調(diào)度循環(huán)

        MOVL    $0xf1, 0xf1  // crash
        RET

在啟動過程里主要做了這三個事情(這里只跟調(diào)度相關(guān)的):

  • 初始化固定數(shù)量的P
  • 創(chuàng)建一個新的G來啟動runtime.main,也就是runtime下的main方法
  • 創(chuàng)建全局M0、全局G0蛾派,啟動M0進入第一個調(diào)度循環(huán)

M0是什么俄认?程序里會啟動多個M个少,第一個啟動的叫M0。

G0是什么眯杏?執(zhí)行runtime下調(diào)度工作的叫G0夜焦,每個M都綁定一個G0。寫程序接觸到的基本都是第一種

我們按照順序看是怎么完成上面三個事情的岂贩。

runtime.osinit(SB)方法針對系統(tǒng)環(huán)境的初始化

這里實質(zhì)只做了一件事情茫经,就是獲取CPU的線程數(shù),也就是Top命令里看到的CPU0萎津、CPU1卸伞、CPU2......的數(shù)量

// runtime/os_linux.go

func osinit() {
    ncpu = getproccount()
}

runtime.schedinit(SB)調(diào)度相關(guān)的一些初始化

// runtime/proc.go

// 設(shè)置最大M數(shù)量
sched.maxmcount = 10000

// 初始化當(dāng)前M,即全局M0
mcommoninit(_g_.m)

// 查看應(yīng)該啟動的P數(shù)量,默認(rèn)為cpu core數(shù).
// 如果設(shè)置了環(huán)境變量GOMAXPROCS則以環(huán)境變量為準(zhǔn),最大不得超過_MaxGomaxprocs(1024)個
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n
}
if procs > _MaxGomaxprocs {
    procs = _MaxGomaxprocs
}
// 調(diào)整P數(shù)量锉屈,此時由于是初始化階段荤傲,所以P都是新建的
if procresize(procs) != nil {
    throw("unknown runnable goroutine during bootstrap")
}

這里sched.maxmcount設(shè)置了M最大的數(shù)量,而M代表的是系統(tǒng)內(nèi)核線程颈渊,因此可以認(rèn)為一個進程最大只能啟動10000個系統(tǒng)線程弃酌。

procresize初始化P的數(shù)量,procs參數(shù)為初始化的數(shù)量儡炼,而在初始化之前先做數(shù)量的判斷妓湘,默認(rèn)是ncpu(與CPU核數(shù)相等)。也可以通過環(huán)境變量GOMAXPROCS來控制P的數(shù)量乌询。_MaxGomaxprocs控制了最大的P數(shù)量只能是1024榜贴。

有些人在進程初始化的時候經(jīng)常用到runtime.GOMAXPROCS()方法,其實也是調(diào)用的procresize方法重新設(shè)置了最大CPU使用數(shù)量妹田。

runtime·mainPC(SB)啟動監(jiān)控任務(wù)

// runtime/proc.go

// The main goroutine.
func main() {
    ......
    
    // 啟動后臺監(jiān)控
    systemstack(func() {
        newm(sysmon, nil)
    })

    ......
}

在runtime下會啟動一個全程運行的監(jiān)控任務(wù)唬党,該任務(wù)用于標(biāo)記搶占執(zhí)行過長時間的G,以及檢測epoll里面是否有可執(zhí)行的G鬼佣。下面會詳細說到驶拱。

最后runtime·mstart(SB)啟動調(diào)度循環(huán)

前面都是各種初始化操作,在這里開啟了調(diào)度器的第一個調(diào)度循環(huán)晶衷。(這里啟動的M就是M0)

下面來圍繞G蓝纲、M、P三個概念介紹Goroutine調(diào)度循環(huán)的運作流程晌纫。

調(diào)度循環(huán)都做了什么

先看一個簡易的流程圖:

image.png

圖1代表M啟動的過程税迷,把M跟一個P綁定再一起。在程序初始化的過程中說到在進程啟動的最后一步啟動了第一個M(即M0)锹漱,這個M從全局的空閑P列表里拿到一個P箭养,然后與其綁定。而P里面有2個管理G的鏈表(runq存儲等待運行的G列表哥牍,gfree存儲空閑的G列表)毕泌,M啟動后等待可執(zhí)行的G喝检。

圖2代表創(chuàng)建G的過程。創(chuàng)建完一個G先扔到當(dāng)前P的runq待運行隊列里撼泛。

圖3的執(zhí)行過程里蛇耀,M從綁定的P的runq列表或者全局的runq里獲取一個G來執(zhí)行。

圖4的流程里當(dāng)執(zhí)行完成后把G仍到gfree隊列里坎弯。注意此時G并沒有銷毀(只重置了G的棧以及狀態(tài))纺涤,當(dāng)再次創(chuàng)建G的時候優(yōu)先從gfree列表里獲取,這樣就起到了復(fù)用G的作用抠忘,避免反復(fù)與系統(tǒng)交互創(chuàng)建內(nèi)存撩炊。

M即啟動后處于一個自循環(huán)狀態(tài),執(zhí)行完一個G之后繼續(xù)執(zhí)行下一個G崎脉,反復(fù)上面的圖2~圖4過程拧咳。當(dāng)?shù)谝粋€M正在繁忙而又有新的G需要執(zhí)行時,會再開啟一個M來執(zhí)行囚灼。

下面詳細看下調(diào)度循環(huán)的實現(xiàn)骆膝。

調(diào)度器如何開啟調(diào)度循環(huán)

先看一下M的啟動過程(M0啟動是個特殊的啟動過程,也是第一個啟動的M灶体,由匯編實現(xiàn)的初始化后啟動阅签,而后續(xù)的M創(chuàng)建以及啟動則是Go代碼實現(xiàn))。

// runtime/proc.go

func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        // 從空閑P里獲取一個
        _p_ = pidleget()
        
        ......
    }
    // 獲取一個空閑的m
    mp := mget()
    unlock(&sched.lock)
    // 如果沒有空閑M蝎抽,則new一個
    if mp == nil {
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    
    ......
    
    // 喚醒M
    notewakeup(&mp.park)
}

func newm(fn func(), _p_ *p) {
    // 創(chuàng)建一個M對象,且與P關(guān)聯(lián)
    mp := allocm(_p_, fn)
    // 暫存P
    mp.nextp.set(_p_)
    mp.sigmask = initSigmask
    
    ......
    
    execLock.rlock() // Prevent process clone.
    // 創(chuàng)建系統(tǒng)內(nèi)核線程
    newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
    execLock.runlock()
}

// runtime/os_linux.go
func newosproc(mp *m, stk unsafe.Pointer) {
    // Disable signals during clone, so that the new thread starts
    // with signals disabled. It will enable them in minit.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)
}

func allocm(_p_ *p, fn func()) *m {
    ......
    
    mp := new(m)
    mp.mstartfn = fn // 設(shè)置啟動函數(shù)
    mcommoninit(mp)  // 初始化m

    // 創(chuàng)建g0
    // In case of cgo or Solaris, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
        mp.g0 = malg(-1)
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    // 把新創(chuàng)建的g0與M做關(guān)聯(lián)
    mp.g0.m = mp

    ......
    
    return mp
}

func mstart() {
    ......
    
    mstart1()
}

func mstart1() {

    ......
    
    // 進入調(diào)度循環(huán)(阻塞不返回)
    schedule()
}

非M0的啟動首先從startm方法開始啟動政钟,要進行調(diào)度工作必須有調(diào)度處理器P,因此先從空閑的P鏈表里獲取一個P樟结,在newm方法創(chuàng)建一個M與P綁定养交。

newm方法中通過newosproc新建一個內(nèi)核線程,并把內(nèi)核線程與M以及mstart方法進行關(guān)聯(lián)瓢宦,這樣內(nèi)核線程執(zhí)行時就可以找到M并且找到啟動調(diào)度循環(huán)的方法碎连。最后schedule啟動調(diào)度循環(huán)

allocm方法中創(chuàng)建M的同時創(chuàng)建了一個G與自己關(guān)聯(lián),這個G就是我們在上面說到的g0驮履。為什么M要關(guān)聯(lián)一個g0鱼辙?因為runtime下執(zhí)行一個G也需要用到棧空間來完成調(diào)度工作疲吸,而擁有執(zhí)行棧的地方只有G座每,因此需要為每個執(zhí)行線程里配置一個g0。

調(diào)度器如何進行調(diào)度循環(huán)

調(diào)用schedule進入調(diào)度器的調(diào)度循環(huán)后摘悴,在這個方法里永遠不再返回。下面看下實現(xiàn)舰绘。

// runtime/proc.go

func schedule() {
    _g_ := getg()

    // 進入gc MarkWorker 工作模式
    if gp == nil && gcBlackenEnabled != 0 {
        gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
    }
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        // 每處理n個任務(wù)就去全局隊列獲取G任務(wù),確保公平
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    // 從P本地獲取
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    // 從其它地方獲取G,如果獲取不到則沉睡M蹂喻,并且阻塞在這里葱椭,直到M被再次使用
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    ......
    
    // 執(zhí)行找到的G
    execute(gp, inheritTime)
}

// 從P本地獲取一個可運行的G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    // 優(yōu)先從runnext里獲取一個G,如果沒有則從runq里獲取
    for {
        next := _p_.runnext
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }

    // 從隊頭獲取
    for {
        h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

// 從其它地方獲取G
func findrunnable() (gp *g, inheritTime bool) {
    ......

    // 從本地隊列獲取
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // 全局隊列獲取
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
    
    // 從epoll里取
    if netpollinited() && sched.lastpoll != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            ......
            
            return gp, false
        }
    }
    
    ......
    
    // 嘗試4次從別的P偷
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // 在這里開始針對P進行偷取操作
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
}

// 嘗試從全局runq中獲取G
// 在"sched.runqsize/gomaxprocs + 1"口四、"max"孵运、"len(_p_.runq))/2"三個數(shù)字中取最小的數(shù)字作為獲取的G數(shù)量
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {
        return nil
    }

    n := sched.runqsize/gomaxprocs + 1
    if n > sched.runqsize {
        n = sched.runqsize
    }
    if max > 0 && n > max {
        n = max
    }
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

    sched.runqsize -= n
    if sched.runqsize == 0 {
        sched.runqtail = 0
    }

    gp := sched.runqhead.ptr()
    sched.runqhead = gp.schedlink
    n--
    for ; n > 0; n-- {
        gp1 := sched.runqhead.ptr()
        sched.runqhead = gp1.schedlink
        runqput(_p_, gp1, false) // 放到本地P里
    }
    return gp
}

schedule中首先嘗試從P本地隊列中獲取(runqget)一個可執(zhí)行的G,如果沒有則從其它地方獲取(findrunnable),最終通過execute方法執(zhí)行G蔓彩。

runqget先通過runnext拿到待運行G,沒有的話治笨,再從runq里面取。

findrunnable從全局隊列赤嚼、epoll旷赖、別的P里獲取。(后面會擴展分析實現(xiàn))

在調(diào)度的開頭出還做了一個小優(yōu)化:每處理一些任務(wù)之后更卒,就優(yōu)先從全局隊列里獲取任務(wù)等孵,以保障公平性,防止由于每個P里的G過多蹂空,而全局隊列里的任務(wù)一直得不到執(zhí)行機會俯萌。

這里用到了一個關(guān)鍵方法getg(),runtime的代碼里大量使用該方法上枕,它由匯編實現(xiàn)咐熙,該方法就是獲取當(dāng)前運行的G,具體實現(xiàn)不再這里闡述辨萍。

多個線程下如何調(diào)度

拋出一個問題:每個P里面的G執(zhí)行時間是不可控的糖声,如果多個P同時在執(zhí)行,會不會出現(xiàn)有的P里面的G執(zhí)行不完分瘦,有的P里面幾乎沒有G可執(zhí)行呢蘸泻?

這就要從M的自循環(huán)過程中如何獲取G、歸還G的行為說起了嘲玫,先看圖:

image.png

圖中可以看出有兩種途徑:1.借助全局隊列sched.runq作為中介悦施,本地P里的G太多的話就放全局里,G太少的話就從全局取去团。2.全局列表里沒有的話直接從P1里偷取(steal)抡诞。(更多M在執(zhí)行的話,同樣的原理土陪,這里就只拿2個來舉例)

第1種途徑實現(xiàn)如下:

// runtime/proc.go

func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }

    // 嘗試把G添加到P的runnext節(jié)點昼汗,這里確保runnext只有一個G,如果之前已經(jīng)有一個G則踢出來放到runq里
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // 把老的g踢出來鬼雀,在下面放到runq里
        gp = oldnext.ptr()
    }

retry:
    // 如果_p_.runq隊列不滿顷窒,則放到隊尾就結(jié)束了。
    // 試想如果不放到隊尾而放到隊頭里會怎樣?如果頻繁的創(chuàng)建G則可能后面的G總是不被執(zhí)行鞋吉,對后面的G不公平
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    //如果隊列滿了鸦做,嘗試把G和當(dāng)前P里的一部分runq放到全局隊列
    //因為操作全局需要加鎖,所以名字里帶個slow
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
    }
    // 從runq頭部開始取出一半的runq放到臨時變量batch里
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    // 把要put的g也放進batch去
    batch[n] = gp

    if randomizeScheduler {
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // 把取出來的一半runq組成鏈表
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }

    // 將一半的runq放到global隊列里,一次多轉(zhuǎn)移一些省得轉(zhuǎn)移頻繁
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

func globrunqputbatch(ghead *g, gtail *g, n int32) {
    gtail.schedlink = 0
    if sched.runqtail != 0 {
        sched.runqtail.ptr().schedlink.set(ghead)
    } else {
        sched.runqhead.set(ghead)
    }
    sched.runqtail.set(gtail)
    sched.runqsize += n
}

runqput方法歸還執(zhí)行完的G,runq定義是runq [256]guintptr,有固定的長度谓着,因此當(dāng)前P里的待運行G超過256的時候說明過多了泼诱,則執(zhí)行runqputslow方法把一半G扔給全局G鏈表,globrunqputbatch連接全局鏈表的頭尾指針赊锚。

但可能別的P里面并沒有超過256治筒,就不會放到全局G鏈表里,甚至可能一直維持在不到256個舷蒲。這就借助第2個途徑了:

第2種途徑實現(xiàn)如下:

// runtime/proc.go

// 從其它地方獲取G
func findrunnable() (gp *g, inheritTime bool) {
    ......
    
    // 嘗試4次從別的P偷
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // 在這里開始針對P進行偷取操作
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
}

從別的P里面"偷取"一些G過來執(zhí)行了耸袜。runqsteal方法實現(xiàn)了"偷取"操作。

// runtime/proc.go

// 偷取P2一半到本地運行隊列阿纤,失敗則返回nil
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
    t := _p_.runqtail
    n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
    if n == 0 {
        return nil
    }
    n--
    // 返回尾部的一個G
    gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
    if n == 0 {
        return gp
    }
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    if t-h+n >= uint32(len(_p_.runq)) {
        throw("runqsteal: runq overflow")
    }
    atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
    return gp
}

// 從P里獲取一半的G,放到batch里
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
    for {
        // 計算一半的數(shù)量
        h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
        n := t - h
        n = n - n/2
        
        ......
        
        // 將偷到的任務(wù)轉(zhuǎn)移到本地P隊列里
        for i := uint32(0); i < n; i++ {
            g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
            batch[(batchHead+i)%uint32(len(batch))] = g
        }
        if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
            return n
        }
    }
}

上面可以看出從別的P里面偷(steal)了一半句灌,這樣就足夠運行了碎罚。有了“偷取”操作也就充分利用了多線程的資源沃饶。

調(diào)度循環(huán)中如何讓出CPU

執(zhí)行完成讓出CPU

絕大多數(shù)場景下我們程序都是執(zhí)行完一個G,再執(zhí)行另一個G芹血,那我們就看下G是如何被執(zhí)行以及執(zhí)行完如何退出的藐窄。

先看G如何被執(zhí)行:

// runtime/proc.go

func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    casgstatus(gp, _Grunnable, _Grunning)
    
    ......

    // 真正的執(zhí)行G资昧,切換到該G的棧幀上執(zhí)行(匯編實現(xiàn))
    gogo(&gp.sched)
}

execute方法先更改G的狀態(tài)為_Grunning表示運行中,最終給gogo方法做實際的執(zhí)行操作。而gogo方法則是匯編實現(xiàn)荆忍。再來看下gogo方法的實現(xiàn):

// runtime.asm_amd64.s

TEXT runtime·gogo(SB), NOSPLIT, $16-8
        MOVQ    buf+0(FP), BX           // gobuf 把0偏移的8個字節(jié)給BX寄存器, gobuf結(jié)構(gòu)的前8個字節(jié)就是SP指針

        // If ctxt is not nil, invoke deletion barrier before overwriting.
        MOVQ    gobuf_ctxt(BX), AX // 在把gobuf的ctxt變量給AX寄存器
        TESTQ   AX, AX // 判斷AX寄存器是否為空,傳進來gp.sched的話肯定不為空了,因此JZ nilctxt不跳轉(zhuǎn)
        JZ      nilctxt
        LEAQ    gobuf_ctxt(BX), AX
        MOVQ    AX, 0(SP)
        MOVQ    $0, 8(SP)
        CALL    runtime·writebarrierptr_prewrite(SB)
        MOVQ    buf+0(FP), BX

nilctxt: // 下面則是函數(shù)棧的BP SP指針移動格带,最后進入到指定的代碼區(qū)域
        MOVQ    gobuf_g(BX), DX
        MOVQ    0(DX), CX               // make sure g != nil
        get_tls(CX)
        MOVQ    DX, g(CX)
        MOVQ    gobuf_sp(BX), SP        // restore SP
        MOVQ    gobuf_ret(BX), AX 
        MOVQ    gobuf_ctxt(BX), DX
        MOVQ    gobuf_bp(BX), BP
        MOVQ    $0, gobuf_sp(BX)        // clear to help garbage collector
        MOVQ    $0, gobuf_ret(BX) 
        MOVQ    $0, gobuf_ctxt(BX)
        MOVQ    $0, gobuf_bp(BX)
        MOVQ    gobuf_pc(BX), BX // PC指針指向退出時要執(zhí)行的函數(shù)地址
        JMP     BX  // 跳轉(zhuǎn)到執(zhí)行代碼處
// runtime/runtime2.go

type gobuf struct {
    // The offsets of sp, pc, and g are known to (hard-coded in) libmach.
    //
    // ctxt is unusual with respect to GC: it may be a
    // heap-allocated funcval so write require a write barrier,
    // but gobuf needs to be cleared from assembly. We take
    // advantage of the fact that the only path that uses a
    // non-nil ctxt is morestack. As a result, gogo is the only
    // place where it may not already be nil, so gogo uses an
    // explicit write barrier. Everywhere else that resets the
    // gobuf asserts that ctxt is already nil.
    sp   uintptr
    pc   uintptr
    g    guintptr
    ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
    ret  sys.Uintreg
    lr   uintptr
    bp   uintptr // for GOEXPERIMENT=framepointer
}

gogo方法傳的參數(shù)注意是gp.sched,而這個結(jié)構(gòu)體里可以看到保存了熟悉的函數(shù)棧寄存器SP/PC/BP,能想到是把執(zhí)行棧傳了進去(既然是執(zhí)行一個G刹枉,當(dāng)然要把執(zhí)行棧傳進去了)叽唱。可以看到在gogo函數(shù)中實質(zhì)就只是做了函數(shù)棧指針的移動微宝。

這個執(zhí)行G的操作棺亭,熟悉函數(shù)調(diào)用的函數(shù)棧的基本原理的人想必有些印象(如果不熟悉請自行搜索),執(zhí)行一個G其實就是執(zhí)行函數(shù)一樣切換到對應(yīng)的函數(shù)棧幀上蟋软。

C語言里棧幀創(chuàng)建的時候有個IP寄存器指向"return address",即主調(diào)函數(shù)的一條指令的地址镶摘, 被調(diào)函數(shù)退出的時候通過該指針回到調(diào)用函數(shù)里。在Go語言里有個PC寄存器指向退出函數(shù)岳守。那么下PC寄存器指向的是哪里凄敢?我們回到創(chuàng)建G的地方看下代碼:

// runtime/proc.go

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
    ......
    
    // 從當(dāng)前P里面復(fù)用一個空閑G
    newg := gfget(_p_)
    // 如果沒有空閑G則新建一個,默認(rèn)堆大小為_StackMin=2048 bytes
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        // 把新創(chuàng)建的G添加到全局allg里
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    
    ......
    
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // 記錄當(dāng)前任務(wù)的pc寄存器為goexit方法,用于當(dāng)執(zhí)行G結(jié)束后找到退出方法湿痢,從而再次進入調(diào)度循環(huán) // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.startpc = fn.fn
    
    .......
    
    return newg
}

代碼中可以看到涝缝,給G的執(zhí)行環(huán)境里的pc變量賦值了一個goexit的函數(shù)地址,也就是說G正常執(zhí)行完退出時執(zhí)行的是goexit函數(shù)。再看下該函數(shù)的實現(xiàn):

// runtime/asm_amd64.s

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT,$0-0
    BYTE    $0x90   // NOP
    CALL    runtime·goexit1(SB) // does not return
    // traceback from goexit1 must hit code range of goexit
    BYTE    $0x90   // NOP
// runtime/proc.go

// G執(zhí)行結(jié)束后回到這里放到P的本地隊列里
func goexit1() {
    if raceenabled {
        racegoend()
    }
    if trace.enabled {
        traceGoEnd()
    }
    // 切換到g0來釋放G
    mcall(goexit0)
}

// g0下當(dāng)G執(zhí)行結(jié)束后回到這里放到P的本地隊列里
func goexit0(gp *g) {
    ......

    gfput(_g_.m.p.ptr(), gp)
    schedule()
}

代碼中切換到了G0下執(zhí)行了schedule方法俊卤,再次進度了下一輪調(diào)度循環(huán)嫩挤。

以上就是正常執(zhí)行一個G并正常退出的實現(xiàn)害幅。

主動讓出CPU

在實際場景中還有一些沒有執(zhí)行完成的G消恍,而又需要臨時停止執(zhí)行,比如time.Sleep以现、IO阻塞等等狠怨,就需要掛起該G,把CPU讓出給別人使用邑遏。在runtime下面有個gopark方法佣赖,看下實現(xiàn):

// runtime/proc.go

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    // mcall 在M里從當(dāng)前正在運行的G切換到g0
    // park_m 在切換到的g0下先把傳過來的G切換為_Gwaiting狀態(tài)掛起該G
    // 調(diào)用回調(diào)函數(shù)waitunlockf()由外層決定是否等待解鎖,返回true則等待解鎖不在執(zhí)行G记盒,返回false則不等待解鎖繼續(xù)執(zhí)行
    mcall(park_m)
}
// runtime/stubs.go

// mcall switches from the g to the g0 stack and invokes fn(g),
// where g is the goroutine that made the call.
// mcall saves g's current PC/SP in g->sched so that it can be restored later.
......
func mcall(fn func(*g))
// runtime/proc.go

func park_m(gp *g) {
    _g_ := getg() // 此處獲得的是g0,而不是gp

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg() // 把g0從M的"當(dāng)前運行"里剝離出來

    if _g_.m.waitunlockf != nil {
        fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok { // 如果不需要等待解鎖憎蛤,則切換到_Grunnable狀態(tài)并直接執(zhí)行G
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

gopark是進行調(diào)度出讓CPU資源的方法,里面有個方法mcall()纪吮,注釋里這樣描述:

從當(dāng)前運行的G切換到g0的運行棧上俩檬,然后調(diào)用fn(g),這里被調(diào)用的G是調(diào)用mcall方法時的G碾盟。mcall方法保存當(dāng)前運行的G的 PC/SP 到 g->sched 里棚辽,因此該G可以在以后被重新恢復(fù)執(zhí)行.

在本章開始介紹初始化過程中有提到M創(chuàng)建的時候綁定了一個g0,調(diào)度工作是運行在g0的棧上的冰肴。mcall方法通過g0先把當(dāng)前調(diào)用的G的執(zhí)行棧暫存到 g->sched 變量里屈藐,然后切換到g0的執(zhí)行棧上執(zhí)行park_mpark_m方法里把gp的狀態(tài)從 _Grunning 切換到 _Gwaiting 表明進入到等待喚醒狀態(tài)熙尉,此時休眠G的操作就完成了联逻。接下來既然G休眠了,CPU線程總不能閑下來检痰,在park_m方法里又可以看到schedule方法包归,開始進入到到一輪調(diào)度循環(huán)了。

park_m方法里還有段小插曲攀细,進入調(diào)度循環(huán)之前還有個對waitunlockf方法的判斷箫踩,該方法意思是如果解鎖不成功則調(diào)用execute方法繼續(xù)執(zhí)行之前的G,而該方法永遠不會return谭贪,也就不會再次進入下一次調(diào)度境钟。也就是說給外部一個控制是否要進行下一個調(diào)度的選擇。

搶占讓出CPU

回想在runtime.main()里面有單獨啟動了一個監(jiān)控任務(wù)俭识,方法是sysmon慨削。看下該方法:

// runtime/proc.go

func sysmon() {
    ......
    
    for {
        // delay參數(shù)用于控制for循環(huán)的間隔,不至于無限死循環(huán)缚态。
        // 控制邏輯是前50次每次sleep 20微秒磁椒,超過50次則每次翻2倍,直到最大10毫秒
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {
                ......
                
                incidlelocked(-1)
                // 把epoll ready的G列表注入到全局runq里
                injectglist(gp)
                incidlelocked(1)
            }
        }
        
        // retake P's blocked in syscalls
        // and preempt long running G's
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        
        ......
    }
}

func retake(now int64) uint32 {
    n := 0
    for i := int32(0); i < gomaxprocs; i++ {
        _p_ := allp[i] // 從所有P里面去找
        if _p_ == nil {
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
        
            ......
            
        } else if s == _Prunning { // 針對正在運行的P
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // 如果已經(jīng)超過forcePreemptNS(10ms)玫芦,則搶占
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            // 搶占P
            preemptone(_p_)
        }
    }
    return uint32(n)
}

func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    // 找到當(dāng)前正在運行的G
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }
    // 標(biāo)記搶占狀態(tài)
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // G里面的每一次調(diào)用都會比較當(dāng)前棧指針與 gp->stackguard0 來檢查堆棧溢出
    // 設(shè)置 gp->stackguard0 為 StackPreempt 來觸發(fā)正常的堆棧溢出檢測
    gp.stackguard0 = stackPreempt
    return true
}

sysmon()方法處于無限for循環(huán)浆熔,整個進程的生命周期監(jiān)控著。retake()方法每次對所有的P遍歷檢查超過10ms的還在運行的G桥帆,如果有超過10ms的則通過preemptone()進行搶占医增,但是要注意這里只把gp.stackguard0賦值了一個stackPreempt,并沒有做讓出CPU的操作老虫,因此這里的搶占實質(zhì)只是一個”標(biāo)記“搶占叶骨。那么真正停止G執(zhí)行的操作在哪里?

// runtime/stack.go

func newstack(ctxt unsafe.Pointer) {
    ......
    
    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    // 這里的邏輯是為G的搶占做的判斷祈匙。
    // 判斷是否是搶占引發(fā)棧擴張忽刽,如果 gp.stackguard0 == stackPreempt 則說明是搶占觸發(fā)的棧擴張
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    ......

    //如果判斷可以搶占, 則繼續(xù)判斷是否GC引起的, 如果是則對G的棧空間執(zhí)行標(biāo)記處理(掃描根對象)然后繼續(xù)運行,
    //如果不是GC引起的則調(diào)用gopreempt_m函數(shù)完成搶占.
    if preempt {
        ......
        
        // 停止當(dāng)前運行狀態(tài)的G,最后放到全局runq里,釋放M
        // 這里會進入schedule循環(huán).阻塞到這里
        gopreempt_m(gp) // never return
    }

    ......
}
// runtime/proc.go

func goschedImpl(gp *g) {
    status := readgstatus(gp)
    if status&^_Gscan != _Grunning {
        dumpgstatus(gp)
        throw("bad g status")
    }
    casgstatus(gp, _Grunning, _Grunnable)
    dropg()
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)

    schedule()
}

我們都知道Go的調(diào)度是非搶占式的夺欲,要想實現(xiàn)G不被長時間跪帝,就只能主動觸發(fā)搶占,而Go觸發(fā)搶占的實際就是在棧擴張的時候洁闰,在newstack新創(chuàng)建椙干酰空間的時候檢測是否有搶占標(biāo)記(也就是gp.stackguard0是否等于stackPreempt),如果有則通過goschedImpl方法再次進入到熟悉的schedule調(diào)度循環(huán)扑眉。

系統(tǒng)調(diào)用讓出CPU

我們程序都跑在系統(tǒng)上面纸泄,就繞不開與系統(tǒng)的交互。那么當(dāng)我們的Go程序做系統(tǒng)調(diào)用的時候腰素,系統(tǒng)的方法不確定會阻塞多久聘裁,而我們程序又不知道運行的狀態(tài)該怎么辦?

在Go中并沒有直接對系統(tǒng)內(nèi)核函數(shù)調(diào)用弓千,而是封裝了個syscall.Syscall方法衡便,先看下實現(xiàn):

// syscall/syscall_unix.go

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
// syscall/asm_linux_amd64.s

TEXT    ·Syscall(SB),NOSPLIT,$0-56
    CALL    runtime·entersyscall(SB) 
    MOVQ    a1+8(FP), DI
    MOVQ    a2+16(FP), SI
    MOVQ    a3+24(FP), DX
    MOVQ    $0, R10
    MOVQ    $0, R8
    MOVQ    $0, R9
    MOVQ    trap+0(FP), AX  // syscall entry
    SYSCALL // 進行系統(tǒng)調(diào)用
    CMPQ    AX, $0xfffffffffffff001
    JLS ok
    MOVQ    $-1, r1+32(FP)
    MOVQ    $0, r2+40(FP)
    NEGQ    AX
    MOVQ    AX, err+48(FP)
    CALL    runtime·exitsyscall(SB)
    RET
ok:
    MOVQ    AX, r1+32(FP)
    MOVQ    DX, r2+40(FP)
    MOVQ    $0, err+48(FP)
    CALL    runtime·exitsyscall(SB)
    RET

在匯編代碼中看出先是執(zhí)行了runtime·entersyscall方法,然后進行系統(tǒng)調(diào)用洋访,最后執(zhí)行了runtime·exitsyscall(SB)镣陕,從字面意思看是進入系統(tǒng)調(diào)用之前先執(zhí)行一些邏輯,退出系統(tǒng)調(diào)用之后執(zhí)行一堆邏輯姻政〈粢郑看下具體實現(xiàn):

// runtime/proc.go

func entersyscall(dummy int32) {
    reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
    ......
    
    // Leave SP around for GC and traceback.
    // 保存執(zhí)行現(xiàn)場
    save(pc, sp)
    _g_.syscallsp = sp
    _g_.syscallpc = pc
    // 切換到系統(tǒng)調(diào)用狀態(tài)
    casgstatus(_g_, _Grunning, _Gsyscall)
    
    ......
    
    // Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
    // We set _StackGuard to StackPreempt so that first split stack check calls morestack.
    // Morestack detects this case and throws.
    _g_.stackguard0 = stackPreempt
    _g_.m.locks--
}

進入系統(tǒng)調(diào)用前先保存執(zhí)行現(xiàn)場,然后切換到_Gsyscall狀態(tài)汁展,最后標(biāo)記搶占鹊碍,等待被搶占走厌殉。

// runtime/proc.go

func exitsyscall(dummy int32) {
    ......

    // Call the scheduler.
    mcall(exitsyscall0)

    ......
}

func exitsyscall0(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Gsyscall, _Grunnable)
    dropg()
    lock(&sched.lock)
    // 獲取一個空閑的P,如果沒有則放到全局隊列里侈咕,如果有則執(zhí)行
    _p_ := pidleget()
    if _p_ == nil {
        globrunqput(gp) // 如果沒有P就放到全局隊列里,等待有資源時執(zhí)行
    } else if atomic.Load(&sched.sysmonwait) != 0 {
        atomic.Store(&sched.sysmonwait, 0)
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    if _p_ != nil {
        acquirep(_p_)
        execute(gp, false) // Never returns. // 如果找到空閑的P則直接執(zhí)行
    }
    if _g_.m.lockedg != nil {
        // Wait until another thread schedules gp and so m again.
        stoplockedm()
        execute(gp, false) // Never returns.
    }
    stopm()
    schedule() // Never returns. // 沒有P資源執(zhí)行公罕,就繼續(xù)下一輪調(diào)度循環(huán)
}

系統(tǒng)調(diào)用退出時,切到G0下把G狀態(tài)切回來耀销,如果有可執(zhí)行的P則直接執(zhí)行楼眷,如果沒有則放到全局隊列里,等待調(diào)度树姨,最后又看到了熟悉的schedule進入下一輪調(diào)度循環(huán)摩桶。

待執(zhí)行G的來源

gofunc創(chuàng)建G

當(dāng)開啟一個Goroutine的時候用到go func()這樣的語法桥状,在runtime下其實調(diào)用的就是newproc方法帽揪。

// runtime/proc.go

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    pc := getcallerpc(unsafe.Pointer(&siz))
    systemstack(func() {
        newproc1(fn, (*uint8)(argp), siz, 0, pc)
    })
}

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
    ......
    
    _p_ := _g_.m.p.ptr()
    // 從當(dāng)前P里面復(fù)用一個空閑G
    newg := gfget(_p_)
    // 如果沒有空閑G則新建一個,默認(rèn)堆大小為_StackMin=2048 bytes
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        // 把新創(chuàng)建的G添加到全局allg里
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }

    ......
    
    if isSystemGoroutine(newg) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    casgstatus(newg, _Gdead, _Grunnable)

    // 把G放到P里的待運行隊列,第三參數(shù)設(shè)置為true辅斟,表示要放到runnext里转晰,作為優(yōu)先要執(zhí)行的G
    runqput(_p_, newg, true)

    // 如果有其它空閑P則嘗試喚醒某個M來執(zhí)行
    // 如果有M處于自璇等待P或G狀態(tài),放棄士飒。
    // NOTE: sched.nmspinning!=0說明正在有M被喚醒查邢,這里判斷sched.nmspinnin==0時才進入wakep是防止同時喚醒多個M
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    
    ......
    
    return newg
}

newproc1方法中gfget先從空閑的G列表獲取一個G對象,沒有則創(chuàng)建一個新的G對象酵幕,然后runqput放到當(dāng)前P待運行隊列里扰藕。

epoll來源

回想上面分析搶占以及多線程下如何調(diào)度時都見到一個netpoll方法,這個方法就是從系統(tǒng)內(nèi)核獲取已經(jīng)有數(shù)據(jù)的時間芳撒,然后映射到對應(yīng)的G標(biāo)記ready邓深。下面看實現(xiàn):

// runtime/proc.go

func netpoll(block bool) *g {
    ......
    var events [128]epollevent
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    var gp guintptr
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

            netpollready(&gp, pd, mode)
        }
    }
    if block && gp == 0 {
        goto retry
    }
    return gp.ptr()
}

func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
    var rg, wg guintptr
    if mode == 'r' || mode == 'r'+'w' {
        rg.set(netpollunblock(pd, 'r', true))
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg.set(netpollunblock(pd, 'w', true))
    }
    if rg != 0 {
        rg.ptr().schedlink = *gpp
        *gpp = rg
    }
    if wg != 0 {
        wg.ptr().schedlink = *gpp
        *gpp = wg
    }
}

// 解鎖pd wait狀態(tài),標(biāo)記為pdReady,并返回
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // 變量pd.rg在netpollblock的時候已經(jīng)指向了運行pd的G,因此old其實指向G的指針笔刹,而不是pdWait等等的狀態(tài)指針了
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            return (*g)(unsafe.Pointer(old))
        }
    }
}

首先epollwait從內(nèi)核獲取到一批event芥备,也就拿到了有收到就緒的FD。netpoll的返回值是一個G鏈表舌菜,在該方法里只是把要被喚醒的G標(biāo)記ready萌壳,然后交給外部處理,例如sysmon中的代碼:

// runtime/proc.go

func sysmon() {
    ......
    
    for {
        ......
        
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        if lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {
                ......
                
                incidlelocked(-1)
                // 把epoll ready的G列表注入到全局runq里
                injectglist(gp)
                incidlelocked(1)
            }
        }
        
        ......
    }
}

// 把G列表注入到全局runq里
func injectglist(glist *g) {
    ......
    
    lock(&sched.lock)
    var n int
    for n = 0; glist != nil; n++ {
        gp := glist
        glist = gp.schedlink.ptr()
        casgstatus(gp, _Gwaiting, _Grunnable)
        globrunqput(gp)
    }
    
    ......
}

netpoll返回的鏈表交給了injectglist日月,然后其實是放到了全局rung隊列中袱瓮,等待被調(diào)度。

epoll內(nèi)容較多爱咬,本章主要圍繞調(diào)度的話題討論尺借,在這里就不展開分析。

看幾個主動讓出CPU的場景

time.Sleep

當(dāng)代碼中調(diào)用time.Sleep的時候我們是要black住程序不在繼續(xù)往下執(zhí)行台颠,此時該goroutine不會做其他事情了褐望,理應(yīng)把CPU資源釋放出來勒庄,下面看下實現(xiàn):

// runtime/time.go

func timeSleep(ns int64) {
    if ns <= 0 {
        return
    }

    t := getg().timer
    if t == nil {
        t = new(timer)
        getg().timer = t
    }
    *t = timer{} // 每個定時任務(wù)都創(chuàng)建一個timer
    t.when = nanotime() + ns
    t.f = goroutineReady // 記錄喚醒該G的方法,喚醒時通過該方法執(zhí)行喚醒
    t.arg = getg()       // 把timer與當(dāng)前G關(guān)聯(lián),時間到了喚醒時通過該參數(shù)找到所在的G
    lock(&timers.lock)
    addtimerLocked(t)                                      // 把timer添加到最小堆里
    goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2) // 切到G0讓出CPU,進入休眠
}
// runtime/proc.go

func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

timeSleep函數(shù)里通過addtimerLocked把定時器加入到timer管理器(timer通過最小堆的數(shù)據(jù)結(jié)構(gòu)存放每個定時器,在這不做詳細說明)后瘫里,再通過goparkunlock實現(xiàn)把當(dāng)前G休眠实蔽,這里看到了上面提到的gopark方法進行調(diào)度循環(huán)的上下文切換。

上面介紹的是一個G如何進入到休眠狀態(tài)的過程谨读,該例子是個定時器局装,當(dāng)時間到了的話,當(dāng)前G就要被喚醒繼續(xù)執(zhí)行了劳殖。下面就介紹下喚醒的流程铐尚。

返回到最開始timeSleep方法里在進入調(diào)度方法之前有一個addtimerLocked方法,看下這個方法做了什么哆姻。

// runtime/time.go

func addtimerLocked(t *timer) {
    // when must never be negative; otherwise timerproc will overflow
    // during its delta calculation and never expire other runtime timers.
    if t.when < 0 {
        t.when = 1<<63 - 1
    }
    t.i = len(timers.t)
    timers.t = append(timers.t, t) //將當(dāng)前timer添加到timer管理器里
    siftupTimer(t.i)
    
    ......
    
    // 如果沒有啟動timer管理定時器宣增,則啟動。timerproc只會啟動一次矛缨,即全局timer管理器
    if !timers.created {
        timers.created = true
        go timerproc()
    }
}
// runtime/time.go

// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
func timerproc() {
    timers.gp = getg()
    for {
        lock(&timers.lock)
        timers.sleeping = false
        now := nanotime()
        delta := int64(-1)
        for {
            if len(timers.t) == 0 {
                delta = -1
                break
            }
            t := timers.t[0]
            delta = t.when - now
            if delta > 0 {
                break
            }
            if t.period > 0 {
                // leave in heap but adjust next time to fire
                t.when += t.period * (1 + -delta/t.period)
                siftdownTimer(0)
            } else {
                // remove from heap
                last := len(timers.t) - 1
                if last > 0 {
                    timers.t[0] = timers.t[last]
                    timers.t[0].i = 0
                }
                timers.t[last] = nil
                timers.t = timers.t[:last]
                if last > 0 {
                    siftdownTimer(0)
                }
                t.i = -1 // mark as removed
            }
            f := t.f
            arg := t.arg
            seq := t.seq
            unlock(&timers.lock)
            if raceenabled {
                raceacquire(unsafe.Pointer(t))
            }
            f(arg, seq)
            lock(&timers.lock)
        }
        ......
    }
}

addtimerLocked方法的最下面有個邏輯在運行期間開啟了'全局時間事件驅(qū)動器'timerproc,該方法會全程遍歷最小堆爹脾,尋找最早進入timer管理器的定時器,然后喚醒箕昭。他是怎么找到要喚醒哪個G的灵妨?回頭看下timeSleep方法里把當(dāng)時正在執(zhí)行的G以及喚醒方法goroutineReady帶到了每個定時器里,而在timerproc則通過找到期的定時器執(zhí)行f(arg, seq)
即通過goroutineReady方法喚醒落竹。方法調(diào)用過程: goroutineReady() -> ready()

// runtime/time.go

func goroutineReady(arg interface{}, seq uintptr) {
    goready(arg.(*g), 0)
}
// runtime/proc.go

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    if trace.enabled {
        traceGoUnpark(gp, traceskip)
    }

    status := readgstatus(gp)

    // Mark runnable.
    _g_ := getg()
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    if status&^_Gscan != _Gwaiting {
        dumpgstatus(gp)
        throw("bad g->status in ready")
    }

    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    
    ......
}

在上面的方法里可以看到先把休眠的G從_Gwaiting切換到_Grunnable狀態(tài)泌霍,表明已經(jīng)可運行。然后通過runqput方法把G放到P的待運行隊列里述召,就進入到調(diào)度器的調(diào)度循環(huán)里了朱转。

總結(jié):time.Sleep想要進入阻塞(休眠)狀態(tài),其實是通過gopark方法給自己標(biāo)記個_Gwaiting狀態(tài)桨武,然后把自己所占用的CPU線程資源給釋放出來肋拔,繼續(xù)執(zhí)行調(diào)度任務(wù),調(diào)度其它的G來運行呀酸。而喚醒是通過把G更改回_Grunnable狀態(tài)后凉蜂,然后把G放入到P的待運行隊列里等待執(zhí)行。通過這點還可以看出休眠中的G其實并不占用CPU資源性誉,最多是占用內(nèi)存窿吩,是個很輕量級的阻塞。

sync.Mutex

// sync/mutex.go

func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    // 首先嘗試搶鎖错览,如果搶到則直接返回,并標(biāo)記mutexLocked狀態(tài)
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // 嘗試自璇,但有如下幾個條件跳過自璇,這里的自璇是用戶態(tài)自璇,基本lock的cpu消耗都耗到這里了
        // 1.不在饑餓模式自璇
        // 2.超過4次循環(huán)纫雁,則不再自璇. (runtime_canSpin里面)
        // 3.全部P空閑時,不自璇.(runtime_canSpin里面)
        // 4.當(dāng)前P里無運行G時倾哺,不自璇.(runtime_canSpin里面)
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin() // doSpin其實就是用戶態(tài)自璇30次
            iter++
            old = m.state
            continue
        }
        
        ......
        
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            ......
            
            runtime_SemacquireMutex(&m.sema, queueLifo)                                     // 這里會再次自璇幾次,然后最后切換到g0把G標(biāo)記_Gwaiting狀態(tài)阻塞在這里
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 如果鎖等了1毫秒才被喚醒轧邪,才會標(biāo)記為饑餓模式
            old = m.state
            
            ......
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
// runtime/sema.go

func sync_runtime_Semacquire(addr *uint32) {
    semacquire1(addr, false, semaBlockProfile)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
    ......
    
    for {
        ......
        
        // Any semrelease after the cansemacquire knows we're waiting
        // (we set nwait above), so go to sleep.
        root.queue(addr, s, lifo)                                     // 把當(dāng)前鎖的信息存起來以便以后喚醒時找到當(dāng)前G,G是在queue里面獲取的刽脖。
        goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) // 進行休眠,然后阻塞在這里
        if s.ticket != 0 || cansemacquire(addr) {
            break
        }
    }
}

// queue adds s to the blocked goroutines in semaRoot.
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
    s.g = getg() // 這里記錄了當(dāng)前的G忌愚,以便喚醒的時候找到要被喚醒的G
    s.elem = unsafe.Pointer(addr)
    s.next = nil
    s.prev = nil

    var last *sudog
    pt := &root.treap
    for t := *pt; t != nil; t = *pt {
        ......
        
        last = t
        if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
            pt = &t.prev
        } else {
            pt = &t.next
        }
    }

    ......

Mutex.Lock方法通過調(diào)用runtime_SemacquireMutex最終還是調(diào)用goparkunlock實現(xiàn)把G進入到休眠狀態(tài)曲管。在進入休眠之前先把自己加入到隊列里root.queue(addr, s, lifo),在queue方法里硕糊,記錄了當(dāng)前的G院水,以便以后找到并喚醒。

// sync/mutex.go

func (m *Mutex) Unlock() {
    ......
    
    if new&mutexStarving == 0 { // 如果不是饑餓模式
        old := new
        for {
            ......
            
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false) // 喚醒鎖
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true) // 喚醒鎖
    }
}
// runtime/sema.go

func sync_runtime_Semrelease(addr *uint32, handoff bool) {
    semrelease1(addr, handoff)
}

func semrelease1(addr *uint32, handoff bool) {
    root := semroot(addr)
    s, t0 := root.dequeue(addr)
    if s != nil {
        atomic.Xadd(&root.nwait, -1)
    }
    
    ......
    
    if s != nil { // May be slow, so unlock first
        ......
        
        readyWithTime(s, 5)
    }
}

func readyWithTime(s *sudog, traceskip int) {
    if s.releasetime != 0 {
        s.releasetime = cputicks()
    }
    goready(s.g, traceskip)
}

Mutex. Unlock方法通過調(diào)用runtime_Semrelease最終還是調(diào)用goready實現(xiàn)把G喚醒简十。

channel

// runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 尋找一個等待中的receiver檬某,直接把值傳給這個receiver,繞過下面channel buffer螟蝙,
    // 避免從sender buffer->chan buffer->receiver buffer恢恼,而是直接sender buffer->receiver buffer,仍然做了內(nèi)存copy
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 如果沒有receiver等待:
    // 如果當(dāng)前chan里的元素個數(shù)小于環(huán)形隊列大小(也就是chan還沒滿),則把內(nèi)存拷貝到channel buffer里胶逢,然后直接返回厅瞎。
    // 注意dataqsiz是允許為0的,當(dāng)為0時初坠,也不存在該if里面的內(nèi)存copy
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx) // 獲取即將要寫入的chan buffer的指針地址
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        // 把元素內(nèi)存拷貝進去.
        // 注意這里產(chǎn)生了一次內(nèi)存copy,也就是說如果沒有receiver的話,就一定會產(chǎn)生內(nèi)存拷貝
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++ // 發(fā)送索引+1
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++ // 隊列元素計數(shù)器+1
        unlock(&c.lock)
        return true
    }

    if !block { // 如果是非阻塞的彭雾,到這里就可以結(jié)束了
        unlock(&c.lock)
        return false
    }

    // ########下面是進入阻塞模式的如何實現(xiàn)阻塞的處理邏輯

    // Block on the channel. Some receiver will complete our operation for us.
    // 把元素相關(guān)信息碟刺、當(dāng)前的G信息打包到一個sudog里,然后扔進send隊列
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp // 把當(dāng)前G也扔進sudog里,用于別人喚醒該G的時候找到該G
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg // 記錄當(dāng)前G正在等待的sudog
    gp.param = nil
    c.sendq.enqueue(mysg)
    // 切換到g0薯酝,把當(dāng)前G切換到_Gwaiting狀態(tài)半沽,然后喚醒lock.
    // 此時當(dāng)前G被阻塞了,P就繼續(xù)執(zhí)行其它G去了.
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    ......
    
    return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ......
    
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

當(dāng)給一個chan發(fā)送消息的時候,實質(zhì)觸發(fā)的方法是chansend吴菠。在該方法里不是先進入休眠狀態(tài)者填。

1)如果此時有接收者接收這個chan的消息則直接把數(shù)據(jù)通過send方法扔給接收者,并喚醒接收者的G做葵,然后當(dāng)前G則繼續(xù)執(zhí)行占哟。

2)如果沒有接收者,就把數(shù)據(jù)copy到chan的臨時內(nèi)存里酿矢,且內(nèi)存沒有滿就繼續(xù)執(zhí)行當(dāng)前G榨乎。

  1. 如果沒有接收者且chan滿了,依然是通過goparkunlock方法進入休眠瘫筐。在休眠前把當(dāng)前的G相關(guān)信息存到隊列(sendq)以便有接收者接收數(shù)據(jù)的時候喚醒當(dāng)前G蜜暑。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
    
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        // 尋找一個正在等待的sender
        // 如果buffer size是0,則嘗試直接從sender獲取(這種情況是在環(huán)形隊列長度(dataqsiz)為0的時候出現(xiàn))
        // 否則(buffer full的時候)從隊列head接收策肝,并且?guī)椭鷖ender在隊列滿時的阻塞的元素信息拷貝到隊列里肛捍,然后將sender的G狀態(tài)切換為_Grunning,這樣sender就不阻塞了隐绵。
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 如果有數(shù)據(jù)則從channel buffer里獲取數(shù)據(jù)后返回(此時環(huán)形隊列長度dataqsiz!=0)
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx) // 獲取即將要讀取的chan buffer的指針地址
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp) // copy元素數(shù)據(jù)內(nèi)存到channel buffer
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // ##########下面是無任何數(shù)據(jù)準(zhǔn)備把當(dāng)前G切換為_Gwaiting狀態(tài)的邏輯

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // 釋放了鎖,然后把當(dāng)前G切換為_Gwaiting狀態(tài)拙毫,阻塞在這里等待有數(shù)據(jù)進來被喚醒
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    ......
    
    return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ......
    
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}

chanrecv方法是在chan接收者的地方調(diào)用的方法氢橙。

1)如果有發(fā)送者被休眠,則取出數(shù)據(jù)然后喚醒發(fā)送者恬偷,當(dāng)前接收者的G拿到數(shù)據(jù)繼續(xù)執(zhí)行悍手。

2)如果沒有等待的發(fā)送者就看下有沒有發(fā)送的數(shù)據(jù)還沒被接收,有的話就直接取出數(shù)據(jù)然后返回袍患,當(dāng)前接收者的G拿到數(shù)據(jù)繼續(xù)執(zhí)行坦康。(注意:這里取的數(shù)據(jù)不是正在等待的sender的數(shù)據(jù),而是從chan的開頭的內(nèi)存取诡延,如果是sender的數(shù)據(jù)則讀出來的數(shù)據(jù)順序就亂了)

3)如果即沒有發(fā)送者滞欠,chan里也沒數(shù)據(jù)就通過goparkunlock進行休眠,在休眠之前把當(dāng)前的G相關(guān)信息存到recvq里面肆良,以便有數(shù)據(jù)時找到要喚醒的G筛璧。

image.png

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市惹恃,隨后出現(xiàn)的幾起案子夭谤,更是在濱河造成了極大的恐慌,老刑警劉巖巫糙,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朗儒,死亡現(xiàn)場離奇詭異,居然都是意外死亡参淹,警方通過查閱死者的電腦和手機醉锄,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來浙值,“玉大人恳不,你說我怎么就攤上這事】牛” “怎么了烟勋?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長负蚊。 經(jīng)常有香客問我神妹,道長,這世上最難降的妖魔是什么家妆? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任鸵荠,我火速辦了婚禮,結(jié)果婚禮上伤极,老公的妹妹穿的比我還像新娘蛹找。我一直安慰自己姨伤,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布庸疾。 她就那樣靜靜地躺著乍楚,像睡著了一般。 火紅的嫁衣襯著肌膚如雪届慈。 梳的紋絲不亂的頭發(fā)上徒溪,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機與錄音金顿,去河邊找鬼臊泌。 笑死,一個胖子當(dāng)著我的面吹牛揍拆,可吹牛的內(nèi)容都是我干的渠概。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼嫂拴,長吁一口氣:“原來是場噩夢啊……” “哼播揪!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起筒狠,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤猪狈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后窟蓝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罪裹,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年运挫,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片套耕。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡谁帕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出冯袍,到底是詐尸還是另有隱情匈挖,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布康愤,位于F島的核電站儡循,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏征冷。R本人自食惡果不足惜择膝,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望检激。 院中可真熱鬧肴捉,春花似錦腹侣、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至窃页,卻和暖如春跺株,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脖卖。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工乒省, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人胚嘲。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓作儿,卻偏偏與公主長得像,于是被迫代替她去往敵國和親馋劈。 傳聞我的和親對象是個殘疾皇子攻锰,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 下課,已是21:00妓雾!我已經(jīng)在嘀嗒軟件叫好了順風(fēng)車娶吞! “你怎么還沒下來了啊械姻?我都在門口繞...
    盧菲絲小姐閱讀 648評論 6 5
  • 暑假怎么過妒蛇,這是大多數(shù)父母比較頭疼的問題,孩子在家天天看電視上網(wǎng)楷拳,玩绣夺,睡覺,吃零食欢揖,自己看著都無奈陶耍,其實孩子自己也...
    明月幾時照閱讀 209評論 1 3
  • 是這個課程里畫得最不好看的一張烈钞。。 我坤按,討厭毯欣,畫花朵,啊啊啊啊啊啊啊?? (??  ̄?? ̄? )?? ??
    文案喵柒言閱讀 350評論 0 4
  • 清新的早晨 鳥語花香 溫暖的陽光 照耀著我的心房 快樂的心情 隨著動聽的歌聲 在藍天上 飛向夢想詩與遠方 久別的暖...
    呂四乃閱讀 207評論 0 6