I. Creating a thread with the threading module
Example 1
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:19 2020 is running
The current threading MainThread---Wed Oct 14 18:32:19 2020 is running
The current threading Thread-1---Wed Oct 14 18:32:24 2020 is running
Process finished with exit code 0
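import threading
First import the threading module; everything below depends on it.
t = threading.Thread(target=target)
Create a thread t by instantiating threading.Thread, passing the function to run as target.
t.start()
Start the thread's activity.
threading.current_thread() returns the Thread object for the calling thread, which is how the examples print the current thread's name.
The output shows that MainThread reached its last statement before Thread-1 finished, yet Thread-1 was not killed: the interpreter waits for non-daemon threads before it exits.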
Example 2
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
t.join()
join() blocks the calling thread (here the main thread) until the thread it is called on has finished. By default, without join(), the main thread does not wait for the child thread before carrying on, but the child thread is not killed either, as the output of Example 1 shows.
Output:
The current threading MainThread---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:42 2020 is running
The current threading Thread-1---Wed Oct 14 18:40:47 2020 is running
The current threading MainThread---Wed Oct 14 18:40:47 2020 is running
Process finished with exit code 0
Example 3
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()
    t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:04 2020 is running
The current threading Thread-1---Thu Oct 15 13:07:09 2020 is running
The current threading MainThread---Thu Oct 15 13:07:09 2020 is running
Process finished with exit code 0
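t.setDaemon(True)
t.setDaemon(True) marks the thread as a daemon thread and must be called before start(). Since Python 3.10 setDaemon() is deprecated in favor of the equivalent t.daemon = True. A daemon thread is one that is "not important": when the main thread finishes and only daemon threads remain, the program exits and any daemon thread that is still running is terminated abruptly. A non-daemon thread is the opposite: the program cannot exit until it has finished, so a non-daemon thread that never returns keeps the program hanging indefinitely. In Example 3 the t.join() call makes the main thread wait anyway, so the daemon flag has no visible effect. With the join() commented out, the difference shows: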
import threading
from time import ctime, sleep
def target():
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    sleep(5)
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
if __name__ == "__main__":
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t = threading.Thread(target=target)
    t.setDaemon(True)
    t.start()
    # t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:08:38 2020 is running
The current threading Thread-1---Thu Oct 15 13:08:38 2020 is running
The current threading MainThread---Thu Oct 15 13:08:38 2020 is running
Process finished with exit code 0
With t.setDaemon(True) set on the thread and no join() call, the child thread is killed as soon as the main thread finishes.
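The daemon flag can also be passed to the constructor, and join() accepts a timeout. The following is a minimal sketch rather than part of the original examples; `worker`, `finished` and the thread name `demo-daemon` are illustrative names.

```python
import threading
import time

finished = []  # filled in by the worker so the caller can see that it ran


def worker(delay):
    """Sleep briefly, then record which thread did the work."""
    time.sleep(delay)
    finished.append(threading.current_thread().name)


# daemon=True has the same effect as calling t.setDaemon(True) before start()
t = threading.Thread(target=worker, args=(0.2,), name="demo-daemon", daemon=True)
t.start()

t.join(timeout=0.01)          # stop waiting after 10 ms, whether or not t is done
still_running = t.is_alive()  # normally True here, because the worker is still sleeping

t.join()                      # wait without a timeout until the worker has finished
```

join(timeout=...) always returns None, so is_alive() is the way to find out whether the wait timed out.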
II. Creating multiple threads with the threading module
1. Using functions
import threading
from time import ctime, sleep
def code():
    print("I'm coding. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)
def draw():
    print("I'm drawing. {}---{}".format(ctime(), threading.current_thread().name))
    sleep(5)
if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    t1 = threading.Thread(target=code)
    threads.append(t1)
    t2 = threading.Thread(target=draw)
    threads.append(t2)
    # Start every thread first so that they run concurrently
    for t in threads:
        t.setDaemon(True)
        t.start()
    # Then wait for all of them, not only the last one
    for t in threads:
        t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:35:06 2020 is running
I'm coding. Thu Oct 15 13:35:06 2020---Thread-1
I'm drawing. Thu Oct 15 13:35:06 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:35:11 2020 is running
Process finished with exit code 0
Passing arguments to threads
import threading
from time import ctime, sleep
def code(arg):
    print("I'm coding.{}---{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)
def draw(arg):
    print("I'm drawing.{}----{}---{}".format(arg, ctime(), threading.current_thread().name))
    sleep(5)
if __name__ == "__main__":
    threads = []
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
    # args must be a tuple, hence the trailing comma
    t1 = threading.Thread(target=code, args=('writing code',))
    threads.append(t1)
    t2 = threading.Thread(target=draw, args=('painting',))
    threads.append(t2)
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()
    print("The current threading {}---{} is running".format(threading.current_thread().name, ctime()))
Output:
The current threading MainThread---Thu Oct 15 13:39:49 2020 is running
I'm coding.writing code---Thu Oct 15 13:39:49 2020---Thread-1
I'm drawing.painting----Thu Oct 15 13:39:49 2020---Thread-2
The current threading MainThread---Thu Oct 15 13:39:54 2020 is running
Process finished with exit code 0
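Besides positional arguments in args, threading.Thread also accepts a kwargs dictionary for keyword arguments. A small sketch, assuming a hypothetical `work` function and `log` list that are not part of the original examples:

```python
import threading

log = []  # collects what the worker produced


def work(task, times=1, sep="-"):
    """Repeat the task name `times` times, joined by `sep`."""
    log.append(sep.join([task] * times))


# args supplies positional arguments, kwargs supplies keyword arguments
t = threading.Thread(target=work, args=("code",), kwargs={"times": 3, "sep": "+"})
t.start()
t.join()
```

After join() returns, log holds the single string built by the worker thread.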
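2. Using classes
Subclassing threading.Thread
To encapsulate thread code more cleanly, subclass the Thread class from the threading module and implement its run method. When the thread is started, it runs the code in run automatically.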
import threading
from time import ctime, sleep
class CodingThread(threading.Thread):
    def run(self):
        print("I'm coding.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)
class DrawingThread(threading.Thread):
    def run(self):
        print("I'm drawing.{}---{}".format(ctime(), threading.current_thread().name))
        sleep(5)
def multi_thread():
    t1 = CodingThread()
    t2 = DrawingThread()
    # threading.enumerate() lists every thread that is currently alive
    print(threading.enumerate())
    t1.start()
    print(threading.enumerate())
    t2.start()
    print(threading.enumerate())
if __name__ == "__main__":
    multi_thread()
Output:
[<_MainThread(MainThread, started 4403457344)>]
I'm coding.Thu Oct 15 13:45:06 2020---Thread-1
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>]
I'm drawing.Thu Oct 15 13:45:06 2020---Thread-2
[<_MainThread(MainThread, started 4403457344)>, <CodingThread(Thread-1, started 123145444630528)>, <DrawingThread(Thread-2, started 123145461420032)>]
Process finished with exit code 0
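A Thread subclass can also take its own constructor arguments and keep its result on the instance. This is a minimal sketch under the assumption of a hypothetical `SquareThread` class; the one rule to remember is that `super().__init__()` has to run before the thread is started.

```python
import threading


class SquareThread(threading.Thread):
    """Computes the square of one number in its own thread."""

    def __init__(self, number):
        super().__init__()    # initialise the Thread machinery first
        self.number = number
        self.result = None    # set by run()

    def run(self):
        self.result = self.number * self.number


threads = [SquareThread(n) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()                  # after join(), each result is safe to read

results = [t.result for t in threads]
```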
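III. Sharing global variables between threads, and locking
1. The problem with shared variables
The defining feature of threads is that they can share data, while the order in which they run is not deterministic. When several threads modify the same variable or use the same resource at once, the result can be corrupted data, deadlock and similar problems.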
import threading
value = 0
class AddValueThread(threading.Thread):
    def run(self):
        global value
        for x in range(1000000):
            value += 1
        print("{}: value is {}".format(threading.current_thread().name, value))
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1: value is 1214452
Thread-2: value is 1393110
Process finished with exit code 0
This result is wrong. The correct result would be:
Thread-1: value is 1000000
Thread-2: value is 2000000
Both threads operate on value at the same time, and value += 1 is a read-modify-write sequence rather than a single atomic step, so some increments are lost.
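2. Thread locks and ThreadLocal
(1) Thread locks
To solve the shared-variable problem above, threading provides a Lock class. A thread acquires the lock before it touches a shared variable; other threads that try to acquire it are kept out until the current thread has finished and released the lock. Call acquire() on the lock before accessing the resource and release() on it afterwards.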
import threading
value = 0
gLock = threading.Lock()
class AddValueThread(threading.Thread):
    def run(self):
        global value
        gLock.acquire()
        for x in range(1000000):
            value += 1
        gLock.release()
        print("{}: value is {}".format(threading.current_thread().name, value))
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1: value is 1000000
Thread-2: value is 2000000
Process finished with exit code 0
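A Lock is also a context manager, so the acquire()/release() pair can be written as a `with` block, which releases the lock even if the protected code raises. The sketch below is an illustration with assumed names (`counter`, `counter_lock`, `add_many`) and locks each increment separately instead of the whole loop.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(times):
    """Increment the shared counter `times` times, one locked step at a time."""
    global counter
    for _ in range(times):
        with counter_lock:   # acquire() on entry, release() on exit, even on error
            counter += 1


workers = [threading.Thread(target=add_many, args=(10000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Locking per increment lets the threads interleave, at the cost of many more lock operations than holding the lock around the whole loop.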
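(2) ThreadLocal
After locks comes ThreadLocal. When a variable should not be shared with other threads, a local variable works, but passing locals from function to function quickly becomes tedious. ThreadLocal is a very handy tool that solves both problems at once: globals that need locking and locals that are awkward to pass around. Create one with:
local_school = threading.local()
local_school itself is a global object, but every attribute stored on it is private to the thread that stored it. Other threads can neither see nor change that value.
def process_thread(name):
    # Bind this thread's own student attribute on the ThreadLocal
    local_school.student = name
Only the current thread can read or modify this student attribute. The full example: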
import threading
value = 0
gLocal = threading.local()
class AddValueThread(threading.Thread):
    def run(self):
        # Each thread gets its own gLocal.value
        gLocal.value = 0
        for x in range(1000000):
            gLocal.value += 1
        print("{}: value is {}".format(threading.current_thread().name, gLocal.value))
def multi_thread():
    for i in range(2):
        t = AddValueThread()
        t.start()
if __name__ == "__main__":
    multi_thread()
Output:
Thread-1: value is 1000000
Thread-2: value is 1000000
Process finished with exit code 0
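The isolation can be checked directly: an attribute set on a threading.local object inside one thread does not exist in any other thread, including the main thread. A minimal sketch with assumed names (`local_data`, `remember`, `seen`):

```python
import threading

local_data = threading.local()
seen = {}  # thread name -> value that thread read back from local_data


def remember(name):
    """Store a per-thread attribute and read it back in the same thread."""
    local_data.student = name
    seen[threading.current_thread().name] = local_data.student


threads = [
    threading.Thread(target=remember, args=(n,), name=f"t-{n}")
    for n in ("alice", "bob")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned local_data.student, so it has no such attribute
main_has_student = hasattr(local_data, "student")
```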
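IV. The producer-consumer pattern
(1) Lock version
Producer threads generate data and store it in an intermediate variable; consumer threads take data out of that variable and consume it. The intermediate variable is usually a global, so a lock is needed to keep the data consistent.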
import threading
import random
import time
gMoney = 1000
gTimes = 0
gLock = threading.Lock()
class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gLock.acquire()
            # Stop after three deposits in total across all producers
            if gTimes >= 3:
                gLock.release()
                break
            gMoney += money
            print("{} deposited {} yuan, balance is {} yuan".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            time.sleep(0.5)
            gLock.release()
class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gLock.acquire()
            if gMoney > money:
                gMoney -= money
                print("{} withdrew {} yuan, balance is {} yuan".format(threading.current_thread(), money, gMoney))
                time.sleep(0.5)
            else:
                # No more deposits are coming, so give up
                if gTimes >= 3:
                    gLock.release()
                    break
                print("{} wants to withdraw {} yuan, balance is {} yuan: insufficient!".format(threading.current_thread(), money, gMoney))
            gLock.release()
def multi_thread():
    for i in range(2):
        Consumer(name="consumer-{}".format(i)).start()
    for j in range(2):
        Producer(name="producer-{}".format(j)).start()
if __name__ == "__main__":
    multi_thread()
Output:
<Consumer(consumer-0, started 123145324752896)> withdrew 128 yuan, balance is 872 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 420 yuan, balance is 452 yuan
<Producer(producer-0, started 123145358331904)> deposited 997 yuan, balance is 1449 yuan
<Producer(producer-1, started 123145375121408)> deposited 700 yuan, balance is 2149 yuan
<Producer(producer-1, started 123145375121408)> deposited 984 yuan, balance is 3133 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 221 yuan, balance is 2912 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 313 yuan, balance is 2599 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 189 yuan, balance is 2410 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 356 yuan, balance is 2054 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 109 yuan, balance is 1945 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 418 yuan, balance is 1527 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 381 yuan, balance is 1146 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 416 yuan, balance is 730 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 166 yuan, balance is 564 yuan
<Consumer(consumer-0, started 123145324752896)> withdrew 111 yuan, balance is 453 yuan
<Consumer(consumer-1, started 123145341542400)> withdrew 384 yuan, balance is 69 yuan
<Consumer(consumer-0, started 123145324752896)> wants to withdraw 415 yuan, balance is 69 yuan: insufficient!
<Consumer(consumer-1, started 123145341542400)> wants to withdraw 100 yuan, balance is 69 yuan: insufficient!
Process finished with exit code 0
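(2) Condition version
The Lock version has a weakness: the consumer sits in a while True loop and keeps acquiring the lock just to check whether there is enough money. Acquiring and releasing a lock over and over with nothing to do wastes CPU time, so this is not the best approach. A better one is threading.Condition. With it, a consumer can block and wait while there is no data, and a producer that has produced suitable data can use the notify functions to wake the waiting threads. This avoids the pointless lock and unlock operations.
threading.Condition is similar to threading.Lock: acquire it before modifying global data and release it when the modification is done. Its methods are:
acquire: acquire the lock.
release: release the lock.
wait: put the current thread into a waiting state and release the lock. Another thread can wake it with notify or notify_all. Once woken, the thread waits to re-acquire the lock and then continues with the code after wait.
notify: wake one waiting thread by default; notify(n) wakes up to n of them.
notify_all: wake all waiting threads.
Note: notify and notify_all do not release the lock. They must be called while the lock is held, that is, before release.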
import threading
import random
import time
gMoney = 1000
gCondition = threading.Condition()  # also serves as the lock
gTimes = 0
class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gCondition.acquire()
            if gTimes >= 3:
                gCondition.release()
                break
            gMoney += money
            print("{} deposited {} yuan, balance is {} yuan".format(threading.current_thread(), money, gMoney))
            gTimes += 1
            # Wake every waiting consumer while the lock is still held
            gCondition.notify_all()
            gCondition.release()
            time.sleep(0.5)
class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gTimes
        while True:
            money = random.randint(100, 500)
            gCondition.acquire()
            # Re-check the balance after every wake-up
            while gMoney < money:
                print("{} wants to spend {} yuan, {} yuan left: insufficient balance!".format(threading.current_thread(), money, gMoney))
                if gTimes >= 3:
                    gCondition.release()
                    return
                gCondition.wait()
            gMoney -= money
            print("{} spent {} yuan, balance is {} yuan".format(threading.current_thread(), money, gMoney))
            gCondition.release()
            time.sleep(0.5)
def multi_thread():
    for i in range(2):
        Consumer(name="consumer-{}".format(i)).start()
    for j in range(2):
        Producer(name="producer-{}".format(j)).start()
if __name__ == "__main__":
    multi_thread()
Output:
<Consumer(consumer-0, started 123145357996032)> spent 273 yuan, balance is 727 yuan
<Consumer(consumer-1, started 123145374785536)> spent 470 yuan, balance is 257 yuan
<Producer(producer-0, started 123145391575040)> deposited 181 yuan, balance is 438 yuan
<Producer(producer-1, started 123145408364544)> deposited 464 yuan, balance is 902 yuan
<Consumer(consumer-0, started 123145357996032)> spent 455 yuan, balance is 447 yuan
<Producer(producer-0, started 123145391575040)> deposited 677 yuan, balance is 1124 yuan
<Consumer(consumer-1, started 123145374785536)> spent 400 yuan, balance is 724 yuan
<Consumer(consumer-0, started 123145357996032)> spent 485 yuan, balance is 239 yuan
<Consumer(consumer-1, started 123145374785536)> spent 159 yuan, balance is 80 yuan
<Consumer(consumer-0, started 123145357996032)> wants to spend 325 yuan, 80 yuan left: insufficient balance!
<Consumer(consumer-1, started 123145374785536)> wants to spend 229 yuan, 80 yuan left: insufficient balance!
Process finished with exit code 0
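The wait-in-a-loop pattern can be written more compactly with a `with` block and Condition.wait_for, which re-checks a predicate after every wake-up. The sketch below is a simplified stand-in for the example above, not the author's code; `balance`, `withdrawn`, `consumer` and `producer` are illustrative names, and there is exactly one deposit and one withdrawal.

```python
import threading

balance = 0
withdrawn = []
cond = threading.Condition()


def consumer(amount):
    """Block until the balance covers `amount`, then withdraw it."""
    global balance
    with cond:                                    # acquire / release around the block
        cond.wait_for(lambda: balance >= amount)  # releases the lock while waiting
        balance -= amount
        withdrawn.append(amount)


def producer(amount):
    """Deposit `amount` and wake every waiting consumer."""
    global balance
    with cond:
        balance += amount
        cond.notify_all()                         # called while the lock is held


c = threading.Thread(target=consumer, args=(50,))
p = threading.Thread(target=producer, args=(80,))
c.start()
p.start()
c.join()
p.join()
```

The result is the same whichever thread runs first: if the deposit has already happened, wait_for finds the predicate true and returns without waiting.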
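V. Thread pools
1. What is a thread pool
Background: many server applications, such as web servers, database servers, file servers and mail servers, exist to process large numbers of short tasks arriving from remote sources. An overly simple model for building a server application is to create a new service object whenever a request arrives and serve the request in that object. Under heavy concurrent load, however, the cost of constantly creating and destroying objects is high.
One way to make a server more efficient is therefore to create and destroy objects as rarely as possible, especially objects that are expensive to create and destroy. This is where the idea of a "pool" comes from.
A pool lets you set aside a fixed amount of a resource and reuse it over and over, instead of repeatedly creating and destroying it.
Definition: a thread pool is a technique in which threads are created in advance. These threads sleep while idle: they are not running any task and consume no CPU time, only a small amount of memory. When a request arrives, the pool assigns it to an idle thread, which runs and processes it. If all pre-created threads are busy, that is, the pool is too small, a pool may create a limited number of additional threads to handle more requests. When the system is quiet, it may also remove some threads that have stayed idle.
2. Python's concurrent.futures module: thread pools and process pools
Python 3.2 added the concurrent.futures module, which implements thread pools and process pools. It has two main kinds of object: executors and futures. An executor manages the worker threads or processes. A Future wraps a task (a function call) in an object that stands for its eventual result. Put simply, a Future is a container for a task: besides letting you cancel the task, it also holds the task's execution state.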
2.1 Creating a Future object
Let us first create a Future by hand and see how it behaves:
from concurrent.futures import Future
# Create a Future object
future = Future()
# Define the callback function
def callback(future):
    print(f"Callback ran, the result is: {future.result()}")
def test_future():
    # add_done_callback binds a callback to the future. Pass only the function
    # itself; the future is handed to the callback as its single argument.
    print("Adding the callback")
    future.add_done_callback(callback)
    # The callback runs when set_result is called on the future
    print("Triggering the callback")
    future.set_result("Ha, bet you did not expect that: I am the result")
if __name__ == '__main__':
    test_future()
Note that result() can be called any number of times. set_result() is different: on Python 3.8 and later a second call raises InvalidStateError because the Future is already done, whereas on earlier versions the later value overwrote the earlier one. In normal code set_result() is called by the executor, not by you.
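How a second set_result() call behaves depends on the Python version: since Python 3.8 it raises InvalidStateError instead of replacing the stored value. The following quick check is a sketch that assumes Python 3.8 or later; `second_call` and `reads` are illustrative names.

```python
from concurrent.futures import Future, InvalidStateError

future = Future()
future.set_result("first")

try:
    future.set_result("second")   # the Future is already done
    second_call = "accepted"
except InvalidStateError:
    second_call = "rejected"

# result() can be read as often as needed and keeps returning the stored value
reads = [future.result(), future.result()]
```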
2.2 Creating a Future by submitting a task
2.2.1 Submitting tasks with submit
submit schedules a callable for execution and returns a Future object that represents that execution.
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    # submit hands the task to the pool, where it starts as soon as a worker is free.
    # submit returns immediately, so the main thread carries on.
    future = executor.submit(threadTask, 1)
    # {future} is formatted first (still running); result() then blocks until done
    print(f'Status: {future}, result: {future.result()}')
    time.sleep(5)  # the main thread sleeps
    print(f'Status: {future}, result: {future.result()}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----20
Status: <Future at 0x10b346850 state=running>, result: Task 1: thread id----ThreadPoolExecutor-0_0, process id----20
Status: <Future at 0x10b346850 state=finished returned str>, result: Task 1: thread id----ThreadPoolExecutor-0_0, process id----20
Process finished with exit code 0
Several tasks can be submitted at once:
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit hands each task to the pool, where it starts as soon as a worker is free.
    # submit returns immediately, so the main thread carries on.
    print(f'Status: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Status: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Status: {futures}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----20
Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_2, process id----20
# max_workers=3, so three threads start at once
Status: [<Future at 0x10347fd00 state=running>, <Future at 0x1034a4be0 state=running>, <Future at 0x1034a4f70 state=running>, <Future at 0x1034ac370 state=pending>, <Future at 0x1034ac490 state=pending>]
# The first three tasks are running; the last two are pending
Task 3: thread id----ThreadPoolExecutor-0_0, process id----20
Task 4: thread id----ThreadPoolExecutor-0_1, process id----20
# Two threads from the pool pick up tasks 3 and 4
Status: [<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=running>, <Future at 0x1034ac490 state=running>]
# The first three tasks have finished; the last two are running
Status: [<Future at 0x10347fd00 state=finished returned str>, <Future at 0x1034a4be0 state=finished returned str>, <Future at 0x1034a4f70 state=finished returned str>, <Future at 0x1034ac370 state=finished returned str>, <Future at 0x1034ac490 state=finished returned str>]
# All tasks have finished
In addition, future.running() and future.done() tell you whether a task is currently executing or has already finished.
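A small sketch of running() and done(), under the assumption of a pool with a single worker so that the second task has to queue behind the first. `blocked_task` and the `release` event are illustrative names; the event only makes the timing deterministic.

```python
from concurrent.futures import ThreadPoolExecutor
import threading

release = threading.Event()  # lets the main thread decide when tasks may finish


def blocked_task():
    """Wait until the main thread sets the event, then return."""
    release.wait(timeout=5)
    return "done"


with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocked_task)
    second = executor.submit(blocked_task)  # queued: the only worker is busy

    # Nothing can finish yet, and the second task cannot have started
    before = (first.done(), second.running(), second.done())

    release.set()
    value = second.result()                 # blocks until the second task returns

    # With one worker, the first task must have finished before the second ran
    after = (first.done(), second.done())
```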
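2.2.2 Submitting tasks with map
map works like Python's built-in map function, except that the function is applied to the elements asynchronously. It returns an iterator over the results rather than Future objects.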
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    # Submit several tasks. Unlike submit, map returns an iterator over results.
    futures = executor.map(threadTask, range(5))
    # map hands every task to the pool right away and returns immediately,
    # so the main thread carries on.
    print(f'Status: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Status: {futures}')
    time.sleep(5)  # the main thread sleeps
    print(f'Status: {futures}')
if __name__ == "__main__":
    localThreadPool()
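The iterator returned by map yields results in the order of the inputs, not in the order the tasks finish. A minimal sketch, assuming a hypothetical `slow_square` task in which smaller inputs deliberately take longer:

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(n):
    """Square n; smaller inputs sleep longer, so they tend to finish last."""
    time.sleep(0.05 * (3 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as executor:
    # list() consumes the iterator and blocks until every result is available
    results = list(executor.map(slow_square, range(3)))
```

Even though the task for 2 is likely to finish first here, its result still comes last in `results`.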
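2.3 Important functions
The module has two important functions, wait and as_completed, which are used to wait for tasks to complete.
2.3.1 wait
wait blocks until the given Future instances complete, much like asyncio.wait. It returns a named tuple of two sets: the first (done) holds the futures that completed, the second (not_done) holds those that did not: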
from concurrent.futures import ThreadPoolExecutor, wait
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit hands each task to the pool, where it starts as soon as a worker is free.
    # submit returns immediately, so the main thread carries on.
    print("Not blocked yet, I can still print")
    fs = wait(futures)  # blocks until every future has completed
    print(fs)
    print("All tasks are done, only now do I get to print")
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----20
Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_2, process id----20
Not blocked yet, I can still print
Task 3: thread id----ThreadPoolExecutor-0_2, process id----20
Task 4: thread id----ThreadPoolExecutor-0_1, process id----20
DoneAndNotDoneFutures(done={<Future at 0x1104e2c40 state=finished returned str>, <Future at 0x11050f2b0 state=finished returned str>, <Future at 0x110507eb0 state=finished returned str>, <Future at 0x110507b20 state=finished returned str>, <Future at 0x11050f3d0 state=finished returned str>}, not_done=set())
All tasks are done, only now do I get to print
Process finished with exit code 0
2.3.2 as_completed
as_completed returns an iterator over the given Future instances that yields each one as it completes:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit hands each task to the pool, where it starts as soon as a worker is free.
    # submit returns immediately, so the main thread carries on.
    for future in as_completed(futures):
        print(future)
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----20
Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_2, process id----20
# The lines above are printed first
Task 3: thread id----ThreadPoolExecutor-0_0, process id----20
Task 4: thread id----ThreadPoolExecutor-0_2, process id----20
<Future at 0x110540c40 state=finished returned str>
<Future at 0x110565eb0 state=finished returned str>
<Future at 0x110565b20 state=finished returned str>
# The lines above appear once the first three tasks complete; the lines below come last
<Future at 0x11056d3d0 state=finished returned str>
<Future at 0x11056d2b0 state=finished returned str>
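Note: as_completed takes Future objects, such as a list built from submit calls. It cannot be used with map, which yields results rather than futures.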
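Because as_completed yields futures in completion order, a common companion is a dictionary that maps each future back to the input it was submitted with. The following is a sketch with assumed names (`cube`, `future_to_input`), not part of the original examples.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def cube(n):
    return n ** 3


results = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    # Remember which input each future belongs to
    future_to_input = {executor.submit(cube, n): n for n in range(5)}
    # Iterating a dict yields its keys, which are the futures
    for future in as_completed(future_to_input):
        results[future_to_input[future]] = future.result()
```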
2.3.3 Two other ways to wait for tasks to finish
Method 1: call the executor's shutdown method
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    executor = ThreadPoolExecutor(max_workers=3)
    futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
    # submit hands each task to the pool, where it starts as soon as a worker is free.
    # submit returns immediately, so the main thread carries on.
    print("Not blocked yet, I can still print")
    executor.shutdown()  # wait=True by default: blocks until all tasks have finished
    print("All tasks are done, only now do I get to print")
if __name__ == "__main__":
    localThreadPool()
Output:
Task 0: thread id----ThreadPoolExecutor-0_0, process id----20Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_2, process id----20
Not blocked yet, I can still print
Task 3: thread id----ThreadPoolExecutor-0_1, process id----20
Task 4: thread id----ThreadPoolExecutor-0_2, process id----20
All tasks are done, only now do I get to print
Process finished with exit code 0
Method 2: use a context manager
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
# A single task
def threadTask(taskNum):
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    with ThreadPoolExecutor(max_workers=3) as executor:
        print("Not blocked yet, I can still print")
        futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
        # submit hands each task to the pool, where it starts as soon as a worker is free.
        # submit returns immediately, so the main thread carries on.
    # Leaving the with block calls shutdown(), which waits for all tasks
    print("All tasks are done, only now do I get to print")
    for future in futures:
        print(future)
if __name__ == "__main__":
    localThreadPool()
Output:
Not blocked yet, I can still print
Task 0: thread id----ThreadPoolExecutor-0_0, process id----20
Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_2, process id----20
Task 3: thread id----ThreadPoolExecutor-0_2, process id----20
Task 4: thread id----ThreadPoolExecutor-0_1, process id----20
All tasks are done, only now do I get to print
<Future at 0x10ff82d00 state=finished returned str>
<Future at 0x10ffa7be0 state=finished returned str>
<Future at 0x10ffa7f70 state=finished returned str>
<Future at 0x10ffb0340 state=finished returned str>
<Future at 0x10ffb0460 state=finished returned str>
2.4 Adding exception handling
2.4.1 Handling exceptions from tasks submitted with submit
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random
# A single task
def threadTask(taskNum):
    num = random.randint(0, 2)/taskNum  # raises ZeroDivisionError for task 0
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(threadTask, i) for i in range(5)]  # submit several tasks
        # submit hands each task to the pool, where it starts as soon as a worker is free.
        # submit returns immediately, so the main thread carries on.
    for future in futures:
        try:
            # result() re-raises any exception the task raised
            future.result()
        except Exception as exc:
            print(f'{future},Generated an exception: {exc}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 1: thread id----ThreadPoolExecutor-0_1, process id----20
Task 2: thread id----ThreadPoolExecutor-0_0, process id----20
Task 3: thread id----ThreadPoolExecutor-0_2, process id----20
Task 4: thread id----ThreadPoolExecutor-0_0, process id----20
<Future at 0x10ba96af0 state=finished raised ZeroDivisionError>,Generated an exception: division by zero
Process finished with exit code 0
2.4.2 Handling exceptions from tasks submitted with map
from concurrent.futures import ThreadPoolExecutor
import threading
import os
import time
import random
# A single task
def threadTask(taskNum):
    num = random.randint(0, 2) / taskNum  # raises ZeroDivisionError for task 0
    threadId = threading.current_thread().name
    period = f"Task {taskNum}: thread id----{threadId}, process id----{os.getpid()}"
    print(period)
    time.sleep(3)  # the worker thread sleeps
    return period
# Define a callback function
def callBack(future):
    print(f"I am the callback: status {future}")
# Wrap the thread pool in a function
def localThreadPool():
    # max_workers is the maximum number of threads the pool will create.
    # If it is omitted, a default derived from the CPU count is used.
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = executor.map(threadTask, range(5))  # submit several tasks
        # map hands every task to the pool right away and returns immediately,
        # so the main thread carries on.
    while 1:
        try:
            # next() re-raises the exception of the task whose result is due
            print(next(futures))
        except StopIteration:
            break
        except Exception as exc:
            print(f'Generated an exception: {exc}')
if __name__ == "__main__":
    localThreadPool()
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----20
Task 2: thread id----ThreadPoolExecutor-0_1, process id----20
Task 3: thread id----ThreadPoolExecutor-0_2, process id----20
Task 4: thread id----ThreadPoolExecutor-0_1, process id----20
Generated an exception: division by zero
Process finished with exit code 0
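If we reuse the submit-style handling instead, with the try block inside a for loop over futures, the exception is raised by the iteration itself and escapes the try block.
Output:
Task 1: thread id----ThreadPoolExecutor-0_0, process id----20Task 2: thread id----ThreadPoolExecutor-0_1, process id----20
Task 3: thread id----ThreadPoolExecutor-0_2, process id----20
Task 4: thread id----ThreadPoolExecutor-0_2, process id----20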
Traceback (most recent call last):
File "/Users/xianchengchi.py", line 40, in <module>
localThreadPool()
File "/Users/xianchengchi.py", line 31, in localThreadPool
for future in futures:
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 600, in result_iterator
yield fs.pop().result()
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 433, in result
return self.__get_result()
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/local/Cellar/python@3.9/3.9.1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 52, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/xianchengchi.py", line 10, in threadTask
num = random.randint(0, 2)/taskNum
ZeroDivisionError: division by zero
Process finished with exit code 1
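The generator ends as soon as the first error is raised, so map is a poor fit for a batch in which some tasks may fail: whether you call list() on the results or loop over them, one failing task prevents you from getting the results of the tasks after it. The better approach is submit combined with as_completed.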
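The submit plus as_completed combination can be sketched as follows. The names `reciprocal`, `results` and `errors` are illustrative assumptions; the point is that one failing task does not prevent the other results from being collected.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def reciprocal(n):
    return 1 / n   # raises ZeroDivisionError for n == 0


results = {}  # input -> value for tasks that succeeded
errors = {}   # input -> exception type name for tasks that failed

with ThreadPoolExecutor(max_workers=3) as executor:
    future_to_input = {executor.submit(reciprocal, n): n for n in range(4)}
    for future in as_completed(future_to_input):
        n = future_to_input[future]
        try:
            results[n] = future.result()   # re-raises the task's exception, if any
        except ZeroDivisionError as exc:
            errors[n] = type(exc).__name__
```

Each future is handled in its own try block, so the loop keeps going after a failure.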
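Finally: use max_workers wisely
Both ProcessPoolExecutor and ThreadPoolExecutor accept a max_workers argument, which sets the number of processes or threads used to run tasks. For ProcessPoolExecutor the default is the number of CPUs (obtained from os.cpu_count()). For ThreadPoolExecutor the default was five times the number of CPUs in Python 3.5 to 3.7; since Python 3.8 it is min(32, os.cpu_count() + 4).
Beginners, and most ordinary use cases, do not need to set max_workers by hand; the default works well enough. However:
- Depending on the workload, raising max_workers can make tasks finish sooner. Higher is not always better, though: past a certain threshold it becomes counterproductive. The effect is especially pronounced when ThreadPoolExecutor is used for IO-bound tasks, where different max_workers values can perform very differently, but too many factors affect network behavior for a meaningful example here.
- Sometimes a server runs many important services and a single job must not affect everything else; in that case max_workers can be set lower than the default.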