為什么要使用celery
Celery是一個使用Python開發(fā)的分布式任務(wù)調(diào)度模塊,因此對于大量使用Python構(gòu)建的系統(tǒng),可以說是無縫銜接误辑,使用起來很方便旋炒。Celery專注于實時處理任務(wù),同時也支持任務(wù)的定時調(diào)度脆淹。因此適合實時異步任務(wù)定時任務(wù)等調(diào)度場景智润。Celery需要依靠RabbitMQ等作為消息代理,同時也支持Redis甚至是Mysql未辆,Mongo等窟绷,當然,官方默認推薦的是RabbitMQ咐柜。
broker的選擇
雖然官方支持的broker有很多兼蜈,包括RabbitMQ攘残,Redis甚至是數(shù)據(jù)庫,但是不推薦使用數(shù)據(jù)庫为狸,因為數(shù)據(jù)庫需要不斷訪問磁盤歼郭,當你的任務(wù)量大了之后會造成很嚴重的性能問題,同時你的應(yīng)用很可能也在使用同一個數(shù)據(jù)庫辐棒,這樣可能導(dǎo)致你的應(yīng)用被拖垮病曾。如果業(yè)務(wù)環(huán)境比較簡單可以選擇Redis,如果比較復(fù)雜選擇RabbitMQ漾根,因為RabbitMQ是官方推薦的泰涂,但是比Redis操作起來又相對復(fù)雜些。我的選擇是broker用RabbitMQ辐怕,backend用Redis
celery不能用root用戶啟動問題 C_FORCE_ROOT environment
如果使用root用戶啟動celery會遇到下面的問題
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
解決辦法:
from celery import Celery, platforms
platforms.C_FORCE_ROOT = True #加上這一行
任務(wù)重復(fù)執(zhí)行
celery執(zhí)行定時任務(wù)的時候遇到了重復(fù)執(zhí)行的問題逼蒙,當時是用redis做broker和backend。
官方文檔中有相關(guān)描述寄疏。
If a task is not acknowledged within the Visibility Timeout the task will
be redelivered to another worker and executed.
This causes problems with ETA/countdown/retry tasks where the time to execute exceeds the visibility timeout; in fact if that happens it will be executed again, and again in a loop.
So you have to increase the visibility timeout to match the time of the longest ETA you are planning to use.
Note that Celery will redeliver messages at worker shutdown, so having a long visibility timeout will only delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers.
Periodic tasks will not be affected by the visibility timeout, as this is a concept separate from ETA/countdown.
You can increase this timeout by configuring a transport option with the same name:
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}
The value must be an int describing the number of seconds.
就是說當我們設(shè)置一個ETA時間比visibility_timeout長的任務(wù)時是牢,每過一次 visibility_timeout 時間,celery就會認為這個任務(wù)沒被worker執(zhí)行成功陕截,重新分配給其它worker再執(zhí)行驳棱。
解決辦法就是把 visibility_timeout參數(shù)調(diào)大,比我們ETA的時間差要大农曲。celery本身的定位就主要是實時的異步隊列社搅,對于這種長時間定時執(zhí)行,支持不太好朋蔫。
但是第二天依然重復(fù)執(zhí)行了罚渐。。驯妄。
最后我的解決方法是在每次定時任務(wù)執(zhí)行完就在redis中寫入一個唯一的key對應(yīng)一個時間戳荷并,當下次任務(wù)執(zhí)行前去獲取redis中的這個key對應(yīng)的value值,和當前的時間做比較青扔,當滿足我們的定時頻率要求時才執(zhí)行源织,這樣保證了同一個任務(wù)在規(guī)定的時間內(nèi)只會執(zhí)行一次。
使用不同的queue
當你有很多任務(wù)需要執(zhí)行的時候微猖,不要偷懶只使用默認的queue谈息,這樣會相互影響,并且拖慢任務(wù)執(zhí)行的凛剥,導(dǎo)致重要的任務(wù)不能被快速的執(zhí)行侠仇。雞蛋不能放在同一個籃子里的道理大家都懂。
有一種簡單的方式設(shè)置queue
Automatic routing
The simplest way to do routing is to use the CELERY_CREATE_MISSING_QUEUES setting (on by default).
With this setting on, a named queue that is not already defined in CELERY_QUEUES will be created automatically. This makes it easy to perform simple routing tasks.
Say you have two servers, x, and y that handles regular tasks, and one server z, that only handles feed related tasks. You can use this configuration:
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
With this route enabled import feed tasks will be routed to the “feeds” queue, while all other tasks will be routed to the default queue (named “celery” for historical reasons).
Now you can start server z to only process the feeds queue like this:
user@z:/$ celery -A proj worker -Q feeds
You can specify as many queues as you want, so you can make this server process the default queue as well:
user@z:/$ celery -A proj worker -Q feeds,celery
直接使用
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
user@z:/$ celery -A proj worker -Q feeds,celery
指定routes,就會自動生成對應(yīng)的queue,然后使用-Q指定queue啟動celery就可以逻炊,默認的queue名字是celery互亮。可以看官方文檔對默認queue的名字進行修改余素。
啟動多個workers執(zhí)行不同的任務(wù)
在同一臺機器上豹休,對于優(yōu)先級不同的任務(wù)最好啟動不同的worker去執(zhí)行,比如把實時任務(wù)和定時任務(wù)分開桨吊,把執(zhí)行頻率高的任務(wù)和執(zhí)行頻率低的任務(wù)分開威根,這樣有利于保證高優(yōu)先級的任務(wù)可以得到更多的系統(tǒng)資源,同時高頻率的實時任務(wù)日志比較多也會影響實時任務(wù)的日志查看视乐,分開就可以記錄到不同的日志文件洛搀,方便查看。
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2.%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3.%h
可以像這樣啟動不同的worker炊林,%h可以指定hostname姥卢,詳細說明可以查看官方文檔
高優(yōu)先級的任務(wù)可以分配更多的concurrency卷要,但是并不是worker并法數(shù)越多越好渣聚,保證任務(wù)不堆積就好。
是否需要關(guān)注任務(wù)執(zhí)行狀態(tài)
這個要視具體的業(yè)務(wù)場景來看僧叉,如果對結(jié)果不關(guān)心奕枝,或者任務(wù)的執(zhí)行本身會對數(shù)據(jù)產(chǎn)生影響,通過對數(shù)據(jù)的判斷可以知道執(zhí)行的結(jié)果那就不需要返回celery任務(wù)的退出狀態(tài)瓶堕,可以設(shè)置
CELERY_IGNORE_RESULT = True
或者
@app.task(ignore_result=True)
def mytask(…):
something()
但是隘道,如果業(yè)務(wù)需要根據(jù)任務(wù)執(zhí)行的狀態(tài)進行響應(yīng)的處理就不要這樣設(shè)置。
內(nèi)存泄漏
長時間運行Celery有可能發(fā)生內(nèi)存泄露郎笆,可以像下面這樣設(shè)置
CELERYD_MAX_TASKS_PER_CHILD = 40 # 每個worker執(zhí)行了多少任務(wù)就會死掉