Python多線程并發(fā)編程 -- concurrent.futures

Python version 3.8.5

在Python中讽营,并發(fā)并不是同一時刻有多個操作(thread/task)同時進(jìn)行移宅。相反归粉,由于全局解釋器鎖(GIL) 的存在,在某個特定的時刻漏峰,它只允許有一個操作發(fā)生糠悼,只不過線程或任務(wù)之間會互相切換,直到完成浅乔,如下圖所示:

上圖中出現(xiàn)了線程(thread) 和任務(wù)(task)兩種切換順序的不同方式倔喂,分別對應(yīng)Python中的兩種實現(xiàn)并發(fā)的方法:threading 和 asyncio. 對于 threading,操作系統(tǒng)知道每個線程的所有信息靖苇,因此它會做主在適當(dāng)?shù)臅r候做線程切換席噩。而對于 asyncio,主程序想要切換任務(wù)時贤壁,必須得到此任務(wù)可以被切換的通知班挖。

本文內(nèi)容只涉及基于concurrent.futures的多線程并發(fā),不涉及asyncio芯砸。

選擇多線程還是多進(jìn)程?

核心概念:

  • 進(jìn)程是操作系統(tǒng)分配資源的最小單元给梅,線程是操作系統(tǒng)調(diào)度的最小單元假丧;
  • 一個應(yīng)用程序至少包括一個進(jìn)程,一個進(jìn)程至少包括一個線程动羽;
  • 每個進(jìn)程在執(zhí)行過程中擁有獨立的內(nèi)存單元包帚,而一個進(jìn)程的多個線程在執(zhí)行過程中共享內(nèi)存;

如果手頭的任務(wù)是I/O密集型运吓,可以使用標(biāo)準(zhǔn)庫的 threading 模塊渴邦,或者任務(wù)是CPU密集型,則可以使用 multiprocessing 模塊拘哨。這兩個模塊提供了很多控制權(quán)和靈活性谋梭,但代價就是必須編寫相對低級的冗長代碼,在任務(wù)核心邏輯的基礎(chǔ)上增加額外的并具有復(fù)雜性的層倦青,并且當(dāng)項目達(dá)到一定的規(guī)模瓮床,頻繁創(chuàng)建/銷毀進(jìn)程或者線程是非常消耗資源的,這個時候往往需要編寫自己的線程池/進(jìn)程池,以空間換時間隘庄。

從Python 3.2開始踢步,標(biāo)準(zhǔn)庫提供了 concurrent.futures 模塊,它在 threading 和 multiprocessing 之上的一個通用抽象層丑掺,提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類获印,以便使用線程池/進(jìn)程池并發(fā)/并行地執(zhí)行任務(wù)。

多線程并發(fā)編程之concurrent.futures

concurrent.futures 模塊對外提供了以下常量街州、函數(shù)或類:

__all__ = (
    'FIRST_COMPLETED',
    'FIRST_EXCEPTION',
    'ALL_COMPLETED',
    'CancelledError',
    'TimeoutError',
    'BrokenExecutor',
    'Future',
    'Executor',
    'wait',
    'as_completed',
    'ProcessPoolExecutor',
    'ThreadPoolExecutor',
)

Executor

Executor 是一個抽象類兼丰,不應(yīng)該直接使用此類,而是使用它提供的兩個子類:ThreadPoolExecutor 和 ProcessPoolExecutor菇肃,顧名思義兩者分別被用來創(chuàng)建線程池和進(jìn)程池的代碼地粪。

Executor的重要方法
ProcessPoolExecutor 和 ThreadPoolExecutor 類中最重要的幾個方法如下:

  • submit(fn,*args, **kwargs):提交異步任務(wù)(一般是阻塞的可調(diào)用函數(shù)),并返回表示可調(diào)用對象執(zhí)行的 Future 對象(原文:returns a Future object representing the execution of the callable)琐谤;此方法不保存與原始任務(wù)相關(guān)的任何上下文蟆技,如果想把結(jié)果和原始任務(wù)對應(yīng)起來,需要自己去追蹤它們斗忌,比如使用字典推導(dǎo)式质礼;

  • map(fn, *iterables, timeout=None, chunksize=1):和標(biāo)準(zhǔn)的map函數(shù)功能類似,同樣返回一個迭代器织阳,只不過是以異步的方式把函數(shù)依次作用在可迭代對象的每個元素上江锨,注意:如果函數(shù)調(diào)用引發(fā)異常,當(dāng)從迭代器檢索其值時將引發(fā)異常枕磁,并且不會繼續(xù)執(zhí)行篡腌;

  • shutdown(wait=True):通知執(zhí)行器,當(dāng)所有掛起的 Future 對象執(zhí)行完成時弄痹,釋放正在使用的任何資源饭入;在抽象基類中實現(xiàn)了上下文管理器協(xié)議,__exit__方法中調(diào)用了shutdown方法肛真;

future

Futute 類封裝可調(diào)用對象的異步執(zhí)行谐丢,應(yīng)由Executor.submit() 方法創(chuàng)建,可以理解為一個在未來完成的操作蚓让。比如說在寫爬蟲代碼時會用到 requests.get 乾忱,在等待服務(wù)器返回結(jié)果之前的這段時間會產(chǎn)生I/O阻塞,CPU不能讓出來做其他的事情历极,F(xiàn)uture的引入就是幫助我們在等待的這段時間可以完成其他的操作窄瘟。

Futute 類中最重要的幾個方法如下:

  • result(timeout=None):返回可調(diào)用對象的實際返回值;
  • cancel():嘗試取消future趟卸,如果它正在運行或已經(jīng)完成寞肖,則不能取消纲酗,返回False,若取消成功則返回True新蟆;
  • cancelled():如果future已被取消觅赊,則返回True;
  • running():如果future當(dāng)前正在運行,則返回True琼稻;
  • done():如果future已被取消或執(zhí)行完成吮螺,則返回True;
  • add_done_callback(fn):future執(zhí)行完成后,添加回調(diào)帕翻;
  • exception(timeout=None):返回future執(zhí)行時所引發(fā)的異常鸠补;

wait 和 as_completed

模塊下有2個重要函數(shù) waitas_completed

  • wait(fs, timeout=None, return_when=ALL_COMPLETED)

    遍歷fs提供的future(可能由不同的Executor實例創(chuàng)建)嘀掸,并等待執(zhí)行完成(包含已取消)紫岩,如果未設(shè)置timeout參數(shù),則代表不限制等待時間睬塌,return_when參數(shù)則用于設(shè)置次函數(shù)應(yīng)該在何時返回泉蝌,支持的選項如下:

Constant Description
FIRST_COMPLETED 在當(dāng)有任何future執(zhí)行完成(包括已取消)時返回結(jié)果
FIRST_EXCEPTION 當(dāng)有任何future執(zhí)行引發(fā)異常時返回結(jié)果,若沒有任何future引發(fā)異常則等同于ALL_COMPLETED
ALL_COMPLETED 當(dāng)所有future執(zhí)行完成(包括已取消)時返回結(jié)果

該函數(shù)返回一個包含兩個元素的namedtuple揩晴,定義如下:

DoneAndNotDoneFutures = collections.namedtuple('DoneAndNotDoneFutures', 'done not_done')

  • as_completed(fs, timeout=None)

返回一個迭代器勋陪,遍歷fs給出的 future 實例(可能由不同的執(zhí)行器實例創(chuàng)建),在它們執(zhí)行完成(包含已取消)時 yield future硫兰。

經(jīng)驗技巧

正確的使用submit和map

submit 方法返回的是 Future 對象诅愚, map方法則返回迭代器,如果沒有調(diào)用future對象的result方法劫映,即使執(zhí)行過程中有異常用戶也是不知道的违孝,如下所示:

f = lambda x: 100 // x

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(f, 0)
    future2 = executor.submit(f, 10)
    future3 = executor.submit(f, 20)
print(future1, future2, future3, sep='\n')

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = executor.map(f, [0, 10, 20])
print(futures)

>>>
<Future at 0x3d69fb8 state=finished raised ZeroDivisionError>
<Future at 0x3d85460 state=finished returned int>
<Future at 0x3d856d0 state=finished returned int>
<generator object Executor.map.<locals>.result_iterator at 0x03D59A00>

所以通常需要調(diào)用其 result 方法并且捕捉異常:

f = lambda x: 100 // x

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future1 = executor.submit(f, 0)
    future2 = executor.submit(f, 0)
    future3 = executor.submit(f, 20)

todos = [future1, future2, future3]
for future in concurrent.futures.as_completed(todos):
    try:
        print(future.result())
    except ZeroDivisionError as e:
        print(e.__repr__())

>>>
ZeroDivisionError('integer division or modulo by zero')
5
ZeroDivisionError('integer division or modulo by zero')

相對于submit,map方法的結(jié)果就比較難獲取了泳赋,這是因為map方法以異步的方式把函數(shù)依次作用在可迭代對象的每個元素上雌桑,如果在函數(shù)調(diào)用時引發(fā)了一些異常,當(dāng)從迭代器檢索其值時就將引發(fā)異常摹蘑,因此需要使用下面的方法:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
    futures = executor.map(f, [0, 10, 20])

while True:
    try:
        future = futures.__next__()
    except StopIteration:
        break
    except ZeroDivisionError as e:
        print(e.__repr__())
 
>>>
ZeroDivisionError('integer division or modulo by zero')

可以看到,當(dāng)?shù)谝淮五e誤發(fā)生后生成器迭代就結(jié)束了轧飞,所以一批任務(wù)中可能會出現(xiàn)異常時是不合適用 map 方法的衅鹿,最好的方式還是使用 submit+as_completed. 在一些較為簡單的場景下,如果不需要關(guān)心任務(wù)的返回值过咬,則可以考慮使用map方法大渤。

尋找合適的max_worker

使用ThreadPoolExecutor,雖然線程的數(shù)量可以自定義掸绞,但并不是越多越好泵三,因為線程的創(chuàng)建耕捞、維護(hù)和刪除也會有一定的開銷。所以如果設(shè)置的很大烫幕,反而可能會導(dǎo)致速度變慢俺抽,比如下面的例子,把線程數(shù)從5改為10较曼,運行程序會發(fā)現(xiàn)耗時反而增多了磷斧。所以在實際開發(fā)過程中,往往需要根據(jù)實際的需要去做一些測試捷犹,在任務(wù)不影響到全局的情況下弛饭,尋找最優(yōu)的線程數(shù)量。

...
def download_all(urls: list):
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        executor.map(download_one, urls)
...
if __name__ == "__main__":
    main()
    
>>>
...
Download 30 urls in 0.9526623 seconds

max_workers有默認(rèn)值萍歉,等于:min(32, (os.cpu_count() or 1) + 4)

避免死鎖

使用ThreadPoolExecutor時可能出現(xiàn)的死鎖情況侣颂,當(dāng)與Future 關(guān)聯(lián)的可調(diào)用函數(shù)等待另一個Future的結(jié)果時,它們可能永遠(yuǎn)不會釋放對線程的控制并導(dǎo)致死鎖枪孩,官網(wǎng)的示例如下:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

在上面的例子中憔晒,函數(shù)wait_on_b依賴于函數(shù)wait_on_a的結(jié)果(Future對象的結(jié)果),同時后一個函數(shù)的結(jié)果依賴于前一個函數(shù)的結(jié)果销凑。因此丛晌,上下文管理器中的代碼塊永遠(yuǎn)不會執(zhí)行,因為它具有相互依賴性斗幼,這就造成了死鎖澎蛛。

簡單使用場景

  1. 使用多線程從url下載和保存文件
import os
import concurrent.futures
from pathlib import Path

import requests

htmls_dir = Path(__file__).parent.joinpath('htmls')
if not htmls_dir.exists():
    os.makedirs(htmls_dir)
else:
    for html in htmls_dir.glob("*.html"):
        os.remove(html)


def download_one(url):
    resp = requests.get(url)
    resp.encoding = 'utf-8'
    return resp.text


def save(source, html, chunk=8 * 1024):
    with open(source, 'w', encoding="utf-8") as fp:
        for text in (html[i:chunk + i] for i in range(0, len(html), chunk)):
            fp.write(text)


def download_all(urls):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(download_one, url): url[-7:] for url in urls}

    for future in concurrent.futures.as_completed(futures):
        source = htmls_dir.joinpath(futures[future]).with_suffix('.html')
        save(source, future.result())


def main():
    urls = [f"https://www.sogou.com/web?query={i}" for i in range(30)]
    start_time = time.perf_counter()
    download_all(urls)
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f'Download {len(urls)} urls in {elapsed_time} seconds')


if __name__ == "__main__":
    main()

參考文檔

  1. asyncio --- 異步 I/O 官方文檔
  2. 進(jìn)程與線程的一個簡單解釋,阮一峰
  3. 一文看懂Python多進(jìn)程與多線程編程
  4. 使用Python的concurrent.futures輕松實現(xiàn)并發(fā)編程
  5. 使用concurrent.futures的一些經(jīng)驗
  6. Python并發(fā)編程之線程池/進(jìn)程池--concurrent.futures模塊
  7. 官方文檔
  8. [Book] Python Cook

【To Be Continued...】

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蜕窿,一起剝皮案震驚了整個濱河市谋逻,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌桐经,老刑警劉巖毁兆,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異阴挣,居然都是意外死亡气堕,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門畔咧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來茎芭,“玉大人,你說我怎么就攤上這事誓沸∶纷” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵拜隧,是天一觀的道長宿百。 經(jīng)常有香客問我趁仙,道長,這世上最難降的妖魔是什么垦页? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任雀费,我火速辦了婚禮,結(jié)果婚禮上外臂,老公的妹妹穿的比我還像新娘坐儿。我一直安慰自己,他們只是感情好宋光,可當(dāng)我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布貌矿。 她就那樣靜靜地躺著,像睡著了一般罪佳。 火紅的嫁衣襯著肌膚如雪逛漫。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天赘艳,我揣著相機(jī)與錄音酌毡,去河邊找鬼。 笑死蕾管,一個胖子當(dāng)著我的面吹牛枷踏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播掰曾,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼旭蠕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了旷坦?” 一聲冷哼從身側(cè)響起掏熬,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎秒梅,沒想到半個月后旗芬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡捆蜀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年疮丛,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辆它。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡誊薄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出娩井,到底是詐尸還是另有隱情暇屋,我是刑警寧澤似袁,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布洞辣,位于F島的核電站咐刨,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏扬霜。R本人自食惡果不足惜定鸟,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望著瓶。 院中可真熱鬧联予,春花似錦、人聲如沸材原。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽余蟹。三九已至卷胯,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間威酒,已是汗流浹背窑睁。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留葵孤,地道東北人担钮。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像尤仍,于是被迫代替她去往敵國和親箫津。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容