涉及的模塊主要包括:
-
multiprocessing模塊
- Process類
- Queue類
- Pool類
-
subprocess模塊
- call方法
- Popen類
multiprocessing中的Process類
使用Process類,可以啟動一個子進程并執(zhí)行指定的函數(shù)代碼.
示例代碼
import os
from multiprocessing import process
def test_func(func_name): #定義一個函數(shù),作為子進程的代碼
print(f"subprocess' name is {func_name} PID is {os.getpid()}")
if __name__ == "__main__":
print(f'Parent process PID is {os.getpid()}')
p = Process(target=test_func, args=('TestFunc',)) #定義一個Process的類對象p, 子進程的函數(shù)引用和參數(shù)元組作為類的初始參數(shù)
print('Child process will start...')
p.start() #啟動子進程
p.join() #等待子進程結(jié)束
print('Child process end.')
執(zhí)行結(jié)果
[LiangZhang@MacBook test]$
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6153
Child process will start...
subprocess' name is TestFunc PID is 6154
Child process end.
multiprocessing中的Pool類
如果需要批量創(chuàng)建子進程,并且在子進程中執(zhí)行更多的函數(shù),那么我們可以使用Pool類.
import os
from multiprocessing import Pool
import time
import random
def test_func(func_name): #定義一個函數(shù),作為子進程的代碼
print(f"subprocess' name is {func_name} PID is {os.getpid()} and ParentPID is {os.getppid()}")
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f"Task {func_name} has been done in {end-start} seconds.")
if __name__ == "__main__":
print(f'Parent process PID is {os.getpid()}')
p = Pool(4) #通Pool創(chuàng)建一個類對象p, 該對象包含4個子進程.
for i in range(15):
p.apply_async(func=test_func, args=(i,)) #將多個任務(wù)(15個函數(shù))分配進程池對象p中的子進程.
print('Waiting for all subprocess done...')
p.close() #阻止任何其它任務(wù)提交到進程池, 一但之前提交的任務(wù)完成,所有子進程退出.
p.join() #等待進程池中的子進程全部完成后再執(zhí)行后續(xù)代碼. (在調(diào)用join之前必須調(diào)用close或terminate)
print('All the subprocesses done...')
注: 我們在提交任務(wù)給進程池時使用的方法是"p.appy_async()", 該方法和p.appy()的區(qū)別是: p.apply_async()為異步執(zhí)行,所有子進程并行運行. 而p.apply()是同步執(zhí)行, 后一個子進程需要等待前一個子進程執(zhí)行完畢都會運行.
程序輸出**
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6426
Waiting for all subprocess done...
subprocess' name is 0 PID is 6427 and ParentPID is 6426
subprocess' name is 1 PID is 6428 and ParentPID is 6426
subprocess' name is 2 PID is 6429 and ParentPID is 6426
subprocess' name is 3 PID is 6430 and ParentPID is 6426
Task 3 has been done in 0.7806532382965088 seconds.
subprocess' name is 4 PID is 6430 and ParentPID is 6426
Task 2 has been done in 0.9312319755554199 seconds.
subprocess' name is 5 PID is 6429 and ParentPID is 6426
.....
....
Task 10 has been done in 2.6583831310272217 seconds.
Task 14 has been done in 1.1612498760223389 seconds.
Task 12 has been done in 2.321526050567627 seconds.
All the subprocesses done...
通過輸出,我們注意到無論有多少任務(wù),都是由進程池中4個進程完成的.
multiprocessing中的Queue類
我們可以通過Queue類在父進程中創(chuàng)建了個隊列,用于子進程之間通信.
以下示例將在主進程中創(chuàng)建一個queue, 和兩個子進程, 一個子進程向queue中寫入數(shù)據(jù),另一個子進程從queue中讀取數(shù)據(jù).
import os
from multiprocessing import Process,Queue
import time
import random
def writter(q,data_list): #定義一個函數(shù)用于向Queue寫入數(shù)據(jù)
print(f"Subprocess Writer's PID is {os.getpid()}")
for data in data_list:
print(f"Put <{data}> to Queue...")
q.put(data) #向隊列q中寫入數(shù)據(jù)
time.sleep(random.random())
def reader(q):
print(f"Subprocess Reader's PID is {os.getpid()}")
while True:
data = q.get(True) #從隊列中獲取數(shù)據(jù), 參數(shù)True為block模式.
print(f'Get <{data}> from Queue!')
if __name__ == "__main__":
test_datalist = ['Alice', 'Bob', 'Cindy', 'David']
print(f'Parent process PID is {os.getpid()}')
q = Queue()
pw = Process(target=writter, args=(q,test_datalist))
pr = Process(target=reader, args=(q,))
pw.start()
pr.start()
pw.join()
pr.terminate()
注: q.get(True) 參數(shù)True為block=True, 意思是如果隊列不為空,立刻取出數(shù)據(jù),但是如果隊列為空則阻塞等待. 如果block=False 如果隊列為空把raise一個queue.Empty的異常.
輸出結(jié)果
[LiangZhang@MacBook test]$python3 test.py
Parent process PID is 6661
Subprocess Writer's PID is 6662
Put <Alice> to Queue...
Subprocess Reader's PID is 6663
Get <Alice> from Queue!
Put <Bob> to Queue...
Get <Bob> from Queue!
Put <Cindy> to Queue...
Get <Cindy> from Queue!
Put <David> to Queue...
Get <David> from Queue!
subprocess中的call方法執(zhí)行外部程序做為子進程
import os
import subprocess
r = subprocess.call(['nslookup', 'www.baidu.com']) #將執(zhí)行的命令和參數(shù)放入一個list傳遞給call方法, r為執(zhí)行完程序后的返回代碼.
print(f"exitCode:", r)
subprocess中的Popen類實現(xiàn)和子程序輸入/輸出的交互
使用Popen啟用一個外部程序作為子進程, 我們可以通過給stdin/stdout/stderr賦值,以便于后期隨時查看子進程的輸出,以及輸入相應的內(nèi)容給子程序.
import subprocess
p = subprocess.Popen(['nslookup',],stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) #創(chuàng)建一個Popen類對象, 執(zhí)行程序'nslookup', 并且設(shè)置輸入/輸出/錯誤輸出為subprocess.PIPE.
try:
output, err = p.communicate(b'www.baidu.com',5) #使用communication方法為'nslookup'傳遞后續(xù)輸入. 參數(shù)5為等待時間, 如果超時后子程序還會退出, 那么就會raise一個異常, 執(zhí)行期間輸出數(shù)據(jù)不會丟失. 注意: communication方法接受的輸入必須是字節(jié)碼,不可以是字符串.
except: #捕獲執(zhí)行超時的異常
p.kill() #如果執(zhí)行超時,那么終止子程序
output, err = p.communicate(None) #并且把執(zhí)行期間的輸出以元組形式返回.
print(output.decode('utf-8')) #由于返回的輸出是字節(jié)碼形式,所以先decode以后才可以打印
輸出
[LiangZhang@MacBook test]$python3 test.py
Server: 192.168.50.1
Address: 192.168.50.1#53
Non-authoritative answer:
www.baidu.com canonical name = www.a.shifen.com.
Name: www.a.shifen.com
Address: 61.135.169.125
Name: www.a.shifen.com
Address: 61.135.169.121