概述
?? celery是一個(gè)python實(shí)現(xiàn)的分布式任務(wù)執(zhí)行框架,本文為學(xué)習(xí)筆記:
- python 3.8.6
- celery 5.1.2
- 操作系統(tǒng) win10
- redis 3.0.504
安裝
- pip install celery
- pip install flower (任務(wù)監(jiān)控平臺(tái))
- pip install eventlet (win10 環(huán)境需要)
image.png
參考官網(wǎng)上面的例子叠纷,將celery 當(dāng)成一個(gè)獨(dú)立工程來維護(hù)握截,方便將已有的業(yè)務(wù)按統(tǒng)一規(guī)范寫成任務(wù)入口函數(shù)
目錄結(jié)構(gòu)如下:
image.png
celery.py 文件代碼:
from celery import Celery
app = Celery('proj',
broker='redis://:123456@10.2.13.167:6379/1',
backend='redis://:123456@10.2.13.167:6379/2',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
tasks.py 文件代碼如下:
from .celery import app
@app.task
def add(x, y):
print("call add")
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
啟動(dòng)服務(wù)
- 啟動(dòng) celery celery -A proj worker -l INFO -P eventlet
- 啟動(dòng) flower celery -A proj flower --address=127.0.0.1 --port=5566 # web監(jiān)控頁面打開方式 http://127.0.0.1:5566
直接ctrl+c 退出程序
在liunx上面 支持以 守護(hù)進(jìn)程方式啟動(dòng) celery multi start w1 -A proj -l INFO multi 其它參數(shù):重啟是 restart 停止是stop笛谦,還支持啟動(dòng)多個(gè)worker 這里是只啟動(dòng)了一個(gè)枪向,具體參數(shù)參考官方文檔
集成到flask框架中
?? celery集成到flask框架中不需要安裝任何擴(kuò)展,網(wǎng)上有其它的方式進(jìn)行集成彼硫,本人采用的是吃溅,直接調(diào)用 tasks.py 下面的任務(wù)函數(shù)的方式實(shí)現(xiàn)
代碼如下:
from flask import Flask, request
from proj.tasks import app, add, mul
from celery.result import AsyncResult
flask_app = Flask(__name__)
@flask_app.route('/add', methods=["POST"])
def celery_add():
"""
執(zhí)行add任務(wù)
POST http://127.0.0.1:5000/add
Body 為表單 參數(shù)為 args1,args2
:return:
"""
try:
args1, args2 = request.form.values()
print(f"獲取到的參數(shù):{args1},{args2}")
except Exception as e:
return str(e)
result = add.delay(int(args1), int(args2))
result_id = result.id
print(f"任務(wù)id:{result_id}")
return result_id
@flask_app.route('/get_result', methods=["GET"])
def get_result_id():
"""
根據(jù)id獲取任務(wù)結(jié)果
GET http://127.0.0.1:5000/get_result?id=fd7cb7dc-5b1b-4e07-8199-dd89a0c08a2a
:return:
"""
result_id = request.args.get('id')
async_result = AsyncResult(id=result_id, app=app)
result = ""
if async_result.successful():
result = async_result.get()
print(result)
# result.forget() # 將結(jié)果刪除
elif async_result.status == 'PENDING':
print('任務(wù)等待被執(zhí)行')
elif async_result.status == 'RETRY':
print('任務(wù)異常后重試')
elif async_result.status == 'STARTED':
print('任務(wù)執(zhí)行中')
elif async_result.failed():
print('任務(wù)執(zhí)行失敗')
return str(result)
if __name__ == "__main__":
flask_app.run("127.0.0.1", port=5000)
啟動(dòng)flask程序
通過postman進(jìn)行調(diào)用測(cè)試:
-
調(diào)用add函數(shù)溶诞,獲取任務(wù)id:
image.png -
獲取對(duì)應(yīng)id的結(jié)果:
image.png -
打開flower 查看調(diào)用過程 如下圖:
image.png