本篇是打算介紹一下目前常見的大型mmo服務(wù)器架構(gòu)的源碼,其實目前見過的幾個框架在思想上模型基本上大同小異攘残,本人公司的代碼由于不方便展示讨惩,于是使用開源的框架進(jìn)行解析,主要理解大致思想氏身,抓重點(diǎn)
有沒有想過一個問題巍棱?
服務(wù)器在啟動的一刻,客戶端和服務(wù)器究竟做了哪些準(zhǔn)備工作蛋欣?以及玩家登陸的時候航徙,客戶端又是怎么樣和服務(wù)器通訊的呢?
本篇也是系列的第一篇陷虎,打算先從底層線程的設(shè)計開始講起
該框架將每個任務(wù)進(jìn)行封裝到踏,然后通過線程管理器分別分發(fā)到對應(yīng)的線程處理。
也是mmo服務(wù)器的基礎(chǔ)線程框架尚猿。
ThreadObject類
ThreadObject類其實是線程處理事件的一個最小單位
里面包含了需要處理的函數(shù)(通過std::function包裝)
class Thread;
class ThreadObject : public MessageList
{
public:
virtual bool Init() = 0; //初始化函數(shù)
virtual void RegisterMsgFunction() = 0;//消息注冊
virtual void Update() = 0; //更新
void SetThread(Thread* pThread);
Thread* GetThread() const;
bool IsActive() const;
void Dispose() override;
protected:
bool _active{ true };
Thread* _pThread{ nullptr };
該類很簡單窝稿,繼承于MessageList(這個后續(xù)說)
并且主要成員就兩個,_active是否已經(jīng)執(zhí)行過凿掂,_pThread為當(dāng)前執(zhí)行這個線程對象的線程指針(就是這玩意是在哪個線程上執(zhí)行的)
如何使用這個類呢伴榔,只需要繼承這個類,然后將這個類的 虛函數(shù)實現(xiàn)就行
具體用法后面再結(jié)合一起說
Thread類
介紹完object類自然就是介紹Thread類啦庄萎,顧名思義 ThreadObject對象就是在某個Thread對象上運(yùn)行的
class Packet;
class ThreadObject;
class ThreadObjectList: public IDisposable
{
public:
void AddObject(ThreadObject* _obj);
void Update();
void AddPacketToList(Packet* pPacket);
void Dispose() override;
protected:
// 本線程的所有對象
std::list<ThreadObject*> _objlist;
std::mutex _obj_lock;
};
class Thread : public ThreadObjectList, public SnObject {
public:
Thread();
void Start();
void Stop();
bool IsRun() const;
private:
bool _isRun;
std::thread _thread;
};
可以看到上面的代碼踪少,Thread類繼承于ThreadObjectList,而ThreadObjectList里就保存了一個list糠涛,里面存放所有ThreadObject對象的指針援奢,并且提供了方法接口,將ThreadObject對象指針存入到list中脱羡,而主Thread類保存了當(dāng)前進(jìn)程的指針萝究,使用的是C++11后的std::thread庫免都,并且也封裝了start和stop方法,方便使用帆竹。
void ThreadObjectList::AddObject(ThreadObject* obj)
{
std::lock_guard<std::mutex> guard(_obj_lock);//對當(dāng)前線程上鎖
// 在加入之前初始化一下
if (!obj->Init())
{
std::cout << "AddObject Failed. ThreadObject init failed." << std::endl;
}
else
{
obj->RegisterMsgFunction(); //運(yùn)行threadObject的消息注冊函數(shù)
_objlist.push_back(obj); //將threadobject對象指針保持至list
//保持成功后 將當(dāng)前線程的指針保存至這個threadObject對象的thread成員上
const auto pThread = dynamic_cast<Thread*>(this);
if (pThread != nullptr)
obj->SetThread(pThread);
}
}
上面的函數(shù)就是將ThreadObject對象指針存入ThreadObjectList::_objlist對象的過程绕娘,重要的三點(diǎn):
1:存入之前先對object初始化
2:注冊消息函數(shù)
3:成功之后保存當(dāng)前線程指針
Thread::Thread()
{
this->_isRun = true;
}
void Thread::Stop()
{
if (!_isRun)
{
_isRun = false;
if (_thread.joinable()) _thread.join();
}
}
void Thread::Start()
{
_isRun = true;
_thread = std::thread([this]()
{
while (_isRun)
{
Update();
}
});
}
bool Thread::IsRun() const
{
return _isRun;
}
上述代碼就是Thread類的方法實現(xiàn),非常簡單明了栽连,這里注意 thread對象只要在運(yùn)行险领,那么就會一直執(zhí)行ThreadObiectList對象的update函數(shù) 這也是當(dāng)前線程的主循環(huán)
那么我們來看看update函數(shù)都干了些什么吧~
void ThreadObjectList::Update()
{
std::list<ThreadObject*> _tmpObjs; //
_obj_lock.lock();
std::copy(_objlist.begin(), _objlist.end(), std::back_inserter(_tmpObjs));
_obj_lock.unlock();
for (ThreadObject* pTObj : _tmpObjs)
{
pTObj->ProcessPacket();
pTObj->Update();
// 非激活狀態(tài),刪除
if (!pTObj->IsActive())
{
_obj_lock.lock();
_objlist.remove(pTObj);
_obj_lock.unlock();
pTObj->Dispose();
delete pTObj;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
上述代碼其實也不難秒紧,本質(zhì)就是把原來的list上的Object拷貝一份绢陌,然后對拷貝的這份的每個object指針進(jìn)行處理
分別對每個object執(zhí)行
1: processPacket(這個先無視 是MessageList里的方法,理解成處理消息就行)
2:Update(這個函數(shù)是虛函數(shù)熔恢,具體邏輯自己實現(xiàn)脐湾,也是我們需要真正執(zhí)行邏輯的地方)
3:執(zhí)行完update之后 判斷這個object是否已經(jīng)沒用了(被拋棄了) 如果已經(jīng)沒用了,那么就將這個obj移除掉
最終再過一毫秒繼續(xù)運(yùn)行該函數(shù)
ThreadMgr類
class Packet;
class ThreadObject;
class Network;
class ThreadMgr :public Singleton<ThreadMgr>, public ThreadObjectList
{
public:
ThreadMgr();
void StartAllThread();
bool IsGameLoop();
void NewThread();
bool AddObjToThread(ThreadObject* obj);
void AddNetworkToThread(APP_TYPE appType, Network* pNetwork);
// message
void DispatchPacket(Packet* pPacket);
void SendPacket(Packet* pPacket);
void Dispose() override;
private:
Network* GetNetwork(APP_TYPE appType);
private:
uint64 _lastThreadSn{ 0 }; // 實現(xiàn)線程對象均分
std::mutex _thread_lock;
std::map<uint64, Thread*> _threads;
// NetworkLocator
std::mutex _locator_lock;
std::map<APP_TYPE, Network*> _networkLocator;
};
不需要全部看懂意思 只需要大概了解思想
抓重點(diǎn)
1:繼承自Singleton<ThreadMgr> 說明每個進(jìn)程只允許一個對象叙淌,不管從哪個線程獲取到這個ThreadMgr都是指向同一個對象
2: std::map<uint64, Thread*> _threads; 該對象擁有所有線程對象的指針秤掌,并且是以key = 線程id value =線程指針的形式保存
3:可以通過該對象創(chuàng)建新的進(jìn)程
4:可以通過該對象直接將ThreadObject對象平均分配到線程中,而不需要我們自己決定鹰霍,實現(xiàn)了負(fù)載均衡
接下來看具體方法實現(xiàn):
void ThreadMgr::StartAllThread()
{
auto iter = _threads.begin();
while (iter != _threads.end())
{
iter->second->Start();
++iter;
}
}
bool ThreadMgr::IsGameLoop()
{
for (auto iter = _threads.begin(); iter != _threads.end(); ++iter)
{
if (iter->second->IsRun())
return true;
}
return false;
}
void ThreadMgr::NewThread()
{
std::lock_guard<std::mutex> guard(_thread_lock);
auto pThread = new Thread();
_threads.insert(std::make_pair(pThread->GetSN(), pThread));
}
上面代碼通俗易懂 不需要解釋~
// 平均加入各線程中
bool ThreadMgr::AddObjToThread(ThreadObject* obj)
{
std::lock_guard<std::mutex> guard(_thread_lock);
// 找到上一次的線程
auto iter = _threads.begin();
if (_lastThreadSn > 0)
{
iter = _threads.find(_lastThreadSn);
}
if (iter == _threads.end())
{
// 沒有找到闻鉴,可能沒有配線程
std::cout << "AddThreadObj Failed. no thead." << std::endl;
return false;
}
// 取到它的下一個活動線程
do
{
++iter;
if (iter == _threads.end())
iter = _threads.begin();
} while (!(iter->second->IsRun()));
auto pThread = iter->second;
pThread->AddObject(obj);
_lastThreadSn = pThread->GetSN();
//std::cout << "add obj to thread.id:" << pThread->GetSN() << std::endl;
return true;
}
主要來看如何進(jìn)行負(fù)載均衡的
看了代碼就能理解,其實就是平均分配
在把obj加入到某個線程之后 記錄這個線程的線程id茂洒,下次別的ojb來的時候就在這個線程id對應(yīng)的mapKey的下一位key對應(yīng)的線程上增加該ojb
假設(shè) 線程有 1 2 3
第一次是加到1 上
第二次是加到2 上
第三次是加到3 上
如此反復(fù)
那么到此為止孟岛,整體的線程框架就能了解了
MessageList
上文說道,在ThreadObject上繼承了MessageList這個類督勺,那么這個類是啥呢纤房?
先考慮一個問題:
假設(shè)一個消息Id為 Msg1,攜帶著Packet數(shù)據(jù)骇径,從客戶端傳來彩倚,按照現(xiàn)在的情況是 ThreadMgr管理類會將這個msg分配給每個Thread摔蓝,并且每個ThreadObject都可以處理這個數(shù)據(jù)腋舌。
但是如果我只希望Thread1里的某個ThreadObject處理這條數(shù)據(jù)就行了盏触,別的ThreadObject處理別的MsgId,實現(xiàn)消息的過濾块饺,應(yīng)該怎么辦呢赞辩?
MessageList的用處就是在這里,實現(xiàn)消息過濾授艰,每個ThreadObject監(jiān)聽自己感興趣的msgId辨嗽。
看代碼!
class Packet;
typedef std::function<void(Packet*)> HandleFunction;
class MessageList
{
public:
void RegisterFunction(int msgId, HandleFunction function);//注冊這個消息Id 對應(yīng)的回調(diào)
bool IsFollowMsgId(int msgId);//是否是這個ThreadObjc所需要的消息id
void ProcessPacket();//處理Packet
void AddPacket(Packet* pPacket); //添加Patcket
protected:
std::mutex _msgMutex; //鎖
std::list<Packet*> _msgList; //消息隊列
std::map<int, HandleFunction> _callbackHandle; //回調(diào)map
};
從上面可以大致知道淮腾,每個ThreadObject都有一個MessageList糟需,這樣每個ThreadObject在聲明的時候就可以通過RegisterFunction這個函數(shù)來注冊自己需要監(jiān)聽的消息屉佳。
void MessageList::RegisterFunction(int msgId, HandleFunction function)
{
std::lock_guard<std::mutex> guard(_msgMutex);
_callbackHandle[msgId] = function;
}
當(dāng)消息來到的時候,通過IsFollowMsgId 來判斷是不是ThreadObject所需要的msgId洲押,如果是的話武花,就調(diào)用 AddPacket函數(shù),將這個消息的packet保存下來杈帐,供后續(xù)ProcessPacket函數(shù)使用体箕。
void MessageList::AddPacket(Packet* pPacket)
{
std::lock_guard<std::mutex> guard(_msgMutex);
_msgList.push_back(pPacket);
}
上圖邏輯就是每個Thread在分發(fā)消息的時候 根據(jù)每個ThreadObject關(guān)心的MsgId進(jìn)行分發(fā)的。
void MessageList::ProcessPacket()
{
std::list<Packet*> tmpList;
_msgMutex.lock();
std::copy(_msgList.begin(), _msgList.end(), std::back_inserter(tmpList));
_msgList.clear();
_msgMutex.unlock();
for (auto packet : tmpList)
{
const auto handleIter = _callbackHandle.find(packet->GetMsgId());
if (handleIter == _callbackHandle.end())
{
std::cout << "packet is not hander. msg id;" << packet->GetMsgId() << std::endl;
}
else
{
handleIter->second(packet);
}
}
tmpList.clear();
}
處理消息的邏輯大致如下
1.聲明一個tmpList挑童,存儲
2 上鎖 把_msgList復(fù)制到tmpList里處理 清空_msgList 解鎖
- 遍歷tmpList 獲取每個packet的 MsgId累铅,并且通過_callbackHandle map找到對應(yīng)MsgId需要處理的函數(shù)對象,調(diào)用該函數(shù)站叼。
提問:為什么這里要上鎖呢娃兽?
其實是因為ThreadMgr是一個單例對象,這個對象在線程1和線程2其實指向的都是同一個對象尽楔,那么就會造成共享資源的問題换薄,所以需要對Thread和ThreadObect做訪問臨界資源上鎖的處理。
提問:MsgId是什么
實際上可以理解成 是和服務(wù)器和客戶端規(guī)定的協(xié)議 翔试,比如大家規(guī)定當(dāng)msgId等1時轻要,這個消息代表的含義是XXX。那么雙方彼此就可以通過MsgId獲取到含義垦缅,從而處理這個消息冲泥。
目前用的最普遍的消息序列化工具是protobuf
什么是protobuf? 官方文檔對 protobuf 的定義:protocol buffers 是一種語言無關(guān)、平臺無關(guān)壁涎、可擴(kuò)展的序列化結(jié)構(gòu)數(shù)據(jù)的方法凡恍,可用于數(shù)據(jù)通信協(xié)議和數(shù)據(jù)存儲等.