quartz集群調(diào)度任務(wù)負(fù)載不均

問題:

多臺服務(wù)集群模式執(zhí)行調(diào)度周期短的任務(wù)不能均衡負(fù)載。

原因:

下面的run()方法中寡润,不考慮halted和siglock的情況下1)只有if (triggers != null && !triggers.isEmpty())條件滿足的情況下瑞躺,while循環(huán)才會(huì)進(jìn)入暫停狀態(tài)sigLock.wait(timeUntilContinue);2)并且獲取待執(zhí)行trigger時(shí)最終執(zhí)行的sql以及傳入的條件如下,即next fire time范圍為:
\color{red}{misfireTime <= X<=now + idleWaitTime+timeWindow}

QuartzSchedulerThread.java
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
JobStoreSupport.java 
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

protected long getMisfireTime() {
        long misfireTime = System.currentTimeMillis();
        if (getMisfireThreshold() > 0) {
            misfireTime -= getMisfireThreshold();
        }
        return (misfireTime > 0) ? misfireTime : 0;
    }
public long getMisfireThreshold() {
        return misfireThreshold;
    }

String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
        + COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
        + COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
        + TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
        + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? " 
        + "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " != -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
        + "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";

與查詢條件相關(guān)的參數(shù):

Variable Property Name Default Value
idleWaitTime org.quartz.scheduler.idleWaitTime 30000L
maxBatchSize org.quartz.scheduler.batchTriggerAcquisitionMaxCount 1
batchTimeWindow org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow 0
misfireThreshold 60000L; // one minute

所以只有run方法中的while暫停竹观,不去獲取新的trigger時(shí)其他服務(wù)才有機(jī)會(huì)拿到執(zhí)行任務(wù)豫喧,以達(dá)到負(fù)載均衡。

QuartzSchedulerThread.java
public void run() {
        int acquiresFailed = 0;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }

其他:

https://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/tutorial-lesson-11.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末缀雳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子梢睛,更是在濱河造成了極大的恐慌肥印,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,470評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绝葡,死亡現(xiàn)場離奇詭異深碱,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)藏畅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評論 3 392
  • 文/潘曉璐 我一進(jìn)店門敷硅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人愉阎,你說我怎么就攤上這事绞蹦。” “怎么了榜旦?”我有些...
    開封第一講書人閱讀 162,577評論 0 353
  • 文/不壞的土叔 我叫張陵坦辟,是天一觀的道長。 經(jīng)常有香客問我章办,道長锉走,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,176評論 1 292
  • 正文 為了忘掉前任藕届,我火速辦了婚禮挪蹭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘休偶。我一直安慰自己梁厉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著词顾,像睡著了一般八秃。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上肉盹,一...
    開封第一講書人閱讀 51,155評論 1 299
  • 那天昔驱,我揣著相機(jī)與錄音,去河邊找鬼上忍。 笑死骤肛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的窍蓝。 我是一名探鬼主播腋颠,決...
    沈念sama閱讀 40,041評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼吓笙!你這毒婦竟也來了淑玫?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,903評論 0 274
  • 序言:老撾萬榮一對情侶失蹤面睛,失蹤者是張志新(化名)和其女友劉穎混移,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體侮穿,經(jīng)...
    沈念sama閱讀 45,319評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡歌径,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了亲茅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片回铛。...
    茶點(diǎn)故事閱讀 39,703評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖克锣,靈堂內(nèi)的尸體忽然破棺而出茵肃,到底是詐尸還是另有隱情,我是刑警寧澤袭祟,帶...
    沈念sama閱讀 35,417評論 5 343
  • 正文 年R本政府宣布验残,位于F島的核電站,受9級特大地震影響巾乳,放射性物質(zhì)發(fā)生泄漏您没。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評論 3 325
  • 文/蒙蒙 一胆绊、第九天 我趴在偏房一處隱蔽的房頂上張望氨鹏。 院中可真熱鬧,春花似錦压状、人聲如沸仆抵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镣丑。三九已至舔糖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間莺匠,已是汗流浹背金吗。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留慨蛙,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評論 2 368
  • 正文 我出身青樓纪挎,卻偏偏與公主長得像期贫,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子异袄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 一 問題背景公司系統(tǒng)中每天有大量的后臺任務(wù)需要調(diào)度執(zhí)行通砍,如構(gòu)建索引、統(tǒng)計(jì)報(bào)表烤蜕、周期同步數(shù)據(jù)等等封孙,要求任務(wù)調(diào)度系統(tǒng)具...
    七佰閱讀 1,707評論 0 2
  • Quartz原理解密 Author: DoraeDate:2018年7月17日15:55:02轉(zhuǎn)載請注明出處 由于...
    Dorae132閱讀 1,520評論 0 3
  • 在微服務(wù)環(huán)境下,定時(shí)任務(wù)也需要獨(dú)立為一個(gè)服務(wù)讽营。這里使用spring+quartz搭建定時(shí)任務(wù)開發(fā)環(huán)境虎忌。 在Co...
    天熱蚊子多閱讀 2,695評論 1 6
  • 原文:http://tech.meituan.com/mt-crm-quartz.html 一、問題背景 美團(tuán)CR...
    九都散人閱讀 4,354評論 2 46
  • 哎橱鹏,今天跟蔡婉聊了下天膜蠢。心里面突然間就平衡了。就在這個(gè)周末莉兰,我沒有讀一頁書挑围,也沒有寫多少字√腔模可能是前期的讀書太過于...
    江山吳閱讀 165評論 0 1