序言
第1章 并行和分布式計(jì)算介紹
第2章 異步編程
第3章 Python的并行計(jì)算
第4章 Celery分布式應(yīng)用
第5章 云平臺部署Python
第6章 超級計(jì)算機(jī)群使用Python
第7章 測試和調(diào)試分布式應(yīng)用
第8章 繼續(xù)學(xué)習(xí)
本章是前面某些知識點(diǎn)的延續(xù)辨液。特別的会宪,本章以實(shí)例詳細(xì)的探討了異步編程和分布式計(jì)算。本章關(guān)注Celery志笼,一個復(fù)雜的用于構(gòu)建分布應(yīng)用的Python框架。最后把篓,對比了Celery的對手:Pyro
和Python-RQ
纫溃。
此時,你應(yīng)該已經(jīng)明白了并行韧掩、分布和異步編程的基本含義紊浩。如果沒有的話,最好再學(xué)習(xí)下前面幾章疗锐。
搭建多機(jī)環(huán)境
學(xué)習(xí)Celery和其它Python包之前坊谁,先來搭建測試環(huán)境。我們開發(fā)的是分布應(yīng)用滑臊,因此需要多機(jī)環(huán)境口芍。
可以使用至少兩臺聯(lián)網(wǎng)機(jī)器的讀者可以跳過這部分。其余讀者雇卷,請繼續(xù)閱讀淑掌。對于后者燎潮,仍然有免費(fèi)或便宜的解決方案。
其一是在主機(jī)上使用虛擬機(jī)VM(例如VirtualBox,https://www.virtualbox.org)马昙。創(chuàng)建幾個VM,安裝Linux箱熬,讓它們在后臺運(yùn)行钧萍。因?yàn)樗鼈儾恍枰獔D像化桌面,所以可以很輕量脱货,使用少量RAM和CPU即可岛都。
另一方法是買幾個便宜的小型計(jì)算機(jī)主板律姨,比如樹莓派(https://www.raspberrypi.org),在它上面安裝Linux臼疫,連上局域網(wǎng)择份。
第三種方案是用云服務(wù)器,比如Amazon EC2烫堤,使用它的虛擬機(jī)荣赶。如果使用這種方法,要確認(rèn)這些包的端口在防火墻是打開的鸽斟。
無論是用哪種方法拔创,緊跟著的問題就是沒有在集群上安裝完整的DNS。最便捷的方法是在所有機(jī)器上編輯/etc/hosts
文件富蓄。查看IP地址剩燥,為每臺機(jī)器起一個名字,并將它們添加到/etc/hosts
立倍。
我在Mac主機(jī)上使用了兩個虛擬機(jī)灭红,這是我的hosts文件:
$ cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting. Do not change this entry.
##
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1 localhost
fe80::1%lo0 localhost
# Development VMs
192.168.123.150 ubuntu1 ubuntu1.local
192.168.123.151 ubuntu2 ubuntu2.local
相似的,這是我的兩個虛擬機(jī)(運(yùn)行Ubuntu 15.04)上的host文件:
$ cat /etc/hosts
127.0.0.1 localhost
192.168.123.151 ubuntu2
192.168.123.150 ubuntu1
# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
你要確保hosts文件上的IP地址和名字是要使用的機(jī)器口注。本書变擒,會命名這些機(jī)器命名為HOST1、HOST2寝志、HOST3等等娇斑。
搭建好多機(jī)環(huán)境之后,就可以開始寫分布應(yīng)用了澈段。
安裝Celery
目前為止悠菜,我們用的都是Python的標(biāo)準(zhǔn)庫,Celery(http://www.celeryproject.org)是用到的第一個第三方庫败富。Celery是一個分布任務(wù)隊(duì)列悔醋,就是一個以隊(duì)列為基礎(chǔ)的系統(tǒng),和之前的某些例子很像兽叮。它還是分布式的芬骄,意味著工作進(jìn)程和保存結(jié)果的和請求的隊(duì)列,在不同機(jī)器上鹦聪。
首先安裝Celery和它的依賴账阻。在每臺機(jī)器上建立一個虛擬環(huán)境(起名為book
),代碼如下(環(huán)境是Unix):
$ pip install virtualenvwrapper
如果這個命令被拒絕泽本,可以加上sudo
淘太,用超級用戶權(quán)限來安裝virtualenvwrapper
,代碼如下:
$ sudo pip install virtualenvwrapper
sudo
命令會向你詢問Unix用戶密碼∑涯粒或者撇贺,可以用下面代碼安裝virtualenvwrapper
:
$ pip install --user virtualenvwrapper
不管使用哪種方法,完成安裝virtualenvwrapper
之后冰抢,都需要配置它松嘶,定義三個環(huán)境變量(用于bash類的shell,假定virtualenvwrapper
安裝在/usr/local/bin
):
$ export WORKON_HOME=$HOME/venvs
$ export PROJECT_HOME=$HOME/workspace
$ source /usr/local/bin/virtualenvwrapper.sh
你需要修改前置路徑挎扰,來決定虛擬環(huán)境所在的位置($WORKON_HOME
)和代碼的根目錄($PROJECT_HOME
)翠订。virtualenvwrapper.sh
的路徑也可能需要變動。這三行代碼最好添加到相關(guān)的shell啟動文件(例如遵倦,~/.bashrc
或~/.profile
)尽超。
做好了前面的設(shè)置,我們就可以創(chuàng)建要使用的虛擬環(huán)境了骇吭,如下所示:
$ mkvirtualenv book --python=`which python3.5`
這個命令會在$WORKON_HOME
之下建立新的虛擬環(huán)境橙弱,名字是book
,使用的是Python 3.5燥狰。以后,可以用下面命令啟動這個虛擬環(huán)境:
$ workon book
使用虛擬環(huán)境的好處是斜筐,可以在里面安裝所有需要的包龙致,而不污染系統(tǒng)的Python。以后不再需要這個虛擬環(huán)境時顷链,可以方便的刪除(參考rmvirtualenv
命令)目代。
現(xiàn)在就可以安裝Celery了。和以前一樣嗤练,(在每臺機(jī)器上)使用pip
:
$ pip install celery
該命令可以在激活的虛擬環(huán)境中下載榛了、解壓、安裝所有的依賴煞抬。
快完成了霜大,現(xiàn)在只需安裝配置一個中間代理,Celery用它主持任務(wù)隊(duì)列革答,并向工作進(jìn)程(只有一臺機(jī)器战坤,HOST1)發(fā)送消息。從文檔中可以看到残拐,Celery支持多種中間代理途茫,包括SQLAlchemy(http://www.sqlalchemy.org),用以本地開發(fā)和測試溪食。這里推薦使用的中間代理是RabbitMQ(https://www.rabbitmq.com)囊卜。
https://www.rabbitmq.com上有安裝指導(dǎo)、文檔和下載。在Mac主機(jī)上栅组,安裝的最簡方法是使用homebrew(http://brew.sh)雀瓢,如下所示:
$ brew install rabbitmq
對于Windows用戶,最好使用官方的安裝包笑窜。對于Linux致燥,官方也提供了安裝包。
安裝好RabbitMQ之后排截,就可以立即使用了嫌蚤。這里還有一個簡單的配置步驟,因?yàn)樵诶又卸习粒L問隊(duì)列不會創(chuàng)建用戶和密碼脱吱。只要編輯RabbitMQ的配置文件(通常位于/usr/local/etc/rabbitmq/rabbitmq.config
),添加下面的條目认罩,允許網(wǎng)絡(luò)中的默認(rèn)guest
賬戶:
[
{rabbit, [{loopback_users, []}]}
].
手動啟動RabbitMQ箱蝠,如下所示(服務(wù)器腳本可能不在$PATH
環(huán)境,通常存儲在/usr/local/sbin
):
$ sudo rabbitmq-server
sudo
會向你詢問用戶密碼垦垂。對于我們的例子宦搬,我們不會進(jìn)一步配置中間代理,使用默認(rèn)訪客賬戶就行劫拗。
注意:感興趣的讀者可以在http://www.rabbitmq.com/admin-guide.html閱讀RabbitMQ的管理指導(dǎo)间校。
到這里,我們就安裝好了所有需要的東西页慷,可以開始使用Celery了憔足。有另外一個依賴,也值得考慮安裝酒繁,盡管不是嚴(yán)格需要的滓彰,尤其是我們只想使用Celery。它是結(jié)果后臺州袒,即Celery的工作進(jìn)程用其存儲計(jì)算的結(jié)果揭绑。它就是Redis(http://redis.io)。安裝Redis是非必須的稳析,但極力推薦安裝洗做,和RabbitMQ類似,Redis運(yùn)行在另一臺機(jī)器上彰居,稱作HOST2诚纸。
Redis的安裝十分簡單,安裝代碼適用于Linux陈惰,Mac OS X和Windows畦徘。我們在Mac上用homebrew安裝毕籽,如下:
$ brew install redis
在其它操作系統(tǒng)上,例如Linux井辆,可以方便的用二進(jìn)制碼安裝(例如對于Ubuntu关筒,sudo apt-get install redis-server
)。
啟動Redis的命令如下:
$ sudo redis-server
本章剩下的部分會假定結(jié)果后臺存在杯缺,如果沒有安裝蒸播,會到時指出配置和代碼的不同。同時萍肆,任何在生產(chǎn)環(huán)境中使用Celery的人袍榆,都應(yīng)該考慮使用結(jié)果后臺。
測試安裝
快速嘗試一個例子塘揣,以驗(yàn)證Celery是正確安裝的包雀。我們需要四個終端窗口,三個不同的機(jī)器(命名為HOST1亲铡、HOST2才写、HOST3和HOST4)。在HOST1的窗口啟動RabbitMQ(確保rabbitmq-server
路徑正確):
HOST1 $ sudo /usr/local/sbin/rabbitmq-server
在HOST2的窗口奖蔓,啟動Redis(沒安裝的話赞草,跳到下一段):
HOST2 $ sudo /usr/local/bin/redis-server
最后,在HOST3的窗口吆鹤,創(chuàng)建如下Python文件(記得使用workon book
激活虛擬環(huán)境)房资,命名為test.py
:
import celery
app = celery.Celery('test',
broker='amqp://HOST1',
backend='redis://HOST2')
@app.task
def echo(message):
return message
這段代碼很簡單。先引入了Celery
包檀头,然后定義了一個Celery應(yīng)用(app
),名字是test
岖沛。這個應(yīng)用使用HOST1的中間代理RabbitMQ和HOST2的Redis數(shù)據(jù)庫的默認(rèn)賬戶和消息隊(duì)列暑始。
要是想用RabbitMQ作為結(jié)果后臺而不用Redis,需要修改前面的代碼婴削,將backend
進(jìn)行如下修改:
import celery
app = celery.Celery('test',
broker='amqp://HOST1',
backend=amqp://HOST1')
@app.task
def echo(message):
return message
有了應(yīng)用實(shí)例廊镜,就可以用它裝飾遠(yuǎn)程的worker(使用裝飾器@app.task
)。在這個例子中唉俗,我們裝飾一個簡單的函數(shù)嗤朴,它可以返回傳遞給它的消息(echo
)。
之后虫溜,在終端HOST3雹姊,建立worker池,如下所示:
HOST3 $ celery -A test worker --loglevel=info
記得要在test.py
的目錄(或?qū)?code>PYTHONPATH環(huán)境變量指向test.py
的目錄)衡楞,好讓Celery可以引入代碼吱雏。
celery
命令會默認(rèn)啟動CPU數(shù)目相同的worker進(jìn)程。worker會使用test
模塊中的應(yīng)用app
(我們可以使用實(shí)例的名字celery -A test.app worker
),并使用INFO
等級在控制臺顯示日志歧杏。在我的電腦上(有HyperThreading
的四核電腦)镰惦,Celery默認(rèn)啟用了八個worker進(jìn)程。
在HOST4終端犬绒,復(fù)制test.py
代碼旺入,啟動book
虛擬環(huán)境,在test.py
目錄打開Python shell凯力,如下所示:
HOST4 $ python3.5
Python 3.5.0 (v3.5.0:374f501f4567, Sep 12 2015, 11:00:19)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
從復(fù)制的test
模塊引入echo
函數(shù)茵瘾,如下:
>>> from test import echo
我們現(xiàn)在可以像普通Python函數(shù)一樣調(diào)用echo
,echo
可以直接在本地(即HOST4)運(yùn)行沮协,如下所示:
>>> res = echo('Python rocks!')
>>> print(res)
Python rocks!
為了讓HOST3的worker進(jìn)程運(yùn)行echo()
函數(shù)龄捡,我們不能像之前那樣直接調(diào)用。我們需要調(diào)用它的delay
方法(裝飾器@app.task
注入的)慷暂,見下面的命令:
>>> res = echo.delay('Python rocks!'); print(type(res)); print(res)
<class 'celery.result.AsyncResult'>
1423ec2b-b6c7-4c16-8769-e62e09c1fced
>>> res.ready()
True
>>> res.result
'Python rocks!'
我們看到聘殖,調(diào)用echo.delay('Python rocks!')
不會返回字符串。相反行瑞,它在任務(wù)隊(duì)列(運(yùn)行在HOST1的RabbitMQ服務(wù)器)中安排了一個請求以執(zhí)行echo
函數(shù)奸腺,并返回Future
,準(zhǔn)確的說是AsyncResult
(Celery的Future)血久。正如concurrent.futures
模塊突照,這個對象是一個異步調(diào)用結(jié)果的占位符。在我們的例子中氧吐,異步調(diào)用的是我們安插在任務(wù)隊(duì)列的echo
函數(shù)讹蘑,調(diào)用它的是其它位置的Celery的worker進(jìn)程(我們的例子中是HOST3)。
我們可以查詢AsyncResult
對象來確定它們是否預(yù)備好筑舅。如果是的話座慰,我們可以訪問它們的結(jié)果,在我們的例子中是字符串'Python rocks!'翠拣。
切換到啟用worker進(jìn)程的窗口版仔,我們可以看到worker池接收到了echo
任務(wù)請求,如下所示:
[2015-11-10 08:30:12,869: INFO/MainProcess] Received task: test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced]
[2015-11-10 08:30:12,886: INFO/MainProcess] Task test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced] succeeded in 0.01469148206524551s: 'Python rocks!'
我們現(xiàn)在可以退出Python shell和worker進(jìn)程(在發(fā)起celery worker
命令的終端窗口按CTRL+C
):Celery安裝成功误墓。
Celery介紹
什么是分布式任務(wù)隊(duì)列蛮粮,Celery是怎么運(yùn)行分布式任務(wù)隊(duì)列的呢?分布式任務(wù)隊(duì)列這種架構(gòu)已經(jīng)存在一定時間了谜慌。這是一種master-worker架構(gòu)然想,有一個中間件層,中間件層使用多個任務(wù)請求隊(duì)列(即任務(wù)隊(duì)列)畦娄,和一個用于存儲結(jié)果的隊(duì)列(即結(jié)果后臺)又沾。
主進(jìn)程(也叫作client
或producer
)將任務(wù)請求安插到某個任務(wù)隊(duì)列弊仪,從結(jié)果后臺獲取數(shù)據(jù)。worker進(jìn)程訂閱任務(wù)隊(duì)列以明確任務(wù)是什么杖刷,并把結(jié)果放到結(jié)果后臺励饵。
這是一個簡單靈活的架構(gòu)。主進(jìn)程不需要知道有多少個可用的worker滑燃,也不需要知道worker運(yùn)行在哪臺機(jī)器役听。它只需要知道隊(duì)列在哪,以及如何發(fā)送任務(wù)請求表窘。
worker進(jìn)程也是如此典予。它們不需要知道任務(wù)請求來自何處,也不需要知道結(jié)果用來做什么乐严。它們只需知道從哪里取得任務(wù)瘤袖,存儲在哪里。
這樣的優(yōu)點(diǎn)是worker的數(shù)量昂验、種類捂敌、形態(tài)可以隨意變化,而不對總系統(tǒng)的功能產(chǎn)生影響(但會影響性能和延遲)既琴。分布式任務(wù)隊(duì)列可以方便地進(jìn)行擴(kuò)展(添加新worker)占婉,規(guī)劃優(yōu)先級(給隊(duì)列定義不同的優(yōu)先級,給不同的隊(duì)列安排不同數(shù)量的worker)甫恩。
另一個優(yōu)點(diǎn)是逆济,這個去耦合化的系統(tǒng)在原則上,worker和producer可以用不同語言來寫磺箕。例如奖慌,Python代碼生成的工作由C語言寫的worker進(jìn)程來做,這樣性能是最高的松靡。
Celery使用了第三方升薯、健壯的、實(shí)地驗(yàn)證的系統(tǒng)來做它的隊(duì)列和結(jié)果后臺击困。推薦的中間代理是RabbitMQ,我們之前用過广凸。RabbitMQ是一個非常復(fù)雜的消息代理阅茶,有許多特性,本書不會對它做深入探索谅海。結(jié)果后臺也是如此脸哀,它可以是一個簡單的RabbitMQ隊(duì)列,或者更優(yōu)的扭吁,使用專門的服務(wù)比如Redis撞蜂。
下圖展示了典型的使用RabbitMQ和Redis的Celery應(yīng)用架構(gòu):
每個方框中的進(jìn)程(即RabbitMQ盲镶、Redis、worker和master.py
)都可以運(yùn)行在不同的機(jī)器上蝌诡。小型的安裝方案是將RabbitMQ和Redis放在同一個主機(jī)上溉贿,worker幾點(diǎn)可能只有一個或兩個。大型方案會使用更多的機(jī)器浦旱,或者專門的服務(wù)器宇色。
更復(fù)雜的Celery應(yīng)用
我們用Celery做兩個簡單有趣的應(yīng)用。第一個仿照第3章中匯率例子颁湖,第二個是一個分布式排序算法宣蠕。
我們還是使用四臺機(jī)器(HOST1、HOST2甥捺、HOST3抢蚀、HOST4)。和以前一樣镰禾,HOST1運(yùn)行RabbitMQ皿曲,HOST2運(yùn)行Redis,HOST3運(yùn)行Celery的worker羡微,HOST運(yùn)行主代碼谷饿。
先從簡單的例子開始。創(chuàng)建一個Python文件(celery/currency.py
)妈倔,代碼如下(如果你沒有使用Redis博投,記得修改backend
成'amqp://HOST1'
):
import celery
import urllib.request
app = celery.Celery('currency',
broker='amqp://HOST1',
backend='redis://HOST2')
URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'
@app.task
def get_rate(pair, url_tmplt=URL):
with urllib.request.urlopen(url_tmplt.format(pair)) as res:
body = res.read()
return (pair, float(body.strip()))
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()
results = [get_rate.delay(pair) for pair in args.pairs]
for result in results:
pair, rate = result.get()
print(pair, rate)
這段代碼和第3章的多線程版本差不多。主要的區(qū)別是盯蝴,因?yàn)槭褂玫氖荂elery毅哗,我們不需要創(chuàng)建隊(duì)列,Celery負(fù)責(zé)建立隊(duì)列捧挺。另外虑绵,除了為每個匯率對建一個線程,我們只需讓worker負(fù)責(zé)從隊(duì)列獲取任務(wù)請求闽烙,執(zhí)行相應(yīng)的函數(shù)請求翅睛,完畢之后返回結(jié)果。
探討調(diào)用的行為是有益的黑竞,比如成功的調(diào)用捕发、由于缺少worker而不工作的調(diào)用、失敗且拋出異常的調(diào)用很魂。我們從成功的調(diào)用開始扎酷。
和echo
的例子一樣,在各自的終端啟動RabbitMQ和Redis(通過redis-server
和rabbitmq-server
命令)遏匆。
然后法挨,在worker主機(jī)(HOST3)上谁榜,復(fù)制currency.py
文件,切換到它的目錄凡纳,創(chuàng)建worker池(記住窃植,Celery啟動的worker數(shù)目盡可能和CPU核數(shù)一樣多):
HOST3 $ celery -A currency worker --loglevel=info
最后,復(fù)制相同的文件到HOST4惫企,并運(yùn)行如下:
HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0644
CHFUSD 0.986
GBPUSD 1.5216
GBPEUR 1.4296
CADUSD 0.751
CADEUR 0.7056
一切工作正常撕瞧,我么得到了五個匯率。如果查看啟動worker池的主機(jī)(HOST3)狞尔,我們會看到類似下圖的日志:
這是日志等級loglevel=info
時丛版,Celery worker的日志。每個任務(wù)都被分配了一個獨(dú)立ID(例如GBP兌USD的任務(wù)ID是f8658917-868c-4eb5-b744-6aff997c6dd2)偏序,基本的時間信息也被打印了出來页畦。
如果沒有可用的worker呢?最簡單的方法是停止worker(在終端窗口按CTRL+C
)研儒,返回HOST4的currency.py
豫缨,如下所示:
OST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
什么都沒發(fā)生,currency.py
一直處于等待worker的狀態(tài)端朵。這樣的狀態(tài)可能也可能不是我們想要的:其一好芭,讓文件等待而不發(fā)生崩潰,是很方便的冲呢;其二舍败,我們可能想在一定時間后,停止等待敬拓×谑恚可以在result.get()
用timeout
參數(shù)。
例如乘凸,修改代碼厕诡,使用result.get(timeout=1)
,會有如下結(jié)果(還是在沒有worker的情況下):
HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
Traceback (most recent call last):
File "currency.py", line 29, in <module>
pair, rate = result.get(timeout=1)
File "/venvs/book/lib/python3.5/site-packages/celery/result.py", line 169, in get
no_ack=no_ack,
File " /venvs/book/lib/python3.5/site-packages/celery/backends/base.py", line 226, in wait_for
raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.
當(dāng)然营勤,我們應(yīng)該總是使用超時灵嫌,以捕獲對應(yīng)的異常,作為錯誤處理的策略葛作。
要記住醒第,默認(rèn)下,任務(wù)隊(duì)列是持續(xù)的进鸠,它的日志不會停止(Celery允許用戶定制)。這意味著形病,如果我們現(xiàn)在啟動了一些worker客年,它們就會開始從隊(duì)列獲取懸掛的任務(wù)霞幅,并返回結(jié)果。我們可以用如下命令清空隊(duì)列:
HOST4 $ celery purge
WARNING: This will remove all tasks from queue: celery.
There is no undo for this operation!
(to skip this prompt use the -f option)
Are you sure you want to delete all tasks (yes/NO)? yes
Purged 12 messages from 1 known task queue.
接下來看任務(wù)產(chǎn)生異常的情況量瓜。修改HOST3的currency.py
文件司恳,讓get_rate
拋出一個異常,如下所示:
@app.task
def get_rate(pair, url_tmplt=URL):
raise Exception('Booo!')
現(xiàn)在绍傲,重啟HOST3的worker池(即HOST3 $ celery -A currency worker --loglevel=info
)扔傅,然后在HOST4啟動主程序:
HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
Traceback (most recent call last):
File "currency.py", line 31, in <module>
pair, rate = result.get(timeout=1)
File "/Users/fpierfed/Documents/venvs/book/lib/python3.5/site-packages/celery/result.py", line 175, in get
raise meta['result']
Exception: Booo!
所有的worker都拋出了異常,異常傳遞到了調(diào)用的代碼烫饼,在首次調(diào)用result.get()
返回猎塞。
任務(wù)拋出任何異常,我們都要小心杠纵。遠(yuǎn)程運(yùn)行的代碼失敗的原因可能有很多荠耽,不一定和代碼本身有關(guān),因此需要謹(jǐn)慎應(yīng)對比藻。
Celery可以用如下的方法提供幫助:我們可以用timeout
獲取結(jié)果铝量;重新提交失敗的任務(wù)(參考task
裝飾器的retry
參數(shù))。還可以取消任務(wù)請求(參考任務(wù)的apply_async
方法的expires
參數(shù)银亲,它比之前我們用過的delay
功能強(qiáng)大)慢叨。
有時,任務(wù)圖會很復(fù)雜务蝠。一項(xiàng)任務(wù)的結(jié)果還要傳遞給另一個任務(wù)拍谐。Celery支持復(fù)雜的調(diào)用方式,但是會有性能損耗请梢。
用第二個例子來探討:一個分布式的歸并排序算法赠尾。這是包含兩個文件的長代碼:一個是算法本身(mergesory.py
),一個是主代碼(main.py
)毅弧。
歸并排序是一個簡單的基于遞歸二分輸入列表的算法气嫁,將兩個部分排序,再將結(jié)果合并够坐。建立一個新的Python文件(celery/mergesort.py
)寸宵,代碼如下:
import celery
app = celery.Celery('mergesort',
broker='amqp://HOST1',
backend='redis://HOST2')
@app.task
def sort(xs):
lenxs = len(xs)
if(lenxs <= 1):
return(xs)
half_lenxs = lenxs // 2
left = xs[:half_lenxs]
right = xs[half_lenxs:]
return(merge(sort(left), sort(right)))
def merge(left, right):
nleft = len(left)
nright = len(right)
merged = []
i = 0
j = 0
while i < nleft and j < nright:
if(left[i] < right[j]):
merged.append(left[i])
i += 1
else:
merged.append(right[j])
j += 1
return merged + left[i:] + right[j:]
這段代碼很直白。Celery應(yīng)用命名為app
元咙,它使用RabbitMQ作為任務(wù)隊(duì)列梯影,使用Redis作為結(jié)果后臺。然后庶香,定義了sort
算法甲棍,它使用了附屬的merge
函數(shù)以合并兩個排好序的子列表,成為一個排好序的單列表赶掖。
對于主代碼感猛,另建一個文件(celery/main.py
)七扰,它的代碼如下:
#!/usr/bin/env python3.5
import random
import time
from celery import group
from mergesort import sort, merge
# Create a list of 1,000,000 elements in random order.
sequence = list(range(1000000))
random.shuffle(sequence)
t0 = time.time()
# Split the sequence in a number of chunks and process those
# independently.
n = 4
l = len(sequence) // n
subseqs = [sequence[i * l:(i + 1) * l] for i in range(n - 1)]
subseqs.append(sequence[(n - 1) * l:])
# Ask the Celery workers to sort each sub-sequence.
# Use a group to run the individual independent tasks as a unit of work.
partials = group(sort.s(seq) for seq in subseqs)().get()
# Merge all the individual sorted sub-lists into our final result.
result = partials[0]
for partial in partials[1:]:
result = merge(result, partial)
dt = time.time() - t0
print('Distributed mergesort took %.02fs' % (dt))
# Do the same thing locally and compare the times.
t0 = time.time()
truth = sort(sequence)
dt = time.time() - t0
print('Local mergesort took %.02fs' % (dt))
# Final sanity checks.
assert result == truth
assert result == sorted(sequence)
我們先生成一個足夠長的無序(random.shuffle
)整數(shù)序列(sequence = list(range(1000000))
)。然后陪白,分成長度相近的子列表(n=4
)颈走。
有了子列表,就可以對它們進(jìn)行并行處理(假設(shè)至少有四個可用的worker)咱士。問題是立由,我們要知道什么時候這些列表排序好了,好進(jìn)行合并序厉。
Celery提供了多種方法讓任務(wù)協(xié)同執(zhí)行锐膜,group
是其中之一。它可以在一個虛擬的任務(wù)里脂矫,將并發(fā)的任務(wù)捆綁執(zhí)行枣耀。group
的返回值是GroupResult
(與類AsyncResult
的層級相同)。如果沒有結(jié)果后臺庭再,GroupResult get()
方法是必須要有的捞奕。當(dāng)組中所有的任務(wù)完成并返回值,group
方法會獲得一個任務(wù)簽名(用參數(shù)調(diào)用任務(wù)s()
方法拄轻,比如代碼中的sort.s(seq)
)的列表颅围。任務(wù)簽名是Celery把任務(wù)當(dāng)做參數(shù),傳遞給其它任務(wù)(但不執(zhí)行)的機(jī)制恨搓。
剩下的代碼是在本地合并排好序的列表院促,每次合并兩個。進(jìn)行完分布式排序,我們再用相同的算法重新排序原始列表。最后未玻,對比歸并排序結(jié)果與內(nèi)建的sorted
調(diào)用。
要運(yùn)行這個例子弄抬,需要啟動RabbitMQ和Redis。然后宪郊,在HOST3啟動一些worker掂恕,如下所示:
HOST3 $ celery -A mergesort worker --loglevel=info
記得拷貝mergesort.py
文件,并切換到其目錄運(yùn)行(或者弛槐,定義PYTHONPATH
指向它所在的位置)懊亡。
之后,在HOST4上運(yùn)行:
HOST4 $ python3.5 main.py
Distributed mergesort took 10.84s
Local mergesort took 26.18s
查看Celery日志乎串,我們看到worker池接收并執(zhí)行了n個任務(wù)店枣,結(jié)果發(fā)回給了caller。
性能和預(yù)想的不一樣。使用多進(jìn)程(使用multiprocessing
或concurrent.futures
)來運(yùn)行鸯两,與前面相比坏瞄,可以有n倍的性能提升(7秒,使用四個worker)甩卓。
這是因?yàn)镃elery同步耗時長,最好在只有不得不用的時候再使用蕉斜。Celery持續(xù)詢問組中的部分結(jié)果是否準(zhǔn)備好逾柿,好進(jìn)行后續(xù)的工作。這會非常消耗資源宅此。
生產(chǎn)環(huán)境中使用Celery
下面是在生產(chǎn)環(huán)境中使用Celery的tips机错。
第一個建議是在Celery應(yīng)用中使用配置模塊,而不要在worker代碼中進(jìn)行配置父腕。假設(shè)弱匪,配置文件是config.py
,可以如下將其傳遞給Celery應(yīng)用:
import celery
app = celery.Celery('mergesort')
app.config_from_object('config')
然后璧亮,與其他可能相關(guān)的配置指令一起萧诫,在config.py
中添加:
BROKER_URL = 'amqp://HOST1'
CELERY_RESULT_BACKEND = 'redis://HOST2'
關(guān)于性能的建議是,使用至少兩個隊(duì)列枝嘶,好讓任務(wù)按照執(zhí)行時間劃分優(yōu)先級帘饶。使用多個隊(duì)列,將任務(wù)劃分給合適的隊(duì)列群扶,是分配worker的簡便方法及刻。Celery提供了詳盡的方法將任務(wù)劃分給隊(duì)列。分成兩步:首先竞阐,配置Celery應(yīng)用缴饭,啟動worker,如下所示:
# In config.py
CELERY_ROUTES = {project.task1': {'queue': 'queue1'},
'project.task2': {'queue': 'queue2'}}
為了在隊(duì)列中啟動worker骆莹,在不同的機(jī)器中使用下面的代碼:
HOST3 $ celery –A project worker –Q queue1
HOST5 $ celery –A project worker –Q queue2
使用Celery命令行工具的-c
標(biāo)志颗搂,可以控制worker池的大小,例如汪疮,啟動一個有八個worker的池:
HOST3 $ celery –A project worker –c 8
說道worker峭火,要注意,Celery默認(rèn)使用多進(jìn)程模塊啟動worker池智嚷。這意味著卖丸,每個worker都是一個完整的Python進(jìn)程。如果某些worker只處理I/O密集型任務(wù)盏道,可以將它們轉(zhuǎn)換成協(xié)程或多線程稍浆,像前面的例子。這樣做的話,可以使用-P
標(biāo)志衅枫,如下所示:
$ celery –A project worker –P threads
使用線程和協(xié)程可以節(jié)省資源嫁艇,但不利于CPU制約型任務(wù),如前面的菲波那切數(shù)列的例子弦撩。
談到性能步咪,應(yīng)該盡量避免同步原語(如前面的group()
),除非非用不可益楼。當(dāng)同步無法回避時猾漫,好的方法是使用結(jié)果后臺(如Redis)。另外感凤,如果可能的話悯周,要避免傳遞復(fù)雜的對象給遠(yuǎn)程任務(wù),因?yàn)檫@些對象需要序列化和去序列化陪竿,通常很耗時禽翼。
額外的,如果不需要某個任務(wù)的結(jié)果族跛,應(yīng)該確保Celery不去獲取這些結(jié)果闰挡。這是通過裝飾器@task(ignore_result=True)
來做的。如果所有的任務(wù)結(jié)果都忽略了庸蔼,就不必定義結(jié)果后臺解总。這可以讓性能大幅提高。
除此之外姐仅,還要指出花枫,如何啟動worker、在哪里運(yùn)行worker掏膏、如何確保它們持續(xù)運(yùn)行是很重要的劳翰。默認(rèn)的方法是使用工具,例如supervisord (http://supervisord.org) 馒疹,來管理worker進(jìn)程佳簸。
Celery帶有一個supervisord的配置案例(在安裝文件的extra/supervisord
目錄)。一個監(jiān)督的優(yōu)秀方案是flower(https://github.com/mher/flower)颖变,一個worker的網(wǎng)絡(luò)控制和監(jiān)督工具生均。
最后,RabbitMQ和Redis結(jié)合起來腥刹,是一個很好的中間代理和結(jié)果后臺解決方案马胧,適用于大多數(shù)項(xiàng)目。
Celery的替代方案:Python-RQ
Celery的輕量簡易替代方案之一是 Python-RQ (http://python-rq.org)衔峰。它單單基于Redis作為任務(wù)隊(duì)列和結(jié)果后臺佩脊。沒有復(fù)雜任務(wù)或任務(wù)路由蛙粘,使用它很好。
因?yàn)镃elery和Python-RQ在概念上很像威彰,讓我們立即重寫一個之前的例子出牧。新建一個文件(rq/currency.py
),代碼如下:
import urllib.request
URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'
def get_rate(pair, url_tmplt=URL):
# raise Exception('Booo!')
with urllib.request.urlopen(url_tmplt.format(pair)) as res:
body = res.read()
return (pair, float(body.strip()))
這就是之前的匯率例子的代碼歇盼。區(qū)別是舔痕,與Celery不同,這段代碼不需要依賴Python-RQ或Redis豹缀。將這段代碼拷貝到worker節(jié)點(diǎn)(HOST3)赵讯。
主程序也同樣簡單。新建一個Python文件(rq/main.py)耿眉,代碼如下:
#!/usr/bin/env python3
import argparse
import redis
import rq
from currency import get_rate
parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()
conn = redis.Redis(host='HOST2')
queue = rq.Queue(connection=conn)
jobs = [queue.enqueue(get_rate, pair) for pair in args.pairs]
for job in jobs:
while job.result is None:
pass
print(*job.result)
我們在這里看到Python-RQ是怎么工作的。我們需要連接Redis服務(wù)器(HOST2)鱼响,然后將新建的連接對象傳遞給Queue
類構(gòu)造器。結(jié)果Queue
對象用來向其提交任務(wù)請求筐骇。這是通過傳遞函數(shù)對象和其它參數(shù)給queue.enqueue
铛纬。
函數(shù)排隊(duì)調(diào)用的結(jié)果是job
實(shí)例,它是個異步調(diào)用占位符晶密,之前見過多次懂牧。
因?yàn)镻ython-RQ沒有Celery的阻塞AsyncResult.get()
方法僧凤,我們要手動建一個事件循環(huán)躯保,持續(xù)向job
實(shí)例查詢吻氧,以確認(rèn)是否它們的result
不是None
這種方法不推薦在生產(chǎn)環(huán)境中使用鲁森,因?yàn)槌掷m(xù)的查詢會浪費(fèi)資源歌溉,查詢不足會浪費(fèi)時間痛垛,但對于這個簡易例子沒有問題匙头。
為了運(yùn)行代碼,首先要安裝Python-RQ碟婆,用pip進(jìn)行安裝:
$ pip install rq
在所有機(jī)器上都要安裝蝙叛。然后借帘,在HOST2運(yùn)行Redis:
$ sudo redis-server
在HOST3上姻蚓,啟動一些worker。Python-RQ不自動啟動worker池加叁。啟動多個worker的簡易的方法是使用一個文件(start_workers.py
):
#!/usr/bin/env python3
import argparse
import subprocess
def terminate(proc, timeout=.5):
"""
Perform a two-step termination of process `proc`: send a SIGTERM
and, after `timeout` seconds, send a SIGKILL. This should give
`proc` enough time to do any necessary cleanup.
"""
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout)
except subprocess.TimeoutExpired:
proc.kill()
return
parser = argparse.ArgumentParser()
parser.add_argument('N', type=int)
args = parser.parse_args()
workers = []
for _ in range(args.N):
workers.append(subprocess.Popen(['rqworker',
'-u', 'redis://yippy']))
try:
running = [w for w in workers if w.poll() is None]
while running:
proc = running.pop(0)
try:
proc.wait(timeout=1.)
except subprocess.TimeoutExpired:
running.append(proc)
except KeyboardInterrupt:
for w in workers:
terminate(w)
這個文件會啟動用戶指定書目的Python-RQ worker進(jìn)程(通過使用rqworker
腳本,Python-RQ源碼的一部分)豫柬,通過Ctrl+C
殺死進(jìn)程烧给。更健壯的方法是使用類似之前提過的supervisord工具指么。
在HOST3上運(yùn)行:
HOST3 $ ./start_workers.py 6
現(xiàn)在可以運(yùn)行代碼伯诬。在HOST4,運(yùn)行main.py
:
HOST4 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0635
CHFUSD 0.9819
GBPUSD 1.5123
GBPEUR 1.422
CADUSD 0.7484
CADEUR 0.7037
效果與Celery相同桥言。
Celery的替代方案:Pyro
Pyro (http://pythonhosted.org/Pyro4/)的意思是Python Remote Objects,是1998年創(chuàng)建的一個包鸳粉。因此,它十分穩(wěn)定艰山,且功能完備曙搬。
Pyro使用的任務(wù)分布方法與Celery和Python-RQ十分不同纵装,它是在網(wǎng)絡(luò)中將Python對象作為服務(wù)器。然后創(chuàng)建它們的代理對象挽唉,讓調(diào)用代碼可以將其看做本地對象。這個架構(gòu)在90年代末的系統(tǒng)很流行匠童,比如COBRA和Java RMI。
Pyro掩蓋了代碼中的對象是本地還是遠(yuǎn)程的首昔,是讓人詬病的一點(diǎn)。原因是巧骚,遠(yuǎn)程代碼運(yùn)行錯誤的原因很多竣蹦,當(dāng)遠(yuǎn)程代碼隱藏在代理對象后面執(zhí)行,就不容易發(fā)現(xiàn)錯誤纲菌。
另一個詬病的地方是,Pyro在點(diǎn)對點(diǎn)網(wǎng)絡(luò)(不是所有主機(jī)名都可以解析)中椅贱,或者UDP廣播無效的網(wǎng)絡(luò)中夜涕,很難正確運(yùn)行女器。
盡管如此涣澡,大多數(shù)開發(fā)者認(rèn)為Pyro非常簡易入桂,在生產(chǎn)環(huán)境中足夠健壯。
Pyro安裝很簡單蜘腌,它是純Python寫的,依賴只有幾個芯急,使用pip:
$ pip install pyro4
這個命令會安裝Pyro 4.x和Serpent,后者是Pyro用來編碼和解碼Python對象的序列器榕酒。
用Pyro重寫之前的匯率例子,要比用Python-RQ復(fù)雜,它需要另一個軟件:Pyro nameserver肩榕。但是,不需要中間代理和結(jié)果后臺株汉,因?yàn)镻yro對象之間可以直接進(jìn)行通訊筐乳。
Pyro運(yùn)行原理如下。每個遠(yuǎn)程訪問的對象都封裝在處于連接監(jiān)聽的socket服務(wù)器框架中乔妈。每當(dāng)調(diào)用遠(yuǎn)程對象中的方法蝙云,被調(diào)用的方法,連同它的參數(shù)路召,就被序列化并發(fā)送到適當(dāng)?shù)膶ο?服務(wù)器上勃刨。此時贾铝,遠(yuǎn)程對象執(zhí)行被請求的任務(wù)水孩,經(jīng)由相同的連接苍姜,將結(jié)果發(fā)回到(同樣是序列化的)調(diào)用它的代碼。
因?yàn)槊總€遠(yuǎn)程對象自身就可以調(diào)用遠(yuǎn)程對象订咸,這個架構(gòu)可以是相當(dāng)去中心化的父叙。另外,一旦建立通訊乔煞,對象之間就是p2p的纺讲,這與分布式任務(wù)隊(duì)列的輕度耦合架構(gòu)十分不同。另一點(diǎn),每個遠(yuǎn)程對象既可以做master呻澜,也可以做worker恭朗。
接下來重寫匯率的例子棍丐,來看看具體是怎么運(yùn)行的。建立一個Python文件(pyro/worker.py
)讨惩,代碼如下:
import urllib.request
import Pyro4
URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'
@Pyro4.expose(instance_mode="percall")
class Worker(object):
def get_rate(self, pair, url_tmplt=URL):
with urllib.request.urlopen(url_tmplt.format(pair)) as res:
body = res.read()
return (pair, float(body.strip()))
# Create a Pyro daemon which will run our code.
daemon = Pyro4.Daemon()
uri = daemon.register(Worker)
Pyro4.locateNS().register('MyWorker', uri)
# Sit in an infinite loop accepting connections
print('Accepting connections')
try:
daemon.requestLoop()
except KeyboardInterrupt:
daemon.shutdown()
print('All done')
worker的代碼和之前的很像,不同點(diǎn)是將get_rate
函數(shù)變成了Worker
類的一個方法野揪。變動的原因是,Pyro允許導(dǎo)出類的實(shí)例珍语,但不能導(dǎo)出函數(shù)。
剩下的代碼是Pyro特有的透乾。我們需要一個Daemon
實(shí)例(它本質(zhì)上是后臺的網(wǎng)絡(luò)服務(wù)器)磕秤,它會獲得類磷瘤,并在網(wǎng)絡(luò)上發(fā)布贸呢,好讓其它的代碼可以調(diào)用方法缚忧。分成兩步來做:首先悟泵,創(chuàng)建一個類Pyro4.Daemon
的實(shí)例,然后添加類闪水,通過將其傳遞給register
方法糕非。
每個Pyro的Daemon
實(shí)例可以隱藏任意數(shù)目的類。內(nèi)部球榆,需要的話朽肥,Daemon
對象會創(chuàng)建隱藏類的實(shí)例(也就是說,如果沒有代碼需要這個類持钉,相應(yīng)的Daemon
對象就不會將其實(shí)例化)衡招。
每一次網(wǎng)絡(luò)連接,Daemon
對象默認(rèn)會實(shí)例化一次注冊的類每强,如果要進(jìn)行并發(fā)任務(wù)始腾,這樣就不可以】罩矗可以通過裝飾注冊的類修改窘茁,@Pyro4.expose(instance_mode=...)
。
instance_mode
支持的值有三個:single
脆烟、session
和percall
山林。使用single
意味Daemon
只為類創(chuàng)建一個實(shí)例,使用它應(yīng)付所有的客戶請求邢羔。也可以通過注冊一個類的實(shí)例(而不是類本身)驼抹。
使用session
可以采用默認(rèn)模式:每個client連接都會得到一個新的實(shí)例,client始終都會使用它拜鹤。使用instance_mode="percall"
框冀,會為每個遠(yuǎn)程方法調(diào)用建立一個新實(shí)例。
無論創(chuàng)建實(shí)例的模式是什么敏簿,用Daemon
對象注冊一個類(或?qū)嵗┒紩祷匾粋€唯一的識別符(即URI)明也,其它代碼可以用識別符連接對象宣虾。我們可以手動傳遞URI,但更方便的方法是在Pyro nameserver中存儲它温数,這樣通過兩步來做绣硝。先找到nameserver,然后給URI注冊一個名字撑刺。在前面的代碼中鹉胖,是通過下面來做的:
Pyro4.locateNS().register('MyWorker', uri)
nameserver的運(yùn)行類似Python的字典,注冊兩個名字相同的URI够傍,第二個URI就會覆蓋第一個甫菠。另外,我們看到冕屯,client代碼使用存儲在nameserver中的名字控制了許多遠(yuǎn)程對象寂诱。這意味著,命名需要特別的留意安聘,尤其是當(dāng)許多worker進(jìn)程提供的功能相同時刹衫。
最后,在前面的代碼中搞挣,我們用daemon.requestLoop()
進(jìn)入了一個Daemon
事件循環(huán)带迟。Daemon
對象會在無限循環(huán)中服務(wù)client的請求。
對于client囱桨,創(chuàng)建一個Python文件(pyro/main.py
)仓犬,它的代碼如下:
#!/usr/bin/env python3
import argparse
import time
import Pyro4
parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()
# Retrieve the rates sequentially.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")
for pair in args.pairs:
print(worker.get_rate(pair))
print('Sync calls: %.02f seconds' % (time.time() - t0))
# Retrieve the rates concurrently.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)
results = [async_worker.get_rate(pair) for pair in args.pairs]
for result in results:
print(result.value)
print('Async calls: %.02f seconds' % (time.time() - t0))
可以看到,client把相同的工作做了兩次舍肠。這么做的原因是展示Pyro兩種調(diào)用方式:同步和異步搀继。
來看代碼,我們使用argparse
包從命令行獲得匯率對翠语。然后叽躯,對于同步的方式,通過名字worker = Pyro4.Proxy("PYRONAME:MyWorker")
獲得了一些遠(yuǎn)程worker
對象肌括。前綴PYRONAME:
告訴Pyro在nameserver中該尋找哪個名字点骑。這樣可以避免手動定位nameserver。
一旦有了worker
對象谍夭,可以把它當(dāng)做本地的worke
r類的實(shí)例黑滴,向其調(diào)用方法。這就是我們在第一個循環(huán)中做的:
for pair in args.pairs:
print(worker.get_rate(pair))
對每個worker.get_rate(pair)
聲明紧索,Proxy對象會用它的遠(yuǎn)程Daemon
對象連接袁辈,發(fā)送請求,以運(yùn)行get_rate(pair)
珠漂。我們例子中的Daemon
對象晚缩,每次會創(chuàng)建一個Worker
類的的實(shí)例尾膊,并調(diào)用它的get_rate(pair)
方法。結(jié)果序列化之后發(fā)送給client荞彼,然后打印出來冈敛。每個調(diào)用都是同步的,任務(wù)完成后會封鎖卿泽。
在第二個循環(huán)中,做了同樣的事滋觉,但是使用的是異步調(diào)用签夭。我們需要向遠(yuǎn)程的類創(chuàng)建一個Proxy對象,然后椎侠,將它封裝在一個異步handler中第租。這就是下面代碼的功能:
worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)
我們現(xiàn)在可以在后臺用async_worker
獲取匯率。每次調(diào)用async_worker.get_rate(pair)
是非阻塞的我纪,會返回一個Pyro4.futures.FutureResult
的實(shí)例慎宾,它和concurrent.futures
模塊中Future
對象很像。訪問它的value
需要等待浅悉,直到相應(yīng)的異步調(diào)用完成趟据。
為了運(yùn)行這個例子,需要三臺機(jī)器的三個窗口:一個是nameserver(HOST1)术健,一個是Worker
類和它的Daemon(HOST2)汹碱,第三個(HOST3)是client(即main.py
)。
在第一個終端荞估,啟動nameserver咳促,如下:
HOST1 $ pyro4-ns --host 0.0.0.0
Broadcast server running on 0.0.0.0:9091
NS running on 0.0.0.0:9090 (0.0.0.0)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@0.0.0.0:9090
簡單來說,nameserver綁定為0.0.0.0勘伺,任何人都可以連接它跪腹。我們沒有設(shè)置認(rèn)證,因此在倒數(shù)第二行彈出了一個警告飞醉。
nameserver運(yùn)行起來了冲茸,在第二個終端啟動worker:
HOST2 $ python3.5 worker.py
Accepting connections
讓Daemon
對象接收連接,現(xiàn)在去第三個終端窗口運(yùn)行client代碼缅帘,如下:
HOST3 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Sync calls: 1.55 seconds
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Async calls: 0.29 seconds
結(jié)果和預(yù)想一致噪裕,IO限制型代碼可以方便的進(jìn)行擴(kuò)展,異步代碼的速度六倍于同步代碼股毫。
這里膳音,還有幾個提醒。第一是铃诬,Pyro的Daemon
實(shí)例要能解析主機(jī)的名字祭陷。如果不能解析苍凛,那么它只能接受127.0.0.1的連接,這意味著兵志,不能被遠(yuǎn)程連接(只能本地連接)醇蝴。解決方案是將其與運(yùn)行的主機(jī)進(jìn)行IP綁定,確保它不是環(huán)回地址想罕∮扑ǎ可以用下面的Python代碼選擇一個可用的IP:
from socket import gethostname, gethostbyname_ex
ips = [ip for ip in gethostbyname_ex(gethostname())[-1]
if ip != '127.0.0.1']
ip = ips.pop()
另一個要考慮的是:作為Pyro使用“直接連接被命名對象”方法的結(jié)果,很難像Celery和Python-RQ那樣直接啟動一批worker按价。在Pyro中惭适,必須用不同的名字命名worker,然后用名字進(jìn)行連接(通過代理)楼镐。這就是為什么癞志,Pyro的client用一個mini的規(guī)劃器來向可用的worker分配工作。
另一個要注意的是框产,nameserver不會跟蹤worker的斷開凄杯,因此,用名字尋找一個URI對象不代表對應(yīng)的遠(yuǎn)程Daemon
對象是真實(shí)運(yùn)行的秉宿。最好總是這樣對待Pyro調(diào)用:遠(yuǎn)程服務(wù)器的調(diào)用可能成功戒突,也可能不成功。
記住這些點(diǎn)描睦,就可以用Pyro搭建復(fù)雜的網(wǎng)絡(luò)和分布式應(yīng)用妖谴。
總結(jié)
這一章很長。我們學(xué)習(xí)了Celery酌摇,他是一個強(qiáng)大的包膝舅,用以構(gòu)建Python分布式應(yīng)用。然后學(xué)習(xí)了Python-RQ窑多,一個輕量且簡易的替代方案仍稀。兩個包都是使用分布任務(wù)隊(duì)列架構(gòu),它是用多個機(jī)器來運(yùn)行相同系統(tǒng)的分布式任務(wù)埂息。
然后介紹了另一個替代方案技潘,Pyro。Pyro的機(jī)理不同千康,它使用的是代理方式和遠(yuǎn)程過程調(diào)用(RPC)享幽。
兩種方案都有各自的優(yōu)點(diǎn),你可以選擇自己喜歡的拾弃。
下一章會學(xué)習(xí)將分布式應(yīng)用部署到云平臺值桩,會很有趣。
序言
第1章 并行和分布式計(jì)算介紹
第2章 異步編程
第3章 Python的并行計(jì)算
第4章 Celery分布式應(yīng)用
第5章 云平臺部署Python
第6章 超級計(jì)算機(jī)群使用Python
第7章 測試和調(diào)試分布式應(yīng)用
第8章 繼續(xù)學(xué)習(xí)