簡(jiǎn)介
? ????Stream是Redis 5.0引入的一種新數(shù)據(jù)類(lèi)型缅帘,允許消費(fèi)者等待生產(chǎn)者發(fā)送的新數(shù)據(jù)斯嚎,還引入了消費(fèi)者組概念丹莲,組之間數(shù)據(jù)是相同的(前提是設(shè)置的偏移量一樣)昼汗,組內(nèi)的消費(fèi)者不會(huì)拿到相同數(shù)據(jù)笔喉。這種概念和kafka很雷同取视。
?Stream 類(lèi)型,從字面上看是流類(lèi)型常挚,但其實(shí)從功能上看作谭,應(yīng)該是 Redis 對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)奄毡≌矍罚基于 Redis 做消息隊(duì)列的方案有很多種,例如:
PUB/SUB吼过,訂閱/發(fā)布模式
基于 Lis 類(lèi)型t的 LPUSH+BRPOP 的實(shí)現(xiàn)
基于 Sorted-Set 的實(shí)現(xiàn)
Redis 5.0 中發(fā)布的 Stream 類(lèi)型锐秦,也用來(lái)實(shí)現(xiàn)典型的消息隊(duì)列。該 Stream 類(lèi)型的出現(xiàn)盗忱,幾乎滿足了消息隊(duì)列具備的全部?jī)?nèi)容酱床。
??? 在某些特定場(chǎng)景可以使用redis的stream代替kafka等消息隊(duì)列,減少系統(tǒng)復(fù)雜性趟佃,增強(qiáng)系統(tǒng)的穩(wěn)定性
特點(diǎn)
? 1.如果使用xrange和xrevrange命令斤葱,則Stream和list功能類(lèi)同
? 2.如果使用xread命令,則有其非常獨(dú)特的地方
? 2.1 與redis的pub/sub不同揖闸,pub/sub多個(gè)客戶端是收到相同的數(shù)據(jù)揍堕,而stream的多個(gè)客戶端是競(jìng)爭(zhēng)關(guān)系,每個(gè)客戶端收到的數(shù)據(jù)是不相同的
? 2.2 pub/sub中一旦觸發(fā)數(shù)據(jù)獲取汤纸,不會(huì)記錄下上一次拿的位置衩茸,意味著客戶端無(wú)法重復(fù)去拿以前的數(shù)據(jù),而blpop方式一旦pop贮泞,數(shù)據(jù)就會(huì)永久的刪除楞慈,也無(wú)法重復(fù)去拿以前的數(shù)據(jù)幔烛。而Stream會(huì)永久的存放數(shù)據(jù),并且客戶端會(huì)保留上一次拿的id囊蓝,甚至通過(guò)修改id可以拿回以前的數(shù)據(jù)饿悬。和kafka的機(jī)制類(lèi)似。
? 2.3.Stream提供了消費(fèi)者組(kafka也有)聚霜,不同組接收到的數(shù)據(jù)完全一樣(前提是條件一樣)狡恬,但是組內(nèi)的消費(fèi)者則是競(jìng)爭(zhēng)關(guān)系(還是和kafka一樣)。
? 2.4.可以設(shè)置為阻塞與非阻塞模式
2.5.多客戶端時(shí)蝎宇,遵循FIFO特性
命令操作詳解
先對(duì)基本命令做一些熟練操作弟劲,后面再研究高級(jí)特性。
注意:命令是不區(qū)分大小寫(xiě)的姥芥,個(gè)人比較喜歡小寫(xiě)的方式
添加
命令格式:XADD stream_name id key-value [key-value ...]
? stream_name:給流指定一個(gè)名字
? id:entry在流中的標(biāo)識(shí)兔乞,entry可以理解為添加到流中數(shù)據(jù)的封裝。id一般來(lái)說(shuō)都使用自增的序列凉唐,而不需要自己手動(dòng)指定庸追,有兩部分組成:<millisecondsTime>-<sequenceNumber>。在命令行中執(zhí)行XADD命令后台囱,會(huì)將自動(dòng)生成的ID返回并輸出锚国。如果兩個(gè)命令在時(shí)間上非常接近,那么millisecondsTime相同玄坦,則sequenceNumber就自動(dòng)增長(zhǎng)血筑。
? 因?yàn)閞edis的stream是支持范圍查詢,所以ID組成部分使用了millisecondsTime煎楣。因?yàn)閙illisecondsTime就是不斷有序自增的
? 如果要自定義id豺总,則一定要保證全局唯一,避免出現(xiàn)意料不到問(wèn)題
1.命令行單條執(zhí)行
XADD mytopic * acctid 012 age 9
? 輸出結(jié)果:
127.0.0.1:6379> XADD mytopic * acctid 012 age 9
1527837352024-0
2.使用文件批量執(zhí)行
在文件中編寫(xiě)命令:
XADD mytopic * acctid 123 age 10
XADD mytopic * acctid 234 age 11
XADD mytopic * acctid 345 age 12
XADD mytopic * acctid 456 age 13
執(zhí)行命令:
cat 1.txt | redis-cli -a runoob
輸出結(jié)果:
[hadoop@hadoop00 /home/hadoop/proc/redis-5.0-rc1]$ cat 1.txt | redis-cli -a runoobWarning: Using a password with '-a' option on the command line interface may not be safe.1527837440631-01527837440632-01527837440632-11527837440632-2
查看隊(duì)列長(zhǎng)度命令:
xlen mytopic
輸出:
127.0.0.1:6379> xlen mytopic
(integer) 5
127.0.0.1:6379>
獲取數(shù)據(jù):xrange xrevrange
1.xrange
xrange mytopic - +
? 符號(hào)"-":表示最小值
? 符號(hào)"+":表示最大值
xrange mytopic 0 +
? 命令xrange mytopic 0 +的效果和上面一樣择懂,因?yàn)榕判驎r(shí)字符0是字符1小的喻喳,而上面所有自動(dòng)生成的millisecondsTime肯定是大于0的
xrange mytopic 1527837440632 1527837440632
? 自定義查詢范圍,指定特殊的值困曙,上面的查詢結(jié)果為同一個(gè)毫秒向Stream中添加的數(shù)據(jù)表伦,包含如下三個(gè)entry
1527837440632-0
1527837440632-1
1527837440632-2
xrange mytopic 1527837440632 + count 2
? 該命令的意思為:查詢ID以1527837440632開(kāi)始,以無(wú)限大為結(jié)束的entry慷丽,但只取出查詢結(jié)果集(升序排列)中的前兩個(gè)entry蹦哼,輸出結(jié)果包含如下兩個(gè)Id
1527837440632-0
1527837440632-1
2.xrevrange
xrevrange mytopic + 1527837440632 count 3
? 該命令的意思為:反向查詢ID以無(wú)限大為開(kāi)始,以1527837440632為結(jié)束的entry要糊,但只取出查詢結(jié)果集(降序排列)中的前三個(gè)entry,輸出結(jié)果包含如下三個(gè)id
1527839429360-0
1527837440632-2
1527837440632-1
獲取數(shù)據(jù):xread
1.非阻塞
xread count 4 streams mytopic 0
? 從stream 中拿ID比0大的4個(gè)Entry纲熏,按升序排列
count 4:count參數(shù)的值為4
streams:該參數(shù)必須是xread命令的最后一個(gè)參數(shù)
xread count 10 streams mytopic mystream 0 0
一次訪問(wèn)多個(gè)stream,可分別指定最大ID
2.阻塞
xread block 0 streams mystream $
監(jiān)聽(tīng)name為mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry局劲,其實(shí)只要你將"$"設(shè)置為任何一個(gè)比當(dāng)前ID還大的值勺拣,一樣可以實(shí)現(xiàn)阻塞等待,如果比當(dāng)前ID小鱼填,那么立馬返回符合條件的entry
block 0:block表示命令要阻塞药有,0表示阻塞時(shí)間為無(wú)限大,不超時(shí)苹丸,如果設(shè)置為>0的整數(shù)愤惰,即為阻塞超時(shí)時(shí)間
監(jiān)聽(tīng)生效后,拿到數(shù)據(jù)監(jiān)聽(tīng)就失效谈跛,與zk的watcher雷同羊苟。意思是該命令執(zhí)行后塑陵,只能拿到一條ID比設(shè)置ID更大的entry感憾,要想繼續(xù)拿,必須執(zhí)行xread命令令花,官方推薦下一次拿entry使用上一次得到的ID阻桅。注意千萬(wàn)別亂設(shè)置很大的ID ,否則你可能永遠(yuǎn)拿不到entry兼都。
xread block 0 streams mystream mytopic $ $
收到任何一個(gè)stream的消息嫂沉,本次監(jiān)聽(tīng)就失效,只能拿到一條數(shù)據(jù)扮碧,后面還需要拿數(shù)據(jù)趟章,可以將各自stream拿到的ID作為最大ID,重新執(zhí)行命令
消費(fèi)者組-Consumer groups
redis5引入了消費(fèi)者組的概念慎王,一個(gè)stream的數(shù)據(jù)每一個(gè)消費(fèi)者組都發(fā)一份蚓土,消費(fèi)者組里面的消費(fèi)者競(jìng)爭(zhēng)同一份數(shù)據(jù),亦即在同一個(gè)消費(fèi)者組內(nèi)赖淤,一個(gè)消息是不可能發(fā)給多個(gè)消費(fèi)者的
need-to-insert-img
消費(fèi)者組提供了如下5點(diǎn)保障:
組內(nèi)消費(fèi)者消費(fèi)的消息不重復(fù)
組內(nèi)消費(fèi)者名稱(chēng)必須唯一
消費(fèi)者拿到的消息肯定是沒(méi)有被組內(nèi)其他消費(fèi)者消費(fèi)過(guò)的消息
消費(fèi)者成功消費(fèi)消息之后要求發(fā)送ACK蜀漆,然后這條消息才會(huì)從消費(fèi)者組中移除,也就是說(shuō)消息至少被消費(fèi)一次咱旱,和kafka一樣
消費(fèi)者組會(huì)跟蹤所有待處理的消息
命令:
1.創(chuàng)建消費(fèi)者組
xgroup create mytopic mygroup $
該命令的意思是:使用xgroup命令創(chuàng)建了一個(gè)mygroup消費(fèi)者組确丢,該消費(fèi)者組與mytopic stream進(jìn)行了關(guān)聯(lián),以后mygroup消費(fèi)者組中的消費(fèi)者就會(huì)mytopic stream中拿數(shù)據(jù)
符號(hào)"$"和上面一樣吐限,代表mytopic stream中目前最大的ID鲜侥,消費(fèi)者拿到的entry的id一定會(huì)大于此刻$代表的最大ID。你也可以指定這個(gè)最大的ID诸典,比如0
2.從消費(fèi)者組讀數(shù)據(jù)
xreadgroup group mygroup consumer_a count 1 streams mytopic >
該命令的意思是:使用xreadgroup命令讓消費(fèi)者consumer_a從mygroup消費(fèi)者組的mytopic stream中拿最新的剃毒,并且沒(méi)有被發(fā)送給其他消費(fèi)者處理的entry。
group:該參數(shù)是必選項(xiàng)
">":該符號(hào)只有在消費(fèi)者組命令xreadgroup中有效,意思為mytopic stream中最新且沒(méi)有被其他消費(fèi)者處理的ID赘阀,千萬(wàn)記住不要與上面"$"最大ID搞混了益缠,否則拿出來(lái)的數(shù)據(jù)與你的期望值不符,如果使用的是任何數(shù)組ID基公,那么該消費(fèi)者就無(wú)法拿到任何新的消息幅慌,只是從它的已經(jīng)處理過(guò)的消息中拿,并且不會(huì)有ACK機(jī)制
如果想一個(gè)消費(fèi)者組關(guān)聯(lián)多個(gè)stream可以這樣做:
xgroup create mystream mygroup $
xgroup create mytopic mygroup $
xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >
讀消息的參數(shù)多了一個(gè)block 0轰豆,就是說(shuō)讀數(shù)據(jù)需要阻塞胰伍。
3.發(fā)送ACK
將指定ID對(duì)應(yīng)的entry從consumer的已處理消息列表中刪除
XACK mystream mygroup 1527864992409-0
參考:https://blog.csdn.net/valada/article/details/88904110
異步執(zhí)行
指的是若操作 A 和操作 B 需要做數(shù)據(jù)交互,但是兩個(gè)操作處理數(shù)據(jù)的速度相差較大酸休,例如典型的 IO 請(qǐng)求(包括磁盤(pán)和網(wǎng)絡(luò))骂租,此時(shí)應(yīng)該將操作設(shè)計(jì)為異步執(zhí)行的,也就是不需要等待其他的操作執(zhí)行完畢斑司,當(dāng)前操作就可以繼續(xù)執(zhí)行渗饮。那此時(shí)也可以使用消息隊(duì)列。在操作A執(zhí)行完畢后宿刮,將結(jié)果放入隊(duì)列即可完成任務(wù)互站,不需要等待操作 B。而操作 B 直接去隊(duì)列中提取消息進(jìn)行處理即可僵缺。這樣胡桃,慢的操作還可以并發(fā)執(zhí)行,進(jìn)而提升響應(yīng)速度磕潮。