系列
redis數據淘汰原理
redis過期數據刪除策略
redis server事件模型
redis cluster mget 引發的討論
redis 3.x windows 集群搭建
redis 命令執行過程
redis string底層數據結構
redis list底層數據結構
redis hash底層數據結構
redis set底層數據結構
redis zset底層數據結構
redis 客戶端管理
redis 主從同步-slave端
redis 主從同步-master端
redis 主從超時檢測
redis aof持久化
redis rdb持久化
redis 數據恢復過程
redis TTL實現原理
redis cluster集群建立
redis cluster集群選主
redis 選主過程分析
當slave發現自己的master變為FAIL狀態時,便嘗試進行Failover,以期成為新的master。由於掛掉的master可能會有多個slave,Failover的過程需要經過類Raft協議的過程在整個集群內達到一致,其過程如下:
- slave發現自己的master變為FAIL
- 將自己記錄的集群currentEpoch加1,並廣播Failover Request信息
- 其他節點收到該信息,只有master響應,判斷請求者的合法性,並發送FAILOVER_AUTH_ACK,對每一個epoch只發送一次ack
- 嘗試failover的slave收集FAILOVER_AUTH_ACK,超過半數後變成新Master,廣播Pong通知其他集群節點
redis 選主代碼分析
作為slave角色的節點會定期發送ping命令來檢測master的存活性;如果檢測到master未響應,那麼就將master節點標記為疑似下線(PFAIL)。
clusterHandleSlaveFailover執行重新選主的核心邏輯。
/*
 * Abridged view of the periodic cluster routine, reduced to the two steps
 * that matter for slave election:
 *   1. flag a node as PFAIL when its PONG has been pending for longer than
 *      the configured node timeout;
 *   2. when this node is a slave, run the manual-failover, slave-failover
 *      and (conditionally) slave-migration handlers.
 * The declarations of delay, now, node, update_state, orphaned_masters,
 * max_slaves and this_slaves are not part of this excerpt.
 */
void clusterCron(void) {
    /* Time elapsed since the ping to this node was sent. */
    delay = now - node->ping_sent;

    /* Pending for too long and not yet flagged PFAIL/FAIL: mark the node
     * as possibly failing and request a cluster state update. */
    if (delay > server.cluster_node_timeout &&
        (node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL)) == 0)
    {
        redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",node->name);
        node->flags |= REDIS_NODE_PFAIL;
        update_state = 1;
    }

    /* Slave-only duties. */
    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        clusterHandleSlaveFailover();
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) {
            clusterHandleSlaveMigration(max_slaves);
        }
    }
}
clusterHandleSlaveFailover內部通過clusterRequestFailoverAuth方法向集群當中的所有節點發送CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST報文,通知大家slave準備執行failover。
當節點收到至少 n/2+1 個master的response(n 為 server.cluster->size)後即升級為主。
/*
 * Slave-side election driver. Each call advances the failover of this
 * node's master by one step:
 *   - bail out unless this node is a slave whose slot-owning master is
 *     flagged FAIL (or a manual failover is ready to start);
 *   - refuse to run with replication data that is too old (unless manual);
 *   - schedule a new election time when the previous attempt is old enough;
 *   - once the scheduled time arrives, bump currentEpoch and broadcast the
 *     vote request (only once per attempt);
 *   - when enough votes have been counted, promote this node to master,
 *     take over the old master's slots and announce it with a PONG.
 * Takes no arguments and returns nothing; all state lives in server.cluster.
 */
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
/* Time elapsed since the election time stored in failover_auth_time. */
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
/* Votes needed to win: a strict majority of server.cluster->size. */
int needed_quorum = (server.cluster->size / 2) + 1;
/* A manual failover is in progress (mf_end set) and allowed to start. */
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
int j;
mstime_t auth_timeout, auth_retry_time;
/* This handler is running now, so clear its pending "todo" flag. */
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
/* Election timeout: twice the node timeout, but never below 2000.
 * A new attempt is allowed only after twice that timeout. */
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;
// For reference: #define nodeFailed(n) ((n)->flags & REDIS_NODE_FAIL)
/* Preconditions. Return when this node is a master, has no master, its
 * master is not flagged FAIL (and no manual failover is ready), or its
 * master serves no slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
myself->slaveof->numslots == 0) return;
// data_age: how long ago this slave last had contact with its master.
// The second-resolution difference is multiplied by 1000 (milliseconds).
if (server.repl_state == REDIS_REPL_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
// Do not count the node timeout itself as part of the disconnection time.
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
// Data freshness check: data_age must not exceed the replication ping
// period (converted with * 1000) plus node timeout multiplied by
// REDIS_CLUSTER_SLAVE_VALIDITY_MULT. A manual failover ignores this limit.
if (data_age >
((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
{
if (!manual_failover) return;
}
/* The previous attempt is older than the retry time: schedule a new
 * election, reset the vote counters and return. */
if (auth_age > auth_retry_time) {
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
/* Add 1000 per rank position returned by clusterGetSlaveRank(). */
server.cluster->failover_auth_rank = clusterGetSlaveRank();
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
}
redisLog(REDIS_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
/* Send a PONG to the CLUSTER_BROADCAST_LOCAL_SLAVES target set. */
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
/* Votes not requested yet and no manual failover: if the rank got worse
 * since scheduling, push the election time back by 1000 per lost position. */
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
redisLog(REDIS_WARNING,
"Slave rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
// The scheduled election time has not arrived yet: return and retry later.
if (mstime() < server.cluster->failover_auth_time) return;
// The election time passed too long ago (more than auth_timeout): this
// attempt has expired, so give up on it and return.
if (auth_age > auth_timeout) return;
// Ask the other nodes for their vote (done only once per attempt).
if (server.cluster->failover_auth_sent == 0) {
// Advance the cluster current epoch.
server.cluster->currentEpoch++;
// Remember the epoch in which this election was started.
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
// Broadcast the request asking whether this node may fail over
// its failing master.
clusterRequestFailoverAuth();
// Mark the request as sent.
server.cluster->failover_auth_sent = 1;
// Queue follow-up work through clusterDoBeforeSleep; the flags request:
// 1) saving the config file
// 2) updating the cluster state
// 3) fsync-ing the config
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */
}
// Enough votes collected: fail over the old master.
if (server.cluster->failover_auth_count >= needed_quorum) {
// The master being replaced.
clusterNode *oldmaster = myself->slaveof;
redisLog(REDIS_WARNING,
"Failover election won: I'm the new master.");
/*
 * Switch this node's role from slave to master.
 */
clusterSetNodeAsMaster(myself);
// Stop replicating from the old master.
replicationUnsetMaster();
// Take over every slot whose bit is set for the old master.
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(oldmaster,j)) {
// Release the slot from its current assignment...
clusterDelSlot(j);
// ...and assign it to this node.
clusterAddSlot(myself,j);
}
}
// Adopt the election epoch as this node's config epoch.
myself->configEpoch = server.cluster->failover_auth_epoch;
// Recompute the cluster state.
clusterUpdateState();
// Persist the configuration.
clusterSaveConfigOrDie(1);
// Send a PONG to all nodes (CLUSTER_BROADCAST_ALL) so they learn that
// this node is now a master.
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
// Clear any manual failover state (called unconditionally).
resetManualFailover();
}
}
/*
* 向其他所有節(jié)點(diǎn)發(fā)送 FAILOVE_AUTH_REQUEST 信息昌简,
* 看它們是否同意由這個(gè)從節(jié)點(diǎn)來對(duì)下線的主節(jié)點(diǎn)進(jìn)行故障轉(zhuǎn)移占业。
*
* 信息會(huì)被發(fā)送給所有節(jié)點(diǎn),包括主節(jié)點(diǎn)和從節(jié)點(diǎn)纯赎,但只有主節(jié)點(diǎn)會(huì)回復(fù)這條信息谦疾。
*/
void clusterRequestFailoverAuth(void) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
// 設(shè)置信息頭(包含當(dāng)前節(jié)點(diǎn)的信息)
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
// 發(fā)送信息
clusterBroadcastMessage(buf,totlen);
}
redis主從選舉過程中報文相關的解析邏輯位於clusterProcessPacket,其內部主要處理CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST和CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK報文。
- request報文的處理邏輯:如果當前節點是master且滿足投票條件,就發回ack響應
- ack報文的處理邏輯:增加支持投票數failover_auth_count++
/*
 * Excerpt of the cluster bus packet handler showing only the branches
 * involved in slave election and manual failover. Every visible path
 * returns 1.
 *
 * NOTE(review): this is an abridged excerpt. The branches preceding the
 * first `else if`, and the declarations of type, sender, hdr and
 * senderCurrentEpoch, are omitted from it.
 */
int clusterProcessPacket(clusterLink *link) {
// Vote request: the sender asks this node to vote for its failover.
else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
// Ignore requests when no sender node was resolved.
if (!sender) return 1;
// Vote for the sender if the voting rules allow it.
clusterSendFailoverAuthIfNeeded(sender,hdr);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
// Vote received: the sender supports this node's failover.
if (!sender) return 1;
// The vote is counted only when all of the following hold:
// 1) the sender is a master
// 2) the sender serves at least one slot
// 3) senderCurrentEpoch is greater than or equal to the epoch in which
//    this node started its election (failover_auth_epoch)
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
// Count the vote.
server.cluster->failover_auth_count++;
// Flag the failover handler as pending work.
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
// Manual failover start: accepted only from a slave of this node.
if (!sender || sender->slaveof != myself) return 1;
// Drop any previous manual failover state, then record the deadline
// and the requesting slave.
resetManualFailover();
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
// Pause clients until twice the manual failover timeout from now.
pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
sender->name);
}
return 1;
}
/*
 * Vote for `node`, the slave that sent the failover authorization
 * `request`, provided every voting rule below is satisfied. When a rule
 * fails the function returns without sending anything.
 */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    /* Master of the requesting node. */
    clusterNode *failing_master = node->slaveof;
    /* Epochs announced in the request header, in host byte order. */
    uint64_t req_current_epoch = ntohu64(request->currentEpoch);
    uint64_t req_config_epoch = ntohu64(request->configEpoch);
    /* Slot bitmap carried by the request. */
    unsigned char *slots_claimed = request->myslots;
    int forced = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int slot;

    /* Voter-side rules, checked in order:
     *  - only masters serving at least one slot may vote;
     *  - the request's currentEpoch must not be older than ours;
     *  - at most one vote per epoch (lastVoteEpoch == currentEpoch means
     *    the vote for this epoch was already given). */
    if (nodeIsSlave(myself) ||
        myself->numslots == 0 ||
        req_current_epoch < server.cluster->currentEpoch ||
        server.cluster->lastVoteEpoch == server.cluster->currentEpoch)
    {
        return;
    }

    /* Requester-side rules: it must be a slave with a known master. */
    if (nodeIsMaster(node) || failing_master == NULL) return;

    /* Its master must be flagged FAIL, unless the request forces the ack. */
    if (!nodeFailed(failing_master) && !forced) return;

    /* A vote involving this master was given less than two node timeouts
     * ago (voted_time is tracked on the master): do not vote again yet. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2) {
        return;
    }

    /* Refuse to vote when any claimed slot is currently assigned to a node
     * whose configEpoch is greater than the one in the request: the
     * requester's view of that slot is stale. */
    for (slot = 0; slot < REDIS_CLUSTER_SLOTS; slot++) {
        if (!bitmapTestBit(slots_claimed, slot)) continue; /* not claimed */
        if (server.cluster->slots[slot] != NULL &&
            server.cluster->slots[slot]->configEpoch > req_config_epoch)
        {
            return;
        }
    }

    /* We can vote for this slave. */
    clusterSendFailoverAuth(node);

    /* Record the epoch of the vote and when this master was voted for. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
}
參考文章
redis cluster集群的源碼分析(1)
Redis Cluster 實現細節