花了大概三天時(shí)間閱讀了這篇500 line or less|A Web Crawler With asyncio Coroutines
這應(yīng)該就是真正的深入淺出吧枕屉,不僅對(duì)python3.4 coroutine
進(jìn)行了詳細(xì)的闡述被饿,而且源碼寫的很清晰(主要邏輯大概一共的是300-400行代碼坎炼,寫的很Pythonic
)
就放一下自己的學(xué)習(xí)記錄和整理概耻,方便以后查閱和理解封恰,感興趣的同學(xué)強(qiáng)烈建議去看原文葱弟。
全文都在圍繞一個(gè)coroutine
來進(jìn)行講解,從并發(fā)和并行的問題绑警,講到使用事件循環(huán)和非阻塞實(shí)現(xiàn)的異步框架求泰,講到yield
生成器,以及yield from
生成器代理這么一個(gè)東西计盒、以及如何asyncio的簡(jiǎn)單實(shí)現(xiàn)渴频,最后是基于生成器的coroutine程序,也就是爬蟲的實(shí)現(xiàn)北启。
重新理解yield
yield 常用于生成器卜朗,我們都知道當(dāng)解釋器看到y(tǒng)ield不會(huì)立刻執(zhí)行它拔第,一直等到將初始化,如給它send(None)
场钉,然后它會(huì)運(yùn)行到y(tǒng)ield處暫停將蚊俺。
定義一個(gè)fib生成器:
>>> def gen_fn():
... a,b = 1,1
... yield a
... n = 0
... while n<100:
... a,b = b,a+b
... yield a
... n += 1
python編譯后字節(jié)碼
dis.dis(gen_fn)
2 0 LOAD_CONST 4 ((1, 1))
3 UNPACK_SEQUENCE 2
6 STORE_FAST 0 (a)
9 STORE_FAST 1 (b)
3 12 LOAD_FAST 0 (a)
15 YIELD_VALUE
16 POP_TOP
4 17 LOAD_CONST 2 (0)
20 STORE_FAST 2 (n)
5 23 SETUP_LOOP 48 (to 74)
>> 26 LOAD_FAST 2 (n)
29 LOAD_CONST 3 (100)
32 COMPARE_OP 0 (<)
35 POP_JUMP_IF_FALSE 73
6 38 LOAD_FAST 1 (b)
41 LOAD_FAST 0 (a)
44 LOAD_FAST 1 (b)
47 BINARY_ADD
48 ROT_TWO
49 STORE_FAST 0 (a)
52 STORE_FAST 1 (b)
7 55 LOAD_FAST 0 (a)
58 YIELD_VALUE
59 POP_TOP
8 60 LOAD_FAST 2 (n)
63 LOAD_CONST 1 (1)
66 INPLACE_ADD
67 STORE_FAST 2 (n)
70 JUMP_ABSOLUTE 26
>> 73 POP_BLOCK
>> 74 LOAD_CONST 0 (None)
77 RETURN_VALUE
>>> len(fn.gi_code.co_code)
78
可以看到一共78行字節(jié)碼,注意第15行: 15 YIELD_VALUE
>>> fn.send(None)
1
可以看到逛万,當(dāng)給它發(fā)送一個(gè)None
泳猬,程序就會(huì)開始運(yùn)行,此時(shí)程序解釋到yield
便停下來了
>>> fn.gi_frame.f_lasti
15
這就是生成器的定位變量宇植,last position
,這個(gè)記錄了生成器運(yùn)行停止的位置得封。當(dāng)下次運(yùn)行就可以繼續(xù)從這里運(yùn)行
>>> fn.gi_frame.f_locals
{'a': 1, 'b': 1}
這就是生成器的模擬棧的保存變量的數(shù)據(jù)結(jié)構(gòu)
>>> fn.send(None)
1
>>> fn.gi_frame.f_locals
{'n': 0, 'a': 1, 'b': 2}
>>> fn.send(None)
2
>>> fn.gi_frame.f_locals
{'n': 1, 'a': 2, 'b': 3}
所以,基于yield的生成器就有個(gè)懶加載的特性指郁。
需要注意的是生成器和傳統(tǒng)的函數(shù)不一樣呛每,它的切換是不是靠實(shí)打?qū)嵉臈肀4嫔舷挛那袚Q的相關(guān)信息的,而是靠在堆中創(chuàng)建的棧模擬對(duì)象來實(shí)現(xiàn)的坡氯,所以對(duì)于python程序而言晨横,利用yield
就可以自己調(diào)度自己的程序。這就是一個(gè)coroutine
的原型箫柳,之后再提
傳統(tǒng)的函數(shù)調(diào)用是這樣: Python解釋器是一個(gè)正常的C程序手形,所以它的堆棧幀是正常的堆棧幀,當(dāng)一個(gè)函數(shù)調(diào)用一個(gè)子函數(shù)悯恍,這個(gè)被調(diào)用函數(shù)獲得控制權(quán)库糠。直到它返回或者有異常發(fā)生,才把控制權(quán)交給調(diào)用者涮毫。
>>> import inspect
>>> frame = None
>>> def bar():
... a = 1
... foo(a)
...
>>> def foo(a):
... print(a)
... global frame
... frame = inspect.currentframe()
>>> bar()
1
>>> frame.f_code.co_name
'foo'
>>> frame.f_back
<frame object at 0x10254a8>
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'bar'
可以看出記錄關(guān)系
yield from
看看yield from 這也是一個(gè)很有意思的東西:具備yield from
的函數(shù)我們也將其作為一個(gè)生成器瞬欧,它的特點(diǎn)是:內(nèi)部還有一個(gè)生成器。
>>> def gen_fn():
... result = yield 1
... print('result of yield: {}'.format(result))
... result2 = yield 2
... print('result2 of yield: {}'.format(result2))
... return 'done'
...
>>> def call_fn():
... gen = gen_fn()
... rv = yield from gen
... print('return values of yield-from: {}'.format(rv))
>>> caller = call_fn()
>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti
15
>>> import dis
>>> dis.dis(caller)
2 0 LOAD_GLOBAL 0 (gen_fn)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 STORE_FAST 0 (gen)
3 9 LOAD_FAST 0 (gen)
12 GET_YIELD_FROM_ITER
13 LOAD_CONST 0 (None)
16 YIELD_FROM
17 STORE_FAST 1 (rv)
4 20 LOAD_GLOBAL 1 (print)
23 LOAD_CONST 1 ('return values of yield-from: {}')
26 LOAD_ATTR 2 (format)
29 LOAD_FAST 1 (rv)
32 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
35 CALL_FUNCTION 1 (1 positional, 0 keyword pair)
38 POP_TOP
39 LOAD_CONST 0 (None)
42 RETURN_VALUE
函數(shù)在執(zhí)行每個(gè)語句之前遞增其指令指針(PC)罢防,但是在外部生成器執(zhí)行yield from
之后艘虎,它從其指令指針中減去1以保持自己固定在yield from
語句。所以caller.gi_frame.f_lasti
停在了15行
它的效果是咒吐,使用yield from
的調(diào)用者和內(nèi)部的生成器之間建立了一個(gè)數(shù)據(jù)交流的管道野建。
比如caller
再通過send(xxx)
發(fā)送一個(gè)值,會(huì)直接發(fā)送到gen這個(gè)生成器中恬叹。生成器接著執(zhí)行任務(wù)候生,再執(zhí)行到yield
語句再返回,這個(gè)數(shù)據(jù)會(huì)通過yield from
傳遞到最外面的caller
绽昼,線程切換到最外層邏輯執(zhí)行唯鸭,直到caller再次發(fā)送信息,以此循環(huán)硅确。直到內(nèi)部發(fā)生器拋出StopIteration
目溉。期間唠粥,caller.gi_frame.f_lasti
一直停在了15行
需要注意的是這個(gè)StopIteration
可以在caller的邏輯中catch
,也可以在內(nèi)部yield from
處進(jìn)行捕獲停做。如果你想讓yield from
下的程序繼續(xù)執(zhí)行的話就需要在yield from
捕獲
異步io和事件循環(huán)
在看異步io之前先看段同步的代碼:
#!/usr/bin/python
# coding: utf-8
import socket
def fetch(link):
sock = socket.socket()
sock.connect(('www.zhxfei.com',80))
request = 'GET {} HTTP/1.0\r\nHOST: zhxfei.com\r\n\r\n'.format(link)
sock.send(request.encode('ascii'))
response = b''
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
print(response)
fetch('/')
這是一個(gè)簡(jiǎn)單的同步阻塞的套接字客戶端程序,調(diào)用sock.connect(())
是阻塞的大莫,其在client端發(fā)送第三次握手報(bào)文之后返回蛉腌,即在TCP前兩次握手,此程序不能繼續(xù)往下執(zhí)行只厘,等待connect
返回烙丛。
而一個(gè)非阻塞的程序是這樣的:
import socket
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('zhxfei.com', 80))
except BlockingIOError:
pass
request = 'GET /353/ HTTP/1.0\r\nHost: zhxfei.com\r\n\r\n'
encoded = request.encode('ascii')
... so on
當(dāng)sock.connect
發(fā)起調(diào)用便直接返回了,去準(zhǔn)備待會(huì)要發(fā)送的request
,非阻塞經(jīng)常和事件驅(qū)動(dòng)連用羔味,在linux上就是epoll
了河咽。
epoll 使用一個(gè)文件描述符管理多個(gè)描述符,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中赋元,這樣在用戶空間和內(nèi)核空間的copy只需一次忘蟹。epoll支持水平觸發(fā)和邊緣觸發(fā),最大的特點(diǎn)在于「邊緣觸發(fā)」搁凸,它只告訴進(jìn)程哪些剛剛變?yōu)榫途w態(tài)媚值,并且只會(huì)通知一次。
epoll技術(shù)的編程模型就是異步非阻塞回調(diào)护糖,也可以叫做Reactor褥芒、事件驅(qū)動(dòng)、事件輪循(EventLoop)嫡良、libevent锰扶、Tornado、Node.js這些就是epoll時(shí)代的產(chǎn)物寝受。
這里提到了事件循環(huán):事件循環(huán)就“是一種等待程序分配事件或消息的編程架構(gòu)”坷牛。基本上來說事件循環(huán)就是很澄,“A發(fā)起調(diào)用前漓帅,注冊(cè)時(shí)間循環(huán),當(dāng)A發(fā)起調(diào)用痴怨,直接返回到事件循環(huán)忙干,當(dāng)循環(huán)監(jiān)聽到A注冊(cè)的具體事件,執(zhí)行B”浪藻。引用網(wǎng)上的一段翻譯文檔:
最簡(jiǎn)單的例子來解釋這一概念就是用每個(gè)瀏覽器中都存在的JavaScript事件循環(huán)捐迫。當(dāng)你點(diǎn)擊了某個(gè)東西(“當(dāng)A發(fā)生時(shí)”),這一點(diǎn)擊動(dòng)作會(huì)發(fā)送給JavaScript的事件循環(huán)爱葵,并檢查是否存在注冊(cè)過的 onclick 回調(diào)來處理這一點(diǎn)擊(“執(zhí)行B”)施戴。只要有注冊(cè)過的回調(diào)函數(shù)就會(huì)伴隨點(diǎn)擊動(dòng)作的細(xì)節(jié)信息被執(zhí)行反浓。事件循環(huán)被認(rèn)為是一種循環(huán)是因?yàn)樗煌5厥占录⑼ㄟ^循環(huán)來發(fā)如何應(yīng)對(duì)這些事件。事件循環(huán)也經(jīng)常用于在別的線程或子進(jìn)程中執(zhí)行代碼赞哗,并將事件循環(huán)作為調(diào)節(jié)機(jī)制(例如雷则,合作式多任務(wù))。如果你恰好理解 Python 的 GIL肪笋,事件循環(huán)對(duì)于需要釋放 GIL 的地方很有用
舉一個(gè)例子:
#!/usr/bin/env python3
# coding:utf-8
import threading
import socket
import time
from selectors import DefaultSelector,\
EVENT_WRITE,\
EVENT_READ
host = 'baidu.com'
selector = DefaultSelector()
class Fetcher:
def __init__(self,url):
self.url = url
self.res = b''
self.sock = None
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
print('Thread {} : socket init'.format(threading.currentThread()))
try:
self.sock.connect((host,80))
except BlockingIOError:
pass
selector.register(self.sock.fileno(),
EVENT_WRITE,
self.connneted)
def connneted(self,key,mask):
selector.unregister(key.fd)
print('connected....')
print('Thread {} : socket connected'.format(
threading.currentThread()))
request = 'GET {} HTTP/1.1\r\nHOST: {}\r\n\r\n'.format(
self.url,host).encode('utf-8')
self.sock.send(request)
selector.register(key.fd,
EVENT_READ,
self.read_res)
def read_res(self,key,mask):
print('Thread {} : socket readable'.format(
threading.currentThread()))
chunk = self.sock.recv(4096)
if chunk:
self.res += chunk
else:
selector.unregister(key.fd)
'''
a crawler may have some code just like:
global stopped #全局停止標(biāo)志位
links = self.parse_link(self.response) #解析出link
to_do_link.add(links) #將link添加到to do list
seen_link.add(self.url) #將self.url添加到已經(jīng)download過的集合中
to_do_link.remove(links) #將self.url在to do list集合中刪除
if not to_do_link:
stopped == True
'''
def loop():
fetcher = Fetcher('/')
fetcher.fetch()
while True:
events = selector.select()
for event_key,event_mask in events:
callback = event_key.data
callback(event_key,event_mask)
print('callback time:{}'.format(time.time()))
print(fetcher.res)
loop()
zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ sudo python3 non_blocking_test.py
Thread <_MainThread(MainThread, started 139838592571136)> : socket init
connected....
Thread <_MainThread(MainThread, started 139838592571136)> : socket connected
callback time:1489293273.1518176
b''
Thread <_MainThread(MainThread, started 139838592571136)> : socket readable
callback time:1489293273.182348
b'HTTP/1.1 200 OK\r\nDate: Sun, 12 Mar 2017 04:34:33 GMT\r\nServer: Apache\r\nLast-Modified: Tue, 12 Jan 2010 13:48:00 GMT\r\nETag: "51-47cf7e6ee8400"\r\nAccept-Ranges: bytes\r\nContent-Length: 81\r\nCache-Control: max-age=86400\r\nExpires: Mon, 13 Mar 2017 04:34:33 GMT\r\nConnection: Keep-Alive\r\nContent-Type: text/html\r\n\r\n<html>\n<meta http-equiv="refresh" content="0;url=http://www.baidu.com/">\n</html>\n'
這個(gè)demo執(zhí)行邏輯就是:發(fā)起調(diào)用前在事件循環(huán)上注冊(cè)信息和回調(diào)函數(shù)月劈,當(dāng)非阻塞的調(diào)用返回之后,回到事件循環(huán)藤乙,由事件循環(huán)監(jiān)聽對(duì)應(yīng)的注冊(cè)的信息猜揪,如套接字可讀,可寫等事件發(fā)生坛梁,發(fā)起調(diào)用而姐。
在這里connect
注冊(cè)了可寫,發(fā)起connect
非阻塞調(diào)用划咐,之后回到事件循環(huán)拴念,監(jiān)聽事件發(fā)生,socket可寫之后褐缠,調(diào)用callback函數(shù)connected
丈莺,在回調(diào)函數(shù)中,發(fā)送http請(qǐng)求送丰,并在此注冊(cè)可讀事件缔俄,以及讀事件發(fā)生的回調(diào)函數(shù),之后再次回到事件循環(huán)器躏,事件循環(huán)監(jiān)聽事件讀發(fā)生俐载,再次調(diào)用read_res
,就這樣登失,我們的邏輯將靠著注冊(cè)->event loop->callback回調(diào)
運(yùn)行下去遏佣。需要注意的是,這里進(jìn)行操作的是一個(gè)線程揽浙。
這個(gè)程序使用了一個(gè)selector
的模塊状婶,這個(gè)模塊將epoll
、kqueue
等等系統(tǒng)級(jí)異步IO接口抽象成Selector
類型馅巷,規(guī)定了統(tǒng)一的對(duì)外接口膛虫,于是程序只管使用selector
的接口就行了。一般使用selectors.DefaultSelector
就好了钓猬,它是這個(gè)模塊根據(jù)你的系統(tǒng)自動(dòng)幫你選擇的最合適的Selector
稍刀。
我們可以知道當(dāng)?shù)却齀/O操作時(shí),一個(gè)函數(shù)必須明確的保存它的狀態(tài),因?yàn)樗鼤?huì)在I/O操作完成之前返回并清除棧幀账月。一個(gè)基于回調(diào)的異步框架综膀,語言特性不能幫助我們保存程序的狀態(tài)。
為了在我們基于回調(diào)的例子中代替局部變量局齿,我們把sock
和response
作為Fetcher
實(shí)例self
屬性剧劝。為了代替指令指針,通過注冊(cè)connnected
和read_response
回調(diào)來保存continuation抓歼。
這種完全依賴于callback的程序有很多問題讥此,如
- 意大利面條式的程序
- 因?yàn)椤眘tack ripping”問題而非常難于調(diào)試
所以大神們想出了使用yield
生成器進(jìn)行回調(diào),上面提到過锭部,生成器可以保存相關(guān)的信息,在適當(dāng)?shù)臅r(shí)候再切換回生成器繼續(xù)執(zhí)行面褐,這樣看起來拌禾,這天生就是一個(gè)callback
。
但是需要注意的是展哭,既然我們用生成器湃窍,就要考慮幾個(gè)問題。
- 如何優(yōu)雅的初始化和退出生成器
- 如何控制生成器讓其進(jìn)行回調(diào)
- 每次切換回生成器如何恢復(fù)其狀態(tài)
如下面這段代碼:
#!/usr/bin/env python
# coding:utf-8
import socket
from selectors import DefaultSelector,\
EVENT_READ,\
EVENT_WRITE
selector = DefaultSelector()
class Future:
def __init__(self):
self.result = None
self._callbacks = []
def add_done_callback(self, fn):
self._callbacks.append(fn)
def set_result(self, result):
self.result = result
for fn in self._callbacks:
fn(self)
class Task:
def __init__(self, coro):
self.coro = coro
f = Future()
f.set_result(None)
self.step(f)
def step(self, future):
try:
next_future = self.coro.send(future.result)
except StopIteration:
return
next_future.add_done_callback(self.step)
def connect(sock, address):
f = Future()
sock.setblocking(False)
try:
sock.connect(address)
except BlockingIOError:
pass
def on_connected():
f.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, on_connected)
yield from f
selector.unregister(sock.fileno())
def read(sock):
f = Future()
def on_readable():
f.set_result(sock.recv(4096)) # Read 4k at a time.
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunk = yield from f
selector.unregister(sock.fileno())
return chunk
def read_all(sock):
response = []
chunk = yield from read(sock)
while chunk:
response.append(chunk)
chunk = yield from read(sock)
return b''.join(response)
class Fetcher:
def __init__(self, url):
self.response = b''
self.url = url
def fetch(self):
sock = socket.socket()
yield from connect(sock, ('zhxfei.com', 80))
get = 'GET {} HTTP/1.0\r\nHost: zhxfei.com\r\n\r\n'.format(self.url)
sock.send(get.encode('ascii'))
self.response = yield from read_all(sock)
'''
the program is not end
'''
def loop():
fetcher = Fetcher('/')
Task(fetcher.fetch())
while True:
events = selector.select()
for event_key,event_mask in events:
callback = event_key.data
callback(event_key,event_mask)
loop()
終于到這段程序了匪傍,這段程序困擾了我一個(gè)晚上(僅供分析)您市,上面的程序是簡(jiǎn)化版本。放兩張圖感受下:
OK役衡,上面我們可以看見:
它用了一個(gè)Task
來控制生成器的初始化和結(jié)束茵休,并且控制生成器的回調(diào),它是一個(gè)生成器最頂層的caller
手蝎。在它控制的子程序中榕莺,還嵌套yield from
,在子子程序中,也許還嵌套著yield from
棵介,到處使用這種yield from
的好處是可以解耦這個(gè)協(xié)同工作的程序的實(shí)現(xiàn)和調(diào)用钉鸯,符合了開閉原則。
它用一個(gè)
future
表示了生成器在切換的時(shí)候希望保存的一些信息如results
以及什么時(shí)候進(jìn)行回調(diào)邮辽。我們?cè)谑录h(huán)上進(jìn)行了回調(diào)函數(shù)的操作唠雕,這個(gè)回調(diào)函數(shù)是希望生成器繼續(xù)運(yùn)行的保障,它實(shí)際上注冊(cè)的是一個(gè)這個(gè)生成器調(diào)度的一段主程序吨述。
這是一段設(shè)計(jì)十分精巧的邏輯岩睁,也許是我太
too young too native
,不得不說,我嘆服你的技巧
基于yield的coroutine
這種基于生成器協(xié)調(diào)工作的程序揣云,就是協(xié)程(coroutine)笙僚。源碼關(guān)于這個(gè)邏輯大概300行,我將其簡(jiǎn)化下灵再,去掉了一些日志肋层、命令行解析亿笤、爬蟲等等具體實(shí)現(xiàn)。
其實(shí)我就是想看看這個(gè)py3.4
版本的coroutine
是如何工作及應(yīng)用的栋猖,我保留了生產(chǎn)者和消費(fèi)者的模式净薛,線程在對(duì)象初始化的時(shí)候向Queue
寫入了5個(gè)task,之后這個(gè)協(xié)程在self.queue.join
被暫停蒲拉,等待queue
中的任務(wù)被消費(fèi)
from asyncio import Queue
import aiohttp
import asyncio
import pdb
import threading
import time
roots = [
'www.zhxfei.com',
'www.baidu.com',
'www.google.com',
'www.tencent.com',
'www.qq.com'
]
class Crawler:
def __init__(self,roots,max_task=5):
self.roots = roots
self.queue = Queue()
self.max_task = max_task
#self.session = aiohttp.ClientSession()
self.seen_urls = set()
for root in roots:
self.add_url(root)
def add_url(self,root):
if root in self.seen_urls:
return
self.queue.put_nowait(root)
@asyncio.coroutine
def crawler(self):
tasks = [asyncio.Task(self.work())
for _ in range(self.max_task)]
#pdb.set_trace()
yield from self.queue.join()
for w in tasks:
w.cancel() # w is the object of Task
@asyncio.coroutine
def work(self):
'''consume coroutine'''
try:
while True:
# when the queue has no links , the queue.get blocked
url = yield from self.queue.get()
print('{}'.format(threading.currentThread()))
yield from self.fetch(url)
self.queue.task_done()
except asyncio.CancelledError:
pass
@asyncio.coroutine
def fetch(self,url):
yield from asyncio.sleep(1)
def main():
t1 = time.time()
loop = asyncio.get_event_loop()
crawler = Crawler(roots)
#pdb.set_trace()
loop.run_until_complete(crawler.crawler())
loop.close()
print('COST:{}'.format(time.time() - t1))
if __name__ == '__main__':
main()
使用pdb
運(yùn)行調(diào)試肃拜,你會(huì)發(fā)現(xiàn)程序
zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ python3 asyncio_crawler.py
> /home/zhxfei/code/python/500lines/asyncio_crawler.py(59)main()
-> loop.run_until_complete(crawler.crawler())
(Pdb) p crawler.queue._unfinished_tasks
5
(Pdb) c
> /home/zhxfei/code/python/500lines/asyncio_crawler.py(35)crawler()
-> yield from self.queue.join()
(Pdb) c
之后1s就退出了程序
zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ python3 asyncio_crawler.py
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
COST:1.0039052963256836
可以看到,每個(gè)consume
程序都自己睡眠雌团,而一個(gè)線程就可以在多個(gè)代碼段跳轉(zhuǎn)運(yùn)行燃领,完全省去了多進(jìn)程/線程的創(chuàng)建、維護(hù)锦援、調(diào)度猛蔽、上下文切換等等開銷。最重要的是:當(dāng)多個(gè)線程去操作臨界量的時(shí)候灵寺,為了保證線程安全曼库,往往需要帶鎖操作,比如會(huì)出現(xiàn)臨界區(qū)變量不一致的情況發(fā)生略板,甚至對(duì)于面對(duì)超高并發(fā)的情況下毁枯,甚至需要做出對(duì)accept
加鎖這樣及其消耗性能的事情,而在coroutine
看來一切都很簡(jiǎn)單叮称。
讓我們?cè)倩氐缴厦娴某绦蛑致辏紤]這么個(gè)問題程序是如何運(yùn)行的呢?
首先程序什么時(shí)候開始瓤檐?更或者說蒂誉,上帝如何推動(dòng)這個(gè)小球呢?
loop = asyncio.get_event_loop()
crawler = Crawler(roots)
loop.run_until_complete(crawler.crawler())
首先距帅,我們創(chuàng)建了一個(gè)類似事件循環(huán)這樣的東西右锨,然后創(chuàng)建了crawler
這個(gè)實(shí)例并且將這個(gè)實(shí)例的主邏輯程序crawler
放到了run_until_complete
的這個(gè)方法,我們需要搞清楚這個(gè)event_loop
如何工作的
class EventLoop:
def run_until_complete(self, coro):
"""Run until the coroutine is done."""
task = Task(coro)
task.add_done_callback(stop_callback)
try:
self.run_forever()
except StopError:
pass
class StopError(BaseException):
"""Raised to stop the event loop."""
def stop_callback(future):
raise StopError
可以看到我們的主線程被Task
包裝了以下碌秸,并生成了一個(gè)task
,在初始化的時(shí)候绍移,同時(shí)創(chuàng)建了5個(gè)經(jīng)過Task包裝的消費(fèi)者進(jìn)程,即work
讥电,其也是一個(gè)協(xié)同程序,并且也用Task
包裝了一下蹂窖。這個(gè)task其實(shí)是future
的子類。
我們仔細(xì)看看這個(gè)Task是如何初始化協(xié)程的:
class Task(Future):
def step(self, future):
try:
next_future = self.coro.send(future.result)
except CancelledError:
self.cancelled = True
return
except StopIteration as exc:
# Task resolves itself with coro's return
# value.
self.set_result(exc.value)
return
next_future.add_done_callback(self.step)
可以看到恩敌,在step
方法中瞬测,我們向傳入的或者說包裝的coroutine
發(fā)送了一個(gè)值,讓其可以初始化。它和上面的future
還不一樣月趟,它是一個(gè)需要自我驅(qū)動(dòng)且自我記錄狀態(tài)的future
灯蝴。
直到這個(gè)corotine
即crawler()
運(yùn)行到對(duì)應(yīng)的協(xié)調(diào)狀態(tài)保存對(duì)應(yīng)的狀態(tài),也就是self.queue.join()
,其還是一個(gè)協(xié)程孝宗,并內(nèi)部創(chuàng)建了一個(gè)Future()
的實(shí)例穷躁,并將其返回。
class Queue:
def __init__(self):
self._join_future = Future()
self._unfinished_tasks = 0
# ... other initialization ...
def put_nowait(self, item):
self._unfinished_tasks += 1
# ... store the item ...
def task_done(self):
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._join_future.set_result(None)
@asyncio.coroutine
def join(self):
if self._unfinished_tasks > 0:
yield from self._join_future
且在這個(gè)future
實(shí)例也就是task
記錄的狀態(tài)中增加stop_callback
因妇,并等待self.queue.join()
運(yùn)行完成
之后循環(huán)會(huì)調(diào)度出其他的協(xié)調(diào)程序问潭,即work
,work
就一直運(yùn)行到self.queue.task_done()
,直到所有的協(xié)程都運(yùn)行完成并停在self.queue.get()
婚被,即隊(duì)列中self._unfinished_tasks
清零狡忙。
crawler
又被喚醒,之后依次調(diào)用task對(duì)象的cancel
址芯,向每個(gè)work
協(xié)程中拋進(jìn)去一個(gè)異常asyncio.CancelledError
灾茁,work
將其捕獲之后退出,之后crawler
退出是复,拋出StopIteration
給Task
删顶,Task
捕獲到并用set_results
消費(fèi)之前注冊(cè)的回調(diào)stop_callback
竖螃,拋出StopError
給事件循環(huán)淑廊,Eventloop
捕獲StopError
之后close
。這個(gè)程序就結(jié)束了
邏輯可能略顯混亂和復(fù)雜...
最后
其實(shí)我想說特咆,這是python3.5
之前版本實(shí)現(xiàn)的基于生成器的協(xié)程
在python3.5 季惩,只有async def
關(guān)鍵字定義的函數(shù)才能被叫做協(xié)程,所以腻格。画拾。。菜职。
我花了幾天青抛,學(xué)到了個(gè)假協(xié)程
>>> async def fn():
... await asyncio.sleep(1)
... print('hello world')
...
>>> f = fn()
>>> f
<coroutine object fn at 0x7f2af1d6dc50>
>>> inspect.iscoroutine(f)
True
>>> @asyncio.coroutine
... def fm():
... yield from asyncio.sleep(1)
... print('this is a coroutine')
...
>>> m = fm()
>>> m
<generator object fm at 0x7f2aeefa53b8>
>>> inspect.iscoroutine(m)
False
還是 too young too native
學(xué)無止境,慢慢來唄