死磕Redis5.0訂閱和發(fā)布

???????最近和一些朋友討論Redis的訂閱和發(fā)布功能,發(fā)現(xiàn)有些公司喜歡用Redis的訂閱和發(fā)布功能來當(dāng)作消息中間件來使用,當(dāng)時(shí)我就納悶懂酱,消息中間件比較牛逼的不就是那幾個(gè)RocketMQ各墨、Kafka、Rabbit MQ等專門的消息中間件么瘩绒,Redis 的訂閱發(fā)布功能也能當(dāng)消息中間件用猴抹?帶著這個(gè)疑問我們一起來探究一下Redis的訂閱和發(fā)布的實(shí)現(xiàn)吧。
???????文章分為以下幾個(gè)部分講解:
???????1. 涉及的命令
???????2. 數(shù)據(jù)結(jié)構(gòu)
???????3. 訂閱和發(fā)布主流程源碼分析
???????4. Redis 訂閱發(fā)布功能整的適合做消息中間件嗎锁荔?

一蟀给、涉及的命令

???????Redis 訂閱和發(fā)布非常簡單,一共就六個(gè)命令:psubscribe阳堕、publish跋理、pubsub、punsubscribe恬总、subscribe前普、unsubscribe。具體命令的使用大家可以參考 黃健宏老師總結(jié)的 Redis命令參考壹堰,黃健宏老師是我非常崇拜的一個(gè)人拭卿。黃健宏老師把 redis 所用到的命令都總結(jié)好了骡湖,我就不在這里再總結(jié)一遍了。

二峻厚、數(shù)據(jù)結(jié)構(gòu)

???????Redis 訂閱和發(fā)布有兩種類型响蕴,一種是頻道,還有一種就是模式惠桃。我們先看頻道的數(shù)據(jù)結(jié)構(gòu)浦夷。
???????Redis將所有頻道的訂閱關(guān)系都保存在服務(wù)器狀態(tài)的 pubsub_channels 字典里面,這個(gè)字典的鍵是被某個(gè)訂閱的頻道辜王,而鍵的值是一個(gè)鏈表劈狐,鏈表里面紀(jì)錄了所有訂閱這個(gè)頻道的客戶端:

// redisServer 中是使用字典保存的,這里保存著全部的頻道
struct redisServer {
    // ...
    // 保存所有頻道的訂閱關(guān)系
    dict *pubsub_channels;
    // ...
}
// client 中也會保存自己感興趣的頻道
typedef struct client {
    // client 中的感興趣的頻道
    dict *pubsub_channels;  
} client;

/*
 * 下面通過 pubsub.c 文件中的 pubsubSubscribeChannel 方法
 * 看看 channel 和 client 具體是如何映射的呐馆。
 */

/*
 * 將客戶訂閱到頻道懈息。 如果操作成功,則返回1摹恰,如果客戶端已訂閱該頻道辫继,則為0。
 */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* 查看 client 是否已經(jīng)訂閱了該頻道 */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* 將客戶端添加到 channel - >client list 哈希表中 */

        /*
         * 查找指定頻道是否在 pubsub_channels 字典中存在俗慈,
         * 如果存在直接將客戶端添加到 clients 尾部即可姑宽。
         * 否則創(chuàng)建一個(gè) clients 鏈表,然后將 client 添加到 clients 中
         */
        de = dictFind(server.pubsub_channels,channel);
        // 如果根據(jù)該 channel 查出的值為 null闺阱,說明字典中還沒有該頻道信息
        if (de == NULL) {
            // 從這里我們可以看出多個(gè)客戶端是通過鏈表連接在一起的
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // 頻道已經(jīng)存在炮车,直接添加到尾部
        listAddNodeTail(clients,c);
    }
    ...
}

通過源碼我們腦海中應(yīng)該有個(gè)大概的印象了,接著我們舉個(gè)栗子加深印象酣溃。比如:
① client-1瘦穆、client-2、client-3 三個(gè)客戶端正在訂閱 "order.it" 頻道
② client-4 正在訂閱 "order.sport" 頻道
③ client-5 和 client-6 兩個(gè)客戶端正在訂閱 "order.business" 頻道
則結(jié)構(gòu)如下圖:

image.png

上面就是頻道的訂閱關(guān)系圖赊豌,模式和頻道類似扛或,都是存儲到服務(wù)器狀態(tài)中,但是具體的數(shù)據(jù)結(jié)構(gòu)卻大不相同碘饼。

struct redisServer {
    // ...
    // 保存所有模式的訂閱關(guān)系
    list *pubsub_patterns;
    // ...
}
// client 中也會保存自己感興趣的模式
typedef struct client {
    // client 中的感興趣的模式
    list *pubsub_patterns;
} client;

/*
 * 我們可以看到 redisServer 中直接就是使用鏈表來存儲模式的
 * 下面我們看看具體的模式和 客戶端的映射關(guān)系吧
 */
/**
 * 訂閱模式的結(jié)構(gòu)體
 * 也就是 pubsub_patterns 鏈表中保存的結(jié)構(gòu)
 */
typedef struct pubsubPattern {
    /**
     * 客戶端
     */
    client *client;
    /**
     * 模式
     */
    robj *pattern;
} pubsubPattern;


/*
 * 下面我們看看 Redis 是如何構(gòu)造 pubsubPattern 并添加到 pubsub_patterns 中
 * 通過 pubsub.c 中的 pubsubSubscribePattern 方法我們可以看到全過程
 */

int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    // 查看 client 自己是否已經(jīng)訂閱該模式
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 沒有訂閱則將 pubsubPattern 結(jié)構(gòu)體加到 client 的 pubsub_patterns 中
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        // 將該模式和訂閱該模式的client 添加到服務(wù)端的 pubsub_patterns 鏈表中
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    ...
}

舉個(gè) demo熙兔,比如:
① client-7 正在訂閱 "music."。
② client-8 正在訂閱 "book.
"艾恼。
③ client-9 正在訂閱 "order.*".
則結(jié)構(gòu)圖如下

image.png

???????到這里Redis 的頻道和模式的數(shù)據(jù)結(jié)構(gòu)就解剖完了住涉,同學(xué)們都理解了么?看完頻道和模式的數(shù)據(jù)結(jié)構(gòu)钠绍,不知道同學(xué)們有沒有這樣的疑問舆声,頻道和模式到底有啥區(qū)別呢?下面我們就來看看他們之間到底有什么區(qū)別柳爽。我們還是通過 demo來了解吧媳握。
???????現(xiàn)在我們有 client-1碱屁、client-2、client-3毙芜、client-4 個(gè)客戶端,我們讓 client-1 訂閱"order.create"頻道争拐,讓 client-2 訂閱 "order.waitpay"腋粥,讓 client-3 訂閱 "order.pay" 頻道,讓 client-4 訂閱 "order.*" 模式架曹。然后我們分別往 "order.create"隘冲、"order.waitpay"、"order.pay" 發(fā)送消息绑雄,我們看看每個(gè)客戶端有何變化展辞。
client-1 訂閱 order.create 頻道:subscribe order.create


image.png

client-2 訂閱 order.waitpay 頻道:subscribe order.waitpay

image.png

client-3 訂閱 order.pay 頻道:subscribe order.pay

image.png

client-4 訂閱 order.* 模式:psubscribe order.*

image.png

然后我們使用一個(gè)客戶端分別往這幾個(gè)客戶端發(fā)送消息:

image.png

然后我們看看每個(gè)客戶端之間的變化
client-1:

image.png

client-2:

image.png

client-3:

image.png

client-4:

image.png

我們看到client-1、client-2万牺、client-3都只接受了和自己頻道相關(guān)的消息罗珍,但是 client-4 把發(fā)向 client-1、client-2脚粟、client-3 的消息都接收了覆旱,現(xiàn)在大家應(yīng)該明白了吧,模式其實(shí)就是模式匹配的概念核无,order.* 就表示匹配所有和 order 相關(guān)的消息扣唱。

三、訂閱和發(fā)布的源碼分析

我們就拿 publish order.create "order create" 這條消息來分析吧团南!直接上源碼分析:

/**
 * 發(fā)布一條消息
 *
 * 時(shí)間復(fù)雜度 O(N+M)噪沙,其中 N 是頻道 channel 的訂閱者數(shù)量,而 M 則是使用
 * 模式訂閱(subscribed patterns)的客戶端的數(shù)量吐根。
 * 
 * @param channel 頻道
 * @param message 消息體
 * @return 接收到信息 message 的訂閱者數(shù)量
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* 發(fā)送給監(jiān)聽該頻道的客戶端 */
    // 根據(jù)鍵值 channel 從字典中獲取 dictEntry 對象
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        // 從 dictEntry 中獲取監(jiān)聽 channel 的 client list
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        // 循環(huán)整個(gè)訂閱消息的列表正歼,然后發(fā)送消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            // 往指定的客戶端輸出緩沖區(qū)中發(fā)送消息
            // todo: 如果 client 消費(fèi)消息不及時(shí),那么 client 輸出緩沖區(qū)
                    // 就會造成消息堆積拷橘,會使 redis 內(nèi)存突然增大
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* 往監(jiān)聽了 channel 模式的 client 發(fā)送消息*/
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        // 循環(huán)整個(gè)模式鏈表
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            // 匹配指定的模式朋腋,找出指定模式對應(yīng)的客戶端,然后往
                    // 訂閱該模式的客戶端發(fā)送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                // 往指定的客戶端輸出緩沖區(qū)中發(fā)送消息
                // todo: 如果 client 消費(fèi)消息不及時(shí)膜楷,那么 client 輸出緩沖區(qū)
                // 就會造成消息堆積旭咽,會使 redis 內(nèi)存突然增大
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

流程圖如下:

image.png

四、Redis 訂閱發(fā)布功能整的適合做消息中間件嗎赌厅?

???????通過上面的分析穷绵,我想大家心里應(yīng)該都已經(jīng)有答案了。我們根據(jù)上面的源碼分析特愿,可以舉一個(gè)小 demo仲墨,Redis 發(fā)送消息勾缭,是循環(huán)訂閱者列表實(shí)現(xiàn)的,比如我有 100 個(gè)頻道目养,每個(gè)頻道有100個(gè)訂閱者俩由,由于是單線程,豈不是要循環(huán)處理癌蚁,那么最后一個(gè)頻道的最后一個(gè)訂閱者豈不是會等死去幻梯。使用 redis 做消息中間件的,redis 并沒有提供消息重試機(jī)制努释,也沒有提供消息確認(rèn)機(jī)制碘梢,更沒有提供消息的持久化,所以一旦消息丟失伐蒂,我們是沒有任何辦法的煞躬。而且現(xiàn)在突然訂閱方斷線,那么他將會丟失所有在短線期間發(fā)布者發(fā)布的消息逸邦,這個(gè)決定會讓很多人都感到失望吧恩沛。所以還是建議大家不要使用 Redis 做消息中間件了,存在很大的風(fēng)險(xiǎn)缕减。如果要用复唤,還是使用強(qiáng)大的 RocketMQ 或 Kafka 吧。
???????文章到這里就結(jié)束了烛卧,本人水平有限佛纫,寫的不好還請大家多多見諒,如有不對的地方总放,希望大家多提意見呈宇,我也會盡快改正。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末局雄,一起剝皮案震驚了整個(gè)濱河市甥啄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌炬搭,老刑警劉巖蜈漓,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異宫盔,居然都是意外死亡融虽,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門灼芭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來有额,“玉大人,你說我怎么就攤上這事∥∮樱” “怎么了茴迁?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長萤衰。 經(jīng)常有香客問我堕义,道長,這世上最難降的妖魔是什么脆栋? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任倦卖,我火速辦了婚禮,結(jié)果婚禮上筹吐,老公的妹妹穿的比我還像新娘糖耸。我一直安慰自己秘遏,他們只是感情好丘薛,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布熙含。 她就那樣靜靜地躺著挎峦,像睡著了一般政冻。 火紅的嫁衣襯著肌膚如雪篮条。 梳的紋絲不亂的頭發(fā)上赃泡,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天侈离,我揣著相機(jī)與錄音殴穴,去河邊找鬼狱窘。 笑死陵且,一個(gè)胖子當(dāng)著我的面吹牛裁僧,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播慕购,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼聊疲,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了沪悲?” 一聲冷哼從身側(cè)響起获洲,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎殿如,沒想到半個(gè)月后贡珊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡涉馁,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年门岔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片烤送。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡固歪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情牢裳,我是刑警寧澤逢防,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站蒲讯,受9級特大地震影響忘朝,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜判帮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一局嘁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧晦墙,春花似錦悦昵、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抗楔,卻和暖如春棋凳,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背连躏。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工剩岳, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人入热。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓拍棕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親勺良。 傳聞我的和親對象是個(gè)殘疾皇子绰播,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容