celery的簡介
??celery是一個基于分布式消息傳輸?shù)漠惒饺蝿?wù)隊列臀防,它專注于實時處理眠菇,同時也支持任務(wù)調(diào)度。它的執(zhí)行單元為任務(wù)(task)袱衷,利用多線程捎废,如Eventlet,gevent等致燥,它們能被并發(fā)地執(zhí)行在單個或多個職程服務(wù)器(worker servers)上登疗。任務(wù)能異步執(zhí)行(后臺運行)或同步執(zhí)行(等待任務(wù)完成)。
??在生產(chǎn)系統(tǒng)中嫌蚤,celery能夠一天處理上百萬的任務(wù)辐益。它的完整架構(gòu)圖如下:
組件介紹:
- Producer:調(diào)用了Celery提供的API、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊列處理的都是任務(wù)生產(chǎn)者脱吱。
- Celery Beat:任務(wù)調(diào)度器智政,Beat進(jìn)程會讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊列箱蝠。
- Broker:消息代理续捂,又稱消息中間件,接受任務(wù)生產(chǎn)者發(fā)送過來的任務(wù)消息宦搬,存進(jìn)隊列再按序分發(fā)給任務(wù)消費方(通常是消息隊列或者數(shù)據(jù)庫)牙瓢。Celery目前支持RabbitMQ、Redis间校、MongoDB矾克、Beanstalk、SQLAlchemy撇簿、Zookeeper等作為消息代理聂渊,但適用于生產(chǎn)環(huán)境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。
- Celery Worker:執(zhí)行任務(wù)的消費者四瘫,通常會在多臺服務(wù)器運行多個消費者來提高執(zhí)行效率汉嗽。
- Result Backend:任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢找蜜。Celery默認(rèn)已支持Redis饼暑、RabbitMQ、MongoDB洗做、Django ORM弓叛、SQLAlchemy等方式。
??在客戶端和消費者之間傳輸數(shù)據(jù)需要序列化和反序列化诚纸。 Celery 支出的序列化方案如下所示:
準(zhǔn)備工作
??在本文中撰筷,我們使用的celery的消息代理和后端存儲數(shù)據(jù)庫都使用redis,序列化和反序列化選擇msgpack畦徘。
??首先毕籽,我們需要安裝redis數(shù)據(jù)庫抬闯,具體的安裝方法可參考:http://www.runoob.com/redis/redis-install.html 。啟動redis关筒,我們會看到如下界面:
在redis可視化軟件rdm中溶握,我們看到的數(shù)據(jù)庫如下:
里面沒有任何數(shù)據(jù)。
??接著蒸播,為了能夠在python中使用celery睡榆,我們需要安裝以下模塊:
- celery
- redis
- msgpack
這樣,我們的準(zhǔn)備工作就完畢了袍榆。
一個簡單的例子
??我們創(chuàng)建的工程名稱為proj胀屿,結(jié)構(gòu)如下圖:
??首先是主程序app_test.py,代碼如下:
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')
if __name__ == '__main__':
app.start()
分析一下這個程序:
- "from celery import Celery"是導(dǎo)入celery中的Celery類蜡塌。
- app是Celery類的實例碉纳,創(chuàng)建的時候添加了proj.tasks這個模塊,也就是包含了proj/tasks.py這個文件馏艾。
- 把Celery配置存放進(jìn)proj/celeryconfig.py文件,使用app.config_from_object加載配置奴愉。
??接著是任務(wù)函數(shù)文件tasks.py琅摩,代碼如下:
import time
from proj.app_test import app
@app.task
def add(x, y):
time.sleep(1)
return x + y
tasks.py只有一個任務(wù)函數(shù)add,讓它生效的最直接的方法就是添加app.task這個裝飾器锭硼。add的功能是先休眠一秒房资,然后返回兩個數(shù)的和。
??接著是配置文件celeryconfig.py檀头,代碼如下:
BROKER_URL = 'redis://localhost' # 使用Redis作為消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務(wù)結(jié)果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任務(wù)序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 讀取任務(wù)結(jié)果一般性能要求不高轰异,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過期時間
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內(nèi)容類型
??最后是調(diào)用文件diaoyong.py,代碼如下:
from proj.tasks import add
import time
t1 = time.time()
r1 = add.delay(1, 2)
r2 = add.delay(2, 4)
r3 = add.delay(3, 6)
r4 = add.delay(4, 8)
r5 = add.delay(5, 10)
r_list = [r1, r2, r3, r4, r5]
for r in r_list:
while not r.ready():
pass
print(r.result)
t2 = time.time()
print('共耗時:%s' % str(t2-t1))
在這個程序中暑始,我們調(diào)用了add函數(shù)五次搭独,delay()用來調(diào)用任務(wù)。
例子的運行
??到此為止廊镜,我們已經(jīng)理解了整個項目的結(jié)構(gòu)與代碼牙肝。
??接下來,我們嘗試著把這個項目運行起來嗤朴。
??首先配椭,我們需要啟動redis。接著雹姊,切換至proj項目所在目錄股缸,并運行命令:
celery -A proj.app_test worker -l info
界面如下:
然后,我們運行diaoyong.py吱雏,輸出的結(jié)果如下:
3
6
9
12
15
共耗時:1.1370790004730225
后臺輸出如下:
接著敦姻,我們看一下rdm中的數(shù)據(jù):
至此瘾境,我們已經(jīng)成功運行了這個項目。
??下面替劈,我們嘗試著對這個運行結(jié)果做些分析寄雀。首先,我們一次性調(diào)用了五次add函數(shù)陨献,但是運行的總時間才1秒多盒犹。這是celery異步運行的結(jié)果,如果是同步運行眨业,那么急膀,至少需要5秒多,因為每調(diào)用add函數(shù)一次龄捡,就會休眠一秒卓嫂。這就是celery的強(qiáng)大之處。
??從后臺輸出可以看到聘殖,程序會先將任務(wù)分發(fā)出來晨雳,每個任務(wù)一個ID,在后臺統(tǒng)一處理奸腺,處理完后會有相應(yīng)的結(jié)果返回餐禁,同時該結(jié)果也會儲存之后臺數(shù)據(jù)庫⊥徽眨可以利用ready()判斷任務(wù)是否執(zhí)行完畢帮非,再用result獲取任務(wù)的結(jié)果。
??本文項目的github地址為:https://github.com/percent4/celery_example 讹蘑。
??本次分享到此結(jié)束末盔,感謝閱讀~
??注意:本人現(xiàn)已開通微信公眾號: Python爬蟲與算法(微信號為:easy_web_scrape), 歡迎大家關(guān)注哦~~