本文是我在學(xué)習(xí) Python 多進(jìn)程過程中的一些總結(jié),主要介紹多進(jìn)程的實(shí)現(xiàn)方式以及進(jìn)程間的通信豌鹤,大體有如下這么幾點(diǎn)知識(shí):
-
fork
創(chuàng)建進(jìn)程 -
Process
創(chuàng)建進(jìn)程 - 進(jìn)程池
- 進(jìn)程間通信
- 進(jìn)程池的進(jìn)程間通信
fork 創(chuàng)建進(jìn)程
針對(duì)于類 UNIX 操作系統(tǒng)苟蹈,os
模塊下有一個(gè) fork
方法根资,可以創(chuàng)建多進(jìn)程胎撇。使用方法如下:
pid = os.fork()
調(diào)用 fork
方法會(huì)得到一個(gè)返回值狸捕,該返回值是一個(gè)標(biāo)志值:如果是主進(jìn)程热幔,那么該返回值就是主進(jìn)程的 pid
乐设,如果是創(chuàng)建出來(lái)的子進(jìn)程,該返回值就是0绎巨,我們可以通過這個(gè)返回值來(lái)區(qū)分主進(jìn)程和子進(jìn)程:
# 導(dǎo)入 fork 方法
from os import fork
# 使用 fork 創(chuàng)建進(jìn)程
pid = fork()
# 根據(jù) pid 判斷是主進(jìn)程還是子進(jìn)程
if not pid:
print("我是子進(jìn)程近尚,pid 是 %s"%pid)
else:
print("我是主進(jìn)程,pid 是 %s"%pid)
執(zhí)行該程序:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess01.py
我是主進(jìn)程场勤,pid 是 9667
charley@charley-ubuntu:~/桌面/Py$ 我是子進(jìn)程戈锻,pid 是 0
通過 pid 標(biāo)志值群分主進(jìn)程和子進(jìn)程,我們同時(shí)可以看到和媳,在主進(jìn)程執(zhí)行完成后其會(huì)終止格遭,并不影響子進(jìn)程的執(zhí)行。也就是說:使用 fork
創(chuàng)建的進(jìn)程留瞳,主進(jìn)程并不會(huì)等待子進(jìn)程執(zhí)行完成拒迅,二者是獨(dú)立的。
getpid 和 getppid 方法
主進(jìn)程和子進(jìn)程都是獨(dú)立的進(jìn)程,因此二者都有獨(dú)立的 pid
璧微,前面我們?cè)谑褂?fork
創(chuàng)建進(jìn)程的時(shí)候作箍,如果是主進(jìn)程,那么 fork
函數(shù)的返回值就是主進(jìn)程的 pid
前硫,而如果是子進(jìn)程的話胞得,fork
函數(shù)的返回值并不是其的 pid
,而是 0屹电,以此來(lái)和主進(jìn)程進(jìn)行區(qū)分阶剑。
同時(shí),我們可以使用 getpid
來(lái)獲取進(jìn)程的 pid
危号,使用 getppid
獲取父進(jìn)程的 pid
:
# 導(dǎo)入 os 模塊
import os
# 創(chuàng)建進(jìn)程
pid = os.fork()
# 獲取進(jìn)程的 pid
if not pid:
print("子進(jìn)程的 pid 是 %s个扰,子進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))
else:
print("父進(jìn)程的 pid 是 %s,父進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))
運(yùn)行結(jié)果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess02.py
父進(jìn)程的 pid 是 10431葱色,父進(jìn)程的 ppid 是 9576
子進(jìn)程的 pid 是 10432,子進(jìn)程的 ppid 是 10431
父進(jìn)程也是有 ppid
的娘香,這里它的 ppid 就是 bash 的 pid
苍狰。
進(jìn)程是彼此獨(dú)立的
一旦創(chuàng)建進(jìn)程,他們就是彼此獨(dú)立的烘绽,因此我們不能讓兩個(gè)進(jìn)程修改同一個(gè)變量:
from os import fork
pid = fork()
# 創(chuàng)建一個(gè)列表
testList = [1,2,3]
if not pid:
# 子進(jìn)程向列表中添加元素
testList.append(4)
print("子進(jìn)程:",testList)
else:
# 主進(jìn)程從列表中移除元素
testList.remove(3)
print("主進(jìn)程:",testList)
運(yùn)行結(jié)果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess3.py
主進(jìn)程: [1, 2]
子進(jìn)程: [1, 2, 3, 4]
可見淋昭,兩個(gè)進(jìn)程是不能修改同一份數(shù)據(jù)的,可以這樣理解:一旦創(chuàng)建了一個(gè)進(jìn)程安接,就在原始程序的基礎(chǔ)上創(chuàng)建了一份全新的副本翔忽,和原始的代碼沒有了關(guān)聯(lián),因此他們是無(wú)法修改同一份數(shù)據(jù)的盏檐,只可修改其所在的代碼副本中的數(shù)據(jù)歇式。
Process 創(chuàng)建進(jìn)程
上面的 fork
可以用來(lái)在類 UNIX 操作系統(tǒng)中創(chuàng)建進(jìn)程,而在 Windows 環(huán)境下就不能再使用 fork
方法胡野,需要使用其他的方式材失。針對(duì)這種情況,Python 為我們提供了一個(gè) multiprocessing
模塊硫豆,通過該模塊中的 Process
類也可以創(chuàng)建進(jìn)程龙巨,并兼容各個(gè)平臺(tái)。
下面是 Process
類的用法:
subprocess = Process([ target ],[ args ],[ kwargs ])
Process
類在創(chuàng)建進(jìn)程對(duì)象時(shí)熊响,可以接收一個(gè)函數(shù)作為參數(shù)旨别,該函數(shù)就是我們要在進(jìn)程中執(zhí)行的函數(shù),后面的 args
和 kwargs
分別是一個(gè)元組和字典汗茄,作為目標(biāo)函數(shù)的參數(shù)傳入秸弛。如果不傳入目標(biāo)函數(shù),將會(huì)默認(rèn)執(zhí)行進(jìn)程對(duì)象中的 run
方法。
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義目標(biāo)函數(shù)
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 開始執(zhí)行進(jìn)程
p.start()
運(yùn)行結(jié)果:
0
1
2
3
4
使用 Process
創(chuàng)建的進(jìn)程胆屿,主進(jìn)程會(huì)等待子進(jìn)程執(zhí)行完成嗎奥喻?我們可以進(jìn)行驗(yàn)證:
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義目標(biāo)函數(shù)
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 開始執(zhí)行進(jìn)程
p.start()
print("-----我是主進(jìn)程中的代碼-----")
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
-----我是主進(jìn)程中的代碼-----
0
1
2
3
4
PS C:\Users\Charley\Desktop\py>
可見,主進(jìn)程是會(huì)等待子進(jìn)程執(zhí)行完成再退出的非迹。
Process 進(jìn)程對(duì)象的常用方法
下面是 Process
進(jìn)程對(duì)象的一些常用屬性和方法:
-
pid
屬性:獲取當(dāng)前進(jìn)程的 pid -
name
屬性:獲取當(dāng)前進(jìn)程的 name -
is_alive
方法:判斷進(jìn)程是否在運(yùn)行 -
join
方法环鲤,主進(jìn)程是否等待子進(jìn)程結(jié)束再執(zhí)行,默認(rèn)為等子進(jìn)程結(jié)束后再執(zhí)行主進(jìn)程憎兽,也可以接受一個(gè)參數(shù)冷离,表示最多等待多少時(shí)間 -
start
方法:?jiǎn)?dòng)子進(jìn)程 -
run
方法:如果創(chuàng)建子進(jìn)程對(duì)象時(shí)沒有指定target
參數(shù),則會(huì)在start
時(shí)執(zhí)行子進(jìn)程中的run
方法 -
terminate
方法:立即終止子進(jìn)程纯命,不論任務(wù)是否完成
擴(kuò)展 Process 類
除了使用 Process
類直接創(chuàng)建進(jìn)程西剥,我們也可以自定義一個(gè)繼承于 Process
的類來(lái)進(jìn)行創(chuàng)建,提高了擴(kuò)展性:
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義 Process 類的子類
class CreateSubProcess(Process):
def __init__(self,name,maxRange):
Process.__init__(self)
self.name = name
self.__maxRange = maxRange
def run(self):
for i in range(self.__maxRange):
print("%s 正在輸出 %s"%(self.name,i))
sleep(1)
if __name__ == '__main__':
p = CreateSubProcess("子進(jìn)程01",5)
p.start()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
子進(jìn)程01 正在輸出 0
子進(jìn)程01 正在輸出 1
子進(jìn)程01 正在輸出 2
子進(jìn)程01 正在輸出 3
子進(jìn)程01 正在輸出 4
PS C:\Users\Charley\Desktop\py>
我們定義了 Process
的子類亿汞,實(shí)現(xiàn)了在其基礎(chǔ)上的擴(kuò)展瞭空,可以把某個(gè)獨(dú)立進(jìn)程相關(guān)的方法都放在子類里面,起到了很好的封裝作用疗我。
進(jìn)程池
除了使用 Process
類創(chuàng)建進(jìn)程咆畏,也可以運(yùn)用進(jìn)程池來(lái)實(shí)現(xiàn)多進(jìn)程,更加節(jié)省資源和方便吴裤。進(jìn)程池方便了資源管理和調(diào)度旧找,要使用進(jìn)程池,需要首先創(chuàng)建一個(gè)進(jìn)程池對(duì)象麦牺,創(chuàng)建該對(duì)象時(shí)需要傳入一個(gè)參數(shù)钮蛛,表示池子中的最大進(jìn)程數(shù),我們所有的多任務(wù)都可以通過進(jìn)程池中的進(jìn)程來(lái)完成剖膳,而不是每個(gè)任務(wù)都創(chuàng)建一個(gè)進(jìn)程魏颓,節(jié)省了資源,并且進(jìn)程池會(huì)自動(dòng)幫我們調(diào)度資源吱晒。
創(chuàng)建進(jìn)程池
創(chuàng)建進(jìn)程池很簡(jiǎn)單琼开,只需使用 multiprocessing
模塊中的 Pool
類:
pool = Pool([ maxValue ])
在創(chuàng)建進(jìn)程池對(duì)象時(shí)可以接受一個(gè)參數(shù),表示池子中的最大進(jìn)程數(shù)枕荞,如果不傳入?yún)?shù)柜候,表示無(wú)進(jìn)程數(shù)限制。
進(jìn)程池的幾個(gè)常用方法
下面是幾個(gè)進(jìn)程池中的常用方法:
- apply_async( target,args,kwargs ):接受一個(gè)目標(biāo)函數(shù)作為池子中某個(gè)進(jìn)程的執(zhí)行函數(shù)躏精,異步添加
- apply( target,args,kwargs ):同步添加渣刷,只有在當(dāng)前進(jìn)程執(zhí)行完成后再添加目標(biāo)目標(biāo)函數(shù),會(huì)造成進(jìn)程池中其他進(jìn)程的阻塞矗烛,少用
- close:關(guān)閉進(jìn)程池辅柴,關(guān)閉后不再接受任務(wù)
- join:主進(jìn)程等待進(jìn)程池中的進(jìn)程執(zhí)行完成后再執(zhí)行箩溃,因?yàn)?strong>使用進(jìn)程池創(chuàng)建進(jìn)程時(shí),主進(jìn)程默認(rèn)不會(huì)等待子進(jìn)程執(zhí)行完成碌嘀,因此該方法較常用涣旨,但只能放在
close
方法后執(zhí)行。
下面來(lái)看一個(gè)進(jìn)程池的例子:
from multiprocessing import Pool
from time import sleep
import os
def getNum(maxRange,delay):
for i in range(maxRange):
print("進(jìn)程 %s 正在輸出 %d"%(os.getpid(),i))
sleep(delay)
if __name__ == '__main__':
# 定義進(jìn)程池
pool = Pool(3)
# 向進(jìn)程池中添加任務(wù)
for i in range(5):
pool.apply_async(getNum,(3,1))
# 關(guān)閉進(jìn)程池股冗,不在接受任務(wù)
pool.close()
pool.join()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 6072 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 6072 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
進(jìn)程 6072 正在輸出 2
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
PS C:\Users\Charley\Desktop\py>
上面我們創(chuàng)建了容量為 3 的進(jìn)程池霹陡,并向其中仍進(jìn)去了 5 個(gè)任務(wù),一開始進(jìn)程池中進(jìn)程數(shù)是小于任務(wù)數(shù)的止状,所以會(huì)先執(zhí)行 3 個(gè)任務(wù)烹棉,當(dāng)池子中有進(jìn)程完成任務(wù)后,再執(zhí)行后面的任務(wù)怯疤,直到所有任務(wù)執(zhí)行完成為止浆洗。
進(jìn)程間通信
前面我們說到過,多個(gè)進(jìn)程間是彼此獨(dú)立的集峦,無(wú)法直接修改同一份數(shù)據(jù)伏社,但實(shí)際情況中往往是有這樣的需求的,于是就要使用進(jìn)程間通信塔淤。進(jìn)程間通信的方式有很多洛口,比如共享內(nèi)存、socket凯沪、網(wǎng)絡(luò)、Queue 等买优,我們這里討論的是使用 Queue 進(jìn)行進(jìn)程間通信妨马。
Queue
Queue
也是 multiprocessing
模塊中的一個(gè)類,意為“隊(duì)列”杀赢,創(chuàng)建一個(gè) Queue
對(duì)象:
queue = Queue( [maxVal] )
在創(chuàng)建 Queue
對(duì)象時(shí)烘跺,可以傳入一個(gè)參數(shù),表示隊(duì)列中的最大消息數(shù)脂崔,如果不傳滤淳,表示無(wú)限制。Queue
對(duì)象中的常用方法有:
-
qsize
:返回當(dāng)前隊(duì)列中包含的消息數(shù)量 -
empty
:判斷隊(duì)列是否為空 -
full
:判斷隊(duì)列是否充滿 -
get([ block, [timeout]])
:從隊(duì)列中取出消息砌左,如果當(dāng)前隊(duì)列為空脖咐,則進(jìn)入阻塞狀態(tài),直到隊(duì)列中有消息可以被取用為止汇歹。如果 block 設(shè)置為False
屁擅,則在隊(duì)列為空時(shí)拋出Queue.Empty
異常。該方法也可以接受一個(gè)超時(shí)參數(shù)产弹,表示最多等待多長(zhǎng)時(shí)間派歌,如果超過等待時(shí)間仍然沒有取出消息,也會(huì)拋出Queue.Empty
異常。 -
get_nowait
:相當(dāng)于get(False)
-
put(item, [block])
:向隊(duì)列中放入消息胶果,當(dāng)隊(duì)列充滿時(shí)匾嘱,會(huì)進(jìn)入等待狀態(tài),直到隊(duì)列中可以放入消息為止早抠。如果 block 設(shè)置為False
辜梳,則在隊(duì)列充滿時(shí)拋出Queue.Full
異常。 -
put_nowait
:相當(dāng)于put(item,False)
使用 Queue 進(jìn)行進(jìn)程間通信
我們可以利用 Queue
進(jìn)行進(jìn)程間通信造壮,只需將 Queue
對(duì)象傳入相應(yīng)的任務(wù)函數(shù)中:
from multiprocessing import Queue,Process
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
break
else:
print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
sleep(1)
# 創(chuàng)建隊(duì)列
queue = Queue()
# 創(chuàng)建兩個(gè)進(jìn)程
gv = Process(target = getVal,args = (queue,))
pv = Process(target = putVal,args = (queue,10))
# 啟動(dòng)進(jìn)程
if __name__ == "__main__":
pv.start()
gv.start()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 22208 放入了數(shù)據(jù) 0
進(jìn)程 20656 獲取到了數(shù)據(jù) 0
進(jìn)程 22208 放入了數(shù)據(jù) 1
進(jìn)程 20656 獲取到了數(shù)據(jù) 1
進(jìn)程 22208 放入了數(shù)據(jù) 2
進(jìn)程 20656 獲取到了數(shù)據(jù) 2
進(jìn)程 22208 放入了數(shù)據(jù) 3
進(jìn)程 20656 獲取到了數(shù)據(jù) 3
進(jìn)程 22208 放入了數(shù)據(jù) 4
進(jìn)程 20656 獲取到了數(shù)據(jù) 4
進(jìn)程 22208 放入了數(shù)據(jù) 5
進(jìn)程 20656 獲取到了數(shù)據(jù) 5
進(jìn)程 22208 放入了數(shù)據(jù) 6
進(jìn)程 20656 獲取到了數(shù)據(jù) 6
進(jìn)程 22208 放入了數(shù)據(jù) 7
進(jìn)程 20656 獲取到了數(shù)據(jù) 7
進(jìn)程 22208 放入了數(shù)據(jù) 8
進(jìn)程 20656 獲取到了數(shù)據(jù) 8
進(jìn)程 22208 放入了數(shù)據(jù) 9
進(jìn)程 20656 獲取到了數(shù)據(jù) 9
進(jìn)程 20656 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>
進(jìn)程池中的通信
上面的進(jìn)程間通信是基于 Process
類(或者其子類)創(chuàng)建出的進(jìn)程泼诱,可以直接使用 multiprocessing
中的 Queue
隊(duì)列,但對(duì)于進(jìn)程池咪奖,需要使用 multiprocessing
中的 Manager
類盗忱,除此之外,其他操作并沒有變化:
# 導(dǎo)入 Manager 和 Pool 類
from multiprocessing import Manager,Pool
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
break
else:
print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
sleep(1)
# 入口方法
def main():
# 創(chuàng)建隊(duì)列
queue = Manager().Queue()
# 創(chuàng)建進(jìn)程池
pool = Pool(2)
pool.apply_async(putVal,(queue,5))
pool.apply_async(getVal,(queue,))
# 關(guān)閉進(jìn)程池并使主進(jìn)程等待
pool.close()
pool.join()
if __name__ == '__main__':
main()
運(yùn)行結(jié)果如下:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 15304 放入了數(shù)據(jù) 0
進(jìn)程 17264 獲取到了數(shù)據(jù) 0
進(jìn)程 15304 放入了數(shù)據(jù) 1
進(jìn)程 17264 獲取到了數(shù)據(jù) 1
進(jìn)程 15304 放入了數(shù)據(jù) 2
進(jìn)程 17264 獲取到了數(shù)據(jù) 2
進(jìn)程 15304 放入了數(shù)據(jù) 3
進(jìn)程 17264 獲取到了數(shù)據(jù) 3
進(jìn)程 15304 放入了數(shù)據(jù) 4
進(jìn)程 17264 獲取到了數(shù)據(jù) 4
進(jìn)程 17264 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>
針對(duì)于進(jìn)程池中的通信羊赵,只需改變創(chuàng)建隊(duì)列對(duì)象的方式即可趟佃,其他的操作都不變。
總結(jié)
本文主要介紹了 Python 中的多進(jìn)程昧捷,下面是一些要點(diǎn):
- 使用
fork
在類 UNIX 操作系統(tǒng)創(chuàng)建進(jìn)程 - 使用兼容各平臺(tái)的
Process
- 擴(kuò)展
Process
類闲昭,封裝進(jìn)程模塊 - 使用
Pool
創(chuàng)建進(jìn)程池 - 幾種創(chuàng)建方式下,主進(jìn)程是否會(huì)等待子進(jìn)程執(zhí)行完成
- 進(jìn)程對(duì)象的幾個(gè)常用方法
- 進(jìn)程池對(duì)象的常用方法
- 隊(duì)列的常用方法
- 進(jìn)程間通信
- 使用進(jìn)程池時(shí)如何通信
完靡挥。