在前面的博客中介紹了線程的用法稚叹,每次使用都要創(chuàng)建線程帐我,啟動線程符糊,有沒有什么辦法簡單操作呢。
python3.2引入的concurrent.future模塊中有ThreadPoolExecutor和ProcessPoolExecutor兩個類甘改,這兩個類內(nèi)部維護著線程/進程池旅东,以及要執(zhí)行的任務(wù)隊列,使得操作變得非常簡單十艾,不需要關(guān)心任何實現(xiàn)細節(jié)
來看一個簡單的例子
#!/usr/bin/env python3.6
from concurrent.futures import ThreadPoolExecutor
import requests
import os
DEST_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "download")
BASE_URL = "http://flupy.org/data/flags"
CC_LIST = ("CN", "US", "JP", "EG")
if not os.path.exists(DEST_DIR):
os.mkdir(DEST_DIR)
def get_img(cc):
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
response = requests.get(url)
return response.content
def save_img(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as f:
f.write(img)
def download_one(cc):
img = get_img(cc)
save_img(img, cc.lower() + ".gif")
return cc
def download_many(cc_list):
works = len(cc_list)
with ThreadPoolExecutor(works) as exector: # 使用with來管理ThreadPoolExecutor
# map方法和內(nèi)置的map方法類似抵代,不過exector的map方法會并發(fā)調(diào)用,返回一個由返回的值構(gòu)成的生成器
response = exector.map(download_one, cc_list)
return len(list(response))
if __name__ == "__main__":
download_many(CC_LIST)
Future
concurrent.futures和asyncio中的Future類的作用相同忘嫉,****都表示可能己經(jīng)完成或尚未完成的延遲計算****
Future封裝待完成的操作荤牍,可以放入隊列案腺,完成的狀態(tài)可以查詢,得到結(jié)果后可以獲取結(jié)果
使用exector.submit()
方法提交執(zhí)行的函數(shù)并獲取一個Future康吵,而不是直接創(chuàng)建劈榨,傳入的參數(shù)是一個可調(diào)用的對象;獲取的Future對象有一個done()
方法,判斷該Future是否己完成晦嵌, add_one_callback()
設(shè)置回調(diào)函數(shù), result()
來獲取Future的結(jié)果同辣。as_completed()
傳一個Future列表,在Future都完成之后返回一個迭代器
使用submit()方法試試看
def download_many(cc_list):
with ThreadPoolExecutor(max_workers=5) as exector:
future_list = []
for cc in cc_list:
# 使用submit提交執(zhí)行的函數(shù)到線程池中耍铜,并返回futer對象(非阻塞)
future = exector.submit(download_one, cc)
future_list.append(future)
print(cc, future)
result = []
# as_completed方法傳入一個Future迭代器邑闺,然后在Future對象運行結(jié)束之后yield Future
for future in futures.as_completed(future_list):
# 通過result()方法獲取結(jié)果
res = future.result()
print(res, future)
result.append(res)
return len(result)
>>>
CN <Future at 0x7f80d32f5400 state=running>
US <Future at 0x7f80d330c320 state=running>
JP <Future at 0x7f80d330c8d0 state=running>
EG <Future at 0x7f80d330ce10 state=running>
JP <Future at 0x7f80d330c8d0 state=finished returned str>
CN <Future at 0x7f80d32f5400 state=finished returned str>
EG <Future at 0x7f80d330ce10 state=finished returned str>
US <Future at 0x7f80d330c320 state=finished returned str>
ProcessPoolExecutor的使用方法是一樣的跌前,唯一需要注意的區(qū)別是傳入的max_workers這個參數(shù)對于ProcessPoolExecutor是可選的棕兼,在不使用的情況下默認值是os.cpu_count()的返回值(cpu的數(shù)量)
exector.submit()和futures.as_completed()這個組合比exector.map()更靈活,submit()可以處理不同的調(diào)用函數(shù)和參數(shù)抵乓,而map只能處理同一個可調(diào)用對象伴挚。
wait()阻塞主線程,直到所有task都完成灾炭。