線程概念的引入背景
有了進(jìn)程為什么還要線程
進(jìn)程有很多的有點(diǎn)猴蹂,它提供了多到編程埠褪,讓我們感覺我們每個人都擁有自己的CPU和其他資源岖妄,可以提高計(jì)算機(jī)的利用率。很多人就不理解了抬探,既然進(jìn)程這么優(yōu)秀子巾,為什么還要現(xiàn)線程?其實(shí)小压,仔細(xì)觀察就會發(fā)現(xiàn)進(jìn)程還是有很多缺陷的,主要體現(xiàn)在兩點(diǎn)上:
①椰于、進(jìn)程只能在一個時間內(nèi)干一件事怠益,如果想同時干兩件事或者多件事,進(jìn)程就無能為力了瘾婿。
②蜻牢、進(jìn)程在執(zhí)行的過程中如果阻塞烤咧,例如等待輸入,整個進(jìn)程就睡掛起抢呆,即使進(jìn)程中有些工作不依賴于輸入的數(shù)據(jù)煮嫌,也將無法執(zhí)行。
線程的出現(xiàn)
60年代抱虐,在os中能擁有資源獨(dú)立運(yùn)行的基本單位是進(jìn)程昌阿,然而隨著計(jì)算機(jī)技術(shù)的發(fā)展,進(jìn)程出現(xiàn)了很多的弊端恳邀,一是由于進(jìn)程是資源的擁有這懦冰,創(chuàng)建、撤銷谣沸、切換存在很大的時空開銷刷钢,因此余姚引入輕型進(jìn)程;二是由于對稱多處理機(jī)(SMP)的出現(xiàn)乳附,可以滿足多個運(yùn)行單位内地,而整個進(jìn)程并行開銷過大。
因此在80年代赋除,出現(xiàn)了能獨(dú)立運(yùn)行的基本單位——線程(Threads)瓤鼻。
(進(jìn)程是最小的資源分配單位;線程是被CPU執(zhí)行的最小單位)
進(jìn)程和線程的關(guān)系
線程與進(jìn)程的區(qū)別:
1贤重、地址空間和其他資源:進(jìn)程間相互獨(dú)立茬祷,同一進(jìn)程的各線程間共享。某線程內(nèi)的想愛你城咋其他進(jìn)程不可見并蝗。
2祭犯、通信:進(jìn)程間通信IPC,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段來進(jìn)行通信——需要進(jìn)程同步和互斥手段的輔助滚停,以保證數(shù)據(jù)的一致性沃粗。
3、調(diào)度和切換:線程上下文切換比進(jìn)程上下文切換要快得多键畴。
4最盅、在多線程操作系統(tǒng)中,進(jìn)程不是一個可執(zhí)行的實(shí)體起惕。
線程的特點(diǎn)
在多線程的操作系統(tǒng)中涡贱,通常是在一個進(jìn)程中包括多個線程,每個線程都是作為利用CPU的基本單位惹想,是花費(fèi)最小開銷的實(shí)體问词。線程具有以下屬性。
1)嘀粱、輕型實(shí)體
線程中的實(shí)體基本上不擁有系統(tǒng)資源激挪,只是有一點(diǎn)必不可少的辰狡、能保證獨(dú)立運(yùn)行的資源。
線程的實(shí)體包括程序垄分、數(shù)據(jù)和TCB宛篇。線程是動態(tài)概念,它的動態(tài)特性由線程控制塊TCB(Thread Control Block)描述
TCB包括以下信息:
(1)線程狀態(tài)
(2)當(dāng)線程不運(yùn)行時薄湿,被保存的現(xiàn)場資源叫倍。
(3)一組執(zhí)行堆棧。
(4)存放每個線程的局部變量主存區(qū)嘿般。
(5)訪問同一個進(jìn)程中的主存和其他資源段标。
用于指示被執(zhí)行指令序列的程序計(jì)數(shù)器、保留局部變量炉奴、少數(shù)狀態(tài)參數(shù)和返回地址等的一組寄存器和堆棧逼庞。
2)獨(dú)立調(diào)度和分派的基本單位。
在多線程OS中瞻赶,線程是能獨(dú)立運(yùn)行的基本單位赛糟,因而也是獨(dú)立調(diào)度和分派的基本單位。由于線程很輕砸逊,故線程的切換非常迅速且開銷小璧南。
3)共享進(jìn)程資源。
線程在同一進(jìn)程中的各個線程师逸,都可以共享該進(jìn)程所擁有過的每一個內(nèi)存資源司倚;此外,還可以訪問進(jìn)程所擁有的一打開文件篓像、定時器动知、信號量機(jī)構(gòu)等。由于同一個進(jìn)程內(nèi)的線程共享內(nèi)存文件员辩,所以線程之間互相通信不必調(diào)用內(nèi)核盒粮。
4)可并發(fā)執(zhí)行
在一個進(jìn)程中的多個線程之間,可以兵法執(zhí)行奠滑,甚至允許在一個進(jìn)程中所有的線程都能兵法執(zhí)行丹皱;同樣,不同進(jìn)程中的線程也能并發(fā)執(zhí)行宋税,充分利用和發(fā)揮了處理機(jī)和外部設(shè)備并行工作的能力摊崭。
使用線程的實(shí)際場景
開啟一個字處理軟件進(jìn)程,該進(jìn)程肯定需要辦不止一件事情弃甥,比如監(jiān)聽鍵盤輸入爽室,處理文字,定時自動將文字保存到硬盤淆攻,這三個任務(wù)操作的都是同一塊數(shù)據(jù)阔墩,因而不能用多進(jìn)程。只能在一個進(jìn)程里并發(fā)地開啟三個線程,如果是單線程瓶珊,那就只能是啸箫,鍵盤輸入時,不能處理文字和自動保存伞芹,自動保存時又不能輸入和處理文字忘苛。
內(nèi)存中的線程
多個線程共享同一個進(jìn)程的地址空間中的資源,是對一臺計(jì)算機(jī)上多個進(jìn)程的模擬唱较,有時也稱線程為輕量級的進(jìn)程扎唾。
而對一臺計(jì)算機(jī)上多個進(jìn)程,則共享物理內(nèi)存南缓、磁盤胸遇、打印機(jī)等其他物理資源。多線程的運(yùn)行也多進(jìn)程的運(yùn)行類似汉形,是cpu在多個線程之間的快速切換纸镊。
不同的進(jìn)程之間是充滿敵意的,彼此是搶占概疆、競爭cpu的關(guān)系逗威,如果迅雷會和QQ搶資源。而同一個進(jìn)程是由一個程序員的程序創(chuàng)建岔冀,所以同一進(jìn)程內(nèi)的線程是合作關(guān)系凯旭,一個線程可以訪問另外一個線程的內(nèi)存地址,大家都是共享的使套,一個線程干死了另外一個線程的內(nèi)存罐呼,那純屬程序員腦子有問題。
類似于進(jìn)程童漩,每個線程也有自己的堆棧弄贿,不同于進(jìn)程,線程庫無法利用時鐘中斷強(qiáng)制線程讓出CPU矫膨,可以調(diào)用thread_yield運(yùn)行線程自動放棄cpu差凹,讓另外一個線程運(yùn)行。
線程通常是有益的侧馅,但是帶來了不小程序設(shè)計(jì)難度危尿,線程的問題是:
1. 父進(jìn)程有多個線程,那么開啟的子線程是否需要同樣多的線程
2. 在同一個進(jìn)程中馁痴,如果一個線程關(guān)閉了文件谊娇,而另外一個線程正準(zhǔn)備往該文件內(nèi)寫內(nèi)容呢?
因此罗晕,在多線程的代碼中济欢,需要更多的心思來設(shè)計(jì)程序的邏輯赠堵、保護(hù)程序的數(shù)據(jù)。
用戶級線程和內(nèi)核級線程
線程的實(shí)現(xiàn)可以分為兩類:用戶級線程(User-Level Thread)和內(nèi)核線線程(Kernel-Level Thread)法褥,后者又稱為內(nèi)核支持的線程或輕量級進(jìn)程茫叭。在多線程操作系統(tǒng)中,各個系統(tǒng)的實(shí)現(xiàn)方式并不相同半等,在有的系統(tǒng)中實(shí)現(xiàn)了用戶級線程揍愁,有的系統(tǒng)中實(shí)現(xiàn)了內(nèi)核級線程。
用戶級線程
內(nèi)核的切換由用戶態(tài)程序自己控制內(nèi)核切換,不需要內(nèi)核干涉杀饵,少了進(jìn)出內(nèi)核態(tài)的消耗莽囤,但不能很好的利用多核Cpu。
在用戶空間模擬操作系統(tǒng)對進(jìn)程的調(diào)度切距,來調(diào)用一個進(jìn)程中的線程朽缎,每個進(jìn)程中都會有一個運(yùn)行時系統(tǒng),用來調(diào)度線程蔚舀。此時當(dāng)該進(jìn)程獲取cpu時饵沧,進(jìn)程內(nèi)再調(diào)度出一個線程去執(zhí)行,同一時刻只有一個線程執(zhí)行赌躺。
內(nèi)核級線程
內(nèi)核級線程:切換由內(nèi)核控制狼牺,當(dāng)線程進(jìn)行切換的時候,由用戶態(tài)轉(zhuǎn)化為內(nèi)核態(tài)礼患。切換完畢要從內(nèi)核態(tài)返回用戶態(tài)是钥;可以很好的利用smp,即利用多核cpu缅叠。windows線程就是這樣的悄泥。
Python中的線程
全局解釋器鎖GIL
Python代碼的執(zhí)行由Python虛擬機(jī)(解釋器主循環(huán))
來控制住。Python在設(shè)計(jì)之初就考慮到要在主循環(huán)中肤粱,同時只有一個線程咋執(zhí)行弹囚。雖然Python解釋器中可以“運(yùn)行”多個線程,但在任意時刻只有一個線程在解釋器中運(yùn)行领曼。
對Python虛擬機(jī)的訪問有全局解釋器鎖GIL來控制鸥鹉,正式這個鎖能保證同一時刻只有一個線程在解釋器中運(yùn)行。
在多線程環(huán)境中庶骄,Python虛擬機(jī)按一下方式執(zhí)行:
a毁渗、設(shè)置 GIL;
b单刁、切換到一個線程去運(yùn)行灸异;
c、運(yùn)行指定數(shù)量的字節(jié)碼指令或者線程主動讓出控制(可以調(diào)用 time.sleep(0));
d肺樟、把線程設(shè)置為睡眠狀態(tài)檐春;
e、解鎖 GIL儡嘶;
d喇聊、再次重復(fù)以上所有步驟恍风。
在調(diào)用外部代碼(如 C/C++擴(kuò)展函數(shù))的時候蹦狂,GIL將會被鎖定,直到這個函數(shù)結(jié)束為止(由于在這期間沒有Python的字節(jié)碼被運(yùn)行朋贬,所以不會做線程切換)編寫擴(kuò)展的程序員可以主動解鎖GIL凯楔。
python線程模塊的選擇
Python提供了幾個用于多線程編程的模塊,包括thread锦募、threading和Queue等摆屯。thread和threading模塊允許程序員創(chuàng)建和管理線程。thread模塊提供了基本的線程和鎖的支持糠亩,threading提供了更高級別虐骑、功能更強(qiáng)的線程管理的功能。Queue模塊允許用戶創(chuàng)建一個可以用于多個線程之間共享數(shù)據(jù)的隊(duì)列數(shù)據(jù)結(jié)構(gòu)赎线。
避免使用thread模塊廷没,因?yàn)楦呒墑e的threading模塊更為先進(jìn),對線程的支持更為完善垂寥,而且使用thread模塊里的屬性有可能會與threading出現(xiàn)沖突颠黎;其次低級別的thread模塊的同步原語很少(實(shí)際上只有一個),而threading模塊則有很多滞项;再者狭归,thread模塊中當(dāng)主線程結(jié)束時,所有的線程都會被強(qiáng)制結(jié)束掉文判,沒有警告也不會有正常的清除工作过椎,至少threading模塊能確保重要的子線程退出后進(jìn)程才退出。
thread模塊不支持守護(hù)線程戏仓,當(dāng)主線程退出時疚宇,所有的子線程不論它們是否還在工作,都會被強(qiáng)行退出柜去。而threading模塊支持守護(hù)線程灰嫉,守護(hù)線程一般是一個等待客戶請求的服務(wù)器,如果沒有客戶提出請求它就在那等著嗓奢,如果設(shè)定一個線程為守護(hù)線程讼撒,就表示這個線程是不重要的,在進(jìn)程退出的時候,不用等待這個線程退出根盒。
threading模塊
multiprocess模塊的完全模仿了threading模塊的接口钳幅,二者在使用層面敢艰,有很大的相似性册赛,因而不再詳細(xì)介紹
線程的創(chuàng)建Threading.Thread類
線程的創(chuàng)建
創(chuàng)建方式1:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' % name)
if __name__ == '__main__':
t = Thread(target=sayhi,args=('Jay',))
t.start()
print('主線程')
創(chuàng)建方式2:
from threading import Thread
import time
class Sayhi(Thread):
def __init__(self,name):
aquer().__init__()
self.name = name
def run(self):
time.sleep(2)
print('%s say hello' % self.name)
if __name__ == '__main__':
t = Sayhi('Jay')
t.start()
print('主線程')
多線程與多進(jìn)程
pid的比較:
from threading import Thread
from multiprocessing import Process
import os
def work():
print('hello',os.getpid())
if __name__ == '__main__':
#part1:在主進(jìn)程下開啟多個線程,每個線程都跟主進(jìn)程的pid一樣
t1=Thread(target=work)
t2=Thread(target=work)
t1.start()
t2.start()
print('主線程/主進(jìn)程pid',os.getpid())
#part2:開多個進(jìn)程,每個進(jìn)程都有不同的pid
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主線程/主進(jìn)程pid',os.getpid())
開啟效率的比較:
from threading import Thread
from multiprocessing import Process
import os
def work():
print('hello')
if __name__ == '__main__':
#在主進(jìn)程下開啟線程
t=Thread(target=work)
t.start()
print('主線程/主進(jìn)程')
'''
打印結(jié)果:
hello
主線程/主進(jìn)程
'''
#在主進(jìn)程下開啟子進(jìn)程
t=Process(target=work)
t.start()
print('主線程/主進(jìn)程')
'''
打印結(jié)果:
主線程/主進(jìn)程
hello
'''
內(nèi)存數(shù)據(jù)的共享問題:
from threading import Thread
from multiprocessing import Process
import os
def work():
global n
n=0
if __name__ == '__main__':
# n=100
# p=Process(target=work)
# p.start()
# p.join()
# print('主',n) #毫無疑問子進(jìn)程p已經(jīng)將自己的全局的n改成了0,但改的僅僅是它自己的,查看父進(jìn)程的n仍然為100
n=1
t=Thread(target=work)
t.start()
t.join()
print('主',n) #查看結(jié)果為0,因?yàn)橥贿M(jìn)程內(nèi)的線程之間共享進(jìn)程內(nèi)的數(shù)據(jù)
#同一進(jìn)程內(nèi)的線程共享該進(jìn)程的數(shù)據(jù)森瘪?
多線程socket
Server端:
import multiprocessing
import threading
import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
def action(conn):
while True:
data=conn.recv(1024)
print(data)
conn.send(data.upper())
if __name__ == '__main__':
while True:
conn,addr=s.accept()
p=threading.Thread(target=action,args=(conn,))
p.start()
Client端:
import socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
s.send(msg.encode('utf-8'))
data=s.recv(1024)
print(data)
Thread類的其他方法
Thread實(shí)例對象的方法
# isAlive(): 返回線程是否活動的扼睬。
# getName(): 返回線程名。
# setName(): 設(shè)置線程名窗宇。
threading模塊提供的一些方法:
# threading.currentThread(): 返回當(dāng)前的線程變量。
# threading.enumerate(): 返回一個包含正在運(yùn)行的線程的list军俊。正在運(yùn)行指線程啟動后侥加、結(jié)束前,不包括啟動前和終止后的線程蝇完。
# threading.activeCount(): 返回正在運(yùn)行的線程數(shù)量官硝,與len(threading.enumerate())有相同的結(jié)果。
代碼示例:
from threading import Thread
import threading
from multiprocessing import Process
import os
def work():
import time
time.sleep(3)
print(threading.current_thread().getName())
if __name__ == '__main__':
#在主進(jìn)程下開啟線程
t=Thread(target=work)
t.start()
print(threading.current_thread().getName())
print(threading.current_thread()) #主線程
print(threading.enumerate()) #連同主線程在內(nèi)有兩個運(yùn)行的線程
print(threading.active_count())
print('主線程/主進(jìn)程')
'''
打印結(jié)果:
MainThread
<_MainThread(MainThread, started 140735268892672)>
[<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
主線程/主進(jìn)程
Thread-1
'''
join方法:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.start()
t.join()
print('主線程')
print(t.is_alive())
'''
egon say hello
主線程
False
'''
守護(hù)線程
無論是進(jìn)程還是線程短蜕,都遵循:守護(hù)xx會等待主xx運(yùn)行完畢后被銷毀氢架。需要強(qiáng)調(diào)的是:運(yùn)行完畢并非終止運(yùn)行
1.對主進(jìn)程來說,運(yùn)行完畢指的是主進(jìn)程代碼運(yùn)行完畢
2.對主線程來說朋魔,運(yùn)行完畢指的是主線程所在的進(jìn)程內(nèi)所有非守護(hù)線程統(tǒng)統(tǒng)運(yùn)行完畢岖研,主線程才算運(yùn)行完畢
守護(hù)線程例子1:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' %name)
if __name__ == '__main__':
t=Thread(target=sayhi,args=('egon',))
t.setDaemon(True) #必須在t.start()之前設(shè)置
t.start()
print('主線程')
print(t.is_alive())
'''
主線程
True
'''
守護(hù)線程例子2:
from threading import Thread
import time
def foo():
print(123)
time.sleep(1)
print("end123")
def bar():
print(456)
time.sleep(3)
print("end456")
t1=Thread(target=foo)
t2=Thread(target=bar)
t1.daemon=True
t1.start()
t2.start()
print("main-------")
鎖
同步鎖
多個線程搶占資源的情況:
from threading import Thread
import os,time
def work():
global n
temp=n
time.sleep(0.1)
n=temp-1
if __name__ == '__main__':
n=100
l=[]
for i in range(100):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) #結(jié)果可能為99
import threading
R=threading.Lock()
R.acquire()
'''
對公共數(shù)據(jù)的操作
'''
R.release()
同步鎖的引用:
from threading import Thread,Lock
import os,time
def work():
global n
lock.acquire()
temp=n
time.sleep(0.1)
n=temp-1
lock.release()
if __name__ == '__main__':
lock=Lock()
n=100
l=[]
for i in range(100):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) #結(jié)果肯定為0,由原來的并發(fā)執(zhí)行變成串行警检,犧牲了執(zhí)行效率保證了數(shù)據(jù)安全
互斥鎖與join的區(qū)別:
#不加鎖:并發(fā)執(zhí)行,速度快,數(shù)據(jù)不安全
from threading import current_thread,Thread,Lock
import os,time
def task():
global n
print('%s is running' %current_thread().getName())
temp=n
time.sleep(0.5)
n=temp-1
if __name__ == '__main__':
n=100
lock=Lock()
threads=[]
start_time=time.time()
for i in range(100):
t=Thread(target=task)
threads.append(t)
t.start()
for t in threads:
t.join()
stop_time=time.time()
print('主:%s n:%s' %(stop_time-start_time,n))
'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
'''
#不加鎖:未加鎖部分并發(fā)執(zhí)行,加鎖部分串行執(zhí)行,速度慢,數(shù)據(jù)安全
from threading import current_thread,Thread,Lock
import os,time
def task():
#未加鎖的代碼并發(fā)運(yùn)行
time.sleep(3)
print('%s start to run' %current_thread().getName())
global n
#加鎖的代碼串行運(yùn)行
lock.acquire()
temp=n
time.sleep(0.5)
n=temp-1
lock.release()
if __name__ == '__main__':
n=100
lock=Lock()
threads=[]
start_time=time.time()
for i in range(100):
t=Thread(target=task)
threads.append(t)
t.start()
for t in threads:
t.join()
stop_time=time.time()
print('主:%s n:%s' %(stop_time-start_time,n))
'''
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
'''
#有的同學(xué)可能有疑問:既然加鎖會讓運(yùn)行變成串行,那么我在start之后立即使用join,就不用加鎖了啊,也是串行的效果啊
#沒錯:在start之后立刻使用jion,肯定會將100個任務(wù)的執(zhí)行變成串行,毫無疑問,最終n的結(jié)果也肯定是0,是安全的,但問題是
#start后立即join:任務(wù)內(nèi)的所有代碼都是串行執(zhí)行的,而加鎖,只是加鎖的部分即修改共享數(shù)據(jù)的部分是串行的
#單從保證數(shù)據(jù)安全方面,二者都可以實(shí)現(xiàn),但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock
import os,time
def task():
time.sleep(3)
print('%s start to run' %current_thread().getName())
global n
temp=n
time.sleep(0.5)
n=temp-1
if __name__ == '__main__':
n=100
lock=Lock()
start_time=time.time()
for i in range(100):
t=Thread(target=task)
t.start()
t.join()
stop_time=time.time()
print('主:%s n:%s' %(stop_time-start_time,n))
'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗時是多么的恐怖
'''
死鎖與遞歸鎖
死鎖:所謂死鎖就是指兩個或兩個以上的進(jìn)程或者線程在執(zhí)行過程中孙援,因搶奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用扇雕,他們都將無法推進(jìn)下去拓售。此事稱系統(tǒng)處于死鎖狀態(tài)或系統(tǒng)產(chǎn)生了死鎖,這些永遠(yuǎn)在互相等待的進(jìn)程被稱為死鎖進(jìn)程镶奉。以下就是死鎖的例子:
from threading import Lock as Lock
import time
mutexA=Lock()
mutexA.acquire()
mutexA.acquire()
print(123)
mutexA.release()
mutexA.release()
解決方法础淤,遞歸鎖崭放,在Python中為了支持在同一線程中多次請求統(tǒng)一資源,python提供了可重入鎖RLock鸽凶。
這個RLock內(nèi)部維護(hù)著一個Lock和一個counter變量币砂,counter記錄了acquire的次數(shù),從而使得資源可以被多次require玻侥。直到一個線程所有的acquire都被release,其他的線程才能獲得資源凑兰。
例子:
import time
from threading import Thread,RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print('%s 搶到了面條'%name)
fork_lock.acquire()
print('%s 搶到了叉子'%name)
print('%s 吃面'%name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s 搶到了叉子' % name)
time.sleep(1)
noodle_lock.acquire()
print('%s 搶到了面條' % name)
print('%s 吃面' % name)
noodle_lock.release()
fork_lock.release()
for name in ['哪吒','egon','yuan']:
t1 = Thread(target=eat1,args=(name,))
t2 = Thread(target=eat2,args=(name,))
t1.start()
t2.start()
信號量
Semaphore管理一個內(nèi)置的計(jì)數(shù)器拘鞋,
每當(dāng)調(diào)用acquire()時內(nèi)置計(jì)數(shù)器-1;
調(diào)用release() 時內(nèi)置計(jì)數(shù)器+1祟剔;
計(jì)數(shù)器不能小于0物延;當(dāng)計(jì)數(shù)器為0時,acquire()將阻塞線程直到其他線程調(diào)用release()笙纤。
實(shí)例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數(shù)為5):
from threading import Thread,Semaphore
import threading
import time
# def func():
# if sm.acquire():
# print (threading.currentThread().getName() + ' get semaphore')
# time.sleep(2)
# sm.release()
def func():
sm.acquire()
print('%s get sm' %threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm=Semaphore(5)
for i in range(23):
t=Thread(target=func)
t.start()
池與信號量:
與進(jìn)程池是完全不同的概念抖拴,進(jìn)程池Pool(4)阿宅,最大只能產(chǎn)生4個進(jìn)程洒放,而且從頭到尾都只是這四個進(jìn)程榨为,不會產(chǎn)生新的随闺,而信號量是產(chǎn)生一堆線程/進(jìn)程
事件
線程的一個關(guān)鍵特性是每個線程都是獨(dú)立運(yùn)行且狀態(tài)不可預(yù)測矩乐。如果程序中的其 他線程需要通過判斷某個線程的狀態(tài)來確定自己下一步的操作,這時線程同步問題就會變得非常棘手散罕。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設(shè)置的信號標(biāo)志,它允許線程等待某些事件的發(fā)生葬燎。在 初始情況下,Event對象中的信號標(biāo)志被設(shè)置為假窑邦。如果有線程等待一個Event對象, 而這個Event對象的標(biāo)志為假,那么這個線程將會被一直阻塞直至該標(biāo)志為真冈钦。一個線程如果將一個Event對象的信號標(biāo)志設(shè)置為真,它將喚醒所有等待這個Event對象的線程瞧筛。如果一個線程等待一個已經(jīng)被設(shè)置為真的Event對象,那么它將忽略這個事件, 繼續(xù)執(zhí)行
event.isSet():返回event的狀態(tài)值较幌;
event.wait():如果 event.isSet()==False將阻塞線程;
event.set(): 設(shè)置event的狀態(tài)值為True恩急,所有阻塞池的線程激活進(jìn)入就緒狀態(tài)衷恭, 等待操作系統(tǒng)調(diào)度随珠;
event.clear():恢復(fù)event的狀態(tài)值為False。
例如,有多個工作線程嘗試鏈接MySQL显沈,我們想要在鏈接前確保MySQL服務(wù)正常才讓那些工作線程去連接MySQL服務(wù)器拉讯,如果連接不成功魔慷,都會去嘗試重新連接院尔。那么我們就可以采用threading.Event機(jī)制來協(xié)調(diào)各個工作線程的連接操作召边。
實(shí)例:
import threading
import time,random
from threading import Thread,Event
def conn_mysql():
count=1
while not event.is_set():
if count > 3:
raise TimeoutError('鏈接超時')
print('<%s>第%s次嘗試鏈接' % (threading.current_thread().getName(), count))
event.wait(0.5)
count+=1
print('<%s>鏈接成功' %threading.current_thread().getName())
def check_mysql():
print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(2,4))
event.set()
if __name__ == '__main__':
event=Event()
conn1=Thread(target=conn_mysql)
conn2=Thread(target=conn_mysql)
check=Thread(target=check_mysql)
conn1.start()
conn2.start()
check.start()
條件
使得線程等待,只有滿足某條件時贞盯,才釋放n個線程
Python提供的Condition對象提供了對復(fù)雜線程同步問題的支持躏敢。Condition被稱為條件變量件余,除了提供與Lock類似的acquire和release方法外啼器,還提供了wait和notify方法端壳。線程首先acquire一個條件變量损谦,然后判斷一些條件照捡。如果條件不滿足則wait闯参;如果條件滿足术羔,進(jìn)行一些處理改變條件后释移,通過notify方法通知其他線程玩讳,其他處于wait狀態(tài)的線程接到通知后會重新判斷條件熏纯。不斷的重復(fù)這一過程樟澜,從而解決復(fù)雜的同步問題秩贰。
實(shí)例:
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread: %s" % n)
con.release()
if __name__ == '__main__':
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
print('****')
定時器
定時器,指定n秒后執(zhí)行某個操作
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello)
t.start() # after 1 seconds, "hello, world" will be printed
線程隊(duì)列
queue隊(duì)列:使用import queue愈魏,用法與進(jìn)程的Queue一樣
class queue.Queue(maxsize=0)#先進(jìn)先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(先進(jìn)先出):
first
second
third
'''
class queue.LifoQueue(maxsize=0) #后進(jìn)先出
import queue
q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(后進(jìn)先出):
third
second
first
'''
class queue.PriorityQueue(maxsize=0) #存儲數(shù)據(jù)時可設(shè)置優(yōu)先級的隊(duì)列
import queue
q=queue.PriorityQueue()
#put進(jìn)入一個元組,元組的第一個元素是優(yōu)先級(通常是數(shù)字,也可以是非數(shù)字之間的比較),數(shù)字越小優(yōu)先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
print(q.get())
print(q.get())
print(q.get())
'''
結(jié)果(數(shù)字越小優(yōu)先級越高,優(yōu)先級高的優(yōu)先出隊(duì)):
(10, 'b')
(20, 'a')
(30, 'c')
'''
Python標(biāo)準(zhǔn)模塊——concurrent.futures
1 介紹
concurrent.futures模塊提供了高度封裝的異步調(diào)用接口
ThreadPoolExecutor:線程池溪厘,提供異步調(diào)用
ProcessPoolExecutor: 進(jìn)程池桩匪,提供異步調(diào)用
Both implement the same interface, which is defined by the abstract Executor class.2 基本方法
submit(fn, *args, **kwargs)
異步提交任務(wù)map(func, *iterables, timeout=None, chunksize=1)
取代for循環(huán)submit的操作shutdown(wait=True)
相當(dāng)于進(jìn)程池的pool.close()+pool.join()操作
wait=True闺骚,等待池內(nèi)所有任務(wù)執(zhí)行完畢回收完資源后才繼續(xù)
wait=False僻爽,立即返回胸梆,并不會等待池內(nèi)的任務(wù)執(zhí)行完畢
但不管wait參數(shù)為何值碰镜,整個程序都會等到所有任務(wù)執(zhí)行完畢
submit和map必須在shutdown之前result(timeout=None)
取得結(jié)果add_done_callback(fn)
ProcessPoolExecutor
#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ProcessPoolExecutor(max_workers=3)
futures=[]
for i in range(11):
future=executor.submit(task,i)
futures.append(future)
executor.shutdown(True)
print('+++>')
for future in futures:
print(future.result())
ThreadPoolExecutor
#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
#用法
與ProcessPoolExecutor相同
map的用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=3)
# for i in range(11):
# future=executor.submit(task,i)
executor.map(task,range(1,12)) #map取代了for+submit
回調(diào)函數(shù)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print('<進(jìn)程%s> get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def parse_page(res):
res=res.result()
print('<進(jìn)程%s> parse %s' %(os.getpid(),res['url']))
parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
# p=Pool(3)
# for url in urls:
# p.apply_async(get_page,args=(url,),callback=pasrse_page)
# p.close()
# p.join()
p=ProcessPoolExecutor(3)
for url in urls:
p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj柠横,需要用obj.result()拿到結(jié)果