python線程池 ThreadPoolExecutor 的用法及實(shí)戰(zhàn)

公眾號(hào)【Python編程與實(shí)戰(zhàn)】,歡迎關(guān)注贺拣!

? 前言

從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了 concurrent.futures 模塊捂蕴,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進(jìn)程池)兩個(gè)類譬涡。

相比 threading 等模塊,該模塊通過 submit 返回的是一個(gè) future 對(duì)象啥辨,它是一個(gè)未來可期的對(duì)象涡匀,通過它可以獲悉線程的狀態(tài)主線程(或進(jìn)程)中可以獲取某一個(gè)線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個(gè)任務(wù)執(zhí)行的狀態(tài)及返回值:

  1. 主線程可以獲取某一個(gè)線程(或者任務(wù)的)的狀態(tài),以及返回值溉知。
  2. 當(dāng)一個(gè)線程完成的時(shí)候陨瘩,主線程能夠立即知道。
  3. 讓多線程和多進(jìn)程的編碼接口一致级乍。

? 線程池的基本使用

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 創(chuàng)建一個(gè)最大容納數(shù)量為5的線程池
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # 通過submit提交執(zhí)行的函數(shù)到線程池中
    task3 = t.submit(spider, 3)

    print(f"task1: {task1.done()}")  # 通過done來判斷線程是否完成
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # 通過result來獲取返回值
執(zhí)行結(jié)果如下:
task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished
  1. 使用 with 語句 舌劳,通過 ThreadPoolExecutor 構(gòu)造實(shí)例,同時(shí)傳入 max_workers 參數(shù)來設(shè)置線程池中最多能同時(shí)運(yùn)行的線程數(shù)目玫荣。

  2. 使用 submit 函數(shù)來提交線程需要執(zhí)行的任務(wù)到線程池中甚淡,并返回該任務(wù)的句柄(類似于文件、畫圖)捅厂,注意 submit() 不是阻塞的贯卦,而是立即返回资柔。

  3. 通過使用 done() 方法判斷該任務(wù)是否結(jié)束。上面的例子可以看出撵割,提交任務(wù)后立即判斷任務(wù)狀態(tài)贿堰,顯示四個(gè)任務(wù)都未完成。在延時(shí)2.5后啡彬,task1 和 task2 執(zhí)行完畢官边,task3 仍在執(zhí)行中。

  4. 使用 result() 方法可以獲取任務(wù)的返回值外遇。

? 主要方法:

wait

 wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三個(gè)參數(shù):
fs: 表示需要執(zhí)行的序列
timeout: 等待的最大時(shí)間注簿,如果超過這個(gè)時(shí)間即使線程未執(zhí)行完成也將返回
return_when:表示wait返回結(jié)果的條件,默認(rèn)為 ALL_COMPLETED 全部執(zhí)行完成再返回

還是用上面那個(gè)例子來熟悉用法
示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    print(wait(all_task, timeout=2.5))

# 運(yùn)行結(jié)果
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished
  1. 代碼中返回的條件是:當(dāng)完成第一個(gè)任務(wù)的時(shí)候跳仿,就停止等待诡渴,繼續(xù)主線程任務(wù)
  2. 由于設(shè)置了延時(shí), 可以看到最后只有 task4 還在運(yùn)行中

as_completed

上面雖然提供了判斷任務(wù)是否結(jié)束的方法菲语,但是不能在主線程中一直判斷啊妄辩。最好的方法是當(dāng)某個(gè)任務(wù)結(jié)束了,就給主線程返回結(jié)果山上,而不是一直判斷每個(gè)任務(wù)是否結(jié)束眼耀。

ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個(gè)方法,當(dāng)子線程中的任務(wù)執(zhí)行完后佩憾,直接用 result() 獲取返回結(jié)果

用法如下:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

def main():
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = []
        for page in range(1, 5):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(f"main: {data}")

# 執(zhí)行結(jié)果
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4

as_completed() 方法是一個(gè)生成器哮伟,在沒有任務(wù)完成的時(shí)候,會(huì)一直阻塞妄帘,除非設(shè)置了 timeout楞黄。

當(dāng)有某個(gè)任務(wù)完成的時(shí)候,會(huì) yield 這個(gè)任務(wù)抡驼,就能執(zhí)行 for 循環(huán)下面的語句鬼廓,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束致盟。同時(shí)碎税,先完成的任務(wù)會(huì)先返回給主線程。

map

map(fn, *iterables, timeout=None)

fn: 第一個(gè)參數(shù) fn 是需要線程執(zhí)行的函數(shù)馏锡;
iterables:第二個(gè)參數(shù)接受一個(gè)可迭代對(duì)象雷蹂;
timeout: 第三個(gè)參數(shù) timeout 跟 wait() 的 timeout 一樣,但由于 map 是返回線程執(zhí)行的結(jié)果眷篇,如果 timeout小于線程執(zhí)行時(shí)間會(huì)拋異常 TimeoutError萎河。

用法如下:

import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

#  運(yùn)行結(jié)果
task1:2
task2:3
task3:1
task4:4

使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數(shù) map 的含義相同虐杯,都是將序列中的每個(gè)元素都執(zhí)行同一個(gè)函數(shù)玛歌。

上面的代碼對(duì)列表中的每個(gè)元素都執(zhí)行 spider() 函數(shù),并分配各線程池擎椰。

可以看到執(zhí)行結(jié)果與上面的 as_completed() 方法的結(jié)果不同支子,輸出順序和列表的順序相同,就算 1s 的任務(wù)先執(zhí)行完成达舒,也會(huì)先打印前面提交的任務(wù)返回的結(jié)果值朋。

? 實(shí)戰(zhàn)

以某網(wǎng)站為例,演示線程池和單線程兩種方式爬取的差異

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn",
    "Origin": "https://splcgk.court.gov.cn",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
    "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def main():
    with ThreadPoolExecutor(max_workers=8) as t:
        obj_list = []
        begin = time.time()
        for page in range(1, 15):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 50)
        times = time.time() - begin
        print(times)

if __name__ == "__main__":
    main()

運(yùn)行結(jié)果如下:

多線程

可以看到巩搏,14 頁只花了 2 秒鐘就爬完了

下面我們可以使用單線程來爬取昨登,代碼基本和上面的一樣,加個(gè)單線程函數(shù)
代碼如下:

def single():
    begin = time.time()
    for page in range(1, 15):
        data = spider(page)
        print(data)
        print('*' * 50)

    times = time.time() - begin
    print(times)


if __name__ == "__main__":
    single()

運(yùn)行結(jié)果:

單線程

可以看到贯底,總共花了 19 秒丰辣。真是肉眼可見的差距啊禽捆!如果數(shù)據(jù)量大的話笙什,運(yùn)行時(shí)間差距會(huì)更大!

微信圖片_20191202100930.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末胚想,一起剝皮案震驚了整個(gè)濱河市琐凭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌浊服,老刑警劉巖统屈,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異臼闻,居然都是意外死亡鸿吆,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門述呐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蕉毯,你說我怎么就攤上這事乓搬。” “怎么了代虾?”我有些...
    開封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵进肯,是天一觀的道長。 經(jīng)常有香客問我棉磨,道長江掩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮环形,結(jié)果婚禮上策泣,老公的妹妹穿的比我還像新娘。我一直安慰自己抬吟,他們只是感情好萨咕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著火本,像睡著了一般危队。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上钙畔,一...
    開封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天茫陆,我揣著相機(jī)與錄音,去河邊找鬼擎析。 笑死盅弛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的叔锐。 我是一名探鬼主播挪鹏,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼愉烙!你這毒婦竟也來了讨盒?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤步责,失蹤者是張志新(化名)和其女友劉穎返顺,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蔓肯,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡遂鹊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蔗包。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秉扑。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖调限,靈堂內(nèi)的尸體忽然破棺而出舟陆,到底是詐尸還是另有隱情,我是刑警寧澤耻矮,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布秦躯,位于F島的核電站,受9級(jí)特大地震影響裆装,放射性物質(zhì)發(fā)生泄漏踱承。R本人自食惡果不足惜倡缠,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望茎活。 院中可真熱鬧昙沦,春花似錦、人聲如沸妙色。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽身辨。三九已至丐谋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間煌珊,已是汗流浹背号俐。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留定庵,地道東北人吏饿。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像蔬浙,于是被迫代替她去往敵國和親猪落。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 串行:同一個(gè)時(shí)間段只干一件事 并行:同一個(gè)時(shí)間段可以干多件事 并發(fā) V.S. 并行并發(fā)是指一個(gè)時(shí)間段內(nèi)官疲,有幾個(gè)程序...
    蘇慕漓閱讀 4,878評(píng)論 0 5
  • 初識(shí) Python中已經(jīng)有了threading模塊,為什么還需要線程池呢亮隙,線程池又是什么東西呢途凫?在介紹線程同步的信...
    StormZhu閱讀 199,263評(píng)論 14 96
  • 線程 操作系統(tǒng)線程理論 線程概念的引入背景 進(jìn)程 之前我們已經(jīng)了解了操作系統(tǒng)中進(jìn)程的概念维费,程序并不能單獨(dú)運(yùn)行,只有...
    go以恒閱讀 1,641評(píng)論 0 6
  • 引言 Python標(biāo)準(zhǔn)庫為我們提供了threading和multiprocessing模塊編寫相應(yīng)的多線程/多進(jìn)程...
    派派森森閱讀 861評(píng)論 0 0
  • ??一個(gè)任務(wù)通常就是一個(gè)程序煤裙,每個(gè)運(yùn)行中的程序就是一個(gè)進(jìn)程掩完。當(dāng)一個(gè)程序運(yùn)行時(shí),內(nèi)部可能包含了多個(gè)順序執(zhí)行流硼砰,每個(gè)順...
    OmaiMoon閱讀 1,671評(píng)論 0 12