cde::CBigMessageQueue介紹
看繼承關(guān)系:
繼承于cdf::CMessageQueueBase和cdf::IWakeMessageLoopHandler兩個類
cdf::CMessageQueueBase
先看cdf::CMessageQueueBase是啥:
typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
//typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;
MessageHandlerHandlerIdMap 是一個map 其中Key是SEntityId手报,value是 IMessageHandlerPtr
MessageHandlerCommandMap 是一個map 其中key是int value是IMessageHandlerPtr
MessageBlockList 是一個list 保存了CMessageBlockPtr
分別介紹這幾個類:
SEntityId:
繼承于IMessageBase
可以看到 只是規(guī)定了每個消息需要實現(xiàn)的操作:”讀寫拷貝以及重載=符號
這個類有個友元類:CMessageBlock
這段代碼定義了一個名為CMessageBlock的類,它繼承了CRefShared類,因此CMessageBlock對象可以使用引用計數(shù)來管理內(nèi)存讼油,從而避免內(nèi)存泄漏問題打月。
CMessageBlock類包含以下成員變量:
_messageHead:用于存儲消息頭信息的結(jié)構(gòu)體辞嗡。
_messageBase:指向?qū)崿F(xiàn)了IMessageBase接口的對象的智能指針不狮。
_os:指向CSerializeStream類對象的指針欣除,用于序列化消息數(shù)據(jù)色冀。
CMessageBlock類還包含了以下成員函數(shù):
CMessageBlock():默認構(gòu)造函數(shù)潭袱。
~CMessageBlock():虛析構(gòu)函數(shù)。
initOS():初始化_os成員變量锋恬。
__write():將消息數(shù)據(jù)寫入指定的序列化流屯换。
__writeHead():將消息頭信息寫入指定的序列化流。
__writeBody():將消息體信息寫入指定的序列化流伶氢。
__read():從指定的序列化流中讀取消息數(shù)據(jù)趟径。
__readHead():從指定的序列化流中讀取消息頭信息瘪吏。
__readBody():從指定的序列化流中讀取消息體信息。
operator=():賦值運算符重載函數(shù)蜗巧,被聲明為private掌眠,禁止對CMessageBlock對象進行賦值操作。
綜上所述幕屹,CMessageBlock類是一個用于表示消息數(shù)據(jù)的類蓝丙,它將消息頭和消息體封裝成一個對象,并且提供了序列化和反序列化消息數(shù)據(jù)的方法望拖。
IMessageHandlerPtr:
這段代碼定義了一個名為IMessageHandler的抽象基類渺尘,它繼承了CRefShared類,因此IMessageHandler對象可以使用引用計數(shù)來管理內(nèi)存说敏,從而避免內(nèi)存泄漏問題鸥跟。
IMessageHandler類包含以下成員變量:
_objectId:表示對象的唯一標識符。
IMessageHandler類還包含了以下成員函數(shù):
IMessageHandler():默認構(gòu)造函數(shù)盔沫。
~IMessageHandler():虛析構(gòu)函數(shù)医咨。
getId():獲取對象的唯一標識符。
onMessage():處理收到的消息數(shù)據(jù)架诞。
operator<():小于運算符重載函數(shù)拟淮,用于在容器中對IMessageHandler對象進行排序。
setId():設(shè)置對象的唯一標識符谴忧。
IMessageHandlerPtr是一個智能指針類型很泊,它使用CHandle模板類來管理IMessageHandler對象的生命周期≌次剑可以使用IMessageHandlerPtr指針來操作IMessageHandler對象委造,而不必手動管理對象的內(nèi)存。
綜上所述搏屑,IMessageHandler類是一個用于處理消息的抽象基類争涌,它定義了一個處理收到的消息數(shù)據(jù)的方法,并提供了對象唯一標識符的管理辣恋。IMessageHandlerPtr是一個智能指針類型亮垫,用于方便地管理IMessageHandler對象的生命周期。
CMessageBlockPtr:CMessageBlock的指針
class CMessageQueueBase
{
public:
typedef cde_hash_map<SEntityId, IMessageHandlerPtr, entity_id_hash> MessageHandlerHandlerIdMap;
//typedef std::map<SEntityId,IMessageHandlerPtr> MessageHandlerHandlerIdMap;
typedef cde_hash_map<int,IMessageHandlerPtr> MessageHandlerCommandMap;
typedef std::list<CMessageBlockPtr,CCdfAllocator<CMessageBlockPtr> > MessageBlockList;
//typedef std::list<CMessageBlockPtr> MessageBlockList;
CMessageQueueBase( IWakeMessageLoopHandler* messageLoopPtr );
virtual ~CMessageQueueBase();
/**
* to regist handle to proccess message
* if the MESSAGE_ID and common regist to throw command
*/
int registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler);
/**
* if the command not execute by id
* the command to excecute to by command
*/
int registeHandler( const int command , const IMessageHandlerPtr& handler );
/**
* remove handles
*/
int removeHandler(const SEntityId& id);
/**
* remove handles
*/
int removeHandler( const int command );
/**
* push message to the end
*/
void pushMessage( const CMessageBlockPtr& messageBlock );
/**
* push message to the end
*/
void pushMessage( int cmd , const IMessageBasePtr& messageBase );
/**
* push message to the end
*/
void pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);
/**
* push message to the end
*/
void sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase);
//to send message
void sendMessage( const CMessageBlockPtr& messageBlock );
/**
* push message to the front
*/
void pushMessageFront( CMessageBlockPtr& messageBlock );
/***
* distribute message
*/
void distributeMessage();
/**
* break on error
*/
void setBreakOnError( bool breakOnError ){ _breakOnError = breakOnError; }
bool isBreakOnError() const { return _breakOnError; }
protected:
CMessageQueueBase();
CLightLock _idMessageHandlersLock;
MessageHandlerHandlerIdMap _idMessageHandlers;
CLightLock _commandMessageHandlersLock;
MessageHandlerCommandMap _commandMessageHandlers;
CLightLock _messageReadListLock;
MessageBlockList* _messageReadList;
CLightLock _messageWriteListLock;
MessageBlockList* _messageWriteList;
IWakeMessageLoopHandler* _messageLoopPtr;
bool _breakOnError;
};
這段代碼定義了一個消息隊列基類 CMessageQueueBase伟骨,用于存儲和分發(fā)消息饮潦。以下是該類的主要成員函數(shù)和數(shù)據(jù)成員的解釋:
成員函數(shù):
registeHandler(const SEntityId& id, const IMessageHandlerPtr& handler):注冊消息處理器,將給定的 IMessageHandlerPtr 實例與給定的 SEntityId 實例相關(guān)聯(lián)携狭。
registeHandler(const int command, const IMessageHandlerPtr& handler):注冊消息處理器继蜡,將給定的 IMessageHandlerPtr 實例與給定的 command 相關(guān)聯(lián)。
removeHandler(const SEntityId& id):從隊列中移除與給定 SEntityId 相關(guān)聯(lián)的消息處理器。
removeHandler(const int command):從隊列中移除與給定 command 相關(guān)聯(lián)的消息處理器稀并。
pushMessage(const CMessageBlockPtr& messageBlock):將給定的 CMessageBlockPtr 實例添加到消息隊列的末尾仅颇。
pushMessage(int cmd, const IMessageBasePtr& messageBase):構(gòu)造一個消息塊 CMessageBlock 實例,并將其添加到消息隊列的末尾碘举。
pushMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):構(gòu)造一個消息塊 CMessageBlock 實例忘瓦,并將其添加到消息隊列的末尾。該消息將被分配給給定的 SEntityId引颈。
sendMessage(int cmd, const SEntityId& toId, const IMessageBasePtr& messageBase):將給定的消息塊直接發(fā)送給給定的 SEntityId耕皮。
sendMessage(const CMessageBlockPtr& messageBlock):將給定的消息塊直接發(fā)送給其 SEntityId。
pushMessageFront(CMessageBlockPtr& messageBlock):將給定的 CMessageBlockPtr 實例添加到消息隊列的開頭蝙场。
distributeMessage():分發(fā)當前在隊列中的所有消息凌停。
setBreakOnError(bool breakOnError):設(shè)置隊列是否在出現(xiàn)錯誤時中斷。
isBreakOnError() const:獲取隊列當前是否在出現(xiàn)錯誤時中斷售滤。
數(shù)據(jù)成員:
MessageHandlerHandlerIdMap _idMessageHandlers:用于存儲與 SEntityId 相關(guān)聯(lián)的 IMessageHandlerPtr 實例的映射罚拟。
MessageHandlerCommandMap _commandMessageHandlers:用于存儲與 command 相關(guān)聯(lián)的 IMessageHandlerPtr 實例的映射。
MessageBlockList* _messageReadList:指向消息隊列中要讀取的消息列表的指針完箩。
MessageBlockList* _messageWriteList:指向消息隊列中要寫入的消息列表的指針舟舒。
IWakeMessageLoopHandler* _messageLoopPtr:指向與此消息隊列相關(guān)聯(lián)的消息循環(huán)的指針。
bool _breakOnError:表示隊列是否在出現(xiàn)錯誤時中斷嗜憔。
成員方法:
構(gòu)造函數(shù):
生成兩個list _messageReadList和_messageWriteList
注冊處理消息:
移除注冊的消息:
發(fā)送消息:
cmd是消息id,toId是需要發(fā)送的玩家氏仗,messageBase是消息內(nèi)容
void
CMessageQueueBase::sendMessage( const CMessageBlockPtr& messageBlock )
{
//LOG_TRACE( "CMessageQueue::pushMessage" , messageBlock._ptr );
if( !messageBlock )
{
assert( false );
return;
}
///////// to player /////////
SeqEntityId toIds( messageBlock->_messageHead.toIds );
for( size_t i = 0 ; i < toIds.size() ; i++ )
{
SEntityId& id = toIds[i];
IMessageHandlerPtr handler;
{
CAutoLightLock autoLock(_idMessageHandlersLock);
//MessageHandlerHandlerIdMap::const_iterator iter;
MessageHandlerHandlerIdMap::iterator iter;
iter = _idMessageHandlers.find( id );
if( _idMessageHandlers.end() != iter )
{
handler = iter->second;
}
}
if( handler )
{
try
{
handler->onMessage( messageBlock );
}
catch( const CException& ex )
{
CDF_LOG_INFO( CWarning ,
__FILE__ << ":" << __LINE__
<< "Exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
catch( const std::exception& ex )
{
CDF_LOG_INFO( CWarning ,
__FILE__ << ":" << __LINE__
<< "exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
catch( ... )
{//if has unkown exception
CDF_LOG_INFO( CError ,
__FILE__ << ":" << __LINE__
<< "unkown exception" <<
" Cmd:" << messageBlock->_messageHead.command );
if( _breakOnError ) throw;
}
}
}
///////// to system /////////
if (toIds.empty())
{
IMessageHandlerPtr handler = NULL;
{
CAutoLightLock autoLock(_commandMessageHandlersLock);
//MessageHandlerCommandMap::const_iterator iter;
MessageHandlerCommandMap::iterator iter;
iter = _commandMessageHandlers.find(messageBlock->_messageHead.command);
if (_commandMessageHandlers.end() != iter)
{
handler = iter->second;
}
}
//if is not process by any handler
if (handler)
{
try
{
handler->onMessage(messageBlock);
}
catch (const CException& ex)
{
CDF_LOG_INFO(CWarning,
__FILE__ << ":" << __LINE__
<< "cdf::CException " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
catch (const std::exception& ex)
{
CDF_LOG_INFO(CWarning,
__FILE__ << ":" << __LINE__
<< "std::exception " << ex.what() <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
catch (...)
{//if has unkown exception
CDF_LOG_INFO(CError,
__FILE__ << ":" << __LINE__
<< "unkown exception" <<
" Cmd:" << messageBlock->_messageHead.command);
if (_breakOnError) throw;
}
}
}
}
這段代碼實現(xiàn)了一個消息隊列(Message Queue)吉捶,用于處理消息的發(fā)送和處理。
函數(shù) sendMessage 接收一個 CMessageBlockPtr 類型的消息塊皆尔,并將其發(fā)送給一個或多個消息處理器(Message Handler)呐舔,或者發(fā)送給系統(tǒng)處理器(System Handler)。
首先慷蠕,代碼會遍歷消息塊中的接收者列表珊拼,然后在 _idMessageHandlers 中查找與接收者 ID 匹配的消息處理器。如果找到匹配的消息處理器流炕,則調(diào)用其 onMessage 函數(shù)澎现,將消息塊傳遞給它進行處理。
如果找不到匹配的消息處理器每辟,則將消息塊發(fā)送給系統(tǒng)處理器剑辫。這時,代碼會在 _commandMessageHandlers 中查找與消息塊命令匹配的系統(tǒng)處理器渠欺。如果找到匹配的系統(tǒng)處理器妹蔽,則同樣調(diào)用其 onMessage 函數(shù),將消息塊傳遞給它進行處理。
代碼中還包括異常處理機制胳岂。如果在消息處理器的 onMessage 函數(shù)中發(fā)生了異常编整,代碼會將異常信息寫入日志文件,并根據(jù) _breakOnError 標志決定是否拋出異常乳丰。
推送消息至隊列:
推送消息至隊列頭:
注意上面是往_messageWriteList的尾部插入消息
下面的是往 __messageReadList的頭部插入消息
分發(fā)消息:
這段代碼是一個無限循環(huán)掌测,其目的是將消息塊從一個隊列中取出并進行處理。
首先成艘,代碼會從一個叫做 _messageReadList 的隊列中讀取消息塊赏半,如果該隊列不為空,則將隊首的消息塊取出并從隊列中刪除淆两。
然后断箫,代碼會調(diào)用 sendMessage 方法來處理該消息塊,即根據(jù)消息頭中的目標 ID(toIds)將消息發(fā)送到對應的消息處理器中秋冰。
如果讀取的消息塊為空仲义,則繼續(xù)循環(huán),直到隊列中有新的消息塊可以處理剑勾。
如果讀取的消息塊不為空埃撵,但是在處理消息塊期間拋出異常,則根據(jù) _breakOnError 的值來決定是否將異常繼續(xù)向上拋出虽另。
最后暂刘,當 _messageReadList 隊列為空時,代碼會將 _messageReadList 和 _messageWriteList 進行交換捂刺,以便下一輪循環(huán)可以從新的 _messageReadList 隊列中讀取消息塊谣拣。如果交換成功极祸,則跳出循環(huán)郭赐。
看完cdf::CMessageQueueBase之后 可以開始看bigMessageQueue是何方神圣了~
bigMessageQueue
class CBigMessageQueue
:public cdf::CMessageQueueBase,
public cdf::IWakeMessageLoopHandler
{
public:
CBigMessageQueue();
/**
* to push message to chanel
*/
bool pushRemoteMessage(
int channelId ,
const cdf::CMessageBlockPtr& mb
);
/**
* to push message to chanel
*/
bool pushRemoteMessage(
int channelId ,
int command ,
const cdf::IMessageBasePtr& messageBase
);
bool pushRemoteMessage(
std::vector<int>& channelIds,
const cdf::CMessageBlockPtr& mb
);
bool onMessage(
cdf::CSerializeStream& is
);
/**
* to wake message loop
*/
virtual void wakeMessageLoop();
virtual void waitMessageLoop( int mill );
public:
static CBigMessageQueue* instance();
};
CBigMessageQueue::CBigMessageQueue() 構(gòu)造函數(shù)用于初始化類對象饶套,該函數(shù)將當前對象的指針作為參數(shù)傳遞給基類 CMessageQueueBase 的構(gòu)造函數(shù)赏殃。
CBigMessageQueue::pushRemoteMessage() 方法用于將消息塊推送到指定的通道中页眯,方法的參數(shù)包括通道 ID 和消息塊的指針伴榔。
CBigMessageQueue::pushRemoteMessage() 方法的另一個重載用于將消息推送到指定通道中联喘,其參數(shù)包括通道 ID血柳、命令號和消息的基類指針恰画。
CBigMessageQueue::pushRemoteMessage() 方法的另一個重載用于將消息塊推送到多個通道中宾茂。
CBigMessageQueue::onMessage() 方法用于接收從網(wǎng)絡(luò)中接收到的消息,并將消息塊推送到消息隊列中等待處理锣尉。
CBigMessageQueue::wakeMessageLoop() 方法用于喚醒事件循環(huán)處理器刻炒。
CBigMessageQueue::waitMessageLoop() 方法不實現(xiàn)任何操作。
CBigMessageQueue::instance() 方法用于獲取 CBigMessageQueue 類的靜態(tài)實例自沧。
構(gòu)造函數(shù):
上面介紹到坟奥,本質(zhì)就是初始化CMessageQueueBase的兩個list
推送遠程消息
bool CBigMessageQueue::pushRemoteMessage(
int channelId ,
const cdf::CMessageBlockPtr& mb
)
{
if( channelId == 0 )
{
return false;
}
cde::CSessionPtr session =
cde::CChannelManager::instance()->get( channelId );
if( !session )
{
CDF_LOG_TRACE( "CBigMessageQueue::pushRemoteMessage", " session is NULL, channel id: " << channelId );
return false;
}
static Engine::RMI::SRMICall call;
cdf::CAutoSerializeStream stream( cdf::CSerializeStreamPool::instance()->newObject() );
static CContext contextNotUsed;
static CRMIObjectBindPtr objectBindNotUsed = new CRMIObjectBind;
cde::COutgoing::perpareInvoke(
contextNotUsed,
*stream,
call,
NULL,
objectBindNotUsed,
::Engine::RMI::MessageTypeMQ);
mb->__write( *stream );
cde::COutgoing::invokeAsyncNoBack(
session ,
call ,
*stream ,
::Engine::RMI::MessageTypeMQ
);
return true;
}
這個方法是將消息推送到遠程的消息隊列中树瞭。它首先獲取指定通道的會話對象,然后創(chuàng)建一個序列化流并使用給定的消息塊填充它爱谁。接下來晒喷,它使用COutgoing::invokeAsyncNoBack函數(shù)將消息通過遠程調(diào)用異步地發(fā)送到會話的遠程端點。在此過程中访敌,它使用了SRMICall凉敲、CContext、CRMIObjectBind等對象來實現(xiàn)遠程過程調(diào)用寺旺。最后爷抓,該方法返回一個布爾值表示是否成功將消息推送到遠程隊列中。
channelManager
這里出現(xiàn)了幾個沒出現(xiàn)過的對象
先介紹一下channelManager
這是一個單例模式的類 CChannelManager阻塑,它用于管理一組 CSession 對象蓝撇。
在這個類中,_sessionMap 是一個映射表陈莽,它將一個整數(shù)的 id 映射到一個 CSessionPtr 智能指針渤昌。
通過 add() 方法,可以將一個 CSessionPtr 添加到 _sessionMap 中走搁;通過 get() 方法独柑,可以根據(jù)給定的 id 查找相應的 CSession 對象;
通過 remove() 方法私植,可以刪除一個 id 對應的 CSession 對象忌栅;通過 clear() 方法,可以清空 _sessionMap 中的所有對象曲稼;
通過 flush() 方法狂秘,可以遍歷 _sessionMap 中的所有 CSession 對象,然后調(diào)用每個 CSession 對象中的 CRMIConnection 對象的 flush() 方法來刷新數(shù)據(jù)躯肌。
類中還包括了一個 instance() 靜態(tài)方法,它返回了這個類的單例對象破衔。