閑來無事是己,用redis做了一個(gè)延時(shí)隊(duì)列献联,僅供學(xué)習(xí)與參考劝篷。歡迎拍磚巴粪!
@Slf4j
@Service
public class RedisDelayQueue {
@Resource
private StringRedisTemplate stringRedisTemplate;
private ScheduledExecutorService timer = Executors.newScheduledThreadPool(16);
private List<String> consumeTopics = new ArrayList<>(64);
@PostConstruct
public void init() {
Set<String> topicList = stringRedisTemplate.opsForSet().members("topicList");
if (topicList != null) {
topicList.forEach(this::registTopic);
}
}
private void registTopic(String topic) {
log.info("注冊監(jiān)聽topic消息:{}", topic);
timer.scheduleAtFixedRate(() -> {
Set<String> msgs = stringRedisTemplate.opsForZSet().rangeByScore(topic, 0, System.currentTimeMillis(), 0, 1000);
if (msgs != null && msgs.size() > 0) {
Long remove = stringRedisTemplate.opsForZSet().remove(topic, msgs.toArray());
//刪除結(jié)果大于0代表 搶到了
if( remove != null && remove> 0 ){
stringRedisTemplate.opsForList().leftPushAll(topic + "queue", msgs);
}
}
}, 1, 1, TimeUnit.SECONDS);
}
public void produce(String topic, String msg, Date date) {
log.info("topic:{} 生產(chǎn)消息:{},于{}消費(fèi)", topic, msg, date);
Long addSuccess = stringRedisTemplate.opsForSet().add("topicList", topic);
if (addSuccess != null && addSuccess > 0) {
registTopic(topic);
}
stringRedisTemplate.opsForZSet().add(topic, msg, date.getTime());
}
public synchronized void consumer(String topic, Function<String, Boolean> consumer) {
if (consumeTopics.contains(topic)) {
throw new RuntimeException("請勿重復(fù)監(jiān)聽消費(fèi)" + topic);
}
consumeTopics.add(topic);
int consumerPoolSize = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(consumerPoolSize);
for (int i = 0; i < consumerPoolSize; i++) {
consumerPool.submit(() -> {
do {
log.info("循環(huán)取消息:{}", topic);
String msg;
try {
msg = stringRedisTemplate.opsForList().rightPop(topic + "queue", 1000, TimeUnit.MINUTES);
} catch (QueryTimeoutException e) {
log.debug("監(jiān)聽超時(shí)通今,重試中!");
continue;
}
log.info("{}監(jiān)聽到消息:{}", topic, msg);
if (msg != null) {
Boolean consumerSuccess;
try {
consumerSuccess = consumer.apply(msg);
} catch (Exception e) {
log.warn("消費(fèi)失敻馗辫塌!", e);
consumerSuccess = false;
}
//消費(fèi)失敗,1分鐘后再重試
if (consumerSuccess == null || !consumerSuccess) {
log.info("消費(fèi)失敗派哲,重新放回隊(duì)列臼氨。msg:{},topic:{}", msg, topic);
produce(topic, msg, new Date(System.currentTimeMillis() + 60000));
}
}
} while (true);
});
}
}
}
單元測試
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisDelayQueueTest {
@Resource
private RedisDelayQueue redisDelayQueue;
@Test
public void produce() {
for (int i = 0; i < 30; i++) {
redisDelayQueue.produce("topic"+i%3 , "hello message"+i , new Date(System.currentTimeMillis()+i*1000));
}
}
@Test
public void consumer() throws InterruptedException {
redisDelayQueue.consumer("topic0", (msg)->{
log.info("topic【{}】收到消息:{}","topic0",msg);
return true;
}); redisDelayQueue.consumer("topic1", (msg)->{
log.info("topic【{}】收到消息:{}","topic1",msg);
return true;
});
redisDelayQueue.consumer("topic2", (msg) -> {
log.info("topic【{}】收到消息:{}", "topic2", msg);
return true;
});
TimeUnit.MINUTES.sleep(10);
}
}