multiprocessing.Queue
- q = multiprocessing.Queue()
- q.put(x) # 向隊(duì)列中放入一個(gè)值
- q.get() # 從隊(duì)列中取一個(gè)值
- q.empty() # 判斷隊(duì)列是否為空
- q.full() # 判斷隊(duì)列是否滿了
- q.get_nowait() # 從隊(duì)列中取值魂角,如果隊(duì)列為空丛忆,則拋出異常。普通get()方法為空時(shí)堵塞
- q.qsize() # 求取隊(duì)列長(zhǎng)度
生產(chǎn)者消費(fèi)者模型
用來解決數(shù)據(jù)供需的平衡問題
- 生產(chǎn)者 -- 進(jìn)程
- 消費(fèi)者 -- 進(jìn)程
import multiprocessing
import time
import random
def consumer(q, name):
while True:
food = q.get()
if food is None:
print("%s 拿到一個(gè)空" % name)
break
print("\033[31m%s 消費(fèi)了 %s\033[0m" %(name, food))
time.sleep(random.randint(1, 3))
def producer(name, food, q):
for i in range(10):
f = "%s生產(chǎn)的 %s %s" % (name, food, i)
print(f)
q.put(f)
time.sleep(1)
if __name__ == "__main__":
q = multiprocessing.Queue(20)
p = multiprocessing.Process(target=producer, args=("Sun", "包子", q))
p.start()
p2 = multiprocessing.Process(target=producer, args=("Wang", "泔水", q))
p2.start()
c1 = multiprocessing.Process(target=consumer, args=(q, "1號(hào)"))
c1.start()
c2 = multiprocessing.Process(target=consumer, args=(q, "2號(hào)"))
c2.start()
p.join()
p2.join()
q.put(None)
q.put(None)
JoinableQueue
使用阻塞方式等待隊(duì)列空閑后退出
import multiprocessing
import time
import random
def consumer(q, name):
while True:
food = q.get()
print("\033[31m%s 消費(fèi)了 %s\033[0m" %(name, food))
time.sleep(random.randint(1, 3))
q.task_done()
def producer(name, food, q):
for i in range(10):
f = "%s生產(chǎn)的 %s %s" % (name, food, i)
print(f)
q.put(f)
time.sleep(1)
q.join()
if __name__ == "__main__":
q = multiprocessing.JoinableQueue(20)
p1 = multiprocessing.Process(target=producer, args=("Sun", "包子", q))
p1.start()
p2 = multiprocessing.Process(target=producer, args=("Wang", "泔水", q))
p2.start()
c1 = multiprocessing.Process(target=consumer, args=(q, "1號(hào)"))
c2 = multiprocessing.Process(target=consumer, args=(q, "2號(hào)"))
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()