ChanRPC實(shí)現(xiàn)模塊(Module)goroutine間的通信
為了進(jìn)一步分析Leaf游戲服務(wù)器,我們需要了解Leaf的ChanRPC
Leaf中每個(gè)模塊在獨(dú)立的goroutine間運(yùn)行,Leaf提供了基于channel的RPC機(jī)制來(lái)實(shí)現(xiàn)模塊間的相互通信
chanrpc功能的實(shí)現(xiàn)(Server為例)
// leaf\chanrpc\chanrpc.go
type Server struct {
// id -> function
//
// function:
// func(args []interface{})
// func(args []interface{}) interface{}
// func(args []interface{}) []interface{}
functions map[interface{}]interface{}
ChanCall chan *CallInfo
}
Server類型的變量會(huì)維護(hù)一個(gè)函數(shù)映射關(guān)系functions
,通過(guò)接受到的消息去調(diào)用相應(yīng)注冊(cè)的函數(shù),而ChanCall
通道則用來(lái)搭建通道間的信息傳遞橋梁
Server
的Register
方法實(shí)現(xiàn)了對(duì)functions
的注冊(cè)
// leaf\chanrpc\chanrpc.go
// you must call the function before calling Open and Go
func (s *Server) Register(id interface{}, f interface{}) {
switch f.(type) {
case func([]interface{}):
case func([]interface{}) interface{}:
case func([]interface{}) []interface{}:
default:
panic(fmt.Sprintf("function id %v: definition of function is invalid", id))
}
if _, ok := s.functions[id]; ok {
panic(fmt.Sprintf("function id %v: already registered", id))
}
s.functions[id] = f
}
ChanCall
參數(shù)則將在某個(gè)地方進(jìn)行通道數(shù)據(jù)的讀取和寫入,來(lái)達(dá)到通道間通信的功能
例如使用GO調(diào)用,往通道里寫入數(shù)據(jù),數(shù)據(jù)包括要進(jìn)行回調(diào)的functions中的函數(shù)及其參數(shù)
// leaf\chanrpc\chanrpc.go
func (s *Server) Go(id interface{}, args ...interface{}) {
// 拿到要調(diào)用的函數(shù)
f := s.functions[id]
if f == nil {
return
}
defer func() {
recover()
}()
// 往通道里寫入數(shù)據(jù) 函數(shù) 和 參數(shù)
s.ChanCall <- &CallInfo{
f: f,
args: args,
}
}
而在寫入數(shù)據(jù)之前,必定有在程序的某個(gè)地方通過(guò)這個(gè)通道等待接收數(shù)據(jù),LeafServer中在進(jìn)入通過(guò)Skeleton模塊建立的Module的Run()
生命周期中,在模塊的goroutine中就會(huì)等待接收數(shù)據(jù),接收到數(shù)據(jù)后執(zhí)行Exec()來(lái)進(jìn)行處理,Skeleton相關(guān)內(nèi)容將在后面的文章進(jìn)行簡(jiǎn)析
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
case <-closeSig:
s.commandServer.Close()
s.server.Close()
for !s.g.Idle() || !s.client.Idle() {
s.g.Close()
s.client.Close()
}
return
case ri := <-s.client.ChanAsynRet:
s.client.Cb(ri)
// 等待來(lái)自通道的數(shù)據(jù)
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
case ci := <-s.commandServer.ChanCall:
s.commandServer.Exec(ci)
case cb := <-s.g.ChanCb:
s.g.Cb(cb)
case t := <-s.dispatcher.ChanTimer:
t.Cb()
}
}
}
Exec()
調(diào)用相應(yīng)函數(shù)
// leaf\chanrpc\chanrpc.go
func (s *Server) Exec(ci *CallInfo) {
err := s.exec(ci)
if err != nil {
log.Error("%v", err)
}
}
func (s *Server) exec(ci *CallInfo) (err error) {
defer func() {
if r := recover(); r != nil {
if conf.LenStackBuf > 0 {
buf := make([]byte, conf.LenStackBuf)
l := runtime.Stack(buf, false)
err = fmt.Errorf("%v: %s", r, buf[:l])
} else {
err = fmt.Errorf("%v", r)
}
s.ret(ci, &RetInfo{err: fmt.Errorf("%v", r)})
}
}()
// execute
switch ci.f.(type) {
case func([]interface{}):
ci.f.(func([]interface{}))(ci.args)
return s.ret(ci, &RetInfo{})
case func([]interface{}) interface{}:
ret := ci.f.(func([]interface{}) interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
case func([]interface{}) []interface{}:
ret := ci.f.(func([]interface{}) []interface{})(ci.args)
return s.ret(ci, &RetInfo{ret: ret})
}
panic("bug")
}
LeafServer中的chanrpc使用舉例
簡(jiǎn)單講解LeafServer中g(shù)ate.Module與game.Module的通信,下文中的目錄server
為L(zhǎng)eafServer項(xiàng)目根目錄
首先程序一開始在game模塊中注冊(cè)了相應(yīng)的ChanRPC消息和響應(yīng)函數(shù)(Go pkg里的init()
函數(shù)會(huì)在main()
之前執(zhí)行),game模塊注冊(cè)了NewAgent 和CloseAgent 兩個(gè)ChanRPC,分別執(zhí)行用戶建立連接和斷開鏈接的相關(guān)邏輯
// server\game\internal\chanrpc.go
func init() {
skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
}
func rpcNewAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
func rpcCloseAgent(args []interface{}) {
a := args[0].(gate.Agent)
_ = a
}
// skeleton.RegisterChanRPC()方法調(diào)用的是上文提到的Server.Register
// leaf\module\skeleton.go
func (s *Skeleton) RegisterChanRPC(id interface{}, f interface{}) {
if s.ChanRPCServer == nil {
panic("invalid ChanRPCServer")
}
// 注冊(cè)
s.server.Register(id, f)
}
注冊(cè)完ChanRPC后,在程序運(yùn)行起來(lái)后,會(huì)按照Module生命周期進(jìn)到如前文所說(shuō)的Run
中等待數(shù)據(jù)被寫入通道
// leaf\module\skeleton.go
func (s *Skeleton) Run(closeSig chan bool) {
for {
select {
// ...
// 等待來(lái)自通道的數(shù)據(jù)
case ci := <-s.server.ChanCall:
s.server.Exec(ci)
//...
}
}
}
gate模塊中當(dāng)用戶連接時(shí)將調(diào)用Server.Go
函數(shù)像game模塊發(fā)送消息
// leaf\gate\gate.go
gate.AgentChanRPC.Go("NewAgent", a)
// leaf\chanrpc\chanrpc.go
func (s *Server) Go(id interface{}, args ...interface{}) {
// ...
// 往通道里寫入數(shù)據(jù) 函數(shù) 和 參數(shù)
s.ChanCall <- &CallInfo{
f: f,
args: args,
}
}
調(diào)用了Server.GO
后,在Run()
里面等待的通道就可以拿到消息,從而執(zhí)行相應(yīng)的處理函數(shù)了
而對(duì)于gate.AgentChanRPC則在gate模塊的初始化中就指向了game模塊的ChanRPC:
// server\gate\internal\module.go
func (m *Module) OnInit() {
m.Gate = &gate.Gate{
// ...
AgentChanRPC: game.ChanRPC, //注冊(cè)為game的ChanRPC,于是就可以通過(guò)gate.AgentChanRPC.Go("NewAgent", a)和game模塊通信
}
}
小結(jié)
本文僅僅只是簡(jiǎn)單梳理了一下ChanRPC通信的流程,回頭看整個(gè)過(guò)程其實(shí)很清晰,程序初始化的時(shí)候模塊注冊(cè)好需要的ChanRPC,在生命周期Run()里等待模塊的ChanRPC里的通道的信號(hào).其他模塊往等待信號(hào)的通道里發(fā)送內(nèi)容,等待信號(hào)的模塊拿到內(nèi)容后調(diào)用注冊(cè)好的函數(shù).拋磚引玉了一下,歡迎交流討論