需求是想用redis做一個延時的隊列桨吊,每次內容必須在一定時間后才能被取出,
比如說:有未支付訂單要在一定時間內關閉认烁,假設為30秒总滩,存入的時候我們使用redis的有序集合進行添加,用當前時間戳加上30秒來排序(zadd)幸逆,然后每次消費者輪詢的時候就只取出開始時間0到當前時間這個時間段(zrangeByScore)
1.生產類 Producer.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Transaction;
public class Producer {
static final String QueueName = "delay-queue";
public static void main(String[] args)throws InterruptedException {
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
Jedis jedis = pool.getResource();
try {
int count = 0;
while (true) {
String message = "Message #" + count;
String key = "foobar:" + count;
System.out.println("Queueing message: " + message);
queueMessage(jedis, QueueName, key, message, 5);
// delete every 5th Action
if (count != 0 && count % 5 == 0) {
System.out.println("Deleting msg with id " + count);
jedis.del(key);
}
count += 1;
Thread.sleep(3000L);
}
} finally {
jedis.close();
pool.destroy();
}
}
private static void queueMessage(Jedis jedis, String queue, String key, String message, Integer delay) {
long time = System.currentTimeMillis() / 1000 + delay;//當前時間的秒數(shù)加上要延時的秒數(shù)
Transaction t = jedis.multi();
t.zadd(queue, time, key);
t.set(key, message);
t.exec();
}
}
2.消費者類 Consumer.java
代碼如下:
public class Consumer {
public static void main(String[] args) throws InterruptedException {
JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
Jedis jedis = pool.getResource();
try {
while (true) {
getMessages(jedis, Producer.QueueName);
Thread.sleep(1000L);
}
} finally {
jedis.close();
pool.destroy();
}
}
private static void getMessages(Jedis jedis, String queue) {
int startTime = 0;
long endTime = System.currentTimeMillis() / 1000;
Transaction t = jedis.multi();
Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);//在startTime和endTime之間的數(shù)
t.zremrangeByScore(queue, startTime, endTime);//移除所有startTime-endTime中的所有成員
t.exec();
List<String> keys = new ArrayList();
keys.addAll(setResponse.get());//將所有的key添加到list中
String[] keyArray = keys.toArray(new String[keys.size()]);//然后轉換成數(shù)組
if (keyArray.length > 0) {
Transaction tMessage = jedis.multi();
Response<List<String>> messageResponse = tMessage.mget(keyArray);//獲取多個鍵值對
tMessage.del(keyArray);
tMessage.exec();
List<String> messages = messageResponse.get();
for (int i = 0; i < messages.size(); i++) {
String key = keys.get(i);
String message = messages.get(i);
System.out.print("Received key: " + key + ". ");
if (message == null) {
System.out.println("Message for key " + key + " is gone!");
} else {
System.out.println("Message for key " + key + " is " + message);
}
}
}
}
}