在程序運行過程中,要執(zhí)行一個很久的任務想罕,但是我們又不想主程序被阻塞悠栓,常見的方法是多線程“醇郏可是當并發(fā)量過大時惭适,多線程也會扛不住,必須要用線程池來限制并發(fā)個數(shù)楼镐,而且多線程對共享資源的使用也是很麻煩的事情癞志。還有就是前面幾篇介紹過的協(xié)程,但是協(xié)程畢竟還是在同一線程內(nèi)執(zhí)行的框产,如果一個任務本身就要執(zhí)行很長時間凄杯,而不是因為等待IO被掛起,那其他協(xié)程照樣無法得到運行秉宿。本文要介紹一個強大的分布式任務隊列Celery戒突,它可以讓任務的執(zhí)行同主程序完全脫離,甚至不在同一臺主機內(nèi)描睦。它通過隊列來調(diào)度任務膊存,不用擔心并發(fā)量高時系統(tǒng)負載過大。它可以用來處理復雜系統(tǒng)性能問題酌摇,卻又相當靈活易用膝舅。
架構(gòu)組成:
參考引用:http://www.bjhee.com/celery.html
一個完整的Celery分布式隊列架構(gòu)應該包含一下幾個模塊:
- 消息中間人 Broker
消息中間人嗡载,就是任務調(diào)度隊列窑多,通常以獨立服務形式出現(xiàn)。它是一個生產(chǎn)者消費者模式洼滚,即主程序?qū)⑷蝿辗湃腙犃兄泄∠ⅲ笈_職程則會從隊列中取出任務并執(zhí)行。任務可以按順序調(diào)度,也可以按計劃時間調(diào)度千康。Celery組件本身并不提供隊列服務享幽,你需要集成第三方消息中間件。Celery推薦的有RabbitMQ和Redis拾弃,另外也支持MongoDB值桩、SQLAlchemy、Memcached等豪椿,但不推薦奔坟。 - 任務執(zhí)行單元 Worker,也叫職程
即執(zhí)行任務的程序搭盾,可以有多個并發(fā)咳秉。它實時監(jiān)控消息隊列,獲取隊列中調(diào)度的任務鸯隅,并執(zhí)行它澜建。 - 執(zhí)行結(jié)果存儲 Backend
由于任務的執(zhí)行同主程序分開,如果主程序想獲取任務執(zhí)行的結(jié)果蝌以,就必須通過中間件存儲炕舵。同消息中間人一樣,存儲也可以使用RabbitMQ跟畅、Redis幕侠、MongoDB、SQLAlchemy碍彭、Memcached等晤硕,建議使用帶持久化功能的存儲中間件。(另外庇忌,并非所有的任務執(zhí)行都需要保存結(jié)果舞箍,這個模塊可以不配置。)
安裝CELERY
pip install celery
pip install django=celery=3.x.x
備注:經(jīng)過搜索發(fā)現(xiàn)是因為winsows是不支持celery4的皆疹。參照的回答在這https://github.com/celery/celery/issues/3551
Framework Integration | |
---|---|
Django | django-celery |
Pyramid | pyramid_celery |
Pylons | celery-pylons |
Flask | not needed |
web2py | web2py-celery |
Tornado | tornado-celery |
為了支持redis
pip install 'celery[redis]'
help:
celery help
celery worker --help
- 然后疏橄,我們編寫任務代碼TASKS.PY
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
啟動后臺職程
職程會監(jiān)聽消息中間人隊列并等待任務調(diào)度,啟動命令為:
$ celery worker -A tasks --loglevel=info --concurrency=5
解釋:
- 參數(shù)”-A”指定了Celery實例的位置略就,更建議你指定Celery對象名稱捎迫,如”-A tasks.app”。
- 參數(shù)”loglevel”指定了日志等級表牢,也可以不加窄绒,默認為warning。
- 參數(shù)”concurrency”指定最大并發(fā)數(shù)崔兴,默認為CPU核數(shù)彰导。
輸入指令:
- 任務發(fā)送到消息中間人隊列
>>> from tasks import add
>>> add.delay(2, 5)
<AsyncResult: 4c079d93-fd5f-47f0-8b93-c77a0112eb4e>
這個”delay()”方法會將任務發(fā)送到消息中間人隊列蛔翅,并由之前啟動的后臺職程來執(zhí)行。所以這時Python控制臺上只會返回”AsyncResult”信息位谋。如果你看下之前職程的啟動窗口山析,你會看到多了條日志”Task tasks.add[4c079d93-fd5f-47f0-8b93-c77a0112eb4e] succeeded in 0.0211374238133s: 7″。說明”add”任務已經(jīng)被調(diào)度并執(zhí)行成功掏父,并且返回7笋轨。
- 配置了后臺結(jié)果存儲(backend),我們可以通過如下方法獲取任務執(zhí)行后的返回值:
>>> result=add.delay(2, 5)
>>> result.ready()
True
>>> result.get(timeout=1)
7
- 關于并發(fā)
任務的并發(fā)默認采用多進程方式赊淑,Celery也支持gevent或者eventlet協(xié)程并發(fā)翩腐。方法是在啟動職程時使用”-P”參數(shù):
celery worker -A tasks --loglevel=info -P gevent -c 100
通過”-P gevent”我們就將并發(fā)改為了gevent方式了;”-c 100″同之前介紹的”concurrency”參數(shù)膏燃,指定了并發(fā)個數(shù)茂卦。
- 關于后臺
配置了Redis存儲,那讓我們?nèi)edis里看看Celery任務執(zhí)行的結(jié)果是怎么存儲的吧组哩。通過”keys celery*”等龙,可以查到所有屬于celery的鍵值.
一條記錄詳細內(nèi)容是:
#JSon序列化存在Redis:
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 7, \"task_id\": \"4c079d93-fd5f-47f0-8b93-c77a0112eb4e\", \"children\": []}"
關于配置 : 三種方式可供選擇
- 單個參數(shù)配置
app.conf.CELERY_BROKER_URL = 'amqp://guest@localhost//'
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
- 多個參數(shù)配置
app.conf.update(
CELERY_BROKER_URL = 'amqp://guest@localhost//',
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
)
- 從配置文件中獲取
先將配置項放入配置文件中,如”celeryconfig.py”
BROKER_URL='amqp://guest@localhost//'
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
然后導入到celery對象中:
app.config_from_object('celeryconfig')
資料參考:
https://blog.csdn.net/yeyingcai/article/details/78647553(很容易懂)
http://www.reibang.com/p/b7f843f21c46
http://www.reibang.com/p/f1f2cd1cd491(實踐例子)
https://blog.csdn.net/weixin_43688726/article/details/89242366
from future import absolute_import : 在 3.0 以前的舊版本中啟用相對導入等特性所必須的 future 語句伶贰。