源碼
- 版本
- 1.14.1
- 相關目錄
- runtime/asm_amd64.s
- runtime/proc.go
- runtime/runtime2.go
關鍵概念
- G - 我們代碼寫的go func(){ }
- M - 內核線程
- P - M調度G的上下文, P中存儲了很多G,M通過調用P來獲取并執行G。為了方便,下文中稱它為==局部調度器==
- schedt - 全局調度器,主要存儲了一些空閑的G、M、P
G、M、P、schedt之間的關系
graph TB;
subgraph schedt
A(空閑G集合)
B(runable G集合)
C(空閑P集合)
D(空閑M集合)
end
graph TB;
subgraph M
A(running G)
end
subgraph P
B(空閑G集合)
C(runable G集合)
end
A -- M執行完G,放入空閑集合 --> B
A -- M從P獲取可運行G --> C
D[M] -.-> E[P]
D2[M] -.-> E2[P]
D3[M] -.-> E3[P]
關鍵數據結構
G
/// runtime/runtime2.go — key fields only
// g describes a goroutine (excerpt; the remaining fields are omitted).
type g struct {
stack stack // this goroutine's own stack
m *m // the m currently executing this g
sched gobuf // saved execution context of the g; used to resume it on a goroutine switch
atomicstatus uint32 // g status: Gidle, Grunnable, Grunning, Gsyscall, Gwaiting, Gdead
goid int64 // goroutine id
schedlink guintptr // next g, forming a linked list of g's
preempt bool // preemption flag
lockedm muintptr // M this g is locked to; after being interrupted the g resumes on that M
gopc uintptr // pc of the instruction that created this goroutine
startpc uintptr // pc of the goroutine's function
}
這里先介紹下G的各個狀態:
- Gidle 被創建但沒初始化
- Grunnable 可運行
- Grunning 正在運行
- Gsyscall 正在系統調用
- Gwaiting 正在等待
- Gdead 運行完成
M
/// runtime/runtime2.go — key fields only
// m describes a kernel thread (excerpt; the remaining fields are omitted).
type m struct {
g0 *g // g0; every M has its own dedicated g0
curg *g // the g currently running on this M
p puintptr // the p currently in use by this M
nextp puintptr // the p this m takes first when it is woken up
id int64
spinning bool // whether the m is spinning
park note // note the m sleeps on; startm wakes it with notewakeup(&mp.park)
alllink *m // on allm
schedlink muintptr // next m, forming a linked list of m's
mcache *mcache // memory allocation
lockedg guintptr // counterpart of g.lockedm
freelink *m // on sched.freem
}
P
/// runtime/runtime2.go — key fields only
// p is the per-processor scheduling context through which an M obtains
// runnable G's (excerpt; the remaining fields are omitted).
type p struct {
id int32
status uint32 // P status
link puintptr // next P, forming a linked list of P's
m muintptr // the M that owns this P
mcache *mcache
// Local queue of runnable G's owned by this P.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr // a runnable G with higher priority than those in runq
// List of dead G's; G's are taken from here when a new goroutine needs one.
gFree struct {
gList
n int32
}
gcBgMarkWorker guintptr // (atomic)
gcw gcWork
}
這里先介紹下P的各個狀態
- Pidle:沒有關聯的M
- Prunning:已和某個M關聯
- Psyscall:當前P中的被運行的那個G正在進行系統調用
- Pgcstop: 系統正在GC
- Pdead: 當前P不再使用
schedt
/// runtime/runtime2.go — key fields only
// schedt is the global scheduler state: it mainly holds idle M's, idle P's,
// the global run queue and the global cache of dead G's (excerpt).
type schedt struct {
lock mutex
midle muintptr // list of idle M's
nmidle int32 // number of idle M's
nmidlelocked int32 // number of locked M's
mnext int64 // number of M's created so far, and the next M id
maxmcount int32 // maximum number of M's allowed
nmsys int32 // number of M's not counted for deadlock detection
nmfreed int64 // cumulative number of M's freed
pidle puintptr // list of idle P's
npidle uint32 // number of idle P's
runq gQueue // global queue of runnable G's
runqsize int32 // number of G's in the global runnable queue
// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// freem is the list of m's waiting to be freed when their
// m.exited is set. Linked through m.freelink.
freem *m
}
啟動
/// runtime/asm_amd64.s
// Bootstrap excerpt: wire up g0 and m0, initialize the runtime, queue a
// goroutine whose entry is runtime·main, then start this M.
get_tls(BX)
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// save m->g0 = g0
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
CALL runtime·check(SB)
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB) // command-line initialization: fetch the arguments
CALL runtime·osinit(SB) // OS initialization
CALL runtime·schedinit(SB) // scheduler initialization
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry; mainPC holds the address of runtime·main (see DATA below)
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
....
DATA runtime·mainPC+0(SB)/8,$runtime·main(SB)
匯編語言我也看不懂,看字面意思可知道啟動流程:
1.創建g0
2.創建m0
3.m0.g0 = g0
4.g0.m = m0
5.命令行初始化,OS初始化
6.schedinit 調度器初始化
7.newproc 將runtime.main作為參數創建goroutine
8.mstart
所以g0和m0是通過匯編指令創建的,并將m0賦值給g0.m
schedinit 調度器初始化
///runtime/proc.go
// schedinit initializes the scheduler during bootstrap (excerpt; "..."
// marks elided code): it caps the number of M's, initializes stacks and
// the allocator, initializes the current M, reads args/environment, and
// determines the number of P's.
func schedinit() {
// Fetch the current g, which is g0 at this point.
_g_ := getg()
...
// Set the maximum number of M's.
sched.maxmcount = 10000
...
// Stack and memory allocator initialization.
stackinit()
mallocinit()
...
// Initialize the current M.
mcommoninit(_g_.m)
...
// Argument and environment initialization.
goargs()
goenvs()
...
// Number of P's defaults to the CPU count.
procs := ncpu
// The GOMAXPROCS environment variable overrides the number of P's when it parses to a positive value.
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
...
}
調度器初始化的主要工作: 空間申請、M的最大數量設置、P的數量設置、初始化參數和環境
newproc 創建goroutine
// newproc creates a new goroutine that runs fn with siz bytes of arguments.
// The arguments are located in memory right after the fn pointer. The real
// work is delegated to newproc1, executed on the system stack.
func newproc(siz int32, fn *funcval) {
// sys.PtrSize is 8 here: skip past the function pointer to get the address of the first argument.
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg() // the current g
pc := getcallerpc() // caller's pc: the address of the next instruction to execute
// Create the G on g0's stack:
// systemstack switches from the current g to g0 and uses g0's stack space.
systemstack(func() {
newproc1(fn, argp, siz, gp, pc)
})
}
// newproc1 (abridged; "..." marks elided code, including the definition of
// _g_) obtains a G, points its saved context at fn and queues it on the
// current P. During bootstrap fn is runtime.main, so this G is the main
// goroutine.
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
...
_p_ := _g_.m.p.ptr() // the current P
newg := gfget(_p_) // take an idle G from the P or from sched; here it becomes the main goroutine
if newg == nil { // none available: create a new one
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
...
// Store the address of runtime.main in the main goroutine's sched.
gostartcallfn(&newg.sched, fn)
...
runqput(_p_, newg, true) // put the runnable newg into the P
...
}
這里先不分析具體實現,在啟動階段將runtime.main作為入參創建G,也就是創建一個G來運行runtime.main。
runtime.main
// main is runtime.main (abridged; "..." marks elided code): it sets the
// maximum stack size and then calls the user program's main.main.
func main() {
...
// Maximum stack size: 1 GB on 64-bit systems, 250 MB on 32-bit systems.
if sys.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}
...
fn := main_main // the main function of the user's main package
fn() // run the user's main function
...
}
這里就可以調用我們main.main了,所以上面的newproc是為了給我們的main.main創建一個主goroutine。
現在有了主goroutine,那怎么啟動這個goroutine呢?-- mstart
mstart
// mstart is the entry point of an M (abridged); the part shown here only
// forwards to mstart1.
func mstart() {
...
mstart1()
...
}
// mstart1 (abridged) ends by entering the scheduler loop via schedule.
func mstart1() {
...
schedule()
}
// schedule (abridged; "..." marks elided code) picks a runnable G and
// executes it. Sources are tried in order: occasionally the global run
// queue, then the P's local run queue, then findrunnable, which blocks.
func schedule() {
_g_ := getg()
...
top:
pp := _g_.m.p.ptr()
...
var gp *g
...
// Fetch a G from sched or from the P. Up to this point bootstrap has produced two G's:
// g0 and the main G. g0 is never stored in sched or in a P, so the G obtained here is the main G.
if gp == nil {
// Every 61st scheduler tick, check the global queue first when it is non-empty.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
execute(gp, inheritTime)
}
// execute (abridged) resumes gp from its saved context.
func execute(gp *g, inheritTime bool) {
...
// The address of runtime.main was stored in gp.sched earlier (in newproc1); this call jumps to it.
gogo(&gp.sched)
}
mstart經過一系列的調用,最終通過gogo(&gp.sched)調用了runtime.main。在runtime.main中又調用了main.main,至此啟動結束。
啟動小結
graph TB;
A["創建G0、M0, g0.m = m0"] --> C
C[初始化命令行和OS]-->D
D["schedinit:設置M最大數量、P個數、棧和內存初始化"] --> E
E["newproc:為main.main創建一個主goroutine"] --> F
F["mstart:運行主goroutine --> 運行main.main"]
==下面開始分析G、M、P==
G 創建
// newproc creates a new goroutine that runs fn with siz bytes of arguments.
// The arguments are located in memory right after the fn pointer. The real
// work is delegated to newproc1, executed on the system stack.
func newproc(siz int32, fn *funcval) {
// sys.PtrSize is 8 here: skip past the function pointer to get the address of the first argument.
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg() // the current g
pc := getcallerpc() // caller's pc: the address of the next instruction to execute
// Create the G on g0's stack:
// systemstack switches from the current g to g0 and uses g0's stack space.
systemstack(func() {
newproc1(fn, argp, siz, gp, pc)
})
}
// newproc1 creates a new g in state _Grunnable that will start running fn,
// with narg bytes of arguments copied from argp. callergp and callerpc
// identify the goroutine and the call site that requested it; callerpc is
// recorded as newg.gopc. The new g is placed on the current P's run queue
// and, if there are idle P's and no spinning M's, another P is woken.
//
// newproc invokes it through systemstack, so getg() here returns g0.
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
	_g_ := getg() // the current g, i.e. g0: the systemstack call in newproc switched to it

	if fn == nil { // a nil fn is a fatal error
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	acquirem() // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7 // round the argument size up to a multiple of 8

	// With _StackMin = 2048 and 8-byte registers, the (rounded) argument size
	// must stay below 2048 - 4*8 - 8 = 2008 bytes.
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_) // try to reuse an idle G from the P
	// If none was available, create a new G, mark it dead, and add it to the global allgs.
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize                              // top-of-stack address for the new g
	spArg := sp                                                  // where the arguments are copied; adjusted below when usesLR
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		// Copy the arguments onto the new G's stack.
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			if stkmap.nbit > 0 {
				// We're in the prologue, so it's always stack map index 0.
				bv := stackmapdata(stkmap, 0)
				bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
			}
		}
	}

	// Wipe the G's saved context: the G may have been recycled from the P,
	// so any stale data must be cleared.
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	// Re-populate the saved context.
	newg.sched.sp = sp
	newg.stktopsp = sp
	// Point pc at goexit. This matters: it is what runs when the G's function
	// finishes (per the article, it ends up calling goexit1).
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	// Per the article's reading of gostartcallfn: after this call the saved pc
	// actually points at fn, while the goexit address set above is pushed on
	// the stack so that fn's RET pops it.
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	// Mark the G runnable.
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	// Assign the goroutine id from the P's cached batch.
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	// Queue the G on the P's runnable queue (as runnext, since next == true).
	runqput(_p_, newg, true)

	// If there are idle P's and no spinning M's, wake one up to run work.
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	releasem(_g_.m)
}
從P獲取空閑的G,如果沒有則生成新的G。把參數復制到G的棧,pc: fn, sp:goexit,把G設為runnable,并加入P的runnable隊列
如何從P獲取G: gfget(p)
// gfget returns an idle G taken from _p_'s local free list, or nil when
// none is available. When the local list is empty and the global free
// lists are not, it first moves a batch from the global lists to _p_.
func gfget(_p_ *p) *g {
retry:
// The P's local free list is empty and a global free list is non-empty.
if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// Move a batch from the global lists to the local free list (until the local list holds 32).
for _p_.gFree.n < 32 {
// Prefer Gs with stacks.
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
_p_.gFree.push(gp)
_p_.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
// Pop one G from the local free list.
gp := _p_.gFree.pop()
if gp == nil {
return nil
}
_p_.gFree.n--
if gp.stack.lo == 0 {
// Stack was deallocated in gfput. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
// The G kept its stack; report the reused stack range to the race detector / msan when enabled.
if raceenabled {
racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if msanenabled {
msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
}
return gp
}
從P的本地空閑鏈表獲取G。如果本地空閑鏈表為空,則從全局空閑鏈表移一批到本地。
加入本地runnable列表,runqput(p, newg, true)
// runqput puts gp on _p_'s local runnable queue. When next is true, gp is
// installed in _p_.runnext and the previous runnext (if any) is the G that
// gets appended to the queue instead. When the local queue is full,
// runqputslow moves part of it, plus the G, to the global queue.
func runqput(_p_ *p, gp *g, next bool) {
// With scheduler randomization enabled, drop the runnext preference half of the time.
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
if next {
retryNext:
// Make gp the runnext; the previous runnext is moved to the runnable queue below.
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// The local runnable queue is not full: append locally.
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// The local runnable queue is full: move half of it to the global runnable queue.
if runqputslow(_p_, gp, h, t) {
return
}
// runqputslow returned false, so the head moved and the queue is no longer full; retry the local put.
goto retry
}
把本地runnable隊列的一半移到全局:runqputslow
// runqputslow moves half of _p_'s local runnable queue, together with gp,
// to the global runnable queue. h and t are the head and tail the caller
// observed. It returns false, without queueing gp, when the head changed
// concurrently, and true once the batch is on the global queue.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
// n must be exactly half the queue capacity, i.e. the queue was full; anything else is a bug.
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
// Starting at head, copy half of the queue into batch.
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
// Nothing is removed from the array itself; advancing head is what consumes the entries.
if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp
if randomizeScheduler {
// Shuffle the batch.
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// Link the goroutines.
for i := uint32(0); i < n; i++ {
// Chain each G to the next one via schedlink.
batch[i].schedlink.set(batch[i+1])
}
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])
// Other P's may operate on the global queue as well, so take the lock.
lock(&sched.lock)
// Append the batch to the global runnable queue.
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
}
runnable入隊列:
- 替換runnext
- 本地隊列未滿:
- 加入本地隊列
- 本地隊列已滿:
- 本地隊列移一半到全局隊列
- 再次嘗試加入本地隊列
G小結
graph TB;
A["go func() {} 我們寫的goroutine"] --> B
B["newproc 獲取func和參數"] -- "切換到g0,使用g0棧空間" --> C
C[newproc1] --> D[gfget 從當前P獲取空閑G]
D --> E{"P空&全局不空"}
E -- Y --> F[全局移32個到P本地]
E -- N --> G
F --> G[本地取出空閑G,初始化棧空間]
G --> H{"獲取空閑G成功"}
H -- N --> I[創建G,初始化棧空間, 加入全局G數組]
H -- Y --> J
I --> J["參數復制到棧,清除sched現場,pc:func,sp:goexit"]
J --> K["狀態設為runnable,設置goid"]
K -- runqput 加入當前P的runnable隊列 --> L[用g替換runnext]
L --> M{ 本地runnable隊列滿 }
M -- Y --> N[本地runnable隊列移一半到全局]
M -- N --> O[加入本地runnable隊列]
N --> M
O --> P{有空閑P&沒有自旋的M }
P -- Y --> Q["wakep()"]
現在有了G,那存放G的P又是怎么來的呢?procresize
在啟動的時候有一個環節是schedinit
// schedinit (abridged): the part shown here sizes the set of P's.
func schedinit() {
...
// Resize the set of P's to procs; a non-nil result (P's with runnable work) is fatal during bootstrap.
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
// procresize changes the number of P's to nprocs. It grows allp and
// initializes new P's as needed, makes sure the current M holds a P that
// is still valid, destroys surplus P's, and returns a linked list (via
// p.link) of the P's, other than the current one, that have runnable G's.
func procresize(nprocs int32) *p {
old := gomaxprocs
// The number of P's must be greater than 0.
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
// Grow allp when more P's are requested; allp is the global slice holding every P.
if nprocs > int32(len(allp)) {
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
// Create the newly added P's.
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
// Initialize the P (per the article, init sets its status to Pgcstop).
pp.init(i)
// Store it into allp.
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg()
// If the current P's id is below nprocs it is still within allp: keep running on it.
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
if _g_.m.p != 0 {
if trace.enabled {
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
// Detach the current P from the M.
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
// Then associate the current M with the first P in allp (per the article, acquirep sets it to Prunning).
acquirep(p)
if trace.enabled {
traceGoStart()
}
}
// Release the surplus P's.
for i := nprocs; i < old; i++ {
p := allp[i]
// Releasing does not delete/reclaim the p itself; it mainly moves the p's runnable and idle G's to the global lists.
p.destroy()
}
// Trim allp if its length differs from nprocs.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
// Skip the current P; it keeps running.
if _g_.m.p.ptr() == p {
continue
}
// Mark the P idle, i.e. not associated with an M.
p.status = _Pidle
// If the P has no runnable G's, put it on the global idle-P list.
if runqempty(p) {
pidleput(p)
} else {
// Assign it an M obtained from mget.
p.m.set(mget())
// Prepend this P to the list of P's with runnable work that is returned.
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs
// Update gomaxprocs, the number of P's.
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
// destroy (abridged; "..." marks elided code) releases the resources held
// by pp: its runnable G's and runnext go to the global run queue, its GC
// background worker is made runnable, its mcache is freed, its free G's
// are purged, and finally the P is marked _Pdead.
func (pp *p) destroy() {
// Move pp's runnable G's to the global queue.
for pp.runqhead != pp.runqtail {
// Pop from tail of local queue
pp.runqtail--
gp := pp.runq[pp.runqtail%uint32(len(pp.runq))].ptr()
// Push onto head of global queue
globrunqputhead(gp)
}
// Move runnext to the global queue as well.
if pp.runnext != 0 {
globrunqputhead(pp.runnext.ptr())
pp.runnext = 0
}
...
// If there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up.
if gp := pp.gcBgMarkWorker.ptr(); gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
globrunqput(gp)
// This assignment doesn't race because the
// world is stopped.
pp.gcBgMarkWorker.set(nil)
}
...
// Free the P's mcache.
freemcache(pp.mcache)
pp.mcache = nil
// Move the P's gFree list to the global list (per the article's description of gfpurge).
gfpurge(pp)
...
pp.status = _Pdead
}
P小結
通過啟動時候的schedinit調用procresize生成對應個數的P。因為可以通過runtime.GOMAXPROCS來動態修改P的個數,所以在procresize中會對P數組進行調整,或新增P或減少P。被減少的P會將自身的runnable、runnext、gFree移到全局去。
- 如果當前P不在多余的P中,則狀態為running
- 如果當前P在多余的P中,則將當前M和P解綁,再將M和P數組的第一個P綁定,并設為running
- 除了當前P外,所有P都設為idle,如果P中沒有runnable,則將P加入全局空閑P,否則獲取全局空閑M和P綁定。
M從何而來?
在創建G的時候會根據情況決定是否創建新M
// newproc1 (abridged): the part shown is where creating a G may lead to
// starting another M.
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) {
...
// There is an idle P globally and no spinning M (and main has started): wake a P.
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
...
}
// wakep tries to start one more spinning M. It does nothing unless it wins
// the CAS that moves sched.nmspinning from 0 to 1.
func wakep() {
// be conservative about spinning threads
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
// startm (abridged; "..." marks elided code) schedules some M to run _p_.
// If _p_ is nil it takes an idle P, returning when there is none. It then
// reuses an idle M if one exists, otherwise creates a new M. spinning
// tells whether the caller already incremented sched.nmspinning.
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget()
// An M must be bound to a P; with no idle P there is no point in starting an M.
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// Undo the caller's nmspinning increment.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
// Take an idle M from the global list.
mp := mget()
unlock(&sched.lock)
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
// No idle M available: create a new one.
newm(fn, _p_)
return
}
...
// The caller incremented nmspinning, so set m.spinning in the new M.
mp.spinning = spinning
mp.nextp.set(_p_) // nextp points at the P
notewakeup(&mp.park) // wake the M
}
創建G時,如果全局存在空閑P且沒有自旋的M,則獲取M和空閑P綁定。
- 如果全局存在空閑M:獲取空閑M,綁定P,喚醒M
- 如果全局不存在空閑M: 新生成M
// newm (abridged) creates a new M that will take _p_ as its next P; fn is
// passed to allocm. The thread itself is started via newm1.
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn) // allocate a new M (per the article, this also creates the M's g0)
mp.nextp.set(_p_) // nextp points at the p
mp.sigmask = initSigmask
...
newm1(mp)
}
// newm1 (abridged) starts the OS thread for mp while holding execLock for
// reading.
func newm1(mp *m) {
...
execLock.rlock() // Prevent process clone.
newosproc(mp)
execLock.runlock()
}
// newosproc (abridged; this is the pthread-based variant) creates the OS
// thread for mp. Signals are masked around the thread creation and the
// previous mask is restored afterwards.
func newosproc(mp *m) {
...
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
// Create the thread bound to the M; the thread starts in mstart_stub and eventually calls mstart.
err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
sigprocmask(_SIG_SETMASK, &oset, nil)
...
}
創建M和P綁定,創建線程和M綁定,最終將調用mstart
// mstart is the entry point of an M (abridged); the part shown here only
// forwards to mstart1.
func mstart() {
...
mstart1()
...
}
// mstart1 (abridged) enters the scheduler loop via schedule.
func mstart1() {
...
schedule()
...
}
// schedule (abridged; "..." marks elided code, including the declarations
// of _g_, gp, inheritTime and tryWakeP) finds a runnable G and executes
// it. Sources are tried in order: a GC worker, occasionally the global
// queue, the local queue, and finally findrunnable.
func schedule() {
...
top:
pp := _g_.m.p.ptr()
pp.preempt = false
...
// What follows looks for a runnable G in the various places.
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
if gp == nil {
// Every 61st scheduler tick, take a runnable G from the global queue, balancing local and global.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// Take a runnable G from the local queue.
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
// Block until a runnable G is found. Per the article, findrunnable looks in the local queue,
// the global queue, the network poller and other P's; if nothing is runnable the M goes to sleep.
gp, inheritTime = findrunnable()
}
if _g_.m.spinning {
resetspinning()
}
...
execute(gp, inheritTime)
}
// execute (abridged) binds gp to the current M, marks it running, and
// resumes it from its saved context.
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// Bind g and m to each other and mark the g as running.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
...
// Run the g. Per the article: the pc in gp.sched is the user's func, and the return address
// on its stack is goexit (which calls goexit1), so when func finishes its RET lands there.
gogo(&gp.sched)
}
// goexit1 (abridged) finishes the current goroutine by running goexit0
// through mcall.
func goexit1() {
...
// goexit1 hands over to goexit0.
mcall(goexit0)
}
// goexit0 (abridged; "..." marks elided code) retires the finished
// goroutine gp: marks it dead, clears its per-run state, detaches it from
// the M, puts it on the P's free list, and re-enters the scheduler.
func goexit0(gp *g) {
_g_ := getg()
// The g has finished: change its status to dead.
casgstatus(gp, _Grunning, _Gdead)
...
// Clear the g's various fields.
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
...
// Unbind g and m from each other.
dropg()
...
// Put the g on the local free list. Per the article, when the local free count
// exceeds 64, half of them are moved to the global list.
gfput(_g_.m.p.ptr(), gp)
...
// Call schedule again: an M keeps fetching a G and executing it in a loop.
schedule()
}
M小結
graph TB;
A["有空閑P&沒有自旋的M"] --> B["wakep()"]
B -- startm --> B2[全局獲取空閑P]
B2--> C[全局獲取空閑M]
C --> D{獲取成功}
D -- Y --> E[M和P綁定]
D -- N --> F["創建M"]
F --> E
E --> G[喚醒M]
G --> H[mstart / mstart1]
H --> I[schedule]
I --> J[在P本地或全局獲取runnable G]
J --> K{獲取成功}
K -- Y --> L
K -- N --> M["反復在本地、全局、網絡、其他P中獲取runnable G"]
M --> N{獲取成功}
N -- Y --> L["G和M相互綁定,G設為running"]
L --> O["匯編執行G的pc:func。執行完RET彈出sp:goexit1"]
O --> P[goexit1 / goexit0]
P --> Q["將G狀態設為dead,清除G的各種信息,G和M相互解綁"]
Q --> R["G放入本地空閑鏈表。如果本地空閑個數大于64個,則移一半到全局去"]
R --> I
N -- N --> S["MP解綁,P加入全局空閑P,M加入全局空閑M"]
S --> T[M進入睡眠]
就這么完了? 不,還有一個獨立的M
在啟動階段創建了一個M執行sysmon
// main is runtime.main (abridged): the part shown starts a dedicated M
// running sysmon, on every architecture except wasm.
func main() {
...
if GOARCH != "wasm" {
// newm is called with a nil P, so this M runs sysmon without a P.
systemstack(func() {
newm(sysmon, nil)
})
}
...
}
// sysmon (abridged; "..." marks elided code, including the definition of
// sleep) is the monitor loop run by a dedicated M. Each iteration sleeps
// for 20us up to 10ms, then polls the network if it has not been polled
// for 10ms, starts an M for expired timers, retakes P's via retake, and
// triggers a forced GC when the time-based trigger fires.
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead()
unlock(&sched.lock)
lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0) // sleep interval in microseconds, 20us ~ 10ms
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
now := nanotime()
next, _ := timeSleepUntil() // earliest expiry time among the timers of all P's
// During STW, or when all P's are idle, sysmon goes to sleep for a while.
if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
lock(&sched.lock)
// Re-check the condition now that sched.lock is held.
if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
if next > now {
atomic.Store(&sched.sysmonwait, 1)
...
// sysmon sleeps for a while.
notetsleep(&sched.sysmonnote, sleep)
...
atomic.Store(&sched.sysmonwait, 0)
noteclear(&sched.sysmonnote)
}
idle = 0
delay = 20
}
unlock(&sched.lock)
}
// trigger libc interceptors if needed
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
// More than 10ms have passed since the last poll.
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // G's that have pending network events
if !list.empty() {
incidlelocked(-1)
// Per the article, injectglist marks these G's runnable and adds them to the
// global runnable queue; if there are idle P's it calls startm to run them.
injectglist(&list)
incidlelocked(1)
}
}
// A timer has expired: start an M to run it.
if next < now {
startm(nil, false)
}
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
// Time-based GC trigger. Per the article, a GC is forced after 2 minutes
// (or when the marked heap reaches a certain size).
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
lock(&forcegc.lock)
forcegc.idle = 0
var list gList
list.push(forcegc.g)
injectglist(&list)
unlock(&forcegc.lock)
}
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
}
}
// retake (abridged; "..." marks elided code) walks all P's. It requests
// preemption of a G that has held its P for too long, and hands off P's
// that are stuck in a syscall. It returns the number of P's retaken from
// syscalls.
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt the G when it has been running for at least forcePreemptNS
// (10ms according to the article; the constant is defined elsewhere).
// Per the article, this effectively only applies to running P's, because a P in syscall has no M.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
// The P has scheduled since the last observation: record the new tick and time.
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(_p_)
sysretake = true
}
}
if s == _Psyscall {
...
// Leave the P alone when its run queue is empty, there are spinning M's or idle P's,
// and the syscall has lasted less than 10ms.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
...
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
...
n++
_p_.syscalltick++
// Hand the P off to another M.
handoffp(_p_)
}
incidlelocked(1)
// NOTE(review): allpLock is re-acquired here; the matching unlock is presumably in the elided code above — confirm against runtime/proc.go.
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
sysmon小結
graph TB;
A["啟動階段創建獨立M"] -- sysmon --> B{"STW || 所有P空閑"}
B --Y--> C[睡眠一會兒]
B --N--> D{距上次netpoll超過10ms}
C --> D
D --Y-->E["從netpoll獲取G,加入全局runnable。如有空閑P,startm運行他們"]
D --N--> F["如有timer到時,startm運行"]
E --> F
F --retake--> G[循環所有P]
G -.running:超過10ms.-> H[preemptone搶占]
H -.syscall:有work或超過10ms .-> I[handoffp將P交由其他M]
I -.-> G
I --> J["2分鐘或GC標記堆達到一定大小,觸發垃圾回收"]
J -- "20us~10ms" --> B