Redis cluster 是 redis 官方提出的分布式集群解決方案叫胁,在此之前凰慈,有一些第三方的可選方案,如 codis驼鹅、Twemproxy等微谓。cluster 內(nèi)部使用了 gossip 協(xié)議進(jìn)行通信,以達(dá)到數(shù)據(jù)的最終一致性输钩。詳細(xì)介紹可參考官網(wǎng) Redis cluster tutorial豺型。
本文試圖借著cluster meet 命令的實(shí)現(xiàn)來對其中的一些通信細(xì)節(jié)一探究竟。
我們都知道买乃,當(dāng) redis server 以 cluster mode 啟動(dòng)時(shí)姻氨,節(jié)點(diǎn) A 想加入節(jié)點(diǎn) B 所在的集群,只需要執(zhí)行 CLUSTER MEET ip port 這個(gè)命令即可剪验,通過 gossip 通信肴焊,最終 B 所在集群的其他節(jié)點(diǎn)也都會(huì)認(rèn)識到 A。
cluster 初始化
當(dāng) redis server 以 cluster mode 啟動(dòng)時(shí)功戚,即配置文件中的 cluster-enabled 選項(xiàng)設(shè)置為 true娶眷,此時(shí)在服務(wù)啟動(dòng)時(shí),會(huì)有一個(gè) cluster 初始化的流程疫铜,即執(zhí)行函數(shù) clusterInit茂浮。在 cluster 中有三個(gè)數(shù)據(jù)結(jié)構(gòu)很重要, clusterState 壳咕、 clusterNode 和 clusterLink席揽。
每個(gè)節(jié)點(diǎn)都保存著一個(gè) clusterState 結(jié)構(gòu),這個(gè)結(jié)構(gòu)記錄了在當(dāng)前節(jié)點(diǎn)的視角下谓厘,集群目前所處的狀態(tài)幌羞,即“我看到的世界是什么樣子”。
每個(gè)節(jié)點(diǎn)都會(huì)使用一個(gè) clusterNode 結(jié)構(gòu)來記錄自己的狀態(tài)竟稳, 并為集群中的所有其他節(jié)點(diǎn)(包括主節(jié)點(diǎn)和從節(jié)點(diǎn))都創(chuàng)建一個(gè)相應(yīng)的 clusterNode 結(jié)構(gòu)属桦, 以此來記錄其他節(jié)點(diǎn)的狀態(tài)熊痴。
clusterNode 結(jié)構(gòu)的 link 屬性是一個(gè) clusterLink 結(jié)構(gòu), 該結(jié)構(gòu)保存了連接節(jié)點(diǎn)所需的有關(guān)信息聂宾, 比如套接字描述符果善, 輸入緩沖區(qū)和輸出緩沖區(qū)。
該初始化很簡單系谐,首先是創(chuàng)建一個(gè) clusterState 結(jié)構(gòu)巾陕,并初始化一些成員,如下:
server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0; // 新節(jié)點(diǎn)的 currentEpoch = 0
server.cluster->state = CLUSTER_FAIL; // 初始狀態(tài)置為 FAIL
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list = dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
server.cluster->stats_bus_messages_sent = 0;
server.cluster->stats_bus_messages_received = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots(); // Clear the migrating/importing state for all the slots
然后給 node.conf 文件加鎖纪他,確保每個(gè)節(jié)點(diǎn)使用自己的 cluster 配置文件鄙煤。
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);
int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
serverLog(LL_WARNING,
"Can't open %s in order to acquire a lock: %s",
filename, strerror(errno));
return C_ERR;
}
if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
if (errno == EWOULDBLOCK) {
serverLog(LL_WARNING,
"Sorry, the cluster configuration file %s is already used "
"by a different Redis Cluster node. Please make sure that "
"different nodes use different cluster configuration "
"files.", filename);
} else {
serverLog(LL_WARNING,
"Impossible to lock %s: %s", filename, strerror(errno));
}
close(fd);
return C_ERR;
}
然后加載 node.conf 文件,這個(gè)過程還會(huì)檢查這個(gè)文件是否合理茶袒。
如果加載失斕莞铡(或者配置文件不存在),則以 REDIS_NODE_MYSELF|REDIS_NODE_MASTER 為標(biāo)記薪寓,創(chuàng)建一個(gè)clusterNode 結(jié)構(gòu)表示自己本身亡资,置為主節(jié)點(diǎn),并設(shè)置自己的名字為一個(gè)40字節(jié)的隨機(jī)串向叉;然后將該節(jié)點(diǎn)添加到server.cluster->nodes中沟于,這說明這是個(gè)新啟動(dòng)的節(jié)點(diǎn),生成的配置文件進(jìn)行刷盤植康。
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1); // 新節(jié)點(diǎn)旷太,將配置刷到配置文件中,fsync
接下來销睁,調(diào)用 listenToPort 函數(shù)供璧,在集群 gossip 通信端口上創(chuàng)建 socket fd 進(jìn)行監(jiān)聽。集群內(nèi) gossip 通信端口是在 Redis 監(jiān)聽端口基礎(chǔ)上加 10000,比如如果Redis監(jiān)聽客戶端的端口為 6379,則集群監(jiān)聽端口就是16379省容,該監(jiān)聽端口用于接收其他集群節(jié)點(diǎn)發(fā)送過來的 gossip 消息死讹。
然后注冊監(jiān)聽端口上的可讀事件孝凌,事件回調(diào)函數(shù)為 clusterAcceptHandler。
#define CLUSTER_PORT_INCR 10000
if (listenToPort(server.port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
serverPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
當(dāng)前節(jié)點(diǎn)收到其他集群節(jié)點(diǎn)發(fā)來的TCP建鏈請求之后,就會(huì)調(diào)用 clusterAcceptHandler 函數(shù) accept 連接。在 clusterAcceptHandler函數(shù)中钠至,對于每個(gè)已經(jīng) accept 的鏈接,都會(huì)創(chuàng)建一個(gè)clusterLink 結(jié)構(gòu)表示該鏈接胎源,并注冊 socket fd上的可讀事件棉钧,事件回調(diào)函數(shù)為 clusterReadHandler。
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
clusterLink *link;
... ...
// 如果服務(wù)器正在啟動(dòng)涕蚤,不要接受其他節(jié)點(diǎn)的連接, 因?yàn)?UPDATE 消息可能會(huì)干擾數(shù)據(jù)庫內(nèi)容
if (server.masterhost == NULL && server.loading) return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
... ...
// 創(chuàng)建一個(gè) link 結(jié)構(gòu)來處理連接
// 剛開始的時(shí)候宪卿, link->node 被設(shè)置成 null的诵,因?yàn)楝F(xiàn)在我們不知道是哪個(gè)節(jié)點(diǎn)
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
CLUSTER MEET
A 節(jié)點(diǎn)接收 CLUSTER MEET 命令
A 節(jié)點(diǎn)在cluster.c -> clusterCommand 函數(shù)中,接收到 CLUSTER MEET 命令佑钾,即
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) {
long long port;
// CLUSTER MEET <ip> <port>
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s", (char*)c->argv[3]->ptr);
return;
}
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
}
可以看到重點(diǎn)在 clusterStartHandshake 這個(gè)函數(shù)西疤。
int clusterStartHandshake(char *ip, int port) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;
/* IP and Port sanity check */
... ...
// 檢查節(jié)點(diǎn)(flag) norm_ip:port 是否正在握手
if (clusterHandshakeInProgress(norm_ip,port)) {
errno = EAGAIN;
return 0;
}
// 創(chuàng)建一個(gè)含隨機(jī)名字的 node,type 為 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET
// 相關(guān)信息會(huì)在 handshake 過程中被修復(fù)
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
clusterAddNode(n);
return 1;
}
clusterNode *createClusterNode(char *nodename, int flags) {
clusterNode *node = zmalloc(sizeof(*node));
if (nodename)
memcpy(node->name, nodename, CLUSTER_NAMELEN);
else
// 在本地新建一個(gè) nodename 節(jié)點(diǎn)休溶,節(jié)點(diǎn)名字隨機(jī)瘪阁,跟它通信時(shí)它會(huì)告訴我真實(shí)名字
getRandomHexChars(node->name, CLUSTER_NAMELEN);
node->ctime = mstime(); // mstime
node->configEpoch = 0;
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
node->slaveof = NULL;
... ...
node->link = NULL; // link 為空, 在 clusterCron 中能檢查的到
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->fail_reports = listCreate();
... ...
listSetFreeMethod(node->fail_reports,zfree);
return node;
}
這個(gè)函數(shù)會(huì)首先進(jìn)行一些 ip 和 port 的合理性檢查,然后去遍歷所看到的 nodes邮偎,這個(gè) ip:port 對應(yīng)的 node 是不是正處于 CLUSTER_NODE_HANDSHAKE 狀態(tài),是的話义黎,就說明這是重復(fù) meet禾进,沒必要往下走。之后廉涕,通過 createClusterNode 函數(shù)創(chuàng)建一個(gè)帶有 CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET 標(biāo)記的節(jié)點(diǎn)泻云,名字為一個(gè)隨機(jī)的 40 字節(jié)字符串(因?yàn)榇藭r(shí)對 A 來說,B 是一個(gè)陌生的節(jié)點(diǎn)狐蜕,信息除了 ip 和 port宠纯,其他都不知道),通過 clusterAddNode 函數(shù)加到自己的 nodes 中层释。
這個(gè)過程成功后婆瓜,就返回給客戶端 OK 了,其他事情需要通過 gossip 通信去做贡羔。
A 節(jié)點(diǎn)發(fā)送 MEET gossip 消息給 B 節(jié)點(diǎn)
A 節(jié)點(diǎn)在定時(shí)任務(wù) clusterCron
中廉白,會(huì)做一些事情。
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
// 檢查是否有 disconnected nodes 并且重新建立連接
di = dictGetSafeIterator(server.cluster->nodes); // 遍歷所有節(jié)點(diǎn)
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
// 忽略掉 myself 和 noaddr 狀態(tài)的節(jié)點(diǎn)
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
// 節(jié)點(diǎn)處于 handshake 狀態(tài)乖寒,且狀態(tài)維持時(shí)間超過 handshake_timeout猴蹂,那么從 nodes中刪掉它
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}
// 剛剛收到 cluster meet 命令創(chuàng)建的新 node ,或是 server 剛啟動(dòng)楣嘁,或是由于某種原因斷開了
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
// 對端 gossip 通信端口為 node 端口 + 10000磅轻,創(chuàng)建 tcp 連接, 本節(jié)點(diǎn)相當(dāng)于 client
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR);
... ...
link = createClusterLink(node);
link->fd = fd;
node->link = link;
// 注冊 link->fd 上的可讀事件,事件回調(diào)函數(shù)為 clusterReadHandler
aeCreateFileEvent(server.el,link->fd,AE_READABLE, clusterReadHandler,link);
... ...
// 如果 node 帶有 MEET flag逐虚,我們發(fā)送一個(gè) MEET 包而不是 PING,
// 這是為了強(qiáng)制讓接收者把我們加到它的 nodes 中
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ? CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
... ...
node->flags &= ~CLUSTER_NODE_MEET;
... ...
}
}
dictReleaseIterator(di);
可以看到聋溜,遍歷自己看到的 nodes,當(dāng)遍歷到 B 節(jié)點(diǎn)時(shí)叭爱,由于 node->link == NULL勤婚,因此會(huì)監(jiān)聽 B 的啟動(dòng)端口號+10000,即 gossip 通信端口涤伐,然后注冊可讀事件馒胆,處理函數(shù)為 clusterReadHandler缨称。接著會(huì)發(fā)送 CLUSTER_NODE_MEET 消息給 B 節(jié)點(diǎn),消除掉 B 節(jié)點(diǎn)的 meet 狀態(tài)祝迂。
B 節(jié)點(diǎn)處理 A 發(fā)來的 MEET gossip 消息
當(dāng) B 節(jié)點(diǎn)接收到 A 節(jié)點(diǎn)發(fā)送 gossip 時(shí)睦尽,回調(diào)函數(shù) clusterAcceptHandler 進(jìn)行處理,然后會(huì) accept 對端的 connect(B 作為 server型雳,對端作為 client)当凡,注冊可讀事件,回調(diào)函數(shù)為 clusterReadHandler纠俭,基本邏輯如下
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
clusterLink *link;
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// 如果服務(wù)器正在啟動(dòng)沿量,不要接受其他節(jié)點(diǎn)的鏈接,因?yàn)?UPDATE 消息可能會(huì)干擾數(shù)據(jù)庫內(nèi)容
if (server.masterhost == NULL && server.loading) return;
while(max--) { // 1000 個(gè)請求
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
// 創(chuàng)建一個(gè) link 結(jié)構(gòu)來處理連接
// 剛開始的時(shí)候冤荆, link->node 被設(shè)置成 null朴则,因?yàn)楝F(xiàn)在我們不知道是哪個(gè)節(jié)點(diǎn)
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}
可以看到每次 accept 對端connect時(shí),都會(huì)創(chuàng)建一個(gè) clusterLink 結(jié)構(gòu)用來接收數(shù)據(jù)钓简,
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
int fd; /* TCP socket file descriptor */
sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
clusterLink 有一個(gè)指針是指向 node 自身的乌妒。
B 節(jié)點(diǎn)接收到 A 節(jié)點(diǎn)發(fā)送過來的信息,放到 clusterLink 的 rcvbuf 字段外邓,然后使用 clusterProcessPacket 函數(shù)來處理(接收數(shù)據(jù)過程很簡單撤蚊,不做分析)。
所以 clusterProcessPacket 函數(shù)的作用是處理別人發(fā)過來的 gossip 包损话。
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;
// 創(chuàng)建一個(gè)帶有 CLUSTER_NODE_HANDSHAKE 標(biāo)記的 cluster node侦啸,名字隨機(jī)
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link); // ip 和 port 信息均從 link 中獲得
node->port = ntohs(hdr->port);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
.....
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
由于這時(shí) B 節(jié)點(diǎn)還不認(rèn)識 A 節(jié)點(diǎn),因此 B 節(jié)點(diǎn)從自己的 nodes 中找 A 節(jié)點(diǎn)是找不到的丧枪,所以 sender 是空匹中,因此會(huì)走進(jìn)如上的這段邏輯。同樣以隨機(jī)的名字豪诲,CLUSTER_NODE_HANDSHAKE 為 flag 創(chuàng)建一個(gè) node顶捷,加入自己的 nodes 中。
在這個(gè)邏輯末尾會(huì)給 A 節(jié)點(diǎn)回復(fù)一個(gè) PONG 消息屎篱。
A 節(jié)點(diǎn)處理 B 節(jié)點(diǎn)回復(fù)的 PONG gossip 消息
同樣是在 clusterProcessPacket 中處理 gossip 消息服赎。
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) {
... ...
if (link->node) {
if (nodeInHandshake(link->node)) { // node 處于握手狀態(tài)
... ...
clusterRenameNode(link->node, hdr->sender); // 修正節(jié)點(diǎn)名
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; // 消除 handshake 狀態(tài)
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
這個(gè)時(shí)候 A 節(jié)點(diǎn)會(huì)根據(jù) B 節(jié)點(diǎn)發(fā)來的消息,更正 A 節(jié)點(diǎn) nodes 中關(guān)于 B 節(jié)點(diǎn)的名字交播,以及消除 handshake 狀態(tài)重虑。
B 節(jié)點(diǎn)發(fā)送 PING gossip 消息給 A 節(jié)點(diǎn)
當(dāng) B 節(jié)點(diǎn)在做 clusterCron 時(shí),發(fā)現(xiàn)自己看到的 A 節(jié)點(diǎn)中的 link 為空秦士,即 node->link == NULL缺厉,這與上面講的 A 節(jié)點(diǎn)給 B 節(jié)點(diǎn)發(fā) MEET 消息類似,不過在 B 節(jié)點(diǎn)看了 A 節(jié)點(diǎn)沒有 meet flag,因此發(fā)送的是 PING 消息提针。