(2024.03.19 Tues @KLN)
Concurrency means that multiple tasks make progress over the same period of time. Python's built-in `asyncio` package implements concurrent tasks on a single thread.
asyncio and the event loop
`asyncio` uses a structure called the event loop to run tasks concurrently on a single thread.
(2024.04.13 Sat @KLN)
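The workflow is as follows:
- The main thread submits multiple tasks to the task queue
- The event loop continuously monitors the task queue and runs each task until it reaches an I/O operation, at which point the task is paused
- The event loop checks whether the I/O operation has completed; once it has, the operating system (OS) notifies the program and the event loop resumes the paused task
- These steps repeat until the task queue is empty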
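The steps above can be sketched with a toy scheduler. This is only an illustration, not how `asyncio` is implemented: plain generators stand in for tasks, a bare `yield` stands in for "reached an I/O operation", and the names `toy_task` and `run_all` are made up for this sketch.

```python
from collections import deque


def toy_task(name, steps, log):
    """Hypothetical task: does `steps` units of work, pausing after each one."""
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # stand-in for "hit an I/O operation, hand control back"


def run_all(tasks):
    """Toy event loop: run each task until it pauses, then move to the next."""
    queue = deque(tasks)
    while queue:                  # repeat until the task queue is empty
        current = queue.popleft()
        try:
            next(current)         # run the task until its next pause
        except StopIteration:
            continue              # the task finished, drop it
        queue.append(current)     # the task paused, put it back in the queue


log = []
run_all([toy_task("A", 2, log), toy_task("B", 1, log)])
print(log)  # ['A step 0', 'B step 0', 'A step 1']
```

The interleaved log shows the key idea: a paused task gives other tasks a chance to run, all on one thread.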
Each operating system provides its own notification mechanism:
OS | Event notification system |
---|---|
Linux | epoll |
Windows | I/O completion port (IOCP) |
macOS | kqueue |
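To see what a given machine ends up using, the standard library can be inspected directly. A small sketch, assuming a default CPython install: `selectors.DefaultSelector` is the most efficient selector available on the platform (typically `EpollSelector` on Linux and `KqueueSelector` on macOS), while on Windows the default `asyncio` loop since Python 3.8 is the IOCP-based proactor loop rather than a selector loop.

```python
import asyncio
import selectors
import sys

# Create a loop only to look at its type, then close it again.
loop = asyncio.new_event_loop()
try:
    loop_name = type(loop).__name__
finally:
    loop.close()

# Best selector implementation the selectors module found for this platform.
selector_name = selectors.DefaultSelector.__name__

print(sys.platform, loop_name, selector_name)
```

The printed names differ between machines, so no fixed output is shown here.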
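Before Python 3.7, developers had to create the event loop and schedule tasks on it by hand. From Python 3.7 on, high-level functions in the `asyncio` package such as `asyncio.run()` manage the event loop automatically, so developers rarely need to touch the low-level API.
Basic asyncio commands
A coroutine looks like a regular function, but it can pause and wait when it reaches an operation that may take some time to complete. When the time-consuming operation finishes, the paused coroutine resumes and runs the rest of its code. While a coroutine is paused and waiting, other code can run; this is what makes execution asynchronous and improves efficiency.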
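async and await
Python uses the `async` and `await` keywords to create and pause coroutines:
- The `async` keyword creates a coroutine
- The `await` keyword pauses a coroutine
`async` goes in front of `def`, and `await` goes in front of a call to a coroutine or any other awaitable. Note that calling a coroutine without `await` returns the coroutine object itself rather than the result of running it. Consider the following example.
A function that computes the area of a square, implemented synchronously: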
def square_area(side_length: int) -> int:
    return side_length**2
>>> result = square_area(10)
>>> print(result)
Output:
100
Adding the `async` keyword turns `square_area` into a coroutine:
async def square_area(side_length: int) -> int:
    return side_length**2
Calling it the same way returns a coroutine object:
>>> result = square_area(10)
>>> print(result)
Output
<coroutine object square_area at 0x7f8e4f38b240>
To run a coroutine, it has to be executed on an event loop. Before Python 3.7, developers had to create an event loop by hand, run the coroutine on it, and then close the loop. Python 3.7 and later provide functions in the `asyncio` library that simplify event loop management. For example, `asyncio.run()` creates an event loop, runs the coroutine, and closes the loop automatically.
The example above, run with `asyncio.run()`:
import asyncio
async def square_area(side_length: int) -> int:
    return side_length**2
>>> result = asyncio.run(square_area(10))
>>> print(result)
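Output (the RuntimeWarning refers to the coroutine object left over from the previous un-awaited call, which is discarded when `result` is reassigned):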
<stdin>:1: RuntimeWarning: coroutine 'square_area' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
>>> result
100
Note that `asyncio.run()` is designed to be the main entry point of an asynchronous program. It runs exactly one coroutine, and that coroutine can call other functions and coroutines.
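For comparison, the manual style described above for older Python versions can be sketched next to `asyncio.run()`. This is a minimal sketch using `asyncio.new_event_loop()`, `loop.run_until_complete()` and `loop.close()`; the variable names `manual_result` and `auto_result` are only for illustration.

```python
import asyncio


async def square_area(side_length: int) -> int:
    return side_length ** 2


# Manual management: create the loop, run the coroutine, close the loop.
loop = asyncio.new_event_loop()
try:
    manual_result = loop.run_until_complete(square_area(10))
finally:
    loop.close()

# asyncio.run() performs the same three steps in one call.
auto_result = asyncio.run(square_area(10))

print(manual_result, auto_result)  # 100 100
```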
asyncio.create_task
(2024.04.06 Sat @KLN)
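To run multiple coroutines concurrently with `asyncio`, wrap them in tasks with `asyncio.create_task()`; otherwise nothing runs concurrently and the code simply executes in order. The following shows what happens when several coroutines are run without `create_task()`.
First define a coroutine `call_api` whose only job is to take 3 seconds: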
import asyncio
import time
async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)  # simulate a slow I/O call
    return result
Call the coroutine twice:
async def main():
    start = time.perf_counter()
    price = await call_api('Get stock price of GOOG...', 300)
    print(price)
    price = await call_api('Get stock price of APPL...', 400)
    print(price)
    end = time.perf_counter()
    print(f'It took {round(end-start,0)} second(s) to complete.')
Run the function:
>>> asyncio.run(main())
Get stock price of GOOG...
300
Get stock price of APPL...
400
It took 6.0 second(s) to complete.
In this example each coroutine is awaited directly, so the second call starts only after the first has finished. Nothing runs concurrently, and the total run time is the sum of the two coroutines' run times.
A task in `asyncio` is a wrapper that schedules a coroutine to run on the event loop. Scheduling and execution are non-blocking: right after creating a task you can go on to execute other code while the task runs.
Note that a task differs from the `await` keyword: `await` blocks the enclosing coroutine until the awaited operation returns a result.
To run several coroutines concurrently, create several tasks and schedule them on the event loop so that they run at the same time.
Use the `create_task()` function of `asyncio` to create a task: pass it a coroutine and it returns a `Task` object.
async def main():
    start = time.perf_counter()
    # Both tasks are scheduled right away and run concurrently.
    task_1 = asyncio.create_task(
        call_api('Get stock price of GOOG...', 300)
    )
    task_2 = asyncio.create_task(
        call_api('Get stock price of APPL...', 300)
    )
    price = await task_1
    print(price)
    price = await task_2
    print(price)
    end = time.perf_counter()
    print(f'It took {round(end-start,0)} second(s) to complete.')
Run the coroutine:
>>> asyncio.run(main())
Get stock price of GOOG...
Get stock price of APPL...
300
300
It took 3.0 second(s) to complete.
Calling `create_task()` directly at the interactive prompt produces the following error:
>>> task_2 = asyncio.create_task(
... call_api('Get stock price of APPL...', 300)
... )
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/jeffcheung/opt/anaconda3/lib/python3.9/asyncio/tasks.py", line 360, in create_task
loop = events.get_running_loop()
RuntimeError: no running event loop
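The message `no running event loop` appears because no event loop has been started through `asyncio.run()`.
Remember to `await` each task at some point. A task that is never awaited is still scheduled, but `asyncio.run()` cancels any task that is still unfinished when the main coroutine returns and the event loop is closed.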
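What happens to a task that is never awaited can be observed with a short sketch. The coroutine `worker` and the list `events` are hypothetical names for this illustration, and the delays are shortened to keep it quick.

```python
import asyncio

events = []


async def worker():
    events.append("started")
    try:
        await asyncio.sleep(1)
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cancelled")
        raise  # re-raise so the task is marked as cancelled


async def main():
    task = asyncio.create_task(worker())  # scheduled, but never awaited
    await asyncio.sleep(0)                # yield once so the worker can start
    # main() returns here while the worker is still sleeping.


asyncio.run(main())
print(events)  # ['started', 'cancelled']
```

The worker starts but never reaches `finished`: when `main()` returns, `asyncio.run()` cancels the remaining task before closing the event loop.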
Summary:
- A task in `asyncio` wraps a coroutine and schedules it to run on the event loop
- Use the `create_task()` function to create a task
- Remember to `await` each task at some point; otherwise `asyncio.run()` cancels any task that is still unfinished when it closes the event loop
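Task.cancel
Cancelling a task: a coroutine that gets stuck for whatever reason may never finish, and its task would otherwise never end. To handle this, call the `cancel()` method of the `Task` object; awaiting the cancelled task then raises a `CancelledError`.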
import asyncio
from asyncio import CancelledError
async def call_api(message, result=1000, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result
Right after creating the task, check whether it has finished and cancel it if it has not:
async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    if not task.done():
        print('Cancelling the task...')
        task.cancel()
    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')
Running it returns:
>>> asyncio.run(main())
Cancelling the task...
Task has been cancelled.
The task is checked right after it is created, while it is still unfinished, and then cancelled. Awaiting the cancelled task raises `CancelledError` directly.
To check at fixed intervals whether the task has finished, use a `while` loop:
async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    time_elapsed = 0
    while not task.done():
        time_elapsed += 1
        await asyncio.sleep(1)
        print('Task has not completed, checking again in a second')
        if time_elapsed == 3:
            print('Cancelling the task...')
            task.cancel()
            break
    try:
        await task
    except CancelledError:
        print('Task has been cancelled.')
The result is as follows:
>>> asyncio.run(main())
Calling API...
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Task has not completed, checking again in a second
Cancelling the task...
Task has been cancelled.
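The two runs above differ in one detail: `Calling API...` is printed only in the second. A small sketch of why, using hypothetical names (`worker`, `cancel_and_wait`): a task cancelled before it ever got to run never executes its body, while a task cancelled mid-way receives `CancelledError` at its current `await` and can clean up.

```python
import asyncio


async def worker(log, name):
    log.append(f"{name} started")
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append(f"{name} cleanup")  # chance to release resources
        raise


async def cancel_and_wait(task):
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


async def main():
    log = []
    # Cancelled before the event loop ever ran it: the body never executes.
    early = asyncio.create_task(worker(log, "early"))
    early_cancelled = await cancel_and_wait(early)
    # Given one loop iteration to start, then cancelled at its await.
    late = asyncio.create_task(worker(log, "late"))
    await asyncio.sleep(0)
    late_cancelled = await cancel_and_wait(late)
    return log, early_cancelled, late_cancelled


log, early_cancelled, late_cancelled = asyncio.run(main())
print(log)  # ['late started', 'late cleanup']
```

Both tasks report `cancelled()` as `True`; only the one that had started leaves entries in the log.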
asyncio.wait_for
Besides being cancelled, a task can be waited on up to a given timeout with the `wait_for()` function of `asyncio`. It waits for a single awaitable to complete within the specified timeout. If the timeout expires, `asyncio.wait_for()` cancels the task and raises `TimeoutError`; otherwise it returns the task's result.
There is also the `asyncio.shield()` function, which protects a task from being cancelled.
Example:
async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task was cancelled due to a timeout')
Output:
>>> asyncio.run(main())
Calling API...
The task was cancelled due to a timeout
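Note: the output above is from Python 3.11, where `asyncio.TimeoutError` is an alias of the built-in `TimeoutError`. On earlier versions such as Python 3.9 the two are distinct classes, so `except TimeoutError` does not catch the timeout and a traceback is displayed instead.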
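A version-independent variant is to catch `asyncio.TimeoutError`, which exists on both older and newer Python versions. The sketch below uses shortened delays and a simplified `call_api` (no message printing) as assumptions for illustration, and also checks that the timed-out task really was cancelled.

```python
import asyncio


async def call_api(result, delay):
    await asyncio.sleep(delay)
    return result


async def main():
    task = asyncio.create_task(call_api(2000, delay=0.5))
    try:
        result = await asyncio.wait_for(task, timeout=0.1)
    except asyncio.TimeoutError:
        # wait_for() has already cancelled the task at this point.
        return "timeout", task.cancelled()
    return result, task.cancelled()


outcome = asyncio.run(main())
print(outcome)  # ('timeout', True)
```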
Sometimes you only want to tell the user that the program exceeded the preset timeout, without cancelling the task even though it timed out. In that case wrap the task in `asyncio.shield()`, which protects it from being cancelled.
async def main():
    task = asyncio.create_task(
        call_api('Calling API...', result=2000, delay=5)
    )
    MAX_TIMEOUT = 3
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=MAX_TIMEOUT)
    except TimeoutError:
        print('The task took more than expected and will complete soon.')
        result = await task  # the shielded task is still running
        print(result)
The result is as follows:
>>> asyncio.run(main())
Calling API...
The task took more than expected and will complete soon.
2000
Notice that the task had not finished when the timeout expired, so the `TimeoutError` was caught. Because `asyncio.shield()` protected the task from cancellation and the code then used `await` on it, the task kept running until it returned its result.
asyncio.wait
`asyncio.wait()` runs an iterable of awaitable objects concurrently and blocks until a specified condition is met. Its signature is:
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
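- `aws`: the iterable of awaitable objects to run concurrently (tasks or futures; since Python 3.11 bare coroutine objects are no longer accepted)
- `timeout`: int or float, the maximum number of seconds to wait before returning
- `return_when`: the condition under which the function returns; see the table below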
Constant | Description |
---|---|
asyncio.ALL_COMPLETED | Return when all awaitables are complete or cancelled. |
asyncio.FIRST_COMPLETED | Return when any awaitable is complete or cancelled. |
asyncio.FIRST_EXCEPTION | Return when any awaitable completes by raising an exception. If no awaitable raises an exception, FIRST_EXCEPTION is equivalent to ALL_COMPLETED. |
The function returns a pair of sets:
done, pending = await asyncio.wait(aws)
- `done`: the set of awaitables that have finished
- `pending`: the set of awaitables that are still pending
An example:
import asyncio
from asyncio import create_task
class APIError(Exception):
    pass
async def call_api(message, result=100, delay=3, raise_exception=False):
    print(message)
    await asyncio.sleep(delay)
    if raise_exception:
        raise APIError
    else:
        return result
async def main():
    task_1 = create_task(call_api('calling API 1...', result=1, delay=1))
    task_2 = create_task(call_api('calling API 2...', result=2, delay=2))
    task_3 = create_task(call_api('calling API 3...', result=3, delay=3))
    pending = (task_1, task_2, task_3)
    # Each pass returns as soon as one more task finishes.
    while pending:
        done, pending = await asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED
        )
        result = done.pop().result()
        print(result)
The output is as follows:
>>> asyncio.run(main())
calling API 1...
calling API 2...
calling API 3...
1
2
3
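The example above defines `APIError` and a `raise_exception` flag without exercising them. A minimal sketch of `return_when=asyncio.FIRST_EXCEPTION`, with shortened delays and a simplified `call_api` signature chosen for this illustration: `asyncio.wait()` returns as soon as one task fails, and the slow task is left in `pending` for the caller to deal with.

```python
import asyncio


class APIError(Exception):
    pass


async def call_api(result, delay, raise_exception=False):
    await asyncio.sleep(delay)
    if raise_exception:
        raise APIError("API failed")
    return result


async def main():
    ok_fast = asyncio.create_task(call_api(1, 0.05))
    failing = asyncio.create_task(call_api(2, 0.1, raise_exception=True))
    slow = asyncio.create_task(call_api(3, 1.0))
    done, pending = await asyncio.wait(
        {ok_fast, failing, slow},
        return_when=asyncio.FIRST_EXCEPTION,
    )
    failed = [t for t in done if t.exception() is not None]
    # wait() does not cancel what is still pending; do it explicitly.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), type(failed[0].exception()).__name__


summary = asyncio.run(main())
print(summary)  # (2, 1, 'APIError')
```

Unlike `asyncio.wait_for()`, `asyncio.wait()` neither raises the task's exception nor cancels unfinished tasks, so both have to be handled by the caller.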
asyncio.Future
A future in `asyncio` is an object that has no value yet but will have one at some point in the future. A future typically represents the result of an asynchronous operation.
For example, a call to an API on a remote server is expected to return a result later. The API call can return a future object, which the caller can `await`.
A future object is created with the `Future` class of `asyncio`.
For example:
import asyncio
from asyncio import Future
async def main():
    my_future = Future()
    print(my_future.done())  # False
    my_future.set_result('Bright')
    print(my_future.done())  # True
    print(my_future.result())
Running this code returns:
>>> asyncio.run(main())
False
True
Bright
Notice that before a result is set on the `Future` object, that is, before `my_future.set_result('Bright')` runs, `my_future.done()` returns `False`. Only after the result has been set does `.done()` return `True`.
The built-in attributes and methods of a `Future` object include:
['__await__', '__class__', '__class_getitem__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__',
'__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__',
'__init_subclass__', '__iter__', '__le__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__',
'__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '_asyncio_future_blocking',
'_callbacks', '_cancel_message', '_exception', '_log_traceback', '_loop', '_make_cancelled_error',
'_result', '_source_traceback', '_state', 'add_done_callback', 'cancel', 'cancelled', 'done', 'exception',
'get_loop', 'remove_done_callback', 'result', 'set_exception', 'set_result']
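A few of the public methods in this list can be exercised in a short sketch. It assumes nothing beyond the standard library; the futures are created with `loop.create_future()`, and the list `seen` is a name made up for this illustration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    seen = []

    # add_done_callback: the callback is scheduled on the loop once the
    # future is done; it does not run inline inside set_result().
    ok = loop.create_future()
    ok.add_done_callback(lambda fut: seen.append(fut.result()))
    ok.set_result("Bright")
    await asyncio.sleep(0)  # give the loop one iteration to run the callback

    # set_exception: awaiting the future re-raises the stored exception.
    failed = loop.create_future()
    failed.set_exception(ValueError("no result"))
    try:
        await failed
    except ValueError as exc:
        seen.append(str(exc))
    return seen


seen = asyncio.run(main())
print(seen)  # ['Bright', 'no result']
```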
Using the `await` keyword on a future object:
from asyncio import Future
import asyncio
async def plan(my_future):
    print('Planning my future...')
    await asyncio.sleep(1)
    my_future.set_result('Bright')
def create() -> Future:
    my_future = Future()
    asyncio.create_task(plan(my_future))  # sets the result later
    return my_future
async def main():
    my_future = create()
    result = await my_future
    print(result)
The output is:
>>> asyncio.run(main())
Planning my future...
Bright
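Future objects are somewhat similar to coroutine objects. Here is how future, coroutine and task relate:
- `Coroutine`, `Future` and `Task` are all awaitables, that is, they all count as subclasses of the `Awaitable` abstract base class
- Specifically, `Coroutine` is a subclass of `Awaitable`, `Future` is treated as an `Awaitable` because it implements `__await__()`, and `Task` is a subclass of `Future`
The `Awaitable` class declares the abstract method `__await__()`. Any object whose class implements `__await__()` can be used with the `await` keyword, and objects that can be awaited are called awaitables.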
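These relationships can be checked with `isinstance()` against the abstract base classes in `collections.abc`. The class `Delayed` below is a hypothetical example of a user-defined awaitable, included only to show that implementing `__await__()` is sufficient.

```python
import asyncio
from collections.abc import Awaitable, Coroutine


class Delayed:
    """Hypothetical awaitable that produces `value` after `delay` seconds."""

    def __init__(self, value, delay):
        self.value = value
        self.delay = delay

    async def _run(self):
        await asyncio.sleep(self.delay)
        return self.value

    def __await__(self):
        # __await__ must return an iterator; delegate to a coroutine's own.
        return self._run().__await__()


async def main():
    coro = asyncio.sleep(0)
    task = asyncio.create_task(asyncio.sleep(0))
    future = asyncio.get_running_loop().create_future()
    checks = {
        "coroutine is Coroutine": isinstance(coro, Coroutine),
        "coroutine is Awaitable": isinstance(coro, Awaitable),
        "future is Awaitable": isinstance(future, Awaitable),
        "task is Future": isinstance(task, asyncio.Future),
        "custom is Awaitable": isinstance(Delayed(0, 0), Awaitable),
    }
    future.set_result(None)
    await coro
    await task
    await future
    value = await Delayed(42, 0.01)
    return checks, value


checks, value = asyncio.run(main())
print(all(checks.values()), value)  # True 42
```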
asyncio.gather()
`asyncio.gather()` runs multiple asynchronous operations concurrently. Its signature is:
gather(*aws, return_exceptions=False) -> Future[tuple[()]]
`asyncio.gather()` takes two parameters:
- `aws`: a sequence of awaitable objects; any coroutine among them is automatically scheduled as a task by `gather()`
- `return_exceptions`: defaults to `False`, in which case the first exception raised inside an awaitable is immediately propagated to the task that awaits `asyncio.gather()`, while the other awaitables keep running and are not cancelled
Awaiting `asyncio.gather()` returns the results of the awaitables as a list, in the same order as the awaitables were passed in.
If `return_exceptions` is `True`, `asyncio.gather()` places any exception in the results alongside the successful return values instead of propagating it to the caller.
An example:
import asyncio
async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result
async def main():
    a, b = await asyncio.gather(
        call_api('Calling API 1 ...', 1),
        call_api('Calling API 2 ...', 2)
    )
    print(a, b)
Output:
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
1 2
An example with an exception:
import asyncio
class APIError(Exception):
    def __init__(self, message):
        self._message = message
    def __str__(self):
        return self._message
async def call_api_failed():
    await asyncio.sleep(3)
    raise APIError('API failed')
async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result
async def main():
    a, b, c = await asyncio.gather(
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
        call_api_failed()
    )
    print(a, b, c)
Output:
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/Cellar/python@3.11/3.11.6_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "<stdin>", line 2, in main
File "<stdin>", line 3, in call_api_failed
APIError: API failed
An example where the exception is returned in the results:
import asyncio
class APIError(Exception):
    def __init__(self, message):
        self._message = message
    def __str__(self):
        return self._message
async def call_api(message, result, delay=3):
    print(message)
    await asyncio.sleep(delay)
    return result
async def call_api_failed():
    await asyncio.sleep(1)
    raise APIError('API failed')
async def main():
    a, b, c = await asyncio.gather(
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
        call_api_failed(),
        return_exceptions=True  # exceptions come back as results
    )
    print(a, b, c)
Output:
>>> asyncio.run(main())
Calling API 1 ...
Calling API 2 ...
100 200 API failed
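With `return_exceptions=True` the caller has to tell results and exceptions apart. One possible way, sketched here with shortened delays and simplified helper coroutines that are assumptions for illustration, is an `isinstance()` check on each entry.

```python
import asyncio


async def call_api(result, delay):
    await asyncio.sleep(delay)
    return result


async def call_api_failed(delay):
    await asyncio.sleep(delay)
    raise RuntimeError("API failed")


async def main():
    results = await asyncio.gather(
        call_api(100, 0.01),
        call_api_failed(0.02),
        call_api(200, 0.03),
        return_exceptions=True,
    )
    # Entries keep the input order; failed calls appear as exception objects.
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return results, values, errors


results, values, errors = asyncio.run(main())
print(type(results).__name__, values, [str(e) for e in errors])
# list [100, 200] ['API failed']
```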
Reference
1 pythontutorial dot net