redis cluster集群建立

系列

redis數(shù)據(jù)淘汰原理
redis過(guò)期數(shù)據(jù)刪除策略
redis server事件模型
redis cluster mget 引發(fā)的討論
redis 3.x windows 集群搭建
redis 命令執(zhí)行過(guò)程
redis string底層數(shù)據(jù)結(jié)構(gòu)
redis list底層數(shù)據(jù)結(jié)構(gòu)
redis hash底層數(shù)據(jù)結(jié)構(gòu)
redis set底層數(shù)據(jù)結(jié)構(gòu)
redis zset底層數(shù)據(jù)結(jié)構(gòu)
redis 客戶端管理
redis 主從同步-slave端
redis 主從同步-master端
redis 主從超時(shí)檢測(cè)
redis aof持久化
redis rdb持久化
redis 數(shù)據(jù)恢復(fù)過(guò)程
redis TTL實(shí)現(xiàn)原理
redis cluster集群建立
redis cluster集群選主

cluster 相關(guān)數(shù)據(jù)結(jié)構(gòu)

?在redis cluster的概念當(dāng)中有一個(gè)槽(slot)的概念屎勘,也就是說(shuō)在redis的cluster中存在2**14=16384個(gè)槽分布在集群當(dāng)中黑界,所以在宏定義當(dāng)中REDIS_CLUSTER_SLOTS的值為16384.

// 槽數(shù)量
#define REDIS_CLUSTER_SLOTS 16384
// 集群在線
#define REDIS_CLUSTER_OK 0          /* Everything looks ok */
// 集群下線
#define REDIS_CLUSTER_FAIL 1        /* The cluster can't work */
// 節(jié)點(diǎn)名字的長(zhǎng)度
#define REDIS_CLUSTER_NAMELEN 40    /* sha1 hex length */
// 集群的實(shí)際端口號(hào) = 用戶指定的端口號(hào) + REDIS_CLUSTER_PORT_INCR
#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */



?redis保存集群節(jié)點(diǎn)相關(guān)的數(shù)據(jù)結(jié)構(gòu)說(shuō)明如下:

  • 在redisServer的數(shù)據(jù)結(jié)構(gòu)當(dāng)中,我們通過(guò)clusterState的數(shù)據(jù)結(jié)構(gòu)來(lái)保存集群狀態(tài)。
  • 在clusterState的數(shù)據(jù)結(jié)構(gòu)當(dāng)中浸赫,我們通過(guò)dict *nodes保存集群的節(jié)點(diǎn)次伶,其中key為節(jié)點(diǎn)名字急波,value為clusterNode對(duì)象春宣。
  • clusterNode保存該節(jié)點(diǎn)集群信息昧辽,包括slave信息衙熔,slots信息,clusterLink的連接信息搅荞。
  • clusterLink保存是通過(guò)cluster meet命令指定的集群節(jié)點(diǎn)红氯,從名字上可以理解為集群連接相關(guān)的信息框咙,這個(gè)連接是通過(guò)cluster meet指定的。
struct redisServer {
  struct clusterState *cluster;
}

-----------------------------------華麗分割線--------------------------------------------

// 集群狀態(tài)脖隶,每個(gè)節(jié)點(diǎn)都保存著一個(gè)這樣的狀態(tài),記錄了它們眼中的集群的樣子暇检。
typedef struct clusterState {

    // 指向當(dāng)前節(jié)點(diǎn)的指針
    clusterNode *myself;  /* This node */

    // 集群當(dāng)前的配置紀(jì)元产阱,用于實(shí)現(xiàn)故障轉(zhuǎn)移
    uint64_t currentEpoch;

    // 集群當(dāng)前的狀態(tài):是在線還是下線
    int state;            /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */

    // 集群中至少處理著一個(gè)槽的節(jié)點(diǎn)的數(shù)量。
    int size;             /* Num of master nodes with at least one slot */

    // 集群節(jié)點(diǎn)名單(包括 myself 節(jié)點(diǎn))
    // 字典的鍵為節(jié)點(diǎn)的名字块仆,字典的值為 clusterNode 結(jié)構(gòu)
    dict *nodes;          /* Hash table of name -> clusterNode structures */

    // 節(jié)點(diǎn)黑名單构蹬,用于 CLUSTER FORGET 命令
    // 防止被 FORGET 的命令重新被添加到集群里面
    // (不過(guò)現(xiàn)在似乎沒(méi)有在使用的樣子,已廢棄悔据?還是尚未實(shí)現(xiàn)庄敛?)
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

    // 記錄要從當(dāng)前節(jié)點(diǎn)遷移到目標(biāo)節(jié)點(diǎn)的槽,以及遷移的目標(biāo)節(jié)點(diǎn)
    // migrating_slots_to[i] = NULL 表示槽 i 未被遷移
    // migrating_slots_to[i] = clusterNode_A 表示槽 i 要從本節(jié)點(diǎn)遷移至節(jié)點(diǎn) A
    clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];

    // 記錄要從源節(jié)點(diǎn)遷移到本節(jié)點(diǎn)的槽科汗,以及進(jìn)行遷移的源節(jié)點(diǎn)
    // importing_slots_from[i] = NULL 表示槽 i 未進(jìn)行導(dǎo)入
    // importing_slots_from[i] = clusterNode_A 表示正從節(jié)點(diǎn) A 中導(dǎo)入槽 i
    clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];

    // 負(fù)責(zé)處理各個(gè)槽的節(jié)點(diǎn)
    // 例如 slots[i] = clusterNode_A 表示槽 i 由節(jié)點(diǎn) A 處理
    clusterNode *slots[REDIS_CLUSTER_SLOTS];

    // 跳躍表藻烤,表中以槽作為分值,鍵作為成員头滔,對(duì)槽進(jìn)行有序排序
    // 當(dāng)需要對(duì)某些槽進(jìn)行區(qū)間(range)操作時(shí)怖亭,這個(gè)跳躍表可以提供方便
    // 具體操作定義在 db.c 里面
    zskiplist *slots_to_keys;

    /* The following fields are used to take the slave state on elections. */
    // 以下這些域被用于進(jìn)行故障轉(zhuǎn)移選舉

    // 上次執(zhí)行選舉或者下次執(zhí)行選舉的時(shí)間
    mstime_t failover_auth_time; /* Time of previous or next election. */

    // 節(jié)點(diǎn)獲得的投票數(shù)量
    int failover_auth_count;    /* Number of votes received so far. */

    // 如果值為 1 ,表示本節(jié)點(diǎn)已經(jīng)向其他節(jié)點(diǎn)發(fā)送了投票請(qǐng)求
    int failover_auth_sent;     /* True if we already asked for votes. */

    int failover_auth_rank;     /* This slave rank for current auth request. */

    uint64_t failover_auth_epoch; /* Epoch of the current election. */

    /* Manual failover state in common. */
    /* 共用的手動(dòng)故障轉(zhuǎn)移狀態(tài) */

    // 手動(dòng)故障轉(zhuǎn)移執(zhí)行的時(shí)間限制
    mstime_t mf_end;            /* Manual failover time limit (ms unixtime).
                                   It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    /* 主服務(wù)器的手動(dòng)故障轉(zhuǎn)移狀態(tài) */
    clusterNode *mf_slave;      /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    /* 從服務(wù)器的手動(dòng)故障轉(zhuǎn)移狀態(tài) */
    long long mf_master_offset; /* Master offset the slave needs to start MF
                                   or zero if stil not received. */
    // 指示手動(dòng)故障轉(zhuǎn)移是否可以開(kāi)始的標(biāo)志值
    // 值為非 0 時(shí)表示各個(gè)主服務(wù)器可以開(kāi)始投票
    int mf_can_start;           /* If non-zero signal that the manual failover
                                   can start requesting masters vote. */

    /* The followign fields are uesd by masters to take state on elections. */
    /* 以下這些域由主服務(wù)器使用坤检,用于記錄選舉時(shí)的狀態(tài) */

    // 集群最后一次進(jìn)行投票的紀(jì)元
    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */

    // 在進(jìn)入下個(gè)事件循環(huán)之前要做的事情兴猩,以各個(gè) flag 來(lái)記錄
    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

    // 通過(guò) cluster 連接發(fā)送的消息數(shù)量
    long long stats_bus_messages_sent;  /* Num of msg sent via cluster bus. */

    // 通過(guò) cluster 接收到的消息數(shù)量
    long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/

} clusterState;

-----------------------------------華麗分割線--------------------------------------------

// 節(jié)點(diǎn)狀態(tài)
struct clusterNode {

    // 創(chuàng)建節(jié)點(diǎn)的時(shí)間
    mstime_t ctime; /* Node object creation time. */

    // 節(jié)點(diǎn)的名字,由 40 個(gè)十六進(jìn)制字符組成
    // 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
    char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

    // 節(jié)點(diǎn)標(biāo)識(shí)
    // 使用各種不同的標(biāo)識(shí)值記錄節(jié)點(diǎn)的角色(比如主節(jié)點(diǎn)或者從節(jié)點(diǎn))早歇,
    // 以及節(jié)點(diǎn)目前所處的狀態(tài)(比如在線或者下線)倾芝。
    int flags;      /* REDIS_NODE_... */

    // 節(jié)點(diǎn)當(dāng)前的配置紀(jì)元,用于實(shí)現(xiàn)故障轉(zhuǎn)移
    uint64_t configEpoch; /* Last configEpoch observed for this node */

    // 由這個(gè)節(jié)點(diǎn)負(fù)責(zé)處理的槽
    // 一共有 REDIS_CLUSTER_SLOTS / 8 個(gè)字節(jié)長(zhǎng)
    // 每個(gè)字節(jié)的每個(gè)位記錄了一個(gè)槽的保存狀態(tài)
    // 位的值為 1 表示槽正由本節(jié)點(diǎn)處理箭跳,值為 0 則表示槽并非本節(jié)點(diǎn)處理
    // 比如 slots[0] 的第一個(gè)位保存了槽 0 的保存情況
    // slots[0] 的第二個(gè)位保存了槽 1 的保存情況晨另,以此類推
    unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */

    // 該節(jié)點(diǎn)負(fù)責(zé)處理的槽數(shù)量
    int numslots;   /* Number of slots handled by this node */

    // 如果本節(jié)點(diǎn)是主節(jié)點(diǎn),那么用這個(gè)屬性記錄從節(jié)點(diǎn)的數(shù)量
    int numslaves;  /* Number of slave nodes, if this is a master */

    // 指針數(shù)組谱姓,指向各個(gè)從節(jié)點(diǎn)
    struct clusterNode **slaves; /* pointers to slave nodes */

    // 如果這是一個(gè)從節(jié)點(diǎn)拯刁,那么指向主節(jié)點(diǎn)
    struct clusterNode *slaveof; /* pointer to the master node */

    // 最后一次發(fā)送 PING 命令的時(shí)間
    mstime_t ping_sent;      /* Unix time we sent latest ping */

    // 最后一次接收 PONG 回復(fù)的時(shí)間戳
    mstime_t pong_received;  /* Unix time we received the pong */

    // 最后一次被設(shè)置為 FAIL 狀態(tài)的時(shí)間
    mstime_t fail_time;      /* Unix time when FAIL flag was set */

    // 最后一次給某個(gè)從節(jié)點(diǎn)投票的時(shí)間
    mstime_t voted_time;     /* Last time we voted for a slave of this master */

    // 最后一次從這個(gè)節(jié)點(diǎn)接收到復(fù)制偏移量的時(shí)間
    mstime_t repl_offset_time;  /* Unix time we received offset for this node */

    // 這個(gè)節(jié)點(diǎn)的復(fù)制偏移量
    long long repl_offset;      /* Last known repl offset for this node. */

    // 節(jié)點(diǎn)的 IP 地址
    char ip[REDIS_IP_STR_LEN];  /* Latest known IP address of this node */

    // 節(jié)點(diǎn)的端口號(hào)
    int port;                   /* Latest known port of this node */

    // 保存連接節(jié)點(diǎn)所需的有關(guān)信息
    clusterLink *link;          /* TCP/IP link with this node */

    // 一個(gè)鏈表,記錄了所有其他節(jié)點(diǎn)對(duì)該節(jié)點(diǎn)的下線報(bào)告
    list *fail_reports;         /* List of nodes signaling this as failing */

}

-----------------------------------華麗分割線-------------------------------------------

// clusterLink 包含了與其他節(jié)點(diǎn)進(jìn)行通訊所需的全部信息
typedef struct clusterLink {

    // 連接的創(chuàng)建時(shí)間
    mstime_t ctime;             /* Link creation time */

    // TCP 套接字描述符
    int fd;                     /* TCP socket file descriptor */

    // 輸出緩沖區(qū)逝段,保存著等待發(fā)送給其他節(jié)點(diǎn)的消息(message)垛玻。
    sds sndbuf;                 /* Packet send buffer */

    // 輸入緩沖區(qū),保存著從其他節(jié)點(diǎn)接收到的消息奶躯。
    sds rcvbuf;                 /* Packet reception buffer */

    // 與這個(gè)連接相關(guān)聯(lián)的節(jié)點(diǎn)帚桩,如果沒(méi)有的話就為 NULL
    struct clusterNode *node;   /* Node related to this link if any, or NULL */

} clusterLink;


cluster啟動(dòng)過(guò)程

?redis cluster啟動(dòng)過(guò)程中主要完成了一下幾件事情:

  • 創(chuàng)建server.cluster的信息(是個(gè)clusterState對(duì)象),變量值都為空嘹黔。
  • 如果有server.cluster_configfile就直接初始化clusterNode對(duì)象账嚎,如果沒(méi)有就創(chuàng)建clusterNode對(duì)象并保存至server.cluster_configfile對(duì)象莫瞬,server本身的clusterNode對(duì)象保存至server.cluster->nodes和server.cluster->myself當(dāng)中。
  • 監(jiān)聽(tīng)cluster相關(guān)端口:cluster的監(jiān)聽(tīng)端口=redis 監(jiān)聽(tīng)端口+10000
  • 綁定cluster端口的讀事件到clusterAcceptHandler用于處理cluster的連接請(qǐng)求
  • clusterAcceptHandler內(nèi)部綁定accept的socket的讀事件到clusterReadHandler
  • clusterReadHandler內(nèi)部讀取數(shù)據(jù)后交由clusterProcessPacket繼續(xù)后續(xù)處理
// 初始化集群
void clusterInit(void) {
    int saveconf = 0;

    // 初始化配置
    server.cluster = zmalloc(sizeof(clusterState));
    server.cluster->myself = NULL;
    server.cluster->currentEpoch = 0;
    server.cluster->state = REDIS_CLUSTER_FAIL;
    server.cluster->size = 1;
    server.cluster->todo_before_sleep = 0;
    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
    server.cluster->nodes_black_list =
        dictCreate(&clusterNodesBlackListDictType,NULL);
    server.cluster->failover_auth_time = 0;
    server.cluster->failover_auth_count = 0;
    server.cluster->failover_auth_rank = 0;
    server.cluster->failover_auth_epoch = 0;
    server.cluster->lastVoteEpoch = 0;
    server.cluster->stats_bus_messages_sent = 0;
    server.cluster->stats_bus_messages_received = 0;
    memset(server.cluster->slots,0, sizeof(server.cluster->slots));
    clusterCloseAllSlots();

    if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
        exit(1);

    if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
        /* No configuration found. We will just use the random name provided
         * by the createClusterNode() function. */
        myself = server.cluster->myself =
            createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
        redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
            myself->name);
        clusterAddNode(myself);
        saveconf = 1;
    }

    // 保存 nodes.conf 文件
    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */
    // 監(jiān)聽(tīng) TCP 端口
    server.cfd_count = 0;

    /* Port sanity check II
     * The other handshake port check is triggered too late to stop
     * us from trying to use a too-high cluster port number. */
    if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        redisLog(REDIS_WARNING, "Redis port number too high. "
                   "Cluster communication port is 10,000 port "
                   "numbers higher than your Redis port. "
                   "Your Redis port number must be "
                   "lower than 55535.");
        exit(1);
    }

    if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == REDIS_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            // 關(guān)聯(lián)監(jiān)聽(tīng)事件處理器
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    redisPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }

    /* The slots -> keys map is a sorted set. Init it. */
    // slots -> keys 映射是一個(gè)有序集合
    server.cluster->slots_to_keys = zslCreate();
    resetManualFailover();
}



?redis cluster之間監(jiān)聽(tīng)端口讀事件對(duì)應(yīng)的處理函數(shù)clusterAcceptHandler郭蕉,負(fù)責(zé)處理集群連接事件的回調(diào)函數(shù)疼邀。

void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    clusterLink *link;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_VERBOSE,
                    "Accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);

       
        redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}



?redis cluster之間處理socket通信報(bào)文的回調(diào)函數(shù)

// 讀事件處理器
// 首先讀入內(nèi)容的頭,以判斷讀入內(nèi)容的長(zhǎng)度
// 如果內(nèi)容是一個(gè) whole packet 召锈,那么調(diào)用函數(shù)來(lái)處理這個(gè) packet 旁振。
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[sizeof(clusterMsg)];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = (clusterLink*) privdata;
    int readlen, rcvbuflen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    // 盡可能地多讀數(shù)據(jù)
    while(1) { /* Read as long as there is data to read. */

        // 檢查輸入緩沖區(qū)的長(zhǎng)度
        rcvbuflen = sdslen(link->rcvbuf);
        // 頭信息(8 字節(jié))未讀入完
        if (rcvbuflen < 8) {
            /* First, obtain the first 8 bytes to get the full message
             * length. */
            readlen = 8 - rcvbuflen;
        // 已讀入完整的信息
        } else {
            /* Finally read the full message. */
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                /* Perform some sanity check on the message signature
                 * and length. */
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
                {
                    redisLog(REDIS_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            // 記錄已讀入內(nèi)容長(zhǎng)度
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }

        // 讀入內(nèi)容
        nread = read(fd,buf,readlen);

        // 沒(méi)有內(nèi)容可讀
        if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */

        // 處理讀入錯(cuò)誤
        if (nread <= 0) {
            /* I/O error... */
            redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : strerror(errno));
            handleLinkIOError(link);
            return;
        } else {
            /* Read data and recast the pointer to the new buffer. */
            // 將讀入的內(nèi)容追加進(jìn)輸入緩沖區(qū)里面
            link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* Total length obtained? Process this packet. */
        // 檢查已讀入內(nèi)容的長(zhǎng)度,看是否整條信息已經(jīng)被讀入了
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            // 如果是的話涨岁,執(zhí)行處理信息的函數(shù)
            if (clusterProcessPacket(link)) {
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}


cluster加入集群過(guò)程

?redis cluster節(jié)點(diǎn)之間的網(wǎng)絡(luò)連接圖如下圖所示(假設(shè)總共有N個(gè)節(jié)點(diǎn))拐袜,主要特點(diǎn)是:

  • 每個(gè)節(jié)點(diǎn)都與剩余的N-1個(gè)節(jié)點(diǎn)建立連接。
  • 任意兩個(gè)節(jié)點(diǎn)之間有兩個(gè)網(wǎng)絡(luò)連接梢薪,注意是兩個(gè)網(wǎng)絡(luò)連接蹬铺。
redis cluster

?redis cluster集群節(jié)點(diǎn)之間建立連接過(guò)程如下,假設(shè)現(xiàn)在有ABCDE公5個(gè)幾點(diǎn):

  • 在節(jié)點(diǎn)A依次執(zhí)行 cluster meet B秉撇,cluster meet C甜攀,cluster meet D,cluster meet E琐馆。
  • AB赴邻,AC,AD啡捶,AE之間建立了連接
  • 節(jié)點(diǎn)BCDE在接收到A的連接請(qǐng)求后會(huì)獲取節(jié)點(diǎn)A的信息姥敛,反向在BA,CA瞎暑,DA彤敛,EA之間建立連接
  • 節(jié)點(diǎn)AB通信過(guò)程中,節(jié)點(diǎn)A會(huì)攜帶CDE的信息給節(jié)點(diǎn)B了赌,那么在BC墨榄, BD,BE之間建立連接
  • 節(jié)點(diǎn)AC通信過(guò)程中勿她,節(jié)點(diǎn)A會(huì)攜帶BDE的信息給節(jié)點(diǎn)C袄秩,那么在CB,CD逢并,CE之間建立連接
  • 節(jié)點(diǎn)AD通信過(guò)程中之剧,節(jié)點(diǎn)A會(huì)攜帶BCE的信息給節(jié)點(diǎn)D,那么在DB砍聊,DC背稼,DE之間建立連接
  • 節(jié)點(diǎn)AE通信過(guò)程中,節(jié)點(diǎn)A會(huì)攜帶BCD的信息給節(jié)點(diǎn)E玻蝌,那么在EB蟹肘,EC词疼,ED之間建立連接
    至此節(jié)點(diǎn)ABCDE之間就建立了相互之間的連接了。

這里想說(shuō)明的是兩個(gè)節(jié)點(diǎn)之間通信的時(shí)候帘腹,發(fā)起者會(huì)攜帶它自身知道的其他節(jié)點(diǎn)給對(duì)方節(jié)點(diǎn)贰盗,通過(guò)這種方式實(shí)現(xiàn)整體網(wǎng)絡(luò)的建立


cluster加入集群源碼

?cluster meet命令被用來(lái)連接不同的開(kāi)啟集群支持的 Redis 節(jié)點(diǎn),以進(jìn)入工作集群阳欲。

  • 系統(tǒng)管理員發(fā)送一個(gè)cluster meet命令強(qiáng)制一個(gè)節(jié)點(diǎn)去會(huì)面另一個(gè)節(jié)點(diǎn)舵盈。
  • 一個(gè)已知的節(jié)點(diǎn)發(fā)送一個(gè)保存在 gossip 部分的節(jié)點(diǎn)列表,包含著未知的節(jié)點(diǎn)胸完。如果接收的節(jié)點(diǎn)已經(jīng)將發(fā)送節(jié)點(diǎn)信任為已知節(jié)點(diǎn)书释,它會(huì)處理 gossip 部分并且發(fā)送一個(gè)握手消息給未知的節(jié)點(diǎn)翘贮。
    clusterCommand方法內(nèi)部有處理各種命令的邏輯赊窥,這里我只關(guān)注了處理meet命令的邏輯。

?clusterCommand內(nèi)部通過(guò)clusterStartHandshake執(zhí)行cluster meet當(dāng)中指定的地址去執(zhí)行握手協(xié)議狸页。

{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},


// CLUSTER 命令的實(shí)現(xiàn)
void clusterCommand(redisClient *c) {

    // 不能在非集群模式下使用該命令
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
        /* CLUSTER MEET <ip> <port> */
        // 將給定地址的節(jié)點(diǎn)添加到當(dāng)前節(jié)點(diǎn)所處的集群里面

        long long port;

        // 檢查 port 參數(shù)的合法性
        if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
            addReplyErrorFormat(c,"Invalid TCP port specified: %s",
                                (char*)c->argv[3]->ptr);
            return;
        }

        // 嘗試與給定地址的節(jié)點(diǎn)進(jìn)行連接
        if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
            errno == EINVAL)
        {
            // 連接失敗
            addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
                            (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
        } else {
            // 連接成功
            addReply(c,shared.ok);
        }
    } 
}



?redis cluster在執(zhí)行握手的函數(shù)clusterStartHandshake當(dāng)中锨能,生成待連接的clusterNode對(duì)象并保存待連接的ip:port地址,并未真正執(zhí)行連接而只是生成了待連接的對(duì)象芍耘,真正的連接是在serverCron當(dāng)中完成的址遇。

/* 
 * 如果還沒(méi)有與指定的地址進(jìn)行過(guò)握手,那么進(jìn)行握手斋竞。
 * 返回 1 表示握手已經(jīng)開(kāi)始倔约,
 * 返回 0 并將 errno 設(shè)置為以下值來(lái)表示意外情況:
 *
 * EAGAIN - There is already an handshake in progress for this address.
 *          已經(jīng)有握手在進(jìn)行中了。
 * EINVAL - IP or port are not valid. 
 *          ip 或者 port 參數(shù)不合法坝初。
 */
int clusterStartHandshake(char *ip, int port) {
    clusterNode *n;
    char norm_ip[REDIS_IP_STR_LEN];
    struct sockaddr_storage sa;

    // ip 合法性檢查
    if (inet_pton(AF_INET,ip,
            &(((struct sockaddr_in *)&sa)->sin_addr)))
    {
        sa.ss_family = AF_INET;
    } else if (inet_pton(AF_INET6,ip,
            &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
    {
        sa.ss_family = AF_INET6;
    } else {
        errno = EINVAL;
        return 0;
    }

    // port 合法性檢查
    if (port <= 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) {
        errno = EINVAL;
        return 0;
    }

    if (sa.ss_family == AF_INET)
        inet_ntop(AF_INET,
            (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
            norm_ip,REDIS_IP_STR_LEN);
    else
        inet_ntop(AF_INET6,
            (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
            norm_ip,REDIS_IP_STR_LEN);

    // 檢查節(jié)點(diǎn)是否已經(jīng)發(fā)送握手請(qǐng)求浸剩,如果是的話,那么直接返回鳄袍,防止出現(xiàn)重復(fù)握手
    if (clusterHandshakeInProgress(norm_ip,port)) {
        errno = EAGAIN;
        return 0;
    }

    // 對(duì)給定地址的節(jié)點(diǎn)設(shè)置一個(gè)隨機(jī)名字
    // 當(dāng) HANDSHAKE 完成時(shí)绢要,當(dāng)前節(jié)點(diǎn)會(huì)取得給定地址節(jié)點(diǎn)的真正名字
    // 到時(shí)會(huì)用真名替換隨機(jī)名,是在返回的pong報(bào)文當(dāng)中帶上真正的名字
    n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
    memcpy(n->ip,norm_ip,sizeof(n->ip));
    n->port = port;

    // 將節(jié)點(diǎn)添加到集群當(dāng)中
    clusterAddNode(n);

    return 1;
}

// 將給定 node 添加到節(jié)點(diǎn)表里面
int clusterAddNode(clusterNode *node) {
    int retval;
    // 將 node 添加到當(dāng)前節(jié)點(diǎn)的 nodes 表中
    // 這樣接下來(lái)當(dāng)前節(jié)點(diǎn)就會(huì)創(chuàng)建連向 node 的節(jié)點(diǎn)
    retval = dictAdd(server.cluster->nodes,
            sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
    return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
}


cluster集群發(fā)現(xiàn)過(guò)程

cluster集群發(fā)現(xiàn)過(guò)程-client端

?在serverCron當(dāng)中會(huì)調(diào)用clusterCron執(zhí)行redis cluster發(fā)現(xiàn)的邏輯拗小,整個(gè)邏輯如下:

  • 遍歷server.cluster->nodes發(fā)現(xiàn)待握手的節(jié)點(diǎn)進(jìn)行連接重罪。
  • 針對(duì)未建立連接的node創(chuàng)建對(duì)應(yīng)的ClusterLink,建立link和node之間的關(guān)聯(lián)哀九,link表示鏈接用于關(guān)聯(lián)本端的fd和遠(yuǎn)端的node剿配。
  • 通過(guò)建立的socket連接發(fā)送meet報(bào)文即通過(guò)clusterSendPing去實(shí)現(xiàn)。
  • 定期選擇一個(gè)node發(fā)送ping報(bào)文交換gossip信息
  • 清理已經(jīng)下線的redis cluster node等等
// 集群常規(guī)操作函數(shù)阅束,默認(rèn)每秒執(zhí)行 10 次(每間隔 100 毫秒執(zhí)行一次)
void clusterCron(void) {
    dictIterator *di;
    dictEntry *de;
    int update_state = 0;
    int orphaned_masters; /* How many masters there are without ok slaves. */
    int max_slaves; /* Max number of ok slaves for a single master. */
    int this_slaves; /* Number of ok slaves for our master (if we are slave). */
    mstime_t min_pong = 0, now = mstime();
    clusterNode *min_pong_node = NULL;
    // 迭代計(jì)數(shù)器惨篱,一個(gè)靜態(tài)變量
    static unsigned long long iteration = 0;
    mstime_t handshake_timeout;

    // 記錄一次迭代
    iteration++; 

    // 如果一個(gè) handshake 節(jié)點(diǎn)沒(méi)有在 handshake timeout 內(nèi)
    // 轉(zhuǎn)換成普通節(jié)點(diǎn)(normal node),
    // 那么節(jié)點(diǎn)會(huì)從 nodes 表中移除這個(gè) handshake 節(jié)點(diǎn)
    // 一般來(lái)說(shuō) handshake timeout 的值總是等于 NODE_TIMEOUT
    // 不過(guò)如果 NODE_TIMEOUT 太少的話围俘,程序會(huì)將值設(shè)為 1 秒鐘
    handshake_timeout = server.cluster_node_timeout;
    if (handshake_timeout < 1000) handshake_timeout = 1000;

    // 向集群中的所有斷線或者未連接節(jié)點(diǎn)發(fā)送消息
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        // 跳過(guò)當(dāng)前節(jié)點(diǎn)以及沒(méi)有地址的節(jié)點(diǎn)
        if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;

        // 如果 handshake 節(jié)點(diǎn)已超時(shí)砸讳,釋放它
        if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
            freeClusterNode(node);
            continue;
        }

        // 為未創(chuàng)建連接的節(jié)點(diǎn)創(chuàng)建連接
        if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->port+REDIS_CLUSTER_PORT_INCR,
                    server.bindaddr_count ? server.bindaddr[0] : NULL);
            if (fd == -1) {
                redisLog(REDIS_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->port+REDIS_CLUSTER_PORT_INCR,
                    server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);

            // 向新連接的節(jié)點(diǎn)發(fā)送 PING 命令琢融,防止節(jié)點(diǎn)被識(shí)進(jìn)入下線
            // 如果節(jié)點(diǎn)被標(biāo)記為 MEET 预吆,那么發(fā)送 MEET 命令淤齐,否則發(fā)送 PING 命令
            old_ping_sent = node->ping_sent;
            clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
                    CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);

            // 這不是第一次發(fā)送 PING 信息,所以可以還原這個(gè)時(shí)間
            // 等 clusterSendPing() 函數(shù)來(lái)更新它
            if (old_ping_sent) {
                node->ping_sent = old_ping_sent;
            }

            /* 
             * 在發(fā)送 MEET 信息之后碑诉,清除節(jié)點(diǎn)的 MEET 標(biāo)識(shí)常遂。
             *
             * 如果當(dāng)前節(jié)點(diǎn)(發(fā)送者)沒(méi)能收到 MEET 信息的回復(fù)纳令,
             * 那么它將不再向目標(biāo)節(jié)點(diǎn)發(fā)送命令。
             *
             * 如果接收到回復(fù)的話克胳,那么節(jié)點(diǎn)將不再處于 HANDSHAKE 狀態(tài)平绩,
             * 并繼續(xù)向目標(biāo)節(jié)點(diǎn)發(fā)送普通 PING 命令。
             */
            node->flags &= ~REDIS_NODE_MEET;

            redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
                    node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
        }
    }
    dictReleaseIterator(di);

    // clusterCron() 每執(zhí)行 10 次(至少間隔一秒鐘)漠另,就向一個(gè)隨機(jī)節(jié)點(diǎn)發(fā)送 gossip 信息
    if (!(iteration % 10)) {
        int j;

        // 隨機(jī) 5 個(gè)節(jié)點(diǎn)捏雌,選出其中一個(gè)
        for (j = 0; j < 5; j++) {

            // 隨機(jī)在集群中挑選節(jié)點(diǎn)
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            // 不要 PING 連接斷開(kāi)的節(jié)點(diǎn),也不要 PING 最近已經(jīng) PING 過(guò)的節(jié)點(diǎn)
            if (this->link == NULL || this->ping_sent != 0) continue;

            if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
                continue;

            // 選出 5 個(gè)隨機(jī)節(jié)點(diǎn)中最近一次接收 PONG 回復(fù)距離現(xiàn)在最舊的節(jié)點(diǎn)
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }

        // 向最久沒(méi)有收到 PONG 回復(fù)的節(jié)點(diǎn)發(fā)送 PING 命令
        if (min_pong_node) {
            redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

    // 遍歷所有節(jié)點(diǎn)笆搓,檢查是否需要將某個(gè)節(jié)點(diǎn)標(biāo)記為下線
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        // 跳過(guò)節(jié)點(diǎn)本身性湿、無(wú)地址節(jié)點(diǎn)、HANDSHAKE 狀態(tài)的節(jié)點(diǎn)
        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }

        // 如果等到 PONG 到達(dá)的時(shí)間超過(guò)了 node timeout 一半的連接
        // 因?yàn)楸M管節(jié)點(diǎn)依然正常满败,但連接可能已經(jīng)出問(wèn)題了
        if (node->link && /* is connected */
            now - node->link->ctime >
            server.cluster_node_timeout &&
            node->ping_sent &&
            node->pong_received < node->ping_sent &&
            now - node->ping_sent > server.cluster_node_timeout/2)
        {
            /* Disconnect the link, it will be reconnected automatically. */
            // 釋放連接肤频,下次 clusterCron() 會(huì)自動(dòng)重連
            freeClusterLink(node->link);
        }

        // 如果目前沒(méi)有在 PING 節(jié)點(diǎn)
        // 并且已經(jīng)有 node timeout 一半的時(shí)間沒(méi)有從節(jié)點(diǎn)那里收到 PONG 回復(fù)
        // 那么向節(jié)點(diǎn)發(fā)送一個(gè) PING ,確保節(jié)點(diǎn)的信息不會(huì)太舊
        // (因?yàn)橐徊糠止?jié)點(diǎn)可能一直沒(méi)有被隨機(jī)中)
        if (node->link &&
            node->ping_sent == 0 &&
            (now - node->pong_received) > server.cluster_node_timeout/2)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        // 如果這是一個(gè)主節(jié)點(diǎn)算墨,并且有一個(gè)從服務(wù)器請(qǐng)求進(jìn)行手動(dòng)故障轉(zhuǎn)移
        // 那么向從服務(wù)器發(fā)送 PING 宵荒。
        if (server.cluster->mf_end &&
            nodeIsMaster(myself) &&
            server.cluster->mf_slave == node &&
            node->link)
        {
            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
            continue;
        }

        // 以下代碼只在節(jié)點(diǎn)發(fā)送了 PING 命令的情況下執(zhí)行
        if (node->ping_sent == 0) continue;

        // 計(jì)算等待 PONG 回復(fù)的時(shí)長(zhǎng)
        delay = now - node->ping_sent;

        // 等待 PONG 回復(fù)的時(shí)長(zhǎng)超過(guò)了限制值,將目標(biāo)節(jié)點(diǎn)標(biāo)記為 PFAIL (疑似下線)
        if (delay > server.cluster_node_timeout) {

            if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
                redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
                    node->name);
                // 打開(kāi)疑似下線標(biāo)記
                node->flags |= REDIS_NODE_PFAIL;
                update_state = 1;
            }
        }
    }
    dictReleaseIterator(di);

    // 如果從節(jié)點(diǎn)沒(méi)有在復(fù)制主節(jié)點(diǎn)净嘀,那么對(duì)從節(jié)點(diǎn)進(jìn)行設(shè)置
    if (nodeIsSlave(myself) &&
        server.masterhost == NULL &&
        myself->slaveof &&
        nodeHasAddr(myself->slaveof))
    {
        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
    }

    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        clusterHandleSlaveFailover();

        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    // 更新集群狀態(tài)
    if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
        clusterUpdateState();
}



?clusterSendPing的過(guò)程中最核心的就是在發(fā)送ping報(bào)文或者meet報(bào)文的時(shí)候报咳,都會(huì)攜帶本節(jié)點(diǎn)已知的節(jié)點(diǎn)即gossip信息,這樣就可以擴(kuò)展本節(jié)點(diǎn)知道的節(jié)點(diǎn)信息給其他節(jié)點(diǎn)達(dá)到擴(kuò)展的目的面粮。
?按照源碼中的意思每次是攜帶至多3個(gè)已經(jīng)節(jié)點(diǎn)少孝,但是不知道為啥作者的注釋當(dāng)中寫(xiě)的是2

// 向指定節(jié)點(diǎn)發(fā)送一條 MEET 熬苍、 PING 或者 PONG 消息
void clusterSendPing(clusterLink *link, int type) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    int gossipcount = 0, totlen;

    // freshnodes 是用于發(fā)送 gossip 信息的計(jì)數(shù)器
    // 每次發(fā)送一條信息時(shí)稍走,程序?qū)?freshnodes 的值減一
    // 當(dāng) freshnodes 的數(shù)值小于等于 0 時(shí),程序停止發(fā)送 gossip 信息
    // freshnodes 的數(shù)量是節(jié)點(diǎn)目前的 nodes 表中的節(jié)點(diǎn)數(shù)量減去 2 
    // 這里的 2 指兩個(gè)節(jié)點(diǎn)柴底,一個(gè)是 myself 節(jié)點(diǎn)(也即是發(fā)送信息的這個(gè)節(jié)點(diǎn))
    // 另一個(gè)是接受 gossip 信息的節(jié)點(diǎn)
    int freshnodes = dictSize(server.cluster->nodes)-2;

    // 如果發(fā)送的信息是 PING 婿脸,那么更新最后一次發(fā)送 PING 命令的時(shí)間戳
    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();

    // 將當(dāng)前節(jié)點(diǎn)的信息(比如名字、地址柄驻、端口號(hào)狐树、負(fù)責(zé)處理的槽)記錄到消息里面
    clusterBuildMessageHdr(hdr,type);

    // 從當(dāng)前節(jié)點(diǎn)已知的節(jié)點(diǎn)中隨機(jī)選出兩個(gè)節(jié)點(diǎn)
    // 并通過(guò)這條消息捎帶給目標(biāo)節(jié)點(diǎn),從而實(shí)現(xiàn) gossip 協(xié)議

    // 每個(gè)節(jié)點(diǎn)有 freshnodes 次發(fā)送 gossip 信息的機(jī)會(huì)
    // 每次向目標(biāo)節(jié)點(diǎn)發(fā)送 2 個(gè)被選中節(jié)點(diǎn)的 gossip 信息(gossipcount 計(jì)數(shù))
    while(freshnodes > 0 && gossipcount < 3) {
        // 從 nodes 字典中隨機(jī)選出一個(gè)節(jié)點(diǎn)(被選中節(jié)點(diǎn))
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        clusterMsgDataGossip *gossip;
        int j;

        /* 
         * 以下節(jié)點(diǎn)不能作為被選中節(jié)點(diǎn):
         * 1)節(jié)點(diǎn)本身鸿脓。
         * 2) 處于 HANDSHAKE 狀態(tài)的節(jié)點(diǎn)抑钟。
         * 3) 帶有 NOADDR 標(biāo)識(shí)的節(jié)點(diǎn)
         * 4) 因?yàn)椴惶幚砣魏尾鄱粩嚅_(kāi)連接的節(jié)點(diǎn) 
         */
        if (this == myself ||
            this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
                freshnodes--; /* otherwise we may loop forever. */
                continue;
        }

        // 檢查被選中節(jié)點(diǎn)是否已經(jīng)在 hdr->data.ping.gossip 數(shù)組里面
        // 如果是的話說(shuō)明這個(gè)節(jié)點(diǎn)之前已經(jīng)被選中了
        // 不要再選中它(否則就會(huì)出現(xiàn)重復(fù))
        for (j = 0; j < gossipcount; j++) {
            if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
                    REDIS_CLUSTER_NAMELEN) == 0) break;
        }
        if (j != gossipcount) continue;

        // 這個(gè)被選中節(jié)點(diǎn)有效涯曲,計(jì)數(shù)器減一
        freshnodes--;

        // 指向 gossip 信息結(jié)構(gòu)
        gossip = &(hdr->data.ping.gossip[gossipcount]);

        // 將被選中節(jié)點(diǎn)的名字記錄到 gossip 信息
        memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
        // 將被選中節(jié)點(diǎn)的 PING 命令發(fā)送時(shí)間戳記錄到 gossip 信息
        gossip->ping_sent = htonl(this->ping_sent);
        // 將被選中節(jié)點(diǎn)的 PING 命令回復(fù)的時(shí)間戳記錄到 gossip 信息
        gossip->pong_received = htonl(this->pong_received);
        // 將被選中節(jié)點(diǎn)的 IP 記錄到 gossip 信息
        memcpy(gossip->ip,this->ip,sizeof(this->ip));
        // 將被選中節(jié)點(diǎn)的端口號(hào)記錄到 gossip 信息
        gossip->port = htons(this->port);
        // 將被選中節(jié)點(diǎn)的標(biāo)識(shí)值記錄到 gossip 信息
        gossip->flags = htons(this->flags);

        // 這個(gè)被選中節(jié)點(diǎn)有效,計(jì)數(shù)器增一
        gossipcount++;
    }

    // 計(jì)算信息長(zhǎng)度
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
    // 將被選中節(jié)點(diǎn)的數(shù)量(gossip 信息中包含了多少個(gè)節(jié)點(diǎn)的信息)
    // 記錄在 count 屬性里面
    hdr->count = htons(gossipcount);
    // 將信息的長(zhǎng)度記錄到信息里面
    hdr->totlen = htonl(totlen);

    // 發(fā)送信息
    clusterSendMessage(link,buf,totlen);
}


cluster集群發(fā)現(xiàn)過(guò)程-server端

?在redis 的server端主要做兩個(gè)事情:

  • 通過(guò)clusterAcceptHandler接受連接的socket并建立socket對(duì)應(yīng)的ClusterLink便于接收ping或者meet報(bào)文時(shí)候可以回應(yīng)報(bào)文在塔。
  • 注冊(cè)fd的讀事件到處理函數(shù)clusterReadHandler當(dāng)中幻件。
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd;
    int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
    char cip[REDIS_IP_STR_LEN];
    clusterLink *link;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    /* If the server is starting up, don't accept cluster connections:
     * UPDATE messages may interact with the database content. */
    if (server.masterhost == NULL && server.loading) return;

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_VERBOSE,
                    "Accepting cluster node: %s", server.neterr);
            return;
        }
        anetNonBlock(NULL,cfd);
        anetEnableTcpNoDelay(NULL,cfd);

       
        redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
        link = createClusterLink(NULL);
        link->fd = cfd;
        aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
    }
}



?clusterReadHandler的內(nèi)部主要讀取報(bào)文的內(nèi)容然后交由clusterProcessPacket去處理報(bào)文。

// 讀事件處理器
// 首先讀入內(nèi)容的頭蛔溃,以判斷讀入內(nèi)容的長(zhǎng)度
// 如果內(nèi)容是一個(gè) whole packet 绰沥,那么調(diào)用函數(shù)來(lái)處理這個(gè) packet 。
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[sizeof(clusterMsg)];
    ssize_t nread;
    clusterMsg *hdr;
    clusterLink *link = (clusterLink*) privdata;
    int readlen, rcvbuflen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    // 盡可能地多讀數(shù)據(jù)
    while(1) { /* Read as long as there is data to read. */

        // 檢查輸入緩沖區(qū)的長(zhǎng)度
        rcvbuflen = sdslen(link->rcvbuf);
        // 頭信息(8 字節(jié))未讀入完
        if (rcvbuflen < 8) {
            /* First, obtain the first 8 bytes to get the full message
             * length. */
            readlen = 8 - rcvbuflen;
        // 已讀入完整的信息
        } else {
            /* Finally read the full message. */
            hdr = (clusterMsg*) link->rcvbuf;
            if (rcvbuflen == 8) {
                if (memcmp(hdr->sig,"RCmb",4) != 0 ||
                    ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
                {
                    redisLog(REDIS_WARNING,
                        "Bad message length or signature received "
                        "from Cluster bus.");
                    handleLinkIOError(link);
                    return;
                }
            }
            // 記錄已讀入內(nèi)容長(zhǎng)度
            readlen = ntohl(hdr->totlen) - rcvbuflen;
            if (readlen > sizeof(buf)) readlen = sizeof(buf);
        }

        // 讀入內(nèi)容
        nread = read(fd,buf,readlen);

        // 沒(méi)有內(nèi)容可讀
        if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */

        // 處理讀入錯(cuò)誤
        if (nread <= 0) {
            /* I/O error... */
            redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
                (nread == 0) ? "connection closed" : strerror(errno));
            handleLinkIOError(link);
            return;
        } else {
            /* Read data and recast the pointer to the new buffer. */
            // 將讀入的內(nèi)容追加進(jìn)輸入緩沖區(qū)里面
            link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
            hdr = (clusterMsg*) link->rcvbuf;
            rcvbuflen += nread;
        }

        /* Total length obtained? Process this packet. */
        // 檢查已讀入內(nèi)容的長(zhǎng)度贺待,看是否整條信息已經(jīng)被讀入了
        if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
            // 如果是的話徽曲,執(zhí)行處理信息的函數(shù)
            if (clusterProcessPacket(link)) {
                sdsfree(link->rcvbuf);
                link->rcvbuf = sdsempty();
            } else {
                return; /* Link no longer valid. */
            }
        }
    }
}



?clusterProcessPacket內(nèi)部針對(duì)第一次發(fā)起連接的節(jié)點(diǎn)主要做了下面事情:

  • 針對(duì)發(fā)起連接的redis 節(jié)點(diǎn)發(fā)起createClusterNode操作并添加到待連接的節(jié)點(diǎn)當(dāng)中。
  • 針對(duì)攜帶的gossip信息麸塞,會(huì)解析里面的信息找到攜帶的redis 節(jié)點(diǎn)然后添加到待連接的節(jié)點(diǎn)當(dāng)中秃臣。
  • 發(fā)送響應(yīng)報(bào)文,內(nèi)部攜帶這個(gè)節(jié)點(diǎn)的關(guān)鍵信息即節(jié)點(diǎn)名稱便于發(fā)起端redis節(jié)點(diǎn)維持正確的節(jié)點(diǎn)名和連接映射關(guān)系
  • 整個(gè)內(nèi)部處理過(guò)程中處理的所有情況喘垂,這里只針對(duì)建立連接的過(guò)程做了簡(jiǎn)單分析甜刻。
/*
 * 當(dāng)這個(gè)函數(shù)被調(diào)用時(shí)绍撞,說(shuō)明 node->rcvbuf 中有一條待處理的信息正勒。
 * 信息處理完畢之后的釋放工作由調(diào)用者處理,所以這個(gè)函數(shù)只需負(fù)責(zé)處理信息就可以了傻铣。
 *
 * 如果函數(shù)返回 1 章贞,那么說(shuō)明處理信息時(shí)沒(méi)有遇到問(wèn)題,連接依然可用非洲。
 * 如果函數(shù)返回 0 鸭限,那么說(shuō)明信息處理時(shí)遇到了不一致問(wèn)題
 * (比如接收到的 PONG 是發(fā)送自不正確的發(fā)送者 ID 的),連接已經(jīng)被釋放两踏。
 */
int clusterProcessPacket(clusterLink *link) {

    // 指向消息頭
    clusterMsg *hdr = (clusterMsg*) link->rcvbuf;

    // 消息的長(zhǎng)度
    uint32_t totlen = ntohl(hdr->totlen);

    // 消息的類型
    uint16_t type = ntohs(hdr->type);

    // 消息發(fā)送者的標(biāo)識(shí)
    uint16_t flags = ntohs(hdr->flags);

    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;

    clusterNode *sender;

    // 更新接受消息計(jì)數(shù)器
    server.cluster->stats_bus_messages_received++;

    redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
        type, (unsigned long) totlen);

    // 合法性檢查
    if (totlen < 16) return 1; 
    if (ntohs(hdr->ver) != 0) return 1;  
    if (totlen > sdslen(link->rcvbuf)) return 1;
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        uint16_t count = ntohs(hdr->count);
        uint32_t explen; /* expected length of this packet */

        explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
        explen += (sizeof(clusterMsgDataGossip)*count);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataFail);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataPublish) +
                ntohl(hdr->data.publish.msg.channel_len) +
                ntohl(hdr->data.publish.msg.message_len);
        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
               type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
               type == CLUSTERMSG_TYPE_MFSTART)
    {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        if (totlen != explen) return 1;
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

        explen += sizeof(clusterMsgDataUpdate);
        if (totlen != explen) return 1;
    }

    // 查找發(fā)送者節(jié)點(diǎn)
    sender = clusterLookupNode(hdr->sender);
    // 節(jié)點(diǎn)存在败京,并且不是 HANDSHAKE 節(jié)點(diǎn)
    // 那么個(gè)更新節(jié)點(diǎn)的配置紀(jì)元信息
    if (sender && !nodeInHandshake(sender)) {
        /* Update our curretEpoch if we see a newer epoch in the cluster. */
        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
        senderConfigEpoch = ntohu64(hdr->configEpoch);
        if (senderCurrentEpoch > server.cluster->currentEpoch)
            server.cluster->currentEpoch = senderCurrentEpoch;
        /* Update the sender configEpoch if it is publishing a newer one. */
        if (senderConfigEpoch > sender->configEpoch) {
            sender->configEpoch = senderConfigEpoch;
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_FSYNC_CONFIG);
        }
       
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();

        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            redisLog(REDIS_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }

    // 根據(jù)消息的類型,處理節(jié)點(diǎn)

    // 這是一條 PING 消息或者 MEET 消息
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
        redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);

        /* 
         * 如果當(dāng)前節(jié)點(diǎn)是第一次遇見(jiàn)這個(gè)節(jié)點(diǎn)梦染,并且對(duì)方發(fā)來(lái)的是 MEET 信息赡麦,
         * 那么將這個(gè)節(jié)點(diǎn)添加到集群的節(jié)點(diǎn)列表里面。
         *
         * 節(jié)點(diǎn)目前的 flag 帕识、 slaveof 等屬性的值都是未設(shè)置的泛粹,
         * 等當(dāng)前節(jié)點(diǎn)向?qū)Ψ桨l(fā)送 PING 命令之后,
         * 這些信息可以從對(duì)方回復(fù)的 PONG 信息中取得肮疗。
         */
        if (!sender && type == CLUSTERMSG_TYPE_MEET) {
            clusterNode *node;

            // 創(chuàng)建 HANDSHAKE 狀態(tài)的新節(jié)點(diǎn)
            node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);

            // 設(shè)置 IP 和端口
            nodeIp2String(node->ip,link);
            node->port = ntohs(hdr->port);

            // 將新節(jié)點(diǎn)添加到集群
            clusterAddNode(node);

            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
        }

        // 分析并取出消息中的 gossip 節(jié)點(diǎn)信息
        clusterProcessGossipSection(hdr,link);

        // 向目標(biāo)節(jié)點(diǎn)返回一個(gè) PONG
        clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
    }

    // 這是一條 PING 晶姊、 PONG 或者 MEET 消息
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        redisLog(REDIS_DEBUG,"%s packet received: %p",
            type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
            (void*)link->node);

        // 連接的 clusterNode 結(jié)構(gòu)存在
        if (link->node) {
            // 節(jié)點(diǎn)處于 HANDSHAKE 狀態(tài)
            if (nodeInHandshake(link->node)) {
               
                if (sender) {
                    redisLog(REDIS_VERBOSE,
                        "Handshake: we already know node %.40s, "
                        "updating the address if needed.", sender->name);
                    // 如果有需要的話,更新節(jié)點(diǎn)的地址
                    if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
                    {
                        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                             CLUSTER_TODO_UPDATE_STATE);
                    }

                    // 釋放節(jié)點(diǎn)
                    freeClusterNode(link->node);
                    return 0;
                }

                // 用節(jié)點(diǎn)的真名替換在 HANDSHAKE 時(shí)創(chuàng)建的隨機(jī)名字
                clusterRenameNode(link->node, hdr->sender);
                redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
                    link->node->name);

                // 關(guān)閉 HANDSHAKE 狀態(tài)
                link->node->flags &= ~REDIS_NODE_HANDSHAKE;

                // 設(shè)置節(jié)點(diǎn)的角色
                link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);

                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);

            // 節(jié)點(diǎn)已存在伪货,但它的 id 和當(dāng)前節(jié)點(diǎn)保存的 id 不同
            } else if (memcmp(link->node->name,hdr->sender,
                        REDIS_CLUSTER_NAMELEN) != 0)
            {
                // 那么將這個(gè)節(jié)點(diǎn)設(shè)為 NOADDR 
                // 并斷開(kāi)連接
                redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
                link->node->flags |= REDIS_NODE_NOADDR;
                link->node->ip[0] = '\0';
                link->node->port = 0;

                // 斷開(kāi)連接
                freeClusterLink(link);

                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                return 0;
            }
        }

        // 如果發(fā)送的消息為 PING 
        // 并且發(fā)送者不在 HANDSHAKE 狀態(tài)
        // 那么更新發(fā)送者的信息
        if (sender && type == CLUSTERMSG_TYPE_PING &&
            !nodeInHandshake(sender) &&
            nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
        {
            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                 CLUSTER_TODO_UPDATE_STATE);
        }

        /* Update our info about the node */
        // 如果這是一條 PONG 消息们衙,那么更新我們關(guān)于 node 節(jié)點(diǎn)的認(rèn)識(shí)
        if (link->node && type == CLUSTERMSG_TYPE_PONG) {

            // 最后一次接到該節(jié)點(diǎn)的 PONG 的時(shí)間
            link->node->pong_received = mstime();

            // 清零最近一次等待 PING 命令的時(shí)間
            link->node->ping_sent = 0;

            /* The PFAIL condition can be reversed without external
             * help if it is momentary (that is, if it does not
             * turn into a FAIL state).
             *
             * 接到節(jié)點(diǎn)的 PONG 回復(fù)钾怔,我們可以移除節(jié)點(diǎn)的 PFAIL 狀態(tài)。
             *
             * The FAIL condition is also reversible under specific
             * conditions detected by clearNodeFailureIfNeeded(). 
             *
             * 如果節(jié)點(diǎn)的狀態(tài)為 FAIL 蒙挑,
             * 那么是否撤銷該狀態(tài)要根據(jù) clearNodeFailureIfNeeded() 函數(shù)來(lái)決定蒂教。
             */
            if (nodeTimedOut(link->node)) {
                // 撤銷 PFAIL
                link->node->flags &= ~REDIS_NODE_PFAIL;

                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            } else if (nodeFailed(link->node)) {
                // 看是否可以撤銷 FAIL
                clearNodeFailureIfNeeded(link->node);
            }
        }

        /* Check for role switch: slave -> master or master -> slave. */
        // 檢測(cè)節(jié)點(diǎn)的身份信息,并在需要時(shí)進(jìn)行更新
        if (sender) {

            // 發(fā)送消息的節(jié)點(diǎn)的 slaveof 為 REDIS_NODE_NULL_NAME
            // 那么 sender 就是一個(gè)主節(jié)點(diǎn)
            if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                // 設(shè)置 sender 為主節(jié)點(diǎn)
                clusterSetNodeAsMaster(sender);

            // sender 的 slaveof 不為空脆荷,那么這是一個(gè)從節(jié)點(diǎn)
            } else {

                /* Node is a slave. */
                // 取出 sender 的主節(jié)點(diǎn)
                clusterNode *master = clusterLookupNode(hdr->slaveof);

                // sender 由主節(jié)點(diǎn)變成了從節(jié)點(diǎn)凝垛,重新配置 sender
                if (nodeIsMaster(sender)) {
                    /* Master turned into a slave! Reconfigure the node. */

                    // 刪除所有由該節(jié)點(diǎn)負(fù)責(zé)的槽
                    clusterDelNodeSlots(sender);

                    // 更新標(biāo)識(shí)
                    sender->flags &= ~REDIS_NODE_MASTER;
                    sender->flags |= REDIS_NODE_SLAVE;

                    /* Remove the list of slaves from the node. */
                    // 移除 sender 的從節(jié)點(diǎn)名單
                    if (sender->numslaves) clusterNodeResetSlaves(sender);

                    /* Update config and state. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                         CLUSTER_TODO_UPDATE_STATE);
                }

                /* Master node changed for this slave? */

                // 檢查 sender 的主節(jié)點(diǎn)是否變更
                if (master && sender->slaveof != master) {
                    // 如果 sender 之前的主節(jié)點(diǎn)不是現(xiàn)在的主節(jié)點(diǎn)
                    // 那么在舊主節(jié)點(diǎn)的從節(jié)點(diǎn)列表中移除 sender
                    if (sender->slaveof)
                        clusterNodeRemoveSlave(sender->slaveof,sender);

                    // 并在新主節(jié)點(diǎn)的從節(jié)點(diǎn)列表中添加 sender
                    clusterNodeAddSlave(master,sender);

                    // 更新 sender 的主節(jié)點(diǎn)
                    sender->slaveof = master;

                    /* Update config. */
                    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
                }
            }
        }

        /* 
         * 更新當(dāng)前節(jié)點(diǎn)對(duì) sender 所處理槽的認(rèn)識(shí)。
         *
         * 這部分的更新 *必須* 在更新 sender 的主/從節(jié)點(diǎn)信息之后蜓谋,
         * 因?yàn)檫@里需要用到 REDIS_NODE_MASTER 標(biāo)識(shí)梦皮。
         */

        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1)
        * 如果 sender 是主節(jié)點(diǎn),并且 sender 的槽布局出現(xiàn)了變動(dòng)
        *  那么檢查當(dāng)前節(jié)點(diǎn)對(duì) sender 的槽布局設(shè)置桃焕,看是否需要進(jìn)行更新
        */
       
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) 
         *    檢測(cè)和條件 1 的相反條件剑肯,也即是,
         *    sender 處理的槽的配置紀(jì)元比當(dāng)前節(jié)點(diǎn)已知的某個(gè)節(jié)點(diǎn)的配置紀(jì)元要低观堂,
         *    如果是這樣的話让网,通知 sender 。
         *
         * 這種情況可能會(huì)出現(xiàn)在網(wǎng)絡(luò)分裂中师痕,
         * 一個(gè)重新上線的主節(jié)點(diǎn)可能會(huì)帶有已經(jīng)過(guò)時(shí)的槽布局溃睹。
         *
         * 比如說(shuō):
         *
         * A 負(fù)責(zé)槽 1 、 2 胰坟、 3 因篇,而 B 是 A 的從節(jié)點(diǎn)。
         *
         * A 從網(wǎng)絡(luò)中分裂出去笔横,B 被提升為主節(jié)點(diǎn)竞滓。
         *
         * B 從網(wǎng)絡(luò)中分裂出去, A 重新上線(但是它所使用的槽布局是舊的)吹缔。
         *
         * 在正常情況下商佑, B 應(yīng)該向 A 發(fā)送 PING 消息,告知 A 厢塘,自己(B)已經(jīng)接替了
         * 槽 1茶没、 2、 3 俗冻,并且?guī)в懈呐渲眉o(jì)元礁叔,但因?yàn)榫W(wǎng)絡(luò)分裂的緣故,
         * 節(jié)點(diǎn) B 沒(méi)辦法通知節(jié)點(diǎn) A 迄薄,
         * 所以通知節(jié)點(diǎn) A 它帶有的槽布局已經(jīng)更新的工作就交給其他知道 B 帶有更高配置紀(jì)元的節(jié)點(diǎn)來(lái)做琅关。
         * 當(dāng) A 接到其他節(jié)點(diǎn)關(guān)于節(jié)點(diǎn) B 的消息時(shí),
         * 節(jié)點(diǎn) A 就會(huì)停止自己的主節(jié)點(diǎn)工作,又或者重新進(jìn)行故障轉(zhuǎn)移涣易。
         */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {

                // 檢測(cè) slots 中的槽 j 是否已經(jīng)被指派
                if (bitmapTestBit(hdr->myslots,j)) {

                    // 當(dāng)前節(jié)點(diǎn)認(rèn)為槽 j 由 sender 負(fù)責(zé)處理画机,
                    // 或者當(dāng)前節(jié)點(diǎn)認(rèn)為該槽未指派,那么跳過(guò)該槽
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;

                    // 當(dāng)前節(jié)點(diǎn)槽 j 的配置紀(jì)元比 sender 的配置紀(jì)元要大
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);

                        // 向 sender 發(fā)送關(guān)于槽 j 的更新信息
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }

        if (sender &&
            nodeIsMaster(myself) && nodeIsMaster(sender) &&
            senderConfigEpoch == myself->configEpoch)
        {
            clusterHandleConfigEpochCollision(sender);
        }

        // 分析并提取出消息 gossip 協(xié)議部分的信息
        clusterProcessGossipSection(hdr,link);

    // 這是一條 FAIL 消息: sender 告知當(dāng)前節(jié)點(diǎn)新症,某個(gè)節(jié)點(diǎn)已經(jīng)進(jìn)入 FAIL 狀態(tài)步氏。
    } else if (type == CLUSTERMSG_TYPE_FAIL) {
        clusterNode *failing;

        if (sender) {

            // 獲取下線節(jié)點(diǎn)的消息
            failing = clusterLookupNode(hdr->data.fail.about.nodename);
            // 下線的節(jié)點(diǎn)既不是當(dāng)前節(jié)點(diǎn),也沒(méi)有處于 FAIL 狀態(tài)
            if (failing &&
                !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
            {
                redisLog(REDIS_NOTICE,
                    "FAIL message received from %.40s about %.40s",
                    hdr->sender, hdr->data.fail.about.nodename);

                // 打開(kāi) FAIL 狀態(tài)
                failing->flags |= REDIS_NODE_FAIL;
                failing->fail_time = mstime();
                // 關(guān)閉 PFAIL 狀態(tài)
                failing->flags &= ~REDIS_NODE_PFAIL;
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE);
            }
        } else {
            redisLog(REDIS_NOTICE,
                "Ignoring FAIL message from unknonw node %.40s about %.40s",
                hdr->sender, hdr->data.fail.about.nodename);
        }

    // 這是一條 PUBLISH 消息
    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
        robj *channel, *message;
        uint32_t channel_len, message_len;

        /* Don't bother creating useless objects if there are no
         * Pub/Sub subscribers. */
        // 只在有訂閱者時(shí)創(chuàng)建消息對(duì)象
        if (dictSize(server.pubsub_channels) ||
           listLength(server.pubsub_patterns))
        {
            // 頻道長(zhǎng)度
            channel_len = ntohl(hdr->data.publish.msg.channel_len);

            // 消息長(zhǎng)度
            message_len = ntohl(hdr->data.publish.msg.message_len);

            // 頻道
            channel = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data,channel_len);

            // 消息
            message = createStringObject(
                        (char*)hdr->data.publish.msg.bulk_data+channel_len,
                        message_len);
            // 發(fā)送消息
            pubsubPublishMessage(channel,message);

            decrRefCount(channel);
            decrRefCount(message);
        }

    // 這是一條請(qǐng)求獲得故障遷移授權(quán)的消息: sender 請(qǐng)求當(dāng)前節(jié)點(diǎn)為它進(jìn)行故障轉(zhuǎn)移投票
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        // 如果條件允許的話徒爹,向 sender 投票荚醒,支持它進(jìn)行故障轉(zhuǎn)移
        clusterSendFailoverAuthIfNeeded(sender,hdr);

    // 這是一條故障遷移投票信息: sender 支持當(dāng)前節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移操作
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */

        // 只有正在處理至少一個(gè)槽的主節(jié)點(diǎn)的投票會(huì)被視為是有效投票
        // 只有符合以下條件, sender 的投票才算有效:
        // 1) sender 是主節(jié)點(diǎn)
        // 2) sender 正在處理至少一個(gè)槽
        // 3) sender 的配置紀(jì)元大于等于當(dāng)前節(jié)點(diǎn)的配置紀(jì)元
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            // 增加支持票數(shù)
            server.cluster->failover_auth_count++;

            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }

    } else if (type == CLUSTERMSG_TYPE_MFSTART) {

        if (!sender || sender->slaveof != myself) return 1;

        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
        redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
            sender->name);
    } else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  

        // 獲取需要更新的節(jié)點(diǎn)
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */

        // 消息的紀(jì)元并不大于節(jié)點(diǎn) n 所處的配置紀(jì)元
        // 無(wú)須更新
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        // 如果節(jié)點(diǎn) n 為從節(jié)點(diǎn)隆嗅,但它的槽配置更新了
        // 那么說(shuō)明這個(gè)節(jié)點(diǎn)已經(jīng)變?yōu)橹鞴?jié)點(diǎn)界阁,將它設(shè)置為主節(jié)點(diǎn)
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        // 將消息中對(duì) n 的槽布局與當(dāng)前節(jié)點(diǎn)對(duì) n 的槽布局進(jìn)行對(duì)比
        // 在有需要時(shí)更新當(dāng)前節(jié)點(diǎn)對(duì) n 的槽布局的認(rèn)識(shí)
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } else {
        redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
    }
    return 1;
}



?clusterProcessGossipSection在建立連接的過(guò)程中主要是解析攜帶的gossip信息并添加到待連接節(jié)點(diǎn)當(dāng)中。

/* 
 * 解釋 MEET 胖喳、 PING 或 PONG 消息中和 gossip 協(xié)議有關(guān)的信息泡躯。
 *
 * 注意,這個(gè)函數(shù)假設(shè)調(diào)用者已經(jīng)根據(jù)消息的長(zhǎng)度丽焊,對(duì)消息進(jìn)行過(guò)合法性檢查较剃。
 */
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {

    // 記錄這條消息中包含了多少個(gè)節(jié)點(diǎn)的信息
    uint16_t count = ntohs(hdr->count);

    // 指向第一個(gè)節(jié)點(diǎn)的信息
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;

    // 取出發(fā)送者
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    // 遍歷所有節(jié)點(diǎn)的信息
    while(count--) {
        sds ci = sdsempty();

        // 分析節(jié)點(diǎn)的 flag
        uint16_t flags = ntohs(g->flags);

        // 信息節(jié)點(diǎn)
        clusterNode *node;

        // 取出節(jié)點(diǎn)的 flag
        if (flags == 0) ci = sdscat(ci,"noflags,");
        if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
        if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
        if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
        if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
        if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
        if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
        if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
        if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';

        redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
            g->nodename,
            g->ip,
            ntohs(g->port),
            ci);
        sdsfree(ci);

        // 使用消息中的信息對(duì)節(jié)點(diǎn)進(jìn)行更新
        node = clusterLookupNode(g->nodename);
        // 節(jié)點(diǎn)已經(jīng)存在于當(dāng)前節(jié)點(diǎn)
        if (node) {

            // 如果 sender 是一個(gè)主節(jié)點(diǎn),那么我們需要處理下線報(bào)告
            if (sender && nodeIsMaster(sender) && node != myself) {
                // 節(jié)點(diǎn)處于 FAIL 或者 PFAIL 狀態(tài)
                if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {

                    // 添加 sender 對(duì) node 的下線報(bào)告
                    if (clusterNodeAddFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }

                    // 嘗試將 node 標(biāo)記為 FAIL
                    markNodeAsFailingIfNeeded(node);

                // 節(jié)點(diǎn)處于正常狀態(tài)
                } else {

                    // 如果 sender 曾經(jīng)發(fā)送過(guò)對(duì) node 的下線報(bào)告
                    // 那么清除該報(bào)告
                    if (clusterNodeDelFailureReport(node,sender)) {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }

            // 如果節(jié)點(diǎn)之前處于 PFAIL 或者 FAIL 狀態(tài)
            // 并且該節(jié)點(diǎn)的 IP 或者端口號(hào)已經(jīng)發(fā)生變化
            // 那么可能是節(jié)點(diǎn)換了新地址技健,嘗試對(duì)它進(jìn)行握手
            if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
                (strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }

        // 當(dāng)前節(jié)點(diǎn)不認(rèn)識(shí) node
        } else {
            /*
             * 如果 node 不在 NOADDR 狀態(tài)写穴,并且當(dāng)前節(jié)點(diǎn)不認(rèn)識(shí) node 
             * 那么向 node 發(fā)送 HANDSHAKE 消息。
             *
             * 注意凫乖,當(dāng)前節(jié)點(diǎn)必須保證 sender 是本集群的節(jié)點(diǎn)确垫,
             * 否則我們將有加入了另一個(gè)集群的風(fēng)險(xiǎn)弓颈。
             */
            if (sender &&
                !(flags & REDIS_NODE_NOADDR) &&
                !clusterBlacklistExists(g->nodename))
            {
                clusterStartHandshake(g->ip,ntohs(g->port));
            }
        }

        /* Next node */
        // 處理下個(gè)節(jié)點(diǎn)的信息
        g++;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末帽芽,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子翔冀,更是在濱河造成了極大的恐慌导街,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件纤子,死亡現(xiàn)場(chǎng)離奇詭異搬瑰,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)控硼,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)泽论,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人卡乾,你說(shuō)我怎么就攤上這事翼悴。” “怎么了幔妨?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵鹦赎,是天一觀的道長(zhǎng)谍椅。 經(jīng)常有香客問(wèn)我,道長(zhǎng)古话,這世上最難降的妖魔是什么雏吭? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮陪踩,結(jié)果婚禮上杖们,老公的妹妹穿的比我還像新娘。我一直安慰自己肩狂,他們只是感情好胀莹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著婚温,像睡著了一般描焰。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上栅螟,一...
    開(kāi)封第一講書(shū)人閱讀 51,521評(píng)論 1 304
  • 那天荆秦,我揣著相機(jī)與錄音,去河邊找鬼力图。 笑死步绸,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的吃媒。 我是一名探鬼主播瓤介,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼赘那!你這毒婦竟也來(lái)了刑桑?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤募舟,失蹤者是張志新(化名)和其女友劉穎祠斧,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拱礁,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡琢锋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了呢灶。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片吴超。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖鸯乃,靈堂內(nèi)的尸體忽然破棺而出鲸阻,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布赘娄,位于F島的核電站仆潮,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏遣臼。R本人自食惡果不足惜性置,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望揍堰。 院中可真熱鬧鹏浅,春花似錦、人聲如沸屏歹。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蝙眶。三九已至季希,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間幽纷,已是汗流浹背式塌。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留友浸,地道東北人峰尝。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像收恢,于是被迫代替她去往敵國(guó)和親武学。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容