0x01 基本概念
如上圖所示急灭,由三部分組成:消息中間件(message broker)、任務(wù)執(zhí)行單元(worker)珊燎、任務(wù)執(zhí)行結(jié)果存儲(chǔ)(task result store)
消息中間件
Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括炼杖,RabbitMQ, Redis, MongoDB (experimental), Amazon SQS (experimental),CouchDB (experimental), SQLAlchemy (experimental),Django ORM (experimental), IronMQ
任務(wù)執(zhí)行單元
Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中盗迟。
任務(wù)結(jié)果存儲(chǔ)
Task result store用來(lái)存儲(chǔ)Worker執(zhí)行的任務(wù)的結(jié)果坤邪,Celery支持以不同方式存儲(chǔ)任務(wù)的結(jié)果,包括AMQP, Redis罚缕,memcached, MongoDB艇纺,SQLAlchemy, Django ORM,Apache Cassandra, IronCache
0x02 實(shí)例
創(chuàng)建實(shí)例app:
# celery.py
from celery import Celery
app = Celery('task_name', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')
# 加載celery配置
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
## 可以把配置寫(xiě)到py文件內(nèi)加載
app.config_from_object('django.conf:settings')
## 自動(dòng)發(fā)現(xiàn)任務(wù)(需要在app下創(chuàng)建tasks.py模塊)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
創(chuàng)建tasks.py:
# tasks.py
from celery import app
@app.task
def add(x, y):
return x+y
啟動(dòng)worker(啟動(dòng)完邮弹,當(dāng)然此時(shí)broker中還沒(méi)有任務(wù)黔衡,worker此時(shí)相當(dāng)于處于待命的狀態(tài))
celery -A tasks task_name --loglevel-info
觸發(fā)任務(wù):
#trigger.py
from tasks import add
result = add.delay(4, 4) #不要直接 add(4, 4),這里需要用 celery 提供的接口 delay 進(jìn)行調(diào)用
while not result.ready():
time.sleep(1)
print 'task done: {0}'.format(result.get())
delay返回的是一個(gè)AsyncResult對(duì)象腌乡,里面存的就是一個(gè)異步的結(jié)果盟劫,當(dāng)任務(wù)完成時(shí)result.ready()為T(mén)rue,然后用result.get()取結(jié)果即可与纽。
至此侣签,一個(gè)簡(jiǎn)單的任務(wù)隊(duì)列就完成了。
歡迎關(guān)注微信公眾號(hào)(coder0x00)或掃描下方二維碼關(guān)注急迂,我們將持續(xù)搜尋程序員必備基礎(chǔ)技能包提供給大家影所。