???????最近和一些朋友討論Redis的訂閱和發(fā)布功能,發(fā)現(xiàn)有些公司喜歡用Redis的訂閱和發(fā)布功能來當(dāng)作消息中間件來使用,當(dāng)時(shí)我就納悶懂酱,消息中間件比較牛逼的不就是那幾個(gè)RocketMQ各墨、Kafka、Rabbit MQ等專門的消息中間件么瘩绒,Redis 的訂閱發(fā)布功能也能當(dāng)消息中間件用猴抹?帶著這個(gè)疑問我們一起來探究一下Redis的訂閱和發(fā)布的實(shí)現(xiàn)吧。
???????文章分為以下幾個(gè)部分講解:
???????1. 涉及的命令
???????2. 數(shù)據(jù)結(jié)構(gòu)
???????3. 訂閱和發(fā)布主流程源碼分析
???????4. Redis 訂閱發(fā)布功能整的適合做消息中間件嗎锁荔?
一蟀给、涉及的命令
???????Redis 訂閱和發(fā)布非常簡單,一共就六個(gè)命令:psubscribe阳堕、publish跋理、pubsub、punsubscribe恬总、subscribe前普、unsubscribe。具體命令的使用大家可以參考 黃健宏老師總結(jié)的 Redis命令參考壹堰,黃健宏老師是我非常崇拜的一個(gè)人拭卿。黃健宏老師把 redis 所用到的命令都總結(jié)好了骡湖,我就不在這里再總結(jié)一遍了。
二峻厚、數(shù)據(jù)結(jié)構(gòu)
???????Redis 訂閱和發(fā)布有兩種類型响蕴,一種是頻道,還有一種就是模式惠桃。我們先看頻道的數(shù)據(jù)結(jié)構(gòu)浦夷。
???????Redis將所有頻道的訂閱關(guān)系都保存在服務(wù)器狀態(tài)的 pubsub_channels 字典里面,這個(gè)字典的鍵是被某個(gè)訂閱的頻道辜王,而鍵的值是一個(gè)鏈表劈狐,鏈表里面紀(jì)錄了所有訂閱這個(gè)頻道的客戶端:
// redisServer 中是使用字典保存的,這里保存著全部的頻道
struct redisServer {
// ...
// 保存所有頻道的訂閱關(guān)系
dict *pubsub_channels;
// ...
}
// client 中也會保存自己感興趣的頻道
typedef struct client {
// client 中的感興趣的頻道
dict *pubsub_channels;
} client;
/*
* 下面通過 pubsub.c 文件中的 pubsubSubscribeChannel 方法
* 看看 channel 和 client 具體是如何映射的呐馆。
*/
/*
* 將客戶訂閱到頻道懈息。 如果操作成功,則返回1摹恰,如果客戶端已訂閱該頻道辫继,則為0。
*/
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* 查看 client 是否已經(jīng)訂閱了該頻道 */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* 將客戶端添加到 channel - >client list 哈希表中 */
/*
* 查找指定頻道是否在 pubsub_channels 字典中存在俗慈,
* 如果存在直接將客戶端添加到 clients 尾部即可姑宽。
* 否則創(chuàng)建一個(gè) clients 鏈表,然后將 client 添加到 clients 中
*/
de = dictFind(server.pubsub_channels,channel);
// 如果根據(jù)該 channel 查出的值為 null闺阱,說明字典中還沒有該頻道信息
if (de == NULL) {
// 從這里我們可以看出多個(gè)客戶端是通過鏈表連接在一起的
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// 頻道已經(jīng)存在炮车,直接添加到尾部
listAddNodeTail(clients,c);
}
...
}
通過源碼我們腦海中應(yīng)該有個(gè)大概的印象了,接著我們舉個(gè)栗子加深印象酣溃。比如:
① client-1瘦穆、client-2、client-3 三個(gè)客戶端正在訂閱 "order.it" 頻道
② client-4 正在訂閱 "order.sport" 頻道
③ client-5 和 client-6 兩個(gè)客戶端正在訂閱 "order.business" 頻道
則結(jié)構(gòu)如下圖:
上面就是頻道的訂閱關(guān)系圖赊豌,模式和頻道類似扛或,都是存儲到服務(wù)器狀態(tài)中,但是具體的數(shù)據(jù)結(jié)構(gòu)卻大不相同碘饼。
struct redisServer {
// ...
// 保存所有模式的訂閱關(guān)系
list *pubsub_patterns;
// ...
}
// client 中也會保存自己感興趣的模式
typedef struct client {
// client 中的感興趣的模式
list *pubsub_patterns;
} client;
/*
* 我們可以看到 redisServer 中直接就是使用鏈表來存儲模式的
* 下面我們看看具體的模式和 客戶端的映射關(guān)系吧
*/
/**
* 訂閱模式的結(jié)構(gòu)體
* 也就是 pubsub_patterns 鏈表中保存的結(jié)構(gòu)
*/
typedef struct pubsubPattern {
/**
* 客戶端
*/
client *client;
/**
* 模式
*/
robj *pattern;
} pubsubPattern;
/*
* 下面我們看看 Redis 是如何構(gòu)造 pubsubPattern 并添加到 pubsub_patterns 中
* 通過 pubsub.c 中的 pubsubSubscribePattern 方法我們可以看到全過程
*/
int pubsubSubscribePattern(client *c, robj *pattern) {
int retval = 0;
// 查看 client 自己是否已經(jīng)訂閱該模式
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
// 沒有訂閱則將 pubsubPattern 結(jié)構(gòu)體加到 client 的 pubsub_patterns 中
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
// 將該模式和訂閱該模式的client 添加到服務(wù)端的 pubsub_patterns 鏈表中
listAddNodeTail(server.pubsub_patterns,pat);
}
...
}
舉個(gè) demo熙兔,比如:
① client-7 正在訂閱 "music."。
② client-8 正在訂閱 "book."艾恼。
③ client-9 正在訂閱 "order.*".
則結(jié)構(gòu)圖如下
???????到這里Redis 的頻道和模式的數(shù)據(jù)結(jié)構(gòu)就解剖完了住涉,同學(xué)們都理解了么?看完頻道和模式的數(shù)據(jù)結(jié)構(gòu)钠绍,不知道同學(xué)們有沒有這樣的疑問舆声,頻道和模式到底有啥區(qū)別呢?下面我們就來看看他們之間到底有什么區(qū)別柳爽。我們還是通過 demo來了解吧媳握。
???????現(xiàn)在我們有 client-1碱屁、client-2、client-3毙芜、client-4 個(gè)客戶端,我們讓 client-1 訂閱"order.create"頻道争拐,讓 client-2 訂閱 "order.waitpay"腋粥,讓 client-3 訂閱 "order.pay" 頻道,讓 client-4 訂閱 "order.*" 模式架曹。然后我們分別往 "order.create"隘冲、"order.waitpay"、"order.pay" 發(fā)送消息绑雄,我們看看每個(gè)客戶端有何變化展辞。
client-1 訂閱 order.create 頻道:subscribe order.create
client-2 訂閱 order.waitpay 頻道:subscribe order.waitpay
client-3 訂閱 order.pay 頻道:subscribe order.pay
client-4 訂閱 order.* 模式:psubscribe order.*
然后我們使用一個(gè)客戶端分別往這幾個(gè)客戶端發(fā)送消息:
然后我們看看每個(gè)客戶端之間的變化
client-1:
client-2:
client-3:
client-4:
我們看到client-1、client-2万牺、client-3都只接受了和自己頻道相關(guān)的消息罗珍,但是 client-4 把發(fā)向 client-1、client-2脚粟、client-3 的消息都接收了覆旱,現(xiàn)在大家應(yīng)該明白了吧,模式其實(shí)就是模式匹配的概念核无,order.* 就表示匹配所有和 order 相關(guān)的消息扣唱。
三、訂閱和發(fā)布的源碼分析
我們就拿 publish order.create "order create" 這條消息來分析吧团南!直接上源碼分析:
/**
* 發(fā)布一條消息
*
* 時(shí)間復(fù)雜度 O(N+M)噪沙,其中 N 是頻道 channel 的訂閱者數(shù)量,而 M 則是使用
* 模式訂閱(subscribed patterns)的客戶端的數(shù)量吐根。
*
* @param channel 頻道
* @param message 消息體
* @return 接收到信息 message 的訂閱者數(shù)量
*/
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
/* 發(fā)送給監(jiān)聽該頻道的客戶端 */
// 根據(jù)鍵值 channel 從字典中獲取 dictEntry 對象
de = dictFind(server.pubsub_channels,channel);
if (de) {
// 從 dictEntry 中獲取監(jiān)聽 channel 的 client list
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
// 循環(huán)整個(gè)訂閱消息的列表正歼,然后發(fā)送消息
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
// 往指定的客戶端輸出緩沖區(qū)中發(fā)送消息
// todo: 如果 client 消費(fèi)消息不及時(shí),那么 client 輸出緩沖區(qū)
// 就會造成消息堆積拷橘,會使 redis 內(nèi)存突然增大
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
/* 往監(jiān)聽了 channel 模式的 client 發(fā)送消息*/
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
// 循環(huán)整個(gè)模式鏈表
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
// 匹配指定的模式朋腋,找出指定模式對應(yīng)的客戶端,然后往
// 訂閱該模式的客戶端發(fā)送消息
if (stringmatchlen((char*)pat->pattern->ptr,
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
// 往指定的客戶端輸出緩沖區(qū)中發(fā)送消息
// todo: 如果 client 消費(fèi)消息不及時(shí)膜楷,那么 client 輸出緩沖區(qū)
// 就會造成消息堆積旭咽,會使 redis 內(nèi)存突然增大
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
流程圖如下:
四、Redis 訂閱發(fā)布功能整的適合做消息中間件嗎赌厅?
???????通過上面的分析穷绵,我想大家心里應(yīng)該都已經(jīng)有答案了。我們根據(jù)上面的源碼分析特愿,可以舉一個(gè)小 demo仲墨,Redis 發(fā)送消息勾缭,是循環(huán)訂閱者列表實(shí)現(xiàn)的,比如我有 100 個(gè)頻道目养,每個(gè)頻道有100個(gè)訂閱者俩由,由于是單線程,豈不是要循環(huán)處理癌蚁,那么最后一個(gè)頻道的最后一個(gè)訂閱者豈不是會等死去幻梯。使用 redis 做消息中間件的,redis 并沒有提供消息重試機(jī)制努释,也沒有提供消息確認(rèn)機(jī)制碘梢,更沒有提供消息的持久化,所以一旦消息丟失伐蒂,我們是沒有任何辦法的煞躬。而且現(xiàn)在突然訂閱方斷線,那么他將會丟失所有在短線期間發(fā)布者發(fā)布的消息逸邦,這個(gè)決定會讓很多人都感到失望吧恩沛。所以還是建議大家不要使用 Redis 做消息中間件了,存在很大的風(fēng)險(xiǎn)缕减。如果要用复唤,還是使用強(qiáng)大的 RocketMQ 或 Kafka 吧。
???????文章到這里就結(jié)束了烛卧,本人水平有限佛纫,寫的不好還請大家多多見諒,如有不對的地方总放,希望大家多提意見呈宇,我也會盡快改正。