Overview
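This post is my translation of "Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues", which explains thoroughly the synchronization mechanisms available to Python threads and the problems you can run into with them. Enough talk; on to the code.
The article describes Python's thread synchronization primitives in detail: Lock, RLock, Semaphore, Condition, Event and Queue. Each one is illustrated with code that shows how it works internally. First, let's look at a threaded program that uses no synchronization at all.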
Python thread synchronization mechanisms
threading
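We want to write a program that fetches the content of some URLs and writes it to a file. We could do this sequentially without threads, but to make it faster we create two threads, each handling its own list of URLs.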
import threading
import urllib2

# Subclass the high-level threading.Thread class
class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """
    def __init__(self, urls, output):
        """
        Constructor.
        @param urls list of urls to check
        @param output file to write urls output
        """
        threading.Thread.__init__(self)
        self.urls = urls
        self.output = output

    # run() is called automatically once start() has been called
    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            url = self.urls.pop()
            req = urllib2.Request(url)
            try:
                d = urllib2.urlopen(req)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)
                continue  # d is undefined when the fetch fails, so skip the write
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)

def main():
    # list 1 of urls to fetch
    urls1 = ['http://www.google.com', 'http://www.facebook.com']
    # list 2 of urls to fetch
    urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f)
    t2 = FetchUrls(urls2, f)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()
# Sample output (the URLs in this run differ from the lists in the code above)
write done by Thread-2
URL http://www.163.com fetched by Thread-2
write done by Thread-2
URL http://www.qq.com fetched by Thread-2
write done by Thread-1
URL http://www.sina.com.cn/ fetched by Thread-1
write done by Thread-1
URL http://www.baidu.com fetched by Thread-1
......
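The code above has a problem: when the two threads write to the same file at the same time, the content can end up interleaved. We need to make sure that only one thread writes to the file at any given moment. One way to do that is a synchronization mechanism such as a lock.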
Lock
A Lock has two states, locked and unlocked, and acquire() and release() switch between them. The rules are:
- If the state is unlocked, acquire() changes it to locked.
- If the state is locked, acquire() blocks until another thread calls release().
- If the state is unlocked, release() raises an error (RuntimeError in Python 3; thread.error in Python 2).
- If the state is locked, release() changes it to unlocked.
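The four rules can be observed directly. The following is a small sketch written for Python 3 (the rest of this article uses Python 2); `lock_state_demo` is a hypothetical helper name, and a non-blocking `acquire(blocking=False)` stands in for the blocking call so the demo cannot hang:

```python
import threading

def lock_state_demo():
    """Walk a threading.Lock through the state transitions listed above."""
    lock = threading.Lock()
    events = []

    # unlocked -> acquire() -> locked
    events.append(("acquire", lock.acquire()))
    events.append(("locked", lock.locked()))

    # locked -> a blocking acquire() would wait; a non-blocking one returns False
    events.append(("try_again", lock.acquire(blocking=False)))

    # locked -> release() -> unlocked
    lock.release()
    events.append(("locked_after_release", lock.locked()))

    # unlocked -> release() raises RuntimeError in Python 3
    try:
        lock.release()
    except RuntimeError:
        events.append(("double_release", "RuntimeError"))
    return events

if __name__ == "__main__":
    for name, value in lock_state_demo():
        print(name, value)
```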
To stop the two threads from writing to the same file at the same time, we add a lock to the program:
import threading
import urllib2  # only needed if the commented-out fetch below is re-enabled

class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """
    def __init__(self, urls, output, lock):
        """
        Constructor.
        @param urls list of urls to check
        @param output file to write urls output
        @param lock lock shared by the threads that write to output
        """
        threading.Thread.__init__(self)
        self.lock = lock
        self.urls = urls
        self.output = output

    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            self.lock.acquire()
            try:
                url = self.urls.pop()
                # req = urllib2.Request(url)
                # d = urllib2.urlopen(req)
                # The fetch is commented out: only the URL string is written
                self.output.writelines(url)
                print 'write done by %s' % self.name
                print 'URL %s fetched by %s' % (url, self.name)
            finally:
                # Release the lock even if the write raises
                self.lock.release()

def main():
    lock = threading.Lock()
    # list 1 of urls to fetch
    urls1 = ['http://www.baidu.com\n'] * 10
    # list 2 of urls to fetch
    urls2 = ['http://www.qq.com\n'] * 10
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f, lock)
    t2 = FetchUrls(urls2, f, lock)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()
# Output
write done by Thread-1
URL http://www.baidu.com
fetched by Thread-1
URL http://www.baidu.com
fetched by Thread-1
write done by Thread-1
URL http://www.baidu.com
fetched by Thread-1
write done by Thread-2
fetched by Thread-2
write done by Thread-2
URL http://www.qq.com
fetched by Thread-2
write done by Thread-2
URL http://www.qq.com
fetched by Thread-2
write done by Thread-2
......
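The output shows that the two threads never write to the file at the same time. A lock is typically a shared object, often a global variable; here it is created in main() and passed to both threads. How does it work? Let's look at Python's internal implementation (Python 2.6.6 on Linux).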
# threading.Lock() is the same as thread.allocate_lock; see Lib/threading.py
_allocate_lock = thread.allocate_lock
...
Lock = _allocate_lock
The C implementation is in Python/thread_pthread.h. Assuming the operating system supports POSIX semaphores, a Lock is implemented with a semaphore. sem_init() initializes a semaphore at the address the lock points to, with an initial value of 1, which means unlocked. The second argument, 0, means the semaphore is shared between the threads of the process (not between processes). The code:
PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    // allocate memory for the semaphore that backs the lock
    lock = (sem_t *)malloc(sizeof(sem_t));
    // if the allocation succeeded, initialize the semaphore's value to 1
    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}
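When acquire() is called, the code below runs. waitflag defaults to 1, meaning the call blocks until the lock is unlocked. sem_wait() decrements the semaphore's value, or blocks until the value is greater than 0, for example until another thread unlocks it by calling release(). When waitflag is 0, sem_trywait() is used instead, which fails immediately rather than blocking.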
int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    ...
    do {
        if (waitflag)
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */
    ....
}
When release() is called, the code below runs. sem_post() increments the semaphore's value, i.e. unlocks the semaphore.
void
PyThread_release_lock(PyThread_type_lock lock)
{
    ...
    status = sem_post(thelock);
    ...
}
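To make the semaphore-backed design above concrete, here is a Python-level sketch (Python 3) of a lock built the same way. `SemaphoreLock` is a hypothetical class, not part of CPython; `threading.Semaphore(1)` stands in for the C `sem_t`. Unlike a real Lock, it does not reject a release() of an unlocked lock: the counter would simply rise above 1.

```python
import threading

class SemaphoreLock:
    """Hypothetical lock built like the C code above:
    a counting semaphore whose initial value is 1 (unlocked)."""

    def __init__(self):
        # like sem_init(lock, 0, 1): value 1 means "unlocked"
        self._sem = threading.Semaphore(1)

    def acquire(self, waitflag=True):
        # waitflag true  -> like sem_wait(): block until the value is > 0, then decrement
        # waitflag false -> like sem_trywait(): return False at once if the value is 0
        return self._sem.acquire(blocking=waitflag)

    def release(self):
        # like sem_post(): increment the value, waking one blocked waiter
        self._sem.release()
```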
Because Lock implements the context manager protocol, you can also manage it with a with statement:
class FetchUrls(threading.Thread):
    ...
    def run(self):
        ...
        while self.urls:
            ...
            with self.lock:
                print 'lock acquired by %s' % self.name
                self.output.write(d.read())
                print 'write done by %s' % self.name
                print 'lock released by %s' % self.name
            ...
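The with statement is shorthand for acquire() followed by a try/finally that calls release(). A minimal Python 3 sketch with hypothetical function names, showing both forms and that the lock is released even when the body raises:

```python
import threading

lock = threading.Lock()

def write_with_statement(out, text):
    # acquire() on entry, release() on exit, even if the body raises
    with lock:
        out.append(text)

def write_explicit(out, text):
    # roughly what the with statement does for a lock
    lock.acquire()
    try:
        out.append(text)
    finally:
        lock.release()

def failing_body():
    with lock:
        raise ValueError("boom")

if __name__ == "__main__":
    out = []
    write_with_statement(out, "a")
    write_explicit(out, "b")
    try:
        failing_body()
    except ValueError:
        pass
    print(out, lock.locked())  # the lock is free again after the exception
```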
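Summary: a Lock blocks other threads from accessing a shared resource, and a thread can acquire it only once; a second acquire() from the same thread deadlocks and the program cannot continue. RLock exists to give a thread exclusive access to a shared resource without this deadlock: the same thread may acquire an RLock several times, and must release it just as many times to free the resource. In other words, n calls to acquire() need n calls to release().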
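The difference between Lock and RLock described in this summary can be shown without actually deadlocking by giving the second acquire() a timeout (Python 3 only). `reacquire` is a hypothetical helper:

```python
import threading

def reacquire(lock):
    """Try to acquire the same lock twice from one thread.
    Returns whether the second attempt succeeded (the timeout avoids a real deadlock)."""
    lock.acquire()
    try:
        second = lock.acquire(timeout=0.1)
        if second:
            lock.release()  # an RLock needs one release() per acquire()
        return bool(second)
    finally:
        lock.release()

if __name__ == "__main__":
    print("Lock :", reacquire(threading.Lock()))   # False: would deadlock without the timeout
    print("RLock:", reacquire(threading.RLock()))  # True: the owning thread may re-enter
```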
RLock
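RLock also uses thread.allocate_lock(), but adds an owner attribute (the owning thread) to support re-entrancy. Below is RLock's acquire(): if the calling thread already owns the lock, the counter is incremented by 1. Otherwise it first acquires the underlying lock, then sets the owner to the current thread and the counter to 1.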
def acquire(self, blocking=1):
    me = _get_ident()
    if self.__owner == me:
        self.__count = self.__count + 1
        ...
        return 1
    rc = self.__block.acquire(blocking)
    if rc:
        self.__owner = me
        self.__count = 1
        ...
    ...
    return rc
Now release(): it checks that the calling thread owns the lock and decrements the counter. When the counter reaches 0, the owner is cleared and the underlying lock is released, so another thread can acquire it. The code:
def release(self):
    if self.__owner != _get_ident():
        raise RuntimeError("cannot release un-acquired lock")
    self.__count = count = self.__count - 1
    if not count:
        self.__owner = None
        self.__block.release()
        ...
    ...
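The owner/count logic of acquire() and release() can be reproduced in a few lines of Python 3. This `SimpleRLock` is a hypothetical, simplified sketch of the same technique (no timeouts, no verbose logging), not the library class; `threading.get_ident()` plays the role of `_get_ident()`:

```python
import threading

class SimpleRLock:
    """Hypothetical re-entrant lock mirroring the owner/count logic above.
    A plain Lock (_block) does the actual blocking."""

    def __init__(self):
        self._block = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True):
        me = threading.get_ident()
        if self._owner == me:
            # already held by this thread: just bump the counter
            self._count += 1
            return True
        rc = self._block.acquire(blocking)
        if rc:
            self._owner = me
            self._count = 1
        return rc

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if not self._count:
            # last release: give up ownership and unlock the underlying lock
            self._owner = None
            self._block.release()
```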
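Here is a simple demo. create() holds the RLock while calling func1() and func2(), which acquire the same lock again from the same thread; with a plain Lock this nested acquisition would deadlock.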
import threading, time

lock = threading.RLock()
result = []

def func1():
    global result
    if lock.acquire():
        result.append('func1')
        time.sleep(1)
        lock.release()

def func2():
    global result
    if lock.acquire():
        result.append('step2')
        time.sleep(1)
        lock.release()

def create():
    global result
    if lock.acquire():
        # func1() and func2() acquire the lock again from this thread;
        # this only works because lock is an RLock
        func1()
        func2()
        lock.release()
    print result

def clear():
    global result
    if lock.acquire():
        # reset to an empty list rather than None, so create() can still
        # append if clear() happens to get the lock first
        result = []
        time.sleep(2)
        lock.release()
    print result

t1 = threading.Thread(target=create)
t2 = threading.Thread(target=clear)
t1.start()
t2.start()
t1.join()
t2.join()
Condition
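With a Condition, one thread waits for a particular condition while another thread signals that the condition has occurred. Once the condition occurs, the waiting thread acquires the lock so that it has exclusive access to the shared resource. A typical example is the producer-consumer pattern: a producer appends a random integer to a shared list from time to time, and a consumer takes those integers out of the list.
Let's start with the producer class. The producer thread acquires the lock, appends an integer to the list, notifies the consumer that there is something to take, and releases the lock. After each append it sleeps for a second.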
# -*- coding: utf-8 -*-
import threading
import random
import time

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """
    def __init__(self, integers, condition):
        """
        Constructor.
        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            self.integers.append(integer)
            print '%d appended to list by %s' % (integer, self.name)
            print 'condition notified by %s' % self.name
            self.condition.notify()
            print 'condition released by %s' % self.name
            self.condition.release()
            time.sleep(1)
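Now the consumer class. The consumer acquires the lock and checks whether the shared list contains an integer. If it does not, the consumer calls wait() and blocks until the producer notifies it; if it does, the consumer pops the integer. Finally it releases the lock.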
# -*- coding: utf-8 -*-
import threading

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """
    def __init__(self, integers, condition):
        """
        Constructor.
        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            while True:
                if self.integers:
                    integer = self.integers.pop()
                    print '%d popped from list by %s' % (integer, self.name)
                    break
                print 'condition wait by %s' % self.name
                self.condition.wait()
            print 'condition released by %s' % self.name
            self.condition.release()
Each call to wait() releases the lock, so the producer can acquire it and produce data.
Now the main function that starts both threads, followed by its output:
def main():
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()
###output
condition acquired by Thread-1
46 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
46 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
19 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
19 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
228 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
228 popped from list by Thread-2
......
The output is long, so let's walk through the beginning. Thread-1 (the producer) acquires the lock, appends 46 to the shared list, notifies the consumer that there is an item to take, and releases the lock. Thread-2 (the consumer) acquires the lock, pops 46 and releases the lock. The producer is still sleeping (sleep(1)), so the consumer acquires the lock again, finds the list empty, and calls wait(). Calling wait() releases the lock so that the producer can acquire it and append a new integer (19) to the list; the producer's notify() then wakes the consumer, which pops 19.
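For comparison, here is a terminating Python 3 version of the same producer-consumer pattern, a sketch rather than the article's code: the producer sends a fixed list of items instead of looping forever, and the consumer pops from the front of the list (pop(0)) so items come out in the order they were produced. `run_producer_consumer` is a hypothetical name.

```python
import threading

def run_producer_consumer(items):
    """Producer appends each item under the condition and notifies;
    the consumer waits until the list is non-empty, then pops."""
    shared = []
    consumed = []
    condition = threading.Condition()

    def producer():
        for item in items:
            with condition:
                shared.append(item)
                condition.notify()  # wake the consumer if it is waiting

    def consumer():
        for _ in range(len(items)):
            with condition:
                # wait() releases the lock while blocked and re-acquires it on wake-up;
                # the loop re-checks the list in case the wake-up was early
                while not shared:
                    condition.wait()
                consumed.append(shared.pop(0))

    threads = [threading.Thread(target=consumer), threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed
```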
Now let's look at how Condition is implemented internally. The constructor creates an RLock unless a lock is passed in; the condition's acquire() and release() control the state of this lock.
class _Condition(_Verbose):
    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
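Next, wait(). To keep the explanation simple, assume it is called without a timeout. A new lock named waiter is created and immediately acquired, so it is in the locked state. The waiter is how the threads communicate: the producer wakes the consumer by releasing the consumer's waiter lock. The waiter is appended to the list of waiters, the condition's lock is released (its state is saved by _release_save()), and the method then blocks on the second waiter.acquire(). When wait() returns, the saved lock state is restored by _acquire_restore().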
def wait(self, timeout=None):
    ...
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            ...
        ...
    finally:
        self._acquire_restore(saved_state)
notify() releases waiter locks: the producer calls notify() to wake a consumer blocked in wait(). Up to n waiters (1 by default) are released and removed from the list.
def notify(self, n=1):
    ...
    __waiters = self.__waiters
    waiters = __waiters[:n]
    ...
    for waiter in waiters:
        waiter.release()
        try:
            __waiters.remove(waiter)
        except ValueError:
            pass
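The waiter-lock technique used by wait() and notify() can be sketched in Python 3 as a tiny condition variable. `SimpleCondition` is hypothetical and deliberately simplified: it wraps a plain Lock, has no timeout, and skips the _release_save()/_acquire_restore() state handling that the real class needs for an RLock.

```python
import threading

class SimpleCondition:
    """Hypothetical condition variable built from per-waiter locks."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = []

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc):
        self._lock.release()

    def wait(self):
        # caller must hold self._lock
        waiter = threading.Lock()
        waiter.acquire()              # the new waiter lock starts out locked
        self._waiters.append(waiter)
        self._lock.release()          # let other threads use the condition
        try:
            waiter.acquire()          # blocks until notify() releases this waiter
        finally:
            self._lock.acquire()      # re-acquire before returning to the caller

    def notify(self, n=1):
        # caller must hold self._lock
        for waiter in self._waiters[:n]:
            waiter.release()          # unblocks the matching wait()
            self._waiters.remove(waiter)
```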
Condition also implements the context manager protocol, so it can be used with a with statement too:
class Producer(threading.Thread):
    ...
    def run(self):
        while True:
            integer = random.randint(0, 256)
            with self.condition:
                print 'condition acquired by %s' % self.name
                self.integers.append(integer)
                print '%d appended to list by %s' % (integer, self.name)
                print 'condition notified by %s' % self.name
                self.condition.notify()
                print 'condition released by %s' % self.name
            time.sleep(1)

class Consumer(threading.Thread):
    ...
    def run(self):
        while True:
            with self.condition:
                print 'condition acquired by %s' % self.name
                while True:
                    if self.integers:
                        integer = self.integers.pop()
                        print '%d popped from list by %s' % (integer, self.name)
                        break
                    print 'condition wait by %s' % self.name
                    self.condition.wait()
                print 'condition released by %s' % self.name
Semaphore
A semaphore is based on an internal counter: each acquire() call decrements the counter and each release() call increments it. If the counter is 0, acquire() blocks. This is Python's implementation of Dijkstra's semaphore concept, with its P() and V() primitives. A semaphore makes sense when you want to limit access to a resource with limited capacity, such as a server.
semaphore = threading.Semaphore()
semaphore.acquire()
# work on a shared resource
...
semaphore.release()
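A typical use of a semaphore is to cap how many threads use a resource at once. In this Python 3 sketch (`max_concurrency` is a hypothetical helper), several workers share a `Semaphore(limit)`, so at most `limit` of them should ever be inside the guarded block at the same time:

```python
import threading
import time

def max_concurrency(limit, workers):
    """Start `workers` threads that each hold a Semaphore(limit) for a moment,
    and return the largest number of threads seen inside at the same time."""
    semaphore = threading.Semaphore(limit)
    guard = threading.Lock()  # protects the counters in `state`
    state = {"inside": 0, "peak": 0}

    def work():
        with semaphore:  # acquire(): counter - 1, blocks while it is 0
            with guard:
                state["inside"] += 1
                state["peak"] = max(state["peak"], state["inside"])
            time.sleep(0.05)  # stand-in for using the shared resource
            with guard:
                state["inside"] -= 1
        # leaving the with block called release(): counter + 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]
```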
Now the internals. The constructor takes a value, the counter's initial value, which defaults to 1. A Condition built on a Lock is created to protect the counter and to notify another thread when the semaphore is released. The code:
class _Semaphore(_Verbose):
    ...
    def __init__(self, value=1, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value
Next, acquire(): while the counter is 0, it blocks in the condition's wait() until another thread notifies it. Once the counter is greater than 0, it decrements the counter. The code:
def acquire(self, blocking=1):
    rc = False
    self.__cond.acquire()
    while self.__value == 0:
        ...
        self.__cond.wait()
    else:
        self.__value = self.__value - 1
        rc = True
    self.__cond.release()
    return rc
The semaphore's release() increments the counter and then notifies one waiting thread.
def release(self):
    self.__cond.acquire()
    self.__value = self.__value + 1
    self.__cond.notify()
    self.__cond.release()
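The acquire()/release() pair above can be rewritten in Python 3 with the same structure: a counter protected by a Condition on a plain Lock. `SimpleSemaphore` is a hypothetical sketch; its `blocking=False` branch, which returns False at once when the counter is 0, is this sketch's own choice:

```python
import threading

class SimpleSemaphore:
    """Hypothetical counting semaphore: a counter guarded by Condition(Lock())."""

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                self._cond.wait()  # P(): sleep until release() notifies
            self._value -= 1
            return True

    def release(self):
        with self._cond:
            self._value += 1       # V(): increment and wake one waiter
            self._cond.notify()
```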
Note that there is also a bounded semaphore, which makes sure release() is not called more times than acquire(). Here is the Python source:
class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)
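The effect of the bound check is easy to see by releasing one time too many. A Python 3 sketch with a hypothetical helper `over_release`:

```python
import threading

def over_release(sem):
    """Acquire once, then release twice; report what the second release does."""
    sem.acquire()
    sem.release()
    try:
        sem.release()  # one release more than acquires
        return "ok"
    except ValueError:
        return "ValueError"

if __name__ == "__main__":
    print(over_release(threading.Semaphore(1)))         # ok: the counter silently becomes 2
    print(over_release(threading.BoundedSemaphore(1)))  # ValueError
```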
A semaphore can also be managed with a with statement.
semaphore = threading.Semaphore()
with semaphore:
    # work on a shared resource
    ...
Event
Event is a simpler mechanism than the previous ones: one thread signals an event and other threads wait for it. Let's return to the producer-consumer example and replace the Condition with an Event. First the producer: we pass an Event instance to the constructor instead of a Condition. Each time an integer is appended to the list, the event is set, which notifies the consumer, and then cleared right away. An Event instance starts out cleared.
class Producer(threading.Thread):
    """
    Produces random integers to a list
    """
    def __init__(self, integers, event):
        """
        Constructor.
        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer)
            print '%d appended to list by %s' % (integer, self.name)
            print 'event set by %s' % self.name
            self.event.set()
            self.event.clear()
            print 'event cleared by %s' % self.name
            time.sleep(1)
Now the consumer. It also receives the Event instance in its constructor. The consumer blocks in wait() until the event is set, i.e. until set() is called to indicate that there is an integer to consume.
class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """
    def __init__(self, integers, event):
        """
        Constructor.
        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event

    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.event.wait()
            try:
                integer = self.integers.pop()
                print '%d popped from list by %s' % (integer, self.name)
            except IndexError:
                # catch pop on empty list
                time.sleep(1)
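One caveat about this producer: because set() is followed immediately by clear(), only a consumer already blocked in wait() at that moment is woken; if the consumer is busy elsewhere, that signal is missed. An Event fits more naturally as a signal that stays set. A Python 3 sketch (`start_together` is a hypothetical name) in which worker threads wait on one Event and all proceed once the main thread sets it:

```python
import threading

def start_together(n):
    """Start n workers that block on one Event until the main thread sets it."""
    go = threading.Event()  # starts cleared (flag is False)
    started = []
    lock = threading.Lock()

    def worker(i):
        go.wait()  # blocks until go.set() is called
        with lock:
            started.append(i)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n)]
    for t in threads:
        t.start()
    before = len(started)  # no worker can get past wait() yet
    go.set()               # wakes every waiter; later wait() calls return at once
    for t in threads:
        t.join()
    return before, sorted(started)
```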
Now the internals, starting with Event's constructor. A Condition built on a Lock is created to protect the event flag and to notify other threads when the event is set.
class _Event(_Verbose):
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False
Here is set(). It sets the flag to True and notifies all waiting threads. The condition object protects the critical section in which the flag's value is changed.
def set(self):
    self.__cond.acquire()
    try:
        self.__flag = True
        self.__cond.notify_all()
    finally:
        self.__cond.release()
clear() sets the flag to False.
def clear(self):
    self.__cond.acquire()
    try:
        self.__flag = False
    finally:
        self.__cond.release()
wait() blocks until set() is called. If the flag is already set, wait() returns immediately.
def wait(self, timeout=None):
    self.__cond.acquire()
    try:
        if not self.__flag:
            self.__cond.wait(timeout)
    finally:
        self.__cond.release()
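In Python 3, Event.wait() returns the internal flag, so a call with a timeout tells you whether the event was set (the Python 2.6 version shown above returns None). A short sketch with a hypothetical helper:

```python
import threading

def event_wait_results():
    """Record what is_set() and wait() report at each step of an Event's life."""
    event = threading.Event()
    results = {}
    results["initial"] = event.is_set()            # False: new events start cleared
    results["timeout"] = event.wait(timeout=0.05)  # False: nobody called set()
    event.set()
    results["after_set"] = event.wait()            # True: returns at once when set
    event.clear()
    results["after_clear"] = event.is_set()        # False again
    return results
```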
Queue
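A Queue is an effective mechanism, especially when threads need to exchange data. It has four main methods:
1. put: puts an item into the queue.
2. get: removes an item from the queue and returns it.
3. task_done: must be called each time an item has been processed.
4. join: blocks until all items have been processed.
Once you know these methods, rewriting the producer and consumer with a Queue is straightforward.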
import random
import threading
import time

class Producer(threading.Thread):
    """
    Produces random integers to a queue
    """
    def __init__(self, queue):
        """
        Constructor.
        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Put random integers into the queue at
        random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer)
            print '%d put to queue by %s' % (integer, self.name)
            time.sleep(1)

class Consumer(threading.Thread):
    """
    Consumes random integers from a queue
    """
    def __init__(self, queue):
        """
        Constructor.
        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """
        Thread run method. Consumes integers from the queue
        """
        while True:
            integer = self.queue.get()
            print '%d popped from queue by %s' % (integer, self.name)
            self.queue.task_done()
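The four Queue methods fit together as follows in this Python 3 sketch (the module is `queue` in Python 3 and `Queue` in Python 2; `square_all` is a hypothetical helper). Workers call task_done() once per get(), so the main thread's join() returns only after every item has been processed:

```python
import queue
import threading

def square_all(numbers, workers=2):
    """The main thread puts items, worker threads get them,
    and join() waits until every item is marked done."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def consumer():
        while True:
            n = q.get()  # blocks while the queue is empty
            try:
                with results_lock:
                    results.append(n * n)
            finally:
                q.task_done()  # exactly one task_done() per get()

    for _ in range(workers):
        threading.Thread(target=consumer, daemon=True).start()

    for n in numbers:
        q.put(n)  # unfinished task count + 1
    q.join()      # returns once the count drops back to 0
    return sorted(results)
```

The workers are daemon threads, so they do not keep the program alive after join() returns.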
Queue is a very convenient mechanism: it implements the locking for us, so we do not have to worry about it or reimplement it and can focus on our own logic. That is a big advantage. Let's see how Python implements it internally.
The Queue constructor creates a lock (mutex) that protects the queue whenever an element is added or removed. Several Condition objects that share this lock are created to signal events: the queue is not empty (a get() call stops blocking), the queue is not full (a put() call stops blocking), and all items have been processed (a join() call stops blocking).
class Queue:
    def __init__(self, maxsize=0):
        ...
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
put() adds an item to the queue, waiting while the queue is full. It then increments the count of unfinished tasks and notifies a thread blocked in get() that the queue is no longer empty. The code:
def put(self, item, block=True, timeout=None):
    ...
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            ...
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()
get() removes an item from the queue and returns it, waiting while the queue is empty. After taking the item, it notifies one thread blocked in put() that the queue is no longer full.
def get(self, block=True, timeout=None):
    ...
    self.not_empty.acquire()
    try:
        ...
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()
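The core of put() and get(), one mutex shared by two Conditions, can be reduced to a small bounded blocking queue. `BoundedQueue` is a hypothetical Python 3 sketch that omits timeouts, non-blocking calls, task tracking and the unbounded case (maxsize must be positive):

```python
import collections
import threading

class BoundedQueue:
    """Hypothetical minimal blocking queue: one mutex, two Conditions on it."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = collections.deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:  # acquires the shared mutex
            while len(self.items) >= self.maxsize:
                self.not_full.wait()   # queue full: wait for a get()
            self.items.append(item)
            self.not_empty.notify()    # wake one thread blocked in get()

    def get(self):
        with self.not_empty:  # same mutex
            while not self.items:
                self.not_empty.wait()  # queue empty: wait for a put()
            item = self.items.popleft()
            self.not_full.notify()     # wake one thread blocked in put()
            return item
```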
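When task_done() is called, the count of unfinished tasks is decremented by 1. When it reaches 0, all threads waiting in the queue's join() are woken up and continue. Calling task_done() more times than items were put raises ValueError.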
def task_done(self):
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
Summary
This article is fairly long, but it explains Python's thread synchronization mechanisms thoroughly.