注意,這篇 Blog 嚴(yán)重參考了這兩篇文章:
- Using Celery With Flask: 寫了一個(gè)完整而且有意義的例子來展示如何在 Flask 中使用 Celery.
- Celery and the Flask Application Factory Pattern: 是上文的姊妹篇,描述的是更為真實(shí)的場(chǎng)景下,Celery 與 Flask Application Factory的結(jié)合使用伟葫。
Minimum Example
Celery 的一些設(shè)計(jì)和概念咙崎,與 Flask 很像弟跑,在 Flask 項(xiàng)目中集成 Celery 也很簡(jiǎn)單廊蜒,不像 Django 或其他框架需要擴(kuò)展插件。首先來看個(gè)最簡(jiǎn)單的例子 example.py:
import uuid
from flask import Flask, request, jsonify
from celery import Celery
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name,broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@celery.task
def send_email(to, subject, content):
return do_send_email(to, subject, content)
@app.route('/password/forgot/', methods=['POST'])
def reset_password():
email = request.form['email']
token = str(uuid.uuid4())
content = u'請(qǐng)點(diǎn)擊鏈接重置密碼:http://example.com/password/reset/?token=%s' % token
send_email.delay(email, content)
return jsonify(code=0, message=u'發(fā)送成功')
if __name__ == '__main__': app.run()
啟動(dòng) Celery worker:
$ celery worker -A example.celery -l INFO
啟動(dòng) Web server:
$ python example.py
當(dāng)然胯盯,實(shí)際應(yīng)用在生產(chǎn)環(huán)境下懈费,不能直接用 Flask 自帶的 server,需要使用 Gunicorn 這樣的 WSGI 容器博脑,或 uWSGI. 而且 Celery worker 進(jìn)程和 Web server 進(jìn)程應(yīng)該用 supervisord 管理起來憎乙。
Becoming Bigger
這是個(gè)最簡(jiǎn)單的例子,實(shí)際應(yīng)用會(huì)比這個(gè)復(fù)雜很多:有很多模塊叉趣,更復(fù)雜的配置泞边,更多的 task 等。在這種情況下疗杉,F(xiàn)lask 推薦使用 Application Factory Pattern阵谚,也就是定義一個(gè) function,在這里創(chuàng)建 Flask app 對(duì)象,并且處理注冊(cè)路由(blueprints)梢什、配置 logging 等一系列初始化操作奠蹬。
下面我們看看在更大的 Flask 項(xiàng)目里,應(yīng)該如何使用 Celery.
項(xiàng)目結(jié)構(gòu)
首先來看一下整個(gè)項(xiàng)目的結(jié)構(gòu):
├── README.md
├── app
│ ├── __init__.py
│ ├── config.py
│ ├── forms
│ ├── models
│ ├── tasks
│ │ ├── __init__.py
│ │ └── email.py
│ └── views
│ │ ├── __init__.py
│ │ └── account.py
├── celery_worker.py
├── manage.py└── wsgi.py
這個(gè)圖里省略了很多細(xì)節(jié)嗡午,簡(jiǎn)單解釋一下:
項(xiàng)目的根目錄下囤躁,有個(gè)celery_worker.py的文件,這個(gè)文件的作用類似于wsgi.py翼馆,是啟動(dòng) Celery worker 的入口割以。
app 包里是主要業(yè)務(wù)代碼金度,其中 tasks 里定義里一系列的 task应媚,提供給其他模塊調(diào)用。
主要代碼猜极。
- app/config.py
class BaseConfig(object):
CELERY_BROKER_URL = 'redis://localhost:6379/2'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
CELERY_TASK_SERIALIZER = 'json'
BaseConfig是整個(gè)項(xiàng)目用到的配置的基類中姜,實(shí)際上還會(huì)派生出DevelopmentConfig,StagingConfig和ProductionConfig
等類。這里不討論配置的細(xì)節(jié)跟伏,也只關(guān)心和 Celery 相關(guān)的配置項(xiàng)丢胚。
- app/init.py
from celery import Celery
from flask import Flask
from app.config import BaseConfig
celery = Celery(__name__,broker=BaseConfig.CELERY_BROKER_URL)
def create_app():
app = Flask(__name__)
celery.conf.update(app.config) # 更新 celery 的配置
return app
- app/tasks/email.py
from flask import current_app
from celery.util.log import get_task_logger
from app import celery
logger = get_task_logger(__name__)
@celery.task
def send_email(to, subject, content):
app = current_app._get_current_object()
subject = app.config['EMAIL_SUBJECT_PREFIX'] + subject
logger.info('send message "%s" to %s', content, to)
return do_send_email(to, subject, content)
- app/views/account.py
import uuid
from flask import Blueprint, request,jsonify
from app.tasks.email import send_email
bp_account = Blueprint('account', __name__)
@bp_account.route('/password/forgot/', methods=['POST'])
def reset_password():
email = request.form['email']
token = str(uuid.uuid4())
content = u'請(qǐng)點(diǎn)擊鏈接重置密碼:http://example.com/password/reset/?token=%s' % token
send_email.delay(email, content)
return jsonify(code=0, message=u'發(fā)送成功')
- ceelry_worker.py
from app import create_app, celery
app = create_app()
app.app_context().push()
這個(gè)celery_worker.py文件有兩個(gè)操作:創(chuàng)建一個(gè) Flask 實(shí)例推入 Flask application context
第一個(gè)操作很簡(jiǎn)單,其實(shí)也是初始化了 celery 實(shí)例受扳。
第二個(gè)操作看起來有些奇怪携龟,實(shí)際上也很好理解。如果用過 Flask 就應(yīng)該知道 Flask 的 Application Context和 Request Context. Flask 一個(gè)很重要的設(shè)計(jì)理念是:在一個(gè) Python 進(jìn)程里可以運(yùn)行多個(gè)應(yīng)用(application)勘高,當(dāng)存在多個(gè) application 時(shí)可以通過current_app
獲取當(dāng)前請(qǐng)求所對(duì)應(yīng)的 application.current_app
綁定的是當(dāng)前 request 的 application 的引用峡蟋,在非 request-response 環(huán)境里,是沒有request context
的华望,所以調(diào)用current_app
就會(huì)拋出異常(RuntimeError: working outside of application context)
蕊蝗。創(chuàng)建一個(gè) request context 沒有必要,而且消耗資源赖舟,所以就引入了 application context.app.app_context().push()
會(huì)推入一個(gè) application context
蓬戚,后續(xù)所有操作都會(huì)在這個(gè)環(huán)境里執(zhí)行,直到進(jìn)程退出宾抓。因此子漩,如果在 tasks 里用到了current_app
或其它需要 application context 的東西,就一定需要這樣做石洗。(默認(rèn)情況下 Celery 的 pool 是 prefork幢泼,也就是多進(jìn)程,現(xiàn)在這種寫法沒有問題劲腿;但是如果指定使用 gevent旭绒,是沒用的。這種情況下有別的解決方案,以后會(huì)寫文章討論挥吵。)
運(yùn)行
在項(xiàng)目的根路徑下啟動(dòng) Celery worker:
$ celery worker -A celery_worker.celery -l INFO
總結(jié)
上面兩個(gè)例子重父,實(shí)際上主要的差別就是初始化方式和模塊化,還有需要注意 Flask 的 application context 問題忽匈。文章內(nèi)容比較簡(jiǎn)單房午,文中的一些鏈接是很好的擴(kuò)展和補(bǔ)充,值得一看
丹允。