線程是一個(gè)程序級(jí)別的概念烈评,可以暫時(shí)理解為一個(gè)程序中的某個(gè)函數(shù),當(dāng)我們同時(shí)執(zhí)行很多個(gè)函數(shù)的時(shí)候就是多線程。比如說一個(gè)程序有需要從網(wǎng)上下載東西的函數(shù),還有要從磁盤中加載數(shù)據(jù)的函數(shù)千康,這兩個(gè)函數(shù)都是比較耗時(shí)的。如果是單進(jìn)程的話需要一個(gè)一個(gè)函數(shù)運(yùn)行铲掐,下載完數(shù)據(jù)然后才能從磁盤加載拾弃,這樣程序就會(huì)很卡頓。如果使用多進(jìn)程的話摆霉,我們可以在下載數(shù)據(jù)的同時(shí)加載數(shù)據(jù)豪椿,這樣就快的多了。
一斯入、啟動(dòng)多個(gè)線程
在說線程的創(chuàng)建之前砂碉,我們先來看一個(gè)單線程的例子蛀蜜,作為對(duì)比再來看之后的多線程刻两,一般來說我們之前寫的程序都是單線程,也就是邏輯流與指令流一致的程序滴某。
import time
def run(n):
print("task", n)
time.sleep(2)
print('task done', n)
start_time = time.time()
run('t1')
run('t2')
print(time.time() - start_time)
task t1
task done t1
task t2
task done t2
4.001030206680298
從輸出上就可以看出來兩個(gè)函數(shù)調(diào)用是依次運(yùn)行的磅摹,一共執(zhí)行了 4 秒多一點(diǎn)。
1. 函數(shù)方式啟動(dòng)線程
我們先來看第一種啟動(dòng)線程的方式霎奢,使用 threading.Thread() 方法户誓。這個(gè)方法需要傳入兩個(gè)參數(shù),
- 線程函數(shù)名:要設(shè)置線程讓其后臺(tái)執(zhí)行的函數(shù)幕侠,這個(gè)函數(shù)由我們自己定義注意不要加 ( )帝美。
- 線程函數(shù)的參數(shù):線程函數(shù)名函數(shù)所需要的參數(shù),以元組的形式傳入晤硕。如果不需要傳參悼潭,可以不指定。
來看一個(gè)例子:
import threading
import time
def run(n):
print("task ",n )
time.sleep(2)
print('task done', n)
print(time.time() - start_time)
start_time = time.time()
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t2.start()
task t1
task t2
task done t1
2.001432418823242
task done t2
2.002263307571411
從結(jié)果上來看確實(shí)兩個(gè)函數(shù)是依次執(zhí)行的舞箍,運(yùn)行時(shí)間也確實(shí)從 4s 多縮短到了 2s 多舰褪。
2. 通過類方式啟動(dòng)線程
首先,我們需要自己定義一個(gè)類疏橄,對(duì)這個(gè)類有兩點(diǎn)要求:
- 必須繼承 threading.Thread 這個(gè)父類
- 必須重寫 run 方法
上面的 run 方法占拍,和我們上面的線程函數(shù)的性質(zhì)是一樣的,當(dāng)我們運(yùn)行 start() 方法的時(shí)候就會(huì)被自動(dòng)調(diào)用。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, n, sleep_time):
super(MyThread, self).__init__()
self.n = n
self.sleep_time = sleep_time
def run(self):
print("runnint task ", self.n)
time.sleep(self.sleep_time)
print("task done ", self.n)
print(time.time() - start_time)
start_time = time.time()
t1 = MyThread("t1", 2)
t2 = MyThread("t2", 2)
t1.start()
t2.start()
runnint task t1
runnint task t2
task done t1
2.0018975734710693
task done t2
2.0018975734710693
結(jié)果和函數(shù)的差不多晃酒。
三表牢、join 方法
可能看了上面的幾個(gè)例子你有一個(gè)疑問,就是在計(jì)算時(shí)間的時(shí)候?yàn)槭裁床粚r(shí)間放到代碼的最后呢贝次,這樣不是更加準(zhǔn)確嗎初茶,我們先來看一個(gè)這樣寫的例子,我們可以夸張一些浊闪,一下創(chuàng)建 50 個(gè)線程恼布。
import threading, time
def run(n):
print("task", n)
time.sleep(2)
print('task done ', n)
start_time = time.time()
for i in range(50):
t = threading.Thread(target=run, args=("t-{}".format(i + 1),))
t.start()
print("-"*50, 'all thread has finished', "-"*50)
print(time.time() - start_time)
因?yàn)檫\(yùn)行結(jié)果很長,我只截取了部分搁宾,因?yàn)楹芏鄠€(gè)函數(shù)同時(shí)向屏幕輸出就會(huì)造成這種很亂的感覺折汞。
task t-1
task t-2
task t-3
task t-4
task t-5
task t-6
...
task t-50--------------------------------------------------
all thread has finished --------------------------------------------------
0.012969255447387695
task done t-2
task done t-1
task done task done t-5t-3
...
t-49
task done t-50
task done t-44
首先主線程啟動(dòng),然后依次啟動(dòng) 50 個(gè)線程盖腿,同時(shí)運(yùn)行爽待,因?yàn)橹骶€程執(zhí)行時(shí)間比較短,所以主線程在子線程結(jié)束前就結(jié)束了翩腐,所以計(jì)算的時(shí)間只是主線程的運(yùn)行時(shí)間鸟款。
我故意把主線程畫的很短,有一種執(zhí)行時(shí)間很短的感覺茂卦。從上圖也可以看出來主線程的結(jié)束對(duì)其他的線程是沒有影響的何什。
所以我們就需要一種方法,使得主線程可以在某處等待其他線程的執(zhí)行等龙,執(zhí)行完后处渣,說不定還會(huì)返回個(gè)結(jié)果,再繼續(xù)執(zhí)行主線程蛛砰,比如下圖這種感覺罐栈。
值得注意的是進(jìn)程 1 到 進(jìn)程 50 之間仍然是并列的關(guān)系。
下面給出一個(gè)實(shí)例:
import threading, time
def run(n):
print("task", n)
time.sleep(2)
print('task done ', n)
start_time = time.time()
t_objs = [] # 儲(chǔ)存線程實(shí)例
for i in range(50):
t = threading.Thread(target=run, args=("t-{}".format(i + 1),))
t.start()
t_objs.append(t) # 防止阻塞后面線程的啟動(dòng)泥畅,不在這里 join
# 這段代碼可以理解為子進(jìn)程未結(jié)束荠诬,主進(jìn)程也不能結(jié)束
for t in t_objs:
t.join()
print("-"*50, 'all thread has finished', "-"*50)
print(time.time() - start_time)
task t-1
task t-2
task t-3
task t-4
task t-5
...
t-46
task done t-50task done t-49task done
t-48
-------------------------------------------------- all thread has finished --------------------------------------------------
2.0129146575927734
這樣就可以計(jì)算所有線程的運(yùn)行時(shí)間了,可以看到只運(yùn)行了 2s 多一點(diǎn)位仁,如果要是單進(jìn)程運(yùn)行的話恐怕要超過 100s 呢柑贞。
在上面的代碼中還有一個(gè)地方值得注意,就是在線程創(chuàng)建完后我們沒有直接 join 障癌,而是先存到一個(gè)列表中凌外,等到全部線程創(chuàng)建完后再全部 join。如果我們?cè)诰€程 1 創(chuàng)建完后直接 join 涛浙,那么主線程就會(huì)等待線程 1 運(yùn)行完康辑,然后再創(chuàng)建線程 2 摄欲,這樣實(shí)際上還是單線程。
根據(jù)當(dāng)前線程的數(shù)量查看線程的生命周期
import threading, time
def sing():
for i in range(3):
print('正在唱歌... ', i+1)
time.sleep(1)
def dance():
for i in range(3):
print('正在跳舞... ', i+1)
time.sleep(1)
if __name__ == "__main__":
print('晚會(huì)開始:', time.ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
while True:
length = len(threading.enumerate()) # 枚舉疮薇,返回一個(gè)列表
print('當(dāng)前運(yùn)行的線程數(shù)量為:', length)
time.sleep(0.7)
if length <= 1:
break
晚會(huì)開始: Tue Aug 20 17:28:32 2019
正在唱歌... 1
正在跳舞... 當(dāng)前運(yùn)行的線程數(shù)量為: 3
1
當(dāng)前運(yùn)行的線程數(shù)量為: 3
正在跳舞... 2
正在唱歌... 2
當(dāng)前運(yùn)行的線程數(shù)量為: 3
正在唱歌... 3
正在跳舞... 3
當(dāng)前運(yùn)行的線程數(shù)量為: 3
當(dāng)前運(yùn)行的線程數(shù)量為: 3
當(dāng)前運(yùn)行的線程數(shù)量為: 1
這個(gè)程序中沒有使用 join 函數(shù)胸墙,因?yàn)檫@個(gè)程序的主線程運(yùn)行時(shí)間很長胰伍。實(shí)際上在本文的前幾個(gè)例子中书闸,我們并沒有使用 join 但是還是計(jì)算出了程序運(yùn)行時(shí)間,就是因?yàn)槲覀兺ㄟ^分析代碼判斷出來哪個(gè)線程最后結(jié)束躲惰,將停止計(jì)時(shí)的代碼放到這個(gè)線程最后就可以了励七。
四智袭、多線程互斥鎖
在不同的線程之間,變量是共享的掠抬,有時(shí)候多個(gè)線程同時(shí)維護(hù)一個(gè)變量吼野,這樣就會(huì)造成混亂,為了防止混亂两波,可以使用鎖瞳步,也就是說當(dāng)一個(gè)變量被訪問的時(shí)候,其他線程就無法在訪問腰奋。
1. 如何使用鎖
import threading
# 生成鎖對(duì)象单起,全局唯一
lock = threading.Lock()
# 獲取鎖,未獲取到會(huì)阻塞程序劣坊,直到獲取到了鎖才會(huì)往下執(zhí)行
lock.acquire()
# 釋放鎖嘀倒,其他線程可以訪問
lock.release()
值得注意的是,lock.acquire() 和 lock.release() 必須要成對(duì)出現(xiàn),但是有時(shí)候還是會(huì)忘記讼稚,所以我們可以使用上下文管理器來實(shí)現(xiàn)括儒。
import threading
lock = threading.Lock()
with lock:
# 這里寫代碼
pass
2. 嘗試在程序中使用鎖
我們先來看一個(gè)不使用鎖的例子:
import threading, time
g_num = 0
def work1(num):
global g_num
for i in range(num):
g_num += 1
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
def work2(num):
global g_num
for i in range(num):
g_num += 1
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
print('線程創(chuàng)建之前:', g_num)
t1 = threading.Thread(target=work1, args=(10000000,))
t2 = threading.Thread(target=work2, args=(10000000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是:', g_num)
線程創(chuàng)建之前: 0
in work2, g_num is 11500115
in work1, g_num is 11593232
兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是: 11593232
當(dāng)我們決定完成一個(gè)任務(wù)绕沈,通常情況下锐想,在計(jì)算機(jī)中,看似很簡單的任務(wù)也是有多個(gè)不同的步驟共同完成乍狐。該步驟是由 cpu 的 一些指令完成的赠摇。比如我們常見的 i ++ ;這是一個(gè)非原子性操作浅蚪,因?yàn)樗葟膬?nèi)存取出 i 的值藕帜,然后再增 1,最后再寫入內(nèi)存中惜傲,經(jīng)過三個(gè)步驟完成洽故,如果在中間一個(gè)步驟被其他線程影響了,那么就可能出現(xiàn)錯(cuò)誤盗誊。
那么我們就將這些步驟加鎖时甚,作為一個(gè)原子步驟隘弊,比如我們將每次循環(huán)中的
import threading, time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
for i in range(num):
with lock:
g_num += 1
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
def work2(num):
global g_num
for i in range(num):
with lock:
g_num += 1
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
print('線程創(chuàng)建之前:', g_num)
t1 = threading.Thread(target=work1, args=(100000,))
t2 = threading.Thread(target=work2, args=(100000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是:', g_num)
線程創(chuàng)建之前: 0
in work1, g_num is 181967
in work2, g_num is 200000
兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是: 200000
當(dāng)然如果你想將整個(gè)循環(huán)作為一個(gè)原子性操作也是可以的,注意這兩個(gè)地方的不同荒适。這次我不使用上下文管理器來寫:
import threading, time
g_num = 0
lock = threading.Lock()
def work1(num):
global g_num
lock.acquire()
for i in range(num):
g_num += 1
lock.release()
print('in work1, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
def work2(num):
global g_num
lock.acquire()
for i in range(num):
g_num += 1
lock.release()
print('in work2, g_num is ', g_num)
time.sleep(1) # 代表執(zhí)行其他操作
print('線程創(chuàng)建之前:', g_num)
t1 = threading.Thread(target=work1, args=(100000,))
t2 = threading.Thread(target=work2, args=(100000,))
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
print('兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是:', g_num)
線程創(chuàng)建之前: 0
in work1, g_num is 100000
in work2, g_num is 200000
兩個(gè)線程對(duì)同一個(gè)全局變量操作之后的最終結(jié)果是: 200000
3.死鎖
在線程中同共享多個(gè)資源的時(shí)候梨熙,如果兩個(gè)線程分別占有一部分資源并且同時(shí)等待對(duì)方的資源們就會(huì)造成死鎖。
盡管死鎖很少發(fā)生刀诬,但是一旦發(fā)生就會(huì)造成應(yīng)用的停止響應(yīng)咽扇,下面是一個(gè)死鎖的例子:
import threading, time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread1(threading.Thread):
def run(self):
# 對(duì) mutexA 上鎖
mutexA.acquire()
# mutexA 上鎖后,等待一秒陕壹,等待另外一個(gè)線程釋放 mutexB
print(self.name + '---do1---up---')
time.sleep(2)
mutexB.acquire()
print(self.name + '---do1---down---')
mutexB.release()
mutexB.release()
class MyThread2(threading.Thread):
def run(self):
# 對(duì) mutexA 上鎖
mutexB.acquire()
# mutexA 上鎖后质欲,等待一秒,等待另外一個(gè)線程釋放 mutexB
print(self.name + '---do2---up---')
time.sleep(2)
mutexA.acquire()
print(self.name + '---do2---down---')
mutexA.release()
mutexB.release()
if __name__ == "__main__":
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
while len(threading.enumerate()) != 1:
time.sleep(1)
length = len(threading.enumerate()) # 枚舉糠馆,返回一個(gè)列表
print('當(dāng)前運(yùn)行的線程數(shù)量為:', length)
Thread-1---do1---up---
Thread-2---do2---up---
當(dāng)前運(yùn)行的線程數(shù)量為: 3
當(dāng)前運(yùn)行的線程數(shù)量為: 3
當(dāng)前運(yùn)行的線程數(shù)量為: 3
...
程序就一直運(yùn)行下去了把敞。
五、線程間的通信
我們來研究一下線程間的消息通訊機(jī)制榨惠,目前我們遇到的問題是這樣的:很多個(gè)線程發(fā)出消息奋早,然后有很多個(gè)線程接收消息。其實(shí)這件事情很容易出問題赠橙,目前最安全的解決辦法是使用 queue 中的隊(duì)列耽装。隊(duì)列已經(jīng)實(shí)現(xiàn)了鎖的功能,使用其他數(shù)據(jù)結(jié)構(gòu)需要自己來實(shí)現(xiàn)期揪。
創(chuàng)建一個(gè)被多個(gè)線程共享的 Queue 對(duì)象掉奄,這些線程通過 put() 和 get() 操作來向隊(duì)列中添加或者刪除元素。下面是一些隊(duì)列常見的函數(shù)凤薛。
from queue import Queue
# maxsize 默認(rèn)為 0姓建,不受限制
# 一旦設(shè)置大于零,當(dāng)消息數(shù)達(dá)到限制缤苫,q.put() 也會(huì)被阻塞
q = Queue(maxsize=0)
# 阻塞程序速兔,等待隊(duì)列消息
q.get()
# 獲取消息,設(shè)置超時(shí)時(shí)間
q.get(timeout=5.0)
# 發(fā)送消息
q.put()
# 所有任務(wù)完成時(shí)程序才繼續(xù)執(zhí)行后面的代碼活玲,否則處于阻塞狀態(tài)涣狗。
q.join()
# 查詢當(dāng)前隊(duì)列的消息個(gè)數(shù)
q.qsize()
# 隊(duì)列消息是否都被消費(fèi)完,True/False
q.empty()
# 隊(duì)列消息是否已滿
q.full()
生產(chǎn)者消費(fèi)者模型:
線程間通信的問題其實(shí)可以抽象成一個(gè)生產(chǎn)者舒憾、消費(fèi)者模型镀钓,發(fā)出信息的進(jìn)程(們)作為生產(chǎn)者,接收信息的進(jìn)程(們)作為消費(fèi)者镀迂。
我們希望:在多線程開發(fā)當(dāng)中丁溅,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢探遵,那么生產(chǎn)者就必須等待消費(fèi)者處理完窟赏,才能繼續(xù)生產(chǎn)數(shù)據(jù)措译。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者饰序,那么消費(fèi)者就必須等待生產(chǎn)者领虹。為了解決這個(gè)問題于是引入了生產(chǎn)者和消費(fèi)者模式。
生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題求豫。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊塌衰,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理蝠嘉,直接扔給阻塞隊(duì)列最疆,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取蚤告,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū)努酸,平衡了生產(chǎn)者和消費(fèi)者的處理能力。
這就像杜恰,在餐廳获诈,廚師做好菜,不需要直接和客戶交流心褐,而是交給前臺(tái)舔涎,而客戶去飯菜也不需要不找廚師,直接去前臺(tái)領(lǐng)取即可逗爹,這也是一個(gè)結(jié)耦的過程亡嫌。
我們先來看一個(gè)例子:
import random
import threading
import time
import queue
q = queue.Queue(maxsize=10)
def producter(name):
count = 1
while True:
q.put("紅包{}".format(count))
print('生成了紅包', count)
count += 1
time.sleep(random.randrange(3))
def consumer(name):
while True:
print('[{}]搶到了紅包[{}]!掘而!'.format(name, q.get()))
time.sleep(random.randrange(5))
p = threading.Thread(target=producter, args=("tencent",))
c1 = threading.Thread(target=consumer, args=('Tim',))
c2 = threading.Thread(target=consumer, args=('King',))
c3 = threading.Thread(target=consumer, args=('Wang',))
p.start()
c1.start()
c2.start()
生成了紅包 1
[Tim]搶到了紅包[紅包1]P凇!
生成了紅包 2
[King]搶到了紅包[紅包2]E鬯知染!
生成了紅包 3
[Tim]搶到了紅包[紅包3]!女蜈!
生成了紅包 4
生成了紅包 5
在這個(gè)例子中持舆,線程 p 作為發(fā)出消息的生產(chǎn)者,發(fā)出了一個(gè) str 對(duì)象伪窖,但是它的生產(chǎn)速度很快,所以保存在隊(duì)列中居兆,c1, c2, c3 作為接收消息的消費(fèi)者覆山,依次從隊(duì)列中獲取消息,這樣就實(shí)現(xiàn)了線程間消息的安全傳遞泥栖。
為了不固化思維簇宽,我們?cè)賮砜匆粋€(gè)例子:
這個(gè)例子來自知乎@咪咪怪
設(shè)想有這樣一個(gè)情況:下面有 6 個(gè)美少女勋篓,她們準(zhǔn)備去量身高,有三個(gè)稱重處可以服務(wù)魏割。
import queue
import threading
def worker():
while True:
item = q.get()
if item is None:
break
print("妹紙名字{}譬嚣,年齡{}".format(item['name'], item['age']))
q.task_done() # 這個(gè)語句與下面的 q.join() 配合使用
q = queue.Queue()
num_worker_threads = 3 # 三個(gè)人在同時(shí)給妹子測(cè)量身高
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
beauty_girls = [{"name": "小H", "age": 23},
{"name": "小E", "age": 22},
{"name": "小D", "age": 21},
{"name": "小C", "age": 20},
{"name": "小B", "age": 19},
{"name": "小A", "age": 18},]
for girl in beauty_girls:
q.put(girl)
# 所有任務(wù)完成時(shí)才繼續(xù)執(zhí)行后面的代碼,否則處于阻塞狀態(tài)
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
妹紙名字小H钞它,年齡23
妹紙名字小E拜银,年齡22
妹紙名字小D,年齡21
妹紙名字小C遭垛,年齡20
妹紙名字小B尼桶,年齡19
妹紙名字小A,年齡18
這個(gè)例子實(shí)際上是主進(jìn)程作為消息的發(fā)出者锯仪,它每次發(fā)出的信息是一個(gè)裝著美女的字典泵督,三個(gè)稱重處進(jìn)程是信息的接收者(也就是接收一個(gè)美女啦)。
六庶喜、線程池
線程池可以看做是線程的集合小腊。在沒有任務(wù)時(shí)線程處于空閑狀態(tài),當(dāng)請(qǐng)求到來:線程池給這個(gè)請(qǐng)求分配一個(gè)空閑的線程久窟,任務(wù)完成后回到線程池中等待下次任務(wù)**(而不是銷毀)溢豆。這樣就實(shí)現(xiàn)了線程的重用。
為每個(gè)請(qǐng)求都開一個(gè)新的線程雖然理論上是可以的瘸羡,但是會(huì)有缺點(diǎn):
- 線程生命周期的開銷非常高漩仙。每個(gè)線程都有自己的生命周期,創(chuàng)建和銷毀線程所花費(fèi)的時(shí)間和資源可能比處理客戶端的任務(wù)花費(fèi)的時(shí)間和資源更多犹赖,并且還會(huì)有某些空閑線程也會(huì)占用資源队他。
- 程序的穩(wěn)定性和健壯性會(huì)下降,每個(gè)請(qǐng)求開一個(gè)線程峻村。如果受到了惡意攻擊或者請(qǐng)求過多(內(nèi)存不足)麸折,程序很容易就奔潰掉了。
所以說:我們的線程最好是交由線程池來管理粘昨,這樣可以減少對(duì)線程生命周期的管理垢啼,一定程度上提高性能。
在 Python3 中张肾,創(chuàng)建線程池是通過 concurrent.futures 函數(shù)庫中的 ThreadPoolExecutor 類來實(shí)現(xiàn)的芭析。
future 對(duì)象(期貨對(duì)象):在未來的某一時(shí)刻完成操作的對(duì)象. submit 方法可以返回一個(gè) future 對(duì)象.
import threading, time
from concurrent.futures import ThreadPoolExecutor
# 線程執(zhí)行的函數(shù)
def add(n1, n2):
v = n1 + n2
print('add: ', v, ',tid: ', threading.currentThread().ident)
time.sleep(n1)
return v
# 通過submit把需要執(zhí)行的函數(shù)扔進(jìn)線程池中.
# submit 直接返回一個(gè)future對(duì)象
ex = ThreadPoolExecutor(max_workers=3) # 制定最多運(yùn)行N個(gè)線程
future1 = ex.submit(add,2,3)
future2 = ex.submit(add,2,2)
print('main thread running')
print(future1.done()) # done 看看任務(wù)結(jié)束了沒
print(future1.result()) # 獲取結(jié)果 ,阻塞方法
add: 5 ,tid: 5564
add: 4 ,tid: 31484
main thread running
False
5
map 方法,返回是跟你提交序列是一致的. 是有序的
import threading, requests
from concurrent.futures import ThreadPoolExecutor
# 下面是map 方法的簡單使用. 注意:map 返回是一個(gè)生成器 ,并且是*有序的*
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
print('thread id:',threading.currentThread().ident,' 訪問了:',url)
return requests.get(url) # 這里使用了requests 模塊
ex = ThreadPoolExecutor(max_workers=3)
res_iter = ex.map(get_html,URLS) # 內(nèi)部迭代中, 每個(gè)url 開啟一個(gè)線程
for res in res_iter: # 此時(shí)將阻塞 , 直到線程完成或異常
print('url:%s ,len: %d'%(res.url,len(res.text)))
thread id: 32500 訪問了: http://www.baidu.com
thread id: 35188 訪問了: http://www.qq.com
thread id: 34976 訪問了: http://www.sina.com.cn
url:http://www.baidu.com/ ,len: 2381
url:https://www.qq.com/ ,len: 231703
url:https://www.sina.com.cn/ ,len: 570395
接下來,使用as_completed . 這個(gè)函數(shù)為submit 而生。
你總想通過一種辦法來解決submit后啥時(shí)候完成的吧 , 而不是一次次調(diào)用future.done 或者 使用 future.result 吧吞瞪。
concurrent.futures.as_completed(fs, timeout=None) 返回一個(gè)生成器,在迭代過程中會(huì)阻塞馁启。
直到線程完成或者異常時(shí),產(chǎn)生一個(gè)Future對(duì)象。
同時(shí)注意, map方法返回是有序的, as_completed 是那個(gè)線程先完成/失敗 就返回芍秆。
import threading, requests, time
from concurrent.futures import ThreadPoolExecutor, as_completed
# as_completed 完整的例子
# as_completed 返回一個(gè)生成器惯疙,用于迭代翠勉, 一旦一個(gè)線程完成(或失敗) 就返回
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
time.sleep(1)
print('thread id:',threading.currentThread().ident,' 訪問了:',url)
return requests.get(url)
ex = ThreadPoolExecutor(max_workers=3) # 最多3個(gè)線程
future_tasks = [ex.submit(get_html,url) for url in URLS] # 創(chuàng)建3個(gè)future對(duì)象
for future in as_completed(future_tasks): # 迭代生成器, 參數(shù)是一個(gè)列表
try:
resp = future.result()
except Exception as e:
print('%s'%e)
else:
print('%s has %d bytes!'%(resp.url, len(resp.text)))
thread id: 5160 訪問了: http://www.baidu.com
thread id: 7752 訪問了: http://www.sina.com.cn
thread id: 5928 訪問了: http://www.qq.com
http://www.qq.com/ has 240668 bytes!
http://www.baidu.com/ has 2381 bytes!
https://www.sina.com.cn/ has 577244 bytes!
就先寫這么多吧!