本文主要從源碼角度說明Redis為客戶端提供服務(包括命令處理與回復)的過程檩咱。
建議閱讀:
1芭碍、Redis 事件的理論說明見:wenmingxing Redis之事件
2、閱讀本文之前應該閱讀:Redis源碼研究之事件驅(qū)動
I粘捎、上帝視角
Redis在啟動的時候會做一系列的初始化邏輯,如配置文件讀取,網(wǎng)絡通信模塊初始化等惭嚣,然后便開始進行事件循環(huán)遵湖,準備等待并處理請求。
當客戶端發(fā)起請求時晚吞,Redis進程會被喚醒(I/O多路復用函數(shù)的系統(tǒng)調(diào)用)延旧。讀取來自客戶端的數(shù)據(jù),解析命令槽地,查找命令迁沫,執(zhí)行命令,回復命令捌蚊。
首先來看main()
函數(shù):
/* src/redis.c/main */
int main(int argc, char **argv) {
......
// 初始化服務器配置集畅,主要是填充 redisServer 結(jié)構(gòu)體中的各種參數(shù)
initServerConfig();
......
// 初始化服務器
initServer();
......
// 進入事件循環(huán)
aeMain(server.el);
}
II、initServer()
1缅糟、initServerConfig()
主要作用是填充struct redisServer
結(jié)構(gòu)體挺智,Redis服務器的相關配置都在redisServer
中。
2窗宦、initServer()
中完成對事件循環(huán)的初始化操作赦颇,并為監(jiān)聽做準備,并初始化數(shù)據(jù)庫空間赴涵。
/* src/redis.c/initServer */
/*完成對事件循環(huán)的初始化工作媒怯,并為監(jiān)聽做準備 */
void initServer() {
// 創(chuàng)建事件循環(huán)結(jié)構(gòu)體,函數(shù)aeCreateEventLoop在事件驅(qū)動一文中介紹過
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
// 分配數(shù)據(jù)庫空間
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
// listenToPort() 中有調(diào)用listen()
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
......
// 逐個初始化redis 數(shù)據(jù)庫
/* Create the Redis databases, and initialize other internal state. */
for (j = 0; j < server.REDIS_DEFAULT_DBNUM; j++) { // 初始化多個數(shù)據(jù)庫
// 哈希表髓窜,用于存儲鍵值對
server.db[j].dict = dictCreate(&dbDictType,NULL);
// 哈希表沪摄,用于存儲每個鍵的過期時間
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
......
// 創(chuàng)建接收TCP 或者UNIX 域套接字的事件處理
// TCP
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// acceptTcpHandler() tcp 連接接受處理函數(shù)
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
......
}
III、aeMain()
在完成initServerConfig
與initServer
的兩步初始化之后纱烘,aeMain
開始進入事件循環(huán)杨拐,等待請求的到來:
/*事件處理器的主循環(huán)*/
/* src/ae.c/aeMain */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//如果有需要在時間處理前執(zhí)行的函數(shù),則運行它
// 進入事件循環(huán)可能會進入睡眠狀態(tài)擂啥。在睡眠之前编整,執(zhí)行預設置
// 的函數(shù)aeSetBeforeSleepProc()养筒。
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 開始處理事件,下面說明這個函數(shù)。AE_ALL_EVENTS 表示處理所有的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
下面主要說明之前提到過的aeProcessEvents()
函數(shù):
/*處理所有已到達的時間事件慰毅,以及所有已經(jīng)就緒的文件事件*/
/* src/ae.c */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
......//獲取最近的時間事件铣焊,并以此來初始化下面對文件事件的阻塞時間
......
// 調(diào)用IO 多路復用函數(shù)阻塞監(jiān)聽,阻塞時間由tvp決定
numevents = aeApiPoll(eventLoop, tvp);
// 處理已經(jīng)觸發(fā)的事件
for (j = 0; j < numevents; j++) {
// 找到文件事件表中存儲的數(shù)據(jù)拢蛋,并完成參數(shù)的局部化
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
/* note the fe->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
/*根據(jù)局部化的參數(shù)楞遏,進行判斷*/
// 讀事件
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask); //調(diào)用讀事件函數(shù)
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++; //更新處理的事件個數(shù),最后返回
}
}
/*先處理文件事件再處理時間事件*/
// 處理時間事件
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
IV资锰、處理新的連接
在initServer()
中敢课,Redis注冊了回調(diào)函數(shù)acceptTcpHandler()
,當有新的連接到來時,這個函數(shù)會被回調(diào)直秆,而aeProcessEvents
中的rfileProc()
實際上就是指向了acceptTcpHandler()
用以處理連接:
/* 創(chuàng)建一個TCP連接處理器 */
/* src/networking.c/acceptTcpHandler */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
// anetTcpAccept接收客戶端請求濒募,封裝的accept函數(shù)
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
// 出錯
if (cfd == AE_ERR) {
redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);
return;
}
// 記錄
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 為客戶端創(chuàng)建客戶端狀態(tài)redisClient,下面說明
acceptCommonHandler(cfd,0);
}
anetTcpAccept()
即為accept
接收客戶端請求圾结,然后調(diào)用acceptCommonHandler()
處理接收到的cfd瑰剃,其中acceptCommonHandler
最重要的調(diào)用就是createClient
。Redis為每個客戶端連接筝野,都創(chuàng)建一個struct redisClient
結(jié)構(gòu)體:
/* 創(chuàng)建一個新的客戶端 */
/* src/networking.c/createClient */
redisClient *createClient(int fd) {
//為結(jié)構(gòu)體分配空間
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the Redis commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
/*當fd為-1時晌姚,則證明是偽客戶端,不需要socket歇竟;
* 當fd不為-1時挥唠,需要創(chuàng)建帶網(wǎng)絡連接的客戶端*/
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
//設置keep alive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 為接收到的套接字注冊讀事件
// readQueryFromClient() 應該為讀取客戶端并查詢緩沖區(qū)的內(nèi)容
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
......
return c;
}
V、處理請求
readQueryFromClient()
獲取到客戶端緩沖區(qū)的內(nèi)容之后途蒋,會調(diào)用processInputBuffer()
函數(shù)進行命令解析,然后會調(diào)用processCommand()
函數(shù)處理命令:
/* 負責執(zhí)行讀取到的命令 */
/* src/redis.c/processCommand */
int processCommand(redisClient *c) {
......
// 查找命令馋记,redisClient.cmd 在此時賦值
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
//在命令表中查找命令
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
/*判斷命令是否合法*/
// 沒有找到命令
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,"unknown command '%s'",
(char*)c->argv[0]->ptr);
return REDIS_OK;
// 參數(shù)個數(shù)不符合
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return REDIS_OK;
}
.....//一些判斷号坡,如集群,發(fā)布與訂閱等
// 加入命令隊列的梯醒,除去EXEC宽堆,MULTI,WATCH等事務命令茸习,if為事務操作畜隶,這里我們暫且不看,直接看else
/* Exec the command */
if (c->flags & REDIS_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// 事務命令入隊
queueMultiCommand(c);
addReply(c,shared.queued);
// 真正執(zhí)行命令号胚。
// 注意籽慢,如果是設置了多命令模式,那么不是直接執(zhí)行命令猫胁,而是讓命令入隊
} else { //非事務操作
call(c,REDIS_CALL_FULL); //執(zhí)行命令
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return REDIS_OK;
}
processCommand
函數(shù)除了檢查之外箱亿,核心調(diào)用為call()
函數(shù),其對應了Redis的所有命令弃秆。
以set
請求為例届惋,會調(diào)用setCommand()
函數(shù):
/* 執(zhí)行set命令的調(diào)用*/
/* src/t_string.c/setCommand */
void setCommand(redisClient *c) {
......//一些判斷
//真正的命令執(zhí)行
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
/* src/t_string.c/setGenericCommand */
void setGenericCommand(redisClient *c, int flags, robj *key,
robj *val, robj *expire, int unit, robj *ok_reply,
robj *abort_reply) {
......
//將鍵值關聯(lián)到數(shù)據(jù)庫
setKey(c->db,key,val);
......
//回復結(jié)果
/* src/networking.c/addReply */
addReply(c, ok_reply ? ok_reply : shared.ok);
}
//將鍵值關聯(lián)到數(shù)據(jù)庫
/* src/db.c/setKey */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
......
}
在完成一系列檢查與轉(zhuǎn)化之后,調(diào)用setGenericCommand
菠赚,最后調(diào)用addReply()
函數(shù)脑豹,為客戶端連接的socket注冊可寫事件,將返回信息添加到回復緩沖區(qū)中衡查,回傳給客戶端瘩欺。
【參考】
[1] 《Redis設計與實現(xiàn)》
[2] 《Redis源碼日志》