Celery
1.什么是Clelery
Celery是一個(gè)簡(jiǎn)單你辣、靈活且可靠的,處理大量消息的分布式系統(tǒng)
專注于實(shí)時(shí)處理的異步任務(wù)隊(duì)列
同時(shí)也支持任務(wù)調(diào)度
Celery架構(gòu)
Celery的架構(gòu)由三部分組成锄贼,消息中間件(message broker)男摧,任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成渠羞。
消息中間件
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成篡帕。包括殖侵,RabbitMQ, Redis等等
任務(wù)執(zhí)行單元
Worker是Celery提供的任務(wù)執(zhí)行的單元贸呢,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。
任務(wù)結(jié)果存儲(chǔ)
Task result store用來存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果拢军,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果楞陷,包括AMQP, redis等
版本支持情況
Celery version 4.0 runs on
Python ?2.7, 3.4, 3.5?
PyPy ?5.4, 5.5?
This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
If you’re running an older version of Python, you need to be running an older version of Celery:
Python 2.6: Celery series 3.1 or earlier.
Python 2.5: Celery series 3.0 or earlier.
Python 2.4 was Celery series 2.2 or earlier.
Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
2.使用場(chǎng)景
異步任務(wù):將耗時(shí)操作任務(wù)提交給Celery去異步執(zhí)行,比如發(fā)送短信/郵件朴沿、消息推送猜谚、音視頻處理等等
定時(shí)任務(wù):定時(shí)執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計(jì)
3.Celery的安裝配置
pip install celery
消息中間件:RabbitMQ/Redis
app=Celery('任務(wù)名'赌渣,backend='xxx',broker='xxx')
4.Celery執(zhí)行異步任務(wù)
基本使用
創(chuàng)建項(xiàng)目celerytest
創(chuàng)建py文件:celery_app_task.py
import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
return x+y
創(chuàng)建py文件:add_task.py,添加任務(wù)
from celery_app_task import add
result = add.delay(4,5)
print(result.id)
創(chuàng)建py文件:run.py魏铅,執(zhí)行任務(wù),或者使用命令執(zhí)行:celery worker -A celery_app_task -l info
注:windows下:celery worker -A celery_app_task -l info -P eventlet
from celery_app_task import cel
if __name__ == '__main__':
cel.worker_main()
# cel.worker_main(argv=['--loglevel=info')
創(chuàng)建py文件:result.py坚芜,查看任務(wù)執(zhí)行結(jié)果
from celery.result import AsyncResult
from celery_app_task import cel
async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 將結(jié)果刪除
elif async.failed():
print('執(zhí)行失敗')
elif async.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')
執(zhí)行 add_task.py览芳,添加任務(wù),并獲取任務(wù)ID
執(zhí)行 run.py 鸿竖,或者執(zhí)行命令:celery worker -A celery_app_task -l info
執(zhí)行 result.py,檢查任務(wù)狀態(tài)并獲取結(jié)果
多任務(wù)結(jié)構(gòu)
pro_cel
├── celery_task# celery相關(guān)文件夾
│ ├── celery.py # celery連接和配置相關(guān)文件,必須叫這個(gè)名字
│ └── tasks1.py # 所有任務(wù)函數(shù)
│ └── tasks2.py # 所有任務(wù)函數(shù)
├── check_result.py # 檢查結(jié)果
└── send_task.py # 觸發(fā)任務(wù)
celery.py
from celery import Celery
cel = Celery('celery_demo',
broker='redis://127.0.0.1:6379/1',
backend='redis://127.0.0.1:6379/2',
# 包含以下兩個(gè)任務(wù)文件沧竟,去相應(yīng)的py文件中找任務(wù),對(duì)多個(gè)任務(wù)做分類
include=['celery_task.tasks1',
'celery_task.tasks2'
])
# 時(shí)區(qū)
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False
tasks1.py
import time
from celery_task.celery import cel
@cel.task
def test_celery(res):
time.sleep(5)
return "test_celery任務(wù)結(jié)果:%s"%res
tasks2.py
import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
time.sleep(5)
return "test_celery2任務(wù)結(jié)果:%s"%res
check_result.py
from celery.result import AsyncResult
from celery_task.celery import cel
async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 將結(jié)果刪除,執(zhí)行完成缚忧,結(jié)果不會(huì)自動(dòng)刪除
# async.revoke(terminate=True) # 無論現(xiàn)在是什么時(shí)候悟泵,都要終止
# async.revoke(terminate=False) # 如果任務(wù)還沒有開始執(zhí)行呢,那么就可以終止闪水。
elif async.failed():
print('執(zhí)行失敗')
elif async.status == 'PENDING':
print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
print('任務(wù)已經(jīng)開始被執(zhí)行')
send_task.py
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2
# 立即告知celery去執(zhí)行test_celery任務(wù)糕非,并傳入一個(gè)參數(shù)
result = test_celery.delay('第一個(gè)的執(zhí)行')
print(result.id)
result = test_celery2.delay('第二個(gè)的執(zhí)行')
print(result.id)
添加任務(wù)(執(zhí)行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet球榆,檢查任務(wù)執(zhí)行結(jié)果(執(zhí)行check_result.py)
5.Celery執(zhí)行定時(shí)任務(wù)
設(shè)定時(shí)間讓celery執(zhí)行一個(gè)任務(wù)
add_task.py
from celery_app_task import add
from datetime import datetime
# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)
# 方式二
ctime = datetime.now()
# 默認(rèn)用utc時(shí)間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay
# 使用apply_async并設(shè)定時(shí)間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
類似于contab的定時(shí)任務(wù)
多任務(wù)結(jié)構(gòu)中celery.py修改如下
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab
cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
'celery_task.tasks1',
'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False
cel.conf.beat_schedule = {
# 名字隨意命名
'add-every-10-seconds': {
# 執(zhí)行tasks1下的test_celery函數(shù)
'task': 'celery_task.tasks1.test_celery',
# 每隔2秒執(zhí)行一次
# 'schedule': 1.0,
# 'schedule': crontab(minute="*/1"),
'schedule': timedelta(seconds=2),
# 傳遞參數(shù)
'args': ('test',)
},
# 'add-every-12-seconds': {
# 'task': 'celery_task.tasks1.test_celery',
# 每年4月11號(hào)朽肥,8點(diǎn)42分執(zhí)行
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
# 'args': (16, 16)
# },
}
啟動(dòng)一個(gè)beat:celery beat -A celery_task -l info
啟動(dòng)work執(zhí)行:celery worker -A celery_task -l info -P eventlet
6.Django中使用Celery
在項(xiàng)目目錄下創(chuàng)建celeryconfig.py
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設(shè)置并發(fā)worker數(shù)量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個(gè)worker最多執(zhí)行100個(gè)任務(wù)被銷毀,可以防止內(nèi)存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時(shí)時(shí)間
CELERYD_TASK_TIME_LIMIT=12*30
在app01目錄下創(chuàng)建tasks.py
from celery import task
@task
def add(a,b):
with open('a.text', 'a', encoding='utf-8') as f:
f.write('a')
print(a+b)
視圖函數(shù)views.py
from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
# result=add.delay(2,3)
ctime = datetime.now()
# 默認(rèn)用utc時(shí)間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=5)
task_time = utc_ctime + time_delay
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
return HttpResponse('ok')
settings.py
#INSTALLED_APPS = [
# 'djcelery',
# 'app01'
#]
from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'