原生協(xié)程是基于async關鍵字的
# 定義一個簡單的原生協(xié)程cor
async def cor():
print('enter cor')
print('exit cor')
print(type(cor)) # <class 'function'>
print(type(cor())) # <class 'coroutine'>
可以看到cor
的類型<class 'function'>
函數類型,說明async
關鍵字修飾的函數也是一個函數而已,跟普通函數在定義上沒啥什么差別耀里,差別在于被調用的時候坞靶,cor()
并不是執(zhí)行函數體中的語句,而是生成一個<class 'coroutine'>
類型的對象章鲤,即協(xié)程對象税产,跟生成器類似怕轿,協(xié)程也有send()
方法。
c = cor()
try:
c.send(None)
except StopIteration as e:
print(e.value) # None
當c.send(None)時辟拷,才會執(zhí)行cor中的語句撞羽,也就是執(zhí)行print('enter cor')
和print('exit cor')
這兩句,就執(zhí)行完畢了衫冻,執(zhí)行完畢時會拋出StopIteration
異常诀紊,這個異常對象的value
屬性中保存著cor
的返回值,這里core
沒有return
語句隅俘,其實默認返回None
邻奠。
原生協(xié)程中使用 await 關鍵字
async def cor():
print('enter cor')
rst = await cor1()
print('rst -->', rst)
print('exit cor')
async def cor1():
print('enter cor1')
print('exit cor1')
return 'cor1'
c = cor()
try:
c.send(None)
except StopIteration as e:
print('cor -->', e.value) # None
輸出如下:
enter cor
enter cor1
exit cor1
rst --> cor1
exit cor
cor --> None
await
關鍵字后面必須是一個實現了__awwit__
方法的對象,不一定是協(xié)程對象为居,只要實現了這個方法碌宴,就會進入到此方法中,調用該方法中的邏輯颜骤。比如后面要介紹的Future
對象唧喉,它也是實現了__await__
方法的。
await
關鍵字的語義是表示掛起當前的cor
協(xié)程的執(zhí)行,進入到cor1
協(xié)程中執(zhí)行cor1
的邏輯八孝,直到cor1
執(zhí)行完畢董朝,然后執(zhí)行流程又回到cor
掛起的地方,繼續(xù)執(zhí)行cor
中await
后面的語句干跛。直到最后拋出StopIteration
異常子姜。
基于生成器的協(xié)程,也就是非原生協(xié)程
import types
async def cor():
print('enter cor')
rst = await cor2()
print('rst --> ', rst)
print('exit cor')
@types.coroutine
def cor2():
print('enter cor2')
yield
print('exit cor2')
return 'cor2'
c = cor()
try:
c.send(None)
c.send(None)
except StopIteration as e:
print('cor -->', e.value) # None
輸出如下:
enter cor
enter cor2
exit cor2
rst --> cor2
exit cor
cor --> None
與上面的原生協(xié)程的嵌套不同的是楼入,調用了兩次c.send(None)
哥捕,執(zhí)行第一次c.send(None)
時,會在cor2
的yield
關鍵字處掛起嘉熊,第二次c.send(None)
則會在yield
掛起的地方遥赚,接著往下執(zhí)行,然后core2
返回'cor2'
阐肤,賦值給rst
變量凫佛,繼續(xù)執(zhí)行cor
中await
后面的語句,直到最后拋出StopIteration
異常孕惜。
總結
async 是一個關鍵字愧薛,async def 定義的類型還是一個function類型,只有當它被調用時才返回一個協(xié)程對象
async def
跟def
定義的方法在沒被調用時沒有任何區(qū)別衫画,不必看得很神秘毫炉,它也可以有return
語句,這點也正常削罩,
因為python
中沒有return
語句的函數實際上默認是返回None
的瞄勾,所以只是顯式的return
和隱式return None
的區(qū)別
對于協(xié)程的send(None)
方法,跟生成器的send(None)
類似鲸郊,不同的是丰榴,原生協(xié)程的send方法被調用的時候,會一直執(zhí)行到碰
到await
語句秆撮,但是不會停下來,會直接再進入到await EXPR
的EXPR
中换况,其實EXPR
是一個awaitable
對象职辨,會調用該對象的
__await__()
執(zhí)行該方法的里面的邏輯,如果該awaitable
對象是一個原生協(xié)程對象戈二,那么它的__await__()
方法中的邏輯就
是在定義此協(xié)程時async def
下面的邏輯舒裤,執(zhí)行完畢后,該協(xié)程對象就關閉了觉吭,執(zhí)行流程就再次跳轉到當前掛起的協(xié)程中腾供,
執(zhí)行該協(xié)程中余下的邏輯,最后執(zhí)行完畢,拋出StopIteration
異常。
對于原生生協(xié)程來說,調用send()
方法時蒋得,會一直執(zhí)行到出現StopIteration
異常為止夭禽,只有在 __await__()
方法中有yield
語句時才
會掛起在那里,如果__await__()
方法中沒有yield
語句箍镜,不會掛起,會返回await的返回值,繼續(xù)執(zhí)行匿乃,直到拋出StopIteration
異常。
先拋出一個結論豌汇,在asyncio
中協(xié)程的流程的掛起操作幢炸,實際上還是是通過yield
關鍵字來實現的,并不是await
關鍵字拒贱, async
和await
關鍵字只不過是語法糖宛徊。
asyncio的基本流程分析
import asyncio
async def cor():
print('enter cor ...')
await asyncio.sleep(2)
print('exit cor ...')
return 'cor'
loop = asyncio.get_event_loop()
task = loop.create_task(cor())
rst = loop.run_until_complete(task)
print(rst)
從這個簡單的例子入手,逐步分析協(xié)程在事件循環(huán)中的執(zhí)行流程柜思。
第1步
async def cor()
定義了一個cor
協(xié)程岩调。第2步
loop = asyncio.get_event_loop()
得到一個事件循環(huán)對象loop
,這個loop
在一個線程中只有唯一的一個實例赡盘,只要在同一個線程中調用此方法号枕,得到的都是同一個loop
對象。第3步
task = loop.create_task(cor())
把cor
協(xié)程包裝成一個task
對象陨享。第4步
rst = loop.run_until_complete(task)
把這個task
對象添加到事件循環(huán)中去葱淳。
首先看第3步的loop.create_task(cor())這個方法
class BaseEventLoop(events.AbstractEventLoop):
...
def __init__(self):
...
# 用來保存包裹task.step方法的handle對象的對端隊列
self._ready = collections.deque()
# 用來保存包裹延遲回調函數的handle對象的二叉堆,是一個最小二叉堆
self._scheduled = []
...
def create_task(self, coro):
"""Schedule a coroutine object.
Return a task object.
"""
self._check_closed()
# self._task_factory 默認是None
if self._task_factory is None:
# 創(chuàng)建一個task對象
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
else:
task = self._task_factory(self, coro)
# 返回這個task對象
return task
def call_soon(self, callback, *args):
self._check_closed()
if self._debug:
self._check_thread()
self._check_callback(callback, 'call_soon')
# 關鍵代碼callback就是task._step方法抛姑,args是task._step的參數
handle = self._call_soon(callback, args)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle
def _call_soon(self, callback, args):
# 1 handle是一個包裹了task._step方法和args參數的對象
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
# 2 關鍵代碼赞厕,把handle添加到列表self._ready中
self._ready.append(handle)
return handle
loop.create_task(cor())
實際上是創(chuàng)建了一個Task
類的實例。再來看一下Task
這個類
class Task(futures.Future):
...
def __init__(self, coro, *, loop=None):
assert coroutines.iscoroutine(coro), repr(coro)
# 調用父類的__init__獲得線程中唯一的loop對象
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-1]
self._coro = coro
self._fut_waiter = None
self._must_cancel = False
# 關鍵代碼定硝,把_step方法注冊到loop對象中去
self._loop.call_soon(self._step)
self.__class__._all_tasks.add(self)
def _step(self, exc=None):
...
result = coro.send(None)
...
task
實例化時皿桑,調用了self._loop.call_soon(self._step)
--> loop.call_soon(callback, *args)
--> loop._call_soon(callback, args)
,實際上是把handle(task._step)
這個對象放到了loop._ready
隊列中蔬啡,放在這個隊列中有什么用呢诲侮?先告訴大家,_step
方法會在loop
對象的循環(huán)中被調用箱蟆,也就是會執(zhí)行coro.send(None)
這句沟绪。coro.send(None)
實際上就是執(zhí)行上面定義的cor
協(xié)程的里面的語句。
也就是說到第3步
執(zhí)行完時空猜,loop
對象已經實例化绽慈,task
對象也實例化恨旱,并且task
對象的_step
方法被封裝成handle
對象放入了loop
對象的_ready
隊列中去了。
再來看第4步loop.run_until_complete(task)
class BaseEventLoop(events.AbstractEventLoop):
def run_until_complete(self, future):
...
# future就是task對象坝疼,下面2句是為了確保future是一個Future類實例對象
new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
# 添加回調方法_run_until_complete_cb到當前的task對象的callbacks列表中搜贤,_run_until_complete_cb就是最后
# 把loop的_stop屬性設置為ture的,用來結束loop循環(huán)的
future.add_done_callback(_run_until_complete_cb)
try:
# 開啟無線循環(huán)
self.run_forever()
except:
...
raise
finally:
...
# 執(zhí)行完畢返回cor的返回值
return future.result()
def run_forever(self):
...
try:
events._set_running_loop(self)
while True:
# 每次運行一次循環(huán)裙士,判斷下_stopping是否為true入客,也就是是否結束循環(huán)
self._run_once()
if self._stopping:
break
finally:
...
def _run_once(self):
# loop的_scheduled是一個最小二叉堆,用來存放延遲執(zhí)行的回調函數腿椎,根據延遲的大小桌硫,把這些回調函數構成一個最小堆,然后再每次從對頂彈出延遲最小的回調函數放入_ready雙端隊列中啃炸,
# loop的_ready是雙端隊列铆隘,所有注冊到loop的回調函數,最終是要放入到這個隊列中南用,依次取出然后執(zhí)行的
# 1. self._scheduled是否為空
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
# 2. 給timeout賦值膀钠,self._scheduled為空,timeout就為None
timeout = None
# 只要self._ready和self._scheduled中有一個不為空裹虫,timeout就為0
if self._ready or self._stopping:
timeout = 0
# 只要self._scheduled不為空
elif self._scheduled:
# Compute the desired timeout.
# 用堆頂的回調函數的延遲時間作為timeout的等待時間肿嘲,也就是說用等待時間最短的回調函數的時間作為timeout的等待時間
when = self._scheduled[0]._when
timeout = max(0, when - self.time())
、
if self._debug and timeout != 0:
...
# 3. 關注else分支筑公,這是關鍵代碼
else:
# timeout=None --> 一直阻塞雳窟,只要有io事件產生,立馬返回event_list事件列表匣屡,否則一直阻塞著
# timeout=0 --> 不阻塞封救,有io事件產生,就立馬返回event_list事件列表捣作,沒有也返空列表
# timeout=2 --> 阻塞等待2s誉结,在這2秒內只要有io事件產生,立馬返回event_list事件列表券躁,沒有io事件就阻塞2s惩坑,然后返回空列表
event_list = self._selector.select(timeout)
# 用來處理真正的io事件的函數
self._process_events(event_list)
# Handle 'later' callbacks that are ready.
end_time = self.time() + self._clock_resolution
# 4. 依次取出堆頂的回調函數handle添加到_ready隊列中
while self._scheduled:
handle = self._scheduled[0]
# 當_scheduled[]中有多個延遲回調時,通過handle._when >= end_time來阻止沒有到時間的延遲函數被彈出也拜,
# 也就是說旭贬,當有n個延遲回調時,會產生n個timeout搪泳,對應n次run_once循環(huán)的調用
if handle._when >= end_time:
break
# 從堆中彈出堆頂最小的回調函數,放入 _ready 隊列中
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
# 5. 執(zhí)行self._ready隊列中所有的回調函數handle對象
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
# handle._run()實際上就是執(zhí)行task._step()扼脐,也就是執(zhí)行cor.send(None)
handle._run()
handle = None # Needed to break cycles when an exception occurs.
執(zhí)行run_until_complete
方法時future.add_done_callback(_run_until_complete_cb)
這一步其實是task.add_done_callback(_run_until_complete_cb)
岸军,也就是把_run_until_complete_cb
回調函數注冊到task
對象中去了奋刽,這個回調函數的作用是當cor協(xié)程執(zhí)行完畢時,回調_run_until_complete_cb
把loop對象的 _stopping
屬性設為True
艰赞,然后_run_once
執(zhí)行完畢時佣谐,判斷_stopping
為True
就跳出while
循環(huán),run_until_complete
才能返回task.result
從上面的函數調用流程
run_until_complete()
--> run_forever()
--> _run_once()
方妖,重點看_run_once
這個方法的執(zhí)行流程狭魂。
此時:
cor
協(xié)程還未開始執(zhí)行。loop._ready = [handle(task._step)]
党觅,loop._scheduled = []
雌澄。
第一輪_run_once()
的調用執(zhí)行開始
注意這里的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5注釋
第1,2步的邏輯判斷,
timeout = 0
杯瞻。第3步
event_list = self._selector.select(0)
镐牺,此時立馬返回空[]。第4步 由于
loop._scheduled = []
魁莉,不執(zhí)行第4步中的語句睬涧。第5步 依次從
_ready
隊列中取出回調函數handle旗唁,執(zhí)行handle._run()
。
執(zhí)行handle._run()
方法剿牺,也就是調用task._step()
湃崩,來看看task._step()
的執(zhí)行邏輯:
class Task(futures.Future):
...
def _step(self, exc=None):
"""
_step方法可以看做是task包裝的coroutine對象中的代碼的直到y(tǒng)ield的前半部分邏輯
"""
...
try:
if exc is None:
# 1.關鍵代碼
result = coro.send(None)
else:
result = coro.throw(exc)
# 2. coro執(zhí)行完畢會拋出StopIteration異常
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
self.set_exception(futures.CancelledError())
else:
# result為None時剪返,調用task的callbasks列表中的回調方法钱反,在調用loop.run_until_complite耳峦,結束loop循環(huán)
self.set_result(exc.value)
except futures.CancelledError:
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
self.set_exception(exc)
except BaseException as exc:
self.set_exception(exc)
raise
# 3. result = coro.send(None)不拋出異常
else:
# 4. 查看result是否含有_asyncio_future_blocking屬性
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
self._loop.call_soon(
self._step,
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
elif blocking:
if result is self:
self._loop.call_soon(
self._step,
RuntimeError(
'Task cannot await on itself: {!r}'.format(
self)))
# 4.1. 如果result是一個future對象時疙咸,blocking會被設置成true
else:
result._asyncio_future_blocking = False
# 把_wakeup回調函數設置到此future對象中兰粉,當此future對象調用set_result()方法時,就會調用_wakeup方法
result.add_done_callback(self._wakeup)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
else:
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from '
'in task {!r} with {!r}'.format(self, result)))
# 5. 如果result是None备蚓,則注冊task._step到loop對象中去,在下一輪_run_once中被回調
elif result is None:
# Bare yield relinquishes control for one event loop iteration.
self._loop.call_soon(self._step)
# --------下面的代碼可以暫時不關注了--------
elif inspect.isgenerator(result):
# Yielding a generator is just wrong.
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from for '
'generator in task {!r} with {}'.format(
self, result)))
else:
# Yielding something else is an error.
self._loop.call_soon(
self._step,
RuntimeError(
'Task got bad yield: {!r}'.format(result)))
finally:
self.__class__._current_tasks.pop(self._loop)
self = None # Needed to break cycles when an exception occurs.
def _wakeup(self, future):
try:
future.result()
except Exception as exc:
# This may also be a cancellation.
self._step(exc)
else:
# 這里是關鍵代碼烤黍,上次的_step()執(zhí)行到第一次碰到y(tǒng)ield的地方掛住了唉锌,此時再次執(zhí)行_step(),
# 也就是再次執(zhí)行 result = coro.send(None) 這句代碼,也就是從上次yield的地方繼續(xù)執(zhí)行yield后面的邏輯
self._step()
self = None # Needed to break cycles when an exception occurs.
當task._step()執(zhí)行時袄简,調用core.send(None)腥放,即調用:
async def cor():
# 1.
print('enter cor ...')
# 2.
await asyncio.sleep(2)
# 3.
print('exit cor ...')
# 4.
return 'cor'
注意這里的第1,2,3,4步是在cor
上標記的1,2,3,4注釋
第1步
print('enter cor ...')
第2步
await asyncio.sleep(2)
,sleep
是一個非原生協(xié)程痘番,前面介紹過捉片,await
語句掛起當前的協(xié)程也就是cor
,然后會進入到sleep
協(xié)程中的汞舱。注意伍纫,此時執(zhí)行流程已經在sleep
協(xié)程中了,我們來看一下sleep
協(xié)程的代碼邏輯昂芜。
看一下sleep
協(xié)程實現
@coroutine
def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay == 0:
yield
return result
if loop is None:
loop = events.get_event_loop()
# 1. 創(chuàng)建一個future對象莹规,用來銜接前一個task
future = loop.create_future()
# 2. 添加一個延遲執(zhí)行的回調函數futures._set_result_unless_cancelled 到當前l(fā)oop對象的_scheduled二叉堆中,這個堆中的回調函數按照delay的大小泌神,形成一個最小堆
h = future._loop.call_later(delay,
futures._set_result_unless_cancelled,
future, result)
try:
# 3. 執(zhí)行 yield from future 語句良漱,此時會調用future的 __iter__() 方法舞虱,然后在 yield future 處掛住,返回future本身母市,self._asyncio_future_blocking = True
return (yield from future)
finally:
h.cancel()
sleep
是一個非原生協(xié)程矾兜,delay=2
注意這里的第1,2,3步是在sleep
上標記的1,2,3注釋
第1步 生成一個新
Future
對象,這個對象不同于跟task
是不同的對象患久,他們都是Future
類型的對象椅寺,因為Task
類繼承自Future
類。第2步
loop
對象中注冊了一個futures._set_result_unless_cancelled
的延遲回調函數handle
對象蒋失,前面介紹過返帕,延遲回調函數handle
對象是放在loop._scheduled
這個最小二叉堆
中的,此時篙挽,loop
對象的_scheduled
最小堆中只有一個延遲回調函數handle
荆萤。到sleep
中的第2步完成為止,loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
铣卡,loop._ready=[]
链韭,注意正在執(zhí)行的handle._run()
的流程還沒走完,現在是進入到了sleep
協(xié)程中的第2步中算行。第3步 執(zhí)行
(yield from future)
會調用future
的__iter__
方法梧油,進入到__iter__
方法中,首先把self._asyncio_future_blocking
賦值為True
了州邢,儡陨,然后yield self
,注意量淌,此時cor
協(xié)程的執(zhí)行流程就掛起在了yield
處骗村,返回self
也就是Future
對象自己,也就是說執(zhí)行result= core.send(None)
最終掛起在新的Future
對象的yield self
處呀枢,返回得到了一個Future
對象賦值給result
胚股。即result
就是在sleep()
協(xié)程中新生成的一個Future
對象了。
我們看一下Future
對象的這個方法裙秋。
class Future:
...
def __iter__(self):
# self.done()返回False琅拌,
if not self.done():
self._asyncio_future_blocking = True
# 把Future對象自己返回出去
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
到此為止,result = core.send(None)
的調用得到了一個Future
對象摘刑,然后執(zhí)行流程繼續(xù)往下走也就是由于result是Future對象进宝,因此進入到_step
方法的第4.1步,這里看一下代碼片段
# 4.1. 如果result是一個future對象時枷恕,blocking會被設置成true
else:
result._asyncio_future_blocking = False
# 把_wakeup回調函數設置到此future對象中党晋,當此future對象調用set_result()方法時,就會調用_wakeup方法
result.add_done_callback(self._wakeup)
result.add_done_callback(self._wakeup)
實際上就是把task._wakeup
方法注冊到了新Futrue
對象的回調方法列表_callbacks = [task._wakeup,]
中,到此為止未玻,task._step
方法才徹底執(zhí)行完畢灾而。第一輪_run_once()
的調用執(zhí)行結束了。此時 loop._stopping = Fasle
扳剿,然后繼續(xù)執(zhí)行下一輪的_run_once()
旁趟。
此時:
cor
協(xié)程的執(zhí)行流程掛起在sleep
協(xié)程的中產生的新Future
對象的__iter__
方法的yield
處。cor
協(xié)程的執(zhí)行了cor
中標記的第1,2步舞终,第3,4步未執(zhí)行轻庆。新
Future
對象的_callbacks = [task._wakeup,]
。loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
敛劝,loop._ready=[]
。
第二輪_run_once()
的調用執(zhí)行開始
進入到_run_once()
方法中纷宇,由于loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]
注意這里的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5注釋
第1,2步的邏輯判斷夸盟,
timeout = 2
第3步
event_list = self._selector.select(2)
,也就是說阻塞2s中像捶,注意上陕,此時因為我們編寫的那個cor
協(xié)程是沒有io事件的,是一個通過sleep
協(xié)程模擬耗時操作的拓春,不涉及到真正的io事件释簿,所以這個時候,selector.select(2)
會完整的阻塞2秒鐘硼莽。第4步 依次取出
_scheduled
的延遲回調函數handle
庶溶,放入到_ready
隊列中。第5步 依次從
_ready
隊列中取出延遲回調函數handle
懂鸵,執(zhí)行handle._run()
偏螺。
第5步中的回調函數就是sleep
協(xié)程中注冊到loop
對象的futures._set_result_unless_cancelled
函數
def _set_result_unless_cancelled(fut, result):
"""Helper setting the result only if the future was not cancelled."""
if fut.cancelled():
return
# fut就是sleep中新生成的Future對象,調用set_result()方法
fut.set_result(result)
Future
對象的set_result
方法
class Future:
...
def set_result(self, result):
if self._state != _PENDING:
raise InvalidStateError('{}: {!r}'.format(self._state, self))
self._result = result
self._state = _FINISHED
# 回調Future對象中添加的所有回調函數
self._schedule_callbacks()
def _schedule_callbacks(self):
callbacks = self._callbacks[:]
if not callbacks:
return
self._callbacks[:] = []
# 依次取出注冊到Future對象中的所有回調函數匆光,放入到loop._ready隊列中去套像,等待下一輪 _run_onece的調用時,執(zhí)行這些回調
for callback in callbacks:
self._loop.call_soon(callback, self)
fut.set_result(result)
--> fut._schedule_callbacks()
--> fut._loop.call_soon(callback, self)
终息, callback
實際上是新Future
對象的_callbacks
中的task._wakeup
方法夺巩,task._wakeup
又被添加到loop._ready
隊列中去了。到此為止handle._run()
執(zhí)行完畢周崭,第二輪的_run_once()
執(zhí)行完畢柳譬。
此時:
cor
協(xié)程的執(zhí)行流程掛起在sleep
協(xié)程的中產生的新Future
對象的__iter__
方法的yield
處。新
Future
對象的_callbacks = []
休傍。loop._ready = [handle(task._wakeup)]
征绎,loop._scheduled=[]
。
第三輪_run_once()
的調用執(zhí)行開始
注意這里的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5注釋
第1,2步的邏輯判斷,
timeout = 0
人柿。第3步
event_list = self._selector.select(0)
柴墩,也就是說立即返回。第4步 由于
loop._scheduled=[]
凫岖,因此不執(zhí)行第4步中的邏輯江咳。第5步 依次從
_ready
隊列中取出回調函數handle
,執(zhí)行handle._run()
執(zhí)行handle._run()
就是執(zhí)行task._wakeup()
哥放,又要回到task._wakeup()
代碼中看看
class Task(futures.Future):
def _wakeup(self, future):
try:
# future為sleep協(xié)程中生成的新的Future對象
future.result()
except Exception as exc:
# This may also be a cancellation.
self._step(exc)
else:
# 這里是關鍵代碼歼指,上次的_step()執(zhí)行到第一次碰到y(tǒng)ield的地方掛住了,此時再次執(zhí)行_step(),
# 也就是再次執(zhí)行 result = coro.send(None) 這句代碼甥雕,也就是從上次yield的地方繼續(xù)執(zhí)行yield后面的邏輯
self._step()
self = None # Needed to break cycles when an exception occurs.
調用task._wakeup()
實際上又是執(zhí)行task._step()
也就是再次執(zhí)行result = core.send(None)
這行代碼踩身,前面提到過,core
協(xié)程被掛起在Future
對象的__iter__
方法的yield
處社露,此時再次執(zhí)行result = core.send(None)
挟阻,就是執(zhí)行yield
后面的語句
class Future:
...
def __iter__(self):
# self.done()返回False,
if not self.done():
self._asyncio_future_blocking = True
# 把Future對象自己返回出去
yield self # This tells Task to wait for completion.
assert self.done(), "yield from wasn't used with future"
# 調用task._wakeup再次進入到core掛起的地方執(zhí)行yield后面的語句
return self.result() # May raise too.
if compat.PY35:
__await__ = __iter__ # make compatible with 'await' expression
self.result()
返回的是None
峭弟,所以 cor
協(xié)程的await asyncio.sleep(2)
返回的是None
附鸽,到此為止cor
協(xié)程的第三步await asyncio.sleep(2)
才真正的執(zhí)行完畢,也就是說sleep
協(xié)程執(zhí)行完畢了瞒瘸,然后繼續(xù)執(zhí)行cor
協(xié)程await下面的語句print('exit cor ...')
最后返回'cor'
坷备,到此為止cor
協(xié)程就完全執(zhí)行完畢了。
async def cor():
print('enter cor ...')
await asyncio.sleep(2) # 上次在這里掛起
print('exit cor ...')
return 'cor'
前面介紹了情臭,原生協(xié)程在執(zhí)行結束時會拋出StopIteration
異常省撑,并且把返回值存放在異常的的value
屬性中,因此在task._step()
的第2步捕捉到StopIteration
異常
# 2. coro執(zhí)行完畢會拋出StopIteration異常
except StopIteration as exc:
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
self.set_exception(futures.CancelledError())
else:
# exc.value是'cor'
# 調用task.set_result('cor')
self.set_result(exc.value)
task.set_result('cor')
其實就是把task
中的存放的回調函數又放到loop._ready
隊列中去谎柄,task
中的回調函數就是前面介紹的_run_until_complete_cb
函數丁侄。到此為止第3輪的_run_once()
執(zhí)行完畢。
此時:
cor
協(xié)程的執(zhí)行完畢朝巫。新
Future
對象的_callbacks = []
鸿摇。loop._ready = [handle(_run_until_complete_cb)]
,loop._scheduled=[]
劈猿。
第四輪_run_once()
開始執(zhí)行
注意這里的第1,2,3,4,5步是在_run_once()
上標記的1,2,3,4,5注釋
第1,2步的邏輯判斷拙吉,
timeout = 0
。第3步
event_list = self._selector.select(0)
揪荣,也就是說立即返回筷黔。第4步 由于
loop._scheduled=[]
,因此不執(zhí)行第4步中的邏輯仗颈。第5步 依次從
_ready
隊列中取出回調函數handle
佛舱,執(zhí)行handle._run()
執(zhí)行handle._run()
就是執(zhí)行_run_until_complete_cb(task)
def _run_until_complete_cb(fut):
exc = fut._exception
if (isinstance(exc, BaseException)
and not isinstance(exc, Exception)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
# fut就是task對象椎例,_loop.stop()就是把loop._stopping賦值為Ture
fut._loop.stop()
loop._stopping
為True
。第4輪_run_once()
執(zhí)行完畢请祖。
def run_forever(self):
...
try:
events._set_running_loop(self)
while True:
# 第4輪_run_once()結束
self._run_once()
# _stopping為true订歪,跳出循環(huán),run_forever()執(zhí)行結束
if self._stopping:
break
finally:
...
跳出while
循環(huán)肆捕, run_forever()
執(zhí)行結束刷晋,run_until_complete()
也就執(zhí)行完畢了,最后把cor
協(xié)程的返回值'cor'返回出來賦值給rst
變量慎陵。
到此為止所有整個task
任務執(zhí)行完畢眼虱,loop
循環(huán)關閉。
總結
loop._ready
和loop._scheduled
是loop
對象中非常重要的兩個屬性
loop._ready
是一個雙端隊列deque
席纽,用來存放調用loop.call_soon
方法時中放入的回調函數捏悬。loop._scheduled
是一個最小堆,根據延遲時間的大小構建的最小堆润梯,用來存放調用loop.call_later
時存放的延遲回調函數邮破。loop._scheduled
中有延遲調用函數是,timeout
被賦值為堆頂的延遲函數的等待時間仆救,這樣會使得select(timeout)
阻塞等待timeout
秒。到時間后loop._scheduled
中的回調函數最終還是會被轉移到loop._ready
隊列中去執(zhí)行矫渔。
每一個協(xié)程都會被封裝到一個task
對象中彤蔽,task
在初始化時就把task._step
方法添加到loop._ready
隊列中,同時添加一個停止loop
的事件循環(huán)的回調函數到task._callback
列表中庙洼。
task._step
函數就是驅動協(xié)程的顿痪,里面執(zhí)行cor.send(None)
,分兩種情況:
-
第一種:
cor
中有await
語句cor
執(zhí)行到await
處油够,await
的后面如果是另一個協(xié)程cor1
蚁袭,則進入到cor1
中執(zhí)行,cor1
執(zhí)行完畢石咬,如果cor1
返回的是None揩悄,
則task._step
方法會對result
進行判斷,返回None
鬼悠,執(zhí)行task.set_result()
删性,這個方法里
面會調取task._callback
列表中的所有回調方法,依次執(zhí)行焕窝,此時蹬挺,task._callback
列表中只注冊了一個停止loop
事件
循環(huán)的回調,此時就調用該回調函數它掂,把loop._stopping
設置成Ture巴帮,使得loop停止。如果
cor1
返回的是一個future
對象,也就是task._step
函數執(zhí)行到cor1
協(xié)程返回的一個cor1_future
時榕茧,則task._step
方
法會對result
進行判斷垃沦,返回類型是future
則對cor1_future
添加一個回調函數task._wakeup
,當cor1_future.set_result()
在某一時刻被調用時(比如像sleep協(xié)程n秒后被調用雪猪,或者正在的IO事件觸發(fā)的調用)栏尚,會調用cor1_future
添加的回調函數
也就是task._wakeup
函數,task._wakeup
里面又會調用task._step
來驅動上次cor
停止的位置只恨,也就是cor
的await
處译仗,更準確的說是cor1_future.__iter__
方法的yield self
處,繼續(xù)
執(zhí)行yield
后面的語句官觅,await cor1()
才算真正的執(zhí)行完畢纵菌,然后接著執(zhí)行await cor1()
下面的語句,直到cor
執(zhí)行完畢時會拋出StopIteration
異常休涤,cor
返回值會保存在StopIteration
異常對象的value
屬性中咱圆,與上面的邏輯一樣,task
會調用之前調用的停止loop
的回調函數功氨,停止loop
循環(huán) -
第二種:
cor
中沒有await
語句如果沒有await語句序苏,
task._step
執(zhí)行后,result = cor.send(None)
拋出StopIteration
異常捷凄,與
上面的邏輯一樣忱详,task
會調用之前調用的停止loop
的回調函數,停止loop
循環(huán)
本質上跺涤,在一次loop循環(huán)中會執(zhí)行一次result = cor.send(None)
語句匈睁,也就是cor
中的用戶書寫的邏輯,碰到await expr
桶错,再進
入到expr
語句航唆,直到碰到future
的yield
語句,這一次循環(huán)的result = cor.send(None)
邏輯才算執(zhí)行完成了院刁。如果
await
后壓根沒有返回future
糯钙,result = cor.send(None)
則會直接執(zhí)行完await expr
語句后再接著執(zhí)行await expr
下面的語句,直到
拋出StopIteration
異常黎比。
通過await future
或者yield from future
語句對cor
的執(zhí)行邏輯進行分割超营,yield
之前的語句被封裝到task._step
方法中,在一次loop
循環(huán)中被調用阅虫,yield
之后的邏輯封裝在task._wakeup
方法中演闭,在下一次的loop
循環(huán)中被執(zhí)行,而這個task._wakeup
是由future.set_result()
把它注冊到loop._ready
隊列中的颓帝。
sleep
協(xié)程模擬耗時IO
操作米碰,通過向loop中注冊一個延遲回調函數窝革,明確的控制select(timeout)
中的timeout
超時時間 + future
對象的延遲timeout
秒調用future.set_result()
函數來實現一個模擬耗時的操作。
這個其實每一個真實的耗時的IO
操作都會對應一個future
對象吕座。只不過sleep
中的回調明確的傳入了延遲回調的時間虐译,而真實的IO
操作時的future.set_result()
的調用則是由真實的IO
事件,也就是select(timeout)
返回的socket
對象的可讀或者可寫事件來觸發(fā)的吴趴,一旦有相應的事件產生漆诽,就會回調對應的可讀可寫事件的回調函數,而在這些回調函數中又會去觸發(fā)future.set_result()
方法锣枝。
(上面的總結都是根據本人自己的理解所寫厢拭,限于本人的表達能力有限,很多表達可能會產生一些歧義撇叁,還望各位看官一邊看此文一邊開啟debug調試供鸠,來幫助自己理解。)