《Python分布式計(jì)算》 第4章 Celery分布式應(yīng)用 (Distributed Computing with Python)


序言
第1章 并行和分布式計(jì)算介紹
第2章 異步編程
第3章 Python的并行計(jì)算
第4章 Celery分布式應(yīng)用
第5章 云平臺部署Python
第6章 超級計(jì)算機(jī)群使用Python
第7章 測試和調(diào)試分布式應(yīng)用
第8章 繼續(xù)學(xué)習(xí)


本章是前面某些知識點(diǎn)的延續(xù)辨液。特別的会宪,本章以實(shí)例詳細(xì)的探討了異步編程和分布式計(jì)算。本章關(guān)注Celery志笼,一個復(fù)雜的用于構(gòu)建分布應(yīng)用的Python框架。最后把篓,對比了Celery的對手:PyroPython-RQ纫溃。

此時,你應(yīng)該已經(jīng)明白了并行韧掩、分布和異步編程的基本含義紊浩。如果沒有的話,最好再學(xué)習(xí)下前面幾章疗锐。

搭建多機(jī)環(huán)境

學(xué)習(xí)Celery和其它Python包之前坊谁,先來搭建測試環(huán)境。我們開發(fā)的是分布應(yīng)用滑臊,因此需要多機(jī)環(huán)境口芍。

可以使用至少兩臺聯(lián)網(wǎng)機(jī)器的讀者可以跳過這部分。其余讀者雇卷,請繼續(xù)閱讀淑掌。對于后者燎潮,仍然有免費(fèi)或便宜的解決方案。

其一是在主機(jī)上使用虛擬機(jī)VM(例如VirtualBox,https://www.virtualbox.org)马昙。創(chuàng)建幾個VM,安裝Linux箱熬,讓它們在后臺運(yùn)行钧萍。因?yàn)樗鼈儾恍枰獔D像化桌面,所以可以很輕量脱货,使用少量RAM和CPU即可岛都。

另一方法是買幾個便宜的小型計(jì)算機(jī)主板律姨,比如樹莓派(https://www.raspberrypi.org),在它上面安裝Linux臼疫,連上局域網(wǎng)择份。

第三種方案是用云服務(wù)器,比如Amazon EC2烫堤,使用它的虛擬機(jī)荣赶。如果使用這種方法,要確認(rèn)這些包的端口在防火墻是打開的鸽斟。

無論是用哪種方法拔创,緊跟著的問題就是沒有在集群上安裝完整的DNS。最便捷的方法是在所有機(jī)器上編輯/etc/hosts文件富蓄。查看IP地址剩燥,為每臺機(jī)器起一個名字,并將它們添加到/etc/hosts立倍。

我在Mac主機(jī)上使用了兩個虛擬機(jī)灭红,這是我的hosts文件:

$ cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1 localhost
255.255.255.255 broadcasthost
::1             localhost 
fe80::1%lo0 localhost

# Development VMs
192.168.123.150 ubuntu1 ubuntu1.local
192.168.123.151 ubuntu2 ubuntu2.local

相似的,這是我的兩個虛擬機(jī)(運(yùn)行Ubuntu 15.04)上的host文件:

$ cat /etc/hosts
127.0.0.1 localhost
192.168.123.151 ubuntu2
192.168.123.150 ubuntu1

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

你要確保hosts文件上的IP地址和名字是要使用的機(jī)器口注。本書变擒,會命名這些機(jī)器命名為HOST1、HOST2寝志、HOST3等等娇斑。

搭建好多機(jī)環(huán)境之后,就可以開始寫分布應(yīng)用了澈段。

安裝Celery

目前為止悠菜,我們用的都是Python的標(biāo)準(zhǔn)庫,Celery(http://www.celeryproject.org)是用到的第一個第三方庫败富。Celery是一個分布任務(wù)隊(duì)列悔醋,就是一個以隊(duì)列為基礎(chǔ)的系統(tǒng),和之前的某些例子很像兽叮。它還是分布式的芬骄,意味著工作進(jìn)程和保存結(jié)果的和請求的隊(duì)列,在不同機(jī)器上鹦聪。

首先安裝Celery和它的依賴账阻。在每臺機(jī)器上建立一個虛擬環(huán)境(起名為book),代碼如下(環(huán)境是Unix):

$ pip install virtualenvwrapper

如果這個命令被拒絕泽本,可以加上sudo淘太,用超級用戶權(quán)限來安裝virtualenvwrapper,代碼如下:

$ sudo pip install virtualenvwrapper

sudo命令會向你詢問Unix用戶密碼∑涯粒或者撇贺,可以用下面代碼安裝virtualenvwrapper

$ pip install --user virtualenvwrapper

不管使用哪種方法,完成安裝virtualenvwrapper之后冰抢,都需要配置它松嘶,定義三個環(huán)境變量(用于bash類的shell,假定virtualenvwrapper安裝在/usr/local/bin):

$ export WORKON_HOME=$HOME/venvs
$ export PROJECT_HOME=$HOME/workspace
$ source /usr/local/bin/virtualenvwrapper.sh

你需要修改前置路徑挎扰,來決定虛擬環(huán)境所在的位置($WORKON_HOME)和代碼的根目錄($PROJECT_HOME)翠订。virtualenvwrapper.sh的路徑也可能需要變動。這三行代碼最好添加到相關(guān)的shell啟動文件(例如遵倦,~/.bashrc~/.profile)尽超。

做好了前面的設(shè)置,我們就可以創(chuàng)建要使用的虛擬環(huán)境了骇吭,如下所示:

$ mkvirtualenv book --python=`which python3.5`

這個命令會在$WORKON_HOME之下建立新的虛擬環(huán)境橙弱,名字是book,使用的是Python 3.5燥狰。以后,可以用下面命令啟動這個虛擬環(huán)境:

$ workon book

使用虛擬環(huán)境的好處是斜筐,可以在里面安裝所有需要的包龙致,而不污染系統(tǒng)的Python。以后不再需要這個虛擬環(huán)境時顷链,可以方便的刪除(參考rmvirtualenv命令)目代。

現(xiàn)在就可以安裝Celery了。和以前一樣嗤练,(在每臺機(jī)器上)使用pip

$ pip install celery

該命令可以在激活的虛擬環(huán)境中下載榛了、解壓、安裝所有的依賴煞抬。

快完成了霜大,現(xiàn)在只需安裝配置一個中間代理,Celery用它主持任務(wù)隊(duì)列革答,并向工作進(jìn)程(只有一臺機(jī)器战坤,HOST1)發(fā)送消息。從文檔中可以看到残拐,Celery支持多種中間代理途茫,包括SQLAlchemyhttp://www.sqlalchemy.org),用以本地開發(fā)和測試溪食。這里推薦使用的中間代理是RabbitMQhttps://www.rabbitmq.com)囊卜。

https://www.rabbitmq.com上有安裝指導(dǎo)、文檔和下載。在Mac主機(jī)上栅组,安裝的最簡方法是使用homebrewhttp://brew.sh)雀瓢,如下所示:

$ brew install rabbitmq

對于Windows用戶,最好使用官方的安裝包笑窜。對于Linux致燥,官方也提供了安裝包。

安裝好RabbitMQ之后排截,就可以立即使用了嫌蚤。這里還有一個簡單的配置步驟,因?yàn)樵诶又卸习粒L問隊(duì)列不會創(chuàng)建用戶和密碼脱吱。只要編輯RabbitMQ的配置文件(通常位于/usr/local/etc/rabbitmq/rabbitmq.config),添加下面的條目认罩,允許網(wǎng)絡(luò)中的默認(rèn)guest賬戶:

[
  {rabbit, [{loopback_users, []}]}
].

手動啟動RabbitMQ箱蝠,如下所示(服務(wù)器腳本可能不在$PATH環(huán)境,通常存儲在/usr/local/sbin):

$ sudo rabbitmq-server

sudo會向你詢問用戶密碼垦垂。對于我們的例子宦搬,我們不會進(jìn)一步配置中間代理,使用默認(rèn)訪客賬戶就行劫拗。

注意:感興趣的讀者可以在http://www.rabbitmq.com/admin-guide.html閱讀RabbitMQ的管理指導(dǎo)间校。

到這里,我們就安裝好了所有需要的東西页慷,可以開始使用Celery了憔足。有另外一個依賴,也值得考慮安裝酒繁,盡管不是嚴(yán)格需要的滓彰,尤其是我們只想使用Celery。它是結(jié)果后臺州袒,即Celery的工作進(jìn)程用其存儲計(jì)算的結(jié)果揭绑。它就是Redis(http://redis.io)。安裝Redis是非必須的稳析,但極力推薦安裝洗做,和RabbitMQ類似,Redis運(yùn)行在另一臺機(jī)器上彰居,稱作HOST2诚纸。

Redis的安裝十分簡單,安裝代碼適用于Linux陈惰,Mac OS X和Windows畦徘。我們在Mac上用homebrew安裝毕籽,如下:

$ brew install redis

在其它操作系統(tǒng)上,例如Linux井辆,可以方便的用二進(jìn)制碼安裝(例如對于Ubuntu关筒,sudo apt-get install redis-server)。

啟動Redis的命令如下:

$ sudo redis-server

本章剩下的部分會假定結(jié)果后臺存在杯缺,如果沒有安裝蒸播,會到時指出配置和代碼的不同。同時萍肆,任何在生產(chǎn)環(huán)境中使用Celery的人袍榆,都應(yīng)該考慮使用結(jié)果后臺。

測試安裝

快速嘗試一個例子塘揣,以驗(yàn)證Celery是正確安裝的包雀。我們需要四個終端窗口,三個不同的機(jī)器(命名為HOST1亲铡、HOST2才写、HOST3和HOST4)。在HOST1的窗口啟動RabbitMQ(確保rabbitmq-server路徑正確):

HOST1 $ sudo /usr/local/sbin/rabbitmq-server

在HOST2的窗口奖蔓,啟動Redis(沒安裝的話赞草,跳到下一段):

HOST2 $ sudo /usr/local/bin/redis-server

最后,在HOST3的窗口吆鹤,創(chuàng)建如下Python文件(記得使用workon book激活虛擬環(huán)境)房资,命名為test.py

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def echo(message):
    return message

這段代碼很簡單。先引入了Celery包檀头,然后定義了一個Celery應(yīng)用(app),名字是test岖沛。這個應(yīng)用使用HOST1的中間代理RabbitMQ和HOST2的Redis數(shù)據(jù)庫的默認(rèn)賬戶和消息隊(duì)列暑始。

要是想用RabbitMQ作為結(jié)果后臺而不用Redis,需要修改前面的代碼婴削,將backend進(jìn)行如下修改:

import celery

app = celery.Celery('test',
                        broker='amqp://HOST1',
                        backend=amqp://HOST1')

@app.task
def echo(message):
    return message

有了應(yīng)用實(shí)例廊镜,就可以用它裝飾遠(yuǎn)程的worker(使用裝飾器@app.task)。在這個例子中唉俗,我們裝飾一個簡單的函數(shù)嗤朴,它可以返回傳遞給它的消息(echo)。

之后虫溜,在終端HOST3雹姊,建立worker池,如下所示:

HOST3 $ celery -A test worker --loglevel=info

記得要在test.py的目錄(或?qū)?code>PYTHONPATH環(huán)境變量指向test.py的目錄)衡楞,好讓Celery可以引入代碼吱雏。

celery命令會默認(rèn)啟動CPU數(shù)目相同的worker進(jìn)程。worker會使用test模塊中的應(yīng)用app(我們可以使用實(shí)例的名字celery -A test.app worker),并使用INFO等級在控制臺顯示日志歧杏。在我的電腦上(有HyperThreading的四核電腦)镰惦,Celery默認(rèn)啟用了八個worker進(jìn)程。

在HOST4終端犬绒,復(fù)制test.py代碼旺入,啟動book虛擬環(huán)境,在test.py目錄打開Python shell凯力,如下所示:

HOST4 $ python3.5
Python 3.5.0 (v3.5.0:374f501f4567, Sep 12 2015, 11:00:19)
[GCC 4.2.1 (Apple Inc. build 5666) (dot 3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.

從復(fù)制的test模塊引入echo函數(shù)茵瘾,如下:

>>> from test import echo

我們現(xiàn)在可以像普通Python函數(shù)一樣調(diào)用echoecho可以直接在本地(即HOST4)運(yùn)行沮协,如下所示:

>>> res = echo('Python rocks!')
>>> print(res)
Python rocks!

為了讓HOST3的worker進(jìn)程運(yùn)行echo()函數(shù)龄捡,我們不能像之前那樣直接調(diào)用。我們需要調(diào)用它的delay方法(裝飾器@app.task注入的)慷暂,見下面的命令:

>>> res = echo.delay('Python rocks!'); print(type(res)); print(res)
<class 'celery.result.AsyncResult'>
1423ec2b-b6c7-4c16-8769-e62e09c1fced
>>> res.ready()
True
>>> res.result
'Python rocks!'

我們看到聘殖,調(diào)用echo.delay('Python rocks!')不會返回字符串。相反行瑞,它在任務(wù)隊(duì)列(運(yùn)行在HOST1的RabbitMQ服務(wù)器)中安排了一個請求以執(zhí)行echo函數(shù)奸腺,并返回Future,準(zhǔn)確的說是AsyncResult(Celery的Future)血久。正如concurrent.futures模塊突照,這個對象是一個異步調(diào)用結(jié)果的占位符。在我們的例子中氧吐,異步調(diào)用的是我們安插在任務(wù)隊(duì)列的echo函數(shù)讹蘑,調(diào)用它的是其它位置的Celery的worker進(jìn)程(我們的例子中是HOST3)。

我們可以查詢AsyncResult對象來確定它們是否預(yù)備好筑舅。如果是的話座慰,我們可以訪問它們的結(jié)果,在我們的例子中是字符串'Python rocks!'翠拣。

切換到啟用worker進(jìn)程的窗口版仔,我們可以看到worker池接收到了echo任務(wù)請求,如下所示:

[2015-11-10 08:30:12,869: INFO/MainProcess] Received task: test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced]
[2015-11-10 08:30:12,886: INFO/MainProcess] Task test.echo[1423ec2b-b6c7-4c16-8769-e62e09c1fced] succeeded in 0.01469148206524551s: 'Python rocks!'

我們現(xiàn)在可以退出Python shell和worker進(jìn)程(在發(fā)起celery worker命令的終端窗口按CTRL+C):Celery安裝成功误墓。

Celery介紹

什么是分布式任務(wù)隊(duì)列蛮粮,Celery是怎么運(yùn)行分布式任務(wù)隊(duì)列的呢?分布式任務(wù)隊(duì)列這種架構(gòu)已經(jīng)存在一定時間了谜慌。這是一種master-worker架構(gòu)然想,有一個中間件層,中間件層使用多個任務(wù)請求隊(duì)列(即任務(wù)隊(duì)列)畦娄,和一個用于存儲結(jié)果的隊(duì)列(即結(jié)果后臺)又沾。

主進(jìn)程(也叫作clientproducer)將任務(wù)請求安插到某個任務(wù)隊(duì)列弊仪,從結(jié)果后臺獲取數(shù)據(jù)。worker進(jìn)程訂閱任務(wù)隊(duì)列以明確任務(wù)是什么杖刷,并把結(jié)果放到結(jié)果后臺励饵。

這是一個簡單靈活的架構(gòu)。主進(jìn)程不需要知道有多少個可用的worker滑燃,也不需要知道worker運(yùn)行在哪臺機(jī)器役听。它只需要知道隊(duì)列在哪,以及如何發(fā)送任務(wù)請求表窘。

worker進(jìn)程也是如此典予。它們不需要知道任務(wù)請求來自何處,也不需要知道結(jié)果用來做什么乐严。它們只需知道從哪里取得任務(wù)瘤袖,存儲在哪里。

這樣的優(yōu)點(diǎn)是worker的數(shù)量昂验、種類捂敌、形態(tài)可以隨意變化,而不對總系統(tǒng)的功能產(chǎn)生影響(但會影響性能和延遲)既琴。分布式任務(wù)隊(duì)列可以方便地進(jìn)行擴(kuò)展(添加新worker)占婉,規(guī)劃優(yōu)先級(給隊(duì)列定義不同的優(yōu)先級,給不同的隊(duì)列安排不同數(shù)量的worker)甫恩。

另一個優(yōu)點(diǎn)是逆济,這個去耦合化的系統(tǒng)在原則上,worker和producer可以用不同語言來寫磺箕。例如奖慌,Python代碼生成的工作由C語言寫的worker進(jìn)程來做,這樣性能是最高的松靡。

Celery使用了第三方升薯、健壯的、實(shí)地驗(yàn)證的系統(tǒng)來做它的隊(duì)列和結(jié)果后臺击困。推薦的中間代理是RabbitMQ,我們之前用過广凸。RabbitMQ是一個非常復(fù)雜的消息代理阅茶,有許多特性,本書不會對它做深入探索谅海。結(jié)果后臺也是如此脸哀,它可以是一個簡單的RabbitMQ隊(duì)列,或者更優(yōu)的扭吁,使用專門的服務(wù)比如Redis撞蜂。

下圖展示了典型的使用RabbitMQ和Redis的Celery應(yīng)用架構(gòu):

每個方框中的進(jìn)程(即RabbitMQ盲镶、Redis、worker和master.py)都可以運(yùn)行在不同的機(jī)器上蝌诡。小型的安裝方案是將RabbitMQ和Redis放在同一個主機(jī)上溉贿,worker幾點(diǎn)可能只有一個或兩個。大型方案會使用更多的機(jī)器浦旱,或者專門的服務(wù)器宇色。

更復(fù)雜的Celery應(yīng)用

我們用Celery做兩個簡單有趣的應(yīng)用。第一個仿照第3章中匯率例子颁湖,第二個是一個分布式排序算法宣蠕。

我們還是使用四臺機(jī)器(HOST1、HOST2甥捺、HOST3抢蚀、HOST4)。和以前一樣镰禾,HOST1運(yùn)行RabbitMQ皿曲,HOST2運(yùn)行Redis,HOST3運(yùn)行Celery的worker羡微,HOST運(yùn)行主代碼谷饿。

先從簡單的例子開始。創(chuàng)建一個Python文件(celery/currency.py)妈倔,代碼如下(如果你沒有使用Redis博投,記得修改backend'amqp://HOST1'):

import celery
import urllib.request


app = celery.Celery('currency',
                    broker='amqp://HOST1',
                    backend='redis://HOST2')


URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@app.task
def get_rate(pair, url_tmplt=URL):
    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('pairs', type=str, nargs='+')
    args = parser.parse_args()

    results = [get_rate.delay(pair) for pair in args.pairs]
    for result in results:
        pair, rate = result.get()
        print(pair, rate)

這段代碼和第3章的多線程版本差不多。主要的區(qū)別是盯蝴,因?yàn)槭褂玫氖荂elery毅哗,我們不需要創(chuàng)建隊(duì)列,Celery負(fù)責(zé)建立隊(duì)列捧挺。另外虑绵,除了為每個匯率對建一個線程,我們只需讓worker負(fù)責(zé)從隊(duì)列獲取任務(wù)請求闽烙,執(zhí)行相應(yīng)的函數(shù)請求翅睛,完畢之后返回結(jié)果。

探討調(diào)用的行為是有益的黑竞,比如成功的調(diào)用捕发、由于缺少worker而不工作的調(diào)用、失敗且拋出異常的調(diào)用很魂。我們從成功的調(diào)用開始扎酷。

echo的例子一樣,在各自的終端啟動RabbitMQ和Redis(通過redis-serverrabbitmq-server命令)遏匆。

然后法挨,在worker主機(jī)(HOST3)上谁榜,復(fù)制currency.py文件,切換到它的目錄凡纳,創(chuàng)建worker池(記住窃植,Celery啟動的worker數(shù)目盡可能和CPU核數(shù)一樣多):

HOST3 $ celery -A currency worker --loglevel=info

最后,復(fù)制相同的文件到HOST4惫企,并運(yùn)行如下:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0644
CHFUSD 0.986
GBPUSD 1.5216
GBPEUR 1.4296
CADUSD 0.751
CADEUR 0.7056

一切工作正常撕瞧,我么得到了五個匯率。如果查看啟動worker池的主機(jī)(HOST3)狞尔,我們會看到類似下圖的日志:

這是日志等級loglevel=info時丛版,Celery worker的日志。每個任務(wù)都被分配了一個獨(dú)立ID(例如GBP兌USD的任務(wù)ID是f8658917-868c-4eb5-b744-6aff997c6dd2)偏序,基本的時間信息也被打印了出來页畦。

如果沒有可用的worker呢?最簡單的方法是停止worker(在終端窗口按CTRL+C)研儒,返回HOST4的currency.py豫缨,如下所示:

OST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR

什么都沒發(fā)生,currency.py一直處于等待worker的狀態(tài)端朵。這樣的狀態(tài)可能也可能不是我們想要的:其一好芭,讓文件等待而不發(fā)生崩潰,是很方便的冲呢;其二舍败,我們可能想在一定時間后,停止等待敬拓×谑恚可以在result.get()timeout參數(shù)。

例如乘凸,修改代碼厕诡,使用result.get(timeout=1),會有如下結(jié)果(還是在沒有worker的情況下):

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
 Traceback (most recent call last):
  File "currency.py", line 29, in <module>
    pair, rate = result.get(timeout=1)
  File "/venvs/book/lib/python3.5/site-packages/celery/result.py", line 169, in get
    no_ack=no_ack,
  File " /venvs/book/lib/python3.5/site-packages/celery/backends/base.py", line 226, in wait_for
    raise TimeoutError('The operation timed out.')
celery.exceptions.TimeoutError: The operation timed out.

當(dāng)然营勤,我們應(yīng)該總是使用超時灵嫌,以捕獲對應(yīng)的異常,作為錯誤處理的策略葛作。

要記住醒第,默認(rèn)下,任務(wù)隊(duì)列是持續(xù)的进鸠,它的日志不會停止(Celery允許用戶定制)。這意味著形病,如果我們現(xiàn)在啟動了一些worker客年,它們就會開始從隊(duì)列獲取懸掛的任務(wù)霞幅,并返回結(jié)果。我們可以用如下命令清空隊(duì)列:

HOST4 $ celery purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation!

(to skip this prompt use the -f option)

Are you sure you want to delete all tasks (yes/NO)? yes
Purged 12 messages from 1 known task queue.

接下來看任務(wù)產(chǎn)生異常的情況量瓜。修改HOST3的currency.py文件司恳,讓get_rate拋出一個異常,如下所示:

@app.task
def get_rate(pair, url_tmplt=URL):
    raise Exception('Booo!')

現(xiàn)在绍傲,重啟HOST3的worker池(即HOST3 $ celery -A currency worker --loglevel=info)扔傅,然后在HOST4啟動主程序:

HOST4 $ python3.5 currency.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
Traceback (most recent call last):
  File "currency.py", line 31, in <module>
    pair, rate = result.get(timeout=1)
  File "/Users/fpierfed/Documents/venvs/book/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
Exception: Booo!

所有的worker都拋出了異常,異常傳遞到了調(diào)用的代碼烫饼,在首次調(diào)用result.get()返回猎塞。

任務(wù)拋出任何異常,我們都要小心杠纵。遠(yuǎn)程運(yùn)行的代碼失敗的原因可能有很多荠耽,不一定和代碼本身有關(guān),因此需要謹(jǐn)慎應(yīng)對比藻。

Celery可以用如下的方法提供幫助:我們可以用timeout獲取結(jié)果铝量;重新提交失敗的任務(wù)(參考task裝飾器的retry參數(shù))。還可以取消任務(wù)請求(參考任務(wù)的apply_async方法的expires參數(shù)银亲,它比之前我們用過的delay功能強(qiáng)大)慢叨。

有時,任務(wù)圖會很復(fù)雜务蝠。一項(xiàng)任務(wù)的結(jié)果還要傳遞給另一個任務(wù)拍谐。Celery支持復(fù)雜的調(diào)用方式,但是會有性能損耗请梢。

用第二個例子來探討:一個分布式的歸并排序算法赠尾。這是包含兩個文件的長代碼:一個是算法本身(mergesory.py),一個是主代碼(main.py)毅弧。

歸并排序是一個簡單的基于遞歸二分輸入列表的算法气嫁,將兩個部分排序,再將結(jié)果合并够坐。建立一個新的Python文件(celery/mergesort.py)寸宵,代碼如下:

import celery


app = celery.Celery('mergesort',
                        broker='amqp://HOST1',
                        backend='redis://HOST2')

@app.task
def sort(xs):
    lenxs = len(xs)
    if(lenxs <= 1):
        return(xs)

    half_lenxs = lenxs // 2
    left = xs[:half_lenxs]
    right = xs[half_lenxs:]
    return(merge(sort(left), sort(right)))

def merge(left, right):
    nleft = len(left)
    nright = len(right)

    merged = []
    i = 0
    j = 0
    while i < nleft and j < nright:
        if(left[i] < right[j]):
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    return merged + left[i:] + right[j:]

這段代碼很直白。Celery應(yīng)用命名為app元咙,它使用RabbitMQ作為任務(wù)隊(duì)列梯影,使用Redis作為結(jié)果后臺。然后庶香,定義了sort算法甲棍,它使用了附屬的merge函數(shù)以合并兩個排好序的子列表,成為一個排好序的單列表赶掖。

對于主代碼感猛,另建一個文件(celery/main.py)七扰,它的代碼如下:

#!/usr/bin/env python3.5
import random
import time
from celery import group
from mergesort import sort, merge


# Create a list of 1,000,000 elements in random order.
sequence = list(range(1000000))
random.shuffle(sequence)

t0 = time.time()

# Split the sequence in a number of chunks and process those 
# independently.
n = 4
l = len(sequence) // n
subseqs = [sequence[i * l:(i + 1) * l] for i in range(n - 1)]
subseqs.append(sequence[(n - 1) * l:])

# Ask the Celery workers to sort each sub-sequence.
# Use a group to run the individual independent tasks as a unit of work.
partials = group(sort.s(seq) for seq in subseqs)().get()

# Merge all the individual sorted sub-lists into our final result.
result = partials[0]
for partial in partials[1:]:
    result = merge(result, partial)

dt = time.time() - t0
print('Distributed mergesort took %.02fs' % (dt))

# Do the same thing locally and compare the times.
t0 = time.time()
truth = sort(sequence)
dt = time.time() - t0
print('Local mergesort took %.02fs' % (dt))

# Final sanity checks.
assert result == truth
assert result == sorted(sequence)

我們先生成一個足夠長的無序(random.shuffle)整數(shù)序列(sequence = list(range(1000000)))。然后陪白,分成長度相近的子列表(n=4)颈走。

有了子列表,就可以對它們進(jìn)行并行處理(假設(shè)至少有四個可用的worker)咱士。問題是立由,我們要知道什么時候這些列表排序好了,好進(jìn)行合并序厉。

Celery提供了多種方法讓任務(wù)協(xié)同執(zhí)行锐膜,group是其中之一。它可以在一個虛擬的任務(wù)里脂矫,將并發(fā)的任務(wù)捆綁執(zhí)行枣耀。group的返回值是GroupResult(與類AsyncResult的層級相同)。如果沒有結(jié)果后臺庭再,GroupResult get()方法是必須要有的捞奕。當(dāng)組中所有的任務(wù)完成并返回值,group方法會獲得一個任務(wù)簽名(用參數(shù)調(diào)用任務(wù)s()方法拄轻,比如代碼中的sort.s(seq))的列表颅围。任務(wù)簽名是Celery把任務(wù)當(dāng)做參數(shù),傳遞給其它任務(wù)(但不執(zhí)行)的機(jī)制恨搓。

剩下的代碼是在本地合并排好序的列表院促,每次合并兩個。進(jìn)行完分布式排序,我們再用相同的算法重新排序原始列表。最后未玻,對比歸并排序結(jié)果與內(nèi)建的sorted調(diào)用。

要運(yùn)行這個例子弄抬,需要啟動RabbitMQ和Redis。然后宪郊,在HOST3啟動一些worker掂恕,如下所示:

HOST3 $ celery -A mergesort worker --loglevel=info

記得拷貝mergesort.py文件,并切換到其目錄運(yùn)行(或者弛槐,定義PYTHONPATH指向它所在的位置)懊亡。

之后,在HOST4上運(yùn)行:

HOST4 $ python3.5 main.py
Distributed mergesort took 10.84s
Local mergesort took 26.18s

查看Celery日志乎串,我們看到worker池接收并執(zhí)行了n個任務(wù)店枣,結(jié)果發(fā)回給了caller。

性能和預(yù)想的不一樣。使用多進(jìn)程(使用multiprocessingconcurrent.futures)來運(yùn)行鸯两,與前面相比坏瞄,可以有n倍的性能提升(7秒,使用四個worker)甩卓。

這是因?yàn)镃elery同步耗時長,最好在只有不得不用的時候再使用蕉斜。Celery持續(xù)詢問組中的部分結(jié)果是否準(zhǔn)備好逾柿,好進(jìn)行后續(xù)的工作。這會非常消耗資源宅此。

生產(chǎn)環(huán)境中使用Celery

下面是在生產(chǎn)環(huán)境中使用Celery的tips机错。

第一個建議是在Celery應(yīng)用中使用配置模塊,而不要在worker代碼中進(jìn)行配置父腕。假設(shè)弱匪,配置文件是config.py,可以如下將其傳遞給Celery應(yīng)用:

import celery
app = celery.Celery('mergesort')
app.config_from_object('config')

然后璧亮,與其他可能相關(guān)的配置指令一起萧诫,在config.py中添加:

BROKER_URL = 'amqp://HOST1'
CELERY_RESULT_BACKEND = 'redis://HOST2'

關(guān)于性能的建議是,使用至少兩個隊(duì)列枝嘶,好讓任務(wù)按照執(zhí)行時間劃分優(yōu)先級帘饶。使用多個隊(duì)列,將任務(wù)劃分給合適的隊(duì)列群扶,是分配worker的簡便方法及刻。Celery提供了詳盡的方法將任務(wù)劃分給隊(duì)列。分成兩步:首先竞阐,配置Celery應(yīng)用缴饭,啟動worker,如下所示:

# In config.py
CELERY_ROUTES = {project.task1': {'queue': 'queue1'},
                    'project.task2': {'queue': 'queue2'}}

為了在隊(duì)列中啟動worker骆莹,在不同的機(jī)器中使用下面的代碼:

HOST3 $ celery –A project worker –Q queue1
HOST5 $ celery –A project worker –Q queue2

使用Celery命令行工具的-c標(biāo)志颗搂,可以控制worker池的大小,例如汪疮,啟動一個有八個worker的池:

HOST3 $ celery –A project worker –c 8

說道worker峭火,要注意,Celery默認(rèn)使用多進(jìn)程模塊啟動worker池智嚷。這意味著卖丸,每個worker都是一個完整的Python進(jìn)程。如果某些worker只處理I/O密集型任務(wù)盏道,可以將它們轉(zhuǎn)換成協(xié)程或多線程稍浆,像前面的例子。這樣做的話,可以使用-P標(biāo)志衅枫,如下所示:

$ celery –A project worker –P threads

使用線程和協(xié)程可以節(jié)省資源嫁艇,但不利于CPU制約型任務(wù),如前面的菲波那切數(shù)列的例子弦撩。

談到性能步咪,應(yīng)該盡量避免同步原語(如前面的group()),除非非用不可益楼。當(dāng)同步無法回避時猾漫,好的方法是使用結(jié)果后臺(如Redis)。另外感凤,如果可能的話悯周,要避免傳遞復(fù)雜的對象給遠(yuǎn)程任務(wù),因?yàn)檫@些對象需要序列化和去序列化陪竿,通常很耗時禽翼。

額外的,如果不需要某個任務(wù)的結(jié)果族跛,應(yīng)該確保Celery不去獲取這些結(jié)果闰挡。這是通過裝飾器@task(ignore_result=True)來做的。如果所有的任務(wù)結(jié)果都忽略了庸蔼,就不必定義結(jié)果后臺解总。這可以讓性能大幅提高。

除此之外姐仅,還要指出花枫,如何啟動worker、在哪里運(yùn)行worker掏膏、如何確保它們持續(xù)運(yùn)行是很重要的劳翰。默認(rèn)的方法是使用工具,例如supervisord (http://supervisord.org) 馒疹,來管理worker進(jìn)程佳簸。

Celery帶有一個supervisord的配置案例(在安裝文件的extra/supervisord目錄)。一個監(jiān)督的優(yōu)秀方案是flower(https://github.com/mher/flower)颖变,一個worker的網(wǎng)絡(luò)控制和監(jiān)督工具生均。

最后,RabbitMQ和Redis結(jié)合起來腥刹,是一個很好的中間代理和結(jié)果后臺解決方案马胧,適用于大多數(shù)項(xiàng)目。

Celery的替代方案:Python-RQ

Celery的輕量簡易替代方案之一是 Python-RQ (http://python-rq.org)衔峰。它單單基于Redis作為任務(wù)隊(duì)列和結(jié)果后臺佩脊。沒有復(fù)雜任務(wù)或任務(wù)路由蛙粘,使用它很好。

因?yàn)镃elery和Python-RQ在概念上很像威彰,讓我們立即重寫一個之前的例子出牧。新建一個文件(rq/currency.py),代碼如下:

import urllib.request

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

def get_rate(pair, url_tmplt=URL):
    # raise Exception('Booo!')

    with urllib.request.urlopen(url_tmplt.format(pair)) as res:
        body = res.read()
    return (pair, float(body.strip()))

這就是之前的匯率例子的代碼歇盼。區(qū)別是舔痕,與Celery不同,這段代碼不需要依賴Python-RQ或Redis豹缀。將這段代碼拷貝到worker節(jié)點(diǎn)(HOST3)赵讯。

主程序也同樣簡單。新建一個Python文件(rq/main.py)耿眉,代碼如下:

#!/usr/bin/env python3
import argparse
import redis
import rq
from currency import get_rate

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

conn = redis.Redis(host='HOST2')
queue = rq.Queue(connection=conn)

jobs = [queue.enqueue(get_rate, pair) for pair in args.pairs]

for job in jobs:
    while job.result is None:
        pass
    print(*job.result)

我們在這里看到Python-RQ是怎么工作的。我們需要連接Redis服務(wù)器(HOST2)鱼响,然后將新建的連接對象傳遞給Queue類構(gòu)造器。結(jié)果Queue對象用來向其提交任務(wù)請求筐骇。這是通過傳遞函數(shù)對象和其它參數(shù)給queue.enqueue铛纬。

函數(shù)排隊(duì)調(diào)用的結(jié)果是job實(shí)例,它是個異步調(diào)用占位符晶密,之前見過多次懂牧。

因?yàn)镻ython-RQ沒有Celery的阻塞AsyncResult.get()方法僧凤,我們要手動建一個事件循環(huán)躯保,持續(xù)向job實(shí)例查詢吻氧,以確認(rèn)是否它們的result不是None這種方法不推薦在生產(chǎn)環(huán)境中使用鲁森,因?yàn)槌掷m(xù)的查詢會浪費(fèi)資源歌溉,查詢不足會浪費(fèi)時間痛垛,但對于這個簡易例子沒有問題匙头。

為了運(yùn)行代碼,首先要安裝Python-RQ碟婆,用pip進(jìn)行安裝:

$ pip install rq

在所有機(jī)器上都要安裝蝙叛。然后借帘,在HOST2運(yùn)行Redis:

$ sudo redis-server

在HOST3上姻蚓,啟動一些worker。Python-RQ不自動啟動worker池加叁。啟動多個worker的簡易的方法是使用一個文件(start_workers.py):

#!/usr/bin/env python3
import argparse
import subprocess

def terminate(proc, timeout=.5):
    """
    Perform a two-step termination of process `proc`: send a SIGTERM
    and, after `timeout` seconds, send a SIGKILL. This should give 
    `proc` enough time to do any necessary cleanup.
    """
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
    return

parser = argparse.ArgumentParser()
parser.add_argument('N', type=int)
args = parser.parse_args()

workers = []
for _ in range(args.N):
    workers.append(subprocess.Popen(['rqworker',
                                            '-u', 'redis://yippy']))
try:
    running = [w for w in workers if w.poll() is None]
    while running:
        proc = running.pop(0)
        try:
            proc.wait(timeout=1.)
        except subprocess.TimeoutExpired:
            running.append(proc)
except KeyboardInterrupt:
    for w in workers:
        terminate(w)

這個文件會啟動用戶指定書目的Python-RQ worker進(jìn)程(通過使用rqworker腳本,Python-RQ源碼的一部分)豫柬,通過Ctrl+C殺死進(jìn)程烧给。更健壯的方法是使用類似之前提過的supervisord工具指么。

在HOST3上運(yùn)行:

HOST3 $ ./start_workers.py 6

現(xiàn)在可以運(yùn)行代碼伯诬。在HOST4,運(yùn)行main.py

HOST4 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
EURUSD 1.0635
CHFUSD 0.9819
GBPUSD 1.5123
GBPEUR 1.422
CADUSD 0.7484
CADEUR 0.7037

效果與Celery相同桥言。

Celery的替代方案:Pyro

Pyro (http://pythonhosted.org/Pyro4/)的意思是Python Remote Objects,是1998年創(chuàng)建的一個包鸳粉。因此,它十分穩(wěn)定艰山,且功能完備曙搬。

Pyro使用的任務(wù)分布方法與Celery和Python-RQ十分不同纵装,它是在網(wǎng)絡(luò)中將Python對象作為服務(wù)器。然后創(chuàng)建它們的代理對象挽唉,讓調(diào)用代碼可以將其看做本地對象。這個架構(gòu)在90年代末的系統(tǒng)很流行匠童,比如COBRA和Java RMI。

Pyro掩蓋了代碼中的對象是本地還是遠(yuǎn)程的首昔,是讓人詬病的一點(diǎn)。原因是巧骚,遠(yuǎn)程代碼運(yùn)行錯誤的原因很多竣蹦,當(dāng)遠(yuǎn)程代碼隱藏在代理對象后面執(zhí)行,就不容易發(fā)現(xiàn)錯誤纲菌。

另一個詬病的地方是,Pyro在點(diǎn)對點(diǎn)網(wǎng)絡(luò)(不是所有主機(jī)名都可以解析)中椅贱,或者UDP廣播無效的網(wǎng)絡(luò)中夜涕,很難正確運(yùn)行女器。

盡管如此涣澡,大多數(shù)開發(fā)者認(rèn)為Pyro非常簡易入桂,在生產(chǎn)環(huán)境中足夠健壯。

Pyro安裝很簡單蜘腌,它是純Python寫的,依賴只有幾個芯急,使用pip:

$ pip install pyro4

這個命令會安裝Pyro 4.x和Serpent,后者是Pyro用來編碼和解碼Python對象的序列器榕酒。

用Pyro重寫之前的匯率例子,要比用Python-RQ復(fù)雜,它需要另一個軟件:Pyro nameserver肩榕。但是,不需要中間代理和結(jié)果后臺株汉,因?yàn)镻yro對象之間可以直接進(jìn)行通訊筐乳。

Pyro運(yùn)行原理如下。每個遠(yuǎn)程訪問的對象都封裝在處于連接監(jiān)聽的socket服務(wù)器框架中乔妈。每當(dāng)調(diào)用遠(yuǎn)程對象中的方法蝙云,被調(diào)用的方法,連同它的參數(shù)路召,就被序列化并發(fā)送到適當(dāng)?shù)膶ο?服務(wù)器上勃刨。此時贾铝,遠(yuǎn)程對象執(zhí)行被請求的任務(wù)水孩,經(jīng)由相同的連接苍姜,將結(jié)果發(fā)回到(同樣是序列化的)調(diào)用它的代碼。

因?yàn)槊總€遠(yuǎn)程對象自身就可以調(diào)用遠(yuǎn)程對象订咸,這個架構(gòu)可以是相當(dāng)去中心化的父叙。另外,一旦建立通訊乔煞,對象之間就是p2p的纺讲,這與分布式任務(wù)隊(duì)列的輕度耦合架構(gòu)十分不同。另一點(diǎn),每個遠(yuǎn)程對象既可以做master呻澜,也可以做worker恭朗。

接下來重寫匯率的例子棍丐,來看看具體是怎么運(yùn)行的。建立一個Python文件(pyro/worker.py)讨惩,代碼如下:

import urllib.request
import Pyro4

URL = 'http://finance.yahoo.com/d/quotes.csv?s={}=X&f=p'

@Pyro4.expose(instance_mode="percall")
class Worker(object):
    def get_rate(self, pair, url_tmplt=URL):
        with urllib.request.urlopen(url_tmplt.format(pair)) as res:
            body = res.read()
        return (pair, float(body.strip()))

# Create a Pyro daemon which will run our code.
daemon = Pyro4.Daemon()
uri = daemon.register(Worker)
Pyro4.locateNS().register('MyWorker', uri)

# Sit in an infinite loop accepting connections
print('Accepting connections')
try:
    daemon.requestLoop()
except KeyboardInterrupt:
    daemon.shutdown()
print('All done')

worker的代碼和之前的很像,不同點(diǎn)是將get_rate函數(shù)變成了Worker類的一個方法野揪。變動的原因是,Pyro允許導(dǎo)出類的實(shí)例珍语,但不能導(dǎo)出函數(shù)。

剩下的代碼是Pyro特有的透乾。我們需要一個Daemon實(shí)例(它本質(zhì)上是后臺的網(wǎng)絡(luò)服務(wù)器)磕秤,它會獲得類磷瘤,并在網(wǎng)絡(luò)上發(fā)布贸呢,好讓其它的代碼可以調(diào)用方法缚忧。分成兩步來做:首先悟泵,創(chuàng)建一個類Pyro4.Daemon的實(shí)例,然后添加類闪水,通過將其傳遞給register方法糕非。

每個Pyro的Daemon實(shí)例可以隱藏任意數(shù)目的類。內(nèi)部球榆,需要的話朽肥,Daemon對象會創(chuàng)建隱藏類的實(shí)例(也就是說,如果沒有代碼需要這個類持钉,相應(yīng)的Daemon對象就不會將其實(shí)例化)衡招。

每一次網(wǎng)絡(luò)連接,Daemon對象默認(rèn)會實(shí)例化一次注冊的類每强,如果要進(jìn)行并發(fā)任務(wù)始腾,這樣就不可以】罩矗可以通過裝飾注冊的類修改窘茁,@Pyro4.expose(instance_mode=...)

instance_mode支持的值有三個:single脆烟、sessionpercall山林。使用single意味Daemon只為類創(chuàng)建一個實(shí)例,使用它應(yīng)付所有的客戶請求邢羔。也可以通過注冊一個類的實(shí)例(而不是類本身)驼抹。

使用session可以采用默認(rèn)模式:每個client連接都會得到一個新的實(shí)例,client始終都會使用它拜鹤。使用instance_mode="percall"框冀,會為每個遠(yuǎn)程方法調(diào)用建立一個新實(shí)例。

無論創(chuàng)建實(shí)例的模式是什么敏簿,用Daemon對象注冊一個類(或?qū)嵗┒紩祷匾粋€唯一的識別符(即URI)明也,其它代碼可以用識別符連接對象宣虾。我們可以手動傳遞URI,但更方便的方法是在Pyro nameserver中存儲它温数,這樣通過兩步來做绣硝。先找到nameserver,然后給URI注冊一個名字撑刺。在前面的代碼中鹉胖,是通過下面來做的:

Pyro4.locateNS().register('MyWorker', uri)

nameserver的運(yùn)行類似Python的字典,注冊兩個名字相同的URI够傍,第二個URI就會覆蓋第一個甫菠。另外,我們看到冕屯,client代碼使用存儲在nameserver中的名字控制了許多遠(yuǎn)程對象寂诱。這意味著,命名需要特別的留意安聘,尤其是當(dāng)許多worker進(jìn)程提供的功能相同時刹衫。

最后,在前面的代碼中搞挣,我們用daemon.requestLoop()進(jìn)入了一個Daemon事件循環(huán)带迟。Daemon對象會在無限循環(huán)中服務(wù)client的請求。

對于client囱桨,創(chuàng)建一個Python文件(pyro/main.py)仓犬,它的代碼如下:

#!/usr/bin/env python3
import argparse
import time
import Pyro4

parser = argparse.ArgumentParser()
parser.add_argument('pairs', type=str, nargs='+')
args = parser.parse_args()

# Retrieve the rates sequentially.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")

for pair in args.pairs:
    print(worker.get_rate(pair))
print('Sync calls: %.02f seconds' % (time.time() - t0))

# Retrieve the rates concurrently.
t0 = time.time()
worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

results = [async_worker.get_rate(pair) for pair in args.pairs]
for result in results:
    print(result.value)
print('Async calls: %.02f seconds' % (time.time() - t0))

可以看到,client把相同的工作做了兩次舍肠。這么做的原因是展示Pyro兩種調(diào)用方式:同步和異步搀继。

來看代碼,我們使用argparse包從命令行獲得匯率對翠语。然后叽躯,對于同步的方式,通過名字worker = Pyro4.Proxy("PYRONAME:MyWorker")獲得了一些遠(yuǎn)程worker對象肌括。前綴PYRONAME:告訴Pyro在nameserver中該尋找哪個名字点骑。這樣可以避免手動定位nameserver。

一旦有了worker對象谍夭,可以把它當(dāng)做本地的worker類的實(shí)例黑滴,向其調(diào)用方法。這就是我們在第一個循環(huán)中做的:

for pair in args.pairs:
    print(worker.get_rate(pair))

對每個worker.get_rate(pair)聲明紧索,Proxy對象會用它的遠(yuǎn)程Daemon對象連接袁辈,發(fā)送請求,以運(yùn)行get_rate(pair)珠漂。我們例子中的Daemon對象晚缩,每次會創(chuàng)建一個Worker類的的實(shí)例尾膊,并調(diào)用它的get_rate(pair)方法。結(jié)果序列化之后發(fā)送給client荞彼,然后打印出來冈敛。每個調(diào)用都是同步的,任務(wù)完成后會封鎖卿泽。

在第二個循環(huán)中,做了同樣的事滋觉,但是使用的是異步調(diào)用签夭。我們需要向遠(yuǎn)程的類創(chuàng)建一個Proxy對象,然后椎侠,將它封裝在一個異步handler中第租。這就是下面代碼的功能:

worker = Pyro4.Proxy("PYRONAME:MyWorker")
async_worker = Pyro4.async(worker)

我們現(xiàn)在可以在后臺用async_worker獲取匯率。每次調(diào)用async_worker.get_rate(pair)是非阻塞的我纪,會返回一個Pyro4.futures.FutureResult的實(shí)例慎宾,它和concurrent.futures模塊中Future對象很像。訪問它的value需要等待浅悉,直到相應(yīng)的異步調(diào)用完成趟据。

為了運(yùn)行這個例子,需要三臺機(jī)器的三個窗口:一個是nameserver(HOST1)术健,一個是Worker類和它的Daemon(HOST2)汹碱,第三個(HOST3)是client(即main.py)。

在第一個終端荞估,啟動nameserver咳促,如下:

HOST1 $ pyro4-ns --host 0.0.0.0
Broadcast server running on 0.0.0.0:9091
NS running on 0.0.0.0:9090 (0.0.0.0)
Warning: HMAC key not set. Anyone can connect to this server!
URI = PYRO:Pyro.NameServer@0.0.0.0:9090

簡單來說,nameserver綁定為0.0.0.0勘伺,任何人都可以連接它跪腹。我們沒有設(shè)置認(rèn)證,因此在倒數(shù)第二行彈出了一個警告飞醉。

nameserver運(yùn)行起來了冲茸,在第二個終端啟動worker:

HOST2 $ python3.5 worker.py
Accepting connections

Daemon對象接收連接,現(xiàn)在去第三個終端窗口運(yùn)行client代碼缅帘,如下:

HOST3 $ python3.5 main.py EURUSD CHFUSD GBPUSD GBPEUR CADUSD CADEUR
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Sync calls: 1.55 seconds
('EURUSD', 1.093)
('CHFUSD', 1.0058)
('GBPUSD', 1.5141)
('GBPEUR', 1.3852)
('CADUSD', 0.7493)
('CADEUR', 0.6856)
Async calls: 0.29 seconds

結(jié)果和預(yù)想一致噪裕,IO限制型代碼可以方便的進(jìn)行擴(kuò)展,異步代碼的速度六倍于同步代碼股毫。

這里膳音,還有幾個提醒。第一是铃诬,Pyro的Daemon實(shí)例要能解析主機(jī)的名字祭陷。如果不能解析苍凛,那么它只能接受127.0.0.1的連接,這意味著兵志,不能被遠(yuǎn)程連接(只能本地連接)醇蝴。解決方案是將其與運(yùn)行的主機(jī)進(jìn)行IP綁定,確保它不是環(huán)回地址想罕∮扑ǎ可以用下面的Python代碼選擇一個可用的IP:

from socket import gethostname, gethostbyname_ex

ips = [ip for ip in gethostbyname_ex(gethostname())[-1] 
        if ip != '127.0.0.1']
ip = ips.pop()

另一個要考慮的是:作為Pyro使用“直接連接被命名對象”方法的結(jié)果,很難像Celery和Python-RQ那樣直接啟動一批worker按价。在Pyro中惭适,必須用不同的名字命名worker,然后用名字進(jìn)行連接(通過代理)楼镐。這就是為什么癞志,Pyro的client用一個mini的規(guī)劃器來向可用的worker分配工作。

另一個要注意的是框产,nameserver不會跟蹤worker的斷開凄杯,因此,用名字尋找一個URI對象不代表對應(yīng)的遠(yuǎn)程Daemon對象是真實(shí)運(yùn)行的秉宿。最好總是這樣對待Pyro調(diào)用:遠(yuǎn)程服務(wù)器的調(diào)用可能成功戒突,也可能不成功。

記住這些點(diǎn)描睦,就可以用Pyro搭建復(fù)雜的網(wǎng)絡(luò)和分布式應(yīng)用妖谴。

總結(jié)

這一章很長。我們學(xué)習(xí)了Celery酌摇,他是一個強(qiáng)大的包膝舅,用以構(gòu)建Python分布式應(yīng)用。然后學(xué)習(xí)了Python-RQ窑多,一個輕量且簡易的替代方案仍稀。兩個包都是使用分布任務(wù)隊(duì)列架構(gòu),它是用多個機(jī)器來運(yùn)行相同系統(tǒng)的分布式任務(wù)埂息。

然后介紹了另一個替代方案技潘,Pyro。Pyro的機(jī)理不同千康,它使用的是代理方式和遠(yuǎn)程過程調(diào)用(RPC)享幽。

兩種方案都有各自的優(yōu)點(diǎn),你可以選擇自己喜歡的拾弃。

下一章會學(xué)習(xí)將分布式應(yīng)用部署到云平臺值桩,會很有趣。


序言
第1章 并行和分布式計(jì)算介紹
第2章 異步編程
第3章 Python的并行計(jì)算
第4章 Celery分布式應(yīng)用
第5章 云平臺部署Python
第6章 超級計(jì)算機(jī)群使用Python
第7章 測試和調(diào)試分布式應(yīng)用
第8章 繼續(xù)學(xué)習(xí)


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末豪椿,一起剝皮案震驚了整個濱河市奔坟,隨后出現(xiàn)的幾起案子携栋,更是在濱河造成了極大的恐慌,老刑警劉巖咳秉,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件婉支,死亡現(xiàn)場離奇詭異,居然都是意外死亡澜建,警方通過查閱死者的電腦和手機(jī)向挖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來炕舵,“玉大人何之,你說我怎么就攤上這事∧幌溃” “怎么了帝美?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵碍彭,是天一觀的道長晤硕。 經(jīng)常有香客問我,道長庇忌,這世上最難降的妖魔是什么舞箍? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮皆疹,結(jié)果婚禮上疏橄,老公的妹妹穿的比我還像新娘。我一直安慰自己略就,他們只是感情好捎迫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著表牢,像睡著了一般窄绒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上崔兴,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天彰导,我揣著相機(jī)與錄音,去河邊找鬼敲茄。 笑死位谋,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的堰燎。 我是一名探鬼主播掏父,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼秆剪!你這毒婦竟也來了损同?” 一聲冷哼從身側(cè)響起翩腐,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎膏燃,沒想到半個月后茂卦,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡组哩,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年等龙,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片伶贰。...
    茶點(diǎn)故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡蛛砰,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出黍衙,到底是詐尸還是另有隱情泥畅,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布琅翻,位于F島的核電站位仁,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏方椎。R本人自食惡果不足惜聂抢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望棠众。 院中可真熱鬧琳疏,春花似錦、人聲如沸闸拿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽新荤。三九已至揽趾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間迟隅,已是汗流浹背但骨。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留智袭,地道東北人奔缠。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像吼野,于是被迫代替她去往敵國和親校哎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容