1. Basic Concepts: Processes, Threads, and Coroutines
A process can contain many threads. Each thread typically gets a fixed block of memory (commonly 2MB) for its stack, which holds the local variables of the functions currently called or suspended. What the CPU switches between when it schedules is threads: if the next thread belongs to the current process, only a thread switch is needed, which is fairly fast; if it belongs to a different process, a full process switch is required, and that costs noticeably more time.
Threads come in two kinds: kernel-level threads and user-level threads. A user-level thread must be bound to a kernel-level thread; the CPU cannot perceive user-level threads at all. It only knows it is running one thread, and that thread is in fact a kernel-level thread.
User-level threads have a more familiar name: coroutines (co-routines). To keep the terms apart, from here on "coroutine" means a user-level thread and "thread" means a kernel-level thread.
Coroutines differ from threads in how they are scheduled: threads are scheduled by the CPU preemptively, while coroutines are scheduled in user space cooperatively; the next coroutine runs only after the current one yields the CPU.
Coroutines can be bound to threads in three ways:
N:1, where N coroutines are bound to 1 thread. The advantage is that a coroutine switch completes entirely in user space without trapping into the kernel, so it is very lightweight and fast. The drawbacks are severe, though: since all of a process's coroutines are bound to one thread, the program cannot use the hardware's multiple cores, and once any coroutine blocks, the thread blocks, so none of the process's other coroutines can run; there is no concurrency at all.
1:1, where 1 coroutine is bound to 1 thread. This is the easiest to implement, and coroutine scheduling is done entirely by the CPU, so the N:1 drawbacks disappear. The downside is that creating, destroying, and switching coroutines now all carry kernel-level cost, which is rather expensive.
M:N, where M coroutines are bound to N threads. This combines N:1 and 1:1 and overcomes the drawbacks of both models, but it is the most complex to implement. It is also the model Go's runtime adopts, as the sketch below illustrates.
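A minimal way to feel the difference (an illustrative sketch; the burn workload, the iteration count, and the choice of 4 goroutines are invented for the demo): the same four CPU-bound goroutines finish much faster when they may run on four kernel threads than when confined to one, which an N:1 scheduler could never offer.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// burn is an arbitrary CPU-bound workload.
func burn() {
	sum := 0
	for i := 0; i < 200000000; i++ {
		sum += i
	}
	_ = sum
}

// timeRun runs 4 goroutines while at most procs threads execute Go code.
func timeRun(procs int) time.Duration {
	runtime.GOMAXPROCS(procs)
	start := time.Now()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); burn() }()
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	fmt.Println("N:1-like (1 thread):", timeRun(1))
	fmt.Println("M:N (4 threads):    ", timeRun(4))
}

On a machine with 4 or more cores the second run should be several times faster; with GOMAXPROCS(1) the program behaves like an N:1 scheduler.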
2贿肩、Golang簡介
2.1 The Goroutine Concept
Thread switches carry a large context and consume a lot of CPU time, so Go's unit of concurrency is not the traditional thread but the much lighter coroutine-style goroutine. This greatly raises the achievable degree of parallelism, which is why Go is sometimes called "the most parallel language".
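A rough way to see how light goroutines are (an illustrative, unscientific measurement; the 100,000 count is arbitrary): parking 100,000 goroutines at once costs on the order of a few KB each, whereas 100,000 threads with 2MB stacks would be hopeless.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	stop := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 100000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stop // park so all 100,000 goroutines are alive at once
		}()
	}

	runtime.ReadMemStats(&after)
	fmt.Println("goroutines alive:", runtime.NumGoroutine())
	fmt.Printf("~%d KB of memory per goroutine\n",
		(after.Sys-before.Sys)/100000/1024)

	close(stop)
	wg.Wait()
}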
2.2 Comparison with Other Concurrency Models
Interpreted languages such as Python commonly use a multi-process concurrency model. A process carries the largest context of all, so switches are very expensive, and processes can only talk to each other through sockets or deliberately configured shared memory, which makes programming considerably more painful.
Languages such as C++ usually use a multi-threaded model. A thread's context is much smaller than a process's, and threads share memory to begin with, so programming is a lot easier. But starting, destroying, and switching threads still eats plenty of CPU time. Thread pools were invented to help: keep a fixed set of threads alive so you stop paying the cost of constantly opening and closing threads. This basic technique has problems of its own, for example a thread stuck in blocking IO keeps occupying its slot, so the tasks queued behind it never get a thread to run on.
Go's concurrency machinery is more elaborate. Go replaces threads with a lighter-weight data structure that has its own stack and switches faster. The real execution is still done by threads: the scheduler places goroutines onto threads, creates and releases threads as appropriate, and when a running goroutine blocks (waiting on IO is the common case) it is detached from the thread it occupies so that other runnable goroutines can use that thread. Through this fairly involved scheduling, the whole system reaches a very high degree of parallelism without burning large amounts of CPU.
2.3 Properties of Goroutines
非阻塞邪狞。Goroutine的引入是為了方便高并發(fā)程序的編寫祷蝌。一個Goroutine在進行阻塞操作(比如系統(tǒng)調(diào)用)時,會把當前線程中的其他Goroutine移交到其他線程中繼續(xù)執(zhí)行帆卓,從而避免了整個程序的阻塞巨朦。
Its own scheduler. Golang has garbage collection (GC), and while GC runs, goroutines must be stopped; because Go implements its own scheduler, it can arrange this conveniently. Building concurrency from many goroutines gives the advantages of asynchronous IO together with the programming convenience of multi-threaded and multi-process models.
Self-managed stacks. Of course, goroutines also bring real complexity. A goroutine carries not only the code to run but also the stack to run it on, a PC (on ARM the saved PC is the current execution position plus 8), and an SP pointer. The stack and its pointers must keep the program intact in every execution mode.
Since every goroutine has its own stack, a stack must be created together with the goroutine, and the stack's usage keeps growing while the goroutine runs. Stacks normally grow contiguously, and because all the threads of a process share one virtual address space, each thread's stack needs a different start address; that forces you to estimate every thread's stack size before allocating. With a very large number of threads, stack overflow becomes easy to hit.
Split Stacks were invented to solve this: allocate only a small block of memory when the stack is created, and when some function call finds the stack too small, allocate a new block elsewhere. The new space need not be contiguous with the old; the call's arguments are copied into it and the function executes there. Golang manages stacks in a similar spirit but, for better performance, uses contiguous stacks: allocate a fixed-size stack, and when it runs out, allocate a larger one and copy the whole old stack into it. This avoids the frequent memory allocation and freeing that Split Stacks can cause.
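The growth is observable from user code. In the sketch below (the frame size and recursion depth are arbitrary choices for the demo), each recursive frame holds a 1KB local, so the goroutine's stack must grow far beyond its small initial size; MemStats.StackInuse shows the runtime holding more stack memory afterwards.

package main

import (
	"fmt"
	"runtime"
)

// grow pushes ~1KB of locals per frame, forcing the stack-growth
// prologue check to fail; the runtime then allocates a bigger,
// contiguous stack and copies the old frames into it.
func grow(n int) {
	var pad [1024]byte
	pad[0] = byte(n)
	if n > 0 {
		grow(n - 1)
	}
	_ = pad
}

func main() {
	var s runtime.MemStats
	runtime.ReadMemStats(&s)
	before := s.StackInuse
	grow(1000) // roughly 1MB of frames on this one goroutine
	runtime.ReadMemStats(&s)
	fmt.Printf("stack memory in use: %d KB -> %d KB\n",
		before/1024, s.StackInuse/1024)
}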
Goroutine execution can be preempted. If a goroutine keeps occupying the CPU and has not been scheduled away for a long time, the runtime preempts it and gives the CPU time to other goroutines. This can be observed by debugging goroutine blocking; a sketch follows.
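A sketch of that preemption under Go 1.10-era semantics (the work function is invented for the demo). One caveat, labeled as an assumption here: before Go 1.14's asynchronous preemption, the preempt request set by sysmon only takes effect in a function's stack-check prologue, so the spinning goroutine must make real function calls; a completely call-free loop could not be preempted.

package main

import (
	"fmt"
	"runtime"
	"time"
)

//go:noinline
func work(n int) int {
	var buf [256]byte // a non-trivial frame keeps the stack-check prologue
	buf[0] = byte(n)
	return n + int(buf[0])
}

func main() {
	runtime.GOMAXPROCS(1) // one P: the hog and main must share it

	go func() {
		n := 0
		for {
			n = work(n) // each call passes a preemption point
		}
	}()

	time.Sleep(50 * time.Millisecond)
	// Reaching this line means the runtime preempted the spinning
	// goroutine and handed the P back to the main goroutine.
	fmt.Println("main was scheduled again despite the spinning goroutine")
}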
2.4 The Core Structs
M: a worker thread in Go, the unit that actually executes code;
P: a context for scheduling goroutines; a goroutine depends on a P to be scheduled, and P is the true unit of parallelism;
G: a goroutine, a piece of Go code in the form of a function, the smallest unit of parallelism;
A P must be bound to an M to run, and an M must be bound to a P to run Go code. Normally there are at most GOMAXPROCS Ps (usually equal to the number of CPUs), but there can be many more Ms; only the Ps bound to Ms actually run, which is why P is the real unit of parallelism.
Each P has its own runnable-G queue from which it can take a G to run, and there is also a global runnable-G queue; a G attaches to an M through a P in order to execute. The reason the global runnable queue is not used alone is that distributed queues shrink the critical section: picture many threads requesting a runnable G at the same time; with only a global resource, the global lock would keep a crowd of threads waiting. A toy model of this two-level lookup follows.
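The toy model below uses invented stand-in types, not the runtime's real structures: the local queue needs no shared lock, so the common path never contends; only the fallback to the global queue takes the mutex.

package main

import (
	"fmt"
	"sync"
)

type G struct{ id int }

// P holds a local runnable queue; a buffered channel stands in for
// the runtime's fixed-size lock-free ring buffer.
type P struct {
	runq chan *G
}

// Sched holds the global runnable queue, which needs a mutex.
type Sched struct {
	mu     sync.Mutex
	global []*G
}

// get mimics the lookup order: local queue first, global queue only as
// a fallback, so most scheduling decisions never touch the shared lock.
func (s *Sched) get(p *P) *G {
	select {
	case g := <-p.runq: // common, contention-free path
		return g
	default:
	}
	s.mu.Lock() // slow path: contend on the global lock
	defer s.mu.Unlock()
	if len(s.global) == 0 {
		return nil
	}
	g := s.global[0]
	s.global = s.global[1:]
	return g
}

func main() {
	p := &P{runq: make(chan *G, 256)}
	s := &Sched{global: []*G{{id: 2}}}
	p.runq <- &G{id: 1}
	fmt.Println(s.get(p).id) // 1, from the local queue
	fmt.Println(s.get(p).id) // 2, from the global queue
}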
But when a running G blocks, waiting on IO being the typical example, the G and its M wait there together while the context P is passed on to another available M, so the blockage does not hurt the program's parallelism. The sketch after this paragraph makes the handoff visible.
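A Unix-only sketch of that handoff; syscall.Read is used deliberately as an assumption of the demo, because a direct syscall blocks its thread, while ordinary file and network reads in Go usually go through the netpoller instead. Even with a single P, a goroutine stuck in a read(2) syscall does not stop the rest of the program: its M blocks, sysmon retakes the P, and the P moves to another M.

package main

import (
	"fmt"
	"os"
	"runtime"
	"syscall"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1) // a single P

	r, w, _ := os.Pipe()
	go func() {
		buf := make([]byte, 1)
		// A direct syscall blocks this goroutine's M inside read(2).
		syscall.Read(int(r.Fd()), buf)
		fmt.Println("reader woke up")
	}()

	// The blocked M does not stall us: the P is handed off to
	// another M, so this loop keeps getting scheduled.
	for i := 0; i < 3; i++ {
		fmt.Println("still running:", i)
		time.Sleep(50 * time.Millisecond)
	}
	w.Write([]byte{'x'})
	time.Sleep(50 * time.Millisecond)
}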
The G struct
type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // offset known to runtime/cgo; describes the real stack memory, including its bounds
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic         *_panic        // innermost panic - offset known to liblink
	_defer         *_defer        // innermost defer
	m              *m             // current m; offset known to arm liblink
	sched          gobuf          // saves g's context for goroutine switches
	syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp       uintptr        // expected sp at top of stack, to check in traceback
	param          unsafe.Pointer // passed parameter on wakeup; another goroutine can set param while this g sleeps, and this g reads it on wakeup
	atomicstatus   uint32
	stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid           int64  // goroutine ID
	waitsince      int64  // approx time when the g become blocked
	waitreason     string // if status==Gwaiting
	schedlink      guintptr
	preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
	paniconfault   bool     // panic (instead of crash) on unexpected fault address
	preemptscan    bool     // preempted g does scan for gc
	gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit     bool     // must not split stack
	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        muintptr // this G is locked to run only on this M
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr        // pc of go statement that created this goroutine
	startpc        uintptr        // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep
	selectDone     uint32         // are we participating in a select and did someone win the race?

	// Per-G GC state
	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}
The gobuf struct
type gobuf struct {
	sp   uintptr
	pc   uintptr
	g    guintptr
	ctxt unsafe.Pointer
	ret  sys.Uintreg
	lr   uintptr
	bp   uintptr // for GOEXPERIMENT=framepointer
}
The most important field is of course sched, which saves the goroutine's context. When goroutines switch, there is no OS to take care of this state the way there is for threads; a gobuf object holds it instead, which keeps the switch more lightweight. Its layout is shown above.
The M struct
type m struct {
	g0          *g          // the goroutine that owns the scheduling stack
	gsignal     *g          // the goroutine that handles signals
	tls         [6]uintptr  // thread-local storage
	mstartfn    func()
	curg        *g          // the goroutine currently running
	caughtsig   guintptr
	p           puintptr    // the P bound to this M for executing Go code
	nextp       puintptr
	id          int32
	mallocing   int32       // status
	spinning    bool        // whether this m is out of work
	blocked     bool        // whether this m is blocked
	inwb        bool        // whether this m is executing a write barrier
	printlock   int8
	incgo       bool        // whether this m is executing a cgo call
	fastrand    uint32
	ncgocall    uint64      // total number of cgo calls
	ncgo        int32       // number of cgo calls currently in progress
	park        note
	alllink     *m          // on allm
	schedlink   muintptr
	mcache      *mcache     // this m's memory cache
	lockedg     *g          // a g locked to run on this m, never switched to another m
	createstack [32]uintptr // the stack on which the thread was created
}
Two Gs inside the M struct deserve attention:
One is curg, the G currently bound to this M.
The other is g0, the goroutine that owns the scheduling stack, a rather special goroutine. An ordinary goroutine's stack is a growable stack allocated on the heap, but g0's stack is the stack of the M's underlying thread. All scheduling-related code first switches onto g0's stack before it runs. In other words, even the thread's stack is driven through a g rather than used directly the way the OS provides it.
The P struct
type p struct {
	lock mutex

	id          int32
	status      uint32   // state: pidle/prunning/...
	link        puintptr
	schedtick   uint32   // incremented on every scheduling round
	syscalltick uint32   // incremented on every system call
	sysmontick  sysmontick
	m           muintptr // back-link to the associated m
	mcache      *mcache
	racectx     uintptr

	goidcache    uint64 // cache of goroutine IDs
	goidcacheend uint64

	// queue of runnable goroutines
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	runnext  guintptr // the next g to run

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	palloc persistentAlloc // per-P to avoid mutex

	pad [sys.CacheLineSize]byte
}
A P's status can be Pidle, Prunning, Psyscall, Pgcstop, or Pdead. Its internal run queue (runqhead/runqtail/runq) holds runnable goroutines, and a P takes Gs from this local queue first, which improves efficiency.
The schedt struct
type schedt struct {
	goidgen  uint64
	lastpoll uint64

	lock mutex

	midle        muintptr // idle Ms
	nmidle       int32    // number of idle Ms
	nmidlelocked int32    // number of locked Ms
	mcount       int32    // total number of Ms created
	maxmcount    int32    // maximum number of Ms allowed

	ngsys uint32 // number of system goroutines, updated automatically

	pidle      puintptr // idle Ps
	npidle     uint32
	nmspinning uint32

	// global queue of runnable Gs
	runqhead guintptr
	runqtail guintptr
	runqsize int32

	// global cache of dead Gs
	gflock       mutex
	gfreeStack   *g
	gfreeNoStack *g
	ngfree       int32

	// central cache of sudogs
	sudoglock  mutex
	sudogcache *sudog
}
Most of the information anyone needs already lives in the M, G, and P structs; schedt is little more than a shell. You can see that it holds the idle-M list, the idle-P list, and the global queue of runnable Gs. The lock inside schedt is essential: whenever Ms, Ps, and friends perform any non-local operation, they generally must lock the scheduler first.
2.5 The Key Functions
The goroutine scheduler's code lives in /src/runtime/proc.go; the more important functions are analyzed below.
2.5.1 The schedule function
schedule runs whenever the runtime needs to schedule. It finds a runnable G for the current P and executes it, searching in this order:
1) call runqget to take a runnable G from the P's own runnable-G queue;
2) if 1) fails, call findrunnable to look for a runnable G;
3) if 2) also finds nothing, this round of scheduling ends and execution continues from the previous context (findrunnable itself blocks until work becomes available);
4) note: every so often the global runnable queue is checked first, to ensure fairness; otherwise two goroutines could completely occupy the local runqueue by constantly respawning each other. The schedtick%61 counter guarantees this check.
The code follows:
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	}

	if _g_.m.lockedg != 0 {
		stoplockedm()
		execute(_g_.m.lockedg.ptr(), false) // Never returns.
	}

	// We should not schedule away from a g that is executing a cgo call,
	// since the cgo call is using the m's g0 stack.
	if _g_.m.incgo {
		throw("schedule: in cgo")
	}

top:
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _g_.m.p.ptr().runSafePointFn != 0 {
		runSafePointFn()
	}

	var gp *g
	var inheritTime bool
	if trace.enabled || trace.shutdown {
		gp = traceReader()
		if gp != nil {
			casgstatus(gp, _Gwaiting, _Grunnable)
			traceGoUnpark(gp, 0)
		}
	}
	if gp == nil && gcBlackenEnabled != 0 {
		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
	}
	if gp == nil {
		// Check the global runnable queue once in a while to ensure fairness.
		// Otherwise two goroutines can completely occupy the local runqueue
		// by constantly respawning each other.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr())
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}

	// This thread is going to run a goroutine and is not spinning anymore,
	// so if it was marked as spinning we need to reset it now and potentially
	// start a new spinning M.
	if _g_.m.spinning {
		resetspinning()
	}

	if gp.lockedm != 0 {
		// Hands off own p to the locked m,
		// then blocks waiting for a new p.
		startlockedm(gp)
		goto top
	}

	execute(gp, inheritTime)
}
2.5.2 The findrunnable function
findrunnable finds a runnable G for a P, searching in the following order:
1) call runqget to take a runnable G from the P's own runnable-G queue;
2) if 1) fails, call globrunqget to take a runnable G from the global runnable-G queue;
3) if 2) fails, call netpoll (non-blocking) to pick up a G whose asynchronous callback is ready;
4) if 3) fails, try to steal half of the Gs from some other P;
5) if 4) fails, call globrunqget on the global runnable-G queue once more;
6) if 5) fails, call netpoll (blocking);
7) if 6) still yields no G, call stopm to stop this M.
The code follows:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}
	if fingwait && fingwake {
		if gp := wakefing(); gp != nil {
			ready(gp, 0, true)
		}
	}
	if *cgo_yield != nil {
		asmcgocall(*cgo_yield, nil)
	}

	// local runq
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	// Poll network.
	// This netpoll is only an optimization before we resort to stealing.
	// We can safely skip it if there are no waiters or a thread is blocked
	// in netpoll already. If there is any kind of logical race with that
	// blocked thread (e.g. it has already returned from netpoll, but does
	// not set lastpoll yet), this thread will do blocking netpoll below
	// anyway.
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		if gp := netpoll(false); gp != nil { // non-blocking
			// netpoll returns list of goroutines linked by schedlink.
			injectglist(gp.schedlink.ptr())
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}

	// Steal work from other P's.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:
	// We have nothing to do. If we're in the GC mark phase, can
	// safely scan and blacken objects, and have work to do, run
	// idle-time marking rather than give up the P.
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	// Check for idle-priority GC work again.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		if _g_.m.p != 0 {
			throw("findrunnable: netpoll with p")
		}
		if _g_.m.spinning {
			throw("findrunnable: netpoll with spinning")
		}
		gp := netpoll(true) // block until new work is available
		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
		if gp != nil {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				injectglist(gp.schedlink.ptr())
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			injectglist(gp)
		}
	}
	stopm()
	goto top
}
2.5.3 The newproc function
newproc creates a runnable G and puts it on the current P's runnable-G queue; it is what a statement like go func() { … } is actually compiled into, and the core work happens in newproc1. The function proceeds as follows:
1) get the P of the current G, then try to take a G from the free-G list;
2) if 1) got one, configure its parameters; otherwise allocate a new G;
3) put the G onto the P's runnable-G queue.
The code follows:
// In Go 1.10.8 the default stack size is 2KB.
_StackMin = 2048

// Create a g object and put it on the g queue, where it waits to be run.
// Create a new g running fn with narg bytes of arguments starting
// at argp. callerpc is the address of the go statement that created
// this. The new g is put on the queue of g's waiting to run.
func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	newg.gcscanvalid = false
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}
2.5.4 The goexit0 function
goexit is called when a G exits. After some bookkeeping on the G, this function puts it into the free-G list for later reuse and then calls schedule to schedule the next G.
// goexit continuation on g0.
func goexit0(gp *g) {
	_g_ := getg()

	// move the g's status from _Grunning to _Gdead
	casgstatus(gp, _Grunning, _Gdead)
	if isSystemGoroutine(gp) {
		atomic.Xadd(&sched.ngsys, -1)
	}

	// release the g's fields, resetting them to nil/0
	gp.m = nil
	locked := gp.lockedm != 0
	gp.lockedm = 0
	_g_.m.lockedg = 0
	gp.paniconfault = false
	gp._defer = nil // should be true already but just in case.
	gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
	gp.writebuf = nil
	gp.waitreason = ""
	gp.param = nil
	gp.labels = nil
	gp.timer = nil

	if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
		// Flush assist credit to the global pool. This gives
		// better information to pacing if the application is
		// rapidly creating an exiting goroutines.
		scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
		atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
		gp.gcAssistBytes = 0
	}

	// Note that gp's stack scan is now "valid" because it has no
	// stack.
	gp.gcscanvalid = true
	dropg()

	if _g_.m.lockedInt != 0 {
		print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
		throw("internal lockOSThread error")
	}
	_g_.m.lockedExt = 0

	// push this g onto the free-G list
	gfput(_g_.m.p.ptr(), gp)
	if locked {
		// The goroutine may have locked this thread because
		// it put it in an unusual kernel state. Kill it
		// rather than returning it to the thread pool.

		// Return to mstart, which will release the P and exit
		// the thread.
		if GOOS != "plan9" { // See golang.org/issue/22227.
			gogo(&_g_.m.g0.sched)
		}
	}
	schedule()
}
2.5.5 The handoffp function
handoffp hands a P away from an M that is in a system call or otherwise blocked. If the P still has runnable Gs queued, a new M is started via startm, and that new M does not spin.
// Hands off P from syscall or locked M.
// Always runs without a P, so write barriers are not allowed.
//go:nowritebarrierrec
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// if this P's local queue or the global run queue is not empty,
	// call startm without spinning
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// if GC work is in progress, same as above
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no work to do: check for spinning/idle Ms;
	// otherwise we must do the spinning ourselves
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	// lock the scheduler and take this P out
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	pidleput(_p_)
	unlock(&sched.lock)
}
2.5.6 The startm function
startm schedules some M, creating one if necessary, to run the given P.
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's does nothing.
// May run with m.p==nil, so write barriers are not allowed.
// If spinning is set, the caller has incremented nmspinning and startm will
// either decrement nmspinning or set m.spinning in the newly started M.
//go:nowritebarrierrec
func startm(_p_ *p, spinning bool) {
	// lock the scheduler
	lock(&sched.lock)
	if _p_ == nil {
		_p_ = pidleget()
		if _p_ == nil {
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			return
		}
	}
	mp := mget()
	unlock(&sched.lock)
	if mp == nil {
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_)
		return
	}
	if mp.spinning {
		throw("startm: m is spinning")
	}
	if mp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	mp.spinning = spinning
	mp.nextp.set(_p_)
	notewakeup(&mp.park)
}
2.5.7 The sysmon function
sysmon is created when the Go runtime starts. It monitors the state of all goroutines, decides whether a GC is needed, performs netpoll, and so on. From inside sysmon, retake is called to implement preemptive scheduling.
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				shouldRelax := true
				if osRelaxMinNS > 0 {
					next := timeSleepUntil()
					now := nanotime()
					if next-now < osRelaxMinNS {
						shouldRelax = false
					}
				}
				if shouldRelax {
					osRelax(true)
				}
				notetsleep(&sched.sysmonnote, maxsleep)
				if shouldRelax {
					osRelax(false)
				}
				lock(&sched.lock)
				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}
2.5.8 The retake function
retake walks over all the Ps. If a P is in a system call (_Psyscall) and has stayed there for a whole sysmon cycle (20us to 10ms), the P is retaken and handoffp is called to break the association between the M and the P. If a P is running (_Prunning) and the same G has been running for a whole sysmon cycle and longer than forcePreemptNS (10ms), that P's G is preempted
by setting g.preempt = true and g.stackguard0 = stackPreempt.
Why does setting stackguard implement preemption?
Because stackguard0 is the value used to check whether the current stack space is sufficient: every Go function's prologue compares against it to decide whether the stack needs to grow.
When newstack finds g.stackguard0 == stackPreempt, it knows the stack check was triggered by a preemption request, and it then re-checks whether the preemption should really happen.
The preemption mechanism guarantees that no single G can run so long that other Gs never get to run.
func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			preemptone(_p_)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
3. Scheduler Summary
3.1 The Scheduler's Two Big Ideas
Reuse threads: coroutines run on top of a set of threads, so threads are reused instead of being created and destroyed all the time. This reuse shows up in two more places inside the scheduler: 1) work stealing, where a thread with no runnable G steals a G from the P bound to some other thread rather than destroying itself; 2) handoff, where a thread whose G blocks in a system call releases its bound P and passes the P to some other idle thread to run.
Exploit parallelism: GOMAXPROCS sets the number of Ps. When GOMAXPROCS is greater than 1, up to GOMAXPROCS threads are in the running state at once, possibly spread across multiple CPU cores, so concurrency becomes true parallelism. GOMAXPROCS also caps the degree of parallelism: with GOMAXPROCS = cores/2, at most half the cores are used in parallel, as the snippet below shows.
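For completeness, GOMAXPROCS(n) returns the previous setting, and GOMAXPROCS(0) merely queries without changing it, so capping parallelism at half the cores looks like this (a trivial sketch):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	fmt.Println("NumCPU:    ", runtime.NumCPU())
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0)) // 0 = query only

	// Cap parallelism at half the cores: at most NumCPU/2 threads
	// now execute Go code simultaneously.
	if n := runtime.NumCPU() / 2; n > 0 {
		prev := runtime.GOMAXPROCS(n)
		fmt.Println("was:", prev, "now:", runtime.GOMAXPROCS(0))
	}
}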
3.2 The Scheduler's Two Small Strategies
Preemption: with classic coroutines, the next coroutine runs only once the current one voluntarily gives up the CPU. In Go, a goroutine may occupy the CPU for at most about 10ms before it is preempted, which keeps other goroutines from starving; this is one way goroutines differ from classic coroutines.
Global G queue: the new scheduler still keeps a global G queue, but its role has been weakened; when an M's work stealing fails to take a G from any other P, it can fetch Gs from the global queue.
4. References
The Golang source repository: https://github.com/golang/go
"Scalable Go Schedule": https://docs.google.com/docum...
"Go Preemptive Scheduler": https://docs.google.com/docum...
Articles from around the web:
https://studygolang.com/artic...
https://studygolang.com/artic...
https://studygolang.com/artic...
https://studygolang.com/artic...
https://studygolang.com/artic... (a scheduling case study)
https://www.cnblogs.com/sunsk... (on preemptive scheduling)
https://blog.csdn.net/u010853... (a walkthrough of schedule; very well analyzed, with vivid diagrams; worth reading carefully more than once)