jdk實現(xiàn)延遲隊列

1丧慈、隊列初始化

@Component
 public class DelayQueueManager implements CommandLineRunner {

@Resource
private AltcPsControlService altcPsControlService;

private final Logger logger = LoggerFactory.getLogger(DelayQueueManager.class);
private DelayQueue<DelayTask> delayQueue = new DelayQueue<>();


/**
 * 加入到延時隊列中
 * @param task
 */
public void put(DelayTask task) {
    logger.info("加入延時任務:{}", task);
    delayQueue.put(task);
}

/**
 * 取消延時任務
 * @param task
 * @return
 */
public boolean remove(DelayTask task) {
    logger.info("取消延時任務:{}", task);
    return delayQueue.remove(task);
}


@Override
public void run(String... args) throws Exception {
    logger.info("初始化延時隊列");
    Executors.newSingleThreadExecutor().execute(new Thread(this::excuteThread));
    this.initAltcPsControl();
}
/**
 * 延時任務執(zhí)行線程
 */
private void excuteThread() {
    while (true) {
        try {
            DelayTask task = delayQueue.take();
            processTask(task);
        } catch (InterruptedException e) {
            break;
        }
    }
}

/**
 * 初始化掃描布控表加入延遲隊列
 * @Author caody
 * @Date 2020/11/4 15:39
 * @Param
 * @return {@link }
 **/
private void initAltcPsControl(){
    //延遲1分鐘后執(zhí)行
    try {
        Thread.sleep(1000*60);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    logger.info("初始化掃描布控表加入延遲隊列");
    List<AltcPsControlEntity> controlEntityList = altcPsControlService.selectInitDelayQueue();
    if (controlEntityList.size()>0){
        for (AltcPsControlEntity psControlEntity : controlEntityList){
            String identifier = psControlEntity.getId().toString();
            long expire = psControlEntity.getEndTime().getTime() - psControlEntity.getBeginTime().getTime();
            TaskBase taskBase = new TaskBase(identifier);
            DelayTask delayTask = new DelayTask(taskBase,expire);
            this.put(delayTask);
        }
    }
}

/**
 * 內部執(zhí)行延時任務
 * @param task
 */
private void processTask(DelayTask task) {
    logger.info("執(zhí)行延時任務:{}", task);
   //根據(jù)task中的data自定義數(shù)據(jù)來處理相關邏輯灭翔,例 if (task.getData() instanceof XXX) {}
    TaskBase taskBase = task.getData();
    Long id = Long.valueOf(taskBase.getIdentifier());
    AltcPsControlEntity altcPsControlEntity = new AltcPsControlEntity();
    altcPsControlEntity.setId(id);
    altcPsControlEntity.setStatus((byte) 3);
    boolean psControl = altcPsControlService.updateAltcPsControl(altcPsControlEntity);
    if (!psControl){
        logger.info("任務執(zhí)行失敗,重新放入隊列中,一小時后重新執(zhí)行:{}", task);
        Long expire = 1000*60*60L;
        DelayTask delayTask = new DelayTask(taskBase,expire);
        this.put(delayTask);
    }
}
}

2威恼、消息體結構

  public class DelayTask implements Delayed {

final private TaskBase data;
final private long expire;

public DelayTask(TaskBase data, long expire) {
    super();
    this.data = data;
    this.expire = expire + System.currentTimeMillis();
}

public TaskBase getData() {
    return data;
}

public long getExpire() {
    return expire;
}

@Override
public boolean equals(Object obj) {
    if (obj instanceof DelayTask) {
        return this.data.getIdentifier().equals(((DelayTask) obj).getData().getIdentifier());
    }
    return false;
}

@Override
public String toString() {
    return "{" + "data:" + data.toString() + "," + "expire:" + new Date(expire) + "}";
}

@Override
public long getDelay(TimeUnit unit) {
    return unit.convert(this.expire - System.currentTimeMillis(), unit);
}

@Override
public int compareTo(Delayed o) {
    long delta = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
    return (int) delta;
}
}

3积仗、消息內容

public class TaskBase {
private String identifier;

public TaskBase(String identifier) {
    this.identifier = identifier;
}


public String getIdentifier() {
    return identifier;
}

public void setIdentifier(String identifier) {
    this.identifier = identifier;
}

@Override
public String toString() {
    return JSON.toJSONString(this);
}
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末疆拘,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子寂曹,更是在濱河造成了極大的恐慌哎迄,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件隆圆,死亡現(xiàn)場離奇詭異漱挚,居然都是意外死亡,警方通過查閱死者的電腦和手機渺氧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門旨涝,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人侣背,你說我怎么就攤上這事白华。” “怎么了贩耐?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵弧腥,是天一觀的道長。 經(jīng)常有香客問我潮太,道長管搪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任铡买,我火速辦了婚禮更鲁,結果婚禮上,老公的妹妹穿的比我還像新娘寻狂。我一直安慰自己岁经,他們只是感情好,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布蛇券。 她就那樣靜靜地躺著缀壤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪纠亚。 梳的紋絲不亂的頭發(fā)上塘慕,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音蒂胞,去河邊找鬼图呢。 笑死,一個胖子當著我的面吹牛,可吹牛的內容都是我干的蛤织。 我是一名探鬼主播赴叹,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼指蚜!你這毒婦竟也來了乞巧?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤摊鸡,失蹤者是張志新(化名)和其女友劉穎绽媒,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體免猾,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡是辕,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了猎提。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片获三。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖锨苏,靈堂內的尸體忽然破棺而出石窑,到底是詐尸還是另有隱情,我是刑警寧澤蚓炬,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站躺屁,受9級特大地震影響肯夏,放射性物質發(fā)生泄漏。R本人自食惡果不足惜犀暑,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一驯击、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧耐亏,春花似錦徊都、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至择吊,卻和暖如春李根,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背几睛。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工房轿, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓囱持,卻偏偏與公主長得像夯接,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子纷妆,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內容