為了防止服務(wù)器accept()后阻塞等客戶端連接或阻塞等客戶端的寫入, 將這個工作交給內(nèi)核去做, 以提高效率, 這叫做多路I/O復(fù)用模型.
select()
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
先聲明fd_set readfds;
, 然后把文件集作為傳入傳出參數(shù)傳入, 當select()返回時readfds的位圖可能已發(fā)生變化(客戶端寫入完畢要讀取了), 需挨個用FD_ISSET()判斷fd是否仍在readfds中.
參數(shù)1: nfds, 代表所有監(jiān)聽的文件描述符中, 最大的文件描述符+1
參數(shù)2: readfds, 所監(jiān)聽的文件描述符"可讀"事件
參數(shù)3: writefds, 所監(jiān)聽的文件描述符"可寫"事件
參數(shù)4: exceptfds, 所監(jiān)聽的文件描述符"異常"事件
參數(shù)5: 設(shè)置未發(fā)生改變時最大等待時間
返回值: 成功: 所監(jiān)聽的所有監(jiān)聽集合中, 滿足條件的總數(shù)(讀+寫+異常), 只有監(jiān)聽的集合至少有一個滿足了才會返回; 失敗返回-1設(shè)置errno.
有4個函數(shù)控制fd_set(二進制位圖類型)的狀態(tài):
void FD_ZERO(fd_set *set);
將set清空為0
void FD_SET(int fd, fd_set *set);
將fd設(shè)置到set集合中, 對應(yīng)位置為1
void FD_CLR(int fd, fd_set *set);
將fd從set中清除出去, 對應(yīng)位置為0
int FD_ISSET(int fd, fd_set *set);
判斷fd是否在集合中
在socket未建立連接時, 只需要監(jiān)聽初始listen_sockfd的讀事件(讀建立連接請求), 當接收到建立連接的請求后, 服務(wù)器再調(diào)用accept()處理(無需阻塞), 同時返回新的sockfd監(jiān)聽讀寫事件.
select缺點:
1.select受文件描述符數(shù)量限制, 只能監(jiān)聽小于1024個客戶端. (改ulimit -a都沒用)
2.只能挨個fd循環(huán)檢查, 當fd間隔過大時, 過于耗時. 需要維護一個數(shù)組存放要監(jiān)聽的fd.
3.fd_set作為傳入傳出參數(shù), 會被select修改, 導(dǎo)致用戶設(shè)置的初始監(jiān)聽狀態(tài)不保存, 需要手動備份一下該fd_set.
在每次調(diào)用select()之前, 先把包含listenfd和所有當前客戶端fd的allset復(fù)制給readset, 之后readset作為select()的傳入傳出參數(shù), 如果某個客戶端fd沒有寫入則自動將其從readset中移除, 并返回所有fd_set中發(fā)生變化的數(shù)量nready. 之后, 對于listen_fd要特意檢查, 如果有新客戶端建立連接(readset中l(wèi)istenfd已發(fā)生變化), 將其fd添加到allset和自定義數(shù)組(初始值全為-1)中以待下次監(jiān)聽.
之后, 處理完listenfd新建立的連接后, 挨個檢查readset中每一個fd是否仍存在(即已發(fā)生變化), 并對變化的進行read()操作. 此時的readset是上一次的allset, 不包含新添加的連接, 所以是逐個比較判斷上次建立過的連接這次是否發(fā)生變化(寫入).
poll()
poll()相對于select()的改進:
1.修改ulimit -a突破1024個文件描述符限制
2.監(jiān)聽和返回文件集分離, 不用再復(fù)用傳入傳出參數(shù)
3.傳入?yún)?shù)為事件數(shù)組, 可以縮小遍歷范圍, 不用全遍歷一遍
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
&fds是結(jié)構(gòu)體數(shù)組首地址, nfds是數(shù)組元素數(shù)量, timeout傳-1阻塞等/0立即返回/大于0等待指定秒數(shù). 返回值同select()仍是發(fā)生變化的數(shù)量, 如果想同時監(jiān)聽一個fd的讀和寫就把數(shù)組兩個結(jié)構(gòu)體元素的fd賦值成同一個fd.
struct pollfd fds[5000]; // 修改ulimit -a突破1024
fds[0].fd = listenfd;
fds[0].events = POLLIN/POLLOUT/POLLERR
fds[0].revents = 0 // 由系統(tǒng)自動返回, 可以不用初始化
fds[1].fd = fd1;
fds[1].events = POLLIN/POLLOUT/POLLERR
poll(fds, 5, -1) // 傳入數(shù)量是數(shù)組實際初始化過的數(shù)量, 不是最大數(shù)量
使用if(fds[i].revents & POLLIN)
判斷返回事件是否符合條件.
如果read()返回-1, 可能是服務(wù)器斷了(收到SYN沒返回ACK), 也可能是客戶端斷了. 使用errno==ECONNRESET
判斷是否是服務(wù)器沒有返回ACK, 如果服務(wù)端斷了要把sockfd關(guān)閉, 并把數(shù)組中fd置為-1, 重新三次握手. 如果客戶端斷了直接perror, exit()退出.
epoll()
epoll相對于poll的改進:
1.無需遍歷整個event數(shù)組, 只需要遍歷活躍的事件數(shù)組, 甚至不用遍歷(只監(jiān)聽1個), 直接回調(diào).
2.可以直接使用回調(diào)函數(shù)加速處理
3.內(nèi)部的mmap與用戶空間交換速率更高
4.ET搭配非阻塞I/O效率比LT更高
在大量連接數(shù)和少量監(jiān)聽數(shù)的情況下, 使用epoll()才有效果. 如果3-1023的大部分都監(jiān)聽, 和select()差不多.
malloc()和mmap()底層都是調(diào)linux的kmalloc().
int epoll_create(int num)
的參數(shù)只是建議值, 表示要監(jiān)聽的文件描述符個數(shù)(紅黑樹節(jié)點數(shù)), 返回紅黑樹根節(jié)點為epfd(第一個可用的fd). 紅黑樹是平衡二叉樹一種, 其左右子樹高度差小于1, 使用二分查找最快.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第一個參數(shù)是epoll_create()返回的epfd根節(jié)點, 第二個參數(shù)是要采取的行動, 有EPOLL_CTL_ADD
, EPOLL_CTL_MOD
, EPOLL_CTL_DEL
三種. 第三個參數(shù)表示要操作的文件描述符, 第四個參數(shù)的結(jié)構(gòu)體有事件類型和union聯(lián)合體(指針/數(shù)據(jù))兩種成員, 事件類型有EPOLLIN/EPOLLOUT/EPOLLERR, 聯(lián)合體內(nèi)的fd和第三個參數(shù)傳一樣的值. 聯(lián)合體內(nèi)的泛型指針也可以傳包含函數(shù)指針的結(jié)構(gòu)體指針進去, 這樣就可以直接調(diào)用事件的處理函數(shù)(回調(diào)函數(shù)).
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
這里的events是一個數(shù)組, 傳出參數(shù), 表示滿足條件的fd. maxevents是數(shù)組大小, timeout同上-1阻塞/0非阻塞/等待毫秒數(shù). 成功返回有多少個fd符合條件.
int epfd = epoll_create(100);
struct epoll_event events;
events.events = EPOLLIN; // LT水平觸發(fā)(默認)
// events.events = EPOLLIN | EPOLLET; // ET邊沿觸發(fā)
event.data.fd = lfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &events);
struct epoll_event events[100];
epoll_wait(epfd, events, 100, -1);
解決ET一次讀不完的缺點, 需要改變epoll為非阻塞等待: 使用fcntl()修改套接字屬性為非阻塞, 然后輪詢的讀(while(len = read() > 0)), 這樣read()不阻塞還只調(diào)用了一次epoll_wait, 優(yōu)于LT. 否則即使LT不阻塞, 但沒讀完也要重新調(diào)一次epoll_wait()
flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
epoll反應(yīng)堆模型
反應(yīng)堆模型防止client不能寫的情況下還去寫, 造成服務(wù)器阻塞, 比如滑動窗口滿了的狀態(tài)或者client意外關(guān)閉(TCP會傳RST給server).
普通流程:
epoll_wait() --- 服務(wù)器 --- 監(jiān)聽 --- lfd/cfd --- 可讀事件 --- epoll_wait()返回 --- read --- 小寫轉(zhuǎn)大寫 --- write --- epoll_wait()繼續(xù)監(jiān)聽
反應(yīng)堆流程:
epoll_wait() --- 服務(wù)器 --- 監(jiān)聽 --- lfd/cfd --- 可讀事件 --- epoll_wait()返回 --- read --- cfd從紅黑樹上摘下 --- 設(shè)置監(jiān)聽cfd寫事件及回調(diào)函數(shù),]節(jié)點上樹 --- 進行操作(如小寫轉(zhuǎn)大寫) --- 等待epoll_wait()返回 --- write回寫客戶端 --- cfd從樹上摘下 --- 設(shè)置監(jiān)聽cfd讀事件及回調(diào)函數(shù) --- epoll_wait()繼續(xù)監(jiān)聽
回調(diào)函數(shù)的參數(shù)就是struct myevent_s結(jié)構(gòu)體本身, 該結(jié)構(gòu)體存儲了fd, 回調(diào)函數(shù)及其參數(shù), 最后活躍時間等各種需要用到的變量. 可以通過最后活躍時間關(guān)閉60秒內(nèi)不和服務(wù)器通信的客戶端.