Python多線程(四):生產(chǎn)者消費(fèi)者問(wèn)題

上一篇:

生產(chǎn)者消費(fèi)者問(wèn)題是多線程中一個(gè)很經(jīng)典并發(fā)協(xié)作的問(wèn)題煞檩,這個(gè)問(wèn)題主要包含兩類線程处嫌,一個(gè)是生產(chǎn)者用于生產(chǎn)數(shù)據(jù),另一個(gè)是消費(fèi)者用于消費(fèi)數(shù)據(jù)斟湃,兩者操作同一個(gè)數(shù)據(jù)共享區(qū)域熏迹,這種模型在編程中非常常見(jiàn),比如爬蟲(chóng)凝赛,生產(chǎn)者負(fù)責(zé)爬取鏈接注暗,消費(fèi)者負(fù)責(zé)解析鏈接所指向的網(wǎng)頁(yè)內(nèi)容。這種模型需要滿足下面的兩個(gè)特征:

  • 消費(fèi)者在數(shù)據(jù)共享區(qū)域?yàn)榭諘r(shí)阻塞墓猎,直到共享區(qū)域出現(xiàn)新數(shù)據(jù)捆昏。
  • 生產(chǎn)者在數(shù)據(jù)共享區(qū)域滿時(shí)阻塞,直到數(shù)據(jù)共享區(qū)出現(xiàn)空位毙沾。

下面是一個(gè)簡(jiǎn)單的例子:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
lock = threading.Lock()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享區(qū)域未滿骗卜,生產(chǎn)數(shù)據(jù)
                num = random.uniform(0, 5)
                buff.append(num)
                print('生產(chǎn)者向共享區(qū)域加入%f' % num)
                lock.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            lock.acquire()
            if buff:
                # 如果共享區(qū)非空,消費(fèi)數(shù)據(jù)
                num = buff.pop(0)
                print('消費(fèi)者消費(fèi)掉%f' %num)
                lock.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序強(qiáng)制結(jié)束左胞!')

程序運(yùn)行結(jié)果如下:

生產(chǎn)者向共享區(qū)域加入1.653411
消費(fèi)者消費(fèi)掉1.653411
生產(chǎn)者向共享區(qū)域加入2.176285
生產(chǎn)者向共享區(qū)域加入4.727504
生產(chǎn)者向共享區(qū)域加入3.053323
消費(fèi)者消費(fèi)掉2.176285
生產(chǎn)者向共享區(qū)域加入0.951072
消費(fèi)者消費(fèi)掉4.727504
^C程序強(qiáng)制結(jié)束寇仓!

在程序中設(shè)置兩個(gè)進(jìn)程為守護(hù)進(jìn)程,并捕捉KeyboardInterrupt錯(cuò)誤烤宙,一旦捕捉到就結(jié)束主線程遍烦,同時(shí)結(jié)束兩個(gè)子線程。上面是一個(gè)生產(chǎn)者消費(fèi)者模型的一個(gè)簡(jiǎn)單實(shí)現(xiàn)躺枕,通過(guò)共享變量的方式使兩個(gè)線程互相通信來(lái)達(dá)成一致服猪。共享變量是線程間通信的常用方法,只要記得在對(duì)共享變量進(jìn)行操作時(shí)加鎖拐云,程序就不會(huì)有問(wèn)題罢猪。

但是上面的代碼也有問(wèn)題,在于這種代碼通過(guò)無(wú)限對(duì)共享變量訪問(wèn)的方式進(jìn)行判斷空還是滿叉瘩,這樣也降低了效率坡脐。因?yàn)槠渲幸粋€(gè)程序在明明知道buff滿了或者空了的情況下還要進(jìn)行無(wú)意義的循環(huán),由于GIL機(jī)制房揭,它會(huì)和其他線程爭(zhēng)奪執(zhí)行權(quán)。如果某一方在判斷buff滿了或者空了的情況下主動(dòng)阻塞晌端,直到另外一方通知它捅暴,它才恢復(fù),這樣就能最大化的效率咧纠。

Python中threading中的Condition類就是來(lái)幫助我們完成這件事的蓬痒。它的waitnotify方法能夠阻塞和通知一個(gè)線程,下面還是通過(guò)例子來(lái)了解一下:

import threading
import time
import random
MAX_BUFF_LEN = 5

buff = []
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if len(buff) < MAX_BUFF_LEN:
                # 如果共享區(qū)域未滿漆羔,生產(chǎn)數(shù)據(jù)
                num = random.uniform(0, 5)
                buff.append(num)
                print('生產(chǎn)者向共享區(qū)域加入%f' % num)
                condition.notify()
            else:
                # 如果共享區(qū)滿梧奢,停止生產(chǎn)
                print('共享區(qū)滿狱掂,生產(chǎn)者阻塞!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            condition.acquire()
            if buff:
                # 如果共享區(qū)非空亲轨,消費(fèi)數(shù)據(jù)
                num = buff.pop(0)
                print('消費(fèi)者消費(fèi)掉%f' %num)
                condition.notify()
            else:
                # 如果共享去空趋惨,停止消費(fèi)
                print('共享區(qū)空,消費(fèi)者阻塞惦蚊!')
                condition.wait()
            condition.release()
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序強(qiáng)制結(jié)束器虾!')

程序結(jié)果:

生產(chǎn)者向共享區(qū)域加入0.040350
消費(fèi)者消費(fèi)掉0.040350
共享區(qū)空,消費(fèi)者阻塞蹦锋!
生產(chǎn)者向共享區(qū)域加入3.266167
消費(fèi)者消費(fèi)掉3.266167
生產(chǎn)者向共享區(qū)域加入3.468917
^C程序強(qiáng)制結(jié)束兆沙!

上面的代碼中,acquire方法實(shí)際上是獲得鎖莉掂,wait方法將線程阻塞葛圃,實(shí)際上是將鎖釋放。當(dāng)一個(gè)線程調(diào)用notify方法時(shí)憎妙,另一個(gè)線程就被喚醒库正,但是這時(shí)候這個(gè)線程并沒(méi)有調(diào)用wait或者release方法釋放鎖,因此另一個(gè)線程雖然醒過(guò)來(lái)了但是還是沒(méi)有執(zhí)行尚氛,直到這個(gè)線程將鎖釋放诀诊。

在使用共享變量的時(shí)候,需要時(shí)刻注意是否線程安全阅嘶,非常不方便属瓣。好在是Python中提供了一個(gè)Queue類,它是線程安全的讯柔,有了它我們可以把注意力放在如何實(shí)現(xiàn)代碼邏輯上抡蛙,而不是過(guò)多的注意到線程安全上。在Python2.7中該模塊名為Queue魂迄,而在Python3.6中該模塊名為queue粗截。使用Queue類改進(jìn)的代碼如下:

import threading
import time
import random
from queue import Queue

MAX_BUFF_LEN = 5

buff = Queue(MAX_BUFF_LEN)
condition = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = random.uniform(0, 5)
            buff.put(num)
            print('生產(chǎn)者向共享區(qū)域加入%f' % num)
            time.sleep(random.uniform(0, 10))

class Consumer(threading.Thread):
    def run(self):
        global buff
        while True:
            num = buff.get()
            print('消費(fèi)者消費(fèi)掉%f' %num)
            time.sleep(random.uniform(0, 10))

producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
except KeyboardInterrupt:
    print('程序強(qiáng)制結(jié)束!')

Queue是一個(gè)FIFO隊(duì)列捣炬,它的get方法和put方法分別是入隊(duì)和出隊(duì)熊昌,在入隊(duì)和出隊(duì)時(shí)獲取了鎖以保證線程安全,如果隊(duì)列空或者滿湿酸,默認(rèn)情況下get方法和put方法自動(dòng)阻塞婿屹。阻塞和喚醒的方式實(shí)質(zhì)上是調(diào)用了Condition類的waitnotify方法。Queue類比較簡(jiǎn)單推溃,推薦大家直接查看源碼或者官方文檔昂利。

這里還有一篇寫(xiě)得非常好的博客,推薦大家去看看:Producer-consumer problem in Python

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蜂奸,隨后出現(xiàn)的幾起案子犁苏,更是在濱河造成了極大的恐慌,老刑警劉巖扩所,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件围详,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡碌奉,警方通過(guò)查閱死者的電腦和手機(jī)短曾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)赐劣,“玉大人嫉拐,你說(shuō)我怎么就攤上這事】妫” “怎么了婉徘?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)咐汞。 經(jīng)常有香客問(wèn)我盖呼,道長(zhǎng),這世上最難降的妖魔是什么化撕? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任几晤,我火速辦了婚禮,結(jié)果婚禮上植阴,老公的妹妹穿的比我還像新娘蟹瘾。我一直安慰自己,他們只是感情好掠手,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布憾朴。 她就那樣靜靜地躺著,像睡著了一般喷鸽。 火紅的嫁衣襯著肌膚如雪众雷。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,784評(píng)論 1 290
  • 那天做祝,我揣著相機(jī)與錄音砾省,去河邊找鬼。 笑死混槐,一個(gè)胖子當(dāng)著我的面吹牛纯蛾,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播纵隔,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了捌刮?” 一聲冷哼從身側(cè)響起碰煌,我...
    開(kāi)封第一講書(shū)人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绅作,沒(méi)想到半個(gè)月后芦圾,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡俄认,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年个少,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片眯杏。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡夜焦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出岂贩,到底是詐尸還是另有隱情茫经,我是刑警寧澤,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布萎津,位于F島的核電站卸伞,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏锉屈。R本人自食惡果不足惜荤傲,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望颈渊。 院中可真熱鬧遂黍,春花似錦、人聲如沸儡炼。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)乌询。三九已至榜贴,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間妹田,已是汗流浹背唬党。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留鬼佣,地道東北人驶拱。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像晶衷,于是被迫代替她去往敵國(guó)和親蓝纲。 傳聞我的和親對(duì)象是個(gè)殘疾皇子阴孟,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對(duì)應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過(guò)程中...
    勝浩_ae28閱讀 5,087評(píng)論 0 23
  • ??一個(gè)任務(wù)通常就是一個(gè)程序,每個(gè)運(yùn)行中的程序就是一個(gè)進(jìn)程税迷。當(dāng)一個(gè)程序運(yùn)行時(shí)永丝,內(nèi)部可能包含了多個(gè)順序執(zhí)行流,每個(gè)順...
    OmaiMoon閱讀 1,663評(píng)論 0 12
  • 進(jìn)程和線程 進(jìn)程 所有運(yùn)行中的任務(wù)通常對(duì)應(yīng)一個(gè)進(jìn)程,當(dāng)一個(gè)程序進(jìn)入內(nèi)存運(yùn)行時(shí),即變成一個(gè)進(jìn)程.進(jìn)程是處于運(yùn)行過(guò)程中...
    小徐andorid閱讀 2,799評(píng)論 3 53
  • 本鏡像采用官方原版app制作箭养,集成Clover 4391慕嚷,支持UEFI啟動(dòng)安裝;如果卡+++請(qǐng)?zhí)鎿QDrivers6...
    daliansky閱讀 12,294評(píng)論 2 0
  • 大家好,我是侯俊玲毕泌,說(shuō)實(shí)話看到這個(gè)主題時(shí)喝检,我想了很久,我的夢(mèng)想是什么撼泛?我好像是一只溫水里的青蛙挠说,跳不出這個(gè)圈子,消...
    侯俊玲閱讀 420評(píng)論 0 0