本例將使用django+celery+redis演示如何配置celery的任務隊列以及如何使用他們
django==3.0.4
celery==4.4.0
redis==3.3.10
大家都知道redis不像rabbitmq支持任務的優(yōu)先級,但可以使用不同的隊列來區(qū)分不同的任務類型律胀,比如優(yōu)先級較高且耗時較短的任務都放到queue1种樱,而耗時較長的可以放到queue2忿危。這樣不同類型的任務就不再互相干擾了若专,這不是個完美的方案从藤,但可以滿足大部分場景了比规。
配置
方法1
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = {
"default": {
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
"queue0": {
"routing_key": "P0",
"exchange": "P0",
"exchange_type": "P0",
}
}
方法2
from kombu import Queue,Exchange
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('P0', Exchange('P0'), routing_key='P0'),
)
其中路由鍵支持正則匹配吱窝,比如routing_key="WEB.” 則以WEB開頭的路有都歸到default隊列.
其它參數(shù)自行查閱吧
調(diào)用
@app.task(name='DEMO_P0', queue='P0')
def task_p0():
print('this is p0')
return 0
注:也可以在調(diào)用時指定隊列
r = task1.apply_async(args=(1, 2), queue='queue1', routing_key='queue1')
亦或使用CELERY_ROUTES配置路由
啟動
celery -A core worker -E -l info
celery -A core worker -E -l info -Q P0
這里第一行是消費默認的default隊列讥邻,第二行是單獨消費P0隊列
注:這里啟動時我沒有指定-P eventlet,因為測試時發(fā)現(xiàn)會提示django.db.utils.DatabaseError: DatabaseWrapper objects created in a thread can only be used in that same thread.
網(wǎng)上有幾個解法如版本問題、關(guān)閉線程數(shù)據(jù)庫連接等都不太好使癣诱,也沒再深入研究计维。如果哪位知曉也煩請告知,感謝撕予。