0三篡撵、線(xiàn)程和進(jìn)程的關(guān)系
1、一個(gè)進(jìn)程可以有多個(gè)線(xiàn)程豆挽,但至少有一個(gè)線(xiàn)程育谬;而一個(gè)線(xiàn)程只能在一個(gè)進(jìn)程的地址空間內(nèi)活動(dòng)。
2帮哈、資源分配給進(jìn)程膛檀,同一個(gè)進(jìn)程的所有線(xiàn)程共享該進(jìn)程所有資源。
3娘侍、CPU分配給線(xiàn)程咖刃,即真正在處理器運(yùn)行的是線(xiàn)程。
4憾筏、線(xiàn)程在執(zhí)行過(guò)程中需要協(xié)作同步嚎杨,不同進(jìn)程的線(xiàn)程間要利用消息通信的辦法實(shí)現(xiàn)同步。
注:進(jìn)程是最基本的資源擁有單位和調(diào)度單位氧腰。
進(jìn)程間通信方式:(1)消息傳遞(2)共享存儲(chǔ)(3)管道通信
搞定python多線(xiàn)程和多進(jìn)程
2017年02月24日 22:30
1 概念梳理:
1.1 線(xiàn)程1.1.1 什么是線(xiàn)程
線(xiàn)程是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位枫浙。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際運(yùn)作單位古拴。一條線(xiàn)程指的是進(jìn)程中一個(gè)單一順序的控制流箩帚,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線(xiàn)程,每條線(xiàn)程并行執(zhí)行不同的任務(wù)黄痪。一個(gè)線(xiàn)程是一個(gè)execution context(執(zhí)行上下文)紧帕,即一個(gè)cpu執(zhí)行時(shí)所需要的一串指令。
1.1.2 線(xiàn)程的工作方式
假設(shè)你正在讀一本書(shū)桅打,沒(méi)有讀完焕参,你想休息一下,但是你想在回來(lái)時(shí)恢復(fù)到當(dāng)時(shí)讀的具體進(jìn)度油额。有一個(gè)方法就是記下頁(yè)數(shù)叠纷、行數(shù)與字?jǐn)?shù)這三個(gè)數(shù)值,這些數(shù)值就是execution context潦嘶。如果你的室友在你休息的時(shí)候涩嚣,使用相同的方法讀這本書(shū)崇众。你和她只需要這三個(gè)數(shù)字記下來(lái)就可以在交替的時(shí)間共同閱讀這本書(shū)了。
線(xiàn)程的工作方式與此類(lèi)似航厚。CPU會(huì)給你一個(gè)在同一時(shí)間能夠做多個(gè)運(yùn)算的幻覺(jué)顷歌,實(shí)際上它在每個(gè)運(yùn)算上只花了極少的時(shí)間,本質(zhì)上CPU同一時(shí)刻只干了一件事幔睬。它能這樣做就是因?yàn)樗忻總€(gè)運(yùn)算的execution context眯漩。就像你能夠和你朋友共享同一本書(shū)一樣,多任務(wù)也能共享同一塊CPU麻顶。
1.2 進(jìn)程
一個(gè)程序的執(zhí)行實(shí)例就是一個(gè)進(jìn)程赦抖。每一個(gè)進(jìn)程提供執(zhí)行程序所需的所有資源。(進(jìn)程本質(zhì)上是資源的集合)
一個(gè)進(jìn)程有一個(gè)虛擬的地址空間辅肾、可執(zhí)行的代碼队萤、操作系統(tǒng)的接口、安全的上下文(記錄啟動(dòng)該進(jìn)程的用戶(hù)和權(quán)限等等)矫钓、唯一的進(jìn)程ID要尔、環(huán)境變量、優(yōu)先級(jí)類(lèi)新娜、最小和最大的工作空間(內(nèi)存空間)赵辕,還要有至少一個(gè)線(xiàn)程。
每一個(gè)進(jìn)程啟動(dòng)時(shí)都會(huì)最先產(chǎn)生一個(gè)線(xiàn)程概龄,即主線(xiàn)程还惠。然后主線(xiàn)程會(huì)再創(chuàng)建其他的子線(xiàn)程。
與進(jìn)程相關(guān)的資源包括:
內(nèi)存頁(yè)(同一個(gè)進(jìn)程中的所有線(xiàn)程共享同一個(gè)內(nèi)存空間)
文件描述符(e.g. open sockets)
安全憑證(e.g.啟動(dòng)該進(jìn)程的用戶(hù)ID)
1.3 進(jìn)程與線(xiàn)程區(qū)別
1.同一個(gè)進(jìn)程中的線(xiàn)程共享同一內(nèi)存空間旁钧,但是進(jìn)程之間是獨(dú)立的。
2.同一個(gè)進(jìn)程中的所有線(xiàn)程的數(shù)據(jù)是共享的(進(jìn)程通訊)互拾,進(jìn)程之間的數(shù)據(jù)是獨(dú)立的歪今。
3.對(duì)主線(xiàn)程的修改可能會(huì)影響其他線(xiàn)程的行為,但是父進(jìn)程的修改(除了刪除以外)不會(huì)影響其他子進(jìn)程颜矿。
4.線(xiàn)程是一個(gè)上下文的執(zhí)行指令寄猩,而進(jìn)程則是與運(yùn)算相關(guān)的一簇資源。
5.同一個(gè)進(jìn)程的線(xiàn)程之間可以直接通信骑疆,但是進(jìn)程之間的交流需要借助中間代理來(lái)實(shí)現(xiàn)田篇。
6.創(chuàng)建新的線(xiàn)程很容易,但是創(chuàng)建新的進(jìn)程需要對(duì)父進(jìn)程做一次復(fù)制箍铭。
7.一個(gè)線(xiàn)程可以操作同一進(jìn)程的其他線(xiàn)程泊柬,但是進(jìn)程只能操作其子進(jìn)程。
8.線(xiàn)程啟動(dòng)速度快诈火,進(jìn)程啟動(dòng)速度慢(但是兩者運(yùn)行速度沒(méi)有可比性)兽赁。
2 多線(xiàn)程
2.1 線(xiàn)程常用方法
方法 注釋
start() 線(xiàn)程準(zhǔn)備就緒,等待CPU調(diào)度
setName() 為線(xiàn)程設(shè)置名稱(chēng)
getName() 獲取線(xiàn)程名稱(chēng)
setDaemon(True) 設(shè)置為守護(hù)線(xiàn)程
join() 逐個(gè)執(zhí)行每個(gè)線(xiàn)程,執(zhí)行完畢后繼續(xù)往下執(zhí)行
run() 線(xiàn)程被cpu調(diào)度后自動(dòng)執(zhí)行線(xiàn)程對(duì)象的run方法刀崖,如果想自定義線(xiàn)程類(lèi)惊科,直接重寫(xiě)run方法就行了
2.1.1 Thread類(lèi)
1.普通創(chuàng)建方式
import threading import time defrun(n): print("task", n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) t1 = threading.Thread(target=run, args=("t1",)) t2 = threading.Thread(target=run, args=("t2",)) t1.start() t2.start() """ task t1 task t2 2s 2s 1s 1s 0s 0s """
2.繼承threading.Thread來(lái)自定義線(xiàn)程類(lèi)
其本質(zhì)是重構(gòu)Thread類(lèi)中的run方法
import threading import time classMyThread(threading.Thread):def__init__(self, n): super(MyThread, self).init() # 重構(gòu)run函數(shù)必須要寫(xiě) self.n = n defrun(self): print("task", self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if name == "main": t1 = MyThread("t1") t2 = MyThread("t2") t1.start() t2.start()2.1.2 計(jì)算子線(xiàn)程執(zhí)行的時(shí)間
注:sleep的時(shí)候是不會(huì)占用cpu的,在sleep的時(shí)候操作系統(tǒng)會(huì)把線(xiàn)程暫時(shí)掛起。
join() #等此線(xiàn)程執(zhí)行完后亮钦,再執(zhí)行其他線(xiàn)程或主線(xiàn)程 threading.current_thread() #輸出當(dāng)前線(xiàn)程import threading import time defrun(n): print("task", n,threading.current_thread()) #輸出當(dāng)前的線(xiàn)程 time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') strat_time = time.time() t_obj = [] #定義列表用于存放子線(xiàn)程實(shí)例for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) """ 由主線(xiàn)程生成的三個(gè)子線(xiàn)程 task t-0 <Thread(Thread-1, started 44828)> task t-1 <Thread(Thread-2, started 42804)> task t-2 <Thread(Thread-3, started 41384)> """for tmp in t_obj: t.join() #為每個(gè)子線(xiàn)程添加join之后馆截,主線(xiàn)程就會(huì)等這些子線(xiàn)程執(zhí)行完之后再執(zhí)行。 print("cost:", time.time() - strat_time) #主線(xiàn)程 print(threading.current_thread()) #輸出當(dāng)前線(xiàn)程""" <_MainThread(MainThread, started 43740)> """2.1.3 統(tǒng)計(jì)當(dāng)前活躍的線(xiàn)程數(shù)
由于主線(xiàn)程比子線(xiàn)程快很多蜂莉,當(dāng)主線(xiàn)程執(zhí)行active_count()時(shí)蜡娶,其他子線(xiàn)程都還沒(méi)執(zhí)行完畢,因此利用主線(xiàn)程統(tǒng)計(jì)的活躍的線(xiàn)程數(shù)num = sub_num(子線(xiàn)程數(shù)量)+1(主線(xiàn)程本身)
import threading import time defrun(n): print("task", n) time.sleep(1) #此時(shí)子線(xiàn)程停1sfor i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(0.5) #主線(xiàn)程停0.5秒 print(threading.active_count()) #輸出當(dāng)前活躍的線(xiàn)程數(shù)""" task t-0 task t-1 task t-2 4 """
由于主線(xiàn)程比子線(xiàn)程慢很多巡语,當(dāng)主線(xiàn)程執(zhí)行active_count()時(shí)翎蹈,其他子線(xiàn)程都已經(jīng)執(zhí)行完畢,因此利用主線(xiàn)程統(tǒng)計(jì)的活躍的線(xiàn)程數(shù)num = 1(主線(xiàn)程本身)
import threading import time defrun(n): print("task", n) time.sleep(0.5) #此時(shí)子線(xiàn)程停0.5sfor i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() time.sleep(1) #主線(xiàn)程停1秒 print(threading.active_count()) #輸出活躍的線(xiàn)程數(shù)""" task t-0 task t-1 task t-2 1 """
此外我們還能發(fā)現(xiàn)在python內(nèi)部默認(rèn)會(huì)等待最后一個(gè)進(jìn)程執(zhí)行完后再執(zhí)行exit()男公,或者說(shuō)python內(nèi)部在此時(shí)有一個(gè)隱藏的join()荤堪。
2.2 守護(hù)進(jìn)程
我們看下面這個(gè)例子,這里使用setDaemon(True)把所有的子線(xiàn)程都變成了主線(xiàn)程的守護(hù)線(xiàn)程枢赔,因此當(dāng)主進(jìn)程結(jié)束后澄阳,子線(xiàn)程也會(huì)隨之結(jié)束。所以當(dāng)主線(xiàn)程結(jié)束后踏拜,整個(gè)程序就退出了碎赢。
import threading import time defrun(n): print("task", n) time.sleep(1) #此時(shí)子線(xiàn)程停1s print('3') time.sleep(1) print('2') time.sleep(1) print('1') for i in range(3): t = threading.Thread(target=run, args=("t-%s" % i,)) t.setDaemon(True) #把子進(jìn)程設(shè)置為守護(hù)線(xiàn)程,必須在start()之前設(shè)置 t.start() time.sleep(0.5) #主線(xiàn)程停0.5秒 print(threading.active_count()) #輸出活躍的線(xiàn)程數(shù)""" task t-0 task t-1 task t-2 4 Process finished with exit code 0 """2.3 GIL
在非python環(huán)境中速梗,單核情況下肮塞,同時(shí)只能有一個(gè)任務(wù)執(zhí)行。多核時(shí)可以支持多個(gè)線(xiàn)程同時(shí)執(zhí)行姻锁。但是在python中枕赵,無(wú)論有多少核,同時(shí)只能執(zhí)行一個(gè)線(xiàn)程位隶。究其原因拷窜,這就是由于GIL的存在導(dǎo)致的。
GIL的全稱(chēng)是Global Interpreter Lock(全局解釋器鎖)涧黄,來(lái)源是python設(shè)計(jì)之初的考慮篮昧,為了數(shù)據(jù)安全所做的決定。某個(gè)線(xiàn)程想要執(zhí)行笋妥,必須先拿到GIL懊昨,我們可以把GIL看作是“通行證”,并且在一個(gè)python進(jìn)程中春宣,GIL只有一個(gè)疚颊。拿不到通行證的線(xiàn)程狈孔,就不允許進(jìn)入CPU執(zhí)行。GIL只在cpython中才有材义,因?yàn)閏python調(diào)用的是c語(yǔ)言的原生線(xiàn)程均抽,所以他不能直接操作cpu,只能利用GIL保證同一時(shí)間只能有一個(gè)線(xiàn)程拿到數(shù)據(jù)其掂。而在pypy和jpython中是沒(méi)有GIL的油挥。
Python多線(xiàn)程的工作過(guò)程:
python在使用多線(xiàn)程的時(shí)候,調(diào)用的是c語(yǔ)言的原生線(xiàn)程款熬。
拿到公共數(shù)據(jù)
申請(qǐng)gil
python解釋器調(diào)用os原生線(xiàn)程
os操作cpu執(zhí)行運(yùn)算
當(dāng)該線(xiàn)程執(zhí)行時(shí)間到后深寥,無(wú)論運(yùn)算是否已經(jīng)執(zhí)行完,gil都被要求釋放
進(jìn)而由其他進(jìn)程重復(fù)上面的過(guò)程
等其他進(jìn)程執(zhí)行完后贤牛,又會(huì)切換到之前的線(xiàn)程(從他記錄的上下文繼續(xù)執(zhí)行)
整個(gè)過(guò)程是每個(gè)線(xiàn)程執(zhí)行自己的運(yùn)算惋鹅,當(dāng)執(zhí)行時(shí)間到就進(jìn)行切換(context switch)。
python針對(duì)不同類(lèi)型的代碼執(zhí)行效率也是不同的:
1殉簸、CPU密集型代碼(各種循環(huán)處理闰集、計(jì)算等等),在這種情況下般卑,由于計(jì)算工作多武鲁,ticks計(jì)數(shù)很快就會(huì)達(dá)到閾值,然后觸發(fā)GIL的釋放與再競(jìng)爭(zhēng)(多個(gè)線(xiàn)程來(lái)回切換當(dāng)然是需要消耗資源的)蝠检,所以python下的多線(xiàn)程對(duì)CPU密集型代碼并不友好沐鼠。
2、IO密集型代碼(文件處理叹谁、網(wǎng)絡(luò)爬蟲(chóng)等涉及文件讀寫(xiě)的操作)饲梭,多線(xiàn)程能夠有效提升效率(單線(xiàn)程下有IO操作會(huì)進(jìn)行IO等待,造成不必要的時(shí)間浪費(fèi)焰檩,而開(kāi)啟多線(xiàn)程能在線(xiàn)程A等待時(shí)憔涉,自動(dòng)切換到線(xiàn)程B,可以不浪費(fèi)CPU的資源锅尘,從而能提升程序執(zhí)行效率)监氢。所以python的多線(xiàn)程對(duì)IO密集型代碼比較友好布蔗。
使用建議藤违?
python下想要充分利用多核CPU,就用多進(jìn)程纵揍。因?yàn)槊總€(gè)進(jìn)程有各自獨(dú)立的GIL顿乒,互不干擾,這樣就可以真正意義上的并行執(zhí)行泽谨,在python中璧榄,多進(jìn)程的執(zhí)行效率優(yōu)于多線(xiàn)程(僅僅針對(duì)多核CPU而言)特漩。
GIL在python中的版本差異:
1、在python2.x里骨杂,GIL的釋放邏輯是當(dāng)前線(xiàn)程遇見(jiàn)IO操作或者ticks計(jì)數(shù)達(dá)到100時(shí)進(jìn)行釋放涂身。(ticks可以看作是python自身的一個(gè)計(jì)數(shù)器,專(zhuān)門(mén)做用于GIL搓蚪,每次釋放后歸零蛤售,這個(gè)計(jì)數(shù)可以通過(guò)sys.setcheckinterval 來(lái)調(diào)整)。而每次釋放GIL鎖妒潭,線(xiàn)程進(jìn)行鎖競(jìng)爭(zhēng)悴能、切換線(xiàn)程,會(huì)消耗資源雳灾。并且由于GIL鎖存在漠酿,python里一個(gè)進(jìn)程永遠(yuǎn)只能同時(shí)執(zhí)行一個(gè)線(xiàn)程(拿到GIL的線(xiàn)程才能執(zhí)行),這就是為什么在多核CPU上谎亩,python的多線(xiàn)程效率并不高炒嘲。
2、在python3.x中团驱,GIL不使用ticks計(jì)數(shù)摸吠,改為使用計(jì)時(shí)器(執(zhí)行時(shí)間達(dá)到閾值后,當(dāng)前線(xiàn)程釋放GIL)嚎花,這樣對(duì)CPU密集型程序更加友好寸痢,但依然沒(méi)有解決GIL導(dǎo)致的同一時(shí)間只能執(zhí)行一個(gè)線(xiàn)程的問(wèn)題,所以效率依然不盡如人意紊选。
2.4 線(xiàn)程鎖
由于線(xiàn)程之間是進(jìn)行隨機(jī)調(diào)度啼止,并且每個(gè)線(xiàn)程可能只執(zhí)行n條執(zhí)行之后,當(dāng)多個(gè)線(xiàn)程同時(shí)修改同一條數(shù)據(jù)時(shí)可能會(huì)出現(xiàn)臟數(shù)據(jù)兵罢,所以献烦,出現(xiàn)了線(xiàn)程鎖,即同一時(shí)刻允許一個(gè)線(xiàn)程執(zhí)行操作卖词。線(xiàn)程鎖用于鎖定資源巩那,你可以定義多個(gè)鎖, 像下面的代碼, 當(dāng)你需要獨(dú)占某一資源時(shí),任何一個(gè)鎖都可以鎖這個(gè)資源此蜈,就好比你用不同的鎖都可以把相同的一個(gè)門(mén)鎖住是一個(gè)道理即横。
由于線(xiàn)程之間是進(jìn)行隨機(jī)調(diào)度,如果有多個(gè)線(xiàn)程同時(shí)操作一個(gè)對(duì)象裆赵,如果沒(méi)有很好地保護(hù)該對(duì)象东囚,會(huì)造成程序結(jié)果的不可預(yù)期,我們也稱(chēng)此為“線(xiàn)程不安全”战授。
實(shí)測(cè):在python2.7页藻、mac os下桨嫁,運(yùn)行以下代碼可能會(huì)產(chǎn)生臟數(shù)據(jù)。但是在python3中就不一定會(huì)出現(xiàn)下面的問(wèn)題份帐。import threading import time defrun(n):global num num += 1 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print"num:", num """ 產(chǎn)生臟數(shù)據(jù)后的運(yùn)行結(jié)果: num: 19999 """2.5 互斥鎖(mutex)
為了方式上面情況的發(fā)生璃吧,就出現(xiàn)了互斥鎖(Lock)
import threading import time defrun(n): lock.acquire() #獲取鎖global num num += 1 lock.release() #釋放鎖 lock = threading.Lock() #實(shí)例化一個(gè)鎖對(duì)象 num = 0 t_obj = [] for i in range(20000): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() t_obj.append(t) for t in t_obj: t.join() print"num:", num2.6 遞歸鎖
RLcok類(lèi)的用法和Lock類(lèi)一模一樣,但它支持嵌套废境,肚逸,在多個(gè)鎖沒(méi)有釋放的時(shí)候一般會(huì)使用使用RLcok類(lèi)。
import threading import time gl_num = 0 lock = threading.RLock() defFunc(): lock.acquire() global gl_num gl_num +=1 time.sleep(1) print gl_num lock.release() for i in range(10): t = threading.Thread(target=Func) t.start()2.7 信號(hào)量(BoundedSemaphore類(lèi))
互斥鎖同時(shí)只允許一個(gè)線(xiàn)程更改數(shù)據(jù)彬坏,而Semaphore是同時(shí)允許一定數(shù)量的線(xiàn)程更改數(shù)據(jù) 朦促,比如廁所有3個(gè)坑,那最多只允許3個(gè)人上廁所栓始,后面的人只能等里面有人出來(lái)了才能再進(jìn)去务冕。
import threading import time defrun(n): semaphore.acquire() #加鎖 time.sleep(1) print("run the thread:%s\n" % n) semaphore.release() #釋放 num = 0 semaphore = threading.BoundedSemaphore(5) # 最多允許5個(gè)線(xiàn)程同時(shí)運(yùn)行for i in range(22): t = threading.Thread(target=run, args=("t-%s" % i,)) t.start() while threading.active_count() != 1: pass# print threading.active_count()else: print('-----all threads done-----')
2.8 事件(Event類(lèi))
python線(xiàn)程的事件用于主線(xiàn)程控制其他線(xiàn)程的執(zhí)行,事件是一個(gè)簡(jiǎn)單的線(xiàn)程同步對(duì)象幻赚,其主要提供以下幾個(gè)方法:
方法 注釋
clear 將flag設(shè)置為“False”
set 將flag設(shè)置為“True”
is_set 判斷是否設(shè)置了flag
wait 會(huì)一直監(jiān)聽(tīng)flag禀忆,如果沒(méi)有檢測(cè)到flag就一直處于阻塞狀態(tài)
事件處理的機(jī)制:全局定義了一個(gè)“Flag”,當(dāng)flag值為“False”落恼,那么event.wait()就會(huì)阻塞箩退,當(dāng)flag值為“True”,那么event.wait()便不再阻塞佳谦。
利用Event類(lèi)模擬紅綠燈import threading import time event = threading.Event() deflighter(): count = 0 event.set() #初始值為綠燈whileTrue: if5 < count <=10 : event.clear() # 紅燈戴涝,清除標(biāo)志位 print("\33[41;1mred light is on...\033[0m") elif count > 10: event.set() # 綠燈,設(shè)置標(biāo)志位 count = 0else: print("\33[42;1mgreen light is on...\033[0m") time.sleep(1) count += 1defcar(name):whileTrue: if event.is_set(): #判斷是否設(shè)置了標(biāo)志位 print("[%s] running..."%name) time.sleep(1) else: print("[%s] sees red light,waiting..."%name) event.wait() print("[%s] green light is on,start going..."%name) light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=("MINI",)) car.start()2.9 條件(Condition類(lèi))
使得線(xiàn)程等待钻蔑,只有滿(mǎn)足某條件時(shí)啥刻,才釋放n個(gè)線(xiàn)程
2.10 定時(shí)器(Timer類(lèi))
定時(shí)器,指定n秒后執(zhí)行某操作
from threading import Timer defhello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed
3 多進(jìn)程
在linux中咪笑,每個(gè)進(jìn)程都是由父進(jìn)程提供的可帽。每啟動(dòng)一個(gè)子進(jìn)程就從父進(jìn)程克隆一份數(shù)據(jù),但是進(jìn)程之間的數(shù)據(jù)本身是不能共享的窗怒。
from multiprocessing import Process import time deff(name): time.sleep(2) print('hello', name) if name == 'main': p = Process(target=f, args=('bob',)) p.start() p.join() from multiprocessing import Process import os definfo(title): print(title) print('module name:', name) print('parent process:', os.getppid()) #獲取父進(jìn)程id print('process id:', os.getpid()) #獲取自己的進(jìn)程id print("\n\n") deff(name): info('\033[31;1mfunction f\033[0m') print('hello', name) if name == 'main': info('\033[32;1mmain process line\033[0m') p = Process(target=f, args=('bob',)) p.start() p.join()3.1 進(jìn)程間通信
由于進(jìn)程之間數(shù)據(jù)是不共享的映跟,所以不會(huì)出現(xiàn)多線(xiàn)程GIL帶來(lái)的問(wèn)題。多進(jìn)程之間的通信通過(guò)Queue()或Pipe()來(lái)實(shí)現(xiàn)
3.1.1 Queue()
使用方法跟threading里的queue差不多
from multiprocessing import Process, Queue deff(q): q.put([42, None, 'hello']) if name == 'main': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()3.1.2 Pipe()
Pipe的本質(zhì)是進(jìn)程之間的數(shù)據(jù)傳遞扬虚,而不是數(shù)據(jù)共享努隙,這和socket有點(diǎn)像。pipe()返回兩個(gè)連接對(duì)象分別表示管道的兩端孔轴,每端都有send()和recv()方法剃法。如果兩個(gè)進(jìn)程試圖在同一時(shí)間的同一端進(jìn)行讀取和寫(xiě)入那么碎捺,這可能會(huì)損壞管道中的數(shù)據(jù)路鹰。
from multiprocessing import Process, Pipe deff(conn): conn.send([42, None, 'hello']) conn.close() if name == 'main': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()3.2 Manager
通過(guò)Manager可實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)的共享贷洲。Manager()返回的manager對(duì)象會(huì)通過(guò)一個(gè)服務(wù)進(jìn)程,來(lái)使其他進(jìn)程通過(guò)代理的方式操作python對(duì)象晋柱。manager對(duì)象支持 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value ,Array.
from multiprocessing import Process, Manager deff(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.append(1) print(l) if name == 'main': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d, l)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)3.3 進(jìn)程鎖(進(jìn)程同步)
數(shù)據(jù)輸出的時(shí)候保證不同進(jìn)程的輸出內(nèi)容在同一塊屏幕正常顯示优构,防止數(shù)據(jù)亂序的情況。
Without using the lock output from the different processes is liable to get all mixed up.
from multiprocessing import Process, Lock deff(l, i): l.acquire() try: print('hello world', i) finally: l.release() if name == 'main': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()3.4 進(jìn)程池
由于進(jìn)程啟動(dòng)的開(kāi)銷(xiāo)比較大雁竞,使用多進(jìn)程的時(shí)候會(huì)導(dǎo)致大量?jī)?nèi)存空間被消耗钦椭。為了防止這種情況發(fā)生可以使用進(jìn)程池,(由于啟動(dòng)線(xiàn)程的開(kāi)銷(xiāo)比較小碑诉,所以不需要線(xiàn)程池這種概念彪腔,多線(xiàn)程只會(huì)頻繁得切換cpu導(dǎo)致系統(tǒng)變慢,并不會(huì)占用過(guò)多的內(nèi)存空間)
進(jìn)程池中常用方法:
apply() 同步執(zhí)行(串行)
apply_async() 異步執(zhí)行(并行)
terminate() 立刻關(guān)閉進(jìn)程池
join() 主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢进栽。必須在close或terminate()之后德挣。
close() 等待所有進(jìn)程結(jié)束后,才關(guān)閉進(jìn)程池快毛。
from multiprocessing import Process,Pool import time defFoo(i): time.sleep(2) return i+100defBar(arg): print(,arg) pool = Pool(5) #允許進(jìn)程池同時(shí)放入5個(gè)進(jìn)程for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) #func子進(jìn)程執(zhí)行完后格嗅,才會(huì)執(zhí)行callback,否則callback不執(zhí)行(而且callback是由父進(jìn)程來(lái)執(zhí)行了)#pool.apply(func=Foo, args=(i,)) print('end') pool.close() pool.join() #主進(jìn)程等待所有子進(jìn)程執(zhí)行完畢唠帝。必須在close()或terminate()之后屯掖。
進(jìn)程池內(nèi)部維護(hù)一個(gè)進(jìn)程序列,當(dāng)使用時(shí)襟衰,去進(jìn)程池中獲取一個(gè)進(jìn)程贴铜,如果進(jìn)程池序列中沒(méi)有可供使用的進(jìn)程,那么程序就會(huì)等待瀑晒,直到進(jìn)程池中有可用進(jìn)程為止阀湿。在上面的程序中產(chǎn)生了10個(gè)進(jìn)程,但是只能有5同時(shí)被放入進(jìn)程池瑰妄,剩下的都被暫時(shí)掛起陷嘴,并不占用內(nèi)存空間,等前面的五個(gè)進(jìn)程執(zhí)行完后间坐,再執(zhí)行剩下5個(gè)進(jìn)程灾挨。
4 補(bǔ)充:協(xié)程
線(xiàn)程和進(jìn)程的操作是由程序觸發(fā)系統(tǒng)接口,最后的執(zhí)行者是系統(tǒng)竹宋,它本質(zhì)上是操作系統(tǒng)提供的功能劳澄。而協(xié)程的操作則是程序員指定的,在python中通過(guò)yield蜈七,人為的實(shí)現(xiàn)并發(fā)處理秒拔。
協(xié)程存在的意義:對(duì)于多線(xiàn)程應(yīng)用,CPU通過(guò)切片的方式來(lái)切換線(xiàn)程間的執(zhí)行飒硅,線(xiàn)程切換時(shí)需要耗時(shí)砂缩。協(xié)程作谚,則只使用一個(gè)線(xiàn)程,分解一個(gè)線(xiàn)程成為多個(gè)“微線(xiàn)程”庵芭,在一個(gè)線(xiàn)程中規(guī)定某個(gè)代碼塊的執(zhí)行順序妹懒。
協(xié)程的適用場(chǎng)景:當(dāng)程序中存在大量不需要CPU的操作時(shí)(IO)。
常用第三方模塊gevent和greenlet双吆。(本質(zhì)上眨唬,gevent是對(duì)greenlet的高級(jí)封裝,因此一般用它就行好乐,這是一個(gè)相當(dāng)高效的模塊匾竿。)
4.1 greenletfrom greenlet import greenlet deftest1(): print(12) gr2.switch() print(34) gr2.switch() deftest2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
實(shí)際上,greenlet就是通過(guò)switch方法在不同的任務(wù)之間進(jìn)行切換蔚万。
4.2 geventfrom gevent import monkey; monkey.patch_all() import gevent import requests deff(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'), ])
通過(guò)joinall將任務(wù)f和它的參數(shù)進(jìn)行統(tǒng)一調(diào)度搂橙,實(shí)現(xiàn)單線(xiàn)程中的協(xié)程。代碼封裝層次很高笛坦,實(shí)際使用只需要了解它的幾個(gè)主要方法即可区转。