我們產(chǎn)品的的業(yè)務(wù)中有那么一個場景邦尊,在醫(yī)生關(guān)閉問診的3min后嚎莉,患者將無法繼續(xù)和醫(yī)生進(jìn)行對話。我根據(jù)對業(yè)務(wù)的理解桌硫,和對技術(shù)實(shí)現(xiàn)成本的衡量夭咬,決定采用通過DelayQueue的方式來實(shí)現(xiàn)的方案。
關(guān)于DelayQueue的相關(guān)內(nèi)容介紹和核心源碼解析已在上一篇DelayQueue之源碼分析說明了铆隘。
據(jù)我所知卓舵,生活中有如下場景可以用得到DelayQueue:
1、下單后一段時間(業(yè)內(nèi)基本上都是30分鐘)內(nèi)不付款膀钠,就自動取消訂單掏湾。
2裹虫、提交打車申請后,一段時間內(nèi)(比如說30秒)沒有附近的司機(jī)接單融击,就自動發(fā)送發(fā)送給更多的司機(jī)筑公。
這類場景都有如下特點(diǎn):
1、需要有一段時間的延遲尊浪,如果僅僅是為了異步執(zhí)行匣屡,那么消息隊列顯然是是更優(yōu)的選擇。
2际长、對執(zhí)行時間的精確度有一定要求耸采,當(dāng)然異常狀況下,也可以對精確度適當(dāng)放寬松工育。比如場景1的訂單取消虾宇,規(guī)則設(shè)置為30分鐘不支付就取消,但實(shí)際場景中如绸,精確到30分自然是最好結(jié)果嘱朽,但假如出現(xiàn)故障,那么在可允許的范圍內(nèi)將訂單取消也是可以接受的(比如說在放寬到32分鐘內(nèi))怔接。
3搪泳、執(zhí)行是高頻率的。這點(diǎn)需要和第2點(diǎn)結(jié)合起來看扼脐,如果僅僅是為了低頻率的定時執(zhí)行岸军,個人認(rèn)為任務(wù)調(diào)度也是可行的。
綜合來看瓦侮,如果不需要延遲執(zhí)行艰赞,那么推薦用消息隊列;如果對執(zhí)行時間的精確度不那么在意且執(zhí)行頻率不高肚吏,那么推薦使用任務(wù)調(diào)度方妖;如果需要延遲執(zhí)行,且執(zhí)行比較高頻罚攀,對執(zhí)行時間的精確度有一定要求党觅,可以考慮使用延遲隊列。
以上這些是我們?yōu)楹尾捎肈elayQueue來實(shí)現(xiàn)這個業(yè)務(wù)場景的原因斋泄。
為了方便使用DelayQueue杯瞻,我封裝了組件對DelayQueue進(jìn)行了擴(kuò)展。
首先我定義了一個類TaskMessage是己,對Delayed進(jìn)行了擴(kuò)展又兵,實(shí)現(xiàn)了compareTo和getDelay方法。
如下是TaskMessage類的核心代碼卒废。
public class TaskMessage implements Delayed {
private String body; //消息內(nèi)容
private long executeTime;//執(zhí)行時間
private Function function;//執(zhí)行方式
public TaskMessage(Long delayTime,String body, Function function) {
this.body = body;
this.function = function;
this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
}
@Override
public int compareTo(Delayed delayed) {
TaskMessage msg = (TaskMessage) delayed;
return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
}
}
外部調(diào)用只需要TaskMessage m1 = new TaskMessage(delayTime, body, function)就可以生成一個延遲任務(wù)的元素了,內(nèi)部自動就根據(jù)延遲時間計算出這個延遲任務(wù)元素的預(yù)期執(zhí)行時間沛厨。
Function是1.8版引入的函數(shù)式接口,主要方法是R apply(T t)摔认,功能是將Function對象應(yīng)用到輸入的參數(shù)上逆皮,然后返回計算結(jié)果。
那么達(dá)到延遲任務(wù)的預(yù)期執(zhí)行時間時参袱,只需要調(diào)用一下function.apply()方法就可以了电谣,不需要關(guān)心apply的具體實(shí)現(xiàn)。apply的具體實(shí)現(xiàn)方法是在調(diào)用時才明確的抹蚀。
然后定義一個延遲任務(wù)的執(zhí)行線程類TaskConsumer剿牺,實(shí)現(xiàn)了Runnable,重寫了run方法环壤。因為延遲任務(wù)的執(zhí)行晒来,必然是需要重新起線程去執(zhí)行的,不能阻礙主線程的操作郑现。
如下是TaskConsumer類的核心代碼湃崩。
public class TaskConsumer implements Runnable {
@Override
public void run() {
while (signal) {
try {
TaskMessage take = queue.take();
if (logger.isInfoEnabled()) {
logger.info("處理線程的id為" + threadId + ",消費(fèi)消息內(nèi)容為:" + take.getBody() + ",預(yù)計執(zhí)行時間為" +
DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
}
take.getFunction().apply(take.getBody());
} catch (InterruptedException e) {
if (logger.isInfoEnabled()) {
logger.info("id為" + threadId + "的處理線程被強(qiáng)制中斷");
}
} catch (Exception e) {
logger.error("taskConsumer error", e);
}
}
if (logger.isInfoEnabled()) {
logger.info("id為" + threadId + "的處理線程已停止");
}
}
}
這個類核心代碼就只有如下兩行。
TaskMessage take = queue.take(); 獲取延遲隊列的隊首元素接箫。前文已經(jīng)解釋過攒读,Queue的take方法會返回隊列的隊首元素,否則就會掛起線程辛友。所以只要有返回值薄扁,必然就能獲取到當(dāng)前需要執(zhí)行的TaskMessage元素。
take.getFunction().apply(take.getBody()); 執(zhí)行延遲任務(wù)元素的apply方法废累。applay方法是在定義TaskMessage的時候確定的邓梅,表明了到達(dá)預(yù)期執(zhí)行時間所需要進(jìn)行的一系列操作,那么此時只需要執(zhí)行對應(yīng)的apply方法就可以了九默。
最后是加載TaskConsumer的統(tǒng)一管理類TaskManager震放。
如下是TaskManager類的核心代碼。
public class TaskManager implements ApplicationContextAware,
InitializingBean,DisposableBean{
@Override
public void afterPropertiesSet() throws Exception {
for (int i = 0; i < threadCount; i++) {
TaskConsumer taskConsumer = new TaskConsumer(queue, i);
taskConsumerList.add(taskConsumer);
Thread thread = new Thread(taskConsumer);
threadList.add(thread);
thread.start();
}
}
@Override
public void destroy() throws Exception {
for(int i=0;i<threadList.size();i++){
threadList.get(i).interrupt();
taskConsumerList.get(i).setSignal(Boolean.FALSE);
}
}
}
這個類的作用在于初始化類后驼修,就啟動線程不斷的去獲取延遲任務(wù)殿遂。然后在銷毀類后,先中斷消費(fèi)者線程乙各,然后設(shè)置信號量使得消費(fèi)者線程的run方法能跳出死循環(huán)墨礁,從而使得消費(fèi)線程正常結(jié)束。
最后是如何調(diào)用的示例耳峦。很簡單恩静,就只有兩步:
1、生成延遲任務(wù)元素taskMessage
2、將taskMessage添加到延遲隊列中
TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);
ok驶乾,以上是如何擴(kuò)展DelayQueue的功能構(gòu)造成高可用的組件的方案邑飒,歡迎大家來一起討論。
下一章我準(zhǔn)備講一下我們項目中運(yùn)用DelayQueue的過程中碰到的問題以及如何持久化的方案级乐。