詳解python3+flask+celery+redis
Celery是什么留晚?
Celery是個(gè)異步分布式任務(wù)隊(duì)列。
通過Celery在后臺(tái)跑任務(wù)并不像用線程那么的簡單澈蝙,但是用Celery的話,能夠使應(yīng)用有較好的可擴(kuò)展性,因?yàn)镃elery是個(gè)分布式架構(gòu)甘苍。下面介紹Celery的三個(gè)核心組件。
生產(chǎn)者(Celery client)烘豌。生產(chǎn)者(Celery client)發(fā)送消息载庭。在Flask上工作時(shí),生產(chǎn)者(Celery client)在Flask應(yīng)用內(nèi)運(yùn)行扇谣。
消費(fèi)者(Celery workers)昧捷。消費(fèi)者用于處理后臺(tái)任務(wù)。消費(fèi)者(Celery client)可以是本地的也可以是遠(yuǎn)程的罐寨。我們可以在運(yùn)行Flask的server上運(yùn)行一個(gè)單一的消費(fèi)者(Celery workers)靡挥,當(dāng)業(yè)務(wù)量上漲之后再去添加更多消費(fèi)者(Celery workers)。
消息傳遞者(message broker)鸯绿。生產(chǎn)者(Celery client)和消費(fèi)者(Celery workers)的信息的交互使用的是消息隊(duì)列(message queue)跋破。Celery支持若干方式的消息隊(duì)列,其中最常用的是RabbitMQ和Redis.
話不多說上代碼先瓶蝴。
一毒返、基本框架結(jié)構(gòu)
二、重要文件配置如下
在Flask中集成celery需要做到兩點(diǎn):
創(chuàng)建celery的實(shí)例對象的名字必須是flask應(yīng)用程序app的名字,否則celery啟動(dòng)會(huì)失斚鲜帧拧簸;
celery必須能順利加載初始化文件
1、__init__.py文件 (初始化flask與celery)
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from config import *
import pymysql
pymysql.install_as_MySQLdb()
db= SQLAlchemy()
from celery import Celery
# Celery相關(guān)配置
CELERY_RESULT_BACKEND= "redis://localhost:6379/0"
CELERY_BROKER_URL= "redis://localhost:6379/0"
def create_app(config_name):
? ? app= Flask(__name__)
????app.config.from_object(config[config_name])
????config[config_name].init_app(app)
????db.init_app(app)
????register_blueprint(app)
????return app
def make_celery(app=None):
? ? app= app or create_app(os.getenv('FLASK_CONFIG')or 'default')
? ? ##在第一階段的基礎(chǔ)上 開始使用celery 任務(wù)調(diào)度男窟,我這使用 redis 做為緩存服務(wù)器盆赤,安裝配置redis 這? ? ? 里不再贅述
????celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)
????celery.conf.update(app.config)
????TaskBase= celery.Task
????class ContextTask(TaskBase):
? ? ? ? abstract= True
? ? ? ? def __call__(self,*args,**kwargs):
? ? ? ? ? ? with app.app_context():
? ? ? ? ? ? ? ? return TaskBase.__call__(self,*args,**kwargs)
????celery.Task= ContextTask
????return celery
def register_blueprint(app):
? ? from app.mainimport main
????app.register_blueprint(main)
????from app.mailimport mail
????app.register_blueprint(mail)
????from app.testsimport tests
????app.register_blueprint(tests)
2、tasks.py文件
"""
執(zhí)行的任務(wù)文件
"""
from .import make_celery
celery= make_celery(app=None)
@celery.task()
def add_together(a,b):
? ? return a + b
@celery.task()
def print_hello():
? ? print('Hello World!')
3.config.py 項(xiàng)目的配置文件
import os
basedir= os.path.abspath(os.path.dirname(__file__))
class config:
? ? SECRET_KEY= os.environ.get('SECRET_KEY')or 'this is a secret string'
? ? SQLALCHEMY_TRACK_MODIFICATIONS= True
? ? @staticmethod
? ? def init_app(app):
? ? ? ? pass
class DevelopmentConfig(config):
? ? DEBUG= True
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
class TestingConfig(config):
? ? TESTING= True
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
class ProductionConfig(config):
? ? SQLALCHEMY_DATABASE_URI= 'mysql+pymysql://username:pwd@sqldbadress/db'
config= {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
4歉眷、manage.py (flask框架項(xiàng)目啟動(dòng)運(yùn)行文件)
import os
from appimport create_app, db
from flask_scriptimport Manager, Shell
from flask_migrateimport Migrate, MigrateCommand
app= create_app(os.getenv('FLASK_CONFIG')or 'default')
manager= Manager(app)
migrate= Migrate(app, db)
def make_shell_context():
? ? return dict(app=app,db=db)
manager.add_command("shell",Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)
if __name__== '__main__':
? ? manager.run()
5牺六、views.py 文件 (flask主要接口業(yè)務(wù))
from .import main
from flaskimport Flask,request, jsonify
from app.tasksimport *
@main.route("/api/task_start",methods=['POST'])
def task_start():
? ? result= add_together.delay(10,20)
????print(result.wait())
????return jsonify({"msg":"Welcome to my app!"})
6、啟動(dòng)項(xiàng)目步驟:
1)啟動(dòng)falsk框架:python manage.py runserver -h 127.0.0.1 -p 8090
2)?啟動(dòng) Celery Worker::??celery -A app.tasks worker --loglevel=info
注意:如果flask和 celery 聯(lián)合用的時(shí)候發(fā)現(xiàn)報(bào)了個(gè)錯(cuò)誤:
NotImplementedError: No result backend is configured.
用的時(shí)候是這么用的:
CELERY_RESULT_BACKEND= "redis://localhost:6379/0"
CELERY_BROKER_URL= "redis://localhost:6379/0"
celery= Celery(__name__,broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND)