Redis事件

redis服務(wù)器是一個(gè)事件驅(qū)動(dòng)型的鉴扫,主要包括以下兩種類型的事件:
(1)文件事件:客戶端與服務(wù)器的socket連接狞贱,讀命令,寫命令都是文件事件郎仆。redis服務(wù)器是單線程只祠,采用I/O多路復(fù)用來處理多個(gè)客戶端的請(qǐng)求。
(2)時(shí)間事件:周期性地執(zhí)行一些操作扰肌。

1抛寝、事件循環(huán)

事件循環(huán)的核心部分是aeEventLoop,下圖為數(shù)據(jù)結(jié)構(gòu):

aeEventLoop數(shù)據(jù)結(jié)構(gòu)

aeEventLoop保存了待處理的文件事件曙旭,時(shí)間事件盗舰,以及事件執(zhí)行的上下文。內(nèi)部持有三個(gè)事件數(shù)組:
(1)aeFileEvent *events 已注冊文件事件數(shù)組桂躏;
(2)aeFiredEvent *fired 已就緒文件事件數(shù)組;
(3)aeTimeEvent *timeEventHead 時(shí)間事件列表;
事件循環(huán)
在server啟動(dòng)過程中钻趋,會(huì)調(diào)用aeMain啟動(dòng)事件處理循環(huán)。

void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件處理前執(zhí)行的函數(shù)剂习,那么運(yùn)行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 開始處理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}

aeMain循環(huán)處理事件蛮位,直到eventLoop→stop=true為止较沪。實(shí)際處理文件事件和時(shí)間事件的過程是在aeProcessEvents中。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 獲取最近的時(shí)間事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果時(shí)間事件存在的話
// 那么根據(jù)最近可執(zhí)行時(shí)間事件和現(xiàn)在時(shí)間的時(shí)間差來決定文件事件的阻塞時(shí)間
long now_sec, now_ms;
/* Calculate the time missing for the nearest
* timer to fire. */
// 計(jì)算距今最近的時(shí)間事件還要多久才能達(dá)到
// 并將該時(shí)間距保存在 tv 結(jié)構(gòu)中
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
// 時(shí)間差小于 0 失仁,說明事件已經(jīng)可以執(zhí)行了尸曼,將秒和毫秒設(shè)為 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 執(zhí)行到這一步,說明沒有時(shí)間事件
// 那么根據(jù) AE_DONT_WAIT 是否設(shè)置來決定是否阻塞萄焦,以及阻塞的時(shí)間長度
if (flags & AE_DONT_WAIT) {
// 設(shè)置文件事件不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
// 文件事件可以阻塞直到有事件到達(dá)為止
tvp = NULL; /* wait forever */
}
}
// 處理文件事件控轿,阻塞時(shí)間由 tvp 決定
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 從已就緒數(shù)組中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執(zhí)行其中一個(gè)
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
// 執(zhí)行時(shí)間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}

(1)找出最近的時(shí)間事件,計(jì)算出文件事件的阻塞時(shí)間拂封。

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);

如果最近的時(shí)間事件存在茬射,則根據(jù)離當(dāng)前時(shí)間的時(shí)間差得出文件事件的阻塞時(shí)間;
如果不存在冒签,則根據(jù) AE_DONT_WAIT 是否設(shè)置來決定是否阻塞躲株,以及阻塞的時(shí)間長度。
(2)阻塞等待就緒的文件事件
// 處理文件事件镣衡,阻塞時(shí)間由 tvp 決定

numevents = aeApiPoll(eventLoop, tvp);

底層有四種實(shí)現(xiàn)方式:(1)evport(2)epoll(3)kqueue(4)select
(3)處理已就緒的文件事件
第二步獲取的已就緒事件存儲(chǔ)在fired中。如果文件事件綁定了讀/寫事件档悠,進(jìn)行相應(yīng)的處理廊鸥。

// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執(zhí)行其中一個(gè)
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}

其中rfileProc和wfileProc就是在文件事件被創(chuàng)建時(shí)傳入的函數(shù)指針。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
//省略部分代碼
// 設(shè)置文件事件類型辖所,以及事件的處理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 私有數(shù)據(jù)
fe->clientData = clientData;
// 如果有需要惰说,更新事件處理器的最大 fd
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

(4)執(zhí)行時(shí)間事件
事件循環(huán)的流程如下圖:

事件循環(huán)過程

2、文件事件

下面來看下aeFileEvent內(nèi)部結(jié)構(gòu):

typedef struct aeFileEvent {
// 監(jiān)聽事件類型掩碼缘回,
// 值可以是 AE_READABLE 或 AE_WRITABLE 吆视,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 讀事件處理器
aeFileProc *rfileProc;
// 寫事件處理器
aeFileProc *wfileProc;
// 多路復(fù)用庫的私有數(shù)據(jù)
void *clientData;
} aeFileEvent;

(1)共有兩種類型的文件事件:AE_READABLE和AE_WRITABLE類型。
(2)兩個(gè)函數(shù)指針:一個(gè)是處理讀事件的函數(shù)指針酥宴,一個(gè)是處理寫事件的函數(shù)指針啦吧。
創(chuàng)建文件事件
以下三種場景會(huì)創(chuàng)建文件事件:
1、當(dāng)有client申請(qǐng)socket連接時(shí)拙寡,會(huì)注冊一個(gè)AE_READABLE類型的文件事件授滓。
2、當(dāng)接受client命令請(qǐng)求時(shí)肆糕,會(huì)注冊一個(gè)AE_READABLE類型的文件事件般堆。
3、當(dāng)返回命令處理結(jié)果時(shí)诚啃,會(huì)注冊一個(gè)AE_WRITABLE類型的文件事件淮摔。
(1)socket連接
創(chuàng)建一個(gè)AE_READABLE類型的文件事件,并注冊事件處理函數(shù)指針acceptTcpHandler

// 為 TCP 連接關(guān)聯(lián)連接應(yīng)答(accept)處理器
// 用于接受并應(yīng)答客戶端的 connect() 調(diào)用
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}

(2)接受client命令請(qǐng)求
創(chuàng)建Readable類型的時(shí)間始赎,并注冊事件處理函數(shù)readQueryFromClient

// 綁定讀事件到事件 loop (開始接收命令請(qǐng)求)
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}

(3)返回命令處理結(jié)果
創(chuàng)建Writable類型的文件事件和橙,并注冊事件處理函數(shù)sendReplyToClient

int prepareClientToWrite(redisClient *c) {
...
// 一般情況仔燕,為客戶端套接字安裝寫處理器到事件循環(huán)
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}

3、時(shí)間事件

3.1胃碾、processTimeEvents中處理時(shí)間事件

static int processTimeEvents(aeEventLoop *eventLoop) {
...
// 通過重置事件的運(yùn)行時(shí)間涨享,防止因系統(tǒng)時(shí)間被修改而造成的事件處理混亂
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
// 更新最后一次處理時(shí)間事件的時(shí)間
eventLoop->lastTime = now;
// 遍歷鏈表,執(zhí)行那些已經(jīng)就緒的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
// 跳過無效事件
if (te->id > maxId) {
te = te->next;
continue;
}
// 獲取當(dāng)前時(shí)間
aeGetTime(&now_sec, &now_ms);
// 如果當(dāng)前時(shí)間等于或等于事件的執(zhí)行時(shí)間仆百,那么說明事件已就緒厕隧,執(zhí)行這個(gè)事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 執(zhí)行事件處理器,并獲取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// 記錄是否有需要循環(huán)執(zhí)行這個(gè)事件時(shí)間
if (retval != AE_NOMORE) {
// 是俄周, retval 毫秒之后繼續(xù)執(zhí)行這個(gè)時(shí)間事件
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 否吁讨,將這個(gè)事件刪除
aeDeleteTimeEvent(eventLoop, id);
}
// 因?yàn)閳?zhí)行事件之后,事件列表可能已經(jīng)被改變了
// 因此需要將 te 放回表頭峦朗,繼續(xù)開始執(zhí)行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}

(1)首先判斷系統(tǒng)時(shí)間是否被重新設(shè)置過
如果系統(tǒng)時(shí)間先被設(shè)置成了未來的時(shí)間建丧,又調(diào)成到正確的時(shí)間時(shí),為了防止部分事件延遲執(zhí)行波势,這里會(huì)強(qiáng)制執(zhí)行所有的時(shí)間事件翎朱。
(2)判斷時(shí)間事件是否已到達(dá)
如果當(dāng)前時(shí)間大于或等于事件的執(zhí)行時(shí)間,說明事件已到達(dá)尺铣,則執(zhí)行事件拴曲。事件的執(zhí)行由創(chuàng)建時(shí)間事件時(shí)傳入的函數(shù)指針te->timeProc負(fù)責(zé)。
server啟動(dòng)時(shí)會(huì)注冊一個(gè)時(shí)間事件凛忿,并傳入事件處理函數(shù)serverCron
(3)判斷該時(shí)間事件是否需要循環(huán)執(zhí)行
timeProc函數(shù)的返回值retval為時(shí)間事件執(zhí)行的時(shí)間間隔
如果retval != AE_MORE澈灼,則修改當(dāng)前事件下次執(zhí)行時(shí)間,并在retval間隔之后再次執(zhí)行店溢。
如果retval == AE_MORE叁熔,則刪除當(dāng)前事件。

3.2床牧、ServerCron

serverCron是時(shí)間事件的具體執(zhí)行函數(shù)荣回,具體工作主要有:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/*更新server的統(tǒng)計(jì)信息 */
updateCachedTime();
// 記錄服務(wù)器執(zhí)行命令的次數(shù)
run_with_period(100) trackOperationsPerSecond();
server.lruclock = getLRUClock();
if (zmalloc_used_memory() > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used_memory();
/* Sample the RSS here since this is a relatively slow call. */
server.resident_set_size = zmalloc_get_rss();
// 檢查客戶端,關(guān)閉超時(shí)客戶端戈咳,并釋放客戶端多余的緩沖區(qū)
clientsCron();
// 對(duì)數(shù)據(jù)庫執(zhí)行各種操作
databasesCron();
// 如果 BGSAVE 和 BGREWRITEAOF 都沒有在執(zhí)行
// 并且有一個(gè) BGREWRITEAOF 在等待驹马,那么執(zhí)行 BGREWRITEAOF
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
// 檢查 BGSAVE 或者 BGREWRITEAOF 是否已經(jīng)執(zhí)行完畢
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
...
} else {
// 遍歷所有保存條件,看是否需要執(zhí)行 BGSAVE 命令
for (j = 0; j < server.saveparamslen; j++) {
...
}
/* Trigger an AOF rewrite if needed */
if (server.rdb_child_pid == -1 &&
server.aof_child_pid == -1 &&
server.aof_rewrite_perc &&
// AOF 文件的當(dāng)前大小大于執(zhí)行 BGREWRITEAOF 所需的最小大小
server.aof_current_size > server.aof_rewrite_min_size)
{
...
}
}
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
//關(guān)閉那些需要異步關(guān)閉的客戶端
freeClientsInAsyncFreeQueue();
/* Clear the paused clients flag if needed. */
clientsArePaused(); /* Don't check return value, just use the side effect. */
// 重連接主服務(wù)器除秀、向主服務(wù)器發(fā)送 ACK 糯累、判斷數(shù)據(jù)發(fā)送失敗情況、斷開本服務(wù)器超時(shí)的從服務(wù)器册踩,等等
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
// 如果服務(wù)器運(yùn)行在集群模式下泳姐,那么執(zhí)行集群操作
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
// 如果服務(wù)器運(yùn)行在 sentinel 模式下,那么執(zhí)行 SENTINEL 的主函數(shù)
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
....
return 1000/server.hz;
}

(1)更新server的統(tǒng)計(jì)信息
(2)關(guān)閉已斷開連接的client暂吉,并釋放client占用的空間
(3)刪除數(shù)據(jù)庫過期鍵
(4)執(zhí)行AOF或RDB持久化操作
(5)進(jìn)行主從服務(wù)器同步
(6)如果是集群模式胖秒,則對(duì)集群進(jìn)行同步和連接測試

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末缎患,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子阎肝,更是在濱河造成了極大的恐慌挤渔,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件风题,死亡現(xiàn)場離奇詭異判导,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)沛硅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門眼刃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人摇肌,你說我怎么就攤上這事擂红。” “怎么了围小?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵昵骤,是天一觀的道長。 經(jīng)常有香客問我肯适,道長涉茧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任疹娶,我火速辦了婚禮,結(jié)果婚禮上伦连,老公的妹妹穿的比我還像新娘雨饺。我一直安慰自己,他們只是感情好惑淳,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布额港。 她就那樣靜靜地躺著,像睡著了一般歧焦。 火紅的嫁衣襯著肌膚如雪移斩。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天绢馍,我揣著相機(jī)與錄音向瓷,去河邊找鬼。 笑死舰涌,一個(gè)胖子當(dāng)著我的面吹牛猖任,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播瓷耙,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼朱躺,長吁一口氣:“原來是場噩夢啊……” “哼刁赖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起长搀,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤宇弛,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后源请,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體枪芒,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年巢钓,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了病苗。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡症汹,死狀恐怖硫朦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情背镇,我是刑警寧澤咬展,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站瞒斩,受9級(jí)特大地震影響破婆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜胸囱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一祷舀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧烹笔,春花似錦裳扯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至允蜈,卻和暖如春冤吨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背饶套。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國打工漩蟆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人妓蛮。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓爆安,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子扔仓,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • redis的主要事件源包括信號(hào)褐奥,網(wǎng)絡(luò),文件和時(shí)間事件翘簇,文件事件沒看到用于做什么撬码。 1、信號(hào)事件 redis忽略了S...
    x1wan閱讀 562評(píng)論 0 1
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,116評(píng)論 25 707
  • 生活是什么?生活就是吃喝拉撒彻犁?就是柴木油鹽叫胁?就是家長里短? 過了這么多年汞幢。我也不確定生活是什么驼鹅。或許父母給了我們生...
    sunflowerqiu閱讀 329評(píng)論 0 1
  • 01417歐陽微 剛看到這本書名《我和我的楓姐姐》的時(shí)候,讓我想起我小時(shí)候和我妹妹小時(shí)間的一些事,到現(xiàn)在...
    微微姐姐閱讀 437評(píng)論 0 0
  • 我的男友是軍人翼虫, 所以沒有陪伴, 我只有思念买乃。 我的男友是軍人, 因?yàn)槿盅b在身钓辆, 他選擇報(bào)國剪验。 我的男友是軍人, ...
    渠六億閱讀 766評(píng)論 2 7