Introduction
I started learning asyncio mainly because I had to fix a problem involving this module while working on a local test. While browsing solutions, I noticed that people use the module for Python web crawlers. That seemed interesting, so I set aside a day to fill in the basics.
asyncio provides asynchronous network I/O, concurrency, and coroutines. It is not the only way to get coroutines in Python; modules such as tornado offer them as well.
A program that uses asyncio is built in the following steps:
- Create an event loop (event_loop). Once started, the program enters an infinite loop; functions are registered on the event loop, and when the event they are waiting for occurs, the corresponding coroutine function is called.
- Create a coroutine. A function defined with the async keyword is a coroutine function, and calling it produces a coroutine object. Inside a coroutine function, the await keyword suspends the coroutine at a blocking operation.
- Register the coroutine with the event loop. Calling a coroutine function does not run its body immediately; it returns a coroutine object, which has to be registered with the event loop and is then driven by the loop.
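The three steps can be sketched in a few lines. This is a minimal illustration rather than a canonical recipe: the coroutine function greet is a made-up name, and asyncio.new_event_loop() is used here so that the snippet does not depend on a pre-existing loop.

```python
import asyncio
import inspect

# Hypothetical coroutine function, used only for illustration.
async def greet(name):
    await asyncio.sleep(0)  # suspension point: control goes back to the loop once
    return "hello " + name

# Step 2: calling the coroutine function only builds a coroutine object.
coro = greet("asyncio")
is_function = inspect.iscoroutinefunction(greet)
is_object = inspect.iscoroutine(coro)

# Step 1: create an event loop.
loop = asyncio.new_event_loop()
try:
    # Step 3: hand the coroutine object to the loop, which runs it to completion.
    result = loop.run_until_complete(coro)
finally:
    loop.close()

print(is_function, is_object, result)  # True True hello asyncio
```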
Basics
1. Defining a coroutine
import time
import asyncio

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()
# This is a coroutine object; do_some_work has not run at this point
coroutine = do_some_work(2)
print(coroutine)
# Create an event loop
loop = asyncio.get_event_loop()
# Register the coroutine with the event loop and start the loop
loop.run_until_complete(coroutine)
print("Time:", now() - start)
2. Creating a task
A coroutine object is a native function that can be suspended; a task wraps a coroutine further and carries the task's various states. In the code above, when the coroutine was registered with the event loop, the run_until_complete method actually wrapped it in a task object. Task is a subclass of Future; it stores the state of the coroutine after it runs, so that the coroutine's result can be retrieved later.
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)  # <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex2.py:13>>
loop.run_until_complete(task)
print(task)  # <Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex2.py:13> result=None>
print("Time:", now() - start)
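Besides loop.create_task(coroutine) as used above, asyncio.ensure_future(coroutine) can also create a task. The difference between the two is mentioned in the official documentation. Tasks, futures, and the coroutine objects produced by async functions are all awaitable and can all be used after the await keyword. A future object represents a result that will become available later, and it can be combined with callback functions.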
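As a rough illustration of the points above, the sketch below creates one task with each call and checks that both are Future instances. It assumes Python 3.7+ (asyncio.run and asyncio.get_running_loop are used instead of the loop calls in the text), and the short sleep durations are arbitrary.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    loop = asyncio.get_running_loop()
    task_a = loop.create_task(do_some_work(0.01))
    task_b = asyncio.ensure_future(do_some_work(0.02))
    # ensure_future hands back an existing Task unchanged.
    same = asyncio.ensure_future(task_a) is task_a
    # Both objects are Tasks, and Task is a subclass of Future.
    both_futures = (isinstance(task_a, asyncio.Future)
                    and isinstance(task_b, asyncio.Future))
    # Nothing has run yet, so the task is still pending.
    pending_before = not task_a.done()
    # Tasks are awaitable; awaiting them yields the coroutine's return value.
    results = [await task_a, await task_b]
    return same, both_futures, pending_before, results

outcome = asyncio.run(main())
print(outcome)
```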
3. Binding a callback
When a task is created with ensure_future, its task.add_done_callback(callback) method can be used to obtain the coroutine's return value once the task has finished.
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    # Called by the event loop once the task has finished
    print("callback:", future.result())

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)
print(task)
loop.run_until_complete(task)
# Output
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
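A callback sometimes needs more context than the future alone. One common approach, sketched here as an assumption rather than something the text above prescribes, is to bind extra arguments with functools.partial; the label "job-1" and the list seen are illustrative names, and asyncio.run (Python 3.7+) replaces the explicit loop calls.

```python
import asyncio
import functools

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

seen = []  # collects what the callback receives

def callback(label, future):
    # The finished future is passed as the last positional argument,
    # after any arguments bound with functools.partial.
    seen.append((label, future.result()))

async def main():
    task = asyncio.ensure_future(do_some_work(0.01))
    task.add_done_callback(functools.partial(callback, "job-1"))
    await task
    # One extra turn of the loop, in case a scheduled callback is still queued.
    await asyncio.sleep(0)

asyncio.run(main())
print(seen)  # [('job-1', 'Done after 0.01s')]
```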
4. Blocking and await
As mentioned earlier, await can be used inside an async function to suspend the coroutine at a time-consuming operation.
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    # The time-consuming operation goes after await
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)
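To make the suspension visible, the following sketch records the order in which two coroutines start and finish. The names worker and log are made up for the illustration, and asyncio.run (Python 3.7+) stands in for the explicit loop calls.

```python
import asyncio

log = []  # records the order of events

async def worker(name, delay):
    log.append(name + " start")
    # The coroutine is suspended here; the loop is free to run other tasks.
    await asyncio.sleep(delay)
    log.append(name + " end")

async def main():
    slow = asyncio.ensure_future(worker("slow", 0.05))
    fast = asyncio.ensure_future(worker("fast", 0.01))
    await slow
    await fast

asyncio.run(main())
print(log)  # ['slow start', 'fast start', 'fast end', 'slow end']
```

Although slow is started first, fast finishes first, because slow gave up control at its await instead of blocking the loop.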
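5. Concurrency and parallelism
Concurrency usually means that several tasks need to make progress over the same period; parallelism means that several tasks are executing at the same instant.
When several tasks need to run concurrently, put them in a list and pass the list to asyncio.wait, which runs the listed tasks concurrently. The awaitable it returns is then registered with the event loop.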
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
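asyncio.gather(*tasks) can be used in place of asyncio.wait(tasks). wait takes a single list (or other iterable) of tasks, while gather takes the tasks as separate positional arguments, hence the *. asyncio.wait(tasks) returns two sets of tasks/futures: dones, pendings = await asyncio.wait(tasks)
Here dones is the set of tasks that have finished and pendings is the set of tasks that have not. asyncio.gather(*tasks) returns a list of results. (See the example in the next section.)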
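The two return shapes can be compared side by side. The sketch below is only an illustration: it passes return_when=asyncio.FIRST_COMPLETED to asyncio.wait so that the pending set is non-empty, uses arbitrary short delays, and relies on asyncio.run (Python 3.7+).

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (0.2, 0.01)]
    # wait takes the list itself and returns two sets of tasks.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]
    still_pending = len(pending)
    # gather takes the tasks unpacked and returns results in argument order.
    ordered = await asyncio.gather(*tasks)
    return first, still_pending, ordered

first, still_pending, ordered = asyncio.run(main())
print(first)          # ['Done after 0.01s']
print(still_pending)  # 1
print(ordered)        # ['Done after 0.2s', 'Done after 0.01s']
```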
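6. Nested coroutines
async defines coroutines, and coroutines are meant for time-consuming I/O operations. We can also wrap further I/O steps in coroutines of their own, which gives nested coroutines: one coroutine awaits another, and they chain together in this way.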
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())
    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:", result)

start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
The result of using asyncio.wait is shown below. Because dones is a set, the results are not necessarily printed in order:
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
Time: 4.006587505340576
The result of using await asyncio.gather(*tasks) is shown below; the results come back in the order of the list:
waiting: 1
waiting: 2
waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004234313964844
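The program above defines main as a coroutine as well. Alternatively, main does not have to process the results itself; it can return what it awaited directly, and the outermost run_until_complete then returns the result of the main coroutine.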
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)
    # return await asyncio.wait(tasks) can be used too, but it returns a
    # (done, pending) pair of sets instead of a list of results.
    # Note that gather needs the * to unpack the list.

start = now()
loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:", result)
print("Time:", now() - start)
Nested coroutines can also be written with the as_completed method:
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now() - start)
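7. Stopping coroutines
When a future is created, the task is pending; while the event loop is executing it, it is running; once it has finished, it is done. To stop the event loop, the tasks have to be cancelled first. The tasks of an event loop can be obtained with asyncio.Task.all_tasks() (asyncio.all_tasks() from Python 3.7 on; the Task classmethod was removed in Python 3.9).
A future object has the following states: Pending, Running, Done, Cancelled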
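These states can be observed through a task's public methods. The sketch below is a small illustration under the assumption of Python 3.7+ (asyncio.run); it checks done() and cancelled(), since as far as the public Task API goes there is no separate query for "running".

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    finished = asyncio.ensure_future(do_some_work(0.01))
    cancelled = asyncio.ensure_future(do_some_work(10))
    # Right after creation neither task is done: both are pending.
    states = [(finished.done(), cancelled.done())]
    await finished
    # cancel() returns True when the cancellation request was accepted.
    cancel_requested = cancelled.cancel()
    try:
        await cancelled
    except asyncio.CancelledError:
        pass  # awaiting a cancelled task raises CancelledError
    states.append((finished.done(), finished.cancelled()))
    states.append((cancelled.done(), cancelled.cancelled()))
    return cancel_requested, states

cancel_requested, states = asyncio.run(main())
print(cancel_requested)  # True
print(states)            # [(False, False), (True, False), (True, True)]
```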
import asyncio
import time

now = lambda: time.time()

async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]
start = now()
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    # asyncio.all_tasks(loop) replaces asyncio.Task.all_tasks(),
    # which was removed in Python 3.9
    print(asyncio.all_tasks(loop))
    for task in asyncio.all_tasks(loop):
        print(task.cancel())
    loop.stop()
    # Start the loop once more so the cancellations are processed
    loop.run_forever()
finally:
    loop.close()
print("Time:", now() - start)
Press Ctrl+C right after the event loop starts, and run_until_complete raises KeyboardInterrupt. The handler then loops over all tasks of the event loop and cancels each one. The output looks like this:
Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547
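True means the cancel succeeded (False is printed for the task that had already finished). After loop.stop(), the event loop has to be started once more and only then closed; otherwise an exception is still raised.
Looping over the tasks and cancelling them one by one is one option. However, as in the earlier examples, the list of tasks can be wrapped in a main function, with the event loop invoked outside main. In that case main is effectively the outermost task, so handling the wrapped main function is enough.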
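One way to read this, sketched below as an assumption rather than the author's exact code, is to cancel only the task that wraps main and let the cancellation propagate through gather to the inner tasks. To keep the example self-running, loop.call_later stands in for pressing Ctrl+C, and asyncio.new_event_loop() is used to create the loop.

```python
import asyncio

async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (1, 2, 2)]
    # Cancelling main while it waits here also cancels the gathered tasks.
    return await asyncio.gather(*tasks)

loop = asyncio.new_event_loop()
main_task = loop.create_task(main())
# Stand-in for Ctrl+C: request cancellation shortly after the loop starts.
loop.call_later(0.05, main_task.cancel)
try:
    loop.run_until_complete(main_task)
    outcome = "finished"
except asyncio.CancelledError:
    outcome = "cancelled"
finally:
    loop.close()

print(outcome, main_task.cancelled())  # cancelled True
```

Only one cancel() call is needed here, because main is the outermost task and the inner tasks are reached through it.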