基于Redisson實(shí)現(xiàn)延遲隊(duì)列

Redisson實(shí)現(xiàn)延遲隊(duì)列

1.場(chǎng)景介紹

假設(shè)有這樣一個(gè)場(chǎng)景仇祭,我們有一個(gè)訂單醋奠,或者工單等等贱纠。需要在超時(shí)30分鐘后進(jìn)行關(guān)閉莫换。這個(gè)時(shí)候我們最先想到的應(yīng)該是采用定時(shí)任務(wù)去進(jìn)行輪訓(xùn)判斷霞玄,但是呢,每個(gè)訂單的創(chuàng)建時(shí)間是不一樣的拉岁,這個(gè)時(shí)間怎么確定才好呢坷剧,5分鐘。喊暖。1分鐘惫企。。執(zhí)行一次嗎陵叽。這樣就會(huì)非常影響性能狞尔。且時(shí)間誤差很大」簦基于以上業(yè)務(wù)需要我們想到了有以下解決方案偏序。

  • JDK延遲隊(duì)列,但是數(shù)據(jù)都在內(nèi)存中,重啟后什么都沒(méi)了胖替。
  • MQ中的延遲隊(duì)列研儒,比如RocketMQ。
  • 基于Redisson的延遲隊(duì)列

2.JDK延遲隊(duì)列

我們首先來(lái)回顧下JDK的延遲隊(duì)列

基于延遲隊(duì)列要實(shí)現(xiàn)接口Delayed独令,并且實(shí)現(xiàn)getDelay方法和compareTo方法

  • getDelay主要是計(jì)算返回剩余時(shí)間端朵,單位時(shí)間戳(毫秒)延遲任務(wù)是否到時(shí)就是按照這個(gè)方法判斷如果返回的是負(fù)數(shù)則說(shuō)明到期否則還沒(méi)到期
  • compareTo主要是自定義實(shí)現(xiàn)比較方法返回 1 0 -1三個(gè)參數(shù)
@ToString
public class MyDelayed<T> implements Delayed {
    /**
     * 延遲時(shí)間
     */
    Long delayTime;

    /**
     * 過(guò)期時(shí)間
     */
    Long expire;
    /**
     * 數(shù)據(jù)
     */
    T t;

    public MyDelayed(long delayTime, T t) {
        this.delayTime = delayTime;
        // 過(guò)期時(shí)間 = 當(dāng)前時(shí)間 + 延遲時(shí)間
        this.expire = System.currentTimeMillis() + delayTime;
        this.t = t;
    }

    /**
     * 剩余時(shí)間 = 到期時(shí)間 - 當(dāng)前時(shí)間
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 優(yōu)先級(jí)規(guī)則:兩個(gè)任務(wù)比較,時(shí)間短的優(yōu)先執(zhí)行
     */
    @Override
    public int compareTo(Delayed o) {
        long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) f;
    }

訂單的實(shí)體,為了簡(jiǎn)單就定義基礎(chǔ)幾個(gè)字段燃箭。

@Data
public class OrderInfo implements Serializable {
    private static final long serialVersionUID = -2837036864073566484L;
    /**
     * 訂單id
     */
    private Long id;
    
    /**
     * 訂單金額
     */
    private Double salary;

    /**
     * 訂單創(chuàng)建時(shí)間   對(duì)于java8LocalDateTime 以下注解序列化反序列化用到
     */
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime createTime;

}

為了簡(jiǎn)單我們暫且定義延遲時(shí)間為10s

public static void main(String[] args) throws InterruptedException {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setCreateTime(LocalDateTimeUtil.parse("2022-07-01 15:00:00", "yyyy-MM-dd HH:mm:ss"));
        MyDelayed<OrderInfo> myDelayed = new MyDelayed<>(10000L,orderInfo);
        DelayQueue<MyDelayed<OrderInfo>> queue = new DelayQueue<>();
        queue.add(myDelayed);
       
        System.out.println(queue.take().getT().getCreateTime());
        System.out.println("當(dāng)前時(shí)間:" + LocalDateTime.now());
    }

輸出結(jié)果

2022-07-01T15:00
當(dāng)前時(shí)間:2022-07-01T15:10:37.375

3.基于Redisson的延遲隊(duì)列

當(dāng)然今天的主角是它了冲呢,我們主要圍繞著基于Redisson的延遲隊(duì)列來(lái)說(shuō)。

其實(shí)Redisson延遲隊(duì)列內(nèi)部也是基于redis來(lái)實(shí)現(xiàn)的招狸,我們先來(lái)進(jìn)行整合使用看看效果敬拓∪肯妫基于springboot

1.依賴:

 <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson-spring-boot-starter</artifactId>
      <version>3.16.7</version>
 </dependency>

2.創(chuàng)建redisson.yml

# 單節(jié)點(diǎn)配置
singleServerConfig:
  # 連接空閑超時(shí),單位:毫秒
  idleConnectionTimeout: 10000
  # 連接超時(shí)恩尾,單位:毫秒
  connectTimeout: 10000
  # 命令等待超時(shí),單位:毫秒
  timeout: 3000
  # 命令失敗重試次數(shù),如果嘗試達(dá)到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個(gè)指定的節(jié)點(diǎn)時(shí)挽懦,將拋出錯(cuò)誤翰意。
  # 如果嘗試在此限制之內(nèi)發(fā)送成功,則開(kāi)始啟用 timeout(命令等待超時(shí)) 計(jì)時(shí)信柿。
  retryAttempts: 3
  # 命令重試發(fā)送時(shí)間間隔冀偶,單位:毫秒
  retryInterval: 1500
  # 密碼
  password:
  # 單個(gè)連接最大訂閱數(shù)量
  subscriptionsPerConnection: 5
  # 客戶端名稱
  clientName: null
  # 節(jié)點(diǎn)地址
  address: redis://127.0.0.1:6379
  # 發(fā)布和訂閱連接的最小空閑連接數(shù)
  subscriptionConnectionMinimumIdleSize: 1
  # 發(fā)布和訂閱連接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空閑連接數(shù)
  connectionMinimumIdleSize: 32
  # 連接池大小
  connectionPoolSize: 64
  # 數(shù)據(jù)庫(kù)編號(hào)
  database: 0
  # DNS監(jiān)測(cè)時(shí)間間隔,單位:毫秒
  dnsMonitoringInterval: 5000
# 線程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#threads: 0
# Netty線程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"

3.創(chuàng)建配置類(lèi)RedissonConfig,這里是為了讀取我們剛剛創(chuàng)建在配置文件中的yml

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() throws IOException {
        Config config = Config.fromYAML(RedissonConfig.class.getClassLoader().getResource("redisson.yml"));;
        return Redisson.create(config);
    }
}

4.測(cè)試

        // redisson  延遲隊(duì)列
        // Redisson的延時(shí)隊(duì)列是對(duì)另一個(gè)隊(duì)列的再包裝渔嚷,使用時(shí)要先將延時(shí)消息添加到延時(shí)隊(duì)列中进鸠,
        // 當(dāng)延時(shí)隊(duì)列中的消息達(dá)到設(shè)定的延時(shí)時(shí)間后,該延時(shí)消息才會(huì)進(jìn)行進(jìn)入到被包裝隊(duì)列中形病,因此客年,我們只需要對(duì)被包裝隊(duì)列進(jìn)行監(jiān)聽(tīng)即可。
        RBlockingQueue<OrderInfo> blockingFairQueue = redissonClient.getBlockingQueue("my-test");

        RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);

        OrderInfo orderInfo = new OrderInfo();
        // 訂單生成時(shí)間
        orderInfo.setCreateTime(LocalDateTime.now());
        // 10秒鐘以后將消息發(fā)送到指定隊(duì)列
        delayedQueue.offer(orderInfo, 10, TimeUnit.SECONDS);
        RBlockingQueue<OrderInfo> outQueue = redissonClient.getBlockingQueue("my-test");

        OrderInfo orderInfo2 = outQueue.take();
        System.out.println("訂單生成時(shí)間" + orderInfo2.getCreateTime());
        System.out.println("訂單關(guān)閉時(shí)間" + LocalDateTime.now());

        // 在該對(duì)象不再需要的情況下漠吻,應(yīng)該主動(dòng)銷(xiāo)毀量瓜。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷(xiāo)毀
        delayedQueue.destroy();

控制臺(tái)輸出:

訂單生成時(shí)間2022-07-01T15:22:10.304
訂單關(guān)閉時(shí)間2022-07-01T15:22:20.414

解決項(xiàng)目重新啟動(dòng)并不會(huì)消費(fèi)之前隊(duì)列里的消息的問(wèn)題,增加如下代碼

 redissonClient.getDelayedQueue(deque);

4.深入探究Redisson的延遲隊(duì)列實(shí)現(xiàn)原理

我們首先來(lái)了解兩個(gè)API

  • RBlockingQueue 就是目標(biāo)隊(duì)列

  • RDelayedQueue 就是中轉(zhuǎn)隊(duì)列

那么為什么會(huì)涉及到兩個(gè)隊(duì)列呢,這兩個(gè)隊(duì)列到底有什么用呢途乃?

首先我們實(shí)際操作的是RBlockingQueue阻塞隊(duì)列绍傲,并不是RDelayedQueue隊(duì)列,RDelayedQueue對(duì)接主要是提供中間轉(zhuǎn)發(fā)的一個(gè)隊(duì)列耍共,類(lèi)似中間商的意思

畫(huà)個(gè)小圖理解下

image-20220701153738619.png

這里不難看出我們都是基于RBlockingQueue 目標(biāo)隊(duì)列在進(jìn)行消費(fèi)烫饼,而RDelayedQueue就是會(huì)把過(guò)期的消息放入到我們的目標(biāo)隊(duì)列中

我們只要從RBlockingQueue隊(duì)列中取數(shù)據(jù)即可。

好像還是不夠深入试读,我們接著看杠纵。我們知道Redisson是基于redis來(lái)實(shí)現(xiàn)的那么我們看看里面到底做了什么事

打開(kāi)redis客戶端,執(zhí)行monitor命令钩骇,看下在執(zhí)行上面訂單操作時(shí)redis到底執(zhí)行了哪些命令

monitor命令可以看到操作redis時(shí)執(zhí)行了什么命令

// 這里訂閱了一個(gè)固定的隊(duì)列 redisson_delay_queue_channel:{my-test}淡诗,為了開(kāi)啟進(jìn)程里面的延時(shí)任務(wù)
"SUBSCRIBE" "redisson_delay_queue_channel:{my-test}"

// Redis Zrangebyscore 返回有序集合中指定分?jǐn)?shù)區(qū)間的成員列表。有序集成員按分?jǐn)?shù)值遞增(從小到大)次序排列伊履。
// redisson_delay_queue_channel:{my-test} 是一個(gè)zset韩容,當(dāng)有延時(shí)數(shù)據(jù)存入Redisson隊(duì)列時(shí),就會(huì)在此隊(duì)列中插入 數(shù)據(jù)唐瀑,排序分?jǐn)?shù)為延時(shí)的時(shí)間戳(毫秒 以下同理)群凶。
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404479385" "limit" "0" "100"

// 取出第一個(gè)數(shù),也就是判斷上面執(zhí)行的操作是否有下一頁(yè)哄辣。(因?yàn)閯倓傞_(kāi)始總是0的)除非是之前的操作(zrangebyscore)沒(méi)有取完
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"

// 往zset里面設(shè)置 數(shù)據(jù)過(guò)期的時(shí)間戳(當(dāng)前執(zhí)行的時(shí)間戳+延時(shí)的時(shí)間毫秒值)內(nèi)容就是訂單數(shù)據(jù)
"zadd" "redisson_delay_queue_timeout:{my-test}" "1656404489400" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 同步一份數(shù)據(jù)到list隊(duì)列
"rpush" "redisson_delay_queue:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 取出排序好的第一個(gè)數(shù)據(jù)请梢,也就是最臨近要觸發(fā)的數(shù)據(jù)赠尾,然后發(fā)送通知
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0"

// 發(fā)送通知  之前第一步 SUBSCRIBE 訂閱了 客戶端收到通知后,就在自己進(jìn)程里面開(kāi)啟延時(shí)任務(wù)(HashedWheelTimer)毅弧,到時(shí)間后就可以從redis取數(shù)據(jù)發(fā)送
"publish" "redisson_delay_queue_channel:{my-test}" "1656404489400"

// 這里就是取數(shù)據(jù)環(huán)節(jié)了
"BLPOP" "my-test" "0"

// 在范圍 0-過(guò)期時(shí)間  取出100條數(shù)據(jù)
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404489444" "limit" "0" "100"

// 將上面取到的數(shù)據(jù)push到阻塞隊(duì)列 很顯然能看到 com.example.mytest.domain.OrderInfo 是我們的訂單數(shù)據(jù)
"rpush" "my-test" "{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 刪除數(shù)據(jù)
"lrem" "redisson_delay_queue:{my-test}" "1" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
"zrem" "redisson_delay_queue_timeout:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 取zset第一個(gè)數(shù)據(jù)气嫁,有的話繼續(xù)上面邏輯取數(shù)據(jù)
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"

// 退訂
"UNSUBSCRIBE" "redisson_delay_queue_channel:{my-test}"

這里參考:https://zhuanlan.zhihu.com/p/343811173

我們知道Zset是按照分?jǐn)?shù)升序的也就是最小的分?jǐn)?shù)在最前面,基于這個(gè)特點(diǎn)够坐,大致明白寸宵,利用過(guò)期時(shí)間的時(shí)間戳作為分?jǐn)?shù)放入到Zset中,那么即將過(guò)期的就在最上面元咙。

直接上個(gè)圖解


image-20220701155445411.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末梯影,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子庶香,更是在濱河造成了極大的恐慌甲棍,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件赶掖,死亡現(xiàn)場(chǎng)離奇詭異感猛,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)奢赂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門(mén)唱遭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人呈驶,你說(shuō)我怎么就攤上這事拷泽。” “怎么了袖瞻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵司致,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我聋迎,道長(zhǎng)脂矫,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任霉晕,我火速辦了婚禮庭再,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘牺堰。我一直安慰自己拄轻,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布伟葫。 她就那樣靜靜地躺著恨搓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上斧抱,一...
    開(kāi)封第一講書(shū)人閱讀 49,166評(píng)論 1 284
  • 那天常拓,我揣著相機(jī)與錄音,去河邊找鬼辉浦。 笑死弄抬,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的宪郊。 我是一名探鬼主播掂恕,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼废膘!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起慕蔚,我...
    開(kāi)封第一講書(shū)人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤丐黄,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后孔飒,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體灌闺,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年坏瞄,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了桂对。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鸠匀,死狀恐怖蕉斜,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情缀棍,我是刑警寧澤宅此,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布,位于F島的核電站爬范,受9級(jí)特大地震影響父腕,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望否过。 院中可真熱鬧穿仪,春花似錦、人聲如沸蛋济。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)躬络。三九已至,卻和暖如春搭儒,著一層夾襖步出監(jiān)牢的瞬間穷当,已是汗流浹背提茁。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留馁菜,地道東北人茴扁。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像汪疮,于是被迫代替她去往敵國(guó)和親峭火。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容