前言
可以從 Go 源碼目錄結(jié)構(gòu)和對應(yīng)代碼文件了解 Go 在不同平臺(tái)下的網(wǎng)絡(luò) I/O 模式的實(shí)現(xiàn)鹉胖。比如,在 Linux 系統(tǒng)下基于 epoll儒恋,freeBSD 系統(tǒng)下基于 kqueue喇澡,以及 Windows 系統(tǒng)下基于 iocp。
因?yàn)槲覀兊拇a都是部署在Linux上的小泉,所以本文以epoll封裝實(shí)現(xiàn)為例子來講解Go語言中I/O多路復(fù)用的源碼實(shí)現(xiàn)。
介紹
I/O多路復(fù)用
所謂 I/O 多路復(fù)用指的就是 select/epoll 這一系列的多路選擇器:支持單一線程同時(shí)監(jiān)聽多個(gè)文件描述符(I/O 事件)冕杠,阻塞等待微姊,并在其中某個(gè)文件描述符可讀寫時(shí)收到通知。以防很多同學(xué)對select或epoll不那么熟悉分预,所以下面先來講講這兩個(gè)選擇器兢交。
首先我們先說一下什么是文件描述符(File descriptor),根據(jù)它的英文首字母也簡稱FD笼痹,它是一個(gè)用于表述指向文件的引用的抽象化概念配喳。它是一個(gè)索引值飘诗,指向內(nèi)核為每一個(gè)進(jìn)程所維護(hù)的該進(jìn)程打開文件的記錄表。當(dāng)程序打開一個(gè)現(xiàn)有文件或者創(chuàng)建一個(gè)新文件時(shí)界逛,內(nèi)核向進(jìn)程返回一個(gè)文件描述符。
select
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);
writefds纺座、readfds息拜、和exceptfds是三個(gè)文件描述符集合。select會(huì)遍歷每個(gè)集合的前nfds個(gè)描述符净响,分別找到可以讀取少欺、可以寫入、發(fā)生錯(cuò)誤的描述符馋贤,統(tǒng)稱為就緒的描述符赞别。
timeout參數(shù)表示調(diào)用select時(shí)的阻塞時(shí)長。如果所有文件描述符都未就緒配乓,就阻塞調(diào)用進(jìn)程仿滔,直到某個(gè)描述符就緒,或者阻塞超過設(shè)置的 timeout 后犹芹,返回崎页。如果timeout參數(shù)設(shè)為 NULL,會(huì)無限阻塞直到某個(gè)描述符就緒腰埂;如果timeout參數(shù)設(shè)為 0飒焦,會(huì)立即返回,不阻塞屿笼。
當(dāng)select函數(shù)返回后牺荠,可以通過遍歷fdset,來找到就緒的描述符驴一。
select的缺點(diǎn)也列舉一下:
- select最大的缺陷就是單個(gè)進(jìn)程所打開的FD是有一定限制的休雌,它由FD_SETSIZE設(shè)置,默認(rèn)值是1024;
- 每次調(diào)用 select蛔趴,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)挑辆,這個(gè)開銷在 fd 很多時(shí)會(huì)很大;
- 每次 kernel 都需要線性掃描整個(gè) fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長孝情,其 I/O 性能會(huì)線性下降;
epoll
epoll是selec的增強(qiáng)版本鱼蝉,避免了“性能開銷大”和“文件描述符數(shù)量少”兩個(gè)缺點(diǎn)。
為方便理解后續(xù)的內(nèi)容箫荡,先看一下epoll的用法:
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
bind(listenfd, ...)
listen(listenfd, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); //將所有需要監(jiān)聽的fd添加到epfd中
while(1){
int n = epoll_wait(...)
for(接收到數(shù)據(jù)的socket){
//處理
}
}
先用epoll_create創(chuàng)建一個(gè)epoll對象實(shí)例epfd魁亦,同時(shí)返回一個(gè)引用該實(shí)例的文件描述符,返回的文件描述符僅僅指向?qū)?yīng)的epoll實(shí)例羔挡,并不表示真實(shí)的磁盤文件節(jié)點(diǎn)洁奈。
epoll實(shí)例內(nèi)部存儲(chǔ):
- 監(jiān)聽列表:所有要監(jiān)聽的文件描述符间唉,使用紅黑樹;
- 就緒列表:所有就緒的文件描述符利术,使用鏈表呈野;
再通過epoll_ctl將需要監(jiān)視的fd添加到epfd中,同時(shí)為fd設(shè)置一個(gè)回調(diào)函數(shù)印叁,并監(jiān)聽事件event被冒,并添加到監(jiān)聽列表中。當(dāng)有事件發(fā)生時(shí)轮蜕,會(huì)調(diào)用回調(diào)函數(shù)昨悼,并將fd添加到epoll實(shí)例的就緒隊(duì)列上。
最后調(diào)用epoll_wait阻塞監(jiān)聽 epoll 實(shí)例上所有的fd的 I/O 事件跃洛。當(dāng)就緒列表中已有數(shù)據(jù)率触,那么epoll_wait直接返回,解決了select每次都需要輪詢一遍的問題汇竭。
epoll的優(yōu)點(diǎn):
epoll的監(jiān)聽列表使用紅黑樹存儲(chǔ)葱蝗,epoll_ctl 函數(shù)添加進(jìn)來的 fd 都會(huì)被放在紅黑樹的某個(gè)節(jié)點(diǎn)內(nèi),而紅黑樹本身插入和刪除性能比較穩(wěn)定韩玩,時(shí)間復(fù)雜度 O(logN)垒玲,并且可以存儲(chǔ)大量的的fd,避免了只能存儲(chǔ)1024個(gè)fd的限制找颓;
epoll_ctl 中為每個(gè)文件描述符指定了回調(diào)函數(shù)合愈,并在就緒時(shí)將其加入到就緒列表,因此不需要像select一樣遍歷檢測每個(gè)文件描述符击狮,只需要判斷就緒列表是否為空即可佛析;
解析
netpoll本質(zhì)上是對 I/O 多路復(fù)用技術(shù)的封裝,所以自然也是和epoll一樣脫離不了下面幾步:
- netpoll創(chuàng)建及其初始化彪蓬;
- 向netpoll中加入待監(jiān)控的任務(wù)寸莫;
- 從netpoll獲取觸發(fā)的事件;
在go中對epoll提供的三個(gè)函數(shù)進(jìn)行了封裝:
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(delay int64) gList
netpollinit函數(shù)負(fù)責(zé)初始化netpoll档冬;
netpollopen負(fù)責(zé)監(jiān)聽文件描述符上的事件膘茎;
netpoll會(huì)阻塞等待返回一組已經(jīng)準(zhǔn)備就緒的 Goroutine;
下面是Go語言中編寫的一個(gè)TCP server:
func main() {
listen, err := net.Listen("tcp", ":8888")
if err != nil {
fmt.Println("listen error: ", err)
return
}
for {
conn, err := listen.Accept()
if err != nil {
fmt.Println("accept error: ", err)
break
}
// 創(chuàng)建一個(gè)goroutine來負(fù)責(zé)處理讀寫任務(wù)
go HandleConn(conn)
}
}
下面我們跟著這個(gè)TCP server的源碼一起看看是在哪里使用了netpoll來完成epoll的調(diào)用酷誓。
net.Listen
這個(gè)TCP server中會(huì)調(diào)用net.Listen創(chuàng)建一個(gè)socket同時(shí)返回與之對應(yīng)的fd披坏,該fd用來初始化listener的netFD(go層面封裝的網(wǎng)絡(luò)文件描述符),接著調(diào)用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化盐数。
調(diào)用過程如下:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// 創(chuàng)建一個(gè)socket
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
...
// 創(chuàng)建fd
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
if laddr != nil && raddr == nil {
switch sotype {
case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
// 調(diào)用 netFD的listenStream方法完成對 socket 的 bind&listen和netFD的初始化
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
return fd, nil
case syscall.SOCK_DGRAM:
...
}
}
...
return fd, nil
}
func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
ret := &netFD{
pfd: poll.FD{
Sysfd: sysfd,
IsStream: sotype == syscall.SOCK_STREAM,
ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
},
family: family,
sotype: sotype,
net: net,
}
return ret, nil
}
sysSocket方法會(huì)發(fā)起一個(gè)系統(tǒng)調(diào)用創(chuàng)建一個(gè)socket棒拂,newFD會(huì)創(chuàng)建一個(gè)netFD,然后調(diào)用netFD的listenStream方法進(jìn)行bind&listen操作,并對netFD進(jìn)行init帚屉。
netFD是一個(gè)文件描述符的封裝谜诫,netFD中包含一個(gè)FD數(shù)據(jù)結(jié)構(gòu),F(xiàn)D中包含了Sysfd 和pollDesc兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu)攻旦,Sysfd是sysSocket返回的socket系統(tǒng)文件描述符喻旷,pollDesc用于監(jiān)控文件描述符的可讀或者可寫。
我們繼續(xù)看listenStream:
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
...
// 完成綁定操作
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// 進(jìn)行監(jiān)聽操作
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// 初始化fd
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
listenStream方法會(huì)調(diào)用Bind方法完成fd的綁定操作牢屋,然后調(diào)用listenFunc進(jìn)行監(jiān)聽赵刑,接著調(diào)用fd的init方法荷辕,完成FD凡蜻、pollDesc初始化觅玻。
func (pd *pollDesc) init(fd *FD) error {
// 調(diào)用到runtime.poll_runtime_pollServerInit
serverInit.Do(runtime_pollServerInit)
// 調(diào)用到runtime.poll_runtime_pollOpen
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
return nil
}
runtime_pollServerInit用Once封裝保證只能被調(diào)用一次掰伸,這個(gè)函數(shù)在Linux平臺(tái)上會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例皱炉;
poll_runtime_pollOpen調(diào)用了netpollopen會(huì)將fd注冊到 epoll實(shí)例中,并返回一個(gè)pollDesc狮鸭;
netpollinit初始化
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
netpollGenericInit會(huì)調(diào)用平臺(tái)上特定實(shí)現(xiàn)的netpollinit合搅,在Linux中會(huì)調(diào)用到netpoll_epoll.go的netpollinit方法:
var (
epfd int32 = -1 // epoll descriptor
)
func netpollinit() {
// 創(chuàng)建一個(gè)新的 epoll 文件描述符
epfd = epollcreate1(_EPOLL_CLOEXEC)
...
// 創(chuàng)建一個(gè)用于通信的管道
r, w, errno := nonblockingPipe()
...
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
...
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
調(diào)用epollcreate1方法會(huì)創(chuàng)建一個(gè)epoll文件描述符實(shí)例,需要注意的是epfd是一個(gè)全局的屬性歧蕉。然后創(chuàng)建一個(gè)用于通信的管道灾部,調(diào)用epollctl將讀取數(shù)據(jù)的文件描述符加入監(jiān)聽。
netpollopen加入事件監(jiān)聽
下面再看看poll_runtime_pollOpen方法:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
if pd.rg != 0 && pd.rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
pd.closing = false
pd.everr = false
pd.rseq++
pd.rg = 0
pd.rd = 0
pd.wseq++
pd.wg = 0
pd.wd = 0
pd.self = pd
unlock(&pd.lock)
var errno int32
errno = netpollopen(fd, pd)
return pd, int(errno)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
poll_runtime_pollOpen方法會(huì)通過pollcache.alloc初始化總大小約為 4KB的pollDesc結(jié)構(gòu)體惯退。然后重置pd的屬性赌髓,調(diào)用netpollopen向epoll實(shí)例epfd加入新的輪詢事件監(jiān)聽文件描述符的可讀和可寫狀態(tài)。
下面我們再看看pollCache是如何初始化pollDesc的催跪。
type pollCache struct {
lock mutex
first *pollDesc
}
const pollBlockSize = 4 * 1024
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
// 初始化首節(jié)點(diǎn)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
// 初始化pollDesc鏈表
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}
pollCache的鏈表頭如果為空锁蠕,那么初始化首節(jié)點(diǎn),首節(jié)點(diǎn)是一個(gè)pollDesc的鏈表頭懊蒸,每次調(diào)用該結(jié)構(gòu)體都會(huì)返回鏈表頭還沒有被使用的pollDesc荣倾。
到這里就完成了net.Listen的分析,下面我們看看listen.Accept骑丸。
net.Accept
Listener.Accept方法最終會(huì)調(diào)用到netFD的accept方法中:
func (fd *netFD) accept() (netfd *netFD, err error) {
// 調(diào)用netfd.FD的Accept接受新的 socket 連接舌仍,返回 socket 的 fd
d, rsa, errcall, err := fd.pfd.Accept()
...
// 構(gòu)造一個(gè)新的netfd
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
// 調(diào)用 netFD 的 init 方法完成初始化
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
這個(gè)方法首先會(huì)調(diào)用到FD的Accept接受新的 socket 連接,并返回新的socket對應(yīng)的fd通危,然后調(diào)用newFD構(gòu)造一個(gè)新的netfd铸豁,并通過init 方法完成初始化。
init方法上面我們已經(jīng)看過了黄鳍,下面我們來看看Accept方法:
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
...
for {
// 使用 linux 系統(tǒng)調(diào)用 accept 接收新連接推姻,創(chuàng)建對應(yīng)的 socket
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
continue
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果當(dāng)前沒有發(fā)生期待的 I/O 事件,那么 waitRead 會(huì)通過 park goroutine 讓邏輯 block 在這里
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
FD.Accept方法會(huì)使用 linux 系統(tǒng)調(diào)用 accept 接收新連接框沟,創(chuàng)建對應(yīng)的 socket藏古,如果沒有可讀的消息增炭,waitRead會(huì)被阻塞。這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒拧晕。
pollWait事件等待
pollDesc.waitRead實(shí)際上是調(diào)用了runtime.poll_runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
// 進(jìn)入 netpollblock 并且判斷是否有期待的 I/O 事件發(fā)生
for !netpollblock(pd, int32(mode), false) {
...
}
return 0
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 這個(gè) for 循環(huán)是為了等待 io ready 或者 io wait
for {
old := *gpp
// gpp == pdReady 表示此時(shí)已有期待的 I/O 事件發(fā)生隙姿,
// 可以直接返回 unblock 當(dāng)前 goroutine 并執(zhí)行響應(yīng)的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// 如果沒有期待的 I/O 事件發(fā)生,則通過原子操作把 gpp 的值置為 pdWait 并退出 for 循環(huán)
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
// 讓出當(dāng)前線程厂捞,將 Goroutine 轉(zhuǎn)換到休眠狀態(tài)并等待運(yùn)行時(shí)的喚醒
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
poll_runtime_pollWait會(huì)用for循環(huán)調(diào)用netpollblock函數(shù)判斷是否有期待的 I/O 事件發(fā)生输玷,直到netpollblock返回true表示io ready才會(huì)走出循環(huán)。
netpollblock方法會(huì)判斷當(dāng)前的狀態(tài)是不是處于pdReady靡馁,如果是那么直接返回true欲鹏;如果不是,那么將gpp通過CAS設(shè)置為pdWait并退出 for 循環(huán)臭墨。通過gopark 把當(dāng)前 goroutine 給 park 住赔嚎,直到對應(yīng)的 fd 上發(fā)生可讀/可寫或者其他I/O 事件為止。
這些被park住的goroutine會(huì)在goroutine的調(diào)度中調(diào)用runtime.netpoll被喚醒胧弛。
netpoll輪詢等待
runtime.netpoll的核心邏輯是: 根據(jù)入?yún)?delay設(shè)置調(diào)用 epoll_wait 的 timeout 值尤误,調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist雙向列表中獲取IO就緒的fd列表,遍歷epoll_wait 返回的fd列表结缚, 根據(jù)調(diào)用epoll_ctl注冊fd時(shí)封裝的上下文信息組裝可運(yùn)行的 goroutine 并返回损晤。
執(zhí)行完 netpoll 之后,會(huì)返回一個(gè)就緒 fd 列表對應(yīng)的 goroutine 列表红竭,接下來將就緒的 goroutine 加入到調(diào)度隊(duì)列中尤勋,等待調(diào)度運(yùn)行。
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
// 因?yàn)閭魅雂elay單位是納秒茵宪,下面將納秒轉(zhuǎn)換成毫秒
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
var events [128]epollevent
retry:
// 等待文件描述符轉(zhuǎn)換成可讀或者可寫
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
// 返回負(fù)值斥黑,那么重新調(diào)用epollwait進(jìn)行等待
if n < 0 {
...
goto retry
}
var toRun gList
// 意味著被監(jiān)控的文件描述符出現(xiàn)了待處理的事件
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
...
// 判斷發(fā)生的事件類型,讀類型或者寫類型
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在 epollevent 里的 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 調(diào)用 netpollready眉厨,傳入就緒 fd 的 pollDesc
netpollready(&toRun, pd, mode)
}
}
return toRun
}
netpoll會(huì)調(diào)用epollwait獲取就緒的 fd 列表锌奴,對應(yīng)的epoll函數(shù)是epoll_wait。toRun是一個(gè) g 的鏈表憾股,存儲(chǔ)要恢復(fù)的 goroutines鹿蜀,最后返回給調(diào)用方。如果epollwait返回的n大于零服球,那么表示被監(jiān)控的文件描述符出現(xiàn)了待處理的事件茴恰,那么需要調(diào)用for循環(huán)進(jìn)行處理。循環(huán)里面會(huì)根據(jù)時(shí)間類型設(shè)置mode斩熊,然后拿出對應(yīng)的pollDesc往枣,調(diào)用netpollready方法。
下面我們再看一下netpollready:
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
// 獲取對應(yīng)的g的指針
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
// 將對應(yīng)的g加入到toRun列表中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
// 根據(jù)傳入的mode判斷事件類型
if mode == 'w' {
gpp = &pd.wg
}
for {
// 取出 gpp 存儲(chǔ)的 g
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// cas 將讀或者寫信號量轉(zhuǎn)換成 pdReady
if atomic.Casuintptr(gpp, old, new) {
if old == pdWait {
old = 0
}
// 返回對應(yīng)的 g指針
return (*g)(unsafe.Pointer(old))
}
}
}
講完了runtime.netpoll的源碼有個(gè)需要注意的地方,調(diào)用runtime.netpoll的地方有兩處:
在調(diào)度器中執(zhí)行runtime.schedule()分冈,該方法中會(huì)執(zhí)行runtime.findrunable()圾另,在runtime.findrunable()中調(diào)用了runtime.netpoll獲取待執(zhí)行的goroutine;
Go runtime 在程序啟動(dòng)的時(shí)候會(huì)創(chuàng)建一個(gè)獨(dú)立的sysmon監(jiān)控線程雕沉,sysmon 每 20us~10ms 運(yùn)行一次集乔,每次運(yùn)行會(huì)檢查距離上一次執(zhí)行netpoll是否超過10ms,如果是則會(huì)調(diào)用一次runtime.netpoll坡椒;
這些入口的調(diào)用感興趣的可以自己去看看扰路。
總結(jié)
本文從I/O多路復(fù)用開始講解select以及epoll,然后再回到go語言中去看它是如何實(shí)現(xiàn)多路復(fù)用這樣的結(jié)構(gòu)的倔叼。通過追蹤源碼可以發(fā)現(xiàn)汗唱,其實(shí)go也是根據(jù)epoll來封裝自己的函數(shù):
func netpollinit()
func netpollopen(fd uintptr, pd *pollDesc) int32
func netpoll(block bool) gList
通過這三個(gè)函數(shù)來實(shí)現(xiàn)對epoll的創(chuàng)建實(shí)例、注冊丈攒、事件等待操作渡嚣。