web項(xiàng)目郵件的發(fā)送必不可少掂器,這里推薦兩種在flask下的郵件的異步發(fā)送亚皂。
1:使用thread模塊:
優(yōu)點(diǎn):配置方便,代碼簡(jiǎn)單国瓮。
缺點(diǎn):flask項(xiàng)目部署在uswgi下的時(shí)候灭必,因?yàn)閜ythongil的原因會(huì)失效,造成郵件無(wú)法發(fā)送乃摹。
email.py禁漓,主要用于定義郵件
#文件:email.py
from flask_mail import Message, Mail
from flask import render_template
from exts import mail
from flask import current_app
from threading import Thread
#to 接收方
#sender 發(fā)送方
#subject 主題
#template 內(nèi)容
def send_async_mail(to,subject,template,**kwargs):#發(fā)送郵件
msg = Message(current_app.config["FLASKY_MAIL_SUBJECT_PREFIX"] + subject,sender = current_app.config['FLASKY_MAIL_SENDER'],recipients=[to])
# msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
mail.send(msg)
def send_async_email(app,msg):
with app.app_context():
for i in range(0,5):
try:
mail.send(msg)
return True
except Exception as e:
i =+ 1
print("err is {} .index is {}\n".format(e,i))
return False
def send_email(to,subject,template,**kwargs):#發(fā)送郵件
try:
msg = Message(current_app.config["FLASKY_MAIL_SUBJECT_PREFIX"] + subject,sender = current_app.config['FLASKY_MAIL_SENDER'],recipients=[to])
msg.body = render_template(template + '.txt', **kwargs)
msg.html = render_template(template + '.html', **kwargs)
app = current_app._get_current_object() #這一句非常重要,因?yàn)閒lask的上下文特性孵睬,如果沒(méi)有這一句會(huì)造成郵件無(wú)法發(fā)送
thr = Thread(target=send_async_email,args= [app,msg])
thr.start()
return thr
except Exception as e:
print(e)
2:使用celery:
優(yōu)點(diǎn):flask項(xiàng)目部署在uswgi下的時(shí)候播歼,郵件仍可發(fā)送,代碼簡(jiǎn)單掰读。秘狞。
缺點(diǎn):配置相對(duì)復(fù)雜,
一.python層面工作
tasks.py磷支,主要用于定義郵件
#tasks.py
from celery import Celery
from flask_mail import Message,Mail
from flask import Flask,render_template
import config
mail = Mail()
app = Flask(__name__)
app.config.from_object(config)
mail.init_app(app)
# 運(yùn)行本文件:
# 在windows操作系統(tǒng)上:
# celery -A tasks.celery worker --pool=solo --loglevel=info
# 在類*nix操作系統(tǒng)上:
# celery -A tasks.celery worker --loglevel=info
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
@celery.task
def send_email(to,subject,template,**kwargs):#發(fā)送郵件
msg = Message(app.config["FLASKY_MAIL_SUBJECT_PREFIX"] + subject,sender = app.config['FLASKY_MAIL_SENDER'],recipients=[to])
msg.html = render_template(template + '.html', **kwargs)
mail.send(msg)
view層的調(diào)用
#view.py
......
from tasks import send_email
......
send_email.delay(email,"請(qǐng)查收您的驗(yàn)證碼","auth/email/confirm",email = email,captcha = captcha)
......
二.服務(wù)器配置
安裝redis
sudo apt-get install redis-server #安裝
sudo apt-get purge --auto-remove redis-server #完全卸載
ps aux|grep redis #啟動(dòng):redis安裝后谒撼,默認(rèn)會(huì)自動(dòng)啟動(dòng),可以通過(guò)以下命令查看:
sudo service redis-server start # 如果想自己手動(dòng)啟動(dòng)雾狈,可以通過(guò)以下命令進(jìn)行啟動(dòng):
sudo service redis-server stop # 停止
安裝celery
pip install celery #安裝
celery -A tasks.celery worker --loglevel=info #啟動(dòng)