簡介
asyncio是python3.4之后的協(xié)程模塊腥放,是python實(shí)現(xiàn)并發(fā)重要的包泣栈,這個(gè)包使用時(shí)間循環(huán)驅(qū)動(dòng)實(shí)現(xiàn)并發(fā)粹舵。
- event_loop:時(shí)間循環(huán),開啟之后戈泼,可以將協(xié)程注冊(cè)進(jìn)來婿禽。
- task:一個(gè)協(xié)程對(duì)象就是一個(gè)可以掛起的函數(shù),任務(wù)是對(duì)協(xié)程的進(jìn)一步封裝大猛,其中包含了任務(wù)的各種狀態(tài)
- future: 期物扭倾,代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果。task可以說是future的子類挽绩。
asyncio
先通過一個(gè)簡單的例子看asyncio的基本用法與作用:
import asyncio
import itertools
import sys
import time
@asyncio.coroutine
def spin():
for i in itertools.cycle('|/-\\'):
write, flush = sys.stdout.write, sys.stdout.flush
write(i)
flush()
write('\x08'*len(i))
try:
yield from asyncio.sleep(1)
except asyncio.CancelledError:
break
@asyncio.coroutine
def slow_f():
yield from asyncio.sleep(3)
return 3
@asyncio.coroutine
def sup():
spiner = asyncio.async(spin())
print("spiner:",spiner)
r = yield from slow_f()
spiner.cancel()
return r
def main():
loop = asyncio.get_event_loop()
r = loop.run_until_complete(sup())
loop.close()
print("r:",r)
main()
輸出結(jié)果:
spiner: <Task pending coro=<spin() running at c:/Users/DELL/Desktop/ssj/search/descrip.py:7>>
r: 3 # 運(yùn)行期間會(huì)有動(dòng)畫指針
- 協(xié)程可以使用@asyncio.coroutine裝飾器裝飾膛壹,asyncio.sleep可以避免時(shí)間循環(huán)阻塞。
- asyncio.async包裝的協(xié)程唉堪,不阻塞恢筝,立即返回一個(gè)Task對(duì)象
- slow_f三秒后,控制器返回到sup上巨坊,spiner.cancel()取消Task,任務(wù)結(jié)束此改。
- get_event_loop獲取時(shí)間循環(huán)
- run_until_complete在時(shí)間循環(huán)中趾撵,載入任務(wù),驅(qū)動(dòng)協(xié)程運(yùn)行完畢共啃。
asyncio.Task 是asyncio.Future的子類占调,用于包裝協(xié)程。asyncio.Future 類的 .result() 方法沒有參數(shù)移剪,因此不能指定超時(shí)時(shí)間究珊。此外,如果調(diào)用 .result() 方法時(shí)期物還沒運(yùn)行完畢纵苛,那么 .result() 方法不會(huì)阻塞去等待結(jié)果剿涮,而是拋出 asyncio.InvalidStateError 異常言津。獲取asyncio.Future 對(duì)象的結(jié)果通常使用 yield from,從中產(chǎn)出結(jié)果取试。使用 yield from 處理期物悬槽,等待期物運(yùn)行完畢這一步無需我們關(guān)心,而且不會(huì)阻塞事件循環(huán)瞬浓,因?yàn)樵?asyncio 包中初婆,yield from 的作用是把控制權(quán)還給事件循環(huán)。
在 asyncio 包中猿棉,期物和協(xié)程關(guān)系緊密磅叛,因?yàn)榭梢允褂?yield from 從asyncio.Future 對(duì)象中產(chǎn)出結(jié)果。如萨赁,若foo是協(xié)程函數(shù)弊琴,那么可以這樣寫:res = yield from foo()。
python3.5之后位迂,協(xié)程采用新語法访雪,采用了關(guān)鍵字async/await:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
把@asyncio.coroutine替換為async;把yield from替換為await掂林。
- await
- result = await future 或者 result = yield from future:阻塞直到future完成臣缀,然后會(huì)返回future的結(jié)果,或者取消future的話拋出異常CancelledError
- result = await coroutine 或者 result = yield from coroutine:直到協(xié)程產(chǎn)生結(jié)果或者拋出異常才往下執(zhí)行泻帮。
調(diào)用一個(gè)協(xié)程精置,不會(huì)執(zhí)行這個(gè)協(xié)程的代碼,會(huì)立即返回锣杂,當(dāng)await coroutine 或者 yield from coroutine才會(huì)執(zhí)行代碼脂倦。或者通過時(shí)間循環(huán)調(diào)度元莫,
ensure_future 和 create_task都可以包裝協(xié)程赖阻,返回一個(gè)task對(duì)象
- ensure_future(coro_or_future, *, loop=None) 返回task對(duì)象,若參數(shù)是future踱蠢,直接返回
- create_task返回task對(duì)象火欧。
- run_until_complete
import asyncio
import datetime
async def display_date(loop):
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
print("stop")
loop.close()
輸出結(jié)果:
2019-08-25 15:56:04.353596
2019-08-25 15:56:05.354924
2019-08-25 15:56:06.362594
2019-08-25 15:56:07.362925
2019-08-25 15:56:08.363157
stop
display_date(loop)協(xié)程運(yùn)行完畢,run_until_complete才會(huì)返回茎截,否則阻塞苇侵。與之相似功能的call_soon卻是不阻塞的。
- call_soon
import asyncio
def hello_world(loop):
print('Hello World')
loop.stop()
loop = asyncio.get_event_loop()
# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
call_soon(callback),會(huì)把callback放到一個(gè)先進(jìn)先出的隊(duì)列企锌,每個(gè)callback會(huì)被執(zhí)行一次榆浓。
call_soon注冊(cè)的協(xié)程任務(wù)之后,立即返回撕攒,不阻塞陡鹃,配合run_forever使用烘浦,run_forever會(huì)一直循環(huán),直到loop.stop()杉适。
-
call_soon_threadsafe
類似于call_soon谎倔,但是是線程安全的。涉及到多線程時(shí)會(huì)使用到猿推。 -
call_later
call_later(delay, callback, *args) 延時(shí)delay后執(zhí)行callback片习,返回一個(gè)asyncio.Handle對(duì)象,可以通過cancel取消蹬叭。類似于這個(gè)方法的還有call_at等
import asyncio
import datetime
def display_date(end_time, loop):
print(datetime.datetime.now())
if (loop.time() + 1.0) < end_time:
h = loop.call_later(1, display_date, end_time, loop)
if (loop.time() + 3.0) > end_time:
h.cancel()
loop.stop()
else:
loop.stop()
loop = asyncio.get_event_loop()
# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)
# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()
輸出結(jié)果:
2019-08-25 16:33:03.879382
2019-08-25 16:33:04.882574
2019-08-25 16:33:05.886996
- add_signal_handler
另外當(dāng)使用run_forever時(shí)藕咏,可以通過信號(hào)終止
import asyncio
import functools
import os
import signal
def ask_exit(signame):
print("got signal %s: exit" % signame)
loop.stop()
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
functools.partial(ask_exit, signame))
print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
loop.run_forever()
finally:
loop.close()
- 鏈?zhǔn)絽f(xié)程
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
print_sum要等到compute完成之后才會(huì)繼續(xù)執(zhí)行。
下面是執(zhí)行鏈:
event_loop事件循環(huán)運(yùn)行之后秽五,將task注冊(cè)進(jìn)來孽查, await compute(x, y)后,控制權(quán)走到compute里坦喘,遇到 await asyncio.sleep(1.0)后盲再,控制權(quán)返回給loop,loop判斷情況后瓣铣,等待1s答朋,控制權(quán)給compute,compute完成后棠笑,控制權(quán)給print_sum梦碗。