Redis奇幻之旅(二)5. stream

5. stream

? 很多同學(xué)并不認(rèn)識(shí)這個(gè)數(shù)據(jù)結(jié)構(gòu),確實(shí)骗绕,在Redis 5.0之前并沒有這個(gè)數(shù)據(jù)結(jié)構(gòu)藐窄。這個(gè)數(shù)據(jù)結(jié)構(gòu)稱之為“流”,為什么叫“流”呢酬土?這種數(shù)據(jù)就像流水一樣荆忍,不是一次潑了一盆過來而是一點(diǎn)一點(diǎn)地“流”過來。當(dāng)數(shù)據(jù)從生產(chǎn)方“流”到消費(fèi)方的時(shí)候撤缴,消費(fèi)方就可以對(duì)一點(diǎn)一點(diǎn)的數(shù)據(jù)進(jìn)行處理刹枉,這樣就保證了處理的時(shí)效性,也讓機(jī)器能平緩地使用資源屈呕。

? 事實(shí)上微宝,Redis的stream是作者的另一個(gè)開源項(xiàng)目Disque移植過來的,它高度借鑒了Kafka的設(shè)計(jì)虎眨,如果你有Kafka相關(guān)的知識(shí)芥吟,那么stream對(duì)你來說也很簡單侦铜。

5.1 相關(guān)命令

  • 消息隊(duì)列相關(guān)命令:

    XADD - 添加消息到末尾

    XTRIM - 對(duì)流進(jìn)行修剪,限制長度

    XDEL - 刪除消息

    XLEN - 獲取流包含的元素?cái)?shù)量钟鸵,即消息長度

    XRANGE - 獲取消息列表钉稍,會(huì)自動(dòng)過濾已經(jīng)刪除的消息

    XREVRANGE - 反向獲取消息列表,ID 從大到小

    XREAD - 以阻塞或非阻塞方式獲取消息列表

  • 消費(fèi)者組相關(guān)命令:

    XGROUP CREATE - 創(chuàng)建消費(fèi)者組

    XREADGROUP GROUP - 讀取消費(fèi)者組中的消息

    XACK - 將消息標(biāo)記為"已處理"

    XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID

    XGROUP DELCONSUMER - 刪除消費(fèi)者

    XGROUP DESTROY - 刪除消費(fèi)者組

    XPENDING - 顯示待處理消息的相關(guān)信息

    XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)

    XINFO - 查看流和消費(fèi)者組的相關(guān)信息

    XINFO GROUPS - 打印消費(fèi)者組的信息

    XINFO STREAM - 打印流信息

5.2 結(jié)構(gòu)源碼

消息ID:

/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter. IDs generated in the same millisecond (or in a past
 * millisecond if the clock jumped backward) will use the millisecond time
 * of the latest generated ID and an incremented sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

流迭代器:

/* We define an iterator to iterate stream items in an abstract way, without
 * caring about the radix tree + listpack representation. Technically speaking
 * the iterator is only used inside streamReplyWithRange(), so could just
 * be implemented inside the function, but practically there is the AOF
 * rewriting code that also needs to iterate the stream to emit the XADD
 * commands. */
typedef struct streamIterator {
    stream *stream;         /* The stream we are iterating. */
    streamID master_id;     /* ID of the master entry at listpack head. */
    uint64_t master_fields_count;       /* Master entries # of fields. */
    unsigned char *master_fields_start; /* Master entries start in listpack. */
    unsigned char *master_fields_ptr;   /* Master field to emit next. */
    int entry_flags;                    /* Flags of entry we are emitting. */
    int rev;                /* True if iterating end to start (reverse). */
    uint64_t start_key[2];  /* Start key as 128 bit big endian. */
    uint64_t end_key[2];    /* End key as 128 bit big endian. */
    raxIterator ri;         /* Rax iterator. */
    unsigned char *lp;      /* Current listpack. */
    unsigned char *lp_ele;  /* Current listpack cursor. */
    unsigned char *lp_flags; /* Current entry flags pointer. */
    /* Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;

?

流:

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

5.3 運(yùn)行機(jī)制

? 每個(gè) stream 都有唯一的名稱棺耍,它就是 Redis 的 key贡未,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。

? 每個(gè) stream 都可以掛多個(gè)消費(fèi)組(Consumer Group)蒙袍,每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id俊卤,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng),同一個(gè)消費(fèi)組的消費(fèi)者只能讀取之后的消息害幅。

? pending_ids是消費(fèi)者(Consumer)的狀態(tài)變量消恍,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息以现,但是還沒有 ack(Acknowledge character:確認(rèn)字符)的消息id狠怨。當(dāng)消息被取走,這個(gè)對(duì)應(yīng)的消息ID就會(huì)進(jìn)入消費(fèi)組的PEL(Pending Entries List)結(jié)構(gòu)里邑遏,當(dāng)ack之后佣赖,這個(gè)消息就會(huì)從PEL中刪除。如果stream中的消息被消費(fèi)者取走但是一直不ack记盒,那么PEL就會(huì)一直增長憎蛤,如果消費(fèi)者巨多,就可能出現(xiàn)內(nèi)存占用很大的情況纪吮。當(dāng)Redis服務(wù)器向消費(fèi)者發(fā)送數(shù)據(jù)的時(shí)候俩檬,客戶端斷開了連接,這樣丟失的消息會(huì)在客戶端重連之后繼續(xù)發(fā)送給它碾盟,因?yàn)橄D會(huì)保存在PEL之中豆胸。

? 除了這些,stream還提供maxlen參數(shù)巷疼,來限制自身的最長消息數(shù)晚胡,這樣就能保證數(shù)據(jù)不超過指定的長度。

5.4 異同

? stream 雖然和 Kafka 非常像嚼沿,有消費(fèi)組估盘、水位等概念,但它并不支持天然分區(qū)骡尽。也就是說遣妥,如果我們要進(jìn)行partition的話,只能建立多個(gè) stream key 攀细,然后由客戶端或者中間代理來將不同的消息路由到不同的 stream 中箫踩。

? 在 stream 被發(fā)布之前爱态,Redis本身就有 pub/sub 來實(shí)現(xiàn)消息隊(duì)列的功能。不過pub/sub有個(gè)極大的缺點(diǎn)就是不能持久化境钟,當(dāng)被訂閱的服務(wù)器pub一條消息锦担,如果沒有sub的客戶端,那么這個(gè)消息就會(huì)丟失慨削。在生產(chǎn)環(huán)境中我們沒辦法接受那么容易的丟失消息洞渔,并且 pub/sub 更適合用來做廣播,并不適合做消息隊(duì)列缚态。作者在Reids Cluster 中大量的使用了 pub/sub 這個(gè)功能磁椒。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者玫芦。
  • 序言:七十年代末浆熔,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子桥帆,更是在濱河造成了極大的恐慌医增,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件环葵,死亡現(xiàn)場離奇詭異调窍,居然都是意外死亡宝冕,警方通過查閱死者的電腦和手機(jī)张遭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來地梨,“玉大人菊卷,你說我怎么就攤上這事”ζ剩” “怎么了洁闰?”我有些...
    開封第一講書人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長万细。 經(jīng)常有香客問我扑眉,道長,這世上最難降的妖魔是什么赖钞? 我笑而不...
    開封第一講書人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任腰素,我火速辦了婚禮,結(jié)果婚禮上雪营,老公的妹妹穿的比我還像新娘弓千。我一直安慰自己,他們只是感情好献起,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開白布洋访。 她就那樣靜靜地躺著镣陕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪姻政。 梳的紋絲不亂的頭發(fā)上呆抑,一...
    開封第一講書人閱讀 51,443評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音扶歪,去河邊找鬼理肺。 笑死,一個(gè)胖子當(dāng)著我的面吹牛善镰,可吹牛的內(nèi)容都是我干的妹萨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼炫欺,長吁一口氣:“原來是場噩夢啊……” “哼乎完!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起品洛,我...
    開封第一講書人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤树姨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后桥状,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體帽揪,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年辅斟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了转晰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡士飒,死狀恐怖查邢,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情酵幕,我是刑警寧澤扰藕,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站芳撒,受9級(jí)特大地震影響邓深,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜笔刹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一芥备、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧徘熔,春花似錦门躯、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽染乌。三九已至,卻和暖如春懂讯,著一層夾襖步出監(jiān)牢的瞬間荷憋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來泰國打工褐望, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留勒庄,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓瘫里,卻偏偏與公主長得像实蔽,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子谨读,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 原文鏈接:Redis實(shí)現(xiàn)消息隊(duì)列的方案 Redis作為內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)局装,常用作數(shù)據(jù)庫、緩存和消息代理劳殖。它支持?jǐn)?shù)...
    這個(gè)ID狠溫柔閱讀 101,237評(píng)論 2 28
  • 夜鶯2517閱讀 127,720評(píng)論 1 9
  • 版本:ios 1.2.1 亮點(diǎn): 1.app角標(biāo)可以實(shí)時(shí)更新天氣溫度或選擇空氣質(zhì)量铐尚,建議處女座就不要選了,不然老想...
    我就是沉沉閱讀 6,894評(píng)論 1 6
  • 我是一名過去式的高三狗哆姻,很可悲宣增,在這三年里我沒有戀愛,看著同齡的小伙伴們一對(duì)兒一對(duì)兒的矛缨,我的心不好受爹脾。怎么說呢,高...
    小娘紙閱讀 3,388評(píng)論 4 7
  • 這些日子就像是一天一天在倒計(jì)時(shí) 一想到他走了 心里就是說不出的滋味 從幾個(gè)月前認(rèn)識(shí)他開始 就意識(shí)到終究會(huì)發(fā)生的 只...
    栗子a閱讀 1,621評(píng)論 1 3