Celery 框架學習筆記

在學習Celery之前官紫,我先簡單的去了解了一下什么是生產(chǎn)者消費者模式老充。

生產(chǎn)者消費者模式

在實際的軟件開發(fā)過程中胞谈,經(jīng)常會碰到如下場景:某個模塊負責產(chǎn)生數(shù)據(jù)薇宠,這些數(shù)據(jù)由另一個模塊來負責處理(此處的模塊是廣義的,可以是類统舀、函數(shù)匆骗、線程、進程等)誉简。產(chǎn)生數(shù)據(jù)的模塊碉就,就形象地稱為生產(chǎn)者;而處理數(shù)據(jù)的模塊闷串,就稱為消費者瓮钥。

單單抽象出生產(chǎn)者和消費者,還夠不上是生產(chǎn)者消費者模式烹吵。該模式還需要有一個緩沖區(qū)處于生產(chǎn)者和消費者之間碉熄,作為一個中介。生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)年叮,而消費者從緩沖區(qū)取出數(shù)據(jù),如下圖所示:

生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題玻募。生產(chǎn)者和消費者彼此之間不直接通訊只损,而通過消息隊列(緩沖區(qū))來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理七咧,直接扔給消息隊列跃惫,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從消息隊列里取艾栋,消息隊列就相當于一個緩沖區(qū)爆存,平衡了生產(chǎn)者和消費者的處理能力。這個消息隊列就是用來給生產(chǎn)者和消費者解耦的蝗砾。------------->這里又有一個問題先较,什么叫做解耦携冤?

解耦:假設生產(chǎn)者和消費者分別是兩個類。如果讓生產(chǎn)者直接調(diào)用消費者的某個方法闲勺,那么生產(chǎn)者對于消費者就會產(chǎn)生依賴(也就是耦合)曾棕。將來如果消費者的代碼發(fā)生變化,可能會影響到生產(chǎn)者菜循。而如果兩者都依賴于某個緩沖區(qū)翘地,兩者之間不直接依賴,耦合也就相應降低了癌幕。生產(chǎn)者直接調(diào)用消費者的某個方法衙耕,還有另一個弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的)勺远,在消費者的方法沒有返回之前橙喘,生產(chǎn)者只好一直等在那邊。萬一消費者處理數(shù)據(jù)很慢谚中,生產(chǎn)者就會白白糟蹋大好時光渴杆。緩沖區(qū)還有另一個好處。如果制造數(shù)據(jù)的速度時快時慢宪塔,緩沖區(qū)的好處就體現(xiàn)出來了磁奖。當數(shù)據(jù)制造快的時候,消費者來不及處理某筐,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中比搭。等生產(chǎn)者的制造速度慢下來,消費者再慢慢處理掉南誊。

因為太抽象身诺,看過網(wǎng)上的說明之后,通過我的理解抄囚,我舉了個例子:吃包子霉赡。

假如你非常喜歡吃包子(吃起來根本停不下來),今天幔托,你媽媽(生產(chǎn)者)在蒸包子穴亏,廚房有張桌子(緩沖區(qū)),你媽媽將蒸熟的包子盛在盤子(消息)里重挑,然后放到桌子上嗓化,你正在看巴西奧運會,看到蒸熟的包子放在廚房桌子上的盤子里谬哀,你就把盤子取走刺覆,一邊吃包子一邊看奧運。在這個過程中史煎,你和你媽媽使用同一個桌子放置盤子和取走盤子谦屑,這里桌子就是一個共享對象驳糯。生產(chǎn)者添加食物,消費者取走食物伦仍。桌子的好處是结窘,你媽媽不用直接把盤子給你,只是負責把包子裝在盤子里放到桌子上充蓝,如果桌子滿了隧枫,就不再放了,等待谓苟。而且生產(chǎn)者還有其他事情要做官脓,消費者吃包子比較慢,生產(chǎn)者不能一直等消費者吃完包子把盤子放回去再去生產(chǎn)涝焙,因為吃包子的人有很多卑笨,如果這期間你好朋友來了,和你一起吃包子仑撞,生產(chǎn)者不用關注是哪個消費者去桌子上拿盤子赤兴,而消費者只去關注桌子上有沒有放盤子,如果有隧哮,就端過來吃盤子中的包子桶良,沒有的話就等待。對應關系如下圖:

考察了一下沮翔,原來當初設計這個模式陨帆,主要就是用來處理并發(fā)問題的,而Celery就是一個用python寫的并行分布式框架采蚀。

然后我接著去學習Celery

Celery的定義

Celery(芹菜)是一個簡單疲牵、靈活且可靠的,處理大量消息的分布式系統(tǒng)榆鼠,并且提供維護這樣一個系統(tǒng)的必需工具纲爸。

我比較喜歡的一點是:Celery支持使用任務隊列的方式在分布的機器、進程妆够、線程上執(zhí)行任務調(diào)度识啦。然后我接著去理解什么是任務隊列。

任務隊列

任務隊列是一種在線程或機器間分發(fā)任務的機制责静。

消息隊列

消息隊列的輸入是工作的一個單元袁滥,稱為任務盖桥,獨立的職程(Worker)進程持續(xù)監(jiān)視隊列中是否有需要處理的新任務灾螃。

Celery 用消息通信,通常使用中間人(Broker)在客戶端和職程間斡旋揩徊。這個過程從客戶端向隊列添加消息開始腰鬼,之后中間人把消息派送給職程嵌赠,職程對消息進行處理。如下圖所示:

Celery 系統(tǒng)可包含多個職程和中間人熄赡,以此獲得高可用性和橫向擴展能力姜挺。

Celery的架構

Celery的架構由三部分組成,消息中間件(message broker)彼硫,任務執(zhí)行單元(worker)和任務執(zhí)行結果存儲(task result store)組成炊豪。

消息中間件

Celery本身不提供消息服務,但是可以方便的和第三方提供的消息中間件集成拧篮,包括词渤,RabbitMQ,Redis,MongoDB等,這里我先去了解RabbitMQ,Redis串绩。

任務執(zhí)行單元

Worker是Celery提供的任務執(zhí)行的單元缺虐,worker并發(fā)的運行在分布式的系統(tǒng)節(jié)點中

任務結果存儲

Task result store用來存儲Worker執(zhí)行的任務的結果,Celery支持以不同方式存儲任務的結果礁凡,包括Redis高氮,MongoDB,Django ORM顷牌,AMQP等剪芍,這里我先不去看它是如何存儲的,就先選用Redis來存儲任務執(zhí)行結果韧掩。

然后我接著去安裝Celery紊浩,在安裝Celery之前,我已經(jīng)在自己虛擬機上安裝好了Python疗锐,版本是2.7坊谁,是為了更好的支持Celery的3.0以上的版本。

因為涉及到消息中間件滑臊,所以我先去選擇一個在我工作中要用到的消息中間件(在Celery幫助文檔中稱呼為中間人)口芍,為了更好的去理解文檔中的例子,我安裝了兩個中間件雇卷,一個是RabbitMQ,一個redis鬓椭。

在這里我就先根據(jù)Celery3.1的幫助文檔安裝和設置RabbitMQ, 要使用 Celery,我們需要創(chuàng)建一個 RabbitMQ 用戶关划、一個虛擬主機小染,并且允許這個用戶訪問這個虛擬主機。下面是我在個人虛擬機Ubuntu14.04上的設置:

$ sudo rabbitmqctl add_user forward password

#創(chuàng)建了一個RabbitMQ用戶,用戶名為forward贮折,密碼是password

$ sudo rabbitmqctl add_vhost ubuntu

#創(chuàng)建了一個虛擬主機裤翩,主機名為ubuntu

$ sudo rabbitmqctl set_permissions -p ubuntu forward ".*" ".*" ".*"

#允許用戶forward訪問虛擬主機ubuntu,因為RabbitMQ通過主機名來與節(jié)點通信

$ sudo rabbitmq-server

之后我啟用RabbitMQ服務器调榄,結果如下踊赠,成功運行:

之后我安裝Redis,它的安裝比較簡單,如下:

$ sudo pip install redis

然后進行簡單的配置呵扛,只需要設置 Redis 數(shù)據(jù)庫的位置:

BROKER_URL = 'redis://localhost:6379/0'

URL的格式為:

redis://:password@hostname:port/db_number

URL Scheme 后的所有字段都是可選的,并且默認為 localhost 的 6379 端口筐带,使用數(shù)據(jù)庫 0今穿。我的配置是:

redis://:password@ubuntu:6379/5

之后安裝Celery,我是用標準的Python工具pip安裝的伦籍,如下:

$ sudo pip install celery

為了測試Celery能否工作蓝晒,我運行了一個最簡單的任務,編寫tasks.py帖鸦,如下圖所示:

編輯保存退出后拔创,我在當前目錄下運行如下命令:

$ celery -A tasks worker --loglevel=info

#查詢文檔,了解到該命令中-A參數(shù)表示的是Celery APP的名稱富蓄,這個實例中指的就是tasks.py,后面的tasks就是APP的名稱剩燥,worker是一個執(zhí)行任務角色,后面的loglevel=info記錄日志類型默認是info,這個命令啟動了一個worker,用來執(zhí)行程序中add這個加法任務(task)立倍。

然后看到界面顯示結果如下:

我們可以看到Celery正常工作在名稱ubuntu的虛擬主機上灭红,版本為3.1.23,在下面的[config]中我們可以看到當前APP的名稱tasks口注,運輸工具transport就是我們在程序中設置的中間人redis://127.0.0.1:6379/5变擒,result我們沒有設置,暫時顯示為disabled,然后我們也可以看到worker缺省使用perfork來執(zhí)行并發(fā)寝志,當前并發(fā)數(shù)顯示為1娇斑,然后可以看到下面的[queues]就是我們說的隊列,當前默認的隊列是celery,然后我們看到下面的[tasks]中有一個任務tasks.add.

了解了這些之后材部,根據(jù)文檔我重新打開一個terminal,然后執(zhí)行Python,進入Python交互界面毫缆,用delay()方法調(diào)用任務,執(zhí)行如下操作:

這個任務已經(jīng)由之前啟動的Worker異步執(zhí)行了乐导,然后我打開之前啟動的worker的控制臺,對輸出進行查看驗證苦丁,結果如下:

綠色部分第一行說明worker收到了一個任務:tasks.add,這里我們和之前發(fā)送任務返回的AsyncResult對比我們發(fā)現(xiàn),每個task都有一個唯一的ID物臂,第二行說明了這個任務執(zhí)行succeed,執(zhí)行結果為12旺拉。

查看資料說調(diào)用任務后會返回一個AsyncResult實例,可用于檢查任務的狀態(tài)棵磷,等待任務完成或獲取返回值(如果任務失敗蛾狗,則為異常和回溯)。但這個功能默認是不開啟的仪媒,需要設置一個 Celery 的結果后端(backend)沉桌,這塊我在下一個例子中進行了學習。

通過這個例子后我對Celery有了初步的了解,然后我在這個例子的基礎上去進一步的學習蒲牧。

因為Celery是用Python編寫的,所以為了讓代碼結構化一些赌莺,就像一個應用冰抢,我使用python包,創(chuàng)建了一個celery服務艘狭,命名為pj挎扰。文件目錄如下:

celery.py

from __future __ import absolute_import

#定義未來文件的絕對進口,而且絕對進口必須在每個模塊的頂部啟用巢音。

from celery import Celery

#從celery導入Celery的應用程序接口

App.config_from_object(‘pj.config’)

#從config.py中導入配置文件

if __name__ == ‘__main__’:

app.start()

#執(zhí)行當前文件遵倦,運行celery

app = Celery(‘pj’,

broker=‘redis://localhost’,

backend=‘redis://localhost’,

include=[‘pj.tasks’]

)

#首先創(chuàng)建了一個celery實例app,實例化的過程中,制定了任務名pj(與當前文件的名字相同)官撼,Celery的第一個參數(shù)是當前模塊的名稱梧躺,在這個例子中就是pj,后面的參數(shù)可以在這里直接指定,也可以寫在配置文件中傲绣,我們可以調(diào)用config_from_object()來讓Celery實例加載配置模塊掠哥,我的例子中的配置文件起名為config.py,配置文件如下:

在配置文件中我們可以對任務的執(zhí)行等進行管理,比如說我們可能有很多的任務,但是我希望有些優(yōu)先級比較高的任務先被執(zhí)行,而不希望先進先出的等待秃诵。那么需要引入一個隊列的問題. 也就是說在我的broker的消息存儲里面有一些隊列续搀,他們并行運行,但是worker只從對應 的隊列里面取任務菠净。在這里我們希望tasks.py中的add先被執(zhí)行禁舷。task中我設置了兩個任務:

所以我通過from celery import group引入group,用來創(chuàng)建并行執(zhí)行的一組任務。然后這塊現(xiàn)需要理解的就是這個@app.task,@符號在python中用作函數(shù)修飾符毅往,到這塊我又回頭去看python的裝飾器(在代碼運行期間動態(tài)增加功能的方式)到底是如何實現(xiàn)的牵咙,在這里的作用就是通過task()裝飾器在可調(diào)用的對象(app)上創(chuàng)建一個任務。

了解完裝飾器后攀唯,我回過頭去整理配置的問題霜大,前面提到任務的優(yōu)先級問題,在這個例子中如果我們想讓add這個加法任務優(yōu)先于subtract減法任務被執(zhí)行革答,我們可以將兩個任務放到不同的隊列中战坤,由我們決定先執(zhí)行哪個任務,我們可以在配置文件中這樣配置:

先了解了幾個常用的參數(shù)的含義:

Exchange:交換機残拐,決定了消息路由規(guī)則途茫;

Queue:消息隊列;

Channel:進行消息讀寫的通道溪食;

Bind:綁定了Queue和Exchange囊卜,意即為符合什么樣路由規(guī)則的消息,將會放置入哪一個消息隊列

我將add這個函數(shù)任務放在了一個叫做for_add的隊列里面栅组,將subtract這個函數(shù)任務放在了一個叫做for_subtract的隊列里面雀瓢,然后我在當前應用目錄下執(zhí)行命令:

這個worker就只負責處理for_add這個隊列的任務,執(zhí)行這個任務:

任務已經(jīng)被執(zhí)行玉掸,我在worker控制臺查看結果:

可以看到worker收到任務刃麸,并且執(zhí)行了任務。

在這里我們還是在交互模式下手動去執(zhí)行司浪,我們想要crontab的定時生成和執(zhí)行泊业,我們可以用celery的beat去周期的生成任務和執(zhí)行任務,在這個例子中我希望每10秒鐘產(chǎn)生一個任務啊易,然后去執(zhí)行這個任務吁伺,我可以這樣配置:

使用了scheduler,要制定時區(qū):CELERY_TIMEZONE = 'Asia/Shanghai'租谈,啟動celery加上-B的參數(shù):

并且要在config.py中加入from datetime import timedelta篮奄。

更近一步,如果我希望在每周四的19點30分生成任務割去,分發(fā)任務宦搬,讓worker取走執(zhí)行,可以這樣配置:

看完這些基礎的東西劫拗,我回過頭對celery在回顧了一下间校,用圖把它的框架大致畫出來,如下圖:

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末页慷,一起剝皮案震驚了整個濱河市憔足,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌酒繁,老刑警劉巖滓彰,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異州袒,居然都是意外死亡揭绑,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進店門郎哭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來他匪,“玉大人,你說我怎么就攤上這事夸研“蠲郏” “怎么了?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵亥至,是天一觀的道長悼沈。 經(jīng)常有香客問我贱迟,道長,這世上最難降的妖魔是什么絮供? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任衣吠,我火速辦了婚禮,結果婚禮上壤靶,老公的妹妹穿的比我還像新娘缚俏。我一直安慰自己,他們只是感情好萍肆,可當我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著胀屿,像睡著了一般塘揣。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上宿崭,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天亲铡,我揣著相機與錄音,去河邊找鬼葡兑。 笑死奖蔓,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的讹堤。 我是一名探鬼主播吆鹤,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洲守!你這毒婦竟也來了疑务?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤梗醇,失蹤者是張志新(化名)和其女友劉穎知允,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叙谨,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡温鸽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了手负。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片涤垫。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖竟终,靈堂內(nèi)的尸體忽然破棺而出雹姊,到底是詐尸還是另有隱情,我是刑警寧澤衡楞,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布吱雏,位于F島的核電站敦姻,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏歧杏。R本人自食惡果不足惜镰惦,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望犬绒。 院中可真熱鬧旺入,春花似錦、人聲如沸凯力。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咐鹤。三九已至拗秘,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祈惶,已是汗流浹背雕旨。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留捧请,地道東北人凡涩。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像疹蛉,于是被迫代替她去往敵國和親活箕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容

  • 序言第1章 并行和分布式計算介紹第2章 異步編程第3章 Python的并行計算第4章 Celery分布式應用第5章...
    SeanCheney閱讀 12,475評論 3 35
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理可款,服務發(fā)現(xiàn)讹蘑,斷路器,智...
    卡卡羅2017閱讀 134,601評論 18 139
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務器筑舅。支持消息的持久化座慰、事務、擁塞控...
    jiangmo閱讀 10,344評論 2 34
  • 前言 本系列文章計劃分三個章節(jié)進行講述翠拣,分別是理論篇版仔、基礎篇和實戰(zhàn)篇。理論篇主要為構建分布式爬蟲而儲備的理論知識误墓,...
    resolvewang閱讀 14,312評論 9 54
  • http://student-lp.iteye.com/blog/2093397https://segmentfa...
    樊海鵬閱讀 1,650評論 1 6