1. 前言
在執(zhí)行一些 IO 密集型任務(wù)的時候按脚,程序常常會因為等待 IO 而阻塞于毙。比如在網(wǎng)絡(luò)爬蟲中,如果我們使用 requests 庫來進(jìn)行請求的話辅搬,如果網(wǎng)站響應(yīng)速度過慢唯沮,程序一直在等待網(wǎng)站響應(yīng),最后導(dǎo)致其爬取效率是非常非常低的堪遂。
為了解決這類問題介蛉,本文就來探討一下 Python 中異步協(xié)程來加速的方法,此種方法對于 IO 密集型任務(wù)非常有效溶褪。如將其應(yīng)用到網(wǎng)絡(luò)爬蟲中甘耿,爬取效率甚至可以成百倍地提升。
注:本文協(xié)程使用 async/await
來實(shí)現(xiàn)竿滨,需要 Python 3.5 及以上版本佳恬。
2. 基本了解
在了解異步協(xié)程之前,我們首先得了解一些基礎(chǔ)概念于游,如阻塞和非阻塞毁葱、同步和異步、多進(jìn)程和協(xié)程贰剥。
2.1 阻塞
阻塞狀態(tài)指程序未得到所需計算資源時被掛起的狀態(tài)倾剿。程序在等待某個操作完成期間,自身無法繼續(xù)干別的事情,則稱該程序在該操作上是阻塞的前痘。
常見的阻塞形式有:網(wǎng)絡(luò) I/O 阻塞凛捏、磁盤 I/O 阻塞、用戶輸入阻塞等芹缔。阻塞是無處不在的坯癣,包括 CPU 切換上下文時,所有的進(jìn)程都無法真正干事情最欠,它們也會被阻塞示罗。如果是多核 CPU 則正在執(zhí)行上下文切換操作的核不可被利用。
2.2 非阻塞
程序在等待某操作過程中芝硬,自身不被阻塞蚜点,可以繼續(xù)運(yùn)行干別的事情,則稱該程序在該操作上是非阻塞的拌阴。
非阻塞并不是在任何程序級別绍绘、任何情況下都可以存在的。
僅當(dāng)程序封裝的級別可以囊括獨(dú)立的子程序單元時迟赃,它才可能存在非阻塞狀態(tài)陪拘。
非阻塞的存在是因為阻塞存在,正因為某個操作阻塞導(dǎo)致的耗時與效率低下捺氢,我們才要把它變成非阻塞的藻丢。
2.3 同步
不同程序單元為了完成某個任務(wù)剪撬,在執(zhí)行過程中需靠某種通信方式以協(xié)調(diào)一致摄乒,稱這些程序單元是同步執(zhí)行的。
例如購物系統(tǒng)中更新商品庫存残黑,需要用“行鎖”作為通信信號馍佑,讓不同的更新請求強(qiáng)制排隊順序執(zhí)行,那更新庫存的操作是同步的梨水。
簡言之拭荤,同步意味著有序。
2.4 異步
為完成某個任務(wù)疫诽,不同程序單元之間過程中無需通信協(xié)調(diào)舅世,也能完成任務(wù)的方式,不相關(guān)的程序單元之間可以是異步的奇徒。
例如雏亚,爬蟲下載網(wǎng)頁。調(diào)度程序調(diào)用下載程序后摩钙,即可調(diào)度其他任務(wù)罢低,而無需與該下載任務(wù)保持通信以協(xié)調(diào)行為。不同網(wǎng)頁的下載胖笛、保存等操作都是無關(guān)的网持,也無需相互通知協(xié)調(diào)宜岛。這些異步操作的完成時刻并不確定。
簡言之功舀,異步意味著無序萍倡。
2.5 多進(jìn)程
多進(jìn)程就是利用 CPU 的多核優(yōu)勢,在同一時間并行地執(zhí)行多個任務(wù)日杈,可以大大提高執(zhí)行效率遣铝。
2.6 協(xié)程
協(xié)程,英文叫做 Coroutine
莉擒,又稱微線程酿炸,纖程,協(xié)程是一種用戶態(tài)的輕量級線程涨冀。
協(xié)程擁有自己的寄存器上下文和棧填硕。協(xié)程調(diào)度切換時,將寄存器上下文和棧保存到其他地方鹿鳖,在切回來的時候扁眯,恢復(fù)先前保存的寄存器上下文和棧。因此協(xié)程能保留上一次調(diào)用時的狀態(tài)翅帜,即所有局部狀態(tài)的一個特定組合姻檀,每次過程重入時,就相當(dāng)于進(jìn)入上一次調(diào)用的狀態(tài)涝滴。
協(xié)程本質(zhì)上是個單進(jìn)程绣版,協(xié)程相對于多進(jìn)程來說,無需線程上下文切換的開銷歼疮,無需原子操作鎖定及同步的開銷杂抽,編程模型也非常簡單。
我們可以使用協(xié)程來實(shí)現(xiàn)異步操作韩脏,比如在網(wǎng)絡(luò)爬蟲場景下缩麸,我們發(fā)出一個請求之后,需要等待一定的時間才能得到響應(yīng)赡矢,但其實(shí)在這個等待過程中杭朱,程序可以干許多其他的事情,等到響應(yīng)得到之后才切換回來繼續(xù)處理吹散,這樣可以充分利用 CPU 和其他資源弧械,這就是異步協(xié)程的優(yōu)勢。
3. 異步協(xié)程用法
接下來讓我們來了解下協(xié)程的實(shí)現(xiàn)送浊,從 Python 3.4 開始梦谜,Python 中加入了協(xié)程的概念,但這個版本的協(xié)程還是以生成器對象為基礎(chǔ)的,在 Python 3.5 則增加了 async/await
唁桩,使得協(xié)程的實(shí)現(xiàn)更加方便闭树。
Python 中使用協(xié)程最常用的庫莫過于 asyncio
,所以本文會以 asyncio
為基礎(chǔ)來介紹協(xié)程的使用荒澡。
首先我們需要了解下面幾個概念:
event_loop
:事件循環(huán)报辱,相當(dāng)于一個無限循環(huán),我們可以把一些函數(shù)注冊到這個事件循環(huán)上单山,當(dāng)滿足條件發(fā)生的時候碍现,就會調(diào)用對應(yīng)的處理方法。
coroutine
:中文翻譯叫協(xié)程米奸,在 Python 中常指代為協(xié)程對象類型昼接,我們可以將協(xié)程對象注冊到時間循環(huán)中,它會被事件循環(huán)調(diào)用悴晰。我們可以使用 async 關(guān)鍵字來定義一個方法慢睡,這個方法在調(diào)用時不會立即被執(zhí)行,而是返回一個協(xié)程對象铡溪。
task
:任務(wù)漂辐,它是對協(xié)程對象的進(jìn)一步封裝,包含了任務(wù)的各個狀態(tài)棕硫。
future
:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果髓涯,實(shí)際上和 task
沒有本質(zhì)區(qū)別。
另外我們還需要了解 async/await
關(guān)鍵字哈扮,它是從 Python 3.5 才出現(xiàn)的纬纪,專門用于定義協(xié)程。其中灶泵,async
定義一個協(xié)程育八,await
用來掛起阻塞方法的執(zhí)行对途。
3.1 定義協(xié)程
首先我們來定義一個協(xié)程赦邻,體驗一下它和普通進(jìn)程在實(shí)現(xiàn)上的不同之處侵蒙,代碼如下:
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
運(yùn)行結(jié)果:
Coroutine: <coroutine object execute at 0x1034cf830>
After calling execute
Number: 1
After calling loop
首先我們引入了 asyncio 這個包霎迫,這樣我們才可以使用 async 和 await舌仍,然后我們使用 async 定義了一個 execute() 方法揪胃,方法接收一個數(shù)字參數(shù)罩缴,方法執(zhí)行之后會打印這個數(shù)字趾牧。
隨后我們直接調(diào)用了這個方法铃在,然而這個方法并沒有執(zhí)行遥诉,而是返回了一個 coroutine 協(xié)程對象须床。隨后我們使用 get_event_loop()
方法創(chuàng)建了一個事件循環(huán) loop铐料,并調(diào)用了 loop 對象的 run_until_complete()
方法將協(xié)程注冊到事件循環(huán) loop 中,然后啟動。最后我們才看到了 execute()
方法打印了輸出結(jié)果钠惩。
可見柒凉,async 定義的方法就會變成一個無法直接執(zhí)行的 coroutine 對象,必須將其注冊到事件循環(huán)中才可以執(zhí)行篓跛。
上文我們還提到了 task膝捞,它是對 coroutine 對象的進(jìn)一步封裝,它里面相比 coroutine 對象多了運(yùn)行狀態(tài)愧沟,比如 running蔬咬、finished 等,我們可以用這些狀態(tài)來獲取協(xié)程對象的執(zhí)行情況沐寺。
在上面的例子中林艘,當(dāng)我們將 coroutine 對象傳遞給 run_until_complete()
方法的時候,實(shí)際上它進(jìn)行了一個操作就是將 coroutine 封裝成了 task 對象混坞,我們也可以顯式地進(jìn)行聲明北启,如下所示:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
運(yùn)行結(jié)果:
Coroutine: <coroutine object execute at 0x10e0f7830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop
這里我們定義了 loop 對象之后,接著調(diào)用了它的 create_task()
方法將 coroutine 對象轉(zhuǎn)化為了 task 對象拔第,隨后我們打印輸出一下咕村,發(fā)現(xiàn)它是 pending 狀態(tài)。接著我們將 task 對象添加到事件循環(huán)中得到執(zhí)行蚊俺,隨后我們再打印輸出一下 task 對象懈涛,發(fā)現(xiàn)它的狀態(tài)就變成了 finished,同時還可以看到其 result 變成了 1泳猬,也就是我們定義的 execute() 方法的返回結(jié)果批钠。
另外定義 task 對象還有一種方式,就是直接通過 asyncio 的 ensure_future()
方法得封,返回結(jié)果也是 task 對象埋心,這樣的話我們就可以不借助于 loop 來定義,即使我們還沒有聲明 loop 也可以提前定義好 task 對象忙上,寫法如下:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
運(yùn)行結(jié)果:
Coroutine: <coroutine object execute at 0x10aa33830>
After calling execute
Task: <Task pending coro=<execute() running at demo.py:4>>
Number: 1
Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>
After calling loop
發(fā)現(xiàn)其效果都是一樣的拷呆。
3.2 綁定回調(diào)
另外我們也可以為某個 task 綁定一個回調(diào)方法,來看下面的例子:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
def callback(task):
print('Status:', task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
在這里我們定義了一個 request()
方法疫粥,請求了百度茬斧,返回狀態(tài)碼,但是這個方法里面我們沒有任何 print() 語句梗逮。隨后我們定義了一個 callback()
方法项秉,這個方法接收一個參數(shù),是 task 對象慷彤,然后調(diào)用 print() 方法打印了 task 對象的結(jié)果娄蔼。這樣我們就定義好了一個 coroutine 對象和一個回調(diào)方法怖喻,我們現(xiàn)在希望的效果是,當(dāng) coroutine 對象執(zhí)行完畢之后岁诉,就去執(zhí)行聲明的 callback() 方法罢防。
那么它們二者怎樣關(guān)聯(lián)起來呢?很簡單唉侄,只需要調(diào)用add_done_callback()
方法即可咒吐,我們將 callback() 方法傳遞給了封裝好的 task 對象,這樣當(dāng) task 執(zhí)行完畢之后就可以調(diào)用 callback() 方法了属划,同時 task 對象還會作為參數(shù)傳遞給 callback() 方法恬叹,調(diào)用 task 對象的 result() 方法就可以獲取返回結(jié)果了。
運(yùn)行結(jié)果:
Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>
Status: <Response [200]>
Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>
實(shí)際上不用回調(diào)方法同眯,直接在 task 運(yùn)行完畢之后也可以直接調(diào)用 result() 方法獲取結(jié)果绽昼,如下所示:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())
運(yùn)行結(jié)果是一樣的:
Task: <Task pending coro=<request() running at demo.py:4>>
Task: <Task finished coro=<request() done, defined at demo.py:4> result=<Response [200]>>
Task Result: <Response [200]>
3.3 多任務(wù)協(xié)程
上面的例子我們只執(zhí)行了一次請求,如果我們想執(zhí)行多次請求應(yīng)該怎么辦呢须蜗?我們可以定義一個 task 列表硅确,然后使用 asyncio 的 wait() 方法即可執(zhí)行,看下面的例子:
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())
這里我們使用一個 for 循環(huán)創(chuàng)建了五個 task明肮,組成了一個列表菱农,然后把這個列表首先傳遞給了 asyncio 的 wait() 方法,然后再將其注冊到時間循環(huán)中柿估,就可以發(fā)起五個任務(wù)了循未。最后我們再將任務(wù)的運(yùn)行結(jié)果輸出出來,運(yùn)行結(jié)果如下:
Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
可以看到五個任務(wù)被順次執(zhí)行了秫舌,并得到了運(yùn)行結(jié)果的妖。
3.4 協(xié)程實(shí)現(xiàn)
前面說了這么一通,又是 async足陨,又是 coroutine嫂粟,又是 task,又是 callback墨缘,但似乎并沒有看出協(xié)程的優(yōu)勢靶呛纭?反而寫法上更加奇怪和麻煩了飒房,別急搁凸,上面的案例只是為后面的使用作鋪墊媚值,接下來我們正式來看下協(xié)程在解決 IO 密集型任務(wù)上有怎樣的優(yōu)勢吧狠毯!
上面的代碼中,我們用一個網(wǎng)絡(luò)請求作為示例褥芒,這就是一個耗時等待的操作嚼松,因為我們請求網(wǎng)頁之后需要等待頁面響應(yīng)并返回結(jié)果嫡良。耗時等待的操作一般都是 IO 操作,比如文件讀取献酗、網(wǎng)絡(luò)請求等等寝受。協(xié)程對于處理這種操作是有很大優(yōu)勢的,當(dāng)遇到需要等待的情況的時候罕偎,程序可以暫時掛起很澄,轉(zhuǎn)而去執(zhí)行其他的操作,從而避免一直等待一個程序而耗費(fèi)過多的時間颜及,充分利用資源甩苛。
為了表現(xiàn)出協(xié)程的優(yōu)勢,我們需要先創(chuàng)建一個合適的實(shí)驗環(huán)境俏站,最好的方法就是模擬一個需要等待一定時間才可以獲取返回結(jié)果的網(wǎng)頁讯蒲,上面的代碼中使用了百度,但百度的響應(yīng)太快了肄扎,而且響應(yīng)速度也會受本機(jī)網(wǎng)速影響墨林,所以最好的方式是自己在本地模擬一個慢速服務(wù)器,這里我們選用 Flask犯祠。
如果沒有安裝 Flask 的話可以執(zhí)行如下命令安裝:
pip3 install flask
然后編寫服務(wù)器代碼如下:
from flask import Flask
import time
app = Flask(__name__)
@app.route('/')
def index():
time.sleep(3)
return 'Hello!'
if __name__ == '__main__':
app.run(threaded=True)
這里我們定義了一個 Flask 服務(wù)旭等,主入口是 index() 方法,方法里面先調(diào)用了 sleep() 方法休眠 3 秒衡载,然后接著再返回結(jié)果辆雾,也就是說,每次請求這個接口至少要耗時 3 秒月劈,這樣我們就模擬了一個慢速的服務(wù)接口度迂。
注意這里服務(wù)啟動的時候,run() 方法加了一個參數(shù) threaded猜揪,這表明 Flask 啟動了多線程模式惭墓,不然默認(rèn)是只有一個線程的。如果不開啟多線程模式而姐,同一時刻遇到多個請求的時候腊凶,只能順次處理,這樣即使我們使用協(xié)程異步請求了這個服務(wù)拴念,也只能一個一個排隊等待钧萍,瓶頸就會出現(xiàn)在服務(wù)端。所以政鼠,多線程模式是有必要打開的风瘦。
啟動之后,F(xiàn)lask 應(yīng)該默認(rèn)會在 127.0.0.1:5000 上運(yùn)行公般,運(yùn)行之后控制臺輸出結(jié)果如下:
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
接下來我們再重新使用上面的方法請求一遍:
import asyncio
import requests
import time
start = time.time()
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = requests.get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
在這里我們還是創(chuàng)建了五個 task万搔,然后將 task 列表傳給 wait() 方法并注冊到時間循環(huán)中執(zhí)行胡桨。
運(yùn)行結(jié)果如下:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665
可以發(fā)現(xiàn)和正常的請求并沒有什么兩樣,依然還是順次執(zhí)行的瞬雹,耗時 15 秒昧谊,平均一個請求耗時 3 秒,說好的異步處理呢酗捌?
其實(shí)呢诬,要實(shí)現(xiàn)異步處理,我們得先要有掛起的操作胖缤,當(dāng)一個任務(wù)需要等待 IO 結(jié)果的時候馅巷,可以掛起當(dāng)前任務(wù),轉(zhuǎn)而去執(zhí)行其他任務(wù)草姻,這樣我們才能充分利用好資源钓猬,上面方法都是一本正經(jīng)的串行走下來,連個掛起都沒有撩独,怎么可能實(shí)現(xiàn)異步敞曹?想太多了。
要實(shí)現(xiàn)異步综膀,接下來我們再了解一下 await 的用法澳迫,使用 await 可以將耗時等待的操作掛起,讓出控制權(quán)剧劝。當(dāng)協(xié)程執(zhí)行的時候遇到 await橄登,時間循環(huán)就會將本協(xié)程掛起,轉(zhuǎn)而去執(zhí)行別的協(xié)程讥此,直到其他的協(xié)程掛起或執(zhí)行完畢拢锹。
所以,我們可能會將代碼中的 request() 方法改成如下的樣子:
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)
僅僅是在 requests 前面加了一個 await萄喳,然而執(zhí)行以下代碼卒稳,會得到如下報錯:
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Cost time: 15.048935890197754
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
File "demo.py", line 10, in request
status = await requests.get(url)
TypeError: object Response can't be used in 'await' expression
這次它遇到 await 方法確實(shí)掛起了,也等待了他巨,但是最后卻報了這么個錯充坑,這個錯誤的意思是 requests 返回的 Response 對象不能和 await 一起使用,為什么呢染突?因為根據(jù)官方文檔說明捻爷,await 后面的對象必須是如下格式之一:
- A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象份企。
- A generator-based coroutine object returned from a function decorated with types.coroutine()也榄,一個由 types.coroutine() 修飾的生成器,這個生成器可以返回 coroutine 對象薪棒。
- An object with an await__ method returning an iterator手蝎,一個包含 __await 方法的對象返回的一個迭代器榕莺。
可以參見:https://www.python.org/dev/peps/pep-0492/#await-expression俐芯。
reqeusts 返回的 Response 不符合上面任一條件棵介,因此就會報上面的錯誤了。
那么有的小伙伴就發(fā)現(xiàn)了吧史,既然 await 后面可以跟一個 coroutine 對象邮辽,那么我用 async 把請求的方法改成 coroutine 對象不就可以了嗎?所以就改寫成如下的樣子:
import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'Result:', response.text)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
這里我們將請求頁面的方法獨(dú)立出來贸营,并用 async 修飾吨述,這樣就得到了一個 coroutine 對象,我們運(yùn)行一下看看:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447
還是不行钞脂,它還不是異步執(zhí)行揣云,也就是說我們僅僅將涉及 IO 操作的代碼封裝到 async 修飾的方法里面是不可行的!我們必須要使用支持異步操作的請求方式才可以實(shí)現(xiàn)真正的異步冰啃,所以這里就需要 aiohttp 派上用場了邓夕。
3.5 使用 aiohttp
aiohttp 是一個支持異步請求的庫,利用它和 asyncio 配合我們可以非常方便地實(shí)現(xiàn)異步請求操作阎毅。
安裝方式如下:
pip3 install aiohttp
官方文檔鏈接為:https://aiohttp.readthedocs.io/焚刚,它分為兩部分,一部分是 Client扇调,一部分是 Server矿咕,詳細(xì)的內(nèi)容可以參考官方文檔。
下面我們將 aiohttp 用上來狼钮,將代碼改成如下樣子:
import asyncio
import aiohttp
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result
async def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = await get(url)
print('Get response from', url, 'Result:', result)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)
在這里我們將請求庫由 requests 改成了 aiohttp碳柱,通過 aiohttp 的 ClientSession 類的 get() 方法進(jìn)行請求,結(jié)果如下:
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 3.0199508666992188
成功了熬芜!我們發(fā)現(xiàn)這次請求的耗時由 15 秒變成了 3 秒士聪,耗時直接變成了原來的 1/5。
代碼里面我們使用了 await猛蔽,后面跟了 get() 方法剥悟,在執(zhí)行這五個協(xié)程的時候,如果遇到了 await曼库,那么就會將當(dāng)前協(xié)程掛起区岗,轉(zhuǎn)而去執(zhí)行其他的協(xié)程,直到其他的協(xié)程也掛起或執(zhí)行完畢毁枯,再進(jìn)行下一個協(xié)程的執(zhí)行慈缔。
開始運(yùn)行時,時間循環(huán)會運(yùn)行第一個 task种玛,針對第一個 task 來說藐鹤,當(dāng)執(zhí)行到第一個 await 跟著的 get() 方法時瓤檐,它被掛起,但這個 get() 方法第一步的執(zhí)行是非阻塞的娱节,掛起之后立馬被喚醒挠蛉,所以立即又進(jìn)入執(zhí)行,創(chuàng)建了 ClientSession 對象肄满,接著遇到了第二個 await谴古,調(diào)用了 session.get() 請求方法,然后就被掛起了稠歉,由于請求需要耗時很久掰担,所以一直沒有被喚醒,好第一個 task 被掛起了怒炸,那接下來該怎么辦呢带饱?事件循環(huán)會尋找當(dāng)前未被掛起的協(xié)程繼續(xù)執(zhí)行,于是就轉(zhuǎn)而執(zhí)行第二個 task 了阅羹,也是一樣的流程操作勺疼,直到執(zhí)行了第五個 task 的 session.get() 方法之后,全部的 task 都被掛起了灯蝴。所有 task 都已經(jīng)處于掛起狀態(tài)恢口,那咋辦?只好等待了穷躁。3 秒之后耕肩,幾個請求幾乎同時都有了響應(yīng),然后幾個 task 也被喚醒接著執(zhí)行问潭,輸出請求結(jié)果猿诸,最后耗時,3 秒狡忙!
怎么樣梳虽?這就是異步操作的便捷之處,當(dāng)遇到阻塞式操作時灾茁,任務(wù)被掛起窜觉,程序接著去執(zhí)行其他的任務(wù),而不是傻傻地等著北专,這樣可以充分利用 CPU 時間禀挫,而不必把時間浪費(fèi)在等待 IO 上。
有人就會說了拓颓,既然這樣的話语婴,在上面的例子中,在發(fā)出網(wǎng)絡(luò)請求后,既然接下來的 3 秒都是在等待的砰左,在 3 秒之內(nèi)匿醒,CPU 可以處理的 task 數(shù)量遠(yuǎn)不止這些,那么豈不是我們放 10 個缠导、20 個廉羔、50 個、100 個酬核、1000 個 task 一起執(zhí)行蜜另,最后得到所有結(jié)果的耗時不都是 3 秒左右嗎适室?因為這幾個任務(wù)被掛起后都是一起等待的嫡意。
理論來說確實(shí)是這樣的,不過有個前提捣辆,那就是服務(wù)器在同一時刻接受無限次請求都能保證正常返回結(jié)果蔬螟,也就是服務(wù)器無限抗壓,另外還要忽略 IO 傳輸時延汽畴,確實(shí)可以做到無限 task 一起執(zhí)行且在預(yù)想時間內(nèi)得到結(jié)果旧巾。
我們這里將 task 數(shù)量設(shè)置成 100,再試一下:
tasks = [asyncio.ensure_future(request()) for _ in range(100)]
耗時結(jié)果如下:
Cost time: 3.106252670288086
最后運(yùn)行時間也是在 3 秒左右忍些,當(dāng)然多出來的時間就是 IO 時延了鲁猩。
可見,使用了異步協(xié)程之后罢坝,我們幾乎可以在相同的時間內(nèi)實(shí)現(xiàn)成百上千倍次的網(wǎng)絡(luò)請求廓握,把這個運(yùn)用在爬蟲中,速度提升可謂是非赤夷穑可觀了隙券。
3.6 與單進(jìn)程、多進(jìn)程對比
可能有的小伙伴非常想知道上面的例子中闹司,如果 100 次請求娱仔,不是用異步協(xié)程的話,使用單進(jìn)程和多進(jìn)程會耗費(fèi)多少時間游桩,我們來測試一下:
首先來測試一下單進(jìn)程的時間:
import requests
import time
start = time.time()
def request():
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = requests.get(url).text
print('Get response from', url, 'Result:', result)
for _ in range(100):
request()
end = time.time()
print('Cost time:', end - start)
最后耗時:
Cost time: 3.106252670288086
接下來我們使用多進(jìn)程來測試下牲迫,使用 multiprocessing 庫:
import requests
import time
import multiprocessing
start = time.time()
def request(_):
url = 'http://127.0.0.1:5000'
print('Waiting for', url)
result = requests.get(url).text
print('Get response from', url, 'Result:', result)
cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))
end = time.time()
print('Cost time:', end - start)
這里我使用了multiprocessing 里面的 Pool 類,即進(jìn)程池借卧。我的電腦的 CPU 個數(shù)是 8 個盹憎,這里的進(jìn)程池的大小就是 8。
運(yùn)行時間:
Cost time: 48.17306900024414
可見 multiprocessing 相比單線程來說谓娃,還是可以大大提高效率的脚乡。
3.7 與多進(jìn)程的結(jié)合
既然異步協(xié)程和多進(jìn)程對網(wǎng)絡(luò)請求都有提升,那么為什么不把二者結(jié)合起來呢?在最新的 PyCon 2018 上奶稠,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點(diǎn)俯艰,并開發(fā)了一個新的庫,叫做 aiomultiprocess锌订,感興趣的可以了解下:https://www.youtube.com/watch?v=0kXaLh8Fz3k竹握。
這個庫的安裝方式是:
pip3 install aiomultiprocess
需要 Python 3.6 及更高版本才可使用。
使用這個庫辆飘,我們可以將上面的例子改寫如下:
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result
async def request():
url = 'http://127.0.0.1:5000'
urls = [url for _ in range(100)]
async with Pool() as pool:
result = await pool.map(get, urls)
return result
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
end = time.time()
print('Cost time:', end - start)
這樣就會同時使用多進(jìn)程和異步協(xié)程進(jìn)行請求啦辐,當(dāng)然最后的結(jié)果其實(shí)和異步是差不多的:
Cost time: 3.1156570434570312
因為我的測試接口的原因,最快的響應(yīng)也是 3 秒蜈项,所以這部分多余的時間基本都是 IO 傳輸時延芹关。但在真實(shí)情況下,我們在做爬取的時候遇到的情況千變?nèi)f化紧卒,一方面我們使用異步協(xié)程來防止阻塞侥衬,另一方面我們使用 multiprocessing 來利用多核成倍加速,節(jié)省時間其實(shí)還是非撑芊迹可觀的轴总。
以上便是 Python 中協(xié)程的基本用法,希望對大家有幫助博个。
來源:崔神的公眾號 "進(jìn)擊的Coder"