學(xué)習(xí)python高并發(fā)模塊asynio

簡(jiǎn)介

學(xué)習(xí)這個(gè)主要是因?yàn)樵谔幚硪粋€(gè)本地測(cè)試的時(shí)候,需要解決這個(gè)模塊的一個(gè)問題驴娃。在瀏覽解決方案的時(shí)候奏候,發(fā)現(xiàn)大家會(huì)使用這個(gè)模塊來進(jìn)行python爬蟲工作。感覺蠻有意思的唇敞,準(zhǔn)備花一天補(bǔ)補(bǔ)基礎(chǔ)蔗草。

asyncio 可以實(shí)現(xiàn)異步網(wǎng)絡(luò)操作咒彤、并發(fā)、協(xié)程咒精。當(dāng)然目前實(shí)現(xiàn)協(xié)程的不僅僅asyncio還有tornado等模塊镶柱。

創(chuàng)建一個(gè)asyncio的步驟如下

  1. 創(chuàng)建一個(gè)event_loop 事件循環(huán),當(dāng)啟動(dòng)時(shí)模叙,程序開啟一個(gè)無限循環(huán)歇拆,把一些函數(shù)注冊(cè)到事件循環(huán)上,當(dāng)滿足事件發(fā)生的時(shí)候范咨,調(diào)用相應(yīng)的協(xié)程函數(shù)故觅。
  2. 創(chuàng)建協(xié)程: 使用async關(guān)鍵字定義的函數(shù)就是一個(gè)協(xié)程對(duì)象。在協(xié)程函數(shù)內(nèi)部可以使用await關(guān)鍵字用于阻塞操作的掛起渠啊。
  3. 將協(xié)程注冊(cè)到事件循環(huán)中输吏。協(xié)程的調(diào)用不會(huì)立即執(zhí)行函數(shù),而是會(huì)返回一個(gè)協(xié)程對(duì)象替蛉。協(xié)程對(duì)象需要注冊(cè)到事件循環(huán)贯溅,由事件循環(huán)調(diào)用。

基礎(chǔ)知識(shí)

一躲查、定義一個(gè)協(xié)程

import time
import asyncio

now = lambda : time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()
# 這里是一個(gè)協(xié)程對(duì)象盗迟,這個(gè)時(shí)候do_some_work函數(shù)并沒有執(zhí)行
coroutine = do_some_work(2)
print(coroutine)
#  創(chuàng)建一個(gè)事件loop
loop = asyncio.get_event_loop()
# 將協(xié)程注冊(cè)到事件循環(huán),并啟動(dòng)事件循環(huán)
loop.run_until_complete(coroutine)

print("Time:",now()-start)

二熙含、創(chuàng)建一個(gè)task

一個(gè)協(xié)程對(duì)象就是一個(gè)原生可以掛起的函數(shù)罚缕,任務(wù)則是對(duì)協(xié)程進(jìn)一步封裝,其中包含了任務(wù)的各種狀態(tài)怎静。在上面的代碼中邮弹,在注冊(cè)事件循環(huán)的時(shí)候,其實(shí)是run_until_complete方法將協(xié)程包裝成為了一個(gè)任務(wù)(task)對(duì)象蚓聘。 task對(duì)象是Future類的子類腌乡,保存了協(xié)程運(yùn)行后的狀態(tài),用于未來獲取協(xié)程的結(jié)果夜牡。

import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)#<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
loop.run_until_complete(task)
print(task)#<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
print("Time:",now()-start)

關(guān)于上面通過loop.create_task(coroutine)創(chuàng)建task,同樣的可以通過 asyncio.ensure_future(coroutine)創(chuàng)建task.使用這兩種方式的區(qū)別在官網(wǎng)上有提及与纽。task/future以及使用async創(chuàng)建的都是awaitable對(duì)象,都可以在await關(guān)鍵字之后使用塘装。future對(duì)象意味著在未來返回結(jié)果急迂,可以搭配回調(diào)函數(shù)使用。

三蹦肴、綁定回調(diào)

當(dāng)使用ensure_feature創(chuàng)建任務(wù)的時(shí)候僚碎,可以使用任務(wù)的task.add_done_callback(callback)方法,獲得對(duì)象的協(xié)程返回值阴幌。


async def do_some_work(x):
    print("waiting:",x)
    return "Done after {}s".format(x)


def callback(future):
    print("callback:",future.result())


start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)

#結(jié)果
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s

四勺阐、阻塞和await

前面提到asynic函數(shù)內(nèi)部可以使用await 來針對(duì)耗時(shí)的操作進(jìn)行掛起卷中。


async def do_some_work(x):
    print("waiting:",x)
    # await 后面就是調(diào)用耗時(shí)的操作
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

五、并發(fā)和并行

并發(fā)通常是指有多個(gè)任務(wù)需要同時(shí)進(jìn)行渊抽,并行則是同一個(gè)時(shí)刻有多個(gè)任務(wù)執(zhí)行.
當(dāng)有多個(gè)任務(wù)需要并行時(shí)蟆豫,可以將任務(wù)先放置在任務(wù)隊(duì)列中,然后將任務(wù)隊(duì)列傳給asynicio.wait方法懒闷,這個(gè)方法會(huì)同時(shí)并行運(yùn)行隊(duì)列中的任務(wù)无埃。將其注冊(cè)到事件循環(huán)中。

async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

asyncio.wait(tasks) 也可以使用 asyncio.gather(tasks) 毛雇,前者接收一堆task,后者接受一個(gè)task列表侦镇。asyncio.wait(tasks)方法返回值是兩組task/future的set.dones, pendings = await asyncio.wait(tasks)其中dones是task的set,pendings是future的set灵疮。asyncio.gather(tasks) 返回一個(gè)結(jié)果的list。(見下一節(jié)的列子)

六壳繁、嵌套協(xié)程

使用async可以定義協(xié)程震捣,協(xié)程用于耗時(shí)的io操作,我們也可以封裝更多的io操作過程闹炉,這樣就實(shí)現(xiàn)了嵌套的協(xié)程蒿赢,即一個(gè)協(xié)程中await了另外一個(gè)協(xié)程,如此連接起來渣触。


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

使用asyncio.wait的結(jié)果如下羡棵,可見返回的結(jié)果dones并不一定按照順序輸出

waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
Time: 4.006587505340576

使用 await asyncio.gather(*tasks)得到的結(jié)果如下,是按照列表順序進(jìn)行返回的

waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004234313964844

上面的程序?qū)ain也定義為協(xié)程。我們也可以不在main協(xié)程函數(shù)里處理結(jié)果嗅钻,直接返回await的內(nèi)容皂冰,那么最外層的run_until_complete將會(huì)返回main協(xié)程的結(jié)果。

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)
    #return await asyncio.wait(tasks)也可以使用养篓。注意gather方法需要*這個(gè)標(biāo)記

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:",result)

print("Time:", now()-start)

也可以使用as_complete方法實(shí)現(xiàn)嵌套協(xié)程

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

七秃流、協(xié)程停止

創(chuàng)建future的時(shí)候,task為pending柳弄,事件循環(huán)調(diào)用執(zhí)行的時(shí)候當(dāng)然就是running舶胀,調(diào)用完畢自然就是done,如果需要停止事件循環(huán)碧注,就需要先把task取消嚣伐。可以使用asyncio.Task獲取事件循環(huán)的task萍丐。
future對(duì)象有如下幾個(gè)狀態(tài):Pending纤控、Running、Done碉纺、Cacelled

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:",now()-start)

啟動(dòng)事件循環(huán)之后船万,馬上ctrl+c刻撒,會(huì)觸發(fā)run_until_complete的執(zhí)行異常 KeyBorardInterrupt。然后通過循環(huán)asyncio.Task取消future耿导∩可以看到輸出如下:

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

True表示cannel成功,loop stop之后還需要再次開啟事件循環(huán)舱呻,最后在close醋火,不然還會(huì)拋出異常.

循環(huán)task,逐個(gè)cancel是一種方案箱吕,可是正如上面我們把task的列表封裝在main函數(shù)中芥驳,main函數(shù)外進(jìn)行事件循環(huán)的調(diào)用。這個(gè)時(shí)候茬高,main相當(dāng)于最外出的一個(gè)task兆旬,那么處理包裝的main函數(shù)即可。

文章來源

python中重要的模塊--asyncio
Python協(xié)程深入理解

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末怎栽,一起剝皮案震驚了整個(gè)濱河市丽猬,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌熏瞄,老刑警劉巖脚祟,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異强饮,居然都是意外死亡由桌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門邮丰,熙熙樓的掌柜王于貴愁眉苦臉地迎上來沥寥,“玉大人,你說我怎么就攤上這事柠座∫匮牛” “怎么了?”我有些...
    開封第一講書人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵妈经,是天一觀的道長淮野。 經(jīng)常有香客問我,道長吹泡,這世上最難降的妖魔是什么骤星? 我笑而不...
    開封第一講書人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮爆哑,結(jié)果婚禮上洞难,老公的妹妹穿的比我還像新娘。我一直安慰自己揭朝,他們只是感情好队贱,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開白布色冀。 她就那樣靜靜地躺著,像睡著了一般柱嫌。 火紅的嫁衣襯著肌膚如雪锋恬。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,708評(píng)論 1 305
  • 那天编丘,我揣著相機(jī)與錄音与学,去河邊找鬼。 笑死嘉抓,一個(gè)胖子當(dāng)著我的面吹牛索守,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播抑片,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼卵佛,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了蓝丙?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤望拖,失蹤者是張志新(化名)和其女友劉穎渺尘,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體说敏,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鸥跟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了盔沫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片医咨。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖架诞,靈堂內(nèi)的尸體忽然破棺而出拟淮,到底是詐尸還是另有隱情,我是刑警寧澤谴忧,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布很泊,位于F島的核電站,受9級(jí)特大地震影響沾谓,放射性物質(zhì)發(fā)生泄漏委造。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一均驶、第九天 我趴在偏房一處隱蔽的房頂上張望昏兆。 院中可真熱鬧,春花似錦妇穴、人聲如沸爬虱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽饮潦。三九已至燃异,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間继蜡,已是汗流浹背回俐。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留稀并,地道東北人仅颇。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像碘举,于是被迫代替她去往敵國和親忘瓦。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 東方網(wǎng)記者Pak Colin 7月31日?qǐng)?bào)道:備受關(guān)注的上海少兒圖書館新圖書館將位于普陀長風(fēng)區(qū),預(yù)計(jì)將于2020完...
    hena9344閱讀 474評(píng)論 0 0
  • 第四章 七夕夜 七夕佳節(jié)蝙场,長安城繁花似錦凌停。夜晚更是燈火通明,男男女女求姻緣售滤,相會(huì)游玩絡(luò)繹不絕罚拟。 夜晚, 雪盈把為...
    寒筠閱讀 286評(píng)論 0 2
  • 使用typealias關(guān)鍵字給一個(gè)已經(jīng)存在的類型指定別名完箩。指定別名后赐俗,就可以使用該別名來聲明常量或者變量。 對(duì)于這...
    透支未來閱讀 1,259評(píng)論 0 1
  • 作家格拉德威爾在《異類》一書中指出:人們眼中的天才之所以卓越非凡弊知,并非天資超人一等阻逮,而是付出了持續(xù)不斷的努力,1萬...
    qtpifan閱讀 310評(píng)論 0 0
  • 果不其然 我還是浪費(fèi)了一塊石頭
    芮兒與晗昱閱讀 136評(píng)論 0 0