上一篇:鎖
生產(chǎn)者消費(fèi)者問(wèn)題是多線程中一個(gè)很經(jīng)典并發(fā)協(xié)作的問(wèn)題煞檩,這個(gè)問(wèn)題主要包含兩類線程处嫌,一個(gè)是生產(chǎn)者用于生產(chǎn)數(shù)據(jù),另一個(gè)是消費(fèi)者用于消費(fèi)數(shù)據(jù)斟湃,兩者操作同一個(gè)數(shù)據(jù)共享區(qū)域熏迹,這種模型在編程中非常常見(jiàn),比如爬蟲(chóng)凝赛,生產(chǎn)者負(fù)責(zé)爬取鏈接注暗,消費(fèi)者負(fù)責(zé)解析鏈接所指向的網(wǎng)頁(yè)內(nèi)容。這種模型需要滿足下面的兩個(gè)特征:
- 消費(fèi)者在數(shù)據(jù)共享區(qū)域?yàn)榭諘r(shí)阻塞墓猎,直到共享區(qū)域出現(xiàn)新數(shù)據(jù)捆昏。
- 生產(chǎn)者在數(shù)據(jù)共享區(qū)域滿時(shí)阻塞,直到數(shù)據(jù)共享區(qū)出現(xiàn)空位毙沾。
下面是一個(gè)簡(jiǎn)單的例子:
import threading
import time
import random
MAX_BUFF_LEN = 5
buff = []
lock = threading.Lock()
class Producer(threading.Thread):
def run(self):
global buff
while True:
lock.acquire()
if len(buff) < MAX_BUFF_LEN:
# 如果共享區(qū)域未滿骗卜,生產(chǎn)數(shù)據(jù)
num = random.uniform(0, 5)
buff.append(num)
print('生產(chǎn)者向共享區(qū)域加入%f' % num)
lock.release()
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
lock.acquire()
if buff:
# 如果共享區(qū)非空,消費(fèi)數(shù)據(jù)
num = buff.pop(0)
print('消費(fèi)者消費(fèi)掉%f' %num)
lock.release()
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序強(qiáng)制結(jié)束左胞!')
程序運(yùn)行結(jié)果如下:
生產(chǎn)者向共享區(qū)域加入1.653411
消費(fèi)者消費(fèi)掉1.653411
生產(chǎn)者向共享區(qū)域加入2.176285
生產(chǎn)者向共享區(qū)域加入4.727504
生產(chǎn)者向共享區(qū)域加入3.053323
消費(fèi)者消費(fèi)掉2.176285
生產(chǎn)者向共享區(qū)域加入0.951072
消費(fèi)者消費(fèi)掉4.727504
^C程序強(qiáng)制結(jié)束寇仓!
在程序中設(shè)置兩個(gè)進(jìn)程為守護(hù)進(jìn)程,并捕捉KeyboardInterrupt
錯(cuò)誤烤宙,一旦捕捉到就結(jié)束主線程遍烦,同時(shí)結(jié)束兩個(gè)子線程。上面是一個(gè)生產(chǎn)者消費(fèi)者模型的一個(gè)簡(jiǎn)單實(shí)現(xiàn)躺枕,通過(guò)共享變量的方式使兩個(gè)線程互相通信來(lái)達(dá)成一致服猪。共享變量是線程間通信的常用方法,只要記得在對(duì)共享變量進(jìn)行操作時(shí)加鎖拐云,程序就不會(huì)有問(wèn)題罢猪。
但是上面的代碼也有問(wèn)題,在于這種代碼通過(guò)無(wú)限對(duì)共享變量訪問(wèn)的方式進(jìn)行判斷空還是滿叉瘩,這樣也降低了效率坡脐。因?yàn)槠渲幸粋€(gè)程序在明明知道buff
滿了或者空了的情況下還要進(jìn)行無(wú)意義的循環(huán),由于GIL機(jī)制房揭,它會(huì)和其他線程爭(zhēng)奪執(zhí)行權(quán)。如果某一方在判斷buff
滿了或者空了的情況下主動(dòng)阻塞晌端,直到另外一方通知它捅暴,它才恢復(fù),這樣就能最大化的效率咧纠。
Python中threading
中的Condition
類就是來(lái)幫助我們完成這件事的蓬痒。它的wait
和notify
方法能夠阻塞和通知一個(gè)線程,下面還是通過(guò)例子來(lái)了解一下:
import threading
import time
import random
MAX_BUFF_LEN = 5
buff = []
condition = threading.Condition()
class Producer(threading.Thread):
def run(self):
global buff
while True:
condition.acquire()
if len(buff) < MAX_BUFF_LEN:
# 如果共享區(qū)域未滿漆羔,生產(chǎn)數(shù)據(jù)
num = random.uniform(0, 5)
buff.append(num)
print('生產(chǎn)者向共享區(qū)域加入%f' % num)
condition.notify()
else:
# 如果共享區(qū)滿梧奢,停止生產(chǎn)
print('共享區(qū)滿狱掂,生產(chǎn)者阻塞!')
condition.wait()
condition.release()
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
condition.acquire()
if buff:
# 如果共享區(qū)非空亲轨,消費(fèi)數(shù)據(jù)
num = buff.pop(0)
print('消費(fèi)者消費(fèi)掉%f' %num)
condition.notify()
else:
# 如果共享去空趋惨,停止消費(fèi)
print('共享區(qū)空,消費(fèi)者阻塞惦蚊!')
condition.wait()
condition.release()
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序強(qiáng)制結(jié)束器虾!')
程序結(jié)果:
生產(chǎn)者向共享區(qū)域加入0.040350
消費(fèi)者消費(fèi)掉0.040350
共享區(qū)空,消費(fèi)者阻塞蹦锋!
生產(chǎn)者向共享區(qū)域加入3.266167
消費(fèi)者消費(fèi)掉3.266167
生產(chǎn)者向共享區(qū)域加入3.468917
^C程序強(qiáng)制結(jié)束兆沙!
上面的代碼中,acquire
方法實(shí)際上是獲得鎖莉掂,wait
方法將線程阻塞葛圃,實(shí)際上是將鎖釋放。當(dāng)一個(gè)線程調(diào)用notify
方法時(shí)憎妙,另一個(gè)線程就被喚醒库正,但是這時(shí)候這個(gè)線程并沒(méi)有調(diào)用wait
或者release
方法釋放鎖,因此另一個(gè)線程雖然醒過(guò)來(lái)了但是還是沒(méi)有執(zhí)行尚氛,直到這個(gè)線程將鎖釋放诀诊。
在使用共享變量的時(shí)候,需要時(shí)刻注意是否線程安全阅嘶,非常不方便属瓣。好在是Python中提供了一個(gè)Queue
類,它是線程安全的讯柔,有了它我們可以把注意力放在如何實(shí)現(xiàn)代碼邏輯上抡蛙,而不是過(guò)多的注意到線程安全上。在Python2.7中該模塊名為Queue
魂迄,而在Python3.6中該模塊名為queue粗截。
使用Queue
類改進(jìn)的代碼如下:
import threading
import time
import random
from queue import Queue
MAX_BUFF_LEN = 5
buff = Queue(MAX_BUFF_LEN)
condition = threading.Condition()
class Producer(threading.Thread):
def run(self):
global buff
while True:
num = random.uniform(0, 5)
buff.put(num)
print('生產(chǎn)者向共享區(qū)域加入%f' % num)
time.sleep(random.uniform(0, 10))
class Consumer(threading.Thread):
def run(self):
global buff
while True:
num = buff.get()
print('消費(fèi)者消費(fèi)掉%f' %num)
time.sleep(random.uniform(0, 10))
producer = Producer()
consumer = Consumer()
producer.setDaemon(True)
consumer.setDaemon(True)
try:
producer.start()
consumer.start()
producer.join()
consumer.join()
except KeyboardInterrupt:
print('程序強(qiáng)制結(jié)束!')
Queue
是一個(gè)FIFO隊(duì)列捣炬,它的get
方法和put
方法分別是入隊(duì)和出隊(duì)熊昌,在入隊(duì)和出隊(duì)時(shí)獲取了鎖以保證線程安全,如果隊(duì)列空或者滿湿酸,默認(rèn)情況下get
方法和put
方法自動(dòng)阻塞婿屹。阻塞和喚醒的方式實(shí)質(zhì)上是調(diào)用了Condition
類的wait
和notify
方法。Queue
類比較簡(jiǎn)單推溃,推薦大家直接查看源碼或者官方文檔昂利。
這里還有一篇寫(xiě)得非常好的博客,推薦大家去看看:Producer-consumer problem in Python