第一章我們講了程序的基本啟動流程恶座,這里面涉及到了各種節(jié)點信息搀暑,master,net跨琳,gate自点,game...,當然脉让,無論他叫什么名字桂敛,歸根結底他都是一個節(jié)點。就好比爺爺爸爸兒子溅潜,就像是父父節(jié)點术唬,父節(jié)點,子節(jié)點滚澜,不管在家里地位如何粗仓,他終究是人。那么是如何實現(xiàn)這個分布式節(jié)點的呢设捐,我們這章進入firefly源碼內(nèi)一探究竟
節(jié)點相關的文件都存放在distributed潦牛,也就是分布式模塊當中
-
distributed模塊說明
該模塊主要封裝了關于節(jié)點的所有方法以及類
-
distributed模塊結構解析
PBRoot,root節(jié)點對象(所有擁有child節(jié)點的都是root節(jié)點挡育,root節(jié)點本身可以有root節(jié)點巴碗,好比爺爺是root節(jié)點,爸爸是root節(jié)點即寒,因為他們都有兒子橡淆,但是兒子就只能是child節(jié)點)
ChildsManager召噩,子節(jié)點管理基類
Child 對象對應的是連接到本服務進程的某個服務進程對象。稱為子節(jié)點對象
RemoteObject遠程調(diào)用對象逸爵,所有child對象都是一個遠程調(diào)用對象具滴,遠程調(diào)用對象封裝了一個child對象和child對象的遠程代理通道(用于反向調(diào)用),以及service對象(用于提供遠程調(diào)用的方法师倔,在下面你會看到很多他的身影构韵,你可先不用管他是什么,只需要知道他能提供被調(diào)用的方法就可以了趋艘,因為在下一章疲恢,我們會詳細描述service)
-
分布式模塊的文件結構如下
模塊結構如下:
文件結構:
-distributed
-__init__.py
-child.py
-manager.py
-node.py
-reference.py
-root.py
-
- _ init _.py
#該文件本身并沒有什么意義,存在的作用是將distributed文件變成一個可以導入的包
-
-child.py
# 沒有子節(jié)點的節(jié)點統(tǒng)稱為child節(jié)點瓷胧,這類節(jié)點一般用于提供各類服務
'''
Created on 2013-8-14
@author: lan (www.9miao.com)
'''
class Child(object):
'''子節(jié)點對象'''
def __init__(self,cid,name):
'''初始化子節(jié)點對象
'''
self._id = cid
self._name = name
self._transport = None
def getName(self):
'''獲取子節(jié)點的名稱'''
return self._name
def setTransport(self,transport):
'''設置子節(jié)點的通道'''
self._transport = transport
def callbackChild(self,*args,**kw):
'''回調(diào)子節(jié)點的接口
return a Defered Object (recvdata)
'''
recvdata = self._transport.callRemote('callChild',*args,**kw)
return recvdata
-
manager.py
# manager 顧名思義显拳,是一個管理類
# 在分布式節(jié)點中,既然有子節(jié)點搓萧,那么就會有父節(jié)點杂数。
# 而有了根節(jié)點,就需要賦予父節(jié)點管理子節(jié)點的功能
# manager 的出現(xiàn)就是為了這個功能
'''
Created on 2013-8-14
@author: lan (www.9miao.com)
'''
from twisted.python import log
from zope.interface import Interface
from zope.interface import implements
class _ChildsManager(Interface): #接口類(這里用到接口這種開發(fā)模式瘸洛,有興趣的可以看一下)
'''節(jié)點管理器接口'''
def __init__(self):
'''初始化接口'''
def getChildById(self,childId):
'''根據(jù)節(jié)點id獲取節(jié)點實例'''
def getChildByName(self,childname):
'''根據(jù)節(jié)點的名稱獲取節(jié)點實例'''
def addChild(self,child):
'''添加一個child節(jié)點
@param child: Child object
'''
def dropChild(self,*arg,**kw):
'''刪除一個節(jié)點'''
def callChild(self,*args,**kw):
'''調(diào)用子節(jié)點的接口'''
def callChildByName(self,*args,**kw):
'''調(diào)用子節(jié)點的接口
@param childname: str 子節(jié)點的名稱
'''
def dropChildByID(self,childId):
'''刪除一個child 節(jié)點
@param childId: Child ID
'''
def dropChildSessionId(self, session_id):
"""根據(jù)session_id刪除child節(jié)點
"""
@implementer(_ChildsManager)
class ChildsManager(object):
'''
子節(jié)點管理器
因為裝飾器implementer
用戶必須得完成_ChildsManager的所有方法
'''
def __init__(self):
'''
初始化子節(jié)點管理器
'''
self._childs = {}
#第一步初始化子節(jié)點字典
#所有子節(jié)點對象都將以鍵值對的形式存儲在這里
#key -> child的ID
#value -> child 對象揍移,也就是子節(jié)點
def getChildById(self,childId):
'''
根據(jù)節(jié)點的ID獲取節(jié)點實例
'''
return self._childs.get(childId)
def getChildByName(self,childname):
'''
根據(jù)節(jié)點的名稱獲取節(jié)點實例
遍歷子節(jié)點
從中提取節(jié)點名稱為childname的節(jié)點
'''
for key,child in self._childs.items():
if child.getName() == childname:
return self._childs[key]
return None
def addChild(self,child):
'''添加一個child節(jié)點
@param child: Child object
'''
key = child._id
if self._childs.has_key(key):
raise "child node %s exists"% key
self._childs[key] = child
def dropChild(self,child):
'''刪除一個child 節(jié)點
@param child: Child Object
'''
key = child._id
try:
del self._childs[key]
except Exception,e:
log.msg(str(e))
def dropChildByID(self,childId):
'''通過ID刪除一個child 節(jié)點
@param childId: Child ID
'''
try:
del self._childs[childId]
except Exception,e:
log.msg(str(e))
def callChild(self,childId,*args,**kw):
'''調(diào)用子節(jié)點的接口
@param childId: int 子節(jié)點的id
'''
child = self._childs.get(childId,None)
if not child:
log.err("child %s doesn't exists"%childId)
return
return child.callbackChild(*args,**kw)
def callChildByName(self,childname,*args,**kw):
'''通過節(jié)點名稱調(diào)用子節(jié)點的接口
@param childname: str 子節(jié)點的名稱
'''
child = self.getChildByName(childname)
if not child:
log.err("child %s doesn't exists"%childname)
return
return child.callbackChild(*args,**kw)
def getChildBYSessionId(self, session_id):
"""根據(jù)sessionID獲取child節(jié)點信息
"""
for child in self._childs.values():
if child._transport.broker.transport.sessionno == session_id:
return child
return None
從代碼中,我們基本上可以看出來這個類除了用于管理子節(jié)點的添加反肋,移除羊精,獲取這種最基本的以外,還有callChild**囚玫,callChild函數(shù)首先取出子節(jié)點,然后再調(diào)用子節(jié)點的callbackChild函數(shù)读规。從字面意思上抓督,我們基本就可以看出來,這是一個調(diào)用子節(jié)點函數(shù)的函數(shù)束亏。至于怎么實現(xiàn)的铃在,可以去看twisted的透明代理部分,官網(wǎng)都有詳細的說明以及demo碍遍。
-
node.py
介紹了那么多關于子節(jié)點的內(nèi)容定铜,
下面我們正式開始揭開子節(jié)點的面紗。
#顧名思義怕敬,node.py就是根節(jié)點下面的子節(jié)點
'''
Created on 2013-8-14
@author: lan (www.9miao.com)
'''
from twisted.spread import pb
from twisted.internet import reactor
reactor = reactor
from reference import ProxyReference
def callRemote(obj,funcName,*args,**kw):
'''
遠程調(diào)用(這里只是作為一個調(diào)用子節(jié)點的方法存在)
@param obj 子節(jié)點對象揣炕,也就是RemoteObject
@param funcName: str 遠程方法名稱
'''
return obj.callRemote(funcName, *args,**kw)
class RemoteObject(object):
'''遠程調(diào)用對象
也就是真正的子節(jié)點'''
def __init__(self,name):
'''初始化遠程調(diào)用對象
@param port: int 遠程分布服的端口號
@param rootaddr: 根節(jié)點服務器地址
'''
self._name = name #本節(jié)點的節(jié)點名稱
self._factory = pb.PBClientFactory() #twisted的透明代理客戶端工廠
self._reference = ProxyReference() #twisted透明代理的代理通道
self._addr = None #root節(jié)點的節(jié)點地址(host,port)
def setName(self,name):
'''設置節(jié)點的名稱'''
self._name = name
def getName(self):
'''獲取節(jié)點的名稱'''
return self._name
def connect(self,addr):
'''初始化遠程調(diào)用對象'''
self._addr = addr # 保存遠程節(jié)點地址
reactor.connectTCP(addr[0], addr[1], self._factory) # 連接遠程節(jié)點地址
self.takeProxy() # 獲取遠程節(jié)點地址對象(這是一個異步操作,返回的是延時對象)
def reconnect(self):
'''重新連接'''
self.connect(self._addr)
def addServiceChannel(self,service):
'''
設置引用對象
'''
self._reference.addService(service)
def takeProxy(self):
'''
向遠程服務端發(fā)送代理通道對象
twisted的雙向透明代理規(guī)定的步驟
1.向服務器請求root對象
2.通過root對象向服務器發(fā)送自身的ProxyReference(代理通道)
3.雙向連接完成
'''
deferedRemote = self._factory.getRootObject()
deferedRemote.addCallback(callRemote,'takeProxy',self._name,self._reference)
def callRemote(self,commandId,*args,**kw):
'''
這里就是正式的遠程調(diào)用函數(shù)了东跪,子節(jié)點調(diào)用父節(jié)點的方法
和建立雙向透明代理類似畸陡,調(diào)用的步驟為:
1.獲取遠程root對象
2.通過root對象調(diào)用服務器函數(shù)
@param callRemote 這里調(diào)用的就是該文件頂部的那個同名函數(shù)
由于 addCallback會將 異步獲取的 對象傳入Callback 中
也就是傳入 callRemote 這個函數(shù)內(nèi)鹰溜,所以最終實現(xiàn)的效果是 :
self._factory.getRootObject().callRemote('callTarget', ,commandId,*args,**kw)
@param callTarget 這是firefly定義的遠程調(diào)用函數(shù)函數(shù)名稱
@param commandId 遠程對象會根據(jù) commandId 來最終絕對執(zhí)行什么操作
'''
deferedRemote = self._factory.getRootObject()
return deferedRemote.addCallback(callRemote,'callTarget',commandId,*args,**kw)
-
reference.py(代理通道)
#coding:utf8
'''
Created on 2013-8-14
@author: lan (www.9miao.com)
'''
from twisted.spread import pb
from firefly.utils.services import Service
class ProxyReference(pb.Referenceable):
'''代理通道'''
def __init__(self):
'''初始化'''
self._service = Service('proxy')
def addService(self,service):
'''添加一條服務通道'''
self._service = service
def remote_callChild(self, command,*arg,**kw):
'''代理發(fā)送數(shù)據(jù)
'''
return self._service.callTarget(command,*arg,**kw)
這一部分是代理通道相關的代碼,主要實現(xiàn)功能就是在子節(jié)點實例化的時候產(chǎn)生一個代理通道丁恭,然后在子節(jié)點連接上父節(jié)點之后通過調(diào)用takeProxy函數(shù)將自己的代理通道對象發(fā)送給父節(jié)點曹动,代理成功后,父節(jié)點就可以調(diào)用子節(jié)點中約定的函數(shù)(這也被稱為pb的反向調(diào)用)
-
父節(jié)點對象 -root.py
最后我們來看看分布式節(jié)點的中心root節(jié)點牲览,當然這里的所謂中心并不代表只有一個root節(jié)點墓陈,比如暗黑世界里,直觀的你就能發(fā)現(xiàn)master和gate都是root節(jié)點第献,對于master而言贡必,gate又是他的子節(jié)點。同理痊硕,game同樣可以作為一個root節(jié)點(當然在暗黑世界里赊级,game是一個child節(jié)點),然后分裂出game1岔绸,game2理逊,game3...,但同時盒揉,又是gate的子節(jié)點晋被,這其中并不矛盾。
# root.py文件刚盈,顧名思義羡洛,是個根節(jié)點
'''
Created on 2013-8-14
分布式根節(jié)點
@author: lan (www.9miao.com)
'''
from twisted.python import log
from twisted.spread import pb
from manager import ChildsManager
from child import Child
class BilateralBroker(pb.Broker):
'''
'''
def connectionLost(self, reason):
'''
這里重寫了pb.broker的連接丟失方法,用于子連接丟失后將子節(jié)點從根節(jié)點中移除
'''
clientID = self.transport.sessionno #獲取斷開的節(jié)點id
log.msg("node [%d] lose"%clientID)
self.factory.root.dropChildSessionId(clientID)# 移除斷開的節(jié)點對象
pb.Broker.connectionLost(self, reason) #斷開節(jié)點
class BilateralFactory(pb.PBServerFactory):
'''
重寫 PBServerFactory 類藕漱,
目的只是為了重寫pb.Broker的connectionLost方法欲侮,
而重寫connectionLost方法只是為了將子節(jié)點斷開后,
從根節(jié)點中將對象刪除肋联,保持根節(jié)點對于子節(jié)點信息存儲的準確性
'''
protocol = BilateralBroker
class PBRoot(pb.Root):
'''
繼承PB 協(xié)議的root節(jié)點
'''
def __init__(self,dnsmanager = ChildsManager()):
'''
初始化根節(jié)點
'''
self.service = None
self.childsmanager = dnsmanager # 創(chuàng)建子節(jié)點管理器
def addServiceChannel(self,service):
'''添加服務通道
@param service: Service Object(In bilateral.services)
'''
self.service = service #
def doChildConnect(self,name,transport):
"""當node節(jié)點連接時的處理
"""
pass
def dropChild(self,*args,**kw):
'''刪除子節(jié)點記錄'''
self.childsmanager.dropChild(*args,**kw)
def dropChildByID(self,childId):
'''刪除子節(jié)點記錄'''
self.doChildLostConnect(childId)
self.childsmanager.dropChildByID(childId)
def dropChildSessionId(self, session_id):
'''刪除子節(jié)點記錄'''
child = self.childsmanager.getChildBYSessionId(session_id)
if not child:
return
child_id = child._id
self.doChildLostConnect(child_id)
self.childsmanager.dropChildByID(child_id)
def doChildLostConnect(self,childId):
"""當node節(jié)點連接時的處理
"""
pass
def callChild(self,key,*args,**kw):
'''調(diào)用子節(jié)點的接口
@param childId: int 子節(jié)點的id
return Defered Object
'''
return self.childsmanager.callChild(key,*args,**kw)
def callChildByName(self,childname,*args,**kw):
'''調(diào)用子節(jié)點的接口
@param childId: int 子節(jié)點的id
return Defered Object
'''
return self.childsmanager.callChildByName(childname,*args,**kw)
# 這里remote_**格式的方法威蕉,是twisted透明代理方法的格式,以此命名的方法都可被遠程調(diào)用
def remote_takeProxy(self,name,transport):
'''設置代理通道
@param addr: (hostname,port)hostname 根節(jié)點的主機名,根節(jié)點的端口
'''
log.msg('node [%s] takeProxy ready'%name)
child = Child(name,name) # 創(chuàng)建子節(jié)點
self.childsmanager.addChild(child) # 添加子節(jié)點
child.setTransport(transport) # 設置子節(jié)點連接對象
self.doChildConnect(name, transport) #連接子節(jié)點
def remote_callTarget(self,command,*args,**kw):
'''遠程調(diào)用方法
@param commandId: int 指令號
@param data: str 調(diào)用參數(shù)
'''
data = self.service.callTarget(command,*args,**kw)
return data
-
最終橄仍,我們基本已經(jīng)清楚分布式布局的核心韧涨,互相調(diào)用的原理
1. root節(jié)點服務會首先建立,監(jiān)聽例如1000端口 root
2. node節(jié)點則會在服務器來后第一時間去連接1000端口
{node->connect()->root}
3. node與root建立連接成功后侮繁,node節(jié)點會調(diào)用root節(jié)點的remote_takeProxy函數(shù)用以在root節(jié)點互相交換代理權
{node->takeProxy()->callRemote('register')->childsmanager.addChildByNamePeer()}
4. 至此root和node之間的連接已經(jīng)建立成功了脸爱,當然建立連接的本質意義是為了實現(xiàn)相互調(diào)用业岁,那么相互調(diào)用又是怎么實現(xiàn)呢岁诉?
5. root調(diào)用child(這是一個反向調(diào)用的過程)
{root -> childsmanager.callChild() -> child.callbackChild() -> ProxyReference().callRemote()}
6. child調(diào)用root(這是一個正向調(diào)用的過程)
{child -> callRemote()}