需求
為了能夠在Web端口動態(tài)添加定時任務(wù)的需求尔邓,本次來調(diào)研一下Celery 4.x 在Django框架下該如何動態(tài)添加定時任務(wù)晾剖。
Celery動態(tài)添加定時任務(wù)的官方文檔
celery
文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers
django-celery-beat
文檔 : https://pypi.org/project/django-celery-beat/
新建Django項目
安裝最新版本的Django
pip3 install django
當(dāng)前我安裝的版本是 3.0.6
創(chuàng)建項目
django-admin startproject 項目名稱
執(zhí)行如下:
django-admin startproject django_con .
安裝 celery
pip3 install django-celery
pip3 install -U Celery
pip3 install "celery[librabbitmq,redis,auth,msgpack]"
pip3 install django-celery-beat # 用于動態(tài)添加定時任務(wù)
pip3 install django-celery-results
pip3 install redis
安裝完畢后,相關(guān)依賴版本
amqp==2.5.2
anyjson==0.3.3
asgiref==3.2.7
billiard==3.6.3.0
celery==4.4.2
cffi==1.14.0
cryptography==2.9.2
Django==3.0.6
django-celery==3.3.1
django-celery-beat==2.0.0
django-celery-results==1.2.1
django-timezone-field==4.0
dnspython==1.16.0
eventlet==0.25.2
greenlet==0.4.15
importlib-metadata==1.6.0
kombu==4.6.8
monotonic==1.5
pycparser==2.20
python-crontab==2.4.2
python-dateutil==2.8.1
pytz==2020.1
redis==3.5.1
six==1.14.0
sqlparse==0.3.1
vine==1.3.0
zipp==3.1.0
測試使用Celery應(yīng)用
創(chuàng)建Celery目錄結(jié)構(gòu)
首先在Django項目中創(chuàng)建一個celery_tasks文件夾梯嗽,再創(chuàng)建tasks.py模塊, 如下:
編寫tasks.py 其內(nèi)容為:
from celery import Celery
# 使用redis作為broker
app = Celery('celery_tasks.tasks', broker='redis://192.168.196.135:6379/8')
# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task():
print("任務(wù)函數(shù)正在執(zhí)行....")
Celery第一個參數(shù)是給其設(shè)定一個名字齿尽, 第二參數(shù)我們設(shè)定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數(shù)是我們編寫的一個任務(wù)函數(shù)灯节, 通過加上裝飾器app.task, 將其注冊到broker的隊列中循头。
現(xiàn)在我們在創(chuàng)建一個worker, 等待處理隊列中的任務(wù)炎疆。
進(jìn)入項目的根目錄卡骂,執(zhí)行命令: celery -A celery_tasks.tasks worker -l info
調(diào)用任務(wù)
下面來測試一下功能,創(chuàng)建一個任務(wù)形入,加入任務(wù)隊列中全跨,提供worker執(zhí)行。
進(jìn)入python終端, 執(zhí)行如下代碼:
[root@python_env django_cron]# python3 manage.py shell
In [3]: from celery_tasks.tasks import my_task
# 調(diào)用一個任務(wù)函數(shù)亿遂,將會返回一個AsyncResult對象浓若,這個對象可以用來檢查任務(wù)的狀態(tài)或者獲得任務(wù)的返回值渺杉。
In [4]: my_task.delay()
Out[4]: <AsyncResult: 647b2589-95d2-45c9-a9a7-0b5530caf249>
返回worker的終端界面,查看任務(wù)執(zhí)行情況挪钓,如下:
可以看到已經(jīng)收到任務(wù)是越,并執(zhí)行打印了信息。
存儲結(jié)果
如果我們想跟蹤任務(wù)的狀態(tài)诵原,Celery需要將結(jié)果保存到某個地方英妓。有幾種保存的方案可選:SQLAlchemy、Django ORM绍赛、Memcached蔓纠、 Redis、RPC (RabbitMQ/AMQP)吗蚌。
例子我們?nèi)匀皇褂肦edis作為存儲結(jié)果的方案腿倚,任務(wù)結(jié)果存儲配置我們通過Celery的backend參數(shù)來設(shè)定。我們將tasks模塊修改如下:
from celery import Celery
# 使用redis作為broker以及backend
app = Celery('celery_tasks.tasks',
broker='redis://192.168.196.135:6379/8',
backend='redis://192.168.196.135:6379/9')
# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task(a, b):
print("任務(wù)函數(shù)正在執(zhí)行....")
return a + b
我給Celery增加了backend參數(shù)蚯妇,指定redis作為結(jié)果存儲,并將任務(wù)函數(shù)修改為兩個參數(shù)敷燎,并且有返回值。
下面再來執(zhí)行調(diào)用一下這個任務(wù)看看箩言。
In [1]: from celery_tasks.tasks import my_task
# 傳遞參數(shù)至任務(wù)中
In [5]: ret = my_task.delay(10,20)
# 查詢返回值的結(jié)果
In [6]: ret.result
Out[6]: 30
# 查看是否執(zhí)行失敗
In [7]: ret.failed()
Out[7]: False
再來看看worker的執(zhí)行情況硬贯,如下:
可以看到celery任務(wù)已經(jīng)執(zhí)行成功了。
但是這只是一個開始陨收,下一步要看看如何添加定時的任務(wù)饭豹。
優(yōu)化Celery目錄結(jié)構(gòu)
上面直接將Celery的應(yīng)用創(chuàng)建、配置务漩、tasks任務(wù)全部寫在了一個文件拄衰,這樣在后面項目越來越大,也是不方便的饵骨。下面來拆分一下翘悉,并且添加一些常用的參數(shù)。
創(chuàng)建Celery應(yīng)用的文件 celery.py
from celery import Celery
from celery_tasks import celeryconfig
import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
## 創(chuàng)建celery app
app = Celery('celery_tasks')
# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)
# 設(shè)置app自動加載任務(wù)
app.autodiscover_tasks([
'celery_tasks',
])
配置Celery的參數(shù)文件 celeryconfig.py
# 設(shè)置結(jié)果存儲
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# 設(shè)置代理人broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# celery 的啟動工作數(shù)量設(shè)置
CELERY_WORKER_CONCURRENCY = 20
# 任務(wù)預(yù)取功能居触,就是每個工作的進(jìn)程/線程在獲取任務(wù)的時候妖混,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮轮洋。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務(wù)后進(jìn)行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制制市,如果網(wǎng)絡(luò)資源有限,不建議開足馬力砖瞧。
CELERY_DISABLE_RATE_LIMITS = True
tasks 任務(wù)文件 tasks.py
from celery_tasks.celery import app
# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task(a, b, c):
print("任務(wù)函數(shù)正在執(zhí)行....")
return a + b + c
下一步來開始安裝使用定時任務(wù)息堂。
使用 django-celery-beat 動態(tài)添加定時任務(wù)
celery 4.x 版本在 django 框架中是使用 django-celery-beat 進(jìn)行動態(tài)添加定時任務(wù)的嚷狞。前面雖然已經(jīng)安裝了這個庫块促,但是還要再說明一下荣堰。
官網(wǎng)的配置說明
https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers
安裝 django-celery-beat
pip3 install django-celery-beat
配置 django-celery-beat
在項目的 settings 文件配置:
# 安裝應(yīng)用 django_celery_beat
INSTALLED_APPS = [
'django_celery_beat', # 安裝 django_celery_beat
...
]
# Django設(shè)置時區(qū)
LANGUAGE_CODE = 'zh-hans' # 使用中國語言
TIME_ZONE = 'Asia/Shanghai' # 設(shè)置Django使用中國上海時間
# 如果USE_TZ設(shè)置為True時,Django會使用系統(tǒng)默認(rèn)設(shè)置的時區(qū)竭翠,此時的TIME_ZONE不管有沒有設(shè)置都不起作用
# 如果USE_TZ 設(shè)置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間振坚。
USE_TZ = False
在 celerconfig.py 配置 django_celery_beat:
from django.conf import settings
import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
# celery beat配置
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
創(chuàng)建 django-celery-beat 相關(guān)表
執(zhí)行Django數(shù)據(jù)庫遷移: python3 manage.py migrate
配置Celery使用 django-celery-beat
配置 celery.py
from celery import Celery
from celery_tasks import celeryconfig
from django.utils import timezone
import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
## 創(chuàng)建celery app
app = Celery('celery_tasks')
# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)
# 設(shè)置app自動加載任務(wù)
app.autodiscover_tasks([
'celery_tasks',
])
# 解決時區(qū)問題,定時任務(wù)啟動就循環(huán)輸出
app.now = timezone.now
配置 celeryconfig.py
from django.conf import settings
import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")
# 設(shè)置結(jié)果存儲
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# 設(shè)置代理人broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# celery 的啟動工作數(shù)量設(shè)置
CELERY_WORKER_CONCURRENCY = 20
# 任務(wù)預(yù)取功能,就是每個工作的進(jìn)程/線程在獲取任務(wù)的時候斋扰,會盡量多拿 n 個渡八,以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務(wù)后進(jìn)行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制传货,如果網(wǎng)絡(luò)資源有限屎鳍,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True
# celery beat配置
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
編寫任務(wù) tasks.py
from celery_tasks.celery import app
# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task1(a, b, c):
print("任務(wù)1函數(shù)正在執(zhí)行....")
return a + b + c
@app.task
def my_task2():
print("任務(wù)2函數(shù)正在執(zhí)行....")
啟動定時任務(wù)work
啟動定時任務(wù)首先需要有一個work執(zhí)行異步任務(wù)问裕,然后再啟動一個定時器觸發(fā)任務(wù)逮壁。
啟動任務(wù) work
celery -A celery_tasks worker -l info
啟動定時器觸發(fā) beat
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
創(chuàng)建定時任務(wù)說明
創(chuàng)建定時任務(wù)可以查看 django_celery_beat 的官網(wǎng)說明:https://pypi.org/project/django-celery-beat/
下面先來翻譯看看官網(wǎng)的示例,然后再實際應(yīng)用一下粮宛。
官網(wǎng)示例說明
創(chuàng)建基于間隔時間的周期性任務(wù)
初始化周期間隔對象 interval
對象
在創(chuàng)建一個基于間隔時間的周期性任務(wù)之前窥淆,首先需要創(chuàng)建一個 interval
對象,用于提供任務(wù)設(shè)置周期間隔:
>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
# executes every 10 seconds.
>>> schedule, created = IntervalSchedule.objects.get_or_create(
... every=10,
... period=IntervalSchedule.SECONDS,
... )
可以看到上面固定間隔的時間是采用秒 period=IntervalSchedule.SECONDS
巍杈,如果你還想要固定其他的時間單位忧饭,可以設(shè)置其他字段參數(shù),如下:
-
IntervalSchedule.DAYS
固定間隔天數(shù) -
IntervalSchedule.HOURS
固定間隔小時數(shù) -
IntervalSchedule.MINUTES
固定間隔分鐘數(shù) -
IntervalSchedule.SECONDS
固定間隔秒數(shù) -
IntervalSchedule.MICROSECONDS
固定間隔微秒
注意:
如果你有多個周期性任務(wù)都是間隔10秒筷畦,那么這些任務(wù)都應(yīng)該設(shè)置同一個 interval
對象
另外词裤,可以如果不清楚有哪些固定的時間單位,可以這樣查看汁咏,如下:
In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule
In [2]: IntervalSchedule.PERIOD_CHOICES
Out[2]:
(('days', 'Days'),
('hours', 'Hours'),
('minutes', 'Minutes'),
('seconds', 'Seconds'),
('microseconds', 'Microseconds'))
創(chuàng)建周期性間隔任務(wù)
下面這種是無參數(shù)的創(chuàng)建方法:
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.import_contacts', # name of task.
... )
帶參數(shù)的創(chuàng)建方法亚斋,如下:
>>> import json
>>> from datetime import datetime, timedelta
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.import_contacts', # name of task.
... args=json.dumps(['arg1', 'arg2']),
... kwargs=json.dumps({
... 'be_careful': True,
... }),
... expires=datetime.utcnow() + timedelta(seconds=30)
... )
創(chuàng)建基于 crontab 的周期性任務(wù)
初始化 crontab 的調(diào)度對象
上面是創(chuàng)建基于固定周期的調(diào)度對象,那么 crontab 就是類似 linux 中的 crontab 定時方式攘滩。
crontab 調(diào)度對象有如下字段:minute
, hour
, day_of_week
, day_of_month
帅刊、 month_of_year
對應(yīng)配置 30 * * * *
的 crontab 定時寫法。
>>> from django_celery_beat.models import CrontabSchedule, PeriodicTask
>>> schedule, _ = CrontabSchedule.objects.get_or_create(
... minute='30',
... hour='*',
... day_of_week='*',
... day_of_month='*',
... month_of_year='*',
... timezone=pytz.timezone('Canada/Pacific')
... )
要注意上面的這個時區(qū)設(shè)置漂问,中國的時區(qū)應(yīng)該設(shè)置為 timezone=pytz.timezone('Asia/Shanghai')
創(chuàng)建基于 crontab 調(diào)度的定時任務(wù)
創(chuàng)建任務(wù)的方式跟創(chuàng)建固定間隔時間的周期性任務(wù)基本一致赖瞒,只不過將 interval=schedule
改為了 crontab=schedule
,有參數(shù)的寫法也是一致蚤假。
>>> PeriodicTask.objects.create(
... crontab=schedule,
... name='Importing contacts',
... task='proj.tasks.import_contacts',
... )
暫時停止周期性任務(wù)
>>> periodic_task.enabled = False
>>> periodic_task.save()
啟動運行周期任務(wù)的示例
執(zhí)行周期性任務(wù)的前提是需要有 workers
去執(zhí)行栏饮,那么首先需要已經(jīng)安裝好了 Celery,上面我們已經(jīng)安裝好了磷仰。也就是跟我前面說的袍嬉,celery的 workers 和 beat 定時服務(wù)都需要同時開啟。
-
開啟 celery 的 worker 服務(wù)
$ celery -A [project-name] worker --loglevel=info
-
作為一個單獨的進(jìn)程,啟動beat服務(wù)
$ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
或者你也可以使用
-S (scheduler flag)
標(biāo)識符伺通,更多參數(shù)說明查看celery beat --help
$ celery -A [project-name] beat -l info -S django
另外箍土,作為替代方案,你也可以只使用一個命令運行上面的兩個步驟(worker和beat服務(wù))(建議只用于開發(fā)環(huán)境)
$ celery -A [project-name] worker --beat --scheduler django --loglevel=info
現(xiàn)在啟動完這兩個服務(wù)罐监,就可以開始添加周期性任務(wù)了吴藻。
具體操作演練
看完了上面官網(wǎng)的說明,下面拿我前面寫好的兩個task任務(wù)來創(chuàng)建一下周期性任務(wù)弓柱。
創(chuàng)建基于間隔時間的周期性任務(wù)
初始化周期間隔對象 interval 對象
In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule
# 創(chuàng)建一個間隔10秒鐘的 interval 對象
In [2]: schedule, created = IntervalSchedule.objects.get_or_create(
...: every=10,
...: period=IntervalSchedule.SECONDS,
...: )
# 查詢已創(chuàng)建的 interval 對象
In [7]: IntervalSchedule.objects.all() Out[7]: <QuerySet [<IntervalSchedule: every 10 seconds>]>
創(chuàng)建周期性間隔任務(wù)
創(chuàng)建一個無參數(shù)的周期性間隔任務(wù):
In [3]: PeriodicTask.objects.create(
...: interval=schedule, # 上面創(chuàng)建10秒的間隔 interval 對象
...: name='my_task2', # 設(shè)置任務(wù)的name值
...: task='celery_tasks.tasks.my_task2', # 指定需要周期性執(zhí)行的任務(wù)
...: )
Out[3]: <PeriodicTask: my_task1: every 10 seconds>
創(chuàng)建之后沟堡,beat服務(wù)日志顯示如下:
[2020-05-15 10:23:00,176: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
[2020-05-15 10:23:10,180: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
worker服務(wù)日志顯示如下:
[2020-05-15 10:22:50,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd]
[2020-05-15 10:22:50,193: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
[2020-05-15 10:22:50,196: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd] succeeded in 0.003136722996714525s: None
[2020-05-15 10:23:00,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9]
[2020-05-15 10:23:00,194: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
[2020-05-15 10:23:00,197: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9] succeeded in 0.0030223939975257963s: None
[2020-05-15 10:23:10,194: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[4b87f2f2-30d0-4c20-b028-601f28cb8193]
[2020-05-15 10:23:10,196: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
創(chuàng)建一個帶參數(shù)的周期性間隔任務(wù):
In [27]: import json
In [28]: from datetime import datetime, timedelta
In [29]: PeriodicTask.objects.create(
...: interval=schedule, # 設(shè)置使用上面創(chuàng)建的 10 秒間隔 interval 對象
...: name='my_task1', # 設(shè)置周期性任務(wù)的名稱
...: task='celery_tasks.tasks.my_task1', # 設(shè)置指定執(zhí)行的task
...: args=json.dumps([10, 20, 30]), # 傳遞task需要的參數(shù)
...: expires=datetime.now() + timedelta(seconds=30) # 任務(wù)的執(zhí)行超時時間,避免任務(wù)執(zhí)行時間過長
...: )
Out[29]: <PeriodicTask: my_task1: every 10 seconds>
查看beat服務(wù)的日志:
查看worker服務(wù)的日志:
周期性任務(wù)在worker是否串行執(zhí)行還是并行矢空?
這里有個疑問航罗,如果只有一個worker,其中一個task執(zhí)行時間比較長屁药,例如:上面的兩個任務(wù)都設(shè)置休眠10秒伤哺,確認(rèn)是否可以同時執(zhí)行,還是要開啟多個worker執(zhí)行者祖。
設(shè)置 task 任務(wù)進(jìn)行休眠
from celery_tasks.celery import app
import time
# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task1(a, b, c):
print("任務(wù)1函數(shù)正在執(zhí)行....")
print("任務(wù)1函數(shù)休眠10秒...")
time.sleep(10)
return a + b + c
@app.task
def my_task2():
print("任務(wù)2函數(shù)正在執(zhí)行....")
print("任務(wù)2函數(shù)休眠10秒....")
time.sleep(10)
刪除這兩個周期性任務(wù)立莉,然后再創(chuàng)建后查看beat服務(wù)以及worker服務(wù)日志
刪除之前的兩個周期性任務(wù):
# 暫停執(zhí)行兩個周期性任務(wù)
In [32]: PeriodicTask.objects.get(name="my_task1").enabled = False In [33]: PeriodicTask.objects.get(name="my_task1").save()
In [34]: PeriodicTask.objects.get(name="my_task2").enabled = False In [35]: PeriodicTask.objects.get(name="my_task2").save()
# 刪除任務(wù)
In [36]: PeriodicTask.objects.get(name="my_task1").delete() Out[36]: (1, {'django_celery_beat.PeriodicTask': 1})
In [37]: PeriodicTask.objects.get(name="my_task2").delete() Out[37]: (1, {'django_celery_beat.PeriodicTask': 1})
重啟beat服務(wù)、worker服務(wù):
因為修改了 task七问,需要重啟服務(wù)才能重新加載蜓耻。
# 啟動beat進(jìn)程
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 啟動一個worker進(jìn)程
celery -A celery_tasks worker -l info
重新創(chuàng)建兩個周期性任務(wù):
In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule
# 創(chuàng)建一個間隔10秒鐘的 interval 對象
In [2]: schedule, created = IntervalSchedule.objects.get_or_create(
...: every=10,
...: period=IntervalSchedule.SECONDS,
...: )
# 創(chuàng)建無參數(shù)周期性任務(wù)
In [3]: PeriodicTask.objects.create(
...: interval=schedule, # 上面創(chuàng)建10秒的間隔 interval 對象
...: name='my_task2', # 設(shè)置任務(wù)的name值
...: task='celery_tasks.tasks.my_task2', # 指定需要周期性執(zhí)行的任務(wù)
...: )
Out[3]: <PeriodicTask: my_task1: every 10 seconds>
# 創(chuàng)建帶參數(shù)周期性任務(wù)
In [27]: import json
In [28]: from datetime import datetime, timedelta
In [29]: PeriodicTask.objects.create(
...: interval=schedule, # 設(shè)置使用上面創(chuàng)建的 10 秒間隔 interval 對象
...: name='my_task1', # 設(shè)置周期性任務(wù)的名稱
...: task='celery_tasks.tasks.my_task1', # 設(shè)置指定執(zhí)行的task
...: args=json.dumps([10, 20, 30]), # 傳遞task需要的參數(shù)
...: expires=datetime.now() + timedelta(seconds=30)
...: )
Out[29]: <PeriodicTask: my_task1: every 10 seconds>
查看beat推送周期性任務(wù)的日志:
查看單個worker的執(zhí)行日志:
可以看到,因為worker不能并行執(zhí)行任務(wù)械巡,所以任務(wù)從beat發(fā)出來之后刹淌,在單個worker是串行執(zhí)行的,所以如果想要并發(fā)執(zhí)行worker讥耗,可以開啟多線程的方式有勾,或者開啟多個進(jìn)程。
開啟2個worker來查看執(zhí)行日志:
所以需要并行執(zhí)行任務(wù)的時候古程,就需要設(shè)置多個worker來執(zhí)行任務(wù)蔼卡。
創(chuàng)建基于 crontab 的周期性任務(wù)
無限一直循環(huán)執(zhí)行的BUG
crontab周期性任務(wù)在使用的時候會出現(xiàn)beat服務(wù)一直不停發(fā)任務(wù)的情況,導(dǎo)致無法使用挣磨。目前嘗試多種方式雇逞,仍未有解決的辦法。
初始化 crontab 的調(diào)度對象
In [29]: import pytz
In [30]: from django_celery_beat.models import CrontabSchedule, PeriodicTask
In [31]: schedule, _ = CrontabSchedule.objects.get_or_create(
...: minute='*',
...: hour='*',
...: day_of_week='*',
...: day_of_month='*',
...: month_of_year='*',
...: timezone=pytz.timezone('Asia/Shanghai')
...: )
In [32]: CrontabSchedule.objects.all() Out[32]: <QuerySet [<CrontabSchedule: * * * * * (m/h/d/dM/MY) Asia/Shanghai>, <CrontabSchedule: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
創(chuàng)建一個無參數(shù)的定時任務(wù):
In [36]: PeriodicTask.objects.create(
...: crontab=schedule, # 上面創(chuàng)建的 crontab 對象 * * * * *茁裙,表示每分鐘執(zhí)行一次
...: name='my_task2_crontab', # 設(shè)置任務(wù)的name值
...: task='celery_tasks.tasks.my_task2', # 指定需要周期性執(zhí)行的任務(wù)
...: )
Out[36]: <PeriodicTask: my_task2_crontab: * * * * * (m/h/d/dM/MY) Asia/Shanghai>
beat服務(wù)不停發(fā)任務(wù)的日志塘砸,如下:
周期性任務(wù)的查詢、刪除等操作
其實周期性任務(wù)也是存儲在數(shù)據(jù)庫的數(shù)據(jù)晤锥,基本上是基于ORM的操作的掉蔬。
周期性任務(wù)的查詢
# 導(dǎo)入周期性任務(wù)
In [1]: from django_celery_beat.models import PeriodicTask
# 查詢目前所有的周期性任務(wù)
In [3]: PeriodicTask.objects.all() Out[3]: <ExtendedQuerySet [<PeriodicTask: Importing contacts: every 10 seconds>, <PeriodicTask: my_task: every 10 seconds>, <PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>
# 遍歷周期性任務(wù)
In [4]: for task in PeriodicTask.objects.all():
...: print(task.id)
...:
1
2
3
# 通過id獲取其中一個周期性任務(wù)
In [5]: task1 = PeriodicTask.objects.get(id=1)
In [6]: task1 Out[6]: <PeriodicTask: Importing contacts: every 10 seconds>
In [7]: PeriodicTask.objects.get(name="my_task")
Out[7]: <PeriodicTask: my_task: every 10 seconds>
# 通過name獲取其中的周期性任務(wù)
In [8]: task2 = PeriodicTask.objects.get(name="my_task")
In [9]: task2 Out[9]: <PeriodicTask: my_task: every 10 seconds>
周期性任務(wù)的刪除
獲取到了周期性任務(wù)之后廊宪,好奇的我嘗試直接刪除,發(fā)現(xiàn)直接死循環(huán):
# 刪除周期性的任務(wù)女轿,千萬不要這樣做挤忙,會死循環(huán)
In [10]: task1.delete() Out[10]: (1, {'django_celery_beat.PeriodicTask': 1})
In [11]: task2.delete() Out[11]: (1, {'django_celery_beat.PeriodicTask': 1})
如果要刪除周期性任務(wù),必須首先暫停任務(wù)谈喳,然后再刪除,如下:
# 設(shè)置name為 my_taks1 的任務(wù)暫停執(zhí)行
In [6]: PeriodicTask.objects.get(name="my_task1").enabled = False
In [7]: PeriodicTask.objects.get(name="my_task1").save()
# 刪除該任務(wù)
In [8]: PeriodicTask.objects.get(name="my_task1").delete()
Out[8]: (1, {'django_celery_beat.PeriodicTask': 1})
周期性任務(wù)暫停之后戈泼,重新啟動
當(dāng)然會有暫停任務(wù)之后婿禽,重新開啟任務(wù)的需求,如下:
# 設(shè)置任務(wù)的 enabled 為 True 即可:
In [21]: PeriodicTask.objects.get(name="my_task1").enabled = True
In [22]: PeriodicTask.objects.get(name="my_task1").save()
更多精彩原創(chuàng)Devops文章大猛,快來關(guān)注我的Devops社群吧: