需求場(chǎng)景
(Delay)代碼需要延時(shí)執(zhí)行,(Queue)需要不斷失敗重試
實(shí)例
當(dāng)前時(shí)間延時(shí)5s執(zhí)行Task,并在Task中添加一個(gè)指定時(shí)間執(zhí)行的延時(shí)隊(duì)列
try {
//第一個(gè)參數(shù): 延時(shí)任務(wù)中需要使用的參數(shù)對(duì)象,
//第二個(gè)參數(shù): 傳入new Date()作為延時(shí)的基準(zhǔn)時(shí)間
DelayGetPrise delay = new DelayGetPrise(asset, new Date());
GetOpenPriseTask.addtask(delay);
} catch (Exception e) {
e.printStackTrace();
}
/**
* @Auther: Young
* @Date: 2019/4/13 10:28
* @Description:
*/
@Data
public class DelayGetPrise implements Delayed {
public DelayGetPrise(Asset asset, Date endDate)
{
this.asset = asset;
this.endDate = endDate.getTime();
}
private long endDate;
private Asset asset; //獲取的時(shí)間
@Override
//調(diào)用處傳入時(shí)間的基礎(chǔ)上延時(shí) 5s鐘后執(zhí)行task
public long getDelay(TimeUnit unit) {
return (endDate + 5000) - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayGetPrise jia = (DelayGetPrise) o;
if (this.endDate - jia.getEndDate() > 0)
//時(shí)間到達(dá)
return 1;
else
return 0;
}
}
/**
* @Auther: Young
* @Date: 2019/4/13 10:27
* @Description:該方法繼承AfterSpringLoaded,項(xiàng)目啟動(dòng)后就加載Task,初始化隊(duì)列等
*/
@Component
public class GetOpenPriseTask implements AfterSpringLoaded {
private static final Logger logger = LoggerFactory.getLogger(GetOpenPriseTask.class);
@Autowired
private AssetService assetService;
@Autowired
private MarketService marketService;
private static final BlockingQueue<DelayGetPrise> getOpenPriseQueue = new DelayQueue<DelayGetPrise>();
public static void addtask(DelayGetPrise dalay) {
getOpenPriseQueue.add(dalay);
}
@Override
public void load() {
while (true) {
try {
logger.info("task任務(wù) 開(kāi)始");
DelayGetPrise prise = getOpenPriseQueue.take();
try {
Asset asset = prise.getAsset();
...
//失敗重試
if (失敗) {
logger.error("Task任務(wù) 數(shù)據(jù)異常,重試");
//注意此處傳入的時(shí)間
DelayGetPrise newDelay = new DelayGetPrise(asset, new Date());
GetOpenPriseTask.addtask(newDelay);
//此處不能用break,否則會(huì)跳出while(true),關(guān)閉隊(duì)列
continue;
} else {
logger.info("Task任務(wù) 正常邏輯執(zhí)行");
...
logger.info("Task任務(wù)完成");
LFExcuter.excute(() -> {
try {
//指定時(shí)間延時(shí)Task
int delay = asset.getPeriodValue().multiply(new BigDecimal(6)).intValue();
Calendar calUp = Calendar.getInstance();
int delaysecond = RandomUtil.randomInt(delay + 1);
calUp.add(Calendar.SECOND, delaysecond);
logger.info("指定時(shí)間:delay: " + delaysecond + "秒");
//在計(jì)算好的時(shí)間基礎(chǔ)上延時(shí)
DelayAutoBuy delayAutoUp = new DelayAutoBuy(asset, calUp.getTime(), ExchangeConstant.ORDER_TYPE.UP);
AutoBuyTask.addtask(delayAutoUp);
} catch (Exception e) {
logger.error("指定時(shí)間Task任務(wù)", e);
}
});
}
} catch (Exception e) {
logger.error("獲取隊(duì)列錯(cuò)誤,重試Task任務(wù)", e);
GetOpenPriseTask.addtask(prise);
}
} catch (Exception e) {
logger.error("指定時(shí)間Task任務(wù) 異常:", e);
}
logger.info("Task任務(wù) 結(jié)束");
}
}
}