引子
最近把所有系統(tǒng)的python3 版本都更新到了python3.7扩淀,然后更新了一下代碼缰趋,發(fā)現(xiàn)這個(gè)版本改動(dòng)還是很大的脏嚷,之前更多還是在使用python2.7做ETL或者操作一些API丸氛,沒想到python的變化如此之大衔憨,看來自己還是太落伍了奋献。于是在知乎和官網(wǎng)上找資料學(xué)習(xí)了下站超,看到一篇講協(xié)程的文章很受啟發(fā)企锌,以后應(yīng)該會(huì)較多使用這個(gè)功能寸癌,之前使用的多進(jìn)程多線程效果都不明顯专筷,而協(xié)程應(yīng)該是一個(gè)python的效率利器。
前言
多進(jìn)程和多線程除了創(chuàng)建的開銷大之外還有一個(gè)難以根治的缺陷蒸苇,就是處理進(jìn)程之間或線程之間的協(xié)作問題磷蛹,因?yàn)槭且蕾嚩噙M(jìn)程和多線程的程序在不加鎖的情況下通常是不可控的,而協(xié)程則可以完美地解決協(xié)作問題溪烤,由用戶來決定協(xié)程之間的調(diào)度味咳。
總所周知,Python因?yàn)橛蠫IL(全局解釋鎖)這玩意檬嘀,不可能有真正的多線程的存在槽驶,因此很多情況下都會(huì)用multiprocessing實(shí)現(xiàn)并發(fā),而且在Python中應(yīng)用多線程還要注意關(guān)鍵地方的同步枪眉,不太方便捺檬,用協(xié)程代替多線程和多進(jìn)程是一個(gè)很好的選擇,因?yàn)樗说奶匦裕?em>主動(dòng)調(diào)用/退出贸铜,狀態(tài)保存堡纬,避免cpu上下文切換等…
協(xié)程
基本概念
協(xié)程聂受,又稱作Coroutine,通過 async/await 語法進(jìn)行聲明,是編寫異步應(yīng)用的推薦方式烤镐。
從字面上來理解蛋济,即協(xié)同運(yùn)行的例程,它是比是線程(thread)更細(xì)量級的用戶態(tài)線程炮叶,特點(diǎn)是允許用戶的主動(dòng)調(diào)用和主動(dòng)退出碗旅,掛起當(dāng)前的例程然后返回值或去執(zhí)行其他任務(wù),接著返回原來停下的點(diǎn)繼續(xù)執(zhí)行镜悉。等下祟辟,這是否有點(diǎn)奇怪?我們都知道一般函數(shù)都是線性執(zhí)行的侣肄,不可能說執(zhí)行到一半返回旧困,等會(huì)兒又跑到原來的地方繼續(xù)執(zhí)行。但一些熟悉python(or其他動(dòng)態(tài)語言)的童鞋都知道這可以做到稼锅,答案是用yield語句吼具。其實(shí)這里我們要感謝操作系統(tǒng)(OS)為我們做的工作,因?yàn)樗哂術(shù)etcontext和swapcontext這些特性矩距,通過系統(tǒng)調(diào)用拗盒,我們可以把上下文和狀態(tài)保存起來,切換到其他的上下文锥债,這些特性為coroutine的實(shí)現(xiàn)提供了底層的基礎(chǔ)陡蝇。操作系統(tǒng)的Interrupts和Traps機(jī)制則為這種實(shí)現(xiàn)提供了可能性,因此它看起來可能是下面這樣的:
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
理解生成器(generator)
學(xué)過生成器和迭代器的同學(xué)應(yīng)該都知道python有yield這個(gè)關(guān)鍵字赞弥,yield能把一個(gè)函數(shù)變成一個(gè)generator毅整,與return不同,yield在函數(shù)中返回值時(shí)會(huì)保存函數(shù)的狀態(tài)绽左,使下一次調(diào)用函數(shù)時(shí)會(huì)從上一次的狀態(tài)繼續(xù)執(zhí)行悼嫉,即從yield的下一條語句開始執(zhí)行,這樣做有許多好處拼窥,比如我們想要生成一個(gè)數(shù)列戏蔑,若該數(shù)列的存儲空間太大,而我們僅僅需要訪問前面幾個(gè)元素鲁纠,那么yield就派上用場了总棵,它實(shí)現(xiàn)了這種一邊循環(huán)一邊計(jì)算的機(jī)制,節(jié)省了存儲空間改含,提高了運(yùn)行效率情龄。
運(yùn)行協(xié)程
asyncio.run()
函數(shù)用來運(yùn)行最高層級的入口點(diǎn) "main()" 函數(shù)-
等待一個(gè)協(xié)程。以下代碼段會(huì)在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main())
-
asyncio.create_task()
函數(shù)用來并發(fā)運(yùn)行作為 asyncio任務(wù)
的多個(gè)協(xié)程骤视。async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}")
可等待對象
如果一個(gè)對象可以在 await 語句中使用鞍爱,那么它就是 可等待 對象。許多 asyncio API 都被設(shè)計(jì)為接受可等待對象专酗。
可等待 對象有三種主要類型: 協(xié)程, 任務(wù) 和 Future.
協(xié)程
Python 協(xié)程屬于 可等待 對象睹逃,因此可以在其他協(xié)程中被等待:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
重要
在本文檔中 "協(xié)程" 可用來表示兩個(gè)緊密關(guān)聯(lián)的概念:
-
協(xié)程函數(shù): 定義形式為
async def
的函數(shù); - 協(xié)程對象: 調(diào)用 協(xié)程函數(shù) 所返回的對象。
asyncio 也支持舊式的 基于生成器的 協(xié)程祷肯。
任務(wù)
任務(wù) 被用來設(shè)置日程以便 并發(fā) 執(zhí)行協(xié)程沉填。
當(dāng)一個(gè)協(xié)程通過 asyncio.create_task()
等函數(shù)被打包為一個(gè) 任務(wù),該協(xié)程將自動(dòng)排入日程準(zhǔn)備立即運(yùn)行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Future 對象
Future
是一種特殊的 低層級 可等待對象佑笋,表示一個(gè)異步操作的 最終結(jié)果翼闹。
當(dāng)一個(gè) Future 對象 被等待,這意味著協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢允青。
在 asyncio 中需要 Future 對象以便允許通過 async/await 使用基于回調(diào)的代碼橄碾。
通常情況下 沒有必要 在應(yīng)用層級的代碼中創(chuàng)建 Future 對象卵沉。
Future 對象有時(shí)會(huì)由庫和某些 asyncio API 暴露給用戶颠锉,用作可等待對象:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一個(gè)很好的返回對象的低層級函數(shù)的示例是 loop.run_in_executor()
。
并發(fā)運(yùn)行任務(wù)
awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)
并發(fā) 運(yùn)行 aws 序列中的 可等待對象史汗。
如果 aws 中的某個(gè)可等待對象為協(xié)程琼掠,它將自動(dòng)作為一個(gè)任務(wù)加入日程。
如果所有可等待對象都成功完成停撞,結(jié)果將是一個(gè)由所有返回值聚合而成的列表瓷蛙。結(jié)果值的順序與 aws 中可等待對象的順序一致。
如果 return_exceptions 為 False
(默認(rèn))戈毒,所引發(fā)的首個(gè)異常會(huì)立即傳播給等待 gather()
的任務(wù)艰猬。aws序列中的其他可等待對象 不會(huì)被取消 并將繼續(xù)運(yùn)行。
如果 return_exceptions 為 True
埋市,異常會(huì)和成功的結(jié)果一樣處理冠桃,并聚合至結(jié)果列表。
如果 gather()
被取消道宅,所有被提交 (尚未完成) 的可等待對象也會(huì) 被取消食听。
如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當(dāng)作引發(fā)了 CancelledError
一樣處理 -- 在此情況下 gather()
調(diào)用 不會(huì) 被取消污茵。這是為了防止一個(gè)已提交的 Task/Future 被取消導(dǎo)致其他 Tasks/Future 也被取消樱报。
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
爬蟲例子
使用爬蟲爬取豆瓣top250
from lxml import etree
from time import time
import asyncio
import aiohttp
url = "https://movie.douban.com/top250"
header = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36",
"content-type": "text/plain;charset=UTF-8",
}
async def fetch_content(url):
# await asyncio.sleep(1) # 防止請求過快 等待1秒
async with aiohttp.ClientSession(
headers=header, connector=aiohttp.TCPConnector(ssl=False)
) as session:
async with session.get(url) as response:
return await response.text()
async def parse(url):
page = await fetch_content(url)
html = etree.HTML(page)
xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
xpath_title = './/span[@class="title"]'
xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
xpath_descs = './/span[@class="inq"]'
xpath_links = './/div[@class="info"]/div[@class="hd"]/a'
pages = html.xpath(xpath_pages) # 所有頁面的鏈接都在底部獲取
fetch_list = []
result = []
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for p in pages:
fetch_list.append(url + p.get("href")) # 解析翻頁按鈕對應(yīng)的鏈接 組成完整后邊頁面鏈接
tasks = [fetch_content(url) for url in fetch_list] # 并行處理所有翻頁的頁面
pages = await asyncio.gather(*tasks)
# 并發(fā) 運(yùn)行 aws 序列中的 可等待對象。
# 如果 aws 中的某個(gè)可等待對象為協(xié)程泞当,它將自動(dòng)作為一個(gè)任務(wù)加入日程迹蛤。
# 如果所有可等待對象都成功完成,結(jié)果將是一個(gè)由所有返回值聚合而成的列表。結(jié)果值的順序與 aws 中可等待對象的順序一致盗飒。
for page in pages:
html = etree.HTML(page)
for element_movie in html.xpath(xpath_movie):
result.append(element_movie)
for i, movie in enumerate(result, 1):
title = movie.find(xpath_title).text
desc = (
"<" + movie.find(xpath_descs).text + ">"
if movie.find(xpath_descs) is not None
else None
)
link = movie.find(xpath_links).get("href")
print(i, title, desc, link)
async def main():
start = time()
for i in range(5):
await parse(url)
end = time()
print("Cost {} seconds".format((end - start) / 5))
if __name__ == "__main__":
asyncio.run(main())