實(shí)現(xiàn)延遲隊(duì)列徽鼎,這些你知道嗎盛末?

何為延遲隊(duì)列

隊(duì)列,即先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)否淤,就和食堂打飯一樣悄但,排在最前面的先打飯,打完飯就走石抡;延遲隊(duì)列即隊(duì)列中的元素相比以往多了一個(gè)屬性特征:延遲檐嚣。延遲隊(duì)列中的每個(gè)元素都指定了延遲時(shí)間,表示該元素到達(dá)指定時(shí)間之后將出隊(duì)進(jìn)行處理啰扛。其實(shí)從上述定義來看嚎京,與其說是延遲隊(duì)列,不如說它是一個(gè)以時(shí)間為權(quán)重最小堆結(jié)構(gòu)隐解。

那么延遲隊(duì)列有什么用呢鞍帝?我們生活中其實(shí)平時(shí)接觸到很多可以使用延遲隊(duì)列來解決的例子:

  • 訂單超時(shí)30分鐘未付款將自動(dòng)關(guān)閉
  • 會(huì)議系統(tǒng)中,會(huì)議開始前10分鐘煞茫,發(fā)送會(huì)議提醒
  • 夏天晚上時(shí)帕涌,我們經(jīng)常會(huì)給空調(diào)設(shè)置指定時(shí)長的時(shí)間,到時(shí)空調(diào)自動(dòng)關(guān)閉
  • 再比如微波爐续徽、烤箱蚓曼、等等

可以發(fā)現(xiàn)延遲隊(duì)列想要實(shí)現(xiàn)的功能其實(shí)就是一個(gè)定時(shí)任務(wù)調(diào)度的一種。

延遲隊(duì)列實(shí)現(xiàn)方式

延遲隊(duì)列實(shí)現(xiàn)的方式有很多種炸宵,具體采用哪種去實(shí)現(xiàn)辟躏,和我們的業(yè)務(wù)背景、業(yè)務(wù)訴求都息息相關(guān)土全,不同的實(shí)現(xiàn)方式都有其適用的應(yīng)用場景捎琐,我這里將延遲隊(duì)列分為兩類:單機(jī)延遲隊(duì)列分布式延遲隊(duì)列

單機(jī)實(shí)現(xiàn)

JDK 提供了DelayedQueue可以實(shí)現(xiàn)延遲隊(duì)列的目的裹匙。其類圖如下:

image.png

可以看到DelayedQueue是一個(gè)阻塞隊(duì)列瑞凑,其隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

其中g(shù)etDelay返回代表該元素的一個(gè)在隊(duì)列中可存在的時(shí)間,通過這種方式來實(shí)現(xiàn)元素的延遲彈出概页。接下來看訂單超時(shí)30秒將自動(dòng)關(guān)閉的實(shí)際例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue<Order> DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超時(shí)關(guān)閉與:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黃燜雞訂單"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香鍋訂單"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石鍋拌飯訂單"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 創(chuàng)建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 誰的過期時(shí)間最早誰就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}

輸出:

黃燜雞訂單 創(chuàng)建于:2021-08-21 18:26:30
麻辣香鍋訂單 創(chuàng)建于:2021-08-21 18:26:35
石鍋拌飯訂單 創(chuàng)建于:2021-08-21 18:26:45
黃燜雞訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:00
麻辣香鍋訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:05
石鍋拌飯訂單 超時(shí)關(guān)閉與:2021-08-21 18:27:15

DelayQueue實(shí)現(xiàn)方式小結(jié)

這種方式的優(yōu)點(diǎn)就是實(shí)現(xiàn)簡單籽御,不復(fù)雜,但是其缺點(diǎn)也比較多:不具備可擴(kuò)展性內(nèi)存限制技掏、無持久化機(jī)制铃将,數(shù)據(jù)容易丟失。

分布式實(shí)現(xiàn)

數(shù)據(jù)庫輪詢

數(shù)據(jù)庫論詢的方式相對而言也比較好理解哑梳,后臺(tái)啟動(dòng)定時(shí)任務(wù)每隔一段時(shí)間掃描指定的數(shù)據(jù)庫表每一行數(shù)據(jù)劲阎,獲取出到達(dá)指定延遲時(shí)間的行進(jìn)行處理,所以使用該方式重要的就三個(gè)要素:

1)撈取任務(wù)
掃描數(shù)據(jù)庫的后臺(tái)任務(wù)鸠真,可以使用分布式任務(wù)去掃悯仙,比如A任務(wù)掃描limit 0,100滿足條件的數(shù)據(jù)行,B任務(wù)掃描limit 100,200滿足條件的數(shù)據(jù)行

2)執(zhí)行任務(wù)
一般來說講究分工協(xié)作吠卷,第一步中的分布式線程任務(wù)專門用來撈取任務(wù)锡垄,那么撈取到的任務(wù)可以再次扔給另外專門用戶處理任務(wù)的線程中

3)數(shù)據(jù)庫表設(shè)計(jì)
可以在表中增加一個(gè)字段來表示延遲時(shí)間,比如針對上面的訂單超時(shí)30秒關(guān)閉祭隔,我們可以增加一個(gè)字段timeout,可以是此時(shí)間的毫秒數(shù)來記錄訂單的超時(shí)時(shí)間货岭,那么此時(shí)我們的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;
image.png

數(shù)據(jù)庫輪詢實(shí)現(xiàn)方式小結(jié)

采用這種方式可以看到首先我們需要查詢數(shù)據(jù)庫,那么查詢數(shù)據(jù)庫就意味著存在查詢耗時(shí)序攘,那么可能最終導(dǎo)致的就是實(shí)時(shí)性不高茴她,但是它的優(yōu)點(diǎn)在于天生滿足任務(wù)持久化機(jī)制寻拂,不用擔(dān)心延遲任務(wù)丟失程奠。

通過Redis實(shí)現(xiàn)

Redis的五大數(shù)據(jù)類型中的zset數(shù)據(jù)類型中,包含一個(gè)稱為score的屬性祭钉,該數(shù)據(jù)類型中所有元素都會(huì)按照score進(jìn)行排序瞄沙,所以如果將score作為我們的延遲時(shí)間的時(shí)間戳,那么我們可以通過命令Zrangebyscore來獲取滿足條件的數(shù)據(jù)慌核,然后交給我們的任務(wù)處理線程去處理距境,其實(shí)整個(gè)實(shí)現(xiàn)思想和數(shù)據(jù)庫輪循是一樣的,只不過數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)由數(shù)據(jù)庫轉(zhuǎn)變成了redis垮卓,準(zhǔn)確來說redis也是數(shù)據(jù)庫垫桂,只不過不同的存儲(chǔ)結(jié)構(gòu)帶來的影響就是適用場景的不同罷了。

那么如果通過Redis來實(shí)現(xiàn)延遲隊(duì)列粟按,大概會(huì)有如下幾步:

1) 增加任務(wù)

zadd tasks ${過期時(shí)間戳} ${任務(wù)相關(guān)數(shù)據(jù)}

2)撈取任務(wù)

ZRANGEBYSCORE tasks -inf ${當(dāng)前時(shí)間戳} WITHSCORES

撈取過期時(shí)間早于當(dāng)前時(shí)間的這部分任務(wù)

3)執(zhí)行任務(wù)
接下來就是執(zhí)行诬滩,這個(gè)就沒什么好說的了

關(guān)于redis zset數(shù)據(jù)結(jié)構(gòu)以及命令可以看這里:https://www.runoob.com/redis/redis-sorted-sets.html

一些優(yōu)化點(diǎn)

1.在添加延遲任務(wù)時(shí),可以通過對任務(wù)id進(jìn)行hash分散至多個(gè)redis key灭将,可以避免所有任務(wù)存儲(chǔ)在一個(gè)key中導(dǎo)致大key從而影響元素的添加和查找性能

2.每個(gè)key獨(dú)自擁有一個(gè)線程處理

3.每個(gè)key的線程只負(fù)責(zé)拉取需要處理的數(shù)據(jù)疼鸟,然后再轉(zhuǎn)發(fā)至消息隊(duì)列中,不做任何其他處理庙曙,可以提升處理速度空镜,消息消費(fèi)者可擴(kuò)展性好,性能不夠,機(jī)器來湊

redis實(shí)現(xiàn)方式小結(jié)

redis因?yàn)槠涠际莾?nèi)存中操作吴攒,所以查詢插入速度和mysql來比都是非痴懦快的,所以實(shí)時(shí)性會(huì)比mysql高洼怔,雖然redis也能滿足任務(wù)數(shù)據(jù)的持久化欣鳖,但是無法保證任務(wù)不丟失,所以這里持久性會(huì)比mysql稍弱一點(diǎn)

不同實(shí)現(xiàn)方式的對比

實(shí)現(xiàn)方式 復(fù)雜度 數(shù)據(jù)量 持久化茴厉,數(shù)據(jù)丟失 擴(kuò)展性 實(shí)時(shí)性
jdk DelayQueue 簡單 由于程序內(nèi)存限制泽台,適用于少數(shù)據(jù)量 無持久化
mysql 輪詢 稍微復(fù)雜 可支持大數(shù)據(jù)量 可保證持久化,保證任務(wù)不丟失 可擴(kuò)展 由于查詢開銷矾缓,稍弱
redis zet 稍微復(fù)雜 可支持大數(shù)據(jù)量 可盡量保證持久化怀酷,不保證任務(wù)不丟失 可擴(kuò)展

結(jié)語

除了以上實(shí)現(xiàn)方式,還有其他比如通過Rabbit MQTTL死信隊(duì)列來實(shí)現(xiàn):每一個(gè)消息帶有TTL屬性嗜闻,該TTL即延遲任務(wù)的延遲時(shí)間蜕依,只要超過指定時(shí)間沒被消費(fèi),此消息將被轉(zhuǎn)至死信隊(duì)列中琉雳,我們可以監(jiān)聽死信隊(duì)列消費(fèi)消息進(jìn)而達(dá)到延遲任務(wù)的目的样眠;還有時(shí)間輪轉(zhuǎn)算法等,時(shí)間有限翠肘,日后再學(xué)檐束,日后再講。

image.png

參考

[1] https://zhuanlan.zhihu.com/p/266156267
[2] https://hiddenpps.blog.csdn.net/article/details/108988992

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末束倍,一起剝皮案震驚了整個(gè)濱河市被丧,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌绪妹,老刑警劉巖甥桂,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異邮旷,居然都是意外死亡黄选,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進(jìn)店門婶肩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來办陷,“玉大人,你說我怎么就攤上這事狡孔《” “怎么了?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵苗膝,是天一觀的道長殃恒。 經(jīng)常有香客問我植旧,道長,這世上最難降的妖魔是什么离唐? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任病附,我火速辦了婚禮,結(jié)果婚禮上亥鬓,老公的妹妹穿的比我還像新娘完沪。我一直安慰自己,他們只是感情好嵌戈,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布覆积。 她就那樣靜靜地躺著,像睡著了一般熟呛。 火紅的嫁衣襯著肌膚如雪宽档。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天庵朝,我揣著相機(jī)與錄音吗冤,去河邊找鬼。 笑死九府,一個(gè)胖子當(dāng)著我的面吹牛椎瘟,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播侄旬,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼肺蔚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了勾怒?” 一聲冷哼從身側(cè)響起婆排,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤声旺,失蹤者是張志新(化名)和其女友劉穎笔链,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體腮猖,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鉴扫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了澈缺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片坪创。...
    茶點(diǎn)故事閱讀 40,912評論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖姐赡,靈堂內(nèi)的尸體忽然破棺而出莱预,到底是詐尸還是另有隱情,我是刑警寧澤项滑,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布依沮,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏危喉。R本人自食惡果不足惜宋渔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望辜限。 院中可真熱鬧皇拣,春花似錦、人聲如沸薄嫡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽毫深。三九已至态蒂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間费什,已是汗流浹背钾恢。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鸳址,地道東北人瘩蚪。 一個(gè)月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像稿黍,于是被迫代替她去往敵國和親疹瘦。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,922評論 2 361

推薦閱讀更多精彩內(nèi)容