Celery
在程序運(yùn)行過程中,經(jīng)常會(huì)遇到一些耗時(shí)耗資源的任務(wù)对蒲,為了避免這些任務(wù)阻塞主進(jìn)程的運(yùn)行,我們會(huì)采用多線程或異步任務(wù)去處理。比如在Web中需要對(duì)新注冊(cè)的用戶發(fā)一封激活郵件來(lái)驗(yàn)證賬戶卿拴,而發(fā)郵件本身是一個(gè)IO阻塞式任務(wù),如果把它直接放到應(yīng)用中梨与,就會(huì)阻塞主程序的運(yùn)行堕花,用戶需要等待郵件發(fā)送完成后才能進(jìn)行下一步操作。一種解決辦法是為每個(gè)發(fā)郵件任務(wù)新開一個(gè)線程去執(zhí)行粥鞋,當(dāng)線程很多時(shí)這也不是很好的解決辦法缘挽;更好的方式是在業(yè)務(wù)邏輯中觸發(fā)一個(gè)發(fā)郵件的異步任務(wù),把待發(fā)送的任務(wù)都發(fā)往任務(wù)隊(duì)列呻粹,主程序可以繼續(xù)往下運(yùn)行壕曼。
Celery是一個(gè)強(qiáng)大的分布式任務(wù)隊(duì)列,他可以讓任務(wù)的執(zhí)行完全脫離主程序等浊,甚至可以把任務(wù)分配到其他主機(jī)上運(yùn)行腮郊。我們通常使用它來(lái)實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。它的架構(gòu)組成如下圖:
可以看到筹燕, Celery 主要包含以下幾個(gè)模塊:
-
任務(wù)模塊
充當(dāng)任務(wù)生產(chǎn)者轧飞,包含異步任務(wù)和定時(shí)任務(wù)。其中庄萎,異步任務(wù)通常在業(yè)務(wù)邏輯中被觸發(fā)并發(fā)往任務(wù)隊(duì)列踪少,而定時(shí)任務(wù)由 Celery Beat 進(jìn)程周期性地將任務(wù)發(fā)往任務(wù)隊(duì)列。
-
消息中間件 Broker
Broker 糠涛,即為任務(wù)調(diào)度隊(duì)列援奢,接收任務(wù)生產(chǎn)者發(fā)來(lái)的消息,將任務(wù)存入隊(duì)列忍捡。 Celery 本身不提供隊(duì)列服務(wù)集漾,官方推薦使用 RabbitMQ 和 Redis 等。
-
任務(wù)執(zhí)行單元 Worker
充當(dāng)任務(wù)消費(fèi)者砸脊。Worker 是執(zhí)行任務(wù)的處理單元具篇,它實(shí)時(shí)監(jiān)控消息隊(duì)列,獲取隊(duì)列中調(diào)度的任務(wù)凌埂,并執(zhí)行它驱显。
-
任務(wù)結(jié)果存儲(chǔ) Backend
Backend 用于存儲(chǔ)任務(wù)的執(zhí)行結(jié)果,以供查詢。同消息中間件一樣埃疫,存儲(chǔ)也可使用 RabbitMQ, Redis 和 MongoDB 等伏恐。
異步任務(wù)
使用 Celery 實(shí)現(xiàn)異步任務(wù)主要包含三個(gè)步驟:
- 創(chuàng)建一個(gè) Celery 實(shí)例
- 啟動(dòng) Celery Worker
- 應(yīng)用程序調(diào)用異步任務(wù)
項(xiàng)目結(jié)構(gòu):
在init.py文件中創(chuàng)建一個(gè)Celery實(shí)例:
from celery import Celery
cele = Celery('demo') # 創(chuàng)建 Celery 實(shí)例
cele.config_from_object('app.celery_config') # 通過 Celery 實(shí)例加載配置模塊
Celery配置文件,celery_config.py
# encoding: utf-8
# Author: Timeashore
# celery
BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作為消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務(wù)結(jié)果存在了Redis
CELERY_TIMEZONE='Asia/Shanghai' # 指定時(shí)區(qū)栓霜,默認(rèn)是 UTC
CELERY_TASK_SERIALIZER = 'pickle' # 任務(wù)序列化和反序列化使用pickle方案
CELERY_RESULT_SERIALIZER = 'json' # 讀取任務(wù)結(jié)果一般性能要求不高翠桦,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過期時(shí)間,不建議直接寫86400胳蛮,應(yīng)該讓這樣的magic數(shù)字表述更明顯
CELERY_ACCEPT_CONTENT = ['json','pickle'] # 指定接受的內(nèi)容類型
CELERY_IMPORTS = ( # 指定導(dǎo)入的任務(wù)模塊
# 'app.task1',
# 'app.task2'
'app.email'
)
為了簡(jiǎn)單起見销凑,對(duì)于 Broker 和 Backend ,這里都使用 redis 仅炊。
在CELERY_IMPORTS中指定task任務(wù)文件斗幼。
task任務(wù)文件,email.py
import time
from app import cele
@cele.task
def havefun(args):
# do something
# ...
# 模擬執(zhí)行耗時(shí)任務(wù)
print u"開始執(zhí)行celery任務(wù)"
time.sleep(10)
print u"celery任務(wù)結(jié)束"
return args
被@cele裝飾的函數(shù)都成為可被Celery調(diào)度的任務(wù)茂洒。
在應(yīng)用程序中觸發(fā)執(zhí)行celery異步任務(wù)孟岛,view.py
from ..email import havefun
@main.route('/test_module', methods=['GET','POST'])
def test_module():
# do something
# ...
# 調(diào)用異步任務(wù)
havefun.delay([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# havefun.apply_async([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # 和delay效果一樣
return render_template('fake_celery_task.html')
啟動(dòng)程序
1,在 app 同級(jí)目錄下督勺,啟動(dòng) Celery Worker 進(jìn)程
celery -A app worker --loglevel=info
其中:
- 參數(shù) -A 指定了 Celery 實(shí)例的位置,本例是在
__init__.py
中斤贰, Celery 會(huì)自動(dòng)在該文件中尋找 Celery 對(duì)象實(shí)例智哀,當(dāng)然,我們也可以自己指定荧恍; - 參數(shù) --loglevel 指定了日志級(jí)別瓷叫,默認(rèn)為 warning ,也可以使用 -l info 來(lái)表示送巡;
在生產(chǎn)環(huán)境中摹菠,我們通常會(huì)使用 Supervisor 來(lái)控制 Celery Worker 進(jìn)程。
啟動(dòng)成功后骗爆,控制臺(tái)會(huì)顯示如下輸出:
2次氨,運(yùn)行后臺(tái)任務(wù)調(diào)用Celery任務(wù)結(jié)果:
使用 delay() 方法將任務(wù)發(fā)送到消息中間件( Broker ), Celery Worker 進(jìn)程監(jiān)控到該任務(wù)后摘投,就會(huì)進(jìn)行執(zhí)行煮寡。我們將窗口切換到 Worker 的啟動(dòng)窗口,會(huì)看到多了兩條日志:
這說(shuō)明任務(wù)已經(jīng)被調(diào)度并執(zhí)行成功犀呼。
另外幸撕,我們還可以在Python shell中使用,如果想獲取執(zhí)行后的結(jié)果外臂,可以這樣做:
>>> from app.email import havefun
>>> havefun.delay([1,2,3,4,5])
<AsyncResult: 651c5beb-983e-4df3-ae0e-aaad2a758239>
>>> result = havefun.delay([1,2,3,4,5])
>>> result.ready() # 使用 ready() 判斷任務(wù)是否執(zhí)行完畢
False
>>> result.ready()
True
>>> result.get() # 使用 get() 獲取任務(wù)結(jié)果
[1, 2, 3, 4, 5]
>>>
需要注意坐儿,如果在views.py中把函數(shù)返回值賦值給一個(gè)變量,那么主應(yīng)用程序也會(huì)被阻塞,需要等待異步任務(wù)返回的結(jié)果貌矿。因此累铅,實(shí)際使用中,不需要把結(jié)果賦值站叼。
在應(yīng)用程序的views.py中我們使用了delay()和apply_async()兩種調(diào)用后臺(tái)任務(wù)的方法娃兽,兩者有什么區(qū)別呢?
事實(shí)上尽楔,delay() 方法封裝了 apply_async()投储,delay函數(shù)如下:
def delay(self, *partial_args, **partial_kwargs):
return self.apply_async(partial_args, partial_kwargs)
apply_async()函數(shù)原型如下:
def apply_async(self, args=(), kwargs={}, **options):
try:
_apply = self._apply_async
except IndexError: # no tasks for chain, etc to find type
return
# For callbacks: extra args are prepended to the stored args.
if args or kwargs or options:
args, kwargs, options = self._merge(args, kwargs, options)
else:
args, kwargs, options = self.args, self.kwargs, self.options
return _apply(args, kwargs, **options)
apply_async() 支持更多的參數(shù),常用的參數(shù)如下:
- countdown :指定多少秒后執(zhí)行任務(wù)
havefun.apply_async(args=(1,), countdown=15) # 15 秒后執(zhí)行任務(wù)
執(zhí)行后阔馋,多了兩條日志文件:
[2018-06-05 16:27:16,592: INFO/MainProcess] Received task: app.email.havefun[d1c4ab89-4fcc-491c-bdd4-b5d0b4faab94]
eta:[2018-06-05 16:27:31.590000+08:00]
表示函數(shù)在第16 秒被加入到消息隊(duì)列玛荞,第二行eta表示程序會(huì)等到第31 秒才去執(zhí)行這個(gè)任務(wù)。
- eta (estimated time of arrival):指定任務(wù)被調(diào)度的具體時(shí)間呕寝,參數(shù)類型是 datetime
from datetime import datetime, timedelta
# 當(dāng)前 UTC 時(shí)間再加 10 秒后執(zhí)行任務(wù)
havefun.apply_async(args=(1,), eta=datetime.utcnow() + timedelta(seconds=10))
- expires :任務(wù)過期時(shí)間勋眯,參數(shù)類型可以是 int ,也可以是 datetime
havefun.apply_async(args=(1,), expires=10) # 10 秒后過期
如當(dāng)前參數(shù)設(shè)置滿足不了需求下梢,請(qǐng)查閱官方文檔客蹋。
定時(shí)任務(wù)
Celery 除了可以執(zhí)行異步任務(wù),也支持執(zhí)行周期性/定時(shí)任務(wù)( Periodic Tasks )孽江。 Celery Beat 進(jìn)程通過讀取配置文件的內(nèi)容讶坯,周期性地/定時(shí)將任務(wù)發(fā)往任務(wù)隊(duì)列。
同樣的岗屏,我們還使用上述的目錄結(jié)構(gòu)辆琅;不同的是,我們只需要在Celery配置文件 celery_config.py 文件中添加配置 CELERYBEAT_SCHEDULE 字段并在 CELERY_IMPORTS 中導(dǎo)入任務(wù)模塊就可以實(shí)現(xiàn)定時(shí)任務(wù)这刷,具體如下:
# encoding: utf-8
# Author: Timeashore
from datetime import timedelta
from celery.schedules import crontab
# celery
BROKER_URL = 'redis://localhost:6379/0' # 使用Redis作為消息代理
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務(wù)結(jié)果存在了Redis
CELERY_TIMEZONE='Asia/Shanghai' # 指定時(shí)區(qū)婉烟,默認(rèn)是 UTC
CELERY_TASK_SERIALIZER = 'pickle' # 任務(wù)序列化和反序列化使用pickle方案
CELERY_RESULT_SERIALIZER = 'json' # 讀取任務(wù)結(jié)果一般性能要求不高,所以使用了可讀性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過期時(shí)間暇屋,不建議直接寫86400似袁,應(yīng)該讓這樣的magic數(shù)字表述更明顯
CELERY_ACCEPT_CONTENT = ['json','pickle'] # 指定接受的內(nèi)容類型
CELERY_IMPORTS = ( # 指定導(dǎo)入的任務(wù)模塊
'app.task1',
'app.task2'
'app.email'
)
# schedules
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'app.task1.add',
'schedule': timedelta(seconds=30), # 每 30 秒一次
# 'schedule': timedelta(minutes=1), # 每 1 分鐘一次
# 'schedule': timedelta(hours=4), # 每 4 小時(shí)一次
'args': (5, 8) # 任務(wù)函數(shù)參數(shù)
},
'multiply-at-some-time': {
'task': 'app.task2.multiply',
'schedule': crontab(hour=9, minute=50), # 每天早上 9 點(diǎn) 50 分執(zhí)行一次
'args': (3, 7) # 任務(wù)函數(shù)參數(shù)
}
}
我們還需在 task 任務(wù) email.py 同級(jí)目錄下添加兩個(gè)任務(wù)文件 task1.py , task2.py 。
task1.py
import time
from app import cele
@cele.task
def add(x, y):
time.sleep(2)
return x + y
task2.py
import time
from app import cele
@cele.task
def multiply(x, y):
time.sleep(2)
return x * y
運(yùn)行
關(guān)閉上次的程序率碾,重新啟動(dòng) Celery Worker 進(jìn)程
celery -A app worker --loglevel=info
然后叔营,app同級(jí)目錄,啟動(dòng) Celery Beat 進(jìn)程所宰,定時(shí)將任務(wù)發(fā)送到 Broker
celery beat -A app
手動(dòng)調(diào)用 2 次 Celery 任務(wù), havefun():
查看日志文件绒尊,在此為了好展示,僅添加了 task1.py 文件并設(shè)置 1 分鐘執(zhí)行一次:
Celery 周期性任務(wù)也有多個(gè)配置項(xiàng)仔粥,如當(dāng)前設(shè)置滿足不了需求婴谱,請(qǐng)查閱官方文檔蟹但。
END!