Python asyncio 異步編程(三)

asyncio 處理網(wǎng)絡(luò)連接數(shù)據(jù)流

下面是處理網(wǎng)絡(luò)連接的示例代碼,連接三個網(wǎng)站瘪贱,發(fā)送消息流靡砌,接收數(shù)據(jù)流觅闽。三個協(xié)程由一個線程并發(fā)完成:

# File Name: asyncio_stream.py

import asyncio

async def wget(host):
    print('wget {}'.format(host))
    # 創(chuàng)建 TCP 客戶端并連接服務(wù)器,或者說創(chuàng)建一個 TCP 連接對象
    # open_connection 接收兩個參數(shù):主機和端口號
    # connect 是協(xié)程聋伦,這步僅是創(chuàng)建協(xié)程對象夫偶,立即返回,不阻塞
    connect = asyncio.open_connection(host, 80)
    # await 運行協(xié)程連接服務(wù)器觉增,這步是阻塞操作兵拢,釋放 CPU
    # 連接創(chuàng)建成功后,asyncio.open_connection 方法的返回值就是讀寫對象
    # 讀寫對象分別為 StreamReader 和 StreamWriter 實例
    # 它們也是協(xié)程對象逾礁,底層調(diào)用 socket 模塊的 send 和 recv 方法實現(xiàn)讀寫
    reader, writer = await connect
    # header 是發(fā)送給服務(wù)器的消息说铃,意為獲取頁面的 header 信息
    # 這個格式是固定的,見下圖
    header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
    # 給服務(wù)器發(fā)消息嘹履,注意消息是二進制的
    writer.write(header.encode())
    # 這是一個與底層 IO 輸入緩沖區(qū)交互的流量控制方法
    # 當(dāng)緩沖區(qū)達(dá)到上限時腻扇,drain() 阻塞,待到緩沖區(qū)回落到下限時砾嫉,寫操作恢復(fù)
    # 當(dāng)不需要等待時幼苛,drain() 會立即返回,例如上面的消息內(nèi)容較少焰枢,不會阻塞
    # 這就是一個控制消息的數(shù)據(jù)量的控制閥
    await writer.drain()
    # 給服務(wù)器發(fā)送消息后蚓峦,就等著讀取服務(wù)器返回來的消息
    while True:
        # 讀取數(shù)據(jù)是阻塞操作,釋放 CPU
        # reader 相當(dāng)于一個水盆济锄,服務(wù)器發(fā)來的數(shù)據(jù)是水流
        # readline 表示讀取一行暑椰,以 \n 作為換行符
        # 如果在出現(xiàn) \n 之前,數(shù)據(jù)流中出現(xiàn) EOF(End Of File 文件結(jié)束符)也會返回
        # 相當(dāng)于出現(xiàn) \n 或 EOF 時荐绝,擰上水龍頭一汽,line 就是這盆水
        line = await reader.readline()
        # 數(shù)據(jù)接收完畢,會返回空字符串 \r\n 低滩,退出 while 循環(huán)召夹,結(jié)束數(shù)據(jù)接收
        if line.decode() == '\r\n':
            break
        # 接收的數(shù)據(jù)是二進制數(shù)據(jù),轉(zhuǎn)換為 UTF-8 格式并打印
        # rstrip 方法刪掉字符串的結(jié)尾處的空白字符恕沫,也就是 \n
        print('{} header > {}'.format(host, line.decode().rstrip()))
    writer.close()   # 關(guān)閉數(shù)據(jù)流监憎,可以省略

host_list = ['www.shiyanlou.com', 'www.sohu.com', 't.tt']   # 主機列表
loop = asyncio.get_event_loop()                             # 事件循環(huán)
tasks = asyncio.wait([wget(host) for host in host_list])    # 任務(wù)收集器
loop.run_until_complete(tasks)                              # 阻塞運行任務(wù)
loop.close()                                                # 關(guān)閉事件循環(huán)

程序運行結(jié)果:

$ python3 asyncio_stream.py
wget t.tt
wget www.shiyanlou.com
wget www.sohu.com
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html;charset=UTF-8
... ...
www.sohu.com header > FSS-Cache: HIT from 3354153.4206131.5193782
www.sohu.com header > FSS-Proxy: Powered by 3550764.4599350.5390396
www.shiyanlou.com header > HTTP/1.1 301 Moved Permanently
www.shiyanlou.com header > Server: nginx/1.14.2
... ...
www.shiyanlou.com header > Connection: close
www.shiyanlou.com header > Location: https://www.shiyanlou.com/
t.tt header > HTTP/1.1 301 Moved Permanently
t.tt header > Date: Wed, 22 May 2019 08:26:32 GMT
... ...
t.tt header > Server: ARTWS/1.0
t.tt header > X-XSS-Protection: 1;mode=block
HTTP 協(xié)議 GET 請求格式

使用 async for 優(yōu)化讀取信息的代碼:

import asyncio

async def wget(host):
    print('wget {}'.format(host))
    connect = asyncio.open_connection(host, 80)
    reader, writer = await connect
    header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
    writer.write(header.encode())
    await writer.drain()
    # 給服務(wù)器發(fā)送消息后,就等著讀取服務(wù)器返回來的消息
    # reader 對象較為特殊婶溯,它有 __aiter__ 和 __anext__ 方法
    # 這種對象不是 Iterable 對象(但仍然是可迭代對象)鲸阔,只能使用 async for 循環(huán)
    # __anext__ 方法的返回值會賦值給 line 變量
    # 整個循環(huán)其實是阻塞的偷霉,因為 __anext__ 方法里有 yield from 語句
    async for line in reader:
        print('{} header > {}'.format(host,
            line.decode('unicode_escape').rstrip()))

def main():
    host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt']  # 主機列表
    loop = asyncio.get_event_loop()                             # 事件循環(huán)
    tasks = asyncio.wait([wget(host) for host in host_list])    # 任務(wù)收集器
    loop.run_until_complete(tasks) 
    loop.close

if __name__ == '__main__':
    main()

asyncio.as_completed 方法即時獲取任務(wù)結(jié)果

import asyncio

async def wget(host):
    print('wget {}'.format(host))
    connect = asyncio.open_connection(host, 80)
    reader, writer = await connect
    header = 'GET / HTTP/1.0\r\nHost: {}\r\n\r\n'.format(host)
    writer.write(header.encode())
    await writer.drain()
    async for line in reader:
        print('{} header > {}'.format(host,
            line.decode('unicode_escape').rstrip()))
    return 'Host: {}'.format(host)

def main():
    '''
    host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt']  # 主機列表
    loop = asyncio.get_event_loop()                             # 事件循環(huán)
    coroutines = [wget(host) for host in host_list]             # 協(xié)程列表
    tasks = asyncio.wait(coroutines)                            # 任務(wù)收集器
    # 之前的文檔中講到過 asyncio.Task.all_tasks 方法可以獲得事件循環(huán)中的任務(wù)集合
    # 事件循環(huán)的 run_until_complete 方法的返回值是二元元組
    # 元組的第一個元素也是任務(wù)集合
    # 任務(wù)本身是一個協(xié)程函數(shù),函數(shù)的 return 值可以通過任務(wù)的 result 方法獲得
    result = loop.run_until_complete(tasks)
    print(result)
    for task in result[0]:
        print(task.result())
    '''
    # 任務(wù)在結(jié)束時才會產(chǎn)生 result 值
    # 上面的寫法只能等事件循環(huán)停止后一并獲取全部任務(wù)的 result 值
    # 如果要隨時獲得任務(wù)的 result 值褐筛,可以使用 asyncio.as_completed 方法
    # 這樣的話需要創(chuàng)建一個主任務(wù)并加入到事件循環(huán)类少,事件循環(huán)首先運行主任務(wù)
    # 在主任務(wù)中使用 asyncio.ensure_future 方法創(chuàng)建新的子任務(wù)
    # 這些子任務(wù)會自動加入到事件循環(huán)
    # 隨后在主任務(wù)中使用 asyncio.as_completed 方法獲取已經(jīng)完成的任務(wù)
    async def main_task():
        tasks = []
        host_list = ['www.shiyanlou.com', 'www.zhihu.com', 't.tt']
        for host in host_list:
            tasks.append(asyncio.ensure_future(wget(host)))
        # 這里為什么不使用 asyncio.Task.all_tasks 方法獲取任務(wù)集合呢?
        # 像這樣:asyncio.as_completed(asyncio.Task.all_tasks())
        # 因為任務(wù)集合中包含主任務(wù)和子任務(wù)渔扎,雖然二者在事件循環(huán)中是并列關(guān)系
        # 但是 for 循環(huán)會阻塞在這里硫狞,主任務(wù)永遠(yuǎn)完不成
        for task in asyncio.as_completed(tasks):
            print(await task)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_task())
    loop.close()

if __name__ == '__main__':
    main()

運行結(jié)果:

$ python3 a.py
wget www.shiyanlou.com
wget www.zhihu.com
wget t.tt
www.shiyanlou.com header > HTTP/1.1 301 Moved Permanently
www.shiyanlou.com header > Server: nginx/1.14.2
... ...
www.shiyanlou.com header > </body>
www.shiyanlou.com header > </html>
Host: www.shiyanlou.com
t.tt header > HTTP/1.1 301 Moved Permanently
t.tt header > Date: Sat, 25 May 2019 06:33:13 GMT
... ...
t.tt header > </body>
t.tt header > </html>
Host: t.tt
www.zhihu.com header > HTTP/1.1 301 Moved Permanently
www.zhihu.com header > Date: Sat, 25 May 2019 06:33:13 GMT
... ...
www.zhihu.com header > </body>
www.zhihu.com header > </html>
Host: www.zhihu.com
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市晃痴,隨后出現(xiàn)的幾起案子残吩,更是在濱河造成了極大的恐慌,老刑警劉巖愧旦,帶你破解...
    沈念sama閱讀 222,807評論 6 518
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件世剖,死亡現(xiàn)場離奇詭異,居然都是意外死亡笤虫,警方通過查閱死者的電腦和手機旁瘫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,284評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來琼蚯,“玉大人酬凳,你說我怎么就攤上這事≡馐” “怎么了宁仔?”我有些...
    開封第一講書人閱讀 169,589評論 0 363
  • 文/不壞的土叔 我叫張陵,是天一觀的道長峦睡。 經(jīng)常有香客問我翎苫,道長,這世上最難降的妖魔是什么榨了? 我笑而不...
    開封第一講書人閱讀 60,188評論 1 300
  • 正文 為了忘掉前任煎谍,我火速辦了婚禮,結(jié)果婚禮上龙屉,老公的妹妹穿的比我還像新娘呐粘。我一直安慰自己,他們只是感情好转捕,可當(dāng)我...
    茶點故事閱讀 69,185評論 6 398
  • 文/花漫 我一把揭開白布作岖。 她就那樣靜靜地躺著,像睡著了一般五芝。 火紅的嫁衣襯著肌膚如雪痘儡。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,785評論 1 314
  • 那天枢步,我揣著相機與錄音沉删,去河邊找鬼蓄坏。 笑死,一個胖子當(dāng)著我的面吹牛丑念,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播结蟋,決...
    沈念sama閱讀 41,220評論 3 423
  • 文/蒼蘭香墨 我猛地睜開眼脯倚,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了嵌屎?” 一聲冷哼從身側(cè)響起推正,我...
    開封第一講書人閱讀 40,167評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宝惰,沒想到半個月后植榕,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,698評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡尼夺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,767評論 3 343
  • 正文 我和宋清朗相戀三年尊残,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片淤堵。...
    茶點故事閱讀 40,912評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡寝衫,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拐邪,到底是詐尸還是另有隱情慰毅,我是刑警寧澤,帶...
    沈念sama閱讀 36,572評論 5 351
  • 正文 年R本政府宣布扎阶,位于F島的核電站汹胃,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏东臀。R本人自食惡果不足惜着饥,卻給世界環(huán)境...
    茶點故事閱讀 42,254評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望啡邑。 院中可真熱鬧贱勃,春花似錦、人聲如沸谤逼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,746評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽流部。三九已至戚绕,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間枝冀,已是汗流浹背舞丛。 一陣腳步聲響...
    開封第一講書人閱讀 33,859評論 1 274
  • 我被黑心中介騙來泰國打工耘子, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人球切。 一個月前我還...
    沈念sama閱讀 49,359評論 3 379
  • 正文 我出身青樓谷誓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親吨凑。 傳聞我的和親對象是個殘疾皇子捍歪,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,922評論 2 361