目錄結(jié)構(gòu)
taskproj
├── app01
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── models.py
│ ├── tasks.py
│ └── views.py
├── manage.py
├── taskproj
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
└── templates
在項(xiàng)目目錄taskproj/taskproj/目錄下新建celery.py:
```python
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'taskproj.settings') # 設(shè)置django環(huán)境
app = Celery('taskproj')
app.config_from_object('django.conf:settings', namespace='CELERY') # 使用CELERY_ 作為前綴莹弊,在settings中寫配置
app.autodiscover_tasks() # 發(fā)現(xiàn)任務(wù)文件每個(gè)app下的task.py
taskproj/taskproj/init.py:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']
taskproj/taskproj/settings.py:
CELERY_BROKER_URL = 'redis://10.1.210.69:6379/0' # Broker配置,使用Redis作為消息中間件
CELERY_RESULT_BACKEND = 'redis://10.1.210.69:6379/0' # BACKEND配置,這里使用redis
CELERY_RESULT_EXPIRES = None # 存儲(chǔ)結(jié)果過期時(shí)間
CELERY_RESULT_SERIALIZER = 'json' # 結(jié)果序列化方案
進(jìn)入項(xiàng)目的taskproj目錄啟動(dòng)worker:
celery worker -A taskproj -l debug
任務(wù)定義在每個(gè)tasks文件中佑吝,app01/tasks.py:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def mul(x, y):
return x * y