別再用 Redis List 實(shí)現(xiàn)消息隊(duì)列了,Stream 專(zhuān)為隊(duì)列而生

上回說(shuō)到[使用 Redis 的 List 實(shí)現(xiàn)消息隊(duì)列有很多局限性胸竞,比如:

  • 沒(méi)有良好的 ACK 機(jī)制参萄;
  • 沒(méi)有 ConsumerGroup 消費(fèi)組概念;
  • 消息堆積讹挎。
  • List 是線性結(jié)構(gòu),想要查詢(xún)指定數(shù)據(jù)需要遍歷整個(gè)列表马篮;

Stream 是 Redis 5.0 引入的一種專(zhuān)門(mén)為消息隊(duì)列設(shè)計(jì)的數(shù)據(jù)類(lèi)型怜奖,Stream 是一個(gè)包含 0 個(gè)或者多個(gè)元素的有序隊(duì)列,這些元素根據(jù) ID 的大小進(jìn)行有序排列歪玲。

它實(shí)現(xiàn)了大部分消息隊(duì)列的功能:

  • 消息 ID 系列化生成怎顾;
  • 消息遍歷;
  • 消息的阻塞和非阻塞讀夭委;
  • Consumer Groups 消費(fèi)組;
  • ACK 確認(rèn)機(jī)制株灸。
  • 支持多播。

提供了很多消息隊(duì)列操作命令逐抑,并且借鑒 Kafka 的 Consumer Groups 的概念屹蚊,提供了消費(fèi)組功能。

同時(shí)提供了消息的持久化和主從復(fù)制機(jī)制汹粤,客戶(hù)端可以訪問(wèn)任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶(hù)端的訪問(wèn)位置国葬,從而保證消息不丟失。

廢話(huà)少說(shuō)汇四,先來(lái)看下如何使用踢涌,官網(wǎng)文檔詳見(jiàn):redis.io/topics/stre…

XADD:插入消息

「云嵐宗眾弟子聽(tīng)命,擊殺蕭炎利虫!」

當(dāng)云山最后一字落下,那彌漫的緊繃氣氛糠惫,頓時(shí)宣告破碎钉疫,懸浮半空的眾多云嵐宗長(zhǎng)老背后雙翼一振,便是咻咻的劃過(guò)天際牲阁,追殺蕭炎壤躲。

云山使用以下指令向隊(duì)列中插入「追殺蕭炎」命令碉克,讓長(zhǎng)老帶領(lǐng)子弟去執(zhí)行。

XADD 云嵐宗 * task kill name 蕭炎
"1645936602161-0"

Stream 中的每個(gè)元素由鍵值對(duì)的形式組成漏麦,不同元素可以包含不同數(shù)量的鍵值對(duì)况褪。

該命令的語(yǔ)法如下:

XADD streamName id field value [field value ...]

消息隊(duì)列名稱(chēng)后面的 「*」 ,表示讓 Redis 為插入的消息自動(dòng)生成唯一 ID测垛,當(dāng)然也可以自己定義。

消息 ID 由兩部分組成:

  • 當(dāng)前毫秒內(nèi)的時(shí)間戳号涯;
  • 順序編號(hào)疙描。從 0 為起始值,用于區(qū)分同一時(shí)間內(nèi)產(chǎn)生的多個(gè)命令起胰。

通過(guò)將元素ID與時(shí)間進(jìn)行關(guān)聯(lián),并強(qiáng)制要求新元素的ID必須大于舊元素的ID, Redis從邏輯上將流變成了一種只執(zhí)行追加操作(append only)的數(shù)據(jù)結(jié)構(gòu)地消。

這種特性對(duì)于使用流實(shí)現(xiàn)消息隊(duì)列和事件系統(tǒng)的用戶(hù)來(lái)說(shuō)是非常重要的:

用戶(hù)可以確信畏妖,新的消息和事件只會(huì)出現(xiàn)在已有消息和事件之后,就像現(xiàn)實(shí)世界里新事件總是發(fā)生在已有事件之后一樣戒劫,一切都是有序進(jìn)行的。

XREAD:讀取消息

云凌老狗使用如下指令接收云山的命令:

XREAD COUNT 1 BLOCK 0 STREAMS 云嵐宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
   2) 1) 1) "1645936602161-0"
         2) 1) "task"
            2) "kill"
            3) "name"
            4) "蕭炎" # 蕭炎

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

該指令可以同時(shí)對(duì)多個(gè)流進(jìn)行讀取巫橄,每個(gè)心法對(duì)應(yīng)含義如下:

  • COUNT:表示每個(gè)流中最多讀取的元素個(gè)數(shù)茵典;
  • BLOCK:阻塞讀取,當(dāng)消息隊(duì)列沒(méi)有消息的時(shí)候,則阻塞等待筹我, 0 表示無(wú)限等待帆离,單位是毫秒。
  • ID:消息 ID哥谷,在讀取消息的時(shí)候可以指定 ID,并從這個(gè) ID 的下一條消息開(kāi)始讀取,0-0 則表示從第一個(gè)元素開(kāi)始讀取赎瑰。

如果想使用 XREAD 進(jìn)行順序消費(fèi),每次讀取后要記住返回的消息 ID餐曼,下次調(diào)用 XREAD 就將上一次返回的消息 ID 作為參數(shù)傳遞到下一次調(diào)用就可以繼續(xù)消費(fèi)后續(xù)的消息了。

云韻宗主集惋,我今天剛到云嵐宗踩娘,歷史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻開(kāi)始通過(guò) XADD 發(fā)布的消息要咋整养渴?

運(yùn)行「」心法即可,心法的最后 「」心法即可翘紊,心法的最后「」符號(hào)表示讀取最新的阻塞消息,讀取不到則一直死等帆疟。

等待過(guò)程中宇立,其他長(zhǎng)老向隊(duì)列追加消息,則會(huì)立即讀取到泄伪。

XREAD COUNT 1 BLOCK 0 STREAMS 云嵐宗 $

這么容易就實(shí)現(xiàn)消息隊(duì)列了么?說(shuō)好的 ACK 機(jī)制呢染厅?

這里只是開(kāi)胃菜,通過(guò) XREAD 讀取的數(shù)據(jù)其實(shí)并沒(méi)有被刪除肖粮,當(dāng)重新執(zhí)行 XREAD COUNT 2 BLOCK 0 STREAMS 云嵐宗 0-0 指令的時(shí)候又會(huì)重新讀取到。

所以我們還需要 ACK 機(jī)制行施,

接下來(lái)魂那,我們來(lái)一個(gè)真正的消息隊(duì)列。

ConsumerGroup

Redis Stream 的 ConsumerGroup(消費(fèi)者組)允許用戶(hù)將一個(gè)流從邏輯上劃分為多個(gè)不同的流涯雅,并讓 ConsumerGroup 的消費(fèi)者去處理。

它是一個(gè)強(qiáng)大的支持多播的可持久化的消息隊(duì)列精刷。 Redis Stream 借鑒了 Kafka 的設(shè)計(jì)。

Stream 的高可用是建立主從復(fù)制基礎(chǔ)上的怒允,它和其它數(shù)據(jù)結(jié)構(gòu)的復(fù)制機(jī)制沒(méi)有區(qū)別锈遥,也就是說(shuō)在 Sentinel 和 Cluster 集群環(huán)境下 Stream 是可以支持高可用的。

  • Redis Stream 的結(jié)構(gòu)如上圖所示儿礼。有一個(gè)消息鏈表,每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容蚊夫;
  • 消息持久化懦尝;
  • 每個(gè)消費(fèi)組的狀態(tài)是獨(dú)立的,不不影響陵霉,同一份的 Stream 消息會(huì)被所有的消費(fèi)組消費(fèi);
  • 一個(gè)消費(fèi)組可以有多個(gè)消費(fèi)者組成乍桂,消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使 last_deliverd_id 往前移動(dòng)睹酌;
  • 每個(gè)消費(fèi)者有一個(gè) pending_ids 變量,用于記錄當(dāng)前消費(fèi)者讀取了但是還沒(méi) ack 的消息憋沿。它用來(lái)保證消息至少被客戶(hù)端消費(fèi)了一次。

消費(fèi)組實(shí)現(xiàn)的消息隊(duì)列主要涉及以下三個(gè)指令:

  • [XGROUP]用于創(chuàng)建采章、銷(xiāo)毀和管理消費(fèi)者組。
  • [XREADGROUP]用于通過(guò)消費(fèi)者組從流中讀取悯舟。
  • [XACK]是允許消費(fèi)者將待處理消息標(biāo)記為已正確處理的命令砸民。

創(chuàng)建消費(fèi)組

Stream 通過(guò) XGROUP CREATE 指令創(chuàng)建消費(fèi)組 (Consumer Group),需要傳遞起始消息 ID 參數(shù)用來(lái)初始化 last_delivered_id 變量阱洪。

我們使用 XADD 往 bossStream 隊(duì)列插入一些消息:

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40

如下指令冗荸,為消息隊(duì)列名為 bossStream 創(chuàng)建「青龍門(mén)」和「六扇門(mén)」兩個(gè)消費(fèi)組利耍。

# 語(yǔ)法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龍門(mén) 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇門(mén) 0-0 MKSTREAM
  • stream:指定隊(duì)列的名字;
  • group:指定消費(fèi)組名字隘梨;
  • start_id:指定消費(fèi)組在 Stream 中的起始 ID,它決定了消費(fèi)者組從哪個(gè) ID 之后開(kāi)始讀取消息嵌莉,0-0 從第一條開(kāi)始讀取, $ 表示從最后一條向后開(kāi)始讀取锐峭,只接收新消息可婶。
  • MKSTREAM:默認(rèn)情況下,XGROUP CREATE命令在目標(biāo)流不存在時(shí)返回錯(cuò)誤矛渴。可以使用可選MKSTREAM子命令作為 之后的最后一個(gè)參數(shù)來(lái)自動(dòng)創(chuàng)建流。

讀取消息

讓「青龍門(mén)」消費(fèi)組的 consumer1bossStream 阻塞讀取一條消息:

XREADGROUP GROUP 青龍門(mén) consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957821396-0"
         2) 1) "name"
            2) "zhangsan"
            3) "age"
            4) "26"

語(yǔ)法如下:

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]

[] 內(nèi)的表示可選參數(shù)筐赔,該命令與 XREAD 大同小異钻趋,區(qū)別在于新增 GROUP groupName consumerName 選項(xiàng)。

該選項(xiàng)的兩個(gè)參數(shù)分別用于指定被讀取的消費(fèi)者組以及負(fù)責(zé)處理消息的消費(fèi)者较沪。

其中:

  • >:命令的最后參數(shù) >,表示從尚未被消費(fèi)的消息開(kāi)始讀仁萄焦;
  • BLOCK:阻塞讀取拂封;

敲黑板了

如果消息隊(duì)列中的消息被消費(fèi)組的一個(gè)消費(fèi)者消費(fèi)了,這條消息就不會(huì)再被這個(gè)消費(fèi)組的其他消費(fèi)者讀取到在抛。

比如 consumer2 執(zhí)行讀取操作:

XREADGROUP GROUP 青龍門(mén) consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
   2) 1) 1) "1645957838700-0"
         2) 1) "name"
            2) "lisi"
            3) "age"
            4) "2"

consumer2 不能再讀取到 zhangsan 了,而是讀取下一條 lisi 因?yàn)檫@條消息已經(jīng)被 consumer1 讀取了刚梭。

使用消費(fèi)者的另一個(gè)目的可以讓組內(nèi)的多個(gè)消費(fèi)者分擔(dān)讀取消息票唆,也就是每個(gè)消費(fèi)者讀取部分消息,從而實(shí)現(xiàn)均衡負(fù)載走趋。

比如一個(gè)消費(fèi)組有三個(gè)消費(fèi)者 C1、C2典挑、C3 和一個(gè)包含消息 1、2您觉、3授滓、4肆糕、5在孝、6、7 的流:

XPENDING 查看已讀未確認(rèn)消息

為了保證消費(fèi)者在消費(fèi)的時(shí)候發(fā)生故障或者宕機(jī)重啟后依然可以讀取消息私沮,Stream 內(nèi)部有一個(gè)隊(duì)列(pending List)保存每個(gè)消費(fèi)者讀取但是還沒(méi)有執(zhí)行 ACK 的消息

如果消費(fèi)者使用了 XREADGROUP GROUP groupName consumerName 讀取消息造垛,但是沒(méi)有給 Stream 發(fā)送 XACK 命令晰搀,消息依然保留。

比如查看 bossStream 中的 消費(fèi)組「青龍門(mén)」中各個(gè)消費(fèi)者已讀取未確認(rèn)的消息信息:

XPENDING bossStream 青龍門(mén)
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
  1. 1)未確認(rèn)消息條數(shù)外恕;
  2. 2) ~ 3)青龍門(mén)中所有消費(fèi)者讀取的消息最小和最大 ID;

查看 consumer1讀取了哪些數(shù)據(jù)罪郊,使用以下命令:

XPENDING bossStream 青龍門(mén) - + 10 consumer1
1) 1) "1645957821396-0"
   2) "consumer1"
   3) (integer) 3758384
   4) (integer) 1

ACK 確認(rèn)

所以當(dāng)接收到消息并且消費(fèi)成功以后,我們需要手動(dòng) ACK 通知 Streams排龄,這條消息就會(huì)被刪除了翎朱。命令如下:

XACK bossStream 青龍門(mén) 1645957821396-0 1645957838700-0
(integer) 2

語(yǔ)法如下:

XACK key group-key ID [ID ...]

消費(fèi)確認(rèn)增加了消息的可靠性拴曲,一般在業(yè)務(wù)處理完成之后,需要執(zhí)行 ack 確認(rèn)消息已經(jīng)被消費(fèi)完成澈灼,整個(gè)流程的執(zhí)行如下圖所示:

使用 Redisson 實(shí)戰(zhàn)

使用 maven 添加依賴(lài)

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>

添加 Redis 配置店溢,碼哥的 Redis 沒(méi)有配置密碼床牧,大家根據(jù)實(shí)際情況配置即可。

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
@Slf4j
@Service
public class QueueService {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 發(fā)送消息到隊(duì)列
     *
     * @param message
     */
    public void sendMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.add("speed", "19");
        stream.add("velocity", "39%");
        stream.add("temperature", "10C");
    }

    /**
     * 消費(fèi)者消費(fèi)消息
     *
     * @param message
     */
    public void consumerMessage(String message) {
        RStream<String, String> stream = redissonClient.getStream("sensor#4921");
        stream.createGroup("sensors_data", StreamMessageId.ALL);
        Map<StreamMessageId, Map<String, String>> messages = stream.readGroup("sensors_data", "consumer_1");
        for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
          Map<String, String> msg = entry.getValue();
          System.out.println(msg);
          stream.ack("sensors_data", entry.getKey());
        }
    }
}

讀者朋友閱讀后有收獲的話(huà)點(diǎn)贊心软、收藏并分享壕吹,感謝支持删铃。

作者:碼哥字節(jié)
鏈接:https://juejin.cn/post/7070757711926788103

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市猎唁,隨后出現(xiàn)的幾起案子咒劲,更是在濱河造成了極大的恐慌诫隅,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件阎肝,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡风题,警方通過(guò)查閱死者的電腦和手機(jī)判导,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)眼刃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人摇肌,你說(shuō)我怎么就攤上這事∥。” “怎么了昵骤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵肯适,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我框舔,道長(zhǎng),這世上最難降的妖魔是什么刘绣? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任樱溉,我火速辦了婚禮纬凤,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘停士。我一直安慰自己肚医,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布肠套。 她就那樣靜靜地躺著,像睡著了一般你稚。 火紅的嫁衣襯著肌膚如雪瓷耙。 梳的紋絲不亂的頭發(fā)上刁赖,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音宇弛,去河邊找鬼鸡典。 笑死枪芒,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的舅踪。 我是一名探鬼主播纽甘,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼抽碌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了货徙?” 一聲冷哼從身側(cè)響起左权,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤赏迟,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后祷舀,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體烹笔,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年饰豺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了允蜈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蒿柳。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡漩蟆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出怠李,到底是詐尸還是另有隱情,我是刑警寧澤捺癞,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站髓介,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏唐础。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一叫胁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧驼鹅,春花似錦、人聲如沸输钩。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至剪验,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間功戚,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工啸臀, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留烁落,地道東北人豌注。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓灯萍,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親竟稳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容