最近在學(xué)習(xí)并發(fā)編程考婴,打算寫一個(gè)并發(fā)編程系列的文章。之前也看過(guò)很多Python多線程多進(jìn)程的教程催烘、博客等,但收益不大缎罢。因?yàn)榇蠖鄶?shù)文章上來(lái)就是寫幾個(gè)對(duì)照代碼伊群,不去解釋那些庫(kù)方法的作用,然后得出多線程多進(jìn)程確實(shí)快策精。這種教程相當(dāng)于讓我死記硬背舰始,這是我無(wú)法接受的。
經(jīng)過(guò)長(zhǎng)期摸索咽袜,我發(fā)現(xiàn)了一個(gè)神奇的網(wǎng)址丸卷,那就是The Python Standard Library.這在我剛學(xué)Python時(shí)就了解過(guò),但當(dāng)時(shí)看到整頁(yè)整頁(yè)的英文我就頭大询刹,現(xiàn)在慢慢適應(yīng)了谜嫉,發(fā)現(xiàn)它確實(shí)很好,對(duì)標(biāo)準(zhǔn)庫(kù)的介紹非常詳細(xì)凹联。如果可以接受直接去看它就行沐兰,我的文章只是在它之上的一個(gè)翻譯和總結(jié)。
基礎(chǔ)概念
學(xué)習(xí)并發(fā)編程首先要了解一些基本概念蔽挠,線程住闯,進(jìn)程,協(xié)程,IO等概念比原,不了解的可以學(xué)習(xí)一下操作系統(tǒng)插佛。
GIL
Python有個(gè)歷史遺留問(wèn)題,那就是全局解釋器鎖量窘,一個(gè)進(jìn)程同一個(gè)時(shí)刻只有一個(gè)線程在執(zhí)行雇寇,如果將多線程用于CPU計(jì)算密集行工作可能效果不如單線程,但是在I/O這種耗時(shí)操作方面還是有用的绑改,這方面可以查閱GIL相關(guān)資料谢床。
threading
threading是Python標(biāo)準(zhǔn)的多線程接口,是對(duì)底層的_thread模塊的封裝厘线, 使多線程用起來(lái)更加方便识腿。
這個(gè)模塊定義了如下幾個(gè)函數(shù)(列舉部分用到的):
threading.active_count(): 返回當(dāng)前存活的線程數(shù)量
threading.current_thread(): 返回當(dāng)前線程對(duì)象
threading.enumerate(): 返回一個(gè)列表,列表包含所有存活的線程對(duì)象
threading.main_thread(): 返回主線程對(duì)象
創(chuàng)建線程
有兩種方法可以新創(chuàng)建一個(gè)線程對(duì)象造壮,都基于Thread類渡讼。第一種是傳一個(gè)可調(diào)用的對(duì)象(一般是函數(shù))給Thread的構(gòu)造器,第二種是繼承Thread類耳璧,重寫它的run方法成箫。
一旦thread對(duì)象被創(chuàng)建,需要調(diào)用對(duì)象的start()方法讓他在一個(gè)單獨(dú)的線程中運(yùn)行旨枯,即運(yùn)行run方法蹬昌。
class threading.Thread
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)。
構(gòu)造Thread類時(shí)傳入的參數(shù)必須是關(guān)鍵字參數(shù)攀隔。
group可忽略皂贩,target為需要執(zhí)行的函數(shù)func,name為線程的名字(沒(méi)實(shí)際意義昆汹,可忽略)明刷,args為func需要的參數(shù)元祖(注意是元祖,元祖的單個(gè)形式為(arg,)
)满粗,kwargs為func需要的關(guān)鍵字參數(shù)辈末,daemon設(shè)置是否為守護(hù)進(jìn)程。
start(): 啟動(dòng)線程
run(): 線程運(yùn)行后執(zhí)行的函數(shù)
join(): This blocks the calling thread until the thread whose join() method is called is terminated.比如在A線程中調(diào)用B.join()映皆,A就會(huì)阻塞挤聘,直到B運(yùn)行結(jié)束,看個(gè)例子捅彻。
from threading import Thread
def hello(num):
print(num)
if __name__ == '__main__':
threads = []
for i in range(10):
t = Thread(target=hello, args=(i,))
threads.append(t)
for t in threads:
t.start()
t.join()
print('1111111')
# output: 節(jié)省篇幅檬洞,用,代替換行
# 0沟饥, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1111111
if __name__ == '__main__':
...
for t in threads:
t.start()
print('22222')
# output:
# 0, 1, 2, 3, 4, 6, 5, 22222, 8, 7, 9
注意Print('22222')的位置添怔。the calling thread在這里就是主線程湾戳,the thread whose join is called在這里是threads列表里的對(duì)象。所以這句話意思就是主線程會(huì)一直阻塞广料,直到所有threads里的thread執(zhí)行完畢砾脑。如果不調(diào)用join()方法,主線程就不會(huì)等待子線程的執(zhí)行艾杏。
同步機(jī)制
不同的線程可能需要對(duì)同一個(gè)資源進(jìn)行修改韧衣,如果不加控制,會(huì)出現(xiàn)意想不到的結(jié)果购桑,來(lái)看個(gè)神奇的例子畅铭。
count = 0
def add():
global count
temp = count + 1
time.sleep(0.0001)
count = temp
threads = []
for i in range(10):
t = Thread(target=add)
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
print('count={}'.format(count))
# output: 運(yùn)行三次的結(jié)果
# 3, 4, 4
正常來(lái)說(shuō)count的值應(yīng)該是10,因?yàn)槲艺{(diào)用了10個(gè)線程勃蜘。但結(jié)果卻是3或4或其他數(shù)硕噩,原因在于time.sleep()和多線程并發(fā),time的作用只是讓線程有機(jī)會(huì)交叉執(zhí)行缭贡÷茫考慮兩個(gè)線程并發(fā)執(zhí)行。
A線程 | B線程 |
---|---|
global count(count=0) | |
temp = count+1(temp=1) | |
time.sleep() | |
global count(count=0) | |
temp = count+1 (temp=1) | |
time.sleep() | |
count = temp(count=1) | |
count = temp(count=1) |
如果不加控制阳惹,并發(fā)操作會(huì)導(dǎo)致意料不到的后果谍失,所以需要采取一些手段來(lái)控制并發(fā)的執(zhí)行順序。
Lock 鎖
鎖有兩個(gè)狀態(tài)莹汤,上鎖了(locked)和沒(méi)上鎖(unlocked)快鱼。一個(gè)鎖對(duì)象創(chuàng)建時(shí)是沒(méi)上鎖的狀態(tài)。鎖有兩個(gè)基本函數(shù)纲岭,acquire()用來(lái)上鎖攒巍,release()用來(lái)釋放鎖。如果鎖處于上鎖狀態(tài)荒勇,不能調(diào)用acquire();屬于未上鎖狀態(tài),不能調(diào)用release()闻坚,否則會(huì)引起錯(cuò)誤沽翔。對(duì)上面一個(gè)例子進(jìn)行加鎖處理。
count = 0
def add(lock):
global count
lock.acquire()
temp = count + 1
time.sleep(0.0001)
lock.release()
count = temp
threads = []
lock = Lock()
for i in range(10):
t = Thread(target=add, args=(lock,))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
print('count={}'.format(count))
# output:
# 10
鎖如果不恰當(dāng)控制會(huì)出現(xiàn)死鎖的情況窿凤,關(guān)于死鎖的原因和解決辦法在操作系統(tǒng)中也有涉及仅偎,不了解的學(xué)習(xí)一下操作系統(tǒng),我這篇文章只介紹多線程模塊的基本用法雳殊。
RLock 可重入鎖
對(duì)于這個(gè)鎖我有點(diǎn)困惑橘沥,想不到它的實(shí)際應(yīng)用場(chǎng)景,可重入鎖建立在鎖的基礎(chǔ)上夯秃,獲得鎖A的線程可以對(duì)鎖A進(jìn)行再次加鎖而不會(huì)陷入死鎖座咆,但釋放操作也需要相同次數(shù)痢艺,但是其他線程無(wú)法在鎖住的情況下獲得鎖,同樣對(duì)上面例子進(jìn)行修改介陶。還沒(méi)找到一個(gè)很有說(shuō)服力的需要加多次鎖的例子堤舒。
# ...不變
def add(lock):
global count
lock.acquire()
temp = count + 1
lock.acquire()
time.sleep(0.0001)
lock.release()
count = temp
lock.release()
# ...不變
Condition 條件
條件也是建立在鎖的基礎(chǔ)上,在創(chuàng)建條件對(duì)象時(shí)可以傳入一個(gè)鎖或可重入鎖哺呜,不傳入?yún)?shù)則默認(rèn)生成一個(gè)可重入鎖舌缤。一些線程A等待條件而阻塞,一些線程B發(fā)出條件滿足的信號(hào)某残,則等待的線程A可以繼續(xù)運(yùn)行国撵。線程A首先acquire()加鎖,在wait()處釋放鎖并阻塞玻墅,當(dāng)其他線程發(fā)出條件滿足信號(hào)介牙,發(fā)出條件滿足信號(hào)的方法有兩個(gè),一個(gè)是notify()椭豫,喚醒一個(gè)等待線程耻瑟,另一個(gè)是notify_all()喚醒所有等待線程;A線程的wait()重新加鎖并返回。經(jīng)典的場(chǎng)景可能就是消費(fèi)者和生產(chǎn)者模式赏酥。
def consumer(con):
con.acquire()
print('{} is waiting'.format(threading.currentThread().name))
con.wait()
print('{} cousumes one time.'.format(threading.currentThread().name))
con.release()
def producer(con):
print('prodece and notify')
con.acquire()
con.notify_all()
con.release()
threads, condition = [], Condition()
for i in range(5):
t = Thread(target=consumer, args=(condition,))
threads.append(t)
for t in threads:
t.start()
produce = Thread(target=producer, args=(condition,))
produce.start()
produce.join()
for t in threads:
t.join()
print('end')
# output:
# Thread-1 is waiting
# Thread-2 is waiting
# Thread-3 is waiting
# Thread-4 is waiting
# Thread-5 is waiting
# prodece and notify
# Thread-1 cousumes one time.
# Thread-4 cousumes one time.
# Thread-5 cousumes one time.
# Thread-2 cousumes one time.
# Thread-3 cousumes one time.
# end
Semaphore 信號(hào)量
方法與Lock()一樣喳整,但是可以在創(chuàng)建信號(hào)量時(shí)可以傳入一個(gè)大于0的整數(shù),當(dāng)傳入的整數(shù)為1時(shí)與Lock作用相同裸扶。信號(hào)量一般用于限制并發(fā)的數(shù)量框都,如連接數(shù)據(jù)庫(kù)服務(wù)器時(shí)需要在連接數(shù)量控制在一定范圍內(nèi)。
def hello(sem):
sem.acquire()
time.sleep(1)
print('{} is running'.format(threading.currentThread().name))
sem.release()
threads, sem = [], Semaphore(5)
for i in range(100):
t = Thread(target=hello, args=(sem,))
threads.append(t)
t.start()
for t in threads:
t.join()
可以在自己的電腦上運(yùn)行一下呵晨,會(huì)看到print結(jié)果是5個(gè)5個(gè)的出現(xiàn)魏保,sleep時(shí)間長(zhǎng)點(diǎn)效果更明顯∶溃可以看到雖然開(kāi)了100個(gè)線程谓罗,但是同時(shí)在運(yùn)行只有5個(gè)。
Event 事件
一個(gè)線程標(biāo)識(shí)一個(gè)事件季二,其他線程等待它檩咱。一個(gè)事件對(duì)象管理一個(gè)內(nèi)部標(biāo)志,這個(gè)標(biāo)志可以用set()置為true,用clear()置為false胯舷。wait()方法會(huì)一直阻塞直到標(biāo)志為true刻蚯。這個(gè)看起來(lái)似乎跟條件有點(diǎn)像。
def consumer(event):
print('{} is waiting for an event.'.format(threading.currentThread().name))
event.wait()
print('{} finish.'.format(threading.currentThread().name))
def producer(event):
print('producer')
event.set()
threads, event = [], Event()
for i in range(3):
t = Thread(target=consumer, args=(event,))
threads.append(t)
t.start()
produce = Thread(target=producer, args=(event,))
produce.start()
for t in threads:
t.join()
produce.join()
print('end')
# output:
# Thread-1 is waiting for an event.
# Thread-2 is waiting for an event.
# Thread-3 is waiting for an event.
# producer
# Thread-2 finish.
# Thread-3 finish.
# Thread-1 finish.
# end
上下文管理器
上下文管理協(xié)議是Python的一個(gè)語(yǔ)法糖吧桑嘶,用來(lái)方便資源的申請(qǐng)與釋放炊汹。實(shí)現(xiàn)了上下文管理協(xié)議的類就可以用with語(yǔ)句包裹,簡(jiǎn)化代碼的書寫逃顶。具體實(shí)現(xiàn)查閱資料讨便。
上面介紹的那么多同步機(jī)制大多數(shù)都有一個(gè)申請(qǐng)鎖和釋放鎖的步驟充甚,可以用with語(yǔ)句簡(jiǎn)化這些操作,支持with語(yǔ)法的有l(wèi)ock, Rlock, conditions和semaphore.官網(wǎng)示例如下:
with some_lock:
# do something
等價(jià)于
some_lock.acquire()
try:
# do something
finally:
some_lock.release()
最后
如果還有疑問(wèn)器钟,標(biāo)準(zhǔn)庫(kù)是你最好的選擇津坑!剛開(kāi)始看也許看不下去,有這么幾個(gè)建議傲霸。
- 看簡(jiǎn)短一點(diǎn)的模塊
- 積累專業(yè)詞匯量
- 找自己熟悉的模塊入手疆瑰,這樣你就能大概的猜出它的意思