DelayQueue之通用組件

我們產(chǎn)品的的業(yè)務(wù)中有那么一個場景邦尊,在醫(yī)生關(guān)閉問診的3min后嚎莉,患者將無法繼續(xù)和醫(yī)生進(jìn)行對話。我根據(jù)對業(yè)務(wù)的理解桌硫,和對技術(shù)實(shí)現(xiàn)成本的衡量夭咬,決定采用通過DelayQueue的方式來實(shí)現(xiàn)的方案。

關(guān)于DelayQueue的相關(guān)內(nèi)容介紹和核心源碼解析已在上一篇DelayQueue之源碼分析說明了铆隘。

據(jù)我所知卓舵,生活中有如下場景可以用得到DelayQueue:
1、下單后一段時間(業(yè)內(nèi)基本上都是30分鐘)內(nèi)不付款膀钠,就自動取消訂單掏湾。
2裹虫、提交打車申請后,一段時間內(nèi)(比如說30秒)沒有附近的司機(jī)接單融击,就自動發(fā)送發(fā)送給更多的司機(jī)筑公。

這類場景都有如下特點(diǎn):
1、需要有一段時間的延遲尊浪,如果僅僅是為了異步執(zhí)行匣屡,那么消息隊列顯然是是更優(yōu)的選擇。
2际长、對執(zhí)行時間的精確度有一定要求耸采,當(dāng)然異常狀況下,也可以對精確度適當(dāng)放寬松工育。比如場景1的訂單取消虾宇,規(guī)則設(shè)置為30分鐘不支付就取消,但實(shí)際場景中如绸,精確到30分自然是最好結(jié)果嘱朽,但假如出現(xiàn)故障,那么在可允許的范圍內(nèi)將訂單取消也是可以接受的(比如說在放寬到32分鐘內(nèi))怔接。
3搪泳、執(zhí)行是高頻率的。這點(diǎn)需要和第2點(diǎn)結(jié)合起來看扼脐,如果僅僅是為了低頻率的定時執(zhí)行岸军,個人認(rèn)為任務(wù)調(diào)度也是可行的。

綜合來看瓦侮,如果不需要延遲執(zhí)行艰赞,那么推薦用消息隊列;如果對執(zhí)行時間的精確度不那么在意且執(zhí)行頻率不高肚吏,那么推薦使用任務(wù)調(diào)度方妖;如果需要延遲執(zhí)行,且執(zhí)行比較高頻罚攀,對執(zhí)行時間的精確度有一定要求党觅,可以考慮使用延遲隊列。
以上這些是我們?yōu)楹尾捎肈elayQueue來實(shí)現(xiàn)這個業(yè)務(wù)場景的原因斋泄。

為了方便使用DelayQueue杯瞻,我封裝了組件對DelayQueue進(jìn)行了擴(kuò)展。

首先我定義了一個類TaskMessage是己,對Delayed進(jìn)行了擴(kuò)展又兵,實(shí)現(xiàn)了compareTo和getDelay方法。
如下是TaskMessage類的核心代碼卒废。

public class TaskMessage implements Delayed {

    private String body;  //消息內(nèi)容
    private long executeTime;//執(zhí)行時間
    private Function function;//執(zhí)行方式
    
    public TaskMessage(Long delayTime,String body,  Function function) {
        this.body = body;
        this.function = function;
        this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed delayed) {
        TaskMessage msg = (TaskMessage) delayed;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

外部調(diào)用只需要TaskMessage m1 = new TaskMessage(delayTime, body, function)就可以生成一個延遲任務(wù)的元素了,內(nèi)部自動就根據(jù)延遲時間計算出這個延遲任務(wù)元素的預(yù)期執(zhí)行時間沛厨。
Function是1.8版引入的函數(shù)式接口,主要方法是R apply(T t)摔认,功能是將Function對象應(yīng)用到輸入的參數(shù)上逆皮,然后返回計算結(jié)果。
那么達(dá)到延遲任務(wù)的預(yù)期執(zhí)行時間時参袱,只需要調(diào)用一下function.apply()方法就可以了电谣,不需要關(guān)心apply的具體實(shí)現(xiàn)。apply的具體實(shí)現(xiàn)方法是在調(diào)用時才明確的抹蚀。

然后定義一個延遲任務(wù)的執(zhí)行線程類TaskConsumer剿牺,實(shí)現(xiàn)了Runnable,重寫了run方法环壤。因為延遲任務(wù)的執(zhí)行晒来,必然是需要重新起線程去執(zhí)行的,不能阻礙主線程的操作郑现。
如下是TaskConsumer類的核心代碼湃崩。

public class TaskConsumer implements Runnable {
        
    @Override
    public void run() {
        while (signal) {
            try {
                TaskMessage take = queue.take();
                if (logger.isInfoEnabled()) {
                    logger.info("處理線程的id為" + threadId + ",消費(fèi)消息內(nèi)容為:" + take.getBody() + ",預(yù)計執(zhí)行時間為" +
                            DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
                }
                take.getFunction().apply(take.getBody());
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("id為" + threadId + "的處理線程被強(qiáng)制中斷");
                }
            } catch (Exception e) {
                logger.error("taskConsumer error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("id為" + threadId + "的處理線程已停止");
        }
    }
}

這個類核心代碼就只有如下兩行。
TaskMessage take = queue.take(); 獲取延遲隊列的隊首元素接箫。前文已經(jīng)解釋過攒读,Queue的take方法會返回隊列的隊首元素,否則就會掛起線程辛友。所以只要有返回值薄扁,必然就能獲取到當(dāng)前需要執(zhí)行的TaskMessage元素。
take.getFunction().apply(take.getBody()); 執(zhí)行延遲任務(wù)元素的apply方法废累。applay方法是在定義TaskMessage的時候確定的邓梅,表明了到達(dá)預(yù)期執(zhí)行時間所需要進(jìn)行的一系列操作,那么此時只需要執(zhí)行對應(yīng)的apply方法就可以了九默。

最后是加載TaskConsumer的統(tǒng)一管理類TaskManager震放。
如下是TaskManager類的核心代碼。

public class TaskManager implements ApplicationContextAware,
        InitializingBean,DisposableBean{

   
    @Override
    public void afterPropertiesSet() throws Exception {
        for (int i = 0; i < threadCount; i++) {
            TaskConsumer taskConsumer = new TaskConsumer(queue, i);
            taskConsumerList.add(taskConsumer);
            Thread thread = new Thread(taskConsumer);
            threadList.add(thread);
            thread.start();
        }
    }

    @Override
    public void destroy() throws Exception {
        for(int i=0;i<threadList.size();i++){
            threadList.get(i).interrupt();
            taskConsumerList.get(i).setSignal(Boolean.FALSE);
        }
    }
}

這個類的作用在于初始化類后驼修,就啟動線程不斷的去獲取延遲任務(wù)殿遂。然后在銷毀類后,先中斷消費(fèi)者線程乙各,然后設(shè)置信號量使得消費(fèi)者線程的run方法能跳出死循環(huán)墨礁,從而使得消費(fèi)線程正常結(jié)束。

最后是如何調(diào)用的示例耳峦。很簡單恩静,就只有兩步:
1、生成延遲任務(wù)元素taskMessage
2、將taskMessage添加到延遲隊列中

TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
        function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);

ok驶乾,以上是如何擴(kuò)展DelayQueue的功能構(gòu)造成高可用的組件的方案邑飒,歡迎大家來一起討論。

下一章我準(zhǔn)備講一下我們項目中運(yùn)用DelayQueue的過程中碰到的問題以及如何持久化的方案级乐。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末疙咸,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子风科,更是在濱河造成了極大的恐慌撒轮,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,080評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贼穆,死亡現(xiàn)場離奇詭異题山,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)故痊,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,422評論 3 385
  • 文/潘曉璐 我一進(jìn)店門顶瞳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人崖蜜,你說我怎么就攤上這事浊仆。” “怎么了豫领?”我有些...
    開封第一講書人閱讀 157,630評論 0 348
  • 文/不壞的土叔 我叫張陵抡柿,是天一觀的道長。 經(jīng)常有香客問我等恐,道長洲劣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,554評論 1 284
  • 正文 為了忘掉前任课蔬,我火速辦了婚禮囱稽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘二跋。我一直安慰自己战惊,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,662評論 6 386
  • 文/花漫 我一把揭開白布扎即。 她就那樣靜靜地躺著吞获,像睡著了一般。 火紅的嫁衣襯著肌膚如雪谚鄙。 梳的紋絲不亂的頭發(fā)上各拷,一...
    開封第一講書人閱讀 49,856評論 1 290
  • 那天,我揣著相機(jī)與錄音闷营,去河邊找鬼烤黍。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的速蕊。 我是一名探鬼主播嫂丙,決...
    沈念sama閱讀 39,014評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼互例!你這毒婦竟也來了奢入?” 一聲冷哼從身側(cè)響起筝闹,我...
    開封第一講書人閱讀 37,752評論 0 268
  • 序言:老撾萬榮一對情侶失蹤媳叨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后关顷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體糊秆,經(jīng)...
    沈念sama閱讀 44,212評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,541評論 2 327
  • 正文 我和宋清朗相戀三年议双,在試婚紗的時候發(fā)現(xiàn)自己被綠了痘番。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,687評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡平痰,死狀恐怖汞舱,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宗雇,我是刑警寧澤昂芜,帶...
    沈念sama閱讀 34,347評論 4 331
  • 正文 年R本政府宣布,位于F島的核電站赔蒲,受9級特大地震影響泌神,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜舞虱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,973評論 3 315
  • 文/蒙蒙 一欢际、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧矾兜,春花似錦损趋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,777評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至配并,卻和暖如春括荡,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背溉旋。 一陣腳步聲響...
    開封第一講書人閱讀 32,006評論 1 266
  • 我被黑心中介騙來泰國打工畸冲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 46,406評論 2 360
  • 正文 我出身青樓邑闲,卻偏偏與公主長得像算行,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子苫耸,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,576評論 2 349

推薦閱讀更多精彩內(nèi)容