1. Environment Setup

Set up the runtime environment:

```shell
$ python -m venv env
$ source ./env/bin/activate
$ pip install django-celery-beat django-celery-results redis
```
Initialize the project:

```shell
$ django-admin startproject schedule_task
$ cd schedule_task
$ django-admin startapp schedules
```

In the schedule_task/settings.py configuration file, change ALLOWED_HOSTS = [] to ALLOWED_HOSTS = ['*'].

Run the web server:

```shell
$ python manage.py runserver 0.0.0.0:8000
```
2. Enabling django-celery-beat and django-celery-results
In schedule_task/settings.py, add the following three apps under the INSTALLED_APPS setting:

```python
INSTALLED_APPS = [
    # ...
    'schedules',
    'django_celery_results',
    'django_celery_beat',
]
```
Here django_celery_results stores the results of executed Celery tasks in the database, while django_celery_beat records predefined execution schedules in the database (for example, "run once every minute"), together with the concrete tasks associated with those schedules.
Run the database migrations and create a superuser:

```shell
$ python manage.py migrate
$ python manage.py createsuperuser
```
3. The Admin Site

Start the web server and log in to the admin site at http://127.0.0.1:8000/admin with the superuser created in the previous step.
In the admin interface, CELERY RESULTS is the database table created by django_celery_results to store task results. Under PERIODIC TASKS are several tables created by django_celery_beat to store Celery tasks and their execution schedules. They work as follows:
- Clocked: schedules that fire at one specific point in time
- Crontabs: schedules using a syntax similar to crontab on Linux
- Intervals: schedules that repeat a task at a fixed time interval
- Periodic tasks: the concrete tasks to execute; each must be associated with a schedule defined in one of the other tables (Clocked, Crontabs, Intervals, Solar events)
- Solar events: schedules driven by solar events such as sunrise and sunset
For example, you can add an entry under Intervals to define a schedule that fires every 10 seconds.
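The same schedule can also be created from code through the model that backs the Intervals admin page. A minimal sketch, assuming it is run inside `python manage.py shell` of the migrated project from this article:

```python
# Run inside `python manage.py shell` of the schedule_task project.
from django_celery_beat.models import IntervalSchedule

# Create (or reuse) an interval schedule that fires every 10 seconds;
# this is the same record the "Intervals" admin page creates.
every_10s, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
print(every_10s, created)
```

get_or_create avoids duplicate schedule rows when the snippet is run more than once.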
4. Creating Celery Tasks

Celery tasks must be created by hand in the source code; see the official documentation "Using Celery With Django" for details. The brief steps are as follows:
schedule_task/schedule_task/celery.py:
```python
# schedule_task/schedule_task/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'schedule_task.settings')

app = Celery('schedule_task')

# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
```
schedule_task/schedule_task/__init__.py:
```python
# schedule_task/schedule_task/__init__.py
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
```
schedule_task/schedules/tasks.py:
```python
# schedule_task/schedules/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task(bind=True)
def debug_task(self):
    return f'Hello Celery, the task id is: {self.request.id}'
```
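Once the broker and a worker are running (see section 5), the task can also be triggered by hand; a sketch, assuming it is run inside `python manage.py shell` with those services up:

```python
# Run inside `python manage.py shell`, with Redis and a Celery worker running.
from schedules.tasks import debug_task

# Send the task to the broker; a worker picks it up asynchronously.
result = debug_task.delay()

# Block until the worker finishes, then fetch the return value.
print(result.get(timeout=10))
```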
Use Redis as the message broker, the database Django is configured with by default as the result backend, and DatabaseScheduler as the Celery task scheduler:
schedule_task/schedule_task/settings.py:

```python
# schedule_task/schedule_task/settings.py
# ...
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
```
Now open the admin site and associate the debug_task task with the every-10-seconds schedule: fill in the basic information, then select the task and its schedule. You can also set other parameters of the periodic task as needed, such as:

- the start time
- whether it runs only once
- the arguments passed to the task
- the expiration time
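These fields map directly onto the PeriodicTask model, so the same association can be made in code. A minimal sketch, assuming it is run inside `python manage.py shell` and that the 10-second IntervalSchedule already exists (the name and argument values below are illustrative):

```python
# Run inside `python manage.py shell` of the schedule_task project.
import json
from django.utils import timezone
from django_celery_beat.models import IntervalSchedule, PeriodicTask

# Look up the every-10-seconds schedule created earlier.
schedule = IntervalSchedule.objects.get(every=10, period=IntervalSchedule.SECONDS)

PeriodicTask.objects.create(
    name='debug_task',                  # display name in the admin
    task='schedules.tasks.debug_task',  # dotted path of the registered task
    interval=schedule,                  # the schedule to run on
    start_time=timezone.now(),          # when the schedule takes effect
    one_off=False,                      # keep repeating, don't run only once
    args=json.dumps([]),                # positional arguments, JSON-encoded
)
```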
五极颓、運行測試
為了使系統(tǒng)正常運行朱盐,需要同時開啟三個服務(wù):
- web 服務(wù):
python manage.py runserver 0.0.0.0:8000
- Celery Worker:
celery -A schedule_task worker -l info
- Celery Beat:
celery -A schedule_task beat -l info
Once the services are up, the output looks as follows.

- Celery Beat continuously polls the scheduled tasks stored in the database and hands any task whose trigger condition is met over to a Celery worker for execution:
```shell
$ celery -A schedule_task beat -l info
celery beat v4.4.2 (cliffs) is starting.
LocalTime -> 2020-05-08 03:44:41
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> django_celery_beat.schedulers.DatabaseScheduler
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 seconds (5s)
[2020-05-08 03:44:41,578: INFO/MainProcess] beat: Starting...
[2020-05-08 03:44:41,578: INFO/MainProcess] Writing entries...
[2020-05-08 03:44:46,745: INFO/MainProcess] Writing entries...
[2020-05-08 03:44:51,594: INFO/MainProcess] Scheduler: Sending due task debug_task (schedules.tasks.debug_task)
[2020-05-08 03:45:01,585: INFO/MainProcess] Scheduler: Sending due task debug_task (schedules.tasks.debug_task)
[2020-05-08 03:45:11,587: INFO/MainProcess] Scheduler: Sending due task debug_task (schedules.tasks.debug_task)
[2020-05-08 03:45:21,588: INFO/MainProcess] Scheduler: Sending due task debug_task (schedules.tasks.debug_task)
[2020-05-08 03:45:31,591: INFO/MainProcess] Scheduler: Sending due task debug_task (schedules.tasks.debug_task)
```
- The Celery worker executes the tasks handed over by Beat, prints the results, and saves them to the result backend (i.e. the database):
```shell
$ celery -A schedule_task worker -l info
[tasks]
    . schedules.tasks.debug_task
[2020-05-08 03:44:05,521: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-05-08 03:44:05,529: INFO/MainProcess] mingle: searching for neighbors
[2020-05-08 03:44:06,546: INFO/MainProcess] mingle: all alone
[2020-05-08 03:44:06,558: INFO/MainProcess] celery@mirrors ready.
[2020-05-08 03:44:51,607: INFO/MainProcess] Received task: schedules.tasks.debug_task[3d6b77bb-d4b7-4a5d-b05f-3b85e5dafce7]
[2020-05-08 03:44:51,687: INFO/ForkPoolWorker-1] Task schedules.tasks.debug_task[3d6b77bb-d4b7-4a5d-b05f-3b85e5dafce7] succeeded in 0.07936301361769438s: 'Hello Celery, the task id is: 3d6b77bb-d4b7-4a5d-b05f-3b85e5dafce7'
[2020-05-08 03:45:01,588: INFO/MainProcess] Received task: schedules.tasks.debug_task[a097dc02-71c9-4cab-9871-92ed1a7f2f45]
[2020-05-08 03:45:01,660: INFO/ForkPoolWorker-1] Task schedules.tasks.debug_task[a097dc02-71c9-4cab-9871-92ed1a7f2f45] succeeded in 0.07120843604207039s: 'Hello Celery, the task id is: a097dc02-71c9-4cab-9871-92ed1a7f2f45'
[2020-05-08 03:45:11,590: INFO/MainProcess] Received task: schedules.tasks.debug_task[1b0dfc23-d3cc-495a-b306-9d1defe4b119]
[2020-05-08 03:45:11,659: INFO/ForkPoolWorker-1] Task schedules.tasks.debug_task[1b0dfc23-d3cc-495a-b306-9d1defe4b119] succeeded in 0.0677587790414691s: 'Hello Celery, the task id is: 1b0dfc23-d3cc-495a-b306-9d1defe4b119'
```
后臺管理系統(tǒng) task results 界面:task results 里默認顯示的是 UTC 時間,可以修改 schedule_task/schedule_task/settings.py
配置文件更改時區(qū)設(shè)置:
TIME_ZONE = 'Asia/Shanghai'
PS:實際測試以后骇径,此處的時區(qū)設(shè)置只會對網(wǎng)頁端 task results 表格中顯示的時間起作用躯肌,實際保存到 task results 數(shù)據(jù)庫表中的時間依舊是 UTC 時間。如需要二次開發(fā)破衔,可以調(diào)用返回的 datetime 對象的 astimezone
方法進行時區(qū)格式轉(zhuǎn)換清女。
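A conversion sketch using only the standard library (a fixed UTC+8 offset stands in for Asia/Shanghai, which observes no DST; `zoneinfo.ZoneInfo("Asia/Shanghai")` gives the same result where the tz database is available):

```python
from datetime import datetime, timedelta, timezone

# A task-result timestamp as stored in the database (UTC).
utc_time = datetime(2020, 5, 8, 3, 44, 51, tzinfo=timezone.utc)

# Asia/Shanghai is UTC+8 year-round.
shanghai = timezone(timedelta(hours=8))

# astimezone converts the aware datetime into the target time zone.
local_time = utc_time.astimezone(shanghai)
print(local_time.isoformat())  # 2020-05-08T11:44:51+08:00
```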
References

- Celery 4.4.2 documentation: First steps with Django
- django-celery-beat - Database-backed Periodic Tasks
- django-celery-results - Celery Result Backends for Django