安裝:
pip install celery
步驟:
第一步:選一個broker
711.png
第二步:定義一個task
屏幕快照 2030.png
第三步:開啟worker
31.png
第四步:調用test方法
21.png
celery發(fā)送郵件
ext.py
from flask_cache import Cache
from flask_mail import Mail
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
mail = Mail()
cache = Cache(config={'CACHE_TYPE': 'redis'})
my_celery = Celery(__name__,borker='/redis:localhost:6379/')
@my_celery.task
def send_mail(email,username,url):
#只能從這里導入 不然則 循環(huán)導入
from App import create_app
app = create_app()
with app.app_context():
# 1. 創(chuàng)建一個Message對象
message = Message()
# 2. 配置Message
# 2.1 拿到post提交時填寫的郵箱地址
message.recipients = [email]
# 2.2 設置郵箱的主題
message.subject = '歡迎注冊1024網(wǎng)'
# 2.3 設置郵箱的內容
# http://127.0.0.1:5000/accounts/?token=
message.html = render_template('ActivatePage.html', username=username,
activate_url=url)
# 2.4 設置發(fā)送者。如果在settings里設置里MAIL_DEFAULT_SENDER配置項拴驮,可以不再設置sender屬性
# message.sender = 'chris_jiangwei@126.com'
# 3. 把Message對象發(fā)出去
mail.send(message)
def init_ext(app):
db.init_app(app=app)
Migrate(app=app, db=db)
mail.init_app(app=app)
cache.init_app(app=app)
UserAPI.py
import uuid
from flask import render_template, url_for
from flask_mail import Message
from flask_restful import Resource, reqparse, fields, marshal, marshal_with
from werkzeug.security import generate_password_hash
from App.ext import db, mail, cache
from App.models import User
parser = reqparse.RequestParser()
parser.add_argument('username', required=True, help='缺少用戶名')
parser.add_argument('password', required=True, help='缺少密碼')
parser.add_argument('email', required=True, help='缺少郵箱')
user_fields = {
'user_name': fields.String,
'e_mail': fields.String,
'是否激活': fields.Boolean(attribute='is_activity')
}
result_fields = {
'msg': fields.String,
'status': fields.Integer,
'data': fields.Nested(user_fields)
}
test_fields = {
'result': fields.String,
'hi': fields.String
}
class UerResource(Resource):
@marshal_with(test_fields)
def get(self):
# url = url_for('accountresource', _external=True, token='dfasdfk')
# print(url)
return {'result': 'hello', 'sss': 'mmmm'}
def post(self):
args = parser.parse_args()
username = args.get('username')
password = args.get('password')
email = args.get('email')
# 根據(jù)用戶名生成一個HASH值
token = str(uuid.uuid5(uuid.NAMESPACE_DNS, username))
user = User()
user.user_name = username
user.password = generate_password_hash(password)
user.e_mail = email
user.token = token
# save里有異常的處理機制
msg = user.save()
# 如果有異常映砖,直接返回
if msg.get('error'):
return msg
# 用戶寫入數(shù)據(jù)庫成功
cache.set(token, user.id, timeout=60)
# 發(fā)送激活郵件
url = url_for('accountresource', _external=True, token=token)
send_mail.delay(email,username,url)
# return {'msg': '注冊成功', 'status': 201, 'data': user}
return marshal({'msg': '注冊成功', 'status': 201, 'data': user}, result_fields)
啟動命令
celery -A App.ext:my_celery worker --loglevel=info