Celery(https://www.celerycn.io/v/4.4.0/ru-men/celery-jin-jie-shi-yong):
? ?一:特點(diǎn):
? ? ? ? 高可用:如果出現(xiàn)丟失連接或連接失敗绑榴,職程(Worker)和客戶端會自動重試露戒,并且中間人通過 主/主 主/從 的方式來進(jìn)行提高可用性
? ? ? ? 快速: 單個 Celery 進(jìn)行每分鐘可以處理數(shù)以百萬的任務(wù),而且延遲僅為亞毫秒(使用 RabbitMQ但惶、 librabbitmq 在優(yōu)化過后)
? ? ? ? 靈活:Celery 的每個部分幾乎都可以自定義擴(kuò)展和單獨(dú)使用女责,例如自定義連接池漆枚、序列化方式、壓縮方式抵知、日志記錄方式墙基、任務(wù)調(diào)度软族、生產(chǎn)者、消費(fèi)者残制、中間人(Broker)等
? ? 功能:
? ? ? ? 監(jiān)控:可以針對整個流程進(jìn)行監(jiān)控立砸,內(nèi)置的工具或可以實(shí)時說明當(dāng)前集群的概況
? ? ? ? 調(diào)度:可以通過調(diào)度功能在一段時間內(nèi)指定任務(wù)的執(zhí)行時間 datetime,也可以根據(jù)簡單每隔一段時間進(jìn)行執(zhí)行重復(fù)的任務(wù)初茶,支持分鐘颗祝、小時、星期幾恼布,也支持某一天或某一年的Crontab表達(dá)式
? ? ? ? 工作流:可以通過“canvas“進(jìn)行組成工作流螺戳,其中包含分組、鏈接桥氏、分塊等等温峭。簡單和復(fù)雜的工作流程可以使用一組“canvas“組成,其中包含分組字支、鏈接凤藏、分塊等
? ? ? ? 資源(內(nèi)存)泄漏保護(hù): --max-tasks-per-child 參數(shù)適用于可能會出現(xiàn)資源泄漏(例如:內(nèi)存泄漏)的任務(wù)
? ? ? ? 時間和速率的限制:您可以控制每秒/分鐘/小時執(zhí)行任務(wù)的次數(shù),或者任務(wù)執(zhí)行的最長時間堕伪,也將這些設(shè)置為默認(rèn)值揖庄,針對特定的任務(wù)或程序進(jìn)行定制化配置
? ? ? ? 自定義組件:開發(fā)者可以定制化每一個職程(Worker)以及額外的組件。職程(Worker)是用 “bootsteps” 構(gòu)建的-一個依賴關(guān)系圖欠雌,可以對職程(Worker)的內(nèi)部進(jìn)行細(xì)粒度控制
? ? 二: 用法
? ? 2.1創(chuàng)建一個文件夾proj, 在proj目錄下創(chuàng)建3個py文件__init__.py蹄梢, celery.py, tasks.py
? ? 在celery.py代碼:
? ? ? ? from __future__ import absolute_import, unicode_literals
? ? ? ? from celery import Celery
? ? ? ? # app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks'])
? ? ? ? app = Celery('proj', backend='redis://localhost', broker='redis://localhost:6379', include=['proj.tasks'])
? ? ? ? # Optional configuration, see the application user guide.
? ? ? ? app.conf.update(
? ? ? ? ? ? result_expires=3600,
? ? ? ? )
? ? ? ? if __name__ == '__main__':
? ? ? ? ? ? app.start()
? ? 采用的是redis作為中間件(broker)sudo apt-get install redis-server? ? sudo service redis-server restart
? ? task.py:
? ? ? ? from __future__ import absolute_import, unicode_literals
? ? ? ? from .celery import app
? ? ? ? @app.task
? ? ? ? def add(x, y):
? ? ? ? ? ? return x + y
? ? ? ? @app.task
? ? ? ? def mul(x, y):
? ? ? ? ? ? return x * y
? ? ? ? @app.task
? ? ? ? def xsum(numbers):
? ? ? ? ? ? return sum(numbers)
? ? 運(yùn)行:celery -A tasks proj --loglevel=info?
? ? 在proj目錄外面新建一個腳本proj_test.py:
?from proj.tasks import *
from celery import group
from celery import chain, chord
# delay() 實(shí)際上為 apply_async() 的快捷使用, apply_async() 可以指定調(diào)用時執(zhí)行的參數(shù)富俄,例如運(yùn)行的時間禁炒,使用的任務(wù)隊(duì)列等
# res 參數(shù):res.state, res.successful(), res.failed()
res = add.delay(2, 2)? # add.apply_async((2, 2), queue='lopri', countdown=10)
# 任務(wù)執(zhí)行引發(fā)異常,可以進(jìn)行檢查異常以及溯源霍比,默認(rèn)情況下 result.get() 會拋出異常,
# 如果不希望 Celery 拋出異常幕袱,可以通過設(shè)置 propagate 來進(jìn)行禁用
result_add = res.get(timeout=1)? # 或者res.get(propagate=False)
# r_add2 = res.get(propagate=False)
print(res.failed())
print(result_add)
# print(add(2, 2))
print(res.id)? # 每一個任務(wù)都有一個id, 獲取任務(wù)的ID
# 一個任務(wù)只能有當(dāng)前只能有一個狀態(tài)悠瞬,但他的執(zhí)行過程可以為多個狀態(tài)们豌,一個典型的階段是:
# PENDING -> STARTED -> SUCCESS
# 重試任務(wù)比較復(fù)雜,為了證明浅妆,一個任務(wù)會重試兩次望迎,任務(wù)的階段為:
# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
from proj.celery import app
res2 = app.AsyncResult('this-id-does-not-exist')
print(res2.state)
2.2:# Canvas:設(shè)計(jì)工作流程
s1 = add.signature((2, 2), countdown=1)? # add.s(2, 2)
res3 = s1.delay()
print(res3.get())
s2 = add.s(2)
res4 = s2.delay(8)
print(res4.get())
# 組:Groups
# 一個 group 并行調(diào)用任務(wù)列表,返回一個特殊的結(jié)果實(shí)例凌外,可以將結(jié)果作為一個列表進(jìn)行查看辩尊,并且通過索引進(jìn)去獲取返回值。
g1 = group(add.s(i, i) for i in range(10))().get()
g2 = group(add.s(i) for i in range(10))
res5 = g2(10).get()
print(res5)
2.3# 鏈:Chains可以將任務(wù)鏈接在一起康辑,在一個人返回后進(jìn)行調(diào)用另外一個任務(wù)
c1 = chain(add.s(4, 4) | mul.s(8))().get()
print(c1)
2.4:# 和弦:Chords 和弦是一個帶有回調(diào)的組:
c2 = chord((add.s(i, i) for i in range(10)), xsum.s())().get()
print(c2)
c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
# c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
print(c3)
三:在django中使用celery
環(huán)境: python3.7 ,django3.0.6
安裝: pip install django-celery
在settings.py中配置:
import djcelery
djcelery.setup_loader()
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ('app.tasks', )
BROKER_URL = 'redis://127.0.0.1:6379/8'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
INSTALLED_APPS = [
? ? ...
? ? 'djcelery',
]
當(dāng)djcelery.setup_loader()運(yùn)行時对省,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件蝗拿,找到標(biāo)記為task的方法,將它們注冊為celery task蒿涎。
BROKER_URL ='redis://127.0.0.1:6379/6'
broker是代理人哀托,它負(fù)責(zé)分發(fā)任務(wù)給worker去執(zhí)行。我使用的是Redis作為broker,當(dāng)然你也可以用其它的broker劳秋,比如官方就比較推薦使用RabbitMQ.
有的博客中提到要配置關(guān)鍵字:CELERY_RESULT_BACKEND仓手,例如:
CELERY_RESULT_BACKEND='amqp://guest@localhost//'#可以不用寫
我沒有配置這個關(guān)鍵字。因?yàn)槿绻麤]有配置玻淑,此時Django會使用默認(rèn)的數(shù)據(jù)庫(也是你指定的orm數(shù)據(jù)庫)嗽冒,作為它的結(jié)果作為它的backend。因此你也可以不用寫补履,使用Django默認(rèn)設(shè)置的數(shù)據(jù)庫就很好添坊。
CELERY_IMPORTS = ('app.tasks', )
CELERY_TIMEZONE = TIME_ZONE
CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'
上面第一句是導(dǎo)入目標(biāo)任務(wù)文件,第二句是設(shè)置時區(qū)箫锤,第三句表示使用了django-celery默認(rèn)的數(shù)據(jù)庫調(diào)度模型,任務(wù)執(zhí)行周期都被存在默認(rèn)指定的orm數(shù)據(jù)庫中.
更深入的Celery配置:(http://www.cnblogs.com/ajianbeyourself/p/4950758.html)
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
#定時任務(wù)一: 每24小時周期執(zhí)行任務(wù)(del_redis_data)
u'刪除過期的redis數(shù)據(jù)': {
????"task":"app.tasks.del_redis_data","schedule": crontab(hour='*/24'),"args": (), },
上面是設(shè)置定時的時間配置贬蛙,關(guān)于crontab的具體用法,celery的官方文檔講解的十分詳盡(表格):
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
我選的三個任務(wù)谚攒,是我特意挑選的阳准,非常有代表性。第一個是周期任務(wù)馏臭,它會每隔一個固定時間周期去執(zhí)行一次相應(yīng)的task野蝇,比如隔1分鐘,隔1小時等; 第二個和第三個都是定時任務(wù)括儒,定時在每個時間點(diǎn)绕沈,比如每天的6點(diǎn),或者定時在每個月的1號帮寻。
周期任務(wù)和定時任務(wù)有小小的差別七冲,這也是crontab的強(qiáng)大之處,它同時支持這兩種规婆。
同步數(shù)據(jù)庫:
? ? python manage.py makemigrations
? ? python manage.py migrate
在app路徑下創(chuàng)建task.py腳本, 內(nèi)容如下:
from __future__ import absolute_import
from celery import task
from celery import shared_task
from data_draw.script.database_operation import *
from django.conf import settings
import os
@task()
def select_evaluation(data_name_dict):
? ? oss_data_name_list = select_dir(settings.EVALUATION_DIR, 0)
? ? # print(f"oss_data_name_list:{oss_data_name_list}")
? ? print("*"*200)
? ? print("異步執(zhí)行 select_evaluation 方法")
? ? for data_name in oss_data_name_list:
? ? ? ? oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name)
? ? ? ? eval_oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name, 'evaluation/')
? ? ? ? item = {}
? ? ? ? mp4_name = select_video(oss_data_path)
? ? ? ? if mp4_name:
? ? ? ? ? ? mp4 = True
? ? ? ? ? ? video_name = mp4_name[0]
? ? ? ? else:
? ? ? ? ? ? mp4 = False
? ? ? ? ? ? video_name = ''
? ? ? ? file_name_list = select_dir(eval_oss_data_path, 1)
? ? ? ? for key, value in settings.MEED_EVALUATION_FILE.items():
? ? ? ? ? ? if value in file_name_list:
? ? ? ? ? ? ? ? item[key] = True
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? item[key] = False
? ? ? ? try:
? ? ? ? ? ? value = data_name_dict[data_name]
? ? ? ? ? ? update_data_result(data_name, mp4, video_name, item)
? ? ? ? except:
? ? ? ? ? ? add_data_result(data_name, mp4, video_name, item)
在view.py腳本引用:
from data_draw.task import select_evaluation
select_evaluation.delay(data_name_dict)
運(yùn)行:
python manage.py runserver 0.0.0.0:8001#啟動django的應(yīng)用蝉稳,可以動態(tài)的使用django-admin來管理任務(wù)
python manage.py celery beat #應(yīng)該是用來監(jiān)控任務(wù)變化的
python manage.py celery worker -c 6 -l debug #任務(wù)執(zhí)行進(jìn)程抒蚜,worker進(jìn)程
運(yùn)行時報的第一個錯誤, async導(dǎo)入錯誤:
from . import async, base
? ? ? ? ? ? ? ? ? ? ? ^
SyntaxError: invalid syntax
這是因?yàn)樵?python 3.7 中將 async 作為了關(guān)鍵字耘戚,所以當(dāng) py 文件中出現(xiàn)類似 from . import async, base 這類不符合python語法的語句時嗡髓,Python會報錯
解決:
在 celery 官方的提議下,建議將 async.py 文件的文件名改成 asynchronous收津。
所以我們只需要將 celery\backends\async.py 改成 celery\backends\asynchronous.py饿这,并且把 celery下代碼中的所有 async 改成 asynchronous 就可以了
運(yùn)行時報的第二個錯誤:
File "/home/python/.virtualenvs/django_class/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems
? ? return iter(x.items())
AttributeError: 'str' object has no attribute 'items'
這是因?yàn)橐郧鞍姹镜膔edis太高(3.0.1)浊伙,所以重新加載了redis
解決:
? ? pip install redis==2.10.6
參考文檔:
https://www.celerycn.io/v/4.4.0/ru-men/celery-chu-ci-shi-yong
http://www.reibang.com/p/e97ca5315c90