golang筆記——深入了解netpoller

大部分的服務(wù)都是 I/O 密集型的谬晕,應(yīng)用程序會(huì)花費(fèi)大量時(shí)間等待 I/O 操作的完成们何。網(wǎng)絡(luò)輪詢器(netpoller)是 Go 語言運(yùn)行時(shí)用來處理 I/O 操作的關(guān)鍵組件帖旨,它使用了操作系統(tǒng)提供的 I/O 多路復(fù)用機(jī)制增強(qiáng)程序的并發(fā)處理能力。本文會(huì)詳細(xì)介紹I/O模型相關(guān)知識(shí)歌憨,并深入分析 Go 語言網(wǎng)絡(luò)輪詢器的設(shè)計(jì)與實(shí)現(xiàn)原理。

I/O 相關(guān)基礎(chǔ)概念

文件

在Linux世界中文件是一個(gè)很簡(jiǎn)單的概念墩衙,作為程序員我們只需要將其理解為一個(gè)N byte的序列就可以了务嫡。

實(shí)際上所有的I/O設(shè)備都被抽象為了文件這個(gè)概念,一切皆文件底桂,Everything is File植袍,磁盤、網(wǎng)絡(luò)數(shù)據(jù)籽懦、終端于个,甚至進(jìn)程間通信工具管道pipe等都被當(dāng)做文件對(duì)待。

所有的I/O操作也都可以通過文件讀寫來實(shí)現(xiàn)暮顺,這一非常優(yōu)雅的抽象可以讓程序員使用一套接口就能對(duì)所有外設(shè)I/O操作厅篓。
常用的I/O操作接口一般有以下幾類:
? 打開文件,open
? 改變讀寫位置捶码,seek
? 文件讀寫羽氮,read、write
? 關(guān)閉文件惫恼,close

程序員通過這幾個(gè)接口幾乎可以實(shí)現(xiàn)所有I/O操作档押,這就是文件這個(gè)概念的強(qiáng)大之處。

文件描述符

要想進(jìn)行I/O讀操作,像磁盤數(shù)據(jù)令宿,我們需要指定一個(gè)buff用來裝入數(shù)據(jù)叼耙,一般都是這樣寫的

read(buff)

雖然我們指定了往哪里寫數(shù)據(jù),但是我們?cè)搹哪睦镒x數(shù)據(jù)呢粒没?我們無法確定哪個(gè)文件是我們需要去讀取的筛婉。Linux為了高效管理已被打開的文件,于是引入了索引:文件描述符(file descriptor)癞松。fd用于指代被打開的文件爽撒,對(duì)文件所有 I/O 操作相關(guān)的系統(tǒng)調(diào)用都需要通過fd


有了文件描述符响蓉,進(jìn)程可以對(duì)文件一無所知硕勿,比如文件在磁盤的什么位置、加載到內(nèi)存中又是怎樣管理的等等枫甲,這些信息統(tǒng)統(tǒng)交由操作系統(tǒng)打理首尼,進(jìn)程無需關(guān)心,操作系統(tǒng)只需要給進(jìn)程一個(gè)文件描述符就足夠了言秸。
因此我們來完善上述程序:

int fd = open(file_name); // 獲取文件描述符read(fd, buff);
read(fd, buff);

I/O模型

目前Linux系統(tǒng)中提供了以下5種IO處理模型,不同的 I/O 模型會(huì)使用不同的方式操作文件描述符:

  1. 阻塞I/O
  2. 非阻塞I/O
  3. I/O多路復(fù)用
  4. 信號(hào)驅(qū)動(dòng)I/O
  5. 異步I/O
阻塞I/O(Blocking I/O)

阻塞 I/O 是最常見的 I/O 模型迎捺,在默認(rèn)情況下举畸,當(dāng)我們通過 read 或者 write 等系統(tǒng)調(diào)用讀寫文件或者網(wǎng)絡(luò)時(shí),應(yīng)用程序會(huì)被阻塞凳枝。

如下圖所示抄沮,當(dāng)我們執(zhí)行 recvfrom 系統(tǒng)調(diào)用時(shí),應(yīng)用程序會(huì)從用戶態(tài)陷入內(nèi)核態(tài)岖瑰,內(nèi)核會(huì)檢查文件描述符是否就緒叛买;當(dāng)文件描述符中存在數(shù)據(jù)時(shí),操作系統(tǒng)內(nèi)核會(huì)將準(zhǔn)備好的數(shù)據(jù)拷貝給應(yīng)用程序并交回控制權(quán)蹋订。

線程會(huì)阻塞在這里率挣,然后掛起(掛起的時(shí)候,cpu回去處理其他任務(wù))露戒,等待隊(duì)列不為空椒功。

非阻塞I/O(NoneBlocking I/O)

當(dāng)進(jìn)程把一個(gè)文件描述符設(shè)置成非阻塞時(shí),執(zhí)行 read 和 write 等 I/O 操作會(huì)立刻返回智什。在 C 語言中动漾,我們可以使用如下所示的代碼片段將一個(gè)文件描述符設(shè)置成非阻塞的:

int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);

在上述代碼中,最關(guān)鍵的就是系統(tǒng)調(diào)用 fcntl 和參數(shù) O_NONBLOCK荠锭,fcntl 為我們提供了操作文件描述符的能力旱眯,我們可以通過它修改文件描述符的特性。當(dāng)我們將文件描述符修改成非阻塞后,讀寫文件會(huì)經(jīng)歷以下流程:

第一次從文件描述符中讀取數(shù)據(jù)會(huì)觸發(fā)系統(tǒng)調(diào)用并返回 EAGAIN錯(cuò)誤删豺,EAGAIN意味著該文件描述符還在等待緩沖區(qū)中的數(shù)據(jù)共虑;隨后,應(yīng)用程序會(huì)不斷輪詢調(diào)用 read 直到它的返回值大于 0吼鳞,這時(shí)應(yīng)用程序就可以對(duì)讀取操作系統(tǒng)緩沖區(qū)中的數(shù)據(jù)并進(jìn)行操作看蚜。進(jìn)程使用非阻塞的 I/O 操作時(shí),可以在等待過程中執(zhí)行其他任務(wù)赔桌,提高 CPU 的利用率供炎。

I/O多路復(fù)用(I/O multiplexing)

原本是一個(gè) I/O對(duì)應(yīng)一個(gè)進(jìn)程,這樣的話如果有1000個(gè)i/o 就需要啟動(dòng)1000個(gè)進(jìn)程疾党,這對(duì)于一個(gè)16核音诫,8核的cpu來說,需要大量性能損耗在進(jìn)程間的切換上雪位。所以優(yōu)化了一種方案是N個(gè)i/o只對(duì)應(yīng)一個(gè)進(jìn)程來處理竭钝。這個(gè)就是I/O 多路復(fù)用。

I/O 多路復(fù)用機(jī)制雹洗,就是說通過一種機(jī)制香罐,可以監(jiān)視多個(gè)描述符;一旦某個(gè)描述符就緒(一般是讀就緒或?qū)懢途w)时肿,能夠通知程序進(jìn)行相應(yīng)的讀寫操作庇茫;沒有文件句柄就緒就會(huì)阻塞應(yīng)用程序,交出CPU螃成。這種機(jī)制的使用需要 select 旦签、 pollepoll來配合寸宏。

select

在select這種I/O多路復(fù)用機(jī)制下宁炫,我們需要把想監(jiān)控的fd集合通過函數(shù)參數(shù)的形式告訴select氮凝,然后select會(huì)將這些文件描述符集合拷貝到內(nèi)核中覆醇。

我們知道數(shù)據(jù)拷貝是有性能損耗的朵纷,因此為了減少這種數(shù)據(jù)拷貝帶來的性能損耗永脓,Linux內(nèi)核對(duì)集合的大小做了限制,并規(guī)定用戶監(jiān)控的文件描述集合不能超過1024個(gè)常摧,同時(shí)當(dāng)select返回后我們僅僅能知道有些文件描述符可以讀寫了威创,但是我們不知道是哪一個(gè)肚豺,因此程序員必須再遍歷一邊找到具體是哪個(gè)文件描述符可以讀寫了吸申。

select的缺點(diǎn)

  1. 能監(jiān)控的文件描述符太少截碴,通過 FD_SETSIZE 設(shè)置,默認(rèn)1024個(gè)
  2. 每次調(diào)用 select哲虾,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)束凑,這個(gè)開銷在 fd 很多時(shí)會(huì)很大(需要維護(hù)一個(gè)用來存放大量fd的數(shù)據(jù)結(jié)構(gòu)湘今,這樣會(huì)使得用戶空間和內(nèi)核空間在傳遞該結(jié)構(gòu)時(shí)復(fù)制開銷大)
  3. 每次調(diào)用select,都需要在內(nèi)核,遍歷fd集合進(jìn)行無差別輪詢孝常,性能開銷大(如果能給套接字注冊(cè)某個(gè)回調(diào)函數(shù),當(dāng)他們活躍時(shí)喜颁,自動(dòng)完成相關(guān)操作半开,那就避免了輪詢寂拆,這正是epoll與kqueue做的)
poll

poll本質(zhì)上和select沒有區(qū)別鬓长,它將用戶傳入的數(shù)組拷貝到內(nèi)核空間涉波,然后查詢每個(gè)fd對(duì)應(yīng)的設(shè)備狀態(tài)啤覆, 但是它沒有最大連接數(shù)的限制,原因是它是基于鏈表來存儲(chǔ)的嫌佑。

poll的缺點(diǎn)

  1. 每次調(diào)用 poll 屋摇,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài)炮温,這個(gè)開銷在 fd 很多時(shí)會(huì)很大;
  2. 對(duì) fd集合 掃描是線性掃描担巩,采用輪詢的方法涛癌,效率較低(高并發(fā)時(shí))
epoll

epoll可以理解為event poll,不同于忙輪詢和無差別輪詢弃衍,epoll會(huì)把哪個(gè)流發(fā)生了怎樣的I/O事件通知我們笨鸡。所以我們說epoll實(shí)際上是事件驅(qū)動(dòng)(每個(gè)事件關(guān)聯(lián)上fd)的哥桥,此時(shí)我們對(duì)這些流的操作都是有意義的拟糕。(復(fù)雜度降低到了O(1))

epoll函數(shù)接口:

#include <sys/epoll.h>

// 數(shù)據(jù)結(jié)構(gòu)
// 每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體
// 用于存放通過epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件
// epoll_wait檢查是否有事件發(fā)生時(shí),只需要檢查eventpoll對(duì)象中的rdlist雙鏈表中是否有epitem元素即可
struct eventpoll {
    /*紅黑樹的根節(jié)點(diǎn)犁嗅,這顆樹中存儲(chǔ)著所有添加到epoll中的需要監(jiān)控的事件*/
    struct rb_root  rbr;
    /*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/
    struct list_head rdlist;
};

// API
int epoll_create(int size); // 內(nèi)核中間加一個(gè) ep 對(duì)象,把所有需要監(jiān)聽的 fd 都放到 ep 對(duì)象中
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 負(fù)責(zé)把 fd 增加宠蚂、刪除到內(nèi)核紅黑樹
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 負(fù)責(zé)檢測(cè)可讀隊(duì)列求厕,沒有可讀 fd 則阻塞

每一個(gè)epoll對(duì)象都有一個(gè)獨(dú)立的eventpoll結(jié)構(gòu)體,用于存放通過epoll_ctl方法向epoll對(duì)象中添加進(jìn)來的事件十艾。這些事件都會(huì)掛載在紅黑樹中荤牍,如此康吵,重復(fù)添加的事件就可以通過紅黑樹而高效的識(shí)別出來(紅黑樹的插入時(shí)間效率是logn晦嵌,其中n為紅黑樹元素個(gè)數(shù))旱函。

而所有添加到epoll中的事件都會(huì)與設(shè)備(網(wǎng)卡)驅(qū)動(dòng)程序建立回調(diào)關(guān)系棒妨,也就是說伏穆,當(dāng)相應(yīng)的事件發(fā)生時(shí)會(huì)調(diào)用這個(gè)回調(diào)方法枕扫。這個(gè)回調(diào)方法在內(nèi)核中叫ep_poll_callback,它會(huì)將發(fā)生的事件添加到rdlist雙鏈表中。

在epoll中燕刻,對(duì)于每一個(gè)事件,都會(huì)建立一個(gè)epitem結(jié)構(gòu)體过蹂,如下所示:

struct epitem{
    struct rb_node  rbn;//紅黑樹節(jié)點(diǎn)
    struct list_head    rdllink;//雙向鏈表節(jié)點(diǎn)
    struct epoll_filefd  ffd;  //事件句柄信息
    struct eventpoll *ep;    //指向其所屬的eventpoll對(duì)象
    struct epoll_event event; //期待發(fā)生的事件類型
}

當(dāng)調(diào)用epoll_wait檢查是否有事件發(fā)生時(shí),只需要檢查eventpoll對(duì)象中的rdlist雙鏈表中是否有epitem元素即可脆诉。如果rdlist不為空击胜,則把發(fā)生的事件復(fù)制到用戶態(tài),同時(shí)將事件數(shù)量返回給用戶辰斋。


從上面的講解可知:通過于紅黑樹和雙鏈表數(shù)據(jù)結(jié)構(gòu)够挂,并結(jié)合回調(diào)機(jī)制,造就了epoll的高效梭姓。

講解完了Epoll的機(jī)理誉尖,我們便能很容易掌握epoll的用法了。一句話描述就是:三步曲探熔。

第一步:epoll_create()系統(tǒng)調(diào)用。此調(diào)用返回一個(gè)句柄其垄,之后所有的使用都依靠這個(gè)句柄來標(biāo)識(shí)。創(chuàng)建一個(gè)epoll句柄喇颁,實(shí)際上是在內(nèi)核空間,建立一個(gè)root根節(jié)點(diǎn),這個(gè)根節(jié)點(diǎn)的關(guān)系與epfd相對(duì)應(yīng)忱辅。

第二步:epoll_ctl()系統(tǒng)調(diào)用橡卤。通過此調(diào)用向epoll對(duì)象中添加、刪除嵌灰、修改感興趣的事件,返回0標(biāo)識(shí)成功驹溃,返回-1表示失敗。創(chuàng)建的該用戶態(tài)事件布疙,綁定到某個(gè)fd上,然后添加到內(nèi)核中的epoll紅黑樹中俱诸。

第三步:epoll_wait()系統(tǒng)調(diào)用。通過此調(diào)用收集在epoll監(jiān)控中已經(jīng)發(fā)生的事件。如果內(nèi)核檢測(cè)到IO的讀寫響應(yīng)锌唾,會(huì)拋給上層的epoll_wait, 返回給用戶態(tài)一個(gè)已經(jīng)觸發(fā)的事件隊(duì)列滋捶,同時(shí)阻塞返回。開發(fā)者可以從隊(duì)列中取出事件來處理巡扇,其中事件里就有綁定的對(duì)應(yīng)fd是哪個(gè)(之前添加epoll事件的時(shí)候已經(jīng)綁定)。

  • epoll的優(yōu)點(diǎn)
  1. 沒有最大并發(fā)連接的限制知给,能打開的FD的上限遠(yuǎn)大于1024(1G的內(nèi)存上能監(jiān)聽約10萬個(gè)端口)
  2. 效率提升,不是輪詢的方式,不會(huì)隨著FD數(shù)目的增加效率下降花墩。只有活躍可用的FD才會(huì)調(diào)用callback函數(shù);即Epoll最大的優(yōu)點(diǎn)就在于它只管你“活躍”的連接祠肥,而跟連接總數(shù)無關(guān),因此在實(shí)際的網(wǎng)絡(luò)環(huán)境中剂桥,Epoll的效率就會(huì)遠(yuǎn)遠(yuǎn)高于select和poll
  • epoll的缺點(diǎn)
  1. epoll只能工作在 linux 下
  • epoll LT 與 ET 模式的區(qū)別
    epoll 有 EPOLLLT 和 EPOLLET 兩種觸發(fā)模式,LT 是默認(rèn)的模式斟薇,ET 是 “高速” 模式。
  1. LT 模式下,只要這個(gè) fd 還有數(shù)據(jù)可讀犯眠,每次 epoll_wait 都會(huì)返回它的事件,提醒用戶程序去操作量蕊;
  2. ET 模式下,它只會(huì)提示一次,直到下次再有數(shù)據(jù)流入之前都不會(huì)再提示了舅锄,無論 fd 中是否還有數(shù)據(jù)可讀。所以在 ET 模式下,read 一個(gè) fd 的時(shí)候一定要把它的 buffer 讀完老翘,或者遇到 EAGIN 錯(cuò)誤。

epoll使用“事件”的就緒通知方式傀履,通過epoll_ctl注冊(cè)fd,一旦該fd就緒,內(nèi)核就會(huì)采用類似callback的回調(diào)機(jī)制來激活該fd啦粹,epoll_wait便可以收到通知。

select/poll/epoll之間的區(qū)別

select,poll撩荣,epoll都是IO多路復(fù)用的機(jī)制。I/O多路復(fù)用就通過一種機(jī)制,可以監(jiān)視多個(gè)描述符饱狂,一旦某個(gè)描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進(jìn)行相應(yīng)的讀寫操作筹麸。但select,poll雏婶,epoll本質(zhì)上都是同步I/O物赶,因?yàn)樗麄兌夹枰谧x寫事件就緒后自己負(fù)責(zé)進(jìn)行讀寫,也就是說這個(gè)讀寫過程是阻塞的留晚,而異步I/O則無需自己負(fù)責(zé)進(jìn)行讀寫酵紫,異步I/O的實(shí)現(xiàn)會(huì)負(fù)責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間错维。

epoll跟select都能提供多路I/O復(fù)用的解決方案憨闰。在現(xiàn)在的Linux內(nèi)核里有都能夠支持,其中epoll是Linux所特有需五,而select則應(yīng)該是POSIX所規(guī)定,一般操作系統(tǒng)均有實(shí)現(xiàn)轧坎。

select poll epoll
獲取可用的fd 遍歷 遍歷 回調(diào)
存儲(chǔ)fd的數(shù)據(jù)結(jié)構(gòu) bitmap 數(shù)組 紅黑樹
最大連接數(shù) 1024(x86)或 2048(x64) 無上限 無上限
最大支持fd數(shù)量 一般有最大值限制 65535 65535
fd拷貝 每次調(diào)用select宏邮,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài) 每次調(diào)用poll,都需要把fd集合從用戶態(tài)拷貝到內(nèi)核態(tài) fd首次調(diào)用epoll_ctl拷貝缸血,每次調(diào)用epoll_wait不拷貝
工作模式 LT LT 支持lT默認(rèn)模式及ET高效模式
工作效率 每次調(diào)用都進(jìn)行線性遍歷蜜氨,時(shí)間復(fù)雜度為O(n) 每次調(diào)用都進(jìn)行線性遍歷,時(shí)間復(fù)雜度為O(n) 事件通知方式捎泻,每當(dāng)fd就緒飒炎,系統(tǒng)注冊(cè)的回調(diào)函數(shù)就會(huì)被調(diào)用,將就緒fd放到readyList里面笆豁,時(shí)間復(fù)雜度O(1)

epoll是Linux目前大規(guī)模網(wǎng)絡(luò)并發(fā)程序開發(fā)的首選模型郎汪。在絕大多數(shù)情況下性能遠(yuǎn)超select和poll。目前流行的高性能web服務(wù)器Nginx正式依賴于epoll提供的高效網(wǎng)絡(luò)套接字輪詢服務(wù)闯狱。但是煞赢,在并發(fā)連接不高的情況下,多線程+阻塞I/O方式可能性能更好哄孤。

select照筑,poll實(shí)現(xiàn)需要自己不斷輪詢所有fd集合,直到設(shè)備就緒瘦陈,期間可能要睡眠和喚醒多次交替凝危。而epoll其實(shí)也需要調(diào)用epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替晨逝,但是它是設(shè)備就緒時(shí)蛾默,調(diào)用回調(diào)函數(shù),把就緒fd放入就緒鏈表中捉貌,并喚醒在epoll_wait中進(jìn)入睡眠的進(jìn)程趴生。雖然都要睡眠和交替阀趴,但是select和poll在“醒著”的時(shí)候要遍歷整個(gè)fd集合,而epoll在“醒著”的時(shí)候只要判斷一下就緒鏈表是否為空就行了苍匆,這節(jié)省了大量的CPU時(shí)間刘急。這就是回調(diào)機(jī)制帶來的性能提升。

select浸踩,poll每次調(diào)用都要把fd集合從用戶態(tài)往內(nèi)核態(tài)拷貝一次叔汁,并且要把current往設(shè)備等待隊(duì)列中掛一次。而epoll只要一次拷貝检碗,而且把current往等待隊(duì)列上掛也只掛一次*(在epoll_wait的開始据块,注意這里的等待隊(duì)列并不是設(shè)備等待隊(duì)列,只是一個(gè)epoll內(nèi)部定義的等待隊(duì)列)折剃。這也能節(jié)省不少的開銷另假。

信號(hào)驅(qū)動(dòng)I/O

在信號(hào)驅(qū)動(dòng)IO模型中,當(dāng)用戶線程發(fā)起一個(gè)IO請(qǐng)求操作怕犁,會(huì)給對(duì)應(yīng)的socket注冊(cè)一個(gè)信號(hào)函數(shù)边篮,然后用戶線程會(huì)繼續(xù)執(zhí)行,當(dāng)內(nèi)核數(shù)據(jù)就緒時(shí)會(huì)發(fā)送一個(gè)信號(hào)給用戶線程奏甫,用戶線程接收到信號(hào)后戈轿,便在信號(hào)函數(shù)中調(diào)用IO讀寫操作來進(jìn)行實(shí)際的IO請(qǐng)求操作。這個(gè)一般用于UDP中阵子,對(duì)TCP套接字幾乎沒用思杯,原因是該信號(hào)產(chǎn)生得過于頻繁,并且該信號(hào)的出現(xiàn)并沒有告訴我們發(fā)生了什么請(qǐng)求挠进。


用戶進(jìn)程可以使用信號(hào)方式色乾,當(dāng)系統(tǒng)內(nèi)核描述符就緒時(shí)將會(huì)發(fā)送SIGNO給到用戶空間,這個(gè)時(shí)候再發(fā)起recvfrom的系統(tǒng)調(diào)用等待返回成功提示领突,流程如下:

  • 先開啟套接字的信號(hào)IO啟動(dòng)功能杈湾,并通過一個(gè)內(nèi)置安裝信號(hào)處理函數(shù)的signaction系統(tǒng)調(diào)用,當(dāng)發(fā)起調(diào)用之后會(huì)直接返回攘须;
  • 其次漆撞,等待內(nèi)核從網(wǎng)絡(luò)中接收數(shù)據(jù)報(bào)之后,向用戶空間發(fā)送當(dāng)前數(shù)據(jù)可達(dá)的信號(hào)給信號(hào)處理函數(shù)于宙;
  • 信號(hào)處理函數(shù)接收到信息就發(fā)起recvfrom系統(tǒng)調(diào)用等待內(nèi)核數(shù)據(jù)復(fù)制數(shù)據(jù)報(bào)到用戶空間的緩沖區(qū)浮驳;
  • 接收到復(fù)制完成的返回成功提示之后,應(yīng)用進(jìn)程就可以開始從網(wǎng)絡(luò)中讀取數(shù)據(jù)捞魁。

異步I/O

前面四種IO模型實(shí)際上都屬于同步IO至会,只有最后一種是真正的異步IO,因?yàn)闊o論是多路復(fù)用IO還是信號(hào)驅(qū)動(dòng)模型谱俭,IO操作的第2個(gè)階段都會(huì)引起用戶線程阻塞奉件,也就是內(nèi)核進(jìn)行數(shù)據(jù)拷貝的過程都會(huì)讓用戶線程阻塞宵蛀。


  • 由POSIX規(guī)范定義,告知系統(tǒng)內(nèi)核啟動(dòng)某個(gè)操作县貌,并讓內(nèi)核在整個(gè)操作包含數(shù)據(jù)等待以及數(shù)據(jù)復(fù)制過程的完成之后通知用戶進(jìn)程數(shù)據(jù)已經(jīng)準(zhǔn)備完成术陶,可以進(jìn)行讀取數(shù)據(jù);
  • 與上述的信號(hào)IO模型區(qū)分在于異步是通知我們何時(shí)IO操作完成,而信號(hào)IO是通知我們何時(shí)可以啟動(dòng)一個(gè)IO操作

同步IO/異步IO/阻塞IO/非阻塞IO(基于POSIX規(guī)范)

  • 同步IO: 表示應(yīng)用進(jìn)程發(fā)起真實(shí)的IO操作請(qǐng)求(recvfrom)導(dǎo)致進(jìn)程一直處于等待狀態(tài),這時(shí)候進(jìn)程被阻塞,直到IO操作完成返回成功提示
  • 異步IO: 表示應(yīng)用進(jìn)程發(fā)起真實(shí)的IO操作請(qǐng)求(recvfrom)導(dǎo)致進(jìn)程將直接返回一個(gè)錯(cuò)誤信息,“相當(dāng)于告訴進(jìn)程還沒有處理好,好了會(huì)通知你”
  • 阻塞IO: 主要是體現(xiàn)發(fā)起IO操作請(qǐng)求通知內(nèi)核并且內(nèi)核接收到信號(hào)之后如果讓進(jìn)程等待,那么就是阻塞
  • 非阻塞IO: 發(fā)起IO操作請(qǐng)求的時(shí)候不論結(jié)果直接告訴進(jìn)程“不用等待,晚點(diǎn)再來”,那就是非阻塞

IO模型對(duì)比

除了真正的異步I/O模型以外煤痕,其他幾種模型梧宫,最后一階段的處理都是相同的——阻塞于recvfrom調(diào)用,將數(shù)據(jù)從內(nèi)核拷貝到應(yīng)用緩沖區(qū)摆碉。


同步與異步針對(duì)通信機(jī)制,阻塞與非阻塞針對(duì)程序調(diào)用等待結(jié)果的狀態(tài)

netpoller

netpoller基本原理

在Go的實(shí)現(xiàn)中塘匣,所有IO都是阻塞調(diào)用的,Go的設(shè)計(jì)思想是程序員使用阻塞式的接口來編寫程序巷帝,然后通過goroutine+channel來處理并發(fā)忌卤。因此所有的IO邏輯都是直來直去的,先xx楞泼,再xx, 你不再需要回調(diào)驰徊,不再需要future,要的僅僅是step by step现拒。這對(duì)于代碼的可讀性是很有幫助的。

netpoller的工作就是成為同步(阻塞)IO調(diào)用和異步(非阻塞)IO調(diào)用之間的橋梁望侈。

Go netpoller 通過在底層對(duì) epoll/kqueue/iocp 的封裝印蔬,從而實(shí)現(xiàn)了使用同步編程模式達(dá)到異步執(zhí)行的效果⊥蜒茫總結(jié)來說侥猬,所有的網(wǎng)絡(luò)操作都以網(wǎng)絡(luò)描述符 netFD 為中心實(shí)現(xiàn)。netFD 與底層 PollDesc 結(jié)構(gòu)綁定捐韩,當(dāng)在一個(gè) netFD 上讀寫遇到 EAGAIN 錯(cuò)誤時(shí)退唠,就將當(dāng)前 goroutine 存儲(chǔ)到這個(gè) netFD 對(duì)應(yīng)的 PollDesc 中,同時(shí)調(diào)用 gopark 把當(dāng)前 goroutine 給 park 住荤胁,直到這個(gè) netFD 上再次發(fā)生讀寫事件瞧预,才將此 goroutine 給 ready 激活重新運(yùn)行。顯然仅政,在底層通知 goroutine 再次發(fā)生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅(qū)動(dòng)機(jī)制垢油。

Go 是一門跨平臺(tái)的編程語言,而不同平臺(tái)針對(duì)特定的功能有不用的實(shí)現(xiàn)圆丹,這當(dāng)然也包括了 I/O 多路復(fù)用技術(shù)滩愁,比如 Linux 里的 I/O 多路復(fù)用有 select、poll 和 epoll辫封,而 freeBSD 或者 MacOS 里則是 kqueue硝枉,而 Windows 里則是基于異步 I/O 實(shí)現(xiàn)的 iocp廉丽,等等;因此妻味,Go 為了實(shí)現(xiàn)底層 I/O 多路復(fù)用的跨平臺(tái)正压,分別基于上述的這些不同平臺(tái)的系統(tǒng)調(diào)用實(shí)現(xiàn)了多版本的 netpollers,具體的源碼路徑如下:

本文的解析基于 epoll 版本弧可,如果讀者對(duì)其他平臺(tái)的 netpoller 底層實(shí)現(xiàn)感興趣蔑匣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實(shí)現(xiàn)版本的機(jī)制和原理基本類似棕诵。

netpoller代碼結(jié)構(gòu)概覽

實(shí)際的實(shí)現(xiàn)(epoll/kqueue)必須定義以下函數(shù):

func netpollinit() // 初始化輪詢器
func netpollopen(fd uintptr, pd *pollDesc) int32 // 為fd和pd啟動(dòng)邊緣觸發(fā)通知

當(dāng)一個(gè)goroutine進(jìn)行io阻塞時(shí)裁良,會(huì)去被放到等待隊(duì)列。這里面就關(guān)鍵的就是建立起文件描述符和goroutine之間的關(guān)聯(lián)校套。 pollDesc結(jié)構(gòu)體就是完成這個(gè)任務(wù)的价脾。代碼參見src/runtime/netpoll.go

type pollDesc struct { // Poller對(duì)象
    link *pollDesc // 鏈表
    lock mutex // 保護(hù)下面字段
    fd uintptr // fd是底層網(wǎng)絡(luò)io文件描述符,整個(gè)生命期內(nèi)笛匙,不能改變值
    closing bool
    seq uintptr // protect from stale(過時(shí)) timers and ready notifications
    rg uintptr // reader goroutine addr
    rt timer
    rd int64
    wg uintptr // writer goroutine addr
    wt timer
    wd int64
    user int32 // user-set cookie用戶自定義數(shù)據(jù)
}

type pollCache struct { // 全局Poller鏈表
    lock mutex // 保護(hù)Poller鏈表
    first *pollDesc
}
// 調(diào)用netpollinit()
func poll_runtime_pollServerInit() {}
// 調(diào)用netpollopen()
func poll_runtime_pollOpen() {}
// 調(diào)用netpollclose()
func poll_runtime_pollClose() {}
// 先check(netpollcheckerr(pd, mode))是否有err發(fā)生侨把,沒有的話重置pd對(duì)應(yīng)字段
func poll_runtime_pollReset(pd, mode) {}
// 先chekerr,再調(diào)用netpollblock(pd, mode, false) {}
func poll_runtime_pollWait(pd, mode) {}
// windows下專用
func poll_runtime_pollWaitCanceled(pd, mode) {}
func poll_runtime_pollSetDeadline(pd, deadline, mode) {}
//1. 重置定時(shí)器妹孙,并seq++
//2. 設(shè)置超時(shí)函數(shù)netpollDeadline(或者netpollReadDeadline秋柄、netpollWriteDeadline)
//3. 如果已經(jīng)過期,調(diào)用netpollunblock和netpollgoready
// netpollUnblock蠢正、netpollgoready
func poll_runtime_pollUnblock(pd) {} 

/*------------------部分實(shí)現(xiàn)------------------*/
// 檢查是否超時(shí)或正在關(guān)閉
func netpollcheckerr(pd, mode) {}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) {}
// 調(diào)用netpollunblock骇笔,更新g的
func netpollready(gpp *guintptr, pd, mode) schedlink {}
// 更新統(tǒng)計(jì)數(shù)據(jù),調(diào)用goready --- 通知調(diào)度器協(xié)程g從parked變?yōu)閞eady
func netpollgoready(gp *g, traceskip) {}
// Set rg/wg = pdWait嚣崭,調(diào)用gopark掛起pd對(duì)應(yīng)的g笨触。
func netpollblock(pd, mode, waitio) {}
func netpollunblock(pd, mode, ioready) {}
func netpoll(Write/Read)Deadline(arg, seq) {}

pollCache是pollDesc鏈表入口,加鎖保護(hù)鏈表安全雹舀。
pollDesc中芦劣,rg、wg有些特殊说榆,它可能有如下3種狀態(tài):

  1. pdReady == 1: 網(wǎng)絡(luò)io就緒通知虚吟,goroutine消費(fèi)完后應(yīng)置為nil

  2. pdWait == 2: goroutine等待被掛起,后續(xù)可能有3種情況:

    • goroutine被調(diào)度器掛起签财,置為goroutine地址
    • 收到io通知稍味,置為pdReady
    • 超時(shí)或者被關(guān)閉,置為nil
  3. Goroutine地址: 被掛起的goroutine的地址荠卷,當(dāng)io就緒時(shí)模庐、或者超時(shí)、被關(guān)閉時(shí)油宜,此goroutine將被喚醒掂碱,同時(shí)將狀態(tài)改為pdReady或者nil怜姿。

另外,由于wg疼燥、rg是goroutine的地址沧卢,因此當(dāng)GC發(fā)生后,如果goroutine被回收(在heap區(qū))醉者,代碼就崩潰了(指針無效)但狭。所以,進(jìn)行網(wǎng)絡(luò)IO的goroutine不能在heap區(qū)分配內(nèi)存撬即。

lock鎖對(duì)象保護(hù)了pollOpen, pollSetDeadline, pollUnblockdeadlineimpl操作立磁。而這些操作又完全包含了對(duì)seq, rt, tw變量。fd在PollDesc整個(gè)生命過程中都是一個(gè)常量剥槐。處理pollReset, pollWait, pollWaitCanceledruntime.netpollready(IO就緒通知)不需要用到鎖唱歧,所以closing, rg, rd, wg和wd的所有操作都是一個(gè)無鎖的操作

netpoller多路復(fù)用三部曲

初始化PollServer

初始化在下面注冊(cè)fd監(jiān)聽時(shí)順便處理了粒竖,調(diào)用runtime_pollServerInit()颅崩,并使用sync.Once()機(jī)制保證只會(huì)被初始化一次。全局使用同一個(gè)EpollServer(同一個(gè)Epfd)蕊苗。

func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lockInit(&netpollInitLock, lockRankNetpollInit)
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit() // 具現(xiàn)化到Linux下沿后,調(diào)用epoll_create
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd < 0 {
        epfd = epollcreate(1024)
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}
注冊(cè)監(jiān)聽fd

所有Unix文件在初始化時(shí),如果支持Poll朽砰,都會(huì)加入到PollServer的監(jiān)聽中尖滚。

/*****************internal/poll/fd_unix.go*******************/
type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex
    // System file descriptor. Immutable until Close.
    Sysfd int
    // I/O poller.
    pd pollDesc
    ...
}
func(fd *FD) Init(net string, pollable bool) error {
    ...
    err := fd.pd.init(fd) // 初始化pd
    ...
}
...
/*****************internal/poll/fd_poll_runtime.go*****************/
type pollDesc struct {
    runtimeCtx uintptr
}
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit) // 初始化PollServer(sync.Once)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    ...
    runtimeCtx = ctx
    return nil
}
...
/*****************runtime/netpoll.go*****************/
func poll_runtime_pollOpen(fd uintptr) (*epDesc, int32) {
    ...
    errno := netpollopen(fd, pd) // 具現(xiàn)化到Linux下,調(diào)用epoll_ctl
    ...
}

取消fd的監(jiān)聽與此流程類似锅移,最終調(diào)用epoll_ctl.

定期Poll

結(jié)合上述實(shí)現(xiàn)熔掺,必然有處邏輯定期執(zhí)行epoll_wait來檢測(cè)fd狀態(tài)饱搏。在代碼中搜索下netpoll非剃,即可發(fā)現(xiàn)是在sysmon、startTheWorldWithSema推沸、pollWork备绽、findrunnable中調(diào)用的,以sysmon為例:

// runtime/proc.go
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
// 如果10ms內(nèi)沒有poll過鬓催,則poll肺素。(1ms=1000000ns)
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    gp := netpoll(false) // netpoll在Linux具現(xiàn)為epoll_wait
    if gp != nil {
       injectglist(gp) //把g放到sched中去執(zhí)行,底層仍然是調(diào)用的之前在goroutine里面提到的startm函數(shù)宇驾。
    }
}
...

goroutine的I/O讀取流程

當(dāng)goroutine發(fā)起一個(gè)同步調(diào)用倍靡,經(jīng)過一系列的調(diào)用,最后會(huì)進(jìn)入gopark函數(shù)课舍,gopark將當(dāng)前正在執(zhí)行的goroutine狀態(tài)保存起來塌西,然后切換到新的堆棧上執(zhí)行新的goroutine他挎。由于當(dāng)前goroutine狀態(tài)是被保存起來的,因此后面可以被恢復(fù)捡需。這樣調(diào)用Read的goroutine以為一直同步阻塞到現(xiàn)在办桨,其實(shí)內(nèi)部是異步完成的。

1. 加入監(jiān)聽

golang中客戶端與服務(wù)端進(jìn)行通訊時(shí)站辉,常用如下方法:

conn, err := net.Dial("tcp", "localhost:1208")

從net.Dial看進(jìn)去呢撞,最終會(huì)調(diào)用net/net_posix.go中的socket函數(shù),大致流程如下:

func socket(...) ... {
    /*
    1. 調(diào)用sysSocket創(chuàng)建原生socket
    2. 調(diào)用同名包下netFd()饰剥,初始化網(wǎng)絡(luò)文件描述符netFd
    3. 調(diào)用fd.dial()殊霞,其中最終有調(diào)用poll_runtime_pollOpen()加入監(jiān)聽列表
    */
}

runtime.poll_runtime_pollOpen 重置輪詢信息 runtime.pollDesc 并調(diào)用 runtime.netpollopen 初始化輪詢事件:

func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    ...
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    ...
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    unlock(&pd.lock)

    var errno int32
    // 初始化輪詢事件
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

runtime.netpollopen 的實(shí)現(xiàn)非常簡(jiǎn)單,它會(huì)調(diào)用 epollctl 向全局的輪詢文件描述符 epfd 中加入新的輪詢事件監(jiān)聽文件描述符的可讀和可寫狀態(tài):

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

從全局的 epfd 中刪除待監(jiān)聽的文件描述符可以使用 runtime.netpollclose捐川,因?yàn)樵摵瘮?shù)的實(shí)現(xiàn)與 runtime.netpollopen 比較相似脓鹃,所以這里不展開分析了。

2. 讀等待

主要是掛起goroutine古沥,并建立gorotine和fd之間的關(guān)聯(lián)瘸右。
當(dāng)從netFd讀取數(shù)據(jù)時(shí),調(diào)用system call岩齿,循環(huán)從fd.sysfd讀取數(shù)據(jù):

func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

讀取的時(shí)候只處理EAGAIN類型的錯(cuò)誤太颤,其他錯(cuò)誤一律返回給調(diào)用者,因?yàn)閷?duì)于非阻塞的網(wǎng)絡(luò)連接的文件描述符盹沈,如果錯(cuò)誤是EAGAIN龄章,說明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù)乞封,則調(diào)用fd.pd.WaitRead

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

res是runtime_pollWait函數(shù)返回的結(jié)果做裙,由conevertErr函數(shù)包裝后返回:

func convertErr(res int, isFile bool) error {
    switch res {
    case 0:
        return nil
    case 1:
        return errClosing(isFile)
    case 2:
        return ErrTimeout
    }
    println("unreachable: ", res)
    panic("unreachable")
}

其中0表示io已經(jīng)準(zhǔn)備好了,1表示鏈接意見關(guān)閉肃晚,2表示io超時(shí)锚贱。再來看看pollWait的實(shí)現(xiàn):

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

調(diào)用netpollblock來判斷IO是否準(zhǔn)備好了:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

返回true說明IO已經(jīng)準(zhǔn)備好,返回false說明IO操作已經(jīng)超時(shí)或者已經(jīng)關(guān)閉关串。否則當(dāng)waitio為false, 且io不出現(xiàn)錯(cuò)誤或者超時(shí)才會(huì)掛起當(dāng)前goroutine拧廊。

最后的gopark函數(shù),就是將當(dāng)前的goroutine(調(diào)用者)設(shè)置為waiting狀態(tài):

// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp)
    // can't do anything that might move the G between Ms here.
    mcall(park_m)
}

mcall(park_m)將會(huì)掛起當(dāng)前與g綁定的m:

func park_m(gp *g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()

    if _g_.m.waitunlockf != nil {
        fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
        ok := fn(gp, _g_.m.waitlock)
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule()
}

3. 就緒喚醒

那什么時(shí)候goroutine被喚醒并調(diào)度回來呢晋修?運(yùn)行時(shí)在執(zhí)行schedule()方法時(shí)吧碾,會(huì)通過findrunnable(),調(diào)用netpoll()檢查文件描述符狀態(tài)墓卦。

schedule() -> findrunnable() -> netpoll()

調(diào)用netpoll()尋找到IO就緒的socket文件描述符倦春,并找到這些socket文件描述符對(duì)應(yīng)的輪詢器中附帶的信息,根據(jù)這些信息將之前等待這些socket文件描述符就緒的goroutine狀態(tài)修改為Grunnable。執(zhí)行完netpoll之后睁本,會(huì)找到一個(gè)就緒的goroutine列表山叮,接下來將就緒的goroutine加入到調(diào)度隊(duì)列中,等待調(diào)度運(yùn)行添履。

下面我們看下netpoll()的源碼實(shí)現(xiàn):

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // 根據(jù)傳入的 delay 計(jì)算 epoll 系統(tǒng)調(diào)用需要等待的時(shí)間
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // 調(diào)用 epollwait 等待可讀或者可寫事件的發(fā)生
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        goto retry
    }
    var gp guintptr
    // 在循環(huán)中依次處理 epollevent 事件
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            // 文件描述符的正常讀寫事件屁倔,對(duì)于這些事件,我們會(huì)交給netpollready處理
            netpollready(&gp, pd, mode)
        }
    }
    if block && gp == 0 {
        goto retry
    }
    return gp.ptr()
}

當(dāng)netpoll()調(diào)用epollwait()獲取到被監(jiān)控的文件描述符出現(xiàn)了待處理的事件暮胧,就會(huì)在循環(huán)中依次調(diào)用netpollready()處理這些事件锐借。

func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

runtime.netpollunblock會(huì)在讀寫事件發(fā)生時(shí),將 runtime.pollDesc中的讀或者寫信號(hào)量轉(zhuǎn)換成 pdReady 并返回其中存儲(chǔ)的 goroutine往衷;如果返回的 Goroutine 不會(huì)為空钞翔,那么運(yùn)行時(shí)會(huì)將該 goroutine 會(huì)加入 toRun 列表,并將列表中的全部 goroutine 加入運(yùn)行隊(duì)列席舍。

當(dāng)goroutine 加入運(yùn)行隊(duì)列后布轿,在某一次調(diào)度goroutine的過程中,處于就緒狀態(tài)的FD對(duì)應(yīng)的goroutine就會(huì)被調(diào)度回來来颤。

netpoller超時(shí)控制

網(wǎng)絡(luò)輪詢器和計(jì)時(shí)器的關(guān)系非常緊密汰扭,這不僅僅是因?yàn)榫W(wǎng)絡(luò)輪詢器負(fù)責(zé)計(jì)時(shí)器的喚醒,還因?yàn)槲募途W(wǎng)絡(luò) I/O 的截止日期也由網(wǎng)絡(luò)輪詢器負(fù)責(zé)處理福铅。截止日期在 I/O 操作中萝毛,尤其是網(wǎng)絡(luò)調(diào)用中很關(guān)鍵,網(wǎng)絡(luò)請(qǐng)求存在很高的不確定因素滑黔,我們需要設(shè)置一個(gè)截止日期保證程序的正常運(yùn)行笆包,這時(shí)需要用到網(wǎng)絡(luò)輪詢器中的 runtime.poll_runtime_pollSetDeadline

func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    rd0, wd0 := pd.rd, pd.wd
    if d > 0 {
        d += nanotime()
    }
    pd.rd = d
    ...
    if pd.rt.f == nil {
        if pd.rd > 0 {
            pd.rt.f = netpollReadDeadline
            pd.rt.arg = pd
            pd.rt.seq = pd.rseq
            resettimer(&pd.rt, pd.rd)
        }
    } else if pd.rd != rd0 {
        pd.rseq++
        if pd.rd > 0 {
            modtimer(&pd.rt, pd.rd, 0, rtf, pd, pd.rseq)
        } else {
            deltimer(&pd.rt)
            pd.rt.f = nil
        }
    }

該函數(shù)會(huì)先使用截止日期計(jì)算出過期的時(shí)間點(diǎn),然后根據(jù) runtime.pollDesc 的狀態(tài)做出以下不同的處理:

  1. 如果結(jié)構(gòu)體中的計(jì)時(shí)器沒有設(shè)置執(zhí)行的函數(shù)時(shí)略荡,該函數(shù)會(huì)設(shè)置計(jì)時(shí)器到期后執(zhí)行的函數(shù)庵佣、傳入的參數(shù)并調(diào)用 runtime.resettimer 重置計(jì)時(shí)器;
  2. 如果結(jié)構(gòu)體的讀截止日期已經(jīng)被改變汛兜,我們會(huì)根據(jù)新的截止日期做出不同的處理:
    1. 如果新的截止日期大于 0巴粪,調(diào)用 runtime.modtimer 修改計(jì)時(shí)器;
    2. 如果新的截止日期小于 0序无,調(diào)用 runtime.deltimer 刪除計(jì)時(shí)器验毡;

runtime.poll_runtime_pollSetDeadline 的最后衡创,會(huì)重新檢查輪詢信息中存儲(chǔ)的截止日期:

var rg *g
    if pd.rd < 0 {
        if pd.rd < 0 {
            rg = netpollunblock(pd, 'r', false)
        }
        ...
    }
    if rg != nil {
        netpollgoready(rg, 3)
    }
    ...
}

如果截止日期小于 0帝嗡,上述代碼會(huì)調(diào)用 runtime.netpollgoready 直接喚醒對(duì)應(yīng)的 Goroutine。

runtime.poll_runtime_pollSetDeadline 中直接調(diào)用 runtime.netpollgoready 是相對(duì)比較特殊的情況璃氢。在正常情況下哟玷,運(yùn)行時(shí)都會(huì)在計(jì)時(shí)器到期時(shí)調(diào)用 runtime.netpollDeadlineruntime.netpollReadDeadlineruntime.netpollWriteDeadline 三個(gè)函數(shù):


上述三個(gè)函數(shù)都會(huì)通過 runtime.netpolldeadlineimpl 調(diào)用 runtime.netpollgoready 直接喚醒相應(yīng)的 Goroutine:

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    currentSeq := pd.rseq
    if !read {
        currentSeq = pd.wseq
    }
    if seq != currentSeq {
        return
    }
    var rg *g
    if read {
        pd.rd = -1
        atomic.StorepNoWB(unsafe.Pointer(&pd.rt.f), nil)
        rg = netpollunblock(pd, 'r', false)
    }
    ...
    if rg != nil {
        netpollgoready(rg, 0)
    }
    ...
}

Goroutine 在被喚醒之后會(huì)意識(shí)到當(dāng)前的 I/O 操作已經(jīng)超時(shí),可以根據(jù)需要選擇重試請(qǐng)求或者中止調(diào)用巢寡。

總結(jié)

總的來說喉脖,netpoller的最終的效果就是用戶層阻塞,底層非阻塞抑月。當(dāng)goroutine讀或?qū)懽枞麜r(shí)會(huì)被放到等待隊(duì)列树叽,這個(gè)goroutine失去了運(yùn)行權(quán),但并不是真正的整個(gè)系統(tǒng)“阻塞”于系統(tǒng)調(diào)用谦絮。而通過后臺(tái)的poller不停地poll题诵,所有的文件描述符都被添加到了這個(gè)poller中的,當(dāng)某個(gè)時(shí)刻一個(gè)文件描述符準(zhǔn)備好了层皱,poller就會(huì)喚醒之前因它而阻塞的goroutine性锭,于是goroutine重新運(yùn)行起來。

和使用Unix系統(tǒng)中的select或是poll方法不同地是叫胖,Golang的netpoller查詢的是能被調(diào)度的goroutine而不是那些函數(shù)指針草冈、包含了各種狀態(tài)變量的struct等,這樣你就不用管理這些狀態(tài)瓮增,也不用重新檢查函數(shù)指針等怎棱,這些都是你在傳統(tǒng)Unix網(wǎng)絡(luò)I/O需要操心的問題。


References:
https://zhuanlan.zhihu.com/p/143847169
https://developer.aliyun.com/article/893401
https://zhuanlan.zhihu.com/p/159457916
https://www.yuque.com/aceld/golang/sdgfgu
https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-netpoller
https://strikefreedom.top/archives/go-netpoll-io-multiplexing-reactor
https://juejin.cn/post/6882984260672847879
https://mp.weixin.qq.com/s/T-hP3wt4whtvVh1H1LBU3w
https://yizhi.ren/2019/06/08/gonetpoller/
https://www.cnblogs.com/luozhiyun/p/14390824.html
https://cloud.tencent.com/developer/article/1234360
https://learnku.com/articles/59847

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末绷跑,一起剝皮案震驚了整個(gè)濱河市蹄殃,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌你踩,老刑警劉巖诅岩,帶你破解...
    沈念sama閱讀 210,914評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異带膜,居然都是意外死亡吩谦,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評(píng)論 2 383
  • 文/潘曉璐 我一進(jìn)店門膝藕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來式廷,“玉大人,你說我怎么就攤上這事芭挽』希” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵袜爪,是天一觀的道長(zhǎng)蠕趁。 經(jīng)常有香客問我,道長(zhǎng)辛馆,這世上最難降的妖魔是什么俺陋? 我笑而不...
    開封第一講書人閱讀 56,309評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮,結(jié)果婚禮上腊状,老公的妹妹穿的比我還像新娘诱咏。我一直安慰自己,他們只是感情好缴挖,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,381評(píng)論 5 384
  • 文/花漫 我一把揭開白布袋狞。 她就那樣靜靜地躺著,像睡著了一般映屋。 火紅的嫁衣襯著肌膚如雪硕并。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,730評(píng)論 1 289
  • 那天秧荆,我揣著相機(jī)與錄音倔毙,去河邊找鬼。 笑死乙濒,一個(gè)胖子當(dāng)著我的面吹牛陕赃,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播颁股,決...
    沈念sama閱讀 38,882評(píng)論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼么库,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了甘有?” 一聲冷哼從身側(cè)響起诉儒,我...
    開封第一講書人閱讀 37,643評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎亏掀,沒想到半個(gè)月后忱反,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,095評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡滤愕,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,448評(píng)論 2 325
  • 正文 我和宋清朗相戀三年温算,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片间影。...
    茶點(diǎn)故事閱讀 38,566評(píng)論 1 339
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡注竿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出魂贬,到底是詐尸還是另有隱情巩割,我是刑警寧澤,帶...
    沈念sama閱讀 34,253評(píng)論 4 328
  • 正文 年R本政府宣布付燥,位于F島的核電站宣谈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏机蔗。R本人自食惡果不足惜蒲祈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,829評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望萝嘁。 院中可真熱鬧梆掸,春花似錦、人聲如沸牙言。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咱枉。三九已至卑硫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蚕断,已是汗流浹背欢伏。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留亿乳,地道東北人硝拧。 一個(gè)月前我還...
    沈念sama閱讀 46,248評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像葛假,于是被迫代替她去往敵國(guó)和親障陶。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,440評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容