redis服務(wù)器是一個(gè)事件驅(qū)動(dòng)型的鉴扫,主要包括以下兩種類型的事件:
(1)文件事件:客戶端與服務(wù)器的socket連接狞贱,讀命令,寫命令都是文件事件郎仆。redis服務(wù)器是單線程只祠,采用I/O多路復(fù)用來處理多個(gè)客戶端的請(qǐng)求。
(2)時(shí)間事件:周期性地執(zhí)行一些操作扰肌。
1抛寝、事件循環(huán)
事件循環(huán)的核心部分是aeEventLoop,下圖為數(shù)據(jù)結(jié)構(gòu):
aeEventLoop數(shù)據(jù)結(jié)構(gòu)
aeEventLoop保存了待處理的文件事件曙旭,時(shí)間事件盗舰,以及事件執(zhí)行的上下文。內(nèi)部持有三個(gè)事件數(shù)組:
(1)aeFileEvent *events 已注冊文件事件數(shù)組桂躏;
(2)aeFiredEvent *fired 已就緒文件事件數(shù)組;
(3)aeTimeEvent *timeEventHead 時(shí)間事件列表;
事件循環(huán)
在server啟動(dòng)過程中钻趋,會(huì)調(diào)用aeMain啟動(dòng)事件處理循環(huán)。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件處理前執(zhí)行的函數(shù)剂习,那么運(yùn)行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 開始處理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
aeMain循環(huán)處理事件蛮位,直到eventLoop→stop=true為止较沪。實(shí)際處理文件事件和時(shí)間事件的過程是在aeProcessEvents中。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 獲取最近的時(shí)間事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果時(shí)間事件存在的話
// 那么根據(jù)最近可執(zhí)行時(shí)間事件和現(xiàn)在時(shí)間的時(shí)間差來決定文件事件的阻塞時(shí)間
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 計(jì)算距今最近的時(shí)間事件還要多久才能達(dá)到
// 并將該時(shí)間距保存在 tv 結(jié)構(gòu)中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 時(shí)間差小于 0 失仁,說明事件已經(jīng)可以執(zhí)行了尸曼,將秒和毫秒設(shè)為 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 執(zhí)行到這一步,說明沒有時(shí)間事件
// 那么根據(jù) AE_DONT_WAIT 是否設(shè)置來決定是否阻塞萄焦,以及阻塞的時(shí)間長度
if (flags & AE_DONT_WAIT) {
// 設(shè)置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
// 文件事件可以阻塞直到有事件到達(dá)為止
tvp = NULL; /* wait forever */
}
}
// 處理文件事件控轿,阻塞時(shí)間由 tvp 決定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 從已就緒數(shù)組中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執(zhí)行其中一個(gè)
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
// 執(zhí)行時(shí)間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
(1)找出最近的時(shí)間事件,計(jì)算出文件事件的阻塞時(shí)間拂封。
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
如果最近的時(shí)間事件存在茬射,則根據(jù)離當(dāng)前時(shí)間的時(shí)間差得出文件事件的阻塞時(shí)間;
如果不存在冒签,則根據(jù) AE_DONT_WAIT 是否設(shè)置來決定是否阻塞躲株,以及阻塞的時(shí)間長度。
(2)阻塞等待就緒的文件事件
// 處理文件事件镣衡,阻塞時(shí)間由 tvp 決定
numevents = aeApiPoll(eventLoop, tvp);
底層有四種實(shí)現(xiàn)方式:(1)evport(2)epoll(3)kqueue(4)select
(3)處理已就緒的文件事件
第二步獲取的已就緒事件存儲(chǔ)在fired中。如果文件事件綁定了讀/寫事件档悠,進(jìn)行相應(yīng)的處理廊鸥。
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執(zhí)行其中一個(gè)
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
其中rfileProc和wfileProc就是在文件事件被創(chuàng)建時(shí)傳入的函數(shù)指針。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
//省略部分代碼
// 設(shè)置文件事件類型辖所,以及事件的處理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 私有數(shù)據(jù)
fe->clientData = clientData;
// 如果有需要惰说,更新事件處理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
(4)執(zhí)行時(shí)間事件
事件循環(huán)的流程如下圖:
事件循環(huán)過程
2、文件事件
下面來看下aeFileEvent內(nèi)部結(jié)構(gòu):
typedef struct aeFileEvent {
// 監(jiān)聽事件類型掩碼缘回,
// 值可以是 AE_READABLE 或 AE_WRITABLE 吆视,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 讀事件處理器
aeFileProc *rfileProc;
// 寫事件處理器
aeFileProc *wfileProc;
// 多路復(fù)用庫的私有數(shù)據(jù)
void *clientData;
} aeFileEvent;
(1)共有兩種類型的文件事件:AE_READABLE和AE_WRITABLE類型。
(2)兩個(gè)函數(shù)指針:一個(gè)是處理讀事件的函數(shù)指針酥宴,一個(gè)是處理寫事件的函數(shù)指針啦吧。
創(chuàng)建文件事件
以下三種場景會(huì)創(chuàng)建文件事件:
1、當(dāng)有client申請(qǐng)socket連接時(shí)拙寡,會(huì)注冊一個(gè)AE_READABLE類型的文件事件授滓。
2、當(dāng)接受client命令請(qǐng)求時(shí)肆糕,會(huì)注冊一個(gè)AE_READABLE類型的文件事件般堆。
3、當(dāng)返回命令處理結(jié)果時(shí)诚啃,會(huì)注冊一個(gè)AE_WRITABLE類型的文件事件淮摔。
(1)socket連接
創(chuàng)建一個(gè)AE_READABLE類型的文件事件,并注冊事件處理函數(shù)指針acceptTcpHandler
// 為 TCP 連接關(guān)聯(lián)連接應(yīng)答(accept)處理器
// 用于接受并應(yīng)答客戶端的 connect() 調(diào)用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
(2)接受client命令請(qǐng)求
創(chuàng)建Readable類型的時(shí)間始赎,并注冊事件處理函數(shù)readQueryFromClient
// 綁定讀事件到事件 loop (開始接收命令請(qǐng)求)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
(3)返回命令處理結(jié)果
創(chuàng)建Writable類型的文件事件和橙,并注冊事件處理函數(shù)sendReplyToClient
int prepareClientToWrite(redisClient *c) {
...
// 一般情況仔燕,為客戶端套接字安裝寫處理器到事件循環(huán)
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
3、時(shí)間事件
3.1胃碾、processTimeEvents中處理時(shí)間事件
static int processTimeEvents(aeEventLoop *eventLoop) {
...
// 通過重置事件的運(yùn)行時(shí)間涨享,防止因系統(tǒng)時(shí)間被修改而造成的事件處理混亂
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次處理時(shí)間事件的時(shí)間
eventLoop->lastTime = now;
// 遍歷鏈表,執(zhí)行那些已經(jīng)就緒的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
// 跳過無效事件
if (te->id > maxId) {
te = te->next;
continue;
}
// 獲取當(dāng)前時(shí)間
aeGetTime(&now_sec, &now_ms);
// 如果當(dāng)前時(shí)間等于或等于事件的執(zhí)行時(shí)間仆百,那么說明事件已就緒厕隧,執(zhí)行這個(gè)事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 執(zhí)行事件處理器,并獲取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// 記錄是否有需要循環(huán)執(zhí)行這個(gè)事件時(shí)間
if (retval != AE_NOMORE) {
// 是俄周, retval 毫秒之后繼續(xù)執(zhí)行這個(gè)時(shí)間事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 否吁讨,將這個(gè)事件刪除
aeDeleteTimeEvent(eventLoop, id);
}
// 因?yàn)閳?zhí)行事件之后,事件列表可能已經(jīng)被改變了
// 因此需要將 te 放回表頭峦朗,繼續(xù)開始執(zhí)行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
(1)首先判斷系統(tǒng)時(shí)間是否被重新設(shè)置過
如果系統(tǒng)時(shí)間先被設(shè)置成了未來的時(shí)間建丧,又調(diào)成到正確的時(shí)間時(shí),為了防止部分事件延遲執(zhí)行波势,這里會(huì)強(qiáng)制執(zhí)行所有的時(shí)間事件翎朱。
(2)判斷時(shí)間事件是否已到達(dá)
如果當(dāng)前時(shí)間大于或等于事件的執(zhí)行時(shí)間,說明事件已到達(dá)尺铣,則執(zhí)行事件拴曲。事件的執(zhí)行由創(chuàng)建時(shí)間事件時(shí)傳入的函數(shù)指針te->timeProc負(fù)責(zé)。
server啟動(dòng)時(shí)會(huì)注冊一個(gè)時(shí)間事件凛忿,并傳入事件處理函數(shù)serverCron
(3)判斷該時(shí)間事件是否需要循環(huán)執(zhí)行
timeProc函數(shù)的返回值retval為時(shí)間事件執(zhí)行的時(shí)間間隔
如果retval != AE_MORE澈灼,則修改當(dāng)前事件下次執(zhí)行時(shí)間,并在retval間隔之后再次執(zhí)行店溢。
如果retval == AE_MORE叁熔,則刪除當(dāng)前事件。
3.2床牧、ServerCron
serverCron是時(shí)間事件的具體執(zhí)行函數(shù)荣回,具體工作主要有:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/*更新server的統(tǒng)計(jì)信息 */
updateCachedTime();
// 記錄服務(wù)器執(zhí)行命令的次數(shù)
run_with_period(100) trackOperationsPerSecond();
server.lruclock = getLRUClock();
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();
// 檢查客戶端,關(guān)閉超時(shí)客戶端戈咳,并釋放客戶端多余的緩沖區(qū)
clientsCron();
// 對(duì)數(shù)據(jù)庫執(zhí)行各種操作
databasesCron();
// 如果 BGSAVE 和 BGREWRITEAOF 都沒有在執(zhí)行
// 并且有一個(gè) BGREWRITEAOF 在等待驹马,那么執(zhí)行 BGREWRITEAOF
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
// 檢查 BGSAVE 或者 BGREWRITEAOF 是否已經(jīng)執(zhí)行完畢
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
...
} else {
// 遍歷所有保存條件,看是否需要執(zhí)行 BGSAVE 命令
for (j = 0; j < server.saveparamslen; j++) {
...
}
/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// AOF 文件的當(dāng)前大小大于執(zhí)行 BGREWRITEAOF 所需的最小大小
server.aof_current_size > server.aof_rewrite_min_size)
{
...
}
}
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
//關(guān)閉那些需要異步關(guān)閉的客戶端
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */
// 重連接主服務(wù)器除秀、向主服務(wù)器發(fā)送 ACK 糯累、判斷數(shù)據(jù)發(fā)送失敗情況、斷開本服務(wù)器超時(shí)的從服務(wù)器册踩,等等
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
// 如果服務(wù)器運(yùn)行在集群模式下泳姐,那么執(zhí)行集群操作
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
// 如果服務(wù)器運(yùn)行在 sentinel 模式下,那么執(zhí)行 SENTINEL 的主函數(shù)
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
....
return 1000/server.hz;
}
(1)更新server的統(tǒng)計(jì)信息
(2)關(guān)閉已斷開連接的client暂吉,并釋放client占用的空間
(3)刪除數(shù)據(jù)庫過期鍵
(4)執(zhí)行AOF或RDB持久化操作
(5)進(jìn)行主從服務(wù)器同步
(6)如果是集群模式胖秒,則對(duì)集群進(jìn)行同步和連接測試