22. Celery 4.x 動態(tài)添加定時任務(wù)

需求

為了能夠在Web端口動態(tài)添加定時任務(wù)的需求尔邓,本次來調(diào)研一下Celery 4.x 在Django框架下該如何動態(tài)添加定時任務(wù)晾剖。

Celery動態(tài)添加定時任務(wù)的官方文檔

celery文檔:https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

django-celery-beat文檔 : https://pypi.org/project/django-celery-beat/

新建Django項目

安裝最新版本的Django

pip3 install django

當(dāng)前我安裝的版本是 3.0.6

創(chuàng)建項目

django-admin startproject 項目名稱

執(zhí)行如下:

django-admin startproject django_con .

安裝 celery

pip3 install django-celery
pip3 install -U Celery 
pip3 install "celery[librabbitmq,redis,auth,msgpack]" 
pip3 install django-celery-beat # 用于動態(tài)添加定時任務(wù)
pip3 install django-celery-results
pip3 install redis

安裝完畢后,相關(guān)依賴版本

amqp==2.5.2
anyjson==0.3.3
asgiref==3.2.7
billiard==3.6.3.0
celery==4.4.2
cffi==1.14.0
cryptography==2.9.2
Django==3.0.6
django-celery==3.3.1
django-celery-beat==2.0.0
django-celery-results==1.2.1
django-timezone-field==4.0
dnspython==1.16.0
eventlet==0.25.2
greenlet==0.4.15
importlib-metadata==1.6.0
kombu==4.6.8
monotonic==1.5
pycparser==2.20
python-crontab==2.4.2
python-dateutil==2.8.1
pytz==2020.1
redis==3.5.1
six==1.14.0
sqlparse==0.3.1
vine==1.3.0
zipp==3.1.0

測試使用Celery應(yīng)用

創(chuàng)建Celery目錄結(jié)構(gòu)

首先在Django項目中創(chuàng)建一個celery_tasks文件夾梯嗽,再創(chuàng)建tasks.py模塊, 如下:

image-20200514161135291

編寫tasks.py 其內(nèi)容為:

from celery import Celery

# 使用redis作為broker
app = Celery('celery_tasks.tasks', broker='redis://192.168.196.135:6379/8')

# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task():
    print("任務(wù)函數(shù)正在執(zhí)行....")

Celery第一個參數(shù)是給其設(shè)定一個名字齿尽, 第二參數(shù)我們設(shè)定一個中間人broker, 在這里我們使用Redis作為中間人。my_task函數(shù)是我們編寫的一個任務(wù)函數(shù)灯节, 通過加上裝飾器app.task, 將其注冊到broker的隊列中循头。

現(xiàn)在我們在創(chuàng)建一個worker, 等待處理隊列中的任務(wù)炎疆。

進(jìn)入項目的根目錄卡骂,執(zhí)行命令: celery -A celery_tasks.tasks worker -l info

image-20200514161616827

調(diào)用任務(wù)

下面來測試一下功能,創(chuàng)建一個任務(wù)形入,加入任務(wù)隊列中全跨,提供worker執(zhí)行。

進(jìn)入python終端, 執(zhí)行如下代碼:

[root@python_env django_cron]# python3 manage.py shell
In [3]: from celery_tasks.tasks import my_task

# 調(diào)用一個任務(wù)函數(shù)亿遂,將會返回一個AsyncResult對象浓若,這個對象可以用來檢查任務(wù)的狀態(tài)或者獲得任務(wù)的返回值渺杉。
In [4]: my_task.delay()
Out[4]: <AsyncResult: 647b2589-95d2-45c9-a9a7-0b5530caf249>

返回worker的終端界面,查看任務(wù)執(zhí)行情況挪钓,如下:

image-20200514163314267

可以看到已經(jīng)收到任務(wù)是越,并執(zhí)行打印了信息。

存儲結(jié)果

如果我們想跟蹤任務(wù)的狀態(tài)诵原,Celery需要將結(jié)果保存到某個地方英妓。有幾種保存的方案可選:SQLAlchemy、Django ORM绍赛、Memcached蔓纠、 Redis、RPC (RabbitMQ/AMQP)吗蚌。

例子我們?nèi)匀皇褂肦edis作為存儲結(jié)果的方案腿倚,任務(wù)結(jié)果存儲配置我們通過Celery的backend參數(shù)來設(shè)定。我們將tasks模塊修改如下:

from celery import Celery

# 使用redis作為broker以及backend
app = Celery('celery_tasks.tasks',
             broker='redis://192.168.196.135:6379/8',
             backend='redis://192.168.196.135:6379/9')

# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task(a, b):
    print("任務(wù)函數(shù)正在執(zhí)行....")
    return a + b

我給Celery增加了backend參數(shù)蚯妇,指定redis作為結(jié)果存儲,并將任務(wù)函數(shù)修改為兩個參數(shù)敷燎,并且有返回值。

下面再來執(zhí)行調(diào)用一下這個任務(wù)看看箩言。

In [1]: from celery_tasks.tasks import my_task

# 傳遞參數(shù)至任務(wù)中
In [5]: ret = my_task.delay(10,20)

# 查詢返回值的結(jié)果
In [6]: ret.result
Out[6]: 30

# 查看是否執(zhí)行失敗
In [7]: ret.failed()
Out[7]: False

再來看看worker的執(zhí)行情況硬贯,如下:

image-20200514164236552

可以看到celery任務(wù)已經(jīng)執(zhí)行成功了。

但是這只是一個開始陨收,下一步要看看如何添加定時的任務(wù)饭豹。

優(yōu)化Celery目錄結(jié)構(gòu)

上面直接將Celery的應(yīng)用創(chuàng)建、配置务漩、tasks任務(wù)全部寫在了一個文件拄衰,這樣在后面項目越來越大,也是不方便的饵骨。下面來拆分一下翘悉,并且添加一些常用的參數(shù)。

image-20200514165214739

創(chuàng)建Celery應(yīng)用的文件 celery.py

from celery import Celery
from celery_tasks import celeryconfig

import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

## 創(chuàng)建celery app
app = Celery('celery_tasks')

# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)

# 設(shè)置app自動加載任務(wù)
app.autodiscover_tasks([
    'celery_tasks',
])

配置Celery的參數(shù)文件 celeryconfig.py

# 設(shè)置結(jié)果存儲
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# 設(shè)置代理人broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# celery 的啟動工作數(shù)量設(shè)置
CELERY_WORKER_CONCURRENCY = 20
# 任務(wù)預(yù)取功能居触,就是每個工作的進(jìn)程/線程在獲取任務(wù)的時候妖混,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮轮洋。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務(wù)后進(jìn)行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制制市,如果網(wǎng)絡(luò)資源有限,不建議開足馬力砖瞧。
CELERY_DISABLE_RATE_LIMITS = True

tasks 任務(wù)文件 tasks.py

from celery_tasks.celery import app

# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task(a, b, c):
    print("任務(wù)函數(shù)正在執(zhí)行....")
    return a + b + c

下一步來開始安裝使用定時任務(wù)息堂。

使用 django-celery-beat 動態(tài)添加定時任務(wù)

celery 4.x 版本在 django 框架中是使用 django-celery-beat 進(jìn)行動態(tài)添加定時任務(wù)的嚷狞。前面雖然已經(jīng)安裝了這個庫块促,但是還要再說明一下荣堰。

官網(wǎng)的配置說明

https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#beat-custom-schedulers

image-20200514180603493

安裝 django-celery-beat

pip3 install django-celery-beat

配置 django-celery-beat

在項目的 settings 文件配置:

# 安裝應(yīng)用 django_celery_beat
INSTALLED_APPS = [
    'django_celery_beat', # 安裝 django_celery_beat
    ...
]

# Django設(shè)置時區(qū)
LANGUAGE_CODE = 'zh-hans'  # 使用中國語言
TIME_ZONE = 'Asia/Shanghai'  # 設(shè)置Django使用中國上海時間
# 如果USE_TZ設(shè)置為True時,Django會使用系統(tǒng)默認(rèn)設(shè)置的時區(qū)竭翠,此時的TIME_ZONE不管有沒有設(shè)置都不起作用
# 如果USE_TZ 設(shè)置為False,TIME_ZONE = 'Asia/Shanghai', 則使用上海的UTC時間振坚。
USE_TZ = False

在 celerconfig.py 配置 django_celery_beat:

from django.conf import settings

import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# celery beat配置
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

創(chuàng)建 django-celery-beat 相關(guān)表

執(zhí)行Django數(shù)據(jù)庫遷移: python3 manage.py migrate

image-20200514170553728

配置Celery使用 django-celery-beat

配置 celery.py

from celery import Celery
from celery_tasks import celeryconfig
from django.utils import timezone

import os
# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

## 創(chuàng)建celery app
app = Celery('celery_tasks')

# 從單獨的配置模塊中加載配置
app.config_from_object(celeryconfig)

# 設(shè)置app自動加載任務(wù)
app.autodiscover_tasks([
    'celery_tasks',
])

# 解決時區(qū)問題,定時任務(wù)啟動就循環(huán)輸出
app.now = timezone.now

配置 celeryconfig.py

from django.conf import settings
import os

# 為celery設(shè)置環(huán)境變量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_con.settings")

# 設(shè)置結(jié)果存儲
CELERY_RESULT_BACKEND = 'redis://192.168.196.135:6379/9'
# 設(shè)置代理人broker
BROKER_URL = 'redis://192.168.196.135:6379/8'
# celery 的啟動工作數(shù)量設(shè)置
CELERY_WORKER_CONCURRENCY = 20
# 任務(wù)預(yù)取功能,就是每個工作的進(jìn)程/線程在獲取任務(wù)的時候斋扰,會盡量多拿 n 個渡八,以保證獲取的通訊成本可以壓縮。
CELERYD_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情況下可以防止死鎖
CELERYD_FORCE_EXECV = True
# celery 的 worker 執(zhí)行多少個任務(wù)后進(jìn)行重啟操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制传货,如果網(wǎng)絡(luò)資源有限屎鳍,不建議開足馬力。
CELERY_DISABLE_RATE_LIMITS = True

# celery beat配置
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = settings.TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

編寫任務(wù) tasks.py

from celery_tasks.celery import app

# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task1(a, b, c):
    print("任務(wù)1函數(shù)正在執(zhí)行....")
    return a + b + c

@app.task
def my_task2():
    print("任務(wù)2函數(shù)正在執(zhí)行....")

啟動定時任務(wù)work

啟動定時任務(wù)首先需要有一個work執(zhí)行異步任務(wù)问裕,然后再啟動一個定時器觸發(fā)任務(wù)逮壁。

啟動任務(wù) work

celery -A celery_tasks worker -l info

啟動定時器觸發(fā) beat

celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

創(chuàng)建定時任務(wù)說明

創(chuàng)建定時任務(wù)可以查看 django_celery_beat 的官網(wǎng)說明:https://pypi.org/project/django-celery-beat/

image-20200514180952462

下面先來翻譯看看官網(wǎng)的示例,然后再實際應(yīng)用一下粮宛。

官網(wǎng)示例說明

創(chuàng)建基于間隔時間的周期性任務(wù)

初始化周期間隔對象 interval 對象

在創(chuàng)建一個基于間隔時間的周期性任務(wù)之前窥淆,首先需要創(chuàng)建一個 interval 對象,用于提供任務(wù)設(shè)置周期間隔:

>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule

# executes every 10 seconds.
>>> schedule, created = IntervalSchedule.objects.get_or_create(
...     every=10,
...     period=IntervalSchedule.SECONDS,
... )

可以看到上面固定間隔的時間是采用秒 period=IntervalSchedule.SECONDS巍杈,如果你還想要固定其他的時間單位忧饭,可以設(shè)置其他字段參數(shù),如下:

  • IntervalSchedule.DAYS 固定間隔天數(shù)
  • IntervalSchedule.HOURS 固定間隔小時數(shù)
  • IntervalSchedule.MINUTES 固定間隔分鐘數(shù)
  • IntervalSchedule.SECONDS 固定間隔秒數(shù)
  • IntervalSchedule.MICROSECONDS 固定間隔微秒

注意:

如果你有多個周期性任務(wù)都是間隔10秒筷畦,那么這些任務(wù)都應(yīng)該設(shè)置同一個 interval 對象

另外词裤,可以如果不清楚有哪些固定的時間單位,可以這樣查看汁咏,如下:

In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                                                                                                  
In [2]: IntervalSchedule.PERIOD_CHOICES                                                                                                                                       
Out[2]: 
(('days', 'Days'),
 ('hours', 'Hours'),
 ('minutes', 'Minutes'),
 ('seconds', 'Seconds'),
 ('microseconds', 'Microseconds'))
創(chuàng)建周期性間隔任務(wù)

下面這種是無參數(shù)的創(chuàng)建方法:

>>> PeriodicTask.objects.create(
...     interval=schedule,                  # we created this above.
...     name='Importing contacts',          # simply describes this periodic task.
...     task='proj.tasks.import_contacts',  # name of task.
... )

帶參數(shù)的創(chuàng)建方法亚斋,如下:

>>> import json
>>> from datetime import datetime, timedelta

>>> PeriodicTask.objects.create(
...     interval=schedule,                  # we created this above.
...     name='Importing contacts',          # simply describes this periodic task.
...     task='proj.tasks.import_contacts',  # name of task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...        'be_careful': True,
...     }),
...     expires=datetime.utcnow() + timedelta(seconds=30)
... )

創(chuàng)建基于 crontab 的周期性任務(wù)

初始化 crontab 的調(diào)度對象

上面是創(chuàng)建基于固定周期的調(diào)度對象,那么 crontab 就是類似 linux 中的 crontab 定時方式攘滩。

crontab 調(diào)度對象有如下字段:minute, hour, day_of_week, day_of_month 帅刊、 month_of_year

對應(yīng)配置 30 * * * * 的 crontab 定時寫法。

>>> from django_celery_beat.models import CrontabSchedule, PeriodicTask
>>> schedule, _ = CrontabSchedule.objects.get_or_create(
...     minute='30',
...     hour='*',
...     day_of_week='*',
...     day_of_month='*',
...     month_of_year='*',
...     timezone=pytz.timezone('Canada/Pacific')
... )

要注意上面的這個時區(qū)設(shè)置漂问,中國的時區(qū)應(yīng)該設(shè)置為 timezone=pytz.timezone('Asia/Shanghai')

創(chuàng)建基于 crontab 調(diào)度的定時任務(wù)

創(chuàng)建任務(wù)的方式跟創(chuàng)建固定間隔時間的周期性任務(wù)基本一致赖瞒,只不過將 interval=schedule 改為了 crontab=schedule,有參數(shù)的寫法也是一致蚤假。

>>> PeriodicTask.objects.create(
...     crontab=schedule,
...     name='Importing contacts',
...     task='proj.tasks.import_contacts',
... )

暫時停止周期性任務(wù)

>>> periodic_task.enabled = False
>>> periodic_task.save()

啟動運行周期任務(wù)的示例

執(zhí)行周期性任務(wù)的前提是需要有 workers 去執(zhí)行栏饮,那么首先需要已經(jīng)安裝好了 Celery,上面我們已經(jīng)安裝好了磷仰。也就是跟我前面說的袍嬉,celery的 workers 和 beat 定時服務(wù)都需要同時開啟。

  1. 開啟 celery 的 worker 服務(wù)

    $ celery -A [project-name] worker --loglevel=info

  2. 作為一個單獨的進(jìn)程,啟動beat服務(wù)

     $ celery -A [project-name] beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    

    或者你也可以使用 -S (scheduler flag) 標(biāo)識符伺通,更多參數(shù)說明查看 celery beat --help

    $ celery -A [project-name] beat -l info -S django
    

    另外箍土,作為替代方案,你也可以只使用一個命令運行上面的兩個步驟(worker和beat服務(wù))(建議只用于開發(fā)環(huán)境

    $ celery -A [project-name] worker --beat --scheduler django --loglevel=info
    
  3. 現(xiàn)在啟動完這兩個服務(wù)罐监,就可以開始添加周期性任務(wù)了吴藻。

具體操作演練

看完了上面官網(wǎng)的說明,下面拿我前面寫好的兩個task任務(wù)來創(chuàng)建一下周期性任務(wù)弓柱。

創(chuàng)建基于間隔時間的周期性任務(wù)

初始化周期間隔對象 interval 對象

In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                        

# 創(chuàng)建一個間隔10秒鐘的 interval 對象
In [2]: schedule, created = IntervalSchedule.objects.get_or_create( 
   ...:     every=10, 
   ...:     period=IntervalSchedule.SECONDS, 
   ...: ) 

# 查詢已創(chuàng)建的 interval 對象
In [7]: IntervalSchedule.objects.all()                                                               Out[7]: <QuerySet [<IntervalSchedule: every 10 seconds>]>

創(chuàng)建周期性間隔任務(wù)

創(chuàng)建一個無參數(shù)的周期性間隔任務(wù):
In [3]: PeriodicTask.objects.create( 
   ...:     interval=schedule,  # 上面創(chuàng)建10秒的間隔 interval 對象
   ...:     name='my_task2',    # 設(shè)置任務(wù)的name值
   ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性執(zhí)行的任務(wù)
   ...: )                                                                                          
Out[3]: <PeriodicTask: my_task1: every 10 seconds>

創(chuàng)建之后沟堡,beat服務(wù)日志顯示如下:

[2020-05-15 10:23:00,176: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)
[2020-05-15 10:23:10,180: INFO/MainProcess] Scheduler: Sending due task my_task2 (celery_tasks.tasks.my_task2)

worker服務(wù)日志顯示如下:

[2020-05-15 10:22:50,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd]  
[2020-05-15 10:22:50,193: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
[2020-05-15 10:22:50,196: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[43365c48-9b52-44ea-ba5f-d75cd7df49dd] succeeded in 0.003136722996714525s: None
[2020-05-15 10:23:00,191: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9]  
[2020-05-15 10:23:00,194: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
[2020-05-15 10:23:00,197: INFO/ForkPoolWorker-1] Task celery_tasks.tasks.my_task2[770beba4-4c3f-4e2f-8d36-7ee0b90dd3b9] succeeded in 0.0030223939975257963s: None
[2020-05-15 10:23:10,194: INFO/MainProcess] Received task: celery_tasks.tasks.my_task2[4b87f2f2-30d0-4c20-b028-601f28cb8193]  
[2020-05-15 10:23:10,196: WARNING/ForkPoolWorker-1] 任務(wù)2函數(shù)正在執(zhí)行....
創(chuàng)建一個帶參數(shù)的周期性間隔任務(wù):
In [27]: import json                                                                               

In [28]: from datetime import datetime, timedelta                                                  

In [29]: PeriodicTask.objects.create(  
    ...:     interval=schedule,  # 設(shè)置使用上面創(chuàng)建的 10 秒間隔 interval 對象
    ...:     name='my_task1',    # 設(shè)置周期性任務(wù)的名稱
    ...:     task='celery_tasks.tasks.my_task1',  # 設(shè)置指定執(zhí)行的task
    ...:     args=json.dumps([10, 20, 30]),  # 傳遞task需要的參數(shù)
    ...:     expires=datetime.now() + timedelta(seconds=30) # 任務(wù)的執(zhí)行超時時間,避免任務(wù)執(zhí)行時間過長
    ...: )                                                                                                                                                                    
Out[29]: <PeriodicTask: my_task1: every 10 seconds>

查看beat服務(wù)的日志:

image-20200515102958056

查看worker服務(wù)的日志:

image-20200515103033364

周期性任務(wù)在worker是否串行執(zhí)行還是并行矢空?

這里有個疑問航罗,如果只有一個worker,其中一個task執(zhí)行時間比較長屁药,例如:上面的兩個任務(wù)都設(shè)置休眠10秒伤哺,確認(rèn)是否可以同時執(zhí)行,還是要開啟多個worker執(zhí)行者祖。

設(shè)置 task 任務(wù)進(jìn)行休眠
from celery_tasks.celery import app
import time

# 創(chuàng)建任務(wù)函數(shù)
@app.task
def my_task1(a, b, c):
    print("任務(wù)1函數(shù)正在執(zhí)行....")
    print("任務(wù)1函數(shù)休眠10秒...")
    time.sleep(10)
    return a + b + c

@app.task
def my_task2():
    print("任務(wù)2函數(shù)正在執(zhí)行....")
    print("任務(wù)2函數(shù)休眠10秒....")
    time.sleep(10)
刪除這兩個周期性任務(wù)立莉,然后再創(chuàng)建后查看beat服務(wù)以及worker服務(wù)日志

刪除之前的兩個周期性任務(wù):

# 暫停執(zhí)行兩個周期性任務(wù)
In [32]: PeriodicTask.objects.get(name="my_task1").enabled = False                                   In [33]: PeriodicTask.objects.get(name="my_task1").save()                                             
In [34]: PeriodicTask.objects.get(name="my_task2").enabled = False                                   In [35]: PeriodicTask.objects.get(name="my_task2").save()                                                                                                                     
# 刪除任務(wù)
In [36]: PeriodicTask.objects.get(name="my_task1").delete()                                           Out[36]: (1, {'django_celery_beat.PeriodicTask': 1})
In [37]: PeriodicTask.objects.get(name="my_task2").delete()                                           Out[37]: (1, {'django_celery_beat.PeriodicTask': 1})

重啟beat服務(wù)、worker服務(wù):

因為修改了 task七问,需要重啟服務(wù)才能重新加載蜓耻。

# 啟動beat進(jìn)程
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# 啟動一個worker進(jìn)程
celery -A celery_tasks worker -l info

重新創(chuàng)建兩個周期性任務(wù):

In [1]: from django_celery_beat.models import PeriodicTask, IntervalSchedule                        

# 創(chuàng)建一個間隔10秒鐘的 interval 對象
In [2]: schedule, created = IntervalSchedule.objects.get_or_create( 
   ...:     every=10, 
   ...:     period=IntervalSchedule.SECONDS, 
   ...: )      

# 創(chuàng)建無參數(shù)周期性任務(wù)
In [3]: PeriodicTask.objects.create( 
   ...:     interval=schedule,  # 上面創(chuàng)建10秒的間隔 interval 對象
   ...:     name='my_task2',    # 設(shè)置任務(wù)的name值
   ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性執(zhí)行的任務(wù)
   ...: )                                                                                          
Out[3]: <PeriodicTask: my_task1: every 10 seconds>

# 創(chuàng)建帶參數(shù)周期性任務(wù)
In [27]: import json                                                                               

In [28]: from datetime import datetime, timedelta                                                  

In [29]: PeriodicTask.objects.create(  
    ...:     interval=schedule,  # 設(shè)置使用上面創(chuàng)建的 10 秒間隔 interval 對象
    ...:     name='my_task1',    # 設(shè)置周期性任務(wù)的名稱
    ...:     task='celery_tasks.tasks.my_task1',  # 設(shè)置指定執(zhí)行的task
    ...:     args=json.dumps([10, 20, 30]),  # 傳遞task需要的參數(shù)
    ...:     expires=datetime.now() + timedelta(seconds=30)
    ...: )                                                                                                                                                                    
Out[29]: <PeriodicTask: my_task1: every 10 seconds>

查看beat推送周期性任務(wù)的日志:

image-20200515105426638

查看單個worker的執(zhí)行日志:

image-20200515112919093

可以看到,因為worker不能并行執(zhí)行任務(wù)械巡,所以任務(wù)從beat發(fā)出來之后刹淌,在單個worker是串行執(zhí)行的,所以如果想要并發(fā)執(zhí)行worker讥耗,可以開啟多線程的方式有勾,或者開啟多個進(jìn)程。

開啟2個worker來查看執(zhí)行日志:

image-20200515113332254

所以需要并行執(zhí)行任務(wù)的時候古程,就需要設(shè)置多個worker來執(zhí)行任務(wù)蔼卡。

創(chuàng)建基于 crontab 的周期性任務(wù)

無限一直循環(huán)執(zhí)行的BUG

crontab周期性任務(wù)在使用的時候會出現(xiàn)beat服務(wù)一直不停發(fā)任務(wù)的情況,導(dǎo)致無法使用挣磨。目前嘗試多種方式雇逞,仍未有解決的辦法。

初始化 crontab 的調(diào)度對象

In [29]: import pytz                                                                                 

In [30]: from django_celery_beat.models import CrontabSchedule, PeriodicTask                                                                                                  
In [31]: schedule, _ = CrontabSchedule.objects.get_or_create( 
    ...:     minute='*', 
    ...:     hour='*', 
    ...:     day_of_week='*', 
    ...:     day_of_month='*', 
    ...:     month_of_year='*', 
    ...:     timezone=pytz.timezone('Asia/Shanghai') 
    ...: )     

In [32]: CrontabSchedule.objects.all()                                                               Out[32]: <QuerySet [<CrontabSchedule: * * * * * (m/h/d/dM/MY) Asia/Shanghai>, <CrontabSchedule: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>

創(chuàng)建一個無參數(shù)的定時任務(wù):

In [36]: PeriodicTask.objects.create(  
    ...:     crontab=schedule, # 上面創(chuàng)建的 crontab 對象 * * * * *茁裙,表示每分鐘執(zhí)行一次
    ...:     name='my_task2_crontab', # 設(shè)置任務(wù)的name值
    ...:     task='celery_tasks.tasks.my_task2',  # 指定需要周期性執(zhí)行的任務(wù)
    ...: )                                                                                                                                                                    
Out[36]: <PeriodicTask: my_task2_crontab: * * * * * (m/h/d/dM/MY) Asia/Shanghai>

beat服務(wù)不停發(fā)任務(wù)的日志塘砸,如下:

image-20200515135653034

周期性任務(wù)的查詢、刪除等操作

其實周期性任務(wù)也是存儲在數(shù)據(jù)庫的數(shù)據(jù)晤锥,基本上是基于ORM的操作的掉蔬。

周期性任務(wù)的查詢

# 導(dǎo)入周期性任務(wù)
In [1]: from django_celery_beat.models import PeriodicTask

# 查詢目前所有的周期性任務(wù)
In [3]: PeriodicTask.objects.all()                                                                     Out[3]: <ExtendedQuerySet [<PeriodicTask: Importing contacts: every 10 seconds>, <PeriodicTask: my_task: every 10 seconds>, <PeriodicTask: celery.backend_cleanup: 0 4 * * * (m/h/d/dM/MY) Asia/Shanghai>]>

# 遍歷周期性任務(wù)    
In [4]: for task in PeriodicTask.objects.all(): 
   ...:     print(task.id) 
   ...:                                                                                                                                                                       
1
2
3

# 通過id獲取其中一個周期性任務(wù)
In [5]: task1 = PeriodicTask.objects.get(id=1)                                                                                                                                
In [6]: task1                                                                                         Out[6]: <PeriodicTask: Importing contacts: every 10 seconds>

In [7]: PeriodicTask.objects.get(name="my_task")                                                                                                                              
Out[7]: <PeriodicTask: my_task: every 10 seconds>

# 通過name獲取其中的周期性任務(wù)
In [8]: task2 = PeriodicTask.objects.get(name="my_task")                                                                                                                      
In [9]: task2                                                                                         Out[9]: <PeriodicTask: my_task: every 10 seconds>

周期性任務(wù)的刪除

獲取到了周期性任務(wù)之后廊宪,好奇的我嘗試直接刪除,發(fā)現(xiàn)直接死循環(huán):

#  刪除周期性的任務(wù)女轿,千萬不要這樣做挤忙,會死循環(huán)
In [10]: task1.delete()                                                                               Out[10]: (1, {'django_celery_beat.PeriodicTask': 1})

In [11]: task2.delete()                                                                               Out[11]: (1, {'django_celery_beat.PeriodicTask': 1})

如果要刪除周期性任務(wù),必須首先暫停任務(wù)谈喳,然后再刪除,如下:

# 設(shè)置name為 my_taks1 的任務(wù)暫停執(zhí)行
In [6]: PeriodicTask.objects.get(name="my_task1").enabled = False                                  
In [7]: PeriodicTask.objects.get(name="my_task1").save()                                           

# 刪除該任務(wù)
In [8]: PeriodicTask.objects.get(name="my_task1").delete()                                         
Out[8]: (1, {'django_celery_beat.PeriodicTask': 1})

周期性任務(wù)暫停之后戈泼,重新啟動

當(dāng)然會有暫停任務(wù)之后婿禽,重新開啟任務(wù)的需求,如下:

# 設(shè)置任務(wù)的 enabled 為 True 即可:
In [21]: PeriodicTask.objects.get(name="my_task1").enabled = True                                     
In [22]: PeriodicTask.objects.get(name="my_task1").save()

更多精彩原創(chuàng)Devops文章大猛,快來關(guān)注我的Devops社群吧:

image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末扭倾,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子挽绩,更是在濱河造成了極大的恐慌膛壹,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唉堪,死亡現(xiàn)場離奇詭異模聋,居然都是意外死亡,警方通過查閱死者的電腦和手機唠亚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進(jìn)店門链方,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人灶搜,你說我怎么就攤上這事祟蚀。” “怎么了割卖?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵前酿,是天一觀的道長。 經(jīng)常有香客問我鹏溯,道長罢维,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任丙挽,我火速辦了婚禮言津,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘取试。我一直安慰自己悬槽,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布瞬浓。 她就那樣靜靜地躺著初婆,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上磅叛,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天屑咳,我揣著相機與錄音,去河邊找鬼弊琴。 笑死兆龙,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的敲董。 我是一名探鬼主播紫皇,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼腋寨!你這毒婦竟也來了聪铺?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤萄窜,失蹤者是張志新(化名)和其女友劉穎铃剔,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體查刻,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡键兜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了穗泵。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蝶押。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖火欧,靈堂內(nèi)的尸體忽然破棺而出棋电,到底是詐尸還是另有隱情,我是刑警寧澤苇侵,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布赶盔,位于F島的核電站,受9級特大地震影響榆浓,放射性物質(zhì)發(fā)生泄漏于未。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一陡鹃、第九天 我趴在偏房一處隱蔽的房頂上張望烘浦。 院中可真熱鬧,春花似錦萍鲸、人聲如沸闷叉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽握侧。三九已至蚯瞧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間品擎,已是汗流浹背埋合。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留萄传,地道東北人甚颂。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像秀菱,于是被迫代替她去往敵國和親振诬。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,713評論 2 354