背景
APScheduler是一個非常好用的調(diào)度平臺,不過目前所有Scheduler的JOB信息都無法通過可視化的方式展示,只能通過后臺日志來查看調(diào)度信息哥捕,對于管理上非常不便喳坠。
但是APScheduler非常的強大,已經(jīng)預(yù)留的event功能可以幫助來實現(xiàn)此功能例朱,對于APScheduler原理還不太理解的話孝情,可以參考之前的一篇文章Python定時庫APScheduler原理及用法
在使用Flask進(jìn)行管理后,通過Flask-APScheduler插件來實現(xiàn)對APScheduler的管理以及動態(tài)增刪JOB的接口實現(xiàn)洒嗤,以此完成對APScheduler的全方位管理箫荡。
目的
本文的目的主要有兩部分功能塊,第一部分是利用APScheduler的event機制來實現(xiàn)以下兩個功能并進(jìn)行可視化查看
- 將APScheduler中所有添加的JOB進(jìn)行狀態(tài)跟蹤
- APScheduler中每個JOB的生命周期進(jìn)行跟蹤
第二部分是在Flask框架上構(gòu)建的管理平臺上集成Flask-APScheduler插件渔隶,完成對APScheduler的管理以及動態(tài)增刪JOB的接口實現(xiàn)羔挡。
實現(xiàn)
集成Flask-APScheduler插件完成APScheduler的動態(tài)管理
將APScheduler集成到Flask中
config_name = os.getenv('FLASK_CONFIG') or 'default'
app = Flask(__name__)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 初始化Sqlarchemy
db.app = app
db.init_app(app)
# 初始化 flask_apscheduler,將scheduler嵌入到flask管理间唉,本地在flask_apscheduler插件中增加add_listener監(jiān)聽所有的job生命周期
flask_apscheduler = CustomAPScheduler(db.session, app=app)
# 啟動apscheduler
flask_apscheduler.start()
配置Flask-APScheduler開啟對外接口
class Config:
# apscheduler默認(rèn)的jobstore
SCHEDULER_JOBSTORES = {}
# flask_apscheduler是否對外提供接口
SCHEDULER_API_ENABLED = True
Flask-APScheduler提供的api如下
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
啟動后绞灼,通過提供的接口進(jìn)行動態(tài)管理
直接動態(tài)調(diào)用接口添加, 具體的參數(shù)需要到apscheduler的源碼進(jìn)行查看
添加JOB舉例說明(add_job)
請求添加接口:http://127.0.0.1:5000/scheduler/jobs
請求方法:POST
請求header:
{
"Content-Type": "application/json"
}
請求body:
{
"id": "test_add_job",
"name":"管理平臺添加job測試",
"func": "app:jobs.test.test_job", # 這里就是模塊:函數(shù)呈野,本地定義的方法保證可以import
"trigger": "date" # 觸發(fā)器為指定時間低矮,這里時間沒有指定,就是立馬執(zhí)行
}
返回結(jié)果:
{
"id": "test_add_job",
"name": "管理平臺添加job測試",
"func": "app:jobs.test.test_job",
"args": [],
"kwargs": {},
"trigger": "date",
"run_date": "2021-03-05T15:17:10.107210+08:00",
"misfire_grace_time": 1,
"max_instances": 1,
"next_run_time": "2021-03-05T15:17:10.107210+08:00"
}
充分利用APScheduler的Event機制
class CustomAPScheduler(APScheduler):
# scheduler事件映射本地狀態(tài)
STATUS_MAPPING = {
EVENT_JOB_ADDED: 0,
EVENT_JOB_MODIFIED: 1,
EVENT_JOB_SUBMITTED: 2,
EVENT_JOB_EXECUTED: 3,
EVENT_JOB_REMOVED: 4,
EVENT_JOB_ERROR: 5,
EVENT_JOB_MISSED: 6,
EVENT_ALL_JOBS_REMOVED: 7,
EVENT_JOB_MAX_INSTANCES: 8
}
def __init__(self, session, scheduler=None, app=None):
super(CustomAPScheduler, self).__init__(scheduler, app)
self.session = session
def listener_all_job(self, event):
"""
監(jiān)控job的生命周期被冒,可視化監(jiān)控军掂,并且可增加后續(xù)的沒有觸發(fā)任務(wù)等監(jiān)控
添加到線程做處理
:param event:
:return:
"""
job_id = None
args = []
if event.code != EVENT_ALL_JOBS_REMOVED:
job_id = event.job_id
if job_id:
jobstore_alias = event.jobstore
job = self.scheduler.get_job(job_id, jobstore_alias)
if job:
name = job.name
func = str(job.func_ref)
trigger = job.trigger if isinstance(job.trigger, str) else str(job.trigger).split("[")[0]
next_run_time = str(job.next_run_time).split(".")[0]
else:
name = None
func = None
trigger = None
next_run_time = None
args = [name, func, trigger, next_run_time]
traceback = event.traceback if hasattr(event, 'traceback') else "",
args.append(traceback)
t = threading.Thread(target=self.handle_listener_all_job, args=[event.code, job_id, *args])
t.start()
t.join()
def handle_listener_all_job(self, event_type, *args):
"""
實際處理IO操作
如何處理一個job_id重復(fù)使用的問題,采用本地id自增昨悼,如果真有job_id重復(fù)的情況蝗锥,則認(rèn)為指定的是最后一個job_id對應(yīng)的任務(wù)
"""
try:
if event_type == EVENT_JOB_ADDED:
# 添加任務(wù)定義表
job = ApschedulerJobInfo()
job.job_id = args[0]
job.job_name = args[1]
job.job_func = args[2]
job.job_trigger = args[3]
job.job_next_run_time = args[4]
job.job_status = 0
self.session.add(job)
self.session.flush()
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
elif event_type == EVENT_JOB_MODIFIED:
# 修改job[取數(shù)據(jù)庫表中job_id最后一個進(jìn)行修改]
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_name = args[1]
job.job_func = args[2]
job.job_trigger = args[3]
job.job_next_run_time = args[4]
job.job_status = 0
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_SUBMITTED:
# 提交job執(zhí)行
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_EXECUTED:
# 執(zhí)行job
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 1
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_REMOVED:
# 刪除job
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 5
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_ERROR:
# 執(zhí)行job出錯
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 2
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_JOB_MISSED:
# job執(zhí)行錯過
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 3
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
elif event_type == EVENT_ALL_JOBS_REMOVED:
# 刪除所有job
all_jobs = ApschedulerJobInfo.query.filter(ApschedulerJobInfo.job_status == 0).all()
for job in all_jobs:
job.job_status = 6
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
elif event_type == EVENT_JOB_MAX_INSTANCES:
# job超過最大實例
job = ApschedulerJobInfo.query.order_by(ApschedulerJobInfo.id.desc()).filter(
ApschedulerJobInfo.job_id == args[0]).first()
if job:
# 更新JOB表
job.job_status = 4
job.job_traceback = args[5]
# 增加任務(wù)事件表
job_event = ApschedulerJobEventInfo()
job_event.job_info_id = job.id
job_event.event = self.STATUS_MAPPING[event_type]
self.session.add(job_event)
self.session.commit()
else:
LOGGER.warning("指定的job本地不存在{}".format(args))
except:
LOGGER.exception("執(zhí)行任務(wù)異常")
def init_app(self, app):
super(CustomAPScheduler, self).init_app(app)
# 增加監(jiān)聽函數(shù),監(jiān)聽所有job的生命周期
self.add_listener(self.listener_all_job,
EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_MAX_INSTANCES | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | EVENT_JOB_SUBMITTED)
收集完成數(shù)據(jù)后進(jìn)行展示及管理
-
JOB管理
JOB管理 -
JOB事件執(zhí)行明細(xì)
JOB事件執(zhí)行明細(xì)
關(guān)注公眾號“戰(zhàn)渣渣”率触,回復(fù)“調(diào)度”獲得源碼