Twisted源碼分析系列01-reactor

簡介

Twisted是用Python實現(xiàn)的事件驅(qū)動的網(wǎng)絡(luò)框架江场。

如果想看教程的話,我覺得寫得最好的就是Twisted Introduction了艾扮,這是翻譯掉盅。

下面就直接進入主題了。

我們通過一個示例開始分析源碼赃阀,那么先看下面這個示例霎肯。

#!/usr/bin/env python
# coding=utf8

from twisted.internet.protocol import Protocol, ServerFactory


HOST = '127.0.0.1'
PORT = 8080


class EchoProtocol(Protocol):

    def dataReceived(self, data):
        self.transport.write(data)


if __name__ == '__main__':
    factory = ServerFactory()
    factory.protocol = EchoProtocol

    from twisted.internet import reactor
    reactor.listenTCP(PORT, factory, interface=HOST)
    reactor.run()

這是一個非常簡單的Echo server擎颖,每當有數(shù)據(jù)發(fā)來,都會將數(shù)據(jù)往回發(fā)观游。

reactor

reactor是事件管理器搂捧,用于注冊、注銷事件懂缕,運行事件循環(huán)允跑,當事件發(fā)生時調(diào)用回調(diào)函數(shù)處理。關(guān)于reactor有下面幾個結(jié)論:

  1. Twisted的reactor只有通過調(diào)用reactor.run()來啟動搪柑。
  2. reactor循環(huán)是在其開始的進程中運行聋丝,也就是運行在主進程中。
  3. 一旦啟動工碾,就會一直運行下去弱睦。reactor就會在程序的控制下(或者具體在一個啟動它的線程的控制下)。
  4. reactor循環(huán)并不會消耗任何CPU的資源渊额。
  5. 并不需要顯式的創(chuàng)建reactor况木,只需要引入就OK了。

最后一條需要解釋清楚旬迹。在Twisted中火惊,reactor是Singleton(也就是單例模式),即在一個程序中只能有一個reactor舱权,并且只要你引入它就相應(yīng)地創(chuàng)建一個矗晃。上面引入的方式這是twisted默認使用的方法仑嗅,當然了宴倍,twisted還有其它可以引入reactor的方法。例如仓技,可以使用twisted.internet.pollreactor中的系統(tǒng)調(diào)用來poll來代替select方法鸵贬。
若使用其它的reactor,需要在引入twisted.internet.reactor前安裝它脖捻。下面是安裝pollreactor的方法:

from twisted.internet import pollreactor
pollreactor.install()

如果你沒有安裝其它特殊的reactor而引入了twisted.internet.reactor阔逼,那么Twisted會根據(jù)操作系統(tǒng)安裝默認的reactor。正因為如此地沮,習(xí)慣性做法不要在最頂層的模塊內(nèi)引入reactor以避免安裝默認reactor嗜浮,而是在你要使用reactor的區(qū)域內(nèi)安裝。
下面是使用 pollreactor重寫上上面的程序:

from twited.internet import pollreactor
pollreactor.install()
from twisted.internet import reactor
reactor.run()

那么reactor是如何實現(xiàn)單例的摩疑?來看一下from twisted.internet import reactor做了哪些事情就并明白了危融。

下面是twisted/internet/reactor.py的部分代碼:

# twisted/internet/reactor.py
import sys
del sys.modules['twisted.internet.reactor']
from twisted.internet import default
default.install()

注:Python中所有加載到內(nèi)存的模塊都放在sys.modules,它是一個全局字典雷袋。當import一個模塊時首先會在這個列表中查找是否已經(jīng)加載了此模塊吉殃,如果加載了則只是將模塊的名字加入到正在調(diào)用import的模塊的命名空間中。如果沒有加載則從sys.path目錄中按照模塊名稱查找模塊文件,找到后將模塊載入內(nèi)存蛋勺,并加入到sys.modules中瓦灶,并將名稱導(dǎo)入到當前的命名空間中。

假如我們是第一次運行from twisted.internet import reactor抱完,因為sys.modules中還沒有twisted.internet.reactor贼陶,所以會運行reactory.py中的代碼,安裝默認的reactor巧娱。之后每界,如果導(dǎo)入的話,因為sys.modules中已存在該模塊家卖,所以會直接將sys.modules中的twisted.internet.reactor導(dǎo)入到當前命名空間眨层。

default中的install:

# twisted/internet/default.py
def _getInstallFunction(platform):
    """
    Return a function to install the reactor most suited for the given platform.

    @param platform: The platform for which to select a reactor.
    @type platform: L{twisted.python.runtime.Platform}

    @return: A zero-argument callable which will install the selected
        reactor.
    """
    try:
        if platform.isLinux():
            try:
                from twisted.internet.epollreactor import install
            except ImportError:
                from twisted.internet.pollreactor import install
        elif platform.getType() == 'posix' and not platform.isMacOSX():
            from twisted.internet.pollreactor import install
        else:
            from twisted.internet.selectreactor import install
    except ImportError:
        from twisted.internet.selectreactor import install
    return install


install = _getInstallFunction(platform)

很明顯,default中會根據(jù)平臺獲取相應(yīng)的install上荡。Linux下會首先使用epollreactor趴樱,如果內(nèi)核還不支持,就只能使用pollreactor酪捡。Mac平臺使用pollreactor叁征,windows使用selectreactor。每種install的實現(xiàn)差不多逛薇,這里我們抽取selectreactor中的install來看看捺疼。

# twisted/internet/selectreactor.py:
def install():
    """Configure the twisted mainloop to be run using the select() reactor.
    """
    # 單例
    reactor = SelectReactor()
    from twisted.internet.main import installReactor
    installReactor(reactor)

# twisted/internet/main.py:
def installReactor(reactor):
    """
    Install reactor C{reactor}.

    @param reactor: An object that provides one or more IReactor* interfaces.
    """
    # this stuff should be common to all reactors.
    import twisted.internet
    import sys
    if 'twisted.internet.reactor' in sys.modules:
        raise error.ReactorAlreadyInstalledError("reactor already installed")
    twisted.internet.reactor = reactor
    sys.modules['twisted.internet.reactor'] = reactor

在installReactor中,向sys.modules添加twisted.internet.reactor鍵永罚,值就是再install中創(chuàng)建的單例reactor啤呼。以后要使用reactor,就會導(dǎo)入這個單例了呢袱。

SelectReactor

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase)

implementer表示SelectReactor實現(xiàn)了IReactorFDSet接口的方法官扣,這里用到了zope.interface,它是python中的接口實現(xiàn)羞福,有興趣的同學(xué)可以去看下惕蹄。

IReactorFDSet接口主要對描述符的獲取、添加治专、刪除等操作的方法卖陵。這些方法看名字就能知道意思,所以我就沒有加注釋张峰。

# twisted/internet/interfaces.py
class IReactorFDSet(Interface):

    def addReader(reader):

    def addWriter(writer):

    def removeReader(reader):

    def removeWriter(writer):

    def removeAll():

    def getReaders():

    def getWriters():

reactor.listenTCP()

示例中的reactor.listenTCP()注冊了一個監(jiān)聽事件泪蔫,它是父類PosixReactorBase中方法。

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                       ReactorBase):

    def listenTCP(self, port, factory, backlog=50, interface=''):
        p = tcp.Port(port, factory, backlog, interface, self)
        p.startListening()
        return p

# twisted/internet/tcp.py
@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):
    def __init__(self, port, factory, backlog=50, interface='', reactor=None):
       """Initialize with a numeric port to listen on.
       """
       base.BasePort.__init__(self, reactor=reactor)
       self.port = port
       self.factory = factory
       self.backlog = backlog
       if abstract.isIPv6Address(interface):
           self.addressFamily = socket.AF_INET6
           self._addressType = address.IPv6Address
       self.interface = interface
    ...

    def startListening(self):
       """Create and bind my socket, and begin listening on it.
          創(chuàng)建并綁定套接字挟炬,開始監(jiān)聽鸥滨。

       This is called on unserialization, and must be called after creating a
       server to begin listening on the specified port.
       """
       if self._preexistingSocket is None:
           # Create a new socket and make it listen
           try:
               # 創(chuàng)建套接字
               skt = self.createInternetSocket()
               if self.addressFamily == socket.AF_INET6:
                   addr = _resolveIPv6(self.interface, self.port)
               else:
                   addr = (self.interface, self.port)
               # 綁定
               skt.bind(addr)
           except socket.error as le:
               raise CannotListenError(self.interface, self.port, le)
           # 監(jiān)聽
           skt.listen(self.backlog)
       else:
           # Re-use the externally specified socket
           skt = self._preexistingSocket
           self._preexistingSocket = None
           # Avoid shutting it down at the end.
           self._shouldShutdown = False

       # Make sure that if we listened on port 0, we update that to
       # reflect what the OS actually assigned us.
       self._realPortNumber = skt.getsockname()[1]

       log.msg("%s starting on %s" % (
               self._getLogPrefix(self.factory), self._realPortNumber))

       # The order of the next 5 lines is kind of bizarre.  If no one
       # can explain it, perhaps we should re-arrange them.
       self.factory.doStart()
       self.connected = True
       self.socket = skt
       self.fileno = self.socket.fileno
       self.numberAccepts = 100

       # startReading調(diào)用reactor的addReader方法將Port加入讀集合
       self.startReading()

整個邏輯很簡單嗦哆,和正常的server端一樣,創(chuàng)建套接字婿滓、綁定老速、監(jiān)聽。不同的是將套接字的描述符添加到了reactor的讀集合凸主。那么假如有了client連接過來的話橘券,reactor會監(jiān)控到,然后觸發(fā)事件處理程序卿吐。

reacotr.run()事件主循環(huán)

# twisted/internet/posixbase.py
@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                       ReactorBase)

# twisted/internet/base.py
class _SignalReactorMixin(object):

    def startRunning(self, installSignalHandlers=True):
        """
        PosixReactorBase的父類_SignalReactorMixin和ReactorBase都有該函數(shù)旁舰,但是
        _SignalReactorMixin在前,安裝mro順序的話嗡官,會先調(diào)用_SignalReactorMixin中的箭窜。
        """
        self._installSignalHandlers = installSignalHandlers
        ReactorBase.startRunning(self)

    def run(self, installSignalHandlers=True):
        self.startRunning(installSignalHandlers=installSignalHandlers)
        self.mainLoop()

    def mainLoop(self):
        while self._started:
            try:
                while self._started:
                    # Advance simulation time in delayed event
                    # processors.
                    self.runUntilCurrent()
                    t2 = self.timeout()
                    t = self.running and t2
                    # doIteration是關(guān)鍵,select,poll,epool實現(xiàn)各有不同
                    self.doIteration(t)
            except:
                log.msg("Unexpected error in main loop.")
                log.err()
            else:
                log.msg('Main loop terminated.')

mianLoop就是最終的主循環(huán)了衍腥,在循環(huán)中磺樱,調(diào)用doIteration方法監(jiān)控讀寫描述符的集合,一旦發(fā)現(xiàn)有描述符準備好讀寫婆咸,就會調(diào)用相應(yīng)的事件處理程序竹捉。

# twisted/internet/selectreactor.py
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):

    def __init__(self):
        """
        Initialize file descriptor tracking dictionaries and the base class.
        """
        self._reads = set()
        self._writes = set()
        posixbase.PosixReactorBase.__init__(self)

    def doSelect(self, timeout):
        """
        Run one iteration of the I/O monitor loop.

        This will run all selectables who had input or output readiness
        waiting for them.
        """
        try:
            # 調(diào)用select方法監(jiān)控讀寫集合,返回準備好讀寫的描述符
            r, w, ignored = _select(self._reads,
                                    self._writes,
                                    [], timeout)
        except ValueError:
            # Possibly a file descriptor has gone negative?
            self._preenDescriptors()
            return
        except TypeError:
            # Something *totally* invalid (object w/o fileno, non-integral
            # result) was passed
            log.err()
            self._preenDescriptors()
            return
        except (select.error, socket.error, IOError) as se:
            # select(2) encountered an error, perhaps while calling the fileno()
            # method of a socket.  (Python 2.6 socket.error is an IOError
            # subclass, but on Python 2.5 and earlier it is not.)
            if se.args[0] in (0, 2):
                # windows does this if it got an empty list
                if (not self._reads) and (not self._writes):
                    return
                else:
                    raise
            elif se.args[0] == EINTR:
                return
            elif se.args[0] == EBADF:
                self._preenDescriptors()
                return
            else:
                # OK, I really don't know what's going on.  Blow up.
                raise

        _drdw = self._doReadOrWrite
        _logrun = log.callWithLogger
        for selectables, method, fdset in ((r, "doRead", self._reads),
                                           (w,"doWrite", self._writes)):
            for selectable in selectables:
                # if this was disconnected in another thread, kill it.
                # ^^^^ --- what the !@#*?  serious!  -exarkun
                if selectable not in fdset:
                    continue
                # This for pausing input when we're not ready for more.

                # 調(diào)用_doReadOrWrite方法
                _logrun(selectable, _drdw, selectable, method)

    doIteration = doSelect

    def _doReadOrWrite(self, selectable, method):
        try:
            # 調(diào)用method尚骄,doRead或者是doWrite块差,
            # 這里的selectable可能是我們監(jiān)聽的tcp.Port
            why = getattr(selectable, method)()
        except:
            why = sys.exc_info()[1]
            log.err()
        if why:
            self._disconnectSelectable(selectable, why, method=="doRead")

那么假如客戶端有連接請求了,就會調(diào)用讀集合中tcp.Port的doRead方法倔丈。

# twisted/internet/tcp.py

@implementer(interfaces.IListeningPort)
class Port(base.BasePort, _SocketCloser):

    def doRead(self):
        """Called when my socket is ready for reading.
        當套接字準備好讀的時候調(diào)用

        This accepts a connection and calls self.protocol() to handle the
        wire-level protocol.
        """
        try:
            if platformType == "posix":
                numAccepts = self.numberAccepts
            else:
                numAccepts = 1
            for i in range(numAccepts):
                if self.disconnecting:
                    return
                try:
                    # 調(diào)用accept
                    skt, addr = self.socket.accept()
                except socket.error as e:
                    if e.args[0] in (EWOULDBLOCK, EAGAIN):
                        self.numberAccepts = i
                        break
                    elif e.args[0] == EPERM:
                        continue
                    elif e.args[0] in (EMFILE, ENOBUFS, ENFILE, ENOMEM, ECONNABORTED):
                        log.msg("Could not accept new connection (%s)" % (
                            errorcode[e.args[0]],))
                        break
                    raise

                fdesc._setCloseOnExec(skt.fileno())
                protocol = self.factory.buildProtocol(self._buildAddr(addr))
                if protocol is None:
                    skt.close()
                    continue
                s = self.sessionno
                self.sessionno = s+1
                # transport初始化的過程中憨闰,會將自身假如到reactor的讀集合中,那么當它準備
                # 好讀的時候乃沙,就可以調(diào)用它的doRead方法讀取客戶端發(fā)過來的數(shù)據(jù)了
                transport = self.transport(skt, protocol, addr, self, s, self.reactor)
                protocol.makeConnection(transport)
            else:
                self.numberAccepts = self.numberAccepts+20
        except:
            log.deferr()

doRead方法中起趾,調(diào)用accept產(chǎn)生了用于接收客戶端數(shù)據(jù)的套接字,將套接字與transport綁定警儒,然后把transport加入到reactor的讀集合。當客戶端有數(shù)據(jù)到來時眶根,就會調(diào)用transport的doRead方法進行數(shù)據(jù)讀取了蜀铲。

Connection是Server(transport實例的類)的父類,它實現(xiàn)了doRead方法属百。

# twisted/internet/tcp.py
@implementer(interfaces.ITCPTransport, interfaces.ISystemHandle)
class Connection(_TLSConnectionMixin, abstract.FileDescriptor, _SocketCloser,
                 _AbortingMixin):

    def doRead(self):
        try:
            # 接收數(shù)據(jù)
            data = self.socket.recv(self.bufferSize)
        except socket.error as se:
            if se.args[0] == EWOULDBLOCK:
                return
            else:
                return main.CONNECTION_LOST

        return self._dataReceived(data)

    def _dataReceived(self, data):
        if not data:
            return main.CONNECTION_DONE
        # 調(diào)用我們自定義protocol的dataReceived方法處理數(shù)據(jù)
        rval = self.protocol.dataReceived(data)
        if rval is not None:
            offender = self.protocol.dataReceived
            warningFormat = (
                'Returning a value other than None from %(fqpn)s is '
                'deprecated since %(version)s.')
            warningString = deprecate.getDeprecationWarningString(
                offender, versions.Version('Twisted', 11, 0, 0),
                format=warningFormat)
            deprecate.warnAboutFunction(offender, warningString)
        return rval

_dataReceived中調(diào)用了示例中我們自定義的EchoProtocol的dataReceived方法處理數(shù)據(jù)记劝。

至此,一個簡單的流程族扰,從創(chuàng)建監(jiān)聽事件厌丑,到接收客戶端數(shù)據(jù)就此結(jié)束了定欧。

一些細節(jié)的地方我并未說明,這里只是說明一個大概的流程怒竿,想看的細一點的砍鸠,可以直接跟著這個流程去看源碼。

這個系列應(yīng)該會不定時更新耕驰,如果有人感興趣的爷辱,也可以和我直接交流。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末朦肘,一起剝皮案震驚了整個濱河市饭弓,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌媒抠,老刑警劉巖弟断,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異趴生,居然都是意外死亡夫嗓,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門冲秽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舍咖,“玉大人,你說我怎么就攤上這事锉桑∨琶梗” “怎么了?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵民轴,是天一觀的道長攻柠。 經(jīng)常有香客問我,道長后裸,這世上最難降的妖魔是什么瑰钮? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮微驶,結(jié)果婚禮上浪谴,老公的妹妹穿的比我還像新娘。我一直安慰自己因苹,他們只是感情好苟耻,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著扶檐,像睡著了一般凶杖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上款筑,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天智蝠,我揣著相機與錄音腾么,去河邊找鬼。 笑死杈湾,一個胖子當著我的面吹牛解虱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播毛秘,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼饭寺,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了叫挟?” 一聲冷哼從身側(cè)響起艰匙,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎抹恳,沒想到半個月后员凝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡奋献,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年健霹,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓶蚂。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡糖埋,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出窃这,到底是詐尸還是另有隱情瞳别,我是刑警寧澤,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布杭攻,位于F島的核電站祟敛,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏兆解。R本人自食惡果不足惜馆铁,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望锅睛。 院中可真熱鬧埠巨,春花似錦、人聲如沸衣撬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽具练。三九已至,卻和暖如春甜无,著一層夾襖步出監(jiān)牢的瞬間扛点,已是汗流浹背哥遮。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留陵究,地道東北人眠饮。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像铜邮,于是被迫代替她去往敵國和親仪召。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容