500 line or less | crawler篇閱讀筆記


花了大概三天時(shí)間閱讀了這篇500 line or less|A Web Crawler With asyncio Coroutines

這應(yīng)該就是真正的深入淺出吧枕屉,不僅對(duì)python3.4 coroutine進(jìn)行了詳細(xì)的闡述被饿,而且源碼寫的很清晰(主要邏輯大概一共的是300-400行代碼坎炼,寫的很Pythonic

就放一下自己的學(xué)習(xí)記錄和整理概耻,方便以后查閱和理解封恰,感興趣的同學(xué)強(qiáng)烈建議去看原文葱弟。



全文都在圍繞一個(gè)coroutine來進(jìn)行講解,從并發(fā)和并行的問題绑警,講到使用事件循環(huán)和非阻塞實(shí)現(xiàn)的異步框架求泰,講到yield生成器,以及yield from 生成器代理這么一個(gè)東西计盒、以及如何asyncio的簡(jiǎn)單實(shí)現(xiàn)渴频,最后是基于生成器的coroutine程序,也就是爬蟲的實(shí)現(xiàn)北启。

重新理解yield

yield 常用于生成器卜朗,我們都知道當(dāng)解釋器看到y(tǒng)ield不會(huì)立刻執(zhí)行它拔第,一直等到將初始化,如給它send(None)场钉,然后它會(huì)運(yùn)行到y(tǒng)ield處暫停將蚊俺。

定義一個(gè)fib生成器:

>>> def gen_fn():
...     a,b = 1,1
...     yield a
...     n = 0
...     while n<100:
...         a,b = b,a+b
...         yield a
...         n += 1

python編譯后字節(jié)碼

dis.dis(gen_fn)
  2           0 LOAD_CONST               4 ((1, 1))
              3 UNPACK_SEQUENCE          2
              6 STORE_FAST               0 (a)
              9 STORE_FAST               1 (b)

  3          12 LOAD_FAST                0 (a)
             15 YIELD_VALUE
             16 POP_TOP

  4          17 LOAD_CONST               2 (0)
             20 STORE_FAST               2 (n)

  5          23 SETUP_LOOP              48 (to 74)
        >>   26 LOAD_FAST                2 (n)
             29 LOAD_CONST               3 (100)
             32 COMPARE_OP               0 (<)
             35 POP_JUMP_IF_FALSE       73

  6          38 LOAD_FAST                1 (b)
             41 LOAD_FAST                0 (a)
             44 LOAD_FAST                1 (b)
             47 BINARY_ADD
             48 ROT_TWO
             49 STORE_FAST               0 (a)
             52 STORE_FAST               1 (b)

  7          55 LOAD_FAST                0 (a)
             58 YIELD_VALUE
             59 POP_TOP

  8          60 LOAD_FAST                2 (n)
             63 LOAD_CONST               1 (1)
             66 INPLACE_ADD
             67 STORE_FAST               2 (n)
             70 JUMP_ABSOLUTE           26
        >>   73 POP_BLOCK
        >>   74 LOAD_CONST               0 (None)
             77 RETURN_VALUE
>>> len(fn.gi_code.co_code)
78

可以看到一共78行字節(jié)碼,注意第15行: 15 YIELD_VALUE

>>> fn.send(None)
1

可以看到逛万,當(dāng)給它發(fā)送一個(gè)None泳猬,程序就會(huì)開始運(yùn)行,此時(shí)程序解釋到yield便停下來了

>>> fn.gi_frame.f_lasti
15

這就是生成器的定位變量宇植,last position,這個(gè)記錄了生成器運(yùn)行停止的位置得封。當(dāng)下次運(yùn)行就可以繼續(xù)從這里運(yùn)行

>>> fn.gi_frame.f_locals
{'a': 1, 'b': 1}

這就是生成器的模擬棧的保存變量的數(shù)據(jù)結(jié)構(gòu)

>>> fn.send(None)
1
>>> fn.gi_frame.f_locals
{'n': 0, 'a': 1, 'b': 2}
>>> fn.send(None)
2
>>> fn.gi_frame.f_locals
{'n': 1, 'a': 2, 'b': 3}

所以,基于yield的生成器就有個(gè)懶加載的特性指郁。

需要注意的是生成器和傳統(tǒng)的函數(shù)不一樣呛每,它的切換是不是靠實(shí)打?qū)嵉臈肀4嫔舷挛那袚Q的相關(guān)信息的,而是靠在堆中創(chuàng)建的棧模擬對(duì)象來實(shí)現(xiàn)的坡氯,所以對(duì)于python程序而言晨横,利用yield就可以自己調(diào)度自己的程序。這就是一個(gè)coroutine的原型箫柳,之后再提

傳統(tǒng)的函數(shù)調(diào)用是這樣: Python解釋器是一個(gè)正常的C程序手形,所以它的堆棧幀是正常的堆棧幀,當(dāng)一個(gè)函數(shù)調(diào)用一個(gè)子函數(shù)悯恍,這個(gè)被調(diào)用函數(shù)獲得控制權(quán)库糠。直到它返回或者有異常發(fā)生,才把控制權(quán)交給調(diào)用者涮毫。

>>> import inspect
>>> frame = None
>>> def bar():
...     a = 1
...     foo(a)
... 
>>> def foo(a):
...     print(a)
...     global frame
...     frame = inspect.currentframe()
>>> bar()
1
>>> frame.f_code.co_name
'foo'
>>> frame.f_back
<frame object at 0x10254a8>
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'bar'

可以看出記錄關(guān)系

yield from

看看yield from 這也是一個(gè)很有意思的東西:具備yield from的函數(shù)我們也將其作為一個(gè)生成器瞬欧,它的特點(diǎn)是:內(nèi)部還有一個(gè)生成器。

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result2 of yield: {}'.format(result2))
...     return 'done'
... 
>>> def call_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return values of yield-from: {}'.format(rv))
>>> caller = call_fn()
>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti
15
>>> import dis
>>> dis.dis(caller)
  2           0 LOAD_GLOBAL              0 (gen_fn)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 STORE_FAST               0 (gen)

  3           9 LOAD_FAST                0 (gen)
             12 GET_YIELD_FROM_ITER
             13 LOAD_CONST               0 (None)
             16 YIELD_FROM
             17 STORE_FAST               1 (rv)

  4          20 LOAD_GLOBAL              1 (print)
             23 LOAD_CONST               1 ('return values of yield-from: {}')
             26 LOAD_ATTR                2 (format)
             29 LOAD_FAST                1 (rv)
             32 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             35 CALL_FUNCTION            1 (1 positional, 0 keyword pair)
             38 POP_TOP
             39 LOAD_CONST               0 (None)
             42 RETURN_VALUE

函數(shù)在執(zhí)行每個(gè)語句之前遞增其指令指針(PC)罢防,但是在外部生成器執(zhí)行yield from之后艘虎,它從其指令指針中減去1以保持自己固定在yield from語句。所以caller.gi_frame.f_lasti停在了15行

它的效果是咒吐,使用yield from的調(diào)用者和內(nèi)部的生成器之間建立了一個(gè)數(shù)據(jù)交流的管道野建。

比如caller再通過send(xxx)發(fā)送一個(gè)值,會(huì)直接發(fā)送到gen這個(gè)生成器中恬叹。生成器接著執(zhí)行任務(wù)候生,再執(zhí)行到yield語句再返回,這個(gè)數(shù)據(jù)會(huì)通過yield from傳遞到最外面的caller绽昼,線程切換到最外層邏輯執(zhí)行唯鸭,直到caller再次發(fā)送信息,以此循環(huán)硅确。直到內(nèi)部發(fā)生器拋出StopIteration目溉。期間唠粥,caller.gi_frame.f_lasti一直停在了15行

需要注意的是這個(gè)StopIteration可以在caller的邏輯中catch,也可以在內(nèi)部yield from處進(jìn)行捕獲停做。如果你想讓yield from下的程序繼續(xù)執(zhí)行的話就需要在yield from捕獲

異步io和事件循環(huán)

在看異步io之前先看段同步的代碼:

#!/usr/bin/python
# coding: utf-8
import socket

def fetch(link):
    sock = socket.socket()
    sock.connect(('www.zhxfei.com',80))

    request = 'GET {} HTTP/1.0\r\nHOST: zhxfei.com\r\n\r\n'.format(link)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    print(response)

fetch('/')

這是一個(gè)簡(jiǎn)單的同步阻塞的套接字客戶端程序,調(diào)用sock.connect(())是阻塞的大莫,其在client端發(fā)送第三次握手報(bào)文之后返回蛉腌,即在TCP前兩次握手,此程序不能繼續(xù)往下執(zhí)行只厘,等待connect返回烙丛。

而一個(gè)非阻塞的程序是這樣的:

import socket

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('zhxfei.com', 80))
except BlockingIOError:
    pass

request = 'GET /353/ HTTP/1.0\r\nHost: zhxfei.com\r\n\r\n'
encoded = request.encode('ascii')

... so on

當(dāng)sock.connect發(fā)起調(diào)用便直接返回了,去準(zhǔn)備待會(huì)要發(fā)送的request,非阻塞經(jīng)常和事件驅(qū)動(dòng)連用羔味,在linux上就是epoll了河咽。

epoll 使用一個(gè)文件描述符管理多個(gè)描述符,將用戶關(guān)系的文件描述符的事件存放到內(nèi)核的一個(gè)事件表中赋元,這樣在用戶空間和內(nèi)核空間的copy只需一次忘蟹。epoll支持水平觸發(fā)和邊緣觸發(fā),最大的特點(diǎn)在于「邊緣觸發(fā)」搁凸,它只告訴進(jìn)程哪些剛剛變?yōu)榫途w態(tài)媚值,并且只會(huì)通知一次。
epoll技術(shù)的編程模型就是異步非阻塞回調(diào)护糖,也可以叫做Reactor褥芒、事件驅(qū)動(dòng)、事件輪循(EventLoop)嫡良、libevent锰扶、Tornado、Node.js這些就是epoll時(shí)代的產(chǎn)物寝受。

這里提到了事件循環(huán):事件循環(huán)就“是一種等待程序分配事件或消息的編程架構(gòu)”坷牛。基本上來說事件循環(huán)就是很澄,“A發(fā)起調(diào)用前漓帅,注冊(cè)時(shí)間循環(huán),當(dāng)A發(fā)起調(diào)用痴怨,直接返回到事件循環(huán)忙干,當(dāng)循環(huán)監(jiān)聽到A注冊(cè)的具體事件,執(zhí)行B”浪藻。引用網(wǎng)上的一段翻譯文檔:

最簡(jiǎn)單的例子來解釋這一概念就是用每個(gè)瀏覽器中都存在的JavaScript事件循環(huán)捐迫。當(dāng)你點(diǎn)擊了某個(gè)東西(“當(dāng)A發(fā)生時(shí)”),這一點(diǎn)擊動(dòng)作會(huì)發(fā)送給JavaScript的事件循環(huán)爱葵,并檢查是否存在注冊(cè)過的 onclick 回調(diào)來處理這一點(diǎn)擊(“執(zhí)行B”)施戴。只要有注冊(cè)過的回調(diào)函數(shù)就會(huì)伴隨點(diǎn)擊動(dòng)作的細(xì)節(jié)信息被執(zhí)行反浓。事件循環(huán)被認(rèn)為是一種循環(huán)是因?yàn)樗煌5厥占录⑼ㄟ^循環(huán)來發(fā)如何應(yīng)對(duì)這些事件。事件循環(huán)也經(jīng)常用于在別的線程或子進(jìn)程中執(zhí)行代碼赞哗,并將事件循環(huán)作為調(diào)節(jié)機(jī)制(例如雷则,合作式多任務(wù))。如果你恰好理解 Python 的 GIL肪笋,事件循環(huán)對(duì)于需要釋放 GIL 的地方很有用

舉一個(gè)例子:

#!/usr/bin/env python3
# coding:utf-8
import threading
import socket
import time
from selectors import DefaultSelector,\
                        EVENT_WRITE,\
                            EVENT_READ


host = 'baidu.com'
selector = DefaultSelector()

class Fetcher:
    def __init__(self,url):
        self.url = url
        self.res = b''
        self.sock = None

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        print('Thread {} : socket init'.format(threading.currentThread()))
        try:
            self.sock.connect((host,80))
        except BlockingIOError:
            pass

        selector.register(self.sock.fileno(),
                            EVENT_WRITE,
                            self.connneted)

    def connneted(self,key,mask):
        selector.unregister(key.fd)
        print('connected....')
        print('Thread {} : socket connected'.format(
                        threading.currentThread()))

        request = 'GET {} HTTP/1.1\r\nHOST: {}\r\n\r\n'.format(
                        self.url,host).encode('utf-8')

        self.sock.send(request)
        selector.register(key.fd,
                            EVENT_READ,
                            self.read_res)

    def read_res(self,key,mask):
        print('Thread {} : socket readable'.format(
                        threading.currentThread()))

        chunk = self.sock.recv(4096)
        if chunk:
            self.res += chunk
        else:
            selector.unregister(key.fd)
            '''
            a crawler may have some code just like:

                global stopped #全局停止標(biāo)志位
                links = self.parse_link(self.response)  #解析出link
                to_do_link.add(links)   #將link添加到to do list
                seen_link.add(self.url) #將self.url添加到已經(jīng)download過的集合中
                to_do_link.remove(links)    #將self.url在to do list集合中刪除
                if not to_do_link:
                    stopped == True
            '''

def loop():
    fetcher = Fetcher('/')
    fetcher.fetch()
    while True:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback(event_key,event_mask)
            print('callback time:{}'.format(time.time()))
        print(fetcher.res)
loop()
zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ sudo python3 non_blocking_test.py
Thread <_MainThread(MainThread, started 139838592571136)> : socket init
connected....
Thread <_MainThread(MainThread, started 139838592571136)> : socket connected
callback time:1489293273.1518176
b''
Thread <_MainThread(MainThread, started 139838592571136)> : socket readable
callback time:1489293273.182348
b'HTTP/1.1 200 OK\r\nDate: Sun, 12 Mar 2017 04:34:33 GMT\r\nServer: Apache\r\nLast-Modified: Tue, 12 Jan 2010 13:48:00 GMT\r\nETag: "51-47cf7e6ee8400"\r\nAccept-Ranges: bytes\r\nContent-Length: 81\r\nCache-Control: max-age=86400\r\nExpires: Mon, 13 Mar 2017 04:34:33 GMT\r\nConnection: Keep-Alive\r\nContent-Type: text/html\r\n\r\n<html>\n<meta http-equiv="refresh" content="0;url=http://www.baidu.com/">\n</html>\n'

這個(gè)demo執(zhí)行邏輯就是:發(fā)起調(diào)用前在事件循環(huán)上注冊(cè)信息和回調(diào)函數(shù)月劈,當(dāng)非阻塞的調(diào)用返回之后,回到事件循環(huán)藤乙,由事件循環(huán)監(jiān)聽對(duì)應(yīng)的注冊(cè)的信息猜揪,如套接字可讀,可寫等事件發(fā)生坛梁,發(fā)起調(diào)用而姐。

在這里connect注冊(cè)了可寫,發(fā)起connect非阻塞調(diào)用划咐,之后回到事件循環(huán)拴念,監(jiān)聽事件發(fā)生,socket可寫之后褐缠,調(diào)用callback函數(shù)connected丈莺,在回調(diào)函數(shù)中,發(fā)送http請(qǐng)求送丰,并在此注冊(cè)可讀事件缔俄,以及讀事件發(fā)生的回調(diào)函數(shù),之后再次回到事件循環(huán)器躏,事件循環(huán)監(jiān)聽事件讀發(fā)生俐载,再次調(diào)用read_res,就這樣登失,我們的邏輯將靠著注冊(cè)->event loop->callback回調(diào)運(yùn)行下去遏佣。需要注意的是,這里進(jìn)行操作的是一個(gè)線程揽浙。

這個(gè)程序使用了一個(gè)selector的模塊状婶,這個(gè)模塊將epollkqueue等等系統(tǒng)級(jí)異步IO接口抽象成Selector類型馅巷,規(guī)定了統(tǒng)一的對(duì)外接口膛虫,于是程序只管使用selector的接口就行了。一般使用selectors.DefaultSelector就好了钓猬,它是這個(gè)模塊根據(jù)你的系統(tǒng)自動(dòng)幫你選擇的最合適的Selector稍刀。

我們可以知道當(dāng)?shù)却齀/O操作時(shí),一個(gè)函數(shù)必須明確的保存它的狀態(tài),因?yàn)樗鼤?huì)在I/O操作完成之前返回并清除棧幀账月。一個(gè)基于回調(diào)的異步框架综膀,語言特性不能幫助我們保存程序的狀態(tài)。

為了在我們基于回調(diào)的例子中代替局部變量局齿,我們把sockresponse作為Fetcher實(shí)例self屬性剧劝。為了代替指令指針,通過注冊(cè)connnectedread_response回調(diào)來保存continuation抓歼。

這種完全依賴于callback的程序有很多問題讥此,如

  • 意大利面條式的程序
  • 因?yàn)椤眘tack ripping”問題而非常難于調(diào)試

所以大神們想出了使用yield生成器進(jìn)行回調(diào),上面提到過锭部,生成器可以保存相關(guān)的信息,在適當(dāng)?shù)臅r(shí)候再切換回生成器繼續(xù)執(zhí)行面褐,這樣看起來拌禾,這天生就是一個(gè)callback

但是需要注意的是展哭,既然我們用生成器湃窍,就要考慮幾個(gè)問題。

  • 如何優(yōu)雅的初始化和退出生成器
  • 如何控制生成器讓其進(jìn)行回調(diào)
  • 每次切換回生成器如何恢復(fù)其狀態(tài)

如下面這段代碼:

#!/usr/bin/env python
# coding:utf-8
import socket
from selectors import DefaultSelector,\
                        EVENT_READ,\
                        EVENT_WRITE

selector = DefaultSelector()
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)



def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())


def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))  # Read 4k at a time.

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield from f
    selector.unregister(sock.fileno())
    return chunk


def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)


class Fetcher:
    def __init__(self, url):
        self.response = b''
        self.url = url

    def fetch(self):

        sock = socket.socket()
        yield from connect(sock, ('zhxfei.com', 80))
        get = 'GET {} HTTP/1.0\r\nHost: zhxfei.com\r\n\r\n'.format(self.url)
        sock.send(get.encode('ascii'))
        self.response = yield from read_all(sock)
        '''
        the program is not end
        
        '''

def loop():
    fetcher = Fetcher('/')
    Task(fetcher.fetch())
    while True:
        events = selector.select()
        for event_key,event_mask in events:
            callback = event_key.data
            callback(event_key,event_mask)

loop()

終于到這段程序了匪傍,這段程序困擾了我一個(gè)晚上(僅供分析)您市,上面的程序是簡(jiǎn)化版本。放兩張圖感受下:


OK役衡,上面我們可以看見:

它用了一個(gè)Task來控制生成器的初始化和結(jié)束茵休,并且控制生成器的回調(diào),它是一個(gè)生成器最頂層的caller手蝎。在它控制的子程序中榕莺,還嵌套yield from,在子子程序中,也許還嵌套著yield from棵介,到處使用這種yield from的好處是可以解耦這個(gè)協(xié)同工作的程序的實(shí)現(xiàn)和調(diào)用钉鸯,符合了開閉原則。


它用一個(gè)future表示了生成器在切換的時(shí)候希望保存的一些信息如results以及什么時(shí)候進(jìn)行回調(diào)邮辽。我們?cè)谑录h(huán)上進(jìn)行了回調(diào)函數(shù)的操作唠雕,這個(gè)回調(diào)函數(shù)是希望生成器繼續(xù)運(yùn)行的保障,它實(shí)際上注冊(cè)的是一個(gè)這個(gè)生成器調(diào)度的一段主程序吨述。

這是一段設(shè)計(jì)十分精巧的邏輯岩睁,也許是我太too young too native,不得不說,我嘆服你的技巧

基于yield的coroutine

這種基于生成器協(xié)調(diào)工作的程序揣云,就是協(xié)程(coroutine)笙僚。源碼關(guān)于這個(gè)邏輯大概300行,我將其簡(jiǎn)化下灵再,去掉了一些日志肋层、命令行解析亿笤、爬蟲等等具體實(shí)現(xiàn)。

其實(shí)我就是想看看這個(gè)py3.4版本的coroutine是如何工作及應(yīng)用的栋猖,我保留了生產(chǎn)者和消費(fèi)者的模式净薛,線程在對(duì)象初始化的時(shí)候向Queue寫入了5個(gè)task,之后這個(gè)協(xié)程在self.queue.join被暫停蒲拉,等待queue中的任務(wù)被消費(fèi)

from asyncio import Queue
import aiohttp
import asyncio
import pdb
import threading
import time

roots = [
    'www.zhxfei.com',
    'www.baidu.com',
    'www.google.com',
    'www.tencent.com',
    'www.qq.com'
    ]


class Crawler:
    def __init__(self,roots,max_task=5):
        self.roots = roots
        self.queue = Queue()
        self.max_task = max_task
        #self.session = aiohttp.ClientSession()
        self.seen_urls = set()
        for root in roots:
            self.add_url(root)

    def add_url(self,root):
        if root in self.seen_urls:
            return
        self.queue.put_nowait(root)

    @asyncio.coroutine
    def crawler(self):
        tasks = [asyncio.Task(self.work())
                        for _ in range(self.max_task)]
        #pdb.set_trace()
        yield from self.queue.join()
        for w in tasks:
            w.cancel()  # w is the object of Task

    @asyncio.coroutine
    def work(self):
        '''consume coroutine'''
        try:
            while True:
            # when the queue has no links , the queue.get blocked
                url = yield from self.queue.get()
                print('{}'.format(threading.currentThread()))
                yield from self.fetch(url)
                self.queue.task_done()
        except asyncio.CancelledError:
            pass

    @asyncio.coroutine
    def fetch(self,url):
        yield from asyncio.sleep(1)

def main():
    t1 = time.time()
    loop = asyncio.get_event_loop()
    crawler = Crawler(roots)
    #pdb.set_trace()
    loop.run_until_complete(crawler.crawler())
    loop.close()
    print('COST:{}'.format(time.time() - t1))

if __name__ == '__main__':
    main()

使用pdb運(yùn)行調(diào)試肃拜,你會(huì)發(fā)現(xiàn)程序

zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ python3 asyncio_crawler.py 
> /home/zhxfei/code/python/500lines/asyncio_crawler.py(59)main()
-> loop.run_until_complete(crawler.crawler())
(Pdb) p crawler.queue._unfinished_tasks
5
(Pdb) c
> /home/zhxfei/code/python/500lines/asyncio_crawler.py(35)crawler()
-> yield from self.queue.join()
(Pdb) c

之后1s就退出了程序

zhxfei@zhxfei-HP-ENVY-15-Notebook-PC:~/code/python/500lines$ python3 asyncio_crawler.py 
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
<_MainThread(MainThread, started 140192578610944)>
COST:1.0039052963256836

可以看到,每個(gè)consume程序都自己睡眠雌团,而一個(gè)線程就可以在多個(gè)代碼段跳轉(zhuǎn)運(yùn)行燃领,完全省去了多進(jìn)程/線程的創(chuàng)建、維護(hù)锦援、調(diào)度猛蔽、上下文切換等等開銷。最重要的是:當(dāng)多個(gè)線程去操作臨界量的時(shí)候灵寺,為了保證線程安全曼库,往往需要帶鎖操作,比如會(huì)出現(xiàn)臨界區(qū)變量不一致的情況發(fā)生略板,甚至對(duì)于面對(duì)超高并發(fā)的情況下毁枯,甚至需要做出對(duì)accept加鎖這樣及其消耗性能的事情,而在coroutine看來一切都很簡(jiǎn)單叮称。

讓我們?cè)倩氐缴厦娴某绦蛑致辏紤]這么個(gè)問題程序是如何運(yùn)行的呢?

首先程序什么時(shí)候開始瓤檐?更或者說蒂誉,上帝如何推動(dòng)這個(gè)小球呢?

loop = asyncio.get_event_loop()
crawler = Crawler(roots)
loop.run_until_complete(crawler.crawler())

首先距帅,我們創(chuàng)建了一個(gè)類似事件循環(huán)這樣的東西右锨,然后創(chuàng)建了crawler這個(gè)實(shí)例并且將這個(gè)實(shí)例的主邏輯程序crawler放到了run_until_complete的這個(gè)方法,我們需要搞清楚這個(gè)event_loop如何工作的

class EventLoop:
    def run_until_complete(self, coro):
        """Run until the coroutine is done."""
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

class StopError(BaseException):
    """Raised to stop the event loop."""

def stop_callback(future):
    raise StopError

可以看到我們的主線程被Task包裝了以下碌秸,并生成了一個(gè)task,在初始化的時(shí)候绍移,同時(shí)創(chuàng)建了5個(gè)經(jīng)過Task包裝的消費(fèi)者進(jìn)程,即work讥电,其也是一個(gè)協(xié)同程序,并且也用Task包裝了一下蹂窖。這個(gè)task其實(shí)是future的子類。
我們仔細(xì)看看這個(gè)Task是如何初始化協(xié)程的:

class Task(Future):

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration as exc:

            # Task resolves itself with coro's return
            # value.
            self.set_result(exc.value)
            return

        next_future.add_done_callback(self.step)

可以看到恩敌,在step方法中瞬测,我們向傳入的或者說包裝的coroutine發(fā)送了一個(gè)值,讓其可以初始化。它和上面的future還不一樣月趟,它是一個(gè)需要自我驅(qū)動(dòng)且自我記錄狀態(tài)的future灯蝴。

直到這個(gè)corotinecrawler()運(yùn)行到對(duì)應(yīng)的協(xié)調(diào)狀態(tài)保存對(duì)應(yīng)的狀態(tài),也就是self.queue.join(),其還是一個(gè)協(xié)程孝宗,并內(nèi)部創(chuàng)建了一個(gè)Future()的實(shí)例穷躁,并將其返回。

class Queue:
    def __init__(self):
        self._join_future = Future()
        self._unfinished_tasks = 0
        # ... other initialization ...

    def put_nowait(self, item):
        self._unfinished_tasks += 1
        # ... store the item ...

    def task_done(self):
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._join_future.set_result(None)

    @asyncio.coroutine
    def join(self):
        if self._unfinished_tasks > 0:
            yield from self._join_future

且在這個(gè)future實(shí)例也就是task記錄的狀態(tài)中增加stop_callback因妇,并等待self.queue.join()運(yùn)行完成

之后循環(huán)會(huì)調(diào)度出其他的協(xié)調(diào)程序问潭,即work,work就一直運(yùn)行到self.queue.task_done(),直到所有的協(xié)程都運(yùn)行完成并停在self.queue.get()婚被,即隊(duì)列中self._unfinished_tasks清零狡忙。

crawler又被喚醒,之后依次調(diào)用task對(duì)象的cancel址芯,向每個(gè)work協(xié)程中拋進(jìn)去一個(gè)異常asyncio.CancelledError灾茁,work將其捕獲之后退出,之后crawler退出是复,拋出StopIterationTask删顶,Task捕獲到并用set_results消費(fèi)之前注冊(cè)的回調(diào)stop_callback竖螃,拋出StopError給事件循環(huán)淑廊,Eventloop捕獲StopError之后close。這個(gè)程序就結(jié)束了

邏輯可能略顯混亂和復(fù)雜...

最后

其實(shí)我想說特咆,這是python3.5之前版本實(shí)現(xiàn)的基于生成器的協(xié)程

在python3.5 季惩,只有async def關(guān)鍵字定義的函數(shù)才能被叫做協(xié)程,所以腻格。画拾。。菜职。

我花了幾天青抛,學(xué)到了個(gè)假協(xié)程

>>> async def fn():
...     await asyncio.sleep(1)
...     print('hello world')
... 
>>> f = fn()
>>> f
<coroutine object fn at 0x7f2af1d6dc50>
>>> inspect.iscoroutine(f)
True
>>> @asyncio.coroutine
... def fm():
...     yield from asyncio.sleep(1)
...     print('this is a coroutine')
... 
>>> m = fm()
>>> m
<generator object fm at 0x7f2aeefa53b8>
>>> inspect.iscoroutine(m)
False

還是 too young too native
學(xué)無止境,慢慢來唄

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末酬核,一起剝皮案震驚了整個(gè)濱河市蜜另,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嫡意,老刑警劉巖举瑰,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異蔬螟,居然都是意外死亡此迅,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來耸序,“玉大人忍些,你說我怎么就攤上這事∮恿撸” “怎么了坐昙?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)芋忿。 經(jīng)常有香客問我炸客,道長(zhǎng),這世上最難降的妖魔是什么戈钢? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任痹仙,我火速辦了婚禮,結(jié)果婚禮上殉了,老公的妹妹穿的比我還像新娘开仰。我一直安慰自己,他們只是感情好薪铜,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布众弓。 她就那樣靜靜地躺著,像睡著了一般隔箍。 火紅的嫁衣襯著肌膚如雪谓娃。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天蜒滩,我揣著相機(jī)與錄音滨达,去河邊找鬼。 笑死俯艰,一個(gè)胖子當(dāng)著我的面吹牛捡遍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播竹握,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼画株,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了啦辐?” 一聲冷哼從身側(cè)響起谓传,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎昧甘,沒想到半個(gè)月后良拼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡充边,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年庸推,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了常侦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贬媒,死狀恐怖聋亡,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情际乘,我是刑警寧澤坡倔,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站脖含,受9級(jí)特大地震影響罪塔,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜养葵,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一征堪、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧关拒,春花似錦佃蚜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至归露,卻和暖如春洲脂,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背靶擦。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工腮考, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓答倡,卻偏偏與公主長(zhǎng)得像集惋,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子户敬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容