Python異步任務(wù)之Celery

Celery

在程序運(yùn)行過程中,經(jīng)常會(huì)遇到一些耗時(shí)耗資源的任務(wù)对蒲,為了避免這些任務(wù)阻塞主進(jìn)程的運(yùn)行,我們會(huì)采用多線程或異步任務(wù)去處理。比如在Web中需要對(duì)新注冊(cè)的用戶發(fā)一封激活郵件來(lái)驗(yàn)證賬戶卿拴,而發(fā)郵件本身是一個(gè)IO阻塞式任務(wù),如果把它直接放到應(yīng)用中梨与,就會(huì)阻塞主程序的運(yùn)行堕花,用戶需要等待郵件發(fā)送完成后才能進(jìn)行下一步操作。一種解決辦法是為每個(gè)發(fā)郵件任務(wù)新開一個(gè)線程去執(zhí)行粥鞋,當(dāng)線程很多時(shí)這也不是很好的解決辦法缘挽;更好的方式是在業(yè)務(wù)邏輯中觸發(fā)一個(gè)發(fā)郵件的異步任務(wù),把待發(fā)送的任務(wù)都發(fā)往任務(wù)隊(duì)列呻粹,主程序可以繼續(xù)往下運(yùn)行壕曼。

Celery是一個(gè)強(qiáng)大的分布式任務(wù)隊(duì)列,他可以讓任務(wù)的執(zhí)行完全脫離主程序等浊,甚至可以把任務(wù)分配到其他主機(jī)上運(yùn)行腮郊。我們通常使用它來(lái)實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。它的架構(gòu)組成如下圖:

celery架構(gòu)組成圖

可以看到筹燕, Celery 主要包含以下幾個(gè)模塊:

  • 任務(wù)模塊

    充當(dāng)任務(wù)生產(chǎn)者轧飞,包含異步任務(wù)和定時(shí)任務(wù)。其中庄萎,異步任務(wù)通常在業(yè)務(wù)邏輯中被觸發(fā)并發(fā)往任務(wù)隊(duì)列踪少,而定時(shí)任務(wù)由 Celery Beat 進(jìn)程周期性地將任務(wù)發(fā)往任務(wù)隊(duì)列。

  • 消息中間件 Broker

    Broker 糠涛,即為任務(wù)調(diào)度隊(duì)列援奢,接收任務(wù)生產(chǎn)者發(fā)來(lái)的消息,將任務(wù)存入隊(duì)列忍捡。 Celery 本身不提供隊(duì)列服務(wù)集漾,官方推薦使用 RabbitMQ 和 Redis 等。

  • 任務(wù)執(zhí)行單元 Worker

    充當(dāng)任務(wù)消費(fèi)者砸脊。Worker 是執(zhí)行任務(wù)的處理單元具篇,它實(shí)時(shí)監(jiān)控消息隊(duì)列,獲取隊(duì)列中調(diào)度的任務(wù)凌埂,并執(zhí)行它驱显。

  • 任務(wù)結(jié)果存儲(chǔ) Backend

    Backend 用于存儲(chǔ)任務(wù)的執(zhí)行結(jié)果,以供查詢。同消息中間件一樣埃疫,存儲(chǔ)也可使用 RabbitMQ, Redis 和 MongoDB 等伏恐。

異步任務(wù)

使用 Celery 實(shí)現(xiàn)異步任務(wù)主要包含三個(gè)步驟:

  1. 創(chuàng)建一個(gè) Celery 實(shí)例
  2. 啟動(dòng) Celery Worker
  3. 應(yīng)用程序調(diào)用異步任務(wù)

項(xiàng)目結(jié)構(gòu):

項(xiàng)目結(jié)構(gòu)

init.py文件中創(chuàng)建一個(gè)Celery實(shí)例:

from celery import Celery

cele = Celery('demo')                                # 創(chuàng)建 Celery 實(shí)例
cele.config_from_object('app.celery_config')         # 通過 Celery 實(shí)例加載配置模塊

Celery配置文件,celery_config.py

# encoding: utf-8
# Author: Timeashore

# celery
BROKER_URL = 'redis://localhost:6379/0'                 # 使用Redis作為消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'      # 把任務(wù)結(jié)果存在了Redis

CELERY_TIMEZONE='Asia/Shanghai'                         # 指定時(shí)區(qū)栓霜,默認(rèn)是 UTC

CELERY_TASK_SERIALIZER = 'pickle'                       # 任務(wù)序列化和反序列化使用pickle方案
CELERY_RESULT_SERIALIZER = 'json'                       # 讀取任務(wù)結(jié)果一般性能要求不高翠桦,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24               # 任務(wù)過期時(shí)間,不建議直接寫86400胳蛮,應(yīng)該讓這樣的magic數(shù)字表述更明顯
CELERY_ACCEPT_CONTENT = ['json','pickle']               # 指定接受的內(nèi)容類型

CELERY_IMPORTS = (                                  # 指定導(dǎo)入的任務(wù)模塊
    # 'app.task1',
    # 'app.task2'
    'app.email'
)

為了簡(jiǎn)單起見销凑,對(duì)于 Broker 和 Backend ,這里都使用 redis 仅炊。
在CELERY_IMPORTS中指定task任務(wù)文件斗幼。

task任務(wù)文件,email.py

import time
from app import cele

@cele.task
def havefun(args):
    # do something
    # ...
    
    
    # 模擬執(zhí)行耗時(shí)任務(wù)
    print u"開始執(zhí)行celery任務(wù)"
    time.sleep(10)
    print u"celery任務(wù)結(jié)束"
    
    return args

被@cele裝飾的函數(shù)都成為可被Celery調(diào)度的任務(wù)茂洒。

在應(yīng)用程序中觸發(fā)執(zhí)行celery異步任務(wù)孟岛,view.py

from ..email import havefun

@main.route('/test_module', methods=['GET','POST'])
def test_module():
    # do something
    # ...
    
    # 調(diào)用異步任務(wù)
    havefun.delay([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    # havefun.apply_async([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])  # 和delay效果一樣
    
    return render_template('fake_celery_task.html')

啟動(dòng)程序

1,在 app 同級(jí)目錄下督勺,啟動(dòng) Celery Worker 進(jìn)程

celery -A app worker --loglevel=info

其中:

  • 參數(shù) -A 指定了 Celery 實(shí)例的位置,本例是在__init__.py中斤贰, Celery 會(huì)自動(dòng)在該文件中尋找 Celery 對(duì)象實(shí)例智哀,當(dāng)然,我們也可以自己指定荧恍;
  • 參數(shù) --loglevel 指定了日志級(jí)別瓷叫,默認(rèn)為 warning ,也可以使用 -l info 來(lái)表示送巡;

在生產(chǎn)環(huán)境中摹菠,我們通常會(huì)使用 Supervisor 來(lái)控制 Celery Worker 進(jìn)程。

啟動(dòng)成功后骗爆,控制臺(tái)會(huì)顯示如下輸出:

image

2次氨,運(yùn)行后臺(tái)任務(wù)調(diào)用Celery任務(wù)結(jié)果:

使用 delay() 方法將任務(wù)發(fā)送到消息中間件( Broker ), Celery Worker 進(jìn)程監(jiān)控到該任務(wù)后摘投,就會(huì)進(jìn)行執(zhí)行煮寡。我們將窗口切換到 Worker 的啟動(dòng)窗口,會(huì)看到多了兩條日志:

image

這說(shuō)明任務(wù)已經(jīng)被調(diào)度并執(zhí)行成功犀呼。

另外幸撕,我們還可以在Python shell中使用,如果想獲取執(zhí)行后的結(jié)果外臂,可以這樣做:

>>> from app.email import havefun
>>> havefun.delay([1,2,3,4,5])
<AsyncResult: 651c5beb-983e-4df3-ae0e-aaad2a758239>
>>> result = havefun.delay([1,2,3,4,5])
>>> result.ready()  # 使用 ready() 判斷任務(wù)是否執(zhí)行完畢
False
>>> result.ready()
True
>>> result.get()    # 使用 get() 獲取任務(wù)結(jié)果
[1, 2, 3, 4, 5]
>>>

需要注意坐儿,如果在views.py中把函數(shù)返回值賦值給一個(gè)變量,那么主應(yīng)用程序也會(huì)被阻塞,需要等待異步任務(wù)返回的結(jié)果貌矿。因此累铅,實(shí)際使用中,不需要把結(jié)果賦值站叼。


在應(yīng)用程序的views.py中我們使用了delay()和apply_async()兩種調(diào)用后臺(tái)任務(wù)的方法娃兽,兩者有什么區(qū)別呢?

事實(shí)上尽楔,delay() 方法封裝了 apply_async()投储,delay函數(shù)如下:

    def delay(self, *partial_args, **partial_kwargs):
        return self.apply_async(partial_args, partial_kwargs)

apply_async()函數(shù)原型如下:

    def apply_async(self, args=(), kwargs={}, **options):
        try:
            _apply = self._apply_async
        except IndexError:  # no tasks for chain, etc to find type
            return
        # For callbacks: extra args are prepended to the stored args.
        if args or kwargs or options:
            args, kwargs, options = self._merge(args, kwargs, options)
        else:
            args, kwargs, options = self.args, self.kwargs, self.options
        return _apply(args, kwargs, **options)

apply_async() 支持更多的參數(shù),常用的參數(shù)如下:

  1. countdown :指定多少秒后執(zhí)行任務(wù)
havefun.apply_async(args=(1,), countdown=15)    # 15 秒后執(zhí)行任務(wù)

執(zhí)行后阔馋,多了兩條日志文件:

[2018-06-05 16:27:16,592: INFO/MainProcess] Received task: app.email.havefun[d1c4ab89-4fcc-491c-bdd4-b5d0b4faab94]
eta:[2018-06-05 16:27:31.590000+08:00]

表示函數(shù)在第16 秒被加入到消息隊(duì)列玛荞,第二行eta表示程序會(huì)等到第31 秒才去執(zhí)行這個(gè)任務(wù)。

  1. eta (estimated time of arrival):指定任務(wù)被調(diào)度的具體時(shí)間呕寝,參數(shù)類型是 datetime
from datetime import datetime, timedelta

# 當(dāng)前 UTC 時(shí)間再加 10 秒后執(zhí)行任務(wù)
havefun.apply_async(args=(1,), eta=datetime.utcnow() + timedelta(seconds=10))
  1. expires :任務(wù)過期時(shí)間勋眯,參數(shù)類型可以是 int ,也可以是 datetime
havefun.apply_async(args=(1,), expires=10)    # 10 秒后過期

如當(dāng)前參數(shù)設(shè)置滿足不了需求下梢,請(qǐng)查閱官方文檔客蹋。

定時(shí)任務(wù)

Celery 除了可以執(zhí)行異步任務(wù),也支持執(zhí)行周期性/定時(shí)任務(wù)( Periodic Tasks )孽江。 Celery Beat 進(jìn)程通過讀取配置文件的內(nèi)容讶坯,周期性地/定時(shí)將任務(wù)發(fā)往任務(wù)隊(duì)列。

同樣的岗屏,我們還使用上述的目錄結(jié)構(gòu)辆琅;不同的是,我們只需要在Celery配置文件 celery_config.py 文件中添加配置 CELERYBEAT_SCHEDULE 字段并在 CELERY_IMPORTS 中導(dǎo)入任務(wù)模塊就可以實(shí)現(xiàn)定時(shí)任務(wù)这刷,具體如下:

# encoding: utf-8
# Author: Timeashore

from datetime import timedelta
from celery.schedules import crontab

# celery
BROKER_URL = 'redis://localhost:6379/0'                 # 使用Redis作為消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'      # 把任務(wù)結(jié)果存在了Redis

CELERY_TIMEZONE='Asia/Shanghai'                         # 指定時(shí)區(qū)婉烟,默認(rèn)是 UTC

CELERY_TASK_SERIALIZER = 'pickle'                       # 任務(wù)序列化和反序列化使用pickle方案
CELERY_RESULT_SERIALIZER = 'json'                       # 讀取任務(wù)結(jié)果一般性能要求不高,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24               # 任務(wù)過期時(shí)間暇屋,不建議直接寫86400似袁,應(yīng)該讓這樣的magic數(shù)字表述更明顯
CELERY_ACCEPT_CONTENT = ['json','pickle']               # 指定接受的內(nèi)容類型

CELERY_IMPORTS = (                                      # 指定導(dǎo)入的任務(wù)模塊
    'app.task1',
    'app.task2'
    'app.email'
)

# schedules
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
         'task': 'app.task1.add',
         'schedule': timedelta(seconds=30),          # 每 30 秒一次
         # 'schedule': timedelta(minutes=1),         # 每 1 分鐘一次
         # 'schedule': timedelta(hours=4),           # 每 4 小時(shí)一次
         'args': (5, 8)                              # 任務(wù)函數(shù)參數(shù)
    },
    'multiply-at-some-time': {
        'task': 'app.task2.multiply',
        'schedule': crontab(hour=9, minute=50),      # 每天早上 9 點(diǎn) 50 分執(zhí)行一次
        'args': (3, 7)                               # 任務(wù)函數(shù)參數(shù)
    }
}

我們還需在 task 任務(wù) email.py 同級(jí)目錄下添加兩個(gè)任務(wù)文件 task1.py , task2.py 。

task1.py

import time
from app import cele

@cele.task
def add(x, y):
    time.sleep(2)
    return x + y

task2.py

import time
from app import cele

@cele.task
def multiply(x, y):
    time.sleep(2)
    return x * y

運(yùn)行

關(guān)閉上次的程序率碾,重新啟動(dòng) Celery Worker 進(jìn)程

celery -A app worker --loglevel=info

然后叔营,app同級(jí)目錄,啟動(dòng) Celery Beat 進(jìn)程所宰,定時(shí)將任務(wù)發(fā)送到 Broker

celery beat -A app
image

手動(dòng)調(diào)用 2 次 Celery 任務(wù), havefun():

image

查看日志文件绒尊,在此為了好展示,僅添加了 task1.py 文件并設(shè)置 1 分鐘執(zhí)行一次:

image

Celery 周期性任務(wù)也有多個(gè)配置項(xiàng)仔粥,如當(dāng)前設(shè)置滿足不了需求婴谱,請(qǐng)查閱官方文檔蟹但。

END!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谭羔,一起剝皮案震驚了整個(gè)濱河市华糖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌瘟裸,老刑警劉巖客叉,帶你破解...
    沈念sama閱讀 222,252評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異话告,居然都是意外死亡兼搏,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門沙郭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)佛呻,“玉大人,你說(shuō)我怎么就攤上這事病线∠胖” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵送挑,是天一觀的道長(zhǎng)绑莺。 經(jīng)常有香客問我,道長(zhǎng)让虐,這世上最難降的妖魔是什么紊撕? 我笑而不...
    開封第一講書人閱讀 59,869評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮赡突,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘区赵。我一直安慰自己惭缰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,888評(píng)論 6 398
  • 文/花漫 我一把揭開白布笼才。 她就那樣靜靜地躺著漱受,像睡著了一般。 火紅的嫁衣襯著肌膚如雪骡送。 梳的紋絲不亂的頭發(fā)上昂羡,一...
    開封第一講書人閱讀 52,475評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音摔踱,去河邊找鬼虐先。 笑死,一個(gè)胖子當(dāng)著我的面吹牛派敷,可吹牛的內(nèi)容都是我干的蛹批。 我是一名探鬼主播撰洗,決...
    沈念sama閱讀 41,010評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼腐芍!你這毒婦竟也來(lái)了差导?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,924評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤猪勇,失蹤者是張志新(化名)和其女友劉穎设褐,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體泣刹,經(jīng)...
    沈念sama閱讀 46,469評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡助析,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,552評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了项玛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片貌笨。...
    茶點(diǎn)故事閱讀 40,680評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖襟沮,靈堂內(nèi)的尸體忽然破棺而出锥惋,到底是詐尸還是另有隱情,我是刑警寧澤开伏,帶...
    沈念sama閱讀 36,362評(píng)論 5 351
  • 正文 年R本政府宣布膀跌,位于F島的核電站,受9級(jí)特大地震影響固灵,放射性物質(zhì)發(fā)生泄漏捅伤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,037評(píng)論 3 335
  • 文/蒙蒙 一巫玻、第九天 我趴在偏房一處隱蔽的房頂上張望丛忆。 院中可真熱鬧,春花似錦仍秤、人聲如沸熄诡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)凰浮。三九已至,卻和暖如春苇本,著一層夾襖步出監(jiān)牢的瞬間袜茧,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工瓣窄, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留笛厦,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,099評(píng)論 3 378
  • 正文 我出身青樓康栈,卻偏偏與公主長(zhǎng)得像递递,于是被迫代替她去往敵國(guó)和親喷橙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,691評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容