1.在一些復(fù)雜的處理中缔御,使用隊列
在settings.py中添加
CELERY_QUEUES = {
"default": { # 這是上面指定的默認(rèn)隊列
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"web_tasks_1": {
"routing_key": "demoapp.tasks.get_data_tencent",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"web_tasks_2": {
"routing_key": "demoapp.tasks.get_alipay_data",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
}
class QueRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task =='demoapp.tasks.get_data_tencent':
return {
'queue': 'web_tasks_1',
}
# 我的dongwm.tasks文件里面有2個任務(wù)都是test開頭
elif task == 'demoapp.tasks.get_alipay_data':
return {
"queue": "web_tasks_2",
}
# 剩下的其實就會被放到默認(rèn)隊列
else:
return {
"queue": "default",
}
# CELERY_ROUTES本來也可以用一個大的含有多個字典的字典,但是不如直接對它做一個名稱統(tǒng)配
CELERY_ROUTES = (QueRouter(), )
2.啟動特定隊列處理任務(wù)
celery -A proj.celery:app worker -l info -n worker.%h -Q default
celery -A proj.celery:app worker -l info -n worker.%h -Q web_tasks_2
3.定時任務(wù)
settings.py
定時任務(wù)內(nèi)容
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-3-seconds': {
'task': 'demoapp.tasks.add',
# 'schedule': crontab(minute=u'40', hour=u'17',),
'schedule': timedelta(seconds=3),
'args': (16, 16)
},
}
tasks.py
定時任務(wù)
from __future__ import absolute_import
from celery import shared_task
@shared_task
def add(x, y):
return x + y
效果圖