netpoller

關(guān)于epoll/io多路復(fù)用的解析是經(jīng)典八股文描函,網(wǎng)上很多講解,這里不做贅述。

本文基于go1.17,是對go語言網(wǎng)絡(luò)模型netpoller的學(xué)習(xí)筆記龟劲,只涉及unix下的epoll模型。側(cè)重于弄清楚go是如何實現(xiàn)epoll模型的用戶代碼部分的轴或,以及go是如何將內(nèi)核緩沖區(qū)到用戶空間的復(fù)制過程接入到go本身的調(diào)度的昌跌。

先來段簡單的代碼。這段代碼監(jiān)聽了本地8888端口照雁,并調(diào)用Accept方法在有連接到來時返回一個net.TCPConn的引用(聲明是tcp服務(wù)器的情況下)蚕愤。這個結(jié)構(gòu)體最終會包含這個新連接的對應(yīng)socket。

func Run(){
    listen, err := net.Listen("tcp", "0.0.0.0:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    for  {
        accept, err := listen.Accept()
        if err != nil {
            fmt.Println(err)
            return
        }
        go handle(accept)
    }
}

func handle(conn net.Conn){
    data := make([]byte,1024)
    read, err := conn.Read(data)
    if err != nil {
        fmt.Println(err)
        return 
    }
    fmt.Println("read:", read)
    fmt.Println(data[:read])
    _, err = conn.Write([]byte("hello"))
    if err != nil {
        fmt.Println(err)
        return
    }

}

跟蹤進(jìn)這個方法饺蚊,發(fā)現(xiàn)主要是個accept的調(diào)用

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}  

繼續(xù)跟蹤 代碼分為兩部分

func (ln *TCPListener) accept() (*TCPConn, error) {
      //  第一部分返回一個netFD的引用(go對網(wǎng)絡(luò)文件描述符的表示)
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
      // 第二部分通過文件描述符創(chuàng)建一個新的TCPConn對象萍诱。
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

繼續(xù)跟蹤

func (fd *netFD) accept() (netfd *netFD, err error) {
        // 這里d是個int類型 應(yīng)該接近系統(tǒng)調(diào)用了
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
        // 初始化新netFD結(jié)構(gòu)體
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

繼續(xù)往下 fd.pfd.Accept() 方法

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
       // 加鎖
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()
      // 顧名思義,這里先不管
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
      // 來到了最核心的地方
    for { 
                // 系統(tǒng)調(diào)用接受新連接污呼,創(chuàng)建新socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
                // 主要看這個錯誤裕坊,如果用戶進(jìn)程進(jìn)行socket的read操作時內(nèi)核還沒有準(zhǔn)備好數(shù)據(jù),則會返回這個錯誤也就是說這里就是go runtime發(fā)揮作用的地方
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                                
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}

繼續(xù) fd.pd.waitRead() 方法(TCPConn的Read方法最終也會走到這個函數(shù))

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
        // 最終調(diào)用了link到runtime包的方法
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

runtime_pollWait對應(yīng)代碼如下

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // netpollcheckerr的作用先忽略
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
       
    for !netpollblock(pd, int32(mode), false) {
              // 并發(fā)環(huán)境中常見的雙重檢查燕酷?
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}

既然是阻塞應(yīng)該就是在netpollblock里面的邏輯了

// 一些注意事項
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.

// 這里主要注意gpp這個變量既作為要保存的阻塞協(xié)程的地址也作為了信號量在使用籍凝,在pollDesc結(jié)構(gòu)體中有說明(rg uintptr // pdReady, pdWait, G waiting for read or nil)周瞎。有點喪心病狂。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
        // 區(qū)分讀寫操作
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        // Consume notification if already ready.
        if atomic.Casuintptr(gpp, pdReady, 0) {
            return true
        }
                // 一般來說能走到這都能跳出循環(huán)
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }

        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := atomic.Loaduintptr(gpp); v != pdReady && v != 0 {
            throw("runtime: double wait")
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == 0 {
                // 這里才是重頭戲了饵蒂,調(diào)用gopark把當(dāng)前協(xié)程停住了声诸。并且停住之后gopark會調(diào)用netpollblockcommit把當(dāng)前協(xié)程的結(jié)構(gòu)體g寫到gpp(也就是pollDesc.wg或者pollDesc.rg)上,而每個pollDesc都會在netpoll的鏈表上退盯。之后go的sysmon或者schedule就能從這里恢復(fù)運(yùn)行協(xié)程彼乌。
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
 
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
      // 設(shè)置協(xié)程地址到gpp
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

來看gopark邏輯

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()  // 鎖定當(dāng)前m,不被調(diào)度給其他g運(yùn)行
    gp := mp.curg // 獲取m上運(yùn)行的g渊迁,在這個情境下就是發(fā)起讀或者寫的協(xié)程
    status := readgstatus(gp) 
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock // 也就是gpp
    mp.waitunlockf = unlockf // 是netpollblockcommit地址
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp) // 解鎖
    // can't do anything that might move the G between Ms here.
    mcall(park_m) //切換到g0棧執(zhí)行park_m
}

這里mcall會切換到系統(tǒng)棧執(zhí)行park_m囤攀,并把調(diào)用他的g傳給park_m。

// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg() // _g_這個時候是g0了

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting) //修改傳入的g狀態(tài)為等待中
    dropg() // 雙向解綁g和m

    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock) // netpollblockcommit執(zhí)行
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule() // 開啟新一輪調(diào)度宫纬,永不返回
}

func dropg() {
    _g_ := getg()

    setMNoWB(&_g_.m.curg.m, nil)
    setGNoWB(&_g_.m.curg, nil)
}

至此從內(nèi)核報syscall.EAGAIN錯誤開始到最終調(diào)用schedule()的鏈路打通了。go的runtime把本應(yīng)該阻塞或者輪詢的執(zhí)行流(比如線程)換成了go自己協(xié)程的休眠膏萧。而協(xié)程的調(diào)度管理又是go語言的強(qiáng)大之處漓骚。

接下來分析如何喚醒協(xié)程,既然上面是讓協(xié)程休眠了榛泛,那么可以猜測到在go的調(diào)度協(xié)程運(yùn)行的地方會有喚醒因網(wǎng)絡(luò)I/O阻塞的協(xié)程邏輯蝌蹂。在go的尋找可執(zhí)行協(xié)程函數(shù)findrunnable我們能看見如下代碼

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
              // 這里返回可執(zhí)行的協(xié)程列表
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

其中主要會調(diào)用netpoll

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {  
              // 一般走到這里
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent // 為什么是128
retry:
      // 看起來是這里了
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }

        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }
                // 設(shè)置讀還是寫
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
                      // 還原pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
                        // 取出之前保存的阻塞的g,添加到待運(yùn)行列表
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}
// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
TEXT runtime·epollwait(SB),NOSPLIT,$0
    // This uses pwait instead of wait, because Android O blocks wait.
        // 前四行都是取入?yún)⒌綄?yīng)寄存器
    MOVL    epfd+0(FP), DI 
    MOVQ    ev+8(FP), SI
    MOVL    nev+16(FP), DX
    MOVL    timeout+20(FP), R10
    MOVQ    $0, R8 // 這一步?jīng)]明白
    MOVL    $SYS_epoll_pwait, AX // 系統(tǒng)調(diào)用號281對應(yīng)就是epoll_pwait系統(tǒng)調(diào)用
    SYSCALL
    MOVL    AX, ret+24(FP) // 返回值賦值
    RET

那么epoll_ctl和epoll_create在哪里調(diào)用呢

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC) // 系統(tǒng)調(diào)用
    if epfd < 0 {
        epfd = epollcreate(1024) // 系統(tǒng)調(diào)用
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

go使用netpollopen對epoll_ctl進(jìn)行了封裝曹锨,主要是在調(diào)用netFD的connect方法時觸發(fā)孤个。大部分情況下就是dial建立連接的時候。

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // 系統(tǒng)調(diào)用
}

最后值得一提的是go對epoll的接口的交互都是通過pollDesc結(jié)構(gòu)體進(jìn)行的沛简。而pollDesc的內(nèi)存空間是go語言負(fù)責(zé)分配管理(不受垃圾回收管理)齐鲤,不存在類似mmap的操作。

參考
https://blog.csdn.net/qu1993/article/details/111550425
Go netpoller 原生網(wǎng)絡(luò)模型之源碼全面揭秘 - Strike Freedom
https://zhuanlan.zhihu.com/p/464301587

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末椒楣,一起剝皮案震驚了整個濱河市给郊,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌捧灰,老刑警劉巖淆九,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異毛俏,居然都是意外死亡炭庙,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門煌寇,熙熙樓的掌柜王于貴愁眉苦臉地迎上來焕蹄,“玉大人,你說我怎么就攤上這事唧席〔炼埽” “怎么了嘲驾?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長迹卢。 經(jīng)常有香客問我辽故,道長,這世上最難降的妖魔是什么腐碱? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任誊垢,我火速辦了婚禮,結(jié)果婚禮上症见,老公的妹妹穿的比我還像新娘喂走。我一直安慰自己,他們只是感情好谋作,可當(dāng)我...
    茶點故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布芋肠。 她就那樣靜靜地躺著,像睡著了一般遵蚜。 火紅的嫁衣襯著肌膚如雪帖池。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天吭净,我揣著相機(jī)與錄音睡汹,去河邊找鬼。 笑死寂殉,一個胖子當(dāng)著我的面吹牛囚巴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播友扰,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼彤叉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了焕檬?” 一聲冷哼從身側(cè)響起姆坚,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎实愚,沒想到半個月后兼呵,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡腊敲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年击喂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碰辅。...
    茶點故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡懂昂,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出没宾,到底是詐尸還是另有隱情凌彬,我是刑警寧澤沸柔,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站铲敛,受9級特大地震影響褐澎,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜伐蒋,卻給世界環(huán)境...
    茶點故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一工三、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧先鱼,春花似錦俭正、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至宏多,卻和暖如春寺枉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背绷落。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留始苇,地道東北人砌烁。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像催式,于是被迫代替她去往敵國和親函喉。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容