关于epoll/io多路复用的解析是经典八股文，网上很多讲解，这里不做赘述。
本文基于go1.17，是对go语言网络模型netpoller的学习笔记，只涉及Linux下的epoll模型。侧重于弄清楚go是如何实现epoll模型的用户代码部分的，以及go是如何将内核缓冲区到用户空间的复制过程接入到go本身的调度的。
先来段简单的代码。这段代码监听了本地8888端口，并调用Accept方法在有连接到来时返回一个net.TCPConn的引用(声明是tcp服务器的情况下)。这个结构体最终会包含这个新连接的对应socket。
// Run listens on TCP 0.0.0.0:8888 and serves every accepted connection in
// its own goroutine via handle. It returns when listening fails or when
// Accept reports an error; in both cases the error is printed to stdout.
func Run() {
	listen, err := net.Listen("tcp", "0.0.0.0:8888")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the listening socket when the accept loop exits; previously it
	// stayed open after Run returned. The Close error is ignored because
	// there is nothing useful left to do with it at this point.
	defer listen.Close()

	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println(err)
			return
		}
		go handle(conn)
	}
}
func handle(conn net.Conn){
data := make([]byte,1024)
read, err := conn.Read(data)
if err != nil {
fmt.Println(err)
return
}
fmt.Println("read:", read)
fmt.Println(data[:read])
_, err = conn.Write([]byte("hello"))
if err != nil {
fmt.Println(err)
return
}
}
跟踪进Accept方法，发现主要是个accept的调用
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
// A listener that fails its ok() check is rejected with EINVAL.
if !l.ok() {
return nil, syscall.EINVAL
}
// The real work is delegated to the unexported accept method.
c, err := l.accept()
if err != nil {
// Wrap the failure in an OpError carrying the operation name, network and local address.
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
继续跟踪，代码分为两部分
// accept takes the next connection from the listener's descriptor and wraps
// it in a *TCPConn, applying the listener's keep-alive configuration.
func (ln *TCPListener) accept() (*TCPConn, error) {
// Part one: obtain a *netFD (Go's representation of a network file descriptor).
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
// Part two: build a new TCPConn object from that descriptor.
tc := newTCPConn(fd)
// A negative KeepAlive skips keep-alive setup entirely; zero selects defaultTCPKeepAlive.
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
继续跟踪
// accept accepts a connection on fd and returns it as a new, initialized
// netFD whose local and remote addresses have been set.
func (fd *netFD) accept() (netfd *netFD, err error) {
// d is a plain int here, so we are getting close to the system call.
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
// When errcall is non-empty the error is wrapped together with it.
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
// Initialize the new netFD struct; if that fails, close the raw descriptor.
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// The error from Getsockname is discarded here.
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
继续往下，fd.pfd.Accept() 方法
// Accept wraps the accept network call.
// It returns the new descriptor, the remote address, the name of the failing
// call (empty on success) and the error. On failure the descriptor is -1.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// Take the read lock; it is released when Accept returns.
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
// As the name suggests; not examined in this walkthrough.
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
// This is the core of the function.
for {
// System call: accept the new connection and create a new socket.
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
// The key case. Original note: this error is what a user process gets when
// it reads a socket before the kernel has data ready, so this is where the
// Go runtime steps in. If the descriptor is pollable, waitRead is called and
// the accept is retried once it returns nil; otherwise control falls through
// to the error return below.
case syscall.EAGAIN:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
继续，fd.pd.waitRead() 方法（TCPConn的Read方法最终也会走到这个函数）
// waitRead waits in read mode: it forwards to wait with mode 'r'.
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
// wait hands the wait for the given mode to the runtime poller and converts
// the returned code into an error via convertErr.
func (pd *pollDesc) wait(mode int, isFile bool) error {
// A zero runtimeCtx is rejected before calling into the runtime.
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
// This ends up in a function that is linked to the runtime package.
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
runtime_pollWait对应代码如下
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// What netpollcheckerr does is not examined in this walkthrough.
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// Keep blocking until netpollblock reports that IO is ready.
for !netpollblock(pd, int32(mode), false) {
// Re-check the error state after an unsuccessful block.
// (Original note: the re-check that is common in concurrent code?)
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
既然是阻塞，应该就是在netpollblock里面的逻辑了
// Some notes:
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
// Note that gpp is used both as the slot that stores the address of the
// blocked goroutine and as a semaphore; the pollDesc struct documents this
// (rg uintptr // pdReady, pdWait, G waiting for read or nil). The original
// note calls this dual use rather extreme.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// Pick the slot for the requested mode: rg by default, wg for 'w'.
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if atomic.Casuintptr(gpp, pdReady, 0) {
return true
}
// Normally, reaching this point breaks out of the loop.
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := atomic.Loaduintptr(gpp); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == 0 {
// The main event: gopark stops the current goroutine. It is given
// netpollblockcommit as the callback, which stores the goroutine's g
// into gpp (that is, pollDesc.rg or pollDesc.wg). Original note: every
// pollDesc is reachable from the poller, so sysmon or schedule can later
// resume the goroutine from here.
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
// netpollblockcommit tries to replace pdWait in the slot gpp with the address
// of gp and reports whether that compare-and-swap succeeded.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
// Store the goroutine's address into gpp, but only if it still holds pdWait.
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
来看gopark逻辑
// gopark records the unlock callback, its argument and the wait/trace
// metadata on the current m and goroutine, then switches to park_m via mcall.
// It throws if the current goroutine is neither _Grunning nor _Gscanrunning.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem() // acquire the current m (original note: so it is not scheduled to run another g; TODO confirm)
gp := mp.curg // the g running on this m; in this walkthrough, the goroutine that issued the read or write
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock // gpp in the netpoll case
mp.waitunlockf = unlockf // netpollblockcommit in the netpoll case
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp) // release the m again
// can't do anything that might move the G between Ms here.
mcall(park_m) // switch to the g0 stack and run park_m
}
这里mcall会切换到系统栈执行park_m，并把调用它的g传给park_m。
// park continuation on g0.
// It moves gp to _Gwaiting, detaches it from the m, runs the pending unlock
// callback (if any) and then enters schedule. If the callback returns false,
// gp is made runnable again and executed directly instead.
func park_m(gp *g) {
_g_ := getg() // _g_ is g0 at this point
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting) // mark the passed-in g as waiting
dropg() // unbind g and m from each other, in both directions
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock) // runs netpollblockcommit in the netpoll case
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule() // start a new scheduling round; never returns
}
// dropg clears both links between the current m and its curg:
// first curg.m, then m.curg.
func dropg() {
_g_ := getg()
// Order matters: curg.m is cleared while m.curg is still reachable.
setMNoWB(&_g_.m.curg.m, nil)
setGNoWB(&_g_.m.curg, nil)
}
至此从内核报syscall.EAGAIN错误开始到最终调用schedule()的链路打通了。go的runtime把本应该阻塞或者轮询的执行流（比如线程）换成了go自己协程的休眠。而协程的调度管理又是go语言的强大之处。
接下来分析如何唤醒协程，既然上面是让协程休眠了，那么可以猜测到在go的调度协程运行的地方会有唤醒因网络I/O阻塞的协程逻辑。在go的寻找可执行协程函数findrunnable中我们能看见如下代码
// Excerpt from findrunnable; the enclosing function is not shown here.
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
// netpoll(0) returns the list of goroutines that became runnable.
if list := netpoll(0); !list.empty() { // non-blocking
// Keep the first goroutine for this caller and inject the rest of the list.
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
其中主要会调用netpoll
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
// Nothing to poll when epfd still holds -1.
if epfd == -1 {
return gList{}
}
// Convert the nanosecond delay into the millisecond timeout passed to epollwait.
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
// The usual branch in this walkthrough: findrunnable calls netpoll(0).
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent // NOTE(review): the original note asks why 128; nothing shown here explains it
retry:
// This is where the epoll wait itself happens.
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// Events whose data points at netpollBreakRd belong to the break pipe
// registered in netpollinit, not to a pollDesc.
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
// Work out whether this event wakes a reader, a writer, or both.
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// Recover the *pollDesc that netpollopen stored in ev.data.
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
// Original note: takes the blocked g saved earlier and adds it to the run list.
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
TEXT runtime·epollwait(SB),NOSPLIT,$0
// This uses pwait instead of wait, because Android O blocks wait.
// The first four instructions load the arguments into their registers.
MOVL epfd+0(FP), DI
MOVQ ev+8(FP), SI
MOVL nev+16(FP), DX
MOVL timeout+20(FP), R10
MOVQ $0, R8 // the original note says this step is unclear; R8 presumably carries epoll_pwait's fifth argument (sigmask), so 0 means no signal mask (see epoll_pwait(2))
MOVL $SYS_epoll_pwait, AX // original note: syscall number 281, i.e. the epoll_pwait system call
SYSCALL
MOVL AX, ret+24(FP) // store the return value
RET
那么epoll_ctl和epoll_create在哪里调用呢
// netpollinit creates the epoll instance stored in epfd and registers the
// read end of a non-blocking pipe with it; the two pipe ends are kept in
// netpollBreakRd and netpollBreakWr. Any failure throws.
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC) // system call
if epfd < 0 {
// Fall back to epollcreate and set close-on-exec separately.
epfd = epollcreate(1024) // system call
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
// Tag the event with the address of netpollBreakRd so netpoll can tell it
// apart from pollDesc events.
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
go使用netpollopen对epoll_ctl进行了封装，主要是在调用netFD的connect方法时触发。大部分情况下就是dial建立连接的时候。
// netpollopen adds fd to the epoll instance epfd with the event mask
// EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET and stores pd in the event data,
// which is where netpoll later reads it back. It returns the negated result
// of epollctl.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // system call
}
最后值得一提的是go对epoll的接口的交互都是通过pollDesc结构体进行的。而pollDesc的内存空间是go语言负责分配管理（不受垃圾回收管理），不存在类似mmap的操作。
参考
https://blog.csdn.net/qu1993/article/details/111550425
Go netpoller 原生网络模型之源码全面揭秘 - Strike Freedom
https://zhuanlan.zhihu.com/p/464301587