python異步編程小抄

基礎

異步調用的原理,是在單個線程中通過切換任務(就像單線程的函數(shù)切換寺惫,負擔很小,性能很好)來達到并發(fā)的效果。相較于線程這種比較重的并發(fā)方式垒酬,異步調用可以大大提升I/O密集型任務的執(zhí)行效率,達到非常高的并發(fā)量件炉。python中異步調用的最小單位是協(xié)程,隨著異步編程在python3各個版本中不斷的迭代矮湘,使用異步編程變得越來越簡單斟冕,因此,我們有必要好好掌握python中異步編程相關的知識缅阳。

兩個關鍵字

async:用來聲明一個協(xié)程函數(shù)(async def)磕蛇,與普通函數(shù)不同的是景描,調用協(xié)程函數(shù)并不會返回其運行的結果,而是返回一個協(xié)程對象(coroutine)秀撇。協(xié)程對象需要注冊到事件循環(huán)(event loop)中才能被調度執(zhí)行到超棺。

await:用來異步等待一個協(xié)程的返回,只能在協(xié)程函數(shù)里使用呵燕。await時意味著將函數(shù)控制權交還給event loop棠绘。舉個例子,當我們在g()函數(shù)內部遇到await f()時再扭,就會將暫時掛起g()的執(zhí)行直到 f()返回氧苍,與此同時,將CPU的執(zhí)行權讓出給event loop中的其他函數(shù)繼續(xù)執(zhí)行泛范。

awaitable:就像for關鍵字用于iterable對象让虐, await關鍵字用于awaitable對象。最常見的兩個awaitable對象就是原生的協(xié)程對象以及使用asyncio.create_task()方法創(chuàng)建的asyncio.Task對象罢荡。值得注意的是赡突,你并不總是需要await一個Task如果你不需要取消或者等待協(xié)程運行的結果的話。

例子

Fluent Python 2nd Edition(神作区赵,忍不住再次安利)這本書的示例代碼為例:

import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4 


async def probe(domain: str) -> tuple[str, bool]: 
    loop = asyncio.get_running_loop() 
    try:
        await loop.getaddrinfo(domain, None) 
    except socket.gaierror:
        return (domain, False)
    return (domain, True)

async def main() -> None: 
     names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN) 
     domains = (f'{name}.dev'.lower() for name in names) 
     coros = [probe(domain) for domain in domains] 
     for coro in asyncio.as_completed(coros): 
     domain, found = await coro 
     mark = '+' if found else ' '
     print(f'{mark} {domain}')
    
    
if __name__ == '__main__':
    asyncio.run(main()) 

作者是這么描述協(xié)程的工作流程的惭缰,注意加粗部分:

Using the syntax await loop.getaddrinfo(…) avoids blocking because await suspends the current coroutine object—for example, probe('if.dev'). A new coroutine object is created, getaddrinfo('if.dev', None), it starts the low-level addrinfo query and yields control back to the event loop, which can drive other pending coroutine objects, such as probe('or.dev'). When the event loop gets a response for the getaddrinfo('if.dev', None) query, that specific coroutine object resumes and returns control back to the probe('if.dev')—which was suspended at await—and can now handle a possible exception and return the result tuple.

這里注意一下英文中SuspendPending的差異:

Suspend:一個事情已經開始了, 不過現(xiàn)在要停了(可能是暫時地)。 Classes have been suspended for the holidays.

Pending:一個事情還沒開始, 因為還在等其他東西惧笛。 This project is still pending for approval.

常用對象

Future:

Future對象是用來模仿concurrent.futures包中的Future對象的从媚,除了一小部分API有差異外,他們的API基本上兼容患整。Future對象代表一個任務的結果拜效,注意這里的結果可以是未執(zhí)行的結果或者時一個執(zhí)行異常。源代碼中是這樣描述這個對象的:

class Future(object):
    """
    This class is *almost* compatible with concurrent.futures.Future.
    
        Differences:
    
        - result() and exception() do not take a timeout argument and
          raise an exception when the future isn't done yet.
    
        - Callbacks registered with add_done_callback() are always called
          via the event loop's call_soon_threadsafe().
    
        - This class is not compatible with the wait() and as_completed()
          methods in the concurrent.futures package.
    """

Task:

一個和Future對象類似的協(xié)程對象各谚,非線程安全紧憾,查看源代碼可以看到TaskFuture的子類,因此 Future對象不一定是一個Task對象昌渤, 但Task對象一定是個Future對象赴穗。

class Task(Future):
    """ A coroutine wrapped in a Future. """

Task對象在創(chuàng)建時就會注冊到事件循環(huán)中。

EventLoop:

管理和分配不同Task的執(zhí)行膀息,Task需要注冊到EventLoo以后才能被調度執(zhí)行到般眉。你可以把它看成是某個監(jiān)控著協(xié)程空閑、可執(zhí)行等運行狀態(tài)潜支,并且能根據(jù)某個協(xié)程等待的事件變?yōu)榭蓤?zhí)行時喚醒這些空閑的協(xié)程的While True的循環(huán)甸赃。Loop是可插拔(替換)的,也就是說冗酿,你可以自己實現(xiàn)一個事件循環(huán)來代替的默認的事件循環(huán)埠对,比如Linux系統(tǒng)上非常著名的uvloop络断,使用下面代碼即可替換EventLoop實現(xiàn):

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop是基于libuv庫(nodejs)使用Cython編寫的,擁有比自帶的event loop更高的性能项玛,遺憾的是你只能在*nixpython3.5+的環(huán)境中使用它貌笨。

常用方法

asyncio.run()

在python3.7中引入的新方法,會自動創(chuàng)建event loop并且執(zhí)行run_until_complete襟沮,同時在執(zhí)行結束時會自動關閉event loop锥惋,在引入該方法之前,你可能需要使用如下代碼來執(zhí)行一個簡單的協(xié)程:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

如果你需要對未完成的任務執(zhí)行cancel()方法臣嚣,那么還需要在另外寫一些代碼來處理它們净刮。而asyncio.run()方法將這些代碼范式進行了封裝,使得調用協(xié)程變得不需要再寫這些模板式的代碼了硅则。

asyncio.gather(*aws, loop=None, return_exceptions=False)

這個方法將協(xié)程(準確的說是awaitable對象淹父,因此也可以是future對象)集合統(tǒng)一放到一個future對象里面,并且將協(xié)程的結果統(tǒng)一在一個列表中返回怎虫。如果所有可等待對象都成功完成暑认,結果將是一個由所有返回值聚合而成的列表。結果值的順序與 aws 中可等待對象的順序一致大审。

asyncio.ensure_futureasyncio.create_task

asyncio.ensure_future雖然名字里帶了future蘸际,但通常它返回的對象是一個Task對象(除非傳入的obj對象本身就是一個Future對象),這是一個有點反直覺且經常容易混淆的點徒扶,看下面的例子:

import asyncio


async def foo():
    print("before foo await")
    await asyncio.sleep(1)
    print("after foo await")
    return "foo"


async def bar():
    print("before bar await")
    await asyncio.sleep(1)
    print("after bar await")
    return "bar"


async def popo():
    print("before popo await")
    await asyncio.sleep(1)
    print("after popo await")
    return "popo"


async def set_after(fut, delay, value):
    # Sleep for *delay* seconds.
    await asyncio.sleep(delay)
    # Set *value* as a result of *fut* Future.
    fut.set_result(value)


async def main():
    print("running main")
    task1 = asyncio.create_task(foo())
    task2 = asyncio.create_task(bar())
    fut1 = asyncio.ensure_future(popo())
    loop = asyncio.get_running_loop()
    fut2 = loop.create_future()
    loop.create_task(
        set_after(fut2, 1, '... world'))
    print(isinstance(task1, asyncio.Future))
    print(isinstance(fut1, asyncio.Task))
    print(isinstance(fut2, asyncio.Task))
    print(isinstance(fut2, asyncio.Future))
    await task1
    await task2
    await fut1
    await fut2
    print("exiting main")


asyncio.run(main())

輸出如下, 注意第三行和第四行的輸出:

running main
True
True
False
True
before foo await
before bar await
before popo await
after foo await
after popo await
after bar await
exiting main

因此粮彤,python 3.7 及之后版本都推薦使用asyncio.create_task方法,這個方法限制了傳入的對象必須是一個協(xié)程對象姜骡。

asyncio.get_running_loopasyncio.get_event_loop

asyncio.get_running_loop函數(shù)是在Python 3.7中添加导坟,在協(xié)程內部使用以便獲取運行著的事件循環(huán)的函數(shù),當事件循環(huán)不存在時圈澈,這個函數(shù)可能會返回RuntimeError惫周。它的實現(xiàn)相較于asyncio.get_event_loop (可能會按需開始一個新的事件循環(huán))更加簡單和快速.

其他常用類

Asyncio.Queue

對于并發(fā)編程,經常需要使用隊列來將負載分配到多個任務上康栈,比如經典的生產者-消費者模式递递,asyncio包同樣提了Queue對象來滿足這類需求,參考官方的代碼示例:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

其中的幾個方法需要單獨說明一下:

put_nowait(item): 將item放到隊列中啥么,非阻塞性操作登舞,當隊列滿時,將拋出QueueFull的異常悬荣;

put(item):將item放到隊列中逊躁,阻塞性操作, 當隊列滿時,將一直等待直到有空位隅熙;

get_nowait(): 從隊列中獲取一個item稽煤,非阻塞性操作,當隊列為空時囚戚,將拋出QueueEmpty的異常酵熙。

get(): 從隊列中獲取一個item,阻塞性操作驰坊,當隊列為空時匾二,將一直等待直到有item可用;

task_done(): 該方法通常由消費者處理拳芙,用來表示從隊列獲取的任務已經完成察藐。對于每一個通過get()從隊列獲取的任務,調用該方法會告知隊列任務處理完成;

join(): 阻塞直到隊列中的所有item都已經被處理過舟扎。每個item添加到隊列中時分飞,未完成任務的計數(shù)就會增加;當這些任務調用task_done()時睹限,這個計數(shù)就會減少譬猫;當未完成任務計數(shù)為0時,join()將不再阻塞羡疗。

Asyncio.Semaphore

異步編程處理IO密集型的任務時具有很好的性能染服,但有時我們也會希望限制一下并發(fā)量,這時候就可以使用信號量來達到這個目的叨恨×危基本用法可以參考官方的代碼示例:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市痒钝,隨后出現(xiàn)的幾起案子秉颗,更是在濱河造成了極大的恐慌,老刑警劉巖午乓,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件站宗,死亡現(xiàn)場離奇詭異,居然都是意外死亡益愈,警方通過查閱死者的電腦和手機梢灭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來蒸其,“玉大人敏释,你說我怎么就攤上這事∶” “怎么了钥顽?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長靠汁。 經常有香客問我蜂大,道長闽铐,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任奶浦,我火速辦了婚禮兄墅,結果婚禮上,老公的妹妹穿的比我還像新娘澳叉。我一直安慰自己隙咸,他們只是感情好,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布成洗。 她就那樣靜靜地躺著五督,像睡著了一般。 火紅的嫁衣襯著肌膚如雪瓶殃。 梳的紋絲不亂的頭發(fā)上充包,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天,我揣著相機與錄音碌燕,去河邊找鬼误证。 笑死,一個胖子當著我的面吹牛修壕,可吹牛的內容都是我干的愈捅。 我是一名探鬼主播,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼慈鸠,長吁一口氣:“原來是場噩夢啊……” “哼蓝谨!你這毒婦竟也來了?” 一聲冷哼從身側響起青团,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤譬巫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后督笆,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芦昔,經...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年娃肿,在試婚紗的時候發(fā)現(xiàn)自己被綠了咕缎。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡料扰,死狀恐怖凭豪,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情晒杈,我是刑警寧澤嫂伞,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響帖努,放射性物質發(fā)生泄漏撰豺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一拼余、第九天 我趴在偏房一處隱蔽的房頂上張望郑趁。 院中可真熱鬧,春花似錦姿搜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至躲惰,卻和暖如春致份,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背础拨。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工氮块, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人诡宗。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓滔蝉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親塔沃。 傳聞我的和親對象是個殘疾皇子蝠引,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內容