? ? ? ?在《多線程&多進程(上)》中,記錄了python中的threading模塊常用的類的使用方法办悟,對比了Lock/RLock和condition版本的生產者與消費者的問題,但是python中并不支持真正的支持多線程除师,不能充分的利用多核cpu的資源凝化,大部分情況下使用的是多進程。在下半部分中官辽,將記錄多進程的使用蛹磺。
多進程
? ? ? ?官方文檔
? ? ? ? multiprocessing模塊常用的類和方法(腦圖)
本節(jié)將介紹:
- Process(用于創(chuàng)建進程模塊)
- Pool(用于創(chuàng)建管理進程池)
- Queue(用于進程通信,資源共享)
- Lock
- Pipe(用于管道通信)
- Semaphore
1. Process模塊
-
基本使用
? ? ? ?在multiprocessing中同仆,每一個進程都用一個Process類來表示萤捆。其用法和Thread對象的用法很相似,也有start(),run(),join()等方法。Process類適合簡單的進程創(chuàng)建俗或,如需資源共享可以結合multiprocessing.Queue使用市怎;如果想要控制進程數量,則建議使用進程池Pool類辛慰。
首先看下它的API
Process([group [, target [, name [, args [, kwargs]]]]])
- target:調用對象区匠,你可以傳入方法的名字。
- args:被調用對象的位置參數元組帅腌,比如target是函數a驰弄,他有兩個參數m,n速客,那么args就傳入(m, n)即可戚篙。
- kwargs:調用對象的字典。
- name:別名溺职,相當于給這個進程取一個名字岔擂。
- group:線程組,目前還沒有實現(xiàn)辅愿,庫引用中提示必須是None智亮。
其包含以下實例方法,和Thead類似:
方法 | 說明 |
---|---|
is_alive() | 返回進程是否在運行 |
join([timeout]) | 阻塞當前上下文環(huán)境的進程程点待,直到調用此方法的進程終止或到達指定的timeout(可選參數) |
start() | 進程準備就緒阔蛉,等待CPU調度 |
run() | strat()調用run方法,如果實例進程時未制定傳入target癞埠,這star執(zhí)行t默認run()方法状原。 |
terminate() | 不管任務是否完成,立即停止工作進程苗踪。 |
Process類中創(chuàng)建多進程有兩種方法:
(1)用法與threading類似
from multiprocessing import Process #導入Process模塊
import os
def test(name):
'''
函數輸出當前進程ID颠区,以及其父進程ID。
此代碼應在Linux下運行通铲,因為windows下os模塊不支持getppid()
'''
print("Process ID: %s" % os.getpid())
print("Parent Process ID: %s" % os.getppid())
if __name__ == "__main__":
'''
windows下毕莱,創(chuàng)建進程的代碼一下要放在main函數里面
'''
proc = Process(target=test, args=('nmask',))
proc.start()
proc.join()
(2)繼承Process類,修run函數代碼
from multiprocessing import Process
import time
class MyProcess(Process):
'''
繼承Process類颅夺,類似threading.Thread
'''
def __init__(self, arg):
super(MyProcess, self).__init__()
#multiprocessing.Process.__init__(self)
self.arg = arg
def run(self):
'''
重構run函數
'''
print('nMask', self.arg)
time.sleep(1)
if __name__ == '__main__':
for i in range(10):
p = MyProcess(i)
p.start()
for i in range(10):
p.join()
Process 的屬性:
- authkey
- daemon:和線程的setDeamon功能一樣(將父進程設置為守護進程朋截,當父進程結束時,子進程也結束)吧黄。
- exitcode(進程在運行時為None部服、如果為–N,表示被信號N結束)拗慨。
- name:進程名字廓八。
- pid:進程號奉芦。
例如使用daemon屬性:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
p.daemon = True
p.start()
#p.join()
print('Main process Ended!')
輸出結果為:
Main process Ended!
? ? ? ?因為主進程只輸出一句話就結束了,并且我們設置了p.daemon=True剧蹂,所以此時并不會等待子進程結束声功,類似多線程里說介紹的,我們同樣可以使用join()方法国夜,就可等待子進程完成再結束主進程了(將上面代碼中p.join() 取消注釋即可)减噪。
2.Pool模塊
? ? ? ?Pool模塊是用來創(chuàng)建管理進程池的,當子進程非常多且需要控制子進程數量時可以使用此模塊车吹〕镌# Multiprocessing.Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時窄驹,如果池還沒有滿朝卒,那么就會創(chuàng)建一個新的進程用來執(zhí)行該請求;但如果池中的進程數已經達到規(guī)定最大值乐埠,那么該請求就會等待抗斤,直到池中有進程結束,才會創(chuàng)建新的進程來執(zhí)行它丈咐。在共享資源時瑞眼,只能使用Multiprocessing.Manager類,而不能使用Queue或者Array棵逊。
? ? ? ?我們看看它的API:
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :使用的工作進程的數量伤疙,如果processes是None那么使用 os.cpu_count()返回的數量。
- initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
- maxtasksperchild:工作進程退出之前可以完成的任務數愤钾,完成后用一個新的工作進程來替代原進程,來讓閑置的資源被釋放锯蛀。maxtasksperchild默認是None,意味著只要Pool存在工作進程就會一直存活次慢。
- context: 用在制定工作進程啟動時的上下文旁涤,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創(chuàng)建一個池,兩種方法都適當的設置了context迫像。
其包含如下實例方法:
方法 | 說明 |
---|---|
apply_async(func[, args[, kwds[, callback]]]) | 非阻塞 |
apply(func[, args[, kwds]]) | 阻塞 |
close() | 關閉pool拭抬,使其不在接受新的任務。 |
terminate() | 關閉pool侵蒙,結束工作進程,不在處理未完成的任務傅蹂。 |
join() | 主進程阻塞纷闺,等待子進程的退出算凿, join方法要在close或terminate之后使用。 |
Pool使用方法:
(1)Pool+map函數
from multiprocessing import Pool
def test(i):
print(i)
if __name__=="__main__":
lists=[1,2,3,4,5]
pool=Pool(processes=2) #定義最大的進程數
pool.map(test,lists)#map的第二個參數必須是一個可迭代變量--如list犁功。
pool.close()
pool.join()
此寫法缺點在于只能通過map向函數傳遞一個參數氓轰。
(2)異步進程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__=="__main__":
pool = Pool(processes=10)
for i in range(500):
pool.apply_async(test, args=(i,)) #維持執(zhí)行的進程總數為10,當一個進程執(zhí)行完后啟動一個新進程.
print("test")
pool.close()
pool.join()
輸出:
For循環(huán)中執(zhí)行步驟:
- 循環(huán)遍歷浸卦,將500個子進程添加到進程池(相對父進程會阻塞)
- 每次執(zhí)行10個子進程署鸡,等一個子進程執(zhí)行完后,立馬啟動新的子進程限嫌。(相對父進程不阻塞)
? ? ? ?apply_async為異步進程池寫法靴庆。
? ? ? ?異步指的是啟動子進程的過程,與父進程本身的執(zhí)行(print)是異步的怒医,而For循環(huán)中往進程池添加子進程的過程炉抒,與父進程本身的執(zhí)行卻是同步的。
注意:調用join之前稚叹,先調用close或者terminate方法焰薄,否則會出錯。執(zhí)行完close后不會有新的進程加入到pool,join函數等待所有子進程結束扒袖。
(3)同步進程池(阻塞)
from multiprocessing import Pool
import time
def test(i):
print(i)
if __name__=="__main__":
pool = Pool(processes=10)
for i in range(500):
pool.apply(test, args=(i,)) #維持執(zhí)行的進程總數為10塞茅,當一個進程執(zhí)行完后啟動一個新進程.
print("test")
pool.close()
pool.join()
輸出:
實際測試發(fā)現(xiàn),for循環(huán)內部執(zhí)行步驟:
- 遍歷500個可迭代對象季率,往進程池放一個子進程
- 執(zhí)行這個子進程野瘦,等子進程執(zhí)行完畢,再往進程池放一個子進程蚀同,再執(zhí)行缅刽。(同時只執(zhí)行一個子進程)
- for循環(huán)執(zhí)行完畢,再執(zhí)行print函數蠢络。
(并未實現(xiàn)多進程并行)
3. queue線程安全隊列
? ? ? ?該用法和線程中的用法一樣衰猛。
4. Lock模塊
? ? ? ?當多進程需要訪問共享資源的時候,類似多線程刹孔,它同樣有一個Lock類啡省,可以避免訪問的沖突。
from multiprocessing import Process, Lock
def l(lock, num):
with lock:
# lock.acquire()
print("Hello Num: %s" % (num))
# lock.release()
if __name__ == '__main__':
lock = Lock() #這個一定要定義為全局
for num in range(20):
Process(target=l, args=(lock, num)).start()
#這個類似多線程中的threading髓霞,但是進程太多了卦睹,控制不了。
? ? ? ?父進程的全局變量能不能被子進程共享呢方库?答案是否定的结序,如果想要共享資源,可以使用manage類纵潦,或者queue模塊徐鹤。但這里我就有個疑問了垃环,不是說多線程之中的內存資源是不共享的嗎,那么它的Lock有什么用呢返敬?
其使用場景可以參考這篇文章:Python的多進程鎖的使用
? ? ? ?多進程中一般是不推薦使用資源共享遂庄,如果要使用,可以參考:多進程共享資源
5. Pipe 管道
? ? ? ?顧名思義劲赠,一端發(fā)一端收涛目。Pipe可以是單向(half-duplex),也可以是雙向(duplex)凛澎。我們通過mutiprocessing.Pipe(duplex=False)創(chuàng)建單向管道 (默認為雙向)霹肝。一個進程從PIPE一端輸入對象,然后被PIPE另一端的進程接收预厌,單向管道只允許管道一端的進程輸入阿迈,而雙向管道則允許從兩端輸入。
6. Semaphore轧叽,信號量
? ? ? ?其是在進程同步過程中一個比較重要的角色苗沧。可以控制臨界資源的數量炭晒,保證各個進程之間的互斥和同步待逞。
? ? ? ?對于上述內容的詳細解釋,可以參考:https://cuiqingcai.com/3335.html