在Linux中IO復(fù)用機(jī)制主要目的是為了實(shí)現(xiàn)在單進(jìn)程環(huán)境下影斑,同時(shí)監(jiān)控多個(gè)描述符(文件描述符请契,套接字描述符等)的目的。一旦監(jiān)控的描述符就緒做裙,就會(huì)通知相應(yīng)進(jìn)程并解除該進(jìn)程的阻塞狀態(tài)蜈首,使進(jìn)程能夠?qū)途w描述符進(jìn)行處理。
一欠母、什么是IO復(fù)用
UNIX有五大IO模型:
- 1欢策、異步IO(asynchronous IO)
- 2、阻塞IO(blocking IO)
- 3赏淌、非阻塞IO(nonblocking IO)
- 4踩寇、IO多路復(fù)用(IO multiplexing)
- 5、信號(hào)驅(qū)動(dòng)的IO(signal drivened IO)
1六水、異步IO
其中前四種IO統(tǒng)稱為同步IO俺孙。那么同步IO和異步IO的區(qū)別是什么呢?
在同步文件IO中掷贾,線程啟動(dòng)一個(gè)IO操作然后就立即進(jìn)入等待狀態(tài)睛榄,直到IO操作完成后才醒來繼續(xù)執(zhí)行。而異步文件IO方式中想帅,線程發(fā)送一個(gè)IO請(qǐng)求到內(nèi)核场靴,然后繼續(xù)處理其他的事情,內(nèi)核完成IO請(qǐng)求后,將會(huì)通知線程IO操作完成了旨剥。
如果某進(jìn)程有大量IO需要處理咧欣,則選擇異步IO方式可以顯著提高效率。因?yàn)檫x擇異步IO之后轨帜,進(jìn)程不必等待IO完成魄咕,即可繼續(xù)處理其他的事情,直到IO事件結(jié)束蚌父,操作系統(tǒng)會(huì)通知該進(jìn)程IO已經(jīng)結(jié)束哮兰。
2、阻塞IO
從上圖可以直觀的看出:進(jìn)程在使用
recvfrom()
系統(tǒng)調(diào)用后梢什,進(jìn)程由用戶態(tài)轉(zhuǎn)為核心態(tài)奠蹬,同時(shí)內(nèi)核檢測(cè)recvfrom()
函數(shù)對(duì)應(yīng)fd
的數(shù)據(jù)集中沒有新數(shù)據(jù)傳入,并開始等待嗡午。等待過程中整個(gè)進(jìn)程是阻塞的囤躁,不占用任何CPU資源。直到內(nèi)核發(fā)現(xiàn)有新的數(shù)據(jù)寫入對(duì)應(yīng)數(shù)據(jù)集荔睹,此時(shí)內(nèi)核就喚醒處于就緒隊(duì)列的該進(jìn)程狸演。進(jìn)程喚醒后仍處于核心態(tài),將數(shù)據(jù)集中的數(shù)據(jù)拷貝并通過recvfrom()
函數(shù)返回后僻他,此進(jìn)程才由核心態(tài)轉(zhuǎn)化為用戶態(tài)宵距,并繼續(xù)之后的工作。由上述過程可知吨拗,如果使用阻塞IO满哪,那么在等待IO就緒的過程中,整個(gè)進(jìn)程是無法進(jìn)行任何操作且不占用CPU資源的劝篷。直到IO結(jié)束哨鸭,內(nèi)核喚醒該進(jìn)程,該進(jìn)程才能繼續(xù)執(zhí)行娇妓。
3像鸡、非阻塞IO
從上圖可知:在非阻塞IO的情況下,用戶進(jìn)程調(diào)用
recvfrom()
系統(tǒng)調(diào)用后哈恰,如果IO沒有就緒只估,則不會(huì)等待IO就緒直接返回。因此如果想要等到IO結(jié)束着绷,就需要不斷的向內(nèi)核詢問IO是否完成蛔钙。上述過程可以看出,非阻塞IO的特點(diǎn)就是需要不斷向服務(wù)器詢問IO是否就緒蓬戚。
4夸楣、IO多路復(fù)用
上圖用
select
機(jī)制作為例子,如果一個(gè)進(jìn)程調(diào)用了select
,那么整個(gè)進(jìn)程都會(huì)被阻塞豫喧,直到select
所監(jiān)聽的所有fd
中石洗,出現(xiàn)完成IO的情況,進(jìn)程就會(huì)解除阻塞紧显。一個(gè)進(jìn)程通過調(diào)用
select
函數(shù)監(jiān)聽多個(gè)fd
的IO情況讲衫,就是一個(gè)典型的IO多路復(fù)用的例子。
二孵班、Select與Poll機(jī)制
1涉兽、Select機(jī)制
我們先分析一下select函數(shù)
/**
* 該函數(shù)是select函數(shù)的聲明
* @parameter: maxfdp1 指定待測(cè)試的文件描述字個(gè)數(shù),它的值是待測(cè)試的最大描述字加1篙程。
* @parameter: *readset/*writeset/*exceptset 均為fd_set類型枷畏,可以將fd_set理解為一個(gè)集合,這個(gè)集合中存放的是文件描述符虱饿。中間的三個(gè)參數(shù)指定我們要讓內(nèi)核測(cè)試讀拥诡、寫和異常條件的文件描述符集合。如果對(duì)某一個(gè)的條件不感興趣氮发,就可以把它設(shè)為空指針渴肉。
* @parameter: *timeout timeout告知內(nèi)核等待所指定文件描述符集合中的任何一個(gè)就緒可花多少時(shí)間。其timeval結(jié)構(gòu)用于指定這段時(shí)間的秒數(shù)和微秒數(shù)爽冕。
* @return: int 若有就緒描述符返回其數(shù)目仇祭,若超時(shí)則為0,若出錯(cuò)則為-1
*/
int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);
//以下是select的具體使用方法:
FD_ZERO(&fds); //每次循環(huán)都必須清空FD_Set
FD_SET(sock_fd, &fds); //將sock_fd加入集合
//此select設(shè)定為對(duì)整個(gè)集合內(nèi)的fd寫監(jiān)聽颈畸,監(jiān)聽的最長(zhǎng)時(shí)間為timeout
int n = select(maxfd, NULL, &fds, NULL, &timeout);
switch(n) {
case -1:
fprintf(stderr, "Select error:%s \n\a", strerror(errno));
exit(1);
case 0:
printf("select time out, lost packet!\n");
......
break;
default:
//判斷sock_fd是否還在集合中
if(FD_ISSET(sock_fd, &fds)) {
//還在集合中則說明對(duì)該fd監(jiān)聽到了寫操作
......
} else{
//沒有對(duì)該fd監(jiān)聽到寫操作
......
}
}
select
的FD_SET
是通過一組宏函數(shù)進(jìn)行實(shí)現(xiàn)的乌奇,支持的最大監(jiān)聽數(shù)不超過1024個(gè)。select
函數(shù)底層是通過輪詢機(jī)制實(shí)現(xiàn)的眯娱,因此對(duì)CPU占用很高华弓。同時(shí),每次調(diào)用select
都需要把fd_set
集合從用戶態(tài)拷貝到內(nèi)核態(tài)困乒,因此該函數(shù)的效率很低。
2贰谣、Poll機(jī)制
poll
的機(jī)制與select
類似娜搂,與select
在本質(zhì)上沒有多大差別,管理多個(gè)描述符也是進(jìn)行輪詢吱抚,根據(jù)描述符的狀態(tài)進(jìn)行處理百宇。poll
機(jī)制相對(duì)于select
機(jī)制,解決了select
的最大文件描述符支持為1024的問題(支持任意大小的描述符集)秘豹,并沒有解決性能開銷問題携御。
下面是pll的函數(shù)原型:
//poll改變了文件描述符集合的描述方式,使用了pollfd結(jié)構(gòu)而不是select的fd_set結(jié)構(gòu),
//使得poll支持的文件描述符集合限制遠(yuǎn)大于select的1024
typedef struct pollfd {
int fd; // 需要被檢測(cè)或選擇的文件描述符
short events; // 對(duì)文件描述符fd上感興趣的事件
short revents; // 文件描述符fd上當(dāng)前實(shí)際發(fā)生的事件
} pollfd_t;
/**
* 實(shí)現(xiàn)IO多路復(fù)用的poll函數(shù)
* @parameter: fds fds是一個(gè)struct pollfd類型的數(shù)組啄刹,用于存放需要檢測(cè)其狀態(tài)的socket描述符涮坐,
* 并且調(diào)用poll函數(shù)之后fds數(shù)組不會(huì)被清空;一個(gè)pollfd結(jié)構(gòu)體表示一個(gè)被監(jiān)視的文件描述符誓军,
* 通過傳遞fds指示 poll() 監(jiān)視多個(gè)文件描述符袱讹。其中,結(jié)構(gòu)體的events域是監(jiān)視該文件描述符的事件掩碼昵时,
* 由用戶來設(shè)置這個(gè)域捷雕,結(jié)構(gòu)體的revents域是文件描述符的操作結(jié)果事件掩碼,內(nèi)核在調(diào)用返回時(shí)設(shè)置這個(gè)域壹甥。
* @parameter: nfds 記錄數(shù)組fds中描述符的總數(shù)量
* @parameter: timeout 最長(zhǎng)阻塞時(shí)間
*/
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
三救巷、Epoll機(jī)制
epoll
在Linux2.6內(nèi)核正式提出,是基于事件驅(qū)動(dòng)的IO方式句柠,相對(duì)于select
來說浦译,epoll
沒有描述符個(gè)數(shù)限制,使用一個(gè)文件描述符管理多個(gè)描述符俄占,將用戶關(guān)心的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中管怠,這樣在用戶空間和內(nèi)核空間的copy只需一次。
Linux中提供的epoll
相關(guān)函數(shù)如下:
/**
* epoll_create 函數(shù)創(chuàng)建一個(gè)epoll句柄缸榄。
* @parameter: size 參數(shù)size表明內(nèi)核要監(jiān)聽的描述符數(shù)量渤弛。
* @return: 調(diào)用成功時(shí)返回一個(gè)epoll句柄描述符,失敗時(shí)返回-1甚带。
*/
int epoll_create(int size);
//epoll_event 結(jié)構(gòu)體定義如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
/**
* epoll_ctl 函數(shù)注冊(cè)要監(jiān)聽的事件類型她肯。
* @parameter: epfd 表示epoll句柄。由上述 epoll_create() 函數(shù)申請(qǐng)得到鹰贵。
* @parameter: op 表示fd操作類型晴氨,有3種:EPOLL_CTL_ADD 注冊(cè)新的fd到epfd中;EPOLL_CTL_MOD
* 修改已注冊(cè)的fd的監(jiān)聽事件;EPOLL_CTL_DEL 從epfd中刪除一個(gè)fd。
* @parameter: fd 是要監(jiān)聽的描述符碉输。
* @parameter: event 表示要監(jiān)聽的事件籽前。
* @return: 調(diào)用成功時(shí)返回一個(gè)epoll句柄描述符,失敗時(shí)返回-1敷钾。
*/
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/**
* epoll_wait 函數(shù)等待事件的就緒枝哄。
* @parameter: epfd 表示epoll句柄。由上述 epoll_create() 函數(shù)申請(qǐng)得到阻荒。
* @parameter: events 表示從內(nèi)核得到的就緒事件集合挠锥。
* @parameter: maxevents 告訴內(nèi)核events的大小。
* @parameter: timeout 表示等待的超時(shí)事件侨赡。
* @return: 成功時(shí)返回就緒的事件數(shù)目蓖租,調(diào)用失敗時(shí)返回 -1粱侣,等待超時(shí)返回 0。
*/
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
epoll
是Linux內(nèi)核為處理大批量文件描述符而作了改進(jìn)的poll
蓖宦,是Linux下多路復(fù)用IO接口select/poll
的增強(qiáng)版本齐婴,它能顯著提高程序在大量并發(fā)連接中只有少量活躍的情況下的系統(tǒng)CPU利用率。原因就是獲取事件的時(shí)候球昨,它無須遍歷整個(gè)被偵聽的描述符集尔店,只要遍歷那些被內(nèi)核IO事件異步喚醒而加入Ready隊(duì)列的描述符集合就行了。
epoll
除了提供select/poll
那種IO事件的水平觸發(fā)(Level Triggered)外主慰,還提供了邊緣觸發(fā)(Edge Triggered)嚣州,這就使得用戶空間程序有可能緩存IO狀態(tài),減少epoll_wait/epoll_pwait
的調(diào)用共螺,提高應(yīng)用程序效率该肴。
水平觸發(fā)(LT):默認(rèn)工作模式,即當(dāng)epoll_wait
檢測(cè)到某描述符事件就緒并通知應(yīng)用程序時(shí)藐不,應(yīng)用程序可以不立即處理該事件匀哄;下次調(diào)用epoll_wait
時(shí),會(huì)再次通知此事件
邊緣觸發(fā)(ET): 當(dāng)epoll_wait
檢測(cè)到某描述符事件就緒并通知應(yīng)用程序時(shí)雏蛮,應(yīng)用程序必須立即處理該事件涎嚼。如果不處理,下次調(diào)用epoll_wait
時(shí)挑秉,不會(huì)再次通知此事件法梯。(直到你做了某些操作導(dǎo)致該描述符變成未就緒狀態(tài)了,也就是說邊緣觸發(fā)只在狀態(tài)由未就緒變?yōu)榫途w時(shí)只通知一次)犀概。
LT和ET原本應(yīng)該是用于脈沖信號(hào)的立哑,可能用它來解釋更加形象。Level和Edge指的就是觸發(fā)點(diǎn)姻灶,Level為只要處于水平铛绰,那么就一直觸發(fā),而Edge則為上升沿和下降沿的時(shí)候觸發(fā)产喉。比如:0->1 就是Edge捂掰,1->1 就是Level。
ET模式很大程度上減少了epoll事件的觸發(fā)次數(shù)曾沈,因此效率比LT模式下高尘颓。
select
、poll
晦譬、epoll
三者機(jī)制的區(qū)別如下圖所示:
上述對(duì)比圖存在錯(cuò)誤:
epoll
的底層實(shí)現(xiàn)應(yīng)該是紅黑樹!epoll
是Linux
目前大規(guī)模網(wǎng)絡(luò)并發(fā)程序開發(fā)的首選模型互广。在絕大多數(shù)情況下性能遠(yuǎn)超select
和poll
敛腌。目前流行的高性能web服務(wù)器Nginx正式依賴于epoll
提供的高效網(wǎng)絡(luò)套接字輪詢服務(wù)卧土。但是,在并發(fā)連接不高的情況下像樊,多線程+阻塞I/O方式可能性能更好尤莺。以下是一個(gè)簡(jiǎn)單的基于
epoll
的C/S
網(wǎng)絡(luò)通信示例:服務(wù)器代碼(只看
Server
類的定義和main函數(shù)即可):
//
// Created by mylord on 2019/10/26.
//
#ifndef FILETRANSER_SERVER_H
#define FILETRANSER_SERVER_H
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <vector>
#include <string.h>
#include <algorithm>
#include <sys/epoll.h>
#define MAX_BUFF_SIZE 1024
using namespace std;
typedef struct {
sockaddr_in client_sock;
int client_fd;
string client_ip;
}ClientInfo;
class Server {
private:
vector<ClientInfo> client_info;
int listen_fd, listen_port, listen_size;
sockaddr_in server_addr;
public:
Server(int port = 5555, int size = 10);
void Init();
void Listen();
int AcceptConnection();
int Write(int sock_fd, char buff[]);
int Read(int sock_fd, char buff[]);
int Close(int client_fd);
int getListenFd() const;
};
#endif //FILETRANSER_SERVER_H
int main() {
Server server(5555, 10);
char buffer[1204];
struct epoll_event ev, events[10]; //聲明epoll_event結(jié)構(gòu)體的變量,ev用于注冊(cè)事件,數(shù)組用于回傳要處理的事件
int epoll_fd = epoll_create(10); //創(chuàng)建一個(gè)epoll的句柄,并告訴內(nèi)核這個(gè)監(jiān)聽的數(shù)目為10
int listen_fd = server.getListenFd();
int nfds = 0; //記錄需要處理的事件數(shù)
ev.data.fd = listen_fd;
ev.events = EPOLLIN | EPOLLET; //linsten_fd可讀生棍,邊緣觸發(fā)
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
server.Listen();
cout << "server is listening......" << endl;
while(true) {
nfds = epoll_wait(epoll_fd, events, 10, -1);
for(int i = 0; i < nfds; i++) {
if(events[i].data.fd == listen_fd) {
int client_fd = server.AcceptConnection();
ev.data.fd = client_fd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev);
} else if(events[i].events & EPOLLIN) { //epoll池中的時(shí)間被邊緣觸發(fā)且是因?yàn)槭盏綌?shù)據(jù)待讀入
int nbytes = 0;
if ((nbytes = server.Read(events[i].data.fd, buffer)) <= 0) {
cout << "fd-" << events[i].data.fd << " receive data error!" << endl;
server.Close(events[i].data.fd);
continue;
}
cout << "EPOLL receive from " << events[i].data.fd << ":" << buffer << endl;
strcpy(buffer, "received from epoll server!");
server.Write(events[i].data.fd, buffer);
}
}
}
}
Server::Server(int port, int size): listen_port(port), listen_size(10) {
this->Init();
}
void Server::Init() {
if((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{ //服務(wù)器端開始建立socket描述符
fprintf(stderr, "Socket error:%s \n\a", strerror(errno));
exit(1);
}
//服務(wù)器端填充tcp sockaddr結(jié)構(gòu)
bzero(&server_addr, sizeof(struct sockaddr_in)); //先將套接字地址數(shù)據(jù)結(jié)構(gòu)清零
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htons(INADDR_ANY);
server_addr.sin_port = htons(listen_port);
if(bind(listen_fd, (struct sockaddr *)(&server_addr), sizeof(struct sockaddr)) == -1)
{
fprintf(stderr, "Bind error:%s\n\a", strerror(errno));
exit(1);
}
}
void Server::Listen() {
if(listen(listen_fd, listen_size) == -1)
{ //端口綁定成功颤霎,監(jiān)聽socketfd描述符,同時(shí)處理的最大連接請(qǐng)求數(shù)為10
fprintf(stderr, "Listen error:%s\n\a", strerror(errno));
exit(1);
}
}
int Server::AcceptConnection() {
int sockaddr_size = sizeof(struct sockaddr_in);
ClientInfo temp;
if ((temp.client_fd = accept(listen_fd, (struct sockaddr *)(&temp.client_sock),
(socklen_t *) &sockaddr_size)) == -1)
{ //調(diào)用accept接受一個(gè)連接請(qǐng)求
fprintf(stderr, "Accept error:%s\n\a", strerror(errno));
exit(1);
}
temp.client_ip.assign(inet_ntoa(temp.client_sock.sin_addr));
cout << "Connected from "<< temp.client_ip << "\tclient fd is " << temp.client_fd << endl;
client_info.push_back(temp);
return temp.client_fd;
}
int Server::Write(int sock_fd, char buff[]) {
int nbytes = 0;
if((nbytes = write(sock_fd, buff, strlen(buff))) == -1)
fprintf(stderr, "Write Error:%s\n", strerror(errno));
return nbytes;
}
int Server::Read(int sock_fd, char buff[]) {
int nbytes = 0;
if ((nbytes = read(sock_fd, buff, MAX_BUFF_SIZE)) == -1)
fprintf(stderr, "Read Error:%s\n", strerror(errno));
buff[nbytes] = '\0';
return nbytes;
}
int Server::Close(int client_fd) {
//先查找client_fd的迭代器client_account_it
auto client_it = find_if(client_info.begin(), client_info.end(),
[client_fd](const ClientInfo &cli){ return cli.client_fd == client_fd; });
client_info.erase(client_it);
close(client_fd);
printf("fd-%d exit!\n", client_fd);
}
int Server::getListenFd() const{
return listen_fd;
}
客戶端代碼(只看Client
類的定義和main函數(shù)即可):
//
// Created by mylord on 2019/10/26.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <iostream>
#include <fstream>
#define MAX_BUFF_SIZE 1024
class Client
{
private:
int sock_fd;
struct sockaddr_in client_addr;
char server_ip[16];
int server_port;
public:
Client(char * server_ip, int server_port);
int Connect();
int Write(char * buff);
int Read(char * buff);
int Close();
};
Client::Client(char * server_ip, int server_port)
{
strncpy(this->server_ip, server_ip, 16);
this->server_port = server_port;
struct hostent * host;
if((host = gethostbyname(this->server_ip)) == NULL)
{
fprintf(stderr, "The host name %s is illegal.\n", server_ip);
exit(1);
}
if((this->sock_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
fprintf(stderr, "Init socket fd error!\n");
exit(1);
}
bzero(&this->client_addr, sizeof(this->client_addr));
this->client_addr.sin_family = AF_INET;
this->client_addr.sin_port = htons(this->server_port);
this->client_addr.sin_addr = *((struct in_addr *)host->h_addr);
}
int Client::Connect()
{
int nbytes = connect(this->sock_fd, (struct sockaddr *)(&this->client_addr),
sizeof(struct sockaddr));
return nbytes;
}
int Client::Write(char * buff)
{
int nbytes = write(this->sock_fd, buff, strlen(buff));
return nbytes;
}
int Client::Read(char * buff)
{
int nbytes = read(this->sock_fd, buff, MAX_BUFF_SIZE);
buff[nbytes] = '\0';
return nbytes;
}
int Client::Close()
{
close(this->sock_fd);
printf("fd-%d exit!\n", this->sock_fd);
}
int main(int argc, char *argv[]) {
Client client(argv[1], 5555);
char buff[1024];
char send_msg[] = "hello epoll!";
client.Connect();
while(true) {
client.Write(send_msg);
client.Read(buff);
std::cout << buff << std::endl;
sleep(1);
}
}
程序運(yùn)行結(jié)果:
參考文獻(xiàn):
https://www.cnblogs.com/zhangmingda/p/9396994.html
http://www.reibang.com/p/397449cadc9a