前言
今天開(kāi)始聊一聊python3的asyncio噪舀。關(guān)于asyncio飘诗,大家肯定都有自己的理解,并且網(wǎng)上大神也把基礎(chǔ)概念也解釋的比較透徹纺座。
本文寫(xiě)作的初衷溉潭,主要是理解asyncio的原理并且實(shí)現(xiàn)一遍。
話不多說(shuō)喳瓣,我們開(kāi)始!
一掸掸、知識(shí)準(zhǔn)備
● 理解進(jìn)程蹭秋、線程堤撵、協(xié)程。簡(jiǎn)單來(lái)說(shuō)实昨,這三個(gè)都是為了解決多任務(wù)同時(shí)進(jìn)行的問(wèn)題
? 1)進(jìn)程是操作資源分配的最小單位,多任務(wù)的實(shí)現(xiàn)主要是極快地在進(jìn)程間來(lái)回切換丈挟,而進(jìn)程切換消耗時(shí)間最長(zhǎng)(系統(tǒng)調(diào)用)
? 2)線程依賴(lài)于進(jìn)程志电,多個(gè)線程共享了父進(jìn)程的一部分資源,線程切換時(shí)間相對(duì)于進(jìn)程來(lái)說(shuō)消耗時(shí)間大大減少例朱,但是由于python gil的存在,并不存在多線程(系統(tǒng)調(diào)用)
? 3)協(xié)程依賴(lài)于線程洒嗤,由于進(jìn)程/線程切換都是系統(tǒng)調(diào)用,開(kāi)銷(xiāo)是巨大的羔挡。而協(xié)程是在用戶空間內(nèi)完成任務(wù)切換间唉,不會(huì)切換到操作系統(tǒng)資源(寄存器、信號(hào)量终吼、堆棧等),所以這種方式開(kāi)銷(xiāo)最小商佛。python的協(xié)程核心在于姆打,遇到等待事件,就交出cpu控制權(quán)幔戏,轉(zhuǎn)而讓其他協(xié)程執(zhí)行
● 理解python生成器,yield/yield from
? 這里就不班門(mén)弄斧了痊剖,直接推薦大佬的blog
● 理解關(guān)鍵字async/await垒玲,async/await是3.5之后的語(yǔ)法,和yield/yield from異曲同工
二合愈、環(huán)境準(zhǔn)備
組件 | 版本 |
---|---|
python | 3.7.7 |
三、
run_until_complete
的實(shí)現(xiàn)
先來(lái)看下官方asyncio的使用方法:
|># more main.py
import asyncio
async def hello():
print('enter hello ...')
return 'world'
if __name__ == "__main__":
loop = asyncio.get_event_loop()
task = loop.create_task(hello())
rst = loop.run_until_complete(task)
print(rst)
|># python3 main.py
enter hello ...
world
來(lái)看下造的輪子的使用方式:
? more main.py
from wilsonasyncio import get_event_loop
async def hello():
print('enter hello ...')
return 'world'
if __name__ == "__main__":
loop = get_event_loop()
task = loop.create_task(hello())
rst = loop.run_until_complete(task)
print(rst)
? python3 main.py
enter hello ...
world
自己造的輪子也很好的運(yùn)行了益老,下面我們來(lái)看下輪子的代碼
四寸莫、代碼解析
1)代碼組成
|># tree
.
├── eventloops.py
├── futures.py
├── main.py
├── tasks.py
├── wilsonasyncio.py
文件 | 作用 |
---|---|
eventloops.py | 事件循環(huán) |
futures.py | futures對(duì)象 |
tasks.py | tasks對(duì)象 |
wilsonasyncio.py | 可調(diào)用方法集合 |
main.py | 入口 |
2)代碼概覽:
eventloops.py
類(lèi)/函數(shù) | 方法 | 對(duì)象 | 作用 | 描述 |
---|---|---|---|---|
Eventloop | 事件循環(huán)储狭,一個(gè)線程只有運(yùn)行一個(gè) | |||
__init__ |
初始化兩個(gè)重要對(duì)象 self._ready 與 self._stopping
|
|||
self._ready |
所有的待執(zhí)行任務(wù)都是從這個(gè)隊(duì)列取出來(lái)捣郊,非常重要 | |||
self._stopping |
事件循環(huán)完成的標(biāo)志 | |||
call_soon |
調(diào)用該方法會(huì)立即將任務(wù)添加到待執(zhí)行隊(duì)列 | |||
run_once |
被run_forever 調(diào)用慈参,從self._ready 隊(duì)列里面取出任務(wù)執(zhí)行 |
|||
run_forever |
死循環(huán),若self._stopping 則退出循環(huán) |
|||
run_until_complete |
非常重要的函數(shù)驮配,任務(wù)的起點(diǎn)和終點(diǎn)(后面詳細(xì)介紹) | |||
create_task |
將傳入的函數(shù)封裝成task 對(duì)象,這個(gè)操作會(huì)將task.__step 添加到__ready 隊(duì)列 |
|||
Handle |
所有的任務(wù)進(jìn)入待執(zhí)行隊(duì)列(Eventloop.call_soon )之前都會(huì)封裝成Handle對(duì)象 |
|||
__init__ |
初始化兩個(gè)重要對(duì)象 self._callback 與 self._args
|
|||
self._callback |
待執(zhí)行函數(shù)主體 | |||
self._args |
待執(zhí)行函數(shù)參數(shù) | |||
_run |
待執(zhí)行函數(shù)執(zhí)行 | |||
get_event_loop |
獲取當(dāng)前線程的事件循環(huán) | |||
_complete_eventloop |
將事件循環(huán)的_stopping 標(biāo)志置位True |
tasks.py
類(lèi)/函數(shù) | 方法 | 對(duì)象 | 作用 | 描述 |
---|---|---|---|---|
Task | 繼承自Future,主要用于整個(gè)協(xié)程運(yùn)行的周期 | |||
__init__ |
初始化對(duì)象 self._coro 猜绣,并且call_soon 將self.__step 加入self._ready 隊(duì)列 |
|||
self._coro |
用戶定義的函數(shù)主體 | |||
__step |
Task類(lèi)的核心函數(shù) |
futures.py
類(lèi)/函數(shù) | 方法 | 對(duì)象 | 作用 | 描述 |
---|---|---|---|---|
Future | 主要負(fù)責(zé)與用戶函數(shù)進(jìn)行交互 | |||
__init__ |
初始化兩個(gè)重要對(duì)象 self._loop 與 self._callbacks
|
|||
self._loop |
事件循環(huán) | |||
self._callbacks |
回調(diào)隊(duì)列掰邢,任務(wù)暫存隊(duì)列牺陶,等待時(shí)機(jī)成熟(狀態(tài)不是PENDING )辣之,就會(huì)進(jìn)入_ready 隊(duì)列 |
|||
add_done_callback |
添加任務(wù)回調(diào)函數(shù),狀態(tài)_PENDING 狮鸭,就虎進(jìn)入_callbacks 隊(duì)列多搀,否則進(jìn)入_ready 隊(duì)列 |
|||
set_result |
獲取任務(wù)執(zhí)行結(jié)果并存儲(chǔ)至_result ,將狀態(tài)置位_FINISH 酗昼,調(diào)用__schedule_callbacks
|
|||
__schedule_callbacks |
將回調(diào)函數(shù)放入_ready ,等待執(zhí)行 |
|||
result |
獲取返回值 |
3)執(zhí)行過(guò)程
3.1)入口函數(shù)
main.py
async def hello():
print('enter hello ...')
return 'world'
if __name__ == "__main__":
loop = get_event_loop()
task = loop.create_task(hello())
rst = loop.run_until_complete(task)
print(rst)
-
loop = get_event_loop()
獲取事件循環(huán) -
task = loop.create_task(hello())
將用戶函數(shù)hello()
封裝成協(xié)程,我們看下create_task的源碼
def create_task(self, coro):
task = tasks.Task(coro, loop=self)
return task
? ? 初始化一個(gè)Task對(duì)象春弥,從代碼概覽得知,初始化對(duì)象之后會(huì)立即將__step
添加到_ready
隊(duì)列等待執(zhí)行
-
rst = loop.run_until_complete(task)
開(kāi)始執(zhí)行事件循環(huán)的第一個(gè)函數(shù)run_until_complete
3.2)事件循環(huán)啟動(dòng)
eventloops.py
def run_until_complete(self, future):
future.add_done_callback(_complete_eventloop, future)
self.run_forever()
return future.result()
-
future.add_done_callback(_complete_eventloop, future)
為future
添加回調(diào)函數(shù)(future
就是task對(duì)象扫责,而task對(duì)象里的任務(wù)就是hello()
),回調(diào)函數(shù)是_complete_eventloop
鳖孤。就是future執(zhí)行完成之后執(zhí)行_complete_eventloop
-
self.run_forever()
啟動(dòng)事件循環(huán)
3.3)第一次循環(huán)run_forever
--> run_once
eventloops.py
def run_once(self):
ntodo = len(self._ready)
for _ in range(ntodo):
handle = self._ready.popleft()
handle._run()
- 將
_ready
隊(duì)列的內(nèi)容(task.__step
)取出來(lái)執(zhí)行
tasks.py
def __step(self, exc=None):
coro = self._coro
try:
if exc is None:
coro.send(None)
else:
coro.throw(exc)
except StopIteration as exc:
super().set_result(exc.value)
finally:
self = None
-
coro.send(None)
核心代碼,跳轉(zhuǎn)回到用戶函數(shù)hello()
main.py
async def hello():
print('enter hello ...')
return 'world'
- 用戶函數(shù)非常簡(jiǎn)單黄鳍,打印一行數(shù)據(jù)平匈,以及返回一個(gè)字符串
world
框沟,執(zhí)行完成之后回到task.__step()
-
super().set_result(exc.value)
由于用戶函數(shù)執(zhí)行完成增炭,會(huì)拋出異常StopIteration
,捕獲之后執(zhí)行set_result
- 由代碼概覽得知
set_result
的作用在于將任務(wù)狀態(tài)置位_FINISHED
梅垄,并且將回調(diào)函數(shù)(_complete_eventloop
)寫(xiě)入_ready
隊(duì)列
3.4)第二次循環(huán)run_forever
--> run_once
eventloops.py
def run_once(self):
ntodo = len(self._ready)
for _ in range(ntodo):
handle = self._ready.popleft()
handle._run()
- 繼續(xù)循環(huán)输玷,
handle
封裝了_complete_eventloop
def _complete_eventloop(fut):
fut._loop.stop()
- 調(diào)用stop,設(shè)置停止標(biāo)志
3.5)第三次循環(huán)run_forever
def run_forever(self):
while True:
self.run_once()
if self._stopping:
break
- 跳出事件循環(huán)炭玫,回到
run_until_complete
def run_until_complete(self, future):
future.add_done_callback(_complete_eventloop, future)
self.run_forever()
return future.result()
3.6)回到主函數(shù)貌虾,獲取返回值
if __name__ == "__main__":
loop = get_event_loop()
task = loop.create_task(hello())
rst = loop.run_until_complete(task)
print(rst)
-
rst = loop.run_until_complete(task)
獲取返回值
3.7)執(zhí)行結(jié)果
? python3 main.py
enter hello ...
return world ...
五、流程總結(jié)
六衔憨、小結(jié)
● task對(duì)象與future有什么區(qū)別?主要用于整個(gè)協(xié)程運(yùn)行的周期践图,主要負(fù)責(zé)與用戶函數(shù)進(jìn)行交互
● 本文從asyncio的第一個(gè)函數(shù)run_until_complete
沉馆,介紹了asyncio的基本流程:用戶函數(shù)并不是立即執(zhí)行,而是進(jìn)入隊(duì)列斥黑,然后根據(jù)eventloop
在合適的時(shí)機(jī)進(jìn)行統(tǒng)一調(diào)度
● 本文中的代碼,參考了python 3.7.7中asyncio的源代碼兽狭,裁剪而來(lái)
● 本文中代碼:代碼
至此,本文結(jié)束
在下才疏學(xué)淺箕慧,有撒湯漏水的,請(qǐng)各位不吝賜教...