epoll之前是select
PPC(Process per connection)和TPC(thread per connection)是進程和線程被相繼提出來之后可都,面對高并發(fā)問題時首先被提出的方案赞弥,也是相對自然的IO方案屹篓,但是隨著io并發(fā)數(shù)量的提高逛薇,對內(nèi)存的高消耗和創(chuàng)建銷毀時cpu造成的效率損失窖贤,這種方案的適應(yīng)性受到質(zhì)疑产雹。于是新的io管理策略被提出來陋率,也就是io多路復用教翩,所謂的多路復用指的是用一個進程或者一個線程來同時管理多路IO(socket),實現(xiàn)一個高并發(fā)的管理策略齐鲤,而select就是linux中的IO多路復用方案:
select的實現(xiàn)
select的實現(xiàn)是通過一個select系統(tǒng)調(diào)用和多個宏指令完成io管理過程的斥废,其實現(xiàn)過程很精巧,接下來我們從源碼角度分析一下該模型的優(yōu)勢和存在的問題给郊。
關(guān)鍵詞:最大連接受限牡肉,select系統(tǒng)調(diào)用,線性輪詢淆九,內(nèi)存拷貝
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int maxfdp, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);
/****
select系統(tǒng)調(diào)用有五個參數(shù)
maxfdp-->返回就緒隊列的fd的最大數(shù)量统锤,也就是進程當前監(jiān)聽的socket的數(shù)量+1,后面會介紹為什么要加一炭庙;
readset-->等待讀入的fds饲窿,傳入監(jiān)聽的fds和返回就緒的fds
writeset-->等待寫入的fds,同上
exceptset-->異常等待的事件
timeout-->輪詢的時間(unit:ms)焕蹄,如果設(shè)置成null逾雄,如果沒有就緒的fds就一直等待
select所有過程都是通過這一個系統(tǒng)調(diào)用來實現(xiàn)的,所以需要在用戶態(tài)和內(nèi)核態(tài)之間不斷的復制數(shù)據(jù)腻脏,并且僅僅維護一個監(jiān)聽io線性鏈表鸦泳,內(nèi)部需要不斷的輪詢尋找就緒的socket,檢查io的狀態(tài)永品,注意和偽IO的區(qū)別做鹰,給fd設(shè)置回調(diào)函數(shù),以設(shè)置就緒io
****/
我們需要重點關(guān)注一下fd_set數(shù)據(jù)類型鼎姐,該數(shù)據(jù)類型表征監(jiān)聽的文件描述符狀態(tài)的位圖钾麸,其中最大連接數(shù)量受限于sizeof(fd_set), 在當前內(nèi)核中是1024掉弛;
源碼剖析
1. 準備工作
#include <sys/select.h>
//fd_set-->關(guān)鍵數(shù)據(jù)類型,用來表征文件描述符的活動狀態(tài)
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
//在/usr/include/bits/typesizes.h指定為1024喂走;
#define FD_SETSIZE _FD_SETSIZE
//幾個用于操作fd_set的宏
//將各位清零
void FD_CLR(int fd, fd_set *set);
//判斷當前描述符是否置位
int FD_ISSET(int fd, fd_set *set);
//將當前描述符置位
void FD_SET(int fd, fd_set *set);
//取出所有為零的位
void FD_ZERO(fd_set *set);
2. 主調(diào)函數(shù)
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
{
struct timespec end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
//將timeout從用戶態(tài)復制進內(nèi)核態(tài),并計算出絕對時間
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
//核心例程------------------>>>>>>>>>>
ret = core_sys_select(n, inp, outp, exp, to);
//
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
/**轉(zhuǎn)向core_sys_select的調(diào)用
*
*
**/
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec *end_time)
{
fd_set_bits fds;//用于維護六種fds的位圖谋作,
void *bits;
int ret, max_fds;
unsigned int size;
struct fdtable *fdt;//文件描述符表
/* Allocate small arguments on the stack to save memory and be faster */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
if (n < 0)
goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
rcu_read_lock();//rcu鎖的使用芋肠,適合一寫多讀,讀鎖很輕量級遵蚜,僅僅關(guān)閉搶占即可
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
size = FDS_BYTES(n);//將最大文件數(shù)需要的位圖轉(zhuǎn)化為以字節(jié)為單位的大小
bits = stack_fds;//棧上開辟空間的首地址
//棧上空間如果不夠帖池,需要到slab內(nèi)存里面額外分配
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc,kmalloc在 */
ret = -ENOMEM吭净;
bits = kmalloc(6 * size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time);
if (ret < 0)
goto out;
if (!ret) {
ret = -ERESTARTNOHAND;
if (signal_pending(current))
goto out;
ret = 0;
}
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
out:
if (bits != stack_fds)
kfree(bits);
out_nofds:
return ret;
}
fds中的各個成員分別按照最大fds的數(shù)量來獲取位圖在棧上的位置睡汹,并通過get_fd_set()將用戶空間的位圖復制到內(nèi)核態(tài),然后將返回緩沖區(qū)置零寂殉,等待設(shè)置囚巴,接下來最重要的就是do_select函數(shù),該函數(shù)將不斷輪詢各fds的狀態(tài)(通過file->poll())友扰,將就緒的fd加入到返回隊列中彤叉,并同時檢查等待時間,這里是通過被動檢查的方式獲取fd上的io事件村怪,也就是poll函數(shù)的返回值秽浇。注意最后函數(shù)返回的時候,也需要使用用戶傳進來的地址對返回的內(nèi)容進行裝載甚负,也就是這里的set_fd_set操作柬焕;
/**轉(zhuǎn)向do_select的調(diào)用
*
*
**/
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
unsigned long slack = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);//根據(jù)給定的文件描述符表修正n的值
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);//初始化文件等待隊列
wait = &table.pt;//將文件等待隊列和打開文件表關(guān)聯(lián)起來
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);//時間修正算法
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
//獲取每一sizeof(unsigned long)32位長度的位圖
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
//處理每一long的位圖
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
//這里可以看出來是從第一位開始,所以需要加一
if (!(bit & all_bits))
continue;
f = fdget(i);//i是從外循環(huán)計數(shù)的梭域,所以這里i就是文件描述符
if (f.file) {
const struct file_operations *f_op;
f_op = f.file->f_op;
mask = DEFAULT_POLLMASK;
//檢查看看這里的文件對象支不支持poll選項
if (f_op && f_op->poll) {
wait_key_set(wait, in, out, bit);
//將文件注冊入相應(yīng)的等待隊列斑举,如果文件已經(jīng)注冊,則只獲取并獲取狀態(tài)
mask = (*f_op->poll)(f.file, wait);
}
fdput(f);
//將就緒的文件描述符加入到用戶態(tài)隊列中
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
}
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
//如果有事件準備就緒病涨,等待時間用完懂昂,當前進程有掛起信號則返回
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec_to_ktime(*end_time);
to = &expire;
}
//時間片沒有用完,則重新獲取等待時間
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
這個函數(shù)實現(xiàn)整個輪詢現(xiàn)場没宾,首先外層的無限循環(huán)構(gòu)成整個wait事件到來的主體凌彬,內(nèi)部兩個循環(huán)是用來分片檢查各個fd對應(yīng)事件是否到來(就緒?)循衰,并同時檢查等待時間铲敛,一旦有事件到來或者時間用完就直接返回。內(nèi)核對用戶態(tài)采取絕不信任的原則会钝,所以內(nèi)核不會直接使用用戶傳進來的地址值伐蒋,而是使用相應(yīng)的函數(shù)將用戶態(tài)內(nèi)容復制到內(nèi)核態(tài)工三,同樣的,返回給用戶態(tài)時就要進行一步將就緒的fds位圖按值復制給用戶態(tài)(絕不信任意味著先鱼,用戶態(tài)的地址不能在內(nèi)核態(tài)使用(共享內(nèi)存除外)俭正,一切傳入和返回必須按值拷貝,這樣就增加了系統(tǒng)的負擔焙畔,所以就出現(xiàn)了后來的各種零拷貝技術(shù))
至此我們的select源碼部分就分析的差不多了掸读,接下來我們做個簡短的總結(jié)
- select依賴于fd_set這個結(jié)構(gòu)體,而支持的連接數(shù)量受限于系統(tǒng)的__FD_SETBITS參數(shù)宏多,一般為1024儿惫,當然這種位圖的實現(xiàn)就降低了用戶態(tài)與內(nèi)核態(tài)之間的拷貝開銷;
- select的實現(xiàn)依賴于內(nèi)存在內(nèi)核和用戶空間之間的拷貝伸但,需要支持相應(yīng)的開銷肾请,當連接數(shù)增加時這種開銷會變得難以負擔;
- select使用主動輪詢的方式來獲取所有被監(jiān)聽io上就緒的事件更胖,即輪詢的代價隨著連接數(shù)量的增長呈線性增加
select有效的支持了高并發(fā)請求铛铁,并使得單線程或者單進程能夠同時管理監(jiān)聽多個網(wǎng)絡(luò)io,而之前都是以多線程的方式支持這種多連接却妨。這大大提高了編程的效率避归,和系統(tǒng)性能的可擴展性,但是由于上述的三個主要問題的存在管呵,隨著網(wǎng)絡(luò)io的爆發(fā)式增長梳毙,這種io管理方式仍然無法有效的適應(yīng),所有了一個改進版poll
poll
poll是對select的一個簡單的改進版本捐下,性能上不存在明顯的差異账锹,只是有效的解決了io數(shù)量受限的問題,接下來我們了解一下poll是如何改進這個問題的坷襟。
poll重新定義了一個用于存放fd的結(jié)構(gòu)體奸柬,而不是使用fd_set這樣的位圖結(jié)構(gòu)
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 實際發(fā)生了的事件 */
} ;
typedef unsigned long nfds_t;
- struct pollfd * fds:是一個struct pollfd結(jié)構(gòu)類型的數(shù)組,用于存放需要檢測其狀態(tài)的socket描述符婴程;每當調(diào)用這個函數(shù)之后廓奕,系統(tǒng)不需要清空這個數(shù)組,操作起來比較方便档叔;特別是對于 socket連接比較多的情況下桌粉,在一定程度上可以提高處理的效率;這一點與select()函數(shù)不同衙四,調(diào)用select()函數(shù)之后铃肯,select() 函數(shù)需要清空它所檢測的socket描述符集合,導致每次調(diào)用select()之前都必須把socket描述符重新加入到待檢測的集合中传蹈;因此押逼,select()函數(shù)適合于只檢測少量socket描述符的情況步藕,而poll()函數(shù)適合于大量socket描述符的情況;
- 如果待檢測的socket描述符為負值挑格,則對這個描述符的檢測就會被忽略咙冗,也就是不會對成員變量events進行檢測,在events上注冊的事件也會被忽略漂彤,poll()函數(shù)返回的時候雾消,會把成員變量revents設(shè)置為0,表示沒有事件發(fā)生显歧;
1. 合法的事件如下:
- POLLIN 有數(shù)據(jù)可讀。
- POLLRDNORM 有普通數(shù)據(jù)可讀确镊。
- POLLRDBAND 有優(yōu)先數(shù)據(jù)可讀士骤。
- POLLPRI 有緊迫數(shù)據(jù)可讀。
- POLLOUT 寫數(shù)據(jù)不會導致阻塞蕾域。
- POLLWRNORM 寫普通數(shù)據(jù)不會導致阻塞拷肌。
- POLLWRBAND 寫優(yōu)先數(shù)據(jù)不會導致阻塞。
- POLLMSG SIGPOLL 消息可用旨巷。
此外巨缘,revents域中還可能返回下列事件:
- POLLER 指定的文件描述符發(fā)生錯誤;
- POLLHUP 指定的文件描述符掛起事件采呐。
- POLLNVAL 指定的文件描述符非法若锁。
這些事件在events域中無意義,因為它們在合適的時候總是會從revents中返回斧吐。使用poll()和select()不一樣又固,你不需要顯式地請求異常情況報告。
POLLIN | POLLPRI等價于select()的讀事件煤率,
POLLOUT |POLLWRBAND等價于select()的寫事件仰冠。
POLLIN等價于POLLRDNORM |POLLRDBAND,
而POLLOUT則等價于POLLWRNORM
如果是對一個描述符上的多個事件感興趣的話蝶糯,可以把這些常量標記之間進行按位或運算就可以了洋只;
比如:對socket描述符fd上的讀、寫昼捍、異常事件感興趣识虚,就可以這樣做:
struct pollfd fds;
fds[nIndex].events=POLLIN | POLLOUT | POLLERR;
當 poll()函數(shù)返回時妒茬,要判斷所檢測的socket描述符上發(fā)生的事件舷礼,可以這樣做:
- 檢測可讀TCP連接請求:if((fds[nIndex].revents & POLLIN) == POLLIN){//接收數(shù)據(jù)/調(diào)用accept()接收連接請求}
- 檢測可寫:
if((fds[nIndex].revents & POLLOUT) == POLLOUT){//發(fā)送數(shù)據(jù)}- 檢測異常:
if((fds[nIndex].revents & POLLERR) == POLLERR){//異常處理}
nfds_t nfds:用于標記數(shù)組fds中的結(jié)構(gòu)體元素的總數(shù)量;
timeout:是poll函數(shù)調(diào)用阻塞的時間郊闯,單位:毫秒
如果timeout==0妻献,那么 poll() 函數(shù)立即返回而不阻塞蛛株,
如果timeout==INFTIM,那么poll() 函數(shù)會一直阻塞下去育拨,直到所檢測的socket描述符上的感興趣的事件發(fā) 生是才返回谨履,如果感興趣的事件永遠不發(fā)生,那么poll()就會永遠阻塞下去熬丧;
2. 返回值:
大于0:數(shù)組fds中準備好讀笋粟、寫或出錯狀態(tài)的那些socket描述符的總數(shù)量;
等于0:數(shù)組fds中沒有任何socket描述符準備好讀析蝴、寫害捕,或出錯;此時poll超時闷畸,超時時間是timeout毫秒尝盼;換句話說,如果所檢測的 socket描述符上沒有任何事件發(fā)生的話佑菩,那么poll()函數(shù)會阻塞timeout所指定的毫秒時間長度之后返回盾沫,
-1: poll函數(shù)調(diào)用失敗,同時會自動設(shè)置全局變量errno殿漠;errno為下列值之一:
3. 錯誤代碼
EBADF: 一個或多個結(jié)構(gòu)體中指定的文件描述符無效赴精。
EFAULTfds : 指針指向的地址超出進程的地址空間。
EINTR : 請求的事件之前產(chǎn)生一個信號绞幌,調(diào)用可以重新發(fā)起蕾哟。
EINVALnfds : 參數(shù)超出PLIMIT_NOFILE值。
ENOMEM : 可用內(nèi)存不足莲蜘,無法完成請求渐苏。
4. 實現(xiàn)機制
poll是一個系統(tǒng)調(diào)用,其內(nèi)核入口函數(shù)為sys_poll菇夸,sys_poll幾乎不做任何處理直接調(diào)用do_sys_poll琼富,do_sys_poll的執(zhí)行過程可以分為三個部分:
- 將用戶傳入的pollfd數(shù)組拷貝到內(nèi)核空間,因此拷貝操作和數(shù)組長度相關(guān)庄新,時間上這是一個O(n)操作鞠眉,這一步的代碼在do_sys_poll中包括從函數(shù)開始到調(diào)用do_poll前的部分。
- 查詢每個文件描述符對應(yīng)設(shè)備的狀態(tài)择诈,如果該設(shè)備尚未就緒械蹋,則在該設(shè)備的等待隊列中加入一項并繼續(xù)查詢下一設(shè)備的狀態(tài)。查詢完所有設(shè)備后如果沒有一個設(shè)備就緒羞芍,這時則需要掛起當前進程等待哗戈,直到設(shè)備就緒或者超時,掛起操作是通過調(diào)用schedule_timeout執(zhí)行的荷科。設(shè)備就緒后進程被通知繼續(xù)運行唯咬,這時再次遍歷所有設(shè)備纱注,以查找就緒設(shè)備。這一步因為兩次遍歷所有設(shè)備胆胰,時間復雜度也是O(n)狞贱,這里面不包括等待時間。相關(guān)代碼在do_poll函數(shù)中蜀涨。
- 將獲得的數(shù)據(jù)傳送到用戶空間并執(zhí)行釋放內(nèi)存和剝離等待隊列等善后工作瞎嬉,向用戶空間拷貝數(shù)據(jù)與剝離等待隊列等操作的的時間復雜度同樣是O(n),具體代碼包括do_sys_poll函數(shù)中調(diào)用do_poll后到結(jié)束的部分厚柳。
5. 注意事項
- poll() 函數(shù)不會受到socket描述符上的O_NDELAY標記和O_NONBLOCK標記的影響和制約氧枣,也就是說,不管socket是阻塞的還是非阻塞 的别垮,poll()函數(shù)都不會受到影響便监;
- poll()函數(shù)則只有個別的的操作系統(tǒng)提供支持(如:SunOS、Solaris宰闰、AIX茬贵、HP提供 支持簿透,但是Linux不提供支持)移袍,可移植性差;
接下來我們看看epoll是如何解決select和poll是仍然存在的問題
content
- epoll的概念
- 和epoll相關(guān)的結(jié)構(gòu)體
- epoll_create
- epoll_ctl
- epoll_wait
1. epoll的概念
epoll 全稱 eventpoll老充,是 linux 內(nèi)核2.6以后IO多路復用(IO multiplexing)的一個改進版實現(xiàn)葡盗,顧名思義,我們可以理解epoll是一種基于事件被動響應(yīng)機制的io管理方案啡浊。IO多路復用的意思是在一個操作里同時監(jiān)聽多個輸入輸出源觅够,在其中一個或多個輸入輸出源可用的時候返回,然后對其的進行讀寫操作巷嚣。在select和poll的基礎(chǔ)上喘先,epoll主要在以下幾個方面做出了改進:
- 更高級的內(nèi)存機制(slab分配,內(nèi)存共享)避免了內(nèi)核和用戶之間的數(shù)據(jù)拷貝廷粒;
- 被動響應(yīng)diss主動輪詢窘拯;
- 數(shù)據(jù)結(jié)構(gòu)級的性能優(yōu)化(紅黑樹提高查詢效率,就緒鏈表存儲就緒時間坝茎,中間鏈表用于存儲epoll_wait返回后產(chǎn)生的就緒事件)
2. epoll相關(guān)的結(jié)構(gòu)體
- 大總管eventpoll
// epoll的核心實現(xiàn)對應(yīng)于一個epoll描述符
struct eventpoll {
spinlock_t lock; //自旋鎖涤姊,用于進程間同步,忙等
struct mutex mtx; //互斥鎖嗤放,用于臨界區(qū)互斥訪問思喊,會pending,c++里面會配合條件變量使用
wait_queue_head_t wq; // epoll_wait中pending的進程就被加入到該等待隊列里面
wait_queue_head_t poll_wait; // f_op->poll()時次酌,監(jiān)聽的fds事件在這里等待
struct list_head rdllist; //已就緒的epitem 列表恨课,將它與未就緒的事件分開舆乔,有助于快速給epoll_wait做出響應(yīng)
struct rb_root rbr; //保存所有加入到當前epoll的文件對應(yīng)的epitem,快速查詢
struct epitem *ovflist; // 當正在向用戶空間復制數(shù)據(jù)時, 產(chǎn)生的就緒事件庄呈,所以在下一次epoll_wait到來之時需要將該鏈表中的數(shù)據(jù)加入到返回隊列中去蜕煌;
struct user_struct *user;
struct file *file; //該文件描述符會被映射至一個匿名文件,用于建立自己的文件系統(tǒng)
int visited;
struct list_head visited_list_link; //用于避免重復查詢诬留,深度遞歸查詢斜纪,優(yōu)化查詢結(jié)構(gòu)
}
epoll_create就是用于創(chuàng)建該結(jié)構(gòu)體并獲得一個指針epfd
- 普通員工epitem
struct epitem {
struct rb_node rbn;
struct list_head rdllink;
struct epitem *next;
struct epoll_filefd ffd; //文件描述符fd+file信息,公共構(gòu)成了紅黑樹節(jié)點的key
int nwait; //被掛載的poll_wait數(shù)量
struct list_head pwqlist; //一個文件可能被監(jiān)聽多個事件文兑,所以需要用鏈表把這多個事件所在的wait_queue串起來
// 當前epitem 的所有者
struct eventpoll *ep; //指向父節(jié)點
struct list_head fllink; //文件鏈表啥的盒刚,沒搞懂暫時
struct epoll_event event; //ctl傳入的文件事件,兩個域绿贞,第一個域是events因块,第二個域data是一個聯(lián)合體,但是一般情況下我們使用void *ptr指向用戶創(chuàng)建的結(jié)構(gòu)體籍铁,便于內(nèi)核給用戶返回數(shù)據(jù)用
};
- 被操作的實體eppoll_entry
// 與一個文件上的一個wait_queue_head 相關(guān)聯(lián)涡上,因為同一文件可能有多個等待的事件,
//這些事件可能使用不同的等待隊列
struct eppoll_entry {
struct list_head llink; //pwq_list的節(jié)點
struct epitem *base; //父節(jié)點
wait_queue_t wait; //wait_queue中的節(jié)點
wait_queue_head_t *whead; //wait_queue的頭結(jié)點
};
這些結(jié)構(gòu)體的設(shè)計很精巧拒名,既賦予epoll足夠的io管理能力(fd的多事件等待隊列吩愧,用于管理io的紅黑樹,用于轉(zhuǎn)載就緒事件的鏈表)增显,同時也為用戶提供了便利的數(shù)據(jù)傳遞接口(poll_event便于就緒隊列的返回雁佳,epoll_ctl接口使得內(nèi)核和用戶態(tài)成功解耦)。他們從某種程度上相當于重建了一個epoll文件系統(tǒng)同云,其中的目錄項(epitem)和文件節(jié)點(eppoll_entry)均在slab高速cache中分配糖权;
2. epoll_create
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* 持久化設(shè)置 */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);//分配相應(yīng)的結(jié)構(gòu)體
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));//創(chuàng)建匿名文件inode
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);//將file和fd連接起來
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
epoll_create里面主要創(chuàng)建event_poll和其內(nèi)部數(shù)據(jù),并構(gòu)建一個匿名文件用于映射當前的文件系統(tǒng)炸站,便于將加入進來的fd裝載進這樣的文件系統(tǒng)星澳,并返回匿名文件的文件描述符旱易;注意:文件對象(匿名)屬于當前進程內(nèi)核數(shù)據(jù)届垫,只能被內(nèi)核態(tài)訪問,用戶態(tài)訪問必須使用相應(yīng)接口,這里借助文件系統(tǒng)的思維解耦了用戶應(yīng)用層和內(nèi)核底層。
3. epoll_ctl
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
/* Get the "struct file *" for the target file */
tfile = fget(fd);
if (!tfile)
goto error_fput;
/* The target file descriptor must support poll */
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll)
goto error_tgt_fput;
/* Check if EPOLLWAKEUP is allowed */
if ((epds.events & EPOLLWAKEUP) && !capable(CAP_BLOCK_SUSPEND))
epds.events &= ~EPOLLWAKEUP;
/*
* We have to check that the file structure underneath the file descriptor
* the user passed to us _is_ an eventpoll file. And also we do not permit
* adding an epoll file descriptor inside itself.
*/
error = -EINVAL;
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = file->private_data;
/*
* When we insert an epoll file descriptor, inside another epoll file
* descriptor, there is the change of creating closed loops, which are
* better be handled here, than in more critical paths. While we are
* checking for loops we also determine the list of files reachable
* and hang them on the tfile_check_list, so we can check that we
* haven't created too many possible wakeup paths.
*
* We need to hold the epmutex across both ep_insert and ep_remove
* b/c we want to make sure we are looking at a coherent view of
* epoll network.
*/
if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {
mutex_lock(&epmutex);
did_lock_epmutex = 1;
}
if (op == EPOLL_CTL_ADD) {
if (is_file_epoll(tfile)) {
error = -ELOOP;
if (ep_loop_check(ep, tfile) != 0) {
clear_tfile_check_list();
goto error_tgt_fput;
}
} else
list_add(&tfile->f_tfile_llink, &tfile_check_list);
}
mutex_lock_nested(&ep->mtx, 0);
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (did_lock_epmutex)
mutex_unlock(&epmutex);
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
epoll_ctl主要干以下幾件事
- 檢查動作是否存在,將用戶空間的epoll_event復制進內(nèi)核空間,注意event的data結(jié)是一個指向用戶態(tài)的指針,不可信任完残;
- 根據(jù)傳進來的fd獲取文件對象扎拣,然后判斷當前文件對象是否支持poll操作;
- 檢查文件對象和當前epoll指向的匿名文件是否相同,避免循環(huán)遞歸;
- 檢查當前文件是否被重復訪問(只在當前ctl接口被訪問時),之前的check_list被使用到(這種情況什么時候會發(fā)生??);
- 加鎖訪問紅黑樹,使用fd和file指針;
- 根據(jù)操作類型選擇epoll接口细睡,ep_insert,ep_modify,ep_remove;
- 然后根據(jù)新加入的fd事件構(gòu)建epitem節(jié)點蠢壹,并將其加入到紅黑樹, 注冊初始化poll回調(diào)函數(shù)指針ep_ptable_queue_proc(這里只需要初始化一次疏日,而select需要多次初始化)宾肺,該回調(diào)函數(shù)進一步將epitem加入到相應(yīng)的等待隊列上,然后給給其注冊一個回調(diào)函數(shù)ep_poll_callback隘谣,用于有事件到來的時候?qū)⑵涮砑舆M就緒隊列增拥,并給epoll_wait所在的等待隊列發(fā)送wakeup通知,喚醒所有等待就緒事件的epoll_wait;
4. epoll_wait
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
////
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
epoll_wait做那幾件事
epoll_wait-->ep_poll中等待檢查就緒隊列,睡眠跪者,喚醒--->ep_send_events--->ep_scan_ready_list棵帽,將rdlist數(shù)據(jù)拷貝到txlist(用于用戶數(shù)據(jù)的拷貝),清空rdlist渣玲,準備ovflist --->ep_send_events_proc, 將txlist數(shù)據(jù)拷貝到用戶空間逗概,并*檢查是否是LT模式,如果是將event重新加入到rdlist--->并將ovflist在這一過程中收到的數(shù)據(jù)加入到rdlist中忘衍,供下次訪問;
spin_lock_irqsave(&ep->lock, flags);
/* 這一步要注意, 首先, 所有監(jiān)聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉(zhuǎn)移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經(jīng)被清空了! */
list_splice_init(&ep->rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 如果調(diào)用ep_poll_callback()函
*數(shù)的時候發(fā)現(xiàn)epoll對象eventpoll的ovflist成員不等于EP_UNACTIVE_PTR
*的話逾苫,說明此時正在掃描rdllist鏈表,這個時候會將就緒事件對應(yīng)的epitem
*對象加入到ovflist鏈表暫存起來枚钓,等rdllist鏈表掃描完之后在將ovflist鏈表中
*的內(nèi)容移動到rdllist鏈表中保存后下次再處理... */
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
- 如果未指定超時直接跳轉(zhuǎn)到check_events铅搓,通常用于非阻塞fd。指定超時搀捷,走正常流程fetch_events:將當前進程掛在等待隊列睡眠星掰,當相應(yīng)待監(jiān)聽事件就緒時會有回調(diào)ep_poll_callback喚醒。喚醒時調(diào)用ep_events_available檢查就緒鏈表嫩舟,不像select每次都需要輪詢氢烘,這里epoll只需要檢查下就緒鏈表是否為空(時間復雜度O(1))。ep_send_events調(diào)用ep_scan_ready_list執(zhí)行回調(diào)ep_read_events_proc遍歷就緒鏈表并傳入用戶空間以及回調(diào)執(zhí)行期間發(fā)生的事件通過eventpoll的ovflist(回調(diào)執(zhí)行前置空)將就緒fd去重插入就緒鏈表以便下一次epoll_wait調(diào)用處理家厌,這里還涉及ET和LT模式下的不同處理播玖,暫時不作分
總結(jié):
epoll使用匿名文件映射的方式實現(xiàn)了用戶態(tài)和內(nèi)核態(tài)高效的消息傳遞
使用紅黑樹提高查詢效率,使用就緒鏈表存儲需要返回給用戶態(tài)的fds饭于,這個數(shù)量相對較小蜀踏,所以拷貝帶來的開銷就相應(yīng)降低
使用AIO的方式給每一個監(jiān)聽事件注冊一個ep_poll_callback的回調(diào)函數(shù),該函數(shù)主動將就緒事件加入到就緒隊列掰吕,無需epoll主動輪詢果覆,是epoll_wait能夠快速返回
references:
https://blog.csdn.net/baiye_xing/article/details/76352935
https://tqr.ink/2017/10/05/implementation-of-epoll/