Swoole 源碼分析——Reactor 模塊之 ReactorEpoll

Epoll 對(duì)象的創(chuàng)建

  • linux 中女淑,最為高效的 reactor 機(jī)制就是 epoll键耕。swReactorobject 會(huì)存儲(chǔ) epoll 的對(duì)象 swReactorEpoll_s尖坤。該數(shù)據(jù)結(jié)構(gòu)中 epfdepollid传黄,events 用于在 epoll_wait 函數(shù)接受就緒的事件突想。
  • 該函數(shù)最重要的是 epoll_create刀诬,該函數(shù)會(huì)創(chuàng)建 epoll 對(duì)象
typedef struct swReactorEpoll_s swReactorEpoll;

struct swReactorEpoll_s
{
    int epfd;
    struct epoll_event *events;
};

int swReactorEpoll_create(swReactor *reactor, int max_event_num)
{
    //create reactor object
    swReactorEpoll *reactor_object = sw_malloc(sizeof(swReactorEpoll));
    if (reactor_object == NULL)
    {
        swWarn("malloc[0] failed.");
        return SW_ERR;
    }
    bzero(reactor_object, sizeof(swReactorEpoll));
    reactor->object = reactor_object;
    reactor->max_event_num = max_event_num;

    reactor_object->events = sw_calloc(max_event_num, sizeof(struct epoll_event));

    if (reactor_object->events == NULL)
    {
        swWarn("malloc[1] failed.");
        sw_free(reactor_object);
        return SW_ERR;
    }
    //epoll create
    reactor_object->epfd = epoll_create(512);
    if (reactor_object->epfd < 0)
    {
        swWarn("epoll_create failed. Error: %s[%d]", strerror(errno), errno);
        sw_free(reactor_object);
        return SW_ERR;
    }
    //binding method
    reactor->add = swReactorEpoll_add;
    reactor->set = swReactorEpoll_set;
    reactor->del = swReactorEpoll_del;
    reactor->wait = swReactorEpoll_wait;
    reactor->free = swReactorEpoll_free;

    return SW_OK;
}

Epoll 添加監(jiān)聽

  • swReactorEpoll_event_set 函數(shù)用于轉(zhuǎn)化可讀(SW_EVENT_READ)、可寫(SW_EVENT_WRITE)的狀態(tài)為 epoll 函數(shù)可用的 EPOLLIN屋灌、EPOLLOUT洁段、EPOLLERR
static sw_inline int swReactorEpoll_event_set(int fdtype)
{
    uint32_t flag = 0;
    if (swReactor_event_read(fdtype))
    {
        flag |= EPOLLIN;
    }
    if (swReactor_event_write(fdtype))
    {
        flag |= EPOLLOUT;
    }
    if (swReactor_event_error(fdtype))
    {
        //flag |= (EPOLLRDHUP);
        flag |= (EPOLLRDHUP | EPOLLHUP | EPOLLERR);
    }
    return flag;
}
  • swReactorEpoll_add 函數(shù)用于為 reactor 添加新的文件描述符進(jìn)行監(jiān)控
  • 添加 fd 最為重要的的是利用 epoll_ctl 函數(shù)的 EPOLL_CTL_ADD 命令。為了能夠更為簡(jiǎn)便在調(diào)用 epoll_wait 后獲取 fd 的類型共郭,并不會(huì)僅僅向 epoll_ctl 函數(shù)添加 fd祠丝,而是會(huì)添加 swFd 類型,該數(shù)據(jù)結(jié)構(gòu)中包含文件描述符和文件類型除嘹。
  • swReactor_add 函數(shù)用于更新 reactor->socket_listfdtypeevents
  • 最后需要自增 event_num 的數(shù)值
typedef struct _swFd
{
    uint32_t fd;
    uint32_t fdtype;
} swFd;

static int swReactorEpoll_add(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *object = reactor->object;
    struct epoll_event e;
    swFd fd_;
    bzero(&e, sizeof(struct epoll_event));

    fd_.fd = fd;
    fd_.fdtype = swReactor_fdtype(fdtype);
    e.events = swReactorEpoll_event_set(fdtype);

    swReactor_add(reactor, fd, fdtype);

    memcpy(&(e.data.u64), &fd_, sizeof(fd_));
    if (epoll_ctl(object->epfd, EPOLL_CTL_ADD, fd, &e) < 0)
    {
        swSysError("add events[fd=%d#%d, type=%d, events=%d] failed.", fd, reactor->id, fd_.fdtype, e.events);
        swReactor_del(reactor, fd);
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_EVENT, "add event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));
    reactor->event_num++;

    return SW_OK;
}

static sw_inline void swReactor_add(swReactor *reactor, int fd, int type)
{
    swConnection *socket = swReactor_get(reactor, fd);
    socket->fdtype = swReactor_fdtype(type);
    socket->events = swReactor_events(type);
    socket->removed = 0;
}

Epoll 修改監(jiān)聽

  • 修改監(jiān)聽主要調(diào)用 epoll_ctlEPOLL_CTL_MOD 命令
static int swReactorEpoll_set(swReactor *reactor, int fd, int fdtype)
{
    swReactorEpoll *object = reactor->object;
    swFd fd_;
    struct epoll_event e;
    int ret;

    bzero(&e, sizeof(struct epoll_event));
    e.events = swReactorEpoll_event_set(fdtype);

    if (e.events & EPOLLOUT)
    {
        assert(fd > 2);
    }

    fd_.fd = fd;
    fd_.fdtype = swReactor_fdtype(fdtype);
    memcpy(&(e.data.u64), &fd_, sizeof(fd_));

    ret = epoll_ctl(object->epfd, EPOLL_CTL_MOD, fd, &e);
    if (ret < 0)
    {
        swSysError("reactor#%d->set(fd=%d|type=%d|events=%d) failed.", reactor->id, fd, fd_.fdtype, e.events);
        return SW_ERR;
    }
    swTraceLog(SW_TRACE_EVENT, "set event[reactor_id=%d, fd=%d, events=%d]", reactor->id, fd, swReactor_events(fdtype));
    //execute parent method
    swReactor_set(reactor, fd, fdtype);
    return SW_OK;
}

Epoll 刪除監(jiān)聽

  • 修改監(jiān)聽主要調(diào)用 epoll_ctlEPOLL_CTL_DEL 命令
  • 最后需要更新 event_num
static int swReactorEpoll_del(swReactor *reactor, int fd)
{
    swReactorEpoll *object = reactor->object;
    if (epoll_ctl(object->epfd, EPOLL_CTL_DEL, fd, NULL) < 0)
    {
        swSysError("epoll remove fd[%d#%d] failed.", fd, reactor->id);
        return SW_ERR;
    }

    swTraceLog(SW_TRACE_REACTOR, "remove event[reactor_id=%d|fd=%d]", reactor->id, fd);
    reactor->event_num = reactor->event_num <= 0 ? 0 : reactor->event_num - 1;
    swReactor_del(reactor, fd);

    return SW_OK;
}

Epoll 監(jiān)聽等待就緒

  • swReactorEpoll_waitreactor 的核心写半,該函數(shù)最重要的就是調(diào)用 epoll_wait
  • 首先需要通過(guò) timeo 參數(shù)設(shè)置 msec,利用 object->events 設(shè)置 events
  • epoll_wait 函數(shù)返回之后尉咕,如果 n<0叠蝇,那么需要先檢查 erron,如果是 EINTR年缎,那么說(shuō)明有信號(hào)觸發(fā)悔捶,此時(shí)需要進(jìn)行信號(hào)的回調(diào)函數(shù)铃慷,然后再繼續(xù)事件循環(huán)。如果不是 EINTR蜕该,那么就要返回錯(cuò)誤犁柜,結(jié)束事件循環(huán)
  • 如果 n == 0,一般是由于 epoll_wait 已超時(shí)堂淡,此時(shí)需要調(diào)用超時(shí)回調(diào)函數(shù)
  • 如果 n > 0馋缅,那么就要從 events 中取出已經(jīng)就緒的 swFd 對(duì)象,并利用該對(duì)象的值初始化 event
  • 接下來(lái)就要檢查 events[i].events 的值淤齐,來(lái)判斷具體是讀就緒股囊、寫就緒還是發(fā)生了錯(cuò)誤,值得注意的是 EPOLLRDHUP 事件更啄,此事件代表著對(duì)端斷開連接稚疹,這個(gè)是 linux 自從 2.6.17 的新特性
  • 利用 swReactor_getHandle 函數(shù)取出對(duì)應(yīng)的文件描述符類型的事件回調(diào)函數(shù)
  • 事件循環(huán)的最后調(diào)用 onFinish 函數(shù)
  • 如果設(shè)置了 once,說(shuō)明此 reactor 只會(huì)循環(huán)一次祭务,立即退出内狗;否則,繼續(xù)事件循環(huán)
typedef struct _swEvent
{
    int fd;
    int16_t from_id;
    uint8_t type;
    swConnection *socket;
} swEvent;

static int swReactorEpoll_wait(swReactor *reactor, struct timeval *timeo)
{
    swEvent event;
    swReactorEpoll *object = reactor->object;
    swReactor_handle handle;
    int i, n, ret, msec;

    int reactor_id = reactor->id;
    int epoll_fd = object->epfd;
    int max_event_num = reactor->max_event_num;
    struct epoll_event *events = object->events;

    if (reactor->timeout_msec == 0)
    {
        if (timeo == NULL)
        {
            reactor->timeout_msec = -1;
        }
        else
        {
            reactor->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
        }
    }

    reactor->start = 1;

    while (reactor->running > 0)
    {
        if (reactor->onBegin != NULL)
        {
            reactor->onBegin(reactor);
        }
        msec = reactor->timeout_msec;
        n = epoll_wait(epoll_fd, events, max_event_num, msec);
        if (n < 0)
        {
            if (swReactor_error(reactor) < 0)
            {
                swWarn("[Reactor#%d] epoll_wait failed. Error: %s[%d]", reactor_id, strerror(errno), errno);
                return SW_ERR;
            }
            else
            {
                continue;
            }
        }
        else if (n == 0)
        {
            if (reactor->onTimeout != NULL)
            {
                reactor->onTimeout(reactor);
            }
            continue;
        }
        for (i = 0; i < n; i++)
        {
            event.fd = events[i].data.u64;
            event.from_id = reactor_id;
            event.type = events[i].data.u64 >> 32;
            event.socket = swReactor_get(reactor, event.fd);

            //read
            if ((events[i].events & EPOLLIN) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_READ, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLIN handle failed. fd=%d.", event.fd);
                }
            }
            //write
            if ((events[i].events & EPOLLOUT) && !event.socket->removed)
            {
                handle = swReactor_getHandle(reactor, SW_EVENT_WRITE, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLOUT handle failed. fd=%d.", event.fd);
                }
            }
            //error
#ifndef NO_EPOLLRDHUP
            if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#else
            if ((events[i].events & (EPOLLERR | EPOLLHUP)) && !event.socket->removed)
#endif
            {
                //ignore ERR and HUP, because event is already processed at IN and OUT handler.
                if ((events[i].events & EPOLLIN) || (events[i].events & EPOLLOUT))
                {
                    continue;
                }
                handle = swReactor_getHandle(reactor, SW_EVENT_ERROR, event.type);
                ret = handle(reactor, &event);
                if (ret < 0)
                {
                    swSysError("EPOLLERR handle failed. fd=%d.", event.fd);
                }
            }
        }

        if (reactor->onFinish != NULL)
        {
            reactor->onFinish(reactor);
        }
        if (reactor->once)
        {
            break;
        }
    }
    return 0;
}

static sw_inline int swReactor_error(swReactor *reactor)
{
    switch (errno)
    {
    case EINTR:
        if (reactor->singal_no)
        {
            swSignal_callback(reactor->singal_no);
            reactor->singal_no = 0;
        }
        return SW_OK;
    }
    return SW_ERR;
}

static sw_inline swReactor_handle swReactor_getHandle(swReactor *reactor, int event_type, int fdtype)
{
    if (event_type == SW_EVENT_WRITE)
    {
        return (reactor->write_handle[fdtype] != NULL) ? reactor->write_handle[fdtype] : reactor->handle[SW_FD_WRITE];
    }
    else if (event_type == SW_EVENT_ERROR)
    {
        return (reactor->error_handle[fdtype] != NULL) ? reactor->error_handle[fdtype] : reactor->handle[SW_FD_CLOSE];
    }
    return reactor->handle[fdtype];
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末义锥,一起剝皮案震驚了整個(gè)濱河市柳沙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌拌倍,老刑警劉巖赂鲤,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異柱恤,居然都是意外死亡数初,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門梗顺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)泡孩,“玉大人,你說(shuō)我怎么就攤上這事寺谤÷嘏福” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵变屁,是天一觀的道長(zhǎng)眼俊。 經(jīng)常有香客問(wèn)我,道長(zhǎng)敞贡,這世上最難降的妖魔是什么泵琳? 我笑而不...
    開封第一講書人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮誊役,結(jié)果婚禮上获列,老公的妹妹穿的比我還像新娘。我一直安慰自己蛔垢,他們只是感情好击孩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鹏漆,像睡著了一般巩梢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上艺玲,一...
    開封第一講書人閱讀 49,837評(píng)論 1 290
  • 那天括蝠,我揣著相機(jī)與錄音,去河邊找鬼饭聚。 笑死忌警,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的秒梳。 我是一名探鬼主播法绵,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼酪碘!你這毒婦竟也來(lái)了朋譬?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤兴垦,失蹤者是張志新(化名)和其女友劉穎徙赢,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體探越,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡狡赐,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了扶关。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阴汇。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖节槐,靈堂內(nèi)的尸體忽然破棺而出搀庶,到底是詐尸還是另有隱情,我是刑警寧澤铜异,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布哥倔,位于F島的核電站,受9級(jí)特大地震影響揍庄,放射性物質(zhì)發(fā)生泄漏咆蒿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望沃测。 院中可真熱鬧缭黔,春花似錦、人聲如沸蒂破。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)附迷。三九已至惧互,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間喇伯,已是汗流浹背喊儡。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留稻据,地道東北人艾猜。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像攀甚,于是被迫代替她去往敵國(guó)和親箩朴。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容