celery路由和交換的相關(guān)知識

0. 實際問題

假設(shè)我們擁有2個celery的task對象my_taskAmy_taskB,具體的兩個函數(shù)邏輯如下:

@celery_app.task()
def my_taskA(a, b, c):
    print("doing my_taskA here...")
    time.sleep(0.01)
    return a + b + c


@celery_app.task()
def my_taskB(x, y):
    print("doing my_taskB here...")
    time.sleep(10)
    return x - y

默認(rèn)來說灌侣,這兩個任務(wù)都會在celery的默認(rèn)隊列(queue)‘default’中運行侧啼,那么存在這么一個問題愕秫,
如果同時添加兩類任務(wù)中戴甩,消費的任務(wù)對象my_taskAmy_taskB并不會很快消耗完,那么很可能出現(xiàn)
一個現(xiàn)象协饲,那就是很快這個隊列中的任務(wù)都是my_taskB的任務(wù)了缴川。

1. 如何處理?

一般來說而线,我們肯定想到的是恋日,對于每個任務(wù)給它分配一個單獨的broker岂膳,這樣不久不會影響了,各自維護
各自的東西筷屡,事實上很多人也是這么做的。但這樣燎潮,增加了維護的成本扼倘。試想一下唉锌,如果有20種任務(wù)竿奏,我們
難道要開辟20個broker去指定不同的任務(wù)嗎?

但實際上泛啸,在閱讀完celery的官方教程后绿语,發(fā)現(xiàn)一個路由和交
換的概念『蛑罚可以在一個broker上面開辟多個隊列吕粹,每個隊列綁定指定類型的任務(wù),而對應(yīng)的worker通過指定
的隊列獲取任務(wù)岗仑。這個概念就類系我們的雙頻路由器一樣匹耕,2.4G的設(shè)備你們走常規(guī)的通道,5G的設(shè)備我有獨享
的高速5G通道荠雕。完美稳其!

2. 如何使用炸卑?

看到這里既鞠,接下來肯定是準(zhǔn)備嘗試了。但是說句實話盖文,像我這樣的白癡級別的任務(wù)嘱蛋,很難通過官方樣例看懂,
所以我查詢了一部分資料五续,然后實現(xiàn)了個demo洒敏。
先上目錄結(jié)構(gòu):

untitled1
├── app.py
├── celeryconfig.py
└── send_task.py

做個簡單說明:app.py實現(xiàn)具體的worker邏輯,send_task負責(zé)分發(fā)任務(wù)到隊列中返帕,
celeryconfig.py是基礎(chǔ)配置桐玻。

  • app.py實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

import time

from celery import Celery
from celery.signals import task_postrun

celery_app = Celery('test_work', broker='amqp://sangfor:test@localhost/producer-vhost')
celery_app.config_from_object('celeryconfig')


@celery_app.task()
def my_taskA(a, b, c):
    print("doing my_taskA here...")
    time.sleep(0.01)
    return a + b + c


@celery_app.task()
def my_taskB(x, y):
    print("doing my_taskB here...")
    time.sleep(10)
    return x - y


@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """任務(wù)完成的信號處理函數(shù)"""
    print '''   Done!
    sender: %s
    task_id: %s
    task: %s
    retval: %s
    state: %s
    args:%s
    kwargs:%s
    kwds:%s''' % (sender, task_id, task, retval, state, args, kwargs, kwds,)
  • send_task.py的實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

import time
from celery import Celery

celery_app = Celery('send_work', broker='amqp://sangfor:test@localhost/producer-vhost')
# celery_app.config_from_object('celeryconfig')

celery_app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ENABLE_UTC=True,
    CELERY_ROUTES={
        'app.my_taskA': {'queue': 'for_task_A'},
        'app.my_taskB': {'queue': 'for_task_B'},
    },
    CELERY_QUEUES={
        "for_task_A": {
            "exchange": "for_task_A"
        },
        "for_task_B": {
            "exchange": "for_task_B"
        }
    }
)


def run():
    while True:
        print 'Now send task A'
        celery_app.send_task('app.my_taskA', (1, 2, 3,))
        print 'Now send task B'
        celery_app.send_task('app.my_taskB', (9, 8))
        time.sleep(0.01)

if __name__ == '__main__':
    run()
  • celeryconfig.py的實現(xiàn)
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-

CELERY_IMPORTS = ['app']

CELERY_ROUTES = {
    'app.my_taskA': {'queue': 'for_task_A'},
    'app.my_taskB': {'queue': 'for_task_B'},
}

CELERY_QUEUES = {
    "for_task_A": {
        "exchange": "for_task_A"
    },
    "for_task_B": {
        "exchange": "for_task_B"
    }
}

3.如何執(zhí)行

  • celery -A app worker -n workerA -Q for_task_A
  • celery -A app worker -n workerA -Q for_task_B
  • python send_task.py
    注意,這里的-Q是精髓荆萤,為了指定執(zhí)行特定通道中的任務(wù)镊靴。這個指令干啥的铣卡,自己--help吧。
    運行一下試試吧偏竟,其他的不解釋了煮落。

4.幾點知識補充

1.Exchange有幾類type:
(1) direct(default):

direct類型的Exchange路由規(guī)則也很簡單,它會把消息路由到那些binding key與routing
key完全匹配的Queue中踊谋。

例子:

CELERY_QUEUES = (
    Queue('for_adds',Exchange('for_adds',type='direct'), routing_key='adds'),
    Queue('for_send_emails', Exchange('for_adds',type='direct'), routing_key='email'),
    Queue('add', Exchange('for_adds',type='direct'), routing_key='add'),
)
CELERY_ROUTES = {
    'celery_test.tasks.add': {'exchange':'for_adds','routing_key':'add'},
    'celery_test.tasks.send_mail': {'exchange':'for_adds','routing_key':'email'},
    'celery_test.tasks.adds': {'exchange':'for_adds','routing_key':'add'},
}

(2)topic:

topic類型的Exchange在匹配規(guī)則上進行了擴展蝉仇,它與direct類型的Exchage相似,也是將消息路由到
binding key與routing key相匹配的Queue中殖蚕〗蜗危可以看出和上面那個區(qū)別的地方,這里面不
是強匹配睦疫。它引入了兩個通配符#*前者匹配多個單詞(可以為0)害驹,后者匹配一個單詞。

例子:

CELERY_QUEUES = (
    Queue('for_adds',Exchange('for_adds',type='topic'), routing_key='*.task.*'),
    Queue('for_send_emails', Exchange('for_adds',type='topic'), routing_key='*.*.email'),
    Queue('add', Exchange('for_adds',type='topic'), routing_key='*.add'),
)
CELERY_ROUTES = {
    'celery_test.tasks.add': {'exchange':'for_adds','routing_key':'q.task.email'},
    'celery_test.tasks.send_mail': {'exchange':'for_adds','routing_key':'a.task.e'},
    'celery_test.tasks.adds': {'exchange':'for_adds','routing_key':'b.add'},
}

(3)fanout:

fanout類型就是傳說中的廣播形式蛤育,它沒有參數(shù)綁定宛官,就是不需要指定上面的routing_key之類的東西,
只要和該交換綁定的queue瓦糕,統(tǒng)統(tǒng)發(fā)送出去底洗。類似于通過交換口,就廣播發(fā)出咕娄。

例子:

CELERY_QUEUES = (
    Queue('for_adds',Exchange('for_adds',type='fanout')),
    Queue('for_send_emails', Exchange('for_adds',type='fanout')),
    Queue('add', Exchange('for_adds',type='fanout')),
)
CELERY_ROUTES = {
    'celery_test.tasks.add': {'exchange':'for_adds'},
    'celery_test.tasks.send_mail': {'exchange':'for_adds',},
    'celery_test.tasks.adds': {'exchange':'for_adds',},
}

(4)headers:

headers類型中亥揖,通過一個參數(shù)表(包含header和values(可選)),隊列被綁定到交換
還有一個名為“x-match”的特殊參數(shù)確定兩者之間的匹配算法谭胚,可以使用兩個關(guān)鍵字進行表示徐块。
其中“all”意味著一個AND(所有對必須匹配),“any”意味著OR(至少一對必須匹配)灾而。
例子(google沒找到合適的例子胡控,自己看資料寫了個,不一定對旁趟,有知道的歡迎指正):

CELERY_QUEUES = (
    Queue('for_adds',Exchange('for_adds',type='header', arguments = {'ham': 'good', 'x-match':'any'})),
)

5.參考文獻

  1. Using pika to create headers exchanges with RabbitMQ in python
  2. RabbitMQ Exchanges, routing keys and bindings
  3. celery 昼激、rabbitmq的exchange三種方式的實現(xiàn)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市锡搜,隨后出現(xiàn)的幾起案子橙困,更是在濱河造成了極大的恐慌,老刑警劉巖耕餐,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件凡傅,死亡現(xiàn)場離奇詭異,居然都是意外死亡肠缔,警方通過查閱死者的電腦和手機夏跷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門哼转,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人槽华,你說我怎么就攤上這事壹蔓。” “怎么了猫态?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵佣蓉,是天一觀的道長。 經(jīng)常有香客問我亲雪,道長勇凭,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任义辕,我火速辦了婚禮套像,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘终息。我一直安慰自己,他們只是感情好贞让,可當(dāng)我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布周崭。 她就那樣靜靜地躺著,像睡著了一般喳张。 火紅的嫁衣襯著肌膚如雪续镇。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天销部,我揣著相機與錄音摸航,去河邊找鬼。 笑死舅桩,一個胖子當(dāng)著我的面吹牛酱虎,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播擂涛,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼读串,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了撒妈?” 一聲冷哼從身側(cè)響起恢暖,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎狰右,沒想到半個月后杰捂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡棋蚌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年嫁佳,在試婚紗的時候發(fā)現(xiàn)自己被綠了挨队。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡脱拼,死狀恐怖瞒瘸,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情熄浓,我是刑警寧澤情臭,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站赌蔑,受9級特大地震影響俯在,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜娃惯,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一跷乐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧趾浅,春花似錦愕提、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至证膨,卻和暖如春如输,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背央勒。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工不见, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人崔步。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓稳吮,卻偏偏與公主長得像,于是被迫代替她去往敵國和親井濒。 傳聞我的和親對象是個殘疾皇子盖高,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 序言第1章 并行和分布式計算介紹第2章 異步編程第3章 Python的并行計算第4章 Celery分布式應(yīng)用第5章...
    SeanCheney閱讀 12,504評論 3 35
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)眼虱,斷路器喻奥,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化捏悬、事務(wù)撞蚕、擁塞控...
    jiangmo閱讀 10,359評論 2 34
  • 1.定義: Celery是一個異步的任務(wù)隊列(也叫做分布式任務(wù)隊列) 2.工作結(jié)構(gòu) Celery分為3個部...
    四號公園_2016閱讀 28,732評論 5 60
  • 在學(xué)習(xí)Celery之前,我先簡單的去了解了一下什么是生產(chǎn)者消費者模式过牙。 生產(chǎn)者消費者模式 在實際的軟件開發(fā)過程中甥厦,...
    c2db9ba35639閱讀 3,530評論 0 8