An Analysis of the Linux Kernel Source for poll and epoll

How to use poll and epoll needs little introduction. When there are many fds, epoll is more efficient than poll. Reading the kernel source shows exactly why.

Anatomy of poll

The poll system call:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

The corresponding implementation is:

[fs/select.c -->sys_poll]

asmlinkage long sys_poll(struct pollfd __user *ufds, unsigned int nfds, long timeout)
{
    struct poll_wqueues table;
    int fdcount, err;
    unsigned int i;
    struct poll_list *head;
    struct poll_list *walk;

    /* Do a sanity check on nfds ... */
    /* The nfds given by the user may not exceed the maximum number of fds
       that the process's file table (current->files, a struct files_struct)
       supports (256 by default). */
    if (nfds > current->files->max_fdset && nfds > OPEN_MAX)
        return -EINVAL;

    if (timeout) {
        /* Careful about overflow in the intermediate values */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }

    poll_initwait(&table);

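poll_initwait is the key call here. As the name suggests, it initializes the variable table, and table is a crucial variable for the entire execution of poll. The struct poll_table embedded in it actually contains nothing but a function pointer: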

[include/linux/poll.h]

/*
 * structures and helpers for f_op->poll implementations
 */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
    poll_queue_proc qproc;
} poll_table;

Now let's see what poll_initwait actually does:

[fs/select.c]

void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p);

void poll_initwait(struct poll_wqueues *pwq)
{
    pwq->pt.qproc = __pollwait; /* this line has been "translated" by hand for readability */
    pwq->error = 0;
    pwq->table = NULL;
}


Clearly, the main job of poll_initwait is to set the callback of the poll_table member of table to __pollwait. __pollwait is needed not only by the poll system call; the select system call uses the very same function. In short, it is the operating system's "house" callback for asynchronous waiting. epoll does not use it: it adds a callback of its own in order to run efficiently, but that comes later. We will leave the implementation of __pollwait aside for now and continue with sys_poll:

[fs/select.c -->sys_poll]

    head = NULL;
    walk = NULL;
    i = nfds;
    err = -ENOMEM;
    while (i != 0) {
        struct poll_list *pp;
        pp = kmalloc(sizeof(struct poll_list) +
                sizeof(struct pollfd) *
                (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i),
                GFP_KERNEL);
        if (pp == NULL)
            goto out_fds;
        pp->next = NULL;
        pp->len = (i > POLLFD_PER_PAGE ? POLLFD_PER_PAGE : i);
        if (head == NULL)
            head = pp;
        else
            walk->next = pp;
        walk = pp;
        if (copy_from_user(pp->entries, ufds + nfds-i,
                sizeof(struct pollfd)*pp->len)) {
            err = -EFAULT;
            goto out_fds;
        }
        i -= pp->len;
    }
    fdcount = do_poll(nfds, head, &table, timeout);

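All this code does is build a linked list. Each node is at most one page in size (usually 4 KB) and is managed through a pointer to struct poll_list; the struct pollfd entries themselves are reached through the entries member of struct poll_list. The loop above copies the user-space struct pollfd array into those entries. A typical program polls only a handful of fds, so the list usually needs just one node, that is, one page. But when the user passes in many fds, every poll call has to copy all of the struct pollfd entries into the kernel again, so argument passing and page allocation become a performance bottleneck of the poll system call. The last statement calls do_poll; let's follow it: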

[fs/select.c-->sys_poll()-->do_poll()]

static void do_pollfd(unsigned int num, struct pollfd *fdpage,
        poll_table **pwait, int *count)
{
    int i;

    for (i = 0; i < num; i++) {
        int fd;
        unsigned int mask;
        struct pollfd *fdp;

        mask = 0;
        fdp = fdpage+i;
        fd = fdp->fd;
        if (fd >= 0) {
            struct file *file = fget(fd);
            mask = POLLNVAL;
            if (file != NULL) {
                mask = DEFAULT_POLLMASK;
                if (file->f_op && file->f_op->poll)
                    mask = file->f_op->poll(file, *pwait);
                mask &= fdp->events | POLLERR | POLLHUP;
                fput(file);
            }
            if (mask) {
                *pwait = NULL;
                (*count)++;
            }
        }
        fdp->revents = mask;
    }
}

static int do_poll(unsigned int nfds, struct poll_list *list,
        struct poll_wqueues *wait, long timeout)
{
    int count = 0;
    poll_table *pt = &wait->pt;

    if (!timeout)
        pt = NULL;

    for (;;) {
        struct poll_list *walk;
        set_current_state(TASK_INTERRUPTIBLE);
        walk = list;
        while (walk != NULL) {
            do_pollfd(walk->len, walk->entries, &pt, &count);
            walk = walk->next;
        }
        pt = NULL;
        if (count || !timeout || signal_pending(current))
            break;
        count = wait->error;
        if (count)
            break;
        /* Suspend current and let other processes run; current resumes
           once the timeout expires (or it is woken earlier). */
        timeout = schedule_timeout(timeout);
    }
    __set_current_state(TASK_RUNNING);
    return count;
}

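Note set_current_state and signal_pending. Together they guarantee that while a program is suspended inside poll, sending it a signal makes it leave the poll call promptly; a task sleeping in TASK_UNINTERRUPTIBLE, by contrast, is not woken by signals.

Taken as a whole, do_poll mostly waits in a loop and leaves it once count is greater than 0 (or the timeout runs out, or a signal is pending). count itself is maintained by do_pollfd. Note this code: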

    while (walk != NULL) {
        do_pollfd(walk->len, walk->entries, &pt, &count);
        walk = walk->next;
    }

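When the user passes in many fds (say 1000), do_pollfd has a great deal of work to do on every pass through this loop, and that is the second reason poll becomes a bottleneck. For each fd passed in, do_pollfd calls that fd's own poll function. Simplified, the call sequence is: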

    struct file *file = fget(fd);
    file->f_op->poll(file, &(table->pt));

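If the fd refers to a socket, do_pollfd calls the poll implemented by the networking code behind that socket; if the fd refers to an open file on, say, an ext3 file system, do_pollfd calls whatever poll that file's file_operations supplies (a file without one simply reports DEFAULT_POLLMASK, as the code above shows). In a word, file->f_op->poll is implemented by the driver behind the file. So what does a driver's poll implementation usually look like? The standard implementation calls poll_wait, that is, it invokes the callback in struct poll_table with the device's own wait queue as an argument (devices normally have their own wait queue; a device that cannot support asynchronous operation would be frustrating to use). As a representative driver, let's look at the code a socket uses for TCP: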

[net/ipv4/tcp.c-->tcp_poll]

unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    unsigned int mask;
    struct sock *sk = sock->sk;
    struct tcp_opt *tp = tcp_sk(sk);

    poll_wait(file, sk->sk_sleep, wait);

That is all the code we need; the rest merely checks state and returns a status mask. The core of tcp_poll is poll_wait, and poll_wait simply invokes the callback stored in struct poll_table. For the poll system call that callback is __pollwait, so tcp_poll can almost be read as a single statement:

    __pollwait(file, sk->sk_sleep, wait);

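This also shows that every socket carries its own wait queue, sk_sleep, so the "device wait queue" mentioned above is really not one queue but many. Now let's look at the implementation of __pollwait:

[fs/select.c-->__pollwait()]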

void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    struct poll_table_page *table = p->table;

    if (!table || POLL_TABLE_FULL(table)) {
        struct poll_table_page *new_table;

        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        if (!new_table) {
            p->error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        new_table->entry = new_table->entries;
        new_table->next = table;
        p->table = new_table;
        table = new_table;
    }

    /* Add a new entry */
    {
        struct poll_table_entry *entry = table->entry;
        table->entry = entry+1;
        get_file(filp);
        entry->filp = filp;
        entry->wait_address = wait_address;
        init_waitqueue_entry(&entry->wait, current);
        add_wait_queue(wait_address, &entry->wait);
    }
}

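__pollwait builds the data structure shown in the figure above (one call to __pollwait, that is, one call to a device's poll, creates exactly one poll_table_entry) and, through the wait member of struct poll_table_entry, hangs current on the device's wait queue. Here that wait queue is wait_address, which corresponds to sk->sk_sleep in tcp_poll. We can now review how the poll system call works. It first registers the callback __pollwait while initializing the table variable (of type struct poll_wqueues), then copies in the struct pollfd array passed by the user (mainly the fds), and then calls the poll method of every fd in turn, which hangs current on the wait queue of each fd's device. When a device receives a message (a network device) or finishes filling in file data (a disk device), it wakes the processes on its wait queue, and current is woken up. What current does after waking in order to leave sys_poll is comparatively simple and is not analyzed line by line here.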
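The registration mechanism just described can be modelled in a few lines of ordinary user-space C. This is only a sketch under stated assumptions: every name prefixed with model_ is a hypothetical stand-in invented for this illustration, fds are plain integers, and no real wait queues are involved. It shows the three ideas the kernel code relies on: a poll_table that carries only a function pointer, container_of recovering the enclosing structure from that embedded member, and a NULL table meaning "do not register".

```c
#include <stddef.h>

/* User-space model only; the kernel's real types are richer than this. */
struct poll_table;
typedef void (*poll_queue_proc)(int fd, struct poll_table *pt);

struct poll_table {
    poll_queue_proc qproc;          /* the single function pointer */
};

/* Stand-in for struct poll_wqueues: the poll_table is embedded in it. */
struct model_poll_wqueues {
    int registered[8];              /* fds whose "wait queue" we joined */
    int count;
    struct poll_table pt;
};

#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

/* Stand-in for __pollwait: recover the enclosing structure, add one entry. */
static void model_pollwait(int fd, struct poll_table *pt)
{
    struct model_poll_wqueues *pwq =
        container_of(pt, struct model_poll_wqueues, pt);

    if (pwq->count < 8)
        pwq->registered[pwq->count++] = fd;
}

/* Stand-in for poll_wait: a NULL table means "do not register". */
static void model_poll_wait(int fd, struct poll_table *pt)
{
    if (pt)
        pt->qproc(fd, pt);
}

/* Stand-in for a driver's f_op->poll: register first, then report a mask. */
static unsigned int model_driver_poll(int fd, struct poll_table *pt)
{
    model_poll_wait(fd, pt);
    return 0;                       /* nothing is ever ready in this model */
}

/* Polls nfds fake descriptors; returns how many wait entries were created. */
int model_poll_all(int nfds, int use_table)
{
    struct model_poll_wqueues pwq = { .count = 0 };

    pwq.pt.qproc = model_pollwait;  /* what poll_initwait does */
    for (int fd = 0; fd < nfds; fd++)
        model_driver_poll(fd, use_table ? &pwq.pt : NULL);
    return pwq.count;
}
```

Calling model_poll_all(3, 1) creates one entry per descriptor, which is the per-fd cost poll pays on every call; model_poll_all(3, 0) passes a NULL table and registers nothing.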

epoll

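The analysis above has identified the two bottlenecks in poll; the question now is how to improve on them. First, copying 1000 fds into the kernel on every poll call is wasteful: why doesn't the kernel keep the fds it has already copied in? That is exactly what epoll does, and its API says as much: fds are not passed in at epoll_wait time; they are handed to the kernel through epoll_ctl and then waited on together, which eliminates the redundant copying. Second, epoll_wait does not add current to each fd's device wait queue in turn. Instead, when a device wait queue is woken, a callback function is invoked (this naturally requires a "wake-up callback" mechanism) that places the fd that produced the event on a list, and epoll_wait then returns the fds on that list.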
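Seen from user space, the difference in API shape looks roughly like the sketch below. It is a minimal illustration, not taken from the original article: the function names ready_via_poll, ready_via_epoll, and demo_pipe_ready are made up for this example, a pipe stands in for a real socket, and error handling is reduced to returning -1.

```c
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>

/* poll: the whole pollfd array is handed to the kernel on every call. */
int ready_via_poll(int fd)
{
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int n = poll(&pfd, 1, 0);               /* timeout 0: just check */

    return n == 1 && (pfd.revents & POLLIN);
}

/* epoll: register once with epoll_ctl, then wait as often as needed. */
int ready_via_epoll(int fd)
{
    struct epoll_event ev = { .events = EPOLLIN }, out = { 0 };
    int epfd = epoll_create1(0);
    int n;

    if (epfd < 0)
        return -1;
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
        close(epfd);
        return -1;
    }
    n = epoll_wait(epfd, &out, 1, 0);       /* no fd list passed here */
    close(epfd);
    return n == 1 && out.data.fd == fd && (out.events & EPOLLIN);
}

/* Tries both on a pipe whose read end has one byte pending. */
int demo_pipe_ready(void)
{
    int p[2], a, b;

    if (pipe(p) < 0)
        return -1;
    if (write(p[1], "x", 1) != 1) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    a = ready_via_poll(p[0]);
    b = ready_via_epoll(p[0]);
    close(p[0]);
    close(p[1]);
    return a == 1 && b == 1;
}
```

In a real server the epoll instance would of course be created once and reused across many epoll_wait calls; creating it inside the function here only keeps the sketch self-contained.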

Anatomy of epoll

epoll is a module, so we start with the module's entry point, eventpoll_init:

[fs/eventpoll.c-->eventpoll_init()]

static int __init eventpoll_init(void)
{
    int error;

    init_MUTEX(&epsem);

    /* Initialize the structure used to perform safe poll wait head wake ups */
    ep_poll_safewake_init(&psw);

    /* Allocates slab cache used to allocate "struct epitem" items */
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
            0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC,
            NULL, NULL);

    /* Allocates slab cache used to allocate "struct eppoll_entry" */
    pwq_cache = kmem_cache_create("eventpoll_pwq",
            sizeof(struct eppoll_entry), 0,
            EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL);

    /*
     * Register the virtual file system that will be the source of inodes
     * for the eventpoll files
     */
    error = register_filesystem(&eventpoll_fs_type);
    if (error)
        goto epanic;

    /* Mount the above commented virtual file system */
    eventpoll_mnt = kern_mount(&eventpoll_fs_type);
    error = PTR_ERR(eventpoll_mnt);
    if (IS_ERR(eventpoll_mnt))
        goto epanic;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: successfully initialized.\n",
            current));

    return 0;

epanic:
    panic("eventpoll_init() failed\n");
}

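Interestingly, this module registers a new file system called "eventpollfs" during initialization (see the eventpoll_fs_type structure) and then mounts it. It also creates two kernel caches (in kernel programming, code that frequently allocates small blocks of memory should create a kmem_cache to serve as a memory pool), one for struct epitem and one for struct eppoll_entry. This code is a useful reference if you ever need to develop a new file system. Now consider why epoll_create returns a new fd: because it creates a new file in this "eventpollfs" file system! As follows: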

[fs/eventpoll.c-->sys_epoll_create()]

asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode;
    struct file *file;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n",
            current, size));

    /* Sanity check on the size parameter */
    error = -EINVAL;
    if (size <= 0)
        goto eexit_1;

    /*
     * Creates all the items needed to setup an eventpoll file. That is,
     * a file structure, and inode and a free file descriptor.
     */
    error = ep_getfd(&fd, &inode, &file);
    if (error)
        goto eexit_1;

    /* Setup the file internal data structure ( "struct eventpoll" ) */
    error = ep_file_init(file);
    if (error)
        goto eexit_2;

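The function is simple. ep_getfd looks like a "get", but when epoll_create is first called it actually creates a new inode, a new file, and a new fd. ep_file_init in turn creates a struct eventpoll and stores it in file->private_data; note that this private_data will be used again later. At this point some may ask why the epoll developers did not keep one huge kernel map of the epoll handles users create and return a pointer from epoll_create; that would seem intuitive. But look closely: how many Linux system calls return a pointer? You will find almost none! (To be clear, malloc is not a system call; the brk that malloc calls is.) As the most distinguished heir of UNIX, Linux follows one of UNIX's great strengths: everything is a file. Input and output are files, and sockets are files too. Everything being a file means programs on this operating system can be very simple, because everything is just a file operation. (UNIX itself does not achieve this completely; Plan 9 does.) Using a file system has another advantage: epoll_create returns an fd rather than a wretched pointer. If a pointer is wrong you have almost no way to tell, whereas an fd can be checked for validity through current->files->fd_array[]. That is epoll_create done; next comes epoll_ctl, with the validation code omitted: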

[fs/eventpoll.c-->sys_epoll_ctl()]

asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;

    ....

    epi = ep_find(ep, tfile, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }

So it first performs ep_find within some big structure (never mind for now what that structure is). If a struct epitem is found and the operation is ADD, it returns -EEXIST; if the operation is DEL, it calls ep_remove. If no struct epitem is found and the operation is ADD, ep_insert creates and inserts one. Straightforward. So what is the "big structure"? From the way ep_find is called, the ep argument must be a pointer to it, and ep = file->private_data tells us it is the struct eventpoll created in epoll_create. Looking at the implementation of ep_find, the lookup goes through the rbr member of struct eventpoll (a struct rb_root): the root of a red-black tree whose nodes are all struct epitem. Now it is clear: a newly created epoll file carries a struct eventpoll, that structure holds a red-black tree, and the tree is where the fds are stored on each epoll_ctl! With the data structures clear, we turn to the core of it all, sys_epoll_wait.
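The error paths of that switch statement are visible from user space as errno values. The sketch below is an illustration added here, not part of the original article; epoll_ctl_errno_demo is a made-up name and a pipe stands in for any pollable fd. It checks that removing or modifying an fd that was never added fails with ENOENT, and that adding the same fd twice fails with EEXIST.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/*
 * Returns a bitmask of the behaviours observed:
 *   bit 0 (1): a duplicate EPOLL_CTL_ADD failed with EEXIST
 *   bit 1 (2): EPOLL_CTL_MOD on an unregistered fd failed with ENOENT
 *   bit 2 (4): EPOLL_CTL_DEL on an unregistered fd failed with ENOENT
 * Returns -1 if the epoll instance or the pipe could not be created.
 */
int epoll_ctl_errno_demo(void)
{
    int p[2], result = 0;
    struct epoll_event ev = { .events = EPOLLIN };
    int epfd = epoll_create1(0);

    if (epfd < 0)
        return -1;
    if (pipe(p) < 0) {
        close(epfd);
        return -1;
    }
    ev.data.fd = p[0];

    /* Nothing registered yet: the lookup finds no item for this fd. */
    if (epoll_ctl(epfd, EPOLL_CTL_DEL, p[0], &ev) == -1 && errno == ENOENT)
        result |= 4;
    if (epoll_ctl(epfd, EPOLL_CTL_MOD, p[0], &ev) == -1 && errno == ENOENT)
        result |= 2;

    /* The first ADD inserts an item; the second one finds it. */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == 0 &&
        epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == -1 && errno == EEXIST)
        result |= 1;

    close(p[0]);
    close(p[1]);
    close(epfd);
    return result;
}
```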

[fs/eventpoll.c-->sys_epoll_wait()]

asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events,
        int maxevents, int timeout)
{
    int error;
    struct file *file;
    struct eventpoll *ep;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d)\n",
            current, epfd, events, maxevents, timeout));

    /* The maximum number of event must be greater than zero */
    if (maxevents <= 0)
        return -EINVAL;

    /* Verify that the area passed by the user is writeable */
    if ((error = verify_area(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))))
        goto eexit_1;

    /* Get the "struct file *" for the eventpoll file */
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto eexit_1;

    /*
     * We have to check that the file structure underneath the fd
     * the user passed to us _is_ an eventpoll file.
     */
    error = -EINVAL;
    if (!IS_FILE_EPOLL(file))
        goto eexit_2;

    /*
     * At this point it is safe to assume that the "private_data" contains
     * our own data structure.
     */
    ep = file->private_data;

    /* Time to fish for events ... */
    error = ep_poll(ep, events, maxevents, timeout);

eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %d\n",
            current, epfd, events, maxevents, timeout, error));

    return error;
}

The same trick again: fetch the struct eventpoll from file->private_data, then call ep_poll:

[fs/eventpoll.c-->sys_epoll_wait()->ep_poll()]

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
        int maxevents, long timeout)
{
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*
     * Calculate the timeout by checking for the "infinite" value ( -1 )
     * and the overflow condition. The passed timeout is in milliseconds,
     * that why (t * HZ) / 1000.
     */
    jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;

retry:
    write_lock_irqsave(&ep->lock, flags);

    res = 0;
    if (list_empty(&ep->rdllist)) {
        /*
         * We don't have any available event to return to the caller.
         * We need to sleep here, and we will be wake up by
         * ep_poll_callback() when events will become available.
         */
        init_waitqueue_entry(&wait, current);
        add_wait_queue(&ep->wq, &wait);

        for (;;) {
            /*
             * We don't want to sleep if the ep_poll_callback() sends us
             * a wakeup in between. That's why we set the task state
             * to TASK_INTERRUPTIBLE before doing the checks.
             */
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }

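Another big loop, but a better one than poll's: on close inspection it does nothing except sleep and check whether ep->rdllist is empty! Doing nothing is of course efficient, but who makes ep->rdllist non-empty? The answer is the callback function set up in ep_insert: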

[fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()]

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
        struct file *tfile, int fd)
{
    int error, revents, pwake = 0;
    unsigned long flags;
    struct epitem *epi;
    struct ep_pqueue epq;

    error = -ENOMEM;
    if (!(epi = EPI_MEM_ALLOC()))
        goto eexit_1;

    /* Item initialization follow here ... */
    EP_RB_INITNODE(&epi->rbn);
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->txlink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    EP_SET_FFD(&epi->ffd, tfile, fd);
    epi->event = *event;
    atomic_set(&epi->usecnt, 1);
    epi->nwait = 0;

    /* Initialize the poll table using the queue callback */
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    /*
     * Attach the item to the poll hooks and get current event bits.
     * We can safely use the file* here because its usage count has
     * been increased by the caller of this function.
     */
    revents = tfile->f_op->poll(tfile, &epq.pt);

Note the line init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); it amounts to epq.pt.qproc = ep_ptable_queue_proc;. The tfile->f_op->poll(tfile, &epq.pt) that follows calls the poll method of the monitored file (epoll calls it the "target file"). That poll method calls poll_wait (remember poll_wait? every device driver that supports poll calls it), and poll_wait finally calls ep_ptable_queue_proc. This call chain is hard to follow because it is not a direct call at the language level. ep_insert also puts the struct epitem on the f_ep_links linked list in struct file so that it is easy to find; the fllink member of struct epitem exists for that purpose.

[fs/eventpoll.c-->ep_ptable_queue_proc()]

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
        poll_table *pt)
{
    struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

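The code above is the most important thing ep_insert does: create a struct eppoll_entry, set its wake-up callback to ep_poll_callback, and add it to the device wait queue (the whead here is the wait queue that, as the poll section said, every device driver carries). This is what makes ep_poll_callback run when the device becomes ready and wakes the waiters on its wait queue. On every call to the poll system call, the operating system has to hang current (the current process) on the wait queues of all the devices behind the fds; with fds in the thousands, that is clearly expensive. epoll_wait has no such overhead: epoll does the hooking only once, at epoll_ctl time (that first pass is unavoidable), and gives each fd the instruction "call the callback when you are ready". When a device has an event, the callback places the fd on rdllist, and each epoll_wait call only has to collect the fds on rdllist. By using callbacks cleverly, epoll implements a more efficient event-driven model. By now we can guess what ep_poll_callback does: it must insert the epitem on the red-black tree that received an event (each epitem represents an fd) into ep->rdllist, so that when epoll_wait returns, rdllist holds exactly the ready fds!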

[fs/eventpoll.c-->ep_poll_callback()]

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    int pwake = 0;
    unsigned long flags;
    struct epitem *epi = EP_ITEM_FROM_WAIT(wait);
    struct eventpoll *ep = epi->ep;

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%p ep=%p\n",
            current, epi->file, epi, ep));

    write_lock_irqsave(&ep->lock, flags);

    /*
     * If the event mask does not contain any poll(2) event, we consider the
     * descriptor to be disabled. This condition is likely the effect of the
     * EPOLLONESHOT bit that disables the descriptor when an event is received,
     * until the next EPOLL_CTL_MOD will be issued.
     */
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto is_disabled;

    /* If this file is already in the ready list we exit soon */
    if (EP_IS_LINKED(&epi->rdllink))
        goto is_linked;

    list_add_tail(&epi->rdllink, &ep->rdllist);

is_linked:
    /*
     * Wake up ( if active ) both the eventpoll wait list and the ->poll()
     * wait list.
     */
    if (waitqueue_active(&ep->wq))
        wake_up(&ep->wq);
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

is_disabled:
    write_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);

    return 1;
}

The only line that really matters is list_add_tail(&epi->rdllink, &ep->rdllist);, which puts the struct epitem on the rdllist of struct eventpoll. We can now draw a diagram of epoll's core data structures.
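The interplay between the wake-up callback and the ready list can be imitated with a small user-space model. Everything below is a hypothetical simplification written for this illustration: an array replaces the kernel's linked list, there is no locking and no sleeping, and the model_ names do not exist in the kernel. It captures two properties of the code above: an item that is already on the ready list is not queued twice, and the collecting side only has to drain the list.

```c
#include <stddef.h>

#define MODEL_MAX_ITEMS 16

struct model_item {                 /* stands in for struct epitem */
    int fd;
    int on_ready_list;              /* stands in for EP_IS_LINKED(&epi->rdllink) */
};

struct model_ep {                   /* stands in for struct eventpoll */
    struct model_item *ready[MODEL_MAX_ITEMS];   /* stands in for rdllist */
    int nready;
};

/* Model of ep_poll_callback: queue the item unless it is already queued. */
void model_poll_callback(struct model_ep *ep, struct model_item *it)
{
    if (it->on_ready_list || ep->nready == MODEL_MAX_ITEMS)
        return;
    it->on_ready_list = 1;
    ep->ready[ep->nready++] = it;
}

/* Model of the collecting side of epoll_wait: drain up to maxevents fds. */
int model_collect(struct model_ep *ep, int *fds, int maxevents)
{
    int n = ep->nready < maxevents ? ep->nready : maxevents;

    for (int i = 0; i < n; i++) {
        fds[i] = ep->ready[i]->fd;
        ep->ready[i]->on_ready_list = 0;
    }
    /* Keep whatever did not fit for the next call. */
    for (int i = n; i < ep->nready; i++)
        ep->ready[i - n] = ep->ready[i];
    ep->nready -= n;
    return n;
}

/* Three wake-ups on two fds leave two entries on the ready list. */
int model_ready_demo(void)
{
    struct model_ep ep = { .nready = 0 };
    struct model_item a = { 5, 0 }, b = { 9, 0 };
    int fds[MODEL_MAX_ITEMS];
    int n;

    model_poll_callback(&ep, &a);
    model_poll_callback(&ep, &b);
    model_poll_callback(&ep, &a);   /* duplicate wake-up, not queued again */
    n = model_collect(&ep, fds, MODEL_MAX_ITEMS);
    return (n == 2 && fds[0] == 5 && fds[1] == 9) ? n : -1;
}
```

The cost of model_collect depends only on how many items are ready, not on how many are registered, which is the property that makes epoll_wait cheap when most fds are idle.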

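EPOLLET, unique to epoll

EPOLLET is a flag unique to the epoll system calls; ET stands for Edge Triggered. Its precise meaning and typical uses are easy to look up. With EPOLLET, repeated events no longer keep interrupting the program's logic, which is why it is so often used. So how does EPOLLET work? epoll attaches a callback to every fd; when the device behind an fd has a message, the fd is placed on the rdllist list, so epoll_wait only has to check rdllist to know which fds have events. Let's look at the last few lines of ep_poll: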

[fs/eventpoll.c->ep_poll()]

    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */
    if (!res && eavail &&
        !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}

Copying the fds on rdllist to user space is the job of ep_events_transfer:

[fs/eventpoll.c->ep_events_transfer()]

static int ep_events_transfer(struct eventpoll *ep,
        struct epoll_event __user *events, int maxevents)
{
    int eventcnt = 0;
    struct list_head txlist;

    INIT_LIST_HEAD(&txlist);

    /*
     * We need to lock this because we could be hit by
     * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL).
     */
    down_read(&ep->sem);

    /* Collect/extract ready items */
    if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
        /* Build result set in userspace */
        eventcnt = ep_send_events(ep, &txlist, events);

        /* Reinject ready items into the ready list */
        ep_reinject_items(ep, &txlist);
    }

    up_read(&ep->sem);

    return eventcnt;
}

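There is very little code. ep_collect_ready_items moves the fds on rdllist to txlist (leaving rdllist empty), ep_send_events copies the fds on txlist to user space, and ep_reinject_items then "returns" some of the fds from txlist to rdllist so that they are found on rdllist again next time. Here is the implementation of ep_send_events: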

[fs/eventpoll.c->ep_send_events()]

static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,
        struct epoll_event __user *events)
{
    int eventcnt = 0;
    unsigned int revents;
    struct list_head *lnk;
    struct epitem *epi;

    /*
     * We can loop without lock because this is a task private list.
     * The test done during the collection loop will guarantee us that
     * another task will not try to collect this file. Also, items
     * cannot vanish during the loop because we are holding "sem".
     */
    list_for_each(lnk, txlist) {
        epi = list_entry(lnk, struct epitem, txlink);

        /*
         * Get the ready file event set. We can safely use the file
         * because we are holding the "sem" in read and this will
         * guarantee that both the file and the item will not vanish.
         */
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);

        /*
         * Set the return event set for the current file descriptor.
         * Note that only the task that was successfully able to link
         * the item to its "txlist" will write this field.
         */
        epi->revents = revents & epi->event.events;

        if (epi->revents) {
            if (__put_user(epi->revents,
                    &events[eventcnt].events) ||
                __put_user(epi->event.data,
                    &events[eventcnt].data))
                return -EFAULT;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            eventcnt++;
        }
    }
    return eventcnt;
}

The copy itself is unremarkable, but note the line revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);. This poll call is sly: it passes NULL as the second argument. Let's first look at how a device driver typically implements poll:

static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
    struct scull_pipe *dev = filp->private_data;
    unsigned int mask = 0;

    /*
     * The buffer is circular; it is considered full
     * if "wp" is right behind "rp" and empty if the
     * two are equal.
     */
    down(&dev->sem);
    poll_wait(filp, &dev->inq, wait);
    poll_wait(filp, &dev->outq, wait);
    if (dev->rp != dev->wp)
        mask |= POLLIN | POLLRDNORM; /* readable */
    if (spacefree(dev))
        mask |= POLLOUT | POLLWRNORM; /* writable */
    up(&dev->sem);
    return mask;
}

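The code above is taken from Linux Device Drivers (3rd edition), an absolute classic. The driver first hangs current (the current process) on the two queues inq and outq (the hanging is done by the callback function pointer carried in wait) and then waits for the device to wake it; after waking, the event mask is obtained through mask (note the mask variable: it is what carries the event mask back to the caller). So what does poll_wait do if wait is NULL?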

[include/linux/poll.h->poll_wait]

static inline void poll_wait(struct file *filp, wait_queue_head_t *wait_address,
        poll_table *p)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}

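If the poll_table is NULL, it does nothing. Going back to ep_send_events, that poll call with the NULL argument really means "I don't want to sleep; I only want the event mask". The mask obtained is then copied to user space. Once ep_send_events is done, it is the turn of ep_reinject_items.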
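The same "just give me the mask" behaviour is reachable from user space: in the do_poll code shown earlier, a zero timeout sets pt to NULL before any driver poll method is called, so nothing is registered and the call returns at once. The sketch below illustrates this with a pipe; zero_timeout_poll_demo is a name made up for this example, and it only observes the return values, not the kernel internals.

```c
#include <poll.h>
#include <unistd.h>

/*
 * Polls the read end of a pipe with timeout 0, first while the pipe is
 * empty and then after one byte has been written.
 * Returns 1 if the first call reported nothing and the second reported
 * POLLIN, 0 if the behaviour differed, -1 on setup failure.
 */
int zero_timeout_poll_demo(void)
{
    int p[2], before, after, readable;
    struct pollfd pfd;

    if (pipe(p) < 0)
        return -1;
    pfd.fd = p[0];
    pfd.events = POLLIN;
    pfd.revents = 0;

    before = poll(&pfd, 1, 0);      /* empty pipe: returns immediately */

    if (write(p[1], "x", 1) != 1) {
        close(p[0]);
        close(p[1]);
        return -1;
    }
    pfd.revents = 0;
    after = poll(&pfd, 1, 0);       /* data pending: mask comes back set */
    readable = (pfd.revents & POLLIN) != 0;

    close(p[0]);
    close(p[1]);
    return before == 0 && after == 1 && readable;
}
```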

[fs/eventpoll.c->ep_reinject_items]

static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
    int ricnt = 0, pwake = 0;
    unsigned long flags;
    struct epitem *epi;

    write_lock_irqsave(&ep->lock, flags);

    while (!list_empty(txlist)) {
        epi = list_entry(txlist->next, struct epitem, txlink);

        /* Unlink the current item from the transfer list */
        EP_LIST_DEL(&epi->txlink);

        /*
         * If the item is no more linked to the interest set, we don't
         * have to push it inside the ready list because the following
         * ep_release_epitem() is going to drop it. Also, if the current
         * item is set to have an Edge Triggered behaviour, we don't have
         * to push it back either.
         */
        if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
            (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ricnt++;
        }
    }

    if (ricnt) {
        /*
         * Wake up ( if active ) both the eventpoll wait list and the ->poll()
         * wait list.
         */
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    write_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);
}

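ep_reinject_items puts some of the fds on txlist back onto rdllist. Which ones? Look at the condition beginning if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && above: the fds put back are those that are not flagged EPOLLET (the !(epi->event.events & EPOLLET) test) and whose events are of interest (the epi->revents & epi->event.events test). The next epoll_wait will of course hand the fds on rdllist to the user again. Here is an example. Suppose a socket has just been connected and has not yet sent or received any data. Its poll event mask always contains POLLOUT (see the driver example above), so every call to epoll_wait returns a POLLOUT event (rather annoying), because its fd keeps being put back on rdllist. If someone now writes a large amount of data to the socket so that it fills up (becomes unwritable), the test (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink) fails (there is no POLLOUT any more), the fd is not put back on rdllist, and epoll_wait stops returning POLLOUT to the user. Now add EPOLLET to this socket and connect without sending or receiving data. This time the !(epi->event.events & EPOLLET) test fails, so epoll_wait delivers the POLLOUT notification to the user only once (because the fd never returns to rdllist), and subsequent epoll_wait calls report no events until the device wakes its wait queue again.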
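The difference is easy to observe from user space. The sketch below is an illustration added here rather than code from the original article: count_reports is a made-up name, and it uses a readable pipe and EPOLLIN instead of the writable socket and POLLOUT from the example above, purely because a pipe is simpler to set up. The fd is registered while one unread byte is pending, and epoll_wait is then called several times without ever reading that byte.

```c
#include <sys/epoll.h>
#include <unistd.h>

/*
 * Registers the read end of a pipe that already holds one unread byte,
 * then calls epoll_wait `rounds` times (timeout 0) without reading.
 * Returns how many of those calls reported the fd, or -1 on setup failure.
 * Pass 0 for level-triggered behaviour or EPOLLET for edge-triggered.
 */
int count_reports(unsigned int extra_flags, int rounds)
{
    int p[2], hits = 0;
    struct epoll_event ev = { .events = EPOLLIN | extra_flags }, out = { 0 };
    int epfd = epoll_create1(0);

    if (epfd < 0)
        return -1;
    if (pipe(p) < 0) {
        close(epfd);
        return -1;
    }
    ev.data.fd = p[0];

    if (write(p[1], "x", 1) != 1 ||
        epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) < 0) {
        close(p[0]);
        close(p[1]);
        close(epfd);
        return -1;
    }

    for (int i = 0; i < rounds; i++)
        if (epoll_wait(epfd, &out, 1, 0) == 1)
            hits++;                 /* the fd was on the ready list */

    close(p[0]);
    close(p[1]);
    close(epfd);
    return hits;
}
```

With level-triggered registration the fd is reported on every call, because it keeps being put back on the ready list while the data is unread; with EPOLLET it is reported once and then stays silent until new data arrives.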
