延時(shí)隊(duì)列實(shí)現(xiàn)

考慮使用哪種方式實(shí)現(xiàn)延時(shí)隊(duì)列晃听,可能需要考慮下面這些問題:
及時(shí)性 消費(fèi)端能按時(shí)收到
同一時(shí)間消息的消費(fèi)權(quán)重
可靠性 消息不能出現(xiàn)沒有被消費(fèi)掉的情況
可恢復(fù) 假如有其他情況 導(dǎo)致消息系統(tǒng)不可用了 至少能保證數(shù)據(jù)可以恢復(fù)
可撤回 因?yàn)槭茄舆t消息 沒有到執(zhí)行時(shí)間的消息支持可以取消消費(fèi)
高可用 多實(shí)例 這里指HA/主備模式并不是多實(shí)例同時(shí)一起工作
消費(fèi)端如何消費(fèi)
任務(wù)丟失的補(bǔ)償

一码党、單機(jī)

1. while+sleep組合

定義一個(gè)線程,然后 while 循環(huán)

public static void main(String[] args) {
    final long timeInterval = 5000;
    new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "每隔5秒執(zhí)行一次");
                try {
                    Thread.sleep(timeInterval);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();
}

這種實(shí)現(xiàn)方式下多個(gè)定時(shí)任務(wù)需要開啟多個(gè)線程,而且線程在做無意義sleep,消耗資源,性能低下。

2. 最小堆實(shí)現(xiàn)

2.1 Timer

實(shí)現(xiàn)代碼赂韵,調(diào)度兩個(gè)任務(wù)

public static void main(String[] args) {
    Timer timer = new Timer();
    //每隔1秒調(diào)用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test1");
        }
    }, 1000, 1000);
    //每隔3秒調(diào)用一次
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            System.out.println("test2");
        }
    }, 3000, 3000);

}

schedule實(shí)現(xiàn)源碼

    public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        sched(task, System.currentTimeMillis()+delay, -period);
    }

shed里面將任務(wù)add到最小堆,然后fixUp進(jìn)行調(diào)整
TimerThread其實(shí)就是一個(gè)任務(wù)調(diào)度線程挠蛉,首先從TaskQueue里面獲取排在最前面的任務(wù)祭示,然后判斷它是否到達(dá)任務(wù)執(zhí)行時(shí)間點(diǎn),如果已到達(dá)谴古,就會(huì)立刻執(zhí)行任務(wù)

class TimerThread extends Thread {

    boolean newTasksMayBeScheduled = true;

    private TaskQueue queue;

    TimerThread(TaskQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }

    /**
     * The main timer loop.  (See class comment.)
     */
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

總結(jié)這個(gè)利用最小堆實(shí)現(xiàn)的方案质涛,相比 while + sleep 方案,多了一個(gè)線程來管理所有的任務(wù)掰担,優(yōu)點(diǎn)就是減少了線程之間的性能開銷汇陆,提升了執(zhí)行效率;但是同樣也帶來的了一些缺點(diǎn)带饱,整體的新加任務(wù)寫入效率變成了 O(log(n))毡代。

同時(shí),細(xì)心的發(fā)現(xiàn)勺疼,這個(gè)方案還有以下幾個(gè)缺點(diǎn):

串行阻塞:調(diào)度線程只有一個(gè)教寂,長任務(wù)會(huì)阻塞短任務(wù)的執(zhí)行,例如执庐,A任務(wù)跑了一分鐘酪耕,B任務(wù)至少需要等1分鐘才能跑
容錯(cuò)能力差:沒有異常處理能力,一旦一個(gè)任務(wù)執(zhí)行故障轨淌,后續(xù)任務(wù)都無法執(zhí)行

2.2 ScheduledThreadPoolExecutor

鑒于 Timer 的上述缺陷迂烁,從 Java 5 開始,推出了基于線程池設(shè)計(jì)的 ScheduledThreadPoolExecutor 猿诸。

image

其設(shè)計(jì)思想是婚被,每一個(gè)被調(diào)度的任務(wù)都會(huì)由線程池來管理執(zhí)行,因此任務(wù)是并發(fā)執(zhí)行的梳虽,相互之間不會(huì)受到干擾址芯。需要注意的是,只有當(dāng)任務(wù)的執(zhí)行時(shí)間到來時(shí)窜觉,ScheduledThreadPoolExecutor 才會(huì)真正啟動(dòng)一個(gè)線程谷炸,其余時(shí)間 ScheduledThreadPoolExecutor 都是在輪詢?nèi)蝿?wù)的狀態(tài)。

簡單的使用示例:

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
        //啟動(dòng)1秒之后禀挫,每隔1秒執(zhí)行一次
        executor.scheduleAtFixedRate(()-> System.out.println("test3"),1,1, TimeUnit.SECONDS);
        //啟動(dòng)1秒之后旬陡,每隔3秒執(zhí)行一次
        executor.scheduleAtFixedRate((() -> System.out.println("test4")),1,3, TimeUnit.SECONDS);

同樣的,我們首先打開源碼语婴,看看里面到底做了啥

  • 進(jìn)入scheduleAtFixedRate()方法

首先是校驗(yàn)基本參數(shù)描孟,然后將任務(wù)作為封裝到ScheduledFutureTask線程中驶睦,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數(shù)調(diào)用delayedExecute()方法進(jìn)行預(yù)處理

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

  • 繼續(xù)看delayedExecute()方法

可以很清晰的看到匿醒,當(dāng)線程池沒有關(guān)閉的時(shí)候场航,會(huì)通過super.getQueue().add(task)操作,將任務(wù)加入到隊(duì)列廉羔,同時(shí)調(diào)用ensurePrestart()方法做預(yù)處理

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
   //預(yù)處理
            ensurePrestart();
    }
}

其中super.getQueue()得到的是一個(gè)自定義的new DelayedWorkQueue()阻塞隊(duì)列溉痢,數(shù)據(jù)存儲(chǔ)方面也是一個(gè)最小堆結(jié)構(gòu)的隊(duì)列,這一點(diǎn)在初始化new ScheduledThreadPoolExecutor()的時(shí)候憋他,可以看出孩饼!

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

打開源碼可以看到,DelayedWorkQueue其實(shí)是ScheduledThreadPoolExecutor中的一個(gè)靜態(tài)內(nèi)部類竹挡,在添加的時(shí)候镀娶,會(huì)將任務(wù)加入到RunnableScheduledFuture數(shù)組中。然后調(diào)用線程池的ensurePrestart方法將任務(wù)添加到線程池此迅。調(diào)用鏈:addWorker->t.run->new Worker.run-> runWorker->Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();->task.run->RunnableScheduledFuture.run

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

    private static final int INITIAL_CAPACITY = 16;
    private RunnableScheduledFuture<?>[] queue =
        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
    private final ReentrantLock lock = new ReentrantLock();
    private int size = 0;   

    //....

    public boolean add(Runnable e) {
        return offer(e);
    }

    public boolean offer(Runnable x) {
        if (x == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = size;
            if (i >= queue.length)
                grow();
            size = i + 1;
            if (i == 0) {
                queue[0] = e;
                setIndex(e, 0);
            } else {
                siftUp(i, e);
            }
            if (queue[0] == e) {
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
        return true;
    }

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
}

  • 回到我們最開始說到的ScheduledFutureTask任務(wù)線程類汽畴,最終執(zhí)行任務(wù)的其實(shí)就是它

ScheduledFutureTask任務(wù)線程,才是真正執(zhí)行任務(wù)的線程類耸序,只是繞了一圈忍些,做了很多包裝,run()方法就是真正執(zhí)行定時(shí)任務(wù)的方法坎怪。

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    private long time;

    /**
     * Period in nanoseconds for repeating tasks.  A positive
     * value indicates fixed-rate execution.  A negative value
     * indicates fixed-delay execution.  A value of 0 indicates a
     * non-repeating task.
     */
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    /**
     * Overrides FutureTask version so as to reset/requeue if periodic.
     */
    public void run() {
        boolean periodic = isPeriodic();
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        else if (!periodic)//非周期性定時(shí)任務(wù)
            ScheduledFutureTask.super.run();
        else if (ScheduledFutureTask.super.runAndReset()) {//周期性定時(shí)任務(wù)罢坝,需要重置
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }

 //...
}

3.3、小結(jié)

ScheduledExecutorService 相比 Timer 定時(shí)器搅窿,完美的解決上面說到的 Timer 存在的兩個(gè)缺點(diǎn)嘁酿!

在單體應(yīng)用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時(shí)任務(wù)的業(yè)務(wù)需求男应!

但是這是否意味著它是最佳的解決方案呢闹司?

我們發(fā)現(xiàn)線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲(chǔ)結(jié)構(gòu)沐飘,新任務(wù)加入排序效率是O(log(n))游桩,執(zhí)行取任務(wù)是O(1)。

這里的寫入排序效率其實(shí)是有空間可提升的耐朴,有可能優(yōu)化到O(1)的時(shí)間復(fù)雜度借卧,也就是我們下面要介紹的時(shí)間輪實(shí)現(xiàn)

2.3 DelayQueue

DelayQueue是一個(gè)無界延時(shí)隊(duì)列筛峭,內(nèi)部有一個(gè)優(yōu)先隊(duì)列铐刘,可以重寫compare接口,按照我們想要的方式進(jìn)行排序影晓。
實(shí)現(xiàn)Demo

    public static void main(String[] args) throws Exception {
        DelayQueue<Order> orders = new DelayQueue<>();
        Order order1 = new Order(1000, "1x");
        Order order2 = new Order(2000, "2x");
        Order order3 = new Order(3000, "3x");
        Order order4 = new Order(4000, "4x");
        orders.add(order1);
        orders.add(order2);
        orders.add(order3);
        orders.add(order4);
        for (; ; ) {
            //沒有到期會(huì)阻塞
            Order take = orders.take();
            System.out.println(take);
        }
    }
}

class Order implements Delayed {
    @Override
    public String toString() {
        return "DelayedElement{" + "delay=" + delayTime +
                ", expire=" + expire +
                ", data='" + data + '\'' +
                '}';
    }

    Order(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay;
    }

    private final long delayTime; //延遲時(shí)間
    private final long expire;  //到期時(shí)間
    private String data;   //數(shù)據(jù)

    /**
     * 剩余時(shí)間=到期時(shí)間-當(dāng)前時(shí)間
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 優(yōu)先隊(duì)列里面優(yōu)先級(jí)規(guī)則
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

從源碼可以看出镰吵,DelayQueue的offer和take方法調(diào)用的是優(yōu)先隊(duì)列的offer和take檩禾。并且使用了ReetrtantLock保證線程安全

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }


public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

https://my.oschina.net/u/2474629/blog/1919127

3. 時(shí)間輪實(shí)現(xiàn)

代碼實(shí)現(xiàn):支持秒級(jí)別的循環(huán)隊(duì)列,從下標(biāo)最小的任務(wù)集合開始捡遍,提交到線程池執(zhí)行锌订。然后休眠1s,指針移動(dòng)到下一個(gè)下標(biāo)處画株。
所謂時(shí)間輪(RingBuffer)實(shí)現(xiàn),從數(shù)據(jù)結(jié)構(gòu)上看啦辐,簡單的說就是循環(huán)隊(duì)列谓传,從名稱上看可能感覺很抽象。
它其實(shí)就是一個(gè)環(huán)形的數(shù)組芹关,如圖所示续挟,假設(shè)我們創(chuàng)建了一個(gè)長度為 8 的時(shí)間輪。

image

插入侥衬、取值流程:

  • 1.當(dāng)我們需要新建一個(gè) 1s 延時(shí)任務(wù)的時(shí)候诗祸,則只需要將它放到下標(biāo)為 1 的那個(gè)槽中,2轴总、3直颅、...、7也同樣如此怀樟。
  • 2.而如果是新建一個(gè) 10s 的延時(shí)任務(wù)功偿,則需要將它放到下標(biāo)為 2 的槽中,但同時(shí)需要記錄它所對應(yīng)的圈數(shù)往堡,也就是 1 圈械荷,不然就和 2 秒的延時(shí)消息重復(fù)了
  • 3.當(dāng)創(chuàng)建一個(gè) 21s 的延時(shí)任務(wù)時(shí),它所在的位置就在下標(biāo)為 5 的槽中虑灰,同樣的需要為他加上圈數(shù)為 2吨瞎,依次類推...

因此,總結(jié)起來有兩個(gè)核心的變量:

  • 數(shù)組下標(biāo):表示某個(gè)任務(wù)延遲時(shí)間穆咐,從數(shù)據(jù)操作上對執(zhí)行時(shí)間點(diǎn)進(jìn)行取余
  • 圈數(shù):表示需要循環(huán)圈數(shù)

通過這張圖可以更直觀的理解颤诀!

image

當(dāng)我們需要取出延時(shí)任務(wù)時(shí),只需要每秒往下移動(dòng)這個(gè)指針庸娱,然后取出該位置的所有任務(wù)即可着绊,取任務(wù)的時(shí)間消耗為O(1)。

當(dāng)我們需要插入任務(wù)熟尉,也只需要計(jì)算出對應(yīng)的下表和圈數(shù)归露,即可將任務(wù)插入到對應(yīng)的數(shù)組位置中,插入任務(wù)的時(shí)間消耗為O(1)斤儿。

如果時(shí)間輪的槽比較少剧包,會(huì)導(dǎo)致某一個(gè)槽上的任務(wù)非常多恐锦,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的疆液,因此在設(shè)計(jì)槽的時(shí)候不能太大也不能太小一铅。

package com.hui.hui;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RingBuffer {

    private static final int STATIC_RING_SIZE = 64;

    private Object[] ringBuffer;

    private int bufferSize;

    /**
     * business thread pool
     */
    private ExecutorService executorService;

    private volatile int size = 0;

    /***
     * task stop sign
     */
    private volatile boolean stop = false;

    /**
     * task start sign
     */
    private volatile AtomicBoolean start = new AtomicBoolean(false);

    /**
     * total tick times
     */
    private AtomicInteger tick = new AtomicInteger();

    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    private AtomicInteger taskId = new AtomicInteger();
    private Map<Integer, Task> taskMap = new ConcurrentHashMap<>(16);

    /**
     * Create a new delay task ring buffer by default size
     *
     * @param executorService the business thread pool
     */
    public RingBuffer(ExecutorService executorService) {
        this.executorService = executorService;
        this.bufferSize = STATIC_RING_SIZE;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Create a new delay task ring buffer by custom buffer size
     *
     * @param executorService the business thread pool
     * @param bufferSize      custom buffer size
     */
    public RingBuffer(ExecutorService executorService, int bufferSize) {
        this(executorService);

        if (!powerOf2(bufferSize)) {
            throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
        }
        this.bufferSize = bufferSize;
        this.ringBuffer = new Object[bufferSize];
    }

    /**
     * Add a task into the ring buffer(thread safe)
     *
     * @param task business task extends {@link Task}
     */
    public int addTask(Task task) {
        int key = task.getKey();
        int id;

        try {
            lock.lock();
            int index = mod(key, bufferSize);
            task.setIndex(index);
            Set<Task> tasks = get(index);

            int cycleNum = cycleNum(key, bufferSize);
            if (tasks != null) {
                task.setCycleNum(cycleNum);
                tasks.add(task);
            } else {
                task.setIndex(index);
                task.setCycleNum(cycleNum);
                Set<Task> sets = new HashSet<>();
                sets.add(task);
                put(key, sets);
            }
            id = taskId.incrementAndGet();
            task.setTaskId(id);
            taskMap.put(id, task);
            size++;
        } finally {
            lock.unlock();
        }

        start();

        return id;
    }

    /**
     * Cancel task by taskId
     *
     * @param id unique id through {@link #addTask(Task)}
     * @return
     */
    public boolean cancel(int id) {

        boolean flag = false;
        Set<Task> tempTask = new HashSet<>();

        try {
            lock.lock();
            Task task = taskMap.get(id);
            if (task == null) {
                return false;
            }

            Set<Task> tasks = get(task.getIndex());
            for (Task tk : tasks) {
                if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
                    size--;
                    flag = true;
                    taskMap.remove(id);
                } else {
                    tempTask.add(tk);
                }

            }
            //update origin data
            ringBuffer[task.getIndex()] = tempTask;
        } finally {
            lock.unlock();
        }

        return flag;
    }

    /**
     * Thread safe
     *
     * @return the size of ring buffer
     */
    public int taskSize() {
        return size;
    }

    /**
     * Same with method {@link #taskSize}
     *
     * @return
     */
    public int taskMapSize() {
        return taskMap.size();
    }

    /**
     * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
     */
    public void start() {
        if (!start.get()) {
            System.out.println("Delay task is starting");
            if (start.compareAndSet(start.get(), true)) {
                Thread job = new Thread(new TriggerJob());
                job.setName("consumer RingBuffer thread");
                job.start();
                start.set(true);
            }

        }
    }

    /**
     * Stop consumer ring buffer thread
     *
     * @param force True will force close consumer thread and discard all pending tasks
     *              otherwise the consumer thread waits for all tasks to completes before closing.
     */
    public void stop(boolean force) {
        if (force) {
            stop = true;
            executorService.shutdownNow();
        } else {
            System.out.println("Delay task is stopping");
            if (taskSize() > 0) {
                try {
                    lock.lock();
                    condition.await();
                    stop = true;
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException" + e);
                } finally {
                    lock.unlock();
                }
            }
            executorService.shutdown();
        }

    }

    private Set<Task> get(int index) {
        return (Set<Task>) ringBuffer[index];
    }

    private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }

    /**
     * Remove and get task list.
     *
     * @param key
     * @return task list
     */
    private Set<Task> remove(int key) {
        Set<Task> tempTask = new HashSet<>();
        Set<Task> result = new HashSet<>();

        Set<Task> tasks = (Set<Task>) ringBuffer[key];
        if (tasks == null) {
            return result;
        }

        for (Task task : tasks) {
            if (task.getCycleNum() == 0) {
                result.add(task);

                size2Notify();
            } else {
                // decrement 1 cycle number and update origin data
                task.setCycleNum(task.getCycleNum() - 1);
                tempTask.add(task);
            }
            // remove task, and free the memory.
            taskMap.remove(task.getTaskId());
        }

        //update origin data
        ringBuffer[key] = tempTask;

        return result;
    }

    private void size2Notify() {
        try {
            lock.lock();
            size--;
            if (size == 0) {
                condition.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    private boolean powerOf2(int target) {
        if (target < 0) {
            return false;
        }
        int value = target & (target - 1);
        if (value != 0) {
            return false;
        }

        return true;
    }

    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get();
        return target & (mod - 1);
    }

    private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }

    /**
     * An abstract class used to implement business.
     */
    public abstract static class Task extends Thread {

        private int index;

        private int cycleNum;

        private int key;

        /**
         * The unique ID of the task
         */
        private int taskId;

        @Override
        public void run() {
        }

        public int getKey() {
            return key;
        }

        /**
         * @param key Delay time(seconds)
         */
        public void setKey(int key) {
            this.key = key;
        }

        public int getCycleNum() {
            return cycleNum;
        }

        private void setCycleNum(int cycleNum) {
            this.cycleNum = cycleNum;
        }

        public int getIndex() {
            return index;
        }

        private void setIndex(int index) {
            this.index = index;
        }

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }

    private class TriggerJob implements Runnable {

        @Override
        public void run() {
            int index = 0;
            while (!stop) {
                try {
                    Set<Task> tasks = remove(index);
                    for (Task task : tasks) {
                        executorService.submit(task);
                    }

                    if (++index > bufferSize - 1) {
                        index = 0;
                    }

                    //Total tick number of records
                    tick.incrementAndGet();
                    TimeUnit.SECONDS.sleep(1);

                } catch (Exception e) {
                    System.out.println("Exception" + e);
                }

            }

            System.out.println("Delay task has stopped");
        }
    }

    public static void main(String[] args) {
        RingBuffer ringBufferWheel = new RingBuffer(Executors.newFixedThreadPool(2));
        for (int i = 0; i < 3; i++) {
            RingBuffer.Task job = new Job();
            job.setKey(i);
            ringBufferWheel.addTask(job);
        }
    }

    public static class Job extends RingBuffer.Task {
        @Override
        public void run() {

            System.out.println("test5"+getIndex());
        }
    }
}

二色乾、分布式

之前說的單機(jī)實(shí)現(xiàn)颖杏,一旦服務(wù)器重啟,那么延時(shí)任務(wù)會(huì)丟失瘤袖,而分布式的方案則不會(huì)丟失任務(wù)掉缺。

Redis ZSet實(shí)現(xiàn)

  1. 底層實(shí)現(xiàn):Redis的底層實(shí)現(xiàn)是當(dāng)key大小小于某個(gè)閾值卜录,并且鍵值對個(gè)數(shù)小于某個(gè)閾值(都可配置),使用ZipList實(shí)現(xiàn)眶明,否則使用SkipList和Hash實(shí)現(xiàn)艰毒,SkipList中按照score排序,hash存儲(chǔ)成員到分?jǐn)?shù)的映射搜囱。
  2. ZSet API
  • 添加丑瞧,如果值存在添加,將會(huì)重新排序蜀肘。zadd
    127.0.0.1:6379>zadd myZSet 1 zlh ---添加分?jǐn)?shù)為1绊汹,值為zlh的zset集合
  • 查看zset集合的成員個(gè)數(shù)。zcard
    127.0.0.1:6379>zcard myZSet
  • 查看Zset指定范圍的成員幌缝,withscores為輸出結(jié)果帶分?jǐn)?shù)灸促。zrange
    127.0.0.1:6379>zrange mZySet 0 -1 ----0為開始,-1為結(jié)束涵卵,輸出順序結(jié)果為: zlh tom jim
  • 獲取zset成員的下標(biāo)位置浴栽,如果值不存在返回null。zrank
    127.0.0.1:6379>zrank mZySet Jim ---Jim的在zset集合中的下標(biāo)為2
  • 獲取zset集合指定分?jǐn)?shù)之間存在的成員個(gè)數(shù)轿偎。zcount
    127.0.0.1:6379>zcount mySet 1 3 ---輸出分?jǐn)?shù)>=1 and 分?jǐn)?shù) <=3的成員個(gè)數(shù)為3
  1. 實(shí)現(xiàn)思路:
  • 添加任務(wù)時(shí)典鸡,將當(dāng)前時(shí)間+延時(shí)時(shí)間作為SkipList的分詞,job的key作為成員標(biāo)識(shí)加入ZSet
  • 搬運(yùn)線程開啟定時(shí)任務(wù)坏晦,將在當(dāng)前時(shí)間戳之前的任務(wù)添加到隊(duì)列中
  • 開啟消費(fèi)線程萝玷,無限循環(huán),超時(shí)從隊(duì)列獲取Job昆婿,將任務(wù)放到線程池中消費(fèi)
  • 添加任務(wù)球碉,消費(fèi)線程,搬運(yùn)線程仓蛆,都需要獲取Redis分布式鎖

RabbitMQ

參考:http://www.reibang.com/p/fb83c68feec4

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末睁冬,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子看疙,更是在濱河造成了極大的恐慌豆拨,老刑警劉巖直奋,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異施禾,居然都是意外死亡脚线,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進(jìn)店門弥搞,熙熙樓的掌柜王于貴愁眉苦臉地迎上來邮绿,“玉大人,你說我怎么就攤上這事拓巧∷孤担” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵肛度,是天一觀的道長。 經(jīng)常有香客問我投慈,道長承耿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任伪煤,我火速辦了婚禮加袋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抱既。我一直安慰自己职烧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布防泵。 她就那樣靜靜地躺著蚀之,像睡著了一般。 火紅的嫁衣襯著肌膚如雪捷泞。 梳的紋絲不亂的頭發(fā)上足删,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機(jī)與錄音锁右,去河邊找鬼失受。 笑死,一個(gè)胖子當(dāng)著我的面吹牛咏瑟,可吹牛的內(nèi)容都是我干的拂到。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼码泞,長吁一口氣:“原來是場噩夢啊……” “哼兄旬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起浦夷,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤辖试,失蹤者是張志新(化名)和其女友劉穎辜王,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罐孝,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡呐馆,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了莲兢。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汹来。...
    茶點(diǎn)故事閱讀 40,664評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖改艇,靈堂內(nèi)的尸體忽然破棺而出收班,到底是詐尸還是另有隱情,我是刑警寧澤谒兄,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布摔桦,位于F島的核電站,受9級(jí)特大地震影響承疲,放射性物質(zhì)發(fā)生泄漏邻耕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一燕鸽、第九天 我趴在偏房一處隱蔽的房頂上張望兄世。 院中可真熱鬧,春花似錦啊研、人聲如沸御滩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽削解。三九已至,卻和暖如春麸锉,著一層夾襖步出監(jiān)牢的瞬間钠绍,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工花沉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留柳爽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓碱屁,卻偏偏與公主長得像磷脯,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子娩脾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容