分析從客戶端發(fā)送命令棺妓,到服務(wù)端執(zhí)行命令价涝、返回執(zhí)行結(jié)果經(jīng)歷的整個(gè)過(guò)程且改。
建立連接
無(wú)論是redis-cli還是Jedis這樣的三方包客服端验烧,要向Redis服務(wù)器發(fā)送命令,首先要建立與Redis服務(wù)器之間的TCP連接又跛。在分析Redis啟動(dòng)過(guò)程時(shí)碍拆,初始化這一步會(huì)注冊(cè)事件處理器:
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
在Redis配置文件中有一項(xiàng)
bind
配置,通過(guò)bind
可以配置監(jiān)聽(tīng)來(lái)自哪些網(wǎng)絡(luò)接口請(qǐng)求。Redis在啟動(dòng)時(shí)會(huì)監(jiān)聽(tīng)這些接口倔监,將fds保存在server.ipfd
數(shù)組中直砂。
Redis循環(huán)所有的網(wǎng)絡(luò)接口,為這些接口綁定AE_READABLE(可讀事件)浩习,事件的處理器是acceptTcpHandler
静暂。
aeCreateFileEvent
函數(shù)會(huì)為每個(gè)對(duì)應(yīng)的fd綁定一個(gè)aeFileEvent
,aeFileEvent
中綁定了處理讀寫(xiě)事件的函數(shù)谱秽。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
//根據(jù)fd值獲取aeFileEvent洽蛀,后面用來(lái)綁定aeFileProc
aeFileEvent *fe = &eventLoop->events[fd];
//注冊(cè)事件,取決于具體實(shí)現(xiàn)(epoll疟赊、select等)
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
//這里注冊(cè)的事讀事件郊供,aeFileProc是acceptTcpHandler
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
當(dāng)有客戶端嘗試連接Redis服務(wù)器時(shí),aeApiPoll
函數(shù)會(huì)返回1近哟,并從eventLoop->fired[j]
中獲取發(fā)生事件的fd驮审,進(jìn)而獲取到對(duì)應(yīng)的aeFileEvent
。因?yàn)橹霸趕erverSocket上注冊(cè)的是AE_READABLE
事件吉执,所以調(diào)用fe->rfileProc
處理客戶端連接(acceptTcpHandler
)疯淫。
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
...
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
...
}
在acceptTcpHandler
函數(shù)中,會(huì)創(chuàng)建與客戶端之間的socket連接、調(diào)用createClient
創(chuàng)建客戶端:
if ((c = createClient(fd)) == NULL) {
...
return;
}
Redis在調(diào)用
createClient
后會(huì)判斷客戶端數(shù)量,如果超過(guò)上限會(huì)返回錯(cuò)誤信息并關(guān)閉客戶端义辕。因?yàn)樵?code>createClient中已經(jīng)將socket設(shè)置為non-bolcking互纯,如果超出上限在向客戶端寫(xiě)入錯(cuò)誤信息時(shí),不會(huì)被阻塞。
在createClient函數(shù)中注冊(cè)了readQueryFromClient
處理器并初始化客戶端:
//在socket上注冊(cè)讀事件處理器
if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
...
//設(shè)置默認(rèn)db
selectDb(c,0);
//客戶端id
c->id = server.next_client_id++;
//客戶端命令緩沖區(qū)
c->querybuf = sdsempty();
//命令參數(shù)個(gè)數(shù)
c->argc = 0;
//參數(shù)列表
c->argv = NULL;
//回復(fù)列表
c->reply = listCreate();
//回復(fù)列表長(zhǎng)度
c->reply_bytes = 0;
//watch命令的key
c->watched_keys = listCreate();
//精確訂閱的頻道
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
//匹配模式訂閱的頻道
c->pubsub_patterns = listCreate();
執(zhí)行到這里客戶端與服務(wù)器連接建立完成,服務(wù)器已經(jīng)為客戶端socket注冊(cè)了命令處理器,等待客戶端發(fā)送命令缆镣。
客戶端發(fā)送命令
客戶端向socket寫(xiě)入RESP協(xié)議格式的命令,等待服務(wù)器返回執(zhí)行結(jié)果肌似。RESP協(xié)議可以參考這里Redis協(xié)議:RESP费就。
假設(shè)客戶端向服務(wù)器發(fā)送了命令:
SET simpleKey simpleValue
轉(zhuǎn)換為RESP協(xié)議之后是:
*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n
服務(wù)器解析命令
在經(jīng)歷了N個(gè)事件循環(huán)之后,客戶端發(fā)送的請(qǐng)求終于到達(dá)了服務(wù)器川队,還是老位置:
numevents = aeApiPoll(eventLoop, tvp);
只是這次事件處理器變成了readQueryFromClient
力细。在readQueryFromClient
函數(shù)中會(huì)讀取readlen個(gè)字節(jié)到c->querybuf
中,默認(rèn)readlen長(zhǎng)度為1024*16固额。
//獲取當(dāng)前長(zhǎng)度
qblen = sdslen(c->querybuf);
//計(jì)算峰值
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
//擴(kuò)展sds長(zhǎng)度眠蚂,以保存readlen字節(jié)的內(nèi)容
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
//讀取數(shù)據(jù)
nread = read(fd, c->querybuf+qblen, readlen);
如果c->querybuf
的長(zhǎng)度沒(méi)有超過(guò)server.client_max_querybuf_len
的限制,就會(huì)開(kāi)始解析c->querybuf
中的內(nèi)容斗躏。
在processInputBuffer
函數(shù)中逝慧,會(huì)一直循環(huán)處理c->querybuf
中的內(nèi)容,直到全部處理完,或者processInlineBuffer
和processMultibulkBuffer
返回C_ERR(c->flags變化也會(huì)導(dǎo)致退出循環(huán)笛臣,但跟處理命令流程關(guān)系不大云稚,這里不展開(kāi))。
void processInputBuffer(client *c) {
//設(shè)置當(dāng)前正在處理的client
server.current_client = c;
//當(dāng)client緩沖中有內(nèi)容時(shí)循環(huán)
while(sdslen(c->querybuf)) {
...
//判斷命令類型
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
...
//解析命令到argc和argv
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
if (c->argc == 0) {
resetClient(c);
} else {
//執(zhí)行命令
if (processCommand(c) == C_OK)
//重置客戶端沈堡,記錄precmd静陈,釋放參數(shù)列表等
resetClient(c);
}
}
server.current_client = NULL;
}
絕大多數(shù)Redis命令都是PROTO_REQ_MULTIBULK
類型的,例如上面的SET
命令诞丽,會(huì)通過(guò)processMultibulkBuffer
來(lái)解析鲸拥,整個(gè)解析大致可以分為兩個(gè)步驟,確認(rèn)有多少個(gè)bulk和確認(rèn)每個(gè)bulk的長(zhǎng)度僧免,對(duì)照RESP協(xié)議:
*3\r\n$3\r\nSET\r\n$9\r\nsimpleKey\r\n$11\r\nsimpleValue\r\n
第一步是讀取*3\r\n
刑赶,確認(rèn)有3個(gè)bulk。之后是循環(huán)解析$3\r\n
$9\r\n
$11\r\n
懂衩,確認(rèn)3個(gè)bulk的長(zhǎng)度撞叨,將解析結(jié)果放入argc(數(shù)量)和argv(參數(shù)列表)中:
c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen);
解析結(jié)果argc=3,argv=['SET', 'simpleKey', 'simpleValue']勃痴。
實(shí)際上argv是redisObject結(jié)構(gòu)的數(shù)據(jù)谒所,這里為了方便理解直接用數(shù)組結(jié)構(gòu)表達(dá)。
服務(wù)器執(zhí)行命令
在上一步中Redis服務(wù)器已經(jīng)將客戶端的請(qǐng)求解析完成沛申,參數(shù)保存在client的argv中。在processCommand
函數(shù)中首先會(huì)通過(guò)argv[0]在命令字典中找到對(duì)應(yīng)的命令然后做一系列的判斷姐军,例如client是否通過(guò)auth驗(yàn)證铁材、命令參數(shù)個(gè)數(shù)是否正確、是否開(kāi)啟了集群功能需要轉(zhuǎn)向請(qǐng)求奕锌、服務(wù)器最大內(nèi)存限制判斷等等著觉,這里只專注于命令執(zhí)行,簡(jiǎn)化后的代碼如下:
int processCommand(client *c) {
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
...
} else {
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}
else分支中的call函數(shù)真正調(diào)用了命令執(zhí)行函數(shù):
c->cmd->proc(c);
client的cmd是一個(gè)redisCommand
結(jié)構(gòu)變量惊暴,它的結(jié)構(gòu)是:
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
proc可以在server.c文件中的redisCommandTable
中找到:
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}
setCommand
屬于String類型值的命令饼丘,可以在t_string.c中找到。setCommand
函數(shù)中會(huì)針對(duì)NX EX expire等進(jìn)行判斷辽话,最終通過(guò)dict的setKey函數(shù)設(shè)置鍵值對(duì)肄鸽,更新server.dirty值:
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
...
setKey(c->db,key,val);
server.dirty++;
...
addReply(c, ok_reply ? ok_reply : shared.ok);
}
c->cmd->proc(c)
執(zhí)行完成后,call函數(shù)中還要進(jìn)行一些收尾工作:
- 判斷是否需要寫(xiě)slowlog油啤。
- 更新cmd總執(zhí)行時(shí)間和次數(shù)典徘。
- 向Slave和AOF傳播命令。
服務(wù)器返回結(jié)果
在setCommand
函數(shù)中益咬,調(diào)用了addReply
函數(shù)向client的輸出緩沖或reply中寫(xiě)入返回結(jié)果逮诲,返回結(jié)果為shared.ok
:
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
}
...
}
prepareClientToWrite
函數(shù)中會(huì)判斷client的flag,如果符合條件將當(dāng)前client放入server.clients_pending_write
鏈表。
有幾種情況會(huì)跳過(guò):
- 如果是執(zhí)行l(wèi)ua腳本的client梅鹦;
- 如果client設(shè)置了CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP裆甩;
- 如果client是連接master的。
以上幾種情況不需要回復(fù)齐唆。
_addReplyToBuffer
函數(shù)會(huì)嘗試將命令結(jié)果放入輸出緩沖(c->buf)中嗤栓,如果不成功(c-reply中有內(nèi)容,或者超過(guò)緩沖大小)蝶念,會(huì)調(diào)用_addReplyObjectToList
函數(shù)放入c->reply鏈表中抛腕。
以上內(nèi)容與命令執(zhí)行在一次事件循環(huán)中,因?yàn)樗闶禽敵鰣?zhí)行結(jié)果的一部分媒殉,所以到了返回結(jié)果的一節(jié)中担敌。
再貼一下事件循環(huán)的代碼,在aeProcessEvents
執(zhí)行前會(huì)先執(zhí)行eventLoop->beforesleep
函數(shù)廷蓉,這個(gè)函數(shù)在main函數(shù)中指定全封,是beforeSleep
。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
在beforeSleep
函數(shù)中與輸出結(jié)果相關(guān)的是調(diào)用了handleClientsWithPendingWrites
函數(shù)桃犬。
int handleClientsWithPendingWrites(void) {
...
//獲取待輸出結(jié)果的client數(shù)量
int processed = listLength(server.clients_pending_write);
...
while((ln = listNext(&li))) {
...
//輸出buf內(nèi)容
if (writeToClient(c->fd,c,0) == C_ERR) continue;
...
//注冊(cè)寫(xiě)事件處理器
if (clientHasPendingReplies(c) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
}
代碼中調(diào)用了writeToClient
函數(shù)輸出結(jié)果刹悴,如果調(diào)用writeToClient
后還有待輸出內(nèi)容,則為client注冊(cè)寫(xiě)事件處理器sendReplyToClient
攒暇。
int writeToClient(int fd, client *c, int handler_installed) {
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
sds o;
//如果client還有待輸出結(jié)果土匀,執(zhí)行循環(huán)
while(clientHasPendingReplies(c)) {
//先檢查buf中是否有內(nèi)容
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
//統(tǒng)計(jì)本次一共輸出了多少子節(jié)
totwritten += nwritten;
//如果輸出子節(jié)與buf中數(shù)量一直,代表緩沖內(nèi)容已經(jīng)全部輸出
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
//檢查c->reply中
} else {
o = listNodeValue(listFirst(c->reply));
objlen = sdslen(o);
nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
if (c->sentlen == objlen) {
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
c->reply_bytes -= objlen;
}
}
server.stat_net_output_bytes += totwritten;
//如果輸出的字節(jié)數(shù)量已經(jīng)超過(guò)NET_MAX_WRITES_PER_EVENT限制形用,break
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory)) break;
}
...
return C_OK;
}
在writeToClient
函數(shù)中就轧,會(huì)檢查緩沖和reploy中的內(nèi)容,向客戶端輸出內(nèi)容田度。如果超過(guò)了每次輸出的最大值NET_MAX_WRITES_PER_EVENT
會(huì)跳出循環(huán)妒御。
如果緩沖區(qū)和reply中的內(nèi)容沒(méi)有輸出完,handleClientsWithPendingWrites
函數(shù)中會(huì)為client關(guān)聯(lián)寫(xiě)事件處理器sendReplyToClient
镇饺,在后面的事件循環(huán)中socket會(huì)返回并調(diào)用sendReplyToClient
繼續(xù)輸出乎莉。sendReplyToClient
函數(shù)內(nèi)部直接調(diào)用了writeToClient
函數(shù),區(qū)別是參數(shù)handler_installed
不同奸笤,需要對(duì)事件處理器做額外的處理:
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
//如果內(nèi)容已經(jīng)全部輸出惋啃,刪除事件處理器
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}
}
客戶端接收命令返回結(jié)果
client可以從socket中收到server返回的RESP結(jié)果的返回結(jié)果,經(jīng)過(guò)轉(zhuǎn)換返回給調(diào)用端或者在控制臺(tái)輸出揭保。