Go - 網(wǎng)絡(luò)輪詢器

前言

可以從 Go 源碼目錄結(jié)構(gòu)和對應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)鹉胖。比如,在 Linux 系統(tǒng)下基于 epoll儒恋,freeBSD 系統(tǒng)下基于 kqueue喇澡,以及 Windows 系統(tǒng)下基于 iocp。

因?yàn)槲覀兊拇a都是部署在Linux上的小泉,所以本文以epoll封裝實(shí)現(xiàn)為例子來講解Go語言中I/O多路復(fù)用的源碼實(shí)現(xiàn)。

介紹

I/O多路復(fù)用

所謂 I/O 多路復(fù)用指的就是 select/epoll 這一系列的多路選擇器:支持單一線程同時(shí)監(jiān)聽多個(gè)文件描述符(I/O 事件)冕杠,阻塞等待微姊,并在其中某個(gè)文件描述符可讀寫時(shí)收到通知。以防很多同學(xué)對select或epoll不那么熟悉分预,所以下面先來講講這兩個(gè)選擇器兢交。

首先我們先說一下什么是文件描述符(File descriptor),根據(jù)它的英文首字母也簡稱FD笼痹,它是一個(gè)用于表述指向文件的引用的抽象化概念配喳。它是一個(gè)索引值飘诗,指向內(nèi)核為每一個(gè)進(jìn)程所維護(hù)的該進(jìn)程打開文件的記錄表。當(dāng)程序打開一個(gè)現(xiàn)有文件或者創(chuàng)建一個(gè)新文件時(shí)界逛,內(nèi)核向進(jìn)程返回一個(gè)文件描述符。

select
int select(int nfds,
            fd_set *restrict readfds,
            fd_set *restrict writefds,
            fd_set *restrict errorfds,
            struct timeval *restrict timeout);

writefds纺座、readfds息拜、和exceptfds是三個(gè)文件描述符集合。select會(huì)遍歷每個(gè)集合的前nfds個(gè)描述符净响,分別找到可以讀取少欺、可以寫入、發(fā)生錯(cuò)誤的描述符馋贤,統(tǒng)稱為就緒的描述符赞别。

timeout參數(shù)表示調(diào)用select時(shí)的阻塞時(shí)長。如果所有文件描述符都未就緒配乓,就阻塞調(diào)用進(jìn)程仿滔,直到某個(gè)描述符就緒,或者阻塞超過設(shè)置的 timeout 后犹芹,返回崎页。如果timeout參數(shù)設(shè)為 NULL,會(huì)無限阻塞直到某個(gè)描述符就緒腰埂;如果timeout參數(shù)設(shè)為 0飒焦,會(huì)立即返回,不阻塞屿笼。

當(dāng)select函數(shù)返回后牺荠,可以通過遍歷fdset,來找到就緒的描述符驴一。



select的缺點(diǎn)也列舉一下:

  1. select最大的缺陷就是單個(gè)進(jìn)程所打開的FD是有一定限制的休雌,它由FD_SETSIZE設(shè)置,默認(rèn)值是1024;
  2. 每次調(diào)用 select蛔趴,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)挑辆,這個(gè)開銷在 fd 很多時(shí)會(huì)很大;
  3. 每次 kernel 都需要線性掃描整個(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長孝情,其 I/O 性能會(huì)線性下降;
epoll

epoll是selec的增強(qiáng)版本鱼蝉,避免了“性能開銷大”和“文件描述符數(shù)量少”兩個(gè)缺點(diǎn)。

為方便理解后續(xù)的內(nèi)容箫荡,先看一下epoll的用法:

int listenfd = socket(AF_INET, SOCK_STREAM, 0);   
bind(listenfd, ...)
listen(listenfd, ...)

int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //將所有需要監(jiān)聽的fd添加到epfd中

while(1){
    int n = epoll_wait(...)
    for(接收到數(shù)據(jù)的socket){
        //處理
    }
}

先用epoll_create創(chuàng)建一個(gè)epoll對象實(shí)例epfd魁亦,同時(shí)返回一個(gè)引用該實(shí)例的文件描述符,返回的文件描述符僅僅指向?qū)?yīng)的epoll實(shí)例羔挡,并不表示真實(shí)的磁盤文件節(jié)點(diǎn)洁奈。

epoll實(shí)例內(nèi)部存儲(chǔ):

  • 監(jiān)聽列表:所有要監(jiān)聽的文件描述符间唉,使用紅黑樹;
  • 就緒列表:所有就緒的文件描述符利术,使用鏈表呈野;

再通過epoll_ctl將需要監(jiān)視的fd添加到epfd中,同時(shí)為fd設(shè)置一個(gè)回調(diào)函數(shù)印叁,并監(jiān)聽事件event被冒,并添加到監(jiān)聽列表中。當(dāng)有事件發(fā)生時(shí)轮蜕,會(huì)調(diào)用回調(diào)函數(shù)昨悼,并將fd添加到epoll實(shí)例的就緒隊(duì)列上。

最后調(diào)用epoll_wait阻塞監(jiān)聽 epoll 實(shí)例上所有的fd的 I/O 事件跃洛。當(dāng)就緒列表中已有數(shù)據(jù)率触,那么epoll_wait直接返回,解決了select每次都需要輪詢一遍的問題汇竭。

epoll的優(yōu)點(diǎn):
epoll的監(jiān)聽列表使用紅黑樹存儲(chǔ)葱蝗,epoll_ctl 函數(shù)添加進(jìn)來的 fd 都會(huì)被放在紅黑樹的某個(gè)節(jié)點(diǎn)內(nèi),而紅黑樹本身插入和刪除性能比較穩(wěn)定韩玩,時(shí)間復(fù)雜度 O(logN)垒玲,并且可以存儲(chǔ)大量的的fd,避免了只能存儲(chǔ)1024個(gè)fd的限制找颓;

epoll_ctl 中為每個(gè)文件描述符指定了回調(diào)函數(shù)合愈,并在就緒時(shí)將其加入到就緒列表,因此不需要像select一樣遍歷檢測每個(gè)文件描述符击狮,只需要判斷就緒列表是否為空即可佛析;

解析

netpoll本質(zhì)上是對 I/O 多路復(fù)用技術(shù)的封裝,所以自然也是和epoll一樣脫離不了下面幾步:

  1. netpoll創(chuàng)建及其初始化彪蓬;
  2. 向netpoll中加入待監(jiān)控的任務(wù)寸莫;
  3. 從netpoll獲取觸發(fā)的事件;

在go中對epoll提供的三個(gè)函數(shù)進(jìn)行了封裝:

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList

netpollinit函數(shù)負(fù)責(zé)初始化netpoll档冬;

netpollopen負(fù)責(zé)監(jiān)聽文件描述符上的事件膘茎;

netpoll會(huì)阻塞等待返回一組已經(jīng)準(zhǔn)備就緒的 Goroutine;

下面是Go語言中編寫的一個(gè)TCP server:

func main() {
    listen, err := net.Listen("tcp", ":8888")
    if err != nil {
        fmt.Println("listen error: ", err)
        return
    } 
    for {
        conn, err := listen.Accept()
        if err != nil {
            fmt.Println("accept error: ", err)
            break
        } 
        // 創(chuàng)建一個(gè)goroutine來負(fù)責(zé)處理讀寫任務(wù)
        go HandleConn(conn)
    }
} 

下面我們跟著這個(gè)TCP server的源碼一起看看是在哪里使用了netpoll來完成epoll的調(diào)用酷誓。

net.Listen

這個(gè)TCP server中會(huì)調(diào)用net.Listen創(chuàng)建一個(gè)socket同時(shí)返回與之對應(yīng)的fd披坏,該fd用來初始化listener的netFD(go層面封裝的網(wǎng)絡(luò)文件描述符),接著調(diào)用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化盐数。

調(diào)用過程如下:


func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // 創(chuàng)建一個(gè)socket
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    ...
    // 創(chuàng)建fd
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    } 
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // 調(diào)用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            ...
        }
    }
    ...
    return fd, nil
}

func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
    ret := &netFD{
        pfd: poll.FD{
            Sysfd:         sysfd,
            IsStream:      sotype == syscall.SOCK_STREAM,
            ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
        },
        family: family,
        sotype: sotype,
        net:    net,
    }
    return ret, nil
}

sysSocket方法會(huì)發(fā)起一個(gè)系統(tǒng)調(diào)用創(chuàng)建一個(gè)socket棒拂,newFD會(huì)創(chuàng)建一個(gè)netFD,然后調(diào)用netFD的listenStream方法進(jìn)行bind&listen操作,并對netFD進(jìn)行init帚屉。



netFD是一個(gè)文件描述符的封裝谜诫,netFD中包含一個(gè)FD數(shù)據(jù)結(jié)構(gòu),F(xiàn)D中包含了Sysfd 和pollDesc兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu)攻旦,Sysfd是sysSocket返回的socket系統(tǒng)文件描述符喻旷,pollDesc用于監(jiān)控文件描述符的可讀或者可寫。

我們繼續(xù)看listenStream:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    ...
    // 完成綁定操作
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // 進(jìn)行監(jiān)聽操作
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // 初始化fd
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

listenStream方法會(huì)調(diào)用Bind方法完成fd的綁定操作牢屋,然后調(diào)用listenFunc進(jìn)行監(jiān)聽赵刑,接著調(diào)用fd的init方法荷辕,完成FD凡蜻、pollDesc初始化觅玻。

func (pd *pollDesc) init(fd *FD) error {
    // 調(diào)用到runtime.poll_runtime_pollServerInit
    serverInit.Do(runtime_pollServerInit)
    // 調(diào)用到runtime.poll_runtime_pollOpen
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    return nil
}

runtime_pollServerInit用Once封裝保證只能被調(diào)用一次掰伸,這個(gè)函數(shù)在Linux平臺(tái)上會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例皱炉;

poll_runtime_pollOpen調(diào)用了netpollopen會(huì)將fd注冊到 epoll實(shí)例中,并返回一個(gè)pollDesc狮鸭;

netpollinit初始化

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

netpollGenericInit會(huì)調(diào)用平臺(tái)上特定實(shí)現(xiàn)的netpollinit合搅,在Linux中會(huì)調(diào)用到netpoll_epoll.go的netpollinit方法:

var (
    epfd int32 = -1 // epoll descriptor 
)

func netpollinit() {
    // 創(chuàng)建一個(gè)新的 epoll 文件描述符
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    ...
    // 創(chuàng)建一個(gè)用于通信的管道
    r, w, errno := nonblockingPipe()
    ...
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // 將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    ...
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

調(diào)用epollcreate1方法會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例,需要注意的是epfd是一個(gè)全局的屬性歧蕉。然后創(chuàng)建一個(gè)用于通信的管道灾部,調(diào)用epollctl將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽。

netpollopen加入事件監(jiān)聽

下面再看看poll_runtime_pollOpen方法:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

poll_runtime_pollOpen方法會(huì)通過pollcache.alloc初始化總大小約為 4KB的pollDesc結(jié)構(gòu)體惯退。然后重置pd的屬性赌髓,調(diào)用netpollopen向epoll實(shí)例epfd加入新的輪詢事件監(jiān)聽文件描述符的可讀和可寫狀態(tài)。

下面我們再看看pollCache是如何初始化pollDesc的催跪。

type pollCache struct {
    lock  mutex
    first *pollDesc 
}

const pollBlockSize = 4 * 1024

func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    // 初始化首節(jié)點(diǎn)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        } 
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        // 初始化pollDesc鏈表
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}

pollCache的鏈表頭如果為空锁蠕,那么初始化首節(jié)點(diǎn),首節(jié)點(diǎn)是一個(gè)pollDesc的鏈表頭懊蒸,每次調(diào)用該結(jié)構(gòu)體都會(huì)返回鏈表頭還沒有被使用的pollDesc荣倾。


到這里就完成了net.Listen的分析,下面我們看看listen.Accept骑丸。

net.Accept

Listener.Accept方法最終會(huì)調(diào)用到netFD的accept方法中:

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 調(diào)用netfd.FD的Accept接受新的 socket 連接舌仍,返回 socket 的 fd
    d, rsa, errcall, err := fd.pfd.Accept()
    ...
    // 構(gòu)造一個(gè)新的netfd
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // 調(diào)用 netFD 的 init 方法完成初始化
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

這個(gè)方法首先會(huì)調(diào)用到FD的Accept接受新的 socket 連接,并返回新的socket對應(yīng)的fd通危,然后調(diào)用newFD構(gòu)造一個(gè)新的netfd铸豁,并通過init 方法完成初始化。

init方法上面我們已經(jīng)看過了黄鳍,下面我們來看看Accept方法:

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    ...
    for {
        // 使用 linux 系統(tǒng)調(diào)用 accept 接收新連接推姻,創(chuàng)建對應(yīng)的 socket
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果當(dāng)前沒有發(fā)生期待的 I/O 事件,那么 waitRead 會(huì)通過 park goroutine 讓邏輯 block 在這里
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED: 
            continue
        }
        return -1, nil, errcall, err
    }
}

FD.Accept方法會(huì)使用 linux 系統(tǒng)調(diào)用 accept 接收新連接框沟,創(chuàng)建對應(yīng)的 socket藏古,如果沒有可讀的消息增炭,waitRead會(huì)被阻塞。這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒拧晕。

pollWait事件等待

pollDesc.waitRead實(shí)際上是調(diào)用了runtime.poll_runtime_pollWait

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    ...
    // 進(jìn)入 netpollblock 并且判斷是否有期待的 I/O 事件發(fā)生
    for !netpollblock(pd, int32(mode), false) {
        ...
    }
    return 0
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // 這個(gè) for 循環(huán)是為了等待 io ready 或者 io wait
    for {
        old := *gpp
        // gpp == pdReady 表示此時(shí)已有期待的 I/O 事件發(fā)生隙姿,
        // 可以直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行響應(yīng)的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // 如果沒有期待的 I/O 事件發(fā)生,則通過原子操作把 gpp 的值置為 pdWait 并退出 for 循環(huán)
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 讓出當(dāng)前線程厂捞,將 Goroutine 轉(zhuǎn)換到休眠狀態(tài)并等待運(yùn)行時(shí)的喚醒
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait會(huì)用for循環(huán)調(diào)用netpollblock函數(shù)判斷是否有期待的 I/O 事件發(fā)生输玷,直到netpollblock返回true表示io ready才會(huì)走出循環(huán)。

netpollblock方法會(huì)判斷當(dāng)前的狀態(tài)是不是處于pdReady靡馁,如果是那么直接返回true欲鹏;如果不是,那么將gpp通過CAS設(shè)置為pdWait并退出 for 循環(huán)臭墨。通過gopark 把當(dāng)前 goroutine 給 park 住赔嚎,直到對應(yīng)的 fd 上發(fā)生可讀/可寫或者其他I/O 事件為止。

這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒胧弛。

netpoll輪詢等待

runtime.netpoll的核心邏輯是: 根據(jù)入?yún)?delay設(shè)置調(diào)用 epoll_wait 的 timeout 值尤误,調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist雙向列表中獲取IO就緒的fd列表,遍歷epoll_wait 返回的fd列表结缚, 根據(jù)調(diào)用epoll_ctl注冊fd時(shí)封裝的上下文信息組裝可運(yùn)行的 goroutine 并返回损晤。

執(zhí)行完 netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對應(yīng)的 goroutine 列表红竭,接下來將就緒的 goroutine 加入到調(diào)度隊(duì)列中尤勋,等待調(diào)度運(yùn)行。

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // 因?yàn)閭魅雂elay單位是納秒茵宪,下面將納秒轉(zhuǎn)換成毫秒
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // 等待文件描述符轉(zhuǎn)換成可讀或者可寫
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // 返回負(fù)值斥黑,那么重新調(diào)用epollwait進(jìn)行等待
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    // 意味著被監(jiān)控的文件描述符出現(xiàn)了待處理的事件
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        } 
        ...
        // 判斷發(fā)生的事件類型,讀類型或者寫類型
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出保存在 epollevent 里的 pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 調(diào)用 netpollready眉厨,傳入就緒 fd 的 pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

netpoll會(huì)調(diào)用epollwait獲取就緒的 fd 列表锌奴,對應(yīng)的epoll函數(shù)是epoll_wait。toRun是一個(gè) g 的鏈表憾股,存儲(chǔ)要恢復(fù)的 goroutines鹿蜀,最后返回給調(diào)用方。如果epollwait返回的n大于零服球,那么表示被監(jiān)控的文件描述符出現(xiàn)了待處理的事件茴恰,那么需要調(diào)用for循環(huán)進(jìn)行處理。循環(huán)里面會(huì)根據(jù)時(shí)間類型設(shè)置mode斩熊,然后拿出對應(yīng)的pollDesc往枣,調(diào)用netpollready方法。

下面我們再看一下netpollready:

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    // 獲取對應(yīng)的g的指針
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    // 將對應(yīng)的g加入到toRun列表中
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    // 根據(jù)傳入的mode判斷事件類型
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        // 取出 gpp 存儲(chǔ)的 g
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // cas 將讀或者寫信號量轉(zhuǎn)換成 pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdWait {
                old = 0
            }
            // 返回對應(yīng)的 g指針
            return (*g)(unsafe.Pointer(old))
        }
    }
}

講完了runtime.netpoll的源碼有個(gè)需要注意的地方,調(diào)用runtime.netpoll的地方有兩處:

在調(diào)度器中執(zhí)行runtime.schedule()分冈,該方法中會(huì)執(zhí)行runtime.findrunable()圾另,在runtime.findrunable()中調(diào)用了runtime.netpoll獲取待執(zhí)行的goroutine;
Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的sysmon監(jiān)控線程雕沉,sysmon 每 20us~10ms 運(yùn)行一次集乔,每次運(yùn)行會(huì)檢查距離上一次執(zhí)行netpoll是否超過10ms,如果是則會(huì)調(diào)用一次runtime.netpoll坡椒;

這些入口的調(diào)用感興趣的可以自己去看看扰路。

總結(jié)

本文從I/O多路復(fù)用開始講解select以及epoll,然后再回到go語言中去看它是如何實(shí)現(xiàn)多路復(fù)用這樣的結(jié)構(gòu)的倔叼。通過追蹤源碼可以發(fā)現(xiàn)汗唱,其實(shí)go也是根據(jù)epoll來封裝自己的函數(shù):

func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList

通過這三個(gè)函數(shù)來實(shí)現(xiàn)對epoll的創(chuàng)建實(shí)例、注冊丈攒、事件等待操作渡嚣。

轉(zhuǎn)自

https://www.cnblogs.com/luozhiyun/p/14390824.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市肥印,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绝葡,老刑警劉巖深碱,帶你破解...
    沈念sama閱讀 222,378評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異藏畅,居然都是意外死亡敷硅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評論 3 399
  • 文/潘曉璐 我一進(jìn)店門愉阎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绞蹦,“玉大人,你說我怎么就攤上這事榜旦∮钠撸” “怎么了?”我有些...
    開封第一講書人閱讀 168,983評論 0 362
  • 文/不壞的土叔 我叫張陵溅呢,是天一觀的道長澡屡。 經(jīng)常有香客問我,道長咐旧,這世上最難降的妖魔是什么驶鹉? 我笑而不...
    開封第一講書人閱讀 59,938評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮铣墨,結(jié)果婚禮上室埋,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好姚淆,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評論 6 398
  • 文/花漫 我一把揭開白布孕蝉。 她就那樣靜靜地躺著,像睡著了一般肉盹。 火紅的嫁衣襯著肌膚如雪昔驱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,549評論 1 312
  • 那天上忍,我揣著相機(jī)與錄音骤肛,去河邊找鬼。 笑死窍蓝,一個(gè)胖子當(dāng)著我的面吹牛腋颠,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播吓笙,決...
    沈念sama閱讀 41,063評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼淑玫,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了面睛?” 一聲冷哼從身側(cè)響起絮蒿,我...
    開封第一講書人閱讀 39,991評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎叁鉴,沒想到半個(gè)月后土涝,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,522評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡幌墓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評論 3 342
  • 正文 我和宋清朗相戀三年但壮,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片常侣。...
    茶點(diǎn)故事閱讀 40,742評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蜡饵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出胳施,到底是詐尸還是另有隱情溯祸,我是刑警寧澤,帶...
    沈念sama閱讀 36,413評論 5 351
  • 正文 年R本政府宣布舞肆,位于F島的核電站您没,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏胆绊。R本人自食惡果不足惜氨鹏,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望压状。 院中可真熱鬧仆抵,春花似錦跟继、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至莺匠,卻和暖如春金吗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背趣竣。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評論 1 274
  • 我被黑心中介騙來泰國打工摇庙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人遥缕。 一個(gè)月前我還...
    沈念sama閱讀 49,159評論 3 378
  • 正文 我出身青樓卫袒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親单匣。 傳聞我的和親對象是個(gè)殘疾皇子夕凝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評論 2 361

推薦閱讀更多精彩內(nèi)容