Celery的架構(gòu)
- Celery包含如下組件:
- Celery Beat:任務(wù)調(diào)度器,Beat進(jìn)程會(huì)讀取配置文件的內(nèi)容募寨,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊(duì)列(一般用于定時(shí)任務(wù)使用)智绸。
- Celery Worker:執(zhí)行任務(wù)的消費(fèi)者澈蝙,通常會(huì)在多臺(tái)服務(wù)器運(yùn)行多個(gè)消費(fèi)者來(lái)提高執(zhí)行效率。
- Broker:消息代理寄疏,或者叫作消息中間件是牢,接受任務(wù)生產(chǎn)者發(fā)送過(guò)來(lái)的任務(wù)消息,存進(jìn)隊(duì)列再按序分發(fā)給任務(wù)消費(fèi)方(本方案使用redis)陕截。
- Producer:調(diào)用了Celery提供的API驳棱、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的都是任務(wù)生產(chǎn)者。
- Result Backend:任務(wù)處理完后保存狀態(tài)信息和結(jié)果农曲,以供查詢社搅。(本方案使用redis來(lái)存儲(chǔ)結(jié)果)
Celery的架構(gòu)圖如圖所示驻债。
根據(jù)celery架構(gòu)圖將分布式設(shè)計(jì)方案如下。
- 任務(wù)發(fā)布者:產(chǎn)品在后臺(tái)添加股票完成之后形葬,點(diǎn)擊回測(cè)合呐,然后分發(fā)任務(wù)都不同的機(jī)器。
- 任務(wù)調(diào)度:按照設(shè)定的時(shí)間調(diào)用delay方法笙以。
- 消息代理:使用redis來(lái)存儲(chǔ)股票代碼淌实。每次執(zhí)行的股票代碼都是從redis讀取。
- 任務(wù)消費(fèi)者:在不同的機(jī)器上開(kāi)啟worker猖腕,執(zhí)行調(diào)度的股票進(jìn)行回測(cè)拆祈。
- 回測(cè)結(jié)果:存放在redis中,然后讀取出渲染在后臺(tái)上展示出來(lái)倘感。
需要用到
from kombu import Queue
from flask import Flask
部分代碼如下:
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_QUEUES = ( # 定義任務(wù)隊(duì)列
Queue("default", routing_key="distributed.#"),
Queue("tasks_A", routing_key="A.#"),
Queue("tasks_B", routing_key="B.#"),
)
CELERY_ROUTES = (
[
("web_management.web.trade.distributed.add", {"queue": "default"}),
("web_management.web.trade.distributed.taskA", {"queue": "tasks_A"}),
("web_management.web.trade.distributed.taskB", {"queue": "tasks_B"}),
],
)
CELERY_RESULT_SERIALIZER = "json"
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://' + ppt.redis_ip + ':' + ppt.redis_port + '/1'
app.config['CELERY_QUEUES'] = CELERY_QUEUES
app.config['CELERY_TIMEZONE'] = CELERY_TIMEZONE
app.config['CELERY_ROUTES'] = CELERY_ROUTES
app.config['CELERY_RESULT_SERIALIZER'] = CELERY_RESULT_SERIALIZER
app.config['CELERY_TASK_RESULT_EXPIRES'] = CELERY_TASK_RESULT_EXPIRES
celery = Celery('distributed', backend=app.config['CELERY_RESULT_BACKEND'], broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
開(kāi)啟work的方式
celery worker -A web_management.web.trade.distributed.celery -Q tasks_A --concurrency=30 -l info -Q后面為方法對(duì)應(yīng)的隊(duì)列名稱 --concurrency= 后面 為開(kāi)啟的worker數(shù)目放坏,也可以使用 -c= 具體開(kāi)啟是數(shù)目根據(jù)你的電腦CPU個(gè)數(shù)確定,小于等于cpu個(gè)數(shù)即可
備注
- celery 使用4.1.1版本
- kombu 使用4.2.0版本
*先安裝kombu,然后安裝celery- 可以解決如圖問(wèn)題