最近項(xiàng)目中要實(shí)現(xiàn)消息推送功能倒脓,用推送消息的方式在商品下單的時(shí)候通知貨源發(fā)布者撑螺,后端實(shí)現(xiàn)是 ==python== ,框架是 ==flask== 崎弃。
閑話滯后甘晤,先直接按步驟實(shí)現(xiàn)吧!
參考:
Celery and the Flask Application Factory: (Celery and the Flask Application Factory)
- 安裝 ==Celery== 框架:
pip install celery
- 在項(xiàng)目的 ==app== 應(yīng)用的 ==init== 中初始化 ==celery==:
from celery import Celery
from app.config import base
celery = Celery(__name__, broker=base.CELERY_BROKER_URL)
- ==celery==的配置饲做,在 ==base== 里面:
# celery
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') or 'redis://:@127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') or 'redis://:@127.0.0.1:6379/0'
使用的是 ==redis== 的緩存线婚。
- 由于 ==celery== 執(zhí)行需要用到應(yīng)用程序 ==app== 的上下文,所以上面我們的初始化會(huì)無法執(zhí)行具體的操作盆均,需要我們?cè)趩?dòng) ==celery== 之后將 ==app.app_context().push()==塞弊。
import os
from app import celery, create_app
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
app.app_context().push()
- 啟動(dòng) ==celery== :
celery worker -A celery_worker.celery --loglevel=info # 啟動(dòng) celery -A celery應(yīng)用 --loglevel=info 顯示運(yùn)行日
- 那么,==celery== 的具體工作是怎么執(zhí)行的呢泪姨?不要著急游沿,下面就來開始:
直接上代碼:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import request, current_app
from app.myVerify import getargs
from app.common import success, error
from app import celery
import urllib
import httplib
import time
# 發(fā)送消息通知
@celery.task
def send_notifications(channel, content):
with current_app.app_context():
msg = {'appkey': current_app.config['COMMON_KEY'],
'channel': channel,
'content': content
}
requrl = current_app.config['REQURL']
test_data_urlencode = urllib.urlencode(msg)
header_data = {"Host": current_app.config['REQURL_HOST']}
conn = httplib.HTTPConnection(current_app.config['REQURL_HOST'])
conn.request(method="POST", url=requrl, body=test_data_urlencode, headers=header_data)
response = conn.getresponse()
res = response.read()
print ('發(fā)送推送消息成功')
先從上面步驟2中的初始化引入 ==celery==
然后在具體的方法中添加 ==@celery.task==