異步IO
CPU的速度遠遠快于磁盤雀扶、網(wǎng)絡(luò)等IO杖小。在一個線程中,CPU執(zhí)行代碼的速度極快愚墓,然而予权,一旦遇到IO操作,如讀寫文件浪册、發(fā)送網(wǎng)絡(luò)數(shù)據(jù)時扫腺,就需要等待IO操作完成,才能繼續(xù)進行下一步操作村象。這種情況稱為同步IO笆环。
在IO操作的過程中,當前線程被掛起厚者,而其他需要CPU執(zhí)行的代碼就無法被當前線程執(zhí)行了躁劣。
因為一個IO操作就阻塞了當前線程,導(dǎo)致其他代碼無法執(zhí)行库菲,所以我們必須使用多線程或者多進程來并發(fā)執(zhí)行代碼账忘,為多個用戶服務(wù)。每個用戶都會分配一個線程熙宇,如果遇到IO導(dǎo)致線程被掛起鳖擒,其他用戶的線程不受影響。
多線程和多進程的模型雖然解決了并發(fā)問題奇颠,但是系統(tǒng)不能無上限地增加線程败去。由于系統(tǒng)切換線程的開銷也很大,所以烈拒,一旦線程數(shù)量過多圆裕,CPU的時間就花在線程切換上了,真正運行代碼的時間就少了荆几,結(jié)果導(dǎo)致性能嚴重下降吓妆。
由于我們要解決的問題是CPU高速執(zhí)行能力和IO設(shè)備的龜速嚴重不匹配,多線程和多進程只是解決這一問題的一種方法吨铸。
另一種解決IO問題的方法是異步IO行拢。當代碼需要執(zhí)行一個耗時的IO操作時,它只發(fā)出IO指令诞吱,并不等待IO結(jié)果舟奠,然后就去執(zhí)行其他代碼了竭缝。一段時間后,當IO返回結(jié)果時沼瘫,再通知CPU進行處理抬纸。
可以想象如果按普通順序?qū)懗龅拇a實際上是沒法完成異步IO的:
do_some_code()
f = open('/path/to/file', 'r')
r = f.read() # <== 線程停在此處等待IO操作結(jié)果
# IO操作完成后線程才能繼續(xù)執(zhí)行:
do_some_code(r)
所以,同步IO模型的代碼是無法實現(xiàn)異步IO模型的耿戚。
異步IO模型需要一個消息循環(huán)湿故,在消息循環(huán)中,主線程不斷地重復(fù)“讀取消息-處理消息”這一過程:
loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)
消息模型其實早在應(yīng)用在桌面應(yīng)用程序中了膜蛔。一個GUI程序的主線程就負責不停地讀取消息并處理消息坛猪。所有的鍵盤、鼠標等消息都被發(fā)送到GUI程序的消息隊列中皂股,然后由GUI程序的主線程處理墅茉。
由于GUI線程處理鍵盤、鼠標等消息的速度非承寄快躁锁,所以用戶感覺不到延遲。某些時候卵史,GUI線程在一個消息處理的過程中遇到問題導(dǎo)致一次消息處理時間過長,此時搜立,用戶會感覺到整個GUI程序停止響應(yīng)了以躯,敲鍵盤、點鼠標都沒有反應(yīng)啄踊。這種情況說明在消息模型中忧设,處理一個消息必須非常迅速,否則颠通,主線程將無法及時處理消息隊列中的其他消息址晕,導(dǎo)致程序看上去停止響應(yīng)。
消息模型是如何解決同步IO必須等待IO操作這一問題的呢顿锰?當遇到IO操作時谨垃,代碼只負責發(fā)出IO請求,不等待IO結(jié)果硼控,然后直接結(jié)束本輪消息處理刘陶,進入下一輪消息處理過程。當IO操作完成后牢撼,將收到一條“IO完成”的消息匙隔,處理該消息時就可以直接獲取IO操作結(jié)果。
在“發(fā)出IO請求”到收到“IO完成”的這段時間里熏版,同步IO模型下纷责,主線程只能掛起捍掺,但異步IO模型下,主線程并沒有休息再膳,而是在消息循環(huán)中繼續(xù)處理其他消息乡小。這樣,在異步IO模型下饵史,一個線程就可以同時處理多個IO請求满钟,并且沒有切換線程的操作。對于大多數(shù)IO密集型的應(yīng)用程序胳喷,使用異步IO將大大提升系統(tǒng)的多任務(wù)處理能力湃番。
協(xié)程
協(xié)程,又稱微線程吭露,纖程吠撮。英文名Coroutine。
協(xié)程的概念很早就提出來了讲竿,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用泥兰。
子程序,或者稱為函數(shù)题禀,在所有語言中都是層級調(diào)用鞋诗,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C迈嘹,C執(zhí)行完畢返回削彬,B執(zhí)行完畢返回,最后是A執(zhí)行完畢秀仲。
所以子程序調(diào)用是通過棧實現(xiàn)的融痛,一個線程就是執(zhí)行一個子程序。
子程序調(diào)用總是一個入口神僵,一次返回雁刷,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同保礼。
協(xié)程看上去也是子程序沛励,但執(zhí)行過程中,在子程序內(nèi)部可中斷氓英,然后轉(zhuǎn)而執(zhí)行別的子程序侯勉,在適當?shù)臅r候再返回來接著執(zhí)行。
注意铝阐,在一個子程序中中斷址貌,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷练对。比如子程序A遍蟋、B:
def A():
print('1')
print('2')
print('3')
def B():
print('x')
print('y')
print('z')
假設(shè)由協(xié)程執(zhí)行,在執(zhí)行A的過程中螟凭,可以隨時中斷虚青,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A螺男,結(jié)果可能是:
1
2
x
y
3
z
但是在A中是沒有調(diào)用B的棒厘,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。
看起來A下隧、B的執(zhí)行有點像多線程奢人,但協(xié)程的特點在于是一個線程執(zhí)行,那和多線程比淆院,協(xié)程有何優(yōu)勢何乎?
最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換土辩,而是由程序自身控制支救,因此,沒有線程切換的開銷拷淘,和多線程比各墨,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯辕棚。
第二大優(yōu)勢就是不需要多線程的鎖機制欲主,因為只有一個線程,也不存在同時寫變量沖突逝嚎,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了详恼,所以執(zhí)行效率比多線程高很多补君。
因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢昧互?最簡單的方法是多進程+協(xié)程挽铁,既充分利用多核,又充分發(fā)揮協(xié)程的高效率敞掘,可獲得極高的性能叽掘。
Python對協(xié)程的支持是通過generator
實現(xiàn)的。
在generator中玖雁,我們不但可以通過for循環(huán)來迭代更扁,還可以不斷調(diào)用next()函數(shù)獲取由yield語句返回的下一個值。
但是Python的yield
不但可以返回一個值,它還可以接收調(diào)用者發(fā)出的參數(shù)浓镜。
來看例子:
傳統(tǒng)的生產(chǎn)者-消費者模型是一個線程寫消息溃列,一個線程取消息,通過鎖機制控制隊列和等待膛薛,但一不小心就可能死鎖听隐。
如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后哄啄,直接通過yield
跳轉(zhuǎn)到消費者開始執(zhí)行雅任,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn)咨跌,效率極高:
def consumer(): # 有yield的函數(shù)就是生成器沪么,沒的跑
r = 'what the fuck?'
print(r) # ??發(fā)送None時,函數(shù)從頭開始執(zhí)行的虑润,到 yield r 停止成玫,此后的send(xxx)都是從 n = yield 開始。記住拳喻,n = yield 是啟動點哭当, yield r 暫停點,并返回yield r結(jié)果給produce函數(shù)
while True:
n = yield r
# 注意冗澈,yield r 是代碼終止點钦勘,n = yield是啟動點,一個正常的循環(huán)??過程是從 n = yield開始執(zhí)行亚亲,到下面彻采,執(zhí)行到r ='200k'后,再回到 yield r處暫停捌归,此時暫停的yield r 應(yīng)該是經(jīng)過新的循環(huán)肛响,這里沒有 for in 函數(shù),但是惜索,r最新的200k就是它的新循環(huán)特笋,所以此時yield r為200k時生成器程序也就是consumer停止,但是巾兆,新的yield r 要回給send猎物,send發(fā)送消息,也會要求得到消息的結(jié)果
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None) #
n = 0 # 沒有上面的c.send角塑,系統(tǒng)報錯can't send non-None value to a just-started generator
while n < 5: #
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
執(zhí)行結(jié)果:
what the fuck? # 它只會出現(xiàn)一次
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer
函數(shù)是一個generator
蔫磨,把一個consumer
傳入produce
后:
首先調(diào)用c.send(None)
啟動生成器;
然后圃伶,一旦生產(chǎn)了東西堤如,通過c.send(n)
切換到consumer
執(zhí)行蒲列;
consumer
通過yield
拿到消息,處理煤惩,又通過yield
把結(jié)果傳回嫉嘀;
produce
拿到consumer
處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息魄揉;
produce
決定不生產(chǎn)了剪侮,通過c.close()
關(guān)閉consumer
,整個過程結(jié)束洛退。
send(vlaue)
發(fā)送value瓣俯,也會要求接收value的yield把最新的值再返回給它,通俗說兵怯,給 你一個消息彩匕,你得給我一個結(jié)果,這樣才公平
整個流程無鎖媒区,由一個線程執(zhí)行驼仪,produce
和consumer
協(xié)作完成任務(wù),所以稱為“協(xié)程”袜漩,而非線程的搶占式多任務(wù)绪爸。
最后套用Donald Knuth的一句話總結(jié)協(xié)程的特點:
“子程序就是協(xié)程的一種特例≈婀ィ”
asyncio
asyncio
是Python 3.4版本引入的標準庫奠货,直接內(nèi)置了對異步IO的支持。
asyncio
的編程模型就是一個消息循環(huán)座掘。我們從asyncio
模塊中直接獲取一個EventLoop
的引用递惋,然后把需要執(zhí)行的協(xié)程扔到EventLoop
中執(zhí)行,就實現(xiàn)了異步IO溢陪。
用asyncio
實現(xiàn)Hello world代碼如下:
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 異步調(diào)用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執(zhí)行coroutine
loop.run_until_complete(hello())
loop.close()
@asyncio.coroutine
把一個generator
標記為coroutine
類型萍虽,然后,我們就把這個coroutine
扔到EventLoop
中執(zhí)行形真。
hello()
會首先打印出Hello world!贩挣,然后,yield from
語法可以讓我們方便地調(diào)用另一個generator
没酣。由于asyncio.sleep()
也是一個coroutine
,所以線程不會等待asyncio.sleep()
卵迂,而是直接中斷并執(zhí)行下一個消息循環(huán)裕便。當asyncio.sleep()返回時,線程就可以從yield from
拿到返回值(此處是None)见咒,然后接著執(zhí)行下一行語句偿衰。
把asyncio.sleep(1)
看成是一個耗時1秒的IO操作,在此期間,主線程并未等待下翎,而是去執(zhí)行EventLoop
中其他可以執(zhí)行的coroutine
了缤言,因此可以實現(xiàn)并發(fā)執(zhí)行。
我們用Task封裝兩個coroutine
試試:
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
觀察執(zhí)行過程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由打印的當前線程名稱可以看出视事,兩個coroutine
是由同一個線程并發(fā)執(zhí)行的胆萧。
如果把asyncio.sleep()
換成真正的IO操作,則多個coroutine
就可以由一個線程并發(fā)執(zhí)行俐东。
我們用asyncio的異步網(wǎng)絡(luò)連接來獲取sina跌穗、sohu和163的網(wǎng)站首頁:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
執(zhí)行結(jié)果如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(打印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可見3個連接由一個線程通過coroutine
并發(fā)完成。
小結(jié)
asyncio
提供了完善的異步IO支持虏辫;
異步操作需要在coroutine
中通過yield from
完成蚌吸;
多個coroutine
可以封裝成一組Task然后并發(fā)執(zhí)行。
async/await
用asyncio
提供的@asyncio.coroutine
可以把一個generator標記為coroutine
類型砌庄,然后在coroutine
內(nèi)部用yield from
調(diào)用另一個coroutine
實現(xiàn)異步操作羹唠。
為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法async和await娄昆,可以讓coroutine的代碼更簡潔易讀佩微。
請注意,async
和await
是針對coroutine
的新語法稿黄,要使用新的語法喊衫,只需要做兩步簡單的替換:
把@asyncio.coroutine
替換為async
;
把yield from
替換為await
杆怕。
讓我們對比一下上一節(jié)的代碼:
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
用新語法重新編寫如下:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
剩下的代碼保持不變族购。
aiohttp
asyncio
可以實現(xiàn)單線程并發(fā)IO操作。如果僅用在客戶端陵珍,發(fā)揮的威力不大寝杖。如果把asyncio
用在服務(wù)器端,例如Web服務(wù)器互纯,由于HTTP連接就是IO操作瑟幕,因此可以用單線程+coroutine實現(xiàn)多用戶的高并發(fā)支持。
asyncio
實現(xiàn)了TCP留潦、UDP只盹、SSL等協(xié)議,aiohttp
則是基于asyncio
實現(xiàn)的HTTP框架兔院。
我們先安裝aiohttp:
pip install aiohttp
然后編寫一個HTTP服務(wù)器殖卑,分別處理以下URL:
/
- 首頁返回b'<h1>Index</h1>'
;
/hello/{name}
- 根據(jù)URL參數(shù)返回文本hello, %s!
坊萝。
代碼如下:
import asyncio
from aiohttp import web
# 既然是異步孵稽,所有的函數(shù)就必須要變成異步非阻塞單線程方式许起,所以下列所有函數(shù)全部 async await
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index~</h1>',content_type='text/html') # AAA 參考最下方注釋
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
注意aiohttp
的初始化函數(shù)init()
也是一個coroutine
,loop.create_server()
則利用asyncio
創(chuàng)建TCP服務(wù)菩鲜。
AAA處园细,原理代碼是 return web.Response(body=b'<h1>Index</h1>')
,程序中的AAA處代碼為新代碼接校,沒有后面的一串代碼猛频,打開網(wǎng)頁就會進入到下載界面,而無法顯示網(wǎng)頁