5. stream
? 很多同學(xué)并不認(rèn)識(shí)這個(gè)數(shù)據(jù)結(jié)構(gòu),確實(shí)骗绕,在Redis 5.0之前并沒有這個(gè)數(shù)據(jù)結(jié)構(gòu)藐窄。這個(gè)數(shù)據(jù)結(jié)構(gòu)稱之為“流”,為什么叫“流”呢酬土?這種數(shù)據(jù)就像流水一樣荆忍,不是一次潑了一盆過來而是一點(diǎn)一點(diǎn)地“流”過來。當(dāng)數(shù)據(jù)從生產(chǎn)方“流”到消費(fèi)方的時(shí)候撤缴,消費(fèi)方就可以對(duì)一點(diǎn)一點(diǎn)的數(shù)據(jù)進(jìn)行處理刹枉,這樣就保證了處理的時(shí)效性,也讓機(jī)器能平緩地使用資源屈呕。
? 事實(shí)上微宝,Redis的stream是作者的另一個(gè)開源項(xiàng)目Disque移植過來的,它高度借鑒了Kafka的設(shè)計(jì)虎眨,如果你有Kafka相關(guān)的知識(shí)芥吟,那么stream對(duì)你來說也很簡單侦铜。
5.1 相關(guān)命令
-
消息隊(duì)列相關(guān)命令:
XADD - 添加消息到末尾
XTRIM - 對(duì)流進(jìn)行修剪,限制長度
XDEL - 刪除消息
XLEN - 獲取流包含的元素?cái)?shù)量钟鸵,即消息長度
XRANGE - 獲取消息列表钉稍,會(huì)自動(dòng)過濾已經(jīng)刪除的消息
XREVRANGE - 反向獲取消息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取消息列表
-
消費(fèi)者組相關(guān)命令:
XGROUP CREATE - 創(chuàng)建消費(fèi)者組
XREADGROUP GROUP - 讀取消費(fèi)者組中的消息
XACK - 將消息標(biāo)記為"已處理"
XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID
XGROUP DELCONSUMER - 刪除消費(fèi)者
XGROUP DESTROY - 刪除消費(fèi)者組
XPENDING - 顯示待處理消息的相關(guān)信息
XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
XINFO - 查看流和消費(fèi)者組的相關(guān)信息
XINFO GROUPS - 打印消費(fèi)者組的信息
XINFO STREAM - 打印流信息
5.2 結(jié)構(gòu)源碼
消息ID:
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
* of the latest generated ID and an incremented sequence. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
流迭代器:
/* We define an iterator to iterate stream items in an abstract way, without
* caring about the radix tree + listpack representation. Technically speaking
* the iterator is only used inside streamReplyWithRange(), so could just
* be implemented inside the function, but practically there is the AOF
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
?
流:
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
5.3 運(yùn)行機(jī)制
? 每個(gè) stream 都有唯一的名稱棺耍,它就是 Redis 的 key贡未,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。
? 每個(gè) stream 都可以掛多個(gè)消費(fèi)組(Consumer Group)蒙袍,每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id俊卤,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng),同一個(gè)消費(fèi)組的消費(fèi)者只能讀取之后的消息害幅。
? pending_ids是消費(fèi)者(Consumer)的狀態(tài)變量消恍,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息以现,但是還沒有 ack(Acknowledge character:確認(rèn)字符)的消息id狠怨。當(dāng)消息被取走,這個(gè)對(duì)應(yīng)的消息ID就會(huì)進(jìn)入消費(fèi)組的PEL(Pending Entries List)結(jié)構(gòu)里邑遏,當(dāng)ack之后佣赖,這個(gè)消息就會(huì)從PEL中刪除。如果stream中的消息被消費(fèi)者取走但是一直不ack记盒,那么PEL就會(huì)一直增長憎蛤,如果消費(fèi)者巨多,就可能出現(xiàn)內(nèi)存占用很大的情況纪吮。當(dāng)Redis服務(wù)器向消費(fèi)者發(fā)送數(shù)據(jù)的時(shí)候俩檬,客戶端斷開了連接,這樣丟失的消息會(huì)在客戶端重連之后繼續(xù)發(fā)送給它碾盟,因?yàn)橄D會(huì)保存在PEL之中豆胸。
? 除了這些,stream還提供maxlen參數(shù)巷疼,來限制自身的最長消息數(shù)晚胡,這樣就能保證數(shù)據(jù)不超過指定的長度。
5.4 異同
? stream 雖然和 Kafka 非常像嚼沿,有消費(fèi)組估盘、水位等概念,但它并不支持天然分區(qū)骡尽。也就是說遣妥,如果我們要進(jìn)行partition的話,只能建立多個(gè) stream key 攀细,然后由客戶端或者中間代理來將不同的消息路由到不同的 stream 中箫踩。
? 在 stream 被發(fā)布之前爱态,Redis本身就有 pub/sub 來實(shí)現(xiàn)消息隊(duì)列的功能。不過pub/sub有個(gè)極大的缺點(diǎn)就是不能持久化境钟,當(dāng)被訂閱的服務(wù)器pub一條消息锦担,如果沒有sub的客戶端,那么這個(gè)消息就會(huì)丟失慨削。在生產(chǎn)環(huán)境中我們沒辦法接受那么容易的丟失消息洞渔,并且 pub/sub 更適合用來做廣播,并不適合做消息隊(duì)列缚态。作者在Reids Cluster 中大量的使用了 pub/sub 這個(gè)功能磁椒。