上篇文章我們學習了Flask框架——MongoEngine使用MongoDB數(shù)據(jù)庫,這篇文章我們學習Flask框架——基于Celery的后臺任務满俗。
Celery
在Web開發(fā)中蚂会,我們經(jīng)常會遇到一些耗時的操作淋样,例如:上傳/下載數(shù)據(jù)、發(fā)送郵件/短信胁住,執(zhí)行各種任務等等趁猴。這時我們可以使用分布式異步消息任務隊列去執(zhí)行這些任務。
Celery是一款非常簡單彪见、靈活儡司、可靠的分布式異步消息隊列工具,可以用于處理大量消息余指、實時數(shù)據(jù)以及任務調度捕犬。
Celery通過消息機制進行通信,一般使用中間人(Broker)作為客戶端和職程(Worker)調節(jié)酵镜。
其工作流程如下圖所示:
客戶端發(fā)送消息任務給中間人(Broker)咏闪,任務執(zhí)行單元(Celery Worker)監(jiān)控中間人中的任務隊列萨脑,當中間人有消息任務時就分配任務給任務執(zhí)行單元杜顺,任務執(zhí)行單元在后臺運行任務并返回請求采幌。
注意:Celery可以有多個職程(Worker)和中間人(Broker),用來提高Celery的高可用性以及橫向擴展能力靠粪。
Celery優(yōu)點:
- 簡單:上手比較簡單蜡吧,不需要配置文件就可以直接運行;
- 高可用:如果出現(xiàn)丟失連接或連接失敗占键,職程(Worker)和客戶端會自動重試昔善,并且中間人通過 主/主 主/從 的方式來進行提高可用性;
- 快速:單個 Celery 進行每分鐘可以處理數(shù)以百萬的任務捞慌,而且延遲僅為亞毫秒(使用 RabbitMQ、 librabbitmq 在優(yōu)化過后)柬批;
- 靈活:Celery 的每個部分幾乎都可以自定義擴展和單獨使用啸澡,例如自定義連接池、序列化方式氮帐、壓縮方式嗅虏、日志記錄方式、任務調度上沐、生產(chǎn)者皮服、消費者、中間人(Broker)等。
安裝
Celery安裝方式很簡單龄广,執(zhí)行如下命令即可:
pip install celery
這里我們使用redis作為中間人硫眯,執(zhí)行如下代碼安裝redis:
pip install redis
創(chuàng)建Celery程序
對比說明
(1)不使用Celery執(zhí)行耗時任務,創(chuàng)建一個名為test.py文件择同,其示例代碼如下:
import time
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('開始執(zhí)行')
result=add(2,3) #調用add函數(shù)
print('執(zhí)行結束')
print(result)
運行test.py文件两入,運行結果如下圖:
(2)使用Celery執(zhí)行耗時任務,創(chuàng)建一個名為tasks.py文件敲才,示例代碼如下:
import time
from celery import Celery
celery = Celery( #實例化Celery對象
'tasks', #當前模塊名
broker='redis://localhost:6379/1', #使用redis為中間人
backend='redis://localhost:6379/2' #結果存儲
)
@celery.task() #使用異步任務裝飾器task
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('開始執(zhí)行')
result=add.delay(2,3) #調用add方法并使用delay延時函數(shù)
print('執(zhí)行結束')
print(result)
實例化Celery對象裹纳,其中第一個參數(shù)為當前模塊名,第二個參數(shù)為中間人(Broker)的URL鏈接紧武,第三個參數(shù)為中間人結果放回的存儲URL鏈接剃氧,再調用add()方法時,需要使用delay延時函數(shù)阻星。
運行tasks.py文件朋鞍,運行結果如下圖所示:
當我們運行tasks.py文件時,發(fā)現(xiàn)程序一下子就運行結束并返回任務id迫横,
在終端執(zhí)行如下代碼運行Celery職程(Worker)服務:
celery -A tasks worker -l info
如下圖所示:
雖然職程已經(jīng)收到任務并且在分配到子進程運行了番舆,但是發(fā)現(xiàn)該任務沒有運行結束,這時因為Celery不支持在windows下運行任務矾踱,需要借助eventlet來完成恨狈,執(zhí)行如下安裝eventlet:
pip install eventlet
安裝成功后,執(zhí)行如下代碼運行Celery職程(Worker)服務:
celery -A tasks worker -l info -P eventlet -c 10
運行結果如下:
Celery配置
大多數(shù)情況下呛讲,使用默認的配置即可滿足我們的開發(fā)禾怠,不需要修改配置,當我們需要修改配置時贝搁,可以通過update進行配置吗氏,在上面的tasks.py添加如下代碼:
celery.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
其中:
accept_content:允許的內容類型/序列化程序的白名單,如果收到不在此列表中的消息雷逆,則該消息將被丟棄并出現(xiàn)錯誤弦讽,默認只為json;
task_serializer:標識要使用的默認序列化方法的字符串膀哲,默認值為json往产;
result_serializer:結果序列化格式,默認值為json某宪;
timezone:配置Celery以使用自定義時區(qū)仿村;
enable_utc:啟用消息中的日期和時間,將轉換為使用 UTC 時區(qū)兴喂,與timezone連用蔼囊,當設置為 false 時焚志,將使用系統(tǒng)本地時區(qū)。
除了上面的配置參數(shù)畏鼓,Celery還提供了很多很多配置參數(shù)酱酬,大家可以在官方配置文檔中查看
Celery的配置信息比較多,通常情況下滴肿,我們會在tasks.py同級目錄下為創(chuàng)建Celery的配置文件岳悟, 這里命名為celeryconfig.py,示例代碼如下:
broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
執(zhí)行如下代碼加載配置:
import celeryconfig
app.config_from_object('celeryconfig')
在Flask項目中使用Celery
首先創(chuàng)建一個名為mycelery.py文件泼差,該文件用來實例化Celery對象贵少,示例代碼如下:
from celery import Celery
def make_celery(app):
celery = Celery( #實例化Celery
'tasks',
broker='redis://localhost:6379/1', #使用redis為中間人
backend='redis://localhost:6379/2' #結果存儲
)
class ContextTask(celery.Task): #創(chuàng)建ContextTask類并繼承Celery.Task子類
def __call__(self, *args, **kwargs):
with app.app_context(): #和Flask中的app建立關系
return self.run(*args, **kwargs) #返回任務
celery.Task = ContextTask #異步任務實例化ContextTask
return celery #返回celery對象
首先自定義一個名為make_celery()方法,該方法傳入Flask程序中的app堆缘,在方法中實例化Celery滔灶,并創(chuàng)建一個名為ContextTask類用來和Flask中的app建立關系,最后返回celery吼肥。
創(chuàng)建名為tasks.py文件录平,該文件用來存放我們的耗時任務,示例代碼如下:
import time
from app import celery
@celery.task #使用異步任務裝飾器task
def add(x, y):
time.sleep(5) #休眠5秒
return x + y
這里我們通過休眠的方式來模擬耗時的下載任務缀皱。
Flask程序app.py文件示例代碼如下:
from flask import Flask
import tasks
from mycelery import make_celery
app = Flask(__name__)
celery = make_celery(app) #調用make_celery方法并傳入app使celery和app進行關聯(lián)
@app.route('/')
def hello():
tasks.add.delay(1,2) #調用tasks文件中的add()異步任務方法
return '請求正在后臺處理中斗这,您可以去處理其他事情'
if __name__ == '__main__':
app.run(debug=True)
app.py文件很簡單,就調用make_celery方法使celery和app進行關聯(lián)啤斗,并在視圖函數(shù)中使用tasks中的異步任務方法表箭。
在終端執(zhí)行如下代碼運行Celery職程(Worker)服務:
celery -A tasks worker -l info -P eventlet -c 10
啟動Flask程序,訪問http://127.0.0.1:5000/后在終端查Worker服務钮莲,如下圖所示:
這樣就成功使用Celery把耗時任務交給后臺來處理免钻,避免了不必要的耗時等待(如下載數(shù)據(jù)任務)。
當我們不使用Celery時崔拥,用戶在執(zhí)行耗時任務時极舔,用戶可能要等耗時任務完成后,才能進行其他操作链瓦。
好了拆魏,F(xiàn)lask框架——基于Celery的后臺任務就講到這里了,感謝觀看慈俯,下篇文章繼續(xù)學習Flask框架的其他知識渤刃。
公眾號:白巧克力LIN
該公眾號發(fā)布Python、數(shù)據(jù)庫肥卡、Linux溪掀、Flask事镣、自動化測試步鉴、Git等相關文章揪胃!