重要說明散休,在看這篇文章之前,最好先通過剖析Redis RDB文件 了解RDB文件的結(jié)構(gòu)乐尊;
RDB相關(guān)源碼在rdb.c中戚丸;通過saveCommand(redisClient *c) 和bgsaveCommand(redisClient *c) 兩個方法可知,RDB持久化業(yè)務(wù)邏輯在rdbSave(server.rdb_filename)和rdbSaveBackground(server.rdb_filename這兩個方法中科吭;一個通過執(zhí)行"save"觸發(fā)昏滴,另一個通過執(zhí)行"bgsave"或者save seconds changes條件滿足時(在redis.c的serverCron中)觸發(fā):
redis.c里serverCron中通過調(diào)用rdbSaveBackground(server.rdb_filename)觸發(fā)bgsave的部分代碼:
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
REDIS_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == REDIS_OK))
{
redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveBackground(server.rdb_filename);
break;
}
通過閱讀rdbSaveBackground(char *filename)的源碼可知,其最終的實(shí)現(xiàn)還是調(diào)用rdbSave(char *filename)对人,只不過是通過fork()出的子進(jìn)程來執(zhí)行罷了谣殊,所以bgsave和save的實(shí)現(xiàn)是殊途同歸:
int rdbSaveBackground(char *filename) {
pid_t childpid;
long long start;
// 如果已經(jīng)有RDB持久化任務(wù),那么rdb_child_pid的值就不是-1牺弄,那么返回REDIS_ERR姻几;
if (server.rdb_child_pid != -1) return REDIS_ERR;
server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
// 記錄RDB持久化開始時間
start = ustime();
//fork一個子進(jìn)程,
if ((childpid = fork()) == 0) {
// 如果fork()的結(jié)果childpid為0势告,即當(dāng)前進(jìn)程為fork的子進(jìn)程蛇捌,那么接下來調(diào)用rdbSave()進(jìn)程持久化;
int retval;
/* Child */
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
// bgsave事實(shí)上就是通過fork的子進(jìn)程調(diào)用rdbSave()實(shí)現(xiàn), rdbSave()就是save命令業(yè)務(wù)實(shí)現(xiàn)咱台;
retval = rdbSave(filename);
if (retval == REDIS_OK) {
size_t private_dirty = zmalloc_get_private_dirty();
if (private_dirty) {
// RDB持久化成功后络拌,如果是notice級別的日志,那么log輸出RDB過程中copy-on-write使用的內(nèi)存
redisLog(REDIS_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}
}
exitFromChild((retval == REDIS_OK) ? 0 : 1);
} else {
// 父進(jìn)程更新redisServer記錄一些信息回溺,例如:fork進(jìn)程消耗的時間stat_fork_time,
/* Parent */
server.stat_fork_time = ustime()-start;
// 更新redisServer記錄fork速率:每秒多少G春贸;zmalloc_used_memory()的單位是字節(jié)混萝,所以通過除以(1024*1024*1024),得到GB;由于記錄的fork_time即fork時間是微妙萍恕,所以*1000000逸嘀,得到每秒鐘fork多少GB的速度;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
// 如果fork子進(jìn)程出錯允粤,即childpid為-1崭倘,更新redisServer,記錄最后一次bgsave狀態(tài)是REDIS_ERR;
if (childpid == -1) {
server.lastbgsave_status = REDIS_ERR;
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
// 最后在redisServer中記錄的save開始時間重置為空类垫,并記錄執(zhí)行bgsave的子進(jìn)程id司光,即child_pid;
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = REDIS_RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return REDIS_OK;
}
return REDIS_OK; /* unreached */
}
RDB持久化實(shí)現(xiàn):
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success. */
int rdbSave(char *filename) {
char tmpfile[256];
FILE *fp;
rio rdb;
int error;
// 文件臨時文件名為temp-${pid}.rdb
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
strerror(errno));
return REDIS_ERR;
}
rioInitWithFile(&rdb,fp);
// RDB持久化的核心實(shí)現(xiàn)阔挠;
if (rdbSaveRio(&rdb,&error) == REDIS_ERR) {
errno = error;
goto werr;
}
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;
// 重命名rdb文件的命名飘庄;
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
werr:
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return REDIS_ERR;
}
rdbSaveRio--RDB持久化實(shí)現(xiàn)的核心代碼--根據(jù)RDB文件協(xié)議將所有redis中的key-value寫入rdb文件中:
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success REDIS_OK is returned, otherwise REDIS_ERR
* is returned and part of the output, or all the output, can be
* missing because of I/O errors.
*
* When the function returns REDIS_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
long long now = mstime();
uint64_t cksum;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
// rdb文件中最先寫入的內(nèi)容就是magic,magic就是REDIS這個字符串+4位版本號
snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
// 遍歷所有db重寫rdb文件购撼;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
// 如果db的size為0跪削,即沒有任何key,那么跳過迂求,遍歷下一個db碾盐;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (!di) return REDIS_ERR;
// 寫入REDIS_RDB_OPCODE_SELECTDB,這個值redis定義為254揩局,即FE毫玖,再通過rdbSaveLen合入當(dāng)前dbnum,例如當(dāng)前db為0凌盯,那么寫入FE 00
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
// 如注釋所表達(dá)的付枫,迭代遍歷db這個dict的每一個entry;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
// 先得到當(dāng)前entry的key(sds類型)和value(redisObject類型)驰怎;
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
// 從redisDb的expire這個dict中查詢過期時間屬性值阐滩;
expire = getExpire(db,&key);
// 每個entry(redis中的key和其value)rdb持久化的核心代碼
if (rdbSaveKeyValuePair(rdb,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
di = NULL; /* So that we don't release it again on error. */
// 遍歷所有db后,寫入EOF這個opcode县忌,REDIS_RDB_OPCODE_EOF申明為255掂榔,即FF,所以是寫入FF到rdb文件中症杏;FF是redis對rdb文件結(jié)束的定義装获;
/* EOF opcode */
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
// 最后寫入8個字節(jié)長度的checksum值到rdb文件尾部;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return REDIS_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return REDIS_ERR;
}
每個entry(key-value)rdb持久化的核心代碼:
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
long long expiretime, long long now)
{
/* Save the expire time */
if (expiretime != -1) {
// 如果過期時間少于當(dāng)前時間厉颤,那么表示該key已經(jīng)失效穴豫,返回不做任何保存;
/* If this key is already expired skip it */
if (expiretime < now) return 0;
// 如果當(dāng)前遍歷的entry有失效時間屬性逼友,那么保存REDIS_RDB_OPCODE_EXPIRETIME_MS即252精肃,即"FC"以及失效時間到rdb文件中潘鲫,
if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
// 接下來保存redis key的類型,key肋杖,以及value到rdb文件中;
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
return 1;
}
通過上面的源碼分析得到最終rdb文件的格式如下:
REDIS // RDB協(xié)議約束的固定字符串
0006 // redis的版本號
FE 00 // 表示當(dāng)前接下來的key都是db=0中的key挖函;
FC 1506327609 // 表示key失效時間點(diǎn)為1506327609
0 // 表示key的屬性是string類型状植;
username // key
afei // value
FF // 表示遍歷完成
y73e9iq1 // checksum值
備注:
#define REDIS_RDB_TYPE_STRING 0
#define REDIS_RDB_TYPE_LIST 1
#define REDIS_RDB_TYPE_SET 2
#define REDIS_RDB_TYPE_ZSET 3
#define REDIS_RDB_TYPE_HASH 4