Redisson實(shí)現(xiàn)延遲隊(duì)列

一息堂、添加POM依賴

            <!--redisson-->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-boot-starter</artifactId>
                <version>3.12.5</version>
            </dependency>

二电谣、增加redisson配置

package com.zensun.framework.config;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.io.IOException;

/**
 * redis配置
 *
 * @author gmk
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    @Value("${spring.redis.password}")
    private String password;
    @Value("${spring.redis.database}")
    private int database;


    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        Config config = new Config();
        if (StrUtil.isNotBlank(password)) {
            config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password).setDatabase(database);
        } else {
            config.useSingleServer().setAddress("redis://" + host + ":" + port).setDatabase(database);
        }
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

三尼酿、編寫隊(duì)列服務(wù)類


import java.util.concurrent.TimeUnit;

/**
 * @Description 延遲隊(duì)列服務(wù)接口 TODO
 * @Author yuqianwei
 * @Date 2020 2020/12/10 11:58
 */

public interface DelayQueueService<T> {

    /**
     * 推送數(shù)據(jù)
     *
     * @param data
     * @param queueName
     */
    void pushData(T data, String queueName);

    /**
     * 推送數(shù)據(jù)
     *
     * @param data
     * @param time
     * @param timeUnit
     * @param queueName
     */
    void pushData(T data, long time, TimeUnit timeUnit, String queueName);

    /**
     * 拉取數(shù)據(jù)
     *
     * @param queueName
     * @return
     */
    T pullData(String queueName);
}









import com.zensun.common.utils.spring.SpringUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * 
 * 每一個(gè)RedisDelayQueueServiceImpl對(duì)象 是一個(gè)隊(duì)列通道  多個(gè)隊(duì)列通道就用多個(gè)RedisDelayQueueServiceImpl對(duì)象
 * @Description redis延遲隊(duì)列服務(wù)實(shí)現(xiàn)類 TODO
 * @Author yuqianwei
 * @Date 2020 2020/12/10 12:01
 */

public class RedisDelayQueueServiceImpl<T> implements DelayQueueService<T> {

    private static RedissonClient redissonClient;

    static {
        redissonClient = SpringUtils.getBean(RedissonClient.class);


    }

    private final String queueName;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private RBlockingQueue<T> blockingQueue;
    private RDelayedQueue<T> delayedQueue;

    public RedisDelayQueueServiceImpl(String queueName) {
        this.queueName = queueName;
        this.blockingQueue = getBlockingQueue();
        this.delayedQueue = getDelayQueue(blockingQueue);
    }

    /**
     * 獲取阻塞隊(duì)列
     *
     * @return
     */
    private RBlockingQueue<T> getBlockingQueue() {
        return redissonClient.getBlockingQueue(queueName);
    }

    /**
     * 獲取延遲隊(duì)列
     *
     * @param blockingQueue
     * @return
     */
    private RDelayedQueue<T> getDelayQueue(RBlockingQueue<T> blockingQueue) {
        return redissonClient.getDelayedQueue(blockingQueue);
    }

    /**
     * 推送數(shù)據(jù)
     *
     * @param data
     * @param queueName
     */
    @Override
    public void pushData(T data, String queueName) {
        pushData(data, 0, TimeUnit.MILLISECONDS, queueName);
    }

    /**
     * 推送數(shù)據(jù)
     *
     * @param data
     * @param time
     * @param timeUnit
     * @param queueName
     */
    @Override
    public void pushData(T data, long time, TimeUnit timeUnit, String queueName) {
        delayedQueue.offerAsync(data, time < 0 ? 0 : time, timeUnit);
    }

    /**
     * 拉取數(shù)據(jù)
     *
     * @param queueName
     * @return
     */
    @Override
    public T pullData(String queueName) {

        T currentData = null;
        try {
            currentData = blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
        }
        logger.info("獲取到的數(shù)據(jù) : {}", currentData);

        return currentData;
    }

}


四拾积、DEMO

生產(chǎn)者:

        long currentTimeMillis = System.currentTimeMillis();
        long delayStartTime = advertInfo.getStartTime().getTime() - currentTimeMillis;
        //開始時(shí)間在現(xiàn)在之后
            try {
                delayToStartQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayStartTime, TimeUnit.MILLISECONDS, Constants.ADVERT_TOSTART_QUEUEKEY);
                log.info("成功推送到隊(duì)列 : {}, 廣告信息id : {}, 延遲時(shí)間 : {} 秒", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId(), delayStartTime / 1000);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("失敗推送到隊(duì)列 : {}, 廣告信息id : {}", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId());
            }  

消費(fèi)者:



import cn.hutool.core.util.StrUtil;
import com.zensun.business.domain.AdvertInfo;
import com.zensun.business.domain.dto.PatchAdvertInfo;
import com.zensun.business.queue.DelayQueueService;
import com.zensun.business.queue.RedisDelayQueueServiceImpl;
import com.zensun.business.service.IAdvertInfoService;
import com.zensun.common.constant.Constants;
import com.zensun.common.enums.domain.AdvertInfoEnum;
import com.zensun.common.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * @Description 自動(dòng)更改廣告開始狀態(tài)定時(shí)任務(wù) TODO
 * @Author gmk
 * @Date 2020 2021/3/17 16:50
 */

@Slf4j
@Component
public class AutoNotStartToStartAdvertInfoStatusTask implements SmartInitializingSingleton {

    @Autowired
    private IAdvertInfoService advertInfoService;

    /**
     * 在所有單例bean實(shí)例化后 進(jìn)行賦值, 否則有空指針錯(cuò)誤, 后期優(yōu)化
     * 見此方法 afterSingletonsInstantiated
     */
    private DelayQueueService<String> delayQueueService;

   //定時(shí)任務(wù)是用來掃描數(shù)據(jù)內(nèi) 沒有被加入到隊(duì)列的數(shù)據(jù) 用來兜底
    /**
     * 廣告更改狀態(tài)
     //@Scheduled(fixedRate = SCHEDULE_RATE)
     public void autoOffSaleGoodsSource() {
     log.info("啟動(dòng)廣告開始自動(dòng)修改狀態(tài)掃描--------------->");
     try {
     long currentTimeMillis = System.currentTimeMillis();
     Date now = new Date(currentTimeMillis);
     //掃描區(qū)間 掃描按照定時(shí)任務(wù)時(shí)間 只掃描離開始時(shí)間最近的一個(gè)區(qū)間
     // (如 定時(shí)任務(wù)每10分鐘執(zhí)行一次 只掃描現(xiàn)在時(shí)間+10分鐘大于等于開始時(shí)間的數(shù)據(jù) )
     //long nextTime = SCHEDULE_RATE + currentTimeMillis;
     long nextTime = currentTimeMillis;
     Date futureTime = new Date(nextTime);
     LambdaQueryWrapper<AdvertInfo> a = new LambdaQueryWrapper<>();
     a.eq(AdvertInfo::getAdvertStatus, AdvertInfoEnum.Status.NOTSTART.getCode())
     .apply("unix_timestamp(start_time) >= {0}", currentTimeMillis / 1000)
     .apply("unix_timestamp(start_time) <= {0}", nextTime / 1000);

     List<AdvertInfo> advertInfoList = advertInfoService.list(a);

     log.info("掃描到開始時(shí)間在 : {}與 : {}之間 的廣告信息 : {} 條", DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, now), DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, futureTime), advertInfoList.size());

     if (CollectionUtils.isNotEmpty(advertInfoList)) {
     advertInfoList.forEach(advertInfo -> {
     long delayTime = advertInfo.getEndTime().getTime() - currentTimeMillis;
     try {
     // delayQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayTime, TimeUnit.MILLISECONDS, queueName);
     //log.info("推送到隊(duì)列 : {}, 開始時(shí)間即將和現(xiàn)在時(shí)間一樣的廣告信息id : {}, 延遲時(shí)間 : {} 秒", queueName, advertInfo.getAdvertId(), delayTime / 1000);
     } catch (Exception e) {
     //log.error("推送到隊(duì)列 : {}, 開始時(shí)間即將和現(xiàn)在時(shí)間一樣的廣告信息id : {}, 延遲時(shí)間 : {} 秒, 發(fā)生異常 : {}",
     //        queueName, advertInfo.getAdvertId(), delayTime / 1000, e.getMessage(), e);
     }
     });
     }
     } catch (Exception e) {
     log.error("自動(dòng)更改廣告開始狀態(tài)定時(shí)任務(wù)執(zhí)行異常 : {}", e.getMessage(), e);
     }

     } */

    /**
     * 啟動(dòng)延遲隊(duì)列監(jiān)聽
     */
    public void listenerDelayQueueStart() {
        Thread thread = new Thread(() -> {
            log.info("啟動(dòng)監(jiān)聽{}:的數(shù)據(jù)...", Constants.ADVERT_TOSTART_QUEUENAME);
            while (true) {
                try {
                    String data = delayQueueService.pullData(Constants.ADVERT_TOSTART_QUEUEKEY);
                    log.info("取到 : {}, 的數(shù)據(jù) : {}", Constants.ADVERT_TOSTART_QUEUENAME, data);
                    if (StrUtil.isNotBlank(data)) {
                        Date nowDate = DateUtils.getNowDate();
                        Long advertId = Long.valueOf(data);
                        AdvertInfo byId = Optional.ofNullable(advertInfoService.getById(advertId)).orElse(new AdvertInfo());
                        //消費(fèi)端做冪等性 避免重復(fù)消費(fèi)
                        if (byId.getAdvertStatus() != null && AdvertInfoEnum.Status.NOTSTART.getCode().equals(byId.getAdvertStatus()) && DateUtils.compareTo(nowDate, byId.getStartTime()) == 0) {
                            //修改廣告的狀態(tài)
                            PatchAdvertInfo patchAdvertInfo = new PatchAdvertInfo();
                            patchAdvertInfo.setAdvertId(advertId);
                            patchAdvertInfo.setAdvertStatus(AdvertInfoEnum.Status.START.getCode());
                            advertInfoService.updateAdvertInfoStatusById(patchAdvertInfo);
                            log.info("隊(duì)列執(zhí)行成功:{}殉挽,時(shí)間為:{}丰涉,廣告id : {}", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
                        } else {
                            log.info("隊(duì)列執(zhí)行失敗:{}斯碌,時(shí)間為:{}一死,廣告id : {},未獲取到數(shù)據(jù)或者數(shù)據(jù)不符合消費(fèi)規(guī)則", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
                        }

                    }
                } catch (Exception e) {
                    log.info("隊(duì)列執(zhí)行失斏低佟:{}投慈,時(shí)間為:{}", Constants.ADVERT_TOSTART_QUEUENAME, e.getMessage());
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException interruptedException) {
                        interruptedException.printStackTrace();
                    }
                }
            }
        });
        thread.setName(Constants.ADVERT_TOSTART_QUEUENAME + "-線程");
        thread.setDaemon(true);
        thread.start();
    }

    @Override
    public void afterSingletonsInstantiated() {
        delayQueueService = new RedisDelayQueueServiceImpl(Constants.ADVERT_TOSTART_QUEUEKEY);
        listenerDelayQueueStart();
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市冠骄,隨后出現(xiàn)的幾起案子伪煤,更是在濱河造成了極大的恐慌,老刑警劉巖凛辣,帶你破解...
    沈念sama閱讀 211,743評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件抱既,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡扁誓,警方通過查閱死者的電腦和手機(jī)防泵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蝗敢,“玉大人捷泞,你說我怎么就攤上這事∈偾矗” “怎么了锁右?”我有些...
    開封第一講書人閱讀 157,285評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)讶泰。 經(jīng)常有香客問我咏瑟,道長(zhǎng),這世上最難降的妖魔是什么峻厚? 我笑而不...
    開封第一講書人閱讀 56,485評(píng)論 1 283
  • 正文 為了忘掉前任响蕴,我火速辦了婚禮谆焊,結(jié)果婚禮上惠桃,老公的妹妹穿的比我還像新娘。我一直安慰自己辖试,他們只是感情好辜王,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,581評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著罐孝,像睡著了一般呐馆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上莲兢,一...
    開封第一講書人閱讀 49,821評(píng)論 1 290
  • 那天汹来,我揣著相機(jī)與錄音续膳,去河邊找鬼。 笑死收班,一個(gè)胖子當(dāng)著我的面吹牛坟岔,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播摔桦,決...
    沈念sama閱讀 38,960評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼社付,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了邻耕?” 一聲冷哼從身側(cè)響起鸥咖,我...
    開封第一講書人閱讀 37,719評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎兄世,沒想到半個(gè)月后啼辣,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,186評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡御滩,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,516評(píng)論 2 327
  • 正文 我和宋清朗相戀三年熙兔,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片艾恼。...
    茶點(diǎn)故事閱讀 38,650評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡住涉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出钠绍,到底是詐尸還是另有隱情舆声,我是刑警寧澤,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布柳爽,位于F島的核電站媳握,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏磷脯。R本人自食惡果不足惜蛾找,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,936評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望赵誓。 院中可真熱鬧打毛,春花似錦、人聲如沸俩功。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)诡蜓。三九已至熬甫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蔓罚,已是汗流浹背椿肩。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工瞻颂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人郑象。 一個(gè)月前我還...
    沈念sama閱讀 46,370評(píng)論 2 360
  • 正文 我出身青樓蘸朋,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親扣唱。 傳聞我的和親對(duì)象是個(gè)殘疾皇子藕坯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,527評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容