前言
并行計算是使用并行計算機來減少單個計算問題所需要的時間咏雌,我們可以通過利用編程語言顯式的說明計算中的不同部分如何再不同的處理器上同時執(zhí)行來設(shè)計我們的并行程序盗尸,最終達到大幅度提升程序效率的目的。
眾所周知焰檩,Python中的GIL限制了Python多線程并行對多核CPU的利用副签,但是我們?nèi)匀豢梢酝ㄟ^各種其他的方式來讓Python真正利用多核資源, 例如通過C/C++擴展來實現(xiàn)多線程/多進程, 以及直接利用Python的多進程模塊multiprocessing來進行多進程編程。
本文主要嘗試僅僅通過python內(nèi)置的multiprocessing模塊對自己的動力學計算程序來進行優(yōu)化和效率提升瑞佩,其中:
實現(xiàn)了單機利用多核資源來實現(xiàn)并行并進行加速對比
使用manager模塊實現(xiàn)了簡單的多機的分布式計算
本文并不是對Python的multiprocessing模塊的接口進行翻譯介紹,需要熟悉multiprocessing的童鞋可以參考官方文檔https://docs.python.org/2/library/multiprocessing.html坯台。
正文
最近想用自己的微觀動力學程序進行一系列的求解并將結(jié)果繪制成二維Map圖進行可視化炬丸,這樣就需要對二維圖上的多個點進行計算并將結(jié)果收集起來并進行繪制,由于每個點都需要進行一次ODE積分以及牛頓法求解方程組蜒蕾,因此要串行地繪制整張圖可能會遇到極低的效率問題尤其是對參數(shù)進行測試的時候稠炬,每畫一張圖都需要等很久的時間。其中繪制的二維圖中每個點都是獨立計算的咪啡,于是很自然而然的想到了進行并行化處理首启。
串行的原始版本
由于腳本比較長,而且實現(xiàn)均為自己的程序撤摸,腳本的大致結(jié)構(gòu)如下, 本質(zhì)是一個二重循環(huán)毅桃,循環(huán)的變量分別為反應(yīng)物氣體(O2 和 CO)的分壓的值:
import time
import numpy as np
# 省略若干...
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)
if "__main__" == __name__:
????try:
????????start = time.time()
????????for i, pO2 in enumerate(pO2s):
????????????# ...
????????????for j, pCO in enumerate(pCOs):
????????????????# 針對當前的分壓值 pCO, pO2進行動力學求解
????????????????# 具體代碼略...
????????end = time.time()
????????t = end - start
????finally:
????????# 收集計算的結(jié)果并進行處理繪圖
整體過程就這么簡單,我需要做的就是使用multiprocessing的接口來對這個二重循環(huán)進行并行化准夷。
使用單核串行繪制100個點所需要的時間如下, 總共花了240.76秒:
?
python學習交流群:923414804钥飞,群內(nèi)每天分享干貨,包括最新的企業(yè)級案例學習資料和零基礎(chǔ)入門教程衫嵌,歡迎小伙伴入群學習读宙。
二維map圖繪制的效果如下:
?
進行多進程并行處理
multiprocessing模塊
multiprocessing模塊提供了類似threading模塊的接口,并對進程的各種操作進行了良好的封裝渐扮,提供了各種進程間通信的接口例如Pipe,Queue等等论悴,可以幫助我們實現(xiàn)進程間的通信,同步等操作墓律。
使用Process類來動態(tài)創(chuàng)建進程實現(xiàn)并行
multiprocessing模塊提供了Process能讓我們通過創(chuàng)建進程對象并執(zhí)行該進程對象的start方法來創(chuàng)建一個真正的進程來執(zhí)行任務(wù)膀估,該接口類似threading模塊中的線程類Thread.
但是當被操作對象數(shù)目不大的時候可以使用Process動態(tài)生成多個進程,但是如果需要的進程數(shù)一旦很多的時候耻讽,手動限制進程的數(shù)量以及處理不同進程返回值會變得異常的繁瑣察纯,因此這個時候我們需要使用進程池來簡化操作。
使用進程池來管理進程
multiprocessing模塊提供了一個進程池Pool類针肥,負責創(chuàng)建進程池對象饼记,并提供了一些方法來講運算任務(wù)offload到不同的子進程中執(zhí)行,并很方便的獲取返回值慰枕。例如我們現(xiàn)在要進行的循環(huán)并行便很容易的將其實現(xiàn)具则。
對于這里的單指令多數(shù)據(jù)流的并行,我們可以直接使用Pool.map()來將函數(shù)映射到參數(shù)列表中具帮。Pool.map其實是map函數(shù)的并行版本博肋,此函數(shù)將會阻塞直到所有進程全部結(jié)束低斋,而且此函數(shù)返回的結(jié)果順序仍然不變。
首先匪凡,我先把針對每對分壓數(shù)據(jù)的處理過程封裝成一個函數(shù)膊畴,這樣可以將函數(shù)對象傳遞給子進程執(zhí)行。
import time
from multiprocessing import Pool
import numpy as np
# 省略若干...
pCOs = np.linspace(1e-5, 0.5, 10)
pO2s = np.linspace(1e-5, 0.5, 10)
def task(pO2):
????'''接受一個O2分壓病游,根據(jù)當前的CO分壓進行動力學求解'''
????# 代碼細節(jié)省略...
if "__main__" == __name__:
????try:
????????start = time.time()
????????pool = Pool()????????????????# 創(chuàng)建進程池對象唇跨,進程數(shù)與multiprocessing.cpu_count()相同
????????tofs = pool.map(task, pCOs)??# 并行執(zhí)行函數(shù)
????????end = time.time()
????????t = end - start
????finally:
????????# 收集計算的結(jié)果并進行處理繪圖
使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比為1.62
?
對不同核心的加速效果進行測試
為了查看使用不同核心數(shù)對程序效率的改善衬衬,我對不同的核心數(shù)和加速比進行了測試繪圖买猖,效果如下:
運行核心數(shù)與程序運行時間:
?
運行核心數(shù)與加速比:
?
可見,由于我外層循環(huán)只循環(huán)了10次因此使用的核心數(shù)超過10以后核心數(shù)的增加并不能對程序進行加速佣耐,也就是多余的核心都浪費掉了政勃。
使用manager實現(xiàn)簡單的分布式計算
前面使用了multiprocessing包提供的接口我們使用了再一臺機器上進行多核心計算的并行處理,但是multiprocessing的用處還有更多兼砖,通過multiprocessing.managers模塊奸远,我們可以實現(xiàn)簡單的多機分布式并行計算,將計算任務(wù)分布到不同的計算機中運行讽挟。
Managers提供了另外的多進程通信工具懒叛,他提供了在多臺計算機之間共享數(shù)據(jù)的接口和數(shù)據(jù)對象,這些數(shù)據(jù)對象全部都是通過代理類實現(xiàn)的耽梅,比如ListProxy和DictProxy等等薛窥,他們都實現(xiàn)了與原生list和dict相同的接口,但是他們可以通過網(wǎng)絡(luò)在不同計算機中的進程中進行共享眼姐。
關(guān)于managers模塊的接口的詳細使用可以參考官方文檔:https://docs.python.org/2/library/multiprocessing.html#managers
好了現(xiàn)在我們開始嘗試將繪圖程序改造成可以在多臺計算機中分布式并行的程序诅迷。改造的主要思想是:
使用一臺計算機作為服務(wù)端(server),此臺計算機通過一個Manager對象來管理共享對象众旗,任務(wù)分配以及結(jié)果的接收罢杉,并再收集結(jié)果以后進行后處理(繪制二維map圖)。
其他多臺計算機可以作為客戶端來接收server的數(shù)據(jù)進行計算贡歧,并將結(jié)果傳到共享數(shù)據(jù)中滩租,讓server可以收集。同時再client端可以同時進行上文所實現(xiàn)的多進程并行來充分利用計算機的多核優(yōu)勢利朵。
大致可總結(jié)為下圖:
?
服務(wù)進程
首先服務(wù)端需要一個manager對象來管理共享對象
def get_manager():
????'''創(chuàng)建服務(wù)端manager對象.
????'''
????# 自定義manager類
????class JobManager(BaseManager):
????????pass
????# 創(chuàng)建任務(wù)隊列律想,并將此數(shù)據(jù)對象共享在網(wǎng)絡(luò)中
????jobid_queue = Queue()
????JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
????# 創(chuàng)建列表代理類,并將其共享再網(wǎng)絡(luò)中
????tofs = [None]*N
????JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)
????# 將分壓參數(shù)共享到網(wǎng)絡(luò)中
????JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)
????JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)
????# 創(chuàng)建manager對象并返回
????manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)
????return manager
BaseManager.register是一個類方法绍弟,它可以將某種類型或者可調(diào)用的對象綁定到manager對象并共享到網(wǎng)絡(luò)中技即,使得其他在網(wǎng)絡(luò)中的計算機能夠獲取相應(yīng)的對象。
例如樟遣,
1JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
我就將一個返回任務(wù)隊列的函數(shù)對象同manager對象綁定并共享到網(wǎng)絡(luò)中而叼,這樣在網(wǎng)絡(luò)中的進程就可以通過自己的manager對象的get_jobid_queue方法得到相同的隊列郭脂,這樣便實現(xiàn)了數(shù)據(jù)的共享.
2. 創(chuàng)建manager對象的時候需要兩個參數(shù),
address, 便是manager所在的ip以及用于監(jiān)聽與服務(wù)端連接的端口號澈歉,例如我如果是在內(nèi)網(wǎng)中的192.168.0.1地址的5000端口進行監(jiān)聽,那么此參數(shù)可以是('192.169.0.1, 5000)`
authkey, 顧名思義屿衅,就是一個認證碼埃难,用于驗證客戶端時候可以連接到服務(wù)端,此參數(shù)必須是一個字符串對象.
進行任務(wù)分配
上面我們將一個任務(wù)隊列綁定到了manager對象中涤久,現(xiàn)在我需要將隊列進行填充涡尘,這樣才能將任務(wù)發(fā)放到不同的客戶端來進行并行執(zhí)行。
def fill_jobid_queue(manager, nclient):
????indices = range(N)
????interval = N/nclient
????jobid_queue = manager.get_jobid_queue()
????start = 0
????for i in range(nclient):
????????jobid_queue.put(indices[start: start+interval])
????????start += interval
????if N % nclient > 0:
????????jobid_queue.put(indices[start:])
這里所謂的任務(wù)其實就是相應(yīng)參數(shù)在list中的index值响迂,這樣不同計算機中得到的結(jié)果可以按照相應(yīng)的index將結(jié)果填入到結(jié)果列表中考抄,這樣服務(wù)端就能在共享的網(wǎng)絡(luò)中收集各個計算機計算的結(jié)果。
啟動服務(wù)端進行監(jiān)聽
def run_server():
????# 獲取manager
????manager = get_manager()
????print "Start manager at {}:{}...".format(ADDR, PORT)
????# 創(chuàng)建一個子進程來啟動manager
????manager.start()
????# 填充任務(wù)隊列
????fill_jobid_queue(manager, NNODE)
????shared_job_queue = manager.get_jobid_queue()
????shared_tofs_list = manager.get_tofs_list()
????queue_size = shared_job_queue.qsize()
????# 循環(huán)進行監(jiān)聽蔗彤,直到結(jié)果列表被填滿
????while None in shared_tofs_list:
????????if shared_job_queue.qsize() < queue_size:
????????????queue_size = shared_job_queue.qsize()
????????????print "Job picked..."
????return manager
任務(wù)進程
服務(wù)進程負責進行簡單的任務(wù)分配和調(diào)度川梅,任務(wù)進程則只負責獲取任務(wù)并進行計算處理。
在任務(wù)進程(客戶端)中基本代碼與我們上面單機中的多核運行的腳本基本相同(因為都是同一個函數(shù)處理不同的數(shù)據(jù))然遏,但是我們也需要為客戶端創(chuàng)建一個manager來進行任務(wù)的獲取和返回贫途。
def get_manager():
????class WorkManager(BaseManager):
????????pass
????# 由于只是從共享網(wǎng)絡(luò)中獲取,因此只需要注冊名字即可
????WorkManager.register('get_jobid_queue')
????WorkManager.register('get_tofs_list')
????WorkManager.register('get_pCOs')
????WorkManager.register('get_pO2s')
????# 這里的地址和驗證碼要與服務(wù)端相同才可以進行數(shù)據(jù)共享
????manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)
????return manager
在客戶端我們?nèi)匀豢梢远噙M程利用多核資源來加速計算待侵。
if "__main__" == __name__:
????manager = get_manager()
????print "work manager connect to {}:{}...".format(ADDR, PORT)
????# 將客戶端本地的manager連接到相應(yīng)的服務(wù)端manager
????manager.connect()
????# 獲取共享的結(jié)果收集列表
????shared_tofs_list = manager.get_tofs_list()
????# 獲取共享的任務(wù)隊列
????shared_jobid_queue = manager.get_jobid_queue()
????# 從服務(wù)端獲取計算參數(shù)
????pCOs = manager.get_pCOs()
????shared_pO2s = manager.get_pO2s()
????# 創(chuàng)建進程池在本地計算機進行多核并行
????pool = Pool()
????while 1:
????????try:
????????????indices = shared_jobid_queue.get_nowait()
????????????pO2s = [shared_pO2s[i] for i in indices]
????????????print "Run {}".format(str(pO2s))
????????????tofs_2d = pool.map(task, pO2s)
????????????# Update shared tofs list.
????????????for idx, tofs_1d in zip(indices, tofs_2d):
????????????????shared_tofs_list[idx] = tofs_1d
????????# 直到將任務(wù)隊列中的任務(wù)全部取完丢早,結(jié)束任務(wù)進程
????????except Queue.Empty:
????????????break
下面我將在3臺在同一局域網(wǎng)中的電腦來進行簡單的分布式計算測試,
其中一臺是實驗室器群中的管理節(jié)點, 內(nèi)網(wǎng)ip為10.10.10.245
另一臺為集群中的一個節(jié)點, 共有12個核心
最后一臺為自己的本本秧倾,4個核心
?先在服務(wù)端運行服務(wù)腳本進行任務(wù)分配和監(jiān)聽:
1python server.py
2. 在兩個客戶端運行任務(wù)腳本來獲取任務(wù)隊列中的任務(wù)并執(zhí)行
1python worker.py
當任務(wù)隊列為空且任務(wù)完成時怨酝,任務(wù)進程終止; 當結(jié)果列表中的結(jié)果收集完畢時,服務(wù)進程也會終止那先。
執(zhí)行過程如圖:
?
執(zhí)行結(jié)果如下圖:
?
上面的panel為服務(wù)端監(jiān)聽农猬,左下為自己的筆記本運行結(jié)果,右下panel為集群中的其中一個節(jié)點胃榕。
可見運行時間為56.86s盛险,無奈,是我的本子脫了后腿(-_-!)
總結(jié)
本文通過python內(nèi)置模塊multiprocessing實現(xiàn)了單機內(nèi)多核并行以及簡單的多臺計算機的分布式并行計算勋又,multiprocessing為我們提供了封裝良好并且友好的接口來使我們的Python程序更方面利用多核資源加速自己的計算程序苦掘,希望能對使用python實現(xiàn)并行話的童鞋有所幫助。
參考