Celery 是一個(gè)簡(jiǎn)單、靈活且可靠的暂题,處理大量消息的分布式系統(tǒng)啥纸,它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列, 同時(shí)也支持任務(wù)調(diào)度誓斥。Celery 中有兩個(gè)比較關(guān)鍵的概念:
Worker: worker 是一個(gè)獨(dú)立的進(jìn)程只洒,它持續(xù)監(jiān)視隊(duì)列中是否有需要處理的任務(wù);
Broker: broker 也被稱為中間人劳坑,broker 負(fù)責(zé)協(xié)調(diào)客戶端和 worker 的溝通毕谴。客戶端向隊(duì)列添加消息距芬,broker 負(fù)責(zé)把消息派發(fā)給 worker涝开。
安裝 Celery
$ pip install celery
安裝 Broker
Celery 支持多種 broker, 但主要以 RabbitMQ 和 Redis 為主。在 RabbitMQ 和 Redis之間框仔,該如何選擇呢舀武?
RabbitMQ is feature-complete, stable, durable and easy to install. It’s an excellent choice for a production environment.
Redis is also feature-complete, but is more susceptible to data loss in the event of abrupt termination or power failures.
Celery 官方明確表示推薦在生產(chǎn)環(huán)境下使用 RabbitMQ,Redis 存在丟數(shù)據(jù)的問(wèn)題离斩。所以如果你的業(yè)務(wù)可以容忍 worker crash 或者電源故障導(dǎo)致的任務(wù)丟失银舱,采用 Redis 是個(gè)不錯(cuò)的選擇瘪匿,本篇就以 Redis 為例來(lái)介紹。
Celery 對(duì)于 Redis 的支持需要安裝相關(guān)的依賴纵朋,以下命令可以同時(shí)安裝 Celery 和 Redis 相關(guān)的依賴柿顶,但是 redis server 還是必須單獨(dú)安裝的。
$ pip install -U celery[redis] # -U 的意思是把所有指定的包都升級(jí)到最新的版本
Celery 的配置和使用
Celery 本身的配置項(xiàng)是很多的操软,但是如果要讓它跑起來(lái)嘁锯,只需要加一行配置:
BROKER_URL = 'redis://localhost:6379//'
這一行就是告訴 celery broker 的地址和選擇的 redis db,默認(rèn)是 0聂薪。接下來(lái)用個(gè)很簡(jiǎn)單的例子來(lái)介紹 celery 是如何使用的:
# tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost//')
@app.task
def add(x, y):
return x + y
以上創(chuàng)建了一個(gè) celery 的實(shí)例 app家乘,可以通過(guò)它來(lái)創(chuàng)建任務(wù)和管理 workers。
接下來(lái)運(yùn)行 celery worker藏澳,通過(guò)它來(lái)監(jiān)聽是否有任務(wù)要處理:
$ celery -A tasks worker -l info
通過(guò) celery worker –help 查看更多參數(shù)選項(xiàng)
接著再打開一個(gè) shell 窗口仁锯,進(jìn)入 python 控制臺(tái)去調(diào)用 add 任務(wù):
>>> from tasks import add
>>> add.delay(1, 2)
<AsyncResult: 42ade14e-c7ed-4b8d-894c-1ca1ec7ca192>
發(fā)現(xiàn) add 任務(wù)并沒(méi)有返回 3,而是一個(gè)對(duì)象 AsyncResult翔悠,它的作用是被用來(lái)檢查任務(wù)狀態(tài)业崖,獲取任務(wù)結(jié)果,如果任務(wù)失敗蓄愁,它會(huì)返回異常信息或者調(diào)用棧双炕。
先嘗試獲取任務(wù)的執(zhí)行結(jié)果:
>>> result = add.delay(1, 2)
>>> result.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File "/usr/local/lib/python2.7/dist-packages/celery/backends/base.py", line 604, in _is_disabled
'No result backend configured. '
NotImplementedError: No result backend configured. Please see the documentation for more information.
報(bào)錯(cuò)了:No result backend configured. 錯(cuò)誤信息告訴我們沒(méi)有配置 result backend。因?yàn)?celery 會(huì)將任務(wù)的狀態(tài)或結(jié)果保存在 result backend撮抓,result backend 的選擇也有很多妇斤,本例中依然選用 redis 作為 result backend。
修改 tasks.py 的代碼丹拯,添加上 result backend 的設(shè)置站超,保存后重啟 celery worker。
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost//')
然后重新調(diào)用 add task乖酬,看看是否獲取到了執(zhí)行結(jié)果死相?
>>> from tasks import add
>>> result = add.delay(1,2)
>>> result.get()
3
正確的獲得到了結(jié)果!
Flower
flower 是一個(gè) celery 的監(jiān)控工具剑刑,它提供了一個(gè)圖形用戶界面媳纬,可以極大的方便我們監(jiān)控任務(wù)的執(zhí)行過(guò)程, 執(zhí)行細(xì)節(jié)及歷史記錄施掏,還提供了統(tǒng)計(jì)功能钮惠。
安裝
$ pip install flower
使用簡(jiǎn)介
首先通過(guò)命令行啟動(dòng) flower 進(jìn)程:
$ flower -A tasks --port=5555
然后打開瀏覽器 http://localhost:5555/
Celery 任務(wù)類型
apply_async
調(diào)用一個(gè)異步任務(wù),這也是最常用的任務(wù)類型之一七芭,delay 與它的作用相同素挽,只是 delay 不支持 apply_async 中額外的參數(shù)。該方法有幾個(gè)比較重要的參數(shù)狸驳,在實(shí)際應(yīng)用中會(huì)經(jīng)常用到:
- countdown: 任務(wù)延遲執(zhí)行的秒數(shù)预明,默認(rèn)立即執(zhí)行缩赛;
- eta:任務(wù)被執(zhí)行的絕對(duì)時(shí)間
crontab schedules
from datetime import timedelta
from celery.schedules import crontab
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
# Executes every 30 seconds
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16)
},
}
要啟動(dòng)定時(shí)任務(wù),需要啟動(dòng)一個(gè)心跳進(jìn)程:
$ celery -A tasks beat
或者加上 -B
參數(shù)
$ celery -A tasks worker -B -l info
使用 Celery 的常見場(chǎng)景:
Web 應(yīng)用撰糠。當(dāng)用戶觸發(fā)的一個(gè)操作需要較長(zhǎng)時(shí)間才能執(zhí)行完成時(shí)酥馍,可以把它作為任務(wù)交給Celery 去異步執(zhí)行,執(zhí)行完再返回給用戶阅酪。這段時(shí)間用戶不需要等待旨袒,提高了網(wǎng)站的整體吞吐量和響應(yīng)時(shí)間。
定時(shí)任務(wù)术辐。生產(chǎn)環(huán)境經(jīng)常會(huì)跑一些定時(shí)任務(wù)砚尽。假如你有上千臺(tái)的服務(wù)器、上千種任務(wù)辉词,定時(shí)任務(wù)的管理很困難必孤,Celery 可以幫助我們快速在不同的機(jī)器設(shè)定不同種任務(wù)。
其他可以異步執(zhí)行的任務(wù)瑞躺。為了充分提高網(wǎng)站性能敷搪,對(duì)于請(qǐng)求和響應(yīng)之外的那些不要求必須同步完成的附加工作都可以異步完成。比如發(fā)送短信/郵件幢哨、推送消息等购啄。
Celery 特性:
方便地查看定時(shí)任務(wù)的執(zhí)行情況,比如執(zhí)行是否成功嘱么、當(dāng)前狀態(tài)、執(zhí)行任務(wù)花費(fèi)的時(shí)間等顽悼。
可以使用功能齊備的管理后臺(tái)或者命令行添加曼振、更新、刪除任務(wù)蔚龙。
Celery 架構(gòu)(包含如下組件):
Celery Beat:任務(wù)調(diào)度器冰评,Beat 進(jìn)程會(huì)讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊(duì)列木羹。
Celery Worker:執(zhí)行任務(wù)的消費(fèi)者甲雅,通常會(huì)在多臺(tái)服務(wù)器運(yùn)行多個(gè)消費(fèi)者來(lái)提高執(zhí)行效率。
Broker:消息代理坑填,或者叫作消息中間件抛人,接受任務(wù)生產(chǎn)者發(fā)送過(guò)來(lái)的任務(wù)消息,存進(jìn)隊(duì)列再按序分發(fā)給任務(wù)消費(fèi)者脐瑰。由于 Celery 本身不含消息服務(wù)妖枚,所以需要使用第三方消息服務(wù)來(lái)傳遞任務(wù)。目前苍在,Celery 支持的消息服務(wù)有RabbitMQ绝页、Redis甚至是數(shù)據(jù)庫(kù)荠商。
Producer:調(diào)用了 Celery 提供的 API、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的都是任務(wù)的生產(chǎn)者续誉。
Result Backend:任務(wù)處理完成后保存狀態(tài)信息和結(jié)果莱没,以供查詢。
任務(wù)產(chǎn)生的兩種方式:①發(fā)布者發(fā)布任務(wù)(web應(yīng)用)②任務(wù)調(diào)度按期發(fā)布任務(wù)(定時(shí)執(zhí)行)
Celery 序列化
在客戶端和消費(fèi)者之間傳輸數(shù)據(jù)需要序列化和反序列化酷鸦,Celery 支持如下序列化方案饰躲。