簡介
APScheduler:強大的任務(wù)調(diào)度工具犀斋,可以完成定時任務(wù)奏篙,周期任務(wù)等,它是跨平臺的关斜,用于取代Linux下的cron daemon或者Windows下的task scheduler示括。
內(nèi)置三種調(diào)度調(diào)度系統(tǒng):
- Cron風(fēng)格
- 間隔性執(zhí)行
- 僅在某個時間執(zhí)行一次
作業(yè)存儲的backends支持:
- Memory
- SQLAlchemy (any RDBMS supported by SQLAlchemy works)
- MongoDB
- Redis
- RethinkDB
- ZooKeeper
基本概念:4個組件
triggers
: 描述一個任務(wù)何時被觸發(fā),有按日期痢畜、按時間間隔垛膝、按cronjob描述式三種觸發(fā)方式
job stores
: 任務(wù)持久化倉庫,默認(rèn)保存任務(wù)在內(nèi)存中丁稀,也可將任務(wù)保存都各種數(shù)據(jù)庫中繁涂,任務(wù)中的數(shù)據(jù)序列化后保存到持久化數(shù)據(jù)庫,從數(shù)據(jù)庫加載后又反序列化二驰。
executors
: 執(zhí)行任務(wù)模塊,當(dāng)任務(wù)完成時executors通知schedulers秉沼,schedulers收到后會發(fā)出一個適當(dāng)?shù)氖录?br>
schedulers
: 任務(wù)調(diào)度器桶雀,控制器角色矿酵,通過它配置job stores和executors,添加矗积、修改和刪除任務(wù)全肮。
插件機制: 供用戶自由選擇scheduler, job store(s), executor(s) and trigger(s)
scheduler
scheduler的主循環(huán)(main_loop),其實就是反復(fù)檢查是不是有到時需要執(zhí)行的任務(wù)棘捣,完成一次檢查的函數(shù)是_process_jobs, 這個函數(shù)做這么幾件事:
- 詢問自己的每一個jobstore辜腺,有沒有到期需要執(zhí)行的任務(wù)(jobstore.get_due_jobs())
- 如果有,計算這些job中每個job需要運行的時間點(run_times = job._get_run_times(now))如果run_times有多個乍恐,這種情況我們上面討論過评疗,有coalesce檢查
提交給executor排期運行(executor.submit_job(job, run_times)) - 那么在這個_process_jobs的邏輯,什么時候調(diào)用合適呢茵烈?如果不間斷地調(diào)用百匆,而實際上沒有要執(zhí)行的job,是一種浪費呜投。每次掉用_process_jobs后加匈,其實可以預(yù)先判斷一下,下一次要執(zhí)行的job(離現(xiàn)在最近的)還要多長時間仑荐,作為返回值告訴main_loop, 這時主循環(huán)就可以去睡一覺雕拼,等大約這么長時間后再喚醒,執(zhí)行下一次_process_jobs粘招。這里喚醒的機制就會有IO模型的區(qū)別了
scheduler由于IO模型的不同啥寇,可以有多種實現(xiàn),內(nèi)置scheduler供選:
-
BlockingScheduler
: scheduler在當(dāng)前進程的主線程中運行男图,所以調(diào)用start函數(shù)會阻塞當(dāng)前線程示姿,不能立即返回。 -
BackgroundScheduler
: 放到后臺線程中運行逊笆,所以調(diào)用start后主線程不會阻塞 -
AsyncIOScheduler
: 使用asyncio模塊 -
GeventScheduler
: 使用gevent作為IO模型栈戳,和GeventExecutor配合使用 -
TornadoScheduler
: 配合TwistedExecutor,用reactor.callLater完成定時喚醒 -
TwistedScheduler
: 使用tornado的IO模型难裆,用ioloop.add_timeout完成定時喚醒 -
QtScheduler
: 使用QTimer完成定時喚醒
jobstore
jobstore提供給scheduler一個序列化jobs的統(tǒng)一抽象子檀,提供對scheduler中job的增刪改查接口,根據(jù)存儲backend的不同乃戈,分以下幾種
內(nèi)置job stores供選:
-
MemoryJobStore
:沒有序列化褂痰,jobs就存在內(nèi)存里,增刪改查也都是在內(nèi)存中操作 -
SQLAlchemyJobStore
:所有sqlalchemy支持的數(shù)據(jù)庫都可以做為backend症虑,增刪改查操作轉(zhuǎn)化為對應(yīng)backend的sql語句 -
MongoDBJobStore
:用mongodb作backend -
RedisJobStore
: 用redis作backend
除了MemoryJobStore外缩歪,其他幾種都使用pickle做序列化工具,所以這里要指出一點谍憔,如果你不是在用內(nèi)存做jobstore匪蝙,那么必須確保你提供給job的可執(zhí)行函數(shù)必須是可以被全局訪問的主籍,也就是可以通過ref_to_obj反查出來的,否則無法序列化逛球。
使用數(shù)據(jù)庫做jobstore千元,就會發(fā)現(xiàn),其實創(chuàng)建了一張有三個域的的jobs表颤绕,分別是****id, next_run_time, job_state幸海,其中job_state是job對象pickle序列化后的二進制**,而id和next_run_time則是支持job的兩類查詢(按id和按最近運行時間)
executor
aps把任務(wù)最終的執(zhí)行機制也抽象了出來奥务,可以根據(jù)IO模型選配物独,不需要講太多,最常用的是threadpool和processpoll兩種(來自concurrent.futures的線程/進程池)汗洒。
不同類型的executor實現(xiàn)自己的_do_submit_job议纯,完成一次實際的任務(wù)實例執(zhí)行。以線程/進程池實現(xiàn)為例
內(nèi)置executors供選:
-
ProcessPoolExecutor
: 多進程溢谤,可指定進程數(shù)瞻凤,當(dāng)工作負(fù)載為CPU密集型操作時可以考慮使用它來利用多核CPU -
ThreadPoolExecutor
: 多線程,可指定線程數(shù)世杀,默認(rèn)阀参,可以滿足大多數(shù)用途 AsyncIOExecutor
DebugExecutor
GeventExecutor
ProcessPoolExecutor
ThreadPoolExecutor
TwistedExecutor
trigger
trigger是抽象出了“一個job是何時被觸發(fā)”這個策略,每種trigger實現(xiàn)自己的get_next_fire_time函數(shù)
aps提供的trigger包括:
-
date
:一次性指定日期 -
interval
:在某個時間范圍內(nèi)間隔多長時間執(zhí)行一次 -
cron
:和unix crontab格式兼容瞻坝,最為強大
默認(rèn)配置: 使用MemoryJobStore和ThreadPoolExecutor
優(yōu)點:插件化思想和抽象出接口蛛壳,策略與不同實現(xiàn)機制分離
User guide
配置scheduler
官網(wǎng)提供了等價的三種方法,第一種比較簡潔明了所刀。
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler
# 使用默認(rèn)配置衙荐,即MemoryJobStore和ThreadPoolExecutor(10)
scheduler = BackgroundScheduler()
啟動調(diào)度器
調(diào)用調(diào)度器的start()
方法
添加任務(wù)
兩種方式:
- 調(diào)用調(diào)度器的
add_job()
- 使用調(diào)度器的
scheduled_job()
裝飾器: 很簡潔,推薦這種浮创。
其他的不常用操作如移除任務(wù)忧吟、暫停和恢復(fù)任務(wù)、獲取調(diào)度了的任務(wù)列表斩披、修改任務(wù)溜族、關(guān)停調(diào)度器、暫停/恢復(fù)任務(wù)處理等見文檔:http://apscheduler.readthedocs.io/en/latest/userguide.html
限制并發(fā)執(zhí)行的任務(wù)實例數(shù)量
默認(rèn)同一時刻只能有一個實例運行垦沉,通過max_instances=3修改為3個煌抒。
錯過執(zhí)行的任務(wù)與合并
misfire_grace_time:如果一個job本來14:00有一次執(zhí)行,但是由于某種原因沒有被調(diào)度上厕倍,現(xiàn)在14:01了寡壮,這個14:00的運行實例被提交時,會檢查它預(yù)訂運行的時間和當(dāng)下時間的差值(這里是1分鐘),大于我們設(shè)置的30秒限制诬像,那么這個運行實例不會被執(zhí)行屋群。
合并:最常見的情形是scheduler被shutdown后重啟,某個任務(wù)會積攢了好幾次沒執(zhí)行如5次坏挠,下次這個job被submit給executor時,執(zhí)行5次邪乍。將coalesce=True后降狠,只會執(zhí)行一次
Scheduler 事件
監(jiān)聽Scheduler發(fā)出的事件并作出處理,如任務(wù)執(zhí)行完庇楞、任務(wù)出錯等
def my_listener(event):
if event.exception:
print('The job crashed :(') # or logger.fatal('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)