隊列(進程通信ipc)
隊列主要用于解決進程間通信的問題谒兄,隊列底層就是通過管道和鎖的方式實現(xiàn)的舷嗡。
代碼示例:
from multiprocessing import Queue
import time
q=Queue(3) # 指定隊列的長度
#隊列相關(guān)的操作方法
# put,get,put_nowait,get_nowait,full,empty
q.put(3) # 向隊列中存放數(shù)據(jù),可以是任何類型的數(shù)據(jù)
q.put(3)
q.put(3)
print(q.full()) # 如果隊列滿绳瘟,則返回 True, 否則返回 False
print(q.get()) # 依次取出數(shù)據(jù)
print(q.get())
print(q.get())
print(q.empty()) # 如果隊列為空易茬,則返回True,否則返回 False
主要方法
- q.put(): 用以插入數(shù)據(jù)到隊列中,put方法還有兩個可選參數(shù):blocked和timeout树埠。如果blocked為True(默認值)糠馆,并且timeout為正值,該方法會阻塞timeout指定的時間怎憋,直到該隊列有剩余的空間又碌。如果超時,會拋出Queue.Full異常绊袋。如果blocked為False毕匀,但該Queue已滿,會立即拋出Queue.Full異常
- q.get方法可以從隊列讀取并且刪除一個元素癌别。同樣皂岔,get方法有兩個可選參數(shù):blocked和timeout。如果blocked為True(默認值)展姐,并且timeout為正值躁垛,那么在等待時間內(nèi)沒有取到任何元素,會拋出Queue.Empty異常圾笨。如果blocked為False教馆,有兩種情況存在,如果Queue有一個值可用擂达,則立即返回該值土铺,否則,如果隊列為空板鬓,則立即拋出Queue.Empty異常
- q.get_nowait():同q.get(False)
- q.put_nowait():同q.put(False)
- q.empty():調(diào)用此方法時q為空則返回True悲敷,該結(jié)果不可靠,比如在返回True的過程中俭令,如果隊列中又加入了項目
- q.full():調(diào)用此方法時q已滿則返回True后德,該結(jié)果不可靠,比如在返回True的過程中抄腔,如果隊列中的項目被取走探遵。
- q.qsize():返回隊列中目前項目的正確數(shù)量,結(jié)果也不可靠妓柜,理由同q.empty()和q.full()一樣
線程Queue
同進程隊列一樣箱季,線程也有對于的方法,叫做線程Queue.
import queue
q=queue.Queue(3) # 隊列:先進先出,指定隊列的大小
q.put(1) # 向隊列中放入數(shù)據(jù)
q.put(2)
q.put(3)
print(q.get()) # 從隊列中取出數(shù)據(jù)
# q.put(4) # 當隊列滿后會等待有空閑位置時再放入
# q.put_nowait(4) # 立即放入數(shù)據(jù)棍掐,不等待藏雏,如果隊列已經(jīng)滿,則會報錯。
q.put(4,block=False) # 與put_nowait()方法一樣掘殴,設置不等待赚瘦,直接放入
q.put(4,block=True,timeout=3) # 等待,且超時時間為3s
優(yōu)先級隊列:
import queue
q=queue.PriorityQueue(3) # 優(yōu)先級隊列
q.put((10,'a')) # 指定優(yōu)先級奏寨,數(shù)字越小起意,優(yōu)先級越高
q.put((-3,'b'))
q.put((100,'c'))
print(q.get())
print(q.get())
print(q.get())
# 輸出結(jié)果:
(-3, 'b')
(10, 'a')
(100, 'c')
堆棧,后進先出:
import queue
q=queue.LifoQueue(3) # 堆棧:后進先出
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
輸出:
3
2
1
生產(chǎn)者和消費者模型
為了避免死鎖問題病瞳,能夠解耦合揽咕,定義了生產(chǎn)者消費者模型。生產(chǎn)者只需要創(chuàng)造數(shù)據(jù)套菜,然后將數(shù)據(jù)放入隊列亲善,消費者則從隊列中取出數(shù)據(jù),對數(shù)據(jù)進行消費逗柴。
下面是使用多進程實現(xiàn)了簡單的生產(chǎn)者和消費者模型:
from multiprocessing import Process,Queue
import random
import time
def producer(name,food,q):
for i in range(10):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print("廚師[%s]生產(chǎn)了<%s>" %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('吃貨[%s]吃了<%s>' %(name,res))
if __name__=='__main__':
q=Queue()
p1=Process(target=producer,args=('andy','包子',q))
c1=Process(target=consumer,args=('bob',q))
p1.start()
c1.start()
print('主進程')
在實際的應用中蛹头,可能會有多個生產(chǎn)者和消費者,而且我們必須保證在生產(chǎn)者已經(jīng)生產(chǎn)完數(shù)據(jù)戏溺,并且消費者消費完數(shù)據(jù)后程序正常退出渣蜗,所以這里需要使用到JoinableQueue
模塊。
from multiprocessing import Process,JoinableQueue # 導入可以使用join方法的模塊
import random
import time
def producer(name,food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(random.randint(1,3))
q.put(res)
print("廚師[%s]生產(chǎn)了<%s>" %(name,res))
def consumer(name,q):
while True:
res=q.get()
time.sleep(random.randint(1,3))
print('吃貨[%s]吃了<%s>' %(name,res))
q.task_done() # 通過使用隊列的task_done方法旷祸,通知每一次從隊列取出的信息
if __name__=='__main__':
q=JoinableQueue()
p1=Process(target=producer,args=('andy','包子',q))
p2=Process(target=producer,args=('Tom','包子',q))
c1=Process(target=consumer,args=('bob',q))
c2=Process(target=consumer,args=('Lucy',q))
c3=Process(target=consumer,args=('David',q))
c1.daemon=True # 設置為守護進程炫刷,當主進程運行完畢時眶诈,此子進程也退出
c2.daemon=True
c3.daemon=True
p1.start()
p2.start()
c1.start()
c2.start()
c3.start()
p1.join() # 等待生產(chǎn)子進程運行結(jié)束
p2.join()
q.join() # 等待隊列為空 后結(jié)束主進程
print('主進程')
說明:
- JoinableQueue([maxsize]):這就像是一個Queue對象帽蝶,但隊列允許項目的使用者通知生成者項目已經(jīng)被成功處理没龙。通知進程是使用共享的信號和條件變量來實現(xiàn)的五辽。
- q.task_done():使用者使用此方法發(fā)出信號好乐,表示q.get()的返回項目已經(jīng)被處理瘾蛋。如果調(diào)用此方法的次數(shù)大于從隊列中刪除項目的數(shù)量搁拙,將引發(fā)ValueError異常
- q.join():生產(chǎn)者調(diào)用此方法進行阻塞掺炭,直到隊列中所有的項目均被處理辫诅。阻塞將持續(xù)到隊列中的每個項目均調(diào)用q.task_done()方法為止