哨兵機制
基本原理
命令發送
- 每個Sentinel每10s向master/slaves發送INFO命令
- 發現slave節點
- 確認主從關係
- 每個Sentinel每2s向master/slaves/sentinels用PUBLISH發布hello信息
- 發布對節點的“看法”和自身信息
- 發現其他sentinel節點
- 每個Sentinel每1s向master/slaves/sentinels發送PING命令
- 心跳檢測,下線判定的依據
- 客戶端將sentinel作為Redis服務發現的配置中心,並通過訂閱sentinel的頻道來獲得配置變更的通知
- 通過SENTINEL GET-MASTER-ADDR-BY-NAME獲取主節點信息
- 通過SENTINEL SLAVES獲取從節點信息
- 通過+switch-master頻道獲取主備切換的信息
- 通過+convert-to-slave、+sdown、+slave、+slave-reconf-done等頻道獲取slave節點配置變更信息
命令接收
- 收到master/slaves對INFO命令的回復
- 收到master/slaves/sentinels對PING命令的回復
- 收到所訂閱的master/slaves上__sentinel__:hello頻道中的消息,以及其它sentinels用PUBLISH直接發來的hello信息
流程圖
源碼分析
相關常量和結構體
/* Flags of a monitored sentinelRedisInstance (stored in ri->flags). */
#define SRI_MASTER (1<<0) /* master */
#define SRI_SLAVE (1<<1) /* slave */
#define SRI_SENTINEL (1<<2) /* sentinel */
#define SRI_S_DOWN (1<<3) /* Subjectively down: this Sentinel considers it down. */
#define SRI_O_DOWN (1<<4) /* Objectively down: a quorum of Sentinels reports it down. */
#define SRI_MASTER_DOWN (1<<5) /* Set on a Sentinel instance whose last is-master-down-by-addr reply said the master is down. */
#define SRI_FAILOVER_IN_PROGRESS (1<<6) /* A failover is in progress for this master. */
#define SRI_PROMOTED (1<<7) /* This slave was selected for promotion. */
#define SRI_RECONF_SENT (1<<8) /* SLAVEOF <newmaster> already sent. */
#define SRI_RECONF_INPROG (1<<9) /* Slave is synchronizing with the master. */
#define SRI_RECONF_DONE (1<<10) /* Slave finished synchronizing with the master. */
#define SRI_FORCE_FAILOVER (1<<11) /* Force failover with master up. */
#define SRI_SCRIPT_KILL_SENT (1<<12) /* SCRIPT KILL already sent on -BUSY */
/* Note: times are in milliseconds. */
#define SENTINEL_INFO_PERIOD 10000 /* Normal INFO interval for masters/slaves. */
#define SENTINEL_PING_PERIOD 1000 /* Upper bound of the PING interval; also the reconnect throttle. */
#define SENTINEL_ASK_PERIOD 1000 /* Min interval between unforced is-master-down-by-addr queries to a peer. */
#define SENTINEL_PUBLISH_PERIOD 2000 /* Hello message interval. */
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000 /* A timer delta above this (or negative) enters TILT. */
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) /* How long TILT mode lasts. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000 /* Min connection age before a stalled link may be force-closed. */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100 /* Per link user; scaled by link->refcount. */
#define SENTINEL_ELECTION_TIMEOUT 10000 /* Upper bound of the leader-election wait. */
#define SENTINEL_MAX_DESYNC 1000 /* Random delay bound added to failover_start_time. */
#define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1
/* Failover machine different states. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time (and leader election). */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select a slave to promote. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Send SLAVEOF NO ONE to the slave. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait for the selected slave to be promoted. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* Reconfigure the other slaves: SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Other slaves reconfigured; monitor the promoted slave. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
/* A connection to a sentinelRedisInstance. A set of Sentinels may monitor
 * many masters: e.g. with 5 Sentinels monitoring 100 masters, creating one
 * link per (master, sentinel) pair would mean 500 links, while 5 shared links
 * to the Sentinels are enough. refcount implements this sharing, so 5 links
 * (and 5 PINGs) replace 500.
 *
 * Links are shared only for Sentinels; for masters and slaves refcount is always 1. */
typedef struct instanceLink {
    int refcount; /* How many sentinelRedisInstance objects share this link. */
    int disconnected; /* Non-zero if cc or pc needs to be (re)connected. */
    int pending_commands; /* Commands sent on this link still waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for the command connection. */
    redisAsyncContext *pc; /* Hiredis context for the Pub/Sub connection. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message on pc. */
    mstime_t last_avail_time; /* Last time the instance replied to a PING with a valid reply. */
    /* Time at which the PING we are still waiting a reply for was sent.
     * Set to 0 when a reply is received; set to the current time when it is
     * 0 and a new PING is sent. */
    mstime_t act_ping_time;
    /* Time of the last PING sent. Only used to avoid sending too many PINGs
     * during failures; idle time is computed from act_ping_time. */
    mstime_t last_ping_time;
    /* Time of the last reply received, whatever it was. Used to check
     * whether the link is idle and must be reconnected. */
    mstime_t last_pong_time;
    mstime_t last_reconn_time; /* Last reconnection attempt while the link was disconnected. */
} instanceLink;
/* State of one monitored instance: a master, a slave or another Sentinel,
 * distinguished by the SRI_MASTER / SRI_SLAVE / SRI_SENTINEL flags. */
typedef struct sentinelRedisInstance {
    int flags; /* SRI_... flags. */
    char *name; /* Master name from the point of view of this Sentinel. */
    char *runid; /* Run ID of this instance, or unique ID if it is a Sentinel. */
    uint64_t config_epoch; /* Configuration epoch. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance; may be shared for Sentinels. */
    mstime_t last_pub_time; /* Last time we sent a hello message. */
    mstime_t last_hello_time; /* Sentinels only: last time we received a hello message. */
    mstime_t last_master_down_reply_time; /* Last time a SENTINEL is-master-down reply was received. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh; /* Time at which we received the last INFO reply. */
    dict *renamed_commands; /* Renamed commands. */
    /* Role reported by the instance, and the time we first observed the
     * change to that role. Useful when delaying a replacement: we need to
     * wait some time to give the new leader a chance to report the new
     * configuration. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time the slave's master address changed. */
    /* Master specific. */
    dict *sentinels; /* Other Sentinels monitoring the same master. */
    dict *slaves; /* Slaves of this master. */
    unsigned int quorum;/* Number of Sentinels that must agree the master is down. */
    int parallel_syncs; /* How many slaves to reconfigure at the same time. */
    char *auth_pass; /* Password to use for AUTH against master & slaves. */
    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new>. */
    struct sentinelRedisInstance *master; /* Master instance if this is a slave. */
    char *slave_master_host; /* Master host as reported by INFO. */
    int slave_master_port; /* Master port as reported by INFO. */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    char *leader; /* Master: runid of the Sentinel that should perform the
                     failover. Sentinel: runid of the leader it voted for. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time; /* Start time of the last failover attempt. */
    mstime_t failover_timeout; /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify the admin or to reconfigure clients:
     * when set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;
/* Global Sentinel state: declares struct sentinelState and its single
 * instance, the global variable 'sentinel'. */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE + 1]; /* This Sentinel's ID. */
    uint64_t current_epoch; /* Current epoch. */
    dict *masters; /* All monitored masters: key is the instance name, value a sentinelRedisInstance pointer. */
    int tilt; /* Are we in TILT mode? */
    int running_scripts; /* Number of scripts currently running. */
    mstime_t tilt_start_time; /* When TILT mode started. */
    mstime_t previous_time; /* Last time the timer handler ran. */
    list *scripts_queue; /* Queue of user scripts waiting to be executed. */
    char *announce_ip; /* IP address gossiped to other Sentinels. */
    int announce_port; /* Port gossiped to other Sentinels. */
    unsigned long simfailure_flags; /* Failure simulation flags. */
    int deny_scripts_reconfig; /* Controls whether SENTINEL SET ... may change script paths at runtime. */
} sentinel;
初始化
/* Excerpt of main(): only the Sentinel-related steps are shown; "..." marks
 * code omitted from this excerpt. */
int main(int argc, char **argv) {
    ...
    /* In Sentinel mode, initialize the Sentinel configuration (e.g. port)
     * and data structures. */
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    ...
    if (!server.sentinel_mode) {
        ...
    } else {
        /* Validate the config file, make sure we have an ID, emit +monitor events. */
        sentinelIsRunning();
    }
    ...
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}
/* 這個(gè)函數(shù)會(huì)在server運(yùn)行在Sentinel模式下時(shí)調(diào)用橙凳,來(lái)啟動(dòng)、加載配置和為正常操作做準(zhǔn)備 */
void sentinelIsRunning(void) {
int j;
if (server.configfile == NULL) {
serverLog(LL_WARNING,
"Sentinel started without a config file. Exiting...");
exit(1);
} else if (access(server.configfile, W_OK) == -1) {
serverLog(LL_WARNING,
"Sentinel config file %s is not writable: %s. Exiting...",
server.configfile, strerror(errno));
exit(1);
}
/* 如果Sentinel在配置文件中還沒(méi)有一個(gè)ID笑撞,我們將會(huì)隨機(jī)生成一個(gè)并持久化到磁盤上痕惋。
* 從現(xiàn)在開始,即使重啟也會(huì)使用同一個(gè)ID */
for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
if (sentinel.myid[j] != 0) break;
if (j == CONFIG_RUN_ID_SIZE) {
/* 選擇一個(gè)ID并持久化到配置文件中 */
getRandomHexChars(sentinel.myid, CONFIG_RUN_ID_SIZE);
sentinelFlushConfig();
}
/* Log its ID to make debugging of issues simpler. */
serverLog(LL_WARNING, "Sentinel ID is %s", sentinel.myid);
/* 在啟動(dòng)時(shí)娃殖,對(duì)于每個(gè)配置的master值戳,我們想要生成一個(gè)+monitor的事件 */
sentinelGenerateInitialMonitorEvents();
}
/* Sentinel periodic timer handler. In order: checks the TILT condition,
 * handles every monitored master (and, recursively, its slaves and
 * sentinels), then runs the user-script queue helpers. */
void sentinelTimer(void) {
    sentinelCheckTiltCondition(); // Check whether TILT mode must be entered.
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    sentinelRunPendingScripts();
    sentinelCollectTerminatedScripts();
    sentinelKillTimedoutScripts();
    /* We keep changing the frequency of the server timer so that the timers
     * of different Sentinels do not stay synchronized, which lowers the
     * chance that they start a leader election at the same time.
     * hz is picked in [CONFIG_DEFAULT_HZ, 2*CONFIG_DEFAULT_HZ). */
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}
/* 這個(gè)函數(shù)檢查我們是否需要進(jìn)入TITL模式炉爆。
* 當(dāng)我們?cè)趦纱螘r(shí)間中斷調(diào)用中遇到以下情況時(shí)堕虹,如果調(diào)用時(shí)間差為負(fù)或者超過(guò)了2s,我們會(huì)進(jìn)入TITL模式芬首。
* 注意:如果我們認(rèn)為100ms左右是正常的赴捞,如果我們需要進(jìn)入TITL模式,說(shuō)明我們遇到了以下情況:
* 1) Sentinel進(jìn)程因?yàn)槟承┰蜃枞×擞羯裕赡苁牵贺?fù)載太高赦政,IO凍結(jié),信號(hào)停止等等耀怜。
* 2) 系統(tǒng)時(shí)鐘被修改了恢着。
* 在上面兩種情況下Sentinel會(huì)認(rèn)為出現(xiàn)了超時(shí)甚至故障,這是我們進(jìn)入TILT财破,并且在SENTINEL_TILT_PERIOD
* 時(shí)間內(nèi)我們都不執(zhí)行任何操作掰派。
* 在TILT期間,我們?nèi)匀皇占畔⒆罅。俏覀儾粓?zhí)行操作靡羡。 */
void sentinelCheckTiltCondition(void) {
mstime_t now = mstime();
mstime_t delta = now - sentinel.previous_time;
if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
sentinel.tilt = 1;
sentinel.tilt_start_time = mstime();
sentinelEvent(LL_WARNING, "+tilt", NULL, "#tilt mode entered");
}
sentinel.previous_time = mstime();
}
/* Run the periodic handling for every instance in 'instances'. Masters are
 * additionally processed recursively through their slaves and sentinels
 * dictionaries. A master found in SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is
 * switched to its promoted slave after the loop (only the last such master
 * seen in this pass). */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *it = dictGetIterator(instances);
    sentinelRedisInstance *pending_switch = NULL;
    dictEntry *entry;

    for (entry = dictNext(it); entry != NULL; entry = dictNext(it)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        sentinelHandleRedisInstance(inst);
        if (!(inst->flags & SRI_MASTER)) continue;

        sentinelHandleDictOfRedisInstances(inst->slaves);
        sentinelHandleDictOfRedisInstances(inst->sentinels);
        if (inst->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            pending_switch = inst;
    }

    if (pending_switch != NULL)
        sentinelFailoverSwitchToPromotedSlave(pending_switch);
    dictReleaseIterator(it);
}
/* Perform the scheduled operations for one instance. The monitoring half
 * (reconnect, periodic commands) always runs. The acting half (down checks,
 * failover) is skipped while TILT mode is active. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    sentinelReconnectInstance(ri);     /* Reconnect the link if it is down. */
    sentinelSendPeriodicCommands(ri);  /* INFO / PING / hello. */

    /* ============== ACTING HALF ============= */
    if (sentinel.tilt) {
        mstime_t in_tilt = mstime() - sentinel.tilt_start_time;

        if (in_tilt < SENTINEL_TILT_PERIOD) return; /* Still in TILT: do nothing. */
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING, "-tilt", NULL, "#tilt mode exited");
    }

    /* Any kind of instance (master, slave, sentinel) can be subjectively down. */
    sentinelCheckSubjectivelyDown(ri);

    /* Everything below only applies to masters. */
    if ((ri->flags & SRI_MASTER) == 0) return;

    sentinelCheckObjectivelyDown(ri);
    if (sentinelStartFailoverIfNeeded(ri))
        sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_ASK_FORCED);
    sentinelFailoverStateMachine(ri);
    sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_NO_FLAGS);
}
/* If the link of 'ri' is disconnected, (re)create its connections
 * asynchronously: the command connection (cc) for every kind of instance,
 * plus the Pub/Sub connection (pc) for masters and slaves.
 * Note: link->disconnected becomes true as soon as either the command or
 * the Pub/Sub connection is down. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();
    /* At most one reconnection attempt every SENTINEL_PING_PERIOD ms. */
    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;
    /* Command connection. */
    if (link->cc == NULL) {
        // Start an asynchronous connection.
        link->cc = redisAsyncConnectBind(ri->addr->ip, ri->addr->port, NET_FIRST_BIND_ADDR);
        if (link->cc->err) {
            sentinelEvent(LL_DEBUG, "-cmd-link-reconnection", ri, "%@ #%s",
                          link->cc->errstr);
            instanceLinkCloseConnection(link, link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            // Attach to the event loop, set the connect and disconnect
            // callbacks, send AUTH if needed, set the client name, send a PING.
            redisAeAttach(server.el, link->cc);
            redisAsyncSetConnectCallback(link->cc,
                                         sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                                            sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri, link->cc);
            sentinelSetClientName(ri, link->cc, "cmd");
            /* Send a PING ASAP after reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub/Sub connection: only for masters and slaves. */
    if ((ri->flags & (SRI_MASTER | SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip, ri->addr->port, NET_FIRST_BIND_ADDR);
        if (link->pc->err) {
            sentinelEvent(LL_DEBUG, "-pubsub-link-reconnection", ri, "%@ #%s",
                          link->pc->errstr);
            instanceLinkCloseConnection(link, link->pc);
        } else {
            int retval;
            link->pc_conn_time = mstime();
            link->pc->data = link;
            // Attach the pc socket to the ae event loop.
            redisAeAttach(server.el, link->pc);
            redisAsyncSetConnectCallback(link->pc,
                                         sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                                            sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri, link->pc);
            sentinelSetClientName(ri, link->pc, "pubsub");
            /* Subscribe to the __sentinel__:hello channel. */
            retval = redisAsyncCommand(link->pc,
                                       sentinelReceiveHelloMessages, ri, "%s %s",
                                       sentinelInstanceMapCommand(ri, "SUBSCRIBE"),
                                       SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, close the connection and retry later. */
                instanceLinkCloseConnection(link, link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected state once every connection this instance needs
     * exists: cc for Sentinels, cc and pc for masters and slaves. (The async
     * connect itself may still be pending.) */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
心跳檢測
通過__sentinel__:hello頻道發現其它sentinel。
/* Process a hello message received in the __sentinel__:hello channel of a
 * master or slave, or sent directly by another Sentinel.
 * If the master name specified in the message is not known, the message is
 * discarded. ("..." marks code omitted from this excerpt.) */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;
    if (numtokens == 8) {
        /* The message references a master. */
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup; /* Unknown master, skip the message. */
        /* First, check whether we already know a sentinel with the same
         * ip:port and runid. */
        port = atoi(token[1]);
        master_port = atoi(token[6]);
        si = getSentinelRedisInstanceByAddrAndRunID(
                master->sentinels, token[0], port, token[2]);
        current_epoch = strtoull(token[3], NULL, 10);
        master_config_epoch = strtoull(token[7], NULL, 10);
        /* Cases for the sending Sentinel:
         * 1. Same runid and same ip:port: nothing to do.
         * 2. Same runid, different ip:port: the Sentinel switched address;
         *    remove it and add it again with the new address.
         * 3. Different runid, same ip:port: the Sentinel known at this ip:port
         *    is stale; mark its address invalid, then add the new one.
         * 4. Different runid and different ip:port: just add it. */
        if (!si) { // No sentinel with this ip:port and runid: handle it as new.
            /* Remove all the sentinels with this runid (they switched address);
             * a sentinel with this runid and the new address is added below. */
            removed = removeMatchingSentinelFromMaster(master, token[2]);
            if (removed) {
                // A sentinel with the same runid but another address was removed: address switch.
                sentinelEvent(LL_NOTICE, "+sentinel-address-switch", master,
                              "%@ ip %s port %d for %s", token[0], port, token[2]);
            } else {
                /* No sentinel with this runid was removed. If another sentinel
                 * (different runid) is known at the same ip:port, its address
                 * is invalid: mark it by setting its port to 0. It will be
                 * updated when a hello message from its runid arrives. */
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0], port, NULL);
                if (other) {
                    sentinelEvent(LL_NOTICE, "+sentinel-invalid-addr", other, "%@");
                    other->addr->port = 0; /* It means: invalid address. */
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
            }
            /* Create the new sentinel and add it to the master's sentinels. */
            si = createSentinelRedisInstance(token[2], SRI_SENTINEL,
                                             token[0], port, master->quorum, master);
            if (si) {
                if (!removed) sentinelEvent(LL_NOTICE, "+sentinel", si, "%@");
                /* The instance was just created and has no runid yet: fill it
                 * now, there is no later chance to do it. */
                si->runid = sdsnew(token[2]);
                sentinelTryConnectionSharing(si);
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
                sentinelFlushConfig();
            }
        }
        ...
        /* Update the state of the Sentinel. */
        if (si) si->last_hello_time = mstime();
    }
cleanup:
    sdsfreesplitres(token, numtokens);
}
通過master的INFO響應發現從節點
/* Process the INFO reply received from a master or slave instance: cache
 * it, track the run ID, discover slaves (only from a master's reply) and
 * update the replication fields a slave reports.
 * ("..." marks code omitted from this excerpt.) */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;
    /* Cache the INFO output. */
    sdsfree(ri->info);
    ri->info = sdsnew(info);
    /* Fields that may be missing from the INFO output are reset to their
     * default value here. */
    ri->master_link_down_time = 0;
    /* Parse the reply line by line. */
    lines = sdssplitlen(info, strlen(info), "\r\n", 2, &numlines);
    for (j = 0; j < numlines; j++) {
        sentinelRedisInstance *slave;
        sds l = lines[j];
        /* run_id:<40 hex chars> */
        if (sdslen(l) >= 47 && !memcmp(l, "run_id:", 7)) {
            if (ri->runid == NULL) { // Not known yet: store it.
                ri->runid = sdsnewlen(l + 7, 40);
            } else { // Already known: a different value means the instance rebooted.
                if (strncmp(ri->runid, l + 7, 40) != 0) {
                    sentinelEvent(LL_NOTICE, "+reboot", ri, "%@");
                    sdsfree(ri->runid);
                    ri->runid = sdsnewlen(l + 7, 40);
                }
            }
        }
        /* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        /* Slave lines are only used when this INFO comes from a master. */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 &&
            !memcmp(l, "slave", 5) && isdigit(l[5])) {
            char *ip, *port, *end;
            if (strstr(l, "ip=") == NULL) {
                /* Old format. */
                ip = strchr(l, ':');
                if (!ip) continue;
                ip++; /* Now ip points to start of ip address. */
                port = strchr(ip, ',');
                if (!port) continue;
                *port = '\0'; /* nul term for easy access. */
                port++; /* Now port points to start of port number. */
                end = strchr(port, ',');
                if (!end) continue;
                *end = '\0'; /* nul term for easy access. */
            } else {
                /* New format. */
                ip = strstr(l, "ip=");
                if (!ip) continue;
                ip += 3; /* Now ip points to start of ip address. */
                port = strstr(l, "port=");
                if (!port) continue;
                port += 5; /* Now port points to start of port number. */
                /* Nul term both fields for easy access. */
                end = strchr(ip, ',');
                if (end) *end = '\0';
                end = strchr(port, ',');
                if (end) *end = '\0';
            }
            /* Add the slave if it is not known yet. */
            if (sentinelRedisInstanceLookupSlave(ri, ip, atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL, SRI_SLAVE, ip,
                                                         atoi(port), ri->quorum, ri)) != NULL) {
                    sentinelEvent(LL_NOTICE, "+slave", slave, "%@");
                    sentinelFlushConfig();
                }
            }
        }
        /* master_link_down_since_seconds:<seconds> */
        /* Replication link down time, stored in milliseconds. */
        if (sdslen(l) >= 32 &&
            !memcmp(l, "master_link_down_since_seconds", 30)) {
            ri->master_link_down_time = strtoll(l + 31, NULL, 10) * 1000;
        }
        /* role:<role> */
        /* NOTE(review): unlike the other fields, these memcmp calls have no
         * sdslen(l) guard, so for a line shorter than 11/10 bytes they may read
         * past its end; consider checking the length first. */
        if (!memcmp(l, "role:master", 11)) role = SRI_MASTER;
        else if (!memcmp(l, "role:slave", 10)) role = SRI_SLAVE;
        // Once the reported role is slave, parse the slave-specific fields
        // that follow.
        if (role == SRI_SLAVE) {
            /* master_host:<host> */
            if (sdslen(l) >= 12 && !memcmp(l, "master_host:", 12)) {
                if (ri->slave_master_host == NULL ||
                    strcasecmp(l + 12, ri->slave_master_host)) {
                    sdsfree(ri->slave_master_host);
                    ri->slave_master_host = sdsnew(l + 12);
                    ri->slave_conf_change_time = mstime();
                }
            }
            /* master_port:<port> */
            if (sdslen(l) >= 12 && !memcmp(l, "master_port:", 12)) {
                int slave_master_port = atoi(l + 12);
                if (ri->slave_master_port != slave_master_port) {
                    ri->slave_master_port = slave_master_port;
                    ri->slave_conf_change_time = mstime();
                }
            }
            /* master_link_status:<status> */
            if (sdslen(l) >= 19 && !memcmp(l, "master_link_status:", 19)) {
                ri->slave_master_link_status =
                    (strcasecmp(l + 19, "up") == 0) ?
                    SENTINEL_MASTER_LINK_STATUS_UP :
                    SENTINEL_MASTER_LINK_STATUS_DOWN;
            }
            /* slave_priority:<priority> */
            if (sdslen(l) >= 15 && !memcmp(l, "slave_priority:", 15))
                ri->slave_priority = atoi(l + 15);
            /* slave_repl_offset:<offset> */
            if (sdslen(l) >= 18 && !memcmp(l, "slave_repl_offset:", 18))
                ri->slave_repl_offset = strtoull(l + 18, NULL, 10);
        }
    }
    ri->info_refresh = mstime();
    sdsfreesplitres(lines, numlines);
    ...
}
故障發現
心跳探測
/* Send the periodic INFO, PING and PUBLISH (hello) commands to 'ri', each on
 * its own schedule. Nothing is sent if the link is disconnected or too many
 * replies are still pending on it. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int slave_needs_fast_info;

    /* Nothing can be sent until the link is connected. */
    if (ri->link->disconnected) return;

    /* INFO, PING and PUBLISH are not critical: when too many commands are
     * pending on this link, skip them rather than using more memory on a bad
     * network. A link that times out is closed and reconnected elsewhere. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount)
        return;

    /* Poll a slave's INFO every second instead of every 10s when its master
     * is ODOWN or under failover (to catch the promotion quickly), or when the
     * slave reports its replication link down (to catch the recovery). */
    slave_needs_fast_info = (ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN | SRI_FAILOVER_IN_PROGRESS)) ||
         ri->master_link_down_time != 0);
    info_period = slave_needs_fast_info ? 1000 : SENTINEL_INFO_PERIOD;

    /* ping_period = min(down_after_period, SENTINEL_PING_PERIOD). */
    ping_period = (ri->down_after_period > SENTINEL_PING_PERIOD) ?
                  SENTINEL_PING_PERIOD : ri->down_after_period;

    /* INFO goes to masters and slaves only. */
    if (!(ri->flags & SRI_SENTINEL) &&
        (ri->info_refresh == 0 || now - ri->info_refresh > info_period)) {
        int retval = redisAsyncCommand(ri->link->cc,
                                       sentinelInfoReplyCallback, ri, "%s",
                                       sentinelInstanceMapCommand(ri, "INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* PING goes to all three kinds of instances. */
    if (now - ri->link->last_pong_time > ping_period &&
        now - ri->link->last_ping_time > ping_period / 2)
        sentinelSendPing(ri);

    /* Hello messages go to all three kinds of instances every 2s. */
    if (now - ri->last_pub_time > SENTINEL_PUBLISH_PERIOD)
        sentinelSendHello(ri);
}
主觀下線
/* Decide whether, from our point of view, 'ri' is subjectively down (SDOWN)
 * and set/clear SRI_S_DOWN accordingly. Also force-closes links that look
 * stalled, so that they get reconnected. */
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;
    /* elapsed: time since the pending PING was sent or, when no PING is
     * pending and the link is disconnected, since the last valid PING reply. */
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;
    /* Check whether the links need to be reconnected:
     * 1) Command connection: created more than SENTINEL_MIN_LINK_RECONNECT_PERIOD
     *    (15s) ago, a PING is pending, and both the pending PING and the last
     *    reply of any kind are older than half of down_after_period.
     * 2) Pub/Sub connection: created more than 15s ago and nothing received
     *    on it for more than 3 publish periods (6s). */
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 && /* There is a pending PING... */
        /* ...it is delayed, and we did not even receive an error reply
         * (possibly the instance is running a long blocking command). */
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period / 2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period / 2)) {
        instanceLinkCloseConnection(ri->link, ri->link->cc);
    }
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD * 3)) {
        instanceLinkCloseConnection(ri->link, ri->link->pc);
    }
    /* The instance is subjectively down if either:
     * 1) it did not reply to PING (or we could not reconnect) within
     *    down_after_period; or
     * 2) we believe it is a master, but it has been reporting itself as a
     *    slave for longer than down_after_period + 2 * SENTINEL_INFO_PERIOD. */
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
         (ri->down_after_period + SENTINEL_INFO_PERIOD * 2))) {
        /* Subjectively down. */
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING, "+sdown", ri, "%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        /* Subjectively up again. */
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING, "-sdown", ri, "%@");
            ri->flags &= ~(SRI_S_DOWN | SRI_SCRIPT_KILL_SENT);
        }
    }
}
客觀下線
/* When 'master' is subjectively down for us, ask every connected Sentinel
 * monitoring it whether it agrees, via SENTINEL is-master-down-by-addr.
 * If this Sentinel has a failover in progress for the master, our runid is
 * sent instead of "*", which also requests the peer's vote.
 *
 * Stale opinions (no reply for more than 5 * SENTINEL_ASK_PERIOD) are
 * cleared first, for every peer.
 *
 * flags: with SENTINEL_ASK_FORCED the request is sent even if the peer
 * replied less than SENTINEL_ASK_PERIOD ms ago; otherwise that rate limit
 * applies. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;
    di = dictGetIterator(master->sentinels);
    while ((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
        char port[32];
        int retval;
        /* The opinion received from this Sentinel is too old: clear it. */
        if (elapsed > SENTINEL_ASK_PERIOD * 5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }
        /* Only ask when:
         * 1) the master is subjectively down for us;
         * 2) the link to that Sentinel is connected;
         * 3) unless forced, no is-master-down-by-addr reply was received from
         *    it within the last SENTINEL_ASK_PERIOD ms. */
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        if (ri->link->disconnected) continue;
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;
        /* Ask the other Sentinel. */
        ll2string(port, sizeof(port), master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri, "SENTINEL"),
                    master->addr->ip, port,
                    /* %llu consumes an unsigned long long; uint64_t may be a
                     * different type (e.g. unsigned long on LP64). */
                    (unsigned long long) sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}
/* Decide whether 'master' is objectively down (ODOWN) according to the
 * configured quorum, and set/clear SRI_O_DOWN accordingly.
 *
 * ODOWN is only a quorum: it means enough Sentinels found the instance
 * unreachable within some time window. Because the reports may be delayed,
 * it is not a strong guarantee that N Sentinels saw the instance as
 * subjectively down at the same instant. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    unsigned int votes = 0;
    int odown = 0;

    if (master->flags & SRI_S_DOWN) {
        dictIterator *it = dictGetIterator(master->sentinels);
        dictEntry *entry;

        votes = 1; /* Our own opinion: we see it subjectively down. */
        while ((entry = dictNext(it)) != NULL) {
            sentinelRedisInstance *peer = dictGetVal(entry);
            if (peer->flags & SRI_MASTER_DOWN) votes++;
        }
        dictReleaseIterator(it);
        odown = (votes >= master->quorum);
    }

    if (odown) {
        if (master->flags & SRI_O_DOWN) return; /* Already ODOWN. */
        sentinelEvent(LL_WARNING, "+odown", master, "%@ #quorum %d/%d",
                      votes, master->quorum);
        master->flags |= SRI_O_DOWN;
        master->o_down_since_time = mstime();
    } else if (master->flags & SRI_O_DOWN) {
        sentinelEvent(LL_WARNING, "-odown", master, "%@");
        master->flags &= ~SRI_O_DOWN;
    }
}
/* 這個(gè)實(shí)例用來(lái)檢查是否可以開始故障轉(zhuǎn)移,需要滿足以下條件:
* 1) master必須處于客觀下線條件苗缩。
* 2) 沒(méi)有故障轉(zhuǎn)移正在進(jìn)行中饵蒂。
* 3) 故障轉(zhuǎn)移冷卻中:在之前的failover_timeout*2的時(shí)間內(nèi)有一個(gè)故障轉(zhuǎn)移開始的企圖。
* 我們還不知道我們是否能夠贏得選舉酱讶,所以有可能我們可是一個(gè)故障轉(zhuǎn)移但是不做事情退盯。
* 如果故障轉(zhuǎn)移開始了,我們將會(huì)返回非0泻肯。 */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
/* 非客觀下線渊迁,不開始故障轉(zhuǎn)移 */
if (!(master->flags & SRI_O_DOWN)) return 0;
/* 故障轉(zhuǎn)移進(jìn)行中,不開始 */
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
/* 故障轉(zhuǎn)移灶挟,冷卻中 */
if (mstime() - master->failover_start_time <
master->failover_timeout * 2) {
if (master->failover_delay_logged != master->failover_start_time) {
time_t clock = (master->failover_start_time +
master->failover_timeout * 2) / 1000;
char ctimebuf[26];
ctime_r(&clock, ctimebuf);
ctimebuf[24] = '\0'; /* Remove newline. */
master->failover_delay_logged = master->failover_start_time;
serverLog(LL_WARNING,
"Next failover delay: I will not start a failover before %s",
ctimebuf);
}
return 0;
}
sentinelStartFailover(master);
return 1;
}
故障轉移
領導選舉
/* Put 'master' in the WAIT_START failover state. This marks the failover as
 * in progress, increments the current epoch (which becomes this failover's
 * epoch) and emits +new-epoch and +try-failover. The start time is pushed
 * forward by a random delay below SENTINEL_MAX_DESYNC ms. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    uint64_t epoch;

    serverAssert(master->flags & SRI_MASTER);

    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    master->flags |= SRI_FAILOVER_IN_PROGRESS;

    epoch = ++sentinel.current_epoch;
    master->failover_epoch = epoch;
    sentinelEvent(LL_WARNING, "+new-epoch", master, "%llu",
                  (unsigned long long) epoch);
    sentinelEvent(LL_WARNING, "+try-failover", master, "%@");

    master->failover_start_time = mstime() + rand() % SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}
// Excerpt of sentinelHandleRedisInstance() showing where a vote is requested
// ("..." marks code omitted from this excerpt).
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    ...
    /* Only for masters. */
    if (ri->flags & SRI_MASTER) {
        ...
        /* A failover was just started (state WAIT_START): ask the other
         * Sentinels right away (forced). Since a failover is now in progress,
         * the request carries our runid and thus also asks for their vote. */
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri, SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        ...
    }
}
/* SENTINEL command handler. Excerpt: only the IS-MASTER-DOWN-BY-ADDR and
 * RESET subcommands are shown; "..." marks code omitted from this excerpt. */
void sentinelCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr, "masters")) {
        ...
    } else if (!strcasecmp(c->argv[1]->ptr, "is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
         *
         * Arguments:
         * ip and port are the address of the master to check. The lookup is
         * not done by name because, in theory, different Sentinels may
         * monitor different masters with the same name.
         * current-epoch is used to decide whether we may vote for a failover
         * leader: each Sentinel votes only once per epoch.
         * runid different from "*" means the caller asks for our vote for a
         * failover; "*" makes the command a plain down-state query. */
        sentinelRedisInstance *ri;
        long long req_epoch;
        uint64_t leader_epoch = 0;
        char *leader = NULL;
        long port;
        int isdown = 0;
        if (c->argc != 6) goto numargserr;
        if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != C_OK ||
            getLongLongFromObjectOrReply(c, c->argv[4], &req_epoch, NULL)
            != C_OK)
            return;
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
                                                    c->argv[2]->ptr, port, NULL);
        /* Does it exist, is it a master, is it subjectively down?
         * Note: while in TILT mode we always reply 0. */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
            (ri->flags & SRI_MASTER))
            isdown = 1;
        /* Vote for this master's failover leader (or fetch our previous vote
         * for the epoch) when the request asks for a vote. */
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr, "*")) {
            leader = sentinelVoteLeader(ri, (uint64_t) req_epoch,
                                        c->argv[5]->ptr,
                                        &leader_epoch);
        }
        /* Reply with three elements: down state, leader, vote epoch. */
        addReplyMultiBulkLen(c, 3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long) leader_epoch);
        if (leader) sdsfree(leader);
    } else if (!strcasecmp(c->argv[1]->ptr, "reset")) {
        /* SENTINEL RESET <pattern> */
        if (c->argc != 3) goto numargserr;
        addReplyLongLong(c, sentinelResetMastersByPattern(c->argv[2]->ptr, SENTINEL_GENERATE_EVENT));
    } else {
        ...
    }
}
/* Reply callback for SENTINEL is-master-down-by-addr.
 * Expected reply: [0] master down state (integer), [1] runid of the voted
 * leader or "*" (string), [2] vote epoch (integer). Malformed or error
 * replies are ignored; in that case SRI_MASTER_DOWN is cleared later, once
 * the stored opinion times out. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r = reply;
    const char *voted_runid;

    if (reply == NULL || link == NULL) return;
    link->pending_commands--;

    if (r->type != REDIS_REPLY_ARRAY || r->elements != 3 ||
        r->element[0]->type != REDIS_REPLY_INTEGER ||
        r->element[1]->type != REDIS_REPLY_STRING ||
        r->element[2]->type != REDIS_REPLY_INTEGER)
        return;

    ri->last_master_down_reply_time = mstime();
    if (r->element[0]->integer == 1)
        ri->flags |= SRI_MASTER_DOWN;
    else
        ri->flags &= ~SRI_MASTER_DOWN;

    /* "*" means the peer did not vote. */
    voted_runid = r->element[1]->str;
    if (strcmp(voted_runid, "*") == 0) return;

    sdsfree(ri->leader);
    if ((long long) ri->leader_epoch != r->element[2]->integer)
        serverLog(LL_WARNING,
            "%s voted for %s %llu", ri->name,
            voted_runid,
            (unsigned long long) r->element[2]->integer);
    ri->leader = sdsnew(voted_runid);
    ri->leader_epoch = r->element[2]->integer;
}
故障轉移
/* Advance the failover of master 'ri' by one step.
 *
 * 'ri' must be a master (asserted). Nothing happens unless
 * SRI_FAILOVER_IN_PROGRESS is set; otherwise the handler for the current
 * ri->failover_state is invoked. States without a handler below (for example
 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) are not handled here. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
        return;

    if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
        /* Wait to win the leader election (or give up on timeout). */
        sentinelFailoverWaitStart(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SELECT_SLAVE) {
        /* Pick the replica to promote. */
        sentinelFailoverSelectSlave(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE) {
        /* Turn the chosen replica into a master. */
        sentinelFailoverSendSlaveOfNoOne(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_WAIT_PROMOTION) {
        /* Only a timeout check; progress is detected from INFO. */
        sentinelFailoverWaitPromotion(ri);
    } else if (ri->failover_state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES) {
        /* Point the remaining replicas at the new master. */
        sentinelFailoverReconfNextSlave(ri);
    }
}
/* Failover step WAIT_START: proceed only once this Sentinel is the leader for
 * ri->failover_epoch, or the failover was forced (SRI_FORCE_FAILOVER, set by
 * SENTINEL FAILOVER).
 *
 * While not elected we keep waiting. Once the election timeout has elapsed
 * since ri->failover_start_time, the failover is aborted. The election timeout
 * is the MINIMUM of SENTINEL_ELECTION_TIMEOUT and ri->failover_timeout.
 * When elected (or forced), the state moves to SELECT_SLAVE. */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *winner;
    int we_won, election_timeout;

    winner = sentinelGetLeader(ri, ri->failover_epoch);
    we_won = (winner != NULL) && strcasecmp(winner, sentinel.myid) == 0;
    sdsfree(winner);

    if (we_won || (ri->flags & SRI_FORCE_FAILOVER)) {
        sentinelEvent(LL_WARNING, "+elected-leader", ri, "%@");
        if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
            sentinelSimFailureCrash();
        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_WARNING, "+failover-state-select-slave", ri, "%@");
        return;
    }

    /* Not the leader: abort if we failed to get elected in time. */
    election_timeout = (SENTINEL_ELECTION_TIMEOUT > ri->failover_timeout)
                       ? ri->failover_timeout
                       : SENTINEL_ELECTION_TIMEOUT;
    if (mstime() - ri->failover_start_time > election_timeout) {
        sentinelEvent(LL_WARNING, "-failover-abort-not-elected", ri, "%@");
        sentinelAbortFailover(ri);
    }
}
/* Failover step SELECT_SLAVE: choose the replica to promote.
 *
 * Aborts the failover if sentinelSelectSlave() finds no suitable replica.
 * Otherwise the replica is flagged SRI_PROMOTED, stored in ri->promoted_slave,
 * and the state moves to SEND_SLAVEOF_NOONE. No timeout handling is needed:
 * this step always either aborts or advances within a single call. */
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *chosen = sentinelSelectSlave(ri);

    if (chosen == NULL) {
        sentinelEvent(LL_WARNING, "-failover-abort-no-good-slave", ri, "%@");
        sentinelAbortFailover(ri);
        return;
    }

    sentinelEvent(LL_WARNING, "+selected-slave", chosen, "%@");
    chosen->flags |= SRI_PROMOTED;
    ri->promoted_slave = chosen;
    ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_NOTICE, "+failover-state-send-slaveof-noone",
                  chosen, "%@");
}
/* Replica selection for promotion (implemented by sentinelSelectSlave()).
 *
 * A replica is a candidate only if all of the following hold:
 * 1) It is not flagged S_DOWN or O_DOWN, and its link is not disconnected.
 * 2) Its link was last available no more than 5 PING periods ago.
 * 3) Its INFO was refreshed recently enough: within 5 PING periods while the
 *    master is S_DOWN (INFO is polled more often then), otherwise within
 *    3 INFO periods.
 * 4) Its master_link_down_time does not exceed
 *    (now - master->s_down_since_time) + (master->down_after_period * 10),
 *    where the first term is only added while the master is S_DOWN.
 *    So the replica may have been cut off from the master for at most ten
 *    down-after periods longer than the master has been down from our point
 *    of view: replicas lag while the master is down, but not for too long.
 *    The replication offset is then used to prefer the most up-to-date one.
 * 5) Its priority is not 0 (priority 0 means "never promote").
 *
 * Candidates are ordered by:
 * - lower slave_priority first;
 * - then larger replication offset;
 * - then lexicographically smaller run id (case-insensitive, NULL last).
 * The first candidate is returned, or NULL if there is none. */

/* qsort() comparator for sentinelSelectSlave(): orders candidate replicas so
 * that the best one comes first.
 *
 * a, b: pointers to elements of an array of sentinelRedisInstance pointers.
 * Returns < 0 if *a is better than *b, > 0 if *b is better, 0 if equivalent.
 *
 * Priorities are compared three-way instead of returning their difference:
 * nothing here bounds slave_priority, and the difference of two arbitrary
 * signed ints can overflow (undefined behavior), which could hand qsort() an
 * inconsistent ordering. */
int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **) a,
                          **sb = (sentinelRedisInstance **) b;
    char *sa_runid, *sb_runid;

    /* Lower priority value wins. */
    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return ((*sa)->slave_priority > (*sb)->slave_priority) -
               ((*sa)->slave_priority < (*sb)->slave_priority);

    /* Same priority: the replica that processed more data from the master
     * (larger replication offset) wins. */
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* a > b */
    }

    /* Same offset: the lexicographically smaller run id wins. Old Redis
     * versions do not publish the run id in INFO, so it may be NULL; a NULL
     * run id sorts after any non-NULL one. */
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1; /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}
/* Return the best replica of 'master' to promote, or NULL if none qualifies.
 *
 * Replicas of master->slaves are filtered by these rules:
 * - not S_DOWN/O_DOWN;
 * - connected;
 * - link available within 5 PING periods;
 * - non-zero priority;
 * - INFO fresh enough;
 * - master_link_down_time bounded.
 * The survivors are sorted with compareSlavesForPromotion() and the first is
 * returned. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0]) * dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int count = 0;
    int master_sdown = (master->flags & SRI_S_DOWN) != 0;
    dictIterator *it;
    dictEntry *entry;

    /* Longest tolerated replica->master disconnection: ten down-after periods,
     * plus however long the master has been S_DOWN (if it is). */
    mstime_t max_link_down = master->down_after_period * 10;
    if (master_sdown)
        max_link_down += mstime() - master->s_down_since_time;

    it = dictGetIterator(master->slaves);
    while ((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(entry);
        /* While the master is S_DOWN replicas are polled for INFO every
         * second instead of every 10 seconds, so demand fresher data. */
        mstime_t info_max_age = master_sdown ? SENTINEL_PING_PERIOD * 5
                                             : SENTINEL_INFO_PERIOD * 3;

        if (slave->flags & (SRI_S_DOWN | SRI_O_DOWN)) continue;
        if (slave->link->disconnected) continue;
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD * 5) continue;
        if (slave->slave_priority == 0) continue;
        if (mstime() - slave->info_refresh > info_max_age) continue;
        if (slave->master_link_down_time > max_link_down) continue;

        candidates[count++] = slave;
    }
    dictReleaseIterator(it);

    if (count > 0) {
        qsort(candidates, count, sizeof(*candidates), compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}
/* Failover step SEND_SLAVEOF_NOONE: turn the selected replica into a master.
 *
 * While the link to ri->promoted_slave is disconnected the command cannot be
 * sent. We retry on later calls and abort once failover_timeout has elapsed
 * since entering this state.
 *
 * The reply to SLAVEOF NO ONE is not inspected: a generic callback is used,
 * and the role switch (slave -> master) is detected later from the replica's
 * INFO output. Here we only advance to WAIT_PROMOTION once the command was
 * sent successfully. */
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    sentinelRedisInstance *promoted = ri->promoted_slave;

    if (!promoted->link->disconnected) {
        /* A NULL host asks for SLAVEOF NO ONE. */
        if (sentinelSendSlaveOf(promoted, NULL, 0) == C_OK) {
            sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
                          promoted, "%@");
            ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
            ri->failover_state_change_time = mstime();
        }
        return;
    }

    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING, "-failover-abort-slave-timeout", ri, "%@");
        sentinelAbortFailover(ri);
    }
}
/* Failover step WAIT_PROMOTION: wait for the selected replica to report the
 * master role.
 *
 * The transition to the next state happens in the INFO-processing code when
 * the promoted replica reports role master. This function only enforces the
 * timeout: it aborts the failover once failover_timeout has elapsed since
 * entering this state. */
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    if (mstime() - ri->failover_state_change_time <= ri->failover_timeout)
        return;

    sentinelEvent(LL_WARNING, "-failover-abort-slave-timeout", ri, "%@");
    sentinelAbortFailover(ri);
}
/* Process the INFO reply 'info' received from the monitored instance 'ri' and
 * act on it. 'ri' may be a master or a replica; both cases are handled below.
 * The parsing half of the function is elided in this excerpt ('...').
 * NOTE(review): 'role' is presumably the role parsed from this INFO output
 * (SRI_MASTER or SRI_SLAVE) in the elided part -- confirm. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
/* ---------------------------- Acting half -----------------------------
 * Act on the parsed information. The role-change bookkeeping right below
 * always runs; everything after the sentinel.tilt check is skipped while in
 * TILT mode. */
/* The reported role differs from the previously reported one: remember the
 * new role and when it was reported. */
if (role != ri->role_reported) {
ri->role_reported_time = mstime();
ri->role_reported = role;
if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
/* Log +role-change when the newly reported role matches the role configured
 * for this instance (its SRI_MASTER/SRI_SLAVE flag), -role-change otherwise.
 * NOTE(review): the format has a single %s, so the last argument (the
 * configured role) is passed but presumably never printed. */
sentinelEvent(LL_VERBOSE,
((ri->flags & (SRI_MASTER | SRI_SLAVE)) == role) ?
"+role-change" : "-role-change",
ri, "%@ new reported role is %s",
role == SRI_MASTER ? "master" : "slave",
ri->flags & SRI_MASTER ? "master" : "slave");
}
/* The actions below reconfigure instances: never perform them in TILT mode. */
if (sentinel.tilt) return;
/* Configured master now reports itself as a replica (master -> slave). */
if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
/* Deliberately nothing to do here. Per the original note, a master that
 * claims to be a replica is treated as unreachable by Sentinel; if it keeps
 * reporting this it will eventually be considered subjectively down, which
 * may trigger a failover. That detection is not in this function. */
}
/* Configured replica now reports itself as a master (slave -> master). */
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
/* This is the replica we chose to promote, and the failover is waiting for
 * exactly this: advance the failover state machine to RECONF_SLAVES. */
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) {
/* The replica is confirmed to be a master now, so adopt the epoch in which
 * we won the failover election as the master's config epoch, then flush
 * the configuration. Per the original note, this forces the other
 * Sentinels to update their configuration (assuming no newer epoch is
 * available). */
ri->master->config_epoch = ri->master->failover_epoch;
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
sentinelFlushConfig();
sentinelEvent(LL_WARNING, "+promoted-slave", ri, "%@");
if (sentinel.simfailure_flags &
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
sentinelSimFailureCrash();
sentinelEvent(LL_WARNING, "+failover-state-reconf-slaves",
ri->master, "%@");
sentinelCallClientReconfScript(ri->master, SENTINEL_LEADER,
"start", ri->master->addr, ri->addr);
sentinelForceHelloUpdateForMaster(ri->master);
} else {
/* Some other replica turned into a master. Force our view by
 * reconfiguring it ('ri') as a replica of the configured master
 * (ri->master). Before that, wait wait_time (4 * SENTINEL_PUBLISH_PERIOD,
 * i.e. 8 s) after the role change so that newer configurations can
 * arrive, limiting the effect of out-of-order updates. Also require
 * that the configured master looks sane and that sentinelRedisInstanceNoDownFor
 * holds for 'ri' (presumably: not flagged down within wait_time). */
mstime_t wait_time = SENTINEL_PUBLISH_PERIOD * 4;
if (!(ri->flags & SRI_PROMOTED) &&
sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri, wait_time) &&
mstime() - ri->role_reported_time > wait_time) {
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE, "+convert-to-slave", ri, "%@");
}
}
}
/* A replica replicating from a host/port other than the configured master:
 * point it back at the configured master ("+fix-slave-config"). */
if ((ri->flags & SRI_SLAVE) &&
role == SRI_SLAVE &&
(ri->slave_master_port != ri->master->addr->port ||
strcasecmp(ri->slave_master_host, ri->master->addr->ip))) {
mstime_t wait_time = ri->master->failover_timeout;
/* Only act when the configured master looks sane, the replica passes
 * sentinelRedisInstanceNoDownFor, and its replication configuration was
 * last changed more than failover_timeout ago. */
if (sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri, wait_time) &&
mstime() - ri->slave_conf_change_time > wait_time) {
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE, "+fix-slave-config", ri, "%@");
}
}
/* Track the progress of replicas being reconfigured during a failover. */
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
(ri->flags & (SRI_RECONF_SENT | SRI_RECONF_INPROG))) {
/* SRI_RECONF_SENT -> SRI_RECONF_INPROG: the replica now reports the
 * promoted replica's ip:port as its master. */
if ((ri->flags & SRI_RECONF_SENT) &&
ri->slave_master_host &&
strcmp(ri->slave_master_host,
ri->master->promoted_slave->addr->ip) == 0 &&
ri->slave_master_port == ri->master->promoted_slave->addr->port) {
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
sentinelEvent(LL_NOTICE, "+slave-reconf-inprog", ri, "%@");
}
/* SRI_RECONF_INPROG -> SRI_RECONF_DONE: its link to the new master is up. */
if ((ri->flags & SRI_RECONF_INPROG) &&
ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) {
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
sentinelEvent(LL_NOTICE, "+slave-reconf-done", ri, "%@");
}
}
}
/* Failover step RECONF_SLAVES: send SLAVEOF <promoted ip> <promoted port> to
 * the replicas that have not been reconfigured yet.
 *
 * At most master->parallel_syncs replicas are kept in the RECONF_SENT /
 * RECONF_INPROG states at once. A replica stuck in RECONF_SENT for longer than
 * SENTINEL_SLAVE_RECONF_TIMEOUT is treated as done (RECONF_DONE); Sentinels
 * detect and fix such a misconfigured replica later. Finally,
 * sentinelFailoverDetectEnd() checks whether the failover is complete. */
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *it;
    dictEntry *entry;
    int busy = 0;

    /* Count replicas whose reconfiguration is already under way. */
    it = dictGetIterator(master->slaves);
    while ((entry = dictNext(it)) != NULL) {
        sentinelRedisInstance *replica = dictGetVal(entry);
        busy += (replica->flags & (SRI_RECONF_SENT | SRI_RECONF_INPROG)) ? 1 : 0;
    }
    dictReleaseIterator(it);

    it = dictGetIterator(master->slaves);
    while (busy < master->parallel_syncs) {
        sentinelRedisInstance *replica;

        entry = dictNext(it);
        if (entry == NULL) break;
        replica = dictGetVal(entry);

        /* The promoted replica and already reconfigured ones need nothing. */
        if (replica->flags & (SRI_PROMOTED | SRI_RECONF_DONE)) continue;

        /* No progress for too long after SLAVEOF: consider it done anyway. */
        if ((replica->flags & SRI_RECONF_SENT) &&
            mstime() - replica->slave_reconf_sent_time > SENTINEL_SLAVE_RECONF_TIMEOUT) {
            sentinelEvent(LL_NOTICE, "-slave-reconf-sent-timeout", replica, "%@");
            replica->flags &= ~SRI_RECONF_SENT;
            replica->flags |= SRI_RECONF_DONE;
        }

        /* Skip replicas already being reconfigured, and unreachable ones. */
        if ((replica->flags & (SRI_RECONF_SENT | SRI_RECONF_INPROG)) ||
            replica->link->disconnected)
            continue;

        if (sentinelSendSlaveOf(replica,
                                master->promoted_slave->addr->ip,
                                master->promoted_slave->addr->port) != C_OK)
            continue;

        replica->flags |= SRI_RECONF_SENT;
        replica->slave_reconf_sent_time = mstime();
        sentinelEvent(LL_NOTICE, "+slave-reconf-sent", replica, "%@");
        busy++;
    }
    dictReleaseIterator(it);

    /* Done when all replicas are reconfigured, or on timeout. */
    sentinelFailoverDetectEnd(master);
}
/* Decide whether the failover of 'master' has finished.
 *
 * The failover is never considered finished while the promoted replica is
 * missing or subjectively down. Otherwise it ends when one of these holds:
 * - every reachable (non S_DOWN) replica other than the promoted one is
 *   flagged RECONF_DONE;
 * - unconditionally, once failover_timeout has elapsed since the last state
 *   change.
 * Ending moves the master to SENTINEL_FAILOVER_STATE_UPDATE_CONFIG.
 *
 * On timeout, a best-effort SLAVEOF <promoted ip> <promoted port> is sent to
 * every connected replica that is neither done nor already sent the command.
 * The promoted replica itself is always skipped: sending it SLAVEOF with its
 * own address would ask the new master to replicate from itself. */
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    /* We can't consider the failover finished if the promoted replica is
     * not reachable. */
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN)
        return;

    /* Count reachable replicas that still need to be reconfigured. */
    di = dictGetIterator(master->slaves);
    while ((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        if (slave->flags & (SRI_PROMOTED | SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    /* Force the end of the failover on timeout. */
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING, "+failover-end-for-timeout", master, "%@");
    }

    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING, "+failover-end", master, "%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    if (!timeout) return;

    /* Timed out: best-effort SLAVEOF <new master> to the replicas that were
     * not reconfigured yet. SRI_PROMOTED is skipped like in the counting loop
     * above -- the new master must never be told to replicate from itself. */
    di = dictGetIterator(master->slaves);
    while ((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        if (slave->flags & (SRI_PROMOTED | SRI_RECONF_DONE | SRI_RECONF_SENT))
            continue;
        if (slave->link->disconnected) continue;

        retval = sentinelSendSlaveOf(slave,
                                     master->promoted_slave->addr->ip,
                                     master->promoted_slave->addr->port);
        if (retval == C_OK) {
            sentinelEvent(LL_NOTICE, "+slave-reconf-sent-be", slave, "%@");
            slave->flags |= SRI_RECONF_SENT;
        }
    }
    dictReleaseIterator(di);
}
/* Called once 'master' reaches SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: make the
 * promoted replica the monitored master.
 *
 * Emits +switch-master with "<name> <old ip> <old port> <new ip> <new port>",
 * then calls sentinelResetMasterAndChangeAddress() with the new address. If no
 * replica was promoted, the master's own address is used as the "new" one. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance *target = master;

    if (master->promoted_slave != NULL)
        target = master->promoted_slave;

    sentinelEvent(LL_WARNING, "+switch-master", master, "%s %s %d %s %d",
                  master->name, master->addr->ip, master->addr->port,
                  target->addr->ip, target->addr->port);
    sentinelResetMasterAndChangeAddress(master, target->addr->ip, target->addr->port);
}
JedisSentinelPool实现:主备切换
- 找到任意一个可用的sentinel,通过 SENTINEL get-master-addr-by-name 获取master地址
- 通过订阅+switch-master频道,获取master地址的变更事件
- 缺点:过于依赖sentinel发布的+switch-master事件,事件丢失则无法完成客户端切换(只有当MasterListener与sentinel重新建立连接时,才会再主动调用一次get-master-addr-by-name刷新master地址)
/**
 * Locates the current master for {@code masterName}, then starts one
 * {@link MasterListener} daemon thread per configured sentinel.
 *
 * <p>Sentinels are tried in iteration order. The first one whose
 * get-master-addr-by-name reply has exactly two elements (host, port) wins.
 * A {@link JedisException} from one sentinel is logged and the next one is
 * tried.
 *
 * @param sentinels  sentinel addresses, each parsed with HostAndPort.parseString
 * @param masterName name of the monitored master
 * @return the master's address
 * @throws JedisException if at least one sentinel answered the query, but none
 *         returned an address for masterName (the name is probably not monitored)
 * @throws JedisConnectionException if no sentinel answered the query at all
 */
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {
  HostAndPort master = null;
  boolean anySentinelAnswered = false;

  log.info("Trying to find master from available Sentinels...");

  for (String sentinel : sentinels) {
    final HostAndPort sentinelAddress = HostAndPort.parseString(sentinel);
    log.debug("Connecting to Sentinel {}", sentinelAddress);

    Jedis jedis = null;
    try {
      jedis = new Jedis(sentinelAddress.getHost(), sentinelAddress.getPort(),
          sentinelConnectionTimeout, sentinelSoTimeout);
      if (sentinelUser != null) {
        jedis.auth(sentinelUser, sentinelPassword);
      } else if (sentinelPassword != null) {
        jedis.auth(sentinelPassword);
      }
      if (sentinelClientName != null) {
        jedis.clientSetname(sentinelClientName);
      }

      final List<String> addrReply = jedis.sentinelGetMasterAddrByName(masterName);
      // This sentinel answered, even if it does not know masterName.
      anySentinelAnswered = true;

      if (addrReply != null && addrReply.size() == 2) {
        master = toHostAndPort(addrReply);
        log.debug("Found Redis master at {}", master);
        break;
      }
      log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName,
          sentinelAddress);
    } catch (JedisException e) {
      // resolves #1036, it should handle JedisException there's another chance
      // of raising JedisDataException
      log.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.",
          sentinelAddress, e);
    } finally {
      if (jedis != null) {
        jedis.close();
      }
    }
  }

  if (master == null) {
    throw anySentinelAnswered
        ? new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...")
        : new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
  }

  log.info("Redis master running at {}, starting Sentinel listeners...", master);

  for (String sentinel : sentinels) {
    final HostAndPort sentinelAddress = HostAndPort.parseString(sentinel);
    final MasterListener listener =
        new MasterListener(masterName, sentinelAddress.getHost(), sentinelAddress.getPort());
    // Daemon threads: a running listener never prevents the process from stopping.
    listener.setDaemon(true);
    masterListeners.add(listener);
    listener.start();
  }

  return master;
}
protected class MasterListener extends Thread {
...
/**
 * Listener loop for one sentinel ({@code host}:{@code port}).
 *
 * <p>Each iteration does the following:
 * <ol>
 *   <li>connects to the sentinel, authenticating and setting the client name
 *       when configured;</li>
 *   <li>actively refreshes the pool from get-master-addr-by-name;</li>
 *   <li>blocks in {@code subscribe} on the {@code +switch-master} channel,
 *       re-initialising the pool whenever the switched master is ours.</li>
 * </ol>
 *
 * <p>If a {@link JedisException} ends an iteration while still running, the
 * thread waits {@code subscribeRetryWaitTimeMillis} and reconnects. The loop
 * exits once {@code running} is false.
 */
@Override
public void run() {
  running.set(true);
  while (running.get()) {
    try {
      // double check that it is not being shutdown
      if (!running.get()) {
        break;
      }
      j = new Jedis(host, port, sentinelConnectionTimeout, sentinelSoTimeout);
      if (sentinelUser != null) {
        j.auth(sentinelUser, sentinelPassword);
      } else if (sentinelPassword != null) {
        j.auth(sentinelPassword);
      }
      if (sentinelClientName != null) {
        j.clientSetname(sentinelClientName);
      }
      // code for active refresh
      List<String> masterAddr = j.sentinelGetMasterAddrByName(masterName);
      if (masterAddr == null || masterAddr.size() != 2) {
        log.warn("Can not get master addr, master name: {}. Sentinel: {}:{}.", masterName, host, port);
      } else {
        initPool(toHostAndPort(masterAddr));
      }
      j.subscribe(new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
          log.debug("Sentinel {}:{} published: {}.", host, port, message);
          String[] switchMasterMsg = message.split(" ");
          // Payload: <master name> <old ip> <old port> <new ip> <new port>.
          // All five fields are required because [3] and [4] are read below. A
          // shorter payload would otherwise throw ArrayIndexOutOfBoundsException,
          // which the JedisException handler of this loop does not catch.
          if (switchMasterMsg.length >= 5) {
            if (masterName.equals(switchMasterMsg[0])) {
              initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
            } else {
              log.debug(
                  "Ignoring message on +switch-master for master name {}, our master name is {}",
                  switchMasterMsg[0], masterName);
            }
          } else {
            log.error(
                "Invalid message received on Sentinel {}:{} on channel +switch-master: {}", host,
                port, message);
          }
        }
      }, "+switch-master");
    } catch (JedisException e) {
      if (running.get()) {
        // Report the actual configured back-off instead of a hard-coded value.
        log.error("Lost connection to Sentinel at {}:{}. Sleeping {}ms and retrying.", host,
            port, subscribeRetryWaitTimeMillis, e);
        try {
          Thread.sleep(subscribeRetryWaitTimeMillis);
        } catch (InterruptedException e1) {
          log.error("Sleep interrupted: ", e1);
        }
      } else {
        log.debug("Unsubscribing from Sentinel at {}:{}", host, port);
      }
    } finally {
      if (j != null) {
        j.close();
      }
    }
  }
}
...
}
Redisson实现:主从切换
- 通过 SENTINEL get-master-addr-by-name 获取master地址
- 通过 SENTINEL slaves 获取slaves地址
- 通过周期性定时轮询来获取地址变更信息