python asyncio
網(wǎng)絡(luò)模型有很多中毫缆,為了實(shí)現(xiàn)高并發(fā)也有很多方案,多線程乐导,多進(jìn)程苦丁。無(wú)論多線程和多進(jìn)程,IO的調(diào)度更多取決于系統(tǒng)物臂,而協(xié)程的方式旺拉,調(diào)度來自用戶,用戶可以在函數(shù)中yield一個(gè)狀態(tài)棵磷。使用協(xié)程可以實(shí)現(xiàn)高效的并發(fā)任務(wù)蛾狗。Python的在3.4中引入了協(xié)程的概念,可是這個(gè)還是以生成器對(duì)象為基礎(chǔ)仪媒,3.5則確定了協(xié)程的語(yǔ)法沉桌。下面將簡(jiǎn)單介紹asyncio的使用。實(shí)現(xiàn)協(xié)程的不僅僅是asyncio,tornado和gevent都實(shí)現(xiàn)了類似的功能蒲牧。
event_loop 事件循環(huán):程序開啟一個(gè)無(wú)限的循環(huán)撇贺,程序員會(huì)把一些函數(shù)注冊(cè)到事件循環(huán)上。當(dāng)滿足事件發(fā)生的時(shí)候冰抢,調(diào)用相應(yīng)的協(xié)程函數(shù)松嘶。
coroutine 協(xié)程:協(xié)程對(duì)象,指一個(gè)使用async關(guān)鍵字定義的函數(shù)挎扰,它的調(diào)用不會(huì)立即執(zhí)行函數(shù)翠订,而是會(huì)返回一個(gè)協(xié)程對(duì)象。協(xié)程對(duì)象需要注冊(cè)到事件循環(huán)遵倦,由事件循環(huán)調(diào)用尽超。
task ?任務(wù):一個(gè)協(xié)程對(duì)象就是一個(gè)原生可以掛起的函數(shù),任務(wù)則是對(duì)協(xié)程進(jìn)一步封裝梧躺,其中包含任務(wù)的各種狀態(tài)似谁。
future: 代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果。它和task上沒有本質(zhì)的區(qū)別
async/await 關(guān)鍵字:python3.5 用于定義協(xié)程的關(guān)鍵字掠哥,async定義一個(gè)協(xié)程巩踏,await用于掛起阻塞的異步調(diào)用接口。
上述的概念單獨(dú)拎出來都不好懂续搀,比較他們之間是相互聯(lián)系塞琼,一起工作。下面看例子禁舷,再回溯上述概念彪杉,更利于理解。
定義一個(gè)協(xié)程
定義一個(gè)協(xié)程很簡(jiǎn)單牵咙,使用async關(guān)鍵字派近,就像定義普通函數(shù)一樣:
通過async關(guān)鍵字定義一個(gè)協(xié)程(coroutine),協(xié)程也是一種對(duì)象霜大。協(xié)程不能直接運(yùn)行构哺,需要把協(xié)程加入到事件循環(huán)(loop),由后者在適當(dāng)?shù)臅r(shí)候調(diào)用協(xié)程战坤。asyncio.get_event_loop方法可以創(chuàng)建一個(gè)事件循環(huán),然后使用run_until_complete將協(xié)程注冊(cè)到事件循環(huán)残拐,并啟動(dòng)事件循環(huán)途茫。因?yàn)楸纠挥幸粋€(gè)協(xié)程,于是可以看見如下輸出:
創(chuàng)建一個(gè)task
協(xié)程對(duì)象不能直接運(yùn)行溪食,在注冊(cè)事件循環(huán)的時(shí)候囊卜,其實(shí)是run_until_complete方法將協(xié)程包裝成為了一個(gè)任務(wù)(task)對(duì)象。所謂task對(duì)象是Future類的子類。保存了協(xié)程運(yùn)行后的狀態(tài)栅组,用于未來獲取協(xié)程的結(jié)果雀瓢。
創(chuàng)建task后,task在加入事件循環(huán)之前是pending狀態(tài)玉掸,因?yàn)閐o_some_work中沒有耗時(shí)的阻塞操作刃麸,task很快就執(zhí)行完畢了。后面打印的finished狀態(tài)司浪。
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創(chuàng)建一個(gè)task泊业,run_until_complete的參數(shù)是一個(gè)futrue對(duì)象。當(dāng)傳入一個(gè)協(xié)程啊易,其內(nèi)部會(huì)自動(dòng)封裝成task吁伺,task是Future的子類。isinstance(task, asyncio.Future)將會(huì)輸出True租谈。
綁定回調(diào)
綁定回調(diào)篮奄,在task執(zhí)行完畢的時(shí)候可以獲取執(zhí)行的結(jié)果,回調(diào)的最后一個(gè)參數(shù)是future對(duì)象割去,通過該對(duì)象可以獲取協(xié)程返回值窟却。如果回調(diào)需要多個(gè)參數(shù),可以通過偏函數(shù)導(dǎo)入劫拗。
可以看到间校,coroutine執(zhí)行結(jié)束時(shí)候會(huì)調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果页慷。我們創(chuàng)建的task和回調(diào)里的future對(duì)象憔足,實(shí)際上是同一個(gè)對(duì)象。
future 與 result
回調(diào)一直是很多異步編程的惡夢(mèng)酒繁,程序員更喜歡使用同步的編寫方式寫異步代碼滓彰,以避免回調(diào)的惡夢(mèng)≈萏唬回調(diào)中我們使用了future對(duì)象的result方法揭绑。前面不綁定回調(diào)的例子中,我們可以看到task有fiinished狀態(tài)郎哭。在那個(gè)時(shí)候他匪,可以直接讀取task的result方法。
阻塞和await
使用async可以定義協(xié)程對(duì)象夸研,使用await可以針對(duì)耗時(shí)的操作進(jìn)行掛起邦蜜,就像生成器里的yield一樣捉超,函數(shù)讓出控制權(quán)碱鳞。協(xié)程遇到await,事件循環(huán)將會(huì)掛起該協(xié)程塞关,執(zhí)行別的協(xié)程,直到其他的協(xié)程也掛起或者執(zhí)行完畢絮供,再進(jìn)行下一個(gè)協(xié)程的執(zhí)行衣吠。
耗時(shí)的操作一般是一些IO操作,例如網(wǎng)絡(luò)請(qǐng)求壤靶,文件讀取等缚俏。我們使用asyncio.sleep函數(shù)來模擬IO操作。協(xié)程的目的也是讓這些IO操作異步化萍肆。
在 sleep的時(shí)候袍榆,使用await讓出控制權(quán)。即當(dāng)遇到阻塞調(diào)用的函數(shù)的時(shí)候塘揣,使用await方法將協(xié)程的控制權(quán)讓出包雀,以便loop調(diào)用其他的協(xié)程。現(xiàn)在我們的例子就用耗時(shí)的阻塞操作了亲铡。
并發(fā)和并行
并發(fā)和并行一直是容易混淆的概念才写。并發(fā)通常指有多個(gè)任務(wù)需要同時(shí)進(jìn)行,并行則是同一時(shí)刻有多個(gè)任務(wù)執(zhí)行奖蔓。用上課來舉例就是赞草,并發(fā)情況下是一個(gè)老師在同一時(shí)間段輔助不同的人功課。并行則是好幾個(gè)老師分別同時(shí)輔助多個(gè)學(xué)生功課吆鹤。簡(jiǎn)而言之就是一個(gè)人同時(shí)吃三個(gè)饅頭還是三個(gè)人同時(shí)分別吃一個(gè)的情況厨疙,吃一個(gè)饅頭算一個(gè)任務(wù)。
asyncio實(shí)現(xiàn)并發(fā)疑务,就需要多個(gè)協(xié)程來完成任務(wù)沾凄,每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作知允。創(chuàng)建多個(gè)協(xié)程的列表撒蟀,然后將這些協(xié)程注冊(cè)到事件循環(huán)中。
結(jié)果如下
總時(shí)間為4s左右温鸽。4s的阻塞時(shí)間保屯,足夠前面兩個(gè)協(xié)程執(zhí)行完畢。如果是同步順序的任務(wù)涤垫,那么至少需要7s姑尺。此時(shí)我們使用了aysncio實(shí)現(xiàn)了并發(fā)。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個(gè)task列表蝠猬,后者接收一堆task股缸。
協(xié)程嵌套
使用async可以定義協(xié)程,協(xié)程用于耗時(shí)的io操作吱雏,我們也可以封裝更多的io操作過程敦姻,這樣就實(shí)現(xiàn)了嵌套的協(xié)程,即一個(gè)協(xié)程中await了另外一個(gè)協(xié)程歧杏,如此連接起來镰惦。
如果使用的是 asyncio.gather創(chuàng)建協(xié)程對(duì)象,那么await的返回值就是協(xié)程運(yùn)行的結(jié)果犬绒。
不在main協(xié)程函數(shù)里處理結(jié)果旺入,直接返回await的內(nèi)容,那么最外層的run_until_complete將會(huì)返回main協(xié)程的結(jié)果凯力。
或者返回使用asyncio.wait方式掛起協(xié)程茵瘾。
也可以使用asyncio的as_completed方法
由此可見,協(xié)程的調(diào)用和組合十分靈活咐鹤,尤其是對(duì)于結(jié)果的處理拗秘,如何返回,如何掛起祈惶,需要逐漸積累經(jīng)驗(yàn)和前瞻的設(shè)計(jì)雕旨。
協(xié)程停止
上面見識(shí)了協(xié)程的幾種常用的用法,都是協(xié)程圍繞著事件循環(huán)進(jìn)行的操作捧请。future對(duì)象有幾個(gè)狀態(tài):
Pending
Running
Done
Cancelled
創(chuàng)建future的時(shí)候凡涩,task為pending,事件循環(huán)調(diào)用執(zhí)行的時(shí)候當(dāng)然就是running疹蛉,調(diào)用完畢自然就是done活箕,如果需要停止事件循環(huán),就需要先把task取消可款∮可以使用asyncio.Task獲取事件循環(huán)的task
啟動(dòng)事件循環(huán)之后,馬上ctrl+c筑舅,會(huì)觸發(fā)run_until_complete的執(zhí)行異常 KeyBorardInterrupt座慰。然后通過循環(huán)asyncio.Task取消future〈浼穑可以看到輸出如下:
True表示cannel成功版仔,loop stop之后還需要再次開啟事件循環(huán),最后在close误墓,不然還會(huì)拋出異常:
循環(huán)task蛮粮,逐個(gè)cancel是一種方案,可是正如上面我們把task的列表封裝在main函數(shù)中谜慌,main函數(shù)外進(jìn)行事件循環(huán)的調(diào)用然想。這個(gè)時(shí)候,main相當(dāng)于最外出的一個(gè)task欣范,那么處理包裝的main函數(shù)即可变泄。
不同線程的事件循環(huán)
很多時(shí)候令哟,我們的事件循環(huán)用于注冊(cè)協(xié)程,而有的協(xié)程需要?jiǎng)討B(tài)的添加到事件循環(huán)中妨蛹。一個(gè)簡(jiǎn)單的方式就是使用多線程屏富。當(dāng)前線程創(chuàng)建一個(gè)事件循環(huán),然后在新建一個(gè)線程蛙卤,在新線程中啟動(dòng)事件循環(huán)狠半。當(dāng)前線程不會(huì)被block。
啟動(dòng)上述代碼之后颤难,當(dāng)前線程不會(huì)被block神年,新線程中會(huì)按照順序執(zhí)行call_soon_threadsafe方法注冊(cè)的more_work方法,后者因?yàn)閠ime.sleep操作是同步阻塞的行嗤,因此運(yùn)行完畢more_work需要大致6 + 3
新線程協(xié)程
上述的例子已日,主線程中創(chuàng)建一個(gè)new_loop,然后在另外的子線程中開啟一個(gè)無(wú)限事件循環(huán)昂验。主線程通過run_coroutine_threadsafe新注冊(cè)協(xié)程對(duì)象捂敌。這樣就能在子線程中進(jìn)行事件循環(huán)的并發(fā)操作,同時(shí)主線程又不會(huì)被block既琴。一共執(zhí)行的時(shí)間大概在6s左右占婉。
master-worker主從模式
對(duì)于并發(fā)任務(wù),通常是用生成消費(fèi)模型甫恩,對(duì)隊(duì)列的處理可以使用類似master-worker的方式逆济,master主要用戶獲取隊(duì)列的msg,worker用戶處理消息磺箕。
為了簡(jiǎn)單起見奖慌,并且協(xié)程更適合單線程的方式,我們的主線程用來監(jiān)聽隊(duì)列松靡,子線程用于處理隊(duì)列简僧。這里使用redis的隊(duì)列。主線程中有一個(gè)是無(wú)限循環(huán)雕欺,用戶消費(fèi)隊(duì)列岛马。
我們發(fā)起了一個(gè)耗時(shí)5s的操作,然后又發(fā)起了連個(gè)1s的操作屠列,可以看見子線程并發(fā)的執(zhí)行了這幾個(gè)任務(wù)啦逆,其中5s awati的時(shí)候,相繼執(zhí)行了1s的兩個(gè)任務(wù)笛洛。
停止子線程
如果一切正常夏志,那么上面的例子很完美】寥茫可是沟蔑,需要停止程序湿诊,直接ctrl+c,會(huì)拋出KeyboardInterrupt錯(cuò)誤溉贿,我們修改一下主循環(huán):
可是實(shí)際上并不好使枫吧,雖然主線程try了KeyboardInterrupt異常,但是子線程并沒有退出宇色,為了解決這個(gè)問題,可以設(shè)置子線程為守護(hù)線程颁湖,這樣當(dāng)主線程結(jié)束的時(shí)候宣蠕,子線程也隨機(jī)退出。
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.setDaemon(True)# 設(shè)置子線程為守護(hù)線程
t.start()
try:
線程停止程序的時(shí)候甥捺,主線程退出后抢蚀,子線程也隨機(jī)退出才了,并且停止了子線程的協(xié)程任務(wù)镰禾。
aiohttp
在消費(fèi)隊(duì)列的時(shí)候皿曲,我們使用asyncio的sleep用于模擬耗時(shí)的io操作。以前有一個(gè)短信服務(wù)吴侦,需要在協(xié)程中請(qǐng)求遠(yuǎn)程的短信api屋休,此時(shí)需要是需要使用aiohttp進(jìn)行異步的http請(qǐng)求。大致代碼如下:
/接口表示短信接口备韧,/error表示請(qǐng)求/失敗之后的報(bào)警劫樟。
有一個(gè)問題需要注意,我們?cè)趂etch的時(shí)候try了異常织堂,如果沒有try這個(gè)異常叠艳,即使發(fā)生了異常,子線程的事件循環(huán)也不會(huì)退出易阳。主線程也不會(huì)退出附较,暫時(shí)沒找到辦法可以把子線程的異常raise傳播到主線程。(如果誰(shuí)找到了比較好的方式潦俺,希望可以帶帶我)拒课。
對(duì)于redis的消費(fèi),還有一個(gè)block的方法:
使用 brpop方法黑竞,會(huì)block住task捕发,如果主線程有消息,才會(huì)消費(fèi)很魂。測(cè)試了一下扎酷,似乎brpop的方式更適合這種隊(duì)列消費(fèi)的模型。
可以看到結(jié)果
協(xié)程消費(fèi)
主線程用于監(jiān)聽隊(duì)列遏匆,然后子線程的做事件循環(huán)的worker是一種方式法挨。還有一種方式實(shí)現(xiàn)這種類似master-worker的方案谁榜。即把監(jiān)聽隊(duì)列的無(wú)限循環(huán)邏輯一道協(xié)程中。程序初始化就創(chuàng)建若干個(gè)協(xié)程凡纳,實(shí)現(xiàn)類似并行的效果窃植。
這樣做就可以多多啟動(dòng)幾個(gè)worker來監(jiān)聽隊(duì)列。一樣可以到達(dá)效果荐糜。
總結(jié)
上述簡(jiǎn)單的介紹了asyncio的用法巷怜,主要是理解事件循環(huán),協(xié)程和任務(wù)暴氏,future的關(guān)系延塑。異步編程不同于常見的同步編程,設(shè)計(jì)程序的執(zhí)行流的時(shí)候答渔,需要特別的注意关带。畢竟這和以往的編碼經(jīng)驗(yàn)有點(diǎn)不一樣≌铀海可是仔細(xì)想想宋雏,我們平時(shí)處事的時(shí)候,大腦會(huì)自然而然的實(shí)現(xiàn)異步協(xié)程务豺。比如等待煮茶的時(shí)候磨总,可以多寫幾行代碼。