一種解決IO問題的方法是異步IO。當代碼需要執(zhí)行一個耗時的IO操作時弦疮,它只發(fā)出IO指令,并不等待IO結(jié)果蜘醋,然后就去執(zhí)行其他代碼了胁塞。一段時間后,當IO返回結(jié)果時压语,再通知CPU進行處理
消息模型是如何解決同步IO必須等待IO操作這一問題的呢啸罢?當遇到IO操作時,代碼只負責發(fā)出IO請求胎食,不等待IO結(jié)果扰才,然后直接結(jié)束本輪消息處理,進入下一輪消息處理過程厕怜。當IO操作完成后衩匣,將收到一條“IO完成”的消息,處理該消息時就可以直接獲取IO操作結(jié)果粥航。
在“發(fā)出IO請求”到收到“IO完成”的這段時間里琅捏,同步IO模型下,主線程只能掛起递雀,但異步IO模型下柄延,主線程并沒有休息,而是在消息循環(huán)中繼續(xù)處理其他消息映之。這樣拦焚,在異步IO模型下,一個線程就可以同時處理多個IO請求杠输,并且沒有切換線程的操作赎败。對于大多數(shù)IO密集型的應用程序,使用異步IO將大大提升系統(tǒng)的多任務處理能力蠢甲。
異步IO模型需要一個消息循環(huán)僵刮,在消息循環(huán)中,主線程不斷地重復“讀取消息-處理消息”這一過程:
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
協(xié)程
協(xié)程,又稱微線程搞糕,纖程勇吊。英文名Coroutine
子程序,或者稱為函數(shù)窍仰,在所有語言中都是層級調(diào)用汉规,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C驹吮,C執(zhí)行完畢返回针史,B執(zhí)行完畢返回,最后是A執(zhí)行完畢碟狞。
所以子程序調(diào)用是通過棧實現(xiàn)的啄枕,一個線程就是執(zhí)行一個子程序。
子程序調(diào)用總是一個入口族沃,一次返回频祝,調(diào)用順序是明確的。
協(xié)程看上去也是子程序脆淹,但執(zhí)行過程中常空,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序未辆,在適當?shù)臅r候再返回來接著執(zhí)行窟绷。
在一個子程序中中斷锯玛,去執(zhí)行其他子程序咐柜,不是函數(shù)調(diào)用,有點類似CPU的中斷攘残。
子程序就是協(xié)程的一種特例
和多線程比拙友,協(xié)程的優(yōu)勢在于:
- 協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換歼郭,而是由程序自身控制遗契,因此,沒有線程切換的開銷病曾,和多線程比牍蜂,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯
- 不需要多線程的鎖機制泰涂,因為只有一個線程鲫竞,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖逼蒙,只需要判斷狀態(tài)就好了从绘,所以執(zhí)行效率比多線程高很多
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進程+協(xié)程僵井,既充分利用多核陕截,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能批什。
Python 對協(xié)程的支持是通過 generator
實現(xiàn)的:
在 generator
中农曲,我們不但可以通過 for
循環(huán)來迭代,還可以不斷調(diào)用 next()
函數(shù)獲取由 yield
語句返回的下一個值驻债。
但是 Python 的 yield
不但可以返回一個值朋蔫,它還可以接收調(diào)用者發(fā)出的參數(shù)
傳統(tǒng)的生產(chǎn)者-消費者模型改用協(xié)程:
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
# 調(diào)用c.send(None)啟動生成器
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
注意到 consumer
函數(shù)是一個 generator
,把一個 consumer
傳入 produce
后:
- 首先調(diào)用
c.send(None)
啟動生成器却汉; - 然后驯妄,一旦生產(chǎn)了東西,通過
c.send(n)
切換到consumer
執(zhí)行合砂; -
consumer
通過yield
拿到消息青扔,處理,又通過yield
把結(jié)果傳回翩伪; -
produce
拿到consumer
處理的結(jié)果微猖,繼續(xù)生產(chǎn)下一條消息; -
produce
決定不生產(chǎn)了缘屹,通過c.close()
關閉consumer
凛剥,整個過程結(jié)束。
asyncio
asyncio
的編程模型就是一個消息循環(huán)轻姿。我們從 asyncio
模塊中直接獲取一個 EventLoop
的引用犁珠,然后把需要執(zhí)行的協(xié)程扔到 EventLoop
中執(zhí)行,就實現(xiàn)了異步IO
import threading
import asyncio
# @asyncio.coroutine 把一個 generator 標記為 coroutine 類型
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
# 異步調(diào)用asyncio.sleep(1)
# yield from 語法可以讓我們方便地調(diào)用另一個 generator
# 由于 asyncio.sleep() 也是一個 coroutine 互亮,所以線程不會等待 asyncio.sleep() 犁享,而是直接中斷并執(zhí)行下一個消息循環(huán)
# 當 asyncio.sleep() 返回時,線程就可以從 yield from 拿到返回值(此處是None)豹休,然后接著執(zhí)行下一行語句
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
# 獲取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 把這個 coroutine 扔到 EventLoop 中執(zhí)行
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
用 asyncio
的異步網(wǎng)絡連接來獲取sina炊昆、sohu和163的網(wǎng)站首頁:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
async/await
用 asyncio
提供的 @asyncio.coroutine
可以把一個 generator
標記為 coroutine
類型,然后在 coroutine
內(nèi)部用 yield from
調(diào)用另一個 coroutine
實現(xiàn)異步操作
為了簡化并更好地標識異步IO威根,從Python 3.5開始引入了新的語法 async
和 await
凤巨,可以讓 coroutine
的代碼更簡潔易讀
要使用新的語法,只需要做兩步簡單的替換:
- 把
@asyncio.coroutine
替換為async
- 把
yield from
替換為await
對比一下上一節(jié)的代碼:
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
用新語法重新編寫如下:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
aiohttp
asyncio可以實現(xiàn)單線程并發(fā)IO操作
如果把asyncio用在服務器端洛搀,例如Web服務器敢茁,由于HTTP連接就是IO操作,因此可以用單線程+coroutine實現(xiàn)多用戶的高并發(fā)支持
asyncio實現(xiàn)了TCP姥卢、UDP卷要、SSL等協(xié)議渣聚,aiohttp則是基于asyncio實現(xiàn)的HTTP框架
$ pip install aiohttp