其實(shí)
asyncio
的學(xué)習(xí)一點(diǎn)也不快樂
一、python 的多線程和多進(jìn)程
要想理解 asyncio
的異步編程,需要簡(jiǎn)單了解一下 python
的多線程和多進(jìn)程知識(shí)
-
1岖常、多線程
python
有GIL
機(jī)制,因此,python
的多線程雖然是操作系統(tǒng)的原生線程纵散,但無法完成真正的并行運(yùn)行,而僅僅在線程處于睡眠或者等待I/O
時(shí)隐圾,才會(huì)發(fā)揮真正的多線程作用伍掀。
-
1.1、睡眠
time.sleep()
threading.Lock
- 線程模塊其他同步對(duì)象
-
1.2暇藏、I/O
aiohttp
open
-
1.3蜜笤、釋放
GIL
- 所謂釋放
GIL
是指當(dāng)前線程執(zhí)行一定長(zhǎng)度字節(jié)碼或者一段時(shí)間后,釋放GIL
盐碱,由系統(tǒng)將GIL
分配給其他線程把兔,當(dāng)前線程進(jìn)入等待狀態(tài) -
py2
解釋器每執(zhí)行1000
字節(jié)碼釋放GIL
-
py3
解釋器每執(zhí)行15ms
釋放GIL
- 所謂釋放
-
1.4、
GIL
全局解釋器鎖- 同一進(jìn)程同一時(shí)間只有一個(gè)線程在執(zhí)行字節(jié)碼瓮顽,但睡眠線程或者
I/O
操作相關(guān)線程不受GIL
鎖限制县好,允許并發(fā)執(zhí)行。(GIL
保證同一時(shí)刻只有一個(gè)線程對(duì)共享資源進(jìn)行存取暖混,省去線程間資源鎖的開銷)
- 同一進(jìn)程同一時(shí)間只有一個(gè)線程在執(zhí)行字節(jié)碼瓮顽,但睡眠線程或者
-
1.5缕贡、
GIL
原理/* s.connect((host, port)) method */ static PyObject * sock_connect(PySocketSockObject *s, PyObject *addro) { sock_addr_t addrbuf; int addrlen; int res; /* convert (host, port) tuple to C address */ getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen); Py_BEGIN_ALLOW_THREADS res = connect(s->sock_fd, addr, addrlen); Py_END_ALLOW_THREADS /* error handling and so on .... */ }
-
Py_BEGIN_ALLOW_THREADS
放棄GIL
-
Py_END_ALLOW_THREADS
重新獲取GIL
,一個(gè)線程會(huì)在這個(gè)位置阻塞儒恋,等待另一個(gè)線程釋放鎖善绎;一旦出現(xiàn)這個(gè)情況,等待的線程會(huì)搶奪回鎖诫尽,并恢復(fù)字節(jié)碼的執(zhí)行 - 簡(jiǎn)而言之:允許有N個(gè)線程在網(wǎng)絡(luò)
I/O
堵塞禀酱,或等待重新獲取GIL
,但只有一個(gè)線程運(yùn)行字節(jié)碼
-
-
1.6牧嫉、示例
- 睡眠阻塞
import time from threading import Thread from datetime import datetime def write(i): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) time.sleep(4) print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 輸出結(jié)果 start ... 2018-02-09 23:58:25 start write --> 0 2018-02-09 23:58:25 start write --> 1 2018-02-09 23:58:25 start write --> 2 end ... 2018-02-09 23:58:29 end write --> 0 2018-02-09 23:58:29 end write --> 1 2018-02-09 23:58:29 end write --> 2
- CPU 阻塞
import time from threading import Thread from datetime import datetime def write(n): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) l, sum_ = list(range(100000000)), 0 for i in l: sum_ += i print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 輸出結(jié)果 start ... 2018-02-10 00:13:55 start write --> 0 2018-02-10 00:13:58 start write --> 1 2018-02-10 00:14:02 start write --> 2 end ... 2018-02-10 00:14:27 end write --> 0 2018-02-10 00:14:32 end write --> 1 2018-02-10 00:14:35 end write --> 2
- 總結(jié)
- 對(duì)于睡眠操作或者
I/O
操作剂跟,多線程的作用非常明顯,明顯減少所消耗總時(shí)間酣藻; - 對(duì)于
CPU
計(jì)算型操作曹洽,多線程操作反而因?yàn)槎嗑€程間獲取GIL
而增加總的消耗時(shí)間。
- 對(duì)于睡眠操作或者
-
2辽剧、
python
多進(jìn)程
python
多進(jìn)程即其他語言中的多進(jìn)程概念送淆,不再累述
二、異步編程思想
- 1怕轿、協(xié)程
coroutine
- 2偷崩、任務(wù)
Task
- 3辟拷、事件循環(huán)
loop
1、Task 對(duì)象主要包含 協(xié)程(coro)和輪詢對(duì)象(loop)2個(gè)屬性阐斜;
2衫冻、Loop 對(duì)象使用隊(duì)列和堆數(shù)據(jù)結(jié)構(gòu)存放Handle對(duì)象(綁定了回調(diào)函數(shù),如:task的_step方法等)谒出。隊(duì)列中存放的是可以立即執(zhí)行的任務(wù)隅俘,堆中存放的是一定時(shí)間后要執(zhí)行的任務(wù)。對(duì)于yield from asyncio.sleep() 的任務(wù)則是添加到堆中笤喳,到達(dá)指定時(shí)間后執(zhí)行为居。
# 簡(jiǎn)單的調(diào)用示例
import asyncio
@asyncio.coroutine
def coro_fun():
yield from range(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro_fun())
# or
tasks = [asyncio.ensure_future(coro_fun())]
loop.run_until_complete(asyncio.wait(tasks))
三、什么是協(xié)程
進(jìn)程或線程間的創(chuàng)建依賴于系統(tǒng)底層進(jìn)程或線程庫莉测,其運(yùn)行也依賴于系統(tǒng)的任務(wù)調(diào)度系統(tǒng)颜骤,在任務(wù)切換時(shí),
cpu
需要進(jìn)行上下文切換捣卤。
協(xié)程是運(yùn)行在單線程上忍抽,協(xié)程間的切換是在語言層級(jí)實(shí)現(xiàn)的,依賴于對(duì)應(yīng)協(xié)程庫董朝。
- 在單線程執(zhí)行過程中鸠项,如果涉及
sleep
,網(wǎng)絡(luò)IO
操作時(shí)子姜,線程會(huì)阻塞住等待任務(wù)完成祟绊; - 但如果使用協(xié)程,輪詢對(duì)象(
loop
)在輪詢事件時(shí)哥捕,會(huì)分別處理就緒對(duì)象_ready
和調(diào)度對(duì)象_scheduled
以及select
監(jiān)聽對(duì)象牧抽。每次進(jìn)行輪詢時(shí),會(huì)篩選出調(diào)度對(duì)象中滿足執(zhí)行條件的對(duì)象以及select
監(jiān)聽到可讀或可寫的對(duì)象遥赚,添加到就緒對(duì)象中扬舒,由loop
對(duì)象進(jìn)行循環(huán)調(diào)度。_ready
-
_ready
+= 滿足執(zhí)行條件的_scheduled
對(duì)象 -
_ready
+=select
監(jiān)聽到的可讀或可寫對(duì)象 -
loop
遍歷執(zhí)行_ready
中的對(duì)象
四凫佛、什么是期物
期物對(duì)象的設(shè)計(jì)初衷是讲坎,期物用來追蹤任務(wù)或者協(xié)程的運(yùn)行狀態(tài)。一般使用中愧薛,期物用來追蹤
_ready
,_scheduled
,select
監(jiān)聽的對(duì)象晨炕,在各對(duì)象執(zhí)行完成后設(shè)置期物對(duì)象狀態(tài)為FINISHED
,并將設(shè)置_loop
輪詢對(duì)象狀態(tài)為close
的函數(shù)注冊(cè)到loop
對(duì)象的_ready
隊(duì)列中毫炉,由loop
對(duì)象輪詢完成瓮栗。
五、源代碼分析
- 關(guān)于 _ code _.co_flags
# 每個(gè)函數(shù)或方法都有 __code__ 魔法方法 以及其對(duì)應(yīng)的 co_flags 值 # 在 Cpython 中, 1费奸、生成器函數(shù)的標(biāo)識(shí)符為 CO_GENERATOR 即 0x20鲸郊, 2、協(xié)程函數(shù)的標(biāo)識(shí)符為 CO_COROUTINE 即 0x180 3货邓、CO_ITERABLE_COROUTINE 即 0x100 # 通過對(duì)函數(shù)對(duì)象的 __code__.co_flags 與 對(duì)應(yīng)的標(biāo)識(shí)符做位與運(yùn)算,如果是真值四濒,則表明函數(shù)對(duì)象屬于生成器函數(shù)或協(xié)程函數(shù) def gen_fun(): yield from range(10) >>> gen_fun.__code__.co_flags # 99 >>> 99 & 0x20 # 32, True >>> 99 & 0x180 # 0, False async def asy_fun(): await sleep(4) >>> asy_fun.__code__.co_flags # 227 >>> 99 & 0x20 # 32, True >>> 227 & 0x180 # 128, True
- 關(guān)于類型判斷
from collections import Iterator, Awaitable # 判斷迭代器 和 Awaitable 對(duì)象 class A: def __iter__(self): return iter([1,2,3,4,5]) def __await__(self): return iter([1,2,3,4,5]) a = A() >>> isinstance(a, Iterator) # True >>> isinstance(a, Awaitable) # True # 判斷是否為協(xié)程等 import inspect async def asy_fun(): await a >>> inspect.iscoroutine(asy_fun()) # True
- @asyncio.coroutine
def coroutine(func): # 將一個(gè)生成器標(biāo)記為協(xié)程换况,如果在destroyed前沒有調(diào)用,則會(huì)記錄錯(cuò)誤 # 這個(gè)方法是使用 inspect.iscoroutinefunction 方法判斷是否為協(xié)程方法盗蟆,使用 types.coroutine 裝飾的生成器戈二,或 async def 語法定義的函數(shù)都會(huì)返回 True if _inspect_iscoroutinefunction(func): return func # 使用 co_flags 判斷是否為生成器 if inspect.isgeneratorfunction(func): coro = func else: @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) # 判斷 res 是否為期物,生成器 或 協(xié)程包裝類 實(shí)例 if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ isinstance(res, CoroWrapper): res = yield from res elif _AwaitableABC is not None: # py 3.5 才會(huì)有 Awaitable 類 try: # 如果有 __await__屬性喳资,__await__屬性只會(huì)返回一個(gè)不是協(xié)程的迭代器 await_meth = res.__await__ except AttributeError: pass else: # 如果是 Awaitable 對(duì)象 if isinstance(res, _AwaitableABC): # 使用 yield from 處理其迭代器 res = yield from await_meth() return res # 使用 types.coroutine 包裝 coro(注意觉吭,多層 @types.coroutine 裝飾不會(huì)影響,會(huì)直接return裝飾的值) if not _DEBUG: if _types_coroutine is None: wrapper = coro else: wrapper = _types_coroutine(coro) else: @functools.wraps(func) def wrapper(*args, **kwds): # 使用協(xié)程包裝器處理 w = CoroWrapper(coro(*args, **kwds), func=func) if w._source_traceback: del w._source_traceback[-1] # 如果是 py 3.5 則包裝增加 協(xié)程 對(duì)象的屬性,否則包裝為 生成器 對(duì)象的屬性 w.__name__ = getattr(func, '__name__', None) w.__qualname__ = getattr(func, '__qualname__', None) return w # 用以別處使用 asyncio.iscoroutinefunction() 判斷為 True 的作用 wrapper._is_coroutine = True # For iscoroutinefunction(). return wrapper
- @types.coroutine
def coroutine(func): # 將一個(gè)普通的生成器函數(shù)轉(zhuǎn)化為協(xié)程 if not callable(func): raise TypeError('types.coroutine() expects a callable') if (func.__class__ is FunctionType and getattr(func, '__code__', None).__class__ is CodeType): # 獲取函數(shù)的 co_flags co_flags = func.__code__.co_flags # 檢查是否為協(xié)程函數(shù) if co_flags & 0x180: return func # 檢查是否為生成器函數(shù)仆邓,此步主要作用是將生成器的 co_flags 同 0x100 做位或運(yùn)算鲜滩,將其標(biāo)識(shí)變更為協(xié)程標(biāo)識(shí) if co_flags & 0x20: # TODO: Implement this in C. co = func.__code__ func.__code__ = CodeType( co.co_argcount, co.co_kwonlyargcount, co.co_nlocals, co.co_stacksize, co.co_flags | 0x100, # 0x100 == CO_ITERABLE_COROUTINE co.co_code, co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars, co.co_cellvars) return func # 用以支持類似生成器的對(duì)象 @_functools.wraps(func) def wrapped(*args, **kwargs): coro = func(*args, **kwargs) # 協(xié)程或 co_flags 大于 256 的生成器對(duì)象,直接返回 if (coro.__class__ is CoroutineType or coro.__class__ is GeneratorType and coro.gi_code.co_flags & 0x100): return coro if (isinstance(coro, _collections_abc.Generator) and not isinstance(coro, _collections_abc.Coroutine)): # 實(shí)現(xiàn)了生成器抽象類的方法节值,使用生成器包裝器處理成生成器 return _GeneratorWrapper(coro) # 協(xié)程抽象類實(shí)例或其他對(duì)象 return coro return wrapped