python異步任務(wù)處理庫(kù)celery
0.安裝方式
pip install celery
一、單文件模式 基本使用
下面先直接上代碼膘婶,再慢慢說(shuō)明
異步任務(wù)文件 :
task.py
from celery import Celery # 創(chuàng)建應(yīng)用APP tasks當(dāng)前文件名相當(dāng)于當(dāng)前文件被引入時(shí)的__name__ # borker 負(fù)責(zé)攜程隊(duì)列的中間人 負(fù)責(zé)協(xié)調(diào)消費(fèi)者和生產(chǎn)者 利用redis中的列表類型Lpush, Rpush也能完成 此處使用mq # backend 記錄任務(wù)處理結(jié)果的數(shù)據(jù)庫(kù)或者其他組件 # 如果要指定redis 端口賬號(hào)密碼的方式為密碼 地址 端口號(hào) 選擇數(shù)據(jù)庫(kù)號(hào) 選擇6 # 'redis://:密碼@127.0.0.1:6379/6' app = Celery('tasks', broker='amqp://guest@localhost//', backend='redis://localhost') @app.task def add(x, y): # 一個(gè)異步實(shí)例阵谚,相當(dāng)于生產(chǎn)者 return x + y
此時(shí)創(chuàng)建了一個(gè)異步任務(wù)文件:
其他文件可以通過(guò) from tasks import add 導(dǎo)入這個(gè)異步實(shí)例
通過(guò) add.delay(a,b)傳入實(shí)參a,b進(jìn)行調(diào)用
需要通過(guò)命令行方式啟動(dòng)異步服務(wù)文件(當(dāng)前目錄執(zhí)行)
task.py
- celery -A tasks worker --loglevel=info
調(diào)用異步任務(wù)實(shí)例(消費(fèi)者)
main.py
form tasks import add # 放入中間人隊(duì)列等待任務(wù)進(jìn)程(消費(fèi)者)調(diào)度 result = add.delay(4慕蔚,5) # 打印任務(wù)的id執(zhí)行完畢會(huì)存到backend id可以用于異步地適時(shí)地獲取執(zhí)行結(jié)果 print(result) # <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d> # 通過(guò)ready查看是否任務(wù)執(zhí)行完畢 result.ready() # false 還未執(zhí)行完畢 # 延時(shí)一秒 等待 result.get(timeout=1) # 9
執(zhí)行完畢
現(xiàn)在讓我們來(lái)看看作為backend的redis發(fā)生了什么
連接redis數(shù)據(jù)庫(kù)
輸入一下命令
select 6(選擇的數(shù)據(jù)庫(kù)號(hào)碼 默認(rèn)0)
keys * (顯示所有的key)
是不是似曾相識(shí)呢梧乘?
沒錯(cuò)就是我們key的后綴部分就是我們的上次執(zhí)行的任務(wù)id
下面我們獲取一下key的值讓我們更了解整個(gè)機(jī)制演示keys
(由于沒有準(zhǔn)備文件趟济,我們就當(dāng)是9吧4鞑睢)
顯然這個(gè)key存放了本次異步任務(wù)執(zhí)行的結(jié)果信息
以上就是單個(gè)執(zhí)行與運(yùn)行機(jī)制了文件的執(zhí)行過(guò)程了
二送爸、多文件 目錄模式
這里提供一個(gè)實(shí)例 供大家參考
目錄結(jié)構(gòu)
init.py
這個(gè)沒什么好說(shuō)的了 空文件 識(shí)別為一個(gè)包
celery.py (APP引用文件)
from __future__ import absolute_import from celery import Celery # 當(dāng)前目錄名 任務(wù)實(shí)例所在的文件 app = Celery('celery_proj', include=['celery_proj.tasks']) # 引入配置文件的目錄 app.config_from_object('celery_proj.config') if __name__ == '__main__': app.start()
config.py (參數(shù)配置文件)
from __future__ import absolute_import from datetime import timedelta # 假設(shè)中間人和結(jié)果后端都使用redis 數(shù)據(jù)庫(kù)密碼為123456 CELERY_RESULT_BACKEND = 'redis://:123456@@127.0.0.1:6379/6' BROKER_URL = 'redis://:123456@@127.0.0.1:6379/5' ''' 設(shè)置定期執(zhí)行任務(wù) 每30秒添加一次隊(duì)列,一分鐘后執(zhí)行 (注釋不啟動(dòng)) CELERY_TIMEZONE = 'Asia/Shanghai' CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_proj.tasks.add', 'schedule': timedelta(minutes=1), 'args': (5,1) # task.add的調(diào)用參數(shù) }, } '''
tasks.py (任務(wù)實(shí)例文件暖释,消費(fèi)者模型)
from __future__ import absolute_import from celery_proj.celery import app @app.task def add(x,y): # 生產(chǎn)者函數(shù) 返回的結(jié)果會(huì)存儲(chǔ)到結(jié)果后端 用戶可以去結(jié)果后端backend中自行查找訪問(wèn)處理結(jié)果 # 我們?cè)趩挝募钠ヅ淠J较?已經(jīng)介紹backend發(fā)生了什么 return x+y
命令行啟動(dòng)方式
celery -A celery_proj worker -B -l info (多文件包模式需要添加-B)
celery multi start w1 -A celery_proj -B -l info --logfile = celerylog.log --pidfile = celerypid.pid (以守護(hù)進(jìn)程啟動(dòng) 停止改成stop就行了 日志指定為celerylog.log)