系統(tǒng)定時绑莺、超時
在我們平時的項目開發(fā)中圈盔,會設置系統(tǒng)的超時時間戴尸,比如在http接口中設置超時時間,在定時調(diào)度中也會用到付魔。在jdk的開發(fā)的實現(xiàn)Timer和ScheduledThreadPoolExecutor、DelayQueue定時調(diào)度中使用的是最小堆飞蹂,我們知道最小堆的插入時間復雜度是log(n)几苍。在kafka中,采用的是基于O(1)的時間輪算法陈哑,本節(jié)我們就使用java來模仿kafka層級時間輪妻坝。
時間輪簡介
時間輪的實現(xiàn)思想是借鑒我們的鐘表,秒針轉動一圈惊窖,分鐘移動一個刽宪,分鐘轉動一格,始終移動一格爬坑,在kafka中稱為桶bucket纠屋。下面文章中稱為槽。
在kafka中第一個槽默認一格表示1ms盾计,第一個時間輪是20個槽售担,所以第一一個時間輪代表20ms。第二個時間輪的每一格式第一個時間輪的總時間署辉,也就是20ms族铆,所以第二個時間輪可表示的時間范圍是400ms,依次類推哭尝,第三個時間輪可表示的時間范圍是8s,第四個時間輪是160s等等哥攘。由于時間在向前推進,故一段時間后,第二個時間輪上的任務會向轉移到第一個時間輪上逝淹,這樣遞進的方式耕姊,最終任務都會執(zhí)行。
kafka中的每個槽表示一個TimerTaskList,每個任務加到這個TimerTaskList上栅葡,如下圖中時間輪中每個槽都代表一個TimerTaskList茉兰。
1.任務TimerTask源碼分析
TimerTask類表示一個要執(zhí)行的任務,實現(xiàn)了Runnable接口
public abstract class TimerTask implements Runnable {
public long delayMs; //表示當前任務延遲多久后執(zhí)行(單位ms)欣簇,比如說延遲3s规脸,則此值為3000
public TimerTask(long delayMs) {
this.delayMs = delayMs;
}
// 指向TimerTaskEntry對象,一個TimerTaskEntry包含一個TimerTask熊咽,TimerTaskEntry是可復用的
private TimerTaskList.TimerTaskEntry timerTaskEntry = null;
// 取消當前任務莫鸭,就是從TimerTaskEntry移出TimerTask,并且把當前的timerTaskEntry置空
public synchronized void cancel() {
if(timerTaskEntry != null) {
timerTaskEntry.remove();
}
timerTaskEntry = null;
}
//設置當前任務綁定的TimerTaskEntry
public synchronized void setTimerTaskEntry(TimerTaskList.TimerTaskEntry entry) {
if(timerTaskEntry != null && timerTaskEntry != entry) {
timerTaskEntry.remove();
}
timerTaskEntry = entry;
}
public TimerTaskList.TimerTaskEntry getTimerTaskEntry() {
return timerTaskEntry;
}
}
2.任務包裝類TimerTaskEntry
TimerTaskEntry是TimerTask的包裝,實現(xiàn)了Compareable接口横殴,用來比較連個任務的過期時間被因,以決定任務list插入的順序
public static class TimerTaskEntry implements Comparable<TimerTaskEntry>{
//包含一個任務
public TimerTask timerTask;
// 任務的過期時間,此處的過期時間設置的過期間隔+系統(tǒng)當前時間(毫秒)
public Long expirationMs;
// 當前任務屬于哪一個列表
private TimerTaskList list;
// 當前任務的上一個任務滥玷,用雙向列表連接
private TimerTaskEntry prev;
private TimerTaskEntry next;
public TimerTaskEntry(TimerTask timerTask,Long expirationMs) {
this.timerTask = timerTask;
this.expirationMs = expirationMs;
// 傳遞進來任務TimerTask氏身,并設置TimerTask的包裝類
if(timerTask != null) {
timerTask.setTimerTaskEntry(this);
}
}
// 任務的取消,就是判斷任務TimerTask的Entry是否是當前任務
public boolean cancel() {
return timerTask.getTimerTaskEntry() != this;
}
// 任務的移出
public void remove() {
TimerTaskList currentList = list;
while(currentList != null) {
currentList.remove(this);
currentList = list;
}
}
// 比較兩個任務在列表中的位置惑畴,及那個先執(zhí)行
@Override
public int compareTo(TimerTaskEntry that) {
return Long.compare(expirationMs,that.expirationMs);
}
}
3.每個槽中的任務列表
在時間輪中每個槽代表一個列表蛋欣,即TimerTaskList,每個TimerTaskList中包含多個TimerTaskEntry如贷,并且用雙向列表鏈接陷虎。TimerTaskList實現(xiàn)了Delayed接口,用于返回剩余的時間杠袱,把上層時間輪的任務移動位置尚猿。
public class TimerTaskList implements Delayed {
//當前列表中包含的任務數(shù)
private AtomicInteger taskCounter;
// 列表的頭結點
private TimerTaskEntry root;
// 過期時間
private AtomicLong expiration = new AtomicLong(-1L);
public TimerTaskList(AtomicInteger taskCounter) {
this.taskCounter = taskCounter;
this.root = new TimerTaskEntry(null,-1L);
root.next = root;
root.prev = root;
}
// 給當前槽設置過期時間
public boolean setExpiration(Long expirationMs) {
return expiration.getAndSet(expirationMs) != expirationMs;
}
public Long getExpiration() {
return expiration.get();
}
// 用于遍歷當前列表中的任務
public synchronized void foreach(Consumer<TimerTask> f) {
TimerTaskEntry entry = root.next;
while(entry != root) {
TimerTaskEntry nextEntry = entry.next;
if(!entry.cancel()) {
f.accept(entry.timerTask);
}
entry = nextEntry;
}
}
// 添加任務到列表中
public void add(TimerTaskEntry timerTaskEntry) {
boolean done = false;
while(!done) {
timerTaskEntry.remove();
synchronized (this) {
synchronized (timerTaskEntry) {
if(timerTaskEntry.list == null) {
TimerTaskEntry tail = root.prev;
timerTaskEntry.next = root;
timerTaskEntry.prev = tail;
timerTaskEntry.list = this;
tail.next = timerTaskEntry;
root.prev = timerTaskEntry;
taskCounter.incrementAndGet();
done = true;
}
}
}
}
}
//移出任務
private synchronized void remove(TimerTaskEntry timerTaskEntry) {
synchronized (timerTaskEntry) {
if(timerTaskEntry.list == this) {
timerTaskEntry.next.prev = timerTaskEntry.prev;
timerTaskEntry.prev.next = timerTaskEntry.next;
timerTaskEntry.next = null;
timerTaskEntry.prev = null;
timerTaskEntry.list = null;
taskCounter.decrementAndGet();
}
}
}
public synchronized void flush(Consumer<TimerTaskEntry> f) {
TimerTaskEntry head = root.next;
while(head != root) {
remove(head);
f.accept(head);
head = root.next;
}
expiration.set(-1L);
}
//獲得當前任務剩余時間
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(),0),TimeUnit.MICROSECONDS);
}
@Override
public int compareTo(Delayed d) {
TimerTaskList other = (TimerTaskList) d;
return Long.compare(getExpiration(),other.getExpiration());
}
}
4.時間輪結構
時間輪TimeWheel代表一層時間輪,即第一層時間輪表示20ms楣富,主要功能是添加任務和凿掂,驅動時間輪向前。
public class TimingWheel {
private Long tickMs; //每一個槽表示的時間范圍
private Integer wheelSize; // 時間輪大小纹蝴,即每一層時間輪的大小
private Long startMs; // 系統(tǒng)的啟動時間
private AtomicInteger taskCounter; // 當前層任務數(shù)
private DelayQueue<TimerTaskList> queue; //延遲隊列庄萎,用于從隊列取每個任務列表
private Long interval; //每一層時間輪代表的時間
private List<TimerTaskList> buckets; // 每一層的每一個槽中的時間任務列表
private Long currentTime; // 修正后的系統(tǒng)啟動時間
private TimingWheel overflowWheel = null; // 上一層時間輪
public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.taskCounter = taskCounter;
this.queue = queue;
interval = tickMs * wheelSize;
currentTime = startMs - (startMs % tickMs); //當前時間,往前推
buckets = new ArrayList<>(wheelSize);
for(int i = 0;i < wheelSize;i++) {
buckets.add(new TimerTaskList(taskCounter)); //創(chuàng)建每一個槽中的列表
}
}
// 創(chuàng)建上層時間輪
public synchronized void addOverflowWheel() {
if(overflowWheel == null) {
overflowWheel = new TimingWheel(
interval, // 此處interval即表示上一層時間輪表示的范圍
wheelSize,
currentTime,
taskCounter,
queue
);
}
}
// 添加任務
public boolean add(TimerTaskList.TimerTaskEntry timerTaskEntry) {
Long expiration = timerTaskEntry.expirationMs;
Long thisTime = currentTime + tickMs;
// 任務是否已經(jīng)取消塘安,取消則返回
if(timerTaskEntry.cancel()) {
return false;
// 當前任務是否已經(jīng)過期糠涛,如果過期則返回false,要立即執(zhí)行
}else if(expiration < currentTime + tickMs) {
return false;
// 判斷當前任務能否在添加到當前時間輪
}else if(expiration < currentTime + interval) {
Long virtualId = expiration / tickMs;
// 計算當前任務要分配在哪個槽中
int whereBucket = (int) (virtualId % wheelSize);
TimerTaskList bucket = buckets.get((int)(virtualId % wheelSize));
bucket.add(timerTaskEntry);
long bucketExpiration = virtualId * tickMs;
//更新槽的過期時間兼犯,添加入延遲隊列
if(bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket);
}
return true;
}else {
//添加任務到高層時間輪
if(overflowWheel == null) addOverflowWheel();
return overflowWheel.add(timerTaskEntry);
}
}
// 向前驅動時間
public void advanceClock(Long timeMs) {
if(timeMs >= currentTime + tickMs) {
currentTime = timeMs - (timeMs % tickMs);
if(overflowWheel != null) {
overflowWheel.advanceClock(currentTime);
}
}
}
}
5. 時間輪接口
- kafka中提供了Timer接口忍捡,用于對外提供調(diào)用集漾,分別是Timer#add添加任務;Timer#advanceClock驅動時間砸脊; Timer#size時間輪中總任務數(shù)具篇;Timer#shutdown停止時間輪
public interface Timer {
void add(TimerTask timerTask);
boolean advanceClock(Long timeoutMs) throws Exception;
int size();
void shutdown();
}
- Timer的實現(xiàn)類是SystemTimer
public class SystemTimer implements Timer {
private String executorName;
private Long tickMs = 1L;
private Integer wheelSize = 20;
private Long startMs = System.currentTimeMillis();
//用來執(zhí)行TimerTask任務
private ExecutorService taskExecutor =
Executors.newFixedThreadPool(1,(runnable) -> {
Thread thread = new Thread(runnable);
thread.setName("executor-" + executorName);
thread.setDaemon(false);
return thread;
});
//延遲隊列
private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
private AtomicInteger taskCounter = new AtomicInteger(0);
private TimingWheel timingWheel;
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
// 用來執(zhí)行時間輪的重新排列,及上一個槽中的任務列表被執(zhí)行后凌埂,后面的槽中的任務列表移動
private Consumer<TimerTaskEntry> reinsert = (timerTaskEntry) -> addTimerTaskEntry(timerTaskEntry);
public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
this.executorName = executorName;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.startMs = startMs;
this.timingWheel = new TimingWheel(
tickMs,
wheelSize,
startMs,
taskCounter,
delayQueue
);
}
// 可能會多個線程操作栽连,所以需要加鎖
@Override
public void add(TimerTask timerTask) {
readLock.lock();
try{
addTimerTaskEntry(new TimerTaskEntry(timerTask,timerTask.delayMs + System.currentTimeMillis()));
}finally {
readLock.unlock();
}
}
private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) { // 往時間輪添加任務
if(!timingWheel.add(timerTaskEntry)) {
// 返回false并且任務未取消,則提交當前任務立即執(zhí)行侨舆。
if(!timerTaskEntry.cancel()) {
taskExecutor.submit(timerTaskEntry.timerTask);
}
}
}
// 向前驅動時間輪
@Override
public boolean advanceClock(Long timeoutMs) throws Exception{
// 使用阻塞隊列獲取任務
TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
if(bucket != null) {
writeLock.lock();
try{
while(bucket != null) {
timingWheel.advanceClock(bucket.getExpiration());
// 驅動時間后,需要移動TimerTaskList到上一個槽或者從上一層移動到本層
bucket.flush(reinsert);
bucket = delayQueue.poll();
}
}finally {
writeLock.unlock();
}
return true;
}else {
return false;
}
}
@Override
public int size() {
return taskCounter.get();
}
@Override
public void shutdown() {
taskExecutor.shutdown();
}
}
- 測試
public class SystemTimerTest {
//驅動時間輪向前的線程
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static SystemTimer timer = new SystemTimer("test",1000L,5,System.currentTimeMillis());
public static void runTask() throws Exception {
for(int i = 0;i < 10000;i+= 1000) {
// 添加任務绢陌,每個任務間隔1s
timer.add(new TimerTask(i) {
@Override
public void run() {
System.out.println("運行testTask的時間: " + System.currentTimeMillis());
}
});
}
}
public static void main(String[] args) throws Exception {
runTask();
executorService.submit(() -> {
while(true) {
try {
// 驅動時間輪線程間隔0.2s驅動
timer.advanceClock(200L);
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000000);
timer.shutdown();
executorService.shutdown();
}
}