多進(jìn)程
要讓python程序?qū)崿F(xiàn)多進(jìn)程,我們先了解操作系統(tǒng)的相關(guān)知識(shí)请祖。
Unix订歪、Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊肆捕。普通的函數(shù)調(diào)用陌粹,調(diào)用一次,返回一次福压,但是fork()調(diào)用一次掏秩,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程)荆姆,然后蒙幻,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。
子進(jìn)程永遠(yuǎn)返回0胆筒,而父進(jìn)程返回子進(jìn)程的ID邮破。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程仆救,所以抒和,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid() 就可以拿到父進(jìn)程的ID
python的os模塊封裝了常見的系統(tǒng)調(diào)用彤蔽,其中就包括fork摧莽,可以再python程序中輕松創(chuàng)建子進(jìn)程:
import os
print("Process (%s)? start..." % os.getpid())
pid = os.fork()
if pid == 0:
? ? ? ? print("i am child process (%s) and my parent is %s." % (os.getpid(),os.getppid()))
else:
? ? ? ? print("I (%s) just created a child process (%s)" % (os.getpid(),pid))
運(yùn)行結(jié)果如下:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows沒有fork調(diào)用,上面的代碼在windows上無法運(yùn)行顿痪。由于Mac系統(tǒng)是基于BSD(unix的一種)內(nèi)核镊辕,所以在Mac下運(yùn)行是沒有問題的,推薦大家用Mac學(xué)python
有了fork調(diào)用蚁袭,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來處理新任務(wù)征懈,常見的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽端口,每當(dāng)有新的http請(qǐng)求時(shí)揩悄,就fork出子進(jìn)程來處理新的http請(qǐng)求卖哎。
multiprocessing
如果你打算編寫多進(jìn)程的服務(wù)程序,Unix/Linux無疑是正確的選擇删性。由于windows沒有fork調(diào)用亏娜,難道在windows上無法用python編寫多進(jìn)程的程序?
由于python是跨平臺(tái)的镇匀,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持照藻。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。
multiprocessing模塊提供了一個(gè)Process類來代表一個(gè)進(jìn)程對(duì)象汗侵,下面的例子演示了一個(gè)子進(jìn)程并等待其結(jié)束:
from multiprocessing import Process
import os
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
? ? ? ? print('Run child process %s (%s)...' % (name,os.getpid()))
if __name__ == "__main__":
? ? ? ? print("Parent process %s." % os.getpid())
? ? ? ? p = Process(target = run_proc,args = ("test",))
? ? ? ? print("Child process will start.")
? ? ? ? p.start()
? ? ? ? p.join()
? ? ? ? print("Child process end")
執(zhí)行結(jié)果如下:
Parent process 928.
Process will start
Run child process test (929)...
Process end.
創(chuàng)建子進(jìn)程時(shí)幸缕,只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù)群发,創(chuàng)建一個(gè)Process實(shí)例,用start()方法啟動(dòng)发乔,這樣創(chuàng)建進(jìn)程比fork()還要簡單熟妓。
join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步栏尚。
Pool
如果要啟動(dòng)大量的子進(jìn)程起愈,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程:
from multiprocessing import Pool
import os,time,random
def long_time_task(name):
? ? ? ? print("Run task %s (%s)..." % (name,os.getpid()))
? ? ? ? start = time.time()
? ? ? ? time.sleep(random.random() * 3)
? ? ? ? end = time.time()
? ? ? ? print("Task %s runs %0.2f seconds." % (name,(end - start)))
if __name__ == "__main__":
? ? ? ? print('Parent process %s.'% os.getpid())?
? ? ? ? p = Pool(4)
????????foriinrange(5):?
? ? ? ? ????????p.apply_async(long_time_task, args=(i,))?
? ? ? ? ? ? ? ? print('Waiting for all subprocesses done...')
? ? ? ? ? ? ? ? p.close()?
? ? ? ? ? ? ? ? p.join()?
? ? ? ? ? ? ? ? print('All subprocesses done.')
執(zhí)行結(jié)果如下:
Parentprocess669.
Waitingforall subprocesses done...
Run task0(671)...
Run task1(672)...
Run task2(673)...
Run task3(674)...
Task2runs0.14seconds.
Runtask4(673)...
Task1runs0.27seconds.
Task3runs0.86seconds.
Task0runs1.41seconds.
Task4runs1.91seconds.
All subprocesses done.
代碼解讀:
對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close()译仗,調(diào)用close()之后就不能繼續(xù)添加新的Process了抬虽。
請(qǐng)注意輸出的結(jié)果,task0纵菌,1阐污,2,3是立刻執(zhí)行的咱圆,而task4要等待前面某個(gè)task完成后才執(zhí)行笛辟,這是因?yàn)镻ool的默認(rèn)大小在我的電腦上是4,因此序苏,最多同時(shí)執(zhí)行4個(gè)進(jìn)程手幢。這是Pool有意設(shè)計(jì)的限制,并不是操作系統(tǒng)的限制忱详。如果改成:
p = Pool(5)
就可以同時(shí)跑5個(gè)進(jìn)程围来。
由于Pool的默認(rèn)大小是CPU的核數(shù),如果你不幸擁有8核CPU踱阿,你要提交至少9個(gè)子進(jìn)程才能看到上面的等待效果
子進(jìn)程
很多時(shí)候管钳,子進(jìn)程并不是自身钦铁,而是一個(gè)外部進(jìn)程软舌。我們創(chuàng)建了子進(jìn)程后,還需要控制子進(jìn)程的輸入和輸出牛曹。
subprocess模塊可以讓我們非常方便地啟動(dòng)一個(gè)子進(jìn)程佛点,然后控制其輸入和輸出。
下面的例子演示了如何在Python代碼中運(yùn)行命令nslookup www.python.org黎比,這和命令行直接運(yùn)行的效果是一樣的:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup','www.python.org'])
print('Exit code:', r)
運(yùn)行結(jié)果:
$ nslookup www.python.org
Server:192.168.19.4
Address:192.168.19.4#53
Non-authoritativeanswer:
www.python.org \? ? canonical name = python.map.fastly.net.?
Name:python.map.fastly.net
Address:199.27.79.223
Exitcode:0
如果子進(jìn)程還需要輸入超营,則可以通過communicate()方法輸入:
importsubprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=sub
process.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的代碼相當(dāng)于命令行執(zhí)行命令nslookup,然后手動(dòng)輸入:
set q=mx
python.org
exit
運(yùn)行結(jié)果如下:
$ nslookup
Server:192.168.19.4
Address:192.168.19.4#53
Non-authoritativeanswer:
python.org?
mail exchanger =50mail.python.org.
Authoritativeanswers can be foundfrom:
mail.python.org internet address =82.94.164.166
mail.python.org hasAAAAaddress2001:888:2000:d::a6
Exitcode:0
進(jìn)程間通信
Process之間肯定是需要通信的,操作系統(tǒng)提供了很多機(jī)制來實(shí)現(xiàn)進(jìn)程間的通信阅虫。Python的multiprocessing模塊包裝了底層的機(jī)制演闭,提供了Queue、Pipes等多種方式來交換數(shù)據(jù)颓帝。
我們以Queue為例米碰,在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程窝革,一個(gè)往Queue里寫數(shù)據(jù),一個(gè)從Queue里讀數(shù)據(jù):
frommultiprocessingimportProcess, Queue
importos, time, random
# 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
defwrite(q):
????????print('Process to write: %s'% os.getpid())
????????forvaluein['A','B','C']:?
? ? ? ? print('Put %s to queue...'% value)
? ? ? ? q.put(value)
????????time.sleep(random.random())
# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
defread(q):
????????print('Process to read: %s'% os.getpid())
????????whileTrue:?
? ? ? ? ????????value = q.get(True)
????????????????print('Get %s from queue.'% value)
if__name__=='__main__'
????????:# 父進(jìn)程創(chuàng)建Queue吕座,并傳給各個(gè)子進(jìn)程:
????????q = Queue()
????????pw = Process(target=write, args=(q,))
????????pr = Process(target=read, args=(q,))
????????# 啟動(dòng)子進(jìn)程pw虐译,寫入:
????????pw.start()
????????# 啟動(dòng)子進(jìn)程pr,讀取:
????????pr.start()
????????# 等待pw結(jié)束:
????????pw.join()
????????# pr進(jìn)程里是死循環(huán)吴趴,無法等待其結(jié)束漆诽,只能強(qiáng)行終止:
????????pr.terminate()
運(yùn)行結(jié)果如下:
Process to write:50563
Put A to queue...
Process to read:50564
Get Afromqueue.
Put B to queue...
Get Bfromqueue.
Put C to queue...
Get Cfromqueue.
在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用锣枝,使我們不需要關(guān)注fork()的細(xì)節(jié)厢拭。由于Windows沒有fork調(diào)用,因此撇叁,multiprocessing需要“模擬”出fork的效果蚪腐,父進(jìn)程所有Python對(duì)象都必須通過pickle序列化再傳到子進(jìn)程去,所有税朴,如果multiprocessing在Windows下調(diào)用失敗了回季,要先考慮是不是pickle失敗了。
小結(jié)
在Unix/Linux下正林,可以使用fork()調(diào)用實(shí)現(xiàn)多進(jìn)程泡一。
要實(shí)現(xiàn)跨平臺(tái)的多進(jìn)程,可以使用multiprocessing模塊觅廓。
進(jìn)程間通信是通過Queue鼻忠、Pipes等實(shí)現(xiàn)的。
多線程
多任務(wù)可以由多進(jìn)程完成杈绸,也可以由一個(gè)進(jìn)程內(nèi)的多線程完成帖蔓。
我們前面提到了進(jìn)程是由若干線程組成的,一個(gè)進(jìn)程至少有一個(gè)線程瞳脓。
由于線程是操作系統(tǒng)直接支持的執(zhí)行單元塑娇,因此,高級(jí)語言通常都內(nèi)置多線程的支持劫侧,Python也不例外埋酬,并且,Python的線程是真正的Posix Thread烧栋,而不是模擬出來的線程写妥。
Python的標(biāo)準(zhǔn)庫提供了兩個(gè)模塊:_thread和threading,_thread是低級(jí)模塊审姓,threading是高級(jí)模塊珍特,對(duì)_thread進(jìn)行了封裝。絕大多數(shù)情況下魔吐,我們只需要使用threading這個(gè)高級(jí)模塊扎筒。
啟動(dòng)一個(gè)線程就是把一個(gè)函數(shù)傳入并創(chuàng)建Thread實(shí)例呼猪,然后調(diào)用start()開始執(zhí)行:
importtime, threading
# 新線程執(zhí)行的代碼:
defloop():
????????print('thread %s is running...'% threading.current_thread().name)?
? ? ? ? n =0
????????while n <5:?
? ? ? ? ????????n = n +1
????????????????print('thread %s >>> %s'% (threading.current_thread().name, n))?
????????????????time.sleep(1)
????????print('thread %s ended.'% threading.current_thread().name)
print('thread %s is running...'% threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.'% threading.current_thread().name)
執(zhí)行結(jié)果如下:
thread MainThreadisrunning...
thread LoopThreadisrunning...
thread LoopThread >>>1
thread LoopThread >>>2
thread LoopThread >>>3
thread LoopThread >>>4
thread LoopThread >>>5
thread LoopThread ended.
thread MainThread ended.
由于任何進(jìn)程默認(rèn)就會(huì)啟動(dòng)一個(gè)線程,我們把該線程稱為主線程砸琅,主線程又可以啟動(dòng)新的線程宋距,Python的threading模塊有個(gè)current_thread()函數(shù),它永遠(yuǎn)返回當(dāng)前線程的實(shí)例症脂。主線程實(shí)例的名字叫MainThread谚赎,子線程的名字在創(chuàng)建時(shí)指定,我們用LoopThread命名子線程诱篷。名字僅僅在打印時(shí)用來顯示壶唤,完全沒有其他意義,如果不起名字Python就自動(dòng)給線程命名為Thread-1棕所,Thread-2……
Lock
多線程和多進(jìn)程最大的不同在于闸盔,多進(jìn)程中,同一個(gè)變量琳省,各自有一份拷貝存在于每個(gè)進(jìn)程中迎吵,互不影響,而多線程中针贬,所有變量都由所有線程共享击费,所以,任何一個(gè)變量都可以被任何一個(gè)線程修改桦他,因此蔫巩,線程之間共享數(shù)據(jù)最大的危險(xiǎn)在于多個(gè)線程同時(shí)改一個(gè)變量,把內(nèi)容給改亂了快压。
來看看多個(gè)線程同時(shí)操作一個(gè)變量怎么把內(nèi)容給改亂了:
importtime, threading
# 假定這是你的銀行存款:
balance =0
defchange_it(n):
????????# 先存后取圆仔,結(jié)果應(yīng)該為0:
????????globalbalance?
? ? ? ? balance = balance + n
????????balance = balance - n
defrun_thread(n):
????????for i in range(100000):
????????change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
我們定義了一個(gè)共享變量balance,初始值為0蔫劣,并且啟動(dòng)兩個(gè)線程坪郭,先存后取,理論上結(jié)果應(yīng)該為0拦宣,但是截粗,由于線程的調(diào)度是由操作系統(tǒng)決定的,當(dāng)t1鸵隧、t2交替執(zhí)行時(shí),只要循環(huán)次數(shù)足夠多意推,balance的結(jié)果就不一定是0了豆瘫。
原因是因?yàn)楦呒?jí)語言的一條語句在CPU執(zhí)行時(shí)是若干條語句,即使一個(gè)簡單的計(jì)算:
balance = balance + n
也分兩步:
? ? 1. 計(jì)算 balance + n ,存入臨時(shí)變量中菊值;
? ? 2. 將臨時(shí)變量的值賦值給balance.
也就是可以看成:
x = balance + n
balance = x
由于x是局部變量外驱,兩個(gè)線程各自都有自己的x育灸,當(dāng)代碼正常執(zhí)行時(shí):
初始值 balance = 0
t1:x1 = balance +5# x1 = 0 + 5 = 5
t1:balance = x1# balance = 5
t1:x1 = balance -5# x1 = 5 - 5 = 0
t1:balance = x1# balance = 0
t2:x2 = balance +8# x2 = 0 + 8 = 8
t2:balance = x2# balance = 8
t2:x2 = balance -8# x2 = 8 - 8 = 0
t2:balance = x2# balance = 0
結(jié)果 balance =0
但是t1和t2是交替運(yùn)行的,如果操作系統(tǒng)以下面的順序執(zhí)行t1昵宇、t2:
初始值 balance =0
t1:x1 = balance +5# x1 = 0 + 5 = 5
t2:x2 = balance +8# x2 = 0 + 8 = 8
t2:balance = x2# balance = 8
t1:balance = x1# balance = 5
t1:x1 = balance -5# x1 = 5 - 5 = 0
t1:balance = x1# balance = 0
t2:x2 = balance -8# x2 = 0 - 8 = -8
t2:balance = x2# balance = -8
結(jié)果 balance = -8
究其原因磅崭,是因?yàn)樾薷腷alance需要多條語句,而執(zhí)行這幾條語句時(shí)瓦哎,線程可能中斷砸喻,從而導(dǎo)致多個(gè)線程把同一個(gè)對(duì)象的內(nèi)容改亂了。
兩個(gè)線程同時(shí)一存一取蒋譬,就可能導(dǎo)致余額不對(duì)割岛,你肯定不希望你的銀行存款莫名其妙地變成了負(fù)數(shù),所以犯助,我們必須確保一個(gè)線程在修改balance的時(shí)候癣漆,別的線程一定不能改。
如果我們要確保balance計(jì)算正確剂买,就要給change_it()上一把鎖惠爽,當(dāng)某個(gè)線程開始執(zhí)行change_it()時(shí),我們說瞬哼,該線程因?yàn)楂@得了鎖疆股,因此其他線程不能同時(shí)執(zhí)行change_it(),只能等待倒槐,直到鎖被釋放后旬痹,獲得該鎖以后才能改。由于鎖只有一個(gè)讨越,無論多少線程两残,同一時(shí)刻最多只有一個(gè)線程持有該鎖,所以把跨,不會(huì)造成修改的沖突人弓。創(chuàng)建一個(gè)鎖就是通過threading.Lock()來實(shí)現(xiàn):
balance =0
lock = threading.Lock()
defrun_thread(n):
????????foriinrange(100000):
????????# 先要獲取鎖:lock.acquire()
????????try:
????????????????# 放心地改吧:
????????????????change_it(n)
????????finally:
????????????????# 改完了一定要釋放鎖:
????????????????lock.release()
當(dāng)多個(gè)線程同時(shí)執(zhí)行l(wèi)ock.acquire()時(shí),只有一個(gè)線程能成功地獲取鎖着逐,然后繼續(xù)執(zhí)行代碼崔赌,其他線程就繼續(xù)等待直到獲得鎖為止。
獲得鎖的線程用完后一定要釋放鎖耸别,否則那些苦苦等待鎖的線程將永遠(yuǎn)等待下去健芭,成為死線程。所以我們用try...finally來確保鎖一定會(huì)被釋放秀姐。
鎖的好處就是確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行慈迈,壞處當(dāng)然也很多,首先是阻止了多線程并發(fā)執(zhí)行省有,包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行痒留,效率就大大地下降了谴麦。其次,由于可以存在多個(gè)鎖伸头,不同的線程持有不同的鎖匾效,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖恤磷,導(dǎo)致多個(gè)線程全部掛起面哼,既不能執(zhí)行,也無法結(jié)束碗殷,只能靠操作系統(tǒng)強(qiáng)制終止精绎。
多核CPU
如果你不幸擁有一個(gè)多核CPU,你肯定在想锌妻,多核應(yīng)該可以同時(shí)執(zhí)行多個(gè)線程代乃。
如果寫一個(gè)死循環(huán)的話,會(huì)出現(xiàn)什么情況呢仿粹?
打開Mac OS X的Activity Monitor搁吓,或者Windows的Task Manager,都可以監(jiān)控某個(gè)進(jìn)程的CPU使用率吭历。
我們可以監(jiān)控到一個(gè)死循環(huán)線程會(huì)100%占用一個(gè)CPU堕仔。
如果有兩個(gè)死循環(huán)線程,在多核CPU中晌区,可以監(jiān)控到會(huì)占用200%的CPU摩骨,也就是占用兩個(gè)CPU核心。
要想把N核CPU的核心全部跑滿朗若,就必須啟動(dòng)N個(gè)死循環(huán)線程恼五。
試試用Python寫個(gè)死循環(huán):
import threading, multiprocessing
defloop():
????????x =0
????????whileTrue:
????????????????x = x ^1
for i in range(multiprocessing.cpu_count()):
????????t = threading.Thread(target=loop)
????????t.start()
啟動(dòng)與CPU核心數(shù)量相同的N個(gè)線程,在4核CPU上可以監(jiān)控到CPU占用率僅有102%哭懈,也就是僅使用了一核灾馒。
但是用C、C++或Java來改寫相同的死循環(huán)遣总,直接可以把全部核心跑滿睬罗,4核就跑到400%,8核就跑到800%旭斥,為什么Python不行呢容达?
因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí)琉预,有一個(gè)GIL鎖:Global Interpreter Lock董饰,任何Python線程執(zhí)行前,必須先獲得GIL鎖圆米,然后卒暂,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖娄帖,讓別的線程有機(jī)會(huì)執(zhí)行也祠。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以近速,多線程在Python中只能交替執(zhí)行诈嘿,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核削葱。
GIL是Python解釋器設(shè)計(jì)的歷史遺留問題奖亚,通常我們用的解釋器是官方實(shí)現(xiàn)的CPython,要真正利用多核析砸,除非重寫一個(gè)不帶GIL的解釋器昔字。
所以,在Python中首繁,可以使用多線程作郭,但不要指望能有效利用多核。如果一定要通過多線程利用多核弦疮,那只能通過C擴(kuò)展來實(shí)現(xiàn)夹攒,不過這樣就失去了Python簡單易用的特點(diǎn)。
不過胁塞,也不用過于擔(dān)心咏尝,Python雖然不能利用多線程實(shí)現(xiàn)多核任務(wù),但可以通過多進(jìn)程實(shí)現(xiàn)多核任務(wù)啸罢。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖编检,互不影響。
小結(jié)
多線程編程伺糠,模型復(fù)雜蒙谓,容易發(fā)生沖突,必須用鎖加以隔離训桶,同時(shí)累驮,又要小心死鎖的發(fā)生。
Python解釋器由于設(shè)計(jì)時(shí)有GIL全局鎖舵揭,導(dǎo)致了多線程無法利用多核谤专。多線程的并發(fā)在Python中就是一個(gè)美麗的夢(mèng)。