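Summary: Python, multiprocessing
A process is a concrete unit of work managed by the operating system. Every process has its own independent memory space and serves as the container for threads. When a program starts, one process is started by default and loaded into memory, and inside that process one thread is started by default to execute the program's code.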
Creating a multi-process program
In Python, process support is provided by the multiprocessing module. As with threads, there are two ways to create a process:
- Instantiate the Process class from multiprocessing directly
- Subclass Process from multiprocessing and override its run method
As with thread creation, the handler function is passed in when Process is instantiated, using the target parameter. Once the process object has been created, call its start method to launch it.
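(1) Creating a process by instantiating the class
Define the handler function run_proc and pass it in when instantiating Process. Then call the start method of the instance to launch the process, and call join so that the main process waits for the child process to finish before it continues.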
import time
import os
from multiprocessing import Process

def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))

if __name__ == '__main__':
    p1 = Process(target=run_proc, args=("a", ))
    p2 = Process(target=run_proc, args=("b", ))
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    # Wait for both child processes before the main process continues
    p1.join()
    p2.join()
    print("main process end...")
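The output is shown below. The main process and the child processes clearly have different process IDs (pid), and after join is called the main process blocks until all child processes have finished.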
main process: 12611
a start, pid:12612
b start, pid:12613
a end, pid:12612
b end, pid:12613
main process end...
(2) Creating a process by subclassing
Define a custom process class that inherits from the Process class, and override its run method.
import time
import os
from multiprocessing import Process

def run_proc(sid):
    print("{} start, pid:{}".format(sid, os.getpid()))
    time.sleep(3)
    print("{} end, pid:{}".format(sid, os.getpid()))

class MyProcess(Process):
    def __init__(self, sid):
        super().__init__()
        self.sid = sid

    def run(self):
        # run() is what the child process executes after start()
        run_proc(self.sid)

if __name__ == '__main__':
    # Use the custom subclass instead of Process(target=...)
    p1 = MyProcess("a")
    p2 = MyProcess("b")
    print("main process: {}".format(os.getpid()))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("main process end...")
The output is shown below and matches the result of instantiating Process directly.
main process: 14859
a start, pid:14860
b start, pid:14861
a end, pid:14860
b end, pid:14861
main process end...
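Creating a process pool
Once a project reaches a certain scale, frequently creating and destroying processes or threads becomes expensive. At that point a thread pool or process pool, which reuses a fixed set of workers, is the usual answer. The Python standard library provides two process pool implementations: concurrent.futures and multiprocessing.Pool.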
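Since concurrent.futures is named above, here is a minimal sketch of a process pool built with its ProcessPoolExecutor class. The function names square and square_all and the worker count are assumptions made for illustration only.

```python
from concurrent.futures import ProcessPoolExecutor

def square(n):
    # Hypothetical CPU-bound task, used only for illustration
    return n * n

def square_all(numbers, workers=4):
    # The with-block waits for pending work and shuts the pool down on exit
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results in the same order as the inputs
        return list(executor.map(square, numbers))

if __name__ == "__main__":
    print(square_all([1, 2, 3, 4]))
```

Keeping the task function at module level matters here, because worker processes must be able to import it.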
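(1) multiprocessing.Pool
Pool takes an argument that sets the maximum number of processes in the pool. Its main methods are apply, apply_async, map, map_async, starmap, and starmap_async, which differ as follows:
- apply: runs a single task synchronously. Each call passes the arguments for one invocation of the function and blocks until that invocation finishes, so the next task cannot start before then. If the function returns a value, apply returns that value.
- apply_async: submits a single task with the arguments for one invocation and runs it asynchronously, so new tasks can be submitted without waiting for earlier ones to finish. If the function returns a value, apply_async returns a result object, and get must be called on it to obtain the value.
- map: runs a list of tasks. It takes the function and a list of arguments and runs synchronously, meaning the main process blocks until every task in the list has finished. If the function returns a value, map returns a list of results in the same order as the input list.
- map_async: runs a list of tasks without waiting for them to finish, so the main process can continue. If the function returns a value, map_async returns a result object, and get must be called on it to obtain the results, which are in the same order as the input list.
- starmap: similar to map. The difference is that map passes each list element to the function as a single argument, whereas starmap unpacks each element into multiple arguments. If the function returns a value, starmap returns a list of results in the same order as the input list.
- starmap_async: similar to map_async. The difference is that map_async passes each list element as a single argument, whereas starmap_async unpacks each element into multiple arguments. If the function returns a value, starmap_async returns a result object, and get must be called on it to obtain the results, which are in the same order as the input list.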
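In addition, once all tasks have been submitted, call close so that the pool accepts no further tasks and its worker processes exit when the outstanding work is done. Then call join to block the main process until those worker processes have exited. join must be called after close.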
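The ordering of close and join can be sketched as follows. The names double and demo are hypothetical. The sketch relies on the documented behaviour that a closed pool rejects new tasks with ValueError, while tasks submitted earlier still complete.

```python
from multiprocessing import Pool

def double(n):
    # Hypothetical task, used only for illustration
    return n * 2

def demo():
    pool = Pool(2)
    result = pool.apply_async(double, (21,))
    pool.close()  # no further tasks may be submitted from here on
    try:
        pool.apply_async(double, (1,))
        rejected = False
    except ValueError:
        # A closed pool refuses new work
        rejected = True
    pool.join()  # wait for the worker processes to exit
    return result.get(), rejected

if __name__ == "__main__":
    print(demo())
```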
The following code tests each method in turn.
1. apply
import time
import datetime
import os
from multiprocessing import Pool

def run_proc(sid):
    time.sleep(3)
    print("{} end, pid:{}, time:{}".format(sid, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid)

if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        # The second parameter is a tuple of positional arguments
        pool.apply(run_proc, (i,))
    pool.close()
    pool.join()
The output is shown below. Each apply call takes the arguments for a single invocation and runs it once, so a task list built from several arguments can only run one task after another.
a end, pid:25260, time:2021-05-16 18:15:08
b end, pid:25261, time:2021-05-16 18:15:11
c end, pid:25262, time:2021-05-16 18:15:14
d end, pid:25263, time:2021-05-16 18:15:17
2. apply_async
Simply change the apply call on the Pool to apply_async.
if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    for i in jobs:
        # The second parameter is a tuple of positional arguments
        pool.apply_async(run_proc, (i,))
    print("main process")
    pool.close()
    pool.join()
The output is shown below. The tasks now run in parallel without blocking one another, and the main process is not blocked either, so main process is printed first.
main process
a end, pid:25721, time:2021-05-16 18:17:33
b end, pid:25722, time:2021-05-16 18:17:33
c end, pid:25723, time:2021-05-16 18:17:33
d end, pid:25724, time:2021-05-16 18:17:33
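The apply_async run above discards the return values. A minimal sketch of collecting them: keep each result object and call get on it. The names shout and collect and the 10-second timeout are assumptions for illustration.

```python
from multiprocessing import Pool

def shout(sid):
    # Hypothetical task, used only for illustration
    return str(sid).upper()

def collect(jobs, workers=4):
    with Pool(workers) as pool:
        # Submit everything first so the tasks can run concurrently
        pending = [pool.apply_async(shout, (job,)) for job in jobs]
        # get() blocks until that particular task has finished
        return [result.get(timeout=10) for result in pending]

if __name__ == "__main__":
    print(collect(["a", "b", "c", "d"]))
```

Calling get inside the with-block matters, because leaving the block terminates the pool.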
3. map
Change the Pool method to map.
if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
The output is shown below. map runs the tasks in parallel, but the main process is blocked: it continues only after map has finished the whole task list.
a end, pid:26939, time:2021-05-16 18:21:43d end, pid:26942, time:2021-05-16 18:21:43
b end, pid:26940, time:2021-05-16 18:21:43
c end, pid:26941, time:2021-05-16 18:21:43
main process
4. map_async
if __name__ == '__main__':
    pool = Pool(4)
    jobs = ["a", "b", "c", "d"]
    res = pool.map_async(run_proc, jobs)
    print("main process...")
    pool.close()
    pool.join()
    print(res.get())
The output is shown below. The effect is similar to map; the difference is that the main process is not blocked, so main process... is printed first.
main process...
c end, pid:21401, time:2021-05-16 21:41:54
b end, pid:21400, time:2021-05-16 21:41:54a end, pid:21399, time:2021-05-16 21:41:54
d end, pid:21402, time:2021-05-16 21:41:54
['a', 'b', 'c', 'd']
5. starmap
Change the function to take two parameters, switch the Pool method to starmap, and give every entry in the task list two arguments.
def run_proc(sid, other):
    time.sleep(3)
    print("{} end, other:{}, pid:{}, time:{}".format(sid, other, os.getpid(), datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")))
    return str(sid) + str(other)

if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.starmap(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
The output is shown below. The function correctly receives both arguments, and the main process is blocked.
b end, other:r, pid:27642, time:2021-05-16 18:24:52
a end, other:b, pid:27641, time:2021-05-16 18:24:52
c end, other:d, pid:27643, time:2021-05-16 18:24:52
d end, other:a, pid:27644, time:2021-05-16 18:24:52
main process
Next, check whether map supports two arguments.
if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    pool.map(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
It fails immediately because the function is missing an argument: map passes each two-element list as a single argument.
TypeError: run_proc() missing 1 required positional argument: 'other'
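If map has to be used with a two-parameter function, one possible workaround is a small module-level wrapper that unpacks each entry itself. This is only a sketch: join_pair, join_pair_packed, and run are hypothetical names, and starmap remains the more direct choice.

```python
from multiprocessing import Pool

def join_pair(sid, other):
    # Hypothetical two-parameter task
    return str(sid) + str(other)

def join_pair_packed(pair):
    # map delivers each list element as one argument, so unpack it here
    sid, other = pair
    return join_pair(sid, other)

def run(jobs, workers=4):
    with Pool(workers) as pool:
        return pool.map(join_pair_packed, jobs)

if __name__ == "__main__":
    print(run([["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]))
```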
6. starmap_async
if __name__ == '__main__':
    pool = Pool(4)
    jobs = [["a", "b"], ["b", "r"], ["c", "d"], ["d", "a"]]
    res = pool.starmap_async(run_proc, jobs)
    print("main process")
    pool.close()
    pool.join()
    print(type(res.get()))
The output is shown below. Compared with starmap, the main process is not blocked; everything else is the same.
main process
a end, other:b, pid:17123, time:2021-05-16 21:28:30
b end, other:r, pid:17124, time:2021-05-16 21:28:30
d end, other:a, pid:17126, time:2021-05-16 21:28:30
c end, other:d, pid:17125, time:2021-05-16 21:28:30
<class 'list'>
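Differences between multithreading and multiprocessing
Both multiprocessing and multithreading can use parallelism to improve a system's efficiency. They differ in how memory is laid out at run time:
- The threads of a process share that process's memory space
- Each process has its own independent memory space
In large computer clusters, a multi-process program is typically distributed across different machines that cooperate, while inside each process on each machine multiple threads work in parallel.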
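The memory difference can be observed directly. In the sketch below, a thread changes a module-level variable that the main program can see, while a child process changes only its own copy. The names counter, increment, run_in_thread, and run_in_process are assumptions for illustration.

```python
import threading
from multiprocessing import Process

counter = 0  # module-level state, used to show what is shared

def increment():
    global counter
    counter += 1

def run_in_thread():
    # The thread runs inside this process, so it updates our counter
    t = threading.Thread(target=increment)
    t.start()
    t.join()
    return counter

def run_in_process():
    # The child process increments its own copy; ours stays unchanged
    p = Process(target=increment)
    p.start()
    p.join()
    return counter

if __name__ == "__main__":
    print("after thread:", run_in_thread())
    print("after process:", run_in_process())
```

To pass data between processes, multiprocessing provides explicit channels such as Queue and Pipe instead.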