前言
實(shí)測(cè)下來这敬,該方案本地 redis + 8 協(xié)程可以每秒消費(fèi) 14 萬數(shù)據(jù),下文是具體的實(shí)現(xiàn)。
代碼:https://github.com/yaodongen/delay-queue
import delay "github.com/yaodongen/delay-queue"
delay.AddToQueue(ctx, rdb, "key", "value", 5, 86400)
delay.GetFromQueue(ctx, rdb, "key")
延遲隊(duì)列的三種思路
思路一:僅使用 redis zset
- 添加一條數(shù)據(jù):
zadd key 1645880088 '{"x":1}'
- 獲取一條數(shù)據(jù):
zrange key 0 0 withscores
- 刪除數(shù)據(jù):
zrem key value
這里添加數(shù)據(jù)的時(shí)候,直接用未來的時(shí)間戳作為 score 存入 redis。取數(shù)據(jù)的時(shí)候瞄摊,先獲取第一個(gè)數(shù)據(jù),通過比對(duì) score 和當(dāng)前時(shí)間戳苦掘,判斷是否滿足消費(fèi)的條件泉褐。(這里直接獲取第一個(gè)數(shù)據(jù)是因?yàn)?redis 的 sort set 默認(rèn)會(huì)把最小的 score 排在最前面)
然后,等消費(fèi)完成后再刪除對(duì)應(yīng)的數(shù)據(jù)鸟蜡。其中獲取數(shù)據(jù)的時(shí)候也可以使用 zrangebyscore key (0 1645880088
膜赃,把判斷時(shí)間戳的邏輯交給 redis。
需要注意揉忘,在并發(fā)場(chǎng)景下跳座,多個(gè)線程會(huì)獲取到同一個(gè)數(shù)據(jù)端铛,為了防止重復(fù)消費(fèi),需要修改一下取數(shù)邏輯疲眷。一種方案是通過執(zhí)行 zrem key value
先刪除對(duì)應(yīng)的 key禾蚕,獲取刪除的結(jié)果,如果該值為 1
則說明刪除成功狂丝,也就是搶到了數(shù)據(jù)换淆,可以進(jìn)行消費(fèi)。
思路一的優(yōu)點(diǎn)是實(shí)現(xiàn)起來很簡潔几颜,缺點(diǎn)是對(duì)高并發(fā)的支持不友好倍试。因?yàn)樯a(chǎn)環(huán)境下,redis 通常是一個(gè)集群的蛋哭,比如有 5 個(gè)機(jī)器县习。通過這種做法 key 只能在一個(gè)機(jī)器上,不能充分利用 redis 集群的特點(diǎn)谆趾。
思路二 redis zset + list
- 插入一條數(shù)據(jù):
zadd key 1645880088 '{"x":1}'
- 生產(chǎn)者
-
zrange key 0 0 withscores
// 獲取 '{"x":1}' -
rpush mylist01 '{"x":1}'
// 將數(shù)據(jù)推入 mylist 供消費(fèi)者消費(fèi)
-
- 消費(fèi)數(shù)據(jù)
lpop mylist01
該思路在一的基礎(chǔ)上躁愿,引入了一個(gè)生產(chǎn)者程序。生產(chǎn)者通過把數(shù)據(jù)推入多個(gè)不同的 list沪蓬,實(shí)現(xiàn)了數(shù)據(jù)打散彤钟,供消費(fèi)者并發(fā)消費(fèi)。
該方案的優(yōu)點(diǎn)是提供了對(duì)消費(fèi)者端的高并發(fā)支持跷叉,但是在生產(chǎn)者端不能利用到 redis 集群的特點(diǎn)逸雹,瓶頸卡在了 zset 上。同時(shí)該方案引入了額外的生產(chǎn)者性芬,增加了部署的復(fù)雜性。
參考實(shí)現(xiàn):https://segmentfault.com/a/1190000022027194
思路三:一和二的合并
- 添加一條數(shù)據(jù)
zadd key key:1645880088
rpush key:1645880088 '{"x":1}'
- 獲取一條數(shù)據(jù)
-
zrange key 0 0 withscores
// 獲取 key:1645880088 -
lpop key:1645880088
// 獲取具體值 '{"x":1}'
-
該方案是在 zset 中存儲(chǔ)隊(duì)列的 key(可以理解為一個(gè)時(shí)間片)剧防,然后將真正的數(shù)據(jù)存儲(chǔ)在隊(duì)列(時(shí)間片)中植锉,實(shí)現(xiàn)了思路一和思路二的合并。
其中添加數(shù)據(jù)的時(shí)候直接往相應(yīng)的隊(duì)列中推數(shù)據(jù)峭拘,取數(shù)據(jù)的時(shí)候直接從隊(duì)列中取數(shù)據(jù)的俊庇。而因?yàn)殛?duì)列的 key 是不同的,所以天然分散在不同 redis 服務(wù)器上鸡挠,可以支持高擴(kuò)展性辉饱。同時(shí)也不需要引入額外的生產(chǎn)者。實(shí)測(cè)下來本地 redis + 8 協(xié)程可以每秒消費(fèi) 14 萬數(shù)據(jù)拣展。
參考代碼: https://github.com/yaodongen/delay-queue
創(chuàng)建日期:2022-02-26 修改日期:2022-02-26