Python定時(shí)任務(wù)
在項(xiàng)目中蓉驹,我們可能遇到有定時(shí)任務(wù)的需求居兆。其一:定時(shí)執(zhí)行任務(wù)剧防。例如每天早上 8 點(diǎn)定時(shí)推送早報(bào)。其二:每隔一個(gè)時(shí)間段就執(zhí)行任務(wù)销睁。比如:每隔一個(gè)小時(shí)提醒自己起來走動走動供璧,避免長時(shí)間坐著。今天冻记,我跟大家分享下 Python 定時(shí)任務(wù)的實(shí)現(xiàn)方法睡毒。
1
第一種辦法是最簡單又最暴力。那就是在一個(gè)死循環(huán)中冗栗,使用線程睡眠函數(shù) sleep()演顾。
from datetime import datetime
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間。
'''
def timedTask():
while True:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time.sleep(10)
if __name__ == '__main__':
timedTask()
這種方法能夠執(zhí)行固定間隔時(shí)間的任務(wù)隅居。如果timedTask()
函數(shù)之后還有些操作钠至,我們還使用死循環(huán) + 阻塞線程。這會使得timedTask()
一直占有 CPU 資源胎源,導(dǎo)致后續(xù)操作無法執(zhí)行棉钧。我建議謹(jǐn)重使用。
2
既然第一種方法暴力涕蚤,那么有沒有比較優(yōu)雅地方法宪卿?答案是肯定的的诵。Python 標(biāo)準(zhǔn)庫 threading 中有個(gè) Timer 類。它會新啟動一個(gè)線程來執(zhí)行定時(shí)任務(wù)佑钾,所以它是非阻塞函式西疤。
如果你有使用多線程的話,需要關(guān)心線程安全問題休溶。那么你可以選使用threading.Timer
模塊代赁。
from datetime import datetime
from threading import Timer
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間。
'''
def timedTask():
'''
第一個(gè)參數(shù): 延遲多長時(shí)間執(zhí)行任務(wù)(單位: 秒)
第二個(gè)參數(shù): 要執(zhí)行的任務(wù), 即函數(shù)
第三個(gè)參數(shù): 調(diào)用函數(shù)的參數(shù)(tuple)
'''
Timer(10, task, ()).start()
# 定時(shí)任務(wù)
def task():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
while True:
print(time.time())
time.sleep(5)
運(yùn)行結(jié)果:
1512486945.1196375
1512486950.119873
2017-12-05 23:15:50
1512486955.133385
3
第三種方式是使用標(biāo)準(zhǔn)庫中sched
模塊兽掰。sched 是事件調(diào)度器芭碍,它通過 scheduler
類來調(diào)度事件,從而達(dá)到定時(shí)執(zhí)行任務(wù)的效果孽尽。
sched
庫使用起來也是非常簡單豁跑。
1)首先構(gòu)造一個(gè)sched.scheduler
類
它接受兩個(gè)參數(shù):timefunc
和 delayfunc
。timefunc 應(yīng)該返回一個(gè)數(shù)字泻云,代表當(dāng)前時(shí)間艇拍,delayfunc 函數(shù)接受一個(gè)參數(shù),用于暫停運(yùn)行的時(shí)間單元宠纯。
一般使用默認(rèn)參數(shù)就行卸夕,即傳入這兩個(gè)參數(shù) time.time
和 time.sleep
.當(dāng)然,你也可以自己實(shí)現(xiàn)時(shí)間暫停的函數(shù)婆瓜。
2)添加調(diào)度任務(wù)
scheduler
提供了兩個(gè)添加調(diào)度任務(wù)的函數(shù):
enter(delay, priority, action, argument=(), kwargs={})
該函數(shù)可以延遲一定時(shí)間執(zhí)行任務(wù)快集。delay
表示延遲多長時(shí)間執(zhí)行任務(wù),單位是秒廉白。priority
為優(yōu)先級个初,越小優(yōu)先級越大。兩個(gè)任務(wù)指定相同的延遲時(shí)間猴蹂,優(yōu)先級大的任務(wù)會向被執(zhí)行院溺。action
即需要執(zhí)行的函數(shù),argument
和 kwargs
分別是函數(shù)的位置和關(guān)鍵字參數(shù)磅轻。
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
添加一項(xiàng)任務(wù)珍逸,但這個(gè)任務(wù)會在 time
這時(shí)刻執(zhí)行。因此聋溜,time
是絕對時(shí)間.其他參數(shù)用法與 enter()
中的參數(shù)用法是一致谆膳。
3)把任務(wù)運(yùn)行起來
調(diào)用 scheduler.run()
函數(shù)就完事了。
下面是 sche 使用的簡單示例:
from datetime import datetime
import sched
import time
'''
每個(gè) 10 秒打印當(dāng)前時(shí)間撮躁。
'''
def timedTask():
# 初始化 sched 模塊的 scheduler 類
scheduler = sched.scheduler(time.time, time.sleep)
# 增加調(diào)度任務(wù)
scheduler.enter(10, 1, task)
# 運(yùn)行任務(wù)
scheduler.run()
# 定時(shí)任務(wù)
def task():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
if __name__ == '__main__':
timedTask()
值得注意的是: scheduler 中的每個(gè)調(diào)度任務(wù)只會工作一次漱病,不會無限循環(huán)被調(diào)用。如果想重復(fù)執(zhí)行同一任務(wù), 需要重復(fù)添加調(diào)度任務(wù)即可杨帽。
以上三種辦法能實(shí)現(xiàn)定時(shí)任務(wù)凝果,但是都無法做到循環(huán)執(zhí)行定時(shí)任務(wù)。因此睦尽,需要一個(gè)能夠擔(dān)當(dāng)此重任的庫。它就是APScheduler
型雳。
4.1 簡介
APScheduler
的全稱是Advanced Python Scheduler
当凡。它是一個(gè)輕量級的 Python 定時(shí)任務(wù)調(diào)度框架。APScheduler 支持三種調(diào)度任務(wù):固定時(shí)間間隔
纠俭,固定時(shí)間點(diǎn)(日期)
沿量,Linux 下的 Crontab 命令
。同時(shí)冤荆,它還支持異步執(zhí)行朴则、后臺執(zhí)行調(diào)度任務(wù)。
4.2 安裝
使用 pip 包管理工具安裝 APScheduler 是最方便快捷的钓简。
pip install APScheduler
# 如果出現(xiàn)因下載失敗導(dǎo)致安裝不上的情況乌妒,建議使用代理
pip --proxy http://代理ip:端口 install APScheduler
4.3 使用步驟
APScheduler 使用起來還算是比較簡單。運(yùn)行一個(gè)調(diào)度任務(wù)只需要以下三部曲外邓。
- 新建一個(gè) schedulers (調(diào)度器) 撤蚊。
- 添加一個(gè)調(diào)度任務(wù)(job stores)。
- 運(yùn)行調(diào)度任務(wù)损话。
下面是執(zhí)行每 2 秒報(bào)時(shí)的簡單示例代碼:
import datetime
import time
from apscheduler.schedulers.background import BackgroundScheduler
def timedTask():
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
if __name__ == '__main__':
# 創(chuàng)建后臺執(zhí)行的 schedulers
scheduler = BackgroundScheduler()
# 添加調(diào)度任務(wù)
# 調(diào)度方法為 timedTask侦啸,觸發(fā)器選擇 interval(間隔性),間隔時(shí)長為 2 秒
scheduler.add_job(timedTask, 'interval', seconds=2)
# 啟動調(diào)度任務(wù)
scheduler.start()
while True:
print(time.time())
time.sleep(5)
4.4 基礎(chǔ)組件
APScheduler 有四種組件丧枪,分別是:調(diào)度器(scheduler)
光涂,作業(yè)存儲(job store)
,觸發(fā)器(trigger)
拧烦,執(zhí)行器(executor)
忘闻。
- schedulers(調(diào)度器)
它是任務(wù)調(diào)度器,屬于控制器角色恋博。它配置作業(yè)存儲器和執(zhí)行器可以在調(diào)度器中完成服赎,例如添加、修改和移除作業(yè)交播。 - triggers(觸發(fā)器)
描述調(diào)度任務(wù)被觸發(fā)的條件重虑。不過觸發(fā)器完全是無狀態(tài)的。 - job stores(作業(yè)存儲器)
任務(wù)持久化倉庫秦士,默認(rèn)保存任務(wù)在內(nèi)存中,也可將任務(wù)保存都各種數(shù)據(jù)庫中,任務(wù)中的數(shù)據(jù)序列化后保存到持久化數(shù)據(jù)庫提针,從數(shù)據(jù)庫加載后又反序列化命爬。 - executors(執(zhí)行器)
負(fù)責(zé)處理作業(yè)的運(yùn)行,它們通常通過在作業(yè)中提交指定的可調(diào)用對象到一個(gè)線程或者進(jìn)城池來進(jìn)行辐脖。當(dāng)作業(yè)完成時(shí)饲宛,執(zhí)行器將會通知調(diào)度器。
4.4.1 schedulers(調(diào)度器)
我個(gè)人覺得 APScheduler 非常好用的原因嗜价。它提供 7 種調(diào)度器艇抠,能夠滿足我們各種場景的需要。例如:后臺執(zhí)行某個(gè)操作久锥,異步執(zhí)行操作等家淤。調(diào)度器分別是:
- BlockingScheduler : 調(diào)度器在當(dāng)前進(jìn)程的主線程中運(yùn)行,也就是會阻塞當(dāng)前線程瑟由。
- BackgroundScheduler : 調(diào)度器在后臺線程中運(yùn)行絮重,不會阻塞當(dāng)前線程。
- AsyncIOScheduler : 結(jié)合
asyncio
模塊(一個(gè)異步框架)一起使用歹苦。 - GeventScheduler : 程序中使用
gevent
(高性能的Python并發(fā)框架)作為IO模型青伤,和GeventExecutor
配合使用。 - TornadoScheduler : 程序中使用
Tornado
(一個(gè)web框架)的IO模型殴瘦,用ioloop.add_timeout
完成定時(shí)喚醒潮模。 - TwistedScheduler : 配合
TwistedExecutor
,用reactor.callLater
完成定時(shí)喚醒痴施。 - QtScheduler : 你的應(yīng)用是一個(gè) Qt 應(yīng)用擎厢,需使用QTimer完成定時(shí)喚醒。
4.4.2 triggers(觸發(fā)器)
APScheduler 有三種內(nèi)建的 trigger:
1)date 觸發(fā)器
date 是最基本的一種調(diào)度辣吃,作業(yè)任務(wù)只會執(zhí)行一次动遭。它表示特定的時(shí)間點(diǎn)觸發(fā)。它的參數(shù)如下:
參數(shù) | 說明 |
---|---|
run_date (datetime 或 str) | 作業(yè)的運(yùn)行日期或時(shí)間 |
timezone (datetime.tzinfo 或 str) | 指定時(shí)區(qū) |
date 觸發(fā)器使用示例如下:
from datetime import datetime
from datetime import date
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print(text)
scheduler = BackgroundScheduler()
# 在 2017-12-13 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2017-12-13 14:00:01 時(shí)刻運(yùn)行一次 job_func 方法
scheduler .add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
scheduler.start()
2)interval 觸發(fā)器
固定時(shí)間間隔觸發(fā)神得。interval 間隔調(diào)度厘惦,參數(shù)如下:
參數(shù) | 說明 |
---|---|
weeks (int) | 間隔幾周 |
days (int) | 間隔幾天 |
hours (int) | 間隔幾小時(shí) |
minutes (int) | 間隔幾分鐘 |
seconds (int) | 間隔多少秒 |
start_date (datetime 或 str) | 開始日期 |
end_date (datetime 或 str) | 結(jié)束日期 |
timezone (datetime.tzinfo 或str) | 時(shí)區(qū) |
interval 觸發(fā)器使用示例如下:
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
# 每隔兩分鐘執(zhí)行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之間, 每隔兩分鐘執(zhí)行一次 job_func 方法
scheduler .add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
scheduler.start()
3)cron 觸發(fā)器
在特定時(shí)間周期性地觸發(fā),和Linux crontab格式兼容哩簿。它是功能最強(qiáng)大的觸發(fā)器宵蕉。
我們先了解 cron 參數(shù):
參數(shù) | 說明 |
---|---|
year (int 或 str) | 年,4位數(shù)字 |
month (int 或 str) | 月 (范圍1-12) |
day (int 或 str) | 日 (范圍1-31 |
week (int 或 str) | 周 (范圍1-53) |
day_of_week (int 或 str) | 周內(nèi)第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun) |
hour (int 或 str) | 時(shí) (范圍0-23) |
minute (int 或 str) | 分 (范圍0-59) |
second (int 或 str) | 秒 (范圍0-59) |
start_date (datetime 或 str) | 最早開始日期(包含) |
end_date (datetime 或 str) | 最晚結(jié)束時(shí)間(包含) |
timezone (datetime.tzinfo 或str) | 指定時(shí)區(qū) |
這些參數(shù)是支持算數(shù)表達(dá)式节榜,取值格式有如下:
[圖片上傳失敗...(image-b5f63a-1557987491260)]
點(diǎn)擊查看大圖
cron 觸發(fā)器使用示例如下:
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
def job_func(text):
print("當(dāng)前時(shí)間:", datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
# 在每年 1-3羡玛、7-9 月份中的每個(gè)星期一、二中的 00:00, 01:00, 02:00 和 03:00 執(zhí)行 job_func 任務(wù)
scheduler .add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
scheduler.start()
4.4.3 作業(yè)存儲(job store)
該組件是對調(diào)度任務(wù)的管理宗苍。
1)添加 job
有兩種添加方法稼稿,其中一種上述代碼用到的 add_job()
薄榛, 另一種則是scheduled_job()
修飾器來修飾函數(shù)。
這個(gè)兩種辦法的區(qū)別是:第一種方法返回一個(gè) apscheduler.job.Job 的實(shí)例让歼,可以用來改變或者移除 job敞恋。第二種方法只適用于應(yīng)用運(yùn)行期間不會改變的 job。
第二種添加任務(wù)方式的例子:
import datetime
from apscheduler.schedulers.background import BackgroundScheduler
@scheduler.scheduled_job(job_func, 'interval', minutes=2)
def job_func(text):
print(datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
scheduler = BackgroundScheduler()
scheduler.start()
2)移除 job
移除 job 也有兩種方法:remove_job()
和 job.remove()
谋右。
remove_job() 是根據(jù) job 的 id 來移除硬猫,所以要在 job 創(chuàng)建的時(shí)候指定一個(gè) id。
job.remove() 則是對 job 執(zhí)行 remove 方法即可
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.remove_job(job_one)
job = add_job(job_func, 'interval', minutes=2, id='job_one')
job.remvoe()
3)獲取 job 列表
通過 scheduler.get_jobs()
方法能夠獲取當(dāng)前調(diào)度器中的所有 job 的列表
- 修改 job
如果你因計(jì)劃改變要對 job 進(jìn)行修改改执,可以使用Job.modify()
或者modify_job()
方法來修改 job 的屬性啸蜜。但是值得注意的是,job 的 id 是無法被修改的天梧。
scheduler.add_job(job_func, 'interval', minutes=2, id='job_one')
scheduler.start()
# 將觸發(fā)時(shí)間間隔修改成 5分鐘
scheduler.modify_job('job_one', minutes=5)
job = scheduler.add_job(job_func, 'interval', minutes=2)
# 將觸發(fā)時(shí)間間隔修改成 5分鐘
job.modify(minutes=5)
5)關(guān)閉 job
默認(rèn)情況下調(diào)度器會等待所有正在運(yùn)行的作業(yè)完成后,關(guān)閉所有的調(diào)度器和作業(yè)存儲霞丧。如果你不想等待呢岗,可以將 wait 選項(xiàng)設(shè)置為 False。
scheduler.shutdown()
scheduler.shutdown(wait=false)
4.4.4 執(zhí)行器(executor)
執(zhí)行器顧名思義是執(zhí)行調(diào)度任務(wù)的模塊蛹尝。最常用的 executor 有兩種:ProcessPoolExecutor
和 ThreadPoolExecutor
下面是顯式設(shè)置 job store(使用mongo存儲)和 executor 的代碼的示例后豫。
注:本代碼來源于網(wǎng)絡(luò)
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
def my_job():
print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)
jobstores = {
'mongo': MongoDBJobStore(collection='job', database='test', client=client),
'default': MemoryJobStore()
}
executors = {
'default': ThreadPoolExecutor(10),
'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, 'interval', seconds=5)
try:
scheduler.start()
except SystemExit:
client.close()