基礎
異步調用的原理,是在單個線程中通過切換任務(就像單線程的函數(shù)切換寺惫,負擔很小,性能很好)來達到并發(fā)的效果。相較于線程這種比較重的并發(fā)方式垒酬,異步調用可以大大提升I/O密集型任務的執(zhí)行效率,達到非常高的并發(fā)量件炉。python中異步調用的最小單位是協(xié)程,隨著異步編程在python3各個版本中不斷的迭代矮湘,使用異步編程變得越來越簡單斟冕,因此,我們有必要好好掌握python中異步編程相關的知識缅阳。
兩個關鍵字
async
:用來聲明一個協(xié)程函數(shù)(async def
)磕蛇,與普通函數(shù)不同的是景描,調用協(xié)程函數(shù)并不會返回其運行的結果,而是返回一個協(xié)程對象(coroutine)秀撇。協(xié)程對象需要注冊到事件循環(huán)(event loop)中才能被調度執(zhí)行到超棺。
await
:用來異步等待一個協(xié)程的返回,只能在協(xié)程函數(shù)里使用呵燕。await
時意味著將函數(shù)控制權交還給event loop棠绘。舉個例子,當我們在g()
函數(shù)內部遇到await f()
時再扭,就會將暫時掛起g()
的執(zhí)行直到 f()
返回氧苍,與此同時,將CPU的執(zhí)行權讓出給event loop中的其他函數(shù)繼續(xù)執(zhí)行泛范。
awaitable
:就像for
關鍵字用于iterable
對象让虐, await
關鍵字用于awaitable
對象。最常見的兩個awaitable
對象就是原生的協(xié)程對象以及使用asyncio.create_task()
方法創(chuàng)建的asyncio.Task
對象罢荡。值得注意的是赡突,你并不總是需要await
一個Task
如果你不需要取消或者等待協(xié)程運行的結果的話。
例子
以Fluent Python 2nd Edition
(神作区赵,忍不住再次安利)這本書的示例代碼為例:
import asyncio
import socket
from keyword import kwlist
MAX_KEYWORD_LEN = 4
async def probe(domain: str) -> tuple[str, bool]:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return (domain, False)
return (domain, True)
async def main() -> None:
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
domains = (f'{name}.dev'.lower() for name in names)
coros = [probe(domain) for domain in domains]
for coro in asyncio.as_completed(coros):
domain, found = await coro
mark = '+' if found else ' '
print(f'{mark} {domain}')
if __name__ == '__main__':
asyncio.run(main())
作者是這么描述協(xié)程的工作流程的惭缰,注意加粗部分:
Using the syntax await loop.getaddrinfo(…)
avoids blocking because await suspends the current coroutine object—for example, probe('if.dev')
. A new coroutine object is created, getaddrinfo('if.dev', None)
, it starts the low-level addrinfo query and yields control back to the event loop, which can drive other pending coroutine objects, such as probe('or.dev')
. When the event loop gets a response for the getaddrinfo('if.dev', None)
query, that specific coroutine object resumes and returns control back to the probe('if.dev')
—which was suspended at await—and can now handle a possible exception and return the result tuple.
這里注意一下英文中
Suspend
和Pending
的差異:Suspend:一個事情已經開始了, 不過現(xiàn)在要停了(可能是暫時地)。 Classes have been suspended for the holidays.
Pending:一個事情還沒開始, 因為還在等其他東西惧笛。 This project is still pending for approval.
常用對象
Future:
Future
對象是用來模仿concurrent.futures
包中的Future
對象的从媚,除了一小部分API有差異外,他們的API基本上兼容患整。Future
對象代表一個任務的結果拜效,注意這里的結果可以是未執(zhí)行的結果或者時一個執(zhí)行異常。源代碼中是這樣描述這個對象的:
class Future(object):
"""
This class is *almost* compatible with concurrent.futures.Future.
Differences:
- result() and exception() do not take a timeout argument and
raise an exception when the future isn't done yet.
- Callbacks registered with add_done_callback() are always called
via the event loop's call_soon_threadsafe().
- This class is not compatible with the wait() and as_completed()
methods in the concurrent.futures package.
"""
Task:
一個和Future對象類似的協(xié)程對象各谚,非線程安全紧憾,查看源代碼可以看到Task
是Future
的子類,因此 Future
對象不一定是一個Task
對象昌渤, 但Task
對象一定是個Future
對象赴穗。
class Task(Future):
""" A coroutine wrapped in a Future. """
Task對象在創(chuàng)建時就會注冊到事件循環(huán)中。
EventLoop:
管理和分配不同Task
的執(zhí)行膀息,Task
需要注冊到EventLoo
以后才能被調度執(zhí)行到般眉。你可以把它看成是某個監(jiān)控著協(xié)程空閑、可執(zhí)行等運行狀態(tài)潜支,并且能根據(jù)某個協(xié)程等待的事件變?yōu)榭蓤?zhí)行時喚醒這些空閑的協(xié)程的While True
的循環(huán)甸赃。Loop
是可插拔(替換)的,也就是說冗酿,你可以自己實現(xiàn)一個事件循環(huán)來代替的默認的事件循環(huán)埠对,比如Linux
系統(tǒng)上非常著名的uvloop
络断,使用下面代碼即可替換EventLoop
實現(xiàn):
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop是基于libuv庫(nodejs)使用Cython編寫的,擁有比自帶的event loop更高的性能项玛,遺憾的是你只能在*nix 和 python3.5+的環(huán)境中使用它貌笨。
常用方法
asyncio.run()
在python3.7中引入的新方法,會自動創(chuàng)建event loop并且執(zhí)行run_until_complete襟沮,同時在執(zhí)行結束時會自動關閉event loop锥惋,在引入該方法之前,你可能需要使用如下代碼來執(zhí)行一個簡單的協(xié)程:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
如果你需要對未完成的任務執(zhí)行cancel()
方法臣嚣,那么還需要在另外寫一些代碼來處理它們净刮。而asyncio.run()
方法將這些代碼范式進行了封裝,使得調用協(xié)程變得不需要再寫這些模板式的代碼了硅则。
asyncio.gather(*aws, loop=None, return_exceptions=False)
這個方法將協(xié)程(準確的說是awaitable對象淹父,因此也可以是future對象)集合統(tǒng)一放到一個future對象里面,并且將協(xié)程的結果統(tǒng)一在一個列表中返回怎虫。如果所有可等待對象都成功完成暑认,結果將是一個由所有返回值聚合而成的列表。結果值的順序與 aws
中可等待對象的順序一致大审。
asyncio.ensure_future
和 asyncio.create_task
asyncio.ensure_future
雖然名字里帶了future蘸际,但通常它返回的對象是一個Task
對象(除非傳入的obj
對象本身就是一個Future對象),這是一個有點反直覺且經常容易混淆的點徒扶,看下面的例子:
import asyncio
async def foo():
print("before foo await")
await asyncio.sleep(1)
print("after foo await")
return "foo"
async def bar():
print("before bar await")
await asyncio.sleep(1)
print("after bar await")
return "bar"
async def popo():
print("before popo await")
await asyncio.sleep(1)
print("after popo await")
return "popo"
async def set_after(fut, delay, value):
# Sleep for *delay* seconds.
await asyncio.sleep(delay)
# Set *value* as a result of *fut* Future.
fut.set_result(value)
async def main():
print("running main")
task1 = asyncio.create_task(foo())
task2 = asyncio.create_task(bar())
fut1 = asyncio.ensure_future(popo())
loop = asyncio.get_running_loop()
fut2 = loop.create_future()
loop.create_task(
set_after(fut2, 1, '... world'))
print(isinstance(task1, asyncio.Future))
print(isinstance(fut1, asyncio.Task))
print(isinstance(fut2, asyncio.Task))
print(isinstance(fut2, asyncio.Future))
await task1
await task2
await fut1
await fut2
print("exiting main")
asyncio.run(main())
輸出如下, 注意第三行和第四行的輸出:
running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main
因此粮彤,python 3.7 及之后版本都推薦使用asyncio.create_task
方法,這個方法限制了傳入的對象必須是一個協(xié)程對象姜骡。
asyncio.get_running_loop
和asyncio.get_event_loop
asyncio.get_running_loop
函數(shù)是在Python 3.7中添加导坟,在協(xié)程內部使用以便獲取運行著的事件循環(huán)的函數(shù),當事件循環(huán)不存在時圈澈,這個函數(shù)可能會返回RuntimeError
惫周。它的實現(xiàn)相較于asyncio.get_event_loop
(可能會按需開始一個新的事件循環(huán))更加簡單和快速.
其他常用類
Asyncio.Queue
對于并發(fā)編程,經常需要使用隊列來將負載分配到多個任務上康栈,比如經典的生產者-消費者模式递递,asyncio
包同樣提了Queue
對象來滿足這類需求,參考官方的代碼示例:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
其中的幾個方法需要單獨說明一下:
put_nowait(item)
: 將item放到隊列中啥么,非阻塞性操作登舞,當隊列滿時,將拋出QueueFull
的異常悬荣;
put(item)
:將item放到隊列中逊躁,阻塞性操作, 當隊列滿時,將一直等待直到有空位隅熙;
get_nowait()
: 從隊列中獲取一個item稽煤,非阻塞性操作,當隊列為空時囚戚,將拋出QueueEmpty
的異常酵熙。
get()
: 從隊列中獲取一個item,阻塞性操作驰坊,當隊列為空時匾二,將一直等待直到有item可用;
task_done()
: 該方法通常由消費者處理拳芙,用來表示從隊列獲取的任務已經完成察藐。對于每一個通過get()
從隊列獲取的任務,調用該方法會告知隊列任務處理完成;
join()
: 阻塞直到隊列中的所有item都已經被處理過舟扎。每個item添加到隊列中時分飞,未完成任務的計數(shù)就會增加;當這些任務調用task_done()
時睹限,這個計數(shù)就會減少譬猫;當未完成任務計數(shù)為0時,join()
將不再阻塞羡疗。
Asyncio.Semaphore
異步編程處理IO密集型的任務時具有很好的性能染服,但有時我們也會希望限制一下并發(fā)量,這時候就可以使用信號量來達到這個目的叨恨×危基本用法可以參考官方的代碼示例:
sem = asyncio.Semaphore(10)
# ... later
async with sem:
# work with shared resource