Epoll 對象的創建
- 在 `linux` 中,最為高效的 `reactor` 機制就是 `epoll`。`swReactor` 的 `object` 會存儲 `epoll` 的對象 `swReactorEpoll_s`。該數據結構中 `epfd` 是 `epoll` 的 `id`(即 `epoll_create` 返回的文件描述符),`events` 用于在 `epoll_wait` 函數接受就緒的事件。
- 創建函數 `swReactorEpoll_create` 最重要的是調用 `epoll_create`,該函數會創建 `epoll` 對象
/**
 * Per-reactor state of the epoll backend; stored in swReactor::object.
 */
typedef struct swReactorEpoll_s
{
    int epfd;                   // descriptor returned by epoll_create()
    struct epoll_event *events; // buffer handed to epoll_wait() to receive ready events
} swReactorEpoll;
/**
 * Create the epoll backend for a reactor and bind the epoll implementations
 * of add/set/del/wait/free to it.
 *
 * @param reactor        reactor that receives the backend object and method table
 * @param max_event_num  number of struct epoll_event slots allocated for epoll_wait()
 * @return SW_OK on success; SW_ERR if an allocation or epoll_create() fails.
 *         On failure everything allocated here is released and `reactor` is
 *         left unmodified, so it never points at freed memory.
 */
int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
    //create reactor object
    swReactorEpoll *reactor_object = sw_malloc(sizeof(swReactorEpoll));
    if (reactor_object == NULL)
    {
        swWarn("malloc[0] failed.");
        return SW_ERR;
    }
    bzero(reactor_object, sizeof(swReactorEpoll));

    //buffer that epoll_wait() fills with ready events
    reactor_object->events = sw_calloc(max_event_num, sizeof(struct epoll_event));
    if (reactor_object->events == NULL)
    {
        swWarn("malloc[1] failed.");
        sw_free(reactor_object);
        return SW_ERR;
    }

    //epoll create
    reactor_object->epfd = epoll_create(512);
    if (reactor_object->epfd < 0)
    {
        swWarn("epoll_create failed. Error: %s[%d]", strerror(errno), errno);
        //release the event buffer as well, otherwise it leaks on this path
        sw_free(reactor_object->events);
        sw_free(reactor_object);
        return SW_ERR;
    }

    //publish the backend only after every fallible step has succeeded
    reactor->object = reactor_object;
    reactor->max_event_num = max_event_num;

    //binding method
    reactor->add = swReactorEpoll_add;
    reactor->set = swReactorEpoll_set;
    reactor->del = swReactorEpoll_del;
    reactor->wait = swReactorEpoll_wait;
    reactor->free = swReactorEpoll_free;

    return SW_OK;
}
Epoll 添加監聽
- `swReactorEpoll_event_set` 函數用于轉化可讀(`SW_EVENT_READ`)、可寫(`SW_EVENT_WRITE`)的狀態為 `epoll` 函數可用的 `EPOLLIN`、`EPOLLOUT`、`EPOLLERR`
/**
 * Convert a reactor event mask into the matching epoll event bits.
 *
 * @param fdtype  value tested with swReactor_event_read/write/error
 * @return EPOLLIN when read is requested, EPOLLOUT when write is requested,
 *         and EPOLLRDHUP | EPOLLHUP | EPOLLERR when error is requested,
 *         OR-ed together (0 if none apply).
 */
static sw_inline int swReactorEpoll_event_set(int fdtype)
{
    uint32_t epoll_bits = 0;

    epoll_bits |= swReactor_event_read(fdtype) ? EPOLLIN : 0;
    epoll_bits |= swReactor_event_write(fdtype) ? EPOLLOUT : 0;
    // an error subscription covers peer half-close, hang-up and socket errors
    epoll_bits |= swReactor_event_error(fdtype) ? (EPOLLRDHUP | EPOLLHUP | EPOLLERR) : 0;

    return epoll_bits;
}
- `swReactorEpoll_add` 函數用于為 `reactor` 添加新的文件描述符進行監控
- 添加 `fd` 最為重要的是利用 `epoll_ctl` 函數的 `EPOLL_CTL_ADD` 命令。為了能夠更為簡便地在調用 `epoll_wait` 后獲取 `fd` 的類型,并不會僅僅向 `epoll_ctl` 函數添加 `fd`,而是會添加 `swFd` 類型,該數據結構中包含文件描述符和文件類型。
- `swReactor_add` 函數用于更新 `reactor->socket_list` 的 `fdtype` 與 `events`
- 最后需要自增 `event_num` 的數值
/**
 * Descriptor plus descriptor type. swReactorEpoll_add/set copy this struct
 * byte-for-byte into epoll_event.data.u64, so it must stay two uint32_t wide.
 */
struct _swFd
{
    uint32_t fd;     // file descriptor being monitored
    uint32_t fdtype; // descriptor type, as produced by swReactor_fdtype()
};
typedef struct _swFd swFd;
/**
 * Start monitoring `fd` on this reactor's epoll instance.
 *
 * The descriptor and its type are packed into an swFd and copied into the
 * epoll user data, so both can be read back from the event after epoll_wait().
 *
 * @param reactor  reactor whose epoll object receives the descriptor
 * @param fd       descriptor to register
 * @param fdtype   passed to swReactor_fdtype() and swReactorEpoll_event_set()
 * @return SW_OK on success; SW_ERR if epoll_ctl(EPOLL_CTL_ADD) fails, in which
 *         case the bookkeeping done by swReactor_add() is undone via swReactor_del().
 */
static int swReactorEpoll_add(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *epoll_obj = reactor->object;
    struct epoll_event ev;
    swFd packed;
    int rc;

    memset(&ev, 0, sizeof(ev));

    packed.fd = fd;
    packed.fdtype = swReactor_fdtype(fdtype);
    ev.events = swReactorEpoll_event_set(fdtype);

    // record the registration in the reactor before talking to the kernel
    swReactor_add(reactor, fd, fdtype);

    memcpy(&(ev.data.u64), &packed, sizeof(packed));

    rc = epoll_ctl(epoll_obj->epfd, EPOLL_CTL_ADD, fd, &ev);
    if (rc >= 0)
    {
        swTraceLog(SW_TRACE_EVENT, "add event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));
        reactor->event_num++;
        return SW_OK;
    }

    swSysError("add events[fd=%d#%d, type=%d, events=%d] failed.", fd, reactor->id, packed.fdtype, ev.events);
    swReactor_del(reactor, fd);
    return SW_ERR;
}
/**
 * Record a new registration on the connection slot that swReactor_get()
 * returns for `fd`: store its type and event mask and clear `removed`.
 *
 * @param reactor  reactor owning the connection slot
 * @param fd       descriptor whose slot is updated
 * @param type     value split by swReactor_fdtype() and swReactor_events()
 */
static sw_inline void swReactor_add(swReactor *reactor, int fd, int type)
{
    swConnection *conn = swReactor_get(reactor, fd);

    conn->removed = 0;
    conn->events = swReactor_events(type);
    conn->fdtype = swReactor_fdtype(type);
}
Epoll 修改監聽
- 修改監聽主要調用 `epoll_ctl` 的 `EPOLL_CTL_MOD` 命令
/**
 * Change the events monitored for an already registered descriptor.
 *
 * @param reactor  reactor whose epoll object holds the descriptor
 * @param fd       descriptor to modify
 * @param fdtype   passed to swReactor_fdtype() and swReactorEpoll_event_set()
 * @return SW_OK on success; SW_ERR if epoll_ctl(EPOLL_CTL_MOD) fails, in which
 *         case swReactor_set() is not called.
 */
static int swReactorEpoll_set(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *epoll_obj = reactor->object;
    struct epoll_event ev;
    swFd packed;

    memset(&ev, 0, sizeof(ev));
    ev.events = swReactorEpoll_event_set(fdtype);

    // write interest is only expected on descriptors above 0, 1 and 2
    if (ev.events & EPOLLOUT)
    {
        assert(fd > 2);
    }

    packed.fd = fd;
    packed.fdtype = swReactor_fdtype(fdtype);
    memcpy(&(ev.data.u64), &packed, sizeof(packed));

    if (epoll_ctl(epoll_obj->epfd, EPOLL_CTL_MOD, fd, &ev) < 0)
    {
        swSysError("reactor#%d->set(fd=%d|type=%d|events=%d) failed.", reactor->id, fd, packed.fdtype, ev.events);
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_EVENT, "set event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));

    // mirror the change in the reactor's own bookkeeping
    swReactor_set(reactor, fd, fdtype);
    return SW_OK;
}
Epoll 刪除監聽
- 刪除監聽主要調用 `epoll_ctl` 的 `EPOLL_CTL_DEL` 命令
- 最后需要更新 `event_num`
/**
 * Stop monitoring `fd` on this reactor's epoll instance.
 *
 * @param reactor  reactor whose epoll object holds the descriptor
 * @param fd       descriptor to remove
 * @return SW_OK on success; SW_ERR if epoll_ctl(EPOLL_CTL_DEL) fails, in which
 *         case event_num and the reactor bookkeeping are left untouched.
 */
static int swReactorEpoll_del(swReactor *reactor, int fd)
{
    swReactorEpoll *epoll_obj = reactor->object;
    int rc = epoll_ctl(epoll_obj->epfd, EPOLL_CTL_DEL, fd, NULL);

    if (rc >= 0)
    {
        swTraceLog(SW_TRACE_REACTOR, "remove event[reactor_id=%d|fd=%d]", reactor->id, fd);

        // decrement the counter, but never let it drop below zero
        if (reactor->event_num <= 0)
        {
            reactor->event_num = 0;
        }
        else
        {
            reactor->event_num = reactor->event_num - 1;
        }

        swReactor_del(reactor, fd);
        return SW_OK;
    }

    swSysError("epoll remove fd[%d#%d] failed.", fd, reactor->id);
    return SW_ERR;
}
Epoll 監聽等待就緒
- `swReactorEpoll_wait` 是 `reactor` 的核心,該函數最重要的就是調用 `epoll_wait`
- 首先需要通過 `timeo` 參數設置 `msec`,利用 `object->events` 設置 `events`
- `epoll_wait` 函數返回之后,如果 `n<0`,那么需要先檢查 `errno`,如果是 `EINTR`,那么說明有信號觸發,此時需要執行信號的回調函數,然后再繼續事件循環。如果不是 `EINTR`,那么就要返回錯誤,結束事件循環
- 如果 `n == 0`,一般是由于 `epoll_wait` 已超時,此時需要調用超時回調函數
- 如果 `n > 0`,那么就要從 `events` 中取出已經就緒的 `swFd` 對象,并利用該對象的值初始化 `event`
- 接下來就要檢查 `events[i].events` 的值,來判斷具體是讀就緒、寫就緒還是發生了錯誤,值得注意的是 `EPOLLRDHUP` 事件,此事件代表著對端斷開連接,這個是 `linux` 自從 `2.6.17` 的新特性
- 利用 `swReactor_getHandle` 函數取出對應的文件描述符類型的事件回調函數
- 事件循環的最后調用 `onFinish` 函數
- 如果設置了 `once`,說明此 `reactor` 只會循環一次,立即退出;否則,繼續事件循環
/**
 * One ready event as handed to a reactor handler.
 * swReactorEpoll_wait() fills every field before invoking the handler.
 */
struct _swEvent
{
    int fd;               // descriptor the event was reported for
    int16_t from_id;      // id of the reactor that produced the event
    uint8_t type;         // descriptor type decoded from the epoll user data
    swConnection *socket; // connection slot returned by swReactor_get() for fd
};
typedef struct _swEvent swEvent;
/**
 * Run the reactor event loop on top of epoll_wait().
 *
 * Each iteration calls onBegin (if set), waits for events, dispatches the
 * read / write / error handler for every ready descriptor, then calls
 * onFinish (if set). The loop ends when reactor->running drops to 0 or
 * below, or after one dispatching iteration when reactor->once is set.
 *
 * @param reactor  reactor to drive; its object must be an swReactorEpoll
 * @param timeo    used only while reactor->timeout_msec is 0: NULL selects
 *                 -1 (block indefinitely), otherwise it is converted to
 *                 milliseconds
 * @return 0 when the loop ends normally; SW_ERR when epoll_wait() fails and
 *         swReactor_error() reports a negative result for the errno.
 */
static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    swEvent event;
    swReactorEpoll *object = reactor->object;
    swReactor_handle handle;
    int i, n, ret, msec;

    int reactor_id = reactor->id;
    int epoll_fd = object->epfd;
    int max_event_num = reactor->max_event_num;
    struct epoll_event *events = object->events;

    if (reactor->timeout_msec == 0)
    {
        if (timeo == NULL)
        {
            reactor->timeout_msec = -1;
        }
        else
        {
            reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }

    reactor->start = 1;

    while (reactor->running > 0)
    {
        if (reactor->onBegin != NULL)
        {
            reactor->onBegin(reactor);
        }
        // re-read every iteration: a callback may have changed the timeout
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
            if (swReactor_error(reactor) < 0)
            {
                swWarn("[Reactor#%d] epoll_wait failed. Error: %s[%d]", reactor_id, strerror(errno), errno);
                return SW_ERR;
            }
            else
            {
                continue;
            }
        }
        else if (n == 0)
        {
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            continue;
        }
        for (i = 0; i < n; i++)
        {
            /*
             * swReactorEpoll_add/set store an swFd (two uint32_t: fd, fdtype)
             * into data.u64 with memcpy. Decode with memcpy as well so the
             * fields come back in the same order on any byte order; shifting
             * the u64 only matches that layout on little-endian hosts.
             */
            uint32_t packed[2];
            memcpy(packed, &events[i].data.u64, sizeof(packed));

            event.fd = packed[0];
            event.from_id = reactor_id;
            event.type = packed[1];
            event.socket = swReactor_get(reactor, event.fd);

            //read
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLIN handle failed. fd=%d.", event.fd);
                }
            }
            //write
            if ((events[i].events & EPOLLOUT) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLOUT handle failed. fd=%d.", event.fd);
                }
            }
            //error
#ifndef NO_EPOLLRDHUP
            if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#else
            if ((events[i].events & (EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#endif
            {
                //ignore ERR and HUP, because event is already processed at IN and OUT handler.
                if ((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT))
                {
                    continue;
                }
                handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLERR handle failed. fd=%d.", event.fd);
                }
            }
        }

        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
        if (reactor->once)
        {
            break;
        }
    }
    return 0;
}
/**
 * Classify the current errno after a failed wait call.
 *
 * EINTR is treated as recoverable: if a signal number is pending on the
 * reactor, its callback is run and the pending number is cleared.
 *
 * @param reactor  reactor whose pending signal number is consulted
 * @return SW_OK when errno is EINTR, SW_ERR for any other errno.
 */
static sw_inline int swReactor_error(swReactor *reactor)
{
    if (errno != EINTR)
    {
        return SW_ERR;
    }

    if (reactor->singal_no)
    {
        swSignal_callback(reactor->singal_no);
        reactor->singal_no = 0;
    }
    return SW_OK;
}
/**
 * Look up the callback for an event on a descriptor of the given type.
 *
 * @param reactor     reactor holding the handler tables
 * @param event_type  SW_EVENT_WRITE, SW_EVENT_ERROR, or anything else (treated as read)
 * @param fdtype      descriptor type used as index into the handler tables
 * @return for write: write_handle[fdtype], falling back to handle[SW_FD_WRITE];
 *         for error: error_handle[fdtype], falling back to handle[SW_FD_CLOSE];
 *         otherwise handle[fdtype].
 */
static sw_inline swReactor_handle swReactor_getHandle(swReactor *reactor, int event_type, int fdtype)
{
    swReactor_handle specific;

    if (event_type == SW_EVENT_WRITE)
    {
        specific = reactor->write_handle[fdtype];
        if (specific != NULL)
        {
            return specific;
        }
        return reactor->handle[SW_FD_WRITE];
    }

    if (event_type == SW_EVENT_ERROR)
    {
        specific = reactor->error_handle[fdtype];
        if (specific != NULL)
        {
            return specific;
        }
        return reactor->handle[SW_FD_CLOSE];
    }

    return reactor->handle[fdtype];
}