Redisson實(shí)現(xiàn)延遲隊(duì)列
1.場(chǎng)景介紹
假設(shè)有這樣一個(gè)場(chǎng)景仇祭,我們有一個(gè)訂單醋奠,或者工單等等贱纠。需要在超時(shí)30分鐘后進(jìn)行關(guān)閉莫换。這個(gè)時(shí)候我們最先想到的應(yīng)該是采用定時(shí)任務(wù)去進(jìn)行輪訓(xùn)判斷霞玄,但是呢,每個(gè)訂單的創(chuàng)建時(shí)間是不一樣的拉岁,這個(gè)時(shí)間怎么確定才好呢坷剧,5分鐘。喊暖。1分鐘惫企。。執(zhí)行一次嗎陵叽。這樣就會(huì)非常影響性能狞尔。且時(shí)間誤差很大」簦基于以上業(yè)務(wù)需要我們想到了有以下解決方案偏序。
- JDK延遲隊(duì)列,但是數(shù)據(jù)都在內(nèi)存中,重啟后什么都沒(méi)了胖替。
- MQ中的延遲隊(duì)列研儒,比如RocketMQ。
- 基于Redisson的延遲隊(duì)列
2.JDK延遲隊(duì)列
我們首先來(lái)回顧下JDK的延遲隊(duì)列
基于延遲隊(duì)列要實(shí)現(xiàn)接口Delayed
独令,并且實(shí)現(xiàn)getDelay
方法和compareTo
方法
-
getDelay
主要是計(jì)算返回剩余時(shí)間端朵,單位時(shí)間戳(毫秒)延遲任務(wù)是否到時(shí)就是按照這個(gè)方法判斷如果返回的是負(fù)數(shù)則說(shuō)明到期否則還沒(méi)到期 -
compareTo
主要是自定義實(shí)現(xiàn)比較方法返回 1 0 -1三個(gè)參數(shù)
@ToString
public class MyDelayed<T> implements Delayed {
/**
* 延遲時(shí)間
*/
Long delayTime;
/**
* 過(guò)期時(shí)間
*/
Long expire;
/**
* 數(shù)據(jù)
*/
T t;
public MyDelayed(long delayTime, T t) {
this.delayTime = delayTime;
// 過(guò)期時(shí)間 = 當(dāng)前時(shí)間 + 延遲時(shí)間
this.expire = System.currentTimeMillis() + delayTime;
this.t = t;
}
/**
* 剩余時(shí)間 = 到期時(shí)間 - 當(dāng)前時(shí)間
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 優(yōu)先級(jí)規(guī)則:兩個(gè)任務(wù)比較,時(shí)間短的優(yōu)先執(zhí)行
*/
@Override
public int compareTo(Delayed o) {
long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (int) f;
}
訂單的實(shí)體,為了簡(jiǎn)單就定義基礎(chǔ)幾個(gè)字段燃箭。
@Data
public class OrderInfo implements Serializable {
private static final long serialVersionUID = -2837036864073566484L;
/**
* 訂單id
*/
private Long id;
/**
* 訂單金額
*/
private Double salary;
/**
* 訂單創(chuàng)建時(shí)間 對(duì)于java8LocalDateTime 以下注解序列化反序列化用到
*/
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime createTime;
}
為了簡(jiǎn)單我們暫且定義延遲時(shí)間為10s
public static void main(String[] args) throws InterruptedException {
OrderInfo orderInfo = new OrderInfo();
orderInfo.setCreateTime(LocalDateTimeUtil.parse("2022-07-01 15:00:00", "yyyy-MM-dd HH:mm:ss"));
MyDelayed<OrderInfo> myDelayed = new MyDelayed<>(10000L,orderInfo);
DelayQueue<MyDelayed<OrderInfo>> queue = new DelayQueue<>();
queue.add(myDelayed);
System.out.println(queue.take().getT().getCreateTime());
System.out.println("當(dāng)前時(shí)間:" + LocalDateTime.now());
}
輸出結(jié)果
2022-07-01T15:00
當(dāng)前時(shí)間:2022-07-01T15:10:37.375
3.基于Redisson的延遲隊(duì)列
當(dāng)然今天的主角是它了冲呢,我們主要圍繞著基于Redisson的延遲隊(duì)列來(lái)說(shuō)。
其實(shí)Redisson延遲隊(duì)列內(nèi)部也是基于redis來(lái)實(shí)現(xiàn)的招狸,我們先來(lái)進(jìn)行整合使用看看效果敬拓∪肯妫基于springboot
1.依賴:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.7</version>
</dependency>
2.創(chuàng)建redisson.yml
# 單節(jié)點(diǎn)配置
singleServerConfig:
# 連接空閑超時(shí),單位:毫秒
idleConnectionTimeout: 10000
# 連接超時(shí)恩尾,單位:毫秒
connectTimeout: 10000
# 命令等待超時(shí),單位:毫秒
timeout: 3000
# 命令失敗重試次數(shù),如果嘗試達(dá)到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個(gè)指定的節(jié)點(diǎn)時(shí)挽懦,將拋出錯(cuò)誤翰意。
# 如果嘗試在此限制之內(nèi)發(fā)送成功,則開(kāi)始啟用 timeout(命令等待超時(shí)) 計(jì)時(shí)信柿。
retryAttempts: 3
# 命令重試發(fā)送時(shí)間間隔冀偶,單位:毫秒
retryInterval: 1500
# 密碼
password:
# 單個(gè)連接最大訂閱數(shù)量
subscriptionsPerConnection: 5
# 客戶端名稱
clientName: null
# 節(jié)點(diǎn)地址
address: redis://127.0.0.1:6379
# 發(fā)布和訂閱連接的最小空閑連接數(shù)
subscriptionConnectionMinimumIdleSize: 1
# 發(fā)布和訂閱連接池大小
subscriptionConnectionPoolSize: 50
# 最小空閑連接數(shù)
connectionMinimumIdleSize: 32
# 連接池大小
connectionPoolSize: 64
# 數(shù)據(jù)庫(kù)編號(hào)
database: 0
# DNS監(jiān)測(cè)時(shí)間間隔,單位:毫秒
dnsMonitoringInterval: 5000
# 線程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#threads: 0
# Netty線程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"
3.創(chuàng)建配置類(lèi)RedissonConfig,這里是為了讀取我們剛剛創(chuàng)建在配置文件中的yml
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() throws IOException {
Config config = Config.fromYAML(RedissonConfig.class.getClassLoader().getResource("redisson.yml"));;
return Redisson.create(config);
}
}
4.測(cè)試
// redisson 延遲隊(duì)列
// Redisson的延時(shí)隊(duì)列是對(duì)另一個(gè)隊(duì)列的再包裝渔嚷,使用時(shí)要先將延時(shí)消息添加到延時(shí)隊(duì)列中进鸠,
// 當(dāng)延時(shí)隊(duì)列中的消息達(dá)到設(shè)定的延時(shí)時(shí)間后,該延時(shí)消息才會(huì)進(jìn)行進(jìn)入到被包裝隊(duì)列中形病,因此客年,我們只需要對(duì)被包裝隊(duì)列進(jìn)行監(jiān)聽(tīng)即可。
RBlockingQueue<OrderInfo> blockingFairQueue = redissonClient.getBlockingQueue("my-test");
RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
OrderInfo orderInfo = new OrderInfo();
// 訂單生成時(shí)間
orderInfo.setCreateTime(LocalDateTime.now());
// 10秒鐘以后將消息發(fā)送到指定隊(duì)列
delayedQueue.offer(orderInfo, 10, TimeUnit.SECONDS);
RBlockingQueue<OrderInfo> outQueue = redissonClient.getBlockingQueue("my-test");
OrderInfo orderInfo2 = outQueue.take();
System.out.println("訂單生成時(shí)間" + orderInfo2.getCreateTime());
System.out.println("訂單關(guān)閉時(shí)間" + LocalDateTime.now());
// 在該對(duì)象不再需要的情況下漠吻,應(yīng)該主動(dòng)銷(xiāo)毀量瓜。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷(xiāo)毀
delayedQueue.destroy();
控制臺(tái)輸出:
訂單生成時(shí)間2022-07-01T15:22:10.304
訂單關(guān)閉時(shí)間2022-07-01T15:22:20.414
解決項(xiàng)目重新啟動(dòng)并不會(huì)消費(fèi)之前隊(duì)列里的消息的問(wèn)題,增加如下代碼
redissonClient.getDelayedQueue(deque);
4.深入探究Redisson的延遲隊(duì)列實(shí)現(xiàn)原理
我們首先來(lái)了解兩個(gè)API
RBlockingQueue 就是目標(biāo)隊(duì)列
RDelayedQueue 就是中轉(zhuǎn)隊(duì)列
那么為什么會(huì)涉及到兩個(gè)隊(duì)列呢,這兩個(gè)隊(duì)列到底有什么用呢途乃?
首先我們實(shí)際操作的是RBlockingQueue阻塞隊(duì)列绍傲,并不是RDelayedQueue隊(duì)列,RDelayedQueue對(duì)接主要是提供中間轉(zhuǎn)發(fā)的一個(gè)隊(duì)列耍共,類(lèi)似中間商的意思
畫(huà)個(gè)小圖理解下
這里不難看出我們都是基于RBlockingQueue
目標(biāo)隊(duì)列在進(jìn)行消費(fèi)烫饼,而RDelayedQueue
就是會(huì)把過(guò)期的消息放入到我們的目標(biāo)隊(duì)列中
我們只要從RBlockingQueue
隊(duì)列中取數(shù)據(jù)即可。
好像還是不夠深入试读,我們接著看杠纵。我們知道Redisson
是基于redis來(lái)實(shí)現(xiàn)的那么我們看看里面到底做了什么事
打開(kāi)redis客戶端,執(zhí)行monitor命令钩骇,看下在執(zhí)行上面訂單操作時(shí)redis到底執(zhí)行了哪些命令
monitor命令可以看到操作redis時(shí)執(zhí)行了什么命令
// 這里訂閱了一個(gè)固定的隊(duì)列 redisson_delay_queue_channel:{my-test}淡诗,為了開(kāi)啟進(jìn)程里面的延時(shí)任務(wù)
"SUBSCRIBE" "redisson_delay_queue_channel:{my-test}"
// Redis Zrangebyscore 返回有序集合中指定分?jǐn)?shù)區(qū)間的成員列表。有序集成員按分?jǐn)?shù)值遞增(從小到大)次序排列伊履。
// redisson_delay_queue_channel:{my-test} 是一個(gè)zset韩容,當(dāng)有延時(shí)數(shù)據(jù)存入Redisson隊(duì)列時(shí),就會(huì)在此隊(duì)列中插入 數(shù)據(jù)唐瀑,排序分?jǐn)?shù)為延時(shí)的時(shí)間戳(毫秒 以下同理)群凶。
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404479385" "limit" "0" "100"
// 取出第一個(gè)數(shù),也就是判斷上面執(zhí)行的操作是否有下一頁(yè)哄辣。(因?yàn)閯倓傞_(kāi)始總是0的)除非是之前的操作(zrangebyscore)沒(méi)有取完
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"
// 往zset里面設(shè)置 數(shù)據(jù)過(guò)期的時(shí)間戳(當(dāng)前執(zhí)行的時(shí)間戳+延時(shí)的時(shí)間毫秒值)內(nèi)容就是訂單數(shù)據(jù)
"zadd" "redisson_delay_queue_timeout:{my-test}" "1656404489400" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
// 同步一份數(shù)據(jù)到list隊(duì)列
"rpush" "redisson_delay_queue:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
// 取出排序好的第一個(gè)數(shù)據(jù)请梢,也就是最臨近要觸發(fā)的數(shù)據(jù)赠尾,然后發(fā)送通知
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0"
// 發(fā)送通知 之前第一步 SUBSCRIBE 訂閱了 客戶端收到通知后,就在自己進(jìn)程里面開(kāi)啟延時(shí)任務(wù)(HashedWheelTimer)毅弧,到時(shí)間后就可以從redis取數(shù)據(jù)發(fā)送
"publish" "redisson_delay_queue_channel:{my-test}" "1656404489400"
// 這里就是取數(shù)據(jù)環(huán)節(jié)了
"BLPOP" "my-test" "0"
// 在范圍 0-過(guò)期時(shí)間 取出100條數(shù)據(jù)
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404489444" "limit" "0" "100"
// 將上面取到的數(shù)據(jù)push到阻塞隊(duì)列 很顯然能看到 com.example.mytest.domain.OrderInfo 是我們的訂單數(shù)據(jù)
"rpush" "my-test" "{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
// 刪除數(shù)據(jù)
"lrem" "redisson_delay_queue:{my-test}" "1" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
"zrem" "redisson_delay_queue_timeout:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
// 取zset第一個(gè)數(shù)據(jù)气嫁,有的話繼續(xù)上面邏輯取數(shù)據(jù)
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"
// 退訂
"UNSUBSCRIBE" "redisson_delay_queue_channel:{my-test}"
這里參考:https://zhuanlan.zhihu.com/p/343811173
我們知道Zset是按照分?jǐn)?shù)升序的也就是最小的分?jǐn)?shù)在最前面,基于這個(gè)特點(diǎn)够坐,大致明白寸宵,利用過(guò)期時(shí)間的時(shí)間戳作為分?jǐn)?shù)放入到Zset中,那么即將過(guò)期的就在最上面元咙。
直接上個(gè)圖解