Synchronization Mechanisms for Python Threads

Overview

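This post is my translation of "Python threads synchronization: Locks, RLocks, Semaphores, Conditions, Events and Queues". The original explains the synchronization mechanisms available to Python threads, and the problems each one addresses, very thoroughly. Enough talk; let's get to the code.
The article covers Python's thread synchronization mechanisms in detail: Lock, RLock, Semaphore, Condition, Event and Queue. Each one is shown in code, together with the internals behind it. First, let's look at a threaded program that uses no synchronization at all.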

Synchronization mechanisms for Python threads

threading

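We want to write a program that fetches the content of some URLs and writes it to a file. We could do this sequentially without threads, but to make it faster we can create two threads, each handling its own list of URLs.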

import threading
import urllib2


# Subclass Thread from the high-level threading library
class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """

    def __init__(self, urls, output):
        """
        Constructor.

        @param urls list of urls to check
        @param output file to write urls output
        """
        threading.Thread.__init__(self)
        self.urls = urls
        self.output = output
    # start() runs this method automatically in the new thread.
    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            url = self.urls.pop()
            req = urllib2.Request(url)
            try:
                d = urllib2.urlopen(req)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)
                continue  # d is undefined when the request failed
            self.output.write(d.read())
            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)
def main():
    # list 1 of urls to fetch
    urls1 = ['http://www.google.com', 'http://www.facebook.com']
    # list 2 of urls to fetch
    urls2 = ['http://www.yahoo.com', 'http://www.youtube.com']
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f)
    t2 = FetchUrls(urls2, f)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()
# Output (note: the URLs shown differ from the lists in main() above)
write done by Thread-2
URL http://www.163.com fetched by Thread-2
write done by Thread-2
URL http://www.qq.com fetched by Thread-2
write done by Thread-1
URL http://www.sina.com.cn/ fetched by Thread-1
write done by Thread-1
URL http://www.baidu.com fetched by Thread-1
......

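The code above has a conflict: when both threads write to the same file at the same time, the content gets mixed up. We need to make sure that only one thread writes to the file at any moment. One way to do that is a synchronization mechanism such as a lock.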

Lock

A Lock has two states, locked and unlocked, and changes state through acquire() and release(). The rules are:

  • If the state is unlocked, acquire() changes it to locked.
  • If the state is locked, acquire() blocks until another thread calls release().
  • If the state is unlocked, release() raises a RuntimeError exception.
  • If the state is locked, release() changes it to unlocked.
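
The four rules can be checked directly. The following is a minimal sketch, assuming Python 3 (the article's own listings are Python 2); the function name lock_state_demo is made up for illustration. It uses a non-blocking acquire() so that the "would block" case reports failure instead of hanging.

```python
import threading


def lock_state_demo():
    """Walk through the four Lock rules and record what happens."""
    lock = threading.Lock()
    results = {}

    # Unlocked: acquire() succeeds and the lock becomes locked.
    results["first_acquire"] = lock.acquire()
    results["locked_after_acquire"] = lock.locked()

    # Locked: a second acquire() would block, so ask without blocking.
    # It returns False instead of waiting.
    results["second_acquire"] = lock.acquire(blocking=False)

    # Locked: release() switches the state back to unlocked.
    lock.release()
    results["locked_after_release"] = lock.locked()

    # Unlocked: release() raises RuntimeError.
    try:
        lock.release()
        results["extra_release"] = "no error"
    except RuntimeError:
        results["extra_release"] = "RuntimeError"
    return results


print(lock_state_demo())
```
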
To stop the two threads from writing to the file at the same time, we introduce a lock into the program:
import threading
import urllib2


class FetchUrls(threading.Thread):
    """
    Thread checking URLs.
    """

    def __init__(self, urls, output, lock):
        """
        Constructor.

        @param urls list of urls to check
        @param output file to write urls output
        """
        threading.Thread.__init__(self)
        self.lock = lock
        self.urls = urls
        self.output = output

    def run(self):
        """
        Thread run method. Check URLs one by one.
        """
        while self.urls:
            self.lock.acquire()
            url = self.urls.pop()
            # req = urllib2.Request(url)
            try:
                # d = urllib2.urlopen(req)
                self.output.writelines(url)
            except urllib2.URLError, e:
                print 'URL %s failed: %s' % (url, e.reason)

            print 'write done by %s' % self.name
            print 'URL %s fetched by %s' % (url, self.name)
            self.lock.release()

def main():
    lock = threading.Lock()
    # list 1 of urls to fetch
    urls1 = ['http://www.baidu.com\n'] * 10
    # list 2 of urls to fetch
    urls2 = ['http://www.qq.com\n'] * 10
    f = open('output.txt', 'w+')
    t1 = FetchUrls(urls1, f, lock)
    t2 = FetchUrls(urls2, f, lock)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    f.close()

if __name__ == '__main__':
    main()
# Output
write done by Thread-1
URL http://www.baidu.com
 fetched by Thread-1
URL http://www.baidu.com
 fetched by Thread-1
write done by Thread-1
URL http://www.baidu.com
 fetched by Thread-1
write done by Thread-2
 fetched by Thread-2
write done by Thread-2
URL http://www.qq.com
 fetched by Thread-2
write done by Thread-2
URL http://www.qq.com
 fetched by Thread-2
write done by Thread-2
......

As the output shows, no two threads operate on the file at the same time. In general the lock is a variable shared by the threads, for example a global. So how is it implemented? Let's look at Python's internals (Python 2.6.6 on Linux).

# threading.Lock() is the same as thread.allocate_lock(); see Lib/threading.py.
_allocate_lock = thread.allocate_lock
Lock = _allocate_lock

The C implementation is in thread_pthread.h, under the Python directory of the source tree. Assuming the operating system supports POSIX semaphores, the Lock is implemented on top of a semaphore. sem_init() initializes a semaphore at the address the lock points to. Its initial value is 1, which means unlocked, and the semaphore is shared between the threads of the process. The code is:

PyThread_type_lock
PyThread_allocate_lock(void)
{
    ...
    // Allocate the semaphore that backs the lock.
    lock = (sem_t *)malloc(sizeof(sem_t));
    // If the allocation succeeded, initialize the semaphore's value.
    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");
        ....
    }
    ...
}

When acquire() is called, the following code runs. waitflag defaults to 1, which means the call blocks until the lock becomes unlocked. sem_wait() decrements the semaphore's value, or blocks until the value is greater than 0, for example when another thread unlocks it by calling release().

int
PyThread_acquire_lock(PyThread_type_lock lock, int waitflag)
{
    ...
    do {
        if (waitflag)
            status = fix_status(sem_wait(thelock));
        else
            status = fix_status(sem_trywait(thelock));
    } while (status == EINTR); /* Retry if interrupted by a signal */
    ....
}

When release() is called, the following code runs. sem_post() increments the semaphore's value, which unlocks it.

void
PyThread_release_lock(PyThread_type_lock lock)
{
    ...
    status = sem_post(thelock);
    ...
}
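
The three C functions can be mirrored in a few lines of Python. This is a sketch for illustration only, assuming Python 3; SemaphoreLock and semaphore_lock_demo are hypothetical names, and the real Lock is implemented in C as shown above, not in terms of threading.Semaphore.

```python
import threading


class SemaphoreLock:
    """A lock modelled on the C code: a semaphore whose value starts at 1."""

    def __init__(self):
        # Mirrors sem_init(lock, 0, 1): a value of 1 means "unlocked".
        self._sem = threading.Semaphore(1)

    def acquire(self, waitflag=True):
        # waitflag=True behaves like sem_wait(): block until the value is > 0.
        # waitflag=False behaves like sem_trywait(): fail immediately.
        return self._sem.acquire(blocking=waitflag)

    def release(self):
        # Mirrors sem_post(): increment the value, waking one waiter if any.
        self._sem.release()


def semaphore_lock_demo():
    lock = SemaphoreLock()
    got_first = lock.acquire()                 # value 1 -> 0
    got_second = lock.acquire(waitflag=False)  # value is 0, so this fails
    lock.release()                             # value 0 -> 1
    got_third = lock.acquire(waitflag=False)   # succeeds again
    lock.release()
    return got_first, got_second, got_third


print(semaphore_lock_demo())
```
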

Because Lock implements the context manager protocol, you can manage it with a with statement:

class FetchUrls(threading.Thread):
    ...
    def run(self):
        ...
        while self.urls:
            ...
            with self.lock:
                print 'lock acquired by %s' % self.name
                self.output.write(d.read())
                print 'write done by %s' % self.name
                print 'lock released by %s' % self.name
            ...
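
One practical reason to prefer with over explicit acquire()/release() calls is that the lock is released even when the guarded code raises. A small sketch, assuming Python 3; the function name and the simulated error are made up for illustration.

```python
import threading


def with_releases_on_error():
    """Raise inside a with block and check the lock state afterwards."""
    lock = threading.Lock()
    held_inside = None
    try:
        with lock:
            held_inside = lock.locked()
            raise ValueError("simulated failure while holding the lock")
    except ValueError:
        pass
    # The with statement released the lock on the way out of the block.
    return held_inside, lock.locked()


print(with_releases_on_error())
```
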

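Summary: a Lock blocks other threads from accessing a shared resource, and a thread can acquire it only once. Acquiring it a second time without releasing it deadlocks the thread, and the program cannot continue. RLock exists to keep exclusive access to the shared resource while avoiding this deadlock. An RLock can be acquired several times by the same thread, and the resource is only freed once every acquire has been matched by a release: n calls to acquire() need n calls to release().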
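
The difference is easy to observe with non-blocking acquires, which report failure instead of deadlocking. A minimal sketch, assuming Python 3; reacquire is a hypothetical helper.

```python
import threading


def reacquire(lock):
    """Try to take `lock` twice from the same thread without blocking."""
    first = lock.acquire(blocking=False)
    second = lock.acquire(blocking=False)
    # Release once per successful acquire.
    for ok in (first, second):
        if ok:
            lock.release()
    return first, second


print(reacquire(threading.Lock()))   # second acquire fails on a plain Lock
print(reacquire(threading.RLock()))  # second acquire succeeds on an RLock
```
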

RLock

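RLock also uses thread.allocate_lock(), but it adds an owner attribute (the thread that holds the lock) and a counter to support reentrancy. Below is RLock's acquire() method. If the current thread already owns the lock, each call increments the counter by 1. If not, it first acquires the underlying lock, then sets the owner to the current thread and initializes the counter to 1.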

def acquire(self, blocking=1):
    me = _get_ident()
    if self.__owner == me:
        self.__count = self.__count + 1
        ...
        return 1
    rc = self.__block.acquire(blocking)
    if rc:
        self.__owner = me
        self.__count = 1
        ...
    ...
    return rc

Now the release() method. When it is called, it first makes sure the calling thread owns the lock, then decrements the counter by 1. If the counter reaches 0, the underlying lock is released and another thread can acquire it. The code is:

def release(self):
    if self.__owner != _get_ident():
        raise RuntimeError("cannot release un-acquired lock")
    self.__count = count = self.__count - 1
    if not count:
        self.__owner = None
        self.__block.release()
        ...
    ...
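
Putting the two excerpts together gives a small reentrant lock. This is a simplified sketch, assuming Python 3 (threading.get_ident() replaces _get_ident()); MiniRLock and mini_rlock_demo are hypothetical names and the class omits the logging and timeout handling of the real one.

```python
import threading


class MiniRLock:
    """Simplified reentrant lock: a plain Lock plus an owner and a counter."""

    def __init__(self):
        self._block = threading.Lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True):
        me = threading.get_ident()
        if self._owner == me:
            # Re-entry by the owner: only bump the counter.
            self._count += 1
            return True
        got = self._block.acquire(blocking)
        if got:
            self._owner = me
            self._count = 1
        return got

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:
            # Last release by the owner: free the underlying lock.
            self._owner = None
            self._block.release()


def mini_rlock_demo():
    lock = MiniRLock()
    lock.acquire()
    lock.acquire()  # same thread, counter goes to 2
    other = []
    t = threading.Thread(
        target=lambda: other.append(lock.acquire(blocking=False)))
    t.start()
    t.join()
    lock.release()  # counter 2 -> 1, still held
    still_held = lock._block.locked()
    lock.release()  # counter 1 -> 0, underlying lock freed
    return other[0], still_held, lock._block.locked()


print(mini_rlock_demo())
```
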

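Here is a simple demo. create() holds the RLock while it calls func1() and func2(), each of which acquires the same lock again: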

import threading,time
lock=threading.RLock()
result=[]


def func1():
    global result
    if lock.acquire():
        result.append('func1')
        time.sleep(1)
        lock.release()
def func2():
    global result
    if lock.acquire():
        result.append('step2')
        time.sleep(1)
        lock.release()

def create():
    global result
    if lock.acquire():
        func1()
        func2()
        lock.release()
    print result

def clear():
    global result
    if lock.acquire():
        result=None
        time.sleep(2)
        lock.release()
    print result

t1=threading.Thread(target= create)
t2=threading.Thread(target= clear)

t1.start()
t2.start()
t1.join()
t2.join()

Condition

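With a Condition, one thread waits for a particular condition and another thread signals that the condition has occurred. Once the condition occurs, the waiting thread acquires the lock to get exclusive access to the shared resource. The producer-consumer pattern is a typical example: a producer appends random integers to a shared list at intervals, and a consumer removes those integers from the list.
Let's look at the producer class first. The producer thread acquires the lock, appends an integer to the list, notifies the consumer that there is something to take from the shared list, and finally releases the lock. The producer pauses after each append (time.sleep(1) in the listing below).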

#! -*- coding: UTF-8 -*-

import threading
import random
import time


class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            self.integers.append(integer)
            print '%d appended to list by %s' % (integer, self.name)
            print 'condition notified by %s' % self.name
            self.condition.notify()
            print 'condition released by %s' % self.name
            self.condition.release()
            time.sleep(1)

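Now the consumer class. The consumer first acquires the lock and checks whether the shared list contains an integer. If it does not, the consumer calls wait(), which blocks until the producer notifies it. If it does, the consumer pops the integer and finally releases the lock.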

#! -*- coding: UTF-8 -*-

import threading


class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, condition):
        """
        Constructor.

        @param integers list of integers
        @param condition condition synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.condition = condition

    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.condition.acquire()
            print 'condition acquired by %s' % self.name
            while True:
                if self.integers:
                    integer = self.integers.pop()
                    print '%d popped from list by %s' % (integer, self.name)
                    break
                print 'condition wait by %s' % self.name
                self.condition.wait()
            print 'condition released by %s' % self.name
            self.condition.release()

Every call to wait() releases the lock, so the producer can acquire it and produce data.
Next we run both threads from main() and look at the output:

def main():
    integers = []
    condition = threading.Condition()
    t1 = Producer(integers, condition)
    t2 = Consumer(integers, condition)
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == '__main__':
    main()
###output
condition acquired by Thread-1
46 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
condition acquired by Thread-2
46 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
19 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
19 popped from list by Thread-2
condition released by Thread-2
condition acquired by Thread-2
condition wait by Thread-2
condition acquired by Thread-1
228 appended to list by Thread-1
condition notified by Thread-1
condition released by Thread-1
228 popped from list by Thread-2
......

There is a lot of output, so let's walk through the beginning. Thread-1 acquires the lock, appends 46 to the shared list, notifies the consumer that there is something to consume, and releases the lock. Thread-2 acquires the lock, pops 46 from the shared list, and releases the lock. The producer is still sleeping (sleep(1)), so the consumer, Thread-2, acquires the lock again. This time the shared list is empty, so it calls wait() and blocks until the producer notifies it. When wait() is called, the consumer releases the lock so that the producer can acquire it and append a new integer to the shared list.
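
The same handshake can be reproduced in a form that terminates, which makes it easier to experiment with. A minimal sketch, assuming Python 3; produce_and_consume is a hypothetical helper, and it pops from the front of the list (unlike the listing above) so that the result order is predictable.

```python
import threading


def produce_and_consume(values):
    """Pass every element of the list `values` from a producer to a consumer."""
    shared = []
    consumed = []
    condition = threading.Condition()

    def producer():
        for value in values:
            with condition:
                shared.append(value)
                condition.notify()  # wake the consumer if it is waiting

    def consumer():
        for _ in values:
            with condition:
                # wait() releases the lock while blocked and re-acquires it
                # before returning; loop because the list may still be empty.
                while not shared:
                    condition.wait()
                consumed.append(shared.pop(0))

    threads = [threading.Thread(target=consumer),
               threading.Thread(target=producer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed


print(produce_and_consume([46, 19, 228]))
```

Checking the list inside a while loop, rather than a single if, keeps the consumer correct even if it is woken when the list is still empty.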
Now let's look at how Python implements Condition internally. The constructor creates an RLock (unless a lock is passed in), and that lock's state is controlled through acquire() and release().

class _Condition(_Verbose):

    def __init__(self, lock=None, verbose=None):
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock

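Next, the wait() method. To keep the explanation simple, we assume wait() is called without a timeout. A new lock named waiter is created and acquired, so its state is locked. The waiter is used for communication between threads: the producer notifies the consumer by releasing the waiter lock. The waiter is appended to the list of waiters, and the method then blocks on the second waiter.acquire(). Note that the state of the condition's lock is saved (and the lock released) before blocking, and restored when wait() returns.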

def wait(self, timeout=None):
    ...
    waiter = _allocate_lock()
    waiter.acquire()
    self.__waiters.append(waiter)
    saved_state = self._release_save()
    try:    # restore state no matter what (e.g., KeyboardInterrupt)
        if timeout is None:
            waiter.acquire()
            ...
        ...
    finally:
        self._acquire_restore(saved_state)

The notify() method releases waiter locks. The producer calls notify() to wake a consumer that is blocked in wait().

def notify(self, n=1):
    ...
    __waiters = self.__waiters
    waiters = __waiters[:n]
    ...
    for waiter in waiters:
        waiter.release()
        try:
            __waiters.remove(waiter)
        except ValueError:
            pass
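
The waiter-lock idea from wait() and notify() fits in a short class. This is a simplified sketch, assuming Python 3; MiniCondition and mini_condition_demo are hypothetical names. It uses a plain Lock instead of an RLock, so it skips the save/restore of the lock state, and it has no timeout support.

```python
import threading


class MiniCondition:
    """Simplified Condition: one private 'waiter' lock per blocked thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = []

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc):
        self._lock.release()

    def wait(self):
        waiter = threading.Lock()
        waiter.acquire()          # first acquire: the waiter is now locked
        self._waiters.append(waiter)
        self._lock.release()      # let other threads take the main lock
        try:
            waiter.acquire()      # second acquire blocks until notify()
        finally:
            self._lock.acquire()  # re-take the main lock before returning

    def notify(self, n=1):
        # Iterate over a copy (the slice) while removing from the real list.
        for waiter in self._waiters[:n]:
            waiter.release()      # unblocks the thread parked in wait()
            self._waiters.remove(waiter)


def mini_condition_demo():
    cond = MiniCondition()
    items, received = [], []

    def consumer():
        with cond:
            while not items:
                cond.wait()
            received.append(items.pop())

    t = threading.Thread(target=consumer)
    t.start()
    with cond:
        items.append(42)
        cond.notify()
    t.join()
    return received


print(mini_condition_demo())
```
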

Condition also implements the context manager protocol, so we can use a with statement here too.

class Producer(threading.Thread):
    ...
    def run(self):
        while True:
            integer = random.randint(0, 256)
            with self.condition:
                print 'condition acquired by %s' % self.name
                self.integers.append(integer) 
                print '%d appended to list by %s' % (integer, self.name)
                print 'condition notified by %s' % self.name
                self.condition.notify()
                print 'condition released by %s' % self.name
            time.sleep(1)

class Consumer(threading.Thread):
    ... 
    def run(self):
        while True:
            with self.condition:
                print 'condition acquired by %s' % self.name
                while True:
                    if self.integers:
                        integer = self.integers.pop()
                        print '%d popped from list by %s' % (integer, self.name)
                        break
                    print 'condition wait by %s' % self.name
                    self.condition.wait()
                print 'condition released by %s' % self.name

Semaphore

A semaphore is based on an internal counter: each call to acquire() decrements it by 1 and each call to release() increments it by 1. If the counter is 0, a further call to acquire() blocks. This is Python's implementation of Dijkstra's semaphore concept, the P() and V() primitives. A semaphore makes sense when you want to control access to a resource with limited capacity, such as a server.

semaphore = threading.Semaphore()
semaphore.acquire()
# work on a shared resource
...
semaphore.release()
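
A typical use is capping how many threads work on a resource at once. The sketch below assumes Python 3; run_with_limit, the 0.01 second sleep and the bookkeeping dictionary are all made up for illustration. It records the highest number of threads seen inside the guarded section.

```python
import threading
import time


def run_with_limit(workers, limit):
    """Run `workers` threads but let at most `limit` into the guarded section."""
    semaphore = threading.Semaphore(limit)
    state_lock = threading.Lock()
    state = {"inside": 0, "peak": 0}

    def worker():
        with semaphore:  # acquire() on entry, release() on exit
            with state_lock:
                state["inside"] += 1
                state["peak"] = max(state["peak"], state["inside"])
            time.sleep(0.01)  # stand-in for work on the shared resource
            with state_lock:
                state["inside"] -= 1

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


print(run_with_limit(workers=8, limit=3))
```

The printed peak depends on scheduling, but it never exceeds the limit passed to the semaphore.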

Let's look at the details of Python's implementation. The constructor takes a value, which is the counter's initial value and defaults to 1. A Condition instance is created with a lock to protect the counter and to notify another thread when the semaphore is released. Here is the code.

class _Semaphore(_Verbose):
    ...    
    def __init__(self, value=1, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

Now the acquire() method. If the semaphore's counter is 0, it blocks in the condition's wait() until another thread notifies it. Once the counter is greater than 0, it decrements the counter. The code is:

def acquire(self, blocking=1):
    rc = False
    self.__cond.acquire()
    while self.__value == 0:
        ...
        self.__cond.wait()
    else:
        self.__value = self.__value - 1
        rc = True
    self.__cond.release()
    return rc

The semaphore's release() method increments the counter and then notifies another thread.

def release(self):
    self.__cond.acquire()
    self.__value = self.__value + 1
    self.__cond.notify()
    self.__cond.release()
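
Combined, acquire() and release() amount to a counter guarded by a Condition. A simplified sketch, assuming Python 3; MiniSemaphore and mini_semaphore_demo are hypothetical names, and the non-blocking branch is a reduced version of what the elided "..." in the excerpt handles.

```python
import threading


class MiniSemaphore:
    """Simplified semaphore: a counter protected by a Condition."""

    def __init__(self, value=1):
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                self._cond.wait()  # sleep until release() notifies
            self._value -= 1
            return True

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()    # wake one thread blocked in acquire()


def mini_semaphore_demo():
    sem = MiniSemaphore(2)
    results = [sem.acquire(), sem.acquire(), sem.acquire(blocking=False)]
    sem.release()
    results.append(sem.acquire(blocking=False))
    return results


print(mini_semaphore_demo())
```
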

Note that a bounded semaphore makes sure release() is not called more times than acquire(). Here is Python's internal code:

class _BoundedSemaphore(_Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""
    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        self._initial_value = value

    def release(self):
        if self._Semaphore__value >= self._initial_value:
            raise ValueError, "Semaphore released too many times"
        return _Semaphore.release(self)
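
The check is easy to see by releasing one time too many on each kind of semaphore. A minimal sketch, assuming Python 3; extra_release is a hypothetical helper.

```python
import threading


def extra_release(semaphore):
    """Acquire once, release twice; report whether the extra release failed."""
    semaphore.acquire()
    semaphore.release()
    try:
        semaphore.release()  # one release more than acquires
    except ValueError:
        return "ValueError"
    return "allowed"


print(extra_release(threading.Semaphore(1)))         # plain semaphore accepts it
print(extra_release(threading.BoundedSemaphore(1)))  # bounded semaphore rejects it
```
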

You can also manage a semaphore object with a with statement.

semaphore = threading.Semaphore()
with semaphore:
  # work on a shared resource
  ...

Event

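Compared with the previous mechanisms, this one is fairly simple: one thread signals an event and other threads wait for it. Let's go back to the producer-consumer example and use an Event in place of the Condition. First the producer class: we pass an Event instance to the constructor instead of a Condition instance. Each time an integer is appended to the list, the event is set to notify the consumer and then cleared immediately. An Event instance starts in the cleared state.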

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list
        at random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.integers.append(integer) 
            print '%d appended to list by %s' % (integer, self.name)
            print 'event set by %s' % self.name
            self.event.set()
            self.event.clear()
            print 'event cleared by %s' % self.name
            time.sleep(1)

Now the consumer class. We also pass an Event instance to its constructor. The consumer blocks in wait() until the event is set, that is, until set() is called to signal that there is an integer to consume.

class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, integers, event):
        """
        Constructor.

        @param integers list of integers
        @param event event synchronization object
        """
        threading.Thread.__init__(self)
        self.integers = integers
        self.event = event
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            self.event.wait()
            try:
                integer = self.integers.pop()
                print '%d popped from list by %s' % (integer, self.name)
            except IndexError:
                # catch pop on empty list
                time.sleep(1)

Let's look at the Python internals, starting with the Event constructor. An Event instance is created with a lock, wrapped in a Condition, that protects the event flag and notifies other threads when the event is set with set().

class _Event(_Verbose):
    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

Below is the set() method. It sets the flag to True and notifies the other threads. The condition object protects the critical section in which the flag is changed.

def set(self):
    self.__cond.acquire()
    try:
        self.__flag = True
        self.__cond.notify_all()
    finally:
        self.__cond.release()

The clear() method sets the flag to False.

def clear(self):
    self.__cond.acquire()
    try:
        self.__flag = False
    finally:
        self.__cond.release()

The wait() method blocks until set() is called. If the event's flag is already set, wait() returns immediately.

def wait(self, timeout=None):
    self.__cond.acquire()
    try:
        if not self.__flag:
            self.__cond.wait(timeout)
    finally:
        self.__cond.release()
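
The behavior of set(), clear() and wait() can be observed from the outside. A minimal sketch, assuming Python 3, where wait() returns the flag's value; event_demo and the 0.01 second timeouts are made up for illustration.

```python
import threading


def event_demo():
    """Record what wait() returns in each state of the flag."""
    event = threading.Event()
    results = {}

    # A new event starts cleared, so a wait with a timeout gives up.
    results["initially_set"] = event.is_set()
    results["wait_while_clear"] = event.wait(timeout=0.01)

    # A thread blocked in wait() is released by set().
    seen = []
    waiter = threading.Thread(target=lambda: seen.append(event.wait()))
    waiter.start()
    event.set()
    waiter.join()
    results["waiter_saw"] = seen[0]

    # While the flag is set, wait() returns immediately.
    results["wait_while_set"] = event.wait(timeout=0.01)

    # clear() resets the flag.
    event.clear()
    results["set_after_clear"] = event.is_set()
    return results


print(event_demo())
```
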

Queue

Queue is an effective mechanism, especially when threads need to exchange data. It has four main methods:
1. put: puts an item into the queue.
2. get: removes an item from the queue and returns it.
3. task_done: must be called each time an item has been processed.
4. join: blocks until all items have been processed.
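
The four methods work together as follows. This is a minimal sketch, assuming Python 3, where the module is named queue (it is Queue in Python 2); process_all and the None sentinel used to stop the worker are conventions chosen for this example.

```python
import queue
import threading


def process_all(items):
    """Square every item in a worker thread, using all four Queue methods."""
    tasks = queue.Queue()
    results = []

    def worker():
        while True:
            item = tasks.get()     # blocks while the queue is empty
            if item is None:       # sentinel: no more work
                tasks.task_done()
                break
            results.append(item * item)
            tasks.task_done()      # one call per item taken with get()

    thread = threading.Thread(target=worker)
    thread.start()
    for item in items:
        tasks.put(item)
    tasks.put(None)
    tasks.join()                   # returns once every item is task_done()
    thread.join()
    return results


print(process_all([1, 2, 3]))
```
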

Once you know these methods, rewriting the producer and consumer with a Queue is simple.

class Producer(threading.Thread):
    """
    Produces random integers to a list
    """

    def __init__(self, queue):
        """
        Constructor.

        @param integers list of integers
        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Append random integers to the integers list at
        random time.
        """
        while True:
            integer = random.randint(0, 256)
            self.queue.put(integer) 
            print '%d put to queue by %s' % (integer, self.name)
            time.sleep(1)



class Consumer(threading.Thread):
    """
    Consumes random integers from a list
    """

    def __init__(self, queue):
        """
        Constructor.

        @param integers list of integers
        @param queue queue synchronization object
        """
        threading.Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        """
        Thread run method. Consumes integers from list
        """
        while True:
            integer = self.queue.get()
            print '%d popped from list by %s' % (integer, self.name)
            self.queue.task_done()

Queue is a great mechanism for us. It implements the locking on our behalf, so we don't need to care about it or reimplement it and can focus on our own logic. That is a big advantage. How does Python implement it internally?
The Queue constructor creates a lock object to protect the queue when an element is added or removed. Several condition objects are created to signal events: the queue is not empty (a get() call stops blocking), the queue is not full (a put() call stops blocking), and all items have been processed (a join() call stops blocking).

class Queue:
    def __init__(self, maxsize=0):
        ...
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

put() adds an item to the queue, waiting first if the queue is full. It then notifies a thread blocked in get() that the queue is no longer empty. The code is:

def put(self, item, block=True, timeout=None):
    ...
    self.not_full.acquire()
    try:
        if self.maxsize > 0:
            ...
            elif timeout is None:
                while self._qsize() == self.maxsize:
                    self.not_full.wait()
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
    finally:
        self.not_full.release()

get() removes an item from the queue and returns it, waiting first if the queue is empty. After removing the item, it notifies a thread blocked in put() that the queue is no longer full.

def get(self, block=True, timeout=None):
    ...
    self.not_empty.acquire()
    try:
        ...
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        item = self._get()
        self.not_full.notify()
        return item
    finally:
        self.not_empty.release()

When task_done() is called, the number of unfinished tasks is decremented by 1. If the count reaches 0, the threads waiting in the queue's join() method continue.

def task_done(self):
    self.all_tasks_done.acquire()
    try:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
    finally:
        self.all_tasks_done.release()

def join(self):
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
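
The excerpts above can be assembled into a small working queue. This is a simplified sketch, assuming Python 3; MiniQueue and mini_queue_demo are hypothetical names, it stores items in a deque, and it leaves out the block and timeout parameters of the real put() and get().

```python
import threading
from collections import deque


class MiniQueue:
    """Simplified bounded queue: three Conditions sharing one mutex."""

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._items = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def put(self, item):
        with self.not_full:
            while self.maxsize > 0 and len(self._items) == self.maxsize:
                self.not_full.wait()         # queue is full: wait for a get()
            self._items.append(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()          # wake one thread blocked in get()

    def get(self):
        with self.not_empty:
            while not self._items:
                self.not_empty.wait()        # queue is empty: wait for a put()
            item = self._items.popleft()
            self.not_full.notify()           # wake one thread blocked in put()
            return item

    def task_done(self):
        with self.all_tasks_done:
            if self.unfinished_tasks <= 0:
                raise ValueError("task_done() called too many times")
            self.unfinished_tasks -= 1
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()  # release threads in join()

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()


def mini_queue_demo():
    q = MiniQueue(maxsize=2)
    out = []

    def consumer():
        for _ in range(5):
            out.append(q.get())
            q.task_done()

    t = threading.Thread(target=consumer)
    t.start()
    for i in range(5):
        q.put(i)  # blocks whenever two items are already queued
    q.join()
    t.join()
    return out


print(mini_queue_demo())
```

Because all three conditions share one mutex, entering any of them locks the whole queue, which is why put() can safely touch not_empty while holding not_full.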

Summary

This article is long, but it covers Python's synchronization mechanisms thoroughly.
