Celery基礎(chǔ),django-celery在django如何使用和一些問題如何解決

Celery(https://www.celerycn.io/v/4.4.0/ru-men/celery-jin-jie-shi-yong):

? ?一:特點(diǎn):

? ? ? ? 高可用:如果出現(xiàn)丟失連接或連接失敗绑榴,職程(Worker)和客戶端會自動重試露戒,并且中間人通過 主/主 主/從 的方式來進(jìn)行提高可用性

? ? ? ? 快速: 單個 Celery 進(jìn)行每分鐘可以處理數(shù)以百萬的任務(wù),而且延遲僅為亞毫秒(使用 RabbitMQ但惶、 librabbitmq 在優(yōu)化過后)

? ? ? ? 靈活:Celery 的每個部分幾乎都可以自定義擴(kuò)展和單獨(dú)使用女责,例如自定義連接池漆枚、序列化方式、壓縮方式抵知、日志記錄方式墙基、任務(wù)調(diào)度软族、生產(chǎn)者、消費(fèi)者残制、中間人(Broker)等

? ? 功能:

? ? ? ? 監(jiān)控:可以針對整個流程進(jìn)行監(jiān)控立砸,內(nèi)置的工具或可以實(shí)時說明當(dāng)前集群的概況

? ? ? ? 調(diào)度:可以通過調(diào)度功能在一段時間內(nèi)指定任務(wù)的執(zhí)行時間 datetime,也可以根據(jù)簡單每隔一段時間進(jìn)行執(zhí)行重復(fù)的任務(wù)初茶,支持分鐘颗祝、小時、星期幾恼布,也支持某一天或某一年的Crontab表達(dá)式

? ? ? ? 工作流:可以通過“canvas“進(jìn)行組成工作流螺戳,其中包含分組、鏈接桥氏、分塊等等温峭。簡單和復(fù)雜的工作流程可以使用一組“canvas“組成,其中包含分組字支、鏈接凤藏、分塊等

? ? ? ? 資源(內(nèi)存)泄漏保護(hù): --max-tasks-per-child 參數(shù)適用于可能會出現(xiàn)資源泄漏(例如:內(nèi)存泄漏)的任務(wù)

? ? ? ? 時間和速率的限制:您可以控制每秒/分鐘/小時執(zhí)行任務(wù)的次數(shù),或者任務(wù)執(zhí)行的最長時間堕伪,也將這些設(shè)置為默認(rèn)值揖庄,針對特定的任務(wù)或程序進(jìn)行定制化配置

? ? ? ? 自定義組件:開發(fā)者可以定制化每一個職程(Worker)以及額外的組件。職程(Worker)是用 “bootsteps” 構(gòu)建的-一個依賴關(guān)系圖欠雌,可以對職程(Worker)的內(nèi)部進(jìn)行細(xì)粒度控制


? ? 二: 用法

? ? 2.1創(chuàng)建一個文件夾proj, 在proj目錄下創(chuàng)建3個py文件__init__.py蹄梢, celery.py, tasks.py

? ? 在celery.py代碼:

? ? ? ? from __future__ import absolute_import, unicode_literals

? ? ? ? from celery import Celery

? ? ? ? # app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks'])

? ? ? ? app = Celery('proj', backend='redis://localhost', broker='redis://localhost:6379', include=['proj.tasks'])

? ? ? ? # Optional configuration, see the application user guide.

? ? ? ? app.conf.update(

? ? ? ? ? ? result_expires=3600,

? ? ? ? )

? ? ? ? if __name__ == '__main__':

? ? ? ? ? ? app.start()

? ? 采用的是redis作為中間件(broker)sudo apt-get install redis-server? ? sudo service redis-server restart

? ? task.py:

? ? ? ? from __future__ import absolute_import, unicode_literals

? ? ? ? from .celery import app

? ? ? ? @app.task

? ? ? ? def add(x, y):

? ? ? ? ? ? return x + y

? ? ? ? @app.task

? ? ? ? def mul(x, y):

? ? ? ? ? ? return x * y

? ? ? ? @app.task

? ? ? ? def xsum(numbers):

? ? ? ? ? ? return sum(numbers)

? ? 運(yùn)行:celery -A tasks proj --loglevel=info?

? ? 在proj目錄外面新建一個腳本proj_test.py:

?from proj.tasks import *

from celery import group

from celery import chain, chord

# delay() 實(shí)際上為 apply_async() 的快捷使用, apply_async() 可以指定調(diào)用時執(zhí)行的參數(shù)富俄,例如運(yùn)行的時間禁炒,使用的任務(wù)隊(duì)列等

# res 參數(shù):res.state, res.successful(), res.failed()

res = add.delay(2, 2)? # add.apply_async((2, 2), queue='lopri', countdown=10)

# 任務(wù)執(zhí)行引發(fā)異常,可以進(jìn)行檢查異常以及溯源霍比,默認(rèn)情況下 result.get() 會拋出異常,

# 如果不希望 Celery 拋出異常幕袱,可以通過設(shè)置 propagate 來進(jìn)行禁用

result_add = res.get(timeout=1)? # 或者res.get(propagate=False)

# r_add2 = res.get(propagate=False)

print(res.failed())

print(result_add)

# print(add(2, 2))

print(res.id)? # 每一個任務(wù)都有一個id, 獲取任務(wù)的ID

# 一個任務(wù)只能有當(dāng)前只能有一個狀態(tài)悠瞬,但他的執(zhí)行過程可以為多個狀態(tài)们豌,一個典型的階段是:

# PENDING -> STARTED -> SUCCESS

# 重試任務(wù)比較復(fù)雜,為了證明浅妆,一個任務(wù)會重試兩次望迎,任務(wù)的階段為:

# PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

from proj.celery import app

res2 = app.AsyncResult('this-id-does-not-exist')

print(res2.state)

2.2:# Canvas:設(shè)計(jì)工作流程

s1 = add.signature((2, 2), countdown=1)? # add.s(2, 2)

res3 = s1.delay()

print(res3.get())

s2 = add.s(2)

res4 = s2.delay(8)

print(res4.get())

# 組:Groups

# 一個 group 并行調(diào)用任務(wù)列表,返回一個特殊的結(jié)果實(shí)例凌外,可以將結(jié)果作為一個列表進(jìn)行查看辩尊,并且通過索引進(jìn)去獲取返回值。

g1 = group(add.s(i, i) for i in range(10))().get()

g2 = group(add.s(i) for i in range(10))

res5 = g2(10).get()

print(res5)

2.3# 鏈:Chains可以將任務(wù)鏈接在一起康辑,在一個人返回后進(jìn)行調(diào)用另外一個任務(wù)

c1 = chain(add.s(4, 4) | mul.s(8))().get()

print(c1)

2.4:# 和弦:Chords 和弦是一個帶有回調(diào)的組:

c2 = chord((add.s(i, i) for i in range(10)), xsum.s())().get()

print(c2)

c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()

# c3 = (group(add.s(i, i) for i in range(10)) | xsum.s())().get()

print(c3)


三:在django中使用celery

環(huán)境: python3.7 ,django3.0.6

安裝: pip install django-celery

在settings.py中配置:

import djcelery

djcelery.setup_loader()

CELERY_TIMEZONE = 'Asia/Shanghai'

CELERY_IMPORTS = ('app.tasks', )

BROKER_URL = 'redis://127.0.0.1:6379/8'

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

INSTALLED_APPS = [

? ? ...

? ? 'djcelery',

]

當(dāng)djcelery.setup_loader()運(yùn)行時对省,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件蝗拿,找到標(biāo)記為task的方法,將它們注冊為celery task蒿涎。

BROKER_URL ='redis://127.0.0.1:6379/6'

broker是代理人哀托,它負(fù)責(zé)分發(fā)任務(wù)給worker去執(zhí)行。我使用的是Redis作為broker,當(dāng)然你也可以用其它的broker劳秋,比如官方就比較推薦使用RabbitMQ.

有的博客中提到要配置關(guān)鍵字:CELERY_RESULT_BACKEND仓手,例如:

CELERY_RESULT_BACKEND='amqp://guest@localhost//'#可以不用寫

我沒有配置這個關(guān)鍵字。因?yàn)槿绻麤]有配置玻淑,此時Django會使用默認(rèn)的數(shù)據(jù)庫(也是你指定的orm數(shù)據(jù)庫)嗽冒,作為它的結(jié)果作為它的backend。因此你也可以不用寫补履,使用Django默認(rèn)設(shè)置的數(shù)據(jù)庫就很好添坊。

CELERY_IMPORTS = ('app.tasks', )

CELERY_TIMEZONE = TIME_ZONE

CELERYBEAT_SCHEDULER ='djcelery.schedulers.DatabaseScheduler'

上面第一句是導(dǎo)入目標(biāo)任務(wù)文件,第二句是設(shè)置時區(qū)箫锤,第三句表示使用了django-celery默認(rèn)的數(shù)據(jù)庫調(diào)度模型,任務(wù)執(zhí)行周期都被存在默認(rèn)指定的orm數(shù)據(jù)庫中.

更深入的Celery配置:(http://www.cnblogs.com/ajianbeyourself/p/4950758.html)

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {

#定時任務(wù)一: 每24小時周期執(zhí)行任務(wù)(del_redis_data)

u'刪除過期的redis數(shù)據(jù)': {

????"task":"app.tasks.del_redis_data","schedule": crontab(hour='*/24'),"args": (), },

上面是設(shè)置定時的時間配置贬蛙,關(guān)于crontab的具體用法,celery的官方文檔講解的十分詳盡(表格):

http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

我選的三個任務(wù)谚攒,是我特意挑選的阳准,非常有代表性。第一個是周期任務(wù)馏臭,它會每隔一個固定時間周期去執(zhí)行一次相應(yīng)的task野蝇,比如隔1分鐘,隔1小時等; 第二個和第三個都是定時任務(wù)括儒,定時在每個時間點(diǎn)绕沈,比如每天的6點(diǎn),或者定時在每個月的1號帮寻。

周期任務(wù)和定時任務(wù)有小小的差別七冲,這也是crontab的強(qiáng)大之處,它同時支持這兩種规婆。

同步數(shù)據(jù)庫:

? ? python manage.py makemigrations

? ? python manage.py migrate

在app路徑下創(chuàng)建task.py腳本, 內(nèi)容如下:

from __future__ import absolute_import

from celery import task

from celery import shared_task

from data_draw.script.database_operation import *

from django.conf import settings

import os

@task()

def select_evaluation(data_name_dict):

? ? oss_data_name_list = select_dir(settings.EVALUATION_DIR, 0)

? ? # print(f"oss_data_name_list:{oss_data_name_list}")

? ? print("*"*200)

? ? print("異步執(zhí)行 select_evaluation 方法")

? ? for data_name in oss_data_name_list:

? ? ? ? oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name)

? ? ? ? eval_oss_data_path = os.path.join(settings.EVALUATION_DIR, data_name, 'evaluation/')

? ? ? ? item = {}

? ? ? ? mp4_name = select_video(oss_data_path)

? ? ? ? if mp4_name:

? ? ? ? ? ? mp4 = True

? ? ? ? ? ? video_name = mp4_name[0]

? ? ? ? else:

? ? ? ? ? ? mp4 = False

? ? ? ? ? ? video_name = ''

? ? ? ? file_name_list = select_dir(eval_oss_data_path, 1)

? ? ? ? for key, value in settings.MEED_EVALUATION_FILE.items():

? ? ? ? ? ? if value in file_name_list:

? ? ? ? ? ? ? ? item[key] = True

? ? ? ? ? ? else:

? ? ? ? ? ? ? ? item[key] = False

? ? ? ? try:

? ? ? ? ? ? value = data_name_dict[data_name]

? ? ? ? ? ? update_data_result(data_name, mp4, video_name, item)

? ? ? ? except:

? ? ? ? ? ? add_data_result(data_name, mp4, video_name, item)

在view.py腳本引用:

from data_draw.task import select_evaluation

select_evaluation.delay(data_name_dict)

運(yùn)行:

python manage.py runserver 0.0.0.0:8001#啟動django的應(yīng)用蝉稳,可以動態(tài)的使用django-admin來管理任務(wù)

python manage.py celery beat #應(yīng)該是用來監(jiān)控任務(wù)變化的

python manage.py celery worker -c 6 -l debug #任務(wù)執(zhí)行進(jìn)程抒蚜,worker進(jìn)程

運(yùn)行時報的第一個錯誤, async導(dǎo)入錯誤:

from . import async, base

? ? ? ? ? ? ? ? ? ? ? ^

SyntaxError: invalid syntax

這是因?yàn)樵?python 3.7 中將 async 作為了關(guān)鍵字耘戚,所以當(dāng) py 文件中出現(xiàn)類似 from . import async, base 這類不符合python語法的語句時嗡髓,Python會報錯

解決:

在 celery 官方的提議下,建議將 async.py 文件的文件名改成 asynchronous收津。

所以我們只需要將 celery\backends\async.py 改成 celery\backends\asynchronous.py饿这,并且把 celery下代碼中的所有 async 改成 asynchronous 就可以了

運(yùn)行時報的第二個錯誤:

File "/home/python/.virtualenvs/django_class/lib/python3.5/site-packages/redis/_compat.py", line 123, in iteritems

? ? return iter(x.items())

AttributeError: 'str' object has no attribute 'items'

這是因?yàn)橐郧鞍姹镜膔edis太高(3.0.1)浊伙,所以重新加載了redis

解決:

? ? pip install redis==2.10.6

參考文檔:

https://www.celerycn.io/v/4.4.0/ru-men/celery-chu-ci-shi-yong

http://www.reibang.com/p/e97ca5315c90

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市长捧,隨后出現(xiàn)的幾起案子嚣鄙,更是在濱河造成了極大的恐慌,老刑警劉巖串结,帶你破解...
    沈念sama閱讀 221,635評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哑子,死亡現(xiàn)場離奇詭異,居然都是意外死亡肌割,警方通過查閱死者的電腦和手機(jī)卧蜓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來把敞,“玉大人弥奸,你說我怎么就攤上這事》茉纾” “怎么了盛霎?”我有些...
    開封第一講書人閱讀 168,083評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長伸蚯。 經(jīng)常有香客問我摩渺,道長,這世上最難降的妖魔是什么剂邮? 我笑而不...
    開封第一講書人閱讀 59,640評論 1 296
  • 正文 為了忘掉前任摇幻,我火速辦了婚禮,結(jié)果婚禮上挥萌,老公的妹妹穿的比我還像新娘绰姻。我一直安慰自己,他們只是感情好引瀑,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評論 6 397
  • 文/花漫 我一把揭開白布狂芋。 她就那樣靜靜地躺著,像睡著了一般憨栽。 火紅的嫁衣襯著肌膚如雪帜矾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,262評論 1 308
  • 那天屑柔,我揣著相機(jī)與錄音屡萤,去河邊找鬼。 笑死掸宛,一個胖子當(dāng)著我的面吹牛死陆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播唧瘾,決...
    沈念sama閱讀 40,833評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼措译,長吁一口氣:“原來是場噩夢啊……” “哼别凤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起领虹,我...
    開封第一講書人閱讀 39,736評論 0 276
  • 序言:老撾萬榮一對情侶失蹤规哪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后掠械,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體由缆,經(jīng)...
    沈念sama閱讀 46,280評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評論 3 340
  • 正文 我和宋清朗相戀三年猾蒂,在試婚紗的時候發(fā)現(xiàn)自己被綠了均唉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,503評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡肚菠,死狀恐怖舔箭,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蚊逢,我是刑警寧澤层扶,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站烙荷,受9級特大地震影響镜会,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜终抽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評論 3 333
  • 文/蒙蒙 一戳表、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧昼伴,春花似錦匾旭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至持舆,卻和暖如春色瘩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背逸寓。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評論 1 272
  • 我被黑心中介騙來泰國打工居兆, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人席覆。 一個月前我還...
    沈念sama閱讀 48,909評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像汹买,于是被迫代替她去往敵國和親佩伤。 傳聞我的和親對象是個殘疾皇子聊倔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評論 2 359

推薦閱讀更多精彩內(nèi)容