Redis 源碼簡潔剖析 10 - aeEventLoop 及事件

aeEventLoop

Redis 事件驅(qū)動框架對應(yīng)的數(shù)據(jù)結(jié)構(gòu)依溯,在 ae.h 中定義油够,記錄了運行過程信息,有 2 個記錄事件的變量:

  • IO 事件:aeFileEvent 類型的指針 *events
  • 時間事件:aeTimeEvent 類型的指針 *timeEventHead猪贪,按照一定時間周期觸發(fā)的事件
/* State of an event based program */
typedef struct aeEventLoop {
    ……
    // IO 事件數(shù)組
    aeFileEvent *events;
    // 已觸發(fā)事件數(shù)組
    aeFiredEvent *fired;
    // 時間事件的鏈表投
    aeTimeEvent *timeEventHead;
    // polling api 相關(guān)數(shù)據(jù)
    void *apidata;
    // 進入事件循環(huán)流程前執(zhí)行的函數(shù)
    aeBeforeSleepProc *beforesleep;
    // 進入事件循環(huán)流程后執(zhí)行的函數(shù)
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

server.c 的 initServer 函數(shù)中調(diào)用 aeCreateEventLoop 進行初始化。

// 創(chuàng)建事件循環(huán)框架
server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    monotonicInit();    /* just in case the calling app didn't initialize */

    // 創(chuàng)建 eventLoop 并分配內(nèi)存空間
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    ……

    // 調(diào)用 aeApiCreate 函數(shù)
    if (aeApiCreate(eventLoop) == -1) goto err;
    // 把所有網(wǎng)絡(luò) IO 事件對應(yīng)文件描述符的掩碼讯私,初始化為 AE_NONE热押,暫時不對任何事件進行監(jiān)聽
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

    err:
    ……
    return NULL;
}

核心是調(diào)用 aeApiCreate 函數(shù)。aeApiCreate 函數(shù)封裝了操作系統(tǒng)提供的 IO 多路復(fù)用函數(shù)斤寇,假設(shè) Redis 運行在 Linux 操作系統(tǒng)上桶癣,并且 IO 多路復(fù)用機制是 epoll,此時會調(diào)用 epoll_create 創(chuàng)建 epoll 實例娘锁,同時會創(chuàng)建 epoll_event 結(jié)構(gòu)的數(shù)組牙寞,數(shù)組大小等于參數(shù) setsize。

typedef struct aeApiState {
    // epoll 實例的描述符
    int epfd;
    // epoll_event 結(jié)構(gòu)體數(shù)組莫秆,記錄監(jiān)聽事件
    struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    // 將 epoll_event 數(shù)組保存在 aeApiState 中
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    ……
    // 將 epoll 實例描述符保存在 aeApiState 中
    state->epfd = epoll_create(1024); 
    ……
    // 將 aeApiState 變量賦值給 eventLoop 的 apidata
    eventLoop->apidata = state;
}

aeApiCreate 函數(shù)最后將創(chuàng)建好的 aeApiState 變量賦值給 eventLoop 的 apidata间雀,之后 eventLoop 結(jié)構(gòu)體中就有了 epoll 實例epoll_event 數(shù)組信息,可以基于 epoll 創(chuàng)建和處理事件了镊屎。

// 將 aeApiState 變量賦值給 eventLoop 的 apidata
eventLoop->apidata = state;

IO 事件處理

Redis 的 IO 事件分 3 類:

  1. 可讀事件
  2. 可寫事件
  3. 屏障事件:反轉(zhuǎn)事件的處理順序惹挟。

IO 事件的數(shù)據(jù)結(jié)構(gòu)是 aeFileEvent 結(jié)構(gòu)體,IO 事件的創(chuàng)建是通過 aeCreateFileEvent 函數(shù)來完成的缝驳。

typedef struct aeFileEvent {
    // 事件類型的掩碼连锯,AE_(READABLE|WRITABLE|BARRIER)
    int mask;
    // AE_READABLE 事件的處理函數(shù)
    aeFileProc *rfileProc;
    // AE_WRITABLE 事件的處理函數(shù)
    aeFileProc *wfileProc;
    // 指向客戶端私有數(shù)據(jù)
    void *clientData;
} aeFileEvent;

IO 事件創(chuàng)建

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // 錯誤處理
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    aeFileEvent *fe = &eventLoop->events[fd];

    // 核心
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

入?yún)⒂?5 個:

  • *eventLoop:循環(huán)流程結(jié)構(gòu)體
  • fd:IO 事件對應(yīng)的文件描述符
  • mask:事件類型掩碼
  • *proc:事件處理回調(diào)函數(shù)
  • *clientData:事件私有數(shù)據(jù)

aeCreateFileEvent 函數(shù)會先根據(jù)傳入的文件描述符 fd归苍,在 eventLoop 的 IO 事件數(shù)組中,獲取該描述符關(guān)聯(lián)的 IO 事件指針變量* fe运怖,如下所示:

aeFileEvent *fe = &eventLoop->events[fd];

之后 aeCreateFileEvent 函數(shù)會調(diào)用 aeApiAddEvent 函數(shù)拼弃,添加要監(jiān)聽的事件:

if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

aeApiAddEvent 函數(shù)實際上會調(diào)用操作系統(tǒng)提供的 IO 多路復(fù)用函數(shù),來完成事件的添加摇展。我們還是假設(shè) Redis 實例運行在使用 epoll 機制的 Linux 上吻氧,那么 aeApiAddEvent 函數(shù)就會調(diào)用 epoll_ctl 函數(shù),添加要監(jiān)聽的事件吗购。aeApiAddEvent 函數(shù)源碼如下:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask;
    // 將可讀或可寫 IO 事件類型轉(zhuǎn)換為 epoll 監(jiān)聽的類型 EPOLLIN 或 EPOLLOUT
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    // 將要監(jiān)聽的文件描述符賦值給 epoll_event
    ee.data.fd = fd;
    // 增加新的觀察事件
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}

至此事件驅(qū)動框架已經(jīng)基于 epoll医男,封裝了 IO 事件的創(chuàng)建。

讀事件處理

Redis server 接收到客戶端的連接請求時捻勉,會使用注冊好的 acceptTcpHandler 函數(shù)進行處理镀梭。acceptTcpHandler 函數(shù)是在 networking.c 文件中,接受客戶端連接并創(chuàng)建已連接套接字 cfd踱启。

最終會調(diào)用 acceptCommonHandler 函數(shù)报账,其會調(diào)用 createClient 函數(shù),最終會調(diào)用到 aeCreateFileEvent 函數(shù)埠偿,創(chuàng)建 AE_READABLE 的監(jiān)聽事件透罢,回調(diào)函數(shù)是 readQueryFromClient

至此事件驅(qū)動框架就增加了一個對客戶端已連接套接字的監(jiān)聽冠蒋。之后客戶端有請求發(fā)送到 Redis server羽圃,框架就會回調(diào) readQueryFromClient 函數(shù)處理請求。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ……

    // 每次處理 1000 個
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ……
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

acceptCommonHandler 函數(shù)會調(diào)用到 createClient:

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ……
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        ……
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
}

createClient 函數(shù)會創(chuàng)建監(jiān)聽事件:

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    ……
}
image

寫事件處理

readQueryFromClient 函數(shù)在 networking.c 中抖剿,收到客戶端請求后朽寞,處理客戶端命令,并將返回的數(shù)據(jù)寫入客戶端輸出緩沖區(qū)斩郎。

image
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 循環(huán)調(diào)用
    while (!eventLoop->stop) {
        // 核心函數(shù)脑融,處理事件的邏輯
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

在 aeProcessEvents 函數(shù)中,有 IO 事件發(fā)生時缩宜,會先判斷是否有 beforesleep 函數(shù):

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ……
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);
    ……

beforeSleep 函數(shù)調(diào)用的 handleClientsWithPendingWrites 函數(shù)肘迎,會遍歷每一個待寫回數(shù)據(jù)的客戶端,然后調(diào)用 writeToClient 函數(shù)锻煌,將客戶端輸出緩沖區(qū)中的數(shù)據(jù)寫回妓布。

從 aeProcessEvents 函數(shù)的代碼中,我們可以看到該函數(shù)會調(diào)用 aeApiPoll 函數(shù)宋梧,查詢監(jiān)聽的文件描述符中匣沼,有哪些已經(jīng)就緒。一旦有描述符就緒乃秀,aeProcessEvents 函數(shù)就會根據(jù)事件的可讀或可寫類型肛著,調(diào)用相應(yīng)的回調(diào)函數(shù)進行處理。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    ……
    // 有 IO 事件發(fā)生 || 緊急時間事件發(fā)生
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        ……
        // 調(diào)用 aeApiPoll 獲取就緒的描述符
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            ……

            // 如果觸發(fā)的是可讀事件跺讯,調(diào)用事件注冊時設(shè)置的讀事件回調(diào)處理函數(shù)
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            // 如果觸發(fā)的是可寫事件枢贿,調(diào)用事件注冊時設(shè)置的寫事件回調(diào)處理函數(shù)
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                    fired++;
                }
            }

整個流程就完成了。

時間事件處理

時間事件定義

/* Time event structure */
typedef struct aeTimeEvent {
    // 時間事件 ID
    long long id;
    // 事件到達的時間戳
    monotime when;
    // 事件到達后的處理函數(shù)
    aeTimeProc *timeProc;
    // 事件結(jié)束后的處理函數(shù)
    aeEventFinalizerProc *finalizerProc;
    // 事件相關(guān)的私有數(shù)據(jù)
    void *clientData;
    // 鏈表前向指針
    struct aeTimeEvent *prev;
    // 鏈表后向指針
    struct aeTimeEvent *next;
    int refcount;
} aeTimeEvent;
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);

時間事件創(chuàng)建

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

核心就是創(chuàng)建 aeTimeEvent 指針 te刀脏,并將 te 放入 eventLoop 的時間事件的鏈表頭:

eventLoop->timeEventHead = te;

aeCreateTimeEvent 函數(shù)是在 server.c 文件中的 initServer 函數(shù)中調(diào)用的:

// 為 server 后臺任務(wù)創(chuàng)建定時事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}

時間事件回調(diào)函數(shù)

serverCronserver.c 中:

  • 調(diào)用后臺任務(wù)函數(shù)
  • 調(diào)用 databaseCron 函數(shù)局荚,處理過期 key 或 rehash
/* We need to do a few operations on clients asynchronously. */
// 執(zhí)行客戶端的異步操作
clientsCron();

/* Handle background operations on Redis databases. */
// 執(zhí)行數(shù)據(jù)庫的后臺操作
databasesCron();

時間事件的觸發(fā)處理

事件驅(qū)動框架的 aeMain 函數(shù)會循環(huán)調(diào)用 aeProcessEvents 函數(shù),來處理各種事件愈污。aeProcessEvents 函數(shù)的最后耀态,會調(diào)用 processTimeEvents 函數(shù)處理時間任務(wù)。

// 檢查是否有時間事件
if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

processTimeEvents 函數(shù)的主體邏輯暂雹,就是從 eventLoop 的時間事件的鏈表逐一取出每個事件首装,根據(jù)當(dāng)前時間判斷該事件的時間是否滿足觸發(fā)條件。如果滿足就處理杭跪。

static int processTimeEvents(aeEventLoop *eventLoop) {
    ……
    // 從時間事件鏈表中仙逻,取出事件
    te = eventLoop->timeEventHead;
    ……
    while(te) {
        ……

        // 當(dāng)前時間已經(jīng)滿足事件的觸發(fā)時間戳
        if (te->when <= now) {
            ……
            // 調(diào)用回調(diào)函數(shù)
            retval = te->timeProc(eventLoop, id, te->clientData);
            ……
            now = getMonotonicUs();
            if (retval != AE_NOMORE) {
                // 處理后,再次更新時間
                te->when = now + retval * 1000;
            }
            ……
        }
        // 獲取下一個事件
        te = te->next;
    }
    return processed;
}

參考鏈接

Redis 源碼簡潔剖析系列

最簡潔的 Redis 源碼剖析系列文章

Java 編程思想-最全思維導(dǎo)圖-GitHub 下載鏈接系奉,需要的小伙伴可以自取~

原創(chuàng)不易,希望大家轉(zhuǎn)載時請先聯(lián)系我姑廉,并標(biāo)注原文鏈接悄雅。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末韵卤,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌狈定,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件巷挥,死亡現(xiàn)場離奇詭異烹卒,居然都是意外死亡,警方通過查閱死者的電腦和手機倦西,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評論 3 395
  • 文/潘曉璐 我一進店門能真,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扰柠,你說我怎么就攤上這事粉铐。” “怎么了卤档?”我有些...
    開封第一講書人閱讀 165,083評論 0 355
  • 文/不壞的土叔 我叫張陵蝙泼,是天一觀的道長。 經(jīng)常有香客問我劝枣,道長汤踏,這世上最難降的妖魔是什么织鲸? 我笑而不...
    開封第一講書人閱讀 58,763評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮溪胶,結(jié)果婚禮上搂擦,老公的妹妹穿的比我還像新娘。我一直安慰自己哗脖,他們只是感情好瀑踢,可當(dāng)我...
    茶點故事閱讀 67,785評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著才避,像睡著了一般橱夭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上桑逝,一...
    開封第一講書人閱讀 51,624評論 1 305
  • 那天棘劣,我揣著相機與錄音,去河邊找鬼楞遏。 笑死呈础,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的橱健。 我是一名探鬼主播而钞,決...
    沈念sama閱讀 40,358評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼拘荡!你這毒婦竟也來了臼节?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評論 0 276
  • 序言:老撾萬榮一對情侶失蹤珊皿,失蹤者是張志新(化名)和其女友劉穎网缝,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蟋定,經(jīng)...
    沈念sama閱讀 45,722評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡粉臊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了驶兜。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片扼仲。...
    茶點故事閱讀 40,030評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖抄淑,靈堂內(nèi)的尸體忽然破棺而出屠凶,到底是詐尸還是另有隱情,我是刑警寧澤肆资,帶...
    沈念sama閱讀 35,737評論 5 346
  • 正文 年R本政府宣布矗愧,位于F島的核電站,受9級特大地震影響郑原,放射性物質(zhì)發(fā)生泄漏唉韭。R本人自食惡果不足惜夜涕,卻給世界環(huán)境...
    茶點故事閱讀 41,360評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望属愤。 院中可真熱鬧钠乏,春花似錦、人聲如沸春塌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽只壳。三九已至,卻和暖如春暑塑,著一層夾襖步出監(jiān)牢的瞬間吼句,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評論 1 270
  • 我被黑心中介騙來泰國打工事格, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留惕艳,地道東北人。 一個月前我還...
    沈念sama閱讀 48,237評論 3 371
  • 正文 我出身青樓驹愚,卻偏偏與公主長得像远搪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子逢捺,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,976評論 2 355

推薦閱讀更多精彩內(nèi)容