轉(zhuǎn)https://blog.csdn.net/kk123a/article/details/74549117
一. celery 簡(jiǎn)介
Celery 是一個(gè)專(zhuān)注于實(shí)時(shí)處理和任務(wù)調(diào)度的分布式任務(wù)隊(duì)列, 同時(shí)提供操作和維護(hù)分布式系統(tǒng)所需的工具.. 所謂任務(wù)就是消息, 消息中的有效載荷中包含要執(zhí)行任務(wù)需要的全部數(shù)據(jù).
Celery 是一個(gè)分布式隊(duì)列的管理工具, 可以用 Celery 提供的接口快速實(shí)現(xiàn)并管理一個(gè)分布式的任務(wù)隊(duì)列.
Celery 本身不是任務(wù)隊(duì)列, 是管理分布式任務(wù)隊(duì)列的工具. 它封裝了操作常見(jiàn)任務(wù)隊(duì)列的各種操作, 我們使用它可以快速進(jìn)行任務(wù)隊(duì)列的使用與管理.
Celery 特性 :
- 方便查看定時(shí)任務(wù)的執(zhí)行情況, 如 是否成功, 當(dāng)前狀態(tài), 執(zhí)行任務(wù)花費(fèi)的時(shí)間等.
- 使用功能齊備的管理后臺(tái)或命令行添加,更新,刪除任務(wù).
- 方便把任務(wù)和配置管理相關(guān)聯(lián).
- 可選 多進(jìn)程, Eventlet 和 Gevent 三種模型并發(fā)執(zhí)行.
- 提供錯(cuò)誤處理機(jī)制.
- 提供多種任務(wù)原語(yǔ), 方便實(shí)現(xiàn)任務(wù)分組,拆分,和調(diào)用鏈.
- 支持多種消息代理和存儲(chǔ)后端.
- Celery 是語(yǔ)言無(wú)關(guān)的.它提供了python 等常見(jiàn)語(yǔ)言的接口支持.
二. celery 組件
1. Celery 扮演生產(chǎn)者和消費(fèi)者的角色,
- Celery Beat : 任務(wù)調(diào)度器. Beat 進(jìn)程會(huì)讀取配置文件的內(nèi)容, 周期性的將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊(duì)列.
- Celery Worker : 執(zhí)行任務(wù)的消費(fèi)者, 通常會(huì)在多臺(tái)服務(wù)器運(yùn)行多個(gè)消費(fèi)者, 提高運(yùn)行效率.
- Broker : 消息代理, 隊(duì)列本身. 也稱(chēng)為消息中間件. 接受任務(wù)生產(chǎn)者發(fā)送過(guò)來(lái)的任務(wù)消息, 存進(jìn)隊(duì)列再按序分發(fā)給任務(wù)消費(fèi)方(通常是消息隊(duì)列或者數(shù)據(jù)庫(kù)).
- Producer : 任務(wù)生產(chǎn)者. 調(diào)用 Celery API , 函數(shù)或者裝飾器, 而產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的都是任務(wù)生產(chǎn)者.
- Result Backend : 任務(wù)處理完成之后保存狀態(tài)信息和結(jié)果, 以供查詢(xún).
Celery架構(gòu)圖
2. 產(chǎn)生任務(wù)的方式 :
- 發(fā)布者發(fā)布任務(wù)(WEB 應(yīng)用)
- 任務(wù)調(diào)度按期發(fā)布任務(wù)(定時(shí)任務(wù))
3. celery 依賴(lài)三個(gè)庫(kù): 這三個(gè)庫(kù), 都由 Celery 的開(kāi)發(fā)者開(kāi)發(fā)和維護(hù).
- billiard : 基于 Python2.7 的 multisuprocessing 而改進(jìn)的庫(kù), 主要用來(lái)提高性能和穩(wěn)定性.
- librabbitmp : C 語(yǔ)言實(shí)現(xiàn)的 Python 客戶(hù)端,
- kombu : Celery 自帶的用來(lái)收發(fā)消息的庫(kù), 提供了符合 Python 語(yǔ)言習(xí)慣的, 使用 AMQP 協(xié)議的高級(jí)借口.
三. 選擇消息代理
使用于生產(chǎn)環(huán)境的消息代理有 RabbitMQ 和 Redis, 官方推薦 RabbitMQ.
四. Celery 序列化
在客戶(hù)端和消費(fèi)者之間傳輸數(shù)據(jù)需要 序列化和反序列化. Celery 支出的序列化方案如下所示:
方案 | 說(shuō)明 |
---|---|
pickle | pickle 是Python 標(biāo)準(zhǔn)庫(kù)中的一個(gè)模塊, 支持 Pyuthon 內(nèi)置的數(shù)據(jù)結(jié)構(gòu), 但他是 Python 的專(zhuān)有協(xié)議. Celery 官方不推薦. |
json | json 支持多種語(yǔ)言, 可用于跨語(yǔ)言方案. |
yaml | yaml 表達(dá)能力更強(qiáng), 支持的數(shù)據(jù)類(lèi)型較 json 多, 但是 python 客戶(hù)端的性能不如 json |
msgpack | 二進(jìn)制的類(lèi) json 序列化方案, 但比 json 的數(shù)據(jù)結(jié)構(gòu)更小, 更快. |
五. 安裝,配置與簡(jiǎn)單示例
Celery 配置參數(shù)匯總
配置項(xiàng) | 說(shuō)明 |
---|---|
CELERY_DEFAULT_QUEUE | 默認(rèn)隊(duì)列 |
CELERY_BROKER_URL | Broker 地址 |
CELERY_RESULT_BACKEND | 結(jié)果存儲(chǔ)地址 |
CELERY_TASK_SERIALIZER | 任務(wù)序列化方式 |
CELERY_RESULT_SERIALIZER | 任務(wù)執(zhí)行結(jié)果序列化方式 |
CELERY_TASK_RESULT_EXPIRES | 任務(wù)過(guò)期時(shí)間 |
CELERY_ACCEPT_CONTENT | 指定任務(wù)接受的內(nèi)容類(lèi)型(序列化) |
代碼示例 :
# 安裝$ pip install celery, redis, msgpack # 配置文件 celeryconfig.py CELERY_BROKER_URL = 'redis://localhost:6379/1' CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務(wù)過(guò)期時(shí)間 CELERY_ACCEPT_CONTENT = ["json"] # 指定任務(wù)接受的內(nèi)容類(lèi)型. # 初始化文件 celery.py from __future__ import absolute_import from celery import Celery app = Celery('proj', include=["proj.tasks"]) app.config_from_object("proj.celeryconfig") if __name__ == "__main__": app.start() # 任務(wù)文件 tasks.py from __future__ import absolute_import from proj.celery import app @app.task def add(x, y): return x + y # 啟動(dòng)消費(fèi)者 $ celery -A proj worker -l info # 在終端中測(cè)試 > from proj.tasks import add > r = add.delay(2,4) > r.result 6 > r.status u"SUCCESS" > r.successful() True > r.ready() # 返回布爾值, 任務(wù)執(zhí)行完成, 返回 True, 否則返回 False. > r.wait() # 等待任務(wù)完成, 返回任務(wù)執(zhí)行結(jié)果. > r.get() # 獲取任務(wù)執(zhí)行結(jié)果 > r.result # 任務(wù)執(zhí)行結(jié)果. > r.state # PENDING, START, SUCCESS > r.status # PENDING, START, SUCCESS # 使用 AsyncResult 方式獲取執(zhí)行結(jié)果. # AsyncResult 主要用來(lái)存儲(chǔ)任務(wù)執(zhí)行信息與執(zhí)行結(jié)果(類(lèi)似 js 中的 Promise 對(duì)象), > from celery.result import AsyncResult > AsyncResult(task_id).get() 4
六. 調(diào)用任務(wù)的方法 :
1. delay
task.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
2. apply_async
delay 實(shí)際上是 apply_async 的別名, 還可以使用如下方法調(diào)用, 但是 apply_async 支持更多的參數(shù):
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
支持的參數(shù) :
-
countdown : 等待一段時(shí)間再執(zhí)行.
add.apply_async((2,3), countdown=5)
-
eta : 定義任務(wù)的開(kāi)始時(shí)間.
add.apply_async((2,3), eta=now+tiedelta(second=10))
-
expires : 設(shè)置超時(shí)時(shí)間.
add.apply_async((2,3), expires=60)
-
retry : 定時(shí)如果任務(wù)失敗后, 是否重試.
add.apply_async((2,3), retry=False)
-
retry_policy : 重試策略.
- max_retries : 最大重試次數(shù), 默認(rèn)為 3 次.
- interval_start : 重試等待的時(shí)間間隔秒數(shù), 默認(rèn)為 0 , 表示直接重試不等待.
- interval_step : 每次重試讓重試間隔增加的秒數(shù), 可以是數(shù)字或浮點(diǎn)數(shù), 默認(rèn)為 0.2
- interval_max : 重試間隔最大的秒數(shù), 即 通過(guò) interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點(diǎn)數(shù), 默認(rèn)為 0.2 .
自定義發(fā)布者,交換機(jī),路由鍵, 隊(duì)列, 優(yōu)先級(jí),序列方案和壓縮方法:
task.apply_async((2,2), compression='zlib', serialize='json', queue='priority.high', routing_key='web.add', priority=0, exchange='web_exchange')
七. 指定隊(duì)列 :
Celery 默認(rèn)使用名為 celery 的隊(duì)列 (可以通過(guò) CELERY_DEFAULT_QUEUE 修改) 來(lái)存放任務(wù). 我們可以使用 優(yōu)先級(jí)不同的隊(duì)列 來(lái)確保高優(yōu)先級(jí)的任務(wù)優(yōu)先執(zhí)行.
# 修改配置文件, 保證隊(duì)列優(yōu)先級(jí) from kombu import Queue CELERY_QUEUE = ( # 定義任務(wù)隊(duì)列. Queue('default', routing_key="task.#"), # 路由鍵 以 "task." 開(kāi)頭的消息都進(jìn)入 default 隊(duì)列. Queue('web_tasks', routing_key="web.#") # 路由鍵 以 "web." 開(kāi)頭的消息都進(jìn)入 web_tasks 隊(duì)列.) CELERY_DEFAULT_EXCHANGE = 'tasks' # 默認(rèn)的交換機(jī)名字為 tasksCELERY_DEFAULT_EXCHANGE_KEY = 'topic' # 默認(rèn)的交換機(jī)類(lèi)型為 topicCELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默認(rèn)的路由鍵是 task.default , 這個(gè)路由鍵符合上面的 default 隊(duì)列. CELERY_ROUTES = { 'proj.tasks.add': { 'queue': 'web_tasks', 'routing_key': 'web.add', }} # 使用指定隊(duì)列的方式啟動(dòng)消費(fèi)者進(jìn)程.$ celery -A proj worker -Q web_tasks -l info # 該 worker 只會(huì)執(zhí)行 web_tasks 中任務(wù), 我們可以合理安排消費(fèi)者數(shù)量, 讓 web_tasks 中任務(wù)的優(yōu)先級(jí)更高.
閱后即焚模式(transient):
from kombu import QueueQueue('transient', routing_key='transient', delivery_mode=1)
八. 使用任務(wù)調(diào)度
使用 Beat 進(jìn)程自動(dòng)生成任務(wù).
# 修改配置文件, # 下面的任務(wù)指定 tasks.add 任務(wù) 每 10s 跑一次, 任務(wù)參數(shù)為 (16,16). from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add': { 'task': 'proj.tasks.add', 'schedule': timedelta(seconds=10), 'args': (16, 16) }} # crontab 風(fēng)格 from celery.schedules import crontab CELERYBEAT_SCHEDULE = { "add": { "task": "tasks.add", "schedule": crontab(hour="*/3", minute=12), "args": (16, 16), } } # 啟動(dòng) Beat 程序$ celery beat -A proj # 之后啟動(dòng) worker 進(jìn)程.$ celery -A proj worker -l info 或者$ celery -B -A proj worker -l info
使用自定義調(diào)度類(lèi)還可以實(shí)現(xiàn)動(dòng)態(tài)添加任務(wù). 使用 Django 可以通過(guò) Django-celery 實(shí)現(xiàn)在管理后臺(tái)創(chuàng)建,刪除,更新任務(wù), 是因?yàn)樗褂昧俗远x的 調(diào)度類(lèi) djcelery.schedulers.DatabaseScheduler .
九. 任務(wù)綁定, 記錄日志, 重試
# 修改 tasks.py 文件. from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__) @app.task(bind=True)def div(self, x, y): logger.info(('Executing task id {0.id}, args: {0.args!r}' 'kwargs: {0.kwargs!r}').format(self.request)) try: result = x/y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) # 發(fā)生 ZeroDivisionError 錯(cuò)誤時(shí), 每 5s 重試一次, 最多重試 3 次. return result
當(dāng)使用 bind=True 參數(shù)之后, 函數(shù)的參數(shù)發(fā)生變化, 多出了參數(shù) self, 這這相當(dāng)于把 div 編程了一個(gè)已綁定的方法, 通過(guò) self 可以獲得任務(wù)的上下文.
十. 信號(hào)系統(tǒng) :
信號(hào)可以幫助我們了解任務(wù)執(zhí)行情況, 分析任務(wù)運(yùn)行的瓶頸. Celery 支持 7 種信號(hào)類(lèi)型.
- 任務(wù)信號(hào)
- before_task_publish : 任務(wù)發(fā)布前
- after_task_publish : 任務(wù)發(fā)布后
- task_prerun : 任務(wù)執(zhí)行前
- task_postrun : 任務(wù)執(zhí)行后
- task_retry : 任務(wù)重試時(shí)
- task_success : 任務(wù)成功時(shí)
- task_failure : 任務(wù)失敗時(shí)
- task_revoked : 任務(wù)被撤銷(xiāo)或終止時(shí)
- 應(yīng)用信號(hào)
- Worker 信號(hào)
- Beat 信號(hào)
- Eventlet 信號(hào)
- 日志信號(hào)
- 命令信號(hào)
不同的信號(hào)參數(shù)格式不同, 具體格式參見(jiàn)官方文檔
代碼示例 :
# 在執(zhí)行任務(wù) add 之后, 打印一些信息. @after_task_publishdef task_send_handler(sender=None, body=None, **kwargs): print 'after_task_publish: task_id: {body[id]}; sender: {sender}'.format(body=body, sender=sender)
十一. 子任務(wù)與工作流:
可以把任務(wù) 通過(guò)簽名的方法傳給其他任務(wù), 成為一個(gè)子任務(wù).
from celery import signaturetask = signature('task.add', args=(2,2), countdown=10)tasktask.add(2,2) # 通過(guò)簽名生成任務(wù)task.apply_async()
還可以通過(guò)如下方式生成子任務(wù) :
from proj.task import addtask = add.subtask((2,2), countdown=10) # 快捷方式 add.s((2,2), countdown-10) task.apply_async()
自任務(wù)實(shí)現(xiàn)片函數(shù)的方式非常有用, 這種方式可以讓任務(wù)在傳遞過(guò)程中財(cái)傳入?yún)?shù).
partial = add.s(2)partial.apply_async((4,))
子任務(wù)支持如下 5 種原語(yǔ),實(shí)現(xiàn)工作流. 原語(yǔ)表示由若干指令組成的, 用于完成一定功能的過(guò)程.
-
chain : 調(diào)用連, 前面的執(zhí)行結(jié)果, 作為參數(shù)傳給后面的任務(wù), 直到全部完成, 類(lèi)似管道.
from celery import chainres = chain(add.s(2,2), add.s(4), add.s(8))()res.get() 管道式: (add.s(2,2) | add.s(4) | add.s(8))().get()
-
group : 一次創(chuàng)建多個(gè)(一組)任務(wù).
from celery import group res = group(add.s(i,i) for i in range(10))()res.get()
-
chord : 等待任務(wù)全部完成時(shí)添加一個(gè)回調(diào)任務(wù).
res = chord((add.s(i,i) for i in range(10)), add.s(['a']))()res.get() # 執(zhí)行完前面的循環(huán), 把結(jié)果拼成一個(gè)列表之后, 再對(duì)這個(gè)列表 添加 'a'.[0,2,4,6,8,10,12,14,16,18,u'a']
-
map/starmap : 每個(gè)參數(shù)都作為任務(wù)的參數(shù)執(zhí)行一遍, map 的參數(shù)只有一個(gè), starmap 支持多個(gè)參數(shù).
add.starmap(zip(range(10), range(10))) 相當(dāng)于: @app.taskdef temp(): return [add(i,i) for i in range(10)]
-
chunks : 將任務(wù)分塊.
res = add.chunks(zip(range(50), range(50)),10)()res.get()
在生成任務(wù)的時(shí)候, 應(yīng)該充分利用 group/chain/chunks 這些原語(yǔ).
十二. 其他
關(guān)閉不想要的功能 :
@app.task(ignore_result=True) # 關(guān)閉任務(wù)執(zhí)行結(jié)果.def func(): pass CELERY_DISABLE_RATE_LIMITS=True # 關(guān)閉限速.
根據(jù)任務(wù)狀態(tài)執(zhí)行不同操作 :
# tasks.pyclass MyTask(Task): def on_success(self, retval, task_id, args, kwargs): print 'task done: {0}'.format(retval) return super(MyTask, self).on_success(retval, task_id, args, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): print 'task fail, reason: {0}'.format(exc) return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo) # 正確函數(shù), 執(zhí)行 MyTask.on_success() :@app.task(base=MyTask)def add(x, y): return x + y # 錯(cuò)誤函數(shù), 執(zhí)行 MyTask.on_failure() : @app.task #普通函數(shù)裝飾為 celery taskdef add(x, y): raise KeyError return x + y
十三. Celery 管理命令
任務(wù)狀態(tài)回調(diào) :
參數(shù) | 說(shuō)明 |
---|---|
PENDING | 任務(wù)等待中 |
STARTED | 任務(wù)已開(kāi)始 |
SUCCESS | 任務(wù)執(zhí)行成功 |
FAILURE | 任務(wù)執(zhí)行失敗 |
RETRY | 任務(wù)將被 |
REVOKED | 任務(wù)取消 |
PROGRESS | 任務(wù)進(jìn)行中 |
普通啟動(dòng)命令 :
$ celery -A proj worker -l info
使用 daemon 方式 multi :
$ celery multi start web -A proj -l info --pidfile=/path/to/celery_%n.pid --logfile=/path/to/celery_%n.log # web 是對(duì)項(xiàng)目啟動(dòng)的標(biāo)識(shí), # %n 是對(duì)節(jié)點(diǎn)的格式化用法. %n : 只包含主機(jī)名 %h : 包含域名的主機(jī) %d : 只包含域名 %i : Prefork 類(lèi)型的進(jìn)程索引,如果是主進(jìn)程, 則為 0. %I : 帶分隔符的 Prefork 類(lèi)型的進(jìn)程索引. 假設(shè)主進(jìn)程為 worker1, 那么進(jìn)程池的第一個(gè)進(jìn)程則為 worker1-1
常用 multi 相關(guān)命令:
$ celery multi show web # 查看 web 啟動(dòng)時(shí)的命令$ celery multi names web # 獲取 web 的節(jié)點(diǎn)名字$ celery multi stop web # 停止 web 進(jìn)程$ celery multi restart web # 重啟 web$ celery multi kill web # 殺掉 web 進(jìn)程
常用監(jiān)控和管理命令 :
-
shell : 交互時(shí)環(huán)境, 內(nèi)置了 Celery 應(yīng)用實(shí)例和全部已注冊(cè)的任務(wù), 支持 默認(rèn)解釋器,IPython,BPython .
$ celery shell -A proj
-
result : 通過(guò) task_id 在命令行獲得任務(wù)執(zhí)行結(jié)果
$ celery -A proj result TASK_ID
-
inspect active : 列出當(dāng)前正在執(zhí)行的任務(wù)
$ celery -A proj inspect active
-
inspect stats : 列出 worker 的統(tǒng)計(jì)數(shù)據(jù), 常用來(lái)查看配置是否正確以及系統(tǒng)的使用情況.
$ celery -A proj inspect stats
Flower web 監(jiān)控工具
- 查看任務(wù)歷史,任務(wù)具體參數(shù),開(kāi)始時(shí)間等信息;
- 提供圖表和統(tǒng)計(jì)數(shù)據(jù)
- 實(shí)現(xiàn)全面的遠(yuǎn)程控制功能, 包括但不限于 撤銷(xiāo)/終止任務(wù), 關(guān)閉重啟 worker, 查看正在運(yùn)行任務(wù)
- 提供一個(gè) HTTP API , 方便集成.
Flower 的 supervisor 管理配置文件:
[program:flower]command=/opt/PyProjects/venv/bin/flower -A celery_worker:celery --broker="redis://localhost:6379/2" --address=0.0.0.0 --port=5555 directory=/opt/PyProjects/appautostart=trueautorestart=truestartretries=3 user=derbystdout_logfile=/var/logs/%(program_name)s.logstdout_logfile_maxbytes=50MBstdout_logfile_backups=30stderr_logfile=/var/logs/%(program_name)s-error.logstderr_logfile_maxbytes=50MBstderr_logfile_backups=3
Celery 自帶的事件監(jiān)控工具顯示任務(wù)歷史等信息.
$ celery -A proj event** 需要把 CELERY_SEND_TASK_SEND_EVENT = True 設(shè)置, 才可以獲取時(shí)間.
使用自動(dòng)擴(kuò)展 :
$ celery -A proj worker -l info --autoscale=6,3 # 平時(shí)保持 3 個(gè)進(jìn)程, 最大時(shí)可以達(dá)到 6 個(gè).
Celery 命令匯總
$ celery --help -A APP, --app APP -b BROKER, --broker BROKER --loader LOADER --config CONFIG --workdir WORKDIR --no-color, -C --quiet, -q $ celery <command> --help + Main: | celery worker| celery events| celery beat| celery shell| celery multi| celery amqp + Remote Control: | celery status | celery inspect --help| celery inspect active | celery inspect active_queues | celery inspect clock | celery inspect conf [include_defaults=False]| celery inspect memdump [n_samples=10]| celery inspect memsample | celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]| celery inspect ping | celery inspect query_task [id1 [id2 [... [idN]]]]| celery inspect registered [attr1 [attr2 [... [attrN]]]]| celery inspect report | celery inspect reserved | celery inspect revoked | celery inspect scheduled | celery inspect stats | celery control --help| celery control add_consumer <queue> [exchange [type [routing_key]]]| celery control autoscale [max [min]]| celery control cancel_consumer <queue>| celery control disable_events | celery control election | celery control enable_events | celery control heartbeat | celery control pool_grow [N=1]| celery control pool_restart | celery control pool_shrink [N=1]| celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>| celery control revoke [id1 [id2 [... [idN]]]]| celery control shutdown | celery control terminate <signal> [id1 [id2 [... [idN]]]]| celery control time_limit <task_name> <soft_secs> [hard_secs]+ Utils: | celery purge| celery list| celery call| celery result| celery migrate| celery graph| celery upgrade + Debugging: | celery report| celery logtool + Extensions: | celery flower
十四. 在 Flask 中使用 Celery
Flask 文檔: 基于 Celery 的后臺(tái)任務(wù)
在 Flask 中使用 Celery