"""
Python實(shí)現(xiàn)定時(shí)任務(wù)的4種方法:
1、死循環(huán) + time.sleep()
2诸蚕、利用Timer對(duì)象實(shí)現(xiàn)定時(shí)輸出
3氧猬、sched 事件調(diào)度器
4、APScheduler
"""
import datetime as dt
import time
1盅抚、死循環(huán) + time.sleep()實(shí)現(xiàn)定時(shí),不過最后陷入死循環(huán)
def task(s):
while True:
print(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
time.sleep(s)
task(2)
2妄均、class threading.Timer(interval, function, args=[], kwargs={}) interval 是時(shí)間間隔,function 是可調(diào)用的對(duì)象,args 和 kwargs 會(huì)作為 function 的參數(shù)。
from threading import Timer
def func(): # 需要定時(shí)執(zhí)行的函數(shù)
print([dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
def task(s):
print(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
Timer(s, func, ()).start()
task(2)
實(shí)現(xiàn)循環(huán)打印的方法一:在function里面繼續(xù)注冊(cè)一個(gè)Timer,這樣就能在下一個(gè)interval繼續(xù)執(zhí)行function
def hello():
print('hello world')
Timer(10.0,hello).start() # 有一點(diǎn)類似于函數(shù)的遞歸調(diào)用,即我調(diào)用我自己
t = Timer(10.0, hello) #10秒以后運(yùn)行hello()函數(shù),但是只能運(yùn)行一次
t.start()
實(shí)現(xiàn)循環(huán)打印的方法二:
以后再做補(bǔ)充
3奕纫、sched 事件調(diào)度器
"""
(1) 構(gòu)造一個(gè)sched.scheduler類
scheduler = sched.scheduler(timefunc, delayfunc)
timefunc:當(dāng)前時(shí)間(默認(rèn) time.time)
delayfunc:暫停運(yùn)行的時(shí)間單元(默認(rèn) time.sleep)
(2) 添加調(diào)度任務(wù)
① 定點(diǎn):延長(zhǎng)指定的時(shí)間執(zhí)行任務(wù)
scheduler.enter(delay, priority, action, argument=(), kwargs={})
delay:延遲多長(zhǎng)時(shí)間執(zhí)行任務(wù)(單位:秒)
priority:優(yōu)先級(jí):越小優(yōu)先級(jí)越大
action:函數(shù)名稱
argument 和 kwargs:函數(shù)位置和關(guān)鍵字參數(shù)
② 定時(shí):指定 time 時(shí)刻執(zhí)行任務(wù)
scheduler.enterabs(time, priority, action, argument=(), kwargs={}):
time:指定時(shí)間點(diǎn)
其余參數(shù)同上
(3) 運(yùn)行任務(wù)
scheduler.run()
"""
def func1():
print("scheduler1")
def func2():
print("scheduler2")
import sched
def task(s):
print(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
初始化 sched 模塊的 scheduler 類
scheduler1 = sched.scheduler(time.time, time.sleep)
scheduler2 = sched.scheduler(time.time, time.sleep)
增加調(diào)度任務(wù)
scheduler1.enter(s, 2, func1,())
scheduler2.enter(s,1,func2,())
運(yùn)行任務(wù)
scheduler1.run()
scheduler2.run()
task(2)
4烫沙、APScheduler
"""
APScheduler 四個(gè)組件分別為:調(diào)度器(scheduler)、觸發(fā)器(trigger)升筏,作業(yè)存儲(chǔ)(job store),執(zhí)行器(executor)
(1)新建調(diào)度器schedulers
BlockingScheduler:調(diào)度器在當(dāng)前進(jìn)程的主線程中運(yùn)行,也就是會(huì)阻塞當(dāng)前線程
BackgroundScheduler:調(diào)度器在后臺(tái)線程中運(yùn)行,不會(huì)阻塞當(dāng)前線程
(2)添加調(diào)度任務(wù)trigger
① date 觸發(fā)器:(指定時(shí)間點(diǎn)觸發(fā)),參數(shù)如下:
run_date(datetime或str):任務(wù)運(yùn)行的日期或時(shí)間
timezone(datetime.tzinfo或str):指定時(shí)區(qū)
例1:在 2020-9-24 時(shí)刻運(yùn)行一次 func 方法
scheduler.add_job(func, 'date', run_date = dt.date(2020, 9, 24))
例2: 在 2020-9-24 15:10:00 時(shí)刻運(yùn)行一次 func 方法
scheduler.add_job(func, 'date', run_date = dt.datetime(2020, 9, 24, 15, 10, 0))
例3: 在 2020-9-24 15:11:00 時(shí)刻運(yùn)行一次 func 方法
scheduler.add_job(func, 'date', run_date = '2020-9-24 15:11:00')
② interval 觸發(fā)器: (固定時(shí)間間隔觸發(fā))铅忿,參數(shù)如下:
weeks(int):間隔幾周
days(int):間隔幾天
hours(int):間隔幾小時(shí)
minutes(int):間隔幾分鐘
seconds(int):間隔幾秒鐘
start_date(datetime或str):開始時(shí)間
end_date(datetime或str):結(jié)束時(shí)間
timezone(datetime.tzinfo或str):時(shí)區(qū)
例1:每隔兩分鐘執(zhí)行一次 func 方法
scheduler.add_job(func, 'interval', minutes = 2)
例2:在 2020-9-24 15:15:00 ~ 2020-9-24 15:20:00 之間, 每隔兩分鐘執(zhí)行一次 func 方法
scheduler.add_job(func, 'interval', minutes = 2, start_date = '2020-9-24 15:15:00' , end_date = '2020-9-24 15:20:00')
③ cron 觸發(fā)器:(在指定時(shí)間周期性地觸發(fā))檀训,參數(shù)如下:
year(int 或 str):年
month(int 或 str):月
day(int 或 str):日
week(int 或 str):周(1-53)
day_of_week(int 或 str):星期幾(0-6)
hour(int 或 str):時(shí)
minute(int 或 str):分
second(int 或 str):秒
start_date(datetime或str):最早開始時(shí)間(包含)
end_date(datetime或str):最晚結(jié)束時(shí)間(包含)
timezone(datetime.tzinfo或str):指定時(shí)區(qū)
例:在每年 1-3享言、7-9 月份中的每個(gè)星期一、二中的 00:00, 01:00, 02:00 和 03:00 執(zhí)行 func 任務(wù)
scheduler.add_job(func, 'cron', month = '1-3,7-9',day='0, tue', hour='0-3')
(3)管理調(diào)度任務(wù)job stores
① 添加job:
add_job():可以改變或者移除 job
scheduler.add_job(func, 'interval', minutes = 2)
scheduled_job():只適用于應(yīng)用運(yùn)行期間不會(huì)改變的 job
scheduler.scheduled_job(func, 'interval', minutes = 2)
② 移除job:
remove_job() :根據(jù) job 的 id 來移除荧琼,所以要在 job 創(chuàng)建的時(shí)候指定一個(gè) id
scheduler.add_job(func, 'interval', minutes = 2, id = 'job_one')
scheduler.remove_job(job_one)
job.remove() :對(duì) job 執(zhí)行 remove 方法
job = add_job(func, 'interval', minutes = 2, id = 'job_one')
job.remvoe()
③ 暫停job:
apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()
④ 恢復(fù)job:
apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()
⑤ 修改job:
modify_job()
scheduler.modify_job('job_one', minutes = 5)
job.modify()
job = scheduler.add_job(func, 'interval', minutes = 2)
job.modify(minutes = 5)
⑥ 關(guān)閉job:
scheduler.shutdown()
scheduler.shutdown(wait=false)
(4)executors:執(zhí)行調(diào)度任務(wù)的模塊,常用的 executor 有兩種:
ProcessPoolExecutor
ThreadPoolExecutor
(5)運(yùn)行調(diào)度任務(wù)
scheduler.start()
————————————————
版權(quán)聲明:本文為CSDN博主「相顧之尸」的原創(chuàng)文章差牛,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明偏化。
原文鏈接:https://blog.csdn.net/m0_56312629/article/details/124737291