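This article summarizes some of the principles behind redis sentinel by reading the redis source code (corrections are welcome). I am revisiting sentinel's high-availability solution in order to compare it with the architecture of blockchain distributed storage, and to understand the strengths and weaknesses of both models so that each can be used in the scenarios that suit it. Blockchain is very popular right now. Many people do blockchain for blockchain's sake, and some do it purely for capital and wealth. In my view, its distributed storage solution, its security, its reward mechanism, and its transaction system are all indispensable parts of blockchain as a whole. Even so, many people still take these parts apart one by one. From a technical standpoint, though, isolating the architectural side of blockchain is both necessary and instructive. For the blockchain distributed storage architecture, see my post "From Blockchain to Distributed Storage"; for an architecture-level comparison of the two, see "Blockchain Distributed Storage vs. redis Distributed Storage".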
What is redis's sentinel high-availability solution
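redis sentinel is a high-availability solution built on the master-slave replication structure and aimed at fault tolerance and disaster recovery for distributed storage. It lets redis cope with a range of unexpected situations, such as network connectivity problems, crashes, and hardware failures. Fault tolerance starts from data backup, and backup in turn relies on two mechanisms: persistence and replication. (By backup I mean replication here. In my view redis's master-slave replication is, in a sense, a hot-standby process: it provides fault tolerance, and depending on the workload it can also be used for read/write splitting, which relieves some of the pressure of heavy traffic. Whether read/write splitting is acceptable depends on how strict the business is about data consistency.) The fault-tolerance problem is this: once we have several copies, how can the system on its own make a reasonably correct choice when such failures happen, while staying transparent to clients? The sentinel mechanism is redis's answer to that problem. The code that cooperates with sentinel to switch client connections to the new master lives mainly in the client libraries for each language, not in the server code.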
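sentinel's service architecture
A few points about sentinel need to be understood first:
- sentinel itself is a supervisor and has no storage function. Within the whole system, a single sentinel or a group of sentinels stands in a supervisor/supervised relationship with the master-slave servers.
- A sentinel can take part in three kinds of interaction in this architecture: sentinel with master, sentinel with slave, and sentinel with other sentinels.
- Since these are all interactions, the basic ingredients are the same in the three cases. Building such a network unavoidably requires node registration and discovery, connections between nodes, node keepalive, and a communication protocol between nodes.
- Because the roles differ, the functions they perform in the architecture differ, and so does the content of each interaction.
With these points in mind, we go step by step from building the sentinel network to how the whole structure guarantees high availability.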
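Building the sentinel network structure
Initializing sentinel
- Start sentinel mode with the command
redis-sentinel /path/to/sentinel.conf
or redis-server /path/to/sentinel.conf --sentinel
Both forms are described in the official documentation; see Redis Sentinel Documentation.
- When started in sentinel mode, sentinel mainly does the following: initializes the server, loads the command table, loads and applies the configuration file, and starts the periodic function that monitors the master servers.
The sentinel-related source in main() in server.c is shown below. If you want to step through it from start to finish yourself, see my other post "Debugging the redis source with gdb on Linux" and the Redis debugging guide.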
...
setlocale(LC_COLLATE,"");
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
char hashseed[16];
getRandomHexChars(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed((uint8_t*)hashseed);
server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
moduleInitModulesSystem();
/* Store the executable path and arguments in a safe place in order
* to be able to restart the server later. */
server.executable = getAbsolutePath(argv[0]);
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
...
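The excerpt sets server.sentinel_mode from checkForSentinelMode(argc,argv), which is how the two launch commands above end up in the same mode. The following is a minimal sketch of that check, assuming it looks at the executable name and at a --sentinel argument; the function name check_for_sentinel_mode is a stand-in chosen for this example and is not claimed to be the exact upstream code.

```c
#include <assert.h>
#include <string.h>

/* Sketch: decide whether the process should run in sentinel mode.
 * Returns 1 when the executable name contains "redis-sentinel" or when
 * "--sentinel" appears among the arguments, 0 otherwise.
 * The name check_for_sentinel_mode is hypothetical. */
int check_for_sentinel_mode(int argc, char **argv) {
    assert(argc >= 1 && argv != NULL);
    if (strstr(argv[0], "redis-sentinel") != NULL) return 1;
    for (int j = 1; j < argc; j++)
        if (strcmp(argv[j], "--sentinel") == 0) return 1;
    return 0;
}
```

Because the mode is decided before the configuration file is parsed, the sentinel data structures can already exist when the monitor directives are read.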
This code contains the two initialization functions for sentinel mode, initSentinelConfig() and initSentinel().
sentinel.c/initSentinelConfig
/* This function overwrites a few normal Redis config default with Sentinel
* specific defaults. */
void initSentinelConfig(void) {
server.port = REDIS_SENTINEL_PORT;
}
sentinel.c/initSentinel
/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
unsigned int j;
/* Remove usual Redis commands from the command table, then just add
* the SENTINEL command. */
dictEmpty(server.commands,NULL);
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
int retval;
struct redisCommand *cmd = sentinelcmds+j;
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
serverAssert(retval == DICT_OK);
}
/* Initialize various data structures. */
sentinel.current_epoch = 0;
sentinel.masters = dictCreate(&instancesDictType,NULL);
sentinel.tilt = 0;
sentinel.tilt_start_time = 0;
sentinel.previous_time = mstime();
sentinel.running_scripts = 0;
sentinel.scripts_queue = listCreate();
sentinel.announce_ip = NULL;
sentinel.announce_port = 0;
sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
memset(sentinel.myid,0,sizeof(sentinel.myid));
}
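These two pieces of source show the initialization of sentinel mode: it first sets the default port 26379, then loads the command table and initializes sentinelState.
The command table that sentinel loads differs from the one used in normal redis mode. sentinel mode supports only the commands below, which means a sentinel process accepts only these commands from its clients and from other sentinels.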
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
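To make the effect of the reduced command table concrete, here is a small sketch of a lookup over the same eleven command names. It assumes a case-insensitive match, and the array and function names (sentinel_command_names, sentinel_supports_command) are invented for this example; the real server stores redisCommand entries in a dict rather than scanning an array.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h>  /* strcasecmp (POSIX) */

/* Names copied from the sentinelcmds table above. */
static const char *sentinel_command_names[] = {
    "ping", "sentinel", "subscribe", "unsubscribe", "psubscribe",
    "punsubscribe", "publish", "info", "role", "client", "shutdown"
};

/* Returns 1 if a sentinel-mode server would know the command, else 0.
 * Hypothetical helper: a linear scan instead of the server's dict. */
int sentinel_supports_command(const char *name) {
    assert(name != NULL);
    size_t n = sizeof(sentinel_command_names) / sizeof(sentinel_command_names[0]);
    for (size_t j = 0; j < n; j++)
        if (strcasecmp(name, sentinel_command_names[j]) == 0) return 1;
    return 0;
}
```

Data commands such as GET or SET are absent from the table, which matches the point that sentinel has no storage role.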
Basic data structures
sentinelState
/* Main state. */
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TILT started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;
sentinelRedisInstance
typedef struct sentinelRedisInstance {
int flags; /* See SRI_... defines */
char *name; /* Master name from the point of view of this sentinel. */
char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
uint64_t config_epoch; /* Configuration epoch. */
sentinelAddr *addr; /* Master host. */
instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
via Pub/Sub. */
mstime_t last_master_down_reply_time; /* Time of last reply to
SENTINEL is-master-down command. */
mstime_t s_down_since_time; /* Subjectively down since time. */
mstime_t o_down_since_time; /* Objectively down since time. */
mstime_t down_after_period; /* Consider it down after that period. */
mstime_t info_refresh; /* Time at which we received INFO output from it. */
/* Role and the first time we observed it.
* This is useful in order to delay replacing what the instance reports
* with our own configuration. We need to always wait some time in order
* to give a chance to the leader to report the new configuration before
* we do silly things. */
int role_reported;
mstime_t role_reported_time;
mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
/* Master specific. */
dict *sentinels; /* Other sentinels monitoring the same master. */
dict *slaves; /* Slaves for this master instance. */
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & slaves. */
/* Slave specific. */
mstime_t master_link_down_time; /* Slave replication link down time. */
int slave_priority; /* Slave priority according to its INFO output. */
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
struct sentinelRedisInstance *master; /* Master instance if it's slave. */
char *slave_master_host; /* Master host as reported by INFO */
int slave_master_port; /* Master port as reported by INFO */
int slave_master_link_status; /* Master link status as reported by INFO */
unsigned long long slave_repl_offset; /* Slave replication offset. */
/* Failover */
char *leader; /* If this is a master instance, this is the runid of
the Sentinel that should perform the failover. If
this is a Sentinel, this is the runid of the Sentinel
that this Sentinel voted as leader. */
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
uint64_t failover_epoch; /* Epoch of the currently started failover. */
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
mstime_t failover_state_change_time;
mstime_t failover_start_time; /* Last failover attempt start time. */
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged; /* For what failover_start_time value we
logged the failover delay. */
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
} sentinelRedisInstance;
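These two structs are sentinel's core data structures. One stores sentinel's own state; the other stores information about a master, a slave, or another sentinel instance monitoring the same master. At startup sentinel builds this relationship to record its links to the other three roles (illustrated by a figure in the original post).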
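The nesting of the two structs can be sketched with plain arrays in place of dict. Everything below is a simplified stand-in: the type names instance and sentinel_state, the ROLE_* constants, and count_known_instances are invented for illustration and keep only the fields needed to show that a sentinel holds masters, and each master holds its slaves and the other sentinels watching it.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical role flags, standing in for the SRI_* defines. */
enum { ROLE_MASTER = 1, ROLE_SLAVE = 2, ROLE_SENTINEL = 4 };

/* Reduced stand-in for sentinelRedisInstance. */
typedef struct instance {
    int flags;
    const char *name;
    struct instance *master;      /* set for slaves and sentinels */
    struct instance **slaves;     /* only meaningful for masters */
    size_t num_slaves;
    struct instance **sentinels;  /* only meaningful for masters */
    size_t num_sentinels;
} instance;

/* Reduced stand-in for sentinelState. */
typedef struct {
    instance **masters;
    size_t num_masters;
} sentinel_state;

/* Counts every instance reachable from the state: each master plus
 * the slaves and sentinels registered under it. */
size_t count_known_instances(const sentinel_state *s) {
    assert(s != NULL);
    size_t total = 0;
    for (size_t i = 0; i < s->num_masters; i++)
        total += 1 + s->masters[i]->num_slaves + s->masters[i]->num_sentinels;
    return total;
}
```

With one master, two slaves, and one other sentinel, the walk visits four instances, all reached through the masters table.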
sentinel&master
- Service discovery
The sentinelState struct has a masters pointer. It is a dictionary whose values are pointers to sentinelRedisInstance instances. The ip and port of each monitored master are parsed from the directive sentinel monitor mymaster 127.0.0.1 6379 2 in the configuration file sentinel.conf. See the comments in the configuration file for details of each option. The entry call chain is server.c/main->config.c/loadServerConfig->config.c/loadServerConfigFromString->sentinel.c/sentinelHandleConfiguration
The code in loadServerConfigFromString that handles sentinel-mode directives is as follows:
} else if (!strcasecmp(argv[0],"sentinel")) {
/* argc == 1 is handled by main() as we need to enter the sentinel
* mode ASAP. */
if (argc != 1) {
if (!server.sentinel_mode) {
err = "sentinel directive while not in sentinel mode";
goto loaderr;
}
err = sentinelHandleConfiguration(argv+1,argc-1);
if (err) goto loaderr;
}
}
The code in sentinelHandleConfiguration in sentinel.c that parses and loads the monitored master configuration is as follows:
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
if (!strcasecmp(argv[0],"monitor") && argc == 5) {
/* monitor <name> <host> <port> <quorum> */
int quorum = atoi(argv[4]);
if (quorum <= 0) return "Quorum must be 1 or greater.";
if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
atoi(argv[3]),quorum,NULL) == NULL)
{
switch(errno) {
case EBUSY: return "Duplicated master name.";
case ENOENT: return "Can't resolve master instance hostname.";
case EINVAL: return "Invalid port number";
}
}
} ...
return NULL;
}
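The monitor branch can be isolated into a standalone sketch that only validates the directive and copies its fields. It assumes the arguments arrive already split, as in the excerpt; the monitor_directive type, the parse_monitor name, and the port range check are additions made for this example, and no instance is created.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>  /* strcasecmp (POSIX) */

/* Hypothetical holder for a parsed "monitor" directive. */
typedef struct {
    const char *name;
    const char *host;
    int port;
    int quorum;
} monitor_directive;

/* Parses: monitor <name> <host> <port> <quorum>
 * Returns NULL on success, or a static error string on failure,
 * following the error-string convention of the excerpt above. */
const char *parse_monitor(char **argv, int argc, monitor_directive *out) {
    assert(argv != NULL && out != NULL);
    if (argc != 5 || strcasecmp(argv[0], "monitor") != 0)
        return "Expected: monitor <name> <host> <port> <quorum>";
    int quorum = atoi(argv[4]);
    if (quorum <= 0) return "Quorum must be 1 or greater.";
    int port = atoi(argv[3]);
    if (port <= 0 || port > 65535) return "Invalid port number";
    out->name = argv[1];
    out->host = argv[2];
    out->port = port;
    out->quorum = quorum;
    return NULL;
}
```

For the sample directive sentinel monitor mymaster 127.0.0.1 6379 2, the caller has already stripped the leading sentinel token, so argv[0] is monitor and the quorum is 2.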
After parsing the monitor directive, sentinel creates the sentinelRedisInstance struct that stores the master's information. Next, look at createSentinelRedisInstance:
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
sentinelRedisInstance *ri;
sentinelAddr *addr;
dict *table = NULL;
char slavename[NET_PEER_ID_LEN], *sdsname;
serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
serverAssert((flags & SRI_MASTER) || master != NULL);
/* Check address validity. */
addr = createSentinelAddr(hostname,port);
if (addr == NULL) return NULL;
/* For slaves use ip:port as name. */
if (flags & SRI_SLAVE) {
anetFormatAddr(slavename, sizeof(slavename), hostname, port);
name = slavename;
}
/* Make sure the entry is not duplicated. This may happen when the same
* name for a master is used multiple times inside the configuration or
* if we try to add multiple times a slave or sentinel with same ip/port
* to a master. */
if (flags & SRI_MASTER) table = sentinel.masters;
else if (flags & SRI_SLAVE) table = master->slaves;
else if (flags & SRI_SENTINEL) table = master->sentinels;
sdsname = sdsnew(name);
if (dictFind(table,sdsname)) {
releaseSentinelAddr(addr);
sdsfree(sdsname);
errno = EBUSY;
return NULL;
}
/* Create the instance object. */
ri = zmalloc(sizeof(*ri));
/* Note that all the instances are started in the disconnected state,
* the event loop will take care of connecting them. */
ri->flags = flags;
ri->name = sdsname;
ri->runid = NULL;
ri->config_epoch = 0;
ri->addr = addr;
ri->link = createInstanceLink();
ri->last_pub_time = mstime();
ri->last_hello_time = mstime();
ri->last_master_down_reply_time = mstime();
ri->s_down_since_time = 0;
ri->o_down_since_time = 0;
ri->down_after_period = master ? master->down_after_period :
SENTINEL_DEFAULT_DOWN_AFTER;
ri->master_link_down_time = 0;
ri->auth_pass = NULL;
ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
ri->slave_reconf_sent_time = 0;
ri->slave_master_host = NULL;
ri->slave_master_port = 0;
ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
ri->slave_repl_offset = 0;
ri->sentinels = dictCreate(&instancesDictType,NULL);
ri->quorum = quorum;
ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
ri->master = master;
ri->slaves = dictCreate(&instancesDictType,NULL);
ri->info_refresh = 0;
/* Failover state. */
ri->leader = NULL;
ri->leader_epoch = 0;
ri->failover_epoch = 0;
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = 0;
ri->failover_start_time = 0;
ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
ri->failover_delay_logged = 0;
ri->promoted_slave = NULL;
ri->notification_script = NULL;
ri->client_reconfig_script = NULL;
ri->info = NULL;
/* Role */
ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
ri->role_reported_time = mstime();
ri->slave_conf_change_time = mstime();
/* Add into the right table. */
dictAdd(table, ri->name, ri);
return ri;
}
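This is generic code used to create master, slave, and sentinel instances; the flags argument distinguishes them. For a master, the relevant steps are:
- Validate the ip and port passed in and initialize a sentinelAddr.
- Look in the dict for a master with the same name; if one exists, return NULL with errno set to EBUSY.
- Create an instance object and initialize defaults such as the instanceLink, sentinels, and slaves.
- Add the instance object to the dictionary.
This completes the dictionary of monitored masters.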
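The duplicate check in the steps above can be sketched as follows, using a small fixed array where the real code uses a dict. The master_table type, MAX_MASTERS, and register_master are hypothetical names for this example; only the convention of returning failure with errno set to EBUSY is taken from the source.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>

#define MAX_MASTERS 16  /* arbitrary capacity for the sketch */

/* Hypothetical stand-in for the sentinel.masters dict. */
typedef struct {
    const char *names[MAX_MASTERS];
    int count;
} master_table;

/* Registers a master name. Returns 0 on success. On a duplicate name
 * it returns -1 with errno set to EBUSY, mirroring the error code used
 * by createSentinelRedisInstance; a full table gives ENOMEM. */
int register_master(master_table *t, const char *name) {
    assert(t != NULL && name != NULL);
    for (int j = 0; j < t->count; j++) {
        if (strcmp(t->names[j], name) == 0) {
            errno = EBUSY;
            return -1;
        }
    }
    if (t->count == MAX_MASTERS) {
        errno = ENOMEM;
        return -1;
    }
    t->names[t->count++] = name;
    return 0;
}
```

This is why listing the same master name twice in sentinel.conf is reported as "Duplicated master name." by the configuration parser.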
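- Establishing the connection to the master
redis is single-threaded and built on event callbacks. So go back to main() in server.c: after the sentinel configuration file has been loaded, the event loop is started, as the following code shows: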
...
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
...
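redis event callbacks are divided into time events and file events. For sentinel, connection heartbeats, service status checks, discovery of other sentinels, and so on are all periodic. Creating connections, sending commands to fetch server state, and publishing messages are therefore all driven by time events. Stepping through aeMain in a debugger shows that when a time event fires, the callback entered is serverCron. On closer inspection this function is registered as the time-event callback in initServer, and following that thread leads to sentinelTimer, the entry point into sentinel.c that serverCron calls.
The timer callback registered in initServer: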
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
The sentinel-related code in serverCron:
...
/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
...
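serverCron fires at a frequency given by server.hz, and run_with_period(100) restricts the enclosed call to roughly once every 100 ms. The function below is a sketch of that gating idea as I understand it, written as a plain function instead of a macro; the name should_run_this_tick and its parameters are assumptions for this example, and the exact upstream macro may differ.

```c
#include <assert.h>

/* Sketch of period gating inside a cron callback that runs hz times
 * per second. cronloops is the number of cron iterations so far.
 * Returns 1 when a task with the given period should run in this
 * iteration. Hypothetical helper, not the upstream macro. */
int should_run_this_tick(int period_ms, int hz, long cronloops) {
    assert(hz > 0 && hz <= 1000);
    int tick_ms = 1000 / hz;            /* time between cron iterations */
    if (period_ms <= tick_ms) return 1; /* cannot run more often than the cron */
    return cronloops % (period_ms / tick_ms) == 0;
}
```

With hz set to 10, as in the INFO output shown later in this post, each iteration is already 100 ms apart, so the sentinel timer would run on every iteration under this logic.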
Digging further into sentinelTimer quickly leads to the code that creates connections. The call chain inside sentinel.c is sentinelTimer->sentinelHandleDictOfRedisInstances->sentinelHandleRedisInstance->sentinelReconnectInstance. This chain is generic and covers connection setup for all three roles in the sentinel structure.
The code of sentinelReconnectInstance:
/* Create the async connections for the instance link if the link
* is disconnected. Note that link->disconnected is true even if just
* one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;
if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
instanceLink *link = ri->link;
mstime_t now = mstime();
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now;
/* Commands connection. */
if (link->cc == NULL) {
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->cc->err) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else {
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc);
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->cc);
sentinelSetClientName(ri,link->cc,"cmd");
/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);
}
}
/* Pub / Sub */
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
int retval;
link->pc_conn_time = mstime();
link->pc->data = link;
redisAeAttach(server.el,link->pc);
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);
sentinelSendAuthIfNeeded(ri,link->pc);
sentinelSetClientName(ri,link->pc,"pubsub");
/* Now we subscribe to the Sentinels "Hello" channel. */
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}
/* Clear the disconnected status only if we have both the connections
* (or just the commands connection if this is a sentinel instance). */
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
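For a master, sentinel creates two connections: one for sending commands and one for pub/sub.
- After the commands connection is created, sentinel registers the connect callback sentinelLinkEstablishedCallback and the disconnect callback sentinelDisconnectCallback. It then calls sentinelSendAuthIfNeeded, which sends an AUTH command with the configured password if the master requires authentication, sets a client name that marks the connection as cmd, and finally sends a PING to the master to test the commands connection.
- The pub/sub connection is set up the same way as the commands connection, except that at the end sentinel sends SUBSCRIBE __sentinel__:hello to subscribe to that channel and registers sentinelReceiveHelloMessages to handle the messages published on it. This is mainly used to discover the other sentinels in the network that monitor the same master.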
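Question: why create two connections instead of reusing one?
Answer: one explanation is that it prevents published messages from being lost when the command connection drops, but I find that reasoning a little weak. A more concrete reason is that once a connection has issued SUBSCRIBE, redis accepts only a small set of subscription-related commands on it, so commands such as INFO need a connection of their own.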
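Two small rules in sentinelReconnectInstance are easy to miss: reconnection attempts are throttled, and the link only counts as connected when every connection the role needs is present. The sketch below restates both with a reduced struct. The link_sketch type, the function names, and the 1000 ms constant (assumed to mirror SENTINEL_PING_PERIOD) are illustrative choices, not upstream definitions.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed to mirror SENTINEL_PING_PERIOD (1 second). */
#define RECONNECT_MIN_INTERVAL_MS 1000

/* Reduced stand-in for instanceLink. */
typedef struct {
    int has_cmd_conn;           /* commands connection exists */
    int has_pubsub_conn;        /* pub/sub connection exists */
    long long last_reconn_time; /* ms timestamp of the last attempt */
} link_sketch;

/* Returns 1 and records the attempt if enough time has passed since
 * the previous reconnection attempt, otherwise returns 0. */
int may_attempt_reconnect(link_sketch *l, long long now_ms) {
    assert(l != NULL);
    if (now_ms - l->last_reconn_time < RECONNECT_MIN_INTERVAL_MS) return 0;
    l->last_reconn_time = now_ms;
    return 1;
}

/* A link to another sentinel needs only the commands connection;
 * a link to a master or slave needs the pub/sub connection as well. */
int link_fully_connected(const link_sketch *l, int is_sentinel) {
    assert(l != NULL);
    return l->has_cmd_conn && (is_sentinel || l->has_pubsub_conn);
}
```

The second function corresponds to the final condition in the excerpt, where disconnected is cleared only if cc exists and either the instance is a sentinel or pc exists.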
sentinel&slave:
- Discovering slaves
Start from the code of sentinelHandleRedisInstance:
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* Every kind of instance */
sentinelReconnectInstance(ri);
sentinelSendPeriodicCommands(ri);
/* ============== ACTING HALF ============= */
/* We don't proceed with the acting half if we are in TILT mode.
* TILT happens when we find something odd with the time, like a
* sudden change in the clock. */
if (sentinel.tilt) {
if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
sentinel.tilt = 0;
sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
}
/* Every kind of instance */
sentinelCheckSubjectivelyDown(ri);
/* Masters and slaves */
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
/* Nothing so far. */
}
/* Only masters */
if (ri->flags & SRI_MASTER) {
sentinelCheckObjectivelyDown(ri);
if (sentinelStartFailoverIfNeeded(ri))
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
sentinelFailoverStateMachine(ri);
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
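The function above is split into a monitoring half that always runs and an acting half that is skipped while sentinel is in TILT mode. A minimal sketch of that gate follows; the tilt_state type and acting_half_allowed are invented names, and the tilt period is passed in as a parameter rather than hard-coding the value of SENTINEL_TILT_PERIOD.

```c
#include <assert.h>
#include <stddef.h>

/* Reduced stand-in for the tilt fields of sentinelState. */
typedef struct {
    int tilt;                   /* non-zero while in TILT mode */
    long long tilt_start_time;  /* ms timestamp when TILT began */
} tilt_state;

/* Returns 1 if the acting half may run now. While in TILT mode it
 * returns 0 until tilt_period_ms has elapsed, then leaves TILT mode
 * and lets the acting half proceed. */
int acting_half_allowed(tilt_state *s, long long now_ms, long long tilt_period_ms) {
    assert(s != NULL);
    if (s->tilt) {
        if (now_ms - s->tilt_start_time < tilt_period_ms) return 0;
        s->tilt = 0;
    }
    return 1;
}
```

Monitoring (reconnecting and sending periodic commands) continues during TILT, so sentinel keeps collecting state while it refrains from judging instances down or starting a failover.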
After the connections to the master have been created, the periodic function sentinelSendPeriodicCommands runs. Its code is as follows:
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
* the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;
/* Return ASAP if we have already a PING or INFO already pending, or
* in the case the instance is not properly connected. */
if (ri->link->disconnected) return;
/* For INFO, PING, PUBLISH that are not critical commands to send we
* also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
* want to use a lot of memory just because a link is not working
* properly (note that anyway there is a redundant protection about this,
* that is, the link will be disconnected and reconnected if a long
* timeout condition is detected. */
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;
/* If this is a slave of a master in O_DOWN condition we start sending
* it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
* period. In this state we want to closely monitor slaves in case they
* are turned into masters by another Sentinel, or by the sysadmin.
*
* Similarly we monitor the INFO output more often if the slave reports
* to be disconnected from the master, so that we can have a fresh
* disconnection time figure. */
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}
/* We ping instances every time the last received pong is older than
* the configured 'down-after-milliseconds' time, but every second
* anyway if 'down-after-milliseconds' is greater than 1 second. */
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;
/* Send INFO to masters and slaves, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "INFO");
if (retval == C_OK) ri->link->pending_commands++;
}
/* Send PING to all the three kinds of instances. */
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}
/* PUBLISH hello messages to all the three kinds of instances. */
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}
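The two period calculations in this function can be pulled out into pure functions to make the timing rules easier to read. This is a sketch: the constants are assumed to match SENTINEL_INFO_PERIOD (10 s) and SENTINEL_PING_PERIOD (1 s), and the function and parameter names are made up for the example.

```c
#include <assert.h>

#define INFO_PERIOD_MS 10000 /* assumed value of SENTINEL_INFO_PERIOD */
#define PING_PERIOD_MS 1000  /* assumed value of SENTINEL_PING_PERIOD */

/* INFO is sent every 10 s normally, but every 1 s to a slave whose
 * master is objectively down or failing over, or whose replication
 * link to the master is reported down. */
long long info_period_ms(int is_slave, int master_down_or_failover,
                         long long master_link_down_time) {
    if (is_slave && (master_down_or_failover || master_link_down_time != 0))
        return 1000;
    return INFO_PERIOD_MS;
}

/* PING uses down-after-milliseconds as its period, capped at 1 s. */
long long ping_period_ms(long long down_after_period) {
    assert(down_after_period > 0);
    return down_after_period > PING_PERIOD_MS ? PING_PERIOD_MS
                                              : down_after_period;
}
```

For example, with down-after-milliseconds set to 30000 the PING period is still 1000 ms, while a value of 500 shortens it to 500 ms.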
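In this periodic code, under normal conditions sentinel sends an INFO command every 10 s, sends a PING every 1 s by default (or every down-after-milliseconds if that configurable value is below 1 s), and publishes a hello message every 2 s. First look at what the info command returns: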
127.0.0.1:6379> info
# Server
redis_version:4.0.10
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:564e829c2a2c36f6
redis_mode:standalone
os:Linux 4.4.0-17134-Microsoft x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:5.4.0
process_id:27
run_id:f137124d98e21709eaa1def3b192c152a2500750
tcp_port:6379
uptime_in_seconds:339
uptime_in_days:0
hz:10
lru_clock:4977260
executable:/home/jane-zhang/redis-server
config_file:/svr/redis_config/redis_6379.conf
# Clients
connected_clients:5
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0
# Memory
used_memory:2022832
used_memory_human:1.93M
used_memory_rss:2912256
used_memory_rss_human:2.78M
used_memory_peak:2082832
used_memory_peak_human:1.99M
used_memory_peak_perc:97.12%
used_memory_overhead:1985938
used_memory_startup:786584
used_memory_dataset:36894
used_memory_dataset_perc:2.98%
total_system_memory:17048510464
total_system_memory_human:15.88G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
mem_fragmentation_ratio:1.44
mem_allocator:jemalloc-4.0.3
active_defrag_running:0
lazyfree_pending_objects:0
# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1531703643
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
rdb_last_cow_size:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_last_cow_size:0
# Stats
total_connections_received:9
total_commands_processed:1445
instantaneous_ops_per_sec:5
total_net_input_bytes:66681
total_net_output_bytes:328934
instantaneous_input_kbps:0.28
instantaneous_output_kbps:0.66
rejected_connections:0
sync_full:2
sync_partial_ok:0
sync_partial_err:2
expired_keys:0
expired_stale_perc:0.00
expired_time_cap_reached_count:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:1
pubsub_patterns:0
latest_fork_usec:1527
migrate_cached_sockets:0
slave_expires_tracked_keys:0
active_defrag_hits:0
active_defrag_misses:0
active_defrag_key_hits:0
active_defrag_key_misses:0
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1
slave1:ip=127.0.0.1,port=6380,state=online,offset=36334,lag=0
master_replid:5d4684d94bde70a56746ea1c4c30cccd00df7f56
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:36334
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:36334
# CPU
used_cpu_sys:0.33
used_cpu_user:0.16
used_cpu_sys_children:0.03
used_cpu_user_children:0.00
# Cluster
cluster_enabled:0
# Keyspace
The # Replication section of the reply to the info command sent to the master contains information about the slaves. The reply is handled through the callback chain sentinel.c/sentinelInfoReplyCallback->sentinel.c/sentinelRefreshInstanceInfo, shown below:
/* Process the INFO output from masters. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
sds *lines;
int numlines, j;
int role = 0;
/* cache full INFO output for instance */
sdsfree(ri->info);
ri->info = sdsnew(info);
/* The following fields must be reset to a given value in the case they
* are not found at all in the INFO output. */
ri->master_link_down_time = 0;
/* Process line by line. */
lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
for (j = 0; j < numlines; j++) {
sentinelRedisInstance *slave;
sds l = lines[j];
/* run_id:<40 hex chars>*/
if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
if (ri->runid == NULL) {
ri->runid = sdsnewlen(l+7,40);
} else {
if (strncmp(ri->runid,l+7,40) != 0) {
sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
sdsfree(ri->runid);
ri->runid = sdsnewlen(l+7,40);
}
}
}
/* old versions: slave0:<ip>,<port>,<state>
* new versions: slave0:ip=127.0.0.1,port=9999,... */
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 &&
!memcmp(l,"slave",5) && isdigit(l[5]))
{
char *ip, *port, *end;
if (strstr(l,"ip=") == NULL) {
/* Old format. */
ip = strchr(l,':'); if (!ip) continue;
ip++; /* Now ip points to start of ip address. */
port = strchr(ip,','); if (!port) continue;
*port = '\0'; /* nul term for easy access. */
port++; /* Now port points to start of port number. */
end = strchr(port,','); if (!end) continue;
*end = '\0'; /* nul term for easy access. */
} else {
/* New format. */
ip = strstr(l,"ip="); if (!ip) continue;
ip += 3; /* Now ip points to start of ip address. */
port = strstr(l,"port="); if (!port) continue;
port += 5; /* Now port points to start of port number. */
/* Nul term both fields for easy access. */
end = strchr(ip,','); if (end) *end = '\0';
end = strchr(port,','); if (end) *end = '\0';
}
/* Check if we already have this slave into our table,
* otherwise add it. */
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}
/* master_link_down_since_seconds:<seconds> */
if (sdslen(l) >= 32 &&
!memcmp(l,"master_link_down_since_seconds",30))
{
ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
}
/* role:<role> */
if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
if (role == SRI_SLAVE) {
/* master_host:<host> */
if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
if (ri->slave_master_host == NULL ||
strcasecmp(l+12,ri->slave_master_host))
{
sdsfree(ri->slave_master_host);
ri->slave_master_host = sdsnew(l+12);
ri->slave_conf_change_time = mstime();
}
}
/* master_port:<port> */
if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
int slave_master_port = atoi(l+12);
if (ri->slave_master_port != slave_master_port) {
ri->slave_master_port = slave_master_port;
ri->slave_conf_change_time = mstime();
}
}
/* master_link_status:<status> */
if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
ri->slave_master_link_status =
(strcasecmp(l+19,"up") == 0) ?
SENTINEL_MASTER_LINK_STATUS_UP :
SENTINEL_MASTER_LINK_STATUS_DOWN;
}
/* slave_priority:<priority> */
if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
ri->slave_priority = atoi(l+15);
/* slave_repl_offset:<offset> */
if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
ri->slave_repl_offset = strtoull(l+18,NULL,10);
}
}
ri->info_refresh = mstime();
sdsfreesplitres(lines,numlines);
/* ---------------------------- Acting half -----------------------------
* Some things will not happen if sentinel.tilt is true, but some will
* still be processed. */
/* Remember when the role changed. */
if (role != ri->role_reported) {
ri->role_reported_time = mstime();
ri->role_reported = role;
if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
/* Log the event with +role-change if the new role is coherent or
* with -role-change if there is a mismatch with the current config. */
sentinelEvent(LL_VERBOSE,
((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
"+role-change" : "-role-change",
ri, "%@ new reported role is %s",
role == SRI_MASTER ? "master" : "slave",
ri->flags & SRI_MASTER ? "master" : "slave");
}
/* None of the following conditions are processed when in tilt mode, so
* return asap. */
if (sentinel.tilt) return;
/* Handle master -> slave role switch. */
if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
/* Nothing to do, but masters claiming to be slaves are
* considered to be unreachable by Sentinel, so eventually
         * a failover will be triggered. */
    }
    /* Handle slave -> master role switch. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            ri->master->config_epoch = ri->master->failover_epoch;
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
            if (!(ri->flags & SRI_PROMOTED) &&
                 sentinelMasterLooksSane(ri->master) &&
                 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                 mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                if (retval == C_OK)
                    sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }
    /* Handle slaves replicating to a different master address. */
    if ((ri->flags & SRI_SLAVE) &&
        role == SRI_SLAVE &&
        (ri->slave_master_port != ri->master->addr->port ||
         strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
    {
        mstime_t wait_time = ri->master->failover_timeout;
        /* Make sure the master is sane before reconfiguring this instance
         * into a slave. */
        if (sentinelMasterLooksSane(ri->master) &&
            sentinelRedisInstanceNoDownFor(ri,wait_time) &&
            mstime() - ri->slave_conf_change_time > wait_time)
        {
            int retval = sentinelSendSlaveOf(ri,
                    ri->master->addr->ip,
                    ri->master->addr->port);
            if (retval == C_OK)
                sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
        }
    }
    /* Detect if the slave that is in the process of being reconfigured
     * changed state. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
    {
        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                    ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port)
        {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }
        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
        {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}
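The reply to the INFO command is large, both the old and the new reply formats have to be supported, and the handling of tilt mode and of the master/slave role swap during failover is complex, so this function is very long. For now, we only look at the part that collects slave information.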
...
    /* old versions: slave0:<ip>,<port>,<state>
     * new versions: slave0:ip=127.0.0.1,port=9999,... */
    if ((ri->flags & SRI_MASTER) &&
        sdslen(l) >= 7 &&
        !memcmp(l,"slave",5) && isdigit(l[5]))
    {
        char *ip, *port, *end;
        if (strstr(l,"ip=") == NULL) {
            /* Old format. */
            ip = strchr(l,':'); if (!ip) continue;
            ip++; /* Now ip points to start of ip address. */
            port = strchr(ip,','); if (!port) continue;
            *port = '\0'; /* nul term for easy access. */
            port++; /* Now port points to start of port number. */
            end = strchr(port,','); if (!end) continue;
            *end = '\0'; /* nul term for easy access. */
        } else {
            /* New format. */
            ip = strstr(l,"ip="); if (!ip) continue;
            ip += 3; /* Now ip points to start of ip address. */
            port = strstr(l,"port="); if (!port) continue;
            port += 5; /* Now port points to start of port number. */
            /* Nul term both fields for easy access. */
            end = strchr(ip,','); if (end) *end = '\0';
            end = strchr(port,','); if (end) *end = '\0';
        }
        /* Check if we already have this slave into our table,
         * otherwise add it. */
        if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
            if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                        atoi(port), ri->quorum, ri)) != NULL)
            {
                sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                sentinelFlushConfig();
            }
        }
    }
...
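After parsing a line such as slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1 from the # Replication section, sentinel checks whether this slave is already in the master's slave table. If it is not, sentinel creates a sentinelRedisInstance structure for it, adds it to the master's slaves dict, and finally saves the slave's information to the configuration file.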
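The two slave line formats can be tried out in isolation. The following is a minimal sketch, assuming a hypothetical helper named parse_slave_line that is not part of Redis; it follows the same in-place NUL-termination approach as the excerpt above but returns an error code instead of using continue.

```c
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (not a Redis function): extract ip and port from one
 * "slaveN:..." line of the INFO replication section. The line is modified
 * in place, because the fields are NUL-terminated where they end.
 * Returns 0 on success, -1 if the line is not a parsable slave line. */
int parse_slave_line(char *line, char **ip_out, int *port_out) {
    char *ip, *port, *end;

    assert(line != NULL && ip_out != NULL && port_out != NULL);
    if (strncmp(line, "slave", 5) != 0 || !isdigit((unsigned char)line[5]))
        return -1;

    if (strstr(line, "ip=") == NULL) {
        /* Old format: slave0:<ip>,<port>,<state> */
        ip = strchr(line, ':');
        if (!ip) return -1;
        ip++;
        port = strchr(ip, ',');
        if (!port) return -1;
        *port++ = '\0';
        end = strchr(port, ',');
        if (!end) return -1;
        *end = '\0';
    } else {
        /* New format: slave0:ip=127.0.0.1,port=9999,... */
        ip = strstr(line, "ip=");
        port = strstr(line, "port=");
        if (!port) return -1;
        ip += 3;
        port += 5;
        end = strchr(ip, ',');
        if (end) *end = '\0';
        end = strchr(port, ',');
        if (end) *end = '\0';
    }
    *ip_out = ip;
    *port_out = atoi(port);
    return 0;
}
```

Called on a writable copy of a line, the helper leaves ip_out pointing into that buffer, so the buffer has to outlive the result. The real code then uses the pair to look the slave up and, if it is unknown, to create a new instance.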
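2. Creating connections
Once the master has been initialized for the first time and its slaves are known, the next timer cycle recurses through sentinelHandleDictOfRedisInstances. This sets up the command and pub/sub connections to each slave in the same way as for the master. The PING command is then used to monitor the heartbeat, and the INFO command to refresh the slave's information.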
/* Perform scheduled operations for all the instances in the dictionary.
 * Recursively call the function against dictionaries of slaves. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;
    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}
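sentinel&sentinel:
- Discovering other sentinels
The connections to a master and to its slaves each include a pub/sub connection, and all of them subscribe to the __sentinel__:hello channel. As the periodic function above shows, every 2 seconds a sentinel publishes a hello message to that channel. So when two sentinels monitor the same master, each receives what the other broadcasts. Since the message carries the sender's own details, each sentinel learns that the other exists. The function that sends this message is as follows: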
/* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
 * instance in order to broadcast the current configuration for this
 * master, and to advertise the existence of this Sentinel at the same time.
 *
 * The message has the following format:
 *
 * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
 * master_name,master_ip,master_port,master_config_epoch.
 *
 * Returns C_OK if the PUBLISH was queued correctly, otherwise
 * C_ERR is returned. */
int sentinelSendHello(sentinelRedisInstance *ri) {
    char ip[NET_IP_STR_LEN];
    char payload[NET_IP_STR_LEN+1024];
    int retval;
    char *announce_ip;
    int announce_port;
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
    if (ri->link->disconnected) return C_ERR;
    /* Use the specified announce address if specified, otherwise try to
     * obtain our own IP address. */
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return C_ERR;
        announce_ip = ip;
    }
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;
    /* Format and send the Hello message. */
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, sentinel.myid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    if (retval != C_OK) return C_ERR;
    ri->link->pending_commands++;
    return C_OK;
}
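The broadcast message has the format sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch.
The message is delivered to every node subscribed to the channel, including the sender itself. So how does a sentinel handle a hello message it receives? As explained above, when sentinel sets up the pub/sub connection to the master it registers the callback sentinelReceiveHelloMessages, which in turn calls sentinelProcessHelloMessage: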
/* Process an hello message received via Pub/Sub in master or slave instance,
 * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
 *
 * If the master name specified in the message is not known, the message is
 * discarded. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;
    if (numtokens == 8) {
        /* Obtain a reference to the master this hello message is about */
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup; /* Unknown master, skip the message. */
        /* First, try to see if we already have this sentinel. */
        port = atoi(token[1]);
        master_port = atoi(token[6]);
        si = getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels,token[0],port,token[2]);
        current_epoch = strtoull(token[3],NULL,10);
        master_config_epoch = strtoull(token[7],NULL,10);
        if (!si) {
            /* If not, remove all the sentinels that have the same runid
             * because there was an address change, and add the same Sentinel
             * with the new address back. */
            removed = removeMatchingSentinelFromMaster(master,token[2]);
            if (removed) {
                sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
                    "%@ ip %s port %d for %s", token[0],port,token[2]);
            } else {
                /* Check if there is another Sentinel with the same address this
                 * new one is reporting. What we do if this happens is to set its
                 * port to 0, to signal the address is invalid. We'll update it
                 * later if we get an HELLO message. */
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0],port,NULL);
                if (other) {
                    sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
                    other->addr->port = 0; /* It means: invalid address. */
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
            }
            /* Add the new sentinel. */
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
                            token[0],port,master->quorum,master);
            if (si) {
                if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
                /* The runid is NULL after a new instance creation and
                 * for Sentinels we don't have a later chance to fill it,
                 * so do it now. */
                si->runid = sdsnew(token[2]);
                sentinelTryConnectionSharing(si);
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
                sentinelFlushConfig();
            }
        }
        /* Update local current_epoch if received current_epoch is greater.*/
        if (current_epoch > sentinel.current_epoch) {
            sentinel.current_epoch = current_epoch;
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
                (unsigned long long) sentinel.current_epoch);
        }
        /* Update master info if received configuration is newer. */
        if (si && master->config_epoch < master_config_epoch) {
            master->config_epoch = master_config_epoch;
            if (master_port != master->addr->port ||
                strcmp(master->addr->ip, token[5]))
            {
                sentinelAddr *old_addr;
                sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
                sentinelEvent(LL_WARNING,"+switch-master",
                    master,"%s %s %d %s %d",
                    master->name,
                    master->addr->ip, master->addr->port,
                    token[5], master_port);
                old_addr = dupSentinelAddr(master->addr);
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                sentinelCallClientReconfScript(master,
                    SENTINEL_OBSERVER,"start",
                    old_addr,master->addr);
                releaseSentinelAddr(old_addr);
            }
        }
        /* Update the state of the Sentinel. */
        if (si) si->last_hello_time = mstime();
    }
cleanup:
    sdsfreesplitres(token,numtokens);
}
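After receiving a published message, sentinel does the following:
- Splits the message on ",".
- Looks up the master by name. If the master is not in its monitored list, the message is skipped; otherwise it moves on to the next step.
- Checks whether it already has a record of the sending sentinel, looking it up by address (ip and port) and runid. If there is none, it first removes any existing sentinelRedisInstance with the same runid, because the node's address may have changed and the node has to be added again with its new address.
- If nothing was removed, checks whether another known sentinel has the same ip and port. If so, it sets that sentinel's port to 0 to mark the address as invalid, and waits for a later hello message to update it.
- Creates a new sentinelRedisInstance structure from the received sentinel information and fills in its runid. This step contains an interesting optimization, the sentinelTryConnectionSharing function.
- Saves the sentinel node's information to the configuration file.
- If the sender's current epoch is greater than its own, adopts the sender's value and saves it to the configuration file.
- If the config epoch it holds for the master is lower than the one in the message, updates the master's config epoch and, when the address differs, the master's ip and port. Because the master is looked up by name, different sentinels monitoring the same master must configure it under the same name.
- Finally, updates the time of the last hello from that sentinel.
Processing the hello message therefore solves the discovery of other sentinel nodes. It also keeps the master configuration and the epochs consistent across nodes: every node converges on the newest epoch.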
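The steps above can be sketched as a small round trip: format the eight fields, split them again, and apply the two "newer epoch wins" rules. This is only an illustration under stated assumptions. The types hello_msg and local_view, the functions hello_format, hello_parse and hello_apply, and the flags EPOCH_UPDATED and MASTER_SWITCHED are all hypothetical names, and the sketch leaves out sentinel lookup, address-change handling and persistence.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified view of one hello message (not a Redis type). */
typedef struct {
    char ip[64];
    int port;
    char runid[41];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Hypothetical local state of the receiving sentinel for one master. */
typedef struct {
    unsigned long long current_epoch;
    unsigned long long config_epoch;
    char master_ip[64];
    int master_port;
} local_view;

enum { EPOCH_UPDATED = 1, MASTER_SWITCHED = 2 };

/* Build the 8-field payload. Returns 0 on success, -1 if buf is too small. */
int hello_format(const hello_msg *m, char *buf, size_t len) {
    int n = snprintf(buf, len, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                     m->ip, m->port, m->runid, m->current_epoch,
                     m->master_name, m->master_ip, m->master_port,
                     m->master_config_epoch);
    return (n < 0 || (size_t)n >= len) ? -1 : 0;
}

/* Split on ',' and require exactly 8 tokens. Returns 0 or -1. */
int hello_parse(const char *payload, hello_msg *m) {
    char copy[512], *tok[8], *p;
    int n = 0;

    assert(payload != NULL && m != NULL);
    if (strlen(payload) >= sizeof(copy)) return -1;
    strcpy(copy, payload);
    for (p = copy;;) {
        if (n == 8) return -1;              /* More than 8 tokens. */
        tok[n++] = p;
        if ((p = strchr(p, ',')) == NULL) break;
        *p++ = '\0';
    }
    if (n != 8) return -1;                  /* Fewer than 8 tokens. */

    snprintf(m->ip, sizeof(m->ip), "%s", tok[0]);
    m->port = atoi(tok[1]);
    snprintf(m->runid, sizeof(m->runid), "%s", tok[2]);
    m->current_epoch = strtoull(tok[3], NULL, 10);
    snprintf(m->master_name, sizeof(m->master_name), "%s", tok[4]);
    snprintf(m->master_ip, sizeof(m->master_ip), "%s", tok[5]);
    m->master_port = atoi(tok[6]);
    m->master_config_epoch = strtoull(tok[7], NULL, 10);
    return 0;
}

/* Apply both epoch rules; returns a bitmask describing what changed. */
int hello_apply(local_view *v, const hello_msg *m) {
    int changed = 0;

    if (m->current_epoch > v->current_epoch) {
        v->current_epoch = m->current_epoch;
        changed |= EPOCH_UPDATED;
    }
    if (m->master_config_epoch > v->config_epoch) {
        v->config_epoch = m->master_config_epoch;
        if (m->master_port != v->master_port ||
            strcmp(m->master_ip, v->master_ip) != 0) {
            snprintf(v->master_ip, sizeof(v->master_ip), "%s", m->master_ip);
            v->master_port = m->master_port;
            changed |= MASTER_SWITCHED;
        }
    }
    return changed;
}
```

Applying the same message a second time changes nothing, which is why every sentinel can safely rebroadcast its view every 2 seconds: only a strictly newer epoch has any effect.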
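- Establishing connections
- As with slaves, the connections between sentinels are created by the recursive calls in the periodic function. It is worth noting that sentinels keep only a command connection to each other and no pub/sub connection; see the sentinelReconnectInstance function above.
- Setting up sentinel connections involves one more optimization, the sentinelTryConnectionSharing function, which also appears in the code above. It is analyzed here to explain what connection sharing means. The code of the function is as follows: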
/* This function will attempt to share the instance link we already have
 * for the same Sentinel in the context of a different master, with the
 * instance we are passing as argument.
 *
 * This way multiple Sentinel objects that refer all to the same physical
 * Sentinel instance but in the context of different masters will use
 * a single connection, will send a single PING per second for failure
 * detection and so forth.
 *
 * Return C_OK if a matching Sentinel was found in the context of a
 * different master and sharing was performed. Otherwise C_ERR
 * is returned. */
int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *di;
    dictEntry *de;
    if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
    if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master = dictGetVal(de), *match;
        /* We want to share with the same physical Sentinel referenced
         * in other masters, so skip our master. */
        if (master == ri->master) continue;
        match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
                                                       NULL,0,ri->runid);
        if (match == NULL) continue; /* No match. */
        if (match == ri) continue; /* Should never happen but... safer. */
        /* We identified a matching Sentinel, great! Let's free our link
         * and use the one of the matching Sentinel. */
        releaseInstanceLink(ri->link,NULL);
        ri->link = match->link;
        match->link->refcount++;
        return C_OK;
    }
    dictReleaseIterator(di);
    return C_ERR;
}
Before walking through this function, we first need to introduce an important data structure, instanceLink:
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
 * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
 * 500 outgoing connections instead of 5.
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong time are now local to the link: if
 * the link is available, the instance is available. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
} instanceLink;
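As the comments on the function and on the data structure describe, suppose a group of sentinels all monitor the same set of masters: for example, besides this sentinel there are 5 other sentinels, and together they monitor 100 masters. If connections were created by looping over the sentinel dict of each master, this sentinel could end up with 500 connections to those 5 sentinels, when 5 connections are enough. There are still 500 sentinelRedisInstance structures, though. So when an instance for the same sentinel is found under another master (matched by runid), the existing link is shared and only that one is kept. This way only 5 connections are held to the other 5 sentinels rather than 500, and only 5 PING commands need to be sent. The optimization targets sentinels only, so instanceLink sharing applies only to instances with the SRI_SENTINEL flag; for masters and slaves refcount is always 1.
This completes the network construction of the whole sentinel architecture.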
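The reference-counting idea can be shown with a minimal sketch. It assumes two hypothetical stand-in types, shared_link and sentinel_entry, and keeps every entry in one flat array rather than in a dict per master; none of these names come from Redis, and the sketch holds no real connections.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for instanceLink: only the reference count. */
typedef struct {
    int refcount;
} shared_link;

/* Hypothetical stand-in for a sentinel-type sentinelRedisInstance. */
typedef struct {
    char runid[41];
    shared_link *link;
} sentinel_entry;

/* Allocate a link owned by exactly one entry. */
shared_link *link_create(void) {
    shared_link *l = malloc(sizeof(*l));
    assert(l != NULL);
    l->refcount = 1;
    return l;
}

/* Drop one owner; free the link when nobody references it anymore. */
void link_release(shared_link *l) {
    if (--l->refcount == 0) free(l);
}

/* Try to reuse the link of another entry with the same runid.
 * Returns 1 if sharing was performed, 0 otherwise. */
int try_share_link(sentinel_entry *e, sentinel_entry *all, size_t count) {
    if (e->link->refcount > 1) return 0;    /* Already shared. */
    for (size_t i = 0; i < count; i++) {
        sentinel_entry *match = &all[i];
        if (match == e) continue;
        if (strcmp(match->runid, e->runid) != 0) continue;
        link_release(e->link);              /* Give up our own link... */
        e->link = match->link;              /* ...and adopt the existing one. */
        e->link->refcount++;
        return 1;
    }
    return 0;
}

/* Count distinct links across all entries (quadratic, fine for a sketch). */
size_t count_links(const sentinel_entry *all, size_t count) {
    size_t distinct = 0;
    for (size_t i = 0; i < count; i++) {
        size_t j;
        for (j = 0; j < i; j++)
            if (all[j].link == all[i].link) break;
        if (j == i) distinct++;
    }
    return distinct;
}
```

With six entries that alternate between two runids, running try_share_link over every entry leaves two distinct links, each with a refcount of 3. Scaled up, this is the same effect as going from 500 connections to 5.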
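Summary
- The sentinel architecture has three roles: sentinel, master and slave.
- sentinel and master: sentinel reads the ip and port of the monitored server from the configuration file, and a periodic time event creates the command and pub/sub connections to the master.
- sentinel and slave: the slave's connection information is obtained by sending the INFO command to the master, and the connections are created by the recursion in the periodic function. Two connections are created here as well. Note that the hello message published to a slave carries the ip and port of the master that slave belongs to.
- sentinel and sentinel: other sentinels are discovered by subscribing to the master's __sentinel__:hello channel. Each sentinel publishes a hello message from its periodic function; the other sentinels subscribed to the channel receive it, learn about the sender, and connect to it through the recursion in the periodic function. Sentinels create only a command connection between each other.
- The periodic function sends INFO every 10s (to masters and slaves), sends PING every 1s by default, and publishes a hello message every 2s. If down-after-milliseconds is configured below 1s, that value is used as the PING interval instead, so the interval never exceeds 1s.
- sentinel state persistence: sentinel saves part of its state in the configuration file.
- Connection sharing between sentinel nodes: instances that refer to the same sentinel share one link, so only one connection is kept.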
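The three periods in the summary can be put into a small scheduling sketch. It is a simplification under stated assumptions: the constant names, the peer_timers struct and the functions ping_period and due_commands are hypothetical, and the real periodic function applies further conditions (pending commands, failover state and so on) that are left out here.

```c
#include <assert.h>

typedef long long mstime_t;                 /* Milliseconds. */

/* Periods from the summary, in milliseconds (illustrative names). */
enum { INFO_PERIOD_MS = 10000, PING_PERIOD_MS = 1000, PUBLISH_PERIOD_MS = 2000 };

/* Bit flags for the commands that are due. */
enum { SEND_INFO = 1, SEND_PING = 2, SEND_HELLO = 4 };

/* Hypothetical per-instance timestamps and settings. */
typedef struct {
    mstime_t last_info;
    mstime_t last_ping;
    mstime_t last_hello;
    mstime_t down_after_ms;                 /* down-after-milliseconds. */
    int is_sentinel;                        /* INFO is not sent to sentinels. */
} peer_timers;

/* PING interval: down-after-milliseconds when below 1s, otherwise 1s. */
mstime_t ping_period(mstime_t down_after_ms) {
    assert(down_after_ms > 0);
    return down_after_ms < PING_PERIOD_MS ? down_after_ms : PING_PERIOD_MS;
}

/* Return a bitmask of the commands whose period has elapsed at time now. */
int due_commands(const peer_timers *t, mstime_t now) {
    int due = 0;

    if (!t->is_sentinel && now - t->last_info > INFO_PERIOD_MS)
        due |= SEND_INFO;
    if (now - t->last_ping > ping_period(t->down_after_ms))
        due |= SEND_PING;
    if (now - t->last_hello > PUBLISH_PERIOD_MS)
        due |= SEND_HELLO;
    return due;
}
```

A caller would invoke due_commands once per timer tick for every instance and reset the matching timestamp after sending each command.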