隊(duì)列
進(jìn)程彼此之間互相隔離,要實(shí)現(xiàn)進(jìn)程間通信(IPC),multiprocessing模塊支持兩種形式:隊(duì)列和管道睬愤,這兩種方式都是使用消息傳遞的
創(chuàng)建隊(duì)列的類(底層就是以管道和鎖定的方式實(shí)現(xiàn)):
# 創(chuàng)建共享的進(jìn)程隊(duì)列,Queue是多進(jìn)程安全的隊(duì)列猎莲,可以使用Queue實(shí)現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
Queue([maxsize])
參數(shù)介紹:
maxsize是隊(duì)列中允許最大項(xiàng)數(shù)锐锣,省略則無大小限制腌闯。
主要方法:
q.put方法用以插入數(shù)據(jù)到隊(duì)列中,put方法還有兩個(gè)可選參數(shù):blocked和timeout雕憔。如果blocked為True(默認(rèn)值)姿骏,并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間斤彼,直到該隊(duì)列有剩余的空間分瘦。如果超時(shí),會(huì)拋出Queue.Full異常琉苇。如果blocked為False嘲玫,但該Queue已滿,會(huì)立即拋出Queue.Full異常并扇。
q.get方法可以從隊(duì)列讀取并且刪除一個(gè)元素去团。同樣,get方法有兩個(gè)可選參數(shù):blocked和timeout穷蛹。如果blocked為True(默認(rèn)值)土陪,并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素肴熏,會(huì)拋出Queue.Empty異常鬼雀。如果blocked為False,有兩種情況存在扮超,如果Queue有一個(gè)值可用取刃,則立即返回該值,否則出刷,如果隊(duì)列為空璧疗,則立即拋出Queue.Empty異常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():調(diào)用此方法時(shí)q為空則返回True,該結(jié)果不可靠馁龟,比如在返回True的過程中崩侠,如果隊(duì)列中又加入了項(xiàng)目。
q.full():調(diào)用此方法時(shí)q已滿則返回True坷檩,該結(jié)果不可靠却音,比如在返回True的過程中,如果隊(duì)列中的項(xiàng)目被取走矢炼。
q.qsize():返回隊(duì)列中目前項(xiàng)目的正確數(shù)量系瓢,結(jié)果也不可靠,理由同q.empty()和q.full()一樣
應(yīng)用:
'''
multiprocessing模塊支持進(jìn)程間通信的兩種主要形式:管道和隊(duì)列
都是基于消息傳遞實(shí)現(xiàn)的,但是隊(duì)列接口
'''
from multiprocessing import Process,Queue
import time
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #滿了
print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
生產(chǎn)者與消費(fèi)者模型
在并發(fā)編程中使用生產(chǎn)者和消費(fèi)者模式能夠解決絕大多數(shù)并發(fā)問題句灌。該模式通過平衡生產(chǎn)線程和消費(fèi)線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度夷陋。
為什么要使用生產(chǎn)者和消費(fèi)者模式
在線程世界里欠拾,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程骗绕。在多線程開發(fā)當(dāng)中藐窄,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢酬土,那么生產(chǎn)者就必須等待消費(fèi)者處理完荆忍,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理撤缴,如果消費(fèi)者的處理能力大于生產(chǎn)者刹枉,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式腹泌。
什么是生產(chǎn)者消費(fèi)者模式
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題嘶卧。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊凉袱,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理芥吟,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù)专甩,而是直接從阻塞隊(duì)列里取钟鸵,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力涤躲。
基于隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
應(yīng)用:
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
q.task_done() #向q.join()發(fā)送一次信號(hào),證明一個(gè)數(shù)據(jù)已經(jīng)被取走了
def producer(name,q):
for i in range(10):
time.sleep(random.randint(1,3))
res='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生產(chǎn)了 %s\033[0m' %(os.getpid(),res))
q.join()
if __name__ == '__main__':
q=JoinableQueue()
#生產(chǎn)者們:即廚師們
p1=Process(target=producer,args=('包子',q))
p2=Process(target=producer,args=('骨頭',q))
p3=Process(target=producer,args=('泔水',q))
#消費(fèi)者們:即吃貨們
c1=Process(target=consumer,args=(q,))
c2=Process(target=consumer,args=(q,))
c1.daemon=True
c2.daemon=True
#開始
p_l=[p1,p2,p3,c1,c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')
#主進(jìn)程等--->p1,p2,p3等---->c1,c2
#p1,p2,p3結(jié)束了,證明c1,c2肯定全都收完了p1,p2,p3發(fā)到隊(duì)列的數(shù)據(jù)
#因而c1,c2也沒有存在的價(jià)值了,應(yīng)該隨著主進(jìn)程的結(jié)束而結(jié)束,所以設(shè)置成守護(hù)進(jìn)程
生產(chǎn)者消費(fèi)者模型總結(jié)
#程序中有兩類角色
一類負(fù)責(zé)生產(chǎn)數(shù)據(jù)(生產(chǎn)者)
一類負(fù)責(zé)處理數(shù)據(jù)(消費(fèi)者)
#引入生產(chǎn)者消費(fèi)者模型為了解決的問題是:
平衡生產(chǎn)者與消費(fèi)者之間的工作能力棺耍,從而提高程序整體處理數(shù)據(jù)的速度
#如何實(shí)現(xiàn):
生產(chǎn)者<-->隊(duì)列<——>消費(fèi)者
#生產(chǎn)者消費(fèi)者模型實(shí)現(xiàn)類程序的解耦和
補(bǔ)充重點(diǎn):
JoinableQueue是Queue的子類,增加了task_done()和join()方法种樱。
task_done()用來告訴queue一個(gè)task完成蒙袍。一般地在調(diào)用get()獲得一個(gè)task,
在task結(jié)束后調(diào)用task_done()來通知Queue當(dāng)前task完成嫩挤。
join() 阻塞直到queue中的所有的task都被處理(即task_done方法被調(diào)用)害幅。
在上面代碼中,對(duì)于調(diào)用get的兩個(gè)子進(jìn)程應(yīng)該設(shè)置為守護(hù)進(jìn)程(daemon = True)岂昭,
這里子進(jìn)程不會(huì)直接被結(jié)束以现,可能是因?yàn)镴oinableQueue自己會(huì)協(xié)調(diào)put和get,
這樣當(dāng)JoinableQueue中的數(shù)據(jù)全部被取出后约啊,這兩個(gè)子進(jìn)程才會(huì)自動(dòng)結(jié)束邑遏。