何為延遲隊(duì)列
隊(duì)列,即先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)否淤,就和食堂打飯一樣悄但,排在最前面的先打飯,打完飯就走石抡;延遲隊(duì)列即隊(duì)列中的元素相比以往多了一個(gè)屬性特征:延遲檐嚣。延遲隊(duì)列中的每個(gè)元素都指定了延遲時(shí)間,表示該元素到達(dá)指定時(shí)間之后將出隊(duì)進(jìn)行處理啰扛。其實(shí)從上述定義來看嚎京,與其說是延遲隊(duì)列,不如說它是一個(gè)以時(shí)間為權(quán)重的最小堆結(jié)構(gòu)隐解。
那么延遲隊(duì)列有什么用呢鞍帝?我們生活中其實(shí)平時(shí)接觸到很多可以使用延遲隊(duì)列來解決的例子:
- 訂單超時(shí)30分鐘未付款將自動(dòng)關(guān)閉
- 會(huì)議系統(tǒng)中,會(huì)議開始前10分鐘煞茫,發(fā)送會(huì)議提醒
- 夏天晚上時(shí)帕涌,我們經(jīng)常會(huì)給空調(diào)設(shè)置指定時(shí)長的時(shí)間,到時(shí)空調(diào)自動(dòng)關(guān)閉
- 再比如微波爐续徽、烤箱蚓曼、等等
可以發(fā)現(xiàn)延遲隊(duì)列想要實(shí)現(xiàn)的功能其實(shí)就是一個(gè)定時(shí)任務(wù)調(diào)度的一種。
延遲隊(duì)列實(shí)現(xiàn)方式
延遲隊(duì)列實(shí)現(xiàn)的方式有很多種炸宵,具體采用哪種去實(shí)現(xiàn)辟躏,和我們的業(yè)務(wù)背景、業(yè)務(wù)訴求都息息相關(guān)土全,不同的實(shí)現(xiàn)方式都有其適用的應(yīng)用場景捎琐,我這里將延遲隊(duì)列分為兩類:單機(jī)延遲隊(duì)列和分布式延遲隊(duì)列。
單機(jī)實(shí)現(xiàn)
JDK 提供了DelayedQueue可以實(shí)現(xiàn)延遲隊(duì)列的目的裹匙。其類圖如下:
可以看到DelayedQueue是一個(gè)阻塞隊(duì)列瑞凑,其隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口:
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
其中g(shù)etDelay返回代表該元素的一個(gè)在隊(duì)列中可存在的時(shí)間,通過這種方式來實(shí)現(xiàn)元素的延遲彈出概页。接下來看訂單超時(shí)30秒將自動(dòng)關(guān)閉的實(shí)際例子:
public class JDKDelayQueueTest {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DelayQueue<Order> DELAY_QUEUE = new DelayQueue<>();
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws Exception {
EXECUTOR_SERVICE.submit(() -> {
while (true) {
if (!DELAY_QUEUE.isEmpty()) {
Order order = DELAY_QUEUE.poll();
if (order != null) {
System.out.println(order.getOrderId() + " 超時(shí)關(guān)閉與:" + FORMATTER.format(LocalDateTime.now()));
}
}
TimeUnit.MILLISECONDS.sleep(1000);
}
});
EXECUTOR_SERVICE.submit(() -> {
try {
DELAY_QUEUE.add(new Order("黃燜雞訂單"));
TimeUnit.SECONDS.sleep(5);
DELAY_QUEUE.add(new Order("麻辣香鍋訂單"));
TimeUnit.SECONDS.sleep(10);
DELAY_QUEUE.add(new Order("石鍋拌飯訂單"));
} catch (Exception e) {
}
});
}
public static class Order implements Delayed {
private final LocalDateTime expireTime;
private final String orderId;
public Order(String orderId) {
this.expireTime = LocalDateTime.now().plusSeconds(30);
this.orderId = orderId;
System.out.println(orderId + " 創(chuàng)建于:" + FORMATTER.format(LocalDateTime.now()));
}
@Override
public long getDelay(TimeUnit unit) {
return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
}
@Override
public int compareTo(Delayed targetOrder) {
// 誰的過期時(shí)間最早誰就排最前面
return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
}
public String getOrderId() {
return orderId;
}
public LocalDateTime getExpireTime() {
return expireTime;
}
}
}
輸出:
黃燜雞訂單 創(chuàng)建于:2021-08-21 18:26:30
麻辣香鍋訂單 創(chuàng)建于:2021-08-21 18:26:35
石鍋拌飯訂單 創(chuàng)建于:2021-08-21 18:26:45
黃燜雞訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:00
麻辣香鍋訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:05
石鍋拌飯訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:15
DelayQueue實(shí)現(xiàn)方式小結(jié)
這種方式的優(yōu)點(diǎn)就是實(shí)現(xiàn)簡單籽御,不復(fù)雜,但是其缺點(diǎn)也比較多:不具備可擴(kuò)展性,內(nèi)存限制技掏、無持久化機(jī)制铃将,數(shù)據(jù)容易丟失。
分布式實(shí)現(xiàn)
數(shù)據(jù)庫輪詢
數(shù)據(jù)庫論詢的方式相對而言也比較好理解哑梳,后臺(tái)啟動(dòng)定時(shí)任務(wù)每隔一段時(shí)間掃描指定的數(shù)據(jù)庫表每一行數(shù)據(jù)劲阎,獲取出到達(dá)指定延遲時(shí)間的行進(jìn)行處理,所以使用該方式重要的就三個(gè)要素:
1)撈取任務(wù)
掃描數(shù)據(jù)庫的后臺(tái)任務(wù)鸠真,可以使用分布式任務(wù)去掃悯仙,比如A任務(wù)掃描limit 0,100滿足條件的數(shù)據(jù)行,B任務(wù)掃描limit 100,200滿足條件的數(shù)據(jù)行
2)執(zhí)行任務(wù)
一般來說講究分工協(xié)作吠卷,第一步中的分布式線程任務(wù)專門用來撈取任務(wù)锡垄,那么撈取到的任務(wù)可以再次扔給另外專門用戶處理任務(wù)的線程中
3)數(shù)據(jù)庫表設(shè)計(jì)
可以在表中增加一個(gè)字段來表示延遲時(shí)間,比如針對上面的訂單超時(shí)30秒關(guān)閉祭隔,我們可以增加一個(gè)字段timeout,可以是此時(shí)間的毫秒數(shù)來記錄訂單的超時(shí)時(shí)間货岭,那么此時(shí)我們的SQL就可以是:
select * from order where ${now} >= timeout limit ${start},100;
數(shù)據(jù)庫輪詢實(shí)現(xiàn)方式小結(jié)
采用這種方式可以看到首先我們需要查詢數(shù)據(jù)庫,那么查詢數(shù)據(jù)庫就意味著存在查詢耗時(shí)序攘,那么可能最終導(dǎo)致的就是實(shí)時(shí)性不高茴她,但是它的優(yōu)點(diǎn)在于天生滿足任務(wù)持久化機(jī)制寻拂,不用擔(dān)心延遲任務(wù)丟失程奠。
通過Redis實(shí)現(xiàn)
Redis的五大數(shù)據(jù)類型中的zset數(shù)據(jù)類型中,包含一個(gè)稱為score的屬性祭钉,該數(shù)據(jù)類型中所有元素都會(huì)按照score進(jìn)行排序瞄沙,所以如果將score作為我們的延遲時(shí)間的時(shí)間戳,那么我們可以通過命令Zrangebyscore來獲取滿足條件的數(shù)據(jù)慌核,然后交給我們的任務(wù)處理線程去處理距境,其實(shí)整個(gè)實(shí)現(xiàn)思想和數(shù)據(jù)庫輪循是一樣的,只不過數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)由數(shù)據(jù)庫轉(zhuǎn)變成了redis垮卓,準(zhǔn)確來說redis也是數(shù)據(jù)庫垫桂,只不過不同的存儲(chǔ)結(jié)構(gòu)帶來的影響就是適用場景的不同罷了。
那么如果通過Redis來實(shí)現(xiàn)延遲隊(duì)列粟按,大概會(huì)有如下幾步:
1) 增加任務(wù)
zadd tasks ${過期時(shí)間戳} ${任務(wù)相關(guān)數(shù)據(jù)}
2)撈取任務(wù)
ZRANGEBYSCORE tasks -inf ${當(dāng)前時(shí)間戳} WITHSCORES
撈取過期時(shí)間早于當(dāng)前時(shí)間的這部分任務(wù)
3)執(zhí)行任務(wù)
接下來就是執(zhí)行诬滩,這個(gè)就沒什么好說的了
關(guān)于redis zset數(shù)據(jù)結(jié)構(gòu)以及命令可以看這里:https://www.runoob.com/redis/redis-sorted-sets.html
一些優(yōu)化點(diǎn)
1.在添加延遲任務(wù)時(shí),可以通過對任務(wù)id進(jìn)行hash分散至多個(gè)redis key灭将,可以避免所有任務(wù)存儲(chǔ)在一個(gè)key中導(dǎo)致大key從而影響元素的添加和查找性能
2.每個(gè)key獨(dú)自擁有一個(gè)線程處理
3.每個(gè)key的線程只負(fù)責(zé)拉取需要處理的數(shù)據(jù)疼鸟,然后再轉(zhuǎn)發(fā)至消息隊(duì)列中,不做任何其他處理庙曙,可以提升處理速度空镜,消息消費(fèi)者可擴(kuò)展性好,性能不夠,機(jī)器來湊
redis實(shí)現(xiàn)方式小結(jié)
redis因?yàn)槠涠际莾?nèi)存中操作吴攒,所以查詢插入速度和mysql來比都是非痴懦快的,所以實(shí)時(shí)性會(huì)比mysql高洼怔,雖然redis也能滿足任務(wù)數(shù)據(jù)的持久化欣鳖,但是無法保證任務(wù)不丟失,所以這里持久性會(huì)比mysql稍弱一點(diǎn)
不同實(shí)現(xiàn)方式的對比
實(shí)現(xiàn)方式 | 復(fù)雜度 | 數(shù)據(jù)量 | 持久化茴厉,數(shù)據(jù)丟失 | 擴(kuò)展性 | 實(shí)時(shí)性 |
---|---|---|---|---|---|
jdk DelayQueue | 簡單 | 由于程序內(nèi)存限制泽台,適用于少數(shù)據(jù)量 | 無持久化 | 差 | 高 |
mysql 輪詢 | 稍微復(fù)雜 | 可支持大數(shù)據(jù)量 | 可保證持久化,保證任務(wù)不丟失 | 可擴(kuò)展 | 由于查詢開銷矾缓,稍弱 |
redis zet | 稍微復(fù)雜 | 可支持大數(shù)據(jù)量 | 可盡量保證持久化怀酷,不保證任務(wù)不丟失 | 可擴(kuò)展 | 高 |
結(jié)語
除了以上實(shí)現(xiàn)方式,還有其他比如通過Rabbit MQ的TTL和死信隊(duì)列來實(shí)現(xiàn):每一個(gè)消息帶有TTL屬性嗜闻,該TTL即延遲任務(wù)的延遲時(shí)間蜕依,只要超過指定時(shí)間沒被消費(fèi),此消息將被轉(zhuǎn)至死信隊(duì)列中琉雳,我們可以監(jiān)聽死信隊(duì)列消費(fèi)消息進(jìn)而達(dá)到延遲任務(wù)的目的样眠;還有時(shí)間輪轉(zhuǎn)算法等,時(shí)間有限翠肘,日后再學(xué)檐束,日后再講。
參考
[1] https://zhuanlan.zhihu.com/p/266156267
[2] https://hiddenpps.blog.csdn.net/article/details/108988992