介紹
隨著Web應用程序的發(fā)展和使用的增加低千,用例也變得多樣化贩据。我們現(xiàn)在正在建設和使用網站來執(zhí)行比以往更復雜的任務饱亮。其中一些任務立即轉發(fā)給用戶舍沙,而其他任務則需要進行進一步處理后轉發(fā)拂铡。
Celery可以幫助我們分解復雜的工作感帅,并由不同的機器來完成,以減輕一臺機器上的負載或減少完成時間失球。
在本文中豺撑,我們將探討Celery在Flask應用程序中安排后臺任務的使用黔牵,以減輕資源密集型任務的負擔并確定對最終用戶的響應的優(yōu)先級猾浦。
Flask
Flask 是一個基于 Python 的輕量級 Web 框架金赦,WSGI 工具箱采用 Werkzeug素邪,模板引擎使用 Jinja2。由于其不依賴于特殊的工具或庫磨确,并且沒有數(shù)據抽象層乏奥、表單驗證或是其他任何已有多種庫可以勝任的功能邓了,從而保持核心簡單骗炉、易于擴展句葵,而被定義為"微"框架乍丈。但是轻专,F(xiàn)lask 可以通過擴展來添加應用功能请垛。并且 Flask 具有自帶開發(fā)用服務器和 debugger递览、集成單元測試和 RESTful 請求調度 (request dispatching)绞铃、支持 secure cookie 的特點儿捧。
Jinja2 是基于 Python 的模版引擎,支持 Unicode颓影,具有集成的沙箱執(zhí)行環(huán)境并支持選擇自動轉義碎浇。Jinja2 擁有強大的自動 HTML 轉移系統(tǒng)璃俗,可以有效的阻止跨站腳本攻擊苟穆;通過模版繼承機制雳旅,對所有模版使用相似布局攒盈;通過在第一次加載時將源碼轉化為 Python 字節(jié)碼從而加快模版執(zhí)行時間沦童。
Redis
Redis 是一個使用 ANSIC 語言編寫、遵守 BSD 協(xié)議驼壶、Key-Value 的存儲系統(tǒng)热凹。擁有支持數(shù)據持久化般妙、支持 string碟渺、map突诬、list苫拍、set、sorted set 等數(shù)據結構和支持數(shù)據備份的特點旺隙。
Redis 會周期性地把更新的數(shù)據寫入磁盤或把修改操作寫入追加的記錄文件绒极,并且在此基礎上實現(xiàn)主從(master-slave)同步,因此數(shù)據可以從主服務器向任意數(shù)量的從服務器上同步蔬捷,從服務器可以是關聯(lián)其他從服務器的主服務器垄提。而且由于 Redis 完全實現(xiàn)了發(fā)布/訂閱機制,使得從數(shù)據庫在任何地方同步樹時,可訂閱一個頻道并接收主服務器完整的消息發(fā)布記錄铡俐。
MQ
MQ 消息隊列是一種應用程序的通信方法摘昌,應用程序可通過讀寫出入對立的消息進行通信。MQ 是一種消費者-生產者 (Producer-Customer)模式的實現(xiàn)高蜂。生產者-消費者模式由生產者稿饰、消費者和緩存區(qū)三個模塊構成惭笑。緩存區(qū)作為一個中介的存在,生產者將數(shù)據放入緩存區(qū),消費者從緩存區(qū)取出數(shù)據。本系統(tǒng)中,F(xiàn)lask 作為生產者,Salesforce 作為消費者悬垃,而 MQ 則是中間的緩存區(qū)廊佩。應用生產者-消費者模式能夠有效的降低兩者之間的耦合茁计,減少互相之間的依賴娜膘;由于緩存區(qū)的存在,消費者無需直接從生產者處獲取數(shù)據颤枪,能夠支持并發(fā)任務春缕、減少阻塞。
RabbitMQ 則是由 erlang 開發(fā)的 AMQP(高級消息隊列協(xié)議)的開源實現(xiàn),作為一個消息隊列管理工具與 Celery 集成后惫确,負責處理服務器之間的通信任務。RabbitMQ 的使用過程如下:
- 客端連接到消息隊列服務器并打開一個 channel。
- 客戶端聲明一個 exchange稚配、一個 queue臊岸,并分別設置相關屬性逻住。
- 客戶端使用 routing key 在 exchange 與 queue 之間綁定好關系。
- 客戶端投遞消息到 exchange,exchange 根據消息的 key 和設置好的 binding锣吼,將消息投遞到隊列中隧膘。
RabbitMQ 常用的 Exchange Type 有以下三種:
- Fanout:能夠將所有發(fā)送到該 exchange 的消息投遞到所有與它綁定的隊列中。
- Direct:把消息投遞到那些 binding key 與 routing key 完全匹配的隊列中。
- Topic:將消息路由到 binding key 與 routing key 模式匹配的隊列中。
什么是任務隊列丑孩?
任務隊列是一種分配小的工作單元或任務的機制,可以在不干擾大多數(shù)基于Web的應用程序的請求-響應周期的情況下執(zhí)行這些任務古拴。
任務隊列有助于委派工作盔然,否則將在等待響應時降低應用程序的速度鹅搪。它們還可以用于在主機或進程與用戶交互時處理資源密集型任務涂召。
這樣辅肾,與用戶的交互是一致的,及時的衰粹,并且不受工作量的影響。
什么是Celery笆怠?
Celery是一個異步任務隊列铝耻,它基于分布式消息傳遞來在計算機或線程之間分配工作負載。Celery由client蹬刷、broker瓢捉、worker組成频丘。
worker負責執(zhí)行放置在隊列中的任務或工作并轉發(fā)結果。使用Celery泡态,您可以同時擁有本地和遠程worker搂漠,這意味著可以通過Internet將工作委派給功能更強大的其他計算機。這樣兽赁,減輕了主機上的負載状答,并且有更多資源可用于處理用戶請求。
Celery的客戶端負責向worker發(fā)布作業(yè)刀崖,并使用消息代理與他們進行通信惊科。
此類消息代理的示例包括Redis和RabbitMQ。
為什么要使用Celery亮钦?
出于各種原因馆截,我們應該選擇Celery執(zhí)行我們的后臺任務。首先蜂莉,它具有很好的可擴展性蜡娶,允許按需添加更多的worker,以適應增加的負載或流量映穗。 Celery仍在積極開發(fā)中窖张,有簡潔的文檔和活躍的用戶社區(qū)。
另一個優(yōu)點是Celery易于集成到多個Web框架中蚁滋,其中大多數(shù)都具有促進集成的庫宿接。
它還提供了webhooks與其他Web應用程序交互。
Celery還可以使用各種消息代理辕录,這為我們提供了靈活性睦霎。建議使用RabbitMQ,但它也可以支持Redis和Beanstalk走诞。
我們將構建一個Flask應用程序副女,該應用程序允許用戶設置提醒,該提醒將在設定的時間傳遞到他們的電子郵件中蚣旱。
文件結構樹如下:
.
├── Pipfile # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py # main Flask application implementation
├── config.py # to host the configuration
├── requirements.txt # store our requirements
└── templates
└── index.html # the landing page
1 directory, 8 files
參考資料
- 本文最新版本地址
- 本文涉及的python測試開發(fā)庫 謝謝點贊碑幅!
- 本文相關海量書籍下載
- python工具書籍下載-持續(xù)更新
- Windows 支持 https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows
- 本文配套視頻 https://sn9.us/file/18113597-406685860 前面30分鐘
- https://www.agiliq.com/blog/2015/07/getting-started-with-celery-and-redis/
- https://redis.io/topics/rediscli
- https://stackoverflow.com/questions/19354826/how-to-get-all-keys-with-their-values-in-redis
- https://pypi.org/project/redis/
實現(xiàn)
我們將Redis用作消息代理,我們可以在其主頁上找到設置它的說明姻锁。 https://redis.io/topics/quickstart
實現(xiàn)
集成Celery
# Existing imports are maintained
from celery import Celery
# Flask app and flask-mail configuration truncated
# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)
# Add this decorator to our send_mail function
@client.task
def send_mail(data):
# Function remains the same
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
return render_template('index.html')
elif request.method == 'POST':
data = {}
data['email'] = request.form['email']
data['first_name'] = request.form['first_name']
data['last_name'] = request.form['last_name']
data['message'] = request.form['message']
duration = int(request.form['duration'])
duration_unit = request.form['duration_unit']
if duration_unit == 'minutes':
duration *= 60
elif duration_unit == 'hours':
duration *= 3600
elif duration_unit == 'days':
duration *= 86400
send_mail.apply_async(args=[data], countdown=duration)
flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
return redirect(url_for('index'))
執(zhí)行
$ python app.py
$ celery worker -A app.client --loglevel=info
在ubuntu 18調試通過枕赵,如果大家需要代碼或者技術支持,請加圖片上的群位隶。