系列
redis數(shù)據(jù)淘汰原理
redis過期數(shù)據(jù)刪除策略
redis server事件模型
redis cluster mget 引發(fā)的討論
redis 3.x windows 集群搭建
redis 命令執(zhí)行過程
redis string底層數(shù)據(jù)結(jié)構(gòu)
redis list底層數(shù)據(jù)結(jié)構(gòu)
redis hash底層數(shù)據(jù)結(jié)構(gòu)
redis set底層數(shù)據(jù)結(jié)構(gòu)
redis zset底層數(shù)據(jù)結(jié)構(gòu)
redis 客戶端管理
redis 主從同步-slave端
redis 主從同步-master端
redis 主從超時(shí)檢測(cè)
redis aof持久化
redis rdb持久化
redis 數(shù)據(jù)恢復(fù)過程
redis TTL實(shí)現(xiàn)原理
redis cluster集群建立
redis cluster集群選主
redis aof緩存數(shù)據(jù)結(jié)構(gòu)
?redis用于存儲(chǔ)aof內(nèi)存數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)是aof_buf數(shù)據(jù)結(jié)構(gòu)睬关,所有數(shù)據(jù)先追加到內(nèi)存的aof_buf后嗦玖,再通過定時(shí)任務(wù)檢查是否能夠持久化到磁盤文件當(dāng)中狐赡。
struct redisServer {
// AOF 緩沖區(qū)
sds aof_buf; /* AOF buffer, written before entering the event loop */
redis aof內(nèi)存化
?redis aof內(nèi)存化的操作主要有以下三部曲:
- 執(zhí)行redis命令后開始保存數(shù)據(jù)至內(nèi)存當(dāng)中的aof_buf當(dāng)中
- 將執(zhí)行的命令解析成redis的命令格式// 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
- 保存數(shù)據(jù)至aof_buf當(dāng)中
/*
* 將指定命令(以及執(zhí)行該命令的上下文杏节,比如數(shù)據(jù)庫 id 等信息)傳播到 AOF 和 slave 谤民。
* FLAG 可以是以下標(biāo)識(shí)的 xor :
* + REDIS_PROPAGATE_NONE (no propagation of command at all)
* 不傳播
* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
* 傳播到 AOF
* + REDIS_PROPAGATE_REPL (propagate into the replication link)
* 傳播到 slave
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
// 傳播到 AOF
if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
}
?執(zhí)行的操作主要是解析成redis的命令格式并保存到內(nèi)存的aof_buf當(dāng)中慎框。
/*
* 將命令追加到 AOF 文件中辆影,
* 如果 AOF 重寫正在進(jìn)行徒像,那么也將命令追加到 AOF 重寫緩存中。
*/
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/*
* 使用 SELECT 命令蛙讥,顯式設(shè)置數(shù)據(jù)庫厨姚,確保之后的命令被設(shè)置到正確的數(shù)據(jù)庫
*/
if (dictid != server.aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}
// EXPIRE 、 PEXPIRE 和 EXPIREAT 命令
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT
*
* 將 EXPIRE 键菱、 PEXPIRE 和 EXPIREAT 都翻譯成 PEXPIREAT
*/
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
// SETEX 和 PSETEX 命令
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
/* Translate SETEX/PSETEX to SET and PEXPIREAT
*
* 將兩個(gè)命令都翻譯成 SET 和 PEXPIREAT
*/
// SET
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
// PEXPIREAT
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
// 其他命令
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
/*
* 將命令追加到 AOF 緩存中谬墙,
* 在重新進(jìn)入事件循環(huán)之前,這些命令會(huì)被沖洗到磁盤上经备,
* 并向客戶端返回一個(gè)回復(fù)拭抬。
*/
if (server.aof_state == REDIS_AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/*
* 如果 BGREWRITEAOF 正在進(jìn)行,
* 那么我們還需要將命令追加到重寫緩存中侵蒙,
* 從而記錄當(dāng)前正在重寫的 AOF 文件和數(shù)據(jù)庫當(dāng)前狀態(tài)的差異造虎。
*/
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
// 釋放
sdsfree(buf);
}
?解析成redis的命令格式:例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
/*
* 根據(jù)傳入的命令和命令參數(shù),將它們還原成協(xié)議格式纷闺。
*/
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
char buf[32];
int len, j;
robj *o;
// 重建命令的個(gè)數(shù)算凿,格式為 *<count>\r\n
// 例如 *3\r\n
buf[0] = '*';
len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
// 重建命令和命令參數(shù),格式為 $<length>\r\n<content>\r\n
// 例如 $3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nVALUE\r\n
for (j = 0; j < argc; j++) {
o = getDecodedObject(argv[j]);
// 組合 $<length>\r\n
buf[0] = '$';
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
// 組合 <content>\r\n
dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
dst = sdscatlen(dst,"\r\n",2);
decrRefCount(o);
}
// 返回重建后的協(xié)議內(nèi)容
return dst;
}
redis aof持久化
?serverCron內(nèi)部定期執(zhí)行flushAppendOnlyFile,這里的if判斷是判斷是否延遲執(zhí)行犁功,暫且忽略這個(gè)判斷而認(rèn)為每次都會(huì)進(jìn)行aof持久化氓轰。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// 根據(jù) AOF 政策,
// 考慮是否需要將 AOF 緩沖區(qū)中的內(nèi)容寫入到 AOF 文件中
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
}
?執(zhí)行aof內(nèi)存數(shù)據(jù)持久化的過程浸卦,考慮異常情況的情況主要分為以下步驟:
- 寫入aof_buf數(shù)據(jù)到aof_fd代表的aof持久化文件當(dāng)中
- 處理aof寫入異常的情況署鸡,嘗試修復(fù)失敗后會(huì)移除失敗的命令
- 更新aof相關(guān)的統(tǒng)計(jì)參數(shù)
- 如果aof_buf數(shù)據(jù)過大那么就清空aof_buf的內(nèi)容
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
// 緩沖區(qū)中沒有任何內(nèi)容,直接返回
if (sdslen(server.aof_buf) == 0) return;
// 策略為每秒 FSYNC
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
// 是否有 SYNC 正在后臺(tái)進(jìn)行?
sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
// 每秒 fsync 靴庆,并且強(qiáng)制寫入為假
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/*
* 當(dāng) fsync 策略為每秒鐘一次時(shí)时捌, fsync 在后臺(tái)執(zhí)行。
*
* 如果后臺(tái)仍在執(zhí)行 FSYNC 炉抒,那么我們可以延遲寫操作一兩秒
* (如果強(qiáng)制執(zhí)行 write 的話奢讨,服務(wù)器主線程將阻塞在 write 上面)
*/
if (sync_in_progress) {
// 有 fsync 正在后臺(tái)進(jìn)行 。焰薄。禽笑。
if (server.aof_flush_postponed_start == 0) {
/*
* 前面沒有推遲過 write 操作,這里將推遲寫操作的時(shí)間記錄下來
* 然后就返回蛤奥,不執(zhí)行 write 或者 fsync
*/
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/*
* 如果之前已經(jīng)因?yàn)?fsync 而推遲了 write 操作
* 但是推遲的時(shí)間不超過 2 秒,那么直接返回
* 不執(zhí)行 write 或者 fsync
*/
return;
}
/*
* 如果后臺(tái)還有 fsync 在執(zhí)行僚稿,并且 write 已經(jīng)推遲 >= 2 秒
* 那么執(zhí)行寫操作(write 將被阻塞)
*/
server.aof_delayed_fsync++;
}
}
/*
* 執(zhí)行到這里凡桥,程序會(huì)對(duì) AOF 文件進(jìn)行寫入。
*
* 清零延遲 write 的時(shí)間記錄
*/
server.aof_flush_postponed_start = 0;
/*
* 執(zhí)行單個(gè) write 操作蚀同,如果寫入設(shè)備是物理的話缅刽,那么這個(gè)操作應(yīng)該是原子的
*
* 當(dāng)然,如果出現(xiàn)像電源中斷這樣的不可抗現(xiàn)象蠢络,那么 AOF 文件也是可能會(huì)出現(xiàn)問題的
* 這時(shí)就要用 redis-check-aof 程序來進(jìn)行修復(fù)衰猛。
*/
nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
if (nwritten != (signed)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
// 將日志的記錄頻率限制在每行 AOF_WRITE_LOG_ERROR_RATE 秒
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
// 如果寫入出錯(cuò),那么嘗試將該情況寫入到日志里面
if (nwritten == -1) {
if (can_log) {
redisLog(REDIS_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
redisLog(REDIS_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
// 嘗試移除新追加的不完整內(nèi)容
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
redisLog(REDIS_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftrunacate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
// 處理寫入 AOF 文件時(shí)出現(xiàn)的錯(cuò)誤
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
redisLog(REDIS_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
server.aof_last_write_status = REDIS_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
// 寫入成功刹孔,更新最后寫入狀態(tài)
if (server.aof_last_write_status == REDIS_ERR) {
redisLog(REDIS_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = REDIS_OK;
}
}
// 更新寫入后的 AOF 文件大小
server.aof_current_size += nwritten;
/*
* 如果 AOF 緩存的大小足夠小的話啡省,那么重用這個(gè)緩存,
* 否則的話髓霞,釋放 AOF 緩存卦睹。
*/
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
// 清空緩存中的內(nèi)容,等待重用
sdsclear(server.aof_buf);
} else {
// 釋放緩存
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/*
* 如果 no-appendfsync-on-rewrite 選項(xiàng)為開啟狀態(tài)方库,
* 并且有 BGSAVE 或者 BGREWRITEAOF 正在進(jìn)行的話结序,
* 那么不執(zhí)行 fsync
*/
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;
// 總是執(zhí)行 fsnyc
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
// 更新最后一次執(zhí)行 fsnyc 的時(shí)間
server.aof_last_fsync = server.unixtime;
// 策略為每秒 fsnyc ,并且距離上次 fsync 已經(jīng)超過 1 秒
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
// 放到后臺(tái)執(zhí)行
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
// 更新最后一次執(zhí)行 fsync 的時(shí)間
server.aof_last_fsync = server.unixtime;
}
}