在Python中讽营,并發(fā)并不是同一時刻有多個操作(thread/task)同時進(jìn)行移宅。相反归粉,由于全局解釋器鎖(GIL) 的存在,在某個特定的時刻漏峰,它只允許有一個操作發(fā)生糠悼,只不過線程或任務(wù)之間會互相切換,直到完成浅乔,如下圖所示:Python version 3.8.5
上圖中出現(xiàn)了線程(thread) 和任務(wù)(task)兩種切換順序的不同方式倔喂,分別對應(yīng)Python中的兩種實現(xiàn)并發(fā)的方法:threading 和 asyncio. 對于 threading,操作系統(tǒng)知道每個線程的所有信息靖苇,因此它會做主在適當(dāng)?shù)臅r候做線程切換席噩。而對于 asyncio,主程序想要切換任務(wù)時贤壁,必須得到此任務(wù)可以被切換的通知班挖。
本文內(nèi)容只涉及基于concurrent.futures的多線程并發(fā),不涉及asyncio芯砸。
選擇多線程還是多進(jìn)程?
核心概念:
- 進(jìn)程是操作系統(tǒng)分配資源的最小單元给梅,線程是操作系統(tǒng)調(diào)度的最小單元假丧;
- 一個應(yīng)用程序至少包括一個進(jìn)程,一個進(jìn)程至少包括一個線程动羽;
- 每個進(jìn)程在執(zhí)行過程中擁有獨立的內(nèi)存單元包帚,而一個進(jìn)程的多個線程在執(zhí)行過程中共享內(nèi)存;
如果手頭的任務(wù)是I/O密集型运吓,可以使用標(biāo)準(zhǔn)庫的 threading 模塊渴邦,或者任務(wù)是CPU密集型,則可以使用 multiprocessing 模塊拘哨。這兩個模塊提供了很多控制權(quán)和靈活性谋梭,但代價就是必須編寫相對低級的冗長代碼,在任務(wù)核心邏輯的基礎(chǔ)上增加額外的并具有復(fù)雜性的層倦青,并且當(dāng)項目達(dá)到一定的規(guī)模瓮床,頻繁創(chuàng)建/銷毀進(jìn)程或者線程是非常消耗資源的,這個時候往往需要編寫自己的線程池/進(jìn)程池,以空間換時間隘庄。
從Python 3.2開始踢步,標(biāo)準(zhǔn)庫提供了 concurrent.futures 模塊,它在 threading 和 multiprocessing 之上的一個通用抽象層丑掺,提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類获印,以便使用線程池/進(jìn)程池并發(fā)/并行地執(zhí)行任務(wù)。
多線程并發(fā)編程之concurrent.futures
concurrent.futures 模塊對外提供了以下常量街州、函數(shù)或類:
__all__ = (
'FIRST_COMPLETED',
'FIRST_EXCEPTION',
'ALL_COMPLETED',
'CancelledError',
'TimeoutError',
'BrokenExecutor',
'Future',
'Executor',
'wait',
'as_completed',
'ProcessPoolExecutor',
'ThreadPoolExecutor',
)
Executor
Executor 是一個抽象類兼丰,不應(yīng)該直接使用此類,而是使用它提供的兩個子類:ThreadPoolExecutor 和 ProcessPoolExecutor菇肃,顧名思義兩者分別被用來創(chuàng)建線程池和進(jìn)程池的代碼地粪。
Executor的重要方法
ProcessPoolExecutor 和 ThreadPoolExecutor 類中最重要的幾個方法如下:
submit(fn,*args, **kwargs):提交異步任務(wù)(一般是阻塞的可調(diào)用函數(shù)),并返回表示可調(diào)用對象執(zhí)行的 Future 對象(原文:returns a
Future
object representing the execution of the callable)琐谤;此方法不保存與原始任務(wù)相關(guān)的任何上下文蟆技,如果想把結(jié)果和原始任務(wù)對應(yīng)起來,需要自己去追蹤它們斗忌,比如使用字典推導(dǎo)式质礼;map(fn, *iterables, timeout=None, chunksize=1):和標(biāo)準(zhǔn)的map函數(shù)功能類似,同樣返回一個迭代器织阳,只不過是以異步的方式把函數(shù)依次作用在可迭代對象的每個元素上江锨,注意:如果函數(shù)調(diào)用引發(fā)異常,當(dāng)從迭代器檢索其值時將引發(fā)異常枕磁,并且不會繼續(xù)執(zhí)行篡腌;
shutdown(wait=True):通知執(zhí)行器,當(dāng)所有掛起的 Future 對象執(zhí)行完成時弄痹,釋放正在使用的任何資源饭入;在抽象基類中實現(xiàn)了上下文管理器協(xié)議,
__exit__
方法中調(diào)用了shutdown方法肛真;
future
Futute 類封裝可調(diào)用對象的異步執(zhí)行谐丢,應(yīng)由Executor.submit() 方法創(chuàng)建,可以理解為一個在未來完成的操作蚓让。比如說在寫爬蟲代碼時會用到 requests.get 乾忱,在等待服務(wù)器返回結(jié)果之前的這段時間會產(chǎn)生I/O阻塞,CPU不能讓出來做其他的事情历极,F(xiàn)uture的引入就是幫助我們在等待的這段時間可以完成其他的操作窄瘟。
Futute 類中最重要的幾個方法如下:
- result(timeout=None):返回可調(diào)用對象的實際返回值;
- cancel():嘗試取消future趟卸,如果它正在運行或已經(jīng)完成寞肖,則不能取消纲酗,返回False,若取消成功則返回True新蟆;
- cancelled():如果future已被取消觅赊,則返回True;
- running():如果future當(dāng)前正在運行,則返回True琼稻;
- done():如果future已被取消或執(zhí)行完成吮螺,則返回True;
- add_done_callback(fn):future執(zhí)行完成后,添加回調(diào)帕翻;
- exception(timeout=None):返回future執(zhí)行時所引發(fā)的異常鸠补;
wait 和 as_completed
模塊下有2個重要函數(shù) wait
和 as_completed
。
-
wait(fs, timeout=None, return_when=ALL_COMPLETED)
遍歷fs提供的future(可能由不同的Executor實例創(chuàng)建)嘀掸,并等待執(zhí)行完成(包含已取消)紫岩,如果未設(shè)置timeout參數(shù),則代表不限制等待時間睬塌,return_when參數(shù)則用于設(shè)置次函數(shù)應(yīng)該在何時返回泉蝌,支持的選項如下:
Constant | Description |
---|---|
FIRST_COMPLETED | 在當(dāng)有任何future執(zhí)行完成(包括已取消)時返回結(jié)果 |
FIRST_EXCEPTION | 當(dāng)有任何future執(zhí)行引發(fā)異常時返回結(jié)果,若沒有任何future引發(fā)異常則等同于ALL_COMPLETED |
ALL_COMPLETED | 當(dāng)所有future執(zhí)行完成(包括已取消)時返回結(jié)果 |
該函數(shù)返回一個包含兩個元素的namedtuple揩晴,定義如下:
DoneAndNotDoneFutures = collections.namedtuple('DoneAndNotDoneFutures', 'done not_done')
- as_completed(fs, timeout=None)
返回一個迭代器勋陪,遍歷fs給出的 future 實例(可能由不同的執(zhí)行器實例創(chuàng)建),在它們執(zhí)行完成(包含已取消)時 yield future硫兰。
經(jīng)驗技巧
正確的使用submit和map
submit 方法返回的是 Future 對象诅愚, map方法則返回迭代器,如果沒有調(diào)用future對象的result方法劫映,即使執(zhí)行過程中有異常用戶也是不知道的违孝,如下所示:
f = lambda x: 100 // x
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(f, 0)
future2 = executor.submit(f, 10)
future3 = executor.submit(f, 20)
print(future1, future2, future3, sep='\n')
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = executor.map(f, [0, 10, 20])
print(futures)
>>>
<Future at 0x3d69fb8 state=finished raised ZeroDivisionError>
<Future at 0x3d85460 state=finished returned int>
<Future at 0x3d856d0 state=finished returned int>
<generator object Executor.map.<locals>.result_iterator at 0x03D59A00>
所以通常需要調(diào)用其 result 方法并且捕捉異常:
f = lambda x: 100 // x
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future1 = executor.submit(f, 0)
future2 = executor.submit(f, 0)
future3 = executor.submit(f, 20)
todos = [future1, future2, future3]
for future in concurrent.futures.as_completed(todos):
try:
print(future.result())
except ZeroDivisionError as e:
print(e.__repr__())
>>>
ZeroDivisionError('integer division or modulo by zero')
5
ZeroDivisionError('integer division or modulo by zero')
相對于submit,map方法的結(jié)果就比較難獲取了泳赋,這是因為map方法以異步的方式把函數(shù)依次作用在可迭代對象的每個元素上雌桑,如果在函數(shù)調(diào)用時引發(fā)了一些異常,當(dāng)從迭代器檢索其值時就將引發(fā)異常摹蘑,因此需要使用下面的方法:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
futures = executor.map(f, [0, 10, 20])
while True:
try:
future = futures.__next__()
except StopIteration:
break
except ZeroDivisionError as e:
print(e.__repr__())
>>>
ZeroDivisionError('integer division or modulo by zero')
可以看到,當(dāng)?shù)谝淮五e誤發(fā)生后生成器迭代就結(jié)束了轧飞,所以一批任務(wù)中可能會出現(xiàn)異常時是不合適用 map 方法的衅鹿,最好的方式還是使用 submit+as_completed. 在一些較為簡單的場景下,如果不需要關(guān)心任務(wù)的返回值过咬,則可以考慮使用map方法大渤。
尋找合適的max_worker
使用ThreadPoolExecutor,雖然線程的數(shù)量可以自定義掸绞,但并不是越多越好泵三,因為線程的創(chuàng)建耕捞、維護(hù)和刪除也會有一定的開銷。所以如果設(shè)置的很大烫幕,反而可能會導(dǎo)致速度變慢俺抽,比如下面的例子,把線程數(shù)從5改為10较曼,運行程序會發(fā)現(xiàn)耗時反而增多了磷斧。所以在實際開發(fā)過程中,往往需要根據(jù)實際的需要去做一些測試捷犹,在任務(wù)不影響到全局的情況下弛饭,尋找最優(yōu)的線程數(shù)量。
...
def download_all(urls: list):
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
executor.map(download_one, urls)
...
if __name__ == "__main__":
main()
>>>
...
Download 30 urls in 0.9526623 seconds
max_workers有默認(rèn)值萍歉,等于:min(32, (os.cpu_count() or 1) + 4)
避免死鎖
使用ThreadPoolExecutor時可能出現(xiàn)的死鎖情況侣颂,當(dāng)與Future 關(guān)聯(lián)的可調(diào)用函數(shù)等待另一個Future的結(jié)果時,它們可能永遠(yuǎn)不會釋放對線程的控制并導(dǎo)致死鎖枪孩,官網(wǎng)的示例如下:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
在上面的例子中憔晒,函數(shù)wait_on_b依賴于函數(shù)wait_on_a的結(jié)果(Future對象的結(jié)果),同時后一個函數(shù)的結(jié)果依賴于前一個函數(shù)的結(jié)果销凑。因此丛晌,上下文管理器中的代碼塊永遠(yuǎn)不會執(zhí)行,因為它具有相互依賴性斗幼,這就造成了死鎖澎蛛。
簡單使用場景
- 使用多線程從url下載和保存文件
import os
import concurrent.futures
from pathlib import Path
import requests
htmls_dir = Path(__file__).parent.joinpath('htmls')
if not htmls_dir.exists():
os.makedirs(htmls_dir)
else:
for html in htmls_dir.glob("*.html"):
os.remove(html)
def download_one(url):
resp = requests.get(url)
resp.encoding = 'utf-8'
return resp.text
def save(source, html, chunk=8 * 1024):
with open(source, 'w', encoding="utf-8") as fp:
for text in (html[i:chunk + i] for i in range(0, len(html), chunk)):
fp.write(text)
def download_all(urls):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(download_one, url): url[-7:] for url in urls}
for future in concurrent.futures.as_completed(futures):
source = htmls_dir.joinpath(futures[future]).with_suffix('.html')
save(source, future.result())
def main():
urls = [f"https://www.sogou.com/web?query={i}" for i in range(30)]
start_time = time.perf_counter()
download_all(urls)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f'Download {len(urls)} urls in {elapsed_time} seconds')
if __name__ == "__main__":
main()
參考文檔
- asyncio --- 異步 I/O 官方文檔
- 進(jìn)程與線程的一個簡單解釋,阮一峰
- 一文看懂Python多進(jìn)程與多線程編程
- 使用Python的concurrent.futures輕松實現(xiàn)并發(fā)編程
- 使用concurrent.futures的一些經(jīng)驗
- Python并發(fā)編程之線程池/進(jìn)程池--concurrent.futures模塊
- 官方文檔
- [Book] Python Cook
【To Be Continued...】