考慮使用哪種方式實(shí)現(xiàn)延時(shí)隊(duì)列晃听,可能需要考慮下面這些問題:
及時(shí)性 消費(fèi)端能按時(shí)收到
同一時(shí)間消息的消費(fèi)權(quán)重
可靠性 消息不能出現(xiàn)沒有被消費(fèi)掉的情況
可恢復(fù) 假如有其他情況 導(dǎo)致消息系統(tǒng)不可用了 至少能保證數(shù)據(jù)可以恢復(fù)
可撤回 因?yàn)槭茄舆t消息 沒有到執(zhí)行時(shí)間的消息支持可以取消消費(fèi)
高可用 多實(shí)例 這里指HA/主備模式并不是多實(shí)例同時(shí)一起工作
消費(fèi)端如何消費(fèi)
任務(wù)丟失的補(bǔ)償
一码党、單機(jī)
1. while+sleep組合
定義一個(gè)線程,然后 while 循環(huán)
public static void main(String[] args) {
final long timeInterval = 5000;
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + "每隔5秒執(zhí)行一次");
try {
Thread.sleep(timeInterval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
這種實(shí)現(xiàn)方式下多個(gè)定時(shí)任務(wù)需要開啟多個(gè)線程,而且線程在做無意義sleep,消耗資源,性能低下。
2. 最小堆實(shí)現(xiàn)
2.1 Timer
實(shí)現(xiàn)代碼赂韵,調(diào)度兩個(gè)任務(wù)
public static void main(String[] args) {
Timer timer = new Timer();
//每隔1秒調(diào)用一次
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("test1");
}
}, 1000, 1000);
//每隔3秒調(diào)用一次
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("test2");
}
}, 3000, 3000);
}
schedule實(shí)現(xiàn)源碼
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
shed里面將任務(wù)add到最小堆,然后fixUp進(jìn)行調(diào)整
TimerThread其實(shí)就是一個(gè)任務(wù)調(diào)度線程挠蛉,首先從TaskQueue里面獲取排在最前面的任務(wù)祭示,然后判斷它是否到達(dá)任務(wù)執(zhí)行時(shí)間點(diǎn),如果已到達(dá)谴古,就會(huì)立刻執(zhí)行任務(wù)
class TimerThread extends Thread {
boolean newTasksMayBeScheduled = true;
private TaskQueue queue;
TimerThread(TaskQueue queue) {
this.queue = queue;
}
public void run() {
try {
mainLoop();
} finally {
// Someone killed this Thread, behave as if Timer cancelled
synchronized(queue) {
newTasksMayBeScheduled = false;
queue.clear(); // Eliminate obsolete references
}
}
}
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// Wait for queue to become non-empty
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
if (taskFired = (executionTime<=currentTime)) {
if (task.period == 0) { // Non-repeating, remove
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
if (taskFired) // Task fired; run it, holding no locks
task.run();
} catch(InterruptedException e) {
}
}
}
}
總結(jié)這個(gè)利用最小堆實(shí)現(xiàn)的方案质涛,相比 while + sleep 方案,多了一個(gè)線程來管理所有的任務(wù)掰担,優(yōu)點(diǎn)就是減少了線程之間的性能開銷汇陆,提升了執(zhí)行效率;但是同樣也帶來的了一些缺點(diǎn)带饱,整體的新加任務(wù)寫入效率變成了 O(log(n))毡代。
同時(shí),細(xì)心的發(fā)現(xiàn)勺疼,這個(gè)方案還有以下幾個(gè)缺點(diǎn):
串行阻塞:調(diào)度線程只有一個(gè)教寂,長任務(wù)會(huì)阻塞短任務(wù)的執(zhí)行,例如执庐,A任務(wù)跑了一分鐘酪耕,B任務(wù)至少需要等1分鐘才能跑
容錯(cuò)能力差:沒有異常處理能力,一旦一個(gè)任務(wù)執(zhí)行故障轨淌,后續(xù)任務(wù)都無法執(zhí)行
2.2 ScheduledThreadPoolExecutor
鑒于 Timer 的上述缺陷迂烁,從 Java 5 開始,推出了基于線程池設(shè)計(jì)的 ScheduledThreadPoolExecutor 猿诸。
其設(shè)計(jì)思想是婚被,每一個(gè)被調(diào)度的任務(wù)都會(huì)由線程池來管理執(zhí)行,因此任務(wù)是并發(fā)執(zhí)行的梳虽,相互之間不會(huì)受到干擾址芯。需要注意的是,只有當(dāng)任務(wù)的執(zhí)行時(shí)間到來時(shí)窜觉,ScheduledThreadPoolExecutor 才會(huì)真正啟動(dòng)一個(gè)線程谷炸,其余時(shí)間 ScheduledThreadPoolExecutor 都是在輪詢?nèi)蝿?wù)的狀態(tài)。
簡單的使用示例:
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
//啟動(dòng)1秒之后禀挫,每隔1秒執(zhí)行一次
executor.scheduleAtFixedRate(()-> System.out.println("test3"),1,1, TimeUnit.SECONDS);
//啟動(dòng)1秒之后旬陡,每隔3秒執(zhí)行一次
executor.scheduleAtFixedRate((() -> System.out.println("test4")),1,3, TimeUnit.SECONDS);
同樣的,我們首先打開源碼语婴,看看里面到底做了啥
- 進(jìn)入scheduleAtFixedRate()方法
首先是校驗(yàn)基本參數(shù)描孟,然后將任務(wù)作為封裝到ScheduledFutureTask線程中驶睦,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數(shù)調(diào)用delayedExecute()方法進(jìn)行預(yù)處理
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
- 繼續(xù)看delayedExecute()方法
可以很清晰的看到匿醒,當(dāng)線程池沒有關(guān)閉的時(shí)候场航,會(huì)通過super.getQueue().add(task)操作,將任務(wù)加入到隊(duì)列廉羔,同時(shí)調(diào)用ensurePrestart()方法做預(yù)處理
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//預(yù)處理
ensurePrestart();
}
}
其中super.getQueue()得到的是一個(gè)自定義的new DelayedWorkQueue()阻塞隊(duì)列溉痢,數(shù)據(jù)存儲(chǔ)方面也是一個(gè)最小堆結(jié)構(gòu)的隊(duì)列,這一點(diǎn)在初始化new ScheduledThreadPoolExecutor()的時(shí)候憋他,可以看出孩饼!
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
打開源碼可以看到,DelayedWorkQueue其實(shí)是ScheduledThreadPoolExecutor中的一個(gè)靜態(tài)內(nèi)部類竹挡,在添加的時(shí)候镀娶,會(huì)將任務(wù)加入到RunnableScheduledFuture數(shù)組中。然后調(diào)用線程池的ensurePrestart方法將任務(wù)添加到線程池此迅。調(diào)用鏈:addWorker->t.run->new Worker.run-> runWorker->Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();->task.run->RunnableScheduledFuture.run
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//....
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
}
- 回到我們最開始說到的ScheduledFutureTask任務(wù)線程類汽畴,最終執(zhí)行任務(wù)的其實(shí)就是它
ScheduledFutureTask任務(wù)線程,才是真正執(zhí)行任務(wù)的線程類耸序,只是繞了一圈忍些,做了很多包裝,run()方法就是真正執(zhí)行定時(shí)任務(wù)的方法坎怪。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)//非周期性定時(shí)任務(wù)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//周期性定時(shí)任務(wù)罢坝,需要重置
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
//...
}
3.3、小結(jié)
ScheduledExecutorService 相比 Timer 定時(shí)器搅窿,完美的解決上面說到的 Timer 存在的兩個(gè)缺點(diǎn)嘁酿!
在單體應(yīng)用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時(shí)任務(wù)的業(yè)務(wù)需求男应!
但是這是否意味著它是最佳的解決方案呢闹司?
我們發(fā)現(xiàn)線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲(chǔ)結(jié)構(gòu)沐飘,新任務(wù)加入排序效率是O(log(n))游桩,執(zhí)行取任務(wù)是O(1)。
這里的寫入排序效率其實(shí)是有空間可提升的耐朴,有可能優(yōu)化到O(1)的時(shí)間復(fù)雜度借卧,也就是我們下面要介紹的時(shí)間輪實(shí)現(xiàn)!
2.3 DelayQueue
DelayQueue是一個(gè)無界延時(shí)隊(duì)列筛峭,內(nèi)部有一個(gè)優(yōu)先隊(duì)列铐刘,可以重寫compare接口,按照我們想要的方式進(jìn)行排序影晓。
實(shí)現(xiàn)Demo
public static void main(String[] args) throws Exception {
DelayQueue<Order> orders = new DelayQueue<>();
Order order1 = new Order(1000, "1x");
Order order2 = new Order(2000, "2x");
Order order3 = new Order(3000, "3x");
Order order4 = new Order(4000, "4x");
orders.add(order1);
orders.add(order2);
orders.add(order3);
orders.add(order4);
for (; ; ) {
//沒有到期會(huì)阻塞
Order take = orders.take();
System.out.println(take);
}
}
}
class Order implements Delayed {
@Override
public String toString() {
return "DelayedElement{" + "delay=" + delayTime +
", expire=" + expire +
", data='" + data + '\'' +
'}';
}
Order(long delay, String data) {
delayTime = delay;
this.data = data;
expire = System.currentTimeMillis() + delay;
}
private final long delayTime; //延遲時(shí)間
private final long expire; //到期時(shí)間
private String data; //數(shù)據(jù)
/**
* 剩余時(shí)間=到期時(shí)間-當(dāng)前時(shí)間
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 優(yōu)先隊(duì)列里面優(yōu)先級(jí)規(guī)則
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
從源碼可以看出镰吵,DelayQueue的offer和take方法調(diào)用的是優(yōu)先隊(duì)列的offer和take檩禾。并且使用了ReetrtantLock保證線程安全
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
https://my.oschina.net/u/2474629/blog/1919127
3. 時(shí)間輪實(shí)現(xiàn)
代碼實(shí)現(xiàn):支持秒級(jí)別的循環(huán)隊(duì)列,從下標(biāo)最小的任務(wù)集合開始捡遍,提交到線程池執(zhí)行锌订。然后休眠1s,指針移動(dòng)到下一個(gè)下標(biāo)處画株。
所謂時(shí)間輪(RingBuffer)實(shí)現(xiàn),從數(shù)據(jù)結(jié)構(gòu)上看啦辐,簡單的說就是循環(huán)隊(duì)列谓传,從名稱上看可能感覺很抽象。
它其實(shí)就是一個(gè)環(huán)形的數(shù)組芹关,如圖所示续挟,假設(shè)我們創(chuàng)建了一個(gè)長度為 8 的時(shí)間輪。
插入侥衬、取值流程:
- 1.當(dāng)我們需要新建一個(gè) 1s 延時(shí)任務(wù)的時(shí)候诗祸,則只需要將它放到下標(biāo)為 1 的那個(gè)槽中,2轴总、3直颅、...、7也同樣如此怀樟。
- 2.而如果是新建一個(gè) 10s 的延時(shí)任務(wù)功偿,則需要將它放到下標(biāo)為 2 的槽中,但同時(shí)需要記錄它所對應(yīng)的圈數(shù)往堡,也就是 1 圈械荷,不然就和 2 秒的延時(shí)消息重復(fù)了
- 3.當(dāng)創(chuàng)建一個(gè) 21s 的延時(shí)任務(wù)時(shí),它所在的位置就在下標(biāo)為 5 的槽中虑灰,同樣的需要為他加上圈數(shù)為 2吨瞎,依次類推...
因此,總結(jié)起來有兩個(gè)核心的變量:
- 數(shù)組下標(biāo):表示某個(gè)任務(wù)延遲時(shí)間穆咐,從數(shù)據(jù)操作上對執(zhí)行時(shí)間點(diǎn)進(jìn)行取余
- 圈數(shù):表示需要循環(huán)圈數(shù)
通過這張圖可以更直觀的理解颤诀!
當(dāng)我們需要取出延時(shí)任務(wù)時(shí),只需要每秒往下移動(dòng)這個(gè)指針庸娱,然后取出該位置的所有任務(wù)即可着绊,取任務(wù)的時(shí)間消耗為O(1)。
當(dāng)我們需要插入任務(wù)熟尉,也只需要計(jì)算出對應(yīng)的下表和圈數(shù)归露,即可將任務(wù)插入到對應(yīng)的數(shù)組位置中,插入任務(wù)的時(shí)間消耗為O(1)斤儿。
如果時(shí)間輪的槽比較少剧包,會(huì)導(dǎo)致某一個(gè)槽上的任務(wù)非常多恐锦,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的疆液,因此在設(shè)計(jì)槽的時(shí)候不能太大也不能太小一铅。
package com.hui.hui;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class RingBuffer {
private static final int STATIC_RING_SIZE = 64;
private Object[] ringBuffer;
private int bufferSize;
/**
* business thread pool
*/
private ExecutorService executorService;
private volatile int size = 0;
/***
* task stop sign
*/
private volatile boolean stop = false;
/**
* task start sign
*/
private volatile AtomicBoolean start = new AtomicBoolean(false);
/**
* total tick times
*/
private AtomicInteger tick = new AtomicInteger();
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private AtomicInteger taskId = new AtomicInteger();
private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);
/**
* Create a new delay task ring buffer by default size
*
* @param executorService the business thread pool
*/
public RingBuffer(ExecutorService executorService) {
this.executorService = executorService;
this.bufferSize = STATIC_RING_SIZE;
this.ringBuffer = new Object[bufferSize];
}
/**
* Create a new delay task ring buffer by custom buffer size
*
* @param executorService the business thread pool
* @param bufferSize custom buffer size
*/
public RingBuffer(ExecutorService executorService, int bufferSize) {
this(executorService);
if (!powerOf2(bufferSize)) {
throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
}
this.bufferSize = bufferSize;
this.ringBuffer = new Object[bufferSize];
}
/**
* Add a task into the ring buffer(thread safe)
*
* @param task business task extends {@link Task}
*/
public int addTask(Task task) {
int key = task.getKey();
int id;
try {
lock.lock();
int index = mod(key, bufferSize);
task.setIndex(index);
Set<Task> tasks = get(index);
int cycleNum = cycleNum(key, bufferSize);
if (tasks != null) {
task.setCycleNum(cycleNum);
tasks.add(task);
} else {
task.setIndex(index);
task.setCycleNum(cycleNum);
Set<Task> sets = new HashSet<>();
sets.add(task);
put(key, sets);
}
id = taskId.incrementAndGet();
task.setTaskId(id);
taskMap.put(id, task);
size++;
} finally {
lock.unlock();
}
start();
return id;
}
/**
* Cancel task by taskId
*
* @param id unique id through {@link #addTask(Task)}
* @return
*/
public boolean cancel(int id) {
boolean flag = false;
Set<Task> tempTask = new HashSet<>();
try {
lock.lock();
Task task = taskMap.get(id);
if (task == null) {
return false;
}
Set<Task> tasks = get(task.getIndex());
for (Task tk : tasks) {
if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
size--;
flag = true;
taskMap.remove(id);
} else {
tempTask.add(tk);
}
}
//update origin data
ringBuffer[task.getIndex()] = tempTask;
} finally {
lock.unlock();
}
return flag;
}
/**
* Thread safe
*
* @return the size of ring buffer
*/
public int taskSize() {
return size;
}
/**
* Same with method {@link #taskSize}
*
* @return
*/
public int taskMapSize() {
return taskMap.size();
}
/**
* Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
*/
public void start() {
if (!start.get()) {
System.out.println("Delay task is starting");
if (start.compareAndSet(start.get(), true)) {
Thread job = new Thread(new TriggerJob());
job.setName("consumer RingBuffer thread");
job.start();
start.set(true);
}
}
}
/**
* Stop consumer ring buffer thread
*
* @param force True will force close consumer thread and discard all pending tasks
* otherwise the consumer thread waits for all tasks to completes before closing.
*/
public void stop(boolean force) {
if (force) {
stop = true;
executorService.shutdownNow();
} else {
System.out.println("Delay task is stopping");
if (taskSize() > 0) {
try {
lock.lock();
condition.await();
stop = true;
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
executorService.shutdown();
}
}
private Set<Task> get(int index) {
return (Set<Task>) ringBuffer[index];
}
private void put(int key, Set<Task> tasks) {
int index = mod(key, bufferSize);
ringBuffer[index] = tasks;
}
/**
* Remove and get task list.
*
* @param key
* @return task list
*/
private Set<Task> remove(int key) {
Set<Task> tempTask = new HashSet<>();
Set<Task> result = new HashSet<>();
Set<Task> tasks = (Set<Task>) ringBuffer[key];
if (tasks == null) {
return result;
}
for (Task task : tasks) {
if (task.getCycleNum() == 0) {
result.add(task);
size2Notify();
} else {
// decrement 1 cycle number and update origin data
task.setCycleNum(task.getCycleNum() - 1);
tempTask.add(task);
}
// remove task, and free the memory.
taskMap.remove(task.getTaskId());
}
//update origin data
ringBuffer[key] = tempTask;
return result;
}
private void size2Notify() {
try {
lock.lock();
size--;
if (size == 0) {
condition.signal();
}
} finally {
lock.unlock();
}
}
private boolean powerOf2(int target) {
if (target < 0) {
return false;
}
int value = target & (target - 1);
if (value != 0) {
return false;
}
return true;
}
private int mod(int target, int mod) {
// equals target % mod
target = target + tick.get();
return target & (mod - 1);
}
private int cycleNum(int target, int mod) {
//equals target/mod
return target >> Integer.bitCount(mod - 1);
}
/**
* An abstract class used to implement business.
*/
public abstract static class Task extends Thread {
private int index;
private int cycleNum;
private int key;
/**
* The unique ID of the task
*/
private int taskId;
@Override
public void run() {
}
public int getKey() {
return key;
}
/**
* @param key Delay time(seconds)
*/
public void setKey(int key) {
this.key = key;
}
public int getCycleNum() {
return cycleNum;
}
private void setCycleNum(int cycleNum) {
this.cycleNum = cycleNum;
}
public int getIndex() {
return index;
}
private void setIndex(int index) {
this.index = index;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
}
private class TriggerJob implements Runnable {
@Override
public void run() {
int index = 0;
while (!stop) {
try {
Set<Task> tasks = remove(index);
for (Task task : tasks) {
executorService.submit(task);
}
if (++index > bufferSize - 1) {
index = 0;
}
//Total tick number of records
tick.incrementAndGet();
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
System.out.println("Exception" + e);
}
}
System.out.println("Delay task has stopped");
}
}
public static void main(String[] args) {
RingBuffer ringBufferWheel = new RingBuffer(Executors.newFixedThreadPool(2));
for (int i = 0; i < 3; i++) {
RingBuffer.Task job = new Job();
job.setKey(i);
ringBufferWheel.addTask(job);
}
}
public static class Job extends RingBuffer.Task {
@Override
public void run() {
System.out.println("test5"+getIndex());
}
}
}
二色乾、分布式
之前說的單機(jī)實(shí)現(xiàn)颖杏,一旦服務(wù)器重啟,那么延時(shí)任務(wù)會(huì)丟失瘤袖,而分布式的方案則不會(huì)丟失任務(wù)掉缺。
Redis ZSet實(shí)現(xiàn)
- 底層實(shí)現(xiàn):Redis的底層實(shí)現(xiàn)是當(dāng)key大小小于某個(gè)閾值卜录,并且鍵值對個(gè)數(shù)小于某個(gè)閾值(都可配置),使用ZipList實(shí)現(xiàn)眶明,否則使用SkipList和Hash實(shí)現(xiàn)艰毒,SkipList中按照score排序,hash存儲(chǔ)成員到分?jǐn)?shù)的映射搜囱。
- ZSet API
- 添加丑瞧,如果值存在添加,將會(huì)重新排序蜀肘。zadd
127.0.0.1:6379>zadd myZSet 1 zlh ---添加分?jǐn)?shù)為1绊汹,值為zlh的zset集合 - 查看zset集合的成員個(gè)數(shù)。zcard
127.0.0.1:6379>zcard myZSet - 查看Zset指定范圍的成員幌缝,withscores為輸出結(jié)果帶分?jǐn)?shù)灸促。zrange
127.0.0.1:6379>zrange mZySet 0 -1 ----0為開始,-1為結(jié)束涵卵,輸出順序結(jié)果為: zlh tom jim - 獲取zset成員的下標(biāo)位置浴栽,如果值不存在返回null。zrank
127.0.0.1:6379>zrank mZySet Jim ---Jim的在zset集合中的下標(biāo)為2 - 獲取zset集合指定分?jǐn)?shù)之間存在的成員個(gè)數(shù)轿偎。zcount
127.0.0.1:6379>zcount mySet 1 3 ---輸出分?jǐn)?shù)>=1 and 分?jǐn)?shù) <=3的成員個(gè)數(shù)為3
- 實(shí)現(xiàn)思路:
- 添加任務(wù)時(shí)典鸡,將當(dāng)前時(shí)間+延時(shí)時(shí)間作為SkipList的分詞,job的key作為成員標(biāo)識(shí)加入ZSet
- 搬運(yùn)線程開啟定時(shí)任務(wù)坏晦,將在當(dāng)前時(shí)間戳之前的任務(wù)添加到隊(duì)列中
- 開啟消費(fèi)線程萝玷,無限循環(huán),超時(shí)從隊(duì)列獲取Job昆婿,將任務(wù)放到線程池中消費(fèi)
- 添加任務(wù)球碉,消費(fèi)線程,搬運(yùn)線程仓蛆,都需要獲取Redis分布式鎖