asyncio源碼分析之基本執(zhí)行流程

原生協(xié)程是基于async關鍵字的

# 定義一個簡單的原生協(xié)程cor
async def cor():
    print('enter cor')
    print('exit cor')

print(type(cor))    # <class 'function'>
print(type(cor()))  # <class 'coroutine'>

可以看到cor的類型<class 'function'>函數類型,說明async關鍵字修飾的函數也是一個函數而已,跟普通函數在定義上沒啥什么差別耀里,差別在于被調用的時候坞靶,cor()并不是執(zhí)行函數體中的語句,而是生成一個<class 'coroutine'>類型的對象章鲤,即協(xié)程對象税产,跟生成器類似怕轿,協(xié)程也有send()方法。

c = cor()
    try:
        c.send(None)
    except StopIteration as e:
        print(e.value)  # None

當c.send(None)時辟拷,才會執(zhí)行cor中的語句撞羽,也就是執(zhí)行print('enter cor')print('exit cor')這兩句,就執(zhí)行完畢了衫冻,執(zhí)行完畢時會拋出StopIteration異常诀紊,這個異常對象的value屬性中保存著cor的返回值,這里core沒有return語句隅俘,其實默認返回None邻奠。

原生協(xié)程中使用 await 關鍵字

async def cor():
    print('enter cor')
    rst = await cor1()
    print('rst -->', rst)
    print('exit cor')


async def cor1():
    print('enter cor1')
    print('exit cor1')
    return 'cor1'


c = cor()
try:
    c.send(None)
except StopIteration as e:
    print('cor -->', e.value)  # None

輸出如下:

enter cor
enter cor1
exit cor1
rst --> cor1
exit cor
cor --> None

await關鍵字后面必須是一個實現了__awwit__方法的對象,不一定是協(xié)程對象为居,只要實現了這個方法碌宴,就會進入到此方法中,調用該方法中的邏輯颜骤。比如后面要介紹的Future對象唧喉,它也是實現了__await__方法的。

await 關鍵字的語義是表示掛起當前的cor協(xié)程的執(zhí)行,進入到cor1協(xié)程中執(zhí)行cor1的邏輯八孝,直到cor1執(zhí)行完畢董朝,然后執(zhí)行流程又回到cor掛起的地方,繼續(xù)執(zhí)行corawait后面的語句干跛。直到最后拋出StopIteration異常子姜。

基于生成器的協(xié)程,也就是非原生協(xié)程

import types


async def cor():
    print('enter cor')
    rst = await cor2()
    print('rst --> ', rst)
    print('exit cor')


@types.coroutine
def cor2():
    print('enter cor2')
    yield
    print('exit cor2')
    return 'cor2'


c = cor()
try:
    c.send(None)
    c.send(None)
except StopIteration as e:
    print('cor -->', e.value)  # None

輸出如下:

enter cor
enter cor2
exit cor2
rst -->  cor2
exit cor
cor --> None

與上面的原生協(xié)程的嵌套不同的是楼入,調用了兩次c.send(None)哥捕,執(zhí)行第一次c.send(None)時,會在cor2yield關鍵字處掛起嘉熊,第二次c.send(None)則會在yield掛起的地方遥赚,接著往下執(zhí)行,然后core2返回'cor2'阐肤,賦值給rst變量凫佛,繼續(xù)執(zhí)行corawait后面的語句,直到最后拋出StopIteration異常孕惜。

總結

async 是一個關鍵字愧薛,async def 定義的類型還是一個function類型,只有當它被調用時才返回一個協(xié)程對象
async defdef定義的方法在沒被調用時沒有任何區(qū)別衫画,不必看得很神秘毫炉,它也可以有return語句,這點也正常削罩,
因為python中沒有return語句的函數實際上默認是返回None的瞄勾,所以只是顯式的return和隱式return None的區(qū)別

對于協(xié)程的send(None)方法,跟生成器的send(None)類似鲸郊,不同的是丰榴,原生協(xié)程的send方法被調用的時候,會一直執(zhí)行到碰
await語句秆撮,但是不會停下來,會直接再進入到await EXPREXPR中换况,其實EXPR是一個awaitable對象职辨,會調用該對象的
__await__()執(zhí)行該方法的里面的邏輯,如果該awaitable對象是一個原生協(xié)程對象戈二,那么它的__await__()方法中的邏輯就
是在定義此協(xié)程時async def 下面的邏輯舒裤,執(zhí)行完畢后,該協(xié)程對象就關閉了觉吭,執(zhí)行流程就再次跳轉到當前掛起的協(xié)程中腾供,
執(zhí)行該協(xié)程中余下的邏輯,最后執(zhí)行完畢,拋出StopIteration異常。

對于原生生協(xié)程來說,調用send()方法時蒋得,會一直執(zhí)行到出現StopIteration異常為止夭禽,只有在 __await__()方法中有yield語句時才
會掛起在那里,如果__await__()方法中沒有yield語句箍镜,不會掛起,會返回await的返回值,繼續(xù)執(zhí)行匿乃,直到拋出StopIteration異常。

先拋出一個結論豌汇,在asyncio中協(xié)程的流程的掛起操作幢炸,實際上還是是通過yield關鍵字來實現的,并不是await關鍵字拒贱, asyncawait關鍵字只不過是語法糖宛徊。

asyncio的基本流程分析

import asyncio


async def cor():
    print('enter cor ...')
    await asyncio.sleep(2)
    print('exit cor ...')
    
    return 'cor'

loop = asyncio.get_event_loop()
task = loop.create_task(cor())
rst = loop.run_until_complete(task)
print(rst)

從這個簡單的例子入手,逐步分析協(xié)程在事件循環(huán)中的執(zhí)行流程柜思。

  • 第1步 async def cor()定義了一個cor協(xié)程岩调。

  • 第2步 loop = asyncio.get_event_loop()得到一個事件循環(huán)對象loop,這個loop在一個線程中只有唯一的一個實例赡盘,只要在同一個線程中調用此方法号枕,得到的都是同一個loop對象。

  • 第3步 task = loop.create_task(cor())cor協(xié)程包裝成一個task對象陨享。

  • 第4步 rst = loop.run_until_complete(task)把這個task對象添加到事件循環(huán)中去葱淳。

首先看第3步的loop.create_task(cor())這個方法

class BaseEventLoop(events.AbstractEventLoop):

    ...

    def __init__(self):
        ...
        # 用來保存包裹task.step方法的handle對象的對端隊列
        self._ready = collections.deque()
        # 用來保存包裹延遲回調函數的handle對象的二叉堆,是一個最小二叉堆
        self._scheduled = []
        ...

    def create_task(self, coro):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        # self._task_factory 默認是None
        if self._task_factory is None:
            # 創(chuàng)建一個task對象
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        # 返回這個task對象
        return task

    def call_soon(self, callback, *args):

        self._check_closed()
        if self._debug:
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        # 關鍵代碼callback就是task._step方法抛姑,args是task._step的參數
        handle = self._call_soon(callback, args)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle

    def _call_soon(self, callback, args):
        # 1 handle是一個包裹了task._step方法和args參數的對象
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        # 2 關鍵代碼赞厕,把handle添加到列表self._ready中
        self._ready.append(handle)
        return handle

loop.create_task(cor())實際上是創(chuàng)建了一個Task類的實例。再來看一下Task這個類

class Task(futures.Future):

    ...

    def __init__(self, coro, *, loop=None):
        assert coroutines.iscoroutine(coro), repr(coro)
        # 調用父類的__init__獲得線程中唯一的loop對象
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        # 關鍵代碼定硝,把_step方法注冊到loop對象中去
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    def _step(self, exc=None):
        ...
        result = coro.send(None)
        ...

task實例化時皿桑,調用了self._loop.call_soon(self._step) --> loop.call_soon(callback, *args) --> loop._call_soon(callback, args),實際上是把handle(task._step)這個對象放到了loop._ready隊列中蔬啡,放在這個隊列中有什么用呢诲侮?先告訴大家,_step方法會在loop對象的循環(huán)中被調用箱蟆,也就是會執(zhí)行coro.send(None)這句沟绪。coro.send(None)實際上就是執(zhí)行上面定義的cor協(xié)程的里面的語句。

也就是說到第3步執(zhí)行完時空猜,loop對象已經實例化绽慈,task對象也實例化恨旱,并且task對象的_step方法被封裝成handle對象放入了loop對象的_ready隊列中去了。

再來看第4步loop.run_until_complete(task)

class BaseEventLoop(events.AbstractEventLoop):

    def run_until_complete(self, future):
        ...
        
        # future就是task對象坝疼,下面2句是為了確保future是一個Future類實例對象
        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
            
        # 添加回調方法_run_until_complete_cb到當前的task對象的callbacks列表中搜贤,_run_until_complete_cb就是最后
        # 把loop的_stop屬性設置為ture的,用來結束loop循環(huán)的
        future.add_done_callback(_run_until_complete_cb)
        try:
            # 開啟無線循環(huán)
            self.run_forever()
        except:
            ...
            raise
        finally:
            ...
        # 執(zhí)行完畢返回cor的返回值
        return future.result()

    def run_forever(self):

        ...

        try:
            events._set_running_loop(self)
            while True:
                # 每次運行一次循環(huán)裙士,判斷下_stopping是否為true入客,也就是是否結束循環(huán)
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

    def _run_once(self):

        # loop的_scheduled是一個最小二叉堆,用來存放延遲執(zhí)行的回調函數腿椎,根據延遲的大小桌硫,把這些回調函數構成一個最小堆,然后再每次從對頂彈出延遲最小的回調函數放入_ready雙端隊列中啃炸,
        # loop的_ready是雙端隊列铆隘,所有注冊到loop的回調函數,最終是要放入到這個隊列中南用,依次取出然后執(zhí)行的
        # 1. self._scheduled是否為空
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
        
        # 2. 給timeout賦值膀钠,self._scheduled為空,timeout就為None
        timeout = None
        # 只要self._ready和self._scheduled中有一個不為空裹虫,timeout就為0
        if self._ready or self._stopping:
            timeout = 0
        # 只要self._scheduled不為空
        elif self._scheduled:
            # Compute the desired timeout.
            # 用堆頂的回調函數的延遲時間作為timeout的等待時間肿嘲,也就是說用等待時間最短的回調函數的時間作為timeout的等待時間
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())
        、
        
        if self._debug and timeout != 0:
            ...
        # 3. 關注else分支筑公,這是關鍵代碼
        else:
            # timeout=None --> 一直阻塞雳窟,只要有io事件產生,立馬返回event_list事件列表匣屡,否則一直阻塞著
            # timeout=0 --> 不阻塞封救,有io事件產生,就立馬返回event_list事件列表捣作,沒有也返空列表
            # timeout=2 --> 阻塞等待2s誉结,在這2秒內只要有io事件產生,立馬返回event_list事件列表券躁,沒有io事件就阻塞2s惩坑,然后返回空列表
            event_list = self._selector.select(timeout)
            
        #  用來處理真正的io事件的函數
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        # 4. 依次取出堆頂的回調函數handle添加到_ready隊列中
        while self._scheduled:
            handle = self._scheduled[0]
            # 當_scheduled[]中有多個延遲回調時,通過handle._when >= end_time來阻止沒有到時間的延遲函數被彈出也拜,
            # 也就是說旭贬,當有n個延遲回調時,會產生n個timeout搪泳,對應n次run_once循環(huán)的調用
            if handle._when >= end_time:
                break
            # 從堆中彈出堆頂最小的回調函數,放入 _ready 隊列中
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)
        
        # 5. 執(zhí)行self._ready隊列中所有的回調函數handle對象
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                # handle._run()實際上就是執(zhí)行task._step()扼脐,也就是執(zhí)行cor.send(None)
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

執(zhí)行run_until_complete方法時future.add_done_callback(_run_until_complete_cb)這一步其實是task.add_done_callback(_run_until_complete_cb)岸军,也就是把_run_until_complete_cb回調函數注冊到task對象中去了奋刽,這個回調函數的作用是當cor協(xié)程執(zhí)行完畢時,回調_run_until_complete_cb把loop對象的 _stopping屬性設為True艰赞,然后_run_once執(zhí)行完畢時佣谐,判斷_stoppingTrue就跳出while循環(huán),run_until_complete才能返回task.result

從上面的函數調用流程

run_until_complete() --> run_forever() --> _run_once()方妖,重點看_run_once這個方法的執(zhí)行流程狭魂。

此時:

  • cor協(xié)程還未開始執(zhí)行。

  • loop._ready = [handle(task._step)]党觅,loop._scheduled = []雌澄。

第一輪_run_once()的調用執(zhí)行開始

注意這里的第1,2,3,4,5步是在_run_once()上標記的1,2,3,4,5注釋

  • 第1,2步的邏輯判斷,timeout = 0杯瞻。

  • 第3步 event_list = self._selector.select(0)镐牺,此時立馬返回空[]。

  • 第4步 由于loop._scheduled = []魁莉,不執(zhí)行第4步中的語句睬涧。

  • 第5步 依次從_ready隊列中取出回調函數handle旗唁,執(zhí)行handle._run()

執(zhí)行handle._run()方法剿牺,也就是調用task._step()湃崩,來看看task._step()的執(zhí)行邏輯:

class Task(futures.Future):
    
    ...
    
    def _step(self, exc=None):
        """
        _step方法可以看做是task包裝的coroutine對象中的代碼的直到y(tǒng)ield的前半部分邏輯
        """
        ...
        try:
            if exc is None:
                
                # 1.關鍵代碼
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        # 2. coro執(zhí)行完畢會拋出StopIteration異常
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                # result為None時剪返,調用task的callbasks列表中的回調方法钱反,在調用loop.run_until_complite耳峦,結束loop循環(huán)
                self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        # 3. result = coro.send(None)不拋出異常
        else:
            # 4. 查看result是否含有_asyncio_future_blocking屬性
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    # 4.1. 如果result是一個future對象時疙咸,blocking會被設置成true
                    else:
                        result._asyncio_future_blocking = False
                        # 把_wakeup回調函數設置到此future對象中兰粉,當此future對象調用set_result()方法時,就會調用_wakeup方法
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            # 5. 如果result是None备蚓,則注冊task._step到loop對象中去,在下一輪_run_once中被回調
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)

            # --------下面的代碼可以暫時不關注了--------
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            
            # 這里是關鍵代碼烤黍,上次的_step()執(zhí)行到第一次碰到y(tǒng)ield的地方掛住了唉锌,此時再次執(zhí)行_step(),
            # 也就是再次執(zhí)行 result = coro.send(None) 這句代碼,也就是從上次yield的地方繼續(xù)執(zhí)行yield后面的邏輯
            self._step()
        self = None  # Needed to break cycles when an exception occurs.

當task._step()執(zhí)行時袄简,調用core.send(None)腥放,即調用:

async def cor():
    # 1.
    print('enter cor ...')
    # 2.
    await asyncio.sleep(2)
    # 3.
    print('exit cor ...')
    # 4.
    return 'cor'

注意這里的第1,2,3,4步是在cor上標記的1,2,3,4注釋

  • 第1步 print('enter cor ...')

  • 第2步 await asyncio.sleep(2)sleep是一個非原生協(xié)程痘番,前面介紹過捉片,await語句掛起當前的協(xié)程也就是cor,然后會進入到sleep協(xié)程中的汞舱。注意伍纫,此時執(zhí)行流程已經在sleep協(xié)程中了,我們來看一下sleep協(xié)程的代碼邏輯昂芜。

看一下sleep協(xié)程實現

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    # 1. 創(chuàng)建一個future對象莹规,用來銜接前一個task
    future = loop.create_future()
    # 2. 添加一個延遲執(zhí)行的回調函數futures._set_result_unless_cancelled 到當前l(fā)oop對象的_scheduled二叉堆中,這個堆中的回調函數按照delay的大小泌神,形成一個最小堆
    h = future._loop.call_later(delay,
                                futures._set_result_unless_cancelled,
                                future, result)
    try:
        # 3. 執(zhí)行 yield from future 語句良漱,此時會調用future的 __iter__() 方法舞虱,然后在 yield future 處掛住,返回future本身母市,self._asyncio_future_blocking = True
        return (yield from future)
    finally:
        h.cancel()

sleep是一個非原生協(xié)程矾兜,delay=2

注意這里的第1,2,3步是在sleep上標記的1,2,3注釋

  • 第1步 生成一個新Future對象,這個對象不同于跟task是不同的對象患久,他們都是Future類型的對象椅寺,因為Task類繼承自Future類。

  • 第2步 loop對象中注冊了一個futures._set_result_unless_cancelled的延遲回調函數handle
    對象蒋失,前面介紹過返帕,延遲回調函數handle對象是放在loop._scheduled這個最小二叉堆中的,此時篙挽,loop對象的_scheduled最小堆中只有一個延遲回調函數handle荆萤。到sleep中的第2步完成為止,loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]铣卡,loop._ready=[]链韭,注意正在執(zhí)行的handle._run()的流程還沒走完,現在是進入到了sleep協(xié)程中的第2步中算行。

  • 第3步 執(zhí)行(yield from future)會調用future__iter__方法梧油,進入到__iter__方法中,首先把self._asyncio_future_blocking 賦值為 True了州邢,儡陨,然后yield self,注意量淌,此時cor協(xié)程的執(zhí)行流程就掛起在了yield處骗村,返回self也就是Future對象自己,也就是說執(zhí)行result= core.send(None)最終掛起在新的Future對象的yield self處呀枢,返回得到了一個Future對象賦值給result胚股。即result就是在sleep()協(xié)程中新生成的一個Future對象了。

我們看一下Future對象的這個方法裙秋。

class Future:

    ...

    def __iter__(self):
        # self.done()返回False琅拌,
        if not self.done():
            self._asyncio_future_blocking = True
            # 把Future對象自己返回出去
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

到此為止,result = core.send(None)的調用得到了一個Future對象摘刑,然后執(zhí)行流程繼續(xù)往下走也就是由于result是Future對象进宝,因此進入到_step方法的第4.1步,這里看一下代碼片段

# 4.1. 如果result是一個future對象時枷恕,blocking會被設置成true
else:
    result._asyncio_future_blocking = False
    # 把_wakeup回調函數設置到此future對象中党晋,當此future對象調用set_result()方法時,就會調用_wakeup方法
    result.add_done_callback(self._wakeup)

result.add_done_callback(self._wakeup)實際上就是把task._wakeup方法注冊到了新Futrue對象的回調方法列表_callbacks = [task._wakeup,]中,到此為止未玻,task._step方法才徹底執(zhí)行完畢灾而。第一輪_run_once()的調用執(zhí)行結束了。此時 loop._stopping = Fasle扳剿,然后繼續(xù)執(zhí)行下一輪的_run_once()旁趟。

此時:

  • cor協(xié)程的執(zhí)行流程掛起在sleep協(xié)程的中產生的新Future對象的__iter__方法的yield處。cor協(xié)程的執(zhí)行了cor中標記的第1,2步舞终,第3,4步未執(zhí)行轻庆。

  • Future對象的_callbacks = [task._wakeup,]

  • loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]敛劝,loop._ready=[]

第二輪_run_once()的調用執(zhí)行開始

進入到_run_once()方法中纷宇,由于loop._scheduled=[handle(delay_2s__set_result_unless_cancelled)]

注意這里的第1,2,3,4,5步是在_run_once()上標記的1,2,3,4,5注釋

  • 第1,2步的邏輯判斷夸盟,timeout = 2

  • 第3步 event_list = self._selector.select(2),也就是說阻塞2s中像捶,注意上陕,此時因為我們編寫的那個cor協(xié)程是沒有io事件的,是一個通過sleep協(xié)程模擬耗時操作的拓春,不涉及到真正的io事件释簿,所以這個時候,selector.select(2)會完整的阻塞2秒鐘硼莽。

  • 第4步 依次取出 _scheduled的延遲回調函數handle庶溶,放入到 _ready隊列中。

  • 第5步 依次從_ready隊列中取出延遲回調函數handle懂鸵,執(zhí)行handle._run()偏螺。

第5步中的回調函數就是sleep協(xié)程中注冊到loop對象的futures._set_result_unless_cancelled函數

def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    # fut就是sleep中新生成的Future對象,調用set_result()方法
    fut.set_result(result)

Future對象的set_result方法

class Future:
    
    ...

    def set_result(self, result):

        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        # 回調Future對象中添加的所有回調函數
        self._schedule_callbacks()

    def _schedule_callbacks(self):

        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        # 依次取出注冊到Future對象中的所有回調函數匆光,放入到loop._ready隊列中去套像,等待下一輪 _run_onece的調用時,執(zhí)行這些回調
        for callback in callbacks:
            self._loop.call_soon(callback, self)

fut.set_result(result) --> fut._schedule_callbacks() --> fut._loop.call_soon(callback, self)终息, callback實際上是新Future對象的_callbacks 中的task._wakeup方法夺巩,task._wakeup又被添加到loop._ready隊列中去了。到此為止handle._run()執(zhí)行完畢周崭,第二輪的_run_once()執(zhí)行完畢柳譬。

此時:

  • cor協(xié)程的執(zhí)行流程掛起在sleep協(xié)程的中產生的新Future對象的__iter__方法的yield處。

  • Future對象的_callbacks = []休傍。

  • loop._ready = [handle(task._wakeup)]征绎, loop._scheduled=[]

第三輪_run_once()的調用執(zhí)行開始

注意這里的第1,2,3,4,5步是在_run_once()上標記的1,2,3,4,5注釋

  • 第1,2步的邏輯判斷,timeout = 0人柿。

  • 第3步 event_list = self._selector.select(0)柴墩,也就是說立即返回。

  • 第4步 由于loop._scheduled=[]凫岖,因此不執(zhí)行第4步中的邏輯江咳。

  • 第5步 依次從_ready隊列中取出回調函數handle,執(zhí)行handle._run()

執(zhí)行handle._run()就是執(zhí)行task._wakeup()哥放,又要回到task._wakeup()代碼中看看

class Task(futures.Future):

    def _wakeup(self, future):
        try:
            # future為sleep協(xié)程中生成的新的Future對象
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            
            # 這里是關鍵代碼歼指,上次的_step()執(zhí)行到第一次碰到y(tǒng)ield的地方掛住了,此時再次執(zhí)行_step(),
            # 也就是再次執(zhí)行 result = coro.send(None) 這句代碼甥雕,也就是從上次yield的地方繼續(xù)執(zhí)行yield后面的邏輯
            self._step()
        self = None  # Needed to break cycles when an exception occurs.

調用task._wakeup()實際上又是執(zhí)行task._step()也就是再次執(zhí)行result = core.send(None)這行代碼踩身,前面提到過,core協(xié)程被掛起在Future對象的__iter__方法的yield處社露,此時再次執(zhí)行result = core.send(None)挟阻,就是執(zhí)行yield后面的語句

class Future:

    ...

    def __iter__(self):
        # self.done()返回False,
        if not self.done():
            self._asyncio_future_blocking = True
            # 把Future對象自己返回出去
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        # 調用task._wakeup再次進入到core掛起的地方執(zhí)行yield后面的語句
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__ # make compatible with 'await' expression

self.result() 返回的是None峭弟,所以 cor協(xié)程的await asyncio.sleep(2)返回的是None附鸽,到此為止cor協(xié)程的第三步await asyncio.sleep(2)才真正的執(zhí)行完畢,也就是說sleep協(xié)程執(zhí)行完畢了瞒瘸,然后繼續(xù)執(zhí)行cor協(xié)程await下面的語句print('exit cor ...')最后返回'cor'坷备,到此為止cor協(xié)程就完全執(zhí)行完畢了。

async def cor():
    print('enter cor ...')
    await asyncio.sleep(2) # 上次在這里掛起
    print('exit cor ...')
    
    return 'cor'

前面介紹了情臭,原生協(xié)程在執(zhí)行結束時會拋出StopIteration異常省撑,并且把返回值存放在異常的的value屬性中,因此在task._step()的第2步捕捉到StopIteration異常

    # 2. coro執(zhí)行完畢會拋出StopIteration異常
    except StopIteration as exc:
        if self._must_cancel:
            # Task is cancelled right before coro stops.
            self._must_cancel = False
            self.set_exception(futures.CancelledError())
        else:
            # exc.value是'cor'
            # 調用task.set_result('cor')
            self.set_result(exc.value)

task.set_result('cor')其實就是把task中的存放的回調函數又放到loop._ready隊列中去谎柄,task中的回調函數就是前面介紹的_run_until_complete_cb函數丁侄。到此為止第3輪的_run_once()執(zhí)行完畢。

此時:

  • cor協(xié)程的執(zhí)行完畢朝巫。

  • Future對象的_callbacks = []鸿摇。

  • loop._ready = [handle(_run_until_complete_cb)]loop._scheduled=[]劈猿。

第四輪_run_once()開始執(zhí)行

注意這里的第1,2,3,4,5步是在_run_once()上標記的1,2,3,4,5注釋

  • 第1,2步的邏輯判斷拙吉,timeout = 0

  • 第3步 event_list = self._selector.select(0)揪荣,也就是說立即返回筷黔。

  • 第4步 由于loop._scheduled=[],因此不執(zhí)行第4步中的邏輯仗颈。

  • 第5步 依次從_ready隊列中取出回調函數handle佛舱,執(zhí)行handle._run()

執(zhí)行handle._run()就是執(zhí)行_run_until_complete_cb(task)

def _run_until_complete_cb(fut):
    exc = fut._exception
    if (isinstance(exc, BaseException)
    and not isinstance(exc, Exception)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    # fut就是task對象椎例,_loop.stop()就是把loop._stopping賦值為Ture
    fut._loop.stop()

loop._stoppingTrue。第4輪_run_once()執(zhí)行完畢请祖。

def run_forever(self):
        ...
        try:
            events._set_running_loop(self)
            while True:
                # 第4輪_run_once()結束
                self._run_once()
                # _stopping為true订歪,跳出循環(huán),run_forever()執(zhí)行結束
                if self._stopping:
                    break
        finally:
            ...

跳出while循環(huán)肆捕, run_forever()執(zhí)行結束刷晋,run_until_complete()也就執(zhí)行完畢了,最后把cor協(xié)程的返回值'cor'返回出來賦值給rst變量慎陵。

到此為止所有整個task任務執(zhí)行完畢眼虱,loop循環(huán)關閉。

總結

loop._readyloop._scheduledloop對象中非常重要的兩個屬性

  • loop._ready是一個雙端隊列deque席纽,用來存放調用loop.call_soon方法時中放入的回調函數捏悬。

  • loop._scheduled是一個最小堆,根據延遲時間的大小構建的最小堆润梯,用來存放調用loop.call_later時存放的延遲回調函數邮破。

  • loop._scheduled中有延遲調用函數是,timeout被賦值為堆頂的延遲函數的等待時間仆救,這樣會使得select(timeout)阻塞等待timeout秒。到時間后loop._scheduled中的回調函數最終還是會被轉移到loop._ready隊列中去執(zhí)行矫渔。

每一個協(xié)程都會被封裝到一個task對象中彤蔽,task在初始化時就把task._step方法添加到loop._ready隊列中,同時添加一個停止loop的事件循環(huán)的回調函數到task._callback列表中庙洼。

task._step函數就是驅動協(xié)程的顿痪,里面執(zhí)行cor.send(None),分兩種情況:

  • 第一種:cor中有await語句

    cor執(zhí)行到await處油够,await的后面如果是另一個協(xié)程cor1蚁袭,則進入到cor1中執(zhí)行,cor1執(zhí)行完畢石咬,如果cor1返回的是None揩悄,
    task._step方法會對result進行判斷,返回None鬼悠,執(zhí)行task.set_result()删性,這個方法里
    面會調取task._callback列表中的所有回調方法,依次執(zhí)行焕窝,此時蹬挺,task._callback列表中只注冊了一個停止loop事件
    循環(huán)的回調,此時就調用該回調函數它掂,把loop._stopping設置成Ture巴帮,使得loop停止。

    如果cor1返回的是一個future對象,也就是task._step函數執(zhí)行到cor1協(xié)程返回的一個cor1_future時榕茧,則task._step
    法會對result進行判斷垃沦,返回類型是future則對cor1_future添加一個回調函數task._wakeup,當cor1_future.set_result()
    在某一時刻被調用時(比如像sleep協(xié)程n秒后被調用雪猪,或者正在的IO事件觸發(fā)的調用)栏尚,會調用cor1_future添加的回調函數
    也就是task._wakeup函數,task._wakeup里面又會調用task._step來驅動上次cor停止的位置只恨,也就是corawait處译仗,更準確的說是cor1_future.__iter__方法的yield self處,繼續(xù)
    執(zhí)行yield后面的語句官觅,await cor1()才算真正的執(zhí)行完畢纵菌,然后接著執(zhí)行await cor1()下面的語句,直到cor執(zhí)行完畢時會拋出StopIteration異常休涤,cor返回值會保存在StopIteration異常對象的value
    屬性中咱圆,與上面的邏輯一樣,task會調用之前調用的停止loop的回調函數功氨,停止loop循環(huán)

  • 第二種:cor中沒有await語句

    如果沒有await語句序苏,task._step執(zhí)行后,result = cor.send(None)拋出StopIteration異常捷凄,與
    上面的邏輯一樣忱详,task會調用之前調用的停止loop的回調函數,停止loop循環(huán)

本質上跺涤,在一次loop循環(huán)中會執(zhí)行一次result = cor.send(None)語句匈睁,也就是cor中的用戶書寫的邏輯,碰到await expr桶错,再進
入到expr語句航唆,直到碰到futureyield語句,這一次循環(huán)的result = cor.send(None)邏輯才算執(zhí)行完成了院刁。如果
await后壓根沒有返回future糯钙,result = cor.send(None)則會直接執(zhí)行完await expr語句后再接著執(zhí)行await expr下面的語句,直到
拋出StopIteration異常黎比。

通過await future或者yield from future語句對cor的執(zhí)行邏輯進行分割超营,yield之前的語句被封裝到task._step方法中,在一次loop循環(huán)中被調用阅虫,yield之后的邏輯封裝在task._wakeup方法中演闭,在下一次的loop循環(huán)中被執(zhí)行,而這個task._wakeup是由future.set_result()把它注冊到loop._ready隊列中的颓帝。

sleep協(xié)程模擬耗時IO操作米碰,通過向loop中注冊一個延遲回調函數窝革,明確的控制select(timeout)中的timeout超時時間 + future對象的延遲timeout秒調用future.set_result()函數來實現一個模擬耗時的操作。

這個其實每一個真實的耗時的IO操作都會對應一個future對象吕座。只不過sleep中的回調明確的傳入了延遲回調的時間虐译,而真實的IO操作時的future.set_result()的調用則是由真實的IO事件,也就是select(timeout)返回的socket對象的可讀或者可寫事件來觸發(fā)的吴趴,一旦有相應的事件產生漆诽,就會回調對應的可讀可寫事件的回調函數,而在這些回調函數中又會去觸發(fā)future.set_result()方法锣枝。

(上面的總結都是根據本人自己的理解所寫厢拭,限于本人的表達能力有限,很多表達可能會產生一些歧義撇叁,還望各位看官一邊看此文一邊開啟debug調試供鸠,來幫助自己理解。)

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末陨闹,一起剝皮案震驚了整個濱河市楞捂,隨后出現的幾起案子,更是在濱河造成了極大的恐慌趋厉,老刑警劉巖寨闹,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異君账,居然都是意外死亡鼻忠,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進店門杈绸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人矮瘟,你說我怎么就攤上這事瞳脓。” “怎么了澈侠?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵劫侧,是天一觀的道長。 經常有香客問我哨啃,道長烧栋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任拳球,我火速辦了婚禮审姓,結果婚禮上,老公的妹妹穿的比我還像新娘祝峻。我一直安慰自己魔吐,他們只是感情好扎筒,可當我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著酬姆,像睡著了一般嗜桌。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上辞色,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天骨宠,我揣著相機與錄音,去河邊找鬼相满。 笑死层亿,一個胖子當著我的面吹牛,可吹牛的內容都是我干的雳灵。 我是一名探鬼主播棕所,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼悯辙!你這毒婦竟也來了琳省?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤躲撰,失蹤者是張志新(化名)和其女友劉穎针贬,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體拢蛋,經...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡桦他,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了谆棱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片快压。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖垃瞧,靈堂內的尸體忽然破棺而出蔫劣,到底是詐尸還是另有隱情,我是刑警寧澤个从,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布脉幢,位于F島的核電站,受9級特大地震影響嗦锐,放射性物質發(fā)生泄漏嫌松。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抵恋,春花似錦、人聲如沸外驱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽昵宇。三九已至磅崭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間瓦哎,已是汗流浹背砸喻。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蒋譬,地道東北人割岛。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像犯助,于是被迫代替她去往敵國和親癣漆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內容