介紹
APScheduler是基于Quartz的一個Python定時任務(wù)框架。提供了基于日期烦味、固定時間間隔以及crontab類型的任務(wù)壁拉,并且可以持久化任務(wù)弃理。官方參考 https://apscheduler.readthedocs.io/en/latest/userguide.html
安裝
pip install apscheduler
如果由于某種原因 pip 不起作用,您可以從 PyPI 手動下載 APScheduler 分發(fā)包钥勋,解壓縮然后安裝它 https://pypi.org/project/APScheduler/
$ python setup.py install
使用方法
- 新建一個調(diào)度器schedulers
- 添加調(diào)度任務(wù)
- 運(yùn)行調(diào)度任務(wù)
使用舉例
特定的時間點(diǎn)觸發(fā)
from datetime import datetime
from datetime import date
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
print(text)
scheduler = BlockingScheduler()
# 在 2019-8-30 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date=date(2019, 8, 30), args=['text1'])
# 在 2019-8-30 01:00:00 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date=datetime(2019, 8, 30, 1, 0, 0), args=['text2'])
# 在 2019-8-30 01:00:01 運(yùn)行一次 job 方法
scheduler.add_job(job, 'date', run_date='2019-8-30 01:00:00', args=['text3'])
scheduler.start()
固定時間間隔觸發(fā)
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, args=['job1'])
# 在 2019-08-29 22:15:00至2019-08-29 22:17:00期間算灸,每隔1分30秒 運(yùn)行一次 job 方法
scheduler.add_job(job, 'interval', minutes=1, seconds = 30, start_date='2019-08-29 22:15:00', end_date='2019-08-29 22:17:00', args=['job2'])
scheduler.start()
在特定時間周期性地觸發(fā)
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def job(text):
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('{} --- {}'.format(text, t))
scheduler = BlockingScheduler()
# 在每天22點(diǎn)菲驴,每隔 1分鐘 運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour=22, minute='*/1', args=['job1'])
# 在每天22和23點(diǎn)的25分赊瞬,運(yùn)行一次 job 方法
scheduler.add_job(job, 'cron', hour='22-23', minute='25', args=['job2'])
scheduler.start()
通過裝飾器scheduled_job()添加方法
添加任務(wù)的方法有兩種:
- 通過調(diào)用add_job()---見上面1至3代碼
- 通過裝飾器scheduled_job():
第一種方法是最常用的方法巧涧。第二種方法主要是方便地聲明在應(yīng)用程序運(yùn)行時不會更改的任務(wù)。該 add_job()方法返回一個apscheduler.job.Job實(shí)例谤绳,可以使用該實(shí)例稍后修改或刪除該任務(wù)
import time
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=5)
def job1():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job1 --- {}'.format(t))
@scheduler.scheduled_job('cron', second='*/7')
def job2():
t = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
print('job2 --- {}'.format(t))
scheduler.start()