背景
最近有個需求,需要實現(xiàn)一個定時或定期任務(wù)的功能惜傲,需要實現(xiàn)每月洽故、每日、每時盗誊、一次性等需求时甚,必須是輕量級不依賴其它額外組件隘弊,并能支持動態(tài)添加任務(wù)。由于當(dāng)前任務(wù)信息保存在集群 ETCD 數(shù)據(jù)庫中撞秋,因此對任務(wù)持久化要求不高长捧,每次重啟都直接讀取 ETCD 任務(wù)信息嚣鄙,為了后面擴(kuò)展吻贿,還需要添加任務(wù)持久化功能。
定時任務(wù)庫對比
根據(jù)上面需求哑子,從社區(qū)中找到了幾個 Python 好用的任務(wù)調(diào)度庫舅列。有以下幾個庫:
- schedule:Python job scheduling for humans. 輕量級,無需配置的作業(yè)調(diào)度庫
- python-crontab: 針對系統(tǒng) Cron 操作 crontab 文件的作業(yè)調(diào)度庫
- Apscheduler:一個高級的 Python 任務(wù)調(diào)度庫
- Celery: 是一個簡單卧蜓,靈活帐要,可靠的分布式系統(tǒng),用于處理大量消息弥奸,同時為操作提供維護(hù)此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度
優(yōu)缺點(diǎn)對比:
- schedule 優(yōu)點(diǎn)是簡單榨惠、輕量級、無需配置盛霎、語法簡單赠橙,缺點(diǎn)是阻塞式調(diào)用、無法動態(tài)添加或刪除任務(wù)
- Python-crontab 優(yōu)點(diǎn)是針對于系統(tǒng) crontab 操作愤炸,支持定時期揪、定期任務(wù),能夠動態(tài)添加任務(wù)规个,不能實現(xiàn)一次性任務(wù)需求
- Apscheduler 優(yōu)點(diǎn)支持定時凤薛、定期、一次性任務(wù)诞仓,支持任務(wù)持久化及動態(tài)添加缤苫、支持配置各種持久化存儲源(如 redis、MongoDB)墅拭,支持接入到各種異步框架(如 gevent榨馁、asyncio、tornado)
- Celery 支持配置定期任務(wù)帜矾、支持 crontab 模式配置翼虫,不支持一次性定時任務(wù)
schedule 庫
人類的Python 任務(wù)調(diào)度庫,和 requests 庫一樣 for humans. 這個庫也是最輕量級的一個任務(wù)調(diào)度庫屡萤,schedule 允許用戶使用簡單珍剑、人性化的語法以預(yù)定的時間間隔定期運(yùn)行Python函數(shù)(或其它可調(diào)用函數(shù))。
直接使用 pip install schedule
進(jìn)行安裝使用死陆,下面來看看官網(wǎng)給的示例:
import schedule
import time
# 定義你要周期運(yùn)行的函數(shù)
def job():
print("I'm working...")
schedule.every(10).minutes.do(job) # 每隔 10 分鐘運(yùn)行一次 job 函數(shù)
schedule.every().hour.do(job) # 每隔 1 小時運(yùn)行一次 job 函數(shù)
schedule.every().day.at("10:30").do(job) # 每天在 10:30 時間點(diǎn)運(yùn)行 job 函數(shù)
schedule.every().monday.do(job) # 每周一 運(yùn)行一次 job 函數(shù)
schedule.every().wednesday.at("13:15").do(job) # 每周三 13:15 時間點(diǎn)運(yùn)行 job 函數(shù)
schedule.every().minute.at(":17").do(job) # 每分鐘的 17 秒時間點(diǎn)運(yùn)行 job 函數(shù)
while True:
schedule.run_pending() # 運(yùn)行所有可以運(yùn)行的任務(wù)
time.sleep(1)
通過上面示例招拙,可以很容易學(xué)會使用 schedule 庫唧瘾,可以設(shè)置秒、分鐘别凤、小時饰序、天、周來運(yùn)行任務(wù)规哪,然后通過一個死循環(huán)求豫,一直不斷地運(yùn)行所有的計劃任務(wù)。
schedule 常見問題
1诉稍、如何并行執(zhí)行任務(wù)蝠嘉?
schedule 是阻塞式的,默認(rèn)情況下杯巨, schedule 按順序執(zhí)行所有的作業(yè)蚤告,不能達(dá)到并行執(zhí)行任務(wù)。如下所示:
import arrow
import schedule
def job1():
print("job1 start time: %s" % arrow.get().format())
time.sleep(2)
print("job1 end time: %s" % arrow.get().format())
def job2():
print("job2 start time: %s" % arrow.get().format())
time.sleep(5)
print("job2 end time: %s" % arrow.get().format())
def job3():
print("job3 start time: %s" % arrow.get().format())
time.sleep(10)
print("job3 end time: %s" % arrow.get().format())
if __name__ == '__main__':
schedule.every(10).seconds.do(job1)
schedule.every(30).seconds.do(job2)
schedule.every(5).to(10).seconds.do(job3)
while True:
schedule.run_pending()
返回部分結(jié)果如下所示服爷,幾個任務(wù)并不是并行開始的杜恰,是安裝時間順序先后開始的:
job3 start time: 2019-06-01 09:27:54+00:00
job3 end time: 2019-06-01 09:28:04+00:00
job1 start time: 2019-06-01 09:28:04+00:00
job1 end time: 2019-06-01 09:28:06+00:00
job3 start time: 2019-06-01 09:28:13+00:00
job3 end time: 2019-06-01 09:28:23+00:00
job2 start time: 2019-06-01 09:28:23+00:00
job2 end time: 2019-06-01 09:28:28+00:00
job1 start time: 2019-06-01 09:28:28+00:00
job1 end time: 2019-06-01 09:28:30+00:00
job3 start time: 2019-06-01 09:28:30+00:00
job3 end time: 2019-06-01 09:28:40+00:00
job1 start time: 2019-06-01 09:28:40+00:00
job1 end time: 2019-06-01 09:28:42+00:00
如果需要實現(xiàn)并行,那么使用多線程方式運(yùn)行任務(wù)仍源,官方給出的并行方案如下:
import threading
import time
import schedule
def job():
print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
job_thread = threading.Thread(target=job_func)
job_thread.start()
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
while 1:
schedule.run_pending()
time.sleep(1)
# 我在項目里也是通過對每個任務(wù)運(yùn)行后臺線程方式, 可以通過 run_daemon_thread 起一個守護(hù)線程方式來達(dá)到動態(tài)
添加任務(wù)的功能心褐,每個任務(wù)最終通過新開線程方式執(zhí)行
import threading
def ensure_schedule():
schedule.every(5).seconds.do(do_some)
def ensure_schedule_2():
schedule.every(10).seconds.do(print_some)
def run_daemon_thread(target, *args, **kwargs):
job_thread = threading.Thread(target=target, args=args, kwargs=kwargs)
job_thread.setDaemon(True)
job_thread.start()
def __start_schedule_deamon():
def schedule_run():
while True:
schedule.run_pending()
time.sleep(1)
t = threading.Thread(target=schedule_run)
t.setDaemon(True)
t.start()
def init_schedule_job():
run_daemon_thread(ensure_schedule)
run_daemon_thread(ensure_schedule_2)
init_schedule_job()
__start_schedule_deamon()
2、如何在不阻塞主線程的情況下連續(xù)運(yùn)行調(diào)度程序镜会?
官方推薦了這個方式檬寂,在單獨(dú)的線程中運(yùn)行調(diào)度程序,如下戳表,在單獨(dú)的線程中運(yùn)行 run_pending 調(diào)度程序桶至。通過 threading 庫的 Event 來實現(xiàn)
# https://github.com/mrhwick/schedule/blob/master/schedule/__init__.py
def run_continuously(self, interval=1):
"""Continuously run, while executing pending jobs at each elapsed
time interval.
@return cease_continuous_run: threading.Event which can be set to
cease continuous run.
Please note that it is *intended behavior that run_continuously()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you set a continuous run interval
of one hour then your job won't be run 60 times at each interval but
only once.
"""
cease_continuous_run = threading.Event()
class ScheduleThread(threading.Thread):
@classmethod
def run(cls):
while not cease_continuous_run.is_set():
self.run_pending()
time.sleep(interval)
continuous_thread = ScheduleThread()
continuous_thread.start()
return cease_continuous_run
3、是否支持時區(qū)
# 官方不計劃支持時區(qū)匾旭,可使用:
討論:https://github.com/dbader/schedule/pull/16
時區(qū)解決:https://github.com/imiric/schedule/tree/feat/timezone
4镣屹、如果我的任務(wù)拋出異常怎么辦?
schedule 不捕獲作業(yè)執(zhí)行期間發(fā)生的異常,因此在任務(wù)執(zhí)行期間的任何異常都會冒泡并中斷調(diào)度的 run_xyz(如 run_pending ) 函數(shù), 也就是 run_pending 中斷退出价涝,導(dǎo)致其它任務(wù)無法執(zhí)行
import functools
def catch_exceptions(cancel_on_failure=False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except:
import traceback
print(traceback.format_exc())
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
return 1 / 0
schedule.every(5).minutes.do(bad_task)
另外一種解決方案:
https://gist.github.com/mplewis/8483f1c24f2d6259aef6
5女蜈、如何設(shè)置只跑一次的任務(wù)?
def job_that_executes_once():
# Do some work ...
return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
6、如何一次取消多個任務(wù)色瘩?
# 通過 tag 函數(shù)給它們添加唯一標(biāo)識符進(jìn)行分組伪窖,取消時通過標(biāo)識符進(jìn)行取消相應(yīng)組的任務(wù)
def greet(name):
print('Hello {}'.format(name))
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
schedule.clear('daily-tasks')
7、如何傳遞參數(shù)給任務(wù)函數(shù)
def greet(name):
print('Hello', name)
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
schedule 源碼閱讀
使用 0.6.0 版本最新代碼進(jìn)行分析居兆,加起來才 6百多行覆山,實現(xiàn)很簡潔,先來看看當(dāng)前的文件結(jié)構(gòu)
"""
? tree
# 省略部分文件泥栖,代碼全部在 __init__.py 中
.
├── schedule
│ └── __init__.py
├── setup.py
├── test_schedule.py
└── tox.ini
先從一段簡單的代碼分析
"""
import schedule
import time
import arrow
def job():
print(f"time: {arrow.now().format('YYYY-MM-DD HH:mm:ss')}, I'm working...")
if __name__ == '__main__':
print('start schedule...')
schedule.every(10).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
"""
start schedule...
time: 2019-09-27 21:28:14, I'm working...
time: 2019-09-27 21:28:24, I'm working...
time: 2019-09-27 21:28:34, I'm working...
"""
先來看看 run_pending 做了什么事情:
def run_pending():
"""Calls :meth:`run_pending <Scheduler.run_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_pending() # what? default_scheduler 又是什么簇宽?
#: Default :class:`Scheduler <Scheduler>` object
default_scheduler = Scheduler()
原來是 Scheduler 類的一個實例勋篓,接下來去 Scheduler 找 run_pending 看看這個方法做了什么操作.
Scheduler 類
class Scheduler(object):
"""
Objects instantiated by the :class:`Scheduler <Scheduler>` are
factories to create jobs, keep record of scheduled jobs and
handle their execution.
"""
def __init__(self):
self.jobs = []
1、初始化函數(shù) __init__
做了什么初始化操作魏割,結(jié)果只是簡單地設(shè)置了一個保存 Job 的列表譬嚣,這也是 schedule 簡潔設(shè)計的一個重要點(diǎn),所有運(yùn)行的任務(wù)都使用一個列表來保存在內(nèi)存中钞它,不提供任何接入不同存儲的配置項拜银。
class Scheduler(object):
"""
Objects instantiated by the :class:`Scheduler <Scheduler>` are
factories to create jobs, keep record of scheduled jobs and
handle their execution.
"""
# 省略部分代碼
def run_pending(self):
"""
運(yùn)行所有計劃運(yùn)行的作業(yè)
Run all jobs that are scheduled to run.
Please note that it is *intended behavior that run_pending()
does not run missed jobs*. For example, if you've registered a job
that should run every minute and you only call run_pending()
in one hour increments then your job won't be run 60 times in
between but only once.
"""
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
self._run_job(job)
2、接著看看 run_pending 方法做了什么须揣,可以看到 run_pending 從 jobs 列表中獲取當(dāng)前可以運(yùn)行的 job 保存在 runnable_jobs盐股, 并對 runnable_jobs 元祖進(jìn)行排序并對每一個 job 執(zhí)行 _run_job 方法, 這個地方可能有人會有疑問钱豁,Job 到底是個什么對象耻卡,為什么可以用 sorted 方法排序,難道 Job 對象實現(xiàn)了一些 Python 黑魔法函數(shù)牲尺,例如 __lt__
等?
def _run_job(self, job):
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)
def cancel_job(self, job):
"""
Delete a scheduled job.
:param job: The job to be unscheduled
"""
try:
self.jobs.remove(job)
except ValueError:
pass
3卵酪、進(jìn)入 _run_job 方法,可以看到這個方法只是負(fù)責(zé)調(diào)用 job 對象的 run 方法運(yùn)行這個 job 而已谤碳,并且根據(jù) run 返回的對象判斷是否取消該任務(wù). 取消一個 job 就是從 jobs 列表中將這個 job 對象去除即可
Job 類
從上面 Schedule 追蹤下面溃卡,最后還是調(diào)用 job 的 run 方法進(jìn)行操作,接下來就繼續(xù)對 Job 追蹤
1蜒简、Job 的初始化方法做了什么
class Job(object):
"""
A periodic job as used by :class:`Scheduler`.
:param interval: A quantity of a certain time unit
:param scheduler: The :class:`Scheduler <Scheduler>` instance that
this job will register itself with once it has
been fully configured in :meth:`Job.do()`.
Every job runs at a given fixed time interval that is defined by:
* a :meth:`time unit <Job.second>`
* a quantity of `time units` defined by `interval`
A job is usually created and returned by :meth:`Scheduler.every`
method, which also defines its `interval`.
"""
def __init__(self, interval, scheduler=None):
self.interval = interval # pause interval * unit between runs(配置運(yùn)行任務(wù)的時間單位數(shù)量瘸羡,如 seconds(10).do(job),10 就是這個時間單位數(shù)量)
self.latest = None # upper limit to the interval( interval 的上限)
self.job_func = None # the job job_func to run (運(yùn)行任務(wù)的函數(shù),通常是我們定義需要定時執(zhí)行的代碼邏輯)
self.unit = None # time units, e.g. 'minutes', 'hours', ... (時間單位)
self.at_time = None # optional time at which this job runs (設(shè)置在某個時間點(diǎn)運(yùn)行)
self.last_run = None # datetime of the last run (上次執(zhí)行的時間)
self.next_run = None # datetime of the next run (下次執(zhí)行的時間)
self.period = None # timedelta between runs, only valid for
self.start_day = None # Specific day of the week to start on (指定一周中的第幾天運(yùn)行)
self.tags = set() # unique set of tags for the job (Job 的唯一tag 標(biāo)識)
self.scheduler = scheduler # scheduler to register with (Scheduler 類搓茬,可繼承并使用自己實現(xiàn)的 Scheduler 類)
def __lt__(self, other):
"""
看這個黑魔法函數(shù)犹赖,就是上面 Scheduler 類中 job 可以使用 sorted 排序的原因
PeriodicJobs are sortable based on the scheduled time they
run next.
"""
return self.next_run < other.next_run
2、job run 方法的邏輯
def run(self):
"""
Run the job and immediately reschedule it.
:return: The return value returned by the `job_func`
"""
logger.info('Running job %s', self)
ret = self.job_func()
self.last_run = datetime.datetime.now()
self._schedule_next_run()
return ret
3卷仑、every 函數(shù)操作
可能看到這里的朋友覺得有點(diǎn)混亂了峻村,job 是什么時候?qū)嵗模质鞘裁磿r候加入 Scheduler 的 jobs 列表的锡凝,其實我沒有按照 示例代碼的順序來講話粘昨,可以回到前面代碼示例中有下面這句代碼在 run_pending 之前:
schedule.every(10).seconds.do(job)
也就是這個地方實例化 Job 類的,可以跳到這段代碼分析分析:
class Scheduler(object):
# 省略部分代碼
def every(self, interval=1):
"""
Schedule a new periodic job.
:param interval: A quantity of a certain time unit
:return: An unconfigured :class:`Job <Job>`
"""
job = Job(interval, self)
return job
def every(interval=1):
"""Calls :meth:`every <Scheduler.every>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.every(interval)
4窜锯、do 函數(shù)操作
可以看到在調(diào)用 every 函數(shù)時张肾,最終調(diào)用的是 Scheduler 類的 every 方法,該方法主要是根據(jù)設(shè)置的間隔時間(interval) 實例化 Job 類并返回該實例锚扎,這段代碼同樣沒有 job 加入 Scheduler 的 jobs 列表的邏輯吞瞪,那就是在 seconds.do 方法進(jìn)行的操作,接著翻代碼看看:
def do(self, job_func, *args, **kwargs):
"""
Specifies the job_func that should be called every time the
job runs.
Any additional arguments are passed on to job_func when
the job runs.
:param job_func: The function to be scheduled
:return: The invoked job instance
"""
self.job_func = functools.partial(job_func, *args, **kwargs)
try:
functools.update_wrapper(self.job_func, job_func)
except AttributeError:
# job_funcs already wrapped by functools.partial won't have
# __name__, __module__ or __doc__ and the update_wrapper()
# call will fail.
pass
self._schedule_next_run()
self.scheduler.jobs.append(self)
return self
- 首先使用標(biāo)準(zhǔn)庫的 partial (偏函數(shù)) 先提前為 job_func(我們提供的業(yè)務(wù)邏輯代碼)設(shè)置參數(shù)工秩,用一些默認(rèn)參數(shù)包裝一個可調(diào)用對象,返回結(jié)果是可調(diào)用對象尸饺,并且可以像原始對象一樣對待进统,凍結(jié)部分函數(shù)位置函數(shù)或關(guān)鍵字參數(shù),簡化函數(shù),更少更靈活的函數(shù)參數(shù)調(diào)用浪听。
- update_wrapper 做了什么操作螟碎,這個函數(shù)的作用就是從 **被修飾的函數(shù)(job_func) ** 中取出一些屬性值來,賦值給 修飾器函數(shù)(self.job_func) 迹栓。默認(rèn) partial 對象沒有 name和 doc, 這種情況下掉分,對于裝飾器函數(shù)非常難以debug.
- 接著就是 self._schedule_next_run 方法,這個是 schedule 庫的核心代碼克伊,有點(diǎn)復(fù)雜酥郭,下面慢慢解釋
- 接下來一行就是想要的答案了,這個時候?qū)?dāng)前 Job 類加入 Scheduler 類的 jobs 列表中愿吹,append(self) 這個 self 就是當(dāng)前的 Job 類
5不从、核心 _schedule_next_run
def _schedule_next_run(self):
"""
Compute the instant when this job should run next.
"""
# 第一步,判斷當(dāng)前運(yùn)行的時間單位是否在指定范圍內(nèi)犁跪,不在則報錯
if self.unit not in ('seconds', 'minutes', 'hours', 'days', 'weeks'):
raise ScheduleValueError('Invalid unit')
# 如果 interval 的上限時間不為 None, 判斷 interval 上限時間是否小于 interval
# 小于則報錯椿息,latest 用于 every(A).to(B).seconds 每 N 秒執(zhí)行一次 job 任務(wù),
# 其中 A <= N <= B. 所以 A(interval) <= B (latest)
if self.latest is not None:
if not (self.latest >= self.interval):
raise ScheduleError('`latest` is greater than `interval`')
# 執(zhí)行時間隨機(jī)從 interval 到 latest 之前取值
interval = random.randint(self.interval, self.latest)
else:
interval = self.interval
# 下面兩行用于獲取下一次執(zhí)行的時間
self.period = datetime.timedelta(**{self.unit: interval})
self.next_run = datetime.datetime.now() + self.period
# start_day 這個只會在設(shè)置一周的第幾天執(zhí)行才會有坷衍,所以 unit 時間單位不是 weeks 就報錯
if self.start_day is not None:
if self.unit != 'weeks':
raise ScheduleValueError('`unit` should be \'weeks\'')
weekdays = (
'monday',
'tuesday',
'wednesday',
'thursday',
'friday',
'saturday',
'sunday'
)
# 如果天數(shù)的標(biāo)識不是上面的寝优,報錯
if self.start_day not in weekdays:
raise ScheduleValueError('Invalid start day')
weekday = weekdays.index(self.start_day)
# datetime 的 weekday() 函數(shù),計算目標(biāo)時間是否已經(jīng)在本周發(fā)送
days_ahead = weekday - self.next_run.weekday()
if days_ahead <= 0: # Target day already happened this week
days_ahead += 7
self.next_run += datetime.timedelta(days_ahead) - self.period
if self.at_time is not None:
if (self.unit not in ('days', 'hours', 'minutes')
and self.start_day is None):
raise ScheduleValueError(('Invalid unit without'
' specifying start day'))
kwargs = {
'second': self.at_time.second,
'microsecond': 0
}
if self.unit == 'days' or self.start_day is not None:
kwargs['hour'] = self.at_time.hour
if self.unit in ['days', 'hours'] or self.start_day is not None:
kwargs['minute'] = self.at_time.minute
self.next_run = self.next_run.replace(**kwargs)
# If we are running for the first time, make sure we run
# at the specified time *today* (or *this hour*) as well
if not self.last_run:
now = datetime.datetime.now()
if (self.unit == 'days' and self.at_time > now.time() and
self.interval == 1):
self.next_run = self.next_run - datetime.timedelta(days=1)
elif self.unit == 'hours' \
and self.at_time.minute > now.minute \
or (self.at_time.minute == now.minute
and self.at_time.second > now.second):
self.next_run = self.next_run - datetime.timedelta(hours=1)
elif self.unit == 'minutes' \
and self.at_time.second > now.second:
self.next_run = self.next_run - \
datetime.timedelta(minutes=1)
if self.start_day is not None and self.at_time is not None:
# Let's see if we will still make that time we specified today
if (self.next_run - datetime.datetime.now()).days >= 7:
self.next_run -= self.period
APScheduler 庫
APScheduler(Advanced Python Scheduler)是基于Quartz的一個Python定時任務(wù)框架枫耳,實現(xiàn)了Quartz的所有功能, 是一個輕量級但功能強(qiáng)大的進(jìn)程內(nèi)任務(wù)調(diào)度程序乏矾。它有以下三個特點(diǎn):
- 類似于 Liunx Cron 的調(diào)度程序(可選的開始/結(jié)束時間)
- 基于時間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開始/結(jié)束時間)
- 一次性執(zhí)行任務(wù)(在設(shè)定的日期/時間運(yùn)行一次任務(wù))
可以按照個人喜好來混合和匹配調(diào)度系統(tǒng)和存儲作業(yè)的后端存儲迁杨,支持以下幾種后臺作業(yè)存儲:
- Memory
- SQLAlchemy (任何 SQLAlchemy 支持的關(guān)系型數(shù)據(jù)庫)
- MongoDB
- Redis
- ZooKeeper
- RethinkDB
APScheduler 集成了以下幾個 Python 框架:
- asyncio
- gevent
- Tornado
- Twisted
- Qt
總結(jié)以上钻心,APScheduler 支持基于日期、固定時間仑最、crontab 形式三種形式的任務(wù)調(diào)度扔役,可以靈活接入各種類型的后臺作業(yè)存儲來持久化作業(yè),同時提供了多種調(diào)度器(后面提及)警医,集成多種 Python 框架亿胸,可以根據(jù)實際情況靈活組合后臺存儲以及調(diào)度器來使用。
APScheduler 的架構(gòu)及工作原理
1预皇、APScheduler 基本概念
APScheduler 由四個組件構(gòu)成(注:該部分翻譯至官方文檔):
-
triggers 觸發(fā)器
觸發(fā)器包含調(diào)度邏輯侈玄。每個作業(yè)(job)都有自己的觸發(fā)器,用于確定下一個作業(yè)何時運(yùn)行吟温。除了最初的配置序仙,觸發(fā)器是完全無狀態(tài)的
-
job stores 作業(yè)存儲
job stores 是存放作業(yè)的地方,默認(rèn)保存在內(nèi)存中鲁豪。作業(yè)數(shù)據(jù)序列化后保存至持久性數(shù)據(jù)庫潘悼,從持久性數(shù)據(jù)庫加載回來時會反序列化律秃。作業(yè)存儲(job stores)不將作業(yè)數(shù)據(jù)保存在內(nèi)存中(默認(rèn)存儲除外),相反治唤,內(nèi)存只是充當(dāng)后端存儲在保存棒动、加載、更新宾添、查找作業(yè)時的中間人角色船惨。作業(yè)存儲不能在調(diào)度器(schedulers) 之間共享
-
executors 執(zhí)行器
執(zhí)行器處理作業(yè)的運(yùn)行。它們通常通過將作業(yè)中的指定可調(diào)用部分提交給線程或進(jìn)程池來實現(xiàn)這一點(diǎn)缕陕。 當(dāng)作業(yè)完成后粱锐,執(zhí)行器通知調(diào)度器,然后調(diào)度器發(fā)出一個適當(dāng)?shù)氖录?/p>
-
schedulers 調(diào)度器
調(diào)度器是將其余部分綁定在一起的工具扛邑。通常只有一個調(diào)度器(scheduler)在應(yīng)用程序中運(yùn)行怜浅。應(yīng)用程序開發(fā)者通常不直接處理作業(yè)存儲(job stores)、執(zhí)行器(executors)或者觸發(fā)器(triggers)鹿榜。相反海雪,調(diào)度器提供了適當(dāng)?shù)慕涌趤硖幚硭鼈兘蹙簟E渲米鳂I(yè)存儲(job stores)和執(zhí)行器(executors)是通過調(diào)度器(scheduler)來完成的,就像添加舱殿、修改和刪除 job(作業(yè))一樣
2、APScheduler 架構(gòu)圖