綜述
Python這門解釋性語言也有專門的線程模型假抄,Python虛擬機(jī)使用GIL(Global Interpreter Lock,全局解釋器鎖)來互斥線程對共享資源的訪問,但暫時無法利用多處理器的優(yōu)勢糟港。
在Python中我們主要是通過thread和 threading這兩個模塊來實(shí)現(xiàn)的,其中Python的threading模塊是對thread做了一些包裝的院仿,可以更加方便的被使用秸抚,所以我們使用 threading模塊實(shí)現(xiàn)多線程編程。這篇文章我們主要來看看Python對多線程編程的支持歹垫。
在語言層面剥汤,Python對多線程提供了很好的支持,可以方便地支持創(chuàng)建線程排惨、互斥鎖吭敢、信號量、同步等特性若贮。下面就是官網(wǎng)上介紹threading模塊的基本資料及功能:
實(shí)現(xiàn)模塊
thread:多線程的底層支持模塊省有,一般不建議使用;
threading:對thread進(jìn)行了封裝谴麦,將一些線程的操作對象化
threading模塊
Thread 線程類蠢沿,這是我們用的最多的一個類,你可以指定線程函數(shù)執(zhí)行或者繼承自它都可以實(shí)現(xiàn)子線程功能匾效;
Timer與Thread類似舷蟀,但要等待一段時間后才開始運(yùn)行;
Lock 鎖原語面哼,這個我們可以對全局變量互斥時使用野宜;
RLock 可重入鎖,使單線程可以再次獲得已經(jīng)獲得的鎖魔策;
Condition 條件變量匈子,能讓一個線程停下來,等待其他線程滿足某個“條件”闯袒;
Event 通用的條件變量虎敦。多個線程可以等待某個事件發(fā)生,在事件發(fā)生后政敢,所有的線程都被激活其徙;
Semaphore為等待鎖的線程提供一個類似“等候室”的結(jié)構(gòu);
BoundedSemaphore 與semaphore類似喷户,但不允許超過初始值唾那;
Queue:實(shí)現(xiàn)了多生產(chǎn)者(Producer)、多消費(fèi)者(Consumer)的隊列褪尝,支持鎖原語闹获,能夠在多個線程之間提供很好的同步支持。
其中Thread類
是你主要的線程類恼五,可以創(chuàng)建進(jìn)程實(shí)例昌罩。該類提供的函數(shù)包括:
getName(self) 返回線程的名字
isAlive(self) 布爾標(biāo)志,表示這個線程是否還在運(yùn)行中
isDaemon(self) 返回線程的daemon標(biāo)志
join(self, timeout=None) 程序掛起灾馒,直到線程結(jié)束茎用,如果給出timeout,則最多阻塞timeout秒
run(self) 定義線程的功能函數(shù)
setDaemon(self, daemonic) 把線程的daemon標(biāo)志設(shè)為daemonic
setName(self, name) 設(shè)置線程的名字
start(self) 開始線程執(zhí)行
其中Queue提供的類
Queue隊列
LifoQueue后入先出(LIFO)隊列
PriorityQueue 優(yōu)先隊列
接下來睬罗,我們將會用一個一個示例來展示threading的各個功能轨功,包括但不限于:兩種方式起線程、threading.Thread類的重要函數(shù)容达、使用Lock互斥及RLock實(shí)現(xiàn)重入鎖古涧、使用Condition實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模型、使用Event和Semaphore多線程通信
兩種方式起線程
在Python中我們主要是通過thread和threading這兩個模塊來實(shí)現(xiàn)的花盐,其中Python的threading模塊是對thread做了一些包裝的羡滑,可以更加方便的被使用菇爪,所以我們使用threading模塊實(shí)現(xiàn)多線程編程。一般來說柒昏,使用線程有兩種模式凳宙,一種是創(chuàng)建線程要執(zhí)行的函數(shù),把這個函數(shù)傳遞進(jìn)Thread對象里职祷,讓它來執(zhí)行氏涩;另一種是直接從Thread繼承,創(chuàng)建一個新的class有梆,把線程執(zhí)行的代碼放到這個新的 class里是尖。
將函數(shù)傳遞進(jìn)Thread對象:
import threading?
def thread_fun(num):?
? ? for n in range(0, int(num)):?
? ? ? ? print" I come from %s, num: %s" % ( threading.currentThread().getName(),n)?
def main(thread_num):?
? ? thread_list = list(); ? ?# 先創(chuàng)建線程對象 ?
? ? for i in range(0, thread_num): ? ? ? ??
? ? ? ? thread_name ="thread_%s" % i ??
? ? ? ? thread_list.append(threading.Thread(target = thread_fun, name = thread_name, args = (20,)))# 啟動所有線程?
? ? for thread in thread_list:? ? ? ??
? ? ? ? ?thread.start() ? # 主線程中等待所有子線程退出?
? ? for thread in thread_list: ? ? ? ??
? ? ? ? thread.join()
if__name__ =="__main__":? ?
? ? main(3)
程序啟動了3個線程,并且打印了每一個線程的線程名字泥耀,這個比較簡單吧饺汹,處理重復(fù)任務(wù)就派出用場了,下面介紹使用繼承threading的方式爆袍;
繼承自threading.Thread類:
import threading ?
class MyThread(threading.Thread):?
? ? def __init__(self):?
? ? ? ? threading.Thread.__init__(self);?
? ? def run(self):?
? ? ? ? print"I am %s"%self.name?
if __name__ =="__main__":?
? ? for thread in range(0,5): ? ? ? ??
? ? ? ? t = MyThread()? ? ? ??
? ? ? ? t.start()
接下來首繁,將會介紹如何控制這些線程,包括子線程的退出陨囊,子線程是否存活及將子線程設(shè)置為守護(hù)線程(Daemon)弦疮。
threading.Thread類的重要函數(shù)
介紹threading模塊中的主類Thread的一些主要方法,實(shí)例代碼如下:
import threading
?class MyThread(threading.Thread):?
? ? def__init__(self):?
? ? ? ? threading.Thread.__init__(self)?
? ? def run(self):?
? ? ? ? print"I am %s"% (self.name)
?if__name__ =="__main__":?
? ? ? ? ?for i in range(0,5): ? ? ? ??
? ? ? ? my_thread = MyThread() ? ? ? ??
? ? ? ?my_thread.start()
1蜘醋、name相關(guān)
你可以為每一個thread指定name胁塞,默認(rèn)的是Thread-No形式的,如上述實(shí)例代碼打印出的一樣:
I am Thread-1?
I am Thread-2
?I am Thread-3?
I am Thread-4?
I am Thread-5
當(dāng)然你可以指定每一個thread的name压语,這個通過setName方法啸罢,代碼:
def__init__(self):?
? ? threading.Thread.__init__(self) ? ??
? ? self.setName("new"+ self.name)
2、join方法
join方法原型如下胎食,這個方法是用來阻塞當(dāng)前上下文扰才,直至該線程運(yùn)行結(jié)束:
def join(self, timeout=None):
timeout可以設(shè)置超時時間
3、setDaemon方法
當(dāng)我們在程序運(yùn)行中厕怜,執(zhí)行一個主線程衩匣,如果主線程又創(chuàng)建一個子線程,主線程和子線程就分兵兩路粥航,當(dāng)主線程完成想退出時琅捏,會檢驗子線程是否完成。如果子線程未完成递雀,則主線程會等待子線程完成后再退出柄延。但是有時候我們需要的是,只要主線程完成了缀程,不管子線程是否完成搜吧,都要和主線程一起退出市俊,這時就可以用setDaemon方法,并設(shè)置其參數(shù)為True滤奈。
使用Lock互斥鎖
現(xiàn)在我們考慮這樣一個問題:假設(shè)各個線程需要訪問同一公共資源秕衙,我們的代碼該怎么寫?
import threading
?import time ?
counter=0?
class MyThread(threading.Thread):?
? ? def __init__(self):?
? ? ? ? threading.Thread.__init__(self)?
? ? def run(self):?
? ? ? ? global counter ? ? ? ??
? ? ? ? time.sleep(1); ? ? ? ??
? ? ? ? counter +=1?
? ? ? ? print"I am %s, set counter:%s" % (self.name, counter)?
if__name__ =="__main__":?
? ? for I in range(0,200): ? ? ? ??
? ? my_thread = MyThread() ? ? ? ??
? ? my_thread.start()
解決上面的問題僵刮,我們興許會寫出這樣的代碼,我們假設(shè)跑200個線程鹦牛,但是這200個線程都會去訪問counter這個公共資源搞糕,并對該資源進(jìn)行處理(counter += 1),代碼看起來就是這個樣了曼追,但是我們看下運(yùn)行結(jié)果:
I am Thread-69, set counter:64?
I am Thread-73, set counter:66I am Thread-74, set counter:67I am Thread-75, set counter:68I am Thread-76, set counter:69I am Thread-78, set counter:70I am Thread-77, set counter:71I am Thread-58, set counter:72I am Thread-60, set counter:73I am Thread-62, set counter:74I am Thread-66, set counter:75I am Thread-70, set counter:76I am Thread-72, set counter:77I am Thread-79, set counter:78I am Thread-71, set counter:78
打印結(jié)果我只貼了一部分窍仰,從中我們已經(jīng)看出了這個全局資源(counter)被搶占的情況,問題產(chǎn)生的原因就是沒有控制多個線程對同一資源的訪問礼殊,對數(shù)據(jù)造成破壞驹吮,使得線程運(yùn)行的結(jié)果不可預(yù)期。這種現(xiàn)象稱為“線程不安全”晶伦。在開發(fā)過程中我們必須要避免這種情況碟狞,那怎么避免?這就用到了我們在綜述中提到的互斥鎖了婚陪。
互斥鎖概念
Python編程中族沃,引入了對象互斥鎖的概念,來保證共享數(shù)據(jù)操作的完整性泌参。每個對象都對應(yīng)于一個可稱為” 互斥鎖” 的標(biāo)記脆淹,這個標(biāo)記用來保證在任一時刻,只能有一個線程訪問該對象沽一。在Python中我們使用threading模塊提供的Lock類盖溺。
我們對上面的程序進(jìn)行整改,為此我們需要添加一個互斥鎖變量mutex = threading.Lock()铣缠,然后在爭奪資源的時候之前我們會先搶占這把鎖mutex.acquire()烘嘱,對資源使用完成之后我們在釋放這把鎖mutex.release()。代碼如下:
import threading
?import time ?
counter =0?
mutex =threading.Lock() ?
class MyThread(threading.Thread):?
? ? def__init__(self):?
? ? ? ? threading.Thread.__init__(self)?
? ? def run(self):?
? ? ? ? global counter, mutex ? ? ? ??
? ? ? ? time.sleep(1);?
? ? ? ? if mutex.acquire(): ? ? ? ? ? ??
? ? ? ? ? ? ?counter +=1?
? ? ? ? ? ? print"I am %s, set counter:%s"% (self.name, counter) ? ? ? ? ? ??
? ? ? ? ? ? mutex.release()?
if __name__ =="__main__":?
? ? for I in range(0,100): ? ? ? ??
? ? ? ? my_thread = MyThread() ? ? ? ??
? ? ? ? my_thread.start()
同步阻塞
當(dāng)一個線程調(diào)用Lock對象的acquire()方法獲得鎖時攘残,這把鎖就進(jìn)入“l(fā)ocked”狀態(tài)拙友。因為每次只有一個線程1可以獲得鎖,所以如果此時另一個線程2試圖獲得這個鎖歼郭,該線程2就會變?yōu)椤癰lock“同步阻塞狀態(tài)遗契。直到擁有鎖的線程1調(diào)用鎖的release()方法釋放鎖之后,該鎖進(jìn)入“unlocked”狀態(tài)病曾。線程調(diào)度程序從處于同步阻塞狀態(tài)的線程中選擇一個來獲得鎖牍蜂,并使得該線程進(jìn)入運(yùn)行(running)狀態(tài)漾根。
進(jìn)一步考慮
通過對公共資源使用互斥鎖,這樣就簡單的到達(dá)了我們的目的鲫竞,但是如果我們又遇到下面的情況:
1辐怕、遇到鎖嵌套的情況該怎么辦袱巨,這個嵌套是指當(dāng)我一個線程在獲取臨界資源時祝闻,又需要再次獲取灯荧;
2僵井、如果有多個公共資源陕截,在線程間共享多個資源的時候,如果兩個線程分別占有一部分資源并且同時等待對方的資源批什;
上述這兩種情況會直接造成程序掛起农曲,即死鎖,下面我們會談死鎖及可重入鎖RLock驻债。
死鎖的形成
前一篇文章Python:使用threading模塊實(shí)現(xiàn)多線程編程四[使用Lock互斥鎖]我們已經(jīng)開始涉及到如何使用互斥鎖來保護(hù)我們的公共資源了乳规,現(xiàn)在考慮下面的情況–
如果有多個公共資源,在線程間共享多個資源的時候合呐,如果兩個線程分別占有一部分資源并且同時等待對方的資源暮的,這會引起什么問題?
死鎖概念
所謂死鎖: 是指兩個或兩個以上的進(jìn)程在執(zhí)行過程中淌实,因爭奪資源而造成的一種互相等待的現(xiàn)象青扔,若無外力作用,它們都將無法推進(jìn)下去翩伪。此時稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖微猖,這些永遠(yuǎn)在互相等待的進(jìn)程稱為死鎖進(jìn)程。 由于資源占用是互斥的缘屹,當(dāng)某個進(jìn)程提出申請資源后凛剥,使得有關(guān)進(jìn)程在無外力協(xié)助下,永遠(yuǎn)分配不到必需的資源而無法繼續(xù)運(yùn)行轻姿,這就產(chǎn)生了一種特殊現(xiàn)象死鎖犁珠。
import threading ?
counterA =0?
counterB =0?
mutexA = threading.Lock()?
mutexB = threading.Lock()?
class MyThread(threading.Thread):?
? ? def __init__(self):?
? ? ? ? threading.Thread.__init__(self)?
? ? def run(self):?
? ? ? ? self.fun1() ? ? ? ??
? ? ? ? self.fun2()?
? ? def fun1(self):?
? ? ? ? global mutexA, mutexB?
? ? ? ? if mutexA.acquire():?
? ? ? ? ? ? ?print"I am %s , get res: %s"%(self.name,"ResA")?
? ? ? ? ? ? if mutexB.acquire():?
? ? ? ? ? ? ? ? print"I am %s , get res: %s"%(self.name,"ResB") ? ? ? ? ? ? ? ??
? ? ? ? mutexB.release() ? ? ? ??
? ? ? ? mutexA.release()
? ? ?def fun2(self):?
? ? ? ? global mutexA, mutexB?
? ? ? ? if mutexB.acquire():?
? ? ? ? ? ? print"I am %s , get res: %s"%(self.name,"ResB")?
? ? ? ? ? ? if mutexA.acquire():?
? ? ? ? ? ? ? ? print"I am %s , get res: %s"%(self.name,"ResA") ? ? ? ? ? ? ? ??
? ? ? ? ?mutexA.release() ? ? ? ??
? ? ? ? ?mutexB.release()
?if__name__ =="__main__":?
? ? ?for I in range(0,100): ? ? ? ??
? ? my_thread = MyThread()? ? ? ??
? ? ?my_thread.start()
代碼中展示了一個線程的兩個功能函數(shù)分別在獲取了一個競爭資源之后再次獲取另外的競爭資源,我們看運(yùn)行結(jié)果:
I am Thread-1, get res: ResA?
I am Thread-1, get res: ResB?
I am Thread-2, get res: ResAI am Thread-1, get res: ResB
可以看到互亮,程序已經(jīng)掛起在那兒了犁享,這種現(xiàn)象我們就稱之為”死鎖“。
避免死鎖
避免死鎖主要方法就是:正確有序的分配資源豹休,避免死鎖算法中最有代表性的算法是Dijkstra E.W 于1968年提出的銀行家算法
可重入鎖RLock
考慮這種情況:如果一個線程遇到鎖嵌套的情況該怎么辦炊昆,這個嵌套是指當(dāng)我一個線程在獲取臨界資源時,又需要再次獲取。
根據(jù)這種情況凤巨,代碼如下:
import threading ?
import time ?
counter =0?
mutex = threading.Lock()?
class MyThread(threading.Thread):?
? ? def __init__(self):?
? ? ? ? threading.Thread.__init__(self)?
? ? def run(self):?
? ? ? ? global counter, mutex? ? ? ??
? ? ? ? ?time.sleep(1);?
? ? ? ? if mutex.acquire(): ? ? ? ? ? ??
? ? ? ? ? ? counter +=1?
? ? ? ? ? ? print"I am %s, set counter:%s"% (self.name, counter)
? ? ? ? ?if mutex.acquire(): ? ? ? ? ? ? ? ??
? ? ? ? ? ? counter +=1?
? ? ? ? ? ? print"I am %s, set counter:%s"% (self.name, counter) ? ? ? ? ? ? ? ??
? ? ? ? ? ? mutex.release()? ? ? ? ? ??
? ? ? ? ?mutex.release()
?if __name__ =="__main__":?
? ? for I in range(0,200): ? ? ? ??
? ? ? ? my_thread = MyThread()? ? ? ??
? ? ? ? my_thread.start()
這種情況的代碼運(yùn)行情況如下:
I am Thread-1, set counter:1
之后就直接掛起了视乐,這種情況形成了最簡單的死鎖。
那有沒有一種情況可以在某一個線程使用互斥鎖訪問某一個競爭資源時敢茁,可以再次獲取呢佑淀?在Python中為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock彰檬。這個RLock內(nèi)部維護(hù)著一個Lock和一個counter變量伸刃,counter記錄了acquire的次數(shù),從而使得資源可以被多次require逢倍。直到一個線程所有的acquire都被release奕枝,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock瓶堕,則不會發(fā)生死鎖:
代碼只需將上述的:
mutex = threading.Lock()
替換成:
mutex = threading.RLock()
使用Condition實(shí)現(xiàn)復(fù)雜同步
目前我們已經(jīng)會使用Lock去對公共資源進(jìn)行互斥訪問了,也探討了同一線程可以使用RLock去重入鎖症歇,但是盡管如此我們只不過才處理了一些程序中簡單的同步現(xiàn)象郎笆,我們甚至還不能很合理的去解決使用Lock鎖帶來的死鎖問題。所以我們得學(xué)會使用更深層的解決同步問題忘晤。
Python提供的Condition對象提供了對復(fù)雜線程同步問題的支持宛蚓。Condition被稱為條件變量,除了提供與Lock類似的acquire和release方法外设塔,還提供了wait和notify方法凄吏。
使用Condition的主要方式為:線程首先acquire一個條件變量,然后判斷一些條件闰蛔。如果條件不滿足則wait痕钢;如果條件滿足,進(jìn)行一些處理改變條件后序六,通過notify方法通知其他線程任连,其他處于wait狀態(tài)的線程接到通知后會重新判斷條件。不斷的重復(fù)這一過程例诀,從而解決復(fù)雜的同步問題随抠。
下面我們通過很著名的“生產(chǎn)者-消費(fèi)者”模型來來演示下,在Python中使用Condition實(shí)現(xiàn)復(fù)雜同步繁涂。
import threading ?
import time ?
condition = threading.Condition()?
products =0 ?
class Producer(threading.Thread):?
? ? def__init__(self):?
????????threading.Thread.__init__(self)?
? ? def run(self):?
????????global condition, products ?
????????while True:?
????????????if condition.acquire():?
????????????????if products <10:? ? ? ? ? ? ? ? ? ??
?????????????????????products +=1;
?????????????????????print"Producer(%s):deliver one, now products:%s"%(self.name, products) ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
????????????????????condition.notify()?
????????????????else:?
????????????????????print"Producer(%s):already 10, stop deliver, now products:%s"%(self.name, products)? ? ? ? ? ? ? ? ? ? ????????
????????????????????condition.wait(); ? ? ? ? ? ? ? ??
????????????condition.release()??
? ? ? ? ? ? time.sleep(2)
class Consumer(threading.Thread):?
????def __init__(self):?
????????threading.Thread.__init__(self)?
????def run(self):?
????????global condition, products?
????while True:?
????????if condition.acquire():?
????????????if products >1: ? ? ? ? ? ? ? ? ? ??
????????????????products -=1?
????????????????print"Consumer(%s):consume one, now products:%s"%(self.name, products) ? ? ? ? ? ? ? ? ? ? ????????????
????????????????condition.notify()?
????????????else:?
? ? ? ? ? ? ? ? print"Consumer(%s):only 1, stop consume, products:%s"%(self.name, products)? ? ? ? ? ? ? ? ? ? ????????
????????????????condition.wait(); ? ? ? ? ? ? ? ??
????????????condition.release()? ? ? ? ? ? ? ??
? ? ? ? ? ? time.sleep(2)
if __name__ =="__main__":?
????for p in range(0,2): ? ? ? ??
????p = Producer() ? ? ? ??
????p.start()forcinrange(0,10): ? ? ? ??
????c = Consumer() ? ? ? ??
????c.start()
代碼中主要實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者線程拱她,雙方將會圍繞products來產(chǎn)生同步問題,首先是2個生成者生產(chǎn)products 扔罪,而接下來的10個消費(fèi)者將會消耗products秉沼,代碼運(yùn)行如下:
Producer(Thread-1):deliver one, now products:1?
Producer(Thread-2):deliver one, now products:2
?Consumer(Thread-3):consume one, now products:1?
Consumer(Thread-4):only1, stop consume, products:1?
Consumer(Thread-5):only1, stop consume, products:1?
Consumer(Thread-6):only1, stop consume, products:1?
Consumer(Thread-7):only1, stop consume, products:1
?Consumer(Thread-8):only1, stop consume, products:1?
Consumer(Thread-10):only1, stop consume, products:1?
Consumer(Thread-9):only1, stop consume, products:1?
Consumer(Thread-12):only1, stop consume, products:1?
Consumer(Thread-11):only1, stop consume, products:1
另外:Condition對象的構(gòu)造函數(shù)可以接受一個Lock/RLock對象作為參數(shù),如果沒有指定,則Condition對象會在內(nèi)部自行創(chuàng)建一個RLock氧猬;除了notify方法外背犯,Condition對象還提供了notifyAll方法,可以通知waiting池中的所有線程嘗試acquire內(nèi)部鎖盅抚。由于上述機(jī)制漠魏,處于waiting狀態(tài)的線程只能通過notify方法喚醒,所以notifyAll的作用在于防止有線程永遠(yuǎn)處于沉默狀態(tài)妄均。
使用Event實(shí)現(xiàn)線程間通信
使用threading.Event可以實(shí)現(xiàn)線程間相互通信柱锹,之前的Python:使用threading模塊實(shí)現(xiàn)多線程編程七[使用Condition實(shí)現(xiàn)復(fù)雜同步]我們已經(jīng)初步實(shí)現(xiàn)了線程間通信的基本功能,但是更為通用的一種做法是使用threading.Event對象丰包。
使用threading.Event可以使一個線程等待其他線程的通知禁熏,我們把這個Event傳遞到線程對象中,Event默認(rèn)內(nèi)置了一個標(biāo)志邑彪,初始值為False瞧毙。一旦該線程通過wait()方法進(jìn)入等待狀態(tài),直到另一個線程調(diào)用該Event的set()方法將內(nèi)置標(biāo)志設(shè)置為True時寄症,該Event會通知所有等待狀態(tài)的線程恢復(fù)運(yùn)行宙彪。
import threading
?import time?
class MyThread(threading.Thread):?
????def __init__(self, signal):?
????????threading.Thread.__init__(self) ? ? ? ??
????????self.singal = signal?
? ? def run(self):?
????????print"I am %s,I will sleep ..."%self.name ? ? ? ??
????????self.singal.wait()?
????????print"I am %s, I awake..."%self.name?
if __name__ =="__main__": ? ??
????singal = threading.Event()?
????for t in range(0,3): ? ? ? ??
????????thread = MyThread(singal) ? ? ? ??
????????thread.start()?
????????print"main thread sleep 3 seconds... "?
????????time.sleep(3) ? ??
????????singal.set()
運(yùn)行效果如下:
I am Thread-1,I will sleep ...
I am Thread-2,I will sleep ...
I am Thread-3,I will sleep ...
main thread sleep 3 seconds...
I am Thread-1, I awake...I am Thread-2, I awake...
I am Thread-3, I awake...