系列
redis數(shù)據(jù)淘汰原理
redis過(guò)期數(shù)據(jù)刪除策略
redis server事件模型
redis cluster mget 引發(fā)的討論
redis 3.x windows 集群搭建
redis 命令執(zhí)行過(guò)程
redis string底層數(shù)據(jù)結(jié)構(gòu)
redis list底層數(shù)據(jù)結(jié)構(gòu)
redis hash底層數(shù)據(jù)結(jié)構(gòu)
redis set底層數(shù)據(jù)結(jié)構(gòu)
redis zset底層數(shù)據(jù)結(jié)構(gòu)
redis 客戶端管理
redis 主從同步-slave端
redis 主從同步-master端
redis 主從超時(shí)檢測(cè)
redis aof持久化
redis rdb持久化
redis 數(shù)據(jù)恢復(fù)過(guò)程
redis TTL實(shí)現(xiàn)原理
redis cluster集群建立
redis cluster集群選主
cluster 相關(guān)數(shù)據(jù)結(jié)構(gòu)
?在redis cluster的概念當(dāng)中有一個(gè)槽(slot)的概念屎勘,也就是說(shuō)在redis的cluster中存在2**14=16384個(gè)槽分布在集群當(dāng)中黑界,所以在宏定義當(dāng)中REDIS_CLUSTER_SLOTS的值為16384.
// 槽數(shù)量
#define REDIS_CLUSTER_SLOTS 16384
// 集群在線
#define REDIS_CLUSTER_OK 0 /* Everything looks ok */
// 集群下線
#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */
// 節(jié)點(diǎn)名字的長(zhǎng)度
#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */
// 集群的實(shí)際端口號(hào) = 用戶指定的端口號(hào) + REDIS_CLUSTER_PORT_INCR
#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */
?redis保存集群節(jié)點(diǎn)相關(guān)的數(shù)據(jù)結(jié)構(gòu)說(shuō)明如下:
- 在redisServer的數(shù)據(jù)結(jié)構(gòu)當(dāng)中,我們通過(guò)clusterState的數(shù)據(jù)結(jié)構(gòu)來(lái)保存集群狀態(tài)。
- 在clusterState的數(shù)據(jù)結(jié)構(gòu)當(dāng)中浸赫,我們通過(guò)dict *nodes保存集群的節(jié)點(diǎn)次伶,其中key為節(jié)點(diǎn)名字急波,value為clusterNode對(duì)象春宣。
- clusterNode保存該節(jié)點(diǎn)集群信息昧辽,包括slave信息衙熔,slots信息,clusterLink的連接信息搅荞。
- clusterLink保存是通過(guò)cluster meet命令指定的集群節(jié)點(diǎn)红氯,從名字上可以理解為集群連接相關(guān)的信息框咙,這個(gè)連接是通過(guò)cluster meet指定的。
struct redisServer {
struct clusterState *cluster;
}
-----------------------------------華麗分割線--------------------------------------------
// 集群狀態(tài)脖隶,每個(gè)節(jié)點(diǎn)都保存著一個(gè)這樣的狀態(tài),記錄了它們眼中的集群的樣子暇检。
typedef struct clusterState {
// 指向當(dāng)前節(jié)點(diǎn)的指針
clusterNode *myself; /* This node */
// 集群當(dāng)前的配置紀(jì)元产阱,用于實(shí)現(xiàn)故障轉(zhuǎn)移
uint64_t currentEpoch;
// 集群當(dāng)前的狀態(tài):是在線還是下線
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
// 集群中至少處理著一個(gè)槽的節(jié)點(diǎn)的數(shù)量。
int size; /* Num of master nodes with at least one slot */
// 集群節(jié)點(diǎn)名單(包括 myself 節(jié)點(diǎn))
// 字典的鍵為節(jié)點(diǎn)的名字块仆,字典的值為 clusterNode 結(jié)構(gòu)
dict *nodes; /* Hash table of name -> clusterNode structures */
// 節(jié)點(diǎn)黑名單构蹬,用于 CLUSTER FORGET 命令
// 防止被 FORGET 的命令重新被添加到集群里面
// (不過(guò)現(xiàn)在似乎沒(méi)有在使用的樣子,已廢棄悔据?還是尚未實(shí)現(xiàn)庄敛?)
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
// 記錄要從當(dāng)前節(jié)點(diǎn)遷移到目標(biāo)節(jié)點(diǎn)的槽,以及遷移的目標(biāo)節(jié)點(diǎn)
// migrating_slots_to[i] = NULL 表示槽 i 未被遷移
// migrating_slots_to[i] = clusterNode_A 表示槽 i 要從本節(jié)點(diǎn)遷移至節(jié)點(diǎn) A
clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS];
// 記錄要從源節(jié)點(diǎn)遷移到本節(jié)點(diǎn)的槽科汗,以及進(jìn)行遷移的源節(jié)點(diǎn)
// importing_slots_from[i] = NULL 表示槽 i 未進(jìn)行導(dǎo)入
// importing_slots_from[i] = clusterNode_A 表示正從節(jié)點(diǎn) A 中導(dǎo)入槽 i
clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS];
// 負(fù)責(zé)處理各個(gè)槽的節(jié)點(diǎn)
// 例如 slots[i] = clusterNode_A 表示槽 i 由節(jié)點(diǎn) A 處理
clusterNode *slots[REDIS_CLUSTER_SLOTS];
// 跳躍表藻烤,表中以槽作為分值,鍵作為成員头滔,對(duì)槽進(jìn)行有序排序
// 當(dāng)需要對(duì)某些槽進(jìn)行區(qū)間(range)操作時(shí)怖亭,這個(gè)跳躍表可以提供方便
// 具體操作定義在 db.c 里面
zskiplist *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
// 以下這些域被用于進(jìn)行故障轉(zhuǎn)移選舉
// 上次執(zhí)行選舉或者下次執(zhí)行選舉的時(shí)間
mstime_t failover_auth_time; /* Time of previous or next election. */
// 節(jié)點(diǎn)獲得的投票數(shù)量
int failover_auth_count; /* Number of votes received so far. */
// 如果值為 1 ,表示本節(jié)點(diǎn)已經(jīng)向其他節(jié)點(diǎn)發(fā)送了投票請(qǐng)求
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
/* Manual failover state in common. */
/* 共用的手動(dòng)故障轉(zhuǎn)移狀態(tài) */
// 手動(dòng)故障轉(zhuǎn)移執(zhí)行的時(shí)間限制
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
/* 主服務(wù)器的手動(dòng)故障轉(zhuǎn)移狀態(tài) */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
/* 從服務(wù)器的手動(dòng)故障轉(zhuǎn)移狀態(tài) */
long long mf_master_offset; /* Master offset the slave needs to start MF
or zero if stil not received. */
// 指示手動(dòng)故障轉(zhuǎn)移是否可以開(kāi)始的標(biāo)志值
// 值為非 0 時(shí)表示各個(gè)主服務(wù)器可以開(kāi)始投票
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The followign fields are uesd by masters to take state on elections. */
/* 以下這些域由主服務(wù)器使用坤检,用于記錄選舉時(shí)的狀態(tài) */
// 集群最后一次進(jìn)行投票的紀(jì)元
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
// 在進(jìn)入下個(gè)事件循環(huán)之前要做的事情兴猩,以各個(gè) flag 來(lái)記錄
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
// 通過(guò) cluster 連接發(fā)送的消息數(shù)量
long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */
// 通過(guò) cluster 接收到的消息數(shù)量
long long stats_bus_messages_received; /* Num of msg rcvd via cluster bus.*/
} clusterState;
-----------------------------------華麗分割線--------------------------------------------
// 節(jié)點(diǎn)狀態(tài)
struct clusterNode {
// 創(chuàng)建節(jié)點(diǎn)的時(shí)間
mstime_t ctime; /* Node object creation time. */
// 節(jié)點(diǎn)的名字,由 40 個(gè)十六進(jìn)制字符組成
// 例如 68eef66df23420a5862208ef5b1a7005b806f2ff
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
// 節(jié)點(diǎn)標(biāo)識(shí)
// 使用各種不同的標(biāo)識(shí)值記錄節(jié)點(diǎn)的角色(比如主節(jié)點(diǎn)或者從節(jié)點(diǎn))早歇,
// 以及節(jié)點(diǎn)目前所處的狀態(tài)(比如在線或者下線)倾芝。
int flags; /* REDIS_NODE_... */
// 節(jié)點(diǎn)當(dāng)前的配置紀(jì)元,用于實(shí)現(xiàn)故障轉(zhuǎn)移
uint64_t configEpoch; /* Last configEpoch observed for this node */
// 由這個(gè)節(jié)點(diǎn)負(fù)責(zé)處理的槽
// 一共有 REDIS_CLUSTER_SLOTS / 8 個(gè)字節(jié)長(zhǎng)
// 每個(gè)字節(jié)的每個(gè)位記錄了一個(gè)槽的保存狀態(tài)
// 位的值為 1 表示槽正由本節(jié)點(diǎn)處理箭跳,值為 0 則表示槽并非本節(jié)點(diǎn)處理
// 比如 slots[0] 的第一個(gè)位保存了槽 0 的保存情況
// slots[0] 的第二個(gè)位保存了槽 1 的保存情況晨另,以此類推
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
// 該節(jié)點(diǎn)負(fù)責(zé)處理的槽數(shù)量
int numslots; /* Number of slots handled by this node */
// 如果本節(jié)點(diǎn)是主節(jié)點(diǎn),那么用這個(gè)屬性記錄從節(jié)點(diǎn)的數(shù)量
int numslaves; /* Number of slave nodes, if this is a master */
// 指針數(shù)組谱姓,指向各個(gè)從節(jié)點(diǎn)
struct clusterNode **slaves; /* pointers to slave nodes */
// 如果這是一個(gè)從節(jié)點(diǎn)拯刁,那么指向主節(jié)點(diǎn)
struct clusterNode *slaveof; /* pointer to the master node */
// 最后一次發(fā)送 PING 命令的時(shí)間
mstime_t ping_sent; /* Unix time we sent latest ping */
// 最后一次接收 PONG 回復(fù)的時(shí)間戳
mstime_t pong_received; /* Unix time we received the pong */
// 最后一次被設(shè)置為 FAIL 狀態(tài)的時(shí)間
mstime_t fail_time; /* Unix time when FAIL flag was set */
// 最后一次給某個(gè)從節(jié)點(diǎn)投票的時(shí)間
mstime_t voted_time; /* Last time we voted for a slave of this master */
// 最后一次從這個(gè)節(jié)點(diǎn)接收到復(fù)制偏移量的時(shí)間
mstime_t repl_offset_time; /* Unix time we received offset for this node */
// 這個(gè)節(jié)點(diǎn)的復(fù)制偏移量
long long repl_offset; /* Last known repl offset for this node. */
// 節(jié)點(diǎn)的 IP 地址
char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */
// 節(jié)點(diǎn)的端口號(hào)
int port; /* Latest known port of this node */
// 保存連接節(jié)點(diǎn)所需的有關(guān)信息
clusterLink *link; /* TCP/IP link with this node */
// 一個(gè)鏈表,記錄了所有其他節(jié)點(diǎn)對(duì)該節(jié)點(diǎn)的下線報(bào)告
list *fail_reports; /* List of nodes signaling this as failing */
}
-----------------------------------華麗分割線-------------------------------------------
// clusterLink 包含了與其他節(jié)點(diǎn)進(jìn)行通訊所需的全部信息
typedef struct clusterLink {
// 連接的創(chuàng)建時(shí)間
mstime_t ctime; /* Link creation time */
// TCP 套接字描述符
int fd; /* TCP socket file descriptor */
// 輸出緩沖區(qū)逝段,保存著等待發(fā)送給其他節(jié)點(diǎn)的消息(message)垛玻。
sds sndbuf; /* Packet send buffer */
// 輸入緩沖區(qū),保存著從其他節(jié)點(diǎn)接收到的消息奶躯。
sds rcvbuf; /* Packet reception buffer */
// 與這個(gè)連接相關(guān)聯(lián)的節(jié)點(diǎn)帚桩,如果沒(méi)有的話就為 NULL
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
cluster啟動(dòng)過(guò)程
?redis cluster啟動(dòng)過(guò)程中主要完成了一下幾件事情:
- 創(chuàng)建server.cluster的信息(是個(gè)clusterState對(duì)象),變量值都為空嘹黔。
- 如果有server.cluster_configfile就直接初始化clusterNode對(duì)象账嚎,如果沒(méi)有就創(chuàng)建clusterNode對(duì)象并保存至server.cluster_configfile對(duì)象莫瞬,server本身的clusterNode對(duì)象保存至server.cluster->nodes和server.cluster->myself當(dāng)中。
- 監(jiān)聽(tīng)cluster相關(guān)端口:cluster的監(jiān)聽(tīng)端口=redis 監(jiān)聽(tīng)端口+10000
- 綁定cluster端口的讀事件到clusterAcceptHandler用于處理cluster的連接請(qǐng)求
- clusterAcceptHandler內(nèi)部綁定accept的socket的讀事件到clusterReadHandler
- clusterReadHandler內(nèi)部讀取數(shù)據(jù)后交由clusterProcessPacket繼續(xù)后續(xù)處理
// 初始化集群
void clusterInit(void) {
int saveconf = 0;
// 初始化配置
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = REDIS_CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
if (clusterLockConfig(server.cluster_configfile) == REDIS_ERR)
exit(1);
if (clusterLoadConfig(server.cluster_configfile) == REDIS_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
myself = server.cluster->myself =
createClusterNode(NULL,REDIS_NODE_MYSELF|REDIS_NODE_MASTER);
redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
// 保存 nodes.conf 文件
if (saveconf) clusterSaveConfigOrDie(1);
/* We need a listening TCP port for our cluster messaging needs. */
// 監(jiān)聽(tīng) TCP 端口
server.cfd_count = 0;
/* Port sanity check II
* The other handshake port check is triggered too late to stop
* us from trying to use a too-high cluster port number. */
if (server.port > (65535-REDIS_CLUSTER_PORT_INCR)) {
redisLog(REDIS_WARNING, "Redis port number too high. "
"Cluster communication port is 10,000 port "
"numbers higher than your Redis port. "
"Your Redis port number must be "
"lower than 55535.");
exit(1);
}
if (listenToPort(server.port+REDIS_CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == REDIS_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
// 關(guān)聯(lián)監(jiān)聽(tīng)事件處理器
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
redisPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
/* The slots -> keys map is a sorted set. Init it. */
// slots -> keys 映射是一個(gè)有序集合
server.cluster->slots_to_keys = zslCreate();
resetManualFailover();
}
?redis cluster之間監(jiān)聽(tīng)端口讀事件對(duì)應(yīng)的處理函數(shù)clusterAcceptHandler郭蕉,負(fù)責(zé)處理集群連接事件的回調(diào)函數(shù)疼邀。
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
clusterLink *link;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
/* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */
if (server.masterhost == NULL && server.loading) return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_VERBOSE,
"Accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
?redis cluster之間處理socket通信報(bào)文的回調(diào)函數(shù)
// 讀事件處理器
// 首先讀入內(nèi)容的頭,以判斷讀入內(nèi)容的長(zhǎng)度
// 如果內(nèi)容是一個(gè) whole packet 召锈,那么調(diào)用函數(shù)來(lái)處理這個(gè) packet 旁振。
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[sizeof(clusterMsg)];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = (clusterLink*) privdata;
int readlen, rcvbuflen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
// 盡可能地多讀數(shù)據(jù)
while(1) { /* Read as long as there is data to read. */
// 檢查輸入緩沖區(qū)的長(zhǎng)度
rcvbuflen = sdslen(link->rcvbuf);
// 頭信息(8 字節(jié))未讀入完
if (rcvbuflen < 8) {
/* First, obtain the first 8 bytes to get the full message
* length. */
readlen = 8 - rcvbuflen;
// 已讀入完整的信息
} else {
/* Finally read the full message. */
hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 8) {
/* Perform some sanity check on the message signature
* and length. */
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
{
redisLog(REDIS_WARNING,
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
return;
}
}
// 記錄已讀入內(nèi)容長(zhǎng)度
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}
// 讀入內(nèi)容
nread = read(fd,buf,readlen);
// 沒(méi)有內(nèi)容可讀
if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
// 處理讀入錯(cuò)誤
if (nread <= 0) {
/* I/O error... */
redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
return;
} else {
/* Read data and recast the pointer to the new buffer. */
// 將讀入的內(nèi)容追加進(jìn)輸入緩沖區(qū)里面
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread;
}
/* Total length obtained? Process this packet. */
// 檢查已讀入內(nèi)容的長(zhǎng)度,看是否整條信息已經(jīng)被讀入了
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
// 如果是的話涨岁,執(zhí)行處理信息的函數(shù)
if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
cluster加入集群過(guò)程
?redis cluster節(jié)點(diǎn)之間的網(wǎng)絡(luò)連接圖如下圖所示(假設(shè)總共有N個(gè)節(jié)點(diǎn))拐袜,主要特點(diǎn)是:
- 每個(gè)節(jié)點(diǎn)都與剩余的N-1個(gè)節(jié)點(diǎn)建立連接。
- 任意兩個(gè)節(jié)點(diǎn)之間有兩個(gè)網(wǎng)絡(luò)連接梢薪,注意是兩個(gè)網(wǎng)絡(luò)連接蹬铺。
?redis cluster集群節(jié)點(diǎn)之間建立連接過(guò)程如下,假設(shè)現(xiàn)在有ABCDE公5個(gè)幾點(diǎn):
- 在節(jié)點(diǎn)A依次執(zhí)行 cluster meet B秉撇,cluster meet C甜攀,cluster meet D,cluster meet E琐馆。
- AB赴邻,AC,AD啡捶,AE之間建立了連接
- 節(jié)點(diǎn)BCDE在接收到A的連接請(qǐng)求后會(huì)獲取節(jié)點(diǎn)A的信息姥敛,反向在BA,CA瞎暑,DA彤敛,EA之間建立連接
- 節(jié)點(diǎn)AB通信過(guò)程中,節(jié)點(diǎn)A會(huì)攜帶CDE的信息給節(jié)點(diǎn)B了赌,那么在BC墨榄, BD,BE之間建立連接
- 節(jié)點(diǎn)AC通信過(guò)程中勿她,節(jié)點(diǎn)A會(huì)攜帶BDE的信息給節(jié)點(diǎn)C袄秩,那么在CB,CD逢并,CE之間建立連接
- 節(jié)點(diǎn)AD通信過(guò)程中之剧,節(jié)點(diǎn)A會(huì)攜帶BCE的信息給節(jié)點(diǎn)D,那么在DB砍聊,DC背稼,DE之間建立連接
- 節(jié)點(diǎn)AE通信過(guò)程中,節(jié)點(diǎn)A會(huì)攜帶BCD的信息給節(jié)點(diǎn)E玻蝌,那么在EB蟹肘,EC词疼,ED之間建立連接
至此節(jié)點(diǎn)ABCDE之間就建立了相互之間的連接了。
這里想說(shuō)明的是兩個(gè)節(jié)點(diǎn)之間通信的時(shí)候帘腹,發(fā)起者會(huì)攜帶它自身知道的其他節(jié)點(diǎn)給對(duì)方節(jié)點(diǎn)贰盗,通過(guò)這種方式實(shí)現(xiàn)整體網(wǎng)絡(luò)的建立
cluster加入集群源碼
?cluster meet命令被用來(lái)連接不同的開(kāi)啟集群支持的 Redis 節(jié)點(diǎn),以進(jìn)入工作集群阳欲。
- 系統(tǒng)管理員發(fā)送一個(gè)cluster meet命令強(qiáng)制一個(gè)節(jié)點(diǎn)去會(huì)面另一個(gè)節(jié)點(diǎn)舵盈。
- 一個(gè)已知的節(jié)點(diǎn)發(fā)送一個(gè)保存在 gossip 部分的節(jié)點(diǎn)列表,包含著未知的節(jié)點(diǎn)胸完。如果接收的節(jié)點(diǎn)已經(jīng)將發(fā)送節(jié)點(diǎn)信任為已知節(jié)點(diǎn)书释,它會(huì)處理 gossip 部分并且發(fā)送一個(gè)握手消息給未知的節(jié)點(diǎn)翘贮。
clusterCommand方法內(nèi)部有處理各種命令的邏輯赊窥,這里我只關(guān)注了處理meet命令的邏輯。
?clusterCommand內(nèi)部通過(guò)clusterStartHandshake執(zhí)行cluster meet當(dāng)中指定的地址去執(zhí)行握手協(xié)議狸页。
{"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
// CLUSTER 命令的實(shí)現(xiàn)
void clusterCommand(redisClient *c) {
// 不能在非集群模式下使用該命令
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
/* CLUSTER MEET <ip> <port> */
// 將給定地址的節(jié)點(diǎn)添加到當(dāng)前節(jié)點(diǎn)所處的集群里面
long long port;
// 檢查 port 參數(shù)的合法性
if (getLongLongFromObject(c->argv[3], &port) != REDIS_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s",
(char*)c->argv[3]->ptr);
return;
}
// 嘗試與給定地址的節(jié)點(diǎn)進(jìn)行連接
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 &&
errno == EINVAL)
{
// 連接失敗
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
// 連接成功
addReply(c,shared.ok);
}
}
}
?redis cluster在執(zhí)行握手的函數(shù)clusterStartHandshake當(dāng)中锨能,生成待連接的clusterNode對(duì)象并保存待連接的ip:port地址,并未真正執(zhí)行連接而只是生成了待連接的對(duì)象芍耘,真正的連接是在serverCron當(dāng)中完成的址遇。
/*
* 如果還沒(méi)有與指定的地址進(jìn)行過(guò)握手,那么進(jìn)行握手斋竞。
* 返回 1 表示握手已經(jīng)開(kāi)始倔约,
* 返回 0 并將 errno 設(shè)置為以下值來(lái)表示意外情況:
*
* EAGAIN - There is already an handshake in progress for this address.
* 已經(jīng)有握手在進(jìn)行中了。
* EINVAL - IP or port are not valid.
* ip 或者 port 參數(shù)不合法坝初。
*/
int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[REDIS_IP_STR_LEN];
struct sockaddr_storage sa;
// ip 合法性檢查
if (inet_pton(AF_INET,ip,
&(((struct sockaddr_in *)&sa)->sin_addr)))
{
sa.ss_family = AF_INET;
} else if (inet_pton(AF_INET6,ip,
&(((struct sockaddr_in6 *)&sa)->sin6_addr)))
{
sa.ss_family = AF_INET6;
} else {
errno = EINVAL;
return 0;
}
// port 合法性檢查
if (port <= 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) {
errno = EINVAL;
return 0;
}
if (sa.ss_family == AF_INET)
inet_ntop(AF_INET,
(void*)&(((struct sockaddr_in *)&sa)->sin_addr),
norm_ip,REDIS_IP_STR_LEN);
else
inet_ntop(AF_INET6,
(void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
norm_ip,REDIS_IP_STR_LEN);
// 檢查節(jié)點(diǎn)是否已經(jīng)發(fā)送握手請(qǐng)求浸剩,如果是的話,那么直接返回鳄袍,防止出現(xiàn)重復(fù)握手
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
// 對(duì)給定地址的節(jié)點(diǎn)設(shè)置一個(gè)隨機(jī)名字
// 當(dāng) HANDSHAKE 完成時(shí)绢要,當(dāng)前節(jié)點(diǎn)會(huì)取得給定地址節(jié)點(diǎn)的真正名字
// 到時(shí)會(huì)用真名替換隨機(jī)名,是在返回的pong報(bào)文當(dāng)中帶上真正的名字
n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
// 將節(jié)點(diǎn)添加到集群當(dāng)中
clusterAddNode(n);
return 1;
}
// 將給定 node 添加到節(jié)點(diǎn)表里面
int clusterAddNode(clusterNode *node) {
int retval;
// 將 node 添加到當(dāng)前節(jié)點(diǎn)的 nodes 表中
// 這樣接下來(lái)當(dāng)前節(jié)點(diǎn)就會(huì)創(chuàng)建連向 node 的節(jié)點(diǎn)
retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node);
return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR;
}
cluster集群發(fā)現(xiàn)過(guò)程
cluster集群發(fā)現(xiàn)過(guò)程-client端
?在serverCron當(dāng)中會(huì)調(diào)用clusterCron執(zhí)行redis cluster發(fā)現(xiàn)的邏輯拗小,整個(gè)邏輯如下:
- 遍歷server.cluster->nodes發(fā)現(xiàn)待握手的節(jié)點(diǎn)進(jìn)行連接重罪。
- 針對(duì)未建立連接的node創(chuàng)建對(duì)應(yīng)的ClusterLink,建立link和node之間的關(guān)聯(lián)哀九,link表示鏈接用于關(guān)聯(lián)本端的fd和遠(yuǎn)端的node剿配。
- 通過(guò)建立的socket連接發(fā)送meet報(bào)文即通過(guò)clusterSendPing去實(shí)現(xiàn)。
- 定期選擇一個(gè)node發(fā)送ping報(bào)文交換gossip信息
- 清理已經(jīng)下線的redis cluster node等等
// 集群常規(guī)操作函數(shù)阅束,默認(rèn)每秒執(zhí)行 10 次(每間隔 100 毫秒執(zhí)行一次)
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
// 迭代計(jì)數(shù)器惨篱,一個(gè)靜態(tài)變量
static unsigned long long iteration = 0;
mstime_t handshake_timeout;
// 記錄一次迭代
iteration++;
// 如果一個(gè) handshake 節(jié)點(diǎn)沒(méi)有在 handshake timeout 內(nèi)
// 轉(zhuǎn)換成普通節(jié)點(diǎn)(normal node),
// 那么節(jié)點(diǎn)會(huì)從 nodes 表中移除這個(gè) handshake 節(jié)點(diǎn)
// 一般來(lái)說(shuō) handshake timeout 的值總是等于 NODE_TIMEOUT
// 不過(guò)如果 NODE_TIMEOUT 太少的話围俘,程序會(huì)將值設(shè)為 1 秒鐘
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
// 向集群中的所有斷線或者未連接節(jié)點(diǎn)發(fā)送消息
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// 跳過(guò)當(dāng)前節(jié)點(diǎn)以及沒(méi)有地址的節(jié)點(diǎn)
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
// 如果 handshake 節(jié)點(diǎn)已超時(shí)砸讳,釋放它
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
freeClusterNode(node);
continue;
}
// 為未創(chuàng)建連接的節(jié)點(diǎn)創(chuàng)建連接
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.bindaddr_count ? server.bindaddr[0] : NULL);
if (fd == -1) {
redisLog(REDIS_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->port+REDIS_CLUSTER_PORT_INCR,
server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
// 向新連接的節(jié)點(diǎn)發(fā)送 PING 命令琢融,防止節(jié)點(diǎn)被識(shí)進(jìn)入下線
// 如果節(jié)點(diǎn)被標(biāo)記為 MEET 预吆,那么發(fā)送 MEET 命令淤齐,否則發(fā)送 PING 命令
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & REDIS_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
// 這不是第一次發(fā)送 PING 信息,所以可以還原這個(gè)時(shí)間
// 等 clusterSendPing() 函數(shù)來(lái)更新它
if (old_ping_sent) {
node->ping_sent = old_ping_sent;
}
/*
* 在發(fā)送 MEET 信息之后碑诉,清除節(jié)點(diǎn)的 MEET 標(biāo)識(shí)常遂。
*
* 如果當(dāng)前節(jié)點(diǎn)(發(fā)送者)沒(méi)能收到 MEET 信息的回復(fù)纳令,
* 那么它將不再向目標(biāo)節(jié)點(diǎn)發(fā)送命令。
*
* 如果接收到回復(fù)的話克胳,那么節(jié)點(diǎn)將不再處于 HANDSHAKE 狀態(tài)平绩,
* 并繼續(xù)向目標(biāo)節(jié)點(diǎn)發(fā)送普通 PING 命令。
*/
node->flags &= ~REDIS_NODE_MEET;
redisLog(REDIS_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
// clusterCron() 每執(zhí)行 10 次(至少間隔一秒鐘)漠另,就向一個(gè)隨機(jī)節(jié)點(diǎn)發(fā)送 gossip 信息
if (!(iteration % 10)) {
int j;
// 隨機(jī) 5 個(gè)節(jié)點(diǎn)捏雌,選出其中一個(gè)
for (j = 0; j < 5; j++) {
// 隨機(jī)在集群中挑選節(jié)點(diǎn)
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
// 不要 PING 連接斷開(kāi)的節(jié)點(diǎn),也不要 PING 最近已經(jīng) PING 過(guò)的節(jié)點(diǎn)
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE))
continue;
// 選出 5 個(gè)隨機(jī)節(jié)點(diǎn)中最近一次接收 PONG 回復(fù)距離現(xiàn)在最舊的節(jié)點(diǎn)
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
// 向最久沒(méi)有收到 PONG 回復(fù)的節(jié)點(diǎn)發(fā)送 PING 命令
if (min_pong_node) {
redisLog(REDIS_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
// 遍歷所有節(jié)點(diǎn)笆搓,檢查是否需要將某個(gè)節(jié)點(diǎn)標(biāo)記為下線
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
// 跳過(guò)節(jié)點(diǎn)本身性湿、無(wú)地址節(jié)點(diǎn)、HANDSHAKE 狀態(tài)的節(jié)點(diǎn)
if (node->flags &
(REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
continue;
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);
if (okslaves == 0 && node->numslots > 0) orphaned_masters++;
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}
// 如果等到 PONG 到達(dá)的時(shí)間超過(guò)了 node timeout 一半的連接
// 因?yàn)楸M管節(jié)點(diǎn)依然正常满败,但連接可能已經(jīng)出問(wèn)題了
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout &&
node->ping_sent &&
node->pong_received < node->ping_sent &&
now - node->ping_sent > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
// 釋放連接肤频,下次 clusterCron() 會(huì)自動(dòng)重連
freeClusterLink(node->link);
}
// 如果目前沒(méi)有在 PING 節(jié)點(diǎn)
// 并且已經(jīng)有 node timeout 一半的時(shí)間沒(méi)有從節(jié)點(diǎn)那里收到 PONG 回復(fù)
// 那么向節(jié)點(diǎn)發(fā)送一個(gè) PING ,確保節(jié)點(diǎn)的信息不會(huì)太舊
// (因?yàn)橐徊糠止?jié)點(diǎn)可能一直沒(méi)有被隨機(jī)中)
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// 如果這是一個(gè)主節(jié)點(diǎn)算墨,并且有一個(gè)從服務(wù)器請(qǐng)求進(jìn)行手動(dòng)故障轉(zhuǎn)移
// 那么向從服務(wù)器發(fā)送 PING 宵荒。
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
// 以下代碼只在節(jié)點(diǎn)發(fā)送了 PING 命令的情況下執(zhí)行
if (node->ping_sent == 0) continue;
// 計(jì)算等待 PONG 回復(fù)的時(shí)長(zhǎng)
delay = now - node->ping_sent;
// 等待 PONG 回復(fù)的時(shí)長(zhǎng)超過(guò)了限制值,將目標(biāo)節(jié)點(diǎn)標(biāo)記為 PFAIL (疑似下線)
if (delay > server.cluster_node_timeout) {
if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) {
redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing",
node->name);
// 打開(kāi)疑似下線標(biāo)記
node->flags |= REDIS_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
// 如果從節(jié)點(diǎn)沒(méi)有在復(fù)制主節(jié)點(diǎn)净嘀,那么對(duì)從節(jié)點(diǎn)進(jìn)行設(shè)置
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}
manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
clusterHandleSlaveFailover();
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}
// 更新集群狀態(tài)
if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL)
clusterUpdateState();
}
?clusterSendPing的過(guò)程中最核心的就是在發(fā)送ping報(bào)文或者meet報(bào)文的時(shí)候报咳,都會(huì)攜帶本節(jié)點(diǎn)已知的節(jié)點(diǎn)即gossip信息,這樣就可以擴(kuò)展本節(jié)點(diǎn)知道的節(jié)點(diǎn)信息給其他節(jié)點(diǎn)達(dá)到擴(kuò)展的目的面粮。
?按照源碼中的意思每次是攜帶至多3個(gè)已經(jīng)節(jié)點(diǎn)少孝,但是不知道為啥作者的注釋當(dāng)中寫(xiě)的是2。
// 向指定節(jié)點(diǎn)發(fā)送一條 MEET 熬苍、 PING 或者 PONG 消息
void clusterSendPing(clusterLink *link, int type) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
int gossipcount = 0, totlen;
// freshnodes 是用于發(fā)送 gossip 信息的計(jì)數(shù)器
// 每次發(fā)送一條信息時(shí)稍走,程序?qū)?freshnodes 的值減一
// 當(dāng) freshnodes 的數(shù)值小于等于 0 時(shí),程序停止發(fā)送 gossip 信息
// freshnodes 的數(shù)量是節(jié)點(diǎn)目前的 nodes 表中的節(jié)點(diǎn)數(shù)量減去 2
// 這里的 2 指兩個(gè)節(jié)點(diǎn)柴底,一個(gè)是 myself 節(jié)點(diǎn)(也即是發(fā)送信息的這個(gè)節(jié)點(diǎn))
// 另一個(gè)是接受 gossip 信息的節(jié)點(diǎn)
int freshnodes = dictSize(server.cluster->nodes)-2;
// 如果發(fā)送的信息是 PING 婿脸,那么更新最后一次發(fā)送 PING 命令的時(shí)間戳
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
// 將當(dāng)前節(jié)點(diǎn)的信息(比如名字、地址柄驻、端口號(hào)狐树、負(fù)責(zé)處理的槽)記錄到消息里面
clusterBuildMessageHdr(hdr,type);
// 從當(dāng)前節(jié)點(diǎn)已知的節(jié)點(diǎn)中隨機(jī)選出兩個(gè)節(jié)點(diǎn)
// 并通過(guò)這條消息捎帶給目標(biāo)節(jié)點(diǎn),從而實(shí)現(xiàn) gossip 協(xié)議
// 每個(gè)節(jié)點(diǎn)有 freshnodes 次發(fā)送 gossip 信息的機(jī)會(huì)
// 每次向目標(biāo)節(jié)點(diǎn)發(fā)送 2 個(gè)被選中節(jié)點(diǎn)的 gossip 信息(gossipcount 計(jì)數(shù))
while(freshnodes > 0 && gossipcount < 3) {
// 從 nodes 字典中隨機(jī)選出一個(gè)節(jié)點(diǎn)(被選中節(jié)點(diǎn))
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
/*
* 以下節(jié)點(diǎn)不能作為被選中節(jié)點(diǎn):
* 1)節(jié)點(diǎn)本身鸿脓。
* 2) 處于 HANDSHAKE 狀態(tài)的節(jié)點(diǎn)抑钟。
* 3) 帶有 NOADDR 標(biāo)識(shí)的節(jié)點(diǎn)
* 4) 因?yàn)椴惶幚砣魏尾鄱粩嚅_(kāi)連接的節(jié)點(diǎn)
*/
if (this == myself ||
this->flags & (REDIS_NODE_HANDSHAKE|REDIS_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--; /* otherwise we may loop forever. */
continue;
}
// 檢查被選中節(jié)點(diǎn)是否已經(jīng)在 hdr->data.ping.gossip 數(shù)組里面
// 如果是的話說(shuō)明這個(gè)節(jié)點(diǎn)之前已經(jīng)被選中了
// 不要再選中它(否則就會(huì)出現(xiàn)重復(fù))
for (j = 0; j < gossipcount; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,this->name,
REDIS_CLUSTER_NAMELEN) == 0) break;
}
if (j != gossipcount) continue;
// 這個(gè)被選中節(jié)點(diǎn)有效涯曲,計(jì)數(shù)器減一
freshnodes--;
// 指向 gossip 信息結(jié)構(gòu)
gossip = &(hdr->data.ping.gossip[gossipcount]);
// 將被選中節(jié)點(diǎn)的名字記錄到 gossip 信息
memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN);
// 將被選中節(jié)點(diǎn)的 PING 命令發(fā)送時(shí)間戳記錄到 gossip 信息
gossip->ping_sent = htonl(this->ping_sent);
// 將被選中節(jié)點(diǎn)的 PING 命令回復(fù)的時(shí)間戳記錄到 gossip 信息
gossip->pong_received = htonl(this->pong_received);
// 將被選中節(jié)點(diǎn)的 IP 記錄到 gossip 信息
memcpy(gossip->ip,this->ip,sizeof(this->ip));
// 將被選中節(jié)點(diǎn)的端口號(hào)記錄到 gossip 信息
gossip->port = htons(this->port);
// 將被選中節(jié)點(diǎn)的標(biāo)識(shí)值記錄到 gossip 信息
gossip->flags = htons(this->flags);
// 這個(gè)被選中節(jié)點(diǎn)有效,計(jì)數(shù)器增一
gossipcount++;
}
// 計(jì)算信息長(zhǎng)度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
// 將被選中節(jié)點(diǎn)的數(shù)量(gossip 信息中包含了多少個(gè)節(jié)點(diǎn)的信息)
// 記錄在 count 屬性里面
hdr->count = htons(gossipcount);
// 將信息的長(zhǎng)度記錄到信息里面
hdr->totlen = htonl(totlen);
// 發(fā)送信息
clusterSendMessage(link,buf,totlen);
}
cluster集群發(fā)現(xiàn)過(guò)程-server端
?在redis 的server端主要做兩個(gè)事情:
- 通過(guò)clusterAcceptHandler接受連接的socket并建立socket對(duì)應(yīng)的ClusterLink便于接收ping或者meet報(bào)文時(shí)候可以回應(yīng)報(bào)文在塔。
- 注冊(cè)fd的讀事件到處理函數(shù)clusterReadHandler當(dāng)中幻件。
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
clusterLink *link;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
/* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */
if (server.masterhost == NULL && server.loading) return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_VERBOSE,
"Accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
?clusterReadHandler的內(nèi)部主要讀取報(bào)文的內(nèi)容然后交由clusterProcessPacket去處理報(bào)文。
// 讀事件處理器
// 首先讀入內(nèi)容的頭蛔溃,以判斷讀入內(nèi)容的長(zhǎng)度
// 如果內(nèi)容是一個(gè) whole packet 绰沥,那么調(diào)用函數(shù)來(lái)處理這個(gè) packet 。
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[sizeof(clusterMsg)];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = (clusterLink*) privdata;
int readlen, rcvbuflen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
// 盡可能地多讀數(shù)據(jù)
while(1) { /* Read as long as there is data to read. */
// 檢查輸入緩沖區(qū)的長(zhǎng)度
rcvbuflen = sdslen(link->rcvbuf);
// 頭信息(8 字節(jié))未讀入完
if (rcvbuflen < 8) {
/* First, obtain the first 8 bytes to get the full message
* length. */
readlen = 8 - rcvbuflen;
// 已讀入完整的信息
} else {
/* Finally read the full message. */
hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 8) {
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
{
redisLog(REDIS_WARNING,
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
return;
}
}
// 記錄已讀入內(nèi)容長(zhǎng)度
readlen = ntohl(hdr->totlen) - rcvbuflen;
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}
// 讀入內(nèi)容
nread = read(fd,buf,readlen);
// 沒(méi)有內(nèi)容可讀
if (nread == -1 && errno == EAGAIN) return; /* No more data ready. */
// 處理讀入錯(cuò)誤
if (nread <= 0) {
/* I/O error... */
redisLog(REDIS_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
return;
} else {
/* Read data and recast the pointer to the new buffer. */
// 將讀入的內(nèi)容追加進(jìn)輸入緩沖區(qū)里面
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread;
}
/* Total length obtained? Process this packet. */
// 檢查已讀入內(nèi)容的長(zhǎng)度贺待,看是否整條信息已經(jīng)被讀入了
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
// 如果是的話徽曲,執(zhí)行處理信息的函數(shù)
if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return; /* Link no longer valid. */
}
}
}
}
?clusterProcessPacket內(nèi)部針對(duì)第一次發(fā)起連接的節(jié)點(diǎn)主要做了下面事情:
- 針對(duì)發(fā)起連接的redis 節(jié)點(diǎn)發(fā)起createClusterNode操作并添加到待連接的節(jié)點(diǎn)當(dāng)中。
- 針對(duì)攜帶的gossip信息麸塞,會(huì)解析里面的信息找到攜帶的redis 節(jié)點(diǎn)然后添加到待連接的節(jié)點(diǎn)當(dāng)中秃臣。
- 發(fā)送響應(yīng)報(bào)文,內(nèi)部攜帶這個(gè)節(jié)點(diǎn)的關(guān)鍵信息即節(jié)點(diǎn)名稱便于發(fā)起端redis節(jié)點(diǎn)維持正確的節(jié)點(diǎn)名和連接映射關(guān)系
- 整個(gè)內(nèi)部處理過(guò)程中處理的所有情況喘垂,這里只針對(duì)建立連接的過(guò)程做了簡(jiǎn)單分析甜刻。
/*
* 當(dāng)這個(gè)函數(shù)被調(diào)用時(shí)绍撞,說(shuō)明 node->rcvbuf 中有一條待處理的信息正勒。
* 信息處理完畢之后的釋放工作由調(diào)用者處理,所以這個(gè)函數(shù)只需負(fù)責(zé)處理信息就可以了傻铣。
*
* 如果函數(shù)返回 1 章贞,那么說(shuō)明處理信息時(shí)沒(méi)有遇到問(wèn)題,連接依然可用非洲。
* 如果函數(shù)返回 0 鸭限,那么說(shuō)明信息處理時(shí)遇到了不一致問(wèn)題
* (比如接收到的 PONG 是發(fā)送自不正確的發(fā)送者 ID 的),連接已經(jīng)被釋放两踏。
*/
int clusterProcessPacket(clusterLink *link) {
// 指向消息頭
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
// 消息的長(zhǎng)度
uint32_t totlen = ntohl(hdr->totlen);
// 消息的類型
uint16_t type = ntohs(hdr->type);
// 消息發(fā)送者的標(biāo)識(shí)
uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender;
// 更新接受消息計(jì)數(shù)器
server.cluster->stats_bus_messages_received++;
redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
type, (unsigned long) totlen);
// 合法性檢查
if (totlen < 16) return 1;
if (ntohs(hdr->ver) != 0) return 1;
if (totlen > sdslen(link->rcvbuf)) return 1;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
uint32_t explen; /* expected length of this packet */
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip)*count);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAIL) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) +
ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART)
{
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
}
// 查找發(fā)送者節(jié)點(diǎn)
sender = clusterLookupNode(hdr->sender);
// 節(jié)點(diǎn)存在败京,并且不是 HANDSHAKE 節(jié)點(diǎn)
// 那么個(gè)更新節(jié)點(diǎn)的配置紀(jì)元信息
if (sender && !nodeInHandshake(sender)) {
/* Update our curretEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = senderCurrentEpoch;
/* Update the sender configEpoch if it is publishing a newer one. */
if (senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
}
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = mstime();
if (server.cluster->mf_end &&
nodeIsSlave(myself) &&
myself->slaveof == sender &&
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
server.cluster->mf_master_offset == 0)
{
server.cluster->mf_master_offset = sender->repl_offset;
redisLog(REDIS_WARNING,
"Received replication offset for paused "
"master manual failover: %lld",
server.cluster->mf_master_offset);
}
}
// 根據(jù)消息的類型,處理節(jié)點(diǎn)
// 這是一條 PING 消息或者 MEET 消息
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node);
/*
* 如果當(dāng)前節(jié)點(diǎn)是第一次遇見(jiàn)這個(gè)節(jié)點(diǎn)梦染,并且對(duì)方發(fā)來(lái)的是 MEET 信息赡麦,
* 那么將這個(gè)節(jié)點(diǎn)添加到集群的節(jié)點(diǎn)列表里面。
*
* 節(jié)點(diǎn)目前的 flag 帕识、 slaveof 等屬性的值都是未設(shè)置的泛粹,
* 等當(dāng)前節(jié)點(diǎn)向?qū)Ψ桨l(fā)送 PING 命令之后,
* 這些信息可以從對(duì)方回復(fù)的 PONG 信息中取得肮疗。
*/
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// 創(chuàng)建 HANDSHAKE 狀態(tài)的新節(jié)點(diǎn)
node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE);
// 設(shè)置 IP 和端口
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);
// 將新節(jié)點(diǎn)添加到集群
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
// 分析并取出消息中的 gossip 節(jié)點(diǎn)信息
clusterProcessGossipSection(hdr,link);
// 向目標(biāo)節(jié)點(diǎn)返回一個(gè) PONG
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}
// 這是一條 PING 晶姊、 PONG 或者 MEET 消息
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
redisLog(REDIS_DEBUG,"%s packet received: %p",
type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
(void*)link->node);
// 連接的 clusterNode 結(jié)構(gòu)存在
if (link->node) {
// 節(jié)點(diǎn)處于 HANDSHAKE 狀態(tài)
if (nodeInHandshake(link->node)) {
if (sender) {
redisLog(REDIS_VERBOSE,
"Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name);
// 如果有需要的話,更新節(jié)點(diǎn)的地址
if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
// 釋放節(jié)點(diǎn)
freeClusterNode(link->node);
return 0;
}
// 用節(jié)點(diǎn)的真名替換在 HANDSHAKE 時(shí)創(chuàng)建的隨機(jī)名字
clusterRenameNode(link->node, hdr->sender);
redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
// 關(guān)閉 HANDSHAKE 狀態(tài)
link->node->flags &= ~REDIS_NODE_HANDSHAKE;
// 設(shè)置節(jié)點(diǎn)的角色
link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
// 節(jié)點(diǎn)已存在伪货,但它的 id 和當(dāng)前節(jié)點(diǎn)保存的 id 不同
} else if (memcmp(link->node->name,hdr->sender,
REDIS_CLUSTER_NAMELEN) != 0)
{
// 那么將這個(gè)節(jié)點(diǎn)設(shè)為 NOADDR
// 并斷開(kāi)連接
redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
link->node->flags |= REDIS_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
// 斷開(kāi)連接
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}
// 如果發(fā)送的消息為 PING
// 并且發(fā)送者不在 HANDSHAKE 狀態(tài)
// 那么更新發(fā)送者的信息
if (sender && type == CLUSTERMSG_TYPE_PING &&
!nodeInHandshake(sender) &&
nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port)))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Update our info about the node */
// 如果這是一條 PONG 消息们衙,那么更新我們關(guān)于 node 節(jié)點(diǎn)的認(rèn)識(shí)
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
// 最后一次接到該節(jié)點(diǎn)的 PONG 的時(shí)間
link->node->pong_received = mstime();
// 清零最近一次等待 PING 命令的時(shí)間
link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external
* help if it is momentary (that is, if it does not
* turn into a FAIL state).
*
* 接到節(jié)點(diǎn)的 PONG 回復(fù)钾怔,我們可以移除節(jié)點(diǎn)的 PFAIL 狀態(tài)。
*
* The FAIL condition is also reversible under specific
* conditions detected by clearNodeFailureIfNeeded().
*
* 如果節(jié)點(diǎn)的狀態(tài)為 FAIL 蒙挑,
* 那么是否撤銷該狀態(tài)要根據(jù) clearNodeFailureIfNeeded() 函數(shù)來(lái)決定蒂教。
*/
if (nodeTimedOut(link->node)) {
// 撤銷 PFAIL
link->node->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
} else if (nodeFailed(link->node)) {
// 看是否可以撤銷 FAIL
clearNodeFailureIfNeeded(link->node);
}
}
/* Check for role switch: slave -> master or master -> slave. */
// 檢測(cè)節(jié)點(diǎn)的身份信息,并在需要時(shí)進(jìn)行更新
if (sender) {
// 發(fā)送消息的節(jié)點(diǎn)的 slaveof 為 REDIS_NODE_NULL_NAME
// 那么 sender 就是一個(gè)主節(jié)點(diǎn)
if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
sizeof(hdr->slaveof)))
{
/* Node is a master. */
// 設(shè)置 sender 為主節(jié)點(diǎn)
clusterSetNodeAsMaster(sender);
// sender 的 slaveof 不為空脆荷,那么這是一個(gè)從節(jié)點(diǎn)
} else {
/* Node is a slave. */
// 取出 sender 的主節(jié)點(diǎn)
clusterNode *master = clusterLookupNode(hdr->slaveof);
// sender 由主節(jié)點(diǎn)變成了從節(jié)點(diǎn)凝垛,重新配置 sender
if (nodeIsMaster(sender)) {
/* Master turned into a slave! Reconfigure the node. */
// 刪除所有由該節(jié)點(diǎn)負(fù)責(zé)的槽
clusterDelNodeSlots(sender);
// 更新標(biāo)識(shí)
sender->flags &= ~REDIS_NODE_MASTER;
sender->flags |= REDIS_NODE_SLAVE;
/* Remove the list of slaves from the node. */
// 移除 sender 的從節(jié)點(diǎn)名單
if (sender->numslaves) clusterNodeResetSlaves(sender);
/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
/* Master node changed for this slave? */
// 檢查 sender 的主節(jié)點(diǎn)是否變更
if (master && sender->slaveof != master) {
// 如果 sender 之前的主節(jié)點(diǎn)不是現(xiàn)在的主節(jié)點(diǎn)
// 那么在舊主節(jié)點(diǎn)的從節(jié)點(diǎn)列表中移除 sender
if (sender->slaveof)
clusterNodeRemoveSlave(sender->slaveof,sender);
// 并在新主節(jié)點(diǎn)的從節(jié)點(diǎn)列表中添加 sender
clusterNodeAddSlave(master,sender);
// 更新 sender 的主節(jié)點(diǎn)
sender->slaveof = master;
/* Update config. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
}
/*
* 更新當(dāng)前節(jié)點(diǎn)對(duì) sender 所處理槽的認(rèn)識(shí)。
*
* 這部分的更新 *必須* 在更新 sender 的主/從節(jié)點(diǎn)信息之后蜓谋,
* 因?yàn)檫@里需要用到 REDIS_NODE_MASTER 標(biāo)識(shí)梦皮。
*/
clusterNode *sender_master = NULL; /* Sender or its master if slave. */
int dirty_slots = 0; /* Sender claimed slots don't match my view? */
if (sender) {
sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
}
}
/* 1)
* 如果 sender 是主節(jié)點(diǎn),并且 sender 的槽布局出現(xiàn)了變動(dòng)
* 那么檢查當(dāng)前節(jié)點(diǎn)對(duì) sender 的槽布局設(shè)置桃焕,看是否需要進(jìn)行更新
*/
if (sender && nodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
/* 2)
* 檢測(cè)和條件 1 的相反條件剑肯,也即是,
* sender 處理的槽的配置紀(jì)元比當(dāng)前節(jié)點(diǎn)已知的某個(gè)節(jié)點(diǎn)的配置紀(jì)元要低观堂,
* 如果是這樣的話让网,通知 sender 。
*
* 這種情況可能會(huì)出現(xiàn)在網(wǎng)絡(luò)分裂中师痕,
* 一個(gè)重新上線的主節(jié)點(diǎn)可能會(huì)帶有已經(jīng)過(guò)時(shí)的槽布局溃睹。
*
* 比如說(shuō):
*
* A 負(fù)責(zé)槽 1 、 2 胰坟、 3 因篇,而 B 是 A 的從節(jié)點(diǎn)。
*
* A 從網(wǎng)絡(luò)中分裂出去笔横,B 被提升為主節(jié)點(diǎn)竞滓。
*
* B 從網(wǎng)絡(luò)中分裂出去, A 重新上線(但是它所使用的槽布局是舊的)吹缔。
*
* 在正常情況下商佑, B 應(yīng)該向 A 發(fā)送 PING 消息,告知 A 厢塘,自己(B)已經(jīng)接替了
* 槽 1茶没、 2、 3 俗冻,并且?guī)в懈呐渲眉o(jì)元礁叔,但因?yàn)榫W(wǎng)絡(luò)分裂的緣故,
* 節(jié)點(diǎn) B 沒(méi)辦法通知節(jié)點(diǎn) A 迄薄,
* 所以通知節(jié)點(diǎn) A 它帶有的槽布局已經(jīng)更新的工作就交給其他知道 B 帶有更高配置紀(jì)元的節(jié)點(diǎn)來(lái)做琅关。
* 當(dāng) A 接到其他節(jié)點(diǎn)關(guān)于節(jié)點(diǎn) B 的消息時(shí),
* 節(jié)點(diǎn) A 就會(huì)停止自己的主節(jié)點(diǎn)工作,又或者重新進(jìn)行故障轉(zhuǎn)移涣易。
*/
if (sender && dirty_slots) {
int j;
for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
// 檢測(cè) slots 中的槽 j 是否已經(jīng)被指派
if (bitmapTestBit(hdr->myslots,j)) {
// 當(dāng)前節(jié)點(diǎn)認(rèn)為槽 j 由 sender 負(fù)責(zé)處理画机,
// 或者當(dāng)前節(jié)點(diǎn)認(rèn)為該槽未指派,那么跳過(guò)該槽
if (server.cluster->slots[j] == sender ||
server.cluster->slots[j] == NULL) continue;
// 當(dāng)前節(jié)點(diǎn)槽 j 的配置紀(jì)元比 sender 的配置紀(jì)元要大
if (server.cluster->slots[j]->configEpoch >
senderConfigEpoch)
{
redisLog(REDIS_VERBOSE,
"Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s",
sender->name, server.cluster->slots[j]->name);
// 向 sender 發(fā)送關(guān)于槽 j 的更新信息
clusterSendUpdate(sender->link,
server.cluster->slots[j]);
/* TODO: instead of exiting the loop send every other
* UPDATE packet for other nodes that are the new owner
* of sender's slots. */
break;
}
}
}
}
if (sender &&
nodeIsMaster(myself) && nodeIsMaster(sender) &&
senderConfigEpoch == myself->configEpoch)
{
clusterHandleConfigEpochCollision(sender);
}
// 分析并提取出消息 gossip 協(xié)議部分的信息
clusterProcessGossipSection(hdr,link);
// 這是一條 FAIL 消息: sender 告知當(dāng)前節(jié)點(diǎn)新症,某個(gè)節(jié)點(diǎn)已經(jīng)進(jìn)入 FAIL 狀態(tài)步氏。
} else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;
if (sender) {
// 獲取下線節(jié)點(diǎn)的消息
failing = clusterLookupNode(hdr->data.fail.about.nodename);
// 下線的節(jié)點(diǎn)既不是當(dāng)前節(jié)點(diǎn),也沒(méi)有處于 FAIL 狀態(tài)
if (failing &&
!(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF)))
{
redisLog(REDIS_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
// 打開(kāi) FAIL 狀態(tài)
failing->flags |= REDIS_NODE_FAIL;
failing->fail_time = mstime();
// 關(guān)閉 PFAIL 狀態(tài)
failing->flags &= ~REDIS_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
} else {
redisLog(REDIS_NOTICE,
"Ignoring FAIL message from unknonw node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
// 這是一條 PUBLISH 消息
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
robj *channel, *message;
uint32_t channel_len, message_len;
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
// 只在有訂閱者時(shí)創(chuàng)建消息對(duì)象
if (dictSize(server.pubsub_channels) ||
listLength(server.pubsub_patterns))
{
// 頻道長(zhǎng)度
channel_len = ntohl(hdr->data.publish.msg.channel_len);
// 消息長(zhǎng)度
message_len = ntohl(hdr->data.publish.msg.message_len);
// 頻道
channel = createStringObject(
(char*)hdr->data.publish.msg.bulk_data,channel_len);
// 消息
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
// 發(fā)送消息
pubsubPublishMessage(channel,message);
decrRefCount(channel);
decrRefCount(message);
}
// 這是一條請(qǐng)求獲得故障遷移授權(quán)的消息: sender 請(qǐng)求當(dāng)前節(jié)點(diǎn)為它進(jìn)行故障轉(zhuǎn)移投票
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
// 如果條件允許的話徒爹,向 sender 投票荚醒,支持它進(jìn)行故障轉(zhuǎn)移
clusterSendFailoverAuthIfNeeded(sender,hdr);
// 這是一條故障遷移投票信息: sender 支持當(dāng)前節(jié)點(diǎn)執(zhí)行故障轉(zhuǎn)移操作
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
if (!sender) return 1; /* We don't know that node. */
// 只有正在處理至少一個(gè)槽的主節(jié)點(diǎn)的投票會(huì)被視為是有效投票
// 只有符合以下條件, sender 的投票才算有效:
// 1) sender 是主節(jié)點(diǎn)
// 2) sender 正在處理至少一個(gè)槽
// 3) sender 的配置紀(jì)元大于等于當(dāng)前節(jié)點(diǎn)的配置紀(jì)元
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
// 增加支持票數(shù)
server.cluster->failover_auth_count++;
/* Maybe we reached a quorum here, set a flag to make sure
* we check ASAP. */
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
if (!sender || sender->slaveof != myself) return 1;
resetManualFailover();
server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
server.cluster->mf_slave = sender;
pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
clusterNode *n; /* The node the update is about. */
uint64_t reportedConfigEpoch =
ntohu64(hdr->data.update.nodecfg.configEpoch);
if (!sender) return 1;
// 獲取需要更新的節(jié)點(diǎn)
n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
if (!n) return 1; /* We don't know the reported node. */
// 消息的紀(jì)元并不大于節(jié)點(diǎn) n 所處的配置紀(jì)元
// 無(wú)須更新
if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
// 如果節(jié)點(diǎn) n 為從節(jié)點(diǎn)隆嗅,但它的槽配置更新了
// 那么說(shuō)明這個(gè)節(jié)點(diǎn)已經(jīng)變?yōu)橹鞴?jié)點(diǎn)界阁,將它設(shè)置為主節(jié)點(diǎn)
if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
n->configEpoch = reportedConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
// 將消息中對(duì) n 的槽布局與當(dāng)前節(jié)點(diǎn)對(duì) n 的槽布局進(jìn)行對(duì)比
// 在有需要時(shí)更新當(dāng)前節(jié)點(diǎn)對(duì) n 的槽布局的認(rèn)識(shí)
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
} else {
redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
}
return 1;
}
?clusterProcessGossipSection在建立連接的過(guò)程中主要是解析攜帶的gossip信息并添加到待連接節(jié)點(diǎn)當(dāng)中。
/*
* 解釋 MEET 胖喳、 PING 或 PONG 消息中和 gossip 協(xié)議有關(guān)的信息泡躯。
*
* 注意,這個(gè)函數(shù)假設(shè)調(diào)用者已經(jīng)根據(jù)消息的長(zhǎng)度丽焊,對(duì)消息進(jìn)行過(guò)合法性檢查较剃。
*/
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
// 記錄這條消息中包含了多少個(gè)節(jié)點(diǎn)的信息
uint16_t count = ntohs(hdr->count);
// 指向第一個(gè)節(jié)點(diǎn)的信息
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
// 取出發(fā)送者
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
// 遍歷所有節(jié)點(diǎn)的信息
while(count--) {
sds ci = sdsempty();
// 分析節(jié)點(diǎn)的 flag
uint16_t flags = ntohs(g->flags);
// 信息節(jié)點(diǎn)
clusterNode *node;
// 取出節(jié)點(diǎn)的 flag
if (flags == 0) ci = sdscat(ci,"noflags,");
if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ci);
sdsfree(ci);
// 使用消息中的信息對(duì)節(jié)點(diǎn)進(jìn)行更新
node = clusterLookupNode(g->nodename);
// 節(jié)點(diǎn)已經(jīng)存在于當(dāng)前節(jié)點(diǎn)
if (node) {
// 如果 sender 是一個(gè)主節(jié)點(diǎn),那么我們需要處理下線報(bào)告
if (sender && nodeIsMaster(sender) && node != myself) {
// 節(jié)點(diǎn)處于 FAIL 或者 PFAIL 狀態(tài)
if (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) {
// 添加 sender 對(duì) node 的下線報(bào)告
if (clusterNodeAddFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
// 嘗試將 node 標(biāo)記為 FAIL
markNodeAsFailingIfNeeded(node);
// 節(jié)點(diǎn)處于正常狀態(tài)
} else {
// 如果 sender 曾經(jīng)發(fā)送過(guò)對(duì) node 的下線報(bào)告
// 那么清除該報(bào)告
if (clusterNodeDelFailureReport(node,sender)) {
redisLog(REDIS_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}
// 如果節(jié)點(diǎn)之前處于 PFAIL 或者 FAIL 狀態(tài)
// 并且該節(jié)點(diǎn)的 IP 或者端口號(hào)已經(jīng)發(fā)生變化
// 那么可能是節(jié)點(diǎn)換了新地址技健,嘗試對(duì)它進(jìn)行握手
if (node->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port)))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
// 當(dāng)前節(jié)點(diǎn)不認(rèn)識(shí) node
} else {
/*
* 如果 node 不在 NOADDR 狀態(tài)写穴,并且當(dāng)前節(jié)點(diǎn)不認(rèn)識(shí) node
* 那么向 node 發(fā)送 HANDSHAKE 消息。
*
* 注意凫乖,當(dāng)前節(jié)點(diǎn)必須保證 sender 是本集群的節(jié)點(diǎn)确垫,
* 否則我們將有加入了另一個(gè)集群的風(fēng)險(xiǎn)弓颈。
*/
if (sender &&
!(flags & REDIS_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port));
}
}
/* Next node */
// 處理下個(gè)節(jié)點(diǎn)的信息
g++;
}
}