aeEventLoop
Redis 事件驅(qū)動框架對應(yīng)的數(shù)據(jù)結(jié)構(gòu)依溯,在 ae.h
中定義油够,記錄了運行過程信息,有 2 個記錄事件的變量:
-
IO 事件
:aeFileEvent 類型的指針 *events -
時間事件
:aeTimeEvent 類型的指針 *timeEventHead猪贪,按照一定時間周期觸發(fā)的事件
/* State of an event based program */
typedef struct aeEventLoop {
……
// IO 事件數(shù)組
aeFileEvent *events;
// 已觸發(fā)事件數(shù)組
aeFiredEvent *fired;
// 時間事件的鏈表投
aeTimeEvent *timeEventHead;
// polling api 相關(guān)數(shù)據(jù)
void *apidata;
// 進入事件循環(huán)流程前執(zhí)行的函數(shù)
aeBeforeSleepProc *beforesleep;
// 進入事件循環(huán)流程后執(zhí)行的函數(shù)
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
在 server.c
的 initServer 函數(shù)中調(diào)用 aeCreateEventLoop
進行初始化。
// 創(chuàng)建事件循環(huán)框架
server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
monotonicInit(); /* just in case the calling app didn't initialize */
// 創(chuàng)建 eventLoop 并分配內(nèi)存空間
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
……
// 調(diào)用 aeApiCreate 函數(shù)
if (aeApiCreate(eventLoop) == -1) goto err;
// 把所有網(wǎng)絡(luò) IO 事件對應(yīng)文件描述符的掩碼讯私,初始化為 AE_NONE热押,暫時不對任何事件進行監(jiān)聽
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
……
return NULL;
}
核心是調(diào)用 aeApiCreate
函數(shù)。aeApiCreate 函數(shù)封裝了操作系統(tǒng)提供的 IO 多路復(fù)用函數(shù)斤寇,假設(shè) Redis 運行在 Linux 操作系統(tǒng)上桶癣,并且 IO 多路復(fù)用機制是 epoll,此時會調(diào)用 epoll_create
創(chuàng)建 epoll 實例娘锁,同時會創(chuàng)建 epoll_event 結(jié)構(gòu)的數(shù)組牙寞,數(shù)組大小等于參數(shù) setsize。
typedef struct aeApiState {
// epoll 實例的描述符
int epfd;
// epoll_event 結(jié)構(gòu)體數(shù)組莫秆,記錄監(jiān)聽事件
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 將 epoll_event 數(shù)組保存在 aeApiState 中
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
……
// 將 epoll 實例描述符保存在 aeApiState 中
state->epfd = epoll_create(1024);
……
// 將 aeApiState 變量賦值給 eventLoop 的 apidata
eventLoop->apidata = state;
}
aeApiCreate 函數(shù)最后將創(chuàng)建好的 aeApiState 變量賦值給 eventLoop 的 apidata间雀,之后 eventLoop 結(jié)構(gòu)體中就有了 epoll 實例
和 epoll_event 數(shù)組
信息,可以基于 epoll 創(chuàng)建和處理事件了镊屎。
// 將 aeApiState 變量賦值給 eventLoop 的 apidata
eventLoop->apidata = state;
IO 事件處理
Redis 的 IO 事件分 3 類:
可讀事件
可寫事件
-
屏障事件
:反轉(zhuǎn)事件的處理順序惹挟。
IO 事件的數(shù)據(jù)結(jié)構(gòu)是 aeFileEvent
結(jié)構(gòu)體,IO 事件的創(chuàng)建是通過 aeCreateFileEvent
函數(shù)來完成的缝驳。
typedef struct aeFileEvent {
// 事件類型的掩碼连锯,AE_(READABLE|WRITABLE|BARRIER)
int mask;
// AE_READABLE 事件的處理函數(shù)
aeFileProc *rfileProc;
// AE_WRITABLE 事件的處理函數(shù)
aeFileProc *wfileProc;
// 指向客戶端私有數(shù)據(jù)
void *clientData;
} aeFileEvent;
IO 事件創(chuàng)建
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 錯誤處理
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
// 核心
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
入?yún)⒂?5 個:
-
*eventLoop
:循環(huán)流程結(jié)構(gòu)體 -
fd
:IO 事件對應(yīng)的文件描述符 -
mask
:事件類型掩碼 -
*proc
:事件處理回調(diào)函數(shù) -
*clientData
:事件私有數(shù)據(jù)
aeCreateFileEvent 函數(shù)會先根據(jù)傳入的文件描述符 fd归苍,在 eventLoop 的 IO 事件數(shù)組中,獲取該描述符關(guān)聯(lián)的 IO 事件指針變量* fe运怖,如下所示:
aeFileEvent *fe = &eventLoop->events[fd];
之后 aeCreateFileEvent 函數(shù)會調(diào)用 aeApiAddEvent 函數(shù)拼弃,添加要監(jiān)聽的事件:
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
aeApiAddEvent 函數(shù)實際上會調(diào)用操作系統(tǒng)提供的 IO 多路復(fù)用
函數(shù),來完成事件的添加摇展。我們還是假設(shè) Redis 實例運行在使用 epoll 機制的 Linux 上吻氧,那么 aeApiAddEvent 函數(shù)就會調(diào)用 epoll_ctl
函數(shù),添加要監(jiān)聽的事件吗购。aeApiAddEvent 函數(shù)源碼如下:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask;
// 將可讀或可寫 IO 事件類型轉(zhuǎn)換為 epoll 監(jiān)聽的類型 EPOLLIN 或 EPOLLOUT
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
// 將要監(jiān)聽的文件描述符賦值給 epoll_event
ee.data.fd = fd;
// 增加新的觀察事件
if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
return 0;
}
至此事件驅(qū)動框架已經(jīng)基于 epoll医男,封裝了 IO 事件的創(chuàng)建。
讀事件處理
Redis server 接收到客戶端的連接請求時捻勉,會使用注冊好的 acceptTcpHandler
函數(shù)進行處理镀梭。acceptTcpHandler 函數(shù)是在 networking.c
文件中,接受客戶端連接并創(chuàng)建已連接套接字 cfd
踱启。
最終會調(diào)用 acceptCommonHandler
函數(shù)报账,其會調(diào)用 createClient 函數(shù),最終會調(diào)用到 aeCreateFileEvent
函數(shù)埠偿,創(chuàng)建 AE_READABLE
的監(jiān)聽事件透罢,回調(diào)函數(shù)是 readQueryFromClient
。
至此事件驅(qū)動框架就增加了一個對客戶端已連接套接字的監(jiān)聽冠蒋。之后客戶端有請求發(fā)送到 Redis server羽圃,框架就會回調(diào) readQueryFromClient 函數(shù)處理請求。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
……
// 每次處理 1000 個
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
……
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
acceptCommonHandler 函數(shù)會調(diào)用到 createClient:
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
……
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
……
connClose(conn); /* May be already closed, just ignore errors */
return;
}
}
createClient 函數(shù)會創(chuàng)建監(jiān)聽事件:
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
……
}
寫事件處理
readQueryFromClient 函數(shù)在 networking.c
中抖剿,收到客戶端請求后朽寞,處理客戶端命令,并將返回的數(shù)據(jù)寫入客戶端輸出緩沖區(qū)斩郎。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 循環(huán)調(diào)用
while (!eventLoop->stop) {
// 核心函數(shù)脑融,處理事件的邏輯
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
在 aeProcessEvents 函數(shù)中,有 IO 事件發(fā)生時缩宜,會先判斷是否有 beforesleep
函數(shù):
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
……
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
……
beforeSleep 函數(shù)調(diào)用的 handleClientsWithPendingWrites
函數(shù)肘迎,會遍歷每一個待寫回數(shù)據(jù)的客戶端,然后調(diào)用 writeToClient
函數(shù)锻煌,將客戶端輸出緩沖區(qū)中的數(shù)據(jù)寫回妓布。
從 aeProcessEvents 函數(shù)的代碼中,我們可以看到該函數(shù)會調(diào)用 aeApiPoll 函數(shù)宋梧,查詢監(jiān)聽的文件描述符中匣沼,有哪些已經(jīng)就緒。一旦有描述符就緒乃秀,aeProcessEvents 函數(shù)就會根據(jù)事件的可讀或可寫類型肛著,調(diào)用相應(yīng)的回調(diào)函數(shù)進行處理。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
……
// 有 IO 事件發(fā)生 || 緊急時間事件發(fā)生
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
……
// 調(diào)用 aeApiPoll 獲取就緒的描述符
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
……
// 如果觸發(fā)的是可讀事件跺讯,調(diào)用事件注冊時設(shè)置的讀事件回調(diào)處理函數(shù)
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// 如果觸發(fā)的是可寫事件枢贿,調(diào)用事件注冊時設(shè)置的寫事件回調(diào)處理函數(shù)
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
}
整個流程就完成了。
時間事件處理
時間事件定義
/* Time event structure */
typedef struct aeTimeEvent {
// 時間事件 ID
long long id;
// 事件到達的時間戳
monotime when;
// 事件到達后的處理函數(shù)
aeTimeProc *timeProc;
// 事件結(jié)束后的處理函數(shù)
aeEventFinalizerProc *finalizerProc;
// 事件相關(guān)的私有數(shù)據(jù)
void *clientData;
// 鏈表前向指針
struct aeTimeEvent *prev;
// 鏈表后向指針
struct aeTimeEvent *next;
int refcount;
} aeTimeEvent;
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
時間事件創(chuàng)建
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
te->when = getMonotonicUs() + milliseconds * 1000;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
核心就是創(chuàng)建 aeTimeEvent 指針 te刀脏,并將 te 放入 eventLoop 的時間事件的鏈表頭:
eventLoop->timeEventHead = te;
aeCreateTimeEvent
函數(shù)是在 server.c
文件中的 initServer
函數(shù)中調(diào)用的:
// 為 server 后臺任務(wù)創(chuàng)建定時事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
時間事件回調(diào)函數(shù)
serverCron
在 server.c
中:
- 調(diào)用
后臺任務(wù)函數(shù)
- 調(diào)用
databaseCron 函數(shù)
局荚,處理過期 key 或 rehash
/* We need to do a few operations on clients asynchronously. */
// 執(zhí)行客戶端的異步操作
clientsCron();
/* Handle background operations on Redis databases. */
// 執(zhí)行數(shù)據(jù)庫的后臺操作
databasesCron();
時間事件的觸發(fā)處理
事件驅(qū)動框架的 aeMain 函數(shù)會循環(huán)調(diào)用 aeProcessEvents 函數(shù),來處理各種事件愈污。aeProcessEvents 函數(shù)的最后耀态,會調(diào)用 processTimeEvents
函數(shù)處理時間任務(wù)。
// 檢查是否有時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
processTimeEvents 函數(shù)的主體邏輯暂雹,就是從 eventLoop 的時間事件的鏈表逐一取出每個事件首装,根據(jù)當(dāng)前時間判斷該事件的時間是否滿足觸發(fā)條件。如果滿足就處理杭跪。
static int processTimeEvents(aeEventLoop *eventLoop) {
……
// 從時間事件鏈表中仙逻,取出事件
te = eventLoop->timeEventHead;
……
while(te) {
……
// 當(dāng)前時間已經(jīng)滿足事件的觸發(fā)時間戳
if (te->when <= now) {
……
// 調(diào)用回調(diào)函數(shù)
retval = te->timeProc(eventLoop, id, te->clientData);
……
now = getMonotonicUs();
if (retval != AE_NOMORE) {
// 處理后,再次更新時間
te->when = now + retval * 1000;
}
……
}
// 獲取下一個事件
te = te->next;
}
return processed;
}
參考鏈接
Redis 源碼簡潔剖析系列
Java 編程思想-最全思維導(dǎo)圖-GitHub 下載鏈接系奉,需要的小伙伴可以自取~
原創(chuàng)不易,希望大家轉(zhuǎn)載時請先聯(lián)系我姑廉,并標(biāo)注原文鏈接悄雅。