multiprocessing是Python內(nèi)置的進程并行庫,具有十分簡潔良好的并行機制镐捧。但就我個人使用感受而言潜索,更適合于單機并行,而不適合分布式結(jié)點懂酱。
multiprocessing
一竹习、multiprocessing使用
-
Process
Process
是multiprocessing
最基本的進程類,內(nèi)置了進程的啟動玩焰、掛起由驹、關閉等方法芍锚。# 并行的最小單位依舊是函數(shù)昔园,或者可以稱之為 handler from multiprocessing import Process def test_mult(i): print(i+1) return i ** 2 result = [] # 用于保存返回的結(jié)果 for i in range(10): p = Process(target=test_mult, args=(i, )) result.append(p) # 防止進程不見 p.start() # 正式啟動進程 result = [p.join() for p in result] # 掛起進程,直至其完成 print(result) # 進程是無法返回內(nèi)容的 """ Output: 1 2 3 4 5 6 7 8 9 10 [None, None, None, None, None, None, None, None, None, None] """
-
Pool
Pool
類提供了進程池化的能力并炮,可以合理地管理和使用資源默刚。from multiprocessing import Pool def test_pool(args): a, b = args print( a * b) pool = Pool(5) # 指定該進程池最多只有5個進程 # map的第二個參數(shù)必須是可迭代的對象,因此如果需要 # 傳入多個對象時必須也要是一個多參數(shù)的迭代器 result = pool.map(test_pool, [(a, b) for a, b in zip(range(10), range(10))]) print(result) pool.close() # 關掉進程池 """ Output: 0 1 9 4 36 49 16 64 25 81 [None, None, None, None, None, None, None, None, None, None] """
-
Queue
Queue
提供了隊列的數(shù)據(jù)結(jié)構逃魄,可以用于進程之間的數(shù)據(jù)通信荤西、消息通信,同時保證數(shù)據(jù)的讀寫安全。Queue
提供了FIFO(默認)邪锌、FILO等方式勉躺。詳細使用方法將在《事件驅(qū)動》中介紹
-
Manager
Manager
是用于多節(jié)點并行共享變量的類,不過我還是覺得用于單節(jié)點最好用觅丰。- 共享變量
- Dict:共享的字典變量
- Array:共享的數(shù)組變量(同dtype)
- Value:共享的數(shù)值變量(如int饵溅、float)
- ctypes:支持ctypes構造更復雜的變量
# 這里以我項目中出現(xiàn)的復數(shù)矩陣來構造一些變量 from multiprocessing.sharedctypes import Array import ctypes import numpy as np shared_array_base = Array(ctypes.c_double, 3*3*2) shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) shared_array = shared_array.view(np.complex128).reshape(3, 3) print(shared_array) shared_array[2, 2] = 2. + 3.j print('\n', shared_array) """ Output: [[0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j]] [[0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 0.+0.j] [0.+0.j 0.+0.j 2.+3.j]] """ # 使用RawArray來構造共享變量 from multiprocessing import RawArray data = np.random.randn(16, 1000000) X_shape = data.shape X_size = data.size X = RawArray('d', X_size * 2) X_np = np.frombuffer(X, dtype=np.complex128).reshape(X_shape) np.copyto(X_np, data) """ Output: array([[ 1.43361395+0.j, -0.13536996+0.j, -1.05048751+0.j, ..., 0.34899814+0.j, 0.33336308+0.j, -1.41943919+0.j], [-0.65600705+0.j, 0.81952908+0.j, 0.78193087+0.j, ..., 0.73767972+0.j, -0.52045135+0.j, 0.96770416+0.j], [-2.13355565+0.j, 0.17741152+0.j, -1.2255968 +0.j, ..., 0.71831462+0.j, 0.1928877 +0.j, 0.14207214+0.j], ..., [-0.27040098+0.j, 0.21613441+0.j, -0.24113161+0.j, ..., -1.02808119+0.j, 0.07977458+0.j, -0.86394499+0.j], [ 0.27319615+0.j, -0.15105511+0.j, -0.03926541+0.j, ..., -0.20495524+0.j, 0.09575596+0.j, 0.58463843+0.j], [-0.51712435+0.j, -0.63082962+0.j, -0.47347812+0.j, ..., -0.15066354+0.j, -0.87177074+0.j, -0.24865684+0.j]]) """
- 共享變量
二、注意事項
-
進程pickle問題
-
pool
類實例的進程一般需要序列化妇萄,這意味著會將代碼以及參數(shù)使用pickle打包蜕企,因此不能包含lambda
的代碼或者數(shù)據(jù),這也導致了pool
無法使用manager
的共享變量來共享狀態(tài)冠句。 而process
則無須序列化轻掩,因此可以和manager
搭配使用。 - 也正是需要序列化的原因懦底,如果傳輸?shù)膮?shù)過大唇牧,將會使得進程初始化的時間大大增加。
-
-
進程生成方式
- 進程的啟動方式有以下幾種:
- spawn:父進程會啟動一個全新的 python 解釋器進程基茵。 子進程將只繼承那些運行進程對象的
run()
方法所必需的資源奋构。使用此方法啟動進程相比使用fork
或forkserver
要慢上許多。 - fork:父進程使用
os.fork()
來產(chǎn)生 Python 解釋器分叉拱层。子進程在開始時實際上與父進程相同弥臼。父進程的所有資源都由子進程繼承。 - forkserver:程序啟動并選擇
forkserver
啟動方法時根灯,將啟動服務器進程径缅。從那時起,每當需要一個新進程時烙肺,父進程就會連接到服務器并請求它分叉一個新進程纳猪。分叉服務器進程是單線程的,因此使用os.fork()
是安全的桃笙。沒有不必要的資源被繼承氏堤。
- 當使用
spawn
或者forkserver
的啟動方式時,multiprocessing
中的許多類型都必須是可序列化的搏明,這樣子進程才能使用它們鼠锈。 - 例子:
from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') # 設置啟動方式 p = Process(target=foo) p.start()
-
注意進程間可能存在無法共享的變量
- 情況:編寫事件驅(qū)動類時,使用一個
self.processPool = []
來充當進程池星著,但是經(jīng)過多次的實驗购笆,在每次添加事件add_event()
時,僅僅是在當時append
成功了虚循,但往后再訪問self.processPool
同欠,都是空的样傍。原因是:self.processPool
是存在類這個進程下的,而進程池的真是操作是在定義的某個mainprocessor
下的铺遂,他們不是同一進程衫哥,并不能互相更改。
- 情況:編寫事件驅(qū)動類時,使用一個
-
Process
類無法進行pickle襟锐,其用于父子進程通信的AuthenticationString
是不允許pickle的炕檩。- 情況:由于
list
的append
無法統(tǒng)一,所以打算使用Queue
或者Manager.dict()
來充當進程池捌斧,結(jié)果Process
無法被pickle(分配到不同進程時需要pickle)笛质。解決方法可以是: 將while循環(huán)的條件設為隊列,判斷隊列空或者為True
(必須先判斷是否為空)時便循環(huán)等待捞蚂;而一旦接收到False
妇押,則在run()
的進程內(nèi)去join()
進程(此進程內(nèi)可見),最后return結(jié)束run()
姓迅,則確保此進程也關閉了敲霍。
- 情況:由于
-
The “freeze_support()” line can be omitted if the program is not going to be frozen錯誤
- 情況:該錯誤出現(xiàn)于Windows系統(tǒng)下,腳本的主函數(shù)沒有使用
if __name__ == '__main__'
前提下丁存,使用了多進程肩杈。原因極可能是:multiprocessing
默認創(chuàng)建進程的方式是spwan
,也就是fork
解寝;而windows下創(chuàng)建進程并不是使用fork
扩然,所以解決方法是要么設定創(chuàng)建時用fork
,要么就按照上面的寫法聋伦。
- 情況:該錯誤出現(xiàn)于Windows系統(tǒng)下,腳本的主函數(shù)沒有使用
-
RuntimeError: Queue objects should only be shared between processes through inheritance
- 情況:使用
multiprocessing
進行多進程計算時夫偶,打算不同函數(shù)中使用Queue
來進行共享的通信。但實際上觉增,Queue
基于Pipe實現(xiàn)兵拢,而Pipe對象的共享需要通過繼承才能使用。故Queue
一般只能用在 父進程創(chuàng)建隊列逾礁,父子進程之間共享狀態(tài) 的情況说铃。可以使用multiprocessing.Manager.Queue
代替嘹履。
- 情況:使用
-
隊列
queue
有線程的隊列與進程隊列之分腻扇。- 線程一般使用
from queue import Queue
, 進程可以使用from multiprocessing import Queue
- 線程一般使用
-
父進程與子進程之間不可以使用
Queue
來進行通信- 情況:做進程并行植捎,希望父進程分發(fā)任務衙解,子進程完成后返回給父進程結(jié)果阳柔。但是
Queue
盡管在子進程可以成功進行put
操作焰枢,父進程卻無法接收而導致隊列仍為空。 - 解決方法:使用
form multiprocessing import Manager
和Manager().Queue()
來進行進程通信,Queue
對象是隸屬于manager
對象進程的济锄,且并無父子關系暑椰,可以和任務進程之間通信,解決了問題荐绝。
- 情況:做進程并行植捎,希望父進程分發(fā)任務衙解,子進程完成后返回給父進程結(jié)果阳柔。但是