異步任務(wù)
異步任務(wù)是web開發(fā)中一個(gè)很常見的方法。對(duì)于一些耗時(shí)耗資源的操作涕俗,往往從主應(yīng)用中隔離罗丰,通過異步的方式執(zhí)行。簡(jiǎn)而言之再姑,做一個(gè)注冊(cè)的功能萌抵,在用戶使用郵箱注冊(cè)成功之后,需要給該郵箱發(fā)送一封激活郵件询刹。如果直接放在應(yīng)用中谜嫉,則調(diào)用發(fā)郵件的過程會(huì)遇到網(wǎng)絡(luò)IO的阻塞,比好優(yōu)雅的方式則是使用異步任務(wù)凹联,應(yīng)用在業(yè)務(wù)邏輯中觸發(fā)一個(gè)異步任務(wù)沐兰。
實(shí)現(xiàn)異步任務(wù)的工具有很多,其原理都是使用一個(gè)任務(wù)隊(duì)列蔽挠,比如使用redis生產(chǎn)消費(fèi)模型或者發(fā)布訂閱模式實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列住闯。
除了redis,還可以使用另外一個(gè)神器---Celery澳淑。Celery是一個(gè)異步任務(wù)的調(diào)度工具比原。它是Python寫的庫,但是它實(shí)現(xiàn)的通訊協(xié)議也可以使用ruby杠巡,php量窘,javascript等調(diào)用。異步任務(wù)除了消息隊(duì)列的后臺(tái)執(zhí)行的方式氢拥,還是一種則是跟進(jìn)時(shí)間的計(jì)劃任務(wù)蚌铜。下面將會(huì)介紹如何使用celery實(shí)現(xiàn)這兩種需求锨侯。
Celry broker 和 backend
最早學(xué)習(xí)celery的時(shí)候,冒出了一個(gè)rabbitmq冬殃,又冒出一個(gè)redis囚痴。當(dāng)時(shí)一頭霧水。實(shí)際上這正是celery的設(shè)計(jì)奧妙审葬。簡(jiǎn)單來說深滚,rabbitmq是一個(gè)采用Erlang寫的強(qiáng)大的消息隊(duì)列工具。在celery中可以扮演broker的角色涣觉。那么什么是broker痴荐?
broker是一個(gè)消息傳輸?shù)闹虚g件,可以理解為一個(gè)郵箱旨枯。每當(dāng)應(yīng)用程序調(diào)用celery的異步任務(wù)的時(shí)候蹬昌,會(huì)向broker傳遞消息,而后celery的worker將會(huì)取到消息攀隔,進(jìn)行對(duì)于的程序執(zhí)行皂贩。好吧,這個(gè)郵箱可以看成是一個(gè)消息隊(duì)列昆汹。那么什么又是backend明刷,通常程序發(fā)送的消息,發(fā)完就完了满粗,可能都不知道對(duì)方時(shí)候接受了辈末。為此,celery實(shí)現(xiàn)了一個(gè)backend映皆,用于存儲(chǔ)這些消息以及celery執(zhí)行的一些消息和結(jié)果挤聘。對(duì)于 brokers,官方推薦是rabbitmq和redis捅彻,至于backend组去,就是數(shù)據(jù)庫啦。為了簡(jiǎn)單起見步淹,我們都用redis从隆。
Getting Starting
使用celery包含三個(gè)方面,其一是定義任務(wù)函數(shù)缭裆,其二是運(yùn)行celery服務(wù)键闺,最后是客戶應(yīng)用程序的調(diào)用。
創(chuàng)建一個(gè)文件 tasks.py
輸入下列代碼:
from celery import Celery
brokers = 'redis://127.0.0.1:6379/5'
backend = 'redis://127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)
@app.task
def add(x, y):
return x + y
上述代碼導(dǎo)入了celery澈驼,然后創(chuàng)建了celery實(shí)例app辛燥,實(shí)力話的過程中,指定了任務(wù)名tasks
(和文件名一致),傳入了broker和backend挎塌。然后創(chuàng)建了一個(gè)任務(wù)函數(shù)add
畅铭。
下面就啟動(dòng)celery服務(wù)
在當(dāng)前命令行終端運(yùn)行:
celery -A tasks worker --loglevel=info
此時(shí)會(huì)看見一對(duì)輸出。包括注冊(cè)的任務(wù)啦勃蜘。
下面客戶端程序如何調(diào)用呢?打開一個(gè)命令行假残,進(jìn)入Python環(huán)境
In [0]:from tasks import add
In [1]: r = add.delay(2, 2)
In [2]: add.delay(2, 2)
Out[2]: <AsyncResult: 6fdb0629-4beb-4eb7-be47-f22be1395e1d>
In [3]: r = add.delay(3, 3)
In [4]: r.re
r.ready r.result r.revoke
In [4]: r.ready()
Out[4]: True
In [6]: r.result
Out[6]: 6
In [7]: r.get()
Out[7]: 6
在celery命令行可以看見celery執(zhí)行的日志:
[2015-09-20 21:37:06,086: INFO/MainProcess] Task proj.tasks.add[76beb980-0f55-4629-a4fb-4a1776428ea8] succeeded in 0.00089102005586s: 6
打開 backend的redis缭贡,也可以看見celery執(zhí)行的信息。
現(xiàn)在時(shí)在python環(huán)境中調(diào)用的add函數(shù)辉懒,實(shí)際上通常在應(yīng)用程序中調(diào)用這個(gè)方法阳惹。需要注意,如果把返回值賦值給一個(gè)變量眶俩,那么原來的應(yīng)用程序也會(huì)被阻塞莹汤,需要等待異步任務(wù)返回的結(jié)果。因此颠印,實(shí)際使用中纲岭,不需要把結(jié)果賦值。
計(jì)劃任務(wù)
上述的使用是簡(jiǎn)單的配置线罕,下面介紹一個(gè)更健壯的方式來使用celery止潮。首先創(chuàng)建一個(gè)python包,celery服務(wù)钞楼,姑且命名為proj喇闸。目錄文件如下:
? proj tree
.
├── __init__.py
├── celery.py # 創(chuàng)建 celery 實(shí)例
├── config.py # 配置文件
└── tasks.py # 任務(wù)函數(shù)
首先是 celery.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from celery import Celery
app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.config')
if __name__ == '__main__':
app.start()
這一次創(chuàng)建 app,并沒有直接指定 broker 和 backend询件。而是在配置文件中燃乍。
config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
剩下的就是tasks.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
使用方法也很簡(jiǎn)單,在proj的同一級(jí)目錄執(zhí)行celery:
celery -A proj worker -l info
現(xiàn)在使用任務(wù)也很簡(jiǎn)單宛琅,直接在客戶端代碼調(diào)用 proj.tasks 里的函數(shù)即可刻蟹。
Scheduler
一種常見的需求是每隔一段時(shí)間執(zhí)行一個(gè)任務(wù)。配置如下
config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
CELERY_TIMEZONE = 'Asia/Shanghai'
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'proj.tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
}
注意配置文件需要指定時(shí)區(qū)夯秃。這段代碼表示每隔30秒執(zhí)行 add 函數(shù)座咆。
一旦使用了 scheduler, 啟動(dòng) celery需要加上-B 參數(shù)
celery -A proj worker -B -l info
crontab
計(jì)劃任務(wù)當(dāng)然也可以用crontab實(shí)現(xiàn),celery也有crontab模式仓洼。修改 config.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from __future__ import absolute_import
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/5'
BROKER_URL = 'redis://127.0.0.1:6379/6'
CELERY_TIMEZONE = 'Asia/Shanghai'
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
總而言之介陶,scheduler的切分度更細(xì),可以精確到秒色建。crontab模式就不用說了哺呜。當(dāng)然celery還有更高級(jí)的用法,比如多個(gè)機(jī)器使用箕戳,啟用多個(gè)worker并發(fā)處理等某残。