從頭造輪子:python3 asyncio之 gather (3)

前言

書接上文:喧笔,本文造第三個輪子,也是asyncio包里面非常常用的一個函數(shù)gather

一我注、知識準(zhǔn)備

● 相對于前兩個函數(shù)捆憎,gather的使用頻率更高舅柜,因?yàn)樗С侄鄠€協(xié)程任務(wù)“同時”執(zhí)行
● 理解__await__ __iter__的使用
● 理解關(guān)鍵字async/await,async/await是3.5之后的語法躲惰,和yield/yield from異曲同工
● 今天的文章有點(diǎn)長致份,請大家耐心看完


二、環(huán)境準(zhǔn)備

組件 版本
python 3.7.7


三础拨、gather的實(shí)現(xiàn)

先來看下官方gather的使用方法:

|># more main.py
import asyncio

async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'

async def helloworld():
    print('enter helloworld')
    ret = await asyncio.gather(hello(), world())
    print('exit helloworld')
    return ret

if __name__ == "__main__":
    ret = asyncio.run(helloworld())
    print(ret)
    
|># python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']

來看下造的輪子的使用方式:

? more main.py
import wilsonasyncio

async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'

async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret


if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)

    
? python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']

自己造的輪子也很好的運(yùn)行了氮块,下面我們來看下輪子的代碼

四、代碼解析

輪子代碼

1)代碼組成

|># tree
.
├── eventloops.py 
├── futures.py
├── main.py
├── tasks.py
├── wilsonasyncio.py
文件 作用
eventloops.py 事件循環(huán)
futures.py futures對象
tasks.py tasks對象
wilsonasyncio.py 可調(diào)用方法集合
main.py 入口

2)代碼概覽:

eventloops.py

類/函數(shù) 方法 對象 作用 描述
Eventloop 事件循環(huán)诡宗,一個線程只有運(yùn)行一個
__init__ 初始化兩個重要對象 self._readyself._stopping
self._ready 所有的待執(zhí)行任務(wù)都是從這個隊(duì)列取出來滔蝉,非常重要
self._stopping 事件循環(huán)完成的標(biāo)志
call_soon 調(diào)用該方法會立即將任務(wù)添加到待執(zhí)行隊(duì)列
run_once run_forever調(diào)用,從self._ready隊(duì)列里面取出任務(wù)執(zhí)行
run_forever 死循環(huán)塔沃,若self._stopping則退出循環(huán)
run_until_complete 非常重要的函數(shù)蝠引,任務(wù)的起點(diǎn)和終點(diǎn)(后面詳細(xì)介紹)
create_task 將傳入的函數(shù)封裝成task對象,這個操作會將task.__step添加到__ready隊(duì)列
Handle 所有的任務(wù)進(jìn)入待執(zhí)行隊(duì)列(Eventloop.call_soon)之前都會封裝成Handle對象
__init__ 初始化兩個重要對象 self._callbackself._args
self._callback 待執(zhí)行函數(shù)主體
self._args 待執(zhí)行函數(shù)參數(shù)
_run 待執(zhí)行函數(shù)執(zhí)行
get_event_loop 獲取當(dāng)前線程的事件循環(huán)
_complete_eventloop 將事件循環(huán)的_stopping標(biāo)志置位True
run 入口函數(shù)
gather 可以同時執(zhí)行多個任務(wù)的入口函數(shù) 新增
_GatheringFuture 將每一個任務(wù)組成列表,封裝成一個新的類 新增

tasks.py

類/函數(shù) 方法 對象 作用 描述
Task 繼承自Future螃概,主要用于整個協(xié)程運(yùn)行的周期
__init__ 初始化對象 self._coro 矫夯,并且call_soonself.__step加入self._ready隊(duì)列
self._coro 用戶定義的函數(shù)主體
__step Task類的核心函數(shù)
__wakeup 喚醒任務(wù) 新增
ensure_future 如果對象是一個Future對象,就返回吊洼,否則就會調(diào)用create_task返回训貌,并且加入到_ready隊(duì)列

futures.py

類/函數(shù) 方法 對象 作用 描述
Future 主要負(fù)責(zé)與用戶函數(shù)進(jìn)行交互
__init__ 初始化兩個重要對象 self._loopself._callbacks
self._loop 事件循環(huán)
self._callbacks 回調(diào)隊(duì)列,任務(wù)暫存隊(duì)列冒窍,等待時機(jī)成熟(狀態(tài)不是PENDING)递沪,就會進(jìn)入_ready隊(duì)列
add_done_callback 添加任務(wù)回調(diào)函數(shù),狀態(tài)_PENDING综液,就虎進(jìn)入_callbacks隊(duì)列区拳,否則進(jìn)入_ready隊(duì)列
set_result 獲取任務(wù)執(zhí)行結(jié)果并存儲至_result,將狀態(tài)置位_FINISH意乓,調(diào)用__schedule_callbacks
__schedule_callbacks 將回調(diào)函數(shù)放入_ready,等待執(zhí)行
result 獲取返回值
__await__ 使用await就會進(jìn)入這個方法 新增
__iter__ 使用yield from就會進(jìn)入這個方法 新增

3)執(zhí)行過程

3.1)入口函數(shù)

main.py

    
if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)
  • ret = wilsonasyncio.run(helloworld())使用run约素,參數(shù)是用戶函數(shù)helloworld()届良,進(jìn)入runrun的流程可以參考上一小節(jié)
  • run --> run_until_complete

3.2)事件循環(huán)啟動圣猎,同之前

3.3)第一次循環(huán)run_forever --> run_once

  • _ready隊(duì)列的內(nèi)容(即:task.__step)取出來執(zhí)行士葫,這里的corohelloworld()
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • __step較之前的代碼有改動
  • result = coro.send(None),進(jìn)入用戶定義函數(shù)
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
  • ret = await wilsonasyncio.gather(hello(), world())送悔,這里沒啥可說的慢显,進(jìn)入gather函數(shù)
def gather(*coros_or_futures, loop=None):
    loop = get_event_loop()

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)

            outer.set_result(results)

    children = []
    nfuts = 0
    nfinished = 0

    for arg in coros_or_futures:
        fut = tasks.ensure_future(arg, loop=loop)
        nfuts += 1
        fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
  • loop = get_event_loop()獲取事件循環(huán)
  • def _done_callback(fut)這個函數(shù)是回調(diào)函數(shù),細(xì)節(jié)后面分析欠啤,現(xiàn)在只需要知道任務(wù)(hello()world())執(zhí)行完之后就會回調(diào)就行
  • for arg in coros_or_futures for循環(huán)確保每一個任務(wù)都是Future對象荚藻,并且add_done_callback將回調(diào)函數(shù)設(shè)置為_done_callback,還有將他們加入到_ready隊(duì)列等待下一次循環(huán)調(diào)度
  • 3個重要的變量:
    ? ? ? ?children里面存放的是每一個異步任務(wù)洁段,在本例是hello()world()
    ? ? ? ?nfuts存放是異步任務(wù)的數(shù)量应狱,在本例是2
    ? ? ? ?nfinished存放的是異步任務(wù)完成的數(shù)量,目前是0祠丝,完成的時候是2
  • 繼續(xù)往下疾呻,來到了_GatheringFuture,看看源碼:
class _GatheringFuture(Future):

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
  • _GatheringFuture最主要的作用就是將多個異步任務(wù)放入self._children写半,然后用_GatheringFuture這個對象來管理岸蜗。需要注意,這個對象繼承了Future
  • 至此叠蝇,gather完成初始化璃岳,返回了outer,其實(shí)就是_GatheringFuture
  • 總結(jié)一下gather,初始化了3個重要的變量矾睦,后面用來存放狀態(tài)晦款;給每一個異步任務(wù)添加回調(diào)函數(shù);將多個異步子任務(wù)合并枚冗,并且使用一個Future對象去管理

3.3.1)gather完成缓溅,回到helloworld()

async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
  • ret = await wilsonasyncio.gather(hello(), world()) gather返回_GatheringFuture,隨后使用await赁温,就會進(jìn)入Future.__await__
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • 由于_GatheringFuture的狀態(tài)是_PENDING坛怪,所以進(jìn)入if,遇到yield self股囊,將self袜匿,也就是_GatheringFuture返回(這里注意yield的用法,流程控制的功能)
  • yield回到哪兒去了呢稚疹?從哪兒send就回到哪兒去居灯,所以,他又回到了task.__step函數(shù)里面去
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • 這里是本函數(shù)的第一個核心點(diǎn)内狗,流程控制/跳轉(zhuǎn)怪嫌,需要非常的清晰,如果搞不清楚的同學(xué)柳沙,再詳細(xì)的去閱讀有關(guān)yield/yield from的文章
  • 繼續(xù)往下走岩灭,由于用戶函數(shù)helloworld()沒有結(jié)束,所以不會拋異常赂鲤,所以來到了else分支
  • blocking = getattr(result, '_asyncio_future_blocking', None)這里有一個重要的狀態(tài)噪径,那就是_asyncio_future_blocking,只有調(diào)用__await__数初,才會有這個參數(shù)找爱,默認(rèn)是true,這個參數(shù)主要的作用:一個異步函數(shù)妙真,如果調(diào)用了多個子異步函數(shù)缴允,那證明該異步函數(shù)沒有結(jié)束(后面詳細(xì)講解),就需要添加“喚醒”回調(diào)
  • result._asyncio_future_blocking = False將參數(shù)置位False珍德,并且添加self.__wakeup回調(diào)等待喚醒
  • __step函數(shù)完成

\color{red}{這里需要詳細(xì)講解一下:} _asyncio_future_blocking

  • 如果在異步函數(shù)里面出現(xiàn)了await练般,調(diào)用其他異步函數(shù)的情況,就會走到Future.__await___asyncio_future_blocking設(shè)置為true
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret
    
class Future:
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • 這樣做了之后锈候,在task.__step中就會把該任務(wù)的回調(diào)函數(shù)設(shè)置為__wakeup
  • 為啥要__wakeup薄料,因?yàn)?code>helloworld()并沒有執(zhí)行完成,所以需要再次__wakeup來喚醒helloworld()

這里揭示了泵琳,在Eventloop里面摄职,只要使用await調(diào)用其他異步任務(wù)誊役,就會掛起父任務(wù),轉(zhuǎn)而去執(zhí)行子任務(wù)谷市,直至子任務(wù)完成之后蛔垢,回到父任務(wù)繼續(xù)執(zhí)行


先喝口水,休息一下迫悠,下面更復(fù)雜鹏漆。。创泄。

3.4)第二次循環(huán)run_forever --> run_once

eventloops.py

    def run_once(self):
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            handle._run()
  • 從隊(duì)列中取出數(shù)據(jù)艺玲,此時_ready隊(duì)列有兩個任務(wù),hello() world()鞠抑,在gather的for循環(huán)時添加的
async def hello():
    print('enter hello ...')
    return 'return hello ...'

async def world():
    print('enter world ...')
    return 'return world ...'
  • 由于hello() world()沒有await調(diào)用其他異步任務(wù)饭聚,所以他們的執(zhí)行比較簡單,分別一次task.__step就結(jié)束了搁拙,到達(dá)set_result()
  • set_result()將回調(diào)函數(shù)放入_ready隊(duì)列秒梳,等待下次循環(huán)執(zhí)行

3.5)第三次循環(huán)run_forever --> run_once

  • 我們來看下回調(diào)函數(shù)
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if nfinished == nfuts:
            results = []
            for fut in children:
                res = fut.result()
                results.append(res)

            outer.set_result(results)
  • 沒錯,這是本文的第二個核心點(diǎn)箕速,我們來仔細(xì)分析一下
  • 這段代碼最主要的邏輯端幼,其實(shí)就是,只有當(dāng)所有的子任務(wù)執(zhí)行完之后弧满,才會啟動父任務(wù)的回調(diào)函數(shù),本文中只有hello() world()都執(zhí)行完之后if nfinished == nfuts: 此熬,才會啟動父任務(wù)_GatheringFuture的回調(diào)outer.set_result(results)
  • results.append(res)將子任務(wù)的結(jié)果取出來庭呜,放進(jìn)父任務(wù)的results里面
  • 子任務(wù)執(zhí)行完成,終于到了喚醒父任務(wù)的時候了task.__wakeup
    def __wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            raise exc
        else:
            self.__step()
        self = None

3.6)第四次循環(huán)run_forever --> run_once

  • future.result()_GatheringFuture取出結(jié)果犀忱,然后進(jìn)入task.__step
    def __step(self, exc=None):
        coro = self._coro
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            super().set_result(exc.value)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, result)
        finally:
            self = None
  • result = coro.send(None)其實(shí)就是helloworld() --> send又要跳回到當(dāng)初yield的地方募谎,那就是Future.__await__
    def __await__(self):
        if self._state == _PENDING:
            self._asyncio_future_blocking = True
            yield self
        return self.result()
  • return self.result()終于返回到helloworld()函數(shù)里面去了
async def helloworld():
    print('enter helloworld')
    ret = await wilsonasyncio.gather(hello(), world())
    print('exit helloworld')
    return ret

  • helloworld終于也執(zhí)行完了,返回了ret

3.7)第五次循環(huán)run_forever --> run_once

  • 循環(huán)結(jié)束
  • 回到run

3.8)回到主函數(shù)阴汇,獲取返回值

if __name__ == "__main__":
    ret = wilsonasyncio.run(helloworld())
    print(ret)

3.9)執(zhí)行結(jié)果

? python3 main.py
enter helloworld
enter hello ...
enter world ...
exit helloworld
['return hello ...', 'return world ...']

五数冬、流程總結(jié)

gather.jpg

六、小結(jié)

● 終于結(jié)束了搀庶,這是一個非常長的小節(jié)了拐纱,但是我感覺很多細(xì)節(jié)還是沒有說到,大家有問題請及時留言探討
_GatheringFuture一個非常重要的對象哥倔,它不但追蹤了hello() world()的執(zhí)行狀態(tài)秸架,喚醒helloworld(),并且將返回值傳遞給helloworld
await async yield對流程的控制需要特別關(guān)注
● 本文中的代碼咆蒿,參考了python 3.7.7中asyncio的源代碼东抹,裁剪而來
● 本文中代碼:代碼



至此蚂子,本文結(jié)束
在下才疏學(xué)淺,有撒湯漏水的缭黔,請各位不吝賜教...

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末食茎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子馏谨,更是在濱河造成了極大的恐慌别渔,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件田巴,死亡現(xiàn)場離奇詭異钠糊,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)壹哺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進(jìn)店門抄伍,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人管宵,你說我怎么就攤上這事截珍。” “怎么了箩朴?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵岗喉,是天一觀的道長。 經(jīng)常有香客問我炸庞,道長钱床,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任埠居,我火速辦了婚禮查牌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘滥壕。我一直安慰自己纸颜,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布绎橘。 她就那樣靜靜地躺著胁孙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪称鳞。 梳的紋絲不亂的頭發(fā)上涮较,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天,我揣著相機(jī)與錄音冈止,去河邊找鬼法希。 笑死,一個胖子當(dāng)著我的面吹牛靶瘸,可吹牛的內(nèi)容都是我干的苫亦。 我是一名探鬼主播毛肋,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼屋剑!你這毒婦竟也來了润匙?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤唉匾,失蹤者是張志新(化名)和其女友劉穎孕讳,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巍膘,經(jīng)...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡厂财,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了峡懈。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片璃饱。...
    茶點(diǎn)故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖肪康,靈堂內(nèi)的尸體忽然破棺而出荚恶,到底是詐尸還是另有隱情,我是刑警寧澤磷支,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布谒撼,位于F島的核電站,受9級特大地震影響雾狈,放射性物質(zhì)發(fā)生泄漏廓潜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一善榛、第九天 我趴在偏房一處隱蔽的房頂上張望峡继。 院中可真熱鬧雹仿,春花似錦私恬、人聲如沸柜裸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至钮呀,卻和暖如春剑鞍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爽醋。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工蚁署, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蚂四。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓光戈,卻偏偏與公主長得像哪痰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子久妆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內(nèi)容