asyncio 處理網(wǎng)絡(luò)連接數(shù)據(jù)流
下面是處理網(wǎng)絡(luò)連接的示例代碼,連接三個網(wǎng)站瘪贱,發(fā)送消息流靡砌,接收數(shù)據(jù)流觅闽。三個協(xié)程由一個線程并發(fā)完成:
# File Name: asyncio_stream.py
import asyncio
async def wget(host):
print('wget {}'.format(host))
# 創(chuàng)建 TCP 客戶端并連接服務(wù)器,或者說創(chuàng)建一個 TCP 連接對象
# open_connection 接收兩個參數(shù):主機和端口號
# connect 是協(xié)程聋伦,這步僅是創(chuàng)建協(xié)程對象夫偶,立即返回,不阻塞
connect = asyncio.open_connection(host, 80)
# await 運行協(xié)程連接服務(wù)器觉增,這步是阻塞操作兵拢,釋放 CPU
# 連接創(chuàng)建成功后,asyncio.open_connection 方法的返回值就是讀寫對象
# 讀寫對象分別為 StreamReader 和 StreamWriter 實例
# 它們也是協(xié)程對象逾礁,底層調(diào)用 socket 模塊的 send 和 recv 方法實現(xiàn)讀寫
reader, writer = await connect
# header 是發(fā)送給服務(wù)器的消息说铃,意為獲取頁面的 header 信息
# 這個格式是固定的,見下圖
header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
# 給服務(wù)器發(fā)消息嘹履,注意消息是二進制的
writer.write(header.encode())
# 這是一個與底層 IO 輸入緩沖區(qū)交互的流量控制方法
# 當(dāng)緩沖區(qū)達(dá)到上限時腻扇,drain() 阻塞,待到緩沖區(qū)回落到下限時砾嫉,寫操作恢復(fù)
# 當(dāng)不需要等待時幼苛,drain() 會立即返回,例如上面的消息內(nèi)容較少焰枢,不會阻塞
# 這就是一個控制消息的數(shù)據(jù)量的控制閥
await writer.drain()
# 給服務(wù)器發(fā)送消息后蚓峦,就等著讀取服務(wù)器返回來的消息
while True:
# 讀取數(shù)據(jù)是阻塞操作,釋放 CPU
# reader 相當(dāng)于一個水盆济锄,服務(wù)器發(fā)來的數(shù)據(jù)是水流
# readline 表示讀取一行暑椰,以 \n 作為換行符
# 如果在出現(xiàn) \n 之前,數(shù)據(jù)流中出現(xiàn) EOF(End Of File 文件結(jié)束符)也會返回
# 相當(dāng)于出現(xiàn) \n 或 EOF 時荐绝,擰上水龍頭一汽,line 就是這盆水
line = await reader.readline()
# 數(shù)據(jù)接收完畢,會返回空字符串 \r\n 低滩,退出 while 循環(huán)召夹,結(jié)束數(shù)據(jù)接收
if line.decode() == '\r\n':
break
# 接收的數(shù)據(jù)是二進制數(shù)據(jù),轉(zhuǎn)換為 UTF-8 格式并打印
# rstrip 方法刪掉字符串的結(jié)尾處的空白字符恕沫,也就是 \n
print('{} header > {}'.format(host, line.decode().rstrip()))
writer.close() # 關(guān)閉數(shù)據(jù)流监憎,可以省略
host_list = ['www.shiyanlou.com', 'www.sohu.com', 't.tt'] # 主機列表
loop = asyncio.get_event_loop() # 事件循環(huán)
tasks = asyncio.wait([wget(host) for host in host_list]) # 任務(wù)收集器
loop.run_until_complete(tasks) # 阻塞運行任務(wù)
loop.close() # 關(guān)閉事件循環(huán)
程序運行結(jié)果:
$ python3 asyncio_stream.py
wget t.tt
wget www.shiyanlou.com
wget www.sohu.com
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
... ...
www.sohu.com header > FSS-Cache: HIT from 3354153.4206131.5193782
www.sohu.com header > FSS-Proxy: Powered by 3550764.4599350.5390396
www.shiyanlou.com header > HTTP/1.1 301 Moved Permanently
www.shiyanlou.com header > Server: nginx/1.14.2
... ...
www.shiyanlou.com header > Connection: close
www.shiyanlou.com header > Location: https://www.shiyanlou.com/
t.tt header > HTTP/1.1 301 Moved Permanently
t.tt header > Date: Wed, 22 May 2019 08:26:32 GMT
... ...
t.tt header > Server: ARTWS/1.0
t.tt header > X-XSS-Protection: 1;mode=block
HTTP 協(xié)議 GET 請求格式
使用 async for
優(yōu)化讀取信息的代碼:
import asyncio
async def wget(host):
print('wget {}'.format(host))
connect = asyncio.open_connection(host, 80)
reader, writer = await connect
header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
writer.write(header.encode())
await writer.drain()
# 給服務(wù)器發(fā)送消息后,就等著讀取服務(wù)器返回來的消息
# reader 對象較為特殊婶溯,它有 __aiter__ 和 __anext__ 方法
# 這種對象不是 Iterable 對象(但仍然是可迭代對象)鲸阔,只能使用 async for 循環(huán)
# __anext__ 方法的返回值會賦值給 line 變量
# 整個循環(huán)其實是阻塞的偷霉,因為 __anext__ 方法里有 yield from 語句
async for line in reader:
print('{} header > {}'.format(host,
line.decode('unicode_escape').rstrip()))
def main():
host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt'] # 主機列表
loop = asyncio.get_event_loop() # 事件循環(huán)
tasks = asyncio.wait([wget(host) for host in host_list]) # 任務(wù)收集器
loop.run_until_complete(tasks)
loop.close
if __name__ == '__main__':
main()
asyncio.as_completed
方法即時獲取任務(wù)結(jié)果
import asyncio
async def wget(host):
print('wget {}'.format(host))
connect = asyncio.open_connection(host, 80)
reader, writer = await connect
header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
writer.write(header.encode())
await writer.drain()
async for line in reader:
print('{} header > {}'.format(host,
line.decode('unicode_escape').rstrip()))
return 'Host: {}'.format(host)
def main():
'''
host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt'] # 主機列表
loop = asyncio.get_event_loop() # 事件循環(huán)
coroutines = [wget(host) for host in host_list] # 協(xié)程列表
tasks = asyncio.wait(coroutines) # 任務(wù)收集器
# 之前的文檔中講到過 asyncio.Task.all_tasks 方法可以獲得事件循環(huán)中的任務(wù)集合
# 事件循環(huán)的 run_until_complete 方法的返回值是二元元組
# 元組的第一個元素也是任務(wù)集合
# 任務(wù)本身是一個協(xié)程函數(shù),函數(shù)的 return 值可以通過任務(wù)的 result 方法獲得
result = loop.run_until_complete(tasks)
print(result)
for task in result[0]:
print(task.result())
'''
# 任務(wù)在結(jié)束時才會產(chǎn)生 result 值
# 上面的寫法只能等事件循環(huán)停止后一并獲取全部任務(wù)的 result 值
# 如果要隨時獲得任務(wù)的 result 值褐筛,可以使用 asyncio.as_completed 方法
# 這樣的話需要創(chuàng)建一個主任務(wù)并加入到事件循環(huán)类少,事件循環(huán)首先運行主任務(wù)
# 在主任務(wù)中使用 asyncio.ensure_future 方法創(chuàng)建新的子任務(wù)
# 這些子任務(wù)會自動加入到事件循環(huán)
# 隨后在主任務(wù)中使用 asyncio.as_completed 方法獲取已經(jīng)完成的任務(wù)
async def main_task():
tasks = []
host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt']
for host in host_list:
tasks.append(asyncio.ensure_future(wget(host)))
# 這里為什么不使用 asyncio.Task.all_tasks 方法獲取任務(wù)集合呢?
# 像這樣:asyncio.as_completed(asyncio.Task.all_tasks())
# 因為任務(wù)集合中包含主任務(wù)和子任務(wù)渔扎,雖然二者在事件循環(huán)中是并列關(guān)系
# 但是 for 循環(huán)會阻塞在這里硫狞,主任務(wù)永遠(yuǎn)完不成
for task in asyncio.as_completed(tasks):
print(await task)
loop = asyncio.get_event_loop()
loop.run_until_complete(main_task())
loop.close()
if __name__ == '__main__':
main()
運行結(jié)果:
$ python3 a.py
wget www.shiyanlou.com
wget www.zhihu.com
wget t.tt
www.shiyanlou.com header > HTTP/1.1 301 Moved Permanently
www.shiyanlou.com header > Server: nginx/1.14.2
... ...
www.shiyanlou.com header > </body>
www.shiyanlou.com header > </html>
Host: www.shiyanlou.com
t.tt header > HTTP/1.1 301 Moved Permanently
t.tt header > Date: Sat, 25 May 2019 06:33:13 GMT
... ...
t.tt header > </body>
t.tt header > </html>
Host: t.tt
www.zhihu.com header > HTTP/1.1 301 Moved Permanently
www.zhihu.com header > Date: Sat, 25 May 2019 06:33:13 GMT
... ...
www.zhihu.com header > </body>
www.zhihu.com header > </html>
Host: www.zhihu.com