一、添加POM依賴
<!-- Redisson Spring Boot starter dependency -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.12.5</version>
</dependency>
二、增加redisson配置
package com.zensun.framework.config;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.io.IOException;
/**
 * Redis configuration: builds a single-server {@link RedissonClient} from the
 * {@code spring.redis.*} properties and registers it as a Spring bean.
 *
 * @author gmk
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private int port;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.database}")
    private int database;

    /**
     * Creates the Redisson client for a single Redis server.
     *
     * <p>The password is applied only when it is not blank. Values are serialised with
     * {@link JsonJacksonCodec}. The bean's {@code shutdown} method is registered as its
     * destroy method.
     *
     * @return the configured Redisson client
     * @throws IOException declared by the original signature; kept for compatibility
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() throws IOException {
        final String address = "redis://" + host + ":" + port;
        final Config redissonConfig = new Config();

        // Settings shared by the password and no-password cases.
        redissonConfig.useSingleServer().setAddress(address).setDatabase(database);

        // useSingleServer() hands back the already-created single-server config,
        // so the password lands on the same configuration object.
        if (StrUtil.isNotBlank(password)) {
            redissonConfig.useSingleServer().setPassword(password);
        }

        redissonConfig.setCodec(new JsonJacksonCodec());
        return Redisson.create(redissonConfig);
    }
}
三、編寫隊列服務類
import java.util.concurrent.TimeUnit;
/**
* Contract for a queue service that pushes elements to, and pulls elements from, a named queue.
*
* @param <T> type of the queued elements
* @Description Delayed queue service interface
* @Author yuqianwei
* @Date 2020 2020/12/10 11:58
*/
public interface DelayQueueService<T> {
/**
* Pushes data to the queue; this overload takes no time arguments.
*
* @param data element to push
* @param queueName name of the target queue
*/
void pushData(T data, String queueName);
/**
* Pushes data to the queue together with a time value and its unit.
*
* @param data element to push
* @param time amount of time, expressed in {@code timeUnit}
*             (presumably the delivery delay; confirm against the implementation)
* @param timeUnit unit of {@code time}
* @param queueName name of the target queue
*/
void pushData(T data, long time, TimeUnit timeUnit, String queueName);
/**
* Pulls data from the queue.
*
* @param queueName name of the queue to pull from
* @return the pulled element
*/
T pullData(String queueName);
}
import com.zensun.common.utils.spring.SpringUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
 * Redisson-backed implementation of {@link DelayQueueService}.
 *
 * <p>Each instance is one queue channel, fixed by the name given to the constructor; use one
 * instance per channel. The {@code queueName} parameters of the interface methods are accepted
 * for interface compatibility only and are not consulted: every operation targets the queue
 * this instance was constructed with.
 *
 * @param <T> type of the queued elements
 * @Description Redis delayed queue service implementation
 * @Author yuqianwei
 * @Date 2020 2020/12/10 12:01
 */
public class RedisDelayQueueServiceImpl<T> implements DelayQueueService<T> {

    /** Looked up from the Spring context once, when this class is initialised. */
    private static final RedissonClient redissonClient = SpringUtils.getBean(RedissonClient.class);

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /** Name of the Redisson queue this instance is bound to. */
    private final String queueName;

    /** Queue that consumers block on in {@link #pullData(String)}. */
    private final RBlockingQueue<T> blockingQueue;

    /** Delayed queue layered over {@link #blockingQueue}; producers offer into it. */
    private final RDelayedQueue<T> delayedQueue;

    /**
     * Binds this service to the queue with the given name.
     *
     * @param queueName name of the Redisson blocking queue to use
     */
    public RedisDelayQueueServiceImpl(String queueName) {
        this.queueName = queueName;
        this.blockingQueue = getBlockingQueue();
        this.delayedQueue = getDelayQueue(blockingQueue);
    }

    /**
     * Obtains the blocking queue named {@link #queueName}.
     *
     * @return the blocking queue
     */
    private RBlockingQueue<T> getBlockingQueue() {
        return redissonClient.getBlockingQueue(queueName);
    }

    /**
     * Obtains the delayed queue that feeds the given blocking queue.
     *
     * @param blockingQueue destination queue of the delayed queue
     * @return the delayed queue
     */
    private RDelayedQueue<T> getDelayQueue(RBlockingQueue<T> blockingQueue) {
        return redissonClient.getDelayedQueue(blockingQueue);
    }

    /**
     * Pushes data with a zero delay.
     *
     * @param data      element to push
     * @param queueName not consulted; the queue bound at construction is used
     */
    @Override
    public void pushData(T data, String queueName) {
        pushData(data, 0, TimeUnit.MILLISECONDS, queueName);
    }

    /**
     * Offers data to the delayed queue asynchronously.
     *
     * <p>The call returns without waiting for the result, so a failed offer cannot surface as
     * an exception to the caller; it is reported through the logger instead.
     *
     * @param data      element to push
     * @param time      delay before the element is offered on; negative values are treated as 0
     * @param timeUnit  unit of {@code time}
     * @param queueName not consulted; the queue bound at construction is used
     */
    @Override
    public void pushData(T data, long time, TimeUnit timeUnit, String queueName) {
        final long delay = Math.max(0L, time);
        delayedQueue.offerAsync(data, delay, timeUnit).whenComplete((ignored, error) -> {
            if (error != null) {
                // Without this callback an asynchronous failure would go unnoticed.
                logger.error("Failed to push data to delay queue {} : {}", this.queueName, data, error);
            }
        });
    }

    /**
     * Blocks until an element is available on the blocking queue and returns it.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored
     * and {@code null} is returned. Callers that poll this method in a loop should check
     * {@link Thread#isInterrupted()} and stop, otherwise every subsequent call returns
     * {@code null} immediately.
     *
     * @param queueName not consulted; the queue bound at construction is used
     * @return the next element, or {@code null} if the wait was interrupted
     */
    @Override
    public T pullData(String queueName) {
        final T currentData;
        try {
            currentData = blockingQueue.take();
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for data from queue {}", this.queueName, e);
            return null;
        }
        logger.info("獲取到的數(shù)據(jù) : {}", currentData);
        return currentData;
    }
}
四、DEMO
生產者:
long currentTimeMillis = System.currentTimeMillis();
// Delay until the advert's start time.
// NOTE(review): nothing here checks that the start time is in the future, so this value is
// negative when the start time has already passed; confirm how the queue service handles that.
long delayStartTime = advertInfo.getStartTime().getTime() - currentTimeMillis;
try {
    delayToStartQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayStartTime, TimeUnit.MILLISECONDS, Constants.ADVERT_TOSTART_QUEUEKEY);
    log.info("成功推送到隊(duì)列 : {}, 廣告信息id : {}, 延遲時(shí)間 : {} 秒", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId(), delayStartTime / 1000);
} catch (Exception e) {
    // The exception is passed as the trailing argument so the stack trace goes through the
    // logger rather than to stderr via printStackTrace().
    log.error("失敗推送到隊(duì)列 : {}, 廣告信息id : {}", Constants.ADVERT_TOSTART_QUEUENAME, advertInfo.getAdvertId(), e);
}
消費者:
import cn.hutool.core.util.StrUtil;
import com.zensun.business.domain.AdvertInfo;
import com.zensun.business.domain.dto.PatchAdvertInfo;
import com.zensun.business.queue.DelayQueueService;
import com.zensun.business.queue.RedisDelayQueueServiceImpl;
import com.zensun.business.service.IAdvertInfoService;
import com.zensun.common.constant.Constants;
import com.zensun.common.enums.domain.AdvertInfoEnum;
import com.zensun.common.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* @Description 自動(dòng)更改廣告開始狀態(tài)定時(shí)任務(wù) TODO
* @Author gmk
* @Date 2020 2021/3/17 16:50
*/
@Slf4j
@Component
public class AutoNotStartToStartAdvertInfoStatusTask implements SmartInitializingSingleton {
@Autowired
private IAdvertInfoService advertInfoService;
/**
* 在所有單例bean實(shí)例化后 進(jìn)行賦值, 否則有空指針錯(cuò)誤, 后期優(yōu)化
* 見此方法 afterSingletonsInstantiated
*/
private DelayQueueService<String> delayQueueService;
//定時(shí)任務(wù)是用來掃描數(shù)據(jù)內(nèi) 沒有被加入到隊(duì)列的數(shù)據(jù) 用來兜底
/**
* 廣告更改狀態(tài)
//@Scheduled(fixedRate = SCHEDULE_RATE)
public void autoOffSaleGoodsSource() {
log.info("啟動(dòng)廣告開始自動(dòng)修改狀態(tài)掃描--------------->");
try {
long currentTimeMillis = System.currentTimeMillis();
Date now = new Date(currentTimeMillis);
//掃描區(qū)間 掃描按照定時(shí)任務(wù)時(shí)間 只掃描離開始時(shí)間最近的一個(gè)區(qū)間
// (如 定時(shí)任務(wù)每10分鐘執(zhí)行一次 只掃描現(xiàn)在時(shí)間+10分鐘大于等于開始時(shí)間的數(shù)據(jù) )
//long nextTime = SCHEDULE_RATE + currentTimeMillis;
long nextTime = currentTimeMillis;
Date futureTime = new Date(nextTime);
LambdaQueryWrapper<AdvertInfo> a = new LambdaQueryWrapper<>();
a.eq(AdvertInfo::getAdvertStatus, AdvertInfoEnum.Status.NOTSTART.getCode())
.apply("unix_timestamp(start_time) >= {0}", currentTimeMillis / 1000)
.apply("unix_timestamp(start_time) <= {0}", nextTime / 1000);
List<AdvertInfo> advertInfoList = advertInfoService.list(a);
log.info("掃描到開始時(shí)間在 : {}與 : {}之間 的廣告信息 : {} 條", DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, now), DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, futureTime), advertInfoList.size());
if (CollectionUtils.isNotEmpty(advertInfoList)) {
advertInfoList.forEach(advertInfo -> {
long delayTime = advertInfo.getEndTime().getTime() - currentTimeMillis;
try {
// delayQueueService.pushData(String.valueOf(advertInfo.getAdvertId()), delayTime, TimeUnit.MILLISECONDS, queueName);
//log.info("推送到隊(duì)列 : {}, 開始時(shí)間即將和現(xiàn)在時(shí)間一樣的廣告信息id : {}, 延遲時(shí)間 : {} 秒", queueName, advertInfo.getAdvertId(), delayTime / 1000);
} catch (Exception e) {
//log.error("推送到隊(duì)列 : {}, 開始時(shí)間即將和現(xiàn)在時(shí)間一樣的廣告信息id : {}, 延遲時(shí)間 : {} 秒, 發(fā)生異常 : {}",
// queueName, advertInfo.getAdvertId(), delayTime / 1000, e.getMessage(), e);
}
});
}
} catch (Exception e) {
log.error("自動(dòng)更改廣告開始狀態(tài)定時(shí)任務(wù)執(zhí)行異常 : {}", e.getMessage(), e);
}
} */
/**
* 啟動(dòng)延遲隊(duì)列監(jiān)聽
*/
public void listenerDelayQueueStart() {
Thread thread = new Thread(() -> {
log.info("啟動(dòng)監(jiān)聽{}:的數(shù)據(jù)...", Constants.ADVERT_TOSTART_QUEUENAME);
while (true) {
try {
String data = delayQueueService.pullData(Constants.ADVERT_TOSTART_QUEUEKEY);
log.info("取到 : {}, 的數(shù)據(jù) : {}", Constants.ADVERT_TOSTART_QUEUENAME, data);
if (StrUtil.isNotBlank(data)) {
Date nowDate = DateUtils.getNowDate();
Long advertId = Long.valueOf(data);
AdvertInfo byId = Optional.ofNullable(advertInfoService.getById(advertId)).orElse(new AdvertInfo());
//消費(fèi)端做冪等性 避免重復(fù)消費(fèi)
if (byId.getAdvertStatus() != null && AdvertInfoEnum.Status.NOTSTART.getCode().equals(byId.getAdvertStatus()) && DateUtils.compareTo(nowDate, byId.getStartTime()) == 0) {
//修改廣告的狀態(tài)
PatchAdvertInfo patchAdvertInfo = new PatchAdvertInfo();
patchAdvertInfo.setAdvertId(advertId);
patchAdvertInfo.setAdvertStatus(AdvertInfoEnum.Status.START.getCode());
advertInfoService.updateAdvertInfoStatusById(patchAdvertInfo);
log.info("隊(duì)列執(zhí)行成功:{}殉挽,時(shí)間為:{}丰涉,廣告id : {}", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
} else {
log.info("隊(duì)列執(zhí)行失敗:{}斯碌,時(shí)間為:{}一死,廣告id : {},未獲取到數(shù)據(jù)或者數(shù)據(jù)不符合消費(fèi)規(guī)則", Constants.ADVERT_TOSTART_QUEUENAME, DateUtils.getTime(), advertId);
}
}
} catch (Exception e) {
log.info("隊(duì)列執(zhí)行失斏低佟:{}投慈,時(shí)間為:{}", Constants.ADVERT_TOSTART_QUEUENAME, e.getMessage());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
});
thread.setName(Constants.ADVERT_TOSTART_QUEUENAME + "-線程");
thread.setDaemon(true);
thread.start();
}
@Override
public void afterSingletonsInstantiated() {
delayQueueService = new RedisDelayQueueServiceImpl(Constants.ADVERT_TOSTART_QUEUEKEY);
listenerDelayQueueStart();
}
}