0. 實際問題
假設(shè)我們擁有2個celery的task對象my_taskA
和my_taskB
,具體的兩個函數(shù)邏輯如下:
@celery_app.task()
def my_taskA(a, b, c):
print("doing my_taskA here...")
time.sleep(0.01)
return a + b + c
@celery_app.task()
def my_taskB(x, y):
print("doing my_taskB here...")
time.sleep(10)
return x - y
默認(rèn)來說灌侣,這兩個任務(wù)都會在celery的默認(rèn)隊列(queue)‘default’中運行侧啼,那么存在這么一個問題愕秫,
如果同時添加兩類任務(wù)中戴甩,消費的任務(wù)對象my_taskA
和my_taskB
并不會很快消耗完,那么很可能出現(xiàn)
一個現(xiàn)象协饲,那就是很快這個隊列中的任務(wù)都是my_taskB
的任務(wù)了缴川。
1. 如何處理?
一般來說而线,我們肯定想到的是恋日,對于每個任務(wù)給它分配一個單獨的broker岂膳,這樣不久不會影響了,各自維護
各自的東西筷屡,事實上很多人也是這么做的。但這樣燎潮,增加了維護的成本扼倘。試想一下唉锌,如果有20種任務(wù)竿奏,我們
難道要開辟20個broker去指定不同的任務(wù)嗎?
但實際上泛啸,在閱讀完celery的官方教程后绿语,發(fā)現(xiàn)一個路由和交
換的概念『蛑罚可以在一個broker上面開辟多個隊列吕粹,每個隊列綁定指定類型的任務(wù),而對應(yīng)的worker通過指定
的隊列獲取任務(wù)岗仑。這個概念就類系我們的雙頻路由器一樣匹耕,2.4G的設(shè)備你們走常規(guī)的通道,5G的設(shè)備我有獨享
的高速5G通道荠雕。完美稳其!
2. 如何使用炸卑?
看到這里既鞠,接下來肯定是準(zhǔn)備嘗試了。但是說句實話盖文,像我這樣的白癡級別的任務(wù)嘱蛋,很難通過官方樣例看懂,
所以我查詢了一部分資料五续,然后實現(xiàn)了個demo洒敏。
先上目錄結(jié)構(gòu):
untitled1
├── app.py
├── celeryconfig.py
└── send_task.py
做個簡單說明:app.py
實現(xiàn)具體的worker邏輯,send_task
負責(zé)分發(fā)任務(wù)到隊列中返帕,
celeryconfig.py
是基礎(chǔ)配置桐玻。
- app.py實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
import time
from celery import Celery
from celery.signals import task_postrun
celery_app = Celery('test_work', broker='amqp://sangfor:test@localhost/producer-vhost')
celery_app.config_from_object('celeryconfig')
@celery_app.task()
def my_taskA(a, b, c):
print("doing my_taskA here...")
time.sleep(0.01)
return a + b + c
@celery_app.task()
def my_taskB(x, y):
print("doing my_taskB here...")
time.sleep(10)
return x - y
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
"""任務(wù)完成的信號處理函數(shù)"""
print ''' Done!
sender: %s
task_id: %s
task: %s
retval: %s
state: %s
args:%s
kwargs:%s
kwds:%s''' % (sender, task_id, task, retval, state, args, kwargs, kwds,)
- send_task.py的實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
import time
from celery import Celery
celery_app = Celery('send_work', broker='amqp://sangfor:test@localhost/producer-vhost')
# celery_app.config_from_object('celeryconfig')
celery_app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_ROUTES={
'app.my_taskA': {'queue': 'for_task_A'},
'app.my_taskB': {'queue': 'for_task_B'},
},
CELERY_QUEUES={
"for_task_A": {
"exchange": "for_task_A"
},
"for_task_B": {
"exchange": "for_task_B"
}
}
)
def run():
while True:
print 'Now send task A'
celery_app.send_task('app.my_taskA', (1, 2, 3,))
print 'Now send task B'
celery_app.send_task('app.my_taskB', (9, 8))
time.sleep(0.01)
if __name__ == '__main__':
run()
- celeryconfig.py的實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
CELERY_IMPORTS = ['app']
CELERY_ROUTES = {
'app.my_taskA': {'queue': 'for_task_A'},
'app.my_taskB': {'queue': 'for_task_B'},
}
CELERY_QUEUES = {
"for_task_A": {
"exchange": "for_task_A"
},
"for_task_B": {
"exchange": "for_task_B"
}
}
3.如何執(zhí)行
celery -A app worker -n workerA -Q for_task_A
celery -A app worker -n workerA -Q for_task_B
-
python send_task.py
注意,這里的-Q
是精髓荆萤,為了指定執(zhí)行特定通道中的任務(wù)镊靴。這個指令干啥的铣卡,自己--help吧。
運行一下試試吧偏竟,其他的不解釋了煮落。
4.幾點知識補充
1.Exchange
有幾類type:
(1) direct(default)
:
direct類型的Exchange路由規(guī)則也很簡單,它會把消息路由到那些binding key與routing
key完全匹配的Queue中踊谋。
例子:
CELERY_QUEUES = (
Queue('for_adds',Exchange('for_adds',type='direct'), routing_key='adds'),
Queue('for_send_emails', Exchange('for_adds',type='direct'), routing_key='email'),
Queue('add', Exchange('for_adds',type='direct'), routing_key='add'),
)
CELERY_ROUTES = {
'celery_test.tasks.add': {'exchange':'for_adds','routing_key':'add'},
'celery_test.tasks.send_mail': {'exchange':'for_adds','routing_key':'email'},
'celery_test.tasks.adds': {'exchange':'for_adds','routing_key':'add'},
}
(2)topic
:
topic類型的Exchange在匹配規(guī)則上進行了擴展蝉仇,它與direct類型的Exchage相似,也是將消息路由到
binding key與routing key相匹配的Queue中殖蚕〗蜗危可以看出和上面那個區(qū)別的地方,這里面不
是強匹配睦疫。它引入了兩個通配符#
和*
前者匹配多個單詞(可以為0)害驹,后者匹配一個單詞。
例子:
CELERY_QUEUES = (
Queue('for_adds',Exchange('for_adds',type='topic'), routing_key='*.task.*'),
Queue('for_send_emails', Exchange('for_adds',type='topic'), routing_key='*.*.email'),
Queue('add', Exchange('for_adds',type='topic'), routing_key='*.add'),
)
CELERY_ROUTES = {
'celery_test.tasks.add': {'exchange':'for_adds','routing_key':'q.task.email'},
'celery_test.tasks.send_mail': {'exchange':'for_adds','routing_key':'a.task.e'},
'celery_test.tasks.adds': {'exchange':'for_adds','routing_key':'b.add'},
}
(3)fanout
:
fanout類型就是傳說中的廣播形式蛤育,它沒有參數(shù)綁定宛官,就是不需要指定上面的routing_key
之類的東西,
只要和該交換綁定的queue瓦糕,統(tǒng)統(tǒng)發(fā)送出去底洗。類似于通過交換口,就廣播發(fā)出咕娄。
例子:
CELERY_QUEUES = (
Queue('for_adds',Exchange('for_adds',type='fanout')),
Queue('for_send_emails', Exchange('for_adds',type='fanout')),
Queue('add', Exchange('for_adds',type='fanout')),
)
CELERY_ROUTES = {
'celery_test.tasks.add': {'exchange':'for_adds'},
'celery_test.tasks.send_mail': {'exchange':'for_adds',},
'celery_test.tasks.adds': {'exchange':'for_adds',},
}
(4)headers
:
headers類型中亥揖,通過一個參數(shù)表(包含header和values(可選)),隊列被綁定到交換
還有一個名為“x-match”的特殊參數(shù)確定兩者之間的匹配算法谭胚,可以使用兩個關(guān)鍵字進行表示徐块。
其中“all”意味著一個AND(所有對必須匹配),“any”意味著OR(至少一對必須匹配)灾而。
例子(google沒找到合適的例子胡控,自己看資料寫了個,不一定對旁趟,有知道的歡迎指正):
CELERY_QUEUES = (
Queue('for_adds',Exchange('for_adds',type='header', arguments = {'ham': 'good', 'x-match':'any'})),
)