背景:
- 主體業(yè)務(wù)使用的是基于async函數(shù)的異步處理的框架驰怎;
- 連接池等資源基于EventLoop進(jìn)行緩存二打,復(fù)用和調(diào)用;
- 需要Celery進(jìn)行后臺(tái)任務(wù)症杏,目前版本Celery對(duì)于async并不能良好支持厉颤,需要把a(bǔ)sync轉(zhuǎn)為sync;
- 如果每次生成一個(gè)新的EventLoop實(shí)例會(huì)導(dǎo)致連接池等資源無(wú)法得到重用精肃。
目標(biāo):
- 構(gòu)建一個(gè)裝飾器可以將async函數(shù)轉(zhuǎn)為sync函數(shù)并在執(zhí)行時(shí)重用EventLoop實(shí)例帜乞。
其他:
- 之前一直使用asgiref將async函數(shù)轉(zhuǎn)化為sync進(jìn)行黎烈,然而在一般使用場(chǎng)景下,async_to_sync每次調(diào)用會(huì)創(chuàng)建一個(gè)新的EventLoop實(shí)例津畸,并以run_必怜。所以每次都會(huì)重新創(chuàng)建一套連接池資源梳庆,并在下次獲取資源時(shí)發(fā)現(xiàn)其對(duì)應(yīng)的EventLoop實(shí)例已關(guān)閉后將其全部釋放。
代碼:
import asyncio
import functools
import threading
from typing import Any, Optional
# 設(shè)置全局的EventLoop
LOOP = asyncio.get_event_loop()
class CallResult:
result: Any = None
exception: Optional[BaseException] = None
# async_to_sync 裝飾器
def async_to_sync(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
call_result = CallResult()
event = threading.Event() # 用于阻塞等待運(yùn)行結(jié)果
async def wrapper2():
try:
call_result.result = await func(*args, **kwargs)
except BaseException as e:
call_result.exception = e # 寫(xiě)入異常
finally:
event.set()
# 使用 全局EventLoop將wrapper2以task的方式執(zhí)行
LOOP.call_soon_threadsafe(LOOP.create_task, wrapper2())
event.wait() # 等待event激活 返回結(jié)果
if call_result.exception:
raise call_result.exception
return call_result.result
return wrapper
# 正常的異步函數(shù)
@async_to_sync
async def go():
print('current_loop: ', id(asyncio.get_event_loop()))
print('GLOBAL LOOP: ', id(LOOP))
print('current_loop is GLOBAL LOOP: ', LOOP is asyncio.get_event_loop())
# 拋出異常的異步函數(shù)
@async_to_sync
async def raise_value_error():
raise ValueError(id(LOOP))
# 下述loop_thread, start_loop, stop_loop可整合為一個(gè)類(lèi),這里為了方便閱讀寫(xiě)成函數(shù)調(diào)用
loop_thread: threading
# 將全局EventLoop設(shè)為運(yùn)行狀態(tài)
def start_loop():
global loop_thread
loop_thread = threading.Thread(target=LOOP.run_forever)
loop_thread.start()
# 將全局EventLoop關(guān)閉更米,正常的服務(wù)可以不用寫(xiě)征峦,這個(gè)是為了示例代碼可以正常結(jié)束用的
def stop_loop():
@async_to_sync
async def stop():
print('Loop stop')
LOOP.stop()
stop()
print('Loop close')
LOOP.close()
loop_thread.join()
if __name__ == '__main__':
start_loop()
for i in range(10):
print(f'------------{i:02}------------')
go()
print('[run go] end')
try:
raise_value_error()
except ValueError:
import traceback
# 使用標(biāo)準(zhǔn)輸出,確保內(nèi)容輸出順序一致
print(traceback.format_exc())
print('[raise_value_error] end')
print('stop loop')
stop_loop()
print('stop loop called', flush=True)
輸出如下:
------------00------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------01------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------02------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------03------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------04------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------05------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------06------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------07------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------08------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
------------09------------
current_loop: 2224110182984
GLOBAL LOOP: 2224110182984
current_loop is GLOBAL LOOP: True
[run go] end
Traceback (most recent call last):
File "<PythonFile>", line 86, in <module>
raise_value_error()
File "<PythonFile>", line 35, in wrapper
raise call_result.exception
File "<PythonFile>", line 25, in wrapper2
call_result.result = await func(*args, **kwargs)
File "<PythonFile>", line 52, in raise_value_error
raise ValueError(id(LOOP))
ValueError: 2224110182984
[raise_value_error] end
stop loop
Loop stop
Loop close
stop loop called