要讓Python程序?qū)崿F(xiàn)多進程(multiprocessing),我們先了解操作系統(tǒng)的相關(guān)知識汤徽。
Unix/Linux操作系統(tǒng)提供了一個fork()系統(tǒng)調(diào)用娩缰,它非常特殊。普通的函數(shù)調(diào)用谒府,調(diào)用一次拼坎,返回一次,但是fork()調(diào)用一次完疫,返回兩次泰鸡,因為操作系統(tǒng)自動把當前進程(稱為父進程)復(fù)制了一份(稱為子進程),然后壳鹤,分別在父進程和子進程內(nèi)返回鸟顺。
子進程永遠返回0,而父進程返回子進程的ID器虾。這樣做的理由是,一個父進程可以fork出很多子進程蹦锋,所以兆沙,父進程要記下每個子進程的ID,而子進程只需要調(diào)用getppid()就可以拿到父進程的ID莉掂。
Python的os模塊封裝了常見的系統(tǒng)調(diào)用葛圃,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進程:
import os
print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
運行結(jié)果如下:
Process (31887) start...
I (31887) just created a child process (31888).
I am child process (31888) and my parent is 31887.
由于Windows沒有fork調(diào)用憎妙,上面的代碼在Windows上無法運行库正。由于Mac系統(tǒng)是基于BSD(Unix的一種)內(nèi)核,所以厘唾,在Mac下運行是沒有問題的褥符,推薦大家用Mac學(xué)Python!
有了fork調(diào)用抚垃,一個進程在接到新任務(wù)時就可以復(fù)制出一個子進程來處理新任務(wù)喷楣,常見的Apache服務(wù)器就是由父進程監(jiān)聽端口,每當有新的http請求時鹤树,就fork出子進程來處理新的http請求铣焊。
Python跨平臺多進程支持
multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了啟動一個子進程并等待其結(jié)束:
from multiprocessing import Process
import os
# 子進程要執(zhí)行的代碼
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
Pool:如果要啟動大量的子進程罕伯,可以用進程池的方式批量創(chuàng)建子進程
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print 'Task %s runs %0.2f seconds.' % (name, (end - start))
if __name__=='__main__':
print 'Parent process %s.' % os.getpid()
p = Pool()
for i in range(9):
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'
運行結(jié)果:
Parent process 32203.
Waiting for all subprocesses done...
Run task 0 (32204)...
Run task 1 (32206)...
Run task 3 (32205)...
Run task 4 (32208)...
Run task 5 (32211)...
Run task 2 (32207)...
Run task 6 (32209)...
Run task 7 (32210)...
Task 0 runs 0.35 seconds.
Run task 8 (32204)...
Task 7 runs 0.82 seconds.
Task 3 runs 0.89 seconds.
Task 4 runs 1.20 seconds.
Task 1 runs 1.66 seconds.
Task 6 runs 1.97 seconds.
Task 5 runs 2.10 seconds.
Task 8 runs 1.76 seconds.
Task 2 runs 2.27 seconds.
All subprocesses done.
對Pool對象調(diào)用join()方法會等待所有子進程執(zhí)行完畢曲伊,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了追他。
請注意輸出的結(jié)果坟募,task 0岛蚤,1,2...8是立刻執(zhí)行的婿屹,而task 9要等待前面某個task完成后才執(zhí)行灭美,這是因為Pool的默認大小在我的電腦上是8,因此昂利,最多同時執(zhí)行8個進程届腐。這是Pool有意設(shè)計的限制,并不是操作系統(tǒng)的限制蜂奸。默認是CPU的核心數(shù)
進程間通信
Process之間肯定是需要通信的犁苏,操作系統(tǒng)提供了很多機制來實現(xiàn)進程間的通信。Python的multiprocessing模塊包裝了底層的機制扩所,提供了Queue围详、Pipes等多種方式來交換數(shù)據(jù)。
我們以Queue為例祖屏,在父進程中創(chuàng)建兩個子進程助赞,一個往Queue里寫數(shù)據(jù),一個從Queue里讀數(shù)據(jù):
from multiprocessing import Process, Queue
import os, time, random
#寫數(shù)據(jù)進程執(zhí)行的代碼:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進程執(zhí)行的代碼:
def read(q):
while True:
value = q.get(True)
print 'Get %s from queue.' % value
if __name__=='__main__':
# 父進程創(chuàng)建Queue袁勺,并傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw雹食,寫入:
pw.start()
# 啟動子進程pr,讀取:
pr.start()
# 等待pw結(jié)束:
pw.join()
# pr進程里是死循環(huán)期丰,無法等待其結(jié)束群叶,只能強行終止:
pr.terminate()
運行結(jié)果
Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用钝荡,使我們不需要關(guān)注fork()的細節(jié)街立。由于Windows沒有fork調(diào)用,因此埠通,multiprocessing需要“模擬”出fork的效果赎离,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所有端辱,如果multiprocessing在Windows下調(diào)用失敗了蟹瘾,要先考慮是不是pickle失敗了。