守護(hù)進(jìn)程
關(guān)于守護(hù)進(jìn)程需要強(qiáng)調(diào)兩點(diǎn):
其一:守護(hù)進(jìn)程會(huì)在主進(jìn)程代碼執(zhí)行結(jié)束后就終止
其二:守護(hù)進(jìn)程內(nèi)無(wú)法再開(kāi)啟子進(jìn)程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
在子進(jìn)程開(kāi)始之前添加p.daemon = True設(shè)置為守護(hù)進(jìn)程
不管這個(gè)守護(hù)進(jìn)程是否執(zhí)行完畢,或者是否開(kāi)始執(zhí)行,都會(huì)隨著主進(jìn)程的結(jié)束而結(jié)束
互斥鎖
進(jìn)程之間數(shù)據(jù)不共享,但是共享同一套文件系統(tǒng),所以訪問(wèn)同一個(gè)文件,或同一個(gè)打印終端,是沒(méi)有問(wèn)題的,而共享帶來(lái)的是競(jìng)爭(zhēng),競(jìng)爭(zhēng)帶來(lái)的結(jié)果就是錯(cuò)亂
如何控制,就是加鎖處理踢涌。而互斥鎖的意思就是互相排斥苍鲜,如果把多個(gè)進(jìn)程比喻為多個(gè)人,互斥鎖的工作原理就是多個(gè)人都要去爭(zhēng)搶同一個(gè)資源:衛(wèi)生間拍棕,一個(gè)人搶到衛(wèi)生間后上一把鎖掏呼,其他人都要等著蟆融,等到這個(gè)完成任務(wù)后釋放鎖井佑,其他人才有可能有一個(gè)搶到......所以互斥鎖的原理属铁,就是把并發(fā)改成穿行,降低了效率躬翁,但保證了數(shù)據(jù)安全不錯(cuò)亂
#由并發(fā)變成了串行,犧牲了運(yùn)行效率,但避免了競(jìng)爭(zhēng)
from multiprocessing import Process,Lock
import os,time
def work(lock):
lock.acquire() #加鎖
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
lock.release() #釋放鎖
if __name__ == '__main__':
lock=Lock()
for i in range(3):
p=Process(target=work,args=(lock,))
p.start()
互斥鎖與join的區(qū)別
- join只能將整個(gè)進(jìn)程的代碼進(jìn)行加鎖
- 互斥鎖可以將進(jìn)程中的部分代碼進(jìn)行加鎖
互斥鎖總結(jié)
加鎖可以保證多個(gè)進(jìn)程修改同一塊數(shù)據(jù)時(shí)焦蘑,同一時(shí)間只能有一個(gè)任務(wù)可以進(jìn)行修改,即串行地修改盒发,沒(méi)錯(cuò)例嘱,速度是慢了,但犧牲了速度卻保證了數(shù)據(jù)安全宁舰。
雖然可以用文件共享數(shù)據(jù)實(shí)現(xiàn)進(jìn)程間通信拼卵,但問(wèn)題是:
1、效率低(共享數(shù)據(jù)基于文件蛮艰,而文件是硬盤(pán)上的數(shù)據(jù))
2腋腮、需要自己加鎖處理
因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個(gè)進(jìn)程共享一塊內(nèi)存的數(shù)據(jù))
2壤蚜、幫我們處理好鎖問(wèn)題低葫。
這就是mutiprocessing模塊為我們提供的基于消息的IPC通信機(jī)制:隊(duì)列和管道。
隊(duì)列和管道都是將數(shù)據(jù)存放于內(nèi)存中仍律,而隊(duì)列又是基于(管道+鎖)實(shí)現(xiàn)的嘿悬,可以讓我們從復(fù)雜的鎖問(wèn)題中解脫出來(lái),因而隊(duì)列才是進(jìn)程間通信的最佳選擇水泉。
我們應(yīng)該盡量避免使用共享數(shù)據(jù)善涨,盡可能使用消息傳遞和隊(duì)列,避免處理復(fù)雜的同步和鎖問(wèn)題草则,而且在進(jìn)程數(shù)目增多時(shí)钢拧,往往可以獲得更好的可獲展性。
隊(duì)列
進(jìn)程彼此之間互相隔離炕横,要實(shí)現(xiàn)進(jìn)程間通信(IPC)源内,multiprocessing模塊支持兩種形式:隊(duì)列和管道,這兩種方式都是使用消息傳遞的
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
Queue([maxsize]):創(chuàng)建共享的進(jìn)程隊(duì)列份殿,Queue是多進(jìn)程安全的隊(duì)列膜钓,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
參數(shù)介紹:
maxsize是隊(duì)列中允許最大項(xiàng)數(shù)卿嘲,省略則無(wú)大小限制颂斜。
但需要明確:
1、隊(duì)列內(nèi)存放的是消息而非大數(shù)據(jù)
2拾枣、隊(duì)列占用的是內(nèi)存空間沃疮,因而maxsize即便是無(wú)大小限制也受限于內(nèi)存大小
主要方法介紹:
q.put方法用以插入數(shù)據(jù)到隊(duì)列中盒让。
q.get方法可以從隊(duì)列讀取并且刪除一個(gè)元素。
隊(duì)列的使用
from multiprocessing import Process,Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了
生產(chǎn)者消費(fèi)者模型
一什么是生產(chǎn)者和消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題司蔬。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊邑茄,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理俊啼,直接扔給阻塞隊(duì)列肺缕,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取吨些,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)搓谆,平衡了生產(chǎn)者和消費(fèi)者的處理能力炒辉。
這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的
二 生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)
from multiprocessing import Process,Queue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產(chǎn)了 %s\033[0m' %(name,res))
if __name__ == '__main__':
q=Queue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=(q,'egon1','包子'))
p2=Process(target=producer,args=(q,'egon2','骨頭'))
p3=Process(target=producer,args=(q,'egon3','泔水'))
#消費(fèi)者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex1'))
c2=Process(target=consumer,args=(q,'alex2'))
#開(kāi)始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
q.put(None)
q.put(None)
q.put(None)
print('主')
其實(shí)我們的思路無(wú)非是發(fā)送結(jié)束信號(hào)而已豪墅,有另外一種隊(duì)列提供了這種機(jī)制
JoinableQueue([maxsize])
這就像是一個(gè)Queue對(duì)象,但隊(duì)列允許項(xiàng)目的使用者通知生成者項(xiàng)目已經(jīng)被成功處理黔寇。通知進(jìn)程是使用共享的信號(hào)和條件變量來(lái)實(shí)現(xiàn)的偶器。
參數(shù)介紹
maxsize是隊(duì)列中允許最大項(xiàng)數(shù),省略則無(wú)大小限制缝裤。
方法介紹
JoinableQueue的實(shí)例p除了與Queue對(duì)象相同的方法之外還具有:
q.task_done():使用者使用此方法發(fā)出信號(hào)屏轰,表示q.get()的返回項(xiàng)目已經(jīng)被處理(相當(dāng)于一個(gè)計(jì)數(shù)器,生產(chǎn)者拿到這個(gè)信號(hào)知道自己生產(chǎn)的數(shù)據(jù)已經(jīng)被處理的多少個(gè),有沒(méi)有被處理完)。如果調(diào)用此方法的次數(shù)大于從隊(duì)列中刪除項(xiàng)目的數(shù)量憋飞,將引發(fā)ValueError異常
q.join():生產(chǎn)者調(diào)用此方法進(jìn)行阻塞霎苗,直到隊(duì)列中所有的項(xiàng)目均被處理。阻塞將持續(xù)到隊(duì)列中的每個(gè)項(xiàng)目均調(diào)用q.task_done()方法為止(意思就是生產(chǎn)者生產(chǎn)的數(shù)量等于使用者發(fā)回來(lái)的q.task_done()的次數(shù)的話,表示當(dāng)前生產(chǎn)者的所有數(shù)據(jù)已經(jīng)被處理完成,并且自身也不再生產(chǎn)數(shù)據(jù),生產(chǎn)者這條進(jìn)程才結(jié)束,之前使用Queue()方法僅僅是只要生產(chǎn)完了所有數(shù)據(jù)立馬結(jié)束生產(chǎn)者進(jìn)程,而在這里加了q.join()以后會(huì)等到自己生產(chǎn)的所有數(shù)據(jù)被處理完成了才結(jié)束自己的進(jìn)程)
基于JoinableQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q,name):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[43m%s 吃 %s\033[0m' %(name,res))
q.task_done() #發(fā)送信號(hào)給q.join()榛做,說(shuō)明已經(jīng)從隊(duì)列中取走一個(gè)數(shù)據(jù)并處理完畢了
def producer(q,name,food):
for i in range(3):
time.sleep(random.randint(1,3))
res='%s%s' %(food,i)
q.put(res)
print('\033[45m%s 生產(chǎn)了 %s\033[0m' %(name,res))
q.join() #等到消費(fèi)者把自己放入隊(duì)列中的所有的數(shù)據(jù)都取走之后唁盏,生產(chǎn)者才結(jié)束
if __name__ == '__main__':
q=JoinableQueue() #使用JoinableQueue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=(q,'egon1','包子'))
p2=Process(target=producer,args=(q,'egon2','骨頭'))
p3=Process(target=producer,args=(q,'egon3','泔水'))
#消費(fèi)者們:即吃貨們
c1=Process(target=consumer,args=(q,'alex1'))
c2=Process(target=consumer,args=(q,'alex2'))
c1.daemon=True # 設(shè)置守護(hù)進(jìn)程,當(dāng)所有數(shù)據(jù)已經(jīng)被處理完成,生產(chǎn)者不再生產(chǎn)數(shù)據(jù)了,生產(chǎn)者進(jìn)程和主進(jìn)程都已經(jīng)結(jié)束時(shí),消費(fèi)者進(jìn)程卡在q.get(),直接讓其隨著主進(jìn)程的結(jié)束而結(jié)束
c2.daemon=True
#開(kāi)始
p1.start()
p2.start()
p3.start()
c1.start()
c2.start()
p1.join()
p2.join()
p3.join()
#1、主進(jìn)程等生產(chǎn)者p1检眯、p2厘擂、p3結(jié)束
#2、而p1锰瘸、p2刽严、p3是在消費(fèi)者把所有數(shù)據(jù)都取干凈之后才會(huì)結(jié)束
#3、所以一旦p1避凝、p2舞萄、p3結(jié)束了,證明消費(fèi)者也沒(méi)必要存在了管削,應(yīng)該隨著主進(jìn)程一塊死掉鹏氧,因而需要將生產(chǎn)者們?cè)O(shè)置成守護(hù)進(jìn)程
print('主')
三 生產(chǎn)者消費(fèi)者模型總結(jié)
1、程序中有兩類角色
一類負(fù)責(zé)生產(chǎn)數(shù)據(jù)(生產(chǎn)者)
一類負(fù)責(zé)處理數(shù)據(jù)(消費(fèi)者)
2佩谣、引入生產(chǎn)者消費(fèi)者模型為了解決的問(wèn)題是
平衡生產(chǎn)者與消費(fèi)者之間的速度差
程序解開(kāi)耦合
3把还、如何實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
生產(chǎn)者<--->隊(duì)列<--->消費(fèi)者