搭建環(huán)境
為了測試Redis主從復(fù)制功能分飞,需要在本地啟動master和slave兩個Redis實例烙如。這里使用docker創(chuàng)建了兩個容器:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
372cbdc31eb2 redis "docker-entrypoint.sh" 4 days ago Up 5 hours 127.0.0.1:7001->6379/tcp slave
ef5b6b9dce8a redis "docker-entrypoint.sh" 4 days ago Up 5 hours 127.0.0.1:7000->6379/tcp master
使用redis-cli連接上slave redis server,發(fā)送slaveof命令:
127.0.0.1:7001> slaveof 172.17.0.2 6379
=> OK
在master上可以看到日志:
1:M 10 Sep 07:38:43.652 * Slave 172.17.0.3:6379 asks for synchronization
1:M 10 Sep 07:38:43.652 * Full resync requested by slave 172.17.0.3:6379
1:M 10 Sep 07:38:43.652 * Starting BGSAVE for SYNC with target: disk
1:M 10 Sep 07:38:43.655 * Background saving started by pid 21
21:C 10 Sep 07:38:43.661 * DB saved on disk
21:C 10 Sep 07:38:43.661 * RDB: 6 MB of memory used by copy-on-write
1:M 10 Sep 07:38:43.750 * Background saving terminated with success
1:M 10 Sep 07:38:43.753 * Synchronization with slave 172.17.0.3:6379 succeeded
在master上執(zhí)行命令:
set hello world
=> OK
在slave上執(zhí)行命令:
get hello
=> "world"
Redis主從復(fù)制配置完成隘马。
另外一種方式是啟動Redis時制定配置文件,在配置中修改:
slaveof <masterip> <masterport>
分析主從復(fù)制過程
主從復(fù)制可以看作是一次數(shù)據(jù)遷移,涉及存量數(shù)據(jù)同步和增量數(shù)據(jù)同步兩步外莲。
master日志
從master的日志中可以了解主從復(fù)制的大致過程。
1:M 10 Sep 07:38:43.652 * Slave 172.17.0.3:6379 asks for synchronization
slave 172.17.0.3:6379向master請求數(shù)據(jù)同步兔朦。
1:M 10 Sep 07:38:43.652 * Full resync requested by slave 172.17.0.3:6379
master判斷需要進行一次full sync偷线。
1:M 10 Sep 07:38:43.652 * Starting BGSAVE for SYNC with target: disk
1:M 10 Sep 07:38:43.655 * Background saving started by pid 21
21:C 10 Sep 07:38:43.661 * DB saved on disk
21:C 10 Sep 07:38:43.661 * RDB: 6 MB of memory used by copy-on-write
1:M 10 Sep 07:38:43.750 * Background saving terminated with success
master開始執(zhí)行BGSAVE,為存量數(shù)據(jù)生成一份rdb文件沽甥。
1:M 10 Sep 07:38:43.753 * Synchronization with slave 172.17.0.3:6379 succeeded
master將rdb文件發(fā)送給slave声邦,并建立增量同步。
slave端
主從復(fù)制的入口是salve服務(wù)器向master發(fā)送salveof命令摆舟。server.c文件中的redisCommandTable中可以找到slaveof命令對應(yīng)的處理函數(shù)是slaveofCommand
亥曹。在函數(shù)中首先會判斷命令是否是slaveof no one
邓了,如果不是進入else邏輯,解析slaveof命令中的ip地址和端口號:
void slaveofCommand(client *c) {
//判斷命令是否是slaveof no one
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
...
} else {
long port;
if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;
//判斷是否已經(jīng)存在master媳瞪,并且與當(dāng)前命令指向的master一致
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
return;
}
//設(shè)置master屬性
replicationSetMaster(c->argv[1]->ptr, port);
...
}
//向客戶端返回ok
addReply(c,shared.ok);
}
replicationSetMaster
函數(shù)中會將master的ip與port保存到server的masterhost和masterport屬性中骗炉,并初始化復(fù)制狀態(tài)repl_state為REPL_STATE_CONNECT
,偏移量信息master_repl_offset
(從master復(fù)制的字節(jié)數(shù))和repl_down_since
(時間戳)蛇受。設(shè)置完成后向客戶端返回OK句葵。
在下一次事件循環(huán)中processTimeEvents
會檢查時間事件鏈表。在服務(wù)器啟動時注冊過時間事件serverCrond
函數(shù):
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
serverCron
函數(shù)中兢仰,會調(diào)用復(fù)制相關(guān)的函數(shù)replicationCron
:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
run_with_period(1000) replicationCron();
...
}
在replicationCron
函數(shù)中乍丈,會判斷復(fù)制狀態(tài),如果是REPL_STATE_CONNECT
那么開始創(chuàng)建與master之間的連接:
void replicationCron(void) {
...
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}
...
}
整個主從復(fù)制的準備過程稱為replication handshake
旨别,在過程中server.repl_state
會發(fā)生一系列狀態(tài)變化诗赌,每個狀態(tài)會執(zhí)行不同操作以達到下一個狀態(tài),具體狀態(tài)變化如下(其中虛線代表狀態(tài)變化不是在一個事件循環(huán)中):
其中最關(guān)鍵的操作是發(fā)送PSYNC
命令:
int slaveTryPartialResynchronization(int fd, int read_reply) {
...
if (!read_reply) {
server.repl_master_initial_offset = -1;
//如果之前有連接過master秸弛,取出cache中的runid和offset用于判斷是否能部分同步
if (server.cached_master) {
psync_runid = server.cached_master->replrunid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
} else {
//第一次連接master必然需要全量同步
psync_runid = "?";
memcpy(psync_offset,"-1",3);
}
//發(fā)送psync命令
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_runid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
}
slaveTryPartialResynchronization
函數(shù)首先會判斷是否存在cached_master铭若,如果存在會在發(fā)送psync命令時帶上runid和offset,讓master判斷是進行全量同步還是增量同步递览。
在下一次事件循環(huán)中slaveTryPartialResynchronization
函數(shù)會讀取master的響應(yīng)叼屠,master的響應(yīng)會有幾種:PSYNC_WAIT_REPLY, PSYNC_CONTINUE, PSYNC_NOT_SUPPORTED, PSYNC_FULLRESYNC, PSYNC_WRITE_ERROR
。
PSYNC_WAIT_REPLY
:本次未讀到內(nèi)容绞铃,下次事件繼續(xù)镜雨。
PSYNC_CONTINUE
:增量同步。
PSYNC_NOT_SUPPORTED
:不支持PSYNC命令儿捧,重新發(fā)送SYNC命令荚坞。
PSYNC_FULLRESYNC
:全量同步。
PSYNC_WRITE_ERROR
:錯誤菲盾。
當(dāng)master支持psync命令颓影,且slave是第一次與master建立主從同步關(guān)系時,slaveTryPartialResynchronization
函數(shù)會創(chuàng)建tmpfile懒鉴,用于接收master發(fā)來的rdb文件诡挂。同時注冊可讀事件readSyncBulkPayload
函數(shù),并將server.repl_state更新為REPL_STATE_TRANSFER
临谱。
slaveTryPartialResynchronization
函數(shù)會在每次文件事件觸發(fā)時璃俗,讀取master發(fā)送過來的rdb文件。接收完成后會清空db悉默,使用master發(fā)送來的rdb文件初始化數(shù)據(jù)庫城豁,將repl_state改為REPL_STATE_CONNECTED
。至此全量數(shù)據(jù)同步完成麦牺,進入增量數(shù)據(jù)同步钮蛛。
master端
在slave的connectWithMaster
函數(shù)中鞭缭,會創(chuàng)建與master的tcp連接。master會為slave創(chuàng)建一個client保存到客戶端列表中魏颓,過程參考Redis命令處理流程分析岭辣。
Redis的復(fù)制可以是master->slave->slave這種模式,下面的代碼過濾了這種模式的分支處理代碼甸饱,只保留master->slave模式分支代碼沦童。
在master端,slave的client也有一套狀態(tài)變化:
當(dāng)slave向master發(fā)送psync或者sync命令時會調(diào)用syncCommand
函數(shù):
void syncCommand(client *c) {
...
//如果當(dāng)前client中還有未發(fā)送的內(nèi)容叹话,不能進行sync操作偷遗,否則會導(dǎo)致增量數(shù)據(jù)不一致
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
//master日志中的第一條
serverLog(LL_NOTICE,"Slave %s asks for synchronization",
replicationGetSlaveName(c));
//如果是psync,判斷是否能進行增量同步
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return;
} else {
char *master_runid = c->argv[1]->ptr;
if (master_runid[0] != '?') server.stat_sync_partial_err++;
}
} else {
//不是psync驼壶,設(shè)置slave的客戶端flag
c->flags |= CLIENT_PRE_PSYNC;
}
//全量同步次數(shù)+1
server.stat_sync_full++;
//更新slave的client的狀態(tài):等待bgsave開始
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
...
c->flags |= CLIENT_SLAVE;
//添加到slaves鏈表中
listAddNodeTail(server.slaves,c);
//情況1:bgsave正在執(zhí)行氏豌,且是落磁盤的
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
//循環(huán)slaves鏈表
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
//如果存在狀態(tài)是等待bgsave完成的slave,那么可以復(fù)用這次bgsave產(chǎn)生的rdb文件
//增量數(shù)據(jù)通過拷貝該client的out put buf實現(xiàn)
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
//如果存在
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
copyClientOutputBuffer(c,slave);
//函數(shù)內(nèi)會修改客戶端狀態(tài)到:SLAVE_STATE_WAIT_BGSAVE_END
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
} else {
...
}
//情況2:正在執(zhí)行bgsave热凹,但是文件是直接輸出到某個socket泵喘,需要等待下一輪bgsave
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
...
//情況3:沒有正在執(zhí)行的bgsave
} else {
...
if (server.aof_child_pid == -1) {
//開始執(zhí)行bgsave,修改狀態(tài)為:SLAVE_STATE_WAIT_BGSAVE_END
startBgsaveForReplication(c->slave_capa);
} else {
...
}
...
}
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
//創(chuàng)建backlog般妙,用于增量同步
createReplicationBacklog();
return;
}
當(dāng)rdb文件生成完畢時纪铺,會調(diào)用updateSlavesWaitingBgsave
函數(shù),函數(shù)中會遍歷server.slaves鏈表碟渺,找出狀態(tài)是SLAVE_STATE_WAIT_BGSAVE_END
的客戶端鲜锚,更改狀態(tài)為SLAVE_STATE_SEND_BULK
,并注冊寫事件處理器sendBulkToSlave
苫拍,向salve發(fā)送rdb文件芜繁。
void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
...
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//判斷狀態(tài)
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
...
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
struct redis_stat buf;
//打開rdb文件
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
...
continue;
}
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
//注冊寫事件處理器,發(fā)送rdb文件
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
...
}
在sendBulkToSlave
函數(shù)中绒极,當(dāng)rdb文件發(fā)送完成浆洗,調(diào)用putSlaveOnline
函數(shù)更新client狀態(tài)為SLAVE_STATE_ONLINE
,刪除sendBulkToSlave
處理器,安裝新的sendReplyToClient
處理器用于發(fā)送緩沖中的增量數(shù)據(jù)集峦。
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
...
//文件發(fā)送完成
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
//刪除事件處理器sendBulkToSlave
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
//更新狀態(tài),安裝新寫事件處理器sendReplyToClient
putSlaveOnline(slave);
}
}
在bgsave執(zhí)行和傳輸rdb文件期間抠刺,master還是會繼續(xù)處理寫入請求塔淤,在server.c中會調(diào)用replicationFeedSlaves
函數(shù)向salve的緩沖中寫入增量數(shù)據(jù):
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
...
//遍歷所有的slave
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
//判斷狀態(tài),因為rdb文件需要跟增量數(shù)據(jù)配對數(shù)據(jù)才正確速妖,所以SLAVE_STATE_WAIT_BGSAVE_START狀態(tài)的客戶端不寫入
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
//寫入緩沖
addReplyMultiBulkLen(slave,argc);
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
主從同步的配置
Redis提供了兩項配置高蜂,通過修改配置可以在可用性和一致性之間做調(diào)節(jié)。
# min-slaves-to-write 3 最少有幾個slave處于online狀態(tài)
# min-slaves-max-lag 10 主從之間的延遲需要小于多少(seconds)
如果配置了這兩個選項罕容,在refreshGoodSlavesCount
函數(shù)中會統(tǒng)計good的slave數(shù)量:
void refreshGoodSlavesCount(void) {
listIter li;
listNode *ln;
int good = 0;
//如果沒有配置备恤,直接返回
if (!server.repl_min_slaves_to_write ||
!server.repl_min_slaves_max_lag) return;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
time_t lag = server.unixtime - slave->repl_ack_time;
if (slave->replstate == SLAVE_STATE_ONLINE &&
lag <= server.repl_min_slaves_max_lag) good++;
}
//統(tǒng)計結(jié)果
server.repl_good_slaves_count = good;
}
在master執(zhí)行命令時稿饰,會判斷server.repl_good_slaves_count
值,如果小于配置會停止寫命令執(zhí)行:
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}
調(diào)高配置露泊,提高數(shù)據(jù)一致性喉镰,降低可用性。相反提高可用性惭笑,降低數(shù)據(jù)一致性侣姆。