concurrent.futures模塊提供了一個高層面的接口來實現(xiàn)異步調(diào)用。
concurrent,futures提供了兩種方式來執(zhí)行異步調(diào)用躏救,一種是借助線程,使用ThreadPoolExecutor對象;另一種是借助進程化焕,使用ProcessPoolExecutor對象烛亦,這兩種對象實現(xiàn)了相同的接口,這些接口定義在他們的父類Executor中唧垦。
Executor對象
這是一個抽象類捅儒,提供了異步執(zhí)行的方法。這個類不應(yīng)該被直接使用振亮,應(yīng)該使用上面提到的兩個具體子類巧还。
方法介紹
-
submit(fn, *args, **kwargs)
執(zhí)行fn(*args, **kwargs)并返回一個Future對象,F(xiàn)uture對象在下面會介紹到坊秸。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
-
map(func, *iterables, timeout=None, chunksize=1)
與內(nèi)置的map函數(shù)用法相似麸祷,除了下面兩個區(qū)別:
- iterables不是懶加載的
- func是異步執(zhí)行的,多個func可以并發(fā)執(zhí)行
timeout指定一個時間褒搔,如果next()被調(diào)用了且在timeout時間內(nèi)沒有得到結(jié)果則引發(fā)concurrent.futures.TimeoutError阶牍。如果為None,則不限制時間。
當使用ProcessPoolExecutor時星瘾,設(shè)置chunksize的值可以將iterables分塊走孽,并一次性發(fā)給進程池中的對象,對于很長的迭代對象琳状,使用一個大的chunksize可以提高效率融求。但是對于ThreadPoolExecutor對象,chunksize沒有任何作用算撮。
我的理解是因為進程之間占用了不同的內(nèi)存空間生宛,所以不同的進程在執(zhí)行時需要先從調(diào)用Executor的進程復(fù)制所需的參數(shù)县昂,當chunksize為1時,每執(zhí)行一次就需要通信一次陷舅,顯然非常浪費時間倒彰,所以設(shè)置一個大點的chunksize可以一次性獲取多次執(zhí)行所需的參數(shù),減少通信的次數(shù)莱睁。而線程因為本來就在同一個內(nèi)存空間中待讳,不存在這個問題,因此沒有影響仰剿。 -
shutdown(wait=True)
釋放資源的创淡,通過給每個thread或process執(zhí)行join()方法實現(xiàn)。
通過使用with語句可以避免使用這個方法南吮。
ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
ThreadPoolExecutor是Executor的子類琳彩,通過使用線程池來實現(xiàn)異步調(diào)用。
當一個Future關(guān)聯(lián)的可調(diào)用對象等待另一個Future的結(jié)果時部凑,會發(fā)生死鎖露乏,官方例子如下:
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
還有一個:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
ProcessPoolExecutor(max_workers=None, thread_name_prefix='')
ProcessPoolExecutor同樣也是Executor的子類,通過進程池來實現(xiàn)異步調(diào)用涂邀,且不受全局解釋器鎖的限制瘟仿。
__main__
模塊必須被工作子進程導(dǎo)入,這意味這ProcessPoolExecutor不會在交互式解釋器中起作用比勉。
同樣劳较,與線程中提到的一樣,不當?shù)氖褂脮鹚梨i浩聋,具體例子同上观蜗。
ProcessPoolExecutor例子
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
可以看出map函數(shù)返回的直接就是結(jié)果,與submit有些區(qū)別赡勘。
Future 對象
Future類封裝了可調(diào)用對象的異步執(zhí)行嫂便,F(xiàn)uture實例由Executor.submit()創(chuàng)建捞镰,不應(yīng)該被直接創(chuàng)建闸与。
方法介紹
-
cancel()
嘗試取消調(diào)用,如果調(diào)用正在執(zhí)行且無法被取消則返回False岸售,否則調(diào)用會被取消并返回True践樱。
-
cancelled()
調(diào)用成功被取消時返回True。
-
running()
如果調(diào)用正在被執(zhí)行且不能被取消則返回True凸丸。
-
done()
如果調(diào)用成功被取消或者執(zhí)行完成則返回True拷邢。
-
result(timeout=None)
返回調(diào)用的返回值。
-
add_done_callback(fn)
這個方法用來添加回調(diào)函數(shù)屎慢,調(diào)用這個方法的future對象會把自己作為唯一參數(shù)傳給函數(shù)fn瞭稼,無論future是執(zhí)行完成還是被取消忽洛,都會調(diào)用fn。
模塊方法
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
返回兩個二元組集合环肘,第一個集合名為done
欲虚,包含已經(jīng)完成的futures;第二個集合名為not_done
,包含沒有完成的futures悔雹。return_when參數(shù)指明了這個方法什么時候返回复哆,一共有三種參數(shù),默認為ALL_COMPLETED.
1. FIRST_COMPLETED:當任何future完成或被取消時返回
2. FIRST_EXCEPTION:當任何future引起一個異常時返回腌零,如果沒有異常梯找,等效于ALL_COMPLETED
3. ALL_COMPLETED:當所有futures完成或被取消時返回
concurrent.futures.as_completed(fs, timeout=None)
返回由fs給出的Future實例迭代器,只有當future完成時才會返回益涧,返回的順序與完成的順序相同锈锤,最先完成的最先返回,如果fs中包含兩個同樣的對象饰躲,只會返回一次牙咏。
官網(wǎng)例子如下:
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
future_to_url是一個字典,一個可迭代對象嘹裂。以submit()返回的future作為鍵妄壶,url作為值。執(zhí)行以后可以發(fā)現(xiàn)輸出的結(jié)果的順序與傳入的url順序不一定是相等的寄狼。