什么是生產(chǎn)者消費者模式
在軟件開發(fā)的過程中鲁捏,經(jīng)常碰到這樣的場景:
某些模塊負責生產(chǎn)數(shù)據(jù)芯砸,這些數(shù)據(jù)由其他模塊來負責處理(此處的模塊可能是:函數(shù)、線程给梅、進程等)假丧。產(chǎn)生數(shù)據(jù)的模塊稱為生產(chǎn)者,而處理數(shù)據(jù)的模塊稱為消費者动羽。在生產(chǎn)者與消費者之間的緩沖區(qū)稱之為倉庫包帚。生產(chǎn)者負責往倉庫運輸商品,而消費者負責從倉庫里取出商品运吓,這就構(gòu)成了生產(chǎn)者消費者模式渴邦。
結(jié)構(gòu)圖如下:
為了大家容易理解疯趟,我們舉一個寄信的例子。假設(shè)你要寄一封信谋梭,大致過程如下:
1信峻、你把信寫好——相當于生產(chǎn)者生產(chǎn)數(shù)據(jù)
2、你把信放入郵箱——相當于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)
3瓮床、郵遞員把信從郵箱取出盹舞,做相應(yīng)處理——相當于消費者把數(shù)據(jù)取出緩沖區(qū),處理數(shù)據(jù)
生產(chǎn)者消費者模式的優(yōu)點
-
解耦
假設(shè)生產(chǎn)者和消費者分別是兩個線程纤垂。如果讓生產(chǎn)者直接調(diào)用消費者的某個方法矾策,那么生產(chǎn)者對于消費者就會產(chǎn)生依賴(也就是耦合)磷账。如果未來消費者的代碼發(fā)生變化峭沦,可能會影響到生產(chǎn)者的代碼。而如果兩者都依賴于某個緩沖區(qū)逃糟,兩者之間不直接依賴吼鱼,耦合也就相應(yīng)降低了。
舉個例子绰咽,我們?nèi)ム]局投遞信件菇肃,如果不使用郵箱(也就是緩沖區(qū)),你必須得把信直接交給郵遞員取募。有同學(xué)會說琐谤,直接給郵遞員不是挺簡單的嘛?其實不簡單玩敏,你必須 得認識誰是郵遞員斗忌,才能把信給他。這就產(chǎn)生了你和郵遞員之間的依賴(相當于生產(chǎn)者和消費者的強耦合)旺聚。萬一哪天郵遞員 換人了织阳,你還要重新認識一下(相當于消費者變化導(dǎo)致修改生產(chǎn)者代碼)。而郵箱相對來說比較固定砰粹,你依賴它的成本就比較低(相當于和緩沖區(qū)之間的弱耦合)唧躲。
-
并發(fā)
由于生產(chǎn)者與消費者是兩個獨立的并發(fā)體,他們之間是用緩沖區(qū)通信的碱璃,生產(chǎn)者只需要往緩沖區(qū)里丟數(shù)據(jù)弄痹,就可以繼續(xù)生產(chǎn)下一個數(shù)據(jù),而消費者只需要從緩沖區(qū)拿數(shù)據(jù)即可嵌器,這樣就不會因為彼此的處理速度而發(fā)生阻塞界酒。
繼續(xù)上面的例子,如果我們不使用郵箱嘴秸,就得在郵局等郵遞員毁欣,直到他回來庇谆,把信件交給他,這期間我們啥事兒都不能干(也就是生產(chǎn)者阻塞)凭疮》苟或者郵遞員得挨家挨戶問,誰要寄信(相當于消費者輪詢)执解。
-
支持忙閑不均
當生產(chǎn)者制造數(shù)據(jù)快的時候寞肖,消費者來不及處理,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中衰腌,慢慢處理掉新蟆。而不至于因為消費者的性能造成數(shù)據(jù)丟失或影響生產(chǎn)者生產(chǎn)。
我們再拿寄信的例子右蕊,假設(shè)郵遞員一次只能帶走1000封信琼稻,萬一碰上情人節(jié)(或是圣誕節(jié))送賀卡,需要寄出去的信超過了1000封饶囚,這時候郵箱這個緩沖區(qū)就派上用場了帕翻。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走萝风。
通過上面的介紹大家應(yīng)該已經(jīng)明白了生產(chǎn)者消費者模式嘀掸。
Python中的多線程編程
在實現(xiàn)生產(chǎn)者消費者模式之前,我們先學(xué)習下Python中的多線程編程规惰。
線程是操作系統(tǒng)直接支持的執(zhí)行單元睬塌,高級語言通常都內(nèi)置多線程的支持,Python也不例外歇万,并且Python的線程是真正的Posix Thread揩晴,而不是模擬出來的線程。
Python的標準庫提供了兩個模塊:_thread和threading堕花,_thread是低級模塊文狱,threading是高級模塊,對_thread進行了封裝缘挽。絕大多數(shù)情況下瞄崇,我們只需要使用threading這個高級模塊。
下面我們先看一段在Python中實現(xiàn)多線程的代碼壕曼。
import time,threading
#線程代碼
class TaskThread(threading.Thread):
def __init__(self,name):
threading.Thread.__init__(self,name=name)
def run(self):
print('thread %s is running...' % self.getName())
for i in range(6):
print('thread %s >>> %s' % (self.getName(), i))
time.sleep(1)
print('thread %s finished.' % self.getName())
taskthread = TaskThread('TaskThread')
taskthread.start()
taskthread.join()
下面是程序的執(zhí)行結(jié)果:
thread TaskThread is running...
thread TaskThread >>> 0
thread TaskThread >>> 1
thread TaskThread >>> 2
thread TaskThread >>> 3
thread TaskThread >>> 4
thread TaskThread >>> 5
thread TaskThread finished.
TaskThread類繼承自threading模塊中的Thread線程類苏研。構(gòu)造函數(shù)的name參數(shù)指定線程的名字,通過重載基類run函數(shù)實現(xiàn)具體任務(wù)腮郊。
在簡單熟悉了Python的線程后摹蘑,下面我們實現(xiàn)一個生產(chǎn)者消費者模shi。
from Queue import Queue
import random,threading,time
#生產(chǎn)者類
class Producer(threading.Thread):
def __init__(self, name,queue):
threading.Thread.__init__(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.data.put(i)
time.sleep(random.randrange(10)/5)
print("%s finished!" % self.getName())
#消費者類
class Consumer(threading.Thread):
def __init__(self,name,queue):
threading.Thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
time.sleep(random.randrange(10))
print("%s finished!" % self.getName())
def main():
queue = Queue()
producer = Producer('Producer',queue)
consumer = Consumer('Consumer',queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads finished!'
if __name__ == '__main__':
main()
執(zhí)行結(jié)果可能如下:
Producer is producing 0 to the queue!
Consumer is consuming. 0 in the queue is consumed!
Producer is producing 1 to the queue!
Producer is producing 2 to the queue!
Consumer is consuming. 1 in the queue is consumed!
Consumer is consuming. 2 in the queue is consumed!
Producer is producing 3 to the queue!
Producer is producing 4 to the queue!
Producer finished!
Consumer is consuming. 3 in the queue is consumed!
Consumer is consuming. 4 in the queue is consumed!
Consumer finished!
All threads finished!
因為多線程是搶占式執(zhí)行的轧飞,所以打印出的運行結(jié)果不一定和上面的完全一致衅鹿。
小結(jié)
本例通過Python實現(xiàn)了一個簡單的生產(chǎn)者消費者模型撒踪。Python中的Queue模塊已經(jīng)提供了對線程同步的支持,所以本文并沒有涉及鎖大渤、同步制妄、死鎖等多線程問題。