首先從源碼中找入口
redis源碼:src/redis-cli.c中找到main函數(shù),main函數(shù)中核心的處理就是以下部分
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Ignore SIGPIPE in interactive mode to force a reconnect */
signal(SIGPIPE, SIG_IGN);
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
repl();
}
- cliConnect
主要是與服務端建立連接沥阳,每一個連接都會創(chuàng)建一個redisContext結(jié)構(gòu)來保存 - repl
repl實現(xiàn)了發(fā)送命令并輸出Server返回結(jié)果的主要邏輯
RedisContext
redisContext結(jié)構(gòu)如下,重要的字段都進行了注釋
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */
int fd; // socket句柄自点,用戶連接redis server
int flags;
char *obuf; /* Write buffer */ //主要存儲發(fā)送的命令桐罕,resp協(xié)議封裝后的sds
redisReader *reader; /* Protocol reader */ // 存儲server返回的數(shù)據(jù)
enum redisConnectionType connection_type;
struct timeval *timeout;
struct {
char *host;
char *source_addr;
int port;
} tcp;
struct {
char *path;
} unix_sock;
} redisContext;
repl是做什么的
repl其實質(zhì)就是在不停的重復解析用戶輸入的命令和redis server返回的參數(shù)。repl中桂敛,實現(xiàn)這個核心操作的便是issueCommandRepeat方法功炮。
issueCommandRepeat方法我們直觀來想,需要做3步操作
- 從標準輸入獲取用戶輸入的命令和參數(shù)术唬,并按照resp協(xié)議封裝
- 將封裝后的數(shù)據(jù)發(fā)送至服務器
- 讀取從服務器返回的結(jié)果并解析輸出
cli如何封裝輸入的命令和參數(shù)
通過對issueCommandRepeat方法的分析薪伏,極其對它里邊調(diào)用關(guān)系的梳理,發(fā)現(xiàn)是redisAppendCommandArgv處理命令并將命令寫入context的obuf中粗仓,redisAppendCommandArgv的調(diào)用層級和源碼如下
issueCommandRepeat
cliSendCommand
redisAppendCommandArgv
redisFormatSdsCommandArgv
__redisAppendCommand
源碼:
sds cmd;
int len;
//redisFormatSdsCommandArgv是將命令極其跟隨的參數(shù)嫁怀,使用resp協(xié)議封裝后,存到一個sds結(jié)構(gòu)中借浊。
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len == -1) {
__redisSetError(c,REDIS_ERR_OOM,"Out of memory");
return REDIS_ERR;
}
//__redisAppendCommand是將sds保存的resp協(xié)議的數(shù)據(jù)存到redisContext中的obuf中
if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
sdsfree(cmd);
return REDIS_ERR;
}
// 釋放sds結(jié)構(gòu)申請的內(nèi)存
sdsfree(cmd);
return REDIS_OK;
向服務器發(fā)送命令
有了封裝好的數(shù)據(jù)塘淑,下一步就是可以向服務器發(fā)送命令了,還是issueCommandRepeat方法蚂斤,看下述代碼
while(repeat-- > 0) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
while (config.monitor_mode) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1);
fflush(stdout);
}
if (config.pubsub_mode) {
if (config.output != OUTPUT_RAW)
printf("Reading messages... (press Ctrl-C to quit)\n");
while (1) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1);
}
}
if (config.slave_mode) {
printf("Entering replica output mode... (press Ctrl-C to quit)\n");
slaveMode();
config.slave_mode = 0;
zfree(argvlen);
return REDIS_ERR; /* Error = slaveMode lost connection to master */
}
if (cliReadReply(output_raw) != REDIS_OK) {
zfree(argvlen);
return REDIS_ERR;
} else {
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,"select") && argc == 2 && config.last_cmd_type != REDIS_REPLY_ERROR) {
config.dbnum = atoi(argv[1]);
cliRefreshPrompt();
} else if (!strcasecmp(command,"auth") && argc == 2) {
cliSelect();
}
}
if (config.interval) usleep(config.interval);
fflush(stdout); /* Make it grep friendly */
}
我們知道redisAppendCommandArgv只是組裝了命令存捺,并沒有發(fā)送,cliReadReply看樣子是將結(jié)果讀取曙蒸,最后fflush是將結(jié)果輸出到標準輸出捌治。那么發(fā)送命令只可能藏在cliReadReply中,繼續(xù)分析cliReadyReply
cliReadyReply
redisGetReply
在redisGetReply中發(fā)現(xiàn)特別隱藏的redisBufferWrite纽窟,這個實際是發(fā)送了請求肖油,我們看具體代碼
if (sdslen(c->obuf) > 0) {
// 發(fā)送命令
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1) {
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nwritten > 0) {
// 發(fā)送成功,清理obuf
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
} else {
sdsrange(c->obuf,nwritten,-1);
}
}
}
讀取服務器返回的結(jié)果
繼續(xù)在redisGetReply讀代碼臂港,能夠看到redisBufferRead是獲取服務器返回的數(shù)據(jù)的方法森枪。
int redisBufferRead(redisContext *c) {
char buf[1024*16];
int nread;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
// 從fd讀取數(shù)據(jù)
nread = read(c->fd,buf,sizeof(buf));
if (nread == -1) {
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nread == 0) {
__redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
return REDIS_ERR;
} else {
if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
}
return REDIS_OK;
}
從代碼可以看到如果讀取成功會調(diào)用redisReaderFeed將buf內(nèi)容寫入到redisContext中的reader里
redisGetReply中,又調(diào)用了redisGetReplyFromReader趋艘,redisReaderGetReply將返回的數(shù)據(jù)通過resp協(xié)議解析為字符串
int redisGetReplyFromReader(redisContext *c, void **reply) {
if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
return REDIS_OK;
}