Starting with Go 1.14, the runtime implements signal-based asynchronous preemption of goroutines. Before that version, the following program never finished:
package main

import (
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		for {
		}
	}()
	time.Sleep(time.Millisecond)
	println("OK")
}
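The reason is simple. runtime.GOMAXPROCS(1) leaves the program with only one P. When main reaches time.Sleep(), the main goroutine is parked and gives up the P, and the scheduler starts running the anonymous goroutine. That goroutine is an empty for loop. It makes no function calls, no channel operations and no I/O, so it never reaches a point where the scheduler can take over. Before 1.14, preemption was cooperative and only happened at such points, mainly the stack check in function prologues. The only P is therefore occupied forever, the main goroutine is never rescheduled after its 1ms sleep, and OK is never printed.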
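Before 1.14, the usual workaround was to give such a loop an explicit scheduling point. The following minimal sketch illustrates that cooperative approach. It is not something the runtime does for you, and spinWithYield and run are hypothetical helper names. Each iteration of the loop calls runtime.Gosched(), so main gets the P back after its sleep on any Go version:

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// spinWithYield busy-loops until *stop becomes non-zero. Unlike the empty
// for loop above, every iteration calls runtime.Gosched, an explicit point
// where the goroutine hands the P back to the scheduler.
func spinWithYield(stop *int32) {
	for atomic.LoadInt32(stop) == 0 {
		runtime.Gosched()
	}
}

// run reproduces the article's setup (one P, one spinning goroutine, a main
// goroutine that sleeps for 1ms) and returns once main has the P again.
func run() bool {
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)

	var stop int32
	done := make(chan struct{})
	go func() {
		spinWithYield(&stop)
		close(done)
	}()

	time.Sleep(time.Millisecond) // main is parked; the spinner gets the P
	atomic.StoreInt32(&stop, 1)  // only reached if main was rescheduled
	<-done
	return true
}

func main() {
	if run() {
		fmt.Println("OK")
	}
}
```

Since Go 1.14 the Gosched call is no longer needed for correctness, because the signal-based mechanism described below interrupts the loop anyway.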
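This changed in Go 1.14 with the introduction of signal-based preemption. At startup the runtime initializes signal handling and installs runtime.sighandler as the signal handler. When that handler receives SIGURG, it calls runtime.doSigPreempt. When the runtime needs to stop a goroutine, for example while scanning stacks during garbage collection or when sysmon finds a goroutine that has run too long, it marks the goroutine for preemption and sends a signal to the M (OS thread) running it. On receiving the signal, the M suspends the current goroutine and goes on to run other goroutines.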
This article examines the implementation in two parts: sending the signal, and receiving and handling it.
Sending the signal
The previous article (Understanding the sysmon monitor thread) introduced sysmon. That monitor thread runs continuously without a P, periodically scans all Ps, and preempts Gs that have been running for a long time.
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
	......
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		......
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
	}
	......
}
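The back-off in this loop is easy to misread, so the following standalone sketch replays it. It assumes retake() never finds anything to do, so idle only grows. nextDelay and delaySchedule are hypothetical names, and the arithmetic is copied from the excerpt:

```go
package main

import "fmt"

// nextDelay reproduces the back-off logic from the sysmon excerpt:
// 20us while sysmon keeps finding work (idle == 0), unchanged for the first
// 50 idle rounds, then doubling each round, capped at 10ms.
func nextDelay(idle, delay uint32) uint32 {
	if idle == 0 { // start with 20us sleep...
		delay = 20
	} else if idle > 50 { // start doubling the sleep after 1ms...
		delay *= 2
	}
	if delay > 10*1000 { // up to 10ms
		delay = 10 * 1000
	}
	return delay
}

// delaySchedule returns the sleep (in microseconds) sysmon would use in each
// of n consecutive rounds in which retake finds nothing to do.
func delaySchedule(n int) []uint32 {
	out := make([]uint32, 0, n)
	var delay uint32
	for idle := uint32(0); idle < uint32(n); idle++ {
		delay = nextDelay(idle, delay)
		out = append(out, delay)
	}
	return out
}

func main() {
	d := delaySchedule(61)
	fmt.Println(d[0], d[50], d[51], d[58], d[59], d[60]) // prints: 20 20 40 5120 10000 10000
}
```

The first 51 rounds sleep 20us each (about 1ms in total), which matches the "start doubling the sleep after 1ms" comment.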
The retake() function checks every P:
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// The same G has run for 10ms or more: preempt it.
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		......
	}
}
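The scheduler increments _p_.schedtick whenever the P starts running a goroutine in a new time slice. sysmon stores the last value it saw in pd.schedtick, together with the time it saw it in pd.schedwhen. If the tick has not changed and pd.schedwhen+forcePreemptNS <= now, the same G has been running on this P for at least 10ms, so it is preempted with preemptone().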
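The detection logic is small enough to isolate. The following sketch reproduces it outside the runtime. sysmonTick is a hypothetical stand-in for the two fields of the unexported runtime.sysmontick that retake uses, and shouldPreempt is a made-up name for the if/else in the excerpt:

```go
package main

import "fmt"

// forcePreemptNS matches the runtime constant: a 10ms time slice.
const forcePreemptNS = 10 * 1000 * 1000

// sysmonTick is a hypothetical stand-in for the fields of the runtime's
// sysmontick struct that are used here.
type sysmonTick struct {
	schedtick uint32 // last _p_.schedtick value sysmon saw
	schedwhen int64  // when sysmon saw it (nanoseconds)
}

// shouldPreempt mirrors the check in retake. If the P's scheduling counter
// has moved since the last observation, record it and restart the clock.
// Otherwise the same G has been running since schedwhen, and it is
// preempted once that is at least forcePreemptNS ago.
func shouldPreempt(pd *sysmonTick, schedtick uint32, now int64) bool {
	if pd.schedtick != schedtick {
		pd.schedtick = schedtick
		pd.schedwhen = now
		return false
	}
	return pd.schedwhen+forcePreemptNS <= now
}

func main() {
	pd := &sysmonTick{}
	const ms = int64(1000 * 1000)
	fmt.Println(shouldPreempt(pd, 1, 0))     // new tick observed: false
	fmt.Println(shouldPreempt(pd, 1, 5*ms))  // same G for 5ms: false
	fmt.Println(shouldPreempt(pd, 1, 10*ms)) // same G for 10ms: true
	fmt.Println(shouldPreempt(pd, 2, 12*ms)) // P rescheduled: false
}
```

sysmon never measures how long a goroutine has run directly. It only notices that the counter stopped moving between two of its own wake-ups, so the effective time slice can be somewhat longer than 10ms, depending on the sysmon sleep interval.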
// src/runtime/proc.go

// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can send inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by the gp->status no longer being
// Grunning
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	// The goroutine to be preempted.
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	// Mark the G for preemption.
	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	// stackPreempt is larger than any real stack address, so the check
	// in the next function prologue always reports "stack too small" and
	// enters morestack, where the preemption request is noticed.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}
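preemptone() mainly sets preemption flags. For cooperative preemption it sets gp.preempt and gp.stackguard0. When async preemption is supported and not disabled, it also sets _p_.preempt and calls preemptM() to send a preemption request (the signal) to the M.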
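A toy model makes the stackguard0 trick concrete. In this sketch, gStack, prologue and requestPreempt are hypothetical names and the stack addresses are made up. The function prologue is reduced to the single comparison the compiler emits, and the morestack path checks for the sentinel. The model also shows why the cooperative mechanism did not help the loop at the start of the article: a for {} loop with no calls never runs a prologue, so it never sees the sentinel. The signal-based path exists to handle that case.

```go
package main

import "fmt"

// stackPreempt is a stand-in for the runtime constant of the same name:
// a sentinel larger than any real stack address (the runtime uses
// uintptrMask & -1314; any huge value illustrates the idea).
const stackPreempt = ^uintptr(0) - 1313

// gStack is a hypothetical, simplified G with only the fields used here.
type gStack struct {
	stackguard0 uintptr
	preempt     bool
}

// prologue models the check inserted at the start of most functions: if SP
// is at or below stackguard0, call morestack. The morestack path then tells
// a real "stack too small" apart from a preemption request by looking for
// the stackPreempt sentinel.
func prologue(gp *gStack, sp uintptr) string {
	if sp > gp.stackguard0 {
		return "run function body"
	}
	if gp.stackguard0 == stackPreempt {
		return "yield to scheduler"
	}
	return "grow stack"
}

// requestPreempt does to the G what preemptone does.
func requestPreempt(gp *gStack) {
	gp.preempt = true
	gp.stackguard0 = stackPreempt
}

func main() {
	gp := &gStack{stackguard0: 0x1000 + 928} // made-up stack bounds
	fmt.Println(prologue(gp, 0x8000))      // run function body
	fmt.Println(prologue(gp, 0x1100))      // grow stack
	requestPreempt(gp)
	fmt.Println(prologue(gp, 0x8000)) // yield to scheduler
}
```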
// src/runtime/signal_unix.go
const preemptMSupported = true

// preemptM sends a preemption request to mp. This request may be
// handled asynchronously and may be coalesced with other requests to
// the M. When the request is received, if the running G or P are
// marked for preemption and the goroutine is at an asynchronous
// safe-point, it will preempt the goroutine. It always atomically
// increments mp.preemptGen after handling a preemption request.
func preemptM(mp *m) {
	......
	if atomic.Cas(&mp.signalPending, 0, 1) {
		if GOOS == "darwin" || GOOS == "ios" {
			atomic.Xadd(&pendingPreemptSignals, 1)
		}

		// If multiple threads are preempting the same M, it may send many
		// signals to the same M such that it hardly make progress, causing
		// live-lock problem. Apparently this could happen on darwin. See
		// issue #37741.
		// Only send a signal if there isn't already one pending.
		signalM(mp, sigPreempt)
	}
	......
}
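The signalPending compare-and-swap is what coalesces requests. The following sketch models it in user space. fakeM, preemptM and ackPreempt are hypothetical names. The counter signalsSent stands in for the real signalM call, and ackPreempt mirrors the acknowledgement that doSigPreempt performs once the signal has been handled:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// fakeM is a hypothetical stand-in for the runtime's m, keeping only the
// fields involved in sending and acknowledging a preemption signal.
type fakeM struct {
	signalPending uint32 // 1 while a preemption signal is in flight
	preemptGen    uint32 // bumped each time a request is handled
	signalsSent   uint32 // counts calls that would reach signalM
}

// preemptM sends a "signal" only if none is already pending, like the
// runtime's atomic.Cas(&mp.signalPending, 0, 1) check.
func (mp *fakeM) preemptM() {
	if atomic.CompareAndSwapUint32(&mp.signalPending, 0, 1) {
		atomic.AddUint32(&mp.signalsSent, 1) // stands in for signalM(mp, sigPreempt)
	}
}

// ackPreempt mirrors the acknowledgement at the end of doSigPreempt.
func (mp *fakeM) ackPreempt() {
	atomic.AddUint32(&mp.preemptGen, 1)
	atomic.StoreUint32(&mp.signalPending, 0)
}

func main() {
	mp := &fakeM{}
	mp.preemptM()
	mp.preemptM() // coalesced: a signal is already pending
	mp.preemptM()
	fmt.Println(mp.signalsSent) // 1
	mp.ackPreempt()
	mp.preemptM()
	fmt.Println(mp.signalsSent, mp.preemptGen) // 2 1
}
```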
preemptM() in turn calls signalM():
// src/runtime/os_darwin.go
func signalM(mp *m, sig int) {
	pthread_kill(pthread(mp.procid), uint32(sig))
}
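On darwin, signalM() delivers the signal to the target thread with pthread_kill(). The Linux version in os_linux.go uses tgkill instead. We won't follow pthread_kill() any further here.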
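Thread-directed signal delivery can also be observed from user code. The following Linux-only sketch does roughly what the Linux signalM does: it sends SIGURG to one specific thread of the current process with tgkill. syscall.Tgkill and syscall.Gettid exist only on Linux. The runtime's handler passes SIGURG on to the application, so a channel registered with signal.Notify receives it. signalOwnThread is a hypothetical helper, and the code is an illustration, not runtime code:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"
)

// signalOwnThread sends SIGURG to the OS thread the calling goroutine is
// locked to, using tgkill(2), and reports whether the program received it.
func signalOwnThread() (bool, error) {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGURG)
	defer signal.Stop(ch)

	runtime.LockOSThread() // stay on one thread so the tid stays valid
	defer runtime.UnlockOSThread()

	if err := syscall.Tgkill(os.Getpid(), syscall.Gettid(), syscall.SIGURG); err != nil {
		return false, err
	}
	select {
	case <-ch:
		return true, nil
	case <-time.After(2 * time.Second):
		return false, nil
	}
}

func main() {
	ok, err := signalOwnThread()
	fmt.Println(ok, err)
}
```

The channel may also pick up SIGURGs that the runtime itself sends for preemption, which is harmless here because the code only waits for at least one.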
That is the basic flow for sending the preemption signal. Correspondingly, there must be logic that handles it.
Handling the signal
The signal sent to the M is sigPreempt, a constant:
const sigPreempt = _SIGURG
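For a detailed explanation, including why SIGURG was chosen, see the comment above this constant in the official source (src/runtime/signal_unix.go).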
When the program starts, initsig() initializes signal handling:
// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
func initsig(preinit bool) {
	if !preinit {
		// It's now OK for signal handlers to run.
		signalsOK = true
	}

	// For c-archive/c-shared this is called by libpreinit with
	// preinit == true.
	if (isarchive || islibrary) && !preinit {
		return
	}

	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		}

		// We don't need to use atomic operations here because
		// there shouldn't be any other goroutines running yet.
		fwdSig[i] = getsig(i)

		if !sigInstallGoHandler(i) {
			// Even if we are not installing a signal handler,
			// set SA_ONSTACK if necessary.
			if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
				setsigstack(i)
			} else if fwdSig[i] == _SIG_IGN {
				sigInitIgnored(i)
			}
			continue
		}

		handlingSig[i] = 1
		setsig(i, funcPC(sighandler)) // register sighandler as the callback for this signal
	}
}
At startup, Go installs a handler once for every signal it needs to handle. That handler is always sighandler(), which then dispatches on the signal number:
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	......
	// If this is the preemption signal
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Might be a preemption signal.
		doSigPreempt(gp, c)
		// Even if this was definitely a preemption signal, it
		// may have been coalesced with another signal, so we
		// still let it through to the application.
	}
	......
}
The preemption signal itself is handled in doSigPreempt():
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) {
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the PC and inject a call to asyncPreempt.
			// This is where the preemption is actually performed.
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}

	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)

	if GOOS == "darwin" || GOOS == "ios" {
		atomic.Xadd(&pendingPreemptSignals, -1)
	}
}
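doSigPreempt() first checks whether the G wants to be preempted (wantAsyncPreempt()) and whether it was interrupted at an asynchronous safe point (isAsyncSafePoint()). isAsyncSafePoint() also returns a PC at which a call to asyncPreempt() can be injected. ctxt.pushCall() then rewrites the signal context so that, when the signal handler returns, the thread behaves as if it had called asyncPreempt() from that PC. asyncPreempt() saves the registers and enters the scheduler, and this is how the long-running goroutine gives up its P.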
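pushCall is easiest to understand as a few lines of pointer arithmetic. The following sketch models it on a fake context. fakeCtxt, ret and the addresses are hypothetical, and it assumes a 64-bit target such as amd64, where the runtime's version pushes the resume PC onto the stack and points the PC at the target. For simplicity, the sketch assumes newpc is the interrupted PC:

```go
package main

import "fmt"

// fakeCtxt is a hypothetical, simplified signal context: the interrupted
// thread's PC, SP, and a small word-addressed stack.
type fakeCtxt struct {
	pc, sp uintptr
	stack  map[uintptr]uintptr // address -> saved word
}

const ptrSize = 8 // assumption: a 64-bit target such as amd64

// pushCall pushes resumePC as a return address and redirects PC to
// targetPC. When the signal handler returns, the thread then behaves as if
// it had called targetPC from resumePC.
func (c *fakeCtxt) pushCall(targetPC, resumePC uintptr) {
	c.sp -= ptrSize
	c.stack[c.sp] = resumePC
	c.pc = targetPC
}

// ret models the RET at the end of the injected function: pop the return
// address and jump back to it.
func (c *fakeCtxt) ret() {
	c.pc = c.stack[c.sp]
	delete(c.stack, c.sp)
	c.sp += ptrSize
}

func main() {
	const asyncPreemptPC = 0x4000 // hypothetical address of asyncPreempt
	c := &fakeCtxt{pc: 0x1234, sp: 0x8000, stack: map[uintptr]uintptr{}}

	c.pushCall(asyncPreemptPC, c.pc)
	fmt.Printf("pc=%#x sp=%#x ret=%#x\n", c.pc, c.sp, c.stack[c.sp]) // pc=0x4000 sp=0x7ff8 ret=0x1234

	c.ret() // once the goroutine is rescheduled, asyncPreempt returns
	fmt.Printf("pc=%#x sp=%#x\n", c.pc, c.sp) // pc=0x1234 sp=0x8000
}
```

The interrupted code never executes a call instruction itself. The handler fakes one, so the empty for {} loop from the start of the article can be stopped even though it contains no calls.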
This article is based on Go 1.16.