Celery

Celery

1.什么是Clelery

Celery是一個(gè)簡(jiǎn)單你辣、靈活且可靠的,處理大量消息的分布式系統(tǒng)

專注于實(shí)時(shí)處理的異步任務(wù)隊(duì)列

同時(shí)也支持任務(wù)調(diào)度

Celery架構(gòu)

Celery的架構(gòu)由三部分組成锄贼,消息中間件(message broker)男摧,任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)組成渠羞。

消息中間件

Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成篡帕。包括殖侵,RabbitMQ, Redis等等

任務(wù)執(zhí)行單元

Worker是Celery提供的任務(wù)執(zhí)行的單元贸呢,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。

任務(wù)結(jié)果存儲(chǔ)

Task result store用來存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果拢军,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果楞陷,包括AMQP, redis等

版本支持情況

Celery version 4.0 runs on
        Python ?2.7, 3.4, 3.5?
        PyPy ?5.4, 5.5?
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

2.使用場(chǎng)景

異步任務(wù):將耗時(shí)操作任務(wù)提交給Celery去異步執(zhí)行,比如發(fā)送短信/郵件朴沿、消息推送猜谚、音視頻處理等等

定時(shí)任務(wù):定時(shí)執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計(jì)

3.Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis

app=Celery('任務(wù)名'赌渣,backend='xxx',broker='xxx')

4.Celery執(zhí)行異步任務(wù)

基本使用

創(chuàng)建項(xiàng)目celerytest

創(chuàng)建py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y


創(chuàng)建py文件:add_task.py,添加任務(wù)

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

創(chuàng)建py文件:run.py魏铅,執(zhí)行任務(wù),或者使用命令執(zhí)行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

創(chuàng)建py文件:result.py坚芜,查看任務(wù)執(zhí)行結(jié)果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結(jié)果刪除
elif async.failed():
    print('執(zhí)行失敗')
elif async.status == 'PENDING':
    print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
    print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
    print('任務(wù)已經(jīng)開始被執(zhí)行')

執(zhí)行 add_task.py览芳,添加任務(wù),并獲取任務(wù)ID

執(zhí)行 run.py 鸿竖,或者執(zhí)行命令:celery worker -A celery_app_task -l info

執(zhí)行 result.py,檢查任務(wù)狀態(tài)并獲取結(jié)果

多任務(wù)結(jié)構(gòu)

pro_cel
    ├── celery_task# celery相關(guān)文件夾
    │   ├── celery.py   # celery連接和配置相關(guān)文件,必須叫這個(gè)名字
    │   └── tasks1.py    #  所有任務(wù)函數(shù)
    │   └── tasks2.py    #  所有任務(wù)函數(shù)
    ├── check_result.py # 檢查結(jié)果
    └── send_task.py    # 觸發(fā)任務(wù)

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下兩個(gè)任務(wù)文件沧竟,去相應(yīng)的py文件中找任務(wù),對(duì)多個(gè)任務(wù)做分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 時(shí)區(qū)
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務(wù)結(jié)果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務(wù)結(jié)果:%s"%res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結(jié)果刪除,執(zhí)行完成缚忧,結(jié)果不會(huì)自動(dòng)刪除
    # async.revoke(terminate=True)  # 無論現(xiàn)在是什么時(shí)候悟泵,都要終止
    # async.revoke(terminate=False) # 如果任務(wù)還沒有開始執(zhí)行呢,那么就可以終止闪水。
elif async.failed():
    print('執(zhí)行失敗')
elif async.status == 'PENDING':
    print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
    print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
    print('任務(wù)已經(jīng)開始被執(zhí)行')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 立即告知celery去執(zhí)行test_celery任務(wù)糕非,并傳入一個(gè)參數(shù)
result = test_celery.delay('第一個(gè)的執(zhí)行')
print(result.id)
result = test_celery2.delay('第二個(gè)的執(zhí)行')
print(result.id)

添加任務(wù)(執(zhí)行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet球榆,檢查任務(wù)執(zhí)行結(jié)果(執(zhí)行check_result.py)

5.Celery執(zhí)行定時(shí)任務(wù)

設(shè)定時(shí)間讓celery執(zhí)行一個(gè)任務(wù)

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認(rèn)用utc時(shí)間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并設(shè)定時(shí)間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

類似于contab的定時(shí)任務(wù)

多任務(wù)結(jié)構(gòu)中celery.py修改如下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執(zhí)行tasks1下的test_celery函數(shù)
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執(zhí)行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數(shù)
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每年4月11號(hào)朽肥,8點(diǎn)42分執(zhí)行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

啟動(dòng)一個(gè)beat:celery beat -A celery_task -l info

啟動(dòng)work執(zhí)行:celery worker -A celery_task -l info -P eventlet

6.Django中使用Celery

在項(xiàng)目目錄下創(chuàng)建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設(shè)置并發(fā)worker數(shù)量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個(gè)worker最多執(zhí)行100個(gè)任務(wù)被銷毀,可以防止內(nèi)存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時(shí)時(shí)間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下創(chuàng)建tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

視圖函數(shù)views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認(rèn)用utc時(shí)間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py

#INSTALLED_APPS = [
#    'djcelery',
#    'app01'
#]

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末持钉,一起剝皮案震驚了整個(gè)濱河市衡招,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌每强,老刑警劉巖始腾,帶你破解...
    沈念sama閱讀 211,496評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異空执,居然都是意外死亡窘茁,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,187評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門脆烟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人房待,你說我怎么就攤上這事邢羔⊥漳ǎ” “怎么了?”我有些...
    開封第一講書人閱讀 157,091評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵拜鹤,是天一觀的道長(zhǎng)框冀。 經(jīng)常有香客問我,道長(zhǎng)敏簿,這世上最難降的妖魔是什么明也? 我笑而不...
    開封第一講書人閱讀 56,458評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮惯裕,結(jié)果婚禮上温数,老公的妹妹穿的比我還像新娘。我一直安慰自己蜻势,他們只是感情好撑刺,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,542評(píng)論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著握玛,像睡著了一般够傍。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挠铲,一...
    開封第一講書人閱讀 49,802評(píng)論 1 290
  • 那天冕屯,我揣著相機(jī)與錄音,去河邊找鬼拂苹。 笑死安聘,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的醋寝。 我是一名探鬼主播搞挣,決...
    沈念sama閱讀 38,945評(píng)論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼音羞!你這毒婦竟也來了囱桨?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,709評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤嗅绰,失蹤者是張志新(化名)和其女友劉穎舍肠,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窘面,經(jīng)...
    沈念sama閱讀 44,158評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡翠语,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,502評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了财边。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肌括。...
    茶點(diǎn)故事閱讀 38,637評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖酣难,靈堂內(nèi)的尸體忽然破棺而出谍夭,到底是詐尸還是另有隱情黑滴,我是刑警寧澤,帶...
    沈念sama閱讀 34,300評(píng)論 4 329
  • 正文 年R本政府宣布紧索,位于F島的核電站袁辈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏珠漂。R本人自食惡果不足惜晚缩,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,911評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望媳危。 院中可真熱鬧荞彼,春花似錦、人聲如沸济舆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,744評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽滋觉。三九已至签夭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間椎侠,已是汗流浹背第租。 一陣腳步聲響...
    開封第一講書人閱讀 31,982評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留我纪,地道東北人慎宾。 一個(gè)月前我還...
    沈念sama閱讀 46,344評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像浅悉,于是被迫代替她去往敵國(guó)和親趟据。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,500評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容