APScheduler管理及監(jiān)控平臺

背景

APScheduler是一個非常好用的調(diào)度平臺,不過目前所有Scheduler的JOB信息都無法通過可視化的方式展示,只能通過后臺日志來查看調(diào)度信息哥捕,對于管理上非常不便喳坠。

但是APScheduler非常的強大,已經(jīng)預(yù)留的event功能可以幫助來實現(xiàn)此功能例朱,對于APScheduler原理還不太理解的話孝情,可以參考之前的一篇文章Python定時庫APScheduler原理及用法

在使用Flask進(jìn)行管理后,通過Flask-APScheduler插件來實現(xiàn)對APScheduler的管理以及動態(tài)增刪JOB的接口實現(xiàn)洒嗤,以此完成對APScheduler的全方位管理箫荡。

目的

本文的目的主要有兩部分功能塊,第一部分是利用APScheduler的event機制來實現(xiàn)以下兩個功能并進(jìn)行可視化查看

  • 將APScheduler中所有添加的JOB進(jìn)行狀態(tài)跟蹤
  • APScheduler中每個JOB的生命周期進(jìn)行跟蹤

第二部分是在Flask框架上構(gòu)建的管理平臺上集成Flask-APScheduler插件渔隶,完成對APScheduler的管理以及動態(tài)增刪JOB的接口實現(xiàn)羔挡。

實現(xiàn)

集成Flask-APScheduler插件完成APScheduler的動態(tài)管理

將APScheduler集成到Flask中

config_name = os.getenv('FLASK_CONFIG') or 'default'
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 初始化Sqlarchemy
db.app = app
db.init_app(app)
# 初始化 flask_apscheduler,將scheduler嵌入到flask管理间唉,本地在flask_apscheduler插件中增加add_listener監(jiān)聽所有的job生命周期
flask_apscheduler = CustomAPScheduler(db.session, app=app)
# 啟動apscheduler
flask_apscheduler.start()

配置Flask-APScheduler開啟對外接口

class Config:
    # apscheduler默認(rèn)的jobstore
    SCHEDULER_JOBSTORES = {}
    # flask_apscheduler是否對外提供接口
    SCHEDULER_API_ENABLED = True

Flask-APScheduler提供的api如下

def _load_api(self):
    """
    Add the routes for the scheduler API.
    """
    self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
    self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
    self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
    self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
    self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
    self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
    self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
    self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
    self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')

啟動后绞灼,通過提供的接口進(jìn)行動態(tài)管理

直接動態(tài)調(diào)用接口添加, 具體的參數(shù)需要到apscheduler的源碼進(jìn)行查看

添加JOB舉例說明(add_job)

請求添加接口:http://127.0.0.1:5000/scheduler/jobs
請求方法:POST
請求header:
{
    "Content-Type": "application/json"
}
請求body:
{
    "id": "test_add_job",
    "name":"管理平臺添加job測試",
    "func": "app:jobs.test.test_job", # 這里就是模塊:函數(shù)呈野,本地定義的方法保證可以import
    "trigger": "date" # 觸發(fā)器為指定時間低矮,這里時間沒有指定,就是立馬執(zhí)行
}
返回結(jié)果:
{
    "id": "test_add_job",
    "name": "管理平臺添加job測試",
    "func": "app:jobs.test.test_job",
    "args": [],
    "kwargs": {},
    "trigger": "date",
    "run_date": "2021-03-05T15:17:10.107210+08:00",
    "misfire_grace_time": 1,
    "max_instances": 1,
    "next_run_time": "2021-03-05T15:17:10.107210+08:00"
}

充分利用APScheduler的Event機制

class CustomAPScheduler(APScheduler):
    # scheduler事件映射本地狀態(tài)
    STATUS_MAPPING = {
        EVENT_JOB_ADDED: 0,
        EVENT_JOB_MODIFIED: 1,
        EVENT_JOB_SUBMITTED: 2,
        EVENT_JOB_EXECUTED: 3,
        EVENT_JOB_REMOVED: 4,
        EVENT_JOB_ERROR: 5,
        EVENT_JOB_MISSED: 6,
        EVENT_ALL_JOBS_REMOVED: 7,
        EVENT_JOB_MAX_INSTANCES: 8
    }

    def __init__(self, session, scheduler=None, app=None):
        super(CustomAPScheduler, self).__init__(scheduler, app)
        self.session = session

    def listener_all_job(self, event):
        """
        監(jiān)控job的生命周期被冒,可視化監(jiān)控军掂,并且可增加后續(xù)的沒有觸發(fā)任務(wù)等監(jiān)控
        添加到線程做處理
        :param event:
        :return:
        """
        job_id = None
        args = []
        if event.code != EVENT_ALL_JOBS_REMOVED:
            job_id = event.job_id
        if job_id:
            jobstore_alias = event.jobstore
            job = self.scheduler.get_job(job_id, jobstore_alias)
            if job:
                name = job.name
                func = str(job.func_ref)
                trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
                next_run_time = str(job.next_run_time).split(".")[0]
            else:
                name = None
                func = None
                trigger = None
                next_run_time = None
            args = [name, func, trigger, next_run_time]
        traceback = event.traceback if hasattr(event, 'traceback') else "",
        args.append(traceback)
        t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
        t.start()
        t.join()

    def handle_listener_all_job(self, event_type, *args):
        """
        實際處理IO操作
        如何處理一個job_id重復(fù)使用的問題,采用本地id自增昨悼,如果真有job_id重復(fù)的情況蝗锥,則認(rèn)為指定的是最后一個job_id對應(yīng)的任務(wù)
        """
        try:
            if event_type == EVENT_JOB_ADDED:
                # 添加任務(wù)定義表
                job = ApschedulerJobInfo()
                job.job_id = args[0]
                job.job_name = args[1]
                job.job_func = args[2]
                job.job_trigger = args[3]
                job.job_next_run_time = args[4]
                job.job_status = 0
                self.session.add(job)
                self.session.flush()
                # 增加任務(wù)事件表
                job_event = ApschedulerJobEventInfo()
                job_event.job_info_id = job.id
                job_event.event = self.STATUS_MAPPING[event_type]
                self.session.add(job_event)
                self.session.commit()
            elif event_type == EVENT_JOB_MODIFIED:
                # 修改job[取數(shù)據(jù)庫表中job_id最后一個進(jìn)行修改]
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_name = args[1]
                    job.job_func = args[2]
                    job.job_trigger = args[3]
                    job.job_next_run_time = args[4]
                    job.job_status = 0

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_SUBMITTED:
                # 提交job執(zhí)行
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_EXECUTED:
                # 執(zhí)行job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 1

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_REMOVED:
                # 刪除job
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 5

                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_ERROR:
                # 執(zhí)行job出錯
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 2
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_JOB_MISSED:
                # job執(zhí)行錯過
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 3
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
            elif event_type == EVENT_ALL_JOBS_REMOVED:
                # 刪除所有job
                all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
                for job in all_jobs:
                    job.job_status = 6
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
            elif event_type == EVENT_JOB_MAX_INSTANCES:
                # job超過最大實例
                job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
                    ApschedulerJobInfo.job_id == args[0]).first()
                if job:
                    # 更新JOB表
                    job.job_status = 4
                    job.job_traceback = args[5]
                    # 增加任務(wù)事件表
                    job_event = ApschedulerJobEventInfo()
                    job_event.job_info_id = job.id
                    job_event.event = self.STATUS_MAPPING[event_type]
                    self.session.add(job_event)
                    self.session.commit()
                else:
                    LOGGER.warning("指定的job本地不存在{}".format(args))
        except:
            LOGGER.exception("執(zhí)行任務(wù)異常")

    def init_app(self, app):
        super(CustomAPScheduler, self).init_app(app)

        # 增加監(jiān)聽函數(shù),監(jiān)聽所有job的生命周期
        self.add_listener(self.listener_all_job,
                          EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)

收集完成數(shù)據(jù)后進(jìn)行展示及管理

  • JOB管理


    JOB管理
  • JOB事件執(zhí)行明細(xì)


    JOB事件執(zhí)行明細(xì)

關(guān)注公眾號“戰(zhàn)渣渣”率触,回復(fù)“調(diào)度”獲得源碼

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末终议,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子葱蝗,更是在濱河造成了極大的恐慌穴张,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,681評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件两曼,死亡現(xiàn)場離奇詭異皂甘,居然都是意外死亡,警方通過查閱死者的電腦和手機合愈,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評論 3 399
  • 文/潘曉璐 我一進(jìn)店門叮贩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來击狮,“玉大人佛析,你說我怎么就攤上這事”肱睿” “怎么了寸莫?”我有些...
    開封第一講書人閱讀 169,421評論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長档冬。 經(jīng)常有香客問我膘茎,道長桃纯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,114評論 1 300
  • 正文 為了忘掉前任披坏,我火速辦了婚禮态坦,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘棒拂。我一直安慰自己伞梯,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,116評論 6 398
  • 文/花漫 我一把揭開白布帚屉。 她就那樣靜靜地躺著谜诫,像睡著了一般。 火紅的嫁衣襯著肌膚如雪攻旦。 梳的紋絲不亂的頭發(fā)上喻旷,一...
    開封第一講書人閱讀 52,713評論 1 312
  • 那天,我揣著相機與錄音牢屋,去河邊找鬼且预。 笑死,一個胖子當(dāng)著我的面吹牛伟阔,可吹牛的內(nèi)容都是我干的辣之。 我是一名探鬼主播,決...
    沈念sama閱讀 41,170評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼皱炉,長吁一口氣:“原來是場噩夢啊……” “哼怀估!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起合搅,我...
    開封第一講書人閱讀 40,116評論 0 277
  • 序言:老撾萬榮一對情侶失蹤多搀,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后灾部,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體康铭,經(jīng)...
    沈念sama閱讀 46,651評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,714評論 3 342
  • 正文 我和宋清朗相戀三年赌髓,在試婚紗的時候發(fā)現(xiàn)自己被綠了从藤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,865評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡锁蠕,死狀恐怖夷野,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情荣倾,我是刑警寧澤悯搔,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站舌仍,受9級特大地震影響妒貌,放射性物質(zhì)發(fā)生泄漏通危。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,211評論 3 336
  • 文/蒙蒙 一灌曙、第九天 我趴在偏房一處隱蔽的房頂上張望菊碟。 院中可真熱鬧,春花似錦在刺、人聲如沸框沟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽忍燥。三九已至,卻和暖如春隙姿,著一層夾襖步出監(jiān)牢的瞬間梅垄,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評論 1 274
  • 我被黑心中介騙來泰國打工输玷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留队丝,地道東北人。 一個月前我還...
    沈念sama閱讀 49,299評論 3 379
  • 正文 我出身青樓欲鹏,卻偏偏與公主長得像机久,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子赔嚎,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,870評論 2 361

推薦閱讀更多精彩內(nèi)容