Python 通過ThreadPoolExecutor 線程池執(zhí)行文件

參考文章:
https://docs.python.org/zh-cn/3.7/library/concurrent.futures.html
https://juejin.im/post/5cf913cfe51d45105d63a4d0

前言

從Python3.2開始,標準庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。
相比 threading 等模塊,該模塊通過 submit 返回的是一個 future 對象嘀趟,它是一個未來可期的對象,通過它可以獲悉線程的狀態(tài)主線程(或進程)中可以獲取某一個線程(進程)執(zhí)行的狀態(tài)或者某一個任務執(zhí)行的狀態(tài)及返回值:

  • 1.主線程可以獲取某一個線程(或者任務的)的狀態(tài),以及返回值阔逼。
  • 2.當一個線程完成的時候,主線程能夠立即知道地沮。
  • 3.讓多線程和多進程的編碼接口一致嗜浮。

線程池的基本使用

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 創(chuàng)建一個最大容納數(shù)量為5的線程池
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # 通過submit提交執(zhí)行的函數(shù)到線程池中
    task3 = t.submit(spider, 3)

    print(f"task1: {task1.done()}")  # 通過done來判斷線程是否完成
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # 通過result來獲取返回值

執(zhí)行結(jié)果如下:

task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished

    1. 使用 with 語句 ,通過 ThreadPoolExecutor 構(gòu)造實例摩疑,同時傳入 max_workers 參數(shù)來設置線程池中最多能同時運行的線程數(shù)目危融。
    1. 使用 submit 函數(shù)來提交線程需要執(zhí)行的任務到線程池中,并返回該任務的句柄(類似于文件雷袋、畫圖)专挪,注意 submit() 不是阻塞的,而是立即返回。
    1. 通過使用 done() 方法判斷該任務是否結(jié)束寨腔。上面的例子可以看出速侈,提交任務后立即判斷任務狀態(tài),顯示四個任務都未完成迫卢。在延時2.5后倚搬,task1 和 task2 執(zhí)行完畢,task3 仍在執(zhí)行中乾蛤。
    1. 使用 result() 方法可以獲取任務的返回值每界。

主要方法:

wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三個參數(shù):
  • fs: 表示需要執(zhí)行的序列
  • timeout: 等待的最大時間,如果超過這個時間即使線程未執(zhí)行完成也將返回
  • return_when:表示wait返回結(jié)果的條件家卖,默認為 ALL_COMPLETED 全部執(zhí)行完成再返回
    還是用上面那個例子來熟悉用法 示例:
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    print(wait(all_task, timeout=2.5))

# 運行結(jié)果
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished

  • 1.代碼中返回的條件是:當完成第一個任務的時候眨层,就停止等待,繼續(xù)主線程任務
  • 2.由于設置了延時上荡, 可以看到最后只有 task4 還在運行中

as_completed

上面雖然提供了判斷任務是否結(jié)束的方法趴樱,但是不能在主線程中一直判斷啊。最好的方法是當某個任務結(jié)束了酪捡,就給主線程返回結(jié)果叁征,而不是一直判斷每個任務是否結(jié)束。
ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法逛薇,當子線程中的任務執(zhí)行完后捺疼,直接用 result() 獲取返回結(jié)果
用法如下:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

def main():
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = []
        for page in range(1, 5):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(f"main: {data}")

# 執(zhí)行結(jié)果
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4

as_completed() 方法是一個生成器,在沒有任務完成的時候永罚,會一直阻塞啤呼,除非設置了 timeout。
當有某個任務完成的時候呢袱,會yield這個任務媳友,就能執(zhí)行 for 循環(huán)下面的語句,然后繼續(xù)阻塞住产捞,循環(huán)到所有的任務結(jié)束醇锚。同時,先完成的任務會先返回給主線程坯临。

map

map(fn, *iterables, timeout=None)

  • 1.fn: 第一個參數(shù) fn 是需要線程執(zhí)行的函數(shù)焊唬;
  • 2.iterables:第二個參數(shù)接受一個可迭代對象;
  • 3.timeout: 第三個參數(shù) timeout 跟 wait() 的 timeout 一樣看靠,但由于 map 是返回線程執(zhí)行的結(jié)果赶促,如果 timeout小于線程執(zhí)行時間會拋異常 TimeoutError。
    用法如下:
import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

#  運行結(jié)果
task1:2
task2:3
task3:1
task4:4

使用 map 方法挟炬,無需提前使用 submit 方法鸥滨,map 方法與 python 高階函數(shù) map 的含義相同嗦哆,都是將序列中的每個元素都執(zhí)行同一個函數(shù)。
上面的代碼對列表中的每個元素都執(zhí)行 spider() 函數(shù)婿滓,并分配各線程池老速。
可以看到執(zhí)行結(jié)果與上面的 as_completed() 方法的結(jié)果不同,輸出順序和列表的順序相同凸主,就算 1s 的任務先執(zhí)行完成橘券,也會先打印前面提交的任務返回的結(jié)果。

? 實戰(zhàn)

以某網(wǎng)站為例卿吐,演示線程池和單線程兩種方式爬取的差異

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn",
    "Origin": "https://splcgk.court.gov.cn",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
    "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def main():
    with ThreadPoolExecutor(max_workers=8) as t:
        obj_list = []
        begin = time.time()
        for page in range(1, 15):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 50)
        times = time.time() - begin
        print(times)

if __name__ == "__main__":
    main()

運行結(jié)果如下:

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末旁舰,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子嗡官,更是在濱河造成了極大的恐慌箭窜,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件衍腥,死亡現(xiàn)場離奇詭異磺樱,居然都是意外死亡,警方通過查閱死者的電腦和手機紧阔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門坊罢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來续担,“玉大人擅耽,你說我怎么就攤上這事∥镉觯” “怎么了乖仇?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長询兴。 經(jīng)常有香客問我乃沙,道長,這世上最難降的妖魔是什么诗舰? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任警儒,我火速辦了婚禮,結(jié)果婚禮上眶根,老公的妹妹穿的比我還像新娘蜀铲。我一直安慰自己,他們只是感情好属百,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布记劝。 她就那樣靜靜地躺著,像睡著了一般族扰。 火紅的嫁衣襯著肌膚如雪厌丑。 梳的紋絲不亂的頭發(fā)上定欧,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天,我揣著相機與錄音怒竿,去河邊找鬼砍鸠。 笑死,一個胖子當著我的面吹牛愧口,可吹牛的內(nèi)容都是我干的睦番。 我是一名探鬼主播,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼耍属,長吁一口氣:“原來是場噩夢啊……” “哼托嚣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起厚骗,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤示启,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后领舰,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體夫嗓,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年冲秽,在試婚紗的時候發(fā)現(xiàn)自己被綠了舍咖。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡锉桑,死狀恐怖排霉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情民轴,我是刑警寧澤攻柠,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站后裸,受9級特大地震影響瑰钮,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜微驶,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一浪谴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧因苹,春花似錦苟耻、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蘸秘,卻和暖如春官卡,著一層夾襖步出監(jiān)牢的瞬間蝗茁,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工寻咒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留哮翘,地道東北人。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓毛秘,卻偏偏與公主長得像饭寺,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子叫挟,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容