仿kafka實現(xiàn)java版時間輪

系統(tǒng)定時绑莺、超時

在我們平時的項目開發(fā)中圈盔,會設置系統(tǒng)的超時時間戴尸,比如在http接口中設置超時時間,在定時調(diào)度中也會用到付魔。在jdk的開發(fā)的實現(xiàn)Timer和ScheduledThreadPoolExecutor、DelayQueue定時調(diào)度中使用的是最小堆飞蹂,我們知道最小堆的插入時間復雜度是log(n)几苍。在kafka中,采用的是基于O(1)的時間輪算法陈哑,本節(jié)我們就使用java來模仿kafka層級時間輪妻坝。

時間輪簡介

時間輪的實現(xiàn)思想是借鑒我們的鐘表,秒針轉動一圈惊窖,分鐘移動一個刽宪,分鐘轉動一格,始終移動一格爬坑,在kafka中稱為桶bucket纠屋。下面文章中稱為槽。

在kafka中第一個槽默認一格表示1ms盾计,第一個時間輪是20個槽售担,所以第一一個時間輪代表20ms。第二個時間輪的每一格式第一個時間輪的總時間署辉,也就是20ms族铆,所以第二個時間輪可表示的時間范圍是400ms,依次類推哭尝,第三個時間輪可表示的時間范圍是8s,第四個時間輪是160s等等哥攘。由于時間在向前推進,故一段時間后,第二個時間輪上的任務會向轉移到第一個時間輪上逝淹,這樣遞進的方式耕姊,最終任務都會執(zhí)行。
kafka中的每個槽表示一個TimerTaskList,每個任務加到這個TimerTaskList上栅葡,如下圖中時間輪中每個槽都代表一個TimerTaskList茉兰。


image.png
1.任務TimerTask源碼分析

TimerTask類表示一個要執(zhí)行的任務,實現(xiàn)了Runnable接口

public abstract class TimerTask implements Runnable {

    public long delayMs; //表示當前任務延遲多久后執(zhí)行(單位ms)欣簇,比如說延遲3s规脸,則此值為3000

    public TimerTask(long delayMs) {
        this.delayMs =  delayMs;
    }
    // 指向TimerTaskEntry對象,一個TimerTaskEntry包含一個TimerTask熊咽,TimerTaskEntry是可復用的
    private TimerTaskList.TimerTaskEntry timerTaskEntry = null;
  
    // 取消當前任務莫鸭,就是從TimerTaskEntry移出TimerTask,并且把當前的timerTaskEntry置空
    public synchronized void cancel() {
        if(timerTaskEntry != null) {
            timerTaskEntry.remove();
        }
        timerTaskEntry = null;
    }

    //設置當前任務綁定的TimerTaskEntry
    public synchronized void setTimerTaskEntry(TimerTaskList.TimerTaskEntry entry) {
        if(timerTaskEntry != null && timerTaskEntry != entry) {
            timerTaskEntry.remove();
        }
        timerTaskEntry = entry;
    }

    public TimerTaskList.TimerTaskEntry getTimerTaskEntry() {
        return timerTaskEntry;
    }
}
2.任務包裝類TimerTaskEntry

TimerTaskEntry是TimerTask的包裝,實現(xiàn)了Compareable接口横殴,用來比較連個任務的過期時間被因,以決定任務list插入的順序

public static class TimerTaskEntry implements Comparable<TimerTaskEntry>{
        //包含一個任務
        public TimerTask timerTask;
        // 任務的過期時間,此處的過期時間設置的過期間隔+系統(tǒng)當前時間(毫秒)
        public Long expirationMs;
        
        // 當前任務屬于哪一個列表
        private TimerTaskList list;
        // 當前任務的上一個任務滥玷,用雙向列表連接
        private TimerTaskEntry prev;
        private TimerTaskEntry next;


        public TimerTaskEntry(TimerTask timerTask,Long expirationMs) {
            this.timerTask = timerTask;
            this.expirationMs = expirationMs;
            // 傳遞進來任務TimerTask氏身,并設置TimerTask的包裝類
            if(timerTask != null) {
                timerTask.setTimerTaskEntry(this);
            }
        }
      
        // 任務的取消,就是判斷任務TimerTask的Entry是否是當前任務
        public boolean cancel() {
            return timerTask.getTimerTaskEntry() != this;
        }
      
       // 任務的移出
        public void remove() {
            TimerTaskList currentList = list;
            while(currentList != null) {
                currentList.remove(this);
                currentList = list;
            }
        }
        // 比較兩個任務在列表中的位置惑畴,及那個先執(zhí)行
        @Override
        public int compareTo(TimerTaskEntry that) {
            return Long.compare(expirationMs,that.expirationMs);
        }
    }
3.每個槽中的任務列表

在時間輪中每個槽代表一個列表蛋欣,即TimerTaskList,每個TimerTaskList中包含多個TimerTaskEntry如贷,并且用雙向列表鏈接陷虎。TimerTaskList實現(xiàn)了Delayed接口,用于返回剩余的時間杠袱,把上層時間輪的任務移動位置尚猿。

public class TimerTaskList implements Delayed {
    //當前列表中包含的任務數(shù)
    private AtomicInteger taskCounter;
    // 列表的頭結點
    private TimerTaskEntry root;
    // 過期時間
    private AtomicLong expiration = new AtomicLong(-1L);


    public TimerTaskList(AtomicInteger taskCounter) {
        this.taskCounter = taskCounter;
        this.root =  new TimerTaskEntry(null,-1L);
        root.next = root;
        root.prev = root;
    }

    // 給當前槽設置過期時間
    public boolean setExpiration(Long expirationMs) {
        return expiration.getAndSet(expirationMs) != expirationMs;
    }

    public Long getExpiration() {
        return expiration.get();
    }

    // 用于遍歷當前列表中的任務
    public synchronized  void foreach(Consumer<TimerTask> f) {
        TimerTaskEntry entry = root.next;
        while(entry != root) {
            TimerTaskEntry nextEntry = entry.next;
            if(!entry.cancel()) {
                f.accept(entry.timerTask);
            }
            entry = nextEntry;
        }
    }
  
   // 添加任務到列表中
    public void add(TimerTaskEntry timerTaskEntry) {
        boolean done = false;
        while(!done) {
            timerTaskEntry.remove();

            synchronized (this) {
                synchronized (timerTaskEntry) {
                    if(timerTaskEntry.list == null) {
                        TimerTaskEntry tail = root.prev;
                        timerTaskEntry.next = root;
                        timerTaskEntry.prev = tail;
                        timerTaskEntry.list = this;
                        tail.next = timerTaskEntry;
                        root.prev = timerTaskEntry;
                        taskCounter.incrementAndGet();
                        done = true;
                    }
                }
            }
        }
    }

    //移出任務
    private synchronized void remove(TimerTaskEntry timerTaskEntry) {
        synchronized (timerTaskEntry) {
            if(timerTaskEntry.list == this) {
                timerTaskEntry.next.prev = timerTaskEntry.prev;
                timerTaskEntry.prev.next = timerTaskEntry.next;
                timerTaskEntry.next = null;
                timerTaskEntry.prev = null;
                timerTaskEntry.list = null;
                taskCounter.decrementAndGet();
            }
        }
    }

    public synchronized void flush(Consumer<TimerTaskEntry> f) {
        TimerTaskEntry head = root.next;
        while(head != root) {
            remove(head);
            f.accept(head);
            head = root.next;
        }
         expiration.set(-1L);
    }
    //獲得當前任務剩余時間
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Math.max(getExpiration() - System.currentTimeMillis(),0),TimeUnit.MICROSECONDS);
    }
    
    @Override
    public int compareTo(Delayed d) {
        TimerTaskList other = (TimerTaskList) d;
        return Long.compare(getExpiration(),other.getExpiration());
    }
}
4.時間輪結構

時間輪TimeWheel代表一層時間輪,即第一層時間輪表示20ms楣富,主要功能是添加任務和凿掂,驅動時間輪向前。

public class TimingWheel {

    private Long tickMs;  //每一個槽表示的時間范圍
    private Integer wheelSize; // 時間輪大小纹蝴,即每一層時間輪的大小
    private Long startMs; // 系統(tǒng)的啟動時間
    private AtomicInteger taskCounter;  // 當前層任務數(shù)
    private DelayQueue<TimerTaskList> queue; //延遲隊列庄萎,用于從隊列取每個任務列表

    private Long interval; //每一層時間輪代表的時間
    private List<TimerTaskList> buckets;  // 每一層的每一個槽中的時間任務列表
    private Long currentTime;  // 修正后的系統(tǒng)啟動時間
  
    private TimingWheel overflowWheel = null;  // 上一層時間輪

    public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.startMs = startMs;
        this.taskCounter = taskCounter;
        this.queue = queue;
        interval = tickMs * wheelSize;
        currentTime = startMs - (startMs % tickMs); //當前時間,往前推

        buckets = new ArrayList<>(wheelSize);
        for(int i = 0;i < wheelSize;i++) {
            buckets.add(new TimerTaskList(taskCounter));  //創(chuàng)建每一個槽中的列表
        }
    }

    // 創(chuàng)建上層時間輪
    public synchronized void addOverflowWheel() {
        if(overflowWheel == null) {
            overflowWheel = new TimingWheel(
                    interval,  // 此處interval即表示上一層時間輪表示的范圍
                    wheelSize,
                    currentTime,
                    taskCounter,
                    queue
            );
        }
    }
  
  // 添加任務
    public boolean add(TimerTaskList.TimerTaskEntry timerTaskEntry) {
        Long expiration = timerTaskEntry.expirationMs;
       
        Long thisTime = currentTime + tickMs;
        // 任務是否已經(jīng)取消塘安,取消則返回
        if(timerTaskEntry.cancel()) {
            return false;
        // 當前任務是否已經(jīng)過期糠涛,如果過期則返回false,要立即執(zhí)行
        }else if(expiration < currentTime + tickMs) {
            return false;
        // 判斷當前任務能否在添加到當前時間輪
        }else if(expiration < currentTime + interval) {
           
            Long virtualId = expiration / tickMs;  
            // 計算當前任務要分配在哪個槽中
            int whereBucket = (int) (virtualId % wheelSize);
            TimerTaskList bucket = buckets.get((int)(virtualId % wheelSize));

            bucket.add(timerTaskEntry);

            long bucketExpiration = virtualId * tickMs;
            //更新槽的過期時間兼犯,添加入延遲隊列
            if(bucket.setExpiration(virtualId * tickMs)) {
                queue.offer(bucket);
            }
            return true;
        }else {
          //添加任務到高層時間輪
            if(overflowWheel == null) addOverflowWheel();
            return overflowWheel.add(timerTaskEntry);
        }
    }

    // 向前驅動時間
    public void advanceClock(Long timeMs) {
        if(timeMs >= currentTime + tickMs) {
            currentTime = timeMs - (timeMs % tickMs);

            if(overflowWheel != null) {
                overflowWheel.advanceClock(currentTime);
            }
        }
    }
}
5. 時間輪接口
  • kafka中提供了Timer接口忍捡,用于對外提供調(diào)用集漾,分別是Timer#add添加任務;Timer#advanceClock驅動時間砸脊; Timer#size時間輪中總任務數(shù)具篇;Timer#shutdown停止時間輪
public interface Timer {
    void add(TimerTask timerTask);
    boolean advanceClock(Long timeoutMs) throws Exception;
    int size();
    void shutdown();
}
  • Timer的實現(xiàn)類是SystemTimer
public class SystemTimer implements Timer {

    private String executorName;
    private Long tickMs = 1L;
    private Integer wheelSize = 20;
    private Long startMs = System.currentTimeMillis();
    //用來執(zhí)行TimerTask任務
    private ExecutorService taskExecutor =
            Executors.newFixedThreadPool(1,(runnable) -> {
                Thread thread = new Thread(runnable);
                thread.setName("executor-" + executorName);
                thread.setDaemon(false);
                return thread;
            });
    //延遲隊列
    private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
    private AtomicInteger taskCounter = new AtomicInteger(0);
    private TimingWheel timingWheel;

    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    private ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    // 用來執(zhí)行時間輪的重新排列,及上一個槽中的任務列表被執(zhí)行后凌埂,后面的槽中的任務列表移動
    private Consumer<TimerTaskEntry> reinsert = (timerTaskEntry) -> addTimerTaskEntry(timerTaskEntry);

    public SystemTimer(String executorName, Long tickMs, Integer wheelSize, Long startMs) {
        this.executorName = executorName;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.startMs = startMs;
        this.timingWheel = new TimingWheel(
                tickMs,
                wheelSize,
                startMs,
                taskCounter,
                delayQueue
        );
    }

    // 可能會多個線程操作栽连,所以需要加鎖
    @Override
    public void add(TimerTask timerTask) {
        readLock.lock();
        try{
            addTimerTaskEntry(new TimerTaskEntry(timerTask,timerTask.delayMs + System.currentTimeMillis()));
        }finally {
            readLock.unlock();
        }
    }

    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {      // 往時間輪添加任務
        if(!timingWheel.add(timerTaskEntry)) {
            // 返回false并且任務未取消,則提交當前任務立即執(zhí)行侨舆。
            if(!timerTaskEntry.cancel()) {
                taskExecutor.submit(timerTaskEntry.timerTask);
            }
        }
    }


    // 向前驅動時間輪
    @Override
    public boolean advanceClock(Long timeoutMs) throws Exception{
        // 使用阻塞隊列獲取任務
        TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if(bucket != null) {
            writeLock.lock();
            try{
                while(bucket != null) {
                    timingWheel.advanceClock(bucket.getExpiration());
                    // 驅動時間后,需要移動TimerTaskList到上一個槽或者從上一層移動到本層
                    bucket.flush(reinsert);
                    bucket = delayQueue.poll();
                }
            }finally {
                writeLock.unlock();
            }
            return true;
        }else {
            return false;
        }
    }

    @Override
    public int size() {
        return taskCounter.get();
    }

    @Override
    public void shutdown() {
        taskExecutor.shutdown();
    }
}

  1. 測試
public class SystemTimerTest {
    //驅動時間輪向前的線程
    private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    public static  SystemTimer timer = new SystemTimer("test",1000L,5,System.currentTimeMillis());


    public static void runTask() throws Exception {
        for(int i = 0;i < 10000;i+= 1000) {
            // 添加任務绢陌,每個任務間隔1s
            timer.add(new TimerTask(i) {
                @Override
                public void run() {
                    System.out.println("運行testTask的時間: " + System.currentTimeMillis());
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        runTask();

        executorService.submit(() -> {
            while(true) {
                try {
                    // 驅動時間輪線程間隔0.2s驅動
                    timer.advanceClock(200L);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });


        Thread.sleep(1000000);
        timer.shutdown();
        executorService.shutdown();
    }
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末挨下,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子脐湾,更是在濱河造成了極大的恐慌臭笆,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秤掌,死亡現(xiàn)場離奇詭異愁铺,居然都是意外死亡,警方通過查閱死者的電腦和手機闻鉴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進店門茵乱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人孟岛,你說我怎么就攤上這事瓶竭。” “怎么了渠羞?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵斤贰,是天一觀的道長。 經(jīng)常有香客問我次询,道長荧恍,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任屯吊,我火速辦了婚禮送巡,結果婚禮上,老公的妹妹穿的比我還像新娘雌芽。我一直安慰自己授艰,他們只是感情好,可當我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布世落。 她就那樣靜靜地躺著淮腾,像睡著了一般糟需。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谷朝,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天洲押,我揣著相機與錄音,去河邊找鬼圆凰。 笑死杈帐,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的专钉。 我是一名探鬼主播挑童,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼跃须!你這毒婦竟也來了站叼?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤菇民,失蹤者是張志新(化名)和其女友劉穎尽楔,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體第练,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡阔馋,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了娇掏。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呕寝。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖驹碍,靈堂內(nèi)的尸體忽然破棺而出壁涎,到底是詐尸還是另有隱情,我是刑警寧澤志秃,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布怔球,位于F島的核電站,受9級特大地震影響浮还,放射性物質發(fā)生泄漏竟坛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一钧舌、第九天 我趴在偏房一處隱蔽的房頂上張望担汤。 院中可真熱鬧,春花似錦洼冻、人聲如沸崭歧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽率碾。三九已至叔营,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間所宰,已是汗流浹背绒尊。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仔粥,地道東北人婴谱。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像躯泰,于是被迫代替她去往敵國和親谭羔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 時間輪:高效延時隊列(定時器)麦向。 Kafka中時間輪(TimingWheel)存儲定時任務環(huán)形隊列口糕,底層數(shù)組實現(xiàn),...
    hedgehog1112閱讀 1,228評論 0 8
  • 寫在前面 kafka是一個分布式消息中間件磕蛇,其高可用高吞吐的特點是大數(shù)據(jù)領域首選的消息中間件,Kafka是分布式消...
    Java旺閱讀 427評論 0 1
  • [TOC]在kafka中十办,有許多請求并不是立即返回秀撇,而且處理完一些異步操作或者等待某些條件達成后才返回,這些請求一...
    tracy_668閱讀 2,209評論 0 1
  • 夜鶯2517閱讀 127,720評論 1 9
  • 版本:ios 1.2.1 亮點: 1.app角標可以實時更新天氣溫度或選擇空氣質量向族,建議處女座就不要選了呵燕,不然老想...
    我就是沉沉閱讀 6,898評論 1 6