在Flask中使用Celery(上)

為了在后臺運行任務(wù),我們可以使用線程(或者進程)付翁。
使用線程(或者進程)的好處是保持處理邏輯簡潔简肴。但是,在需要可擴展的生產(chǎn)環(huán)境中百侧,我們也可以考慮使用Celery代替線程砰识。

Celery是什么?
Celery是個異步分布式任務(wù)隊列佣渴。
通過Celery在后臺跑任務(wù)并不像用線程那么的簡單辫狼,但是用Celery的話,能夠使應(yīng)用有較好的可擴展性辛润,因為Celery是個分布式架構(gòu)膨处。下面介紹Celery的三個核心組件。

  • 生產(chǎn)者(Celery client)砂竖。生產(chǎn)者(Celery client)發(fā)送消息真椿。在Flask上工作時,生產(chǎn)者(Celery client)在Flask應(yīng)用內(nèi)運行乎澄。

  • 消費者(Celery workers)突硝。消費者用于處理后臺任務(wù)。消費者(Celery client)可以是本地的也可以是遠程的三圆。我們可以在運行Flask的server上運行一個單一的消費者(Celery workers)狞换,當(dāng)業(yè)務(wù)量上漲之后再去添加更多消費者(Celery workers)。

  • 消息傳遞者(message broker)舟肉。生產(chǎn)者(Celery client)和消費者(Celery workers)的信息的交互使用的是消息隊列(message queue)修噪。Celery支持若干方式的消息隊列,其中最常用的是RabbitMQ和Redis路媚。

話不多說上代碼先黄琼。代碼中包含兩個例子:異步發(fā)送郵件;開始一或多個異步工作,然后在網(wǎng)頁上更新其進度脏款。

from flask import Flask
from celery import Celery
 
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
 
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

其中的URL參數(shù)告訴了Celery围苫,消息傳遞的服務(wù)的位置。如果消息傳遞者用的不是Redis撤师,或者Redis部署在其他機器剂府,那么需要做適當(dāng)?shù)母淖儭?br> 而通過調(diào)用 celery.conf.update()方法,我們能夠為Celery同步Flask上的配置剃盾。僅當(dāng)需要Celery存儲狀態(tài)即存儲結(jié)果時腺占,CELERY_RESULT_BACKEND 選項才會被用到。
下文第一個例子不需要存儲狀態(tài)以及存儲結(jié)果痒谴,但是第二個例子是需要的衰伯,所以一次配置好。

任何想要在后臺運行的任務(wù)积蔚,都需要使用裝飾者celery.task
進行包裝意鲸,如下。

@celery.task
def my_background_task(arg1, arg2):
    # some long running task here 
    return result 

現(xiàn)在Flask 應(yīng)用就能夠發(fā)起“在后臺執(zhí)行任務(wù)”的請求了尽爆,如下怎顾。
task = my_background_task.delay(10, 20)
其中delay() 方法是 apply_async() 的快捷調(diào)用。

此處用apply_async()同樣奏效教翩,如下杆勇。
task = my_background_task.apply_async(args=[10, 20])

相比于 delay() 方法,當(dāng)使用 apply_async() 方法時饱亿,我們能夠?qū)笈_任務(wù)的執(zhí)行方式有更多的控制蚜退。例如任務(wù)在何時執(zhí)行等。

舉例來說彪笼,下面的代碼可以讓任務(wù)在一分鐘之后開始運行钻注。
task = my_background_task.apply_async(args=[10, 20], countdown=60)

delay()apply_async() 的返回值是一個 AsyncResult 的對象。通過該對象配猫,能夠獲得任務(wù)的狀態(tài)幅恋。

例一:異步發(fā)郵件
第一個例子的需求比較廣泛:發(fā)電子郵件的時候無需阻塞主應(yīng)用線程。本例使用了擴展Flask-Mail泵肄。

網(wǎng)頁包含了一個Text類型的域的表單捆交。用戶需要在其中輸入郵箱地址,點擊提交腐巢,然后服務(wù)器向該地址發(fā)送一封測試郵件品追。該表單包含兩個提交按鈕,其中一個會立即發(fā)送郵件冯丙,而另一個會在點擊后延遲一分鐘后再發(fā)送肉瓦。html代碼如下。

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body></html>

用于發(fā)送郵件的Flask-Mail需要一些配置,主要與發(fā)送郵件的郵件服務(wù)器泞莉、發(fā)送郵件時間相關(guān)哪雕。考慮到用戶名密碼安全性鲫趁,作者將其放到了環(huán)境變量中斯嚎。

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'

異步發(fā)送代碼如下。

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    msg = Message('Hello from Flask',
                  recipients=[request.form['email']])
    msg.body = 'This is a test email sent from a background Celery task.'
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(msg)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[msg], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))

用 session 將用戶鍵入的信息保存饮寞,以便頁面刷新時能夠使用該信息孝扛。
朋友們發(fā)現(xiàn)了,重點在發(fā)送郵件的代碼幽崩,使用的是Celery 的任務(wù)send_async_email,通過調(diào)用它的 delay() 方法或apply_async() 進行異步發(fā)送寞钥。

最后來看異步任務(wù)代碼慌申。

@celery.task
def send_async_email(msg):
     """Background task to send an email with Flask-Mail.""" 
    with app.app_context():
    mail.send(msg)

使用裝飾者 celery.task 包裝 send_async_email , 使其成為后臺運行的任務(wù)。因為Flask-Mail需要應(yīng)用的context理郑,所以需要在調(diào)用send方法前先創(chuàng)建應(yīng)用的context環(huán)境蹄溉。
另一點很重要,從異步調(diào)用的返回值是不會保存的您炉,所以應(yīng)用本身無法知道是否異步調(diào)用是否成功柒爵。在這個例子之中需要看Celery的消費者的輸出才能確定發(fā)送郵件過程是否有問題。
第一個例子比較簡單赚爵,我們起了后臺任務(wù)然后就不必再去管它了棉胀。很多應(yīng)用的需求與例子一相仿。

然而也會有一些應(yīng)用冀膝,需要監(jiān)控后臺任務(wù)的運行唁奢,獲得任務(wù)的結(jié)果。下面來看第二個例子窝剖。

例二:顯示狀態(tài)更新進度
用戶可以點擊按鈕以啟動一個或者多個長時間任務(wù)麻掸,此時在網(wǎng)頁使用ajax技術(shù)不斷輪詢服務(wù)器以更新所有的這些長時間任務(wù)們的狀態(tài)。
而對于每一個長時間任務(wù)赐纱,網(wǎng)頁上會有一個窗臺條脊奋、一個進度百分比、一個狀態(tài)消息與之對應(yīng)疙描,當(dāng)完成時會顯示相應(yīng)結(jié)果诚隙。

狀態(tài)更新時后臺任務(wù)代碼。

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

代碼中作者在Celery 裝飾者中加入了 bind=True 參數(shù)淫痰,這使得Celery向函數(shù)中傳入了self參數(shù)最楷,因此在函數(shù)中能夠記錄狀態(tài)更新。
本例中隨機挑選了一些單詞作為狀態(tài)的更新,同時籽孙,選取隨機數(shù)作為每個后臺任務(wù)運行時間烈评。
self.update_state()方法用于指明 Celery如何接收任務(wù)更新。
Celery有很多內(nèi)建狀態(tài)比如STARTED , SUCCESS 等等犯建,當(dāng)然Celery也允許程序員自定義狀態(tài)讲冠。本例子中使用的是自定義狀態(tài),PROGRESS 适瓦。與PROGRESS 一起的還有metadata 竿开。 metadata 是一個字典,包含當(dāng)
前進度玻熙,任務(wù)大小否彩,以及消息。
當(dāng)循環(huán)跳出時嗦随,返回字典列荔,字典中包含任務(wù)的執(zhí)行結(jié)果。

long_task() 函數(shù)在 Celery消費者進程中運行枚尼。下面看一下Flask應(yīng)用如何啟動該后臺任務(wù)贴浙。

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async() 
    return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}

用戶需要向/longtask 發(fā)送 POST 請求以觸發(fā)后臺任務(wù)執(zhí)行。服務(wù)器啟動任務(wù)并存儲返回值署恍。作者使用了狀態(tài)碼202,在REST API中有“請求正在處理中”的意思崎溃,而加入了Location頭則是為了生產(chǎn)者能夠獲取任務(wù)執(zhí)行時的狀態(tài)信息。url_for用于生成路由到taskstatus函數(shù)的url盯质,并且該url包含task id袁串,task id的值是task.id .

taskstatus 函數(shù)用于獲取后臺任務(wù)的更新狀態(tài)。

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        // job did not start yet
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

為了得到后臺任務(wù)產(chǎn)生的數(shù)據(jù)唤殴,使用了task id作為參數(shù)創(chuàng)建了一個task 對象般婆。
本函數(shù)產(chǎn)生了JSON響應(yīng),JSON響應(yīng)中的內(nèi)容與update_state()更新的一致朵逝。
我們使用task.state區(qū)分后臺任務(wù)的狀態(tài):本例有未運行蔚袍、未發(fā)生錯誤、發(fā)生錯誤三種狀態(tài)配名。
我們使用 task.info 訪問任務(wù)相關(guān)信息啤咽。而發(fā)生錯誤時, task.state 的狀態(tài)是 FAILURE 時渠脉,異常會包含在 task.info 之中宇整。

前端JS代碼
作者用的是nanobar.js實現(xiàn)進度條,用了jQuery的ajax芋膘。

<script src="http://cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="http://cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

啟動后臺任務(wù)的按鈕的JS代碼如下鳞青。

function start_long_task() {
        // add task status elements 
        div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
        $('#progress').append(div);

        // create a progress bar
        var nanobar = new Nanobar({
            bg: '#44f',
            target: div[0].childNodes[0]
        });

        // send ajax POST request to start background job
        $.ajax({
            type: 'POST',
            url: '/longtask',
            success: function(data, status, request) {
                status_url = request.getResponseHeader('Location');
                update_progress(status_url, nanobar, div[0]);
            },
            error: function() {
                alert('Unexpected error');
            }
        });
    }

其中被加入的HTML元素與任務(wù)的信息的對應(yīng)關(guān)系如下霸饲。

<div class="progress">
    <div></div>          <-- Progress bar 
    <div>0%</div>        <-- Percentage 
    <div>...</div>       <-- Status message 
    <div> </div>    <-- Result
</div><hr>

start_long_task() 函數(shù)通過ajax向 /longtask發(fā)送POST請求,使得后臺任務(wù)開始運行臂拓。
當(dāng)ajax的POST請求返回時厚脉,回調(diào)函數(shù)獲得響應(yīng),響應(yīng)中包含形如 /status/<task_id>的url, 其他函數(shù)(如update_progress )用此url從 taskstatus 函數(shù)獲取數(shù)據(jù)胶惰。
調(diào)用函數(shù) update_progress() 傻工,向函數(shù)傳入start_url 以及 nanoba r變量,用于生成進度條孵滞。

function update_progress(status_url, nanobar, status_div) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            // update UI
            percent = parseInt(data['current'] * 100 / data['total']);
            nanobar.go(percent);
            $(status_div.childNodes[1]).text(percent + '%');
            $(status_div.childNodes[2]).text(data['status']);
            if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
                if ('result' in data) {
                    // show result
                    $(status_div.childNodes[3]).text('Result: ' + data['result']);
                }
                else {
                    // something unexpected happened
                    $(status_div.childNodes[3]).text('Result: ' + data['state']);
                }
            }
            else {
                // rerun in 2 seconds
                setTimeout(function() {
                    update_progress(status_url, nanobar, status_div);
                }, 2000);
            }
        });
    }

update_progress函數(shù)向/status/<task_id>發(fā)送GET請求中捆,獲得json數(shù)據(jù)然后更新相應(yīng)的頁面元素。
當(dāng)后臺任務(wù)完成時坊饶,result會加載到頁面之中泄伪。如果沒有result的話,這就意味著任務(wù)的執(zhí)行以失敗告終幼东,此時任務(wù)的狀態(tài)是 FAILURE 臂容。
任當(dāng)后臺任務(wù)運行時,為了能夠持續(xù)獲得任務(wù)狀態(tài)并更新頁面根蟹,作者使用了定時器,定時器每個兩秒一更新直到后臺任務(wù)完成糟秘。

運行例子
讀者先安裝好virtualenv(強烈推薦简逮!但是virtualenv非必需安裝)。
下載代碼尿赚,安裝相應(yīng)庫散庶,如下。

1 $ git clone https://github.com/miguelgrinberg/flask-celery-example.git
2 $ cd flask-celery-example
3 $ virtualenv venv
4 $ source venv/bin/activate
5 (venv) $ pip install -r requirements.txt

未安裝virtualenv的話直接跳過第三行第四行命令凌净。

redis server端讀者自行安裝悲龟。安裝后運行啟動。
Celery 消費者也需要讀者運行冰寻,使用 celery命令须教。
郵件用戶名密碼自行設(shè)置。

$ export MAIL_USERNAME=<your-mail-username>
$ export MAIL_PASSWORD=<your-mail-password>
$ source venv/bin/activate(venv) 
$ celery worker -A app.celery --loglevel=info

Celery的 -A選項是應(yīng)用中的celer對象斩芭,與文章最開頭的代碼對應(yīng)轻腺。
--loglevel=info 則是讓日志內(nèi)容更為詳細。

最后啟動應(yīng)用划乖。
$ source venv/bin/activate(venv) $ python app.py

訪問http://localhost:5000/ 即可贬养。

原文鏈接 : http://blog.miguelgrinberg.com/post/using-celery-with-flask
譯文鏈接 : http://www.cnblogs.com/ifkite/p/4257721.html
姊妹篇鏈接:http://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市琴庵,隨后出現(xiàn)的幾起案子误算,更是在濱河造成了極大的恐慌仰美,老刑警劉巖,帶你破解...
    沈念sama閱讀 223,002評論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件儿礼,死亡現(xiàn)場離奇詭異咖杂,居然都是意外死亡,警方通過查閱死者的電腦和手機蜘犁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評論 3 400
  • 文/潘曉璐 我一進店門翰苫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人这橙,你說我怎么就攤上這事奏窑。” “怎么了屈扎?”我有些...
    開封第一講書人閱讀 169,787評論 0 365
  • 文/不壞的土叔 我叫張陵埃唯,是天一觀的道長。 經(jīng)常有香客問我鹰晨,道長墨叛,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,237評論 1 300
  • 正文 為了忘掉前任模蜡,我火速辦了婚禮漠趁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘忍疾。我一直安慰自己闯传,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,237評論 6 398
  • 文/花漫 我一把揭開白布卤妒。 她就那樣靜靜地躺著甥绿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪则披。 梳的紋絲不亂的頭發(fā)上共缕,一...
    開封第一講書人閱讀 52,821評論 1 314
  • 那天,我揣著相機與錄音士复,去河邊找鬼图谷。 笑死,一個胖子當(dāng)著我的面吹牛判没,可吹牛的內(nèi)容都是我干的蜓萄。 我是一名探鬼主播,決...
    沈念sama閱讀 41,236評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼澄峰,長吁一口氣:“原來是場噩夢啊……” “哼嫉沽!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起俏竞,我...
    開封第一講書人閱讀 40,196評論 0 277
  • 序言:老撾萬榮一對情侶失蹤绸硕,失蹤者是張志新(化名)和其女友劉穎堂竟,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玻佩,經(jīng)...
    沈念sama閱讀 46,716評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡出嘹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,794評論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了咬崔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片税稼。...
    茶點故事閱讀 40,928評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖垮斯,靈堂內(nèi)的尸體忽然破棺而出郎仆,到底是詐尸還是另有隱情,我是刑警寧澤兜蠕,帶...
    沈念sama閱讀 36,583評論 5 351
  • 正文 年R本政府宣布扰肌,位于F島的核電站,受9級特大地震影響熊杨,放射性物質(zhì)發(fā)生泄漏曙旭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,264評論 3 336
  • 文/蒙蒙 一晶府、第九天 我趴在偏房一處隱蔽的房頂上張望桂躏。 院中可真熱鬧,春花似錦川陆、人聲如沸沼头。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至土至,卻和暖如春购对,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背陶因。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評論 1 274
  • 我被黑心中介騙來泰國打工骡苞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人楷扬。 一個月前我還...
    沈念sama閱讀 49,378評論 3 379
  • 正文 我出身青樓解幽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親烘苹。 傳聞我的和親對象是個殘疾皇子躲株,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,937評論 2 361

推薦閱讀更多精彩內(nèi)容