大部分的服務(wù)都是 I/O 密集型的谬晕,應(yīng)用程序會(huì)花費(fèi)大量時(shí)間等待 I/O 操作的完成们何。網(wǎng)絡(luò)輪詢器(netpoller)是 Go 語言運(yùn)行時(shí)用來處理 I/O 操作的關(guān)鍵組件帖旨,它使用了操作系統(tǒng)提供的 I/O 多路復(fù)用機(jī)制增強(qiáng)程序的并發(fā)處理能力。本文會(huì)詳細(xì)介紹I/O模型相關(guān)知識(shí)歌憨,并深入分析 Go 語言網(wǎng)絡(luò)輪詢器的設(shè)計(jì)與實(shí)現(xiàn)原理。
I/O 相關(guān)基礎(chǔ)概念
文件
在Linux世界中文件是一個(gè)很簡(jiǎn)單的概念墩衙,作為程序員我們只需要將其理解為一個(gè)N byte的序列就可以了务嫡。
實(shí)際上所有的I/O設(shè)備都被抽象為了文件這個(gè)概念,一切皆文件底桂,Everything is File植袍,磁盤、網(wǎng)絡(luò)數(shù)據(jù)籽懦、終端于个,甚至進(jìn)程間通信工具管道pipe等都被當(dāng)做文件對(duì)待。
所有的I/O操作也都可以通過文件讀寫來實(shí)現(xiàn)暮顺,這一非常優(yōu)雅的抽象可以讓程序員使用一套接口就能對(duì)所有外設(shè)I/O操作厅篓。
常用的I/O操作接口一般有以下幾類:
? 打開文件,open
? 改變讀寫位置捶码,seek
? 文件讀寫羽氮,read、write
? 關(guān)閉文件惫恼,close
程序員通過這幾個(gè)接口幾乎可以實(shí)現(xiàn)所有I/O操作档押,這就是文件這個(gè)概念的強(qiáng)大之處。
文件描述符
要想進(jìn)行I/O讀操作,像磁盤數(shù)據(jù)令宿,我們需要指定一個(gè)buff用來裝入數(shù)據(jù)叼耙,一般都是這樣寫的
read(buff)
雖然我們指定了往哪里寫數(shù)據(jù),但是我們?cè)搹哪睦镒x數(shù)據(jù)呢粒没?我們無法確定哪個(gè)文件是我們需要去讀取的筛婉。Linux為了高效管理已被打開的文件,于是引入了索引:文件描述符(file descriptor)癞松。fd
用于指代被打開的文件爽撒,對(duì)文件所有 I/O 操作相關(guān)的系統(tǒng)調(diào)用都需要通過fd
。
有了文件描述符响蓉,進(jìn)程可以對(duì)文件一無所知硕勿,比如文件在磁盤的什么位置、加載到內(nèi)存中又是怎樣管理的等等枫甲,這些信息統(tǒng)統(tǒng)交由操作系統(tǒng)打理首尼,進(jìn)程無需關(guān)心,操作系統(tǒng)只需要給進(jìn)程一個(gè)文件描述符就足夠了言秸。
因此我們來完善上述程序:
int fd = open(file_name); // 獲取文件描述符read(fd, buff);
read(fd, buff);
I/O模型
目前Linux系統(tǒng)中提供了以下5種IO處理模型,不同的 I/O 模型會(huì)使用不同的方式操作文件描述符:
- 阻塞I/O
- 非阻塞I/O
- I/O多路復(fù)用
- 信號(hào)驅(qū)動(dòng)I/O
- 異步I/O
阻塞I/O(Blocking I/O)
阻塞 I/O 是最常見的 I/O 模型迎捺,在默認(rèn)情況下举畸,當(dāng)我們通過 read 或者 write 等系統(tǒng)調(diào)用讀寫文件或者網(wǎng)絡(luò)時(shí),應(yīng)用程序會(huì)被阻塞凳枝。
如下圖所示抄沮,當(dāng)我們執(zhí)行 recvfrom 系統(tǒng)調(diào)用時(shí),應(yīng)用程序會(huì)從用戶態(tài)陷入內(nèi)核態(tài)岖瑰,內(nèi)核會(huì)檢查文件描述符是否就緒叛买;當(dāng)文件描述符中存在數(shù)據(jù)時(shí),操作系統(tǒng)內(nèi)核會(huì)將準(zhǔn)備好的數(shù)據(jù)拷貝給應(yīng)用程序并交回控制權(quán)蹋订。線程會(huì)阻塞在這里率挣,然后掛起(掛起的時(shí)候,cpu回去處理其他任務(wù))露戒,等待隊(duì)列不為空椒功。
非阻塞I/O(NoneBlocking I/O)
當(dāng)進(jìn)程把一個(gè)文件描述符設(shè)置成非阻塞時(shí),執(zhí)行 read 和 write 等 I/O 操作會(huì)立刻返回智什。在 C 語言中动漾,我們可以使用如下所示的代碼片段將一個(gè)文件描述符設(shè)置成非阻塞的:
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
在上述代碼中,最關(guān)鍵的就是系統(tǒng)調(diào)用 fcntl
和參數(shù) O_NONBLOCK
荠锭,fcntl
為我們提供了操作文件描述符的能力旱眯,我們可以通過它修改文件描述符的特性。當(dāng)我們將文件描述符修改成非阻塞后,讀寫文件會(huì)經(jīng)歷以下流程:
第一次從文件描述符中讀取數(shù)據(jù)會(huì)觸發(fā)系統(tǒng)調(diào)用并返回 EAGAIN
錯(cuò)誤删豺,EAGAIN
意味著該文件描述符還在等待緩沖區(qū)中的數(shù)據(jù)共虑;隨后,應(yīng)用程序會(huì)不斷輪詢調(diào)用 read
直到它的返回值大于 0吼鳞,這時(shí)應(yīng)用程序就可以對(duì)讀取操作系統(tǒng)緩沖區(qū)中的數(shù)據(jù)并進(jìn)行操作看蚜。進(jìn)程使用非阻塞的 I/O 操作時(shí),可以在等待過程中執(zhí)行其他任務(wù)赔桌,提高 CPU 的利用率供炎。
I/O多路復(fù)用(I/O multiplexing)
原本是一個(gè) I/O對(duì)應(yīng)一個(gè)進(jìn)程,這樣的話如果有1000個(gè)i/o 就需要啟動(dòng)1000個(gè)進(jìn)程疾党,這對(duì)于一個(gè)16核音诫,8核的cpu來說,需要大量性能損耗在進(jìn)程間的切換上雪位。所以優(yōu)化了一種方案是N個(gè)i/o只對(duì)應(yīng)一個(gè)進(jìn)程來處理竭钝。這個(gè)就是I/O 多路復(fù)用。
I/O 多路復(fù)用機(jī)制雹洗,就是說通過一種機(jī)制香罐,可以監(jiān)視多個(gè)描述符;一旦某個(gè)描述符就緒(一般是讀就緒或?qū)懢途w)时肿,能夠通知程序進(jìn)行相應(yīng)的讀寫操作庇茫;沒有文件句柄就緒就會(huì)阻塞應(yīng)用程序,交出CPU螃成。這種機(jī)制的使用需要 select
旦签、 poll
、 epoll
來配合寸宏。
select
在select這種I/O多路復(fù)用機(jī)制下宁炫,我們需要把想監(jiān)控的fd
集合通過函數(shù)參數(shù)的形式告訴select氮凝,然后select會(huì)將這些文件描述符集合拷貝到內(nèi)核中覆醇。
select的缺點(diǎn):
- 能監(jiān)控的文件描述符太少截碴,通過 FD_SETSIZE 設(shè)置,默認(rèn)1024個(gè)
- 每次調(diào)用 select哲虾,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)束凑,這個(gè)開銷在 fd 很多時(shí)會(huì)很大(需要維護(hù)一個(gè)用來存放大量fd的數(shù)據(jù)結(jié)構(gòu)湘今,這樣會(huì)使得用戶空間和內(nèi)核空間在傳遞該結(jié)構(gòu)時(shí)復(fù)制開銷大)
- 每次調(diào)用select,都需要在內(nèi)核,遍歷fd集合進(jìn)行無差別輪詢孝常,性能開銷大(如果能給套接字注冊(cè)某個(gè)回調(diào)函數(shù),當(dāng)他們活躍時(shí)喜颁,自動(dòng)完成相關(guān)操作半开,那就避免了輪詢寂拆,這正是epoll與kqueue做的)
poll
poll本質(zhì)上和select沒有區(qū)別鬓长,它將用戶傳入的數(shù)組拷貝到內(nèi)核空間涉波,然后查詢每個(gè)fd對(duì)應(yīng)的設(shè)備狀態(tài)啤覆, 但是它沒有最大連接數(shù)的限制,原因是它是基于鏈表來存儲(chǔ)的嫌佑。
poll的缺點(diǎn):
- 每次調(diào)用 poll 屋摇,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)炮温,這個(gè)開銷在 fd 很多時(shí)會(huì)很大;
- 對(duì) fd集合 掃描是線性掃描担巩,采用輪詢的方法涛癌,效率較低(高并發(fā)時(shí))
epoll
epoll可以理解為event poll,不同于忙輪詢和無差別輪詢弃衍,epoll會(huì)把哪個(gè)流發(fā)生了怎樣的I/O事件通知我們笨鸡。所以我們說epoll實(shí)際上是事件驅(qū)動(dòng)(每個(gè)事件關(guān)聯(lián)上fd)的哥桥,此時(shí)我們對(duì)這些流的操作都是有意義的拟糕。(復(fù)雜度降低到了O(1))
epoll函數(shù)接口:
#include <sys/epoll.h>
// 數(shù)據(jù)結(jié)構(gòu)
// 每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體
// 用于存放通過epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件
// epoll_wait檢查是否有事件發(fā)生時(shí),只需要檢查eventpoll對(duì)象中的rdlist雙鏈表中是否有epitem元素即可
struct eventpoll {
/*紅黑樹的根節(jié)點(diǎn)犁嗅,這顆樹中存儲(chǔ)著所有添加到epoll中的需要監(jiān)控的事件*/
struct rb_root rbr;
/*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/
struct list_head rdlist;
};
// API
int epoll_create(int size); // 內(nèi)核中間加一個(gè) ep 對(duì)象,把所有需要監(jiān)聽的 fd 都放到 ep 對(duì)象中
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 負(fù)責(zé)把 fd 增加宠蚂、刪除到內(nèi)核紅黑樹
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 負(fù)責(zé)檢測(cè)可讀隊(duì)列求厕,沒有可讀 fd 則阻塞
每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體,用于存放通過epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件十艾。這些事件都會(huì)掛載在紅黑樹中荤牍,如此康吵,重復(fù)添加的事件就可以通過紅黑樹而高效的識(shí)別出來(紅黑樹的插入時(shí)間效率是logn晦嵌,其中n為紅黑樹元素個(gè)數(shù))旱函。
而所有添加到epoll中的事件都會(huì)與設(shè)備(網(wǎng)卡)驅(qū)動(dòng)程序建立回調(diào)關(guān)系棒妨,也就是說伏穆,當(dāng)相應(yīng)的事件發(fā)生時(shí)會(huì)調(diào)用這個(gè)回調(diào)方法枕扫。這個(gè)回調(diào)方法在內(nèi)核中叫ep_poll_callback,它會(huì)將發(fā)生的事件添加到rdlist雙鏈表中。
在epoll中燕刻,對(duì)于每一個(gè)事件,都會(huì)建立一個(gè)epitem結(jié)構(gòu)體过蹂,如下所示:
struct epitem{
struct rb_node rbn;//紅黑樹節(jié)點(diǎn)
struct list_head rdllink;//雙向鏈表節(jié)點(diǎn)
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所屬的eventpoll對(duì)象
struct epoll_event event; //期待發(fā)生的事件類型
}
當(dāng)調(diào)用epoll_wait檢查是否有事件發(fā)生時(shí),只需要檢查eventpoll對(duì)象中的rdlist雙鏈表中是否有epitem元素即可脆诉。如果rdlist不為空击胜,則把發(fā)生的事件復(fù)制到用戶態(tài),同時(shí)將事件數(shù)量返回給用戶辰斋。
從上面的講解可知:通過于紅黑樹和雙鏈表數(shù)據(jù)結(jié)構(gòu)够挂,并結(jié)合回調(diào)機(jī)制,造就了epoll的高效梭姓。
講解完了Epoll的機(jī)理誉尖,我們便能很容易掌握epoll的用法了。一句話描述就是:三步曲探熔。
第一步:epoll_create()系統(tǒng)調(diào)用。此調(diào)用返回一個(gè)句柄其垄,之后所有的使用都依靠這個(gè)句柄來標(biāo)識(shí)。創(chuàng)建一個(gè)epoll句柄喇颁,實(shí)際上是在內(nèi)核空間,建立一個(gè)root根節(jié)點(diǎn),這個(gè)根節(jié)點(diǎn)的關(guān)系與epfd相對(duì)應(yīng)忱辅。
第二步:epoll_ctl()系統(tǒng)調(diào)用橡卤。通過此調(diào)用向epoll對(duì)象中添加、刪除嵌灰、修改感興趣的事件,返回0標(biāo)識(shí)成功驹溃,返回-1表示失敗。創(chuàng)建的該用戶態(tài)事件布疙,綁定到某個(gè)fd上,然后添加到內(nèi)核中的epoll紅黑樹中俱诸。
第三步:epoll_wait()系統(tǒng)調(diào)用。通過此調(diào)用收集在epoll監(jiān)控中已經(jīng)發(fā)生的事件。如果內(nèi)核檢測(cè)到IO的讀寫響應(yīng)锌唾,會(huì)拋給上層的epoll_wait, 返回給用戶態(tài)一個(gè)已經(jīng)觸發(fā)的事件隊(duì)列滋捶,同時(shí)阻塞返回。開發(fā)者可以從隊(duì)列中取出事件來處理巡扇,其中事件里就有綁定的對(duì)應(yīng)fd是哪個(gè)(之前添加epoll事件的時(shí)候已經(jīng)綁定)。
- epoll的優(yōu)點(diǎn)
- 沒有最大并發(fā)連接的限制知给,能打開的FD的上限遠(yuǎn)大于1024(1G的內(nèi)存上能監(jiān)聽約10萬個(gè)端口)
- 效率提升,不是輪詢的方式,不會(huì)隨著FD數(shù)目的增加效率下降花墩。只有活躍可用的FD才會(huì)調(diào)用callback函數(shù);即Epoll最大的優(yōu)點(diǎn)就在于它只管你“活躍”的連接祠肥,而跟連接總數(shù)無關(guān),因此在實(shí)際的網(wǎng)絡(luò)環(huán)境中剂桥,Epoll的效率就會(huì)遠(yuǎn)遠(yuǎn)高于select和poll
- epoll的缺點(diǎn)
- epoll只能工作在 linux 下
-
epoll LT 與 ET 模式的區(qū)別
epoll 有 EPOLLLT 和 EPOLLET 兩種觸發(fā)模式,LT 是默認(rèn)的模式斟薇,ET 是 “高速” 模式。
- LT 模式下,只要這個(gè) fd 還有數(shù)據(jù)可讀犯眠,每次 epoll_wait 都會(huì)返回它的事件,提醒用戶程序去操作量蕊;
- ET 模式下,它只會(huì)提示一次,直到下次再有數(shù)據(jù)流入之前都不會(huì)再提示了舅锄,無論 fd 中是否還有數(shù)據(jù)可讀。所以在 ET 模式下,read 一個(gè) fd 的時(shí)候一定要把它的 buffer 讀完老翘,或者遇到 EAGIN 錯(cuò)誤。
epoll使用“事件”的就緒通知方式傀履,通過epoll_ctl注冊(cè)fd,一旦該fd就緒,內(nèi)核就會(huì)采用類似callback的回調(diào)機(jī)制來激活該fd啦粹,epoll_wait便可以收到通知。
select/poll/epoll之間的區(qū)別
select,poll撩荣,epoll都是IO多路復(fù)用的機(jī)制。I/O多路復(fù)用就通過一種機(jī)制,可以監(jiān)視多個(gè)描述符饱狂,一旦某個(gè)描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進(jìn)行相應(yīng)的讀寫操作筹麸。但select,poll雏婶,epoll本質(zhì)上都是同步I/O物赶,因?yàn)樗麄兌夹枰谧x寫事件就緒后自己負(fù)責(zé)進(jìn)行讀寫,也就是說這個(gè)讀寫過程是阻塞的留晚,而異步I/O則無需自己負(fù)責(zé)進(jìn)行讀寫酵紫,異步I/O的實(shí)現(xiàn)會(huì)負(fù)責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間错维。
epoll跟select都能提供多路I/O復(fù)用的解決方案憨闰。在現(xiàn)在的Linux內(nèi)核里有都能夠支持,其中epoll是Linux所特有需五,而select則應(yīng)該是POSIX所規(guī)定,一般操作系統(tǒng)均有實(shí)現(xiàn)轧坎。
select | poll | epoll | |
---|---|---|---|
獲取可用的fd | 遍歷 | 遍歷 | 回調(diào) |
存儲(chǔ)fd的數(shù)據(jù)結(jié)構(gòu) | bitmap | 數(shù)組 | 紅黑樹 |
最大連接數(shù) | 1024(x86)或 2048(x64) | 無上限 | 無上限 |
最大支持fd數(shù)量 | 一般有最大值限制 | 65535 | 65535 |
fd拷貝 | 每次調(diào)用select宏邮,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài) | 每次調(diào)用poll,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài) | fd首次調(diào)用epoll_ctl拷貝缸血,每次調(diào)用epoll_wait不拷貝 |
工作模式 | LT | LT | 支持lT默認(rèn)模式及ET高效模式 |
工作效率 | 每次調(diào)用都進(jìn)行線性遍歷蜜氨,時(shí)間復(fù)雜度為O(n) | 每次調(diào)用都進(jìn)行線性遍歷,時(shí)間復(fù)雜度為O(n) | 事件通知方式捎泻,每當(dāng)fd就緒飒炎,系統(tǒng)注冊(cè)的回調(diào)函數(shù)就會(huì)被調(diào)用,將就緒fd放到readyList里面笆豁,時(shí)間復(fù)雜度O(1) |
epoll是Linux目前大規(guī)模網(wǎng)絡(luò)并發(fā)程序開發(fā)的首選模型郎汪。在絕大多數(shù)情況下性能遠(yuǎn)超select和poll。目前流行的高性能web服務(wù)器Nginx正式依賴于epoll提供的高效網(wǎng)絡(luò)套接字輪詢服務(wù)闯狱。但是煞赢,在并發(fā)連接不高的情況下,多線程+阻塞I/O方式可能性能更好哄孤。
select
照筑,poll
實(shí)現(xiàn)需要自己不斷輪詢所有fd集合,直到設(shè)備就緒瘦陈,期間可能要睡眠和喚醒多次交替凝危。而epoll
其實(shí)也需要調(diào)用epoll_wait
不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替晨逝,但是它是設(shè)備就緒時(shí)蛾默,調(diào)用回調(diào)函數(shù),把就緒fd放入就緒鏈表中捉貌,并喚醒在epoll_wait中進(jìn)入睡眠的進(jìn)程趴生。雖然都要睡眠和交替阀趴,但是select和poll在“醒著”的時(shí)候要遍歷整個(gè)fd集合,而epoll在“醒著”的時(shí)候只要判斷一下就緒鏈表是否為空就行了苍匆,這節(jié)省了大量的CPU時(shí)間刘急。這就是回調(diào)機(jī)制帶來的性能提升。
select
浸踩,poll
每次調(diào)用都要把fd
集合從用戶態(tài)往內(nèi)核態(tài)拷貝一次叔汁,并且要把current往設(shè)備等待隊(duì)列中掛一次。而epoll只要一次拷貝检碗,而且把current往等待隊(duì)列上掛也只掛一次*(在epoll_wait的開始据块,注意這里的等待隊(duì)列并不是設(shè)備等待隊(duì)列,只是一個(gè)epoll內(nèi)部定義的等待隊(duì)列)折剃。這也能節(jié)省不少的開銷另假。
信號(hào)驅(qū)動(dòng)I/O
在信號(hào)驅(qū)動(dòng)IO模型中,當(dāng)用戶線程發(fā)起一個(gè)IO請(qǐng)求操作怕犁,會(huì)給對(duì)應(yīng)的socket注冊(cè)一個(gè)信號(hào)函數(shù)边篮,然后用戶線程會(huì)繼續(xù)執(zhí)行,當(dāng)內(nèi)核數(shù)據(jù)就緒時(shí)會(huì)發(fā)送一個(gè)信號(hào)給用戶線程奏甫,用戶線程接收到信號(hào)后戈轿,便在信號(hào)函數(shù)中調(diào)用IO讀寫操作來進(jìn)行實(shí)際的IO請(qǐng)求操作。這個(gè)一般用于UDP中阵子,對(duì)TCP套接字幾乎沒用思杯,原因是該信號(hào)產(chǎn)生得過于頻繁,并且該信號(hào)的出現(xiàn)并沒有告訴我們發(fā)生了什么請(qǐng)求挠进。
用戶進(jìn)程可以使用信號(hào)方式色乾,當(dāng)系統(tǒng)內(nèi)核描述符就緒時(shí)將會(huì)發(fā)送SIGNO給到用戶空間,這個(gè)時(shí)候再發(fā)起recvfrom的系統(tǒng)調(diào)用等待返回成功提示领突,流程如下:
- 先開啟套接字的信號(hào)IO啟動(dòng)功能杈湾,并通過一個(gè)內(nèi)置安裝信號(hào)處理函數(shù)的signaction系統(tǒng)調(diào)用,當(dāng)發(fā)起調(diào)用之后會(huì)直接返回攘须;
- 其次漆撞,等待內(nèi)核從網(wǎng)絡(luò)中接收數(shù)據(jù)報(bào)之后,向用戶空間發(fā)送當(dāng)前數(shù)據(jù)可達(dá)的信號(hào)給信號(hào)處理函數(shù)于宙;
- 信號(hào)處理函數(shù)接收到信息就發(fā)起recvfrom系統(tǒng)調(diào)用等待內(nèi)核數(shù)據(jù)復(fù)制數(shù)據(jù)報(bào)到用戶空間的緩沖區(qū)浮驳;
- 接收到復(fù)制完成的返回成功提示之后,應(yīng)用進(jìn)程就可以開始從網(wǎng)絡(luò)中讀取數(shù)據(jù)捞魁。
異步I/O
前面四種IO模型實(shí)際上都屬于同步IO至会,只有最后一種是真正的異步IO,因?yàn)闊o論是多路復(fù)用IO還是信號(hào)驅(qū)動(dòng)模型谱俭,IO操作的第2個(gè)階段都會(huì)引起用戶線程阻塞奉件,也就是內(nèi)核進(jìn)行數(shù)據(jù)拷貝的過程都會(huì)讓用戶線程阻塞宵蛀。
- 由POSIX規(guī)范定義,告知系統(tǒng)內(nèi)核啟動(dòng)某個(gè)操作县貌,并讓內(nèi)核在整個(gè)操作包含數(shù)據(jù)等待以及數(shù)據(jù)復(fù)制過程的完成之后通知用戶進(jìn)程數(shù)據(jù)已經(jīng)準(zhǔn)備完成术陶,可以進(jìn)行讀取數(shù)據(jù);
- 與上述的信號(hào)IO模型區(qū)分在于異步是通知我們何時(shí)IO操作完成,而信號(hào)IO是通知我們何時(shí)可以啟動(dòng)一個(gè)IO操作
同步IO/異步IO/阻塞IO/非阻塞IO(基于POSIX規(guī)范)
- 同步IO: 表示應(yīng)用進(jìn)程發(fā)起真實(shí)的IO操作請(qǐng)求(recvfrom)導(dǎo)致進(jìn)程一直處于等待狀態(tài),這時(shí)候進(jìn)程被阻塞,直到IO操作完成返回成功提示
- 異步IO: 表示應(yīng)用進(jìn)程發(fā)起真實(shí)的IO操作請(qǐng)求(recvfrom)導(dǎo)致進(jìn)程將直接返回一個(gè)錯(cuò)誤信息,“相當(dāng)于告訴進(jìn)程還沒有處理好,好了會(huì)通知你”
- 阻塞IO: 主要是體現(xiàn)發(fā)起IO操作請(qǐng)求通知內(nèi)核并且內(nèi)核接收到信號(hào)之后如果讓進(jìn)程等待,那么就是阻塞
- 非阻塞IO: 發(fā)起IO操作請(qǐng)求的時(shí)候不論結(jié)果直接告訴進(jìn)程“不用等待,晚點(diǎn)再來”,那就是非阻塞
IO模型對(duì)比
除了真正的異步I/O模型以外煤痕,其他幾種模型梧宫,最后一階段的處理都是相同的——阻塞于recvfrom調(diào)用,將數(shù)據(jù)從內(nèi)核拷貝到應(yīng)用緩沖區(qū)摆碉。
同步與異步針對(duì)通信機(jī)制,阻塞與非阻塞針對(duì)程序調(diào)用等待結(jié)果的狀態(tài)
netpoller
netpoller基本原理
在Go的實(shí)現(xiàn)中塘匣,所有IO都是阻塞調(diào)用的,Go的設(shè)計(jì)思想是程序員使用阻塞式的接口來編寫程序巷帝,然后通過goroutine+channel來處理并發(fā)忌卤。因此所有的IO邏輯都是直來直去的,先xx楞泼,再xx, 你不再需要回調(diào)驰徊,不再需要future,要的僅僅是step by step现拒。這對(duì)于代碼的可讀性是很有幫助的。
netpoller的工作就是成為同步(阻塞)IO調(diào)用和異步(非阻塞)IO調(diào)用之間的橋梁望侈。
Go netpoller 通過在底層對(duì) epoll/kqueue/iocp 的封裝印蔬,從而實(shí)現(xiàn)了使用同步編程模式達(dá)到異步執(zhí)行的效果⊥蜒茫總結(jié)來說侥猬,所有的網(wǎng)絡(luò)操作都以網(wǎng)絡(luò)描述符 netFD 為中心實(shí)現(xiàn)。netFD 與底層 PollDesc 結(jié)構(gòu)綁定捐韩,當(dāng)在一個(gè) netFD 上讀寫遇到 EAGAIN 錯(cuò)誤時(shí)退唠,就將當(dāng)前 goroutine 存儲(chǔ)到這個(gè) netFD 對(duì)應(yīng)的 PollDesc 中,同時(shí)調(diào)用 gopark 把當(dāng)前 goroutine 給 park 住荤胁,直到這個(gè) netFD 上再次發(fā)生讀寫事件瞧预,才將此 goroutine 給 ready 激活重新運(yùn)行。顯然仅政,在底層通知 goroutine 再次發(fā)生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅(qū)動(dòng)機(jī)制垢油。
Go 是一門跨平臺(tái)的編程語言,而不同平臺(tái)針對(duì)特定的功能有不用的實(shí)現(xiàn)圆丹,這當(dāng)然也包括了 I/O 多路復(fù)用技術(shù)滩愁,比如 Linux 里的 I/O 多路復(fù)用有 select、poll 和 epoll辫封,而 freeBSD 或者 MacOS 里則是 kqueue硝枉,而 Windows 里則是基于異步 I/O 實(shí)現(xiàn)的 iocp廉丽,等等;因此妻味,Go 為了實(shí)現(xiàn)底層 I/O 多路復(fù)用的跨平臺(tái)正压,分別基于上述的這些不同平臺(tái)的系統(tǒng)調(diào)用實(shí)現(xiàn)了多版本的 netpollers,具體的源碼路徑如下:
- src/runtime/netpoll_epoll.go
- src/runtime/netpoll_kqueue.go
- src/runtime/netpoll_solaris.go
- src/runtime/netpoll_windows.go
- src/runtime/netpoll_aix.go
- src/runtime/netpoll_fake.go
本文的解析基于 epoll 版本弧可,如果讀者對(duì)其他平臺(tái)的 netpoller 底層實(shí)現(xiàn)感興趣蔑匣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實(shí)現(xiàn)版本的機(jī)制和原理基本類似棕诵。
netpoller代碼結(jié)構(gòu)概覽
實(shí)際的實(shí)現(xiàn)(epoll/kqueue)必須定義以下函數(shù):
func netpollinit() // 初始化輪詢器
func netpollopen(fd uintptr, pd *pollDesc) int32 // 為fd和pd啟動(dòng)邊緣觸發(fā)通知
當(dāng)一個(gè)goroutine進(jìn)行io阻塞時(shí)裁良,會(huì)去被放到等待隊(duì)列。這里面就關(guān)鍵的就是建立起文件描述符和goroutine之間的關(guān)聯(lián)校套。 pollDesc結(jié)構(gòu)體就是完成這個(gè)任務(wù)的价脾。代碼參見src/runtime/netpoll.go
type pollDesc struct { // Poller對(duì)象
link *pollDesc // 鏈表
lock mutex // 保護(hù)下面字段
fd uintptr // fd是底層網(wǎng)絡(luò)io文件描述符,整個(gè)生命期內(nèi)笛匙,不能改變值
closing bool
seq uintptr // protect from stale(過時(shí)) timers and ready notifications
rg uintptr // reader goroutine addr
rt timer
rd int64
wg uintptr // writer goroutine addr
wt timer
wd int64
user int32 // user-set cookie用戶自定義數(shù)據(jù)
}
type pollCache struct { // 全局Poller鏈表
lock mutex // 保護(hù)Poller鏈表
first *pollDesc
}
// 調(diào)用netpollinit()
func poll_runtime_pollServerInit() {}
// 調(diào)用netpollopen()
func poll_runtime_pollOpen() {}
// 調(diào)用netpollclose()
func poll_runtime_pollClose() {}
// 先check(netpollcheckerr(pd, mode))是否有err發(fā)生侨把,沒有的話重置pd對(duì)應(yīng)字段
func poll_runtime_pollReset(pd, mode) {}
// 先chekerr,再調(diào)用netpollblock(pd, mode, false) {}
func poll_runtime_pollWait(pd, mode) {}
// windows下專用
func poll_runtime_pollWaitCanceled(pd, mode) {}
func poll_runtime_pollSetDeadline(pd, deadline, mode) {}
//1. 重置定時(shí)器妹孙,并seq++
//2. 設(shè)置超時(shí)函數(shù)netpollDeadline(或者netpollReadDeadline秋柄、netpollWriteDeadline)
//3. 如果已經(jīng)過期,調(diào)用netpollunblock和netpollgoready
// netpollUnblock蠢正、netpollgoready
func poll_runtime_pollUnblock(pd) {}
/*------------------部分實(shí)現(xiàn)------------------*/
// 檢查是否超時(shí)或正在關(guān)閉
func netpollcheckerr(pd, mode) {}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) {}
// 調(diào)用netpollunblock骇笔,更新g的
func netpollready(gpp *guintptr, pd, mode) schedlink {}
// 更新統(tǒng)計(jì)數(shù)據(jù),調(diào)用goready --- 通知調(diào)度器協(xié)程g從parked變?yōu)閞eady
func netpollgoready(gp *g, traceskip) {}
// Set rg/wg = pdWait嚣崭,調(diào)用gopark掛起pd對(duì)應(yīng)的g笨触。
func netpollblock(pd, mode, waitio) {}
func netpollunblock(pd, mode, ioready) {}
func netpoll(Write/Read)Deadline(arg, seq) {}
pollCache
是pollDesc鏈表入口,加鎖保護(hù)鏈表安全雹舀。
pollDesc
中芦劣,rg、wg有些特殊说榆,它可能有如下3種狀態(tài):
pdReady == 1
: 網(wǎng)絡(luò)io就緒通知虚吟,goroutine消費(fèi)完后應(yīng)置為nil-
pdWait == 2
: goroutine等待被掛起,后續(xù)可能有3種情況:- goroutine被調(diào)度器掛起签财,置為goroutine地址
- 收到io通知稍味,置為pdReady
- 超時(shí)或者被關(guān)閉,置為nil
Goroutine地址: 被掛起的goroutine的地址荠卷,當(dāng)io就緒時(shí)模庐、或者超時(shí)、被關(guān)閉時(shí)油宜,此goroutine將被喚醒掂碱,同時(shí)將狀態(tài)改為pdReady或者nil怜姿。
另外,由于wg疼燥、rg是goroutine的地址沧卢,因此當(dāng)GC發(fā)生后,如果goroutine被回收(在heap區(qū))醉者,代碼就崩潰了(指針無效)但狭。所以,進(jìn)行網(wǎng)絡(luò)IO的goroutine不能在heap區(qū)分配內(nèi)存撬即。
lock鎖對(duì)象保護(hù)了
pollOpen
,pollSetDeadline
,pollUnblock
和deadlineimpl
操作立磁。而這些操作又完全包含了對(duì)seq, rt, tw變量。fd在PollDesc
整個(gè)生命過程中都是一個(gè)常量剥槐。處理pollReset
,pollWait
,pollWaitCanceled
和runtime.netpollready
(IO就緒通知)不需要用到鎖唱歧,所以closing, rg, rd, wg和wd的所有操作都是一個(gè)無鎖的操作。
netpoller多路復(fù)用三部曲
初始化PollServer
初始化在下面注冊(cè)fd監(jiān)聽時(shí)順便處理了粒竖,調(diào)用runtime_pollServerInit()
颅崩,并使用sync.Once()
機(jī)制保證只會(huì)被初始化一次。全局使用同一個(gè)EpollServer
(同一個(gè)Epfd
)蕊苗。
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if atomic.Load(&netpollInited) == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited == 0 {
netpollinit() // 具現(xiàn)化到Linux下沿后,調(diào)用epoll_create
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
注冊(cè)監(jiān)聽fd
所有Unix文件在初始化時(shí),如果支持Poll朽砰,都會(huì)加入到PollServer的監(jiān)聽中尖滚。
/*****************internal/poll/fd_unix.go*******************/
type FD struct {
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close.
Sysfd int
// I/O poller.
pd pollDesc
...
}
func(fd *FD) Init(net string, pollable bool) error {
...
err := fd.pd.init(fd) // 初始化pd
...
}
...
/*****************internal/poll/fd_poll_runtime.go*****************/
type pollDesc struct {
runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit) // 初始化PollServer(sync.Once)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
runtimeCtx = ctx
return nil
}
...
/*****************runtime/netpoll.go*****************/
func poll_runtime_pollOpen(fd uintptr) (*epDesc, int32) {
...
errno := netpollopen(fd, pd) // 具現(xiàn)化到Linux下,調(diào)用epoll_ctl
...
}
取消fd的監(jiān)聽與此流程類似锅移,最終調(diào)用epoll_ctl
.
定期Poll
結(jié)合上述實(shí)現(xiàn)熔掺,必然有處邏輯定期執(zhí)行epoll_wait
來檢測(cè)fd
狀態(tài)饱搏。在代碼中搜索下netpoll
非剃,即可發(fā)現(xiàn)是在sysmon、startTheWorldWithSema推沸、pollWork备绽、findrunnable
中調(diào)用的,以sysmon
為例:
// runtime/proc.go
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
// 如果10ms內(nèi)沒有poll過鬓催,則poll肺素。(1ms=1000000ns)
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
gp := netpoll(false) // netpoll在Linux具現(xiàn)為epoll_wait
if gp != nil {
injectglist(gp) //把g放到sched中去執(zhí)行,底層仍然是調(diào)用的之前在goroutine里面提到的startm函數(shù)宇驾。
}
}
...
goroutine的I/O讀取流程
當(dāng)goroutine發(fā)起一個(gè)同步調(diào)用倍靡,經(jīng)過一系列的調(diào)用,最后會(huì)進(jìn)入gopark函數(shù)课舍,gopark將當(dāng)前正在執(zhí)行的goroutine狀態(tài)保存起來塌西,然后切換到新的堆棧上執(zhí)行新的goroutine他挎。由于當(dāng)前goroutine狀態(tài)是被保存起來的,因此后面可以被恢復(fù)捡需。這樣調(diào)用Read的goroutine以為一直同步阻塞到現(xiàn)在办桨,其實(shí)內(nèi)部是異步完成的。
1. 加入監(jiān)聽
golang中客戶端與服務(wù)端進(jìn)行通訊時(shí)站辉,常用如下方法:
conn, err := net.Dial("tcp", "localhost:1208")
從net.Dial看進(jìn)去呢撞,最終會(huì)調(diào)用net/net_posix.go中的socket函數(shù),大致流程如下:
func socket(...) ... {
/*
1. 調(diào)用sysSocket創(chuàng)建原生socket
2. 調(diào)用同名包下netFd()饰剥,初始化網(wǎng)絡(luò)文件描述符netFd
3. 調(diào)用fd.dial()殊霞,其中最終有調(diào)用poll_runtime_pollOpen()加入監(jiān)聽列表
*/
}
runtime.poll_runtime_pollOpen
重置輪詢信息 runtime.pollDesc
并調(diào)用 runtime.netpollopen
初始化輪詢事件:
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc()
lock(&pd.lock)
if pd.wg != 0 && pd.wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
...
pd.fd = fd
pd.closing = false
pd.everr = false
...
pd.wseq++
pd.wg = 0
pd.wd = 0
unlock(&pd.lock)
var errno int32
// 初始化輪詢事件
errno = netpollopen(fd, pd)
return pd, int(errno)
}
runtime.netpollopen
的實(shí)現(xiàn)非常簡(jiǎn)單,它會(huì)調(diào)用 epollctl
向全局的輪詢文件描述符 epfd
中加入新的輪詢事件監(jiān)聽文件描述符的可讀和可寫狀態(tài):
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
從全局的 epfd
中刪除待監(jiān)聽的文件描述符可以使用 runtime.netpollclose
捐川,因?yàn)樵摵瘮?shù)的實(shí)現(xiàn)與 runtime.netpollopen
比較相似脓鹃,所以這里不展開分析了。
2. 讀等待
主要是掛起goroutine古沥,并建立gorotine和fd之間的關(guān)聯(lián)瘸右。
當(dāng)從netFd讀取數(shù)據(jù)時(shí),調(diào)用system call岩齿,循環(huán)從fd.sysfd讀取數(shù)據(jù):
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
讀取的時(shí)候只處理EAGAIN
類型的錯(cuò)誤太颤,其他錯(cuò)誤一律返回給調(diào)用者,因?yàn)閷?duì)于非阻塞的網(wǎng)絡(luò)連接的文件描述符盹沈,如果錯(cuò)誤是EAGAIN
龄章,說明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù)乞封,則調(diào)用fd.pd.WaitRead
:
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
res是runtime_pollWait函數(shù)返回的結(jié)果做裙,由conevertErr函數(shù)包裝后返回:
func convertErr(res int, isFile bool) error {
switch res {
case 0:
return nil
case 1:
return errClosing(isFile)
case 2:
return ErrTimeout
}
println("unreachable: ", res)
panic("unreachable")
}
其中0表示io已經(jīng)準(zhǔn)備好了,1表示鏈接意見關(guān)閉肃晚,2表示io超時(shí)锚贱。再來看看pollWait的實(shí)現(xiàn):
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
調(diào)用netpollblock來判斷IO是否準(zhǔn)備好了:
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
}
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
返回true說明IO已經(jīng)準(zhǔn)備好,返回false說明IO操作已經(jīng)超時(shí)或者已經(jīng)關(guān)閉关串。否則當(dāng)waitio為false, 且io不出現(xiàn)錯(cuò)誤或者超時(shí)才會(huì)掛起當(dāng)前goroutine拧廊。
最后的gopark函數(shù),就是將當(dāng)前的goroutine(調(diào)用者)設(shè)置為waiting狀態(tài):
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
mcall(park_m)
將會(huì)掛起當(dāng)前與g綁定的m:
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if _g_.m.waitunlockf != nil {
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
3. 就緒喚醒
那什么時(shí)候goroutine被喚醒并調(diào)度回來呢晋修?運(yùn)行時(shí)在執(zhí)行schedule()
方法時(shí)吧碾,會(huì)通過findrunnable()
,調(diào)用netpoll()
檢查文件描述符狀態(tài)墓卦。
schedule() -> findrunnable() -> netpoll()
調(diào)用netpoll()
尋找到IO就緒的socket文件描述符倦春,并找到這些socket文件描述符對(duì)應(yīng)的輪詢器中附帶的信息,根據(jù)這些信息將之前等待這些socket文件描述符就緒的goroutine狀態(tài)修改為Grunnable。執(zhí)行完netpoll之后睁本,會(huì)找到一個(gè)就緒的goroutine列表山叮,接下來將就緒的goroutine加入到調(diào)度隊(duì)列中,等待調(diào)度運(yùn)行添履。
下面我們看下netpoll()
的源碼實(shí)現(xiàn):
// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
if epfd == -1 {
return gList{}
}
var waitms int32
// 根據(jù)傳入的 delay 計(jì)算 epoll 系統(tǒng)調(diào)用需要等待的時(shí)間
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
// 調(diào)用 epollwait 等待可讀或者可寫事件的發(fā)生
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
goto retry
}
var gp guintptr
// 在循環(huán)中依次處理 epollevent 事件
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// 文件描述符的正常讀寫事件屁倔,對(duì)于這些事件,我們會(huì)交給netpollready處理
netpollready(&gp, pd, mode)
}
}
if block && gp == 0 {
goto retry
}
return gp.ptr()
}
當(dāng)netpoll()
調(diào)用epollwait()
獲取到被監(jiān)控的文件描述符出現(xiàn)了待處理的事件暮胧,就會(huì)在循環(huán)中依次調(diào)用netpollready()
處理這些事件锐借。
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
runtime.netpollunblock
會(huì)在讀寫事件發(fā)生時(shí),將 runtime.pollDesc
中的讀或者寫信號(hào)量轉(zhuǎn)換成 pdReady
并返回其中存儲(chǔ)的 goroutine往衷;如果返回的 Goroutine 不會(huì)為空钞翔,那么運(yùn)行時(shí)會(huì)將該 goroutine 會(huì)加入 toRun
列表,并將列表中的全部 goroutine 加入運(yùn)行隊(duì)列席舍。
當(dāng)goroutine 加入運(yùn)行隊(duì)列后布轿,在某一次調(diào)度goroutine的過程中,處于就緒狀態(tài)的FD對(duì)應(yīng)的goroutine就會(huì)被調(diào)度回來来颤。
netpoller超時(shí)控制
網(wǎng)絡(luò)輪詢器和計(jì)時(shí)器的關(guān)系非常緊密汰扭,這不僅僅是因?yàn)榫W(wǎng)絡(luò)輪詢器負(fù)責(zé)計(jì)時(shí)器的喚醒,還因?yàn)槲募途W(wǎng)絡(luò) I/O 的截止日期也由網(wǎng)絡(luò)輪詢器負(fù)責(zé)處理福铅。截止日期在 I/O 操作中萝毛,尤其是網(wǎng)絡(luò)調(diào)用中很關(guān)鍵,網(wǎng)絡(luò)請(qǐng)求存在很高的不確定因素滑黔,我們需要設(shè)置一個(gè)截止日期保證程序的正常運(yùn)行笆包,這時(shí)需要用到網(wǎng)絡(luò)輪詢器中的 runtime.poll_runtime_pollSetDeadline
:
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
rd0, wd0 := pd.rd, pd.wd
if d > 0 {
d += nanotime()
}
pd.rd = d
...
if pd.rt.f == nil {
if pd.rd > 0 {
pd.rt.f = netpollReadDeadline
pd.rt.arg = pd
pd.rt.seq = pd.rseq
resettimer(&pd.rt, pd.rd)
}
} else if pd.rd != rd0 {
pd.rseq++
if pd.rd > 0 {
modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
} else {
deltimer(&pd.rt)
pd.rt.f = nil
}
}
該函數(shù)會(huì)先使用截止日期計(jì)算出過期的時(shí)間點(diǎn),然后根據(jù) runtime.pollDesc
的狀態(tài)做出以下不同的處理:
- 如果結(jié)構(gòu)體中的計(jì)時(shí)器沒有設(shè)置執(zhí)行的函數(shù)時(shí)略荡,該函數(shù)會(huì)設(shè)置計(jì)時(shí)器到期后執(zhí)行的函數(shù)庵佣、傳入的參數(shù)并調(diào)用
runtime.resettimer
重置計(jì)時(shí)器; - 如果結(jié)構(gòu)體的讀截止日期已經(jīng)被改變汛兜,我們會(huì)根據(jù)新的截止日期做出不同的處理:
- 如果新的截止日期大于 0巴粪,調(diào)用
runtime.modtimer
修改計(jì)時(shí)器; - 如果新的截止日期小于 0序无,調(diào)用
runtime.deltimer
刪除計(jì)時(shí)器验毡;
- 如果新的截止日期大于 0巴粪,調(diào)用
在 runtime.poll_runtime_pollSetDeadline
的最后衡创,會(huì)重新檢查輪詢信息中存儲(chǔ)的截止日期:
var rg *g
if pd.rd < 0 {
if pd.rd < 0 {
rg = netpollunblock(pd, 'r', false)
}
...
}
if rg != nil {
netpollgoready(rg, 3)
}
...
}
如果截止日期小于 0帝嗡,上述代碼會(huì)調(diào)用 runtime.netpollgoready
直接喚醒對(duì)應(yīng)的 Goroutine。
在 runtime.poll_runtime_pollSetDeadline
中直接調(diào)用 runtime.netpollgoready
是相對(duì)比較特殊的情況璃氢。在正常情況下哟玷,運(yùn)行時(shí)都會(huì)在計(jì)時(shí)器到期時(shí)調(diào)用 runtime.netpollDeadline
、runtime.netpollReadDeadline
和 runtime.netpollWriteDeadline
三個(gè)函數(shù):
上述三個(gè)函數(shù)都會(huì)通過
runtime.netpolldeadlineimpl
調(diào)用 runtime.netpollgoready
直接喚醒相應(yīng)的 Goroutine:
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
currentSeq := pd.rseq
if !read {
currentSeq = pd.wseq
}
if seq != currentSeq {
return
}
var rg *g
if read {
pd.rd = -1
atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil)
rg = netpollunblock(pd, 'r', false)
}
...
if rg != nil {
netpollgoready(rg, 0)
}
...
}
Goroutine 在被喚醒之后會(huì)意識(shí)到當(dāng)前的 I/O 操作已經(jīng)超時(shí),可以根據(jù)需要選擇重試請(qǐng)求或者中止調(diào)用巢寡。
總結(jié)
總的來說喉脖,netpoller的最終的效果就是用戶層阻塞,底層非阻塞抑月。當(dāng)goroutine讀或?qū)懽枞麜r(shí)會(huì)被放到等待隊(duì)列树叽,這個(gè)goroutine失去了運(yùn)行權(quán),但并不是真正的整個(gè)系統(tǒng)“阻塞”于系統(tǒng)調(diào)用谦絮。而通過后臺(tái)的poller不停地poll题诵,所有的文件描述符都被添加到了這個(gè)poller中的,當(dāng)某個(gè)時(shí)刻一個(gè)文件描述符準(zhǔn)備好了层皱,poller就會(huì)喚醒之前因它而阻塞的goroutine性锭,于是goroutine重新運(yùn)行起來。
和使用Unix系統(tǒng)中的select或是poll方法不同地是叫胖,Golang的netpoller查詢的是能被調(diào)度的goroutine而不是那些函數(shù)指針草冈、包含了各種狀態(tài)變量的struct等,這樣你就不用管理這些狀態(tài)瓮增,也不用重新檢查函數(shù)指針等怎棱,這些都是你在傳統(tǒng)Unix網(wǎng)絡(luò)I/O需要操心的問題。
References:
https://zhuanlan.zhihu.com/p/143847169
https://developer.aliyun.com/article/893401
https://zhuanlan.zhihu.com/p/159457916
https://www.yuque.com/aceld/golang/sdgfgu
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-netpoller
https://strikefreedom.top/archives/go-netpoll-io-multiplexing-reactor
https://juejin.cn/post/6882984260672847879
https://mp.weixin.qq.com/s/T-hP3wt4whtvVh1H1LBU3w
https://yizhi.ren/2019/06/08/gonetpoller/
https://www.cnblogs.com/luozhiyun/p/14390824.html
https://cloud.tencent.com/developer/article/1234360
https://learnku.com/articles/59847