Python 將async函數(shù)轉(zhuǎn)為基于同一個(gè)EventLoop實(shí)例運(yùn)行的sync函數(shù)

背景:

  1. 主體業(yè)務(wù)使用的是基于async函數(shù)的異步處理的框架驰怎;
  2. 連接池等資源基于EventLoop進(jìn)行緩存二打,復(fù)用和調(diào)用;
  3. 需要Celery進(jìn)行后臺(tái)任務(wù)症杏,目前版本Celery對(duì)于async并不能良好支持厉颤,需要把a(bǔ)sync轉(zhuǎn)為sync;
  4. 如果每次生成一個(gè)新的EventLoop實(shí)例會(huì)導(dǎo)致連接池等資源無(wú)法得到重用精肃。

目標(biāo):

  • 構(gòu)建一個(gè)裝飾器可以將async函數(shù)轉(zhuǎn)為sync函數(shù)并在執(zhí)行時(shí)重用EventLoop實(shí)例帜乞。

其他:

  • 之前一直使用asgiref將async函數(shù)轉(zhuǎn)化為sync進(jìn)行黎烈,然而在一般使用場(chǎng)景下,async_to_sync每次調(diào)用會(huì)創(chuàng)建一個(gè)新的EventLoop實(shí)例津畸,并以run_必怜。所以每次都會(huì)重新創(chuàng)建一套連接池資源梳庆,并在下次獲取資源時(shí)發(fā)現(xiàn)其對(duì)應(yīng)的EventLoop實(shí)例已關(guān)閉后將其全部釋放。

代碼:

import asyncio
import functools
import threading
from typing import Any, Optional

# 設(shè)置全局的EventLoop
LOOP = asyncio.get_event_loop()


class CallResult:

    result: Any = None
    exception: Optional[BaseException] = None


# async_to_sync 裝飾器
def async_to_sync(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        call_result = CallResult()
        event = threading.Event()  # 用于阻塞等待運(yùn)行結(jié)果

        async def wrapper2():
            try:
                call_result.result = await func(*args, **kwargs)
            except BaseException as e:
                call_result.exception = e  # 寫(xiě)入異常
            finally:
                event.set()

        # 使用 全局EventLoop將wrapper2以task的方式執(zhí)行
        LOOP.call_soon_threadsafe(LOOP.create_task, wrapper2())
        event.wait()  # 等待event激活 返回結(jié)果
        if call_result.exception:
            raise call_result.exception
        return call_result.result

    return wrapper


# 正常的異步函數(shù)
@async_to_sync
async def go():
    print('current_loop: ', id(asyncio.get_event_loop()))
    print('GLOBAL LOOP: ', id(LOOP))
    print('current_loop is GLOBAL LOOP: ', LOOP is asyncio.get_event_loop())


# 拋出異常的異步函數(shù)
@async_to_sync
async def raise_value_error():
    raise ValueError(id(LOOP))

# 下述loop_thread, start_loop, stop_loop可整合為一個(gè)類(lèi),這里為了方便閱讀寫(xiě)成函數(shù)調(diào)用
loop_thread: threading


# 將全局EventLoop設(shè)為運(yùn)行狀態(tài)
def start_loop():
    global loop_thread
    loop_thread = threading.Thread(target=LOOP.run_forever)
    loop_thread.start()


# 將全局EventLoop關(guān)閉更米,正常的服務(wù)可以不用寫(xiě)征峦,這個(gè)是為了示例代碼可以正常結(jié)束用的
def stop_loop():

    @async_to_sync
    async def stop():
        print('Loop stop')
        LOOP.stop()

    stop()
    print('Loop close')
    LOOP.close()
    loop_thread.join()


if __name__ == '__main__':
    start_loop()
    for i in range(10):
        print(f'------------{i:02}------------')
        go()
    print('[run go] end')
    try:
        raise_value_error()
    except ValueError:
        import traceback
        # 使用標(biāo)準(zhǔn)輸出,確保內(nèi)容輸出順序一致
        print(traceback.format_exc())
        print('[raise_value_error] end')

    print('stop loop')
    stop_loop()
    print('stop loop called', flush=True)

輸出如下:

------------00------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------01------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------02------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------03------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------04------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------05------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------06------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------07------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------08------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
------------09------------
current_loop:  2224110182984
GLOBAL LOOP:  2224110182984
current_loop is GLOBAL LOOP:  True
[run go] end
Traceback (most recent call last):
  File "<PythonFile>", line 86, in <module>
    raise_value_error()
  File "<PythonFile>", line 35, in wrapper
    raise call_result.exception
  File "<PythonFile>", line 25, in wrapper2
    call_result.result = await func(*args, **kwargs)
  File "<PythonFile>", line 52, in raise_value_error
    raise ValueError(id(LOOP))
ValueError: 2224110182984

[raise_value_error] end
stop loop
Loop stop
Loop close
stop loop called
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市蚜枢,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌厂抽,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件户盯,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡莽鸭,警方通過(guò)查閱死者的電腦和手機(jī)硫眨,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)巧号,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人丹鸿,你說(shuō)我怎么就攤上這事靠欢⊥埽” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵掷空,是天一觀(guān)的道長(zhǎng)拣帽。 經(jīng)常有香客問(wèn)我减拭,道長(zhǎng)拧粪,這世上最難降的妖魔是什么修陡? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮癣朗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘绢记。我一直安慰自己正卧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布签孔。 她就那樣靜靜地躺著饥追,像睡著了一般罐盔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上壁熄,一...
    開(kāi)封第一講書(shū)人閱讀 49,816評(píng)論 1 290
  • 那天草丧,我揣著相機(jī)與錄音莹桅,去河邊找鬼诈泼。 笑死,一個(gè)胖子當(dāng)著我的面吹牛铐达,可吹牛的內(nèi)容都是我干的瓮孙。 我是一名探鬼主播,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼脸甘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼丹诀!你這毒婦竟也來(lái)了铆遭?” 一聲冷哼從身側(cè)響起硝桩,我...
    開(kāi)封第一講書(shū)人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎疚脐,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體邢疙,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡棍弄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了疟游。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呼畸。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖颁虐,靈堂內(nèi)的尸體忽然破棺而出蛮原,到底是詐尸還是另有隱情,我是刑警寧澤另绩,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布儒陨,位于F島的核電站蹦漠,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏侍芝。R本人自食惡果不足惜棵红,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望浩峡。 院中可真熱鬧,春花似錦纸淮、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)应役。三九已至箩祥,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間盲泛,已是汗流浹背屈雄。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工奶赔, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人绞旅。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓勺爱,卻偏偏與公主長(zhǎng)得像卫旱,于是被迫代替她去往敵國(guó)和親誊涯。 傳聞我的和親對(duì)象是個(gè)殘疾皇子跪呈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容