Redis series: building the Sentinel network

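This article summarizes some of the principles behind Redis Sentinel by reading the Redis source code (corrections are welcome). I am revisiting Sentinel's high-availability solution in order to compare it with the architecture that blockchains use for distributed storage, and to understand the strengths and weaknesses of both models so that each can be applied where it fits. Blockchain is very popular right now. Many people do blockchain for blockchain's sake, and some do it purely for capital and quick wealth. In my view its distributed storage scheme, its security, its incentive mechanism and its transaction system are all indispensable parts of a blockchain, yet many people still take them apart one by one. Looking at the architectural side in isolation, from a purely technical angle, is nonetheless worthwhile and instructive. For the distributed storage architecture of blockchain, see "Blockchain: Distributed Storage"; for an architecture-level comparison of the two, see "Blockchain Distributed Storage vs. Redis Distributed Storage".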

What is the Redis Sentinel high-availability solution

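Redis Sentinel is a high-availability solution built on master-slave replication. It addresses fault tolerance and disaster recovery for distributed storage, so that Redis can cope with unexpected failures such as network problems, process crashes and hardware faults. Fault tolerance starts with data backup, and backup relies on two mechanisms: persistence and replication. The hard part comes once several copies exist: how can the system decide by itself, reasonably and correctly, how to react to a failure while staying transparent to clients? (The copies here are replicas. In my view master-slave replication is in a sense a hot standby: it provides fault tolerance and, depending on the workload, can also be used for read/write splitting to relieve pressure under heavy traffic. Whether read/write splitting is acceptable depends on how strict the application's consistency requirements are.) Sentinel is Redis's answer to this question. Note that the code that makes a client follow a master switch is implemented mainly in the client libraries of each language, not in the server.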

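Sentinel service architecture

Sentinel architecture model

A few points about Sentinel first:

  • A Sentinel is a supervisor and stores no data. Within the whole system, one Sentinel or a group of Sentinels supervises the master-slave deployment.
  • A Sentinel can take part in three kinds of interaction: Sentinel with master, Sentinel with slave, and Sentinel with other Sentinels.
  • The building blocks of these interactions are the same in all three cases. Building such a network inevitably requires node registration and discovery, connections between nodes, keep-alive, and a communication protocol between nodes.
  • Because the roles differ, their responsibilities in the architecture differ, and so does the content they exchange.
    With these points in mind, we go step by step from building the Sentinel network to how the whole structure guarantees high availability.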

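Building the Sentinel network

Initializing Sentinel
  1. Start Sentinel mode with redis-sentinel /path/to/sentinel.conf or redis-server /path/to/sentinel.conf --sentinel. Both forms are described in the official documentation: Redis Sentinel Documentation
  2. When starting in Sentinel mode, the process mainly does the following: initializes the server, loads the command table, loads and applies the configuration file, and starts the periodic function that monitors the masters.
    The following is the Sentinel-related part of main() in server.c. If you want to step through it yourself from start to finish, see my other post on debugging the Redis source with gdb on Linux, and the Redis debugging guide.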
    ...
    setlocale(LC_COLLATE,"");
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    char hashseed[16];
    getRandomHexChars(hashseed,sizeof(hashseed));
    dictSetHashFunctionSeed((uint8_t*)hashseed);
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    initServerConfig();
    moduleInitModulesSystem();

    /* Store the executable path and arguments in a safe place in order
     * to be able to restart the server later. */
    server.executable = getAbsolutePath(argv[0]);
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

    /* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }
    ...
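
The excerpt calls checkForSentinelMode(argc,argv) without showing how it decides. As a minimal sketch, assuming it only looks at the executable name and at a --sentinel option (which matches the two start commands listed earlier), the check could look like this. The function name looks_like_sentinel_mode is hypothetical and is not the Redis one.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for checkForSentinelMode(): returns 1 when the
 * process should run as a Sentinel, 0 otherwise. */
int looks_like_sentinel_mode(int argc, char **argv) {
    assert(argc >= 1 && argv != NULL);
    /* Started through the redis-sentinel executable (or a link to it). */
    if (strstr(argv[0], "redis-sentinel") != NULL) return 1;
    /* Started as redis-server with an explicit --sentinel option. */
    for (int j = 1; j < argc; j++)
        if (strcmp(argv[j], "--sentinel") == 0) return 1;
    return 0;
}
```

The decision has to be made before the configuration file is parsed, because parsing sentinel directives fills the Sentinel data structures.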

The main() excerpt calls the two Sentinel-specific initialization functions, initSentinelConfig() and initSentinel().
sentinel.c/initSentinelConfig

/* This function overwrites a few normal Redis config default with Sentinel
 * specific defaults. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}

sentinel.c/initSentinel

/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
    unsigned int j;

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */
    dictEmpty(server.commands,NULL);
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
    }

    /* Initialize various data structures. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}

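The two functions show how Sentinel mode is initialized: the default port is set to 26379, then the command table is loaded and sentinelState is initialized.
The command table loaded in Sentinel mode differs from the one used by a normal Redis server. Sentinel mode accepts only the commands below, so these are also the only commands that clients and other Sentinels can send to a Sentinel.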

struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
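
The third field of each table entry is the command's arity. As a small sketch, assuming the usual Redis convention that a positive arity means exactly that many arguments (command name included) and a negative arity means at least that many, the check can be written as follows. The helper name arity_matches is mine.

```c
#include <assert.h>
#include <stdlib.h>

/* Returns 1 if a command invoked with argc arguments (command name
 * included) satisfies the arity declared in the command table. */
int arity_matches(int arity, int argc) {
    assert(arity != 0);
    if (arity > 0) return argc == arity;   /* exact argument count */
    return argc >= abs(arity);             /* at least |arity| arguments */
}
```

With the table above, ping (arity 1) takes no arguments, while sentinel (arity -2) needs at least a subcommand.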

Basic data structures
sentinelState

/* Main state. */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    int running_scripts;    /* Number of scripts in execution right now. */
    mstime_t tilt_start_time;       /* When TITL started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;

sentinelRedisInstance

typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */
    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;

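These two structs are Sentinel's core data structures. sentinelState stores the Sentinel's own state, and sentinelRedisInstance stores information about a master, a slave, or another Sentinel that monitors the same master. At startup they are linked together to record the relationship between this Sentinel and the three kinds of instances, as in the figure:

[Figure: Sentinel data structure diagram]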

sentinel&master
  1. Discovering the master
    The sentinelState struct has a masters pointer. It refers to a dict whose values are pointers to sentinelRedisInstance structures. The ip and port of the monitored master are parsed from the line sentinel monitor mymaster 127.0.0.1 6379 2 in sentinel.conf; the comments in the configuration file describe each option in detail. The call chain is server.c/main->config.c/loadServerConfig->config.c/loadServerConfigFromString->sentinel.c/sentinelHandleConfiguration
    The code in loadServerConfigFromString that handles sentinel directives is:
        } else if (!strcasecmp(argv[0],"sentinel")) {
            /* argc == 1 is handled by main() as we need to enter the sentinel
             * mode ASAP. */
            if (argc != 1) {
                if (!server.sentinel_mode) {
                    err = "sentinel directive while not in sentinel mode";
                    goto loaderr;
                }
                err = sentinelHandleConfiguration(argv+1,argc-1);
                if (err) goto loaderr;
            }
        }

The part of sentinel.c/sentinelHandleConfiguration that parses the monitor directive is:

char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } ...
    return NULL;
}
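
For reference, a small sentinel.conf fragment of the kind this parser reads. The monitor line is the one quoted earlier; the other directives and their values are illustrative assumptions on my part (to my knowledge they are real Sentinel options with their stock values) and are not handled by the branch shown.

```conf
# Port the Sentinel itself listens on (the default set by initSentinelConfig).
port 26379

# monitor <name> <host> <port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2

# Assumed example values for other per-master options.
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
```

With this file the monitor branch receives argv = {"monitor", "mymaster", "127.0.0.1", "6379", "2"} and argc == 5, because loadServerConfigFromString strips the leading sentinel token.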

After parsing the monitor directive, Sentinel creates the sentinelRedisInstance that stores the master's information. Next, look at createSentinelRedisInstance:

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    /* For slaves use ip:port as name. */
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }

    /* Make sure the entry is not duplicated. This may happen when the same
     * name for a master is used multiple times inside the configuration or
     * if we try to add multiple times a slave or sentinel with same ip/port
     * to a master. */
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    /* Create the instance object. */
    ri = zmalloc(sizeof(*ri));
    /* Note that all the instances are started in the disconnected state,
     * the event loop will take care of connecting them. */
    ri->flags = flags;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->config_epoch = 0;
    ri->addr = addr;
    ri->link = createInstanceLink();
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DEFAULT_DOWN_AFTER;
    ri->master_link_down_time = 0;
    ri->auth_pass = NULL;
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
    ri->slave_repl_offset = 0;
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->info_refresh = 0;

    /* Failover state. */
    ri->leader = NULL;
    ri->leader_epoch = 0;
    ri->failover_epoch = 0;
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
    ri->failover_delay_logged = 0;
    ri->promoted_slave = NULL;
    ri->notification_script = NULL;
    ri->client_reconfig_script = NULL;
    ri->info = NULL;

    /* Role */
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
    ri->role_reported_time = mstime();
    ri->slave_conf_change_time = mstime();

    /* Add into the right table. */
    dictAdd(table, ri->name, ri);
    return ri;
}

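This function is generic: it creates master, slave and sentinel instances and tells them apart through flags. For a master it does the following:

  • Validates the ip and port passed in and creates the sentinelAddr.
  • Looks in the dict for a master with the same name; if one exists it returns NULL with errno set to EBUSY.
  • Creates the instance object and initializes defaults such as the instanceLink and the sentinels and slaves dicts.
  • Adds the new instance to the dict.
    This completes the dictionary of monitored masters.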
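
The duplicate check in the second step can be sketched in isolation. This is a simplified illustration and not the Redis code: a fixed-size array stands in for the dict, and register_master, master_names and MAX_MASTERS are hypothetical names.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>

#define MAX_MASTERS 16   /* arbitrary capacity for this sketch */

static const char *master_names[MAX_MASTERS];
static int master_count = 0;

/* Registers a master name. Returns 0 on success; on failure returns -1 with
 * errno set to EBUSY (duplicate name) or ENOMEM (table full). */
int register_master(const char *name) {
    assert(name != NULL);
    for (int i = 0; i < master_count; i++) {
        if (strcmp(master_names[i], name) == 0) {
            errno = EBUSY;   /* same code createSentinelRedisInstance sets */
            return -1;
        }
    }
    if (master_count == MAX_MASTERS) {
        errno = ENOMEM;
        return -1;
    }
    master_names[master_count++] = name;   /* caller keeps the string alive */
    return 0;
}
```

Reporting the reason through errno is what lets sentinelHandleConfiguration turn a NULL return into the message "Duplicated master name."
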
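  2. Establishing the connection to the master
    Redis handles its work on a single thread driven by event callbacks. So we go back to main() in server.c, where the event loop is started after the Sentinel configuration file has been loaded: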
...
  aeSetBeforeSleepProc(server.el,beforeSleep);
  aeSetAfterSleepProc(server.el,afterSleep);
  aeMain(server.el);
  aeDeleteEventLoop(server.el);
...

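Redis's event loop distinguishes time events from file events. Sentinel's connection heartbeats, instance state checks, discovery of other Sentinels and so on are all periodic, so creating connections, sending commands to fetch server state and publishing messages should all be driven by time events. Stepping through aeMain in a debugger shows that the time event handler that gets entered is serverCron. A closer look shows that serverCron is registered as a time event callback in initServer, and following that thread leads to sentinelTimer, the entry point of sentinel.c inside serverCron.
The time event registered in initServer: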

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

The Sentinel-related code in serverCron:

...
    /* Run the Sentinel timer if we are in sentinel mode. */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }
...
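
run_with_period(100) limits how often sentinelTimer runs inside serverCron. A minimal sketch of the idea, assuming the macro compares the requested period with the tick length derived from server.hz and otherwise uses the cronloops counter. The function name and parameters below are mine, not from Redis.

```c
#include <assert.h>

/* Returns 1 when a task with the given period should run on this
 * serverCron tick. hz is the number of serverCron calls per second and
 * cronloops counts the calls made so far. */
int should_run_this_tick(int period_ms, int hz, int cronloops) {
    assert(period_ms > 0 && hz > 0 && hz <= 1000 && cronloops >= 0);
    int tick_ms = 1000 / hz;              /* time between two ticks */
    if (period_ms <= tick_ms) return 1;   /* cannot run more often than every tick */
    return cronloops % (period_ms / tick_ms) == 0;
}
```

With hz equal to 10, as in the INFO output shown later, a tick lasts 100 ms, so a 100 ms period runs on every serverCron call.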

Going further into sentinelTimer quickly leads to the code that creates the connections. The call chain inside sentinel.c is sentinelTimer->sentinelHandleDictOfRedisInstances->sentinelHandleRedisInstance->sentinelReconnectInstance. This path is generic and covers connection setup for all three roles in the Sentinel architecture.
The code of sentinelReconnectInstance:

/* Create the async connections for the instance link if the link
 * is disconnected. Note that link->disconnected is true even if just
 * one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;

            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}

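For a master, Sentinel creates two connections: one for sending commands and one for Pub/Sub.

  • Once the commands connection is created, Sentinel registers sentinelLinkEstablishedCallback as the connect callback and sentinelDisconnectCallback as the disconnect callback. sentinelSendAuthIfNeeded then sends AUTH with the configured password if the master requires authentication, sentinelSetClientName names the connection using the type cmd, and finally sentinelSendPing sends a PING to the master to exercise the command link.
  • The Pub/Sub connection is set up in the same way, except that at the end Sentinel sends SUBSCRIBE __sentinel__:hello to subscribe to that channel and registers sentinelReceiveHelloMessages to handle the messages published on it. This is mainly how it discovers the other Sentinels that monitor the same master.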
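
The last lines of sentinelReconnectInstance decide when the link stops counting as disconnected. As a sketch, that condition reduces to a small predicate; the function and parameter names are hypothetical.

```c
#include <assert.h>

/* Mirrors the final check in sentinelReconnectInstance(): a link counts as
 * connected when the command connection exists and, for masters and
 * slaves, the Pub/Sub connection exists as well. */
int link_fully_connected(int has_cmd_conn, int has_pubsub_conn, int is_sentinel) {
    return has_cmd_conn && (is_sentinel || has_pubsub_conn);
}
```

Links to other Sentinels only ever get the commands connection, which is why the Pub/Sub branch is guarded by SRI_MASTER|SRI_SLAVE.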

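Question: why create two connections instead of using a single one?
Answer: one explanation is that it avoids losing published messages when the command connection drops, but I find that reasoning a little weak. A more concrete constraint is that once a connection has issued SUBSCRIBE it is in subscriber mode, where Redis of this generation accepts only the subscribe/unsubscribe family plus PING and QUIT, so INFO and PUBLISH have to travel on a separate connection.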

sentinel&slave
  1. Discovering the slaves
    Start from the code of sentinelHandleRedisInstance:
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

Right after the connections to the master are (re)established, the periodic function sentinelSendPeriodicCommands runs:

/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 * the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->link->disconnected) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin.
     *
     * Similarly we monitor the INFO output more often if the slave reports
     * to be disconnected from the master, so that we can have a fresh
     * disconnection time figure. */
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    /* Send INFO to masters and slaves, not sentinels. */
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "INFO");
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* Send PING to all the three kinds of instances. */
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    /* PUBLISH hello messages to all the three kinds of instances. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}
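
The way the two intervals are chosen in the function above can be isolated as a sketch. The constants carry the values of SENTINEL_PING_PERIOD and SENTINEL_INFO_PERIOD (1 s and 10 s); the function names and the master_in_trouble parameter are hypothetical simplifications.

```c
#include <assert.h>

#define PING_PERIOD_MS 1000    /* value of SENTINEL_PING_PERIOD */
#define INFO_PERIOD_MS 10000   /* value of SENTINEL_INFO_PERIOD */

/* PING interval: down-after-milliseconds, capped at one second. */
long long ping_period_ms(long long down_after_ms) {
    assert(down_after_ms > 0);
    return down_after_ms > PING_PERIOD_MS ? PING_PERIOD_MS : down_after_ms;
}

/* INFO interval: one second for a slave whose master is objectively down or
 * being failed over, or that reports a broken replication link; ten seconds
 * otherwise. */
long long info_period_ms(int is_slave, int master_in_trouble,
                         long long master_link_down_ms) {
    if (is_slave && (master_in_trouble || master_link_down_ms != 0))
        return 1000;
    return INFO_PERIOD_MS;
}
```

So a down-after-milliseconds of 30000 still yields a PING every second, while a value of 500 shortens the interval to 500 ms.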

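In this periodic function, under normal conditions Sentinel sends INFO every 10 s, sends PING every 1 s by default (more often if down-after-milliseconds is configured below one second), and publishes a hello message every 2 s. First, look at what INFO returns: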

127.0.0.1:6379> info
# Server
redis_version:4.0.10
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:564e829c2a2c36f6
redis_mode:standalone
os:Linux 4.4.0-17134-Microsoft x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:5.4.0
process_id:27
run_id:f137124d98e21709eaa1def3b192c152a2500750
tcp_port:6379
uptime_in_seconds:339
uptime_in_days:0
hz:10
lru_clock:4977260
executable:/home/jane-zhang/redis-server
config_file:/svr/redis_config/redis_6379.conf

# Clients
connected_clients:5
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0

# Memory
used_memory:2022832
used_memory_human:1.93M
used_memory_rss:2912256
used_memory_rss_human:2.78M
used_memory_peak:2082832
used_memory_peak_human:1.99M
used_memory_peak_perc:97.12%
used_memory_overhead:1985938
used_memory_startup:786584
used_memory_dataset:36894
used_memory_dataset_perc:2.98%
total_system_memory:17048510464
total_system_memory_human:15.88G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
mem_fragmentation_ratio:1.44
mem_allocator:jemalloc-4.0.3
active_defrag_running:0
lazyfree_pending_objects:0

# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1531703643
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
rdb_last_cow_size:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_last_cow_size:0

# Stats
total_connections_received:9
total_commands_processed:1445
instantaneous_ops_per_sec:5
total_net_input_bytes:66681
total_net_output_bytes:328934
instantaneous_input_kbps:0.28
instantaneous_output_kbps:0.66
rejected_connections:0
sync_full:2
sync_partial_ok:0
sync_partial_err:2
expired_keys:0
expired_stale_perc:0.00
expired_time_cap_reached_count:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:1
pubsub_patterns:0
latest_fork_usec:1527
migrate_cached_sockets:0
slave_expires_tracked_keys:0
active_defrag_hits:0
active_defrag_misses:0
active_defrag_key_hits:0
active_defrag_key_misses:0

# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1
slave1:ip=127.0.0.1,port=6380,state=online,offset=36334,lag=0
master_replid:5d4684d94bde70a56746ea1c4c30cccd00df7f56
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:36334
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:36334

# CPU
used_cpu_sys:0.33
used_cpu_user:0.16
used_cpu_sys_children:0.03
used_cpu_user_children:0.00

# Cluster
cluster_enabled:0

# Keyspace
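
The slave0 and slave1 lines in the # Replication section above are what Sentinel uses to discover slaves. As a minimal sketch of pulling ip and port out of one such new-format line, here is a version based on sscanf. Redis itself scans the line with strstr and strchr; parse_slave_line is a hypothetical helper for illustration only.

```c
#include <assert.h>
#include <stdio.h>

/* Extracts ip and port from a new-format replication line such as
 * "slave0:ip=127.0.0.1,port=6381,state=online,...".
 * Returns 0 on success, -1 if the line does not have that shape. */
int parse_slave_line(const char *line, char ip[64], int *port) {
    assert(line != NULL && ip != NULL && port != NULL);
    /* %*d skips the slave index; %63[^,] copies up to the next comma. */
    if (sscanf(line, "slave%*d:ip=%63[^,],port=%d", ip, port) != 2) return -1;
    return 0;
}
```

Lines that do not start with slave followed by an index, for example role:master, are rejected, so the helper can be applied to every line of the INFO output.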

The # Replication section of the INFO reply from the master contains the information about its slaves. The reply is processed through the callback chain sentinel.c/sentinelInfoReplyCallback->sentinel.c/sentinelRefreshInstanceInfo, shown below:

/* Process the INFO output from masters. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;

    /* cache full INFO output for instance */
    sdsfree(ri->info);
    ri->info = sdsnew(info);

    /* The following fields must be reset to a given value in the case they
     * are not found at all in the INFO output. */
    ri->master_link_down_time = 0;

    /* Process line by line. */
    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
    for (j = 0; j < numlines; j++) {
        sentinelRedisInstance *slave;
        sds l = lines[j];

        /* run_id:<40 hex chars>*/
        if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
            if (ri->runid == NULL) {
                ri->runid = sdsnewlen(l+7,40);
            } else {
                if (strncmp(ri->runid,l+7,40) != 0) {
                    sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
                    sdsfree(ri->runid);
                    ri->runid = sdsnewlen(l+7,40);
                }
            }
        }

        /* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 &&
            !memcmp(l,"slave",5) && isdigit(l[5]))
        {
            char *ip, *port, *end;

            if (strstr(l,"ip=") == NULL) {
                /* Old format. */
                ip = strchr(l,':'); if (!ip) continue;
                ip++; /* Now ip points to start of ip address. */
                port = strchr(ip,','); if (!port) continue;
                *port = '\0'; /* nul term for easy access. */
                port++; /* Now port points to start of port number. */
                end = strchr(port,','); if (!end) continue;
                *end = '\0'; /* nul term for easy access. */
            } else {
                /* New format. */
                ip = strstr(l,"ip="); if (!ip) continue;
                ip += 3; /* Now ip points to start of ip address. */
                port = strstr(l,"port="); if (!port) continue;
                port += 5; /* Now port points to start of port number. */
                /* Nul term both fields for easy access. */
                end = strchr(ip,','); if (end) *end = '\0';
                end = strchr(port,','); if (end) *end = '\0';
            }

            /* Check if we already have this slave into our table,
             * otherwise add it. */
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                            atoi(port), ri->quorum, ri)) != NULL)
                {
                    sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                    sentinelFlushConfig();
                }
            }
        }

        /* master_link_down_since_seconds:<seconds> */
        if (sdslen(l) >= 32 &&
            !memcmp(l,"master_link_down_since_seconds",30))
        {
            ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
        }

        /* role:<role> */
        if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
        else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;

        if (role == SRI_SLAVE) {
            /* master_host:<host> */
            if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
                if (ri->slave_master_host == NULL ||
                    strcasecmp(l+12,ri->slave_master_host))
                {
                    sdsfree(ri->slave_master_host);
                    ri->slave_master_host = sdsnew(l+12);
                    ri->slave_conf_change_time = mstime();
                }
            }

            /* master_port:<port> */
            if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
                int slave_master_port = atoi(l+12);

                if (ri->slave_master_port != slave_master_port) {
                    ri->slave_master_port = slave_master_port;
                    ri->slave_conf_change_time = mstime();
                }
            }

            /* master_link_status:<status> */
            if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
                ri->slave_master_link_status =
                    (strcasecmp(l+19,"up") == 0) ?
                    SENTINEL_MASTER_LINK_STATUS_UP :
                    SENTINEL_MASTER_LINK_STATUS_DOWN;
            }

            /* slave_priority:<priority> */
            if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
                ri->slave_priority = atoi(l+15);

            /* slave_repl_offset:<offset> */
            if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
                ri->slave_repl_offset = strtoull(l+18,NULL,10);
        }
    }
    ri->info_refresh = mstime();
    sdsfreesplitres(lines,numlines);

    /* ---------------------------- Acting half -----------------------------
     * Some things will not happen if sentinel.tilt is true, but some will
     * still be processed. */

    /* Remember when the role changed. */
    if (role != ri->role_reported) {
        ri->role_reported_time = mstime();
        ri->role_reported = role;
        if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
        /* Log the event with +role-change if the new role is coherent or
         * with -role-change if there is a mismatch with the current config. */
        sentinelEvent(LL_VERBOSE,
            ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
            "+role-change" : "-role-change",
            ri, "%@ new reported role is %s",
            role == SRI_MASTER ? "master" : "slave",
            ri->flags & SRI_MASTER ? "master" : "slave");
    }

    /* None of the following conditions are processed when in tilt mode, so
     * return asap. */
    if (sentinel.tilt) return;

    /* Handle master -> slave role switch. */
    if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
        /* Nothing to do, but masters claiming to be slaves are
         * considered to be unreachable by Sentinel, so eventually
         * a failover will be triggered. */
    }

    /* Handle slave -> master role switch. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */
        if ((ri->flags & SRI_PROMOTED) &&
            (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            ri->master->config_epoch = ri->master->failover_epoch;
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            ri->master->failover_state_change_time = mstime();
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
            if (sentinel.simfailure_flags &
                SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
                sentinelSimFailureCrash();
            sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);
            sentinelForceHelloUpdateForMaster(ri->master);
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

            if (!(ri->flags & SRI_PROMOTED) &&
                 sentinelMasterLooksSane(ri->master) &&
                 sentinelRedisInstanceNoDownFor(ri,wait_time) &&
                 mstime() - ri->role_reported_time > wait_time)
            {
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                if (retval == C_OK)
                    sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }

    /* Handle slaves replicating to a different master address. */
    if ((ri->flags & SRI_SLAVE) &&
        role == SRI_SLAVE &&
        (ri->slave_master_port != ri->master->addr->port ||
         strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
    {
        mstime_t wait_time = ri->master->failover_timeout;

        /* Make sure the master is sane before reconfiguring this instance
         * into a slave. */
        if (sentinelMasterLooksSane(ri->master) &&
            sentinelRedisInstanceNoDownFor(ri,wait_time) &&
            mstime() - ri->slave_conf_change_time > wait_time)
        {
            int retval = sentinelSendSlaveOf(ri,
                    ri->master->addr->ip,
                    ri->master->addr->port);
            if (retval == C_OK)
                sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
        }
    }

    /* Detect if the slave that is in the process of being reconfigured
     * changed state. */
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
    {
        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                    ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port)
        {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }

        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
        {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}

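The INFO reply carries many fields, both the old and the new slave-line formats have to be supported, and the handling of tilt mode and of master/slave role swaps during failover is complicated, so this function is very long. For now we focus only on the part that extracts slave information.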

...
/* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 &&
            !memcmp(l,"slave",5) && isdigit(l[5]))
        {
            char *ip, *port, *end;

            if (strstr(l,"ip=") == NULL) {
                /* Old format. */
                ip = strchr(l,':'); if (!ip) continue;
                ip++; /* Now ip points to start of ip address. */
                port = strchr(ip,','); if (!port) continue;
                *port = '\0'; /* nul term for easy access. */
                port++; /* Now port points to start of port number. */
                end = strchr(port,','); if (!end) continue;
                *end = '\0'; /* nul term for easy access. */
            } else {
                /* New format. */
                ip = strstr(l,"ip="); if (!ip) continue;
                ip += 3; /* Now ip points to start of ip address. */
                port = strstr(l,"port="); if (!port) continue;
                port += 5; /* Now port points to start of port number. */
                /* Nul term both fields for easy access. */
                end = strchr(ip,','); if (end) *end = '\0';
                end = strchr(port,','); if (end) *end = '\0';
            }

            /* Check if we already have this slave into our table,
             * otherwise add it. */
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                            atoi(port), ri->quorum, ri)) != NULL)
                {
                    sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
                    sentinelFlushConfig();
                }
            }
        }
...

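After parsing a line such as slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1 from the # Replication section, Sentinel checks whether this slave is already in its table. If it is not, Sentinel creates a sentinelRedisInstance structure for it, adds it to the master's slaves dict, and finally saves the slave's information to the configuration file.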
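The new-format parsing step can be tried in isolation. The following is a minimal sketch, not Redis code: parse_slave_line is a hypothetical helper name, and it only handles the ip=...,port=... format. Like the original, it modifies the line in place by writing NUL terminators.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): extracts ip and port from a
 * new-format slave line such as "slave0:ip=127.0.0.1,port=6381,...".
 * The line is modified in place. Returns 0 on success, -1 if the line
 * lacks an "ip=" or "port=" field. */
int parse_slave_line(char *line, char **ip_out, int *port_out) {
    char *ip, *port, *end;

    assert(line != NULL && ip_out != NULL && port_out != NULL);

    /* Locate both fields before terminating anything, otherwise the
     * second search would stop at the first inserted NUL. */
    ip = strstr(line, "ip=");
    if (!ip) return -1;
    ip += 3;
    port = strstr(line, "port=");
    if (!port) return -1;
    port += 5;

    /* NUL-terminate each field at its next comma, if there is one. */
    end = strchr(ip, ',');
    if (end) *end = '\0';
    end = strchr(port, ',');
    if (end) *end = '\0';

    *ip_out = ip;
    *port_out = atoi(port);
    return 0;
}
```

Called on a writable copy of the example line above, the helper yields the ip 127.0.0.1 and the port 6381. A line in the old comma-only format is rejected with -1.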
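2. Creating the connections
Once the first INFO round against the master has yielded the slaves' information, the next timer cycle reaches the slaves through the recursion in sentinelHandleDictOfRedisInstances. Each slave then gets the same two connections as the master, a command connection and a pub/sub connection, created in the same way. PING is used to monitor the heartbeat and INFO to refresh the slave's information.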

/* Perform scheduled operations for all the instances in the dictionary.
 * Recursively call the function against dictionaries of slaves. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}
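sentinel & sentinel:
  1. Discovering other sentinels
    Each connection to a master or slave includes a pub/sub link, and every sentinel monitoring the same master subscribes to that master's __sentinel__:hello channel. As seen in the periodic function above, a sentinel publishes a hello message to this channel every 2 seconds. So when two sentinels monitor the same master, each receives the other's broadcasts. The message carries the sender's own details, which is how each sentinel learns that the other exists. The message is built and sent by the following function: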
/* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
 * instance in order to broadcast the current configuration for this
 * master, and to advertise the existence of this Sentinel at the same time.
 *
 * The message has the following format:
 *
 * sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
 * master_name,master_ip,master_port,master_config_epoch.
 *
 * Returns C_OK if the PUBLISH was queued correctly, otherwise
 * C_ERR is returned. */
int sentinelSendHello(sentinelRedisInstance *ri) {
    char ip[NET_IP_STR_LEN];
    char payload[NET_IP_STR_LEN+1024];
    int retval;
    char *announce_ip;
    int announce_port;
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

    if (ri->link->disconnected) return C_ERR;

    /* Use the specified announce address if specified, otherwise try to
     * obtain our own IP address. */
    if (sentinel.announce_ip) {
        announce_ip = sentinel.announce_ip;
    } else {
        if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
            return C_ERR;
        announce_ip = ip;
    }
    announce_port = sentinel.announce_port ?
                    sentinel.announce_port : server.port;

    /* Format and send the Hello message. */
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, sentinel.myid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,master_addr->ip,master_addr->port,
        (unsigned long long) master->config_epoch);
    retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
            SENTINEL_HELLO_CHANNEL,payload);
    if (retval != C_OK) return C_ERR;
    ri->link->pending_commands++;
    return C_OK;
}
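The payload layout can be reproduced without any Redis types. This is a minimal sketch under stated assumptions: format_hello is a hypothetical helper, and the addresses, run ID, and epochs passed to it are made-up illustration values. It uses the same format string as sentinelSendHello and additionally reports truncation, which the original does not check.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): writes the 8-field hello
 * payload into 'buf'. Returns 0 on success, -1 if the payload did not
 * fit or formatting failed. */
int format_hello(char *buf, size_t buflen,
                 const char *sentinel_ip, int sentinel_port,
                 const char *runid, unsigned long long current_epoch,
                 const char *master_name, const char *master_ip,
                 int master_port, unsigned long long config_epoch)
{
    int n;

    assert(buf != NULL && buflen > 0);
    n = snprintf(buf, buflen,
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about the current master. */
        sentinel_ip, sentinel_port, runid, current_epoch,
        master_name, master_ip, master_port, config_epoch);
    /* snprintf returns the length it wanted to write, so a value at or
     * above buflen means the output was truncated. */
    if (n < 0 || (size_t)n >= buflen) return -1;
    return 0;
}
```

Because the fields are joined with commas and nothing is escaped, none of them may contain a comma. That is consistent with the receiver accepting a message only when it splits into exactly 8 tokens.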

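The broadcast message has the format sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch. It is delivered to every node subscribed to the channel, including the sender itself. How does a sentinel handle a hello message it receives? As described above, when a sentinel sets up the pub/sub connection to the master it registers the callback sentinelReceiveHelloMessages, which in turn calls sentinelProcessHelloMessage: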

/* Process an hello message received via Pub/Sub in master or slave instance,
 * or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
 *
 * If the master name specified in the message is not known, the message is
 * discarded. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
    /* Format is composed of 8 tokens:
     * 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
     * 5=master_ip,6=master_port,7=master_config_epoch. */
    int numtokens, port, removed, master_port;
    uint64_t current_epoch, master_config_epoch;
    char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
    sentinelRedisInstance *si, *master;

    if (numtokens == 8) {
        /* Obtain a reference to the master this hello message is about */
        master = sentinelGetMasterByName(token[4]);
        if (!master) goto cleanup; /* Unknown master, skip the message. */

        /* First, try to see if we already have this sentinel. */
        port = atoi(token[1]);
        master_port = atoi(token[6]);
        si = getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels,token[0],port,token[2]);
        current_epoch = strtoull(token[3],NULL,10);
        master_config_epoch = strtoull(token[7],NULL,10);

        if (!si) {
            /* If not, remove all the sentinels that have the same runid
             * because there was an address change, and add the same Sentinel
             * with the new address back. */
            removed = removeMatchingSentinelFromMaster(master,token[2]);
            if (removed) {
                sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
                    "%@ ip %s port %d for %s", token[0],port,token[2]);
            } else {
                /* Check if there is another Sentinel with the same address this
                 * new one is reporting. What we do if this happens is to set its
                 * port to 0, to signal the address is invalid. We'll update it
                 * later if we get an HELLO message. */
                sentinelRedisInstance *other =
                    getSentinelRedisInstanceByAddrAndRunID(
                        master->sentinels, token[0],port,NULL);
                if (other) {
                    sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
                    other->addr->port = 0; /* It means: invalid address. */
                    sentinelUpdateSentinelAddressInAllMasters(other);
                }
            }

            /* Add the new sentinel. */
            si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
                            token[0],port,master->quorum,master);

            if (si) {
                if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
                /* The runid is NULL after a new instance creation and
                 * for Sentinels we don't have a later chance to fill it,
                 * so do it now. */
                si->runid = sdsnew(token[2]);
                sentinelTryConnectionSharing(si);
                if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
                sentinelFlushConfig();
            }
        }

        /* Update local current_epoch if received current_epoch is greater.*/
        if (current_epoch > sentinel.current_epoch) {
            sentinel.current_epoch = current_epoch;
            sentinelFlushConfig();
            sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
                (unsigned long long) sentinel.current_epoch);
        }

        /* Update master info if received configuration is newer. */
        if (si && master->config_epoch < master_config_epoch) {
            master->config_epoch = master_config_epoch;
            if (master_port != master->addr->port ||
                strcmp(master->addr->ip, token[5]))
            {
                sentinelAddr *old_addr;

                sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
                sentinelEvent(LL_WARNING,"+switch-master",
                    master,"%s %s %d %s %d",
                    master->name,
                    master->addr->ip, master->addr->port,
                    token[5], master_port);

                old_addr = dupSentinelAddr(master->addr);
                sentinelResetMasterAndChangeAddress(master, token[5], master_port);
                sentinelCallClientReconfScript(master,
                    SENTINEL_OBSERVER,"start",
                    old_addr,master->addr);
                releaseSentinelAddr(old_addr);
            }
        }

        /* Update the state of the Sentinel. */
        if (si) si->last_hello_time = mstime();
    }

cleanup:
    sdsfreesplitres(token,numtokens);
}

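After receiving a published message, the sentinel:

  • Splits the message on commas.
  • Looks up the master by name. If the master is not in its monitored list, the message is skipped; otherwise processing continues.
  • Checks whether it already has a record of the sending sentinel, looking it up by ip, port, and runId. If not, it first removes any existing sentinelRedisInstance with the same runId, because the node's address may have changed and the node has to be re-added under the new address.
  • If nothing was removed, checks whether another known sentinel has the same ip and port. If so, it sets that sentinel's port to 0 to mark the address as invalid, to be corrected when that sentinel's next hello message arrives.
  • Creates a new sentinelRedisInstance from the received information and fills in its runId. An interesting optimization at this step is the call to sentinelTryConnectionSharing.
  • Saves the sentinel's information to the configuration file.
  • If the sender's current epoch is greater than its own, adopts that epoch and saves it to the configuration file.
  • If its stored config epoch for the master is lower than the sender's, updates the master's config epoch and, when the address differs, the master's ip and port. This is why different sentinels monitoring the same master must all configure it under the same name.
  • Finally, updates the time of the last hello from that sentinel.
    Processing hello messages therefore solves two problems: discovering other sentinels, and keeping the master configuration and epochs consistent across nodes, with every node converging on the newest epoch.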
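The first step and the epoch rule from the list above can be sketched in plain C. This is an illustration, not the Redis implementation: hello_msg, parse_hello, and merge_epoch are hypothetical names, and the splitting uses strchr in place of the sds function sdssplitlen.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define HELLO_TOKENS 8

/* Hypothetical view of a parsed hello message. The string fields point
 * into the caller's buffer. */
typedef struct {
    const char *ip;
    int port;
    const char *runid;
    uint64_t current_epoch;
    const char *master_name;
    const char *master_ip;
    int master_port;
    uint64_t master_config_epoch;
} hello_msg;

/* Splits 'msg' in place on commas. Returns 0 if it holds exactly
 * 8 tokens, -1 otherwise (such messages are ignored). */
int parse_hello(char *msg, hello_msg *out) {
    char *tok[HELLO_TOKENS];
    char *p = msg;
    int n = 0;

    assert(msg != NULL && out != NULL);
    tok[n++] = p;
    while ((p = strchr(p, ',')) != NULL) {
        if (n == HELLO_TOKENS) return -1; /* A 9th token would follow. */
        *p++ = '\0';
        tok[n++] = p;
    }
    if (n != HELLO_TOKENS) return -1;

    out->ip = tok[0];
    out->port = atoi(tok[1]);
    out->runid = tok[2];
    out->current_epoch = strtoull(tok[3], NULL, 10);
    out->master_name = tok[4];
    out->master_ip = tok[5];
    out->master_port = atoi(tok[6]);
    out->master_config_epoch = strtoull(tok[7], NULL, 10);
    return 0;
}

/* Epoch rule: keep the local value unless the received one is greater. */
uint64_t merge_epoch(uint64_t local, uint64_t received) {
    return received > local ? received : local;
}
```

The same "greater value wins" comparison is applied twice in sentinelProcessHelloMessage, once for current_epoch and once for the master's config_epoch. Only the second case can also change the master address.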
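  2. Establishing connections
  • As with slaves, the connections between sentinels are created by the recursive calls in the periodic function. Notably, sentinels hold only a command connection to each other and no pub/sub connection; see the sentinelReconnectInstance function above.
  • Connection setup between sentinels includes one more optimization, sentinelTryConnectionSharing, which also appeared in the code above. It is worth a closer look to explain what connection sharing means. The function is: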
/* This function will attempt to share the instance link we already have
 * for the same Sentinel in the context of a different master, with the
 * instance we are passing as argument.
 *
 * This way multiple Sentinel objects that refer all to the same physical
 * Sentinel instance but in the context of different masters will use
 * a single connection, will send a single PING per second for failure
 * detection and so forth.
 *
 * Return C_OK if a matching Sentinel was found in the context of a
 * different master and sharing was performed. Otherwise C_ERR
 * is returned. */
int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *di;
    dictEntry *de;

    if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
    if (ri->link->refcount > 1) return C_ERR; /* Already shared. */

    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master = dictGetVal(de), *match;
        /* We want to share with the same physical Sentinel referenced
         * in other masters, so skip our master. */
        if (master == ri->master) continue;
        match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
                                                       NULL,0,ri->runid);
        if (match == NULL) continue; /* No match. */
        if (match == ri) continue; /* Should never happen but... safer. */

        /* We identified a matching Sentinel, great! Let's free our link
         * and use the one of the matching Sentinel. */
        releaseInstanceLink(ri->link,NULL);
        ri->link = match->link;
        match->link->refcount++;
        dictReleaseIterator(di); /* Release the iterator before returning. */
        return C_OK;
    }
    dictReleaseIterator(di);
    return C_ERR;
}

Before explaining this function, we need one important data structure, instanceLink:

/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
 * monitoring many masters, we have different instances representing the
 * same Sentinels, one per master, and we need to share the hiredis connections
 * among them. Otherwise if 5 Sentinels are monitoring 100 masters we create
 * 500 outgoing connections instead of 5.
 * So this structure represents a reference counted link in terms of the two
 * hiredis connections for commands and Pub/Sub, and the fields needed for
 * failure detection, since the ping/pong time are now local to the link: if
 * the link is available, the instance is available. This way we don't just
 * have 5 connections instead of 500, we also send 5 pings instead of 500.
 *
 * Links are shared only for Sentinels: master and slave instances have
 * a link with refcount = 1, always. */
typedef struct instanceLink {
    int refcount;          /* Number of sentinelRedisInstance owners. */
    int disconnected;      /* Non-zero if we need to reconnect cc or pc. */
    int pending_commands;  /* Number of commands sent waiting for a reply. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t act_ping_time;   /* Time at which the last pending ping (no pong
                                 received after it) was sent. This field is
                                 set to 0 when a pong is received, and set again
                                 to the current time if the value is 0 and a new
                                 ping is sent. */
    mstime_t last_ping_time;  /* Time at which we sent the last ping. This is
                                 only used to avoid sending too many pings
                                 during failure. Idle time is computed using
                                 the act_ping_time field. */
    mstime_t last_pong_time;  /* Last time the instance replied to ping,
                                 whatever the reply was. That's used to check
                                 if the link is idle and must be reconnected. */
    mstime_t last_reconn_time;  /* Last reconnection attempt performed when
                                   the link was down. */
} instanceLink;

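As the comments on the function and on the structure describe, suppose a group of sentinels all monitor the same set of masters: say 5 other sentinels besides this one jointly monitor 100 masters. If connections were created by looping over each master's sentinels, this sentinel could end up with 500 connections to the other 5, when 5 are enough, even though there are still 500 sentinelRedisInstance structures. So when an instance for the same sentinel is found under a different master (matched by runId), its existing link is shared in place of a newly opened one. This keeps 5 connections to the other 5 sentinels in place of 500, and only 5 PINGs need to be sent per period. The optimization applies only to sentinels, so instanceLink sharing happens only for instances flagged SRI_SENTINEL; for masters and slaves refcount is always 1.
This completes the construction of the sentinel network.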
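The reference-counting idea behind connection sharing can be shown with a much smaller model. This is a sketch under assumptions, not Redis code: link_t, instance_t, link_create, link_release, and try_share_link are hypothetical names, and the link holds only a counter where instanceLink holds the hiredis contexts.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, stripped-down stand-ins for instanceLink and
 * sentinelRedisInstance. */
typedef struct { int refcount; } link_t;
typedef struct { const char *runid; link_t *link; } instance_t;

/* Creates a link owned by exactly one instance. */
link_t *link_create(void) {
    link_t *l = malloc(sizeof(*l));
    if (l) l->refcount = 1;
    return l;
}

/* Drops one reference and frees the link when no owner remains.
 * Returns the number of remaining owners. */
int link_release(link_t *l) {
    assert(l != NULL && l->refcount > 0);
    if (--l->refcount == 0) {
        free(l);
        return 0;
    }
    return l->refcount;
}

/* If 'ri' and 'match' describe the same physical sentinel (same runid),
 * makes 'ri' drop its own link and use the one 'match' already has.
 * Returns 1 if sharing was performed, 0 otherwise. */
int try_share_link(instance_t *ri, instance_t *match) {
    assert(ri != NULL && match != NULL);
    if (ri == match) return 0;
    if (ri->runid == NULL || match->runid == NULL) return 0;
    if (ri->link->refcount > 1) return 0; /* Already shared. */
    if (strcmp(ri->runid, match->runid) != 0) return 0;

    link_release(ri->link);
    ri->link = match->link;
    match->link->refcount++;
    return 1;
}
```

With 100 masters and 5 remote sentinels, repeating this step for every new instance leaves 500 instance records pointing at 5 links, each with a refcount of 100.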

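Summary

  1. The sentinel system has three roles: sentinel, master, and slave.
  2. For the sentinel-to-master connection, the ip and port of the monitored server come from the configuration file. Through a registered periodic time event, the sentinel creates two connections to the master: a command connection and a pub/sub connection.
  3. For sentinel-to-slave connections, the address information is obtained by sending the INFO command to the master, and the connections are established through the recursion in the periodic function. Two connections are created here as well. Note that the hello message published through a slave carries the ip and port of the master that slave points to.
  4. For sentinel-to-sentinel connections, nodes are discovered by subscribing to the master's __sentinel__:hello channel. Each sentinel publishes hello messages from the periodic function. The other sentinels subscribed to the channel receive them, learn about the sender, and connect to it through the recursion in the periodic function. Sentinels create only a command connection between each other.
  5. The periodic function sends INFO every 10s (to masters and slaves), sends PING every 1s by default (the interval follows down-after-milliseconds when that is set below 1s, and never exceeds 1s), and publishes a hello message every 2s.
  6. Sentinel state is persisted: a sentinel saves part of its state in its configuration file.
  7. Connections between sentinel nodes are shared: two sentinels keep only one connection between them by sharing the link.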
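The intervals in item 5 can be written down as a small scheduling check. This is a simplified sketch, not the Redis implementation: the constant names, effective_ping_period, and is_due are hypothetical, and the real code applies further conditions, for example when a master is down or a failover is in progress.

```c
#include <assert.h>

/* Hypothetical constants for the intervals listed in the summary,
 * in milliseconds. */
#define INFO_PERIOD_MS    10000
#define PING_PERIOD_MS     1000
#define PUBLISH_PERIOD_MS  2000

/* The PING interval follows down-after-milliseconds but is capped
 * at one second. */
long long effective_ping_period(long long down_after_ms) {
    assert(down_after_ms > 0);
    return down_after_ms < PING_PERIOD_MS ? down_after_ms : PING_PERIOD_MS;
}

/* Returns 1 if at least 'period_ms' has elapsed since 'last_ms'. */
int is_due(long long now_ms, long long last_ms, long long period_ms) {
    return now_ms - last_ms >= period_ms;
}
```

On each timer tick a sentinel would evaluate a check like is_due once per instance and per command type, which is why a single periodic function is enough to drive INFO, PING, and the hello broadcast.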