簡介
DelayQueue<E extends Delayed>
Delayed 元素的一個無界阻塞隊列呻惕,只有在延遲期滿時才能從中提取元素。該隊列的頭部 是延遲期滿后保存時間最長的Delayed 元素滥比。如果延遲都還沒有期滿亚脆,則隊列沒有頭部,并且 poll 將返回 null盲泛。當一個元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一個小于等于 0 的值時濒持,將發(fā)生到期。即使無法使用take 或 poll 移除未到期的元素寺滚,也不會將這些元素作為正常元素對待柑营。例如,size方法同時返回到期和未到期元素的計數(shù)村视。此隊列不允許使用 null 元素官套。
DelayQueue類圖結(jié)構(gòu)
在源碼中看到 DelayQueue中內(nèi)部使用的是PriorityQueue存放數(shù)據(jù),使用ReentrantLock實現(xiàn)線程同步蚁孔,可知是阻塞隊列奶赔;
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
元素實例
package com.cr.core.delay.entity;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
*
* 訂單超時實體類
* Created by li on 2018/5/18.
*/
public class Order implements Delayed {
/**
* 訂單號
*/
private long orderId;
/**
* 開始執(zhí)行時間
*/
private long startTime;
public Order(){
}
/**
* orderId:訂單id
* timeout:訂單超時時間,秒
* */
public Order(long orderId, int timeout){
this.orderId = orderId;
this.startTime = System.currentTimeMillis() + timeout*1000L;
}
/**
* 返回當前對象的剩余延遲時間
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 比較當前對象與指定對象的順序(出棧順序)
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
if (o == this){
return 0;
}
if(o instanceof Order){
Order otherRequest = (Order)o;
long otherStartTime = otherRequest.getStartTime();
return (int)(this.startTime - otherStartTime);
}
return 0;
}
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
@Override
public String toString() {
return "DSHOrder{" +
"orderId=" + orderId +
", startTime=" + startTime +
'}';
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
}
應(yīng)用實例
package com.cr.core.delay.service;
import com.cr.core.delay.entity.Order;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import java.util.concurrent.DelayQueue;
/**
* Created by li on 2018/5/18.
*/
@Service
public class DelayService {
private static final Logger log = Logger.getLogger(DelayService.class);
private boolean start ;
private DelayedListener listener;
private DelayQueue<Order> delayQueue = new DelayQueue();
//內(nèi)部接口勒虾,監(jiān)聽器啟動調(diào)本類start接口需要實現(xiàn)
public interface DelayedListener{
void delayedListener(Order order);
}
public void start(DelayedListener listener){
if(start){
return;
}
log.error("DelayService 啟動");
start = true;
this.listener = listener;
new Thread(()->{
try{
while(true){
Order order = delayQueue.take();
if(DelayService.this.listener != null){
DelayService.this.listener.delayedListener(order);
}
}
}catch(Exception e){
log.info(e.getMessage(),e);
}
}).start();
}
public void add(Order order){
delayQueue.put(order);
}
public boolean remove(Order order){
return delayQueue.remove(order);
}
public void add(long orderId){
delayQueue.put(new Order(orderId,24*3600*1000));
}
public void remove(long orderId){
Order[] array = delayQueue.toArray(new Order[]{});
if(array == null || array.length <= 0){
return;
}
Order target = null;
for(Order order : array){
if(order.getOrderId() == orderId){
target = order;
break;
}
}
if(target != null){
delayQueue.remove(target);
}
}
}
監(jiān)聽器實例
package com.cr.core.delay.service;
import com.cr.core.delay.entity.Order;
import com.cr.core.delay.entity.ThreadPoolUtil;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import java.util.List;
/**
* Created by li on 2018/5/18.
*/
public class StartupListener implements ApplicationListener {
private static final Logger log = Logger.getLogger(StartupListener.class);
@Autowired
DelayService delayService;
@Override
public void onApplicationEvent(ApplicationEvent evt) {
if (evt.getSource() == null) {
return;
}
//自動收貨
delayService.start((Order order)->{
//異步來做
ThreadPoolUtil.execute(()->{
long orderId = order.getOrderId();
//查庫判斷是否需要自動收貨
log.error("自動確認收貨纺阔,onDelayedArrived():"+orderId);
//從redis刪除
//redisService.delete(Constants.RedisKey.DSH_PREFIX+orderId, RedisService.DB.DSH);
log.error("自動確認收貨,刪除redis:"+orderId);
});
});
//查找需要入隊的訂單
ThreadPoolUtil.execute(()->{
log.error("查找需要入隊的訂單");
//掃描redis修然,找到所有可能的orderId
List<String> keys = null;//redisService.scan(RedisService.DB.DSH);
if(keys == null || keys.size() <= 0){
return;
}
log.error("需要入隊的訂單keys:"+keys);
//寫到DelayQueue
for(String key : keys){
Order order = null;//redisService.get(key, DSHOrder.class, RedisService.DB.DSH);
log.error("讀redis笛钝,key:"+key);
if(order != null){
//delayService.add(order);
log.error("訂單自動入隊:"+order.getOrderId());
}
}
});
}
}
使用場景
1质况、做訂單超時、支付超時玻靡、和第三方交互時冪等重試等等结榄;